
Plumber Usage Examples

Consuming

AWS SQS

Read X number of messages

plumber read aws-sqs --queue-name=orders --max-num-messages=10

Read messages and delete them afterwards

plumber read aws-sqs --queue-name=orders --max-num-messages=10 --auto-delete

Continuously read messages

plumber read aws-sqs --queue-name=orders --continuous

Poll for new messages for X seconds

plumber read aws-sqs --queue-name=orders --wait-time-seconds=20
RabbitMQ
plumber read rabbit \
    --address="amqp://localhost:5672" \
    --exchange-name=testex \
    --queue-name=testqueue \
    --binding-key="orders.#" \
    --continuous
RabbitMQ Streams
plumber read rabbit-streams --dsn rabbitmq-stream://guest:guest@localhost:5552 --stream new_orders --offset last
Kafka

Read a single message

plumber read kafka --topics orders --address="broker1.domain.com:9092"

You may specify multiple brokers by passing the --address flag multiple times.

plumber read kafka --topics orders \
    --address="broker1.domain.com:9092" \
    --address="broker2.domain.com:9092" \
    --address="broker3.domain.com:9092" \
    --continuous

Continuously read messages

plumber read kafka --topics orders --address="broker1.domain.com:9092" --continuous

To read from multiple topics, specify multiple topics delimited with a comma (without spaces):

plumber read kafka --topics one,two -f --pretty
Azure Service Bus

Reading from a topic

export SERVICEBUS_CONNECTION_STRING="Endpoint=sb://plumbertopictest.servicebus.windows.net/;SharedAccessKeyName=...."

plumber read azure-service-bus --topic="new-orders" --subscription="copy-of-new-orders"

Reading from a queue

export SERVICEBUS_CONNECTION_STRING="Endpoint=sb://plumbertopictest.servicebus.windows.net/;SharedAccessKeyName=...."

plumber read azure-service-bus --queue "new-orders"
Azure Event Hub

Read first available message from any partition

export EVENTHUB_CONNECTION_STRING="Endpoint=sb://plumbertest.servicebus.windows.net/;SharedAccessKeyName=...;SharedAccessKey=....=;EntityPath=..."

plumber read azure-eventhub
NATS
plumber read nats --address="nats://user:pass@nats.test.io:4222" --subject "test-subject"
NATS Streaming
plumber read nats-streaming --address="nats://user:pass@nats.test.io:4222" --channel "orders" --cluster-id "test-cluster" --client-id "plumber"
NATS JetStream
plumber read nats-jetstream --dsn="nats://user:pass@nats.test.io:4222" --stream "orders.>" --client-id "plumber"

Create and use a durable consumer:

plumber read nats-jetstream --dsn="nats://user:pass@nats.test.io:4222" --stream foo --create-durable-consumer

Use an existing durable consumer:

plumber read nats-jetstream --dsn="nats://user:pass@nats.test.io:4222" --stream foo --existing-durable-consumer --consumer-name existing_consumer

Create a new durable consumer at a specific stream start sequence:

plumber read nats-jetstream --dsn="nats://user:pass@nats.test.io:4222" --stream foo --create-durable-consumer --consumer-start-sequence 42

NOTE: By default, plumber will remove any consumers it creates. To leave consumers untouched, set --keep-consumer.

Redis PubSub
plumber read redis-pubsub --address="localhost:6379" --channels="new-orders"
Redis Streams
plumber read redis-streams --address="localhost:6379" --streams="new-orders"
GCP Pub/Sub
plumber read gcp-pubsub --project-id=PROJECT_ID --subscription-id=SUBSCRIPTION
MQTT
plumber read mqtt --address tcp://localhost:1883 --topic iotdata --qos-level at_least_once

# Or connect with TLS:

plumber read mqtt --address ssl://localhost:8883 --topic iotdata --qos-level at_least_once

# TLS using certificates

plumber read mqtt --address ssl://localhost:8883 --topic iotdata --qos-level at_least_once --tls-ca-cert=/path/to/ca_certificate.pem --tls-client-key=/path/to/client_key.pem --tls-client-cert=/path/to/client_certificate.pem

Apache Pulsar

plumber read pulsar --topic NEWORDERS --name plumber

NSQ

plumber read nsq --lookupd-address localhost:4161 --topic orders --channel neworders 

Thrift Decoding

NOTE: This method is deprecated. See Thrift Decoding with IDL Files for an improved way to read Thrift messages.

Plumber can decode thrift output, and display it as nested JSON. The key is the field's ID, and the value is the actual value in the message. Add the --pretty flag to colorize output.

plumber read kafka --topics orders --decode-type thrift --pretty

{
  "1": 54392501,
  "2": "Test Order",
  "3": {
    "1": "Product Name",
    "2": "green",
    "3": "2091.99"
  }
}

Thrift Decoding with IDL files

NEW Support for decoding with IDL files

Plumber can now use your .thrift IDL files to decode output with field and enum names.

$ plumber read kafka --topics thrifttest \
      --thrift-struct sh.batch.schema.Account \
      --thrift-dirs ./test-assets/thrift/schema/ \
      --decode-type thrift \
      --pretty
{
  "emails": [
    "[email protected]",
    "[email protected]"
  ],
  "id": 321,
  "model": {
    "includedvalue": "value of included struct"
  },
  "name": "Mark Gregan",
  "permissions": [
    "create",
    "read",
    "update",
    "delete"
  ],
  "price": 1.23,
  "subm": {
    "value": "submessage value here"
  },
  "teams": {
    "123": "554bf385-ce1f-4deb-9a99-8864c1df52b5"
  },
  "testconst": 1234,
  "type": "VIP",
  "unionthing": {
    "thing_int": null,
    "thing_string": "Daniel"
  }
}

AWS Kinesis

Reading from a single shard

plumber read kinesis --stream orders --shard shardId-000000000000 --latest --max-records 10

Read from all shards

plumber read kinesis --stream orders --continuous
Memphis
plumber read memphis --address localhost:6666 --station myevents

Publishing

AWS SQS
plumber write aws-sqs --queue-name=NewOrders --input="{\"order_id\": \"A-3458-654-1\", \"status\": \"processed\"}"
RabbitMQ
plumber write rabbit --address="amqp://rabbit.yourdomain.net:5672" --exchange-name=NewOrders --routing-key="orders.oregon.coffee" --input="{\"order_id\": \"A-3458-654-1\", \"status\": \"processed\"}"
RabbitMQ Streams
plumber write rabbit-streams --dsn rabbitmq-stream://guest:guest@localhost:5552 --stream new_orders --input '{"order_id": "A-3458-654-1", "status": "processed"}'
Kafka
plumber write kafka --address localhost:9092 --topics neworders --input '{"order_id": "A-3458-654-1", "status": "processed"}'

You may specify multiple brokers by passing the --address flag multiple times.

To write to more than one topic, you may specify the --topics flag multiple times.

plumber write kafka --topics neworders \
    --address "broker1.domain.com:9092" \
    --address "broker2.domain.com:9092" \
    --address "broker3.domain.com:9092" \
    --input="{\"order_id\": \"A-3458-654-1\", \"status\": \"processed\"}"
AWS SNS
plumber write aws-sns --topic="arn:aws:sns:us-east-2:123456789012:MyTopic" --input="New event is ready for processing!"
Azure Service Bus

Publishing to a topic

export SERVICEBUS_CONNECTION_STRING="Endpoint=sb://plumbertopictest.servicebus.windows.net/;SharedAccessKeyName=...."

plumber write azure-service-bus --topic="new-orders" --input="{\"order_id\": \"A-3458-654-1\", \"status\": \"processed\"}"

Publishing to a queue

export SERVICEBUS_CONNECTION_STRING="Endpoint=sb://plumbertopictest.servicebus.windows.net/;SharedAccessKeyName=...."

plumber write azure-service-bus --queue="new-orders" --input="{\"order_id\": \"A-3458-654-1\", \"status\": \"processed\"}"
Azure Event Hub

Publish to random partition

export EVENTHUB_CONNECTION_STRING="Endpoint=sb://plumbertest.servicebus.windows.net/;SharedAccessKeyName=...;SharedAccessKey=....=;EntityPath=..."

plumber write azure-eventhub --input "{\"order_id\": \"A-3458-654-1\", \"status\": \"processed\"}" --message-id "neworder123"

Publish to specific partition key

export EVENTHUB_CONNECTION_STRING="Endpoint=sb://plumbertest.servicebus.windows.net/;SharedAccessKeyName=...;SharedAccessKey=....=;EntityPath=..."

plumber write azure-eventhub --input "{\"order_id\": \"A-3458-654-1\", \"status\": \"processed\"}" --message-id "neworder123" --partition-key "neworders"
NATS
plumber write nats --address="nats://user:pass@nats.test.io:4222" --subject "test-subject" --input "Hello World"
NATS Streaming
plumber write nats-streaming --address="nats://user:pass@nats.test.io:4222" --channel "orders" --cluster-id "test-cluster" --client-id "plumber-producer" --input "{\"order_id\": \"A-3458-654-1\", \"status\": \"processed\"}"
NATS JetStream
plumber write nats-jetstream --dsn="nats://user:pass@nats.test.io:4222" --stream "orders.>" --input="{\"order_id\": \"A-3458-654-1\", \"status\": \"processed\"}"
Redis PubSub
plumber write redis-pubsub --address="localhost:6379" --channels="new-orders" --input="{\"order_id\": \"A-3458-654-1\", \"status\": \"processed\"}"
Redis Streams
plumber write redis-streams --address="localhost:6379" --streams="new-orders" --key foo --input="{\"order_id\": \"A-3458-654-1\", \"status\": \"processed\"}"
GCP Pub/Sub
plumber write gcp-pubsub --topic-id=TOPIC --project-id=PROJECT_ID --input='{"Sensor":"Room J","Temp":19}' 
MQTT
plumber write mqtt --address tcp://localhost:1883 --topic iotdata --qos-level at_least_once --input "{\"id\": 123, \"temperature\": 15}"

# or connect with TLS:

plumber write mqtt --address ssl://localhost:8883 --topic iotdata --qos-level at_least_once --input "{\"id\": 123, \"temperature\": 15}"

# TLS using certificates

plumber write mqtt --address ssl://localhost:8883 --topic iotdata --qos-level at_least_once --tls-ca-cert=/path/to/ca_certificate.pem --tls-client-key=/path/to/client_key.pem --tls-client-cert=/path/to/client_certificate.pem --input "{\"id\": 123, \"temperature\": 15}"
Apache Pulsar
plumber write pulsar --topic NEWORDERS --input="{\"order_id\": \"A-3458-654-1\", \"status\": \"processed\"}"
NSQ
plumber write nsq --nsqd-address localhost:4050 --topic orders --input="{\"order_id\": \"A-3458-654-1\", \"status\": \"processed\"}"
AWS Kinesis
plumber write kinesis --stream teststream --partition-key orders --input "{\"order_id\": \"A-3458-654-1\", \"status\": \"processed\"}"
Memphis
plumber write memphis --address localhost:6666 --station myevents --input "{\"order_id\": \"A-3458-654-1\", \"status\": \"processed\"}"

Relay Mode

Continuously relay messages from your RabbitMQ instance to a Streamdal collection
$ docker run --name plumber-rabbit -p 8080:8080 \
    -e PLUMBER_RELAY_TOKEN=$YOUR-STREAMDAL-TOKEN-HERE \
    -e PLUMBER_RELAY_RABBIT_EXCHANGE=my_exchange \
    -e PLUMBER_RELAY_RABBIT_QUEUE=my_queue \
    -e PLUMBER_RELAY_RABBIT_ROUTING_KEY=some.routing.key \
    -e PLUMBER_RELAY_RABBIT_QUEUE_EXCLUSIVE=false \
    -e PLUMBER_RELAY_RABBIT_QUEUE_DURABLE=true \
    batchcorp/plumber plumber relay rabbit
Continuously relay messages from an SQS queue to a Streamdal collection
docker run -d --name plumber-sqs -p 8080:8080 \
    -e AWS_ACCESS_KEY_ID=$AWS_ACCESS_KEY_ID \
    -e AWS_SECRET_ACCESS_KEY=$AWS_SECRET_ACCESS_KEY \
    -e PLUMBER_RELAY_SQS_QUEUE_NAME=TestQueue \
    -e PLUMBER_RELAY_TOKEN=$YOUR-STREAMDAL-TOKEN-HERE \
    batchcorp/plumber plumber relay aws-sqs
Continuously relay messages from an Azure queue to a Streamdal collection
docker run -d --name plumber-azure -p 8080:8080 \
    -e SERVICEBUS_CONNECTION_STRING="Endpoint=sb://mybus.servicebus.windows.net/;SharedAccessKeyName..." \
    -e PLUMBER_RELAY_AZURE_QUEUE_NAME=neworders \
    -e PLUMBER_RELAY_TOKEN=$YOUR-STREAMDAL-TOKEN-HERE \
    batchcorp/plumber plumber relay azure-service-bus
Continuously relay messages from an Azure topic to a Streamdal collection
docker run -d --name plumber-azure -p 8080:8080 \
    -e SERVICEBUS_CONNECTION_STRING="Endpoint=sb://mybus.servicebus.windows.net/;SharedAccessKeyName..." \
    -e PLUMBER_RELAY_AZURE_TOPIC_NAME=neworders \
    -e PLUMBER_RELAY_AZURE_SUBSCRIPTION=some-sub \
    -e PLUMBER_RELAY_TOKEN=$YOUR-STREAMDAL-TOKEN-HERE \
    batchcorp/plumber plumber relay azure-service-bus
Continuously relay messages from multiple Redis channels to a Streamdal collection
docker run -d --name plumber-redis-pubsub -p 8080:8080 \
    -e PLUMBER_RELAY_REDIS_PUBSUB_ADDRESS=localhost:6379 \
    -e PLUMBER_RELAY_REDIS_PUBSUB_CHANNELS=channel1,channel2 \
    -e PLUMBER_RELAY_TOKEN=$YOUR-STREAMDAL-TOKEN-HERE \
    batchcorp/plumber plumber relay redis-pubsub
Continuously relay messages from multiple Redis streams to a Streamdal collection
docker run -d --name plumber-redis-streams -p 8080:8080 \
    -e PLUMBER_RELAY_REDIS_STREAMS_ADDRESS=localhost:6379 \
    -e PLUMBER_RELAY_REDIS_STREAMS_STREAMS=stream1,stream2 \
    -e PLUMBER_RELAY_TOKEN=$YOUR-STREAMDAL-TOKEN-HERE \
    batchcorp/plumber plumber relay redis-streams
Continuously relay messages from a Kafka topic (on Confluent) to a Streamdal collection (via CLI)
docker run -d --name plumber-kafka -p 8080:8080 \
    -e PLUMBER_RELAY_TOKEN="$YOUR-STREAMDAL-TOKEN-HERE" \
    -e PLUMBER_RELAY_KAFKA_ADDRESS="pkc-4kgmg.us-west-2.aws.confluent.cloud:9092,pkc-5kgmg.us-west-2.aws.confluent.cloud:9092" \
    -e PLUMBER_RELAY_KAFKA_TOPIC="$YOUR_TOPIC" \
    -e PLUMBER_RELAY_KAFKA_INSECURE_TLS="true" \
    -e PLUMBER_RELAY_KAFKA_USERNAME="$YOUR_CONFLUENT_API_KEY" \
    -e PLUMBER_RELAY_KAFKA_PASSWORD="$YOUR_CONFLUENT_API_SECRET" \
    -e PLUMBER_RELAY_KAFKA_SASL_TYPE="plain" \
    batchcorp/plumber plumber relay kafka
Continuously relay messages from a MQTT topic to a Streamdal collection
docker run -d --name plumber-mqtt -p 8080:8080 \
    -e PLUMBER_RELAY_MQTT_ADDRESS=tcp://localhost:1883 \
    -e PLUMBER_RELAY_MQTT_TOPIC=iotdata \
    -e PLUMBER_RELAY_MQTT_QOS=1 \
    -e PLUMBER_RELAY_TOKEN=$YOUR-STREAMDAL-TOKEN-HERE \
    batchcorp/plumber plumber relay mqtt
Continuously relay messages from a NATS JetStream stream to a Streamdal collection
docker run -d --name plumber-natsjs -p 8080:8080 \
    -e PLUMBER_RELAY_NATS_JETSTREAM_DSN=nats://localhost:4222 \
    -e PLUMBER_RELAY_NATS_JETSTREAM_CLIENT_ID=plumber \
    -e PLUMBER_RELAY_NATS_JETSTREAM_STREAM=orders \
    -e PLUMBER_RELAY_TOKEN=$YOUR-STREAMDAL-TOKEN-HERE \
    batchcorp/plumber plumber relay nats-jetstream

Change Data Capture

Continuously relay Postgres change events to a Streamdal collection

See documentation at https://docs.streamdal.com/event-ingestion/change-data-capture/postgresql for instructions on setting up PostgreSQL CDC.

Continuously relay MongoDB change stream events to a Streamdal collection
docker run -d --name plumber-cdc-mongo -p 8080:8080 \
    -e PLUMBER_RELAY_CDCMONGO_DSN=mongodb://mongo.mysite.com:27017 \
    -e PLUMBER_RELAY_CDCMONGO_DATABASE=mydb \
    -e PLUMBER_RELAY_CDCMONGO_COLLECTION=customers \
    -e PLUMBER_RELAY_TOKEN=YOUR_STREAMDAL_TOKEN_HERE \
    batchcorp/plumber plumber relay cdc-mongo

For more advanced mongo usage, see documentation at https://docs.streamdal.com/event-ingestion/change-data-capture/mongodb

Advanced Usage

Decoding protobuf encoded messages and viewing them live

Protobuf is supported for both encode and decode for all backends. There are three flags that must be specified for protobuf:

  1. --encode-type for writes or --decode-type for reads
  2. --protobuf-dirs pointing to a directory that contains your .proto files
  3. --protobuf-root-message which indicates the message type plumber should use to encode input or decode output

NOTE: --protobuf-root-message must specify the FULL path to the type, i.e. events.MyType (MyType alone is not enough!).

$ plumber read rabbit --address="amqp://localhost" --exchange events --binding-key \# \
  --decode-type protobuf \
  --protobuf-dirs ~/schemas \
  --protobuf-root-message pkg.Message \
  --continuous
  
1: {"some-attribute": 123, "numbers" : [1, 2, 3]}
2: {"some-attribute": 424, "numbers" : [325]}
3: {"some-attribute": 49, "numbers" : [958, 288, 289, 290]}
4: ERROR: Cannot decode message as protobuf "Message"
5: {"some-attribute": 394, "numbers" : [4, 5, 6, 7, 8]}
^C

Writing protobuf messages with source jsonpb

NOTE: "jsonpb" is just a JSON representation of your protobuf event. When you use it as the --input-type, plumber will read the JSON blob and attempt to decode it into your specified root message, followed by writing the []byte slice to the message bus.

$ plumber write rabbit --exchange events --routing-key foo.bar  \
  --protobuf-dirs ~/schemas \
  --protobuf-root-message pkg.Message \
  --input-file ~/fakes/some-jsonpb-file.json \
  --encode-type jsonpb
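
For reference, a jsonpb input file is simply your event rendered as JSON. A purely illustrative, hypothetical ~/fakes/some-jsonpb-file.json mirroring the pkg.Message shape shown in the read example above might look like:

{
  "some-attribute": 394,
  "numbers": [4, 5, 6]
}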

Shallow envelope protobuf messages

Plumber supports "shallow envelope" protobuf messages consisting of one type of protobuf message used to decode the message itself, and another type of message used to decode the protobuf contents of a payload field inside the envelope. The payload field must be of bytes type.

To read/write shallow envelope messages with plumber, you will need to specify the following additional flags:

  1. --protobuf-envelope-type shallow - To indicate that the message is a shallow envelope
  2. --shallow-envelope-field-number - Protobuf field number of the envelope's field which contains the encoded protobuf data
  3. --shallow-envelope-message - The protobuf message name used to encode the data in the field

Example protobuf we will read/write with:

syntax = "proto3";

package shallow;

// Represents a shallow envelope
message Envelope {
  string id = 1;
  bytes data = 2;
}

// Message is what goes into Envelope's Data field
message Payload {
  string name = 1;
}
Writing shallow envelope
plumber write kafka --topics testing \
  --protobuf-dirs test-assets/shallow-envelope/ \
  --protobuf-root-message shallow.Envelope \
  --input-file test-assets/shallow-envelope/example-payload.json \
  --protobuf-envelope-type shallow \
  --shallow-envelope-message shallow.Payload \
  --shallow-envelope-field-number=2 \
  --encode-type jsonpb

Reading shallow envelope

plumber read kafka --topics testing \
  --protobuf-dirs test-assets/shallow-envelope/ \
  --protobuf-root-message shallow.Envelope \
  --protobuf-envelope-type shallow \
  --shallow-envelope-message shallow.Payload \
  --shallow-envelope-field-number=2 \
  --decode-type protobuf

Using File Descriptor Sets

Plumber supports using protobuf file descriptor set files for decoding and encoding protobuf messages, instead of a directory of .proto files. This method is more reliable than the --protobuf-dirs flag, as it avoids include-path issues.

For help with generating an .fds file from your .proto files, see https://docs.streamdal.com/platform/components/what-are-schemas#protocol-buffers
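
As a rough sketch, an .fds file can be produced with protoc's --descriptor_set_out flag (the ./protos directory and protos.fds output path here are placeholders):

protoc --include_imports -I ./protos --descriptor_set_out=protos.fds ./protos/*.proto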

Writing using FDS

plumber write kafka --topics fdstest1 \
  --protobuf-descriptor-set test-assets/protobuf-any/sample/protos.fds \
  --protobuf-root-message sample.Envelope \
  --encode-type jsonpb \
  --input-file test-assets/protobuf-any/payload.json

Reading using FDS

plumber read kafka --topics fdstest1 \
  --protobuf-descriptor-set test-assets/protobuf-any/sample/protos.fds \
  --protobuf-root-message sample.Envelope \
  --decode-type protobuf

Using Avro schemas when reading or writing

$ plumber write kafka --topics=orders --avro-schema-file=some_schema.avsc --input-file=your_data.json
$ plumber read kafka --topics=orders --avro-schema-file=some_schema.avsc
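
For illustration, some_schema.avsc is a standard Avro schema definition. A minimal, hypothetical schema (and a matching your_data.json record) could look like:

{
  "type": "record",
  "name": "Order",
  "fields": [
    {"name": "order_id", "type": "string"},
    {"name": "status", "type": "string"}
  ]
}

{"order_id": "A-3458-654-1", "status": "processed"}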

Publish CloudEvents

NOTE: CloudEvents are currently only supported for: Kafka, NATS, NATS Streaming, and NATS JetStream

Plumber supports emitting CloudEvent messages.

By default, if the contents of --input or --input-file represent a valid CloudEvent in JSON format, the data will be unmarshaled into a CloudEvent. Any --ce-* flags specified will override their respective values in the event before it is published.

If the value of --input or --input-file is not a valid CloudEvent in JSON format, a new CloudEvent will be created and the input will be set as the data field's value. Other fields will be set using the values supplied via --ce-* flags.

Example Kafka publish:

plumber write kafka --encode-type cloudevent --topics myevents --input-file test-assets/cloudevents/example.json
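
The same rules apply when passing the event inline with --input. A minimal sketch using a JSON-encoded CloudEvent (the id, source, and type values here are placeholders):

plumber write kafka --encode-type cloudevent --topics myevents --input '{"specversion": "1.0", "id": "order-123", "source": "plumber", "type": "com.example.order.processed", "data": {"order_id": "A-3458-654-1", "status": "processed"}}'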