Gingester - generic ingest framework

General usage:

    <> mark placeholders, [] mark optionals

    -cf/--cli-file [[<scope>]:]<path/to/file> [<parameters>]
        add cli from file, supports Freemarker square-bracket-tag/square-bracket-interpolation templating
        provide [[<scope>]:] to process the cli in a scope named <scope>
        provide [:] to process the cli in a scope given the name of the most recently added id
        provide [<parameters>] in JSON syntax to be used as template data

    -cr/--cli-resource [[<scope>]:]<path/to/resource> [<parameters>]
        add cli from resource, supports Freemarker square-bracket-tag/square-bracket-interpolation templating
        provide [[<scope>]:] to process the cli in a scope named <scope>
        provide [:] to process the cli in a scope given the name of the most recently added id
        provide [<parameters>] in JSON syntax to be used as template data

    -d/--divert <id1> [<id2> [...]]
        remove all transformers downstream of the given ids; the next transformer added to the flow
        takes their place

    -dm/--debug/--debug-mode
        get more detailed exception information at the cost of performance

    -e/--excepts [<link1> [<link2> [...]]]
        catch uncaught exceptions from the previous transformer and downstream
        exceptions are routed to the builtin error logging transformer if no links are provided
        provide [<link1> [<link2> [...]]] to route the exceptions to those links (see the examples after this list)

    -f/--fetch [<key>]
        fetch value from stash by key

    -gs/--graceful-sigint
        enable graceful handling of sigint

    -l/--links <link1> [<link2> [...]]
        link the previous transformer to the given links; it will not automatically link to the next transformer

    -p/--probe [<target>] [<contextLimit>]
        print the items flowing through this point in the flow
        provide "stderr" for [<target>] to print to stderr instead of stdout
        provide an integer for [<contextLimit>] to control the amount of context printed

    -pt/--point/--passthrough [<workers>][.[<queueSize>][.<maxBatchSize>]] [<id>[!]]
        add a passthrough transformer to the flow; see -t for further details

    -r/--report
        set the interval in seconds at which to report throughput, 0 to disable

    -s/--stash [<key>]
        stash value under key

    -sf/--sync-from <link1> [<link2> [...]]
        mark the linked transformers to be synced with when -stf/-stft/-stof/-stt is used later

    -sfpt/--sync-from-point/--sync-from-passthrough [<workers>][.[<queueSize>][.<maxBatchSize>]] [<id>[!]]
        equal to -pt but marks the transformer to be synced with when -stf/-stft/-stof/-stt is used later

    -sfs/--sync-from-stash [<key>]
        stash value under key and mark this point to be synced with when -stf/-stft/-stof/-stt is used later

    -sft/--sync-from-transformer [<workers>][.[<queueSize>][.<maxBatchSize>]] [<id>:]<name>[!] [<parameters>]
        equal to -t but marks the transformer to be synced with when -stf/-stft/-stof/-stt is used later

    -stf/--sync-to-fetch [<key>]
        fetch value from stash by key every time any of the transformers most recently marked by
        -sf/-sfpt/-sfs/-sft/-stft finishes, or the ingest-seed if there are no marks

    -stft/--sync-to-and-from-transformer
            [<workers>][.[<queueSize>][.<maxBatchSize>]] [<id>:]<name>[!] [<parameters>]
        equal to -t but syncs the transformer with those most recently marked by -sf/-sfpt/-sfs/-sft/-stft,
        or the ingest-seed if there are no marks; also marks the transformer to be synced with when
        -stf/-stft/-stof/-stt is used later

    -stof/--sync-to-on-finish
        yield a finish signal every time any of the transformers most recently marked by
        -sf/-sfpt/-sfs/-sft/-stft finishes, or the ingest-seed if there are no marks

    -stt/--sync-to-transformer [<workers>][.[<queueSize>][.<maxBatchSize>]] [<id>:]<name>[!] [<parameters>]
        equal to -t but syncs the transformer with those most recently marked by -sf/-sfpt/-sfs/-sft/-stft,
        or the ingest-seed if there are no marks

    -t/--transformer [<workers>][.[<queueSize>][.<maxBatchSize>]] [<id>:]<name>[!] [<parameters>]
        add a transformer to the flow (see the examples after this list)
        provide [<workers>] to enable async execution for the transformer with the provided number of workers
        provide [<queueSize>] to set the queue size for this transformer
        provide [<maxBatchSize>] to limit the maximum batch size for this transformer
        provide [<id>] to be able to refer to this transformer by the provided id
        provide [!] to enable throughput reporting for the transformer
        provide [<parameters>] to configure the transformer

    -w/--swap [<key>]
        stash value and fetch previous value

    -wi/--wormhole-in
        pass items through and send values to the closest upstream -wo/--wormhole-out

    -wo/--wormhole-out
        pass items through and yield values from the closest downstream -wi/--wormhole-in

    --
        prevent the previous transformer from automatically being linked to the next transformer

    ++ <comment> ++
        has no effect

    -h/--help
        print this help and exit
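
    For example, combining the modifiers described under -t, the following flow runs StringAppend with 4 async
    workers, a queue size of 100, and a maximum batch size of 10, gives it the id Append, and enables throughput
    reporting for it via the trailing ! (a sketch composed only of transformers that appear elsewhere in this
    document):

        -t StringDef hello
        -t Repeat 1000000
        -t 4.100.10 Append:StringAppend! '!'
        -t StdOut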
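
    Similarly, an -e placed after a transformer catches uncaught exceptions thrown by it and its downstream. A
    sketch, assuming JsonPath throws on inputs without a title because "optional" is not given; with no links
    provided, those exceptions are routed to the builtin error logging transformer:

        -t PathSearch '*.json'
        -t InputStreamToJson
        -e
        -t JsonPath '$.title'
        -t StdOut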


Concepts:

    Flow: the network of transformers processing the items.
    Item: a value and associated context in the flow.
    Seed: a special item used to start the flow by passing it to all transformers without incoming links.
    Transformer: a node in the flow that processes items.
    Yield: the result of a transformer.


    Upstream/Downstream:

        Given the flow `-t StringDef 'hello' -t Repeat 3 -t StringAppend '!' -t Join -t Speak`, the
        StringDef and Repeat transformers are upstream of the StringAppend transformer, and the Join and
        Speak transformers are downstream of it.


    Context/Stash:

        Every item in the flow has context. This context contains key-value pairs that were stashed upstream.
        The values can be fetched to continue as items in the flow, used in transformer argument templates, or
        used by transformers internally. Take for example `-t StringDef hello -t Repeat 3 --probe`. StringDef
        will yield "hello" and Repeat will repeat it 3 times, each time stashing the zero-based repetition
        number as "description". The probe by default writes the item context followed by the item value to
        stdout. The first output of the above flow is:

        ---- 0 ----
        {
          Repeat: {
            description: 0
          }
        }

        hello
        -----------

        The Fetch transformer can be used to fetch a stashed value and use it as an item value in the flow.
        E.g. `-t Repeat 3 -f description -t StdOut` would write 0, 1, and 2 to stdout.

        Most string arguments can be templated using the Apache Freemarker templating language. E.g.
        `-t Repeat 3 -t StringDef 'Hello, ${description}!' -t Speak`.

        If there are multiple values stashed under the same key, the most recently stashed value will be used.
        If a different value is needed the transformer that stashed it must be specified, e.g.
        `-f Repeat.description`.
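
        For instance, with two nested Repeat transformers the inner one provides the most recently stashed
        "description", so fetching the outer repetition number requires qualifying the fetch (a sketch, assuming
        an explicit id can qualify a fetch the same way the transformer name does above):

        -t Outer:Repeat 2
        -t Repeat 3
        -f Outer.description
        -t StdOut

        This would write 0 three times and then 1 three times to stdout, one line per inner repetition.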

        A subset of the stash can be fetched ordinally. A transformer can mark one of the key-value pairs it
        stashes for inclusion in ordinal fetches. For example the Stash transformer marks the input it stashes
        for inclusion in ordinal fetches. In the following example "hello" and then "world" are stashed and then
        "hello" is fetched ordinally and printed: `-t StringDef hello -s -t StringDef world -s -f ^2 -t StdOut`.
        The number in the ordinal fetch selects which stashed item to fetch, with 1 being the most recently
        stashed item and higher numbers reaching further back.


    Synchronization:

        Some transformers can synchronize with the output of an upstream transformer.

        Given the instructions `-t PathSearch '*.csv' -t DsvToJson -t Join -t PathWrite all.ndjson` assume that
        PathSearch finds 3 CSV files, each containing a header and 100 records. DsvToJson will output 300 JSON
        objects, which will all be joined together and written to the file `all.ndjson`.

        To get an output file for each input file, Join needs to be in sync with the output of PathSearch, like
        so: `-sft PathSearch '*.csv' -t DsvToJson -stt Join -t PathWrite '${path.tail}.ndjson'`. Now the
        synced-to Join will create a separate output for each output of the synced-from PathSearch. Because Join
        is in sync with PathSearch, its downstream has access to the PathSearch context, which is used in the
        PathWrite filename.

        When a transformer requires synchronization but is not instructed to be in sync with anything, as in
        the example at the start of this section, it is synced with the seed.

        Grouping is a special form of synchronization where multiple items flowing through a grouping
        transformer are assigned the same synchronization context. This differs from normal synchronization
        where each item leaving the synced-from transformer gets its own synchronization context. For example,
        the GroupByCountLimit transformer counts the items flowing through it and assigns them the same
        synchronization context until it hits the given limit, at which point it switches to a new
        synchronization context. A synced-to ListCollect transformer would then output a list for every group.
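
        As a sketch of grouping, assuming GroupByCountLimit takes its limit as a parameter as described above,
        the following flow collects 25 repetition numbers into groups of at most 10; the synced-to ListCollect
        yields 3 lists (two of 10 items and one of 5), which the probe prints:

        -t Repeat 25
        -f description
        -sft GroupByCountLimit 10
        -stt ListCollect
        --probe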


    Wormhole:

        Flows are directed and acyclic, which makes some cases hard to express: consuming a paginated JSON
        API, for example, where each page contains a reference to the next page. An attempt at consuming
        such an API could look like this:

        -t Http GET 'https://example.org/json-api?search=example'
        -t InputStreamToJson -s page
        ++ process page here ++
        -t Http GET '${page.next}'
        -t InputStreamToJson -s page
        ++ process page here ++
        -t Http GET '${page.next}'
        ++ and so forth ++

        The first Http transformer yields the URL for the second page, and since that cannot flow back to the
        first Http transformer, a second Http transformer must be defined to get the second page, and so forth. A
        wormhole solves this issue by allowing values to travel upstream.

        -t StringDef 'https://example.org/json-api?search=example'
        --wormhole-out
        -t Http GET '${__in__}'
        -t InputStreamToJson
        --sync-from-stash page
        -t JsonPath '$.next' optional
        --wormhole-in
        --sync-to-fetch page
        ++ process pages here ++

        Here the URL for the next page travels through the wormhole back to the same Http transformer.

        The transformers between the 2 points of the wormhole must not have their own workers. Only a single value
        can travel through the wormhole for each item that --wormhole-out yields. The value loses the context it
        gained between --wormhole-out and --wormhole-in.
