# Gingester - generic ingest framework

## General usage

`<>` mark placeholders, `[]` mark optionals.

- `-cf/--cli-file [[<scope>]:]<path/to/file> [<parameters>]`
  add cli from file, supports Freemarker square-bracket-tag/square-bracket-interpolation templating
  - provide `[[<scope>]:]` to process the cli in a scope named `<scope>`
  - provide `[:]` to process the cli in a scope given the name of the most recently added id
  - provide `[<parameters>]` in JSON syntax to be used as template data
- `-cr/--cli-resource [[<scope>]:]<path/to/resource> [<parameters>]`
  add cli from resource, supports Freemarker square-bracket-tag/square-bracket-interpolation templating
  - provide `[[<scope>]:]` to process the cli in a scope named `<scope>`
  - provide `[:]` to process the cli in a scope given the name of the most recently added id
  - provide `[<parameters>]` in JSON syntax to be used as template data
- `-d/--divert <id1> [<id2> [...]]`
  remove all transformers downstream of the given ids and have the next transformer added to the flow take their place
- `-dm/--debug/--debug-mode`
  get more detailed exception information at the cost of performance
- `-e/--excepts [<link1> [<link2> [...]]]`
  catch uncaught exceptions from the previous transformer and downstream
  - exceptions are routed to the built-in error logging transformer if no links are provided
  - provide `[<link1> [<link2> [...]]]` to route the exceptions to those links
- `-f/--fetch [<key>]`
  fetch value from stash by key
- `-gs/--graceful-sigint`
  enable graceful handling of SIGINT
- `-l/--links <link1> [<link2> [...]]`
  link the previous transformer to the given links; it will not automatically link to the next transformer
- `-p/--probe [<target>] [<contextLimit>]`
  print the items flowing through this point in the flow
  - provide `stderr` for `[<target>]` to print to stderr instead of stdout
  - provide an integer for `[<contextLimit>]` to control the amount of context printed
- `-pt/--point/--passthrough [<workers>][.[<queueSize>][.<maxBatchSize>]] [<id>[!]]`
  add a passthrough transformer to the flow, see `-t` for further details
- `-r/--report`
  the interval in seconds at which to report throughput, 0 to disable
- `-s/--stash [<key>]`
  stash value under key
- `-sf/--sync-from <link1> [<link2> [...]]`
  mark the linked transformers to be synced with when `-stf`/`-stft`/`-stof`/`-stt` is used later
- `-sfpt/--sync-from-point/--sync-from-passthrough [<workers>][.[<queueSize>][.<maxBatchSize>]] [<id>[!]]`
  equal to `-pt` but marks the transformer to be synced with when `-stf`/`-stft`/`-stof`/`-stt` is used later
- `-sfs/--sync-from-stash [<key>]`
  stash value under key and mark this point to be synced with when `-stf`/`-stft`/`-stof`/`-stt` is used later
- `-sft/--sync-from-transformer [<workers>][.[<queueSize>][.<maxBatchSize>]] [<id>:]<name>[!] [<parameters>]`
  equal to `-t` but marks the transformer to be synced with when `-stf`/`-stft`/`-stof`/`-stt` is used later
- `-stf/--sync-to-fetch [<key>]`
  fetch value from stash by key every time any of the transformers most recently marked by `-sf`/`-sfpt`/`-sfs`/`-sft`/`-stft` finishes, or the ingest-seed if there are no marks
- `-stft/--sync-to-and-from-transformer [<workers>][.[<queueSize>][.<maxBatchSize>]] [<id>:]<name>[!] [<parameters>]`
  equal to `-t` but syncs the transformer with those most recently marked by `-sf`/`-sfpt`/`-sfs`/`-sft`/`-stft`, or the ingest-seed if there are no marks; also marks the transformer to be synced with when `-stf`/`-stft`/`-stof`/`-stt` is used later
- `-stof/--sync-to-on-finish`
  yield a finish signal every time any of the transformers most recently marked by `-sf`/`-sfpt`/`-sfs`/`-sft`/`-stft` finishes, or the ingest-seed if there are no marks
- `-stt/--sync-to-transformer [<workers>][.[<queueSize>][.<maxBatchSize>]] [<id>:]<name>[!] [<parameters>]`
  equal to `-t` but syncs the transformer with those most recently marked by `-sf`/`-sfpt`/`-sfs`/`-sft`/`-stft`, or the ingest-seed if there are no marks
- `-t/--transformer [<workers>][.[<queueSize>][.<maxBatchSize>]] [<id>:]<name>[!] [<parameters>]`
  add transformer to the flow
  - provide `[<workers>]` to enable async execution for the transformer by the provided number of workers
  - provide `[<queueSize>]` to set the queue size for this transformer
  - provide `[<maxBatchSize>]` to limit the maximum batch size for this transformer
  - provide `[<id>]` to be able to refer to this transformer by the provided id
  - provide `[!]` to enable throughput reporting for the transformer
  - provide `[<parameters>]` to configure the transformer
- `-w/--swap [<key>]`
  stash value and fetch previous value
- `-wi/--wormhole-in`
  pass items through and send values to the closest upstream `-wo/--wormhole-out`
- `-wo/--wormhole-out`
  pass items through and yield values from the closest downstream `-wi/--wormhole-in`
- `--`
  prevent the previous transformer from automatically being linked to the next transformer
- `++ <comment> ++`
  has no effect
- `-h/--help`
  print this help and exit
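As a minimal end-to-end sketch of the `-t` syntax (the `gingester` entry point shown here is an assumption; this reference does not specify how the CLI is launched):

```sh
# Sketch only: assumes the CLI is exposed as a `gingester` command.
# `2.100.10 rep:Repeat 3` means: 2 workers, queue size 100, max batch
# size 10, the id "rep", and the parameter 3 for the Repeat transformer.
gingester \
  -t StringDef hello \
  -t 2.100.10 rep:Repeat 3 \
  -f description \
  -t StdOut
```

Per the Concepts section below, `-f description` fetches the repetition number that Repeat stashes, so this should write 0, 1, and 2 to stdout (the order may vary with multiple workers).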
## Concepts

- **Flow**: the network of transformers processing the items.
- **Item**: a value and associated context in the flow.
- **Seed**: a special item used to start the flow by passing it to all transformers without incoming links.
- **Transformer**: a node in the flow that processes items.
- **Yield**: the result of a transformer.

### Upstream/Downstream

Given the flow `-t StringDef 'hello' -t Repeat 3 -t StringAppend '!' -t Join -t Speak`, the upstream of the StringAppend transformer consists of the StringDef and Repeat transformers, and its downstream consists of the Join and Speak transformers.

### Context/Stash

Every item in the flow has context. This context contains key-value pairs that were stashed upstream. The values can be fetched and continue as items in the flow, used in transformer argument templates, and used by transformers internally.

Take for example `-t StringDef hello -t Repeat 3 --probe`. StringDef will yield "hello" and Repeat will repeat it 3 times, each time stashing the zero-based repetition number as "description". By default the probe writes the item context followed by the item value to stdout. The first output of the above flow is:

```
---- 0 ----
{ Repeat: { description: 0 } }
hello
-----------
```

The Fetch transformer can be used to fetch a stashed value and use it as an item value in the flow. E.g. `-t Repeat 3 -f description -t StdOut` would write 0, 1, and 2 to stdout.

Most string arguments can be templated using the Apache Freemarker templating language, e.g. `-t Repeat 3 -t StringDef 'Hello, ${description}!' -t Speak`.

If there are multiple values stashed under the same key, the most recently stashed value is used. If a different value is needed, the transformer that stashed it must be specified, e.g. `-f Repeat.description`.

A subset of the stash can be fetched ordinally. A transformer can mark one of the key-value pairs it stashes for inclusion in ordinal fetches. For example, the Stash transformer marks the input it stashes for inclusion in ordinal fetches. In the following example "hello" and then "world" are stashed, after which "hello" is fetched ordinally and printed: `-t StringDef hello -s -t StringDef world -s -f ^2 -t StdOut`. The number in the ordinal fetch defines what to fetch, starting at 1 for the most recently stashed item.
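Putting stashing, templating, and fetching together, a small sketch (illustrative only; built from the transformers documented above, not taken from the project itself):

```sh
# Sketch only: stashes the StringDef output under "name", then uses both
# the "name" stash and Repeat's "description" stash in a Freemarker template.
gingester \
  -t StringDef World \
  -s name \
  -t Repeat 2 \
  -t StringDef 'Hello, ${name}! (round ${description})' \
  -t StdOut
```

If the semantics above hold, this prints `Hello, World! (round 0)` followed by `Hello, World! (round 1)`.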
### Synchronization

Some transformers can synchronize with the output of an upstream transformer. Given the instructions `-t PathSearch '*.csv' -t DsvToJson -t Join -t PathWrite all.ndjson`, assume that PathSearch finds 3 CSV files, each containing a header and 100 records. DsvToJson will output 300 JSON objects, which will all be joined together and written to the file `all.ndjson`.

To get an output file for each input file, Join needs to be in sync with the output of PathSearch, like so: `-sft PathSearch '*.csv' -t DsvToJson -stt Join -t PathWrite '${path.tail}.ndjson'`. Now the synced-to Join will create a separate output for each output of the synced-from PathSearch. Because Join is in sync with PathSearch, its downstream has access to the PathSearch context, which is used in the PathWrite filename.

When a transformer requires synchronization but is not instructed to be in sync with anything, as with the unsynced Join in the first example above, it is synced with the seed.

Grouping is a special form of synchronization where multiple items flowing through a grouping transformer are assigned the same synchronization context. This differs from normal synchronization, where each item leaving the synced-from transformer gets its own synchronization context. For example, the GroupByCountLimit transformer counts the items flowing through it and assigns them the same synchronization context until it hits the given limit, at which point it switches to a new synchronization context. A synced-to ListCollect transformer would then output a list for every group; a sketch of this is given at the end of this Concepts section.

### Wormhole

Flows are directed and acyclic, which makes some flows hard to express, for example consuming a paginated JSON API where each page contains a reference to the next page. An attempt at consuming such an API could look like this:

```
-t Http GET 'https://example.org/json-api?search=example'
-t InputStreamToJson
-s page
++ process page here ++
-t Http GET '${page.next}'
-t InputStreamToJson
-s page
++ process page here ++
-t Http GET '${page.next}'
++ and so forth ++
```

The first Http transformer yields the URL for the second page, and since that cannot flow back to the first Http transformer, a second Http transformer must be defined to get the second page, and so forth. A wormhole solves this issue by allowing values to travel upstream:

```
-t StringDef 'https://example.org/json-api?search=example'
--wormhole-out
-t Http GET '${__in__}'
-t InputStreamToJson
--sync-from-stash page
-t JsonPath '$.next' optional
--wormhole-in
--sync-to-fetch page
++ process pages here ++
```

Here the URL for the next page travels through the wormhole back to the same Http transformer. The transformers between the two ends of the wormhole must not have their own workers. Only a single value can travel through the wormhole for each item that `--wormhole-out` yields. The value loses the context it gained between `--wormhole-out` and `--wormhole-in`.
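The grouping sketch referenced above (illustrative only; it combines the documented transformers and flags but has not been run against the project):

```sh
# Sketch only: Repeat yields 10 items, GroupByCountLimit assigns the same
# synchronization context to each run of 4 items, and the synced-to
# ListCollect should then yield lists of 4, 4, and 2 items.
gingester \
  -t Repeat 10 \
  -f description \
  -sft GroupByCountLimit 4 \
  -stt ListCollect \
  -t StdOut
```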