- Consuming
- Publishing
- Relay Mode
- Continuously relay messages from your RabbitMQ instance to a Streamdal collection
- Continuously relay messages from an SQS queue to a Streamdal collection
- Continuously relay messages from an Azure queue to a Streamdal collection
- Continuously relay messages from an Azure topic to a Streamdal collection
- Continuously relay messages from multiple Redis channels to a Streamdal collection
- Continuously relay messages from multiple Redis streams to a Streamdal collection
- Continuously relay messages from a Kafka topic (on Confluent) to a Streamdal collection (via CLI)
- Continuously relay messages from an MQTT topic to a Streamdal collection
- Continuously relay messages from a NATS JetStream stream to a Streamdal collection
- Change Data Capture
- Advanced Usage
Read X number of messages
plumber read aws-sqs --queue-name=orders --max-num-messages=10
Read messages and delete them afterwards
plumber read aws-sqs --queue-name=orders --max-num-messages=10 --auto-delete
Continuously read messages
plumber read aws-sqs --queue-name=orders --continuous
Poll for new messages for X seconds
plumber read aws-sqs --queue-name=orders --wait-time-seconds=20
plumber read rabbit \
--address="amqp://localhost:5672" \
--exchange-name=testex \
--queue-name=testqueue \
--binding-key="orders.#" \
--continuous
plumber read rabbit-streams --dsn rabbitmq-stream://guest:guest@localhost:5552 --stream new_orders --offset last
Read a single message
plumber read kafka --topics orders --address="broker1.domain.com:9092"
You may specify multiple brokers by specifying the --address flag multiple times:
plumber read kafka --topics orders \
--address="broker1.domain.com:9092" \
--address="broker2.domain.com:9092" \
--address="broker3.domain.com:9092" \
--continuous
Continuously read messages
plumber read kafka --topics orders --address="broker1.domain.com:9092" --continuous
To read from multiple topics, specify multiple topics delimited with a comma (without spaces):
plumber read kafka --topics one,two -f --pretty
Reading from a topic
export SERVICEBUS_CONNECTION_STRING="Endpoint=sb://plumbertopictest.servicebus.windows.net/;SharedAccessKeyName=...."
plumber read azure-service-bus --topic="new-orders" --subscription="copy-of-new-orders"
Reading from a queue
export SERVICEBUS_CONNECTION_STRING="Endpoint=sb://plumbertopictest.servicebus.windows.net/;SharedAccessKeyName=...."
plumber read azure-service-bus --queue "new-orders"
Read first available message from any partition
export EVENTHUB_CONNECTION_STRING="Endpoint=sb://plumbertest.servicebus.windows.net/;SharedAccessKeyName=...;SharedAccessKey=....=;EntityPath=..."
plumber read azure-eventhub
plumber read nats --address="nats://user:pass@nats.test.io:4222" --subject "test-subject"
plumber read nats-streaming --address="nats://user:pass@nats.test.io:4222" --channel "orders" --cluster-id "test-cluster" --client-id "plumber"
plumber read nats-jetstream --dsn="nats://user:pass@nats.test.io:4222" --stream "orders.>" --client-id "plumber"
Create and use a durable consumer:
plumber read nats-jetstream --dsn="nats://user:pass@nats.test.io:4222" --stream foo --create-durable-consumer
Use an existing durable consumer:
plumber read nats-jetstream --dsn="nats://user:pass@nats.test.io:4222" --stream foo --existing-durable-consumer --consumer-name existing_consumer
Create a new durable consumer at a specific stream start sequence:
plumber read nats-jetstream --dsn="nats://user:pass@nats.test.io:4222" --stream foo --create-durable-consumer --consumer-start-sequence 42
NOTE: By default, plumber will remove any consumers it creates. To leave consumers untouched, set --keep-consumer.
plumber read redis-pubsub --address="localhost:6379" --channels="new-orders"
plumber read redis-streams --address="localhost:6379" --streams="new-orders"
plumber read gcp-pubsub --project-id=PROJECT_ID --subscription-id=SUBSCRIPTION
plumber read mqtt --address tcp://localhost:1883 --topic iotdata --qos-level at_least_once
# Or connect with TLS:
plumber read mqtt --address ssl://localhost:8883 --topic iotdata --qos-level at_least_once
# TLS using certificates
plumber read mqtt --address ssl://localhost:8883 --topic iotdata --qos-level at_least_once --tls-ca-cert=/path/to/ca_certificate.pem --tls-client-key=/path/to/client_key.pem --tls-client-cert=/path/to/client_certificate.pem
plumber read pulsar --topic NEWORDERS --name plumber
plumber read nsq --lookupd-address localhost:4161 --topic orders --channel neworders
NOTE: This method is deprecated. See Thrift Decoding with IDL Files for an improved method of reading thrift messages.
Plumber can decode thrift output and display it as nested JSON. The key is the field's ID, and the value is the actual value in the message. Add the --pretty flag to colorize output.
plumber read kafka --topics orders --decode-type thrift --pretty
{
"1": 54392501,
"2": "Test Order",
"3": {
"1": "Product Name",
"2": "green",
"3": "2091.99"
}
}
NEW Support for decoding with IDL files
Plumber can now use your .thrift IDL files to decode the output with field/enum names.
$ plumber read kafka --topics thrifttest \
--thrift-struct sh.batch.schema.Account \
--thrift-dirs ./test-assets/thrift/schema/ \
--decode-type thrift \
--pretty
{
"emails": [
"[email protected]",
"[email protected]"
],
"id": 321,
"model": {
"includedvalue": "value of included struct"
},
"name": "Mark Gregan",
"permissions": [
"create",
"read",
"update",
"delete"
],
"price": 1.23,
"subm": {
"value": "submessage value here"
},
"teams": {
"123": "554bf385-ce1f-4deb-9a99-8864c1df52b5"
},
"testconst": 1234,
"type": "VIP",
"unionthing": {
"thing_int": null,
"thing_string": "Daniel"
}
}
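For reference, the IDL behind output like the above might look roughly like this. This is a hypothetical sketch only, not the actual sh.batch.schema.Account definition; field IDs and types are guesses inferred from the decoded JSON:
// Hypothetical sketch -- field IDs and types inferred from the decoded output above
struct Account {
  1: i64 id,
  2: string name,
  3: list<string> emails,
  4: double price,
  5: map<i64, string> teams,
}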
Reading from a single shard
plumber read kinesis --stream orders --shard shardId-000000000000 --latest --max-records 10
Read from all shards
plumber read kinesis --stream orders --continuous
plumber read memphis --address localhost:6666 --station myevents
plumber write aws-sqs --queue-name=NewOrders --input="{\"order_id\": \"A-3458-654-1\", \"status\": \"processed\"}"
plumber write rabbit --address="amqp://rabbit.yourdomain.net:5672" --exchange-name=NewOrders --routing-key="orders.oregon.coffee" --input="{\"order_id\": \"A-3458-654-1\", \"status\": \"processed\"}"
plumber write rabbit-streams --dsn rabbitmq-stream://guest:guest@localhost:5552 --stream new_orders --input '{"order_id": "A-3458-654-1", "status": "processed"}'
plumber write kafka --address localhost:9092 --topics neworders --input '{"order_id": "A-3458-654-1", "status": "processed"}'
You may specify multiple brokers by specifying the --address flag multiple times. To write to more than one topic, you may specify multiple --topics flags.
plumber write kafka --topics neworders \
--address "broker1.domain.com:9092" \
--address "broker2.domain.com:9092" \
--address "broker3.domain.com:9092" \
--input="{\"order_id\": \"A-3458-654-1\", \"status\": \"processed\"}"
plumber write aws-sns --topic="arn:aws:sns:us-east-2:123456789012:MyTopic" --input="New event is ready for processing!"
Publishing to a topic
export SERVICEBUS_CONNECTION_STRING="Endpoint=sb://plumbertopictest.servicebus.windows.net/;SharedAccessKeyName=...."
plumber write azure-service-bus --topic="new-orders" --input="{\"order_id\": \"A-3458-654-1\", \"status\": \"processed\"}"
Publishing to a queue
export SERVICEBUS_CONNECTION_STRING="Endpoint=sb://plumbertopictest.servicebus.windows.net/;SharedAccessKeyName=...."
plumber write azure-service-bus --queue="new-orders" --input="{\"order_id\": \"A-3458-654-1\", \"status\": \"processed\"}"
Publish to random partition
export EVENTHUB_CONNECTION_STRING="Endpoint=sb://plumbertest.servicebus.windows.net/;SharedAccessKeyName=...;SharedAccessKey=....=;EntityPath=..."
plumber write azure-eventhub --input "{\"order_id\": \"A-3458-654-1\", \"status\": \"processed\"}" --message-id "neworder123"
Publish to specific partition key
export EVENTHUB_CONNECTION_STRING="Endpoint=sb://plumbertest.servicebus.windows.net/;SharedAccessKeyName=...;SharedAccessKey=....=;EntityPath=..."
plumber write azure-eventhub --input "{\"order_id\": \"A-3458-654-1\", \"status\": \"processed\"}" --message-id "neworder123" --partition-key "neworders"
plumber write nats --address="nats://user:pass@nats.test.io:4222" --subject "test-subject" --input "Hello World"
plumber write nats-streaming --address="nats://user:pass@nats.test.io:4222" --channel "orders" --cluster-id "test-cluster" --client-id "plumber-producer" --input "{\"order_id\": \"A-3458-654-1\", \"status\": \"processed\"}"
plumber write nats-jetstream --dsn="nats://user:pass@nats.test.io:4222" --stream "orders.>" --input="{\"order_id\": \"A-3458-654-1\", \"status\": \"processed\"}"
plumber write redis-pubsub --address="localhost:6379" --channels="new-orders" --input="{\"order_id\": \"A-3458-654-1\", \"status\": \"processed\"}"
plumber write redis-streams --address="localhost:6379" --streams="new-orders" --key foo --input="{\"order_id\": \"A-3458-654-1\", \"status\": \"processed\"}"
plumber write gcp-pubsub --topic-id=TOPIC --project-id=PROJECT_ID --input='{"Sensor":"Room J","Temp":19}'
plumber write mqtt --address tcp://localhost:1883 --topic iotdata --qos-level at_least_once --input "{\"id\": 123, \"temperature\": 15}"
# or connect with TLS:
plumber write mqtt --address ssl://localhost:8883 --topic iotdata --qos-level at_least_once --input "{\"id\": 123, \"temperature\": 15}"
# TLS using certificates
plumber write mqtt --address ssl://localhost:8883 --topic iotdata --qos-level at_least_once --tls-ca-cert=/path/to/ca_certificate.pem --tls-client-key=/path/to/client_key.pem --tls-client-cert=/path/to/client_certificate.pem --input "{\"id\": 123, \"temperature\": 15}"
plumber write pulsar --topic NEWORDERS --input="{\"order_id\": \"A-3458-654-1\", \"status\": \"processed\"}"
plumber write nsq --nsqd-address localhost:4050 --topic orders --input="{\"order_id\": \"A-3458-654-1\", \"status\": \"processed\"}"
plumber write kinesis --stream teststream --partition-key orders --input "{\"order_id\": \"A-3458-654-1\", \"status\": \"processed\"}"
plumber write memphis --address localhost:6666 --station myevents --input "{\"order_id\": \"A-3458-654-1\", \"status\": \"processed\"}"
$ docker run --name plumber-rabbit -p 8080:8080 \
-e PLUMBER_RELAY_TOKEN=$YOUR-STREAMDAL-TOKEN-HERE \
-e PLUMBER_RELAY_RABBIT_EXCHANGE=my_exchange \
-e PLUMBER_RELAY_RABBIT_QUEUE=my_queue \
-e PLUMBER_RELAY_RABBIT_ROUTING_KEY=some.routing.key \
-e PLUMBER_RELAY_RABBIT_QUEUE_EXCLUSIVE=false \
-e PLUMBER_RELAY_RABBIT_QUEUE_DURABLE=true \
batchcorp/plumber plumber relay rabbit
docker run -d --name plumber-sqs -p 8080:8080 \
-e AWS_ACCESS_KEY_ID=$AWS_ACCESS_KEY_ID \
-e AWS_SECRET_ACCESS_KEY=$AWS_SECRET_ACCESS_KEY \
-e PLUMBER_RELAY_SQS_QUEUE_NAME=TestQueue \
-e PLUMBER_RELAY_TOKEN=$YOUR-STREAMDAL-TOKEN-HERE \
batchcorp/plumber plumber relay aws-sqs
docker run -d --name plumber-azure -p 8080:8080 \
-e SERVICEBUS_CONNECTION_STRING="Endpoint=sb://mybus.servicebus.windows.net/;SharedAccessKeyName..." \
-e PLUMBER_RELAY_AZURE_QUEUE_NAME=neworders \
-e PLUMBER_RELAY_TOKEN=$YOUR-STREAMDAL-TOKEN-HERE \
batchcorp/plumber plumber relay azure-service-bus
docker run -d --name plumber-azure -p 8080:8080 \
-e SERVICEBUS_CONNECTION_STRING="Endpoint=sb://mybus.servicebus.windows.net/;SharedAccessKeyName..." \
-e PLUMBER_RELAY_AZURE_TOPIC_NAME=neworders \
-e PLUMBER_RELAY_AZURE_SUBSCRIPTION=some-sub \
-e PLUMBER_RELAY_TOKEN=$YOUR-STREAMDAL-TOKEN-HERE \
batchcorp/plumber plumber relay azure-service-bus
docker run -d --name plumber-redis-pubsub -p 8080:8080 \
-e PLUMBER_RELAY_REDIS_PUBSUB_ADDRESS=localhost:6379 \
-e PLUMBER_RELAY_REDIS_PUBSUB_CHANNELS=channel1,channel2 \
-e PLUMBER_RELAY_TOKEN=$YOUR-STREAMDAL-TOKEN-HERE \
batchcorp/plumber plumber relay redis-pubsub
docker run -d --name plumber-redis-streams -p 8080:8080 \
-e PLUMBER_RELAY_REDIS_STREAMS_ADDRESS=localhost:6379 \
-e PLUMBER_RELAY_REDIS_STREAMS_STREAMS=stream1,stream2 \
-e PLUMBER_RELAY_TOKEN=$YOUR-STREAMDAL-TOKEN-HERE \
batchcorp/plumber plumber relay redis-streams
docker run -d --name plumber-kafka -p 8080:8080 \
-e PLUMBER_RELAY_TOKEN="$YOUR-STREAMDAL-TOKEN-HERE" \
-e PLUMBER_RELAY_KAFKA_ADDRESS="pkc-4kgmg.us-west-2.aws.confluent.cloud:9092,pkc-5kgmg.us-west-2.aws.confluent.cloud:9092" \
-e PLUMBER_RELAY_KAFKA_TOPIC="$YOUR_TOPIC" \
-e PLUMBER_RELAY_KAFKA_INSECURE_TLS="true" \
-e PLUMBER_RELAY_KAFKA_USERNAME="$YOUR_CONFLUENT_API_KEY" \
-e PLUMBER_RELAY_KAFKA_PASSWORD="$YOUR_CONFLUENT_API_SECRET" \
-e PLUMBER_RELAY_KAFKA_SASL_TYPE="plain" \
batchcorp/plumber plumber relay kafka
docker run -d --name plumber-mqtt -p 8080:8080 \
-e PLUMBER_RELAY_MQTT_ADDRESS=tcp://localhost:1883 \
-e PLUMBER_RELAY_MQTT_TOPIC=iotdata \
-e PLUMBER_RELAY_MQTT_QOS=1 \
-e PLUMBER_RELAY_TOKEN=$YOUR-STREAMDAL-TOKEN-HERE \
batchcorp/plumber plumber relay mqtt
docker run -d --name plumber-natsjs -p 8080:8080 \
-e PLUMBER_RELAY_NATS_JETSTREAM_DSN=nats://localhost:4222 \
-e PLUMBER_RELAY_NATS_JETSTREAM_CLIENT_ID=plumber \
-e PLUMBER_RELAY_NATS_JETSTREAM_STREAM=orders \
-e PLUMBER_RELAY_TOKEN=$YOUR-STREAMDAL-TOKEN-HERE \
batchcorp/plumber plumber relay nats-jetstream
See documentation at https://docs.streamdal.com/event-ingestion/change-data-capture/postgresql for instructions on setting up PostgreSQL CDC.
docker run -d --name plumber-cdc-mongo -p 8080:8080 \
-e PLUMBER_RELAY_CDCMONGO_DSN=mongodb://mongo.mysite.com:27017 \
-e PLUMBER_RELAY_CDCMONGO_DATABASE=mydb \
-e PLUMBER_RELAY_CDCMONGO_COLLECTION=customers \
-e PLUMBER_RELAY_TOKEN=YOUR_STREAMDAL_TOKEN_HERE \
batchcorp/plumber plumber relay cdc-mongo
For more advanced MongoDB usage, see the documentation at https://docs.streamdal.com/event-ingestion/change-data-capture/mongodb
Protobuf is supported for both encode and decode for all backends. There are three flags that must be specified for protobuf:
- --encode-type for writes or --decode-type for reads
- --protobuf-dirs pointing to a directory that contains your .proto files
- --protobuf-root-message which indicates what type plumber should attempt to encode/decode output/input
NOTE: --protobuf-root-message must specify the FULL path to the type, i.e. events.MyType (MyType is not enough!).
$ plumber read rabbit --address="amqp://localhost" --exchange events --binding-key \# \
--decode-type protobuf \
--protobuf-dirs ~/schemas \
--protobuf-root-message pkg.Message \
--continuous
1: {"some-attribute": 123, "numbers" : [1, 2, 3]}
2: {"some-attribute": 424, "numbers" : [325]}
3: {"some-attribute": 49, "numbers" : [958, 288, 289, 290]}
4: ERROR: Cannot decode message as protobuf "Message"
5: {"some-attribute": 394, "numbers" : [4, 5, 6, 7, 8]}
^C
NOTE: "jsonpb" is just a JSON representation of your protobuf event. When you
use it as the --input-type
, plumber
will read the JSON blob and attempt
to decode it into your specified root message, followed by writing the []byte
slice to the message bus.
$ plumber write rabbit --exchange events --routing-key foo.bar \
--protobuf-dirs ~/schemas \
--protobuf-root-message pkg.Message \
--input-file ~/fakes/some-jsonpb-file.json \
--encode-type jsonpb
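The jsonpb input file is just the JSON form of the root message. For example, assuming pkg.Message has the fields shown in the decoded output earlier (an illustrative assumption, not the real schema), some-jsonpb-file.json might contain something like:
{"some-attribute": 123, "numbers": [1, 2, 3]}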
Plumber supports "shallow envelope" protobuf messages consisting of one type of protobuf message used to decode
the message itself, and another type of message used to decode the protobuf contents of a payload field inside the envelope. The payload field must be of bytes
type.
To read/write shallow envelope messages with plumber, you will need to specify the following additional flags:
--protobuf-envelope-type shallow
- To indicate that the message is a shallow envelope--shallow-envelope-field-number
- Protobuf field number of the envelope's field which contains the encoded protobuf data--shallow-envelope-message
- The protobuf message name used to encode the data in the field
Example protobuf we will read/write with:
syntax = "proto3";
package shallow;
// Represents a shallow envelope
message Envelope {
string id = 1;
bytes data = 2;
}
// Message is what goes into Envelope's Data field
message Payload {
string name = 1;
}
plumber write kafka --topics testing \
--protobuf-dirs test-assets/shallow-envelope/ \
--protobuf-root-message shallow.Envelope \
--input-file test-assets/shallow-envelope/example-payload.json \
--protobuf-envelope-type shallow \
--shallow-envelope-message shallow.Payload \
--shallow-envelope-field-number=2 \
--encode-type jsonpb
plumber read kafka --topics testing \
--protobuf-dirs test-assets/shallow-envelope/ \
--protobuf-root-message shallow.Envelope \
--protobuf-envelope-type shallow \
--shallow-envelope-message shallow.Payload \
--shallow-envelope-field-number=2 \
--decode-type protobuf
Plumber supports using protobuf file descriptor set files for decoding and encoding protobuf messages, instead of using a directory of .proto files. This method is more reliable than the --protobuf-dirs flag, as it ensures that there won't be any include path issues.
For help with generating an .fds file from your .proto files, see https://docs.streamdal.com/platform/components/what-are-schemas#protocol-buffers
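If you use protoc, a descriptor set can typically be generated with something like the following (illustrative only; adjust the import path and output filename to your layout):
protoc --include_imports --descriptor_set_out=protos.fds -I ./protos ./protos/*.proto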
plumber write kafka --topics fdstest1 \
--protobuf-descriptor-set test-assets/protobuf-any/sample/protos.fds \
--protobuf-root-message sample.Envelope \
--encode-type jsonpb \
--input-file test-assets/protobuf-any/payload.json
plumber read kafka --topics fdstest1 \
--protobuf-descriptor-set test-assets/protobuf-any/sample/protos.fds \
--protobuf-root-message sample.Envelope \
--decode-type protobuf
$ plumber write kafka --topics=orders --avro-schema-file=some_schema.avsc --input-file=your_data.json
$ plumber read kafka --topics=orders --avro-schema-file=some_schema.avsc
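For reference, some_schema.avsc is a standard Avro schema file. A minimal example matching the order payloads used elsewhere in this document (field and record names here are assumptions, not a schema plumber ships with) could be:
{
  "type": "record",
  "name": "Order",
  "namespace": "example.orders",
  "fields": [
    {"name": "order_id", "type": "string"},
    {"name": "status", "type": "string"}
  ]
}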
NOTE: CloudEvents are currently only supported for: Kafka, NATS, NATS Streaming, and NATS JetStream
Plumber supports emitting CloudEvent messages.
By default, if the contents of --input or --input-file represent a valid CloudEvent in JSON format, the data will be unmarshaled into a CloudEvent. Any --ce-* flags specified will override their respective values in the event before the event is published.
If the value of --input or --input-file is not a valid CloudEvent in JSON format, a new CloudEvent will be created and the input will be set as the data field's value. Other fields will be set using the values supplied via --ce-* flags.
Example Kafka publish:
plumber write kafka --encode-type cloudevent --topics myevents --input-file test-assets/cloudevents/example.json
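For reference, a minimal CloudEvents 1.0 JSON payload that --input-file could point to might look like this (illustrative values only):
{
  "specversion": "1.0",
  "id": "A-3458-654-1",
  "source": "example/orders",
  "type": "com.example.order.processed",
  "datacontenttype": "application/json",
  "data": {"order_id": "A-3458-654-1", "status": "processed"}
}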