
Plumber Usage Examples

Consuming

AWS SQS

Read up to X messages

plumber read aws-sqs --queue-name=orders --max-num-messages=10

Read messages and delete them afterwards

plumber read aws-sqs --queue-name=orders --max-num-messages=10 --auto-delete

Continuously read messages

plumber read aws-sqs --queue-name=orders --follow

Poll for new messages for X seconds

plumber read aws-sqs --queue-name=orders --wait-time-seconds=20
RabbitMQ
plumber read rabbit \
    --address="amqp://localhost:5672" \
    --exchange=testex \
    --queue=testqueue \
    --routing-key="orders.#" \
    --follow
Kafka

Read a single message

plumber read kafka --topic orders --address="broker1.domain.com:9092"

You may specify multiple brokers by passing the --address flag multiple times.

plumber read kafka --topic orders \
    --address="broker1.domain.com:9092" \
    --address="broker2.domain.com:9092" \
    --address="broker3.domain.com:9092" \
    --follow

Continuously read messages

plumber read kafka --topic orders --address="broker1.domain.com:9092" --follow
Azure Service Bus

Reading from a topic

export SERVICEBUS_CONNECTION_STRING="Endpoint=sb://plumbertopictest.servicebus.windows.net/;SharedAccessKeyName=...."

plumber read azure --topic="new-orders" --subscription="copy-of-new-orders"

Reading from a queue

export SERVICEBUS_CONNECTION_STRING="Endpoint=sb://plumbertopictest.servicebus.windows.net/;SharedAccessKeyName=...."

plumber read azure --queue "new-orders"
Azure Event Hub

Read first available message from any partition

export EVENTHUB_CONNECTION_STRING="Endpoint=sb://plumbertest.servicebus.windows.net/;SharedAccessKeyName=...;SharedAccessKey=....=;EntityPath=..."

plumber read azure-eventhub
NATS
plumber read nats --address="nats://user:pass@localhost:4222" --subject "test-subject"
NATS Streaming
plumber read nats-streaming --address="nats://user:pass@localhost:4222" --channel "orders" --cluster-id "test-cluster" --client-id "plumber"
Redis PubSub
plumber read redis-pubsub --address="localhost:6379" --channels="new-orders"
Redis Streams
plumber read redis-streams --address="localhost:6379" --streams="new-orders"
GCP Pub/Sub
plumber read gcp-pubsub --project-id=PROJECT_ID --sub-id=SUBSCRIPTION
MQTT
plumber read mqtt --address tcp://localhost:1883 --topic iotdata --qos 1

Apache Pulsar

plumber read pulsar --topic NEWORDERS --name plumber

NSQ

plumber read nsq --lookupd-address localhost:4161 --topic orders --channel neworders 

Publishing

AWS SQS
plumber write aws-sqs --queue-name=NewOrders --input-data="{\"order_id\": \"A-3458-654-1\", \"status\": \"processed\"}"
RabbitMQ
plumber write rabbit --address="amqp://rabbit.yourdomain.net:5672" --exchange=NewOrders --routing-key="orders.oregon.coffee" --input-data="{\"order_id\": \"A-3458-654-1\", \"status\": \"processed\"}"
Kafka
plumber write kafka --address="localhost:9092" --topic=neworders --input-data="{\"order_id\": \"A-3458-654-1\", \"status\": \"processed\"}"

You may specify multiple brokers by passing the --address flag multiple times.

To write to more than one topic, you may specify multiple --topic flags, as shown in the second example below.

plumber write kafka --topic neworders \
    --address "broker1.domain.com:9092" \
    --address "broker2.domain.com:9092" \
    --address "broker3.domain.com:9092" \
    --input-data="{\"order_id\": \"A-3458-654-1\", \"status\": \"processed\"}"
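
To publish the same message to more than one topic (a sketch based on the note above; orders-audit is a hypothetical second topic):

plumber write kafka --topic neworders --topic orders-audit \
    --address "broker1.domain.com:9092" \
    --input-data="{\"order_id\": \"A-3458-654-1\", \"status\": \"processed\"}"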
AWS SNS
plumber write aws-sns --topic="arn:aws:sns:us-east-2:123456789012:MyTopic" --input-data="A new order is ready for processing!"
Azure Service Bus

Publishing to a topic

export SERVICEBUS_CONNECTION_STRING="Endpoint=sb://plumbertopictest.servicebus.windows.net/;SharedAccessKeyName=...."

plumber write azure --topic="new-orders" --input-data="{\"order_id\": \"A-3458-654-1\", \"status\": \"processed\"}"

Publishing to a queue

export SERVICEBUS_CONNECTION_STRING="Endpoint=sb://plumbertopictest.servicebus.windows.net/;SharedAccessKeyName=...."

plumber write azure --queue="new-orders" --input-data="{\"order_id\": \"A-3458-654-1\", \"status\": \"processed\"}"
Azure Event Hub

Publish to random partition

export EVENTHUB_CONNECTION_STRING="Endpoint=sb://plumbertest.servicebus.windows.net/;SharedAccessKeyName=...;SharedAccessKey=....=;EntityPath=..."

plumber write azure-eventhub --input-data "{\"order_id\": \"A-3458-654-1\", \"status\": \"processed\"}" --message-id "neworder123"

Publish to specific partition key

export EVENTHUB_CONNECTION_STRING="Endpoint=sb://plumbertest.servicebus.windows.net/;SharedAccessKeyName=...;SharedAccessKey=....=;EntityPath=..."

plumber write azure-eventhub --input-data "{\"order_id\": \"A-3458-654-1\", \"status\": \"processed\"}" --message-id "neworder123" --partition-key "neworders"
NATS
plumber write nats --address="nats://user:pass@localhost:4222" --subject "test-subject" --input-data "Hello World"
NATS Streaming
plumber write nats-streaming --address="nats://user:pass@localhost:4222" --channel "orders" --cluster-id "test-cluster" --client-id "plumber-producer" --input-data "{\"order_id\": \"A-3458-654-1\", \"status\": \"processed\"}"
Redis PubSub
plumber write redis-pubsub --address="localhost:6379" --channels="new-orders" --input-data="{\"order_id\": \"A-3458-654-1\", \"status\": \"processed\"}"
Redis Streams
plumber write redis-streams --address="localhost:6379" --streams="new-orders" --key foo --input-data="{\"order_id\": \"A-3458-654-1\", \"status\": \"processed\"}"
GCP Pub/Sub
plumber write gcp-pubsub --topic-id=TOPIC --project-id=PROJECT_ID --input-data='{"Sensor":"Room J","Temp":19}' 
MQTT
plumber write mqtt --address tcp://localhost:1883 --topic iotdata --qos 1 --input-data "{\"id\": 123, \"temperature\": 15}"
Apache Pulsar
plumber write pulsar --topic NEWORDERS --input-data="{\"order_id\": \"A-3458-654-1\", \"status\": \"processed\"}"
NSQ
plumber write nsq --nsqd-address localhost:4050 --topic orders --input-data="{\"order_id\": \"A-3458-654-1\", \"status\": \"processed\"}"

Relay Mode

Continuously relay messages from your RabbitMQ instance to a Batch.sh collection
$ docker run --name plumber-rabbit -p 8080:8080 \
    -e PLUMBER_RELAY_TYPE=rabbit \
    -e PLUMBER_RELAY_TOKEN=$YOUR-BATCHSH-TOKEN-HERE \
    -e PLUMBER_RELAY_RABBIT_EXCHANGE=my_exchange \
    -e PLUMBER_RELAY_RABBIT_QUEUE=my_queue \
    -e PLUMBER_RELAY_RABBIT_ROUTING_KEY=some.routing.key \
    -e PLUMBER_RELAY_RABBIT_QUEUE_EXCLUSIVE=false \
    -e PLUMBER_RELAY_RABBIT_QUEUE_DURABLE=true \
    batchcorp/plumber
Continuously relay messages from an SQS queue to a Batch.sh collection
docker run -d --name plumber-sqs -p 8080:8080 \
    -e AWS_ACCESS_KEY_ID=$AWS_ACCESS_KEY_ID \
    -e AWS_SECRET_ACCESS_KEY=$AWS_SECRET_ACCESS_KEY \
    -e PLUMBER_RELAY_SQS_QUEUE_NAME=TestQueue \
    -e PLUMBER_RELAY_TYPE=aws-sqs \
    -e PLUMBER_RELAY_TOKEN=$YOUR-BATCHSH-TOKEN-HERE \
    batchcorp/plumber 
Continuously relay messages from an Azure queue to a Batch.sh collection
docker run -d --name plumber-azure -p 8080:8080 \
    -e SERVICEBUS_CONNECTION_STRING="Endpoint=sb://mybus.servicebus.windows.net/;SharedAccessKeyName..." \
    -e PLUMBER_RELAY_AZURE_QUEUE_NAME=neworders \
    -e PLUMBER_RELAY_TYPE=azure \
    -e PLUMBER_RELAY_TOKEN=$YOUR-BATCHSH-TOKEN-HERE \
    batchcorp/plumber 
Continuously relay messages from an Azure topic to a Batch.sh collection
docker run -d --name plumber-azure -p 8080:8080 \
    -e SERVICEBUS_CONNECTION_STRING="Endpoint=sb://mybus.servicebus.windows.net/;SharedAccessKeyName..." \
    -e PLUMBER_RELAY_AZURE_TOPIC_NAME=neworders \
    -e PLUMBER_RELAY_AZURE_SUBSCRIPTION=some-sub \
    -e PLUMBER_RELAY_TYPE=azure \
    -e PLUMBER_RELAY_TOKEN=$YOUR-BATCHSH-TOKEN-HERE \
    batchcorp/plumber 
Continuously relay messages from multiple Redis channels to a Batch.sh collection
docker run -d --name plumber-redis-pubsub -p 8080:8080 \
    -e PLUMBER_RELAY_REDIS_PUBSUB_ADDRESS=localhost:6379 \
    -e PLUMBER_RELAY_REDIS_PUBSUB_CHANNELS=channel1,channel2 \
    -e PLUMBER_RELAY_TYPE=redis-pubsub \
    -e PLUMBER_RELAY_TOKEN=$YOUR-BATCHSH-TOKEN-HERE \
    batchcorp/plumber 
Continuously relay messages from multiple Redis streams to a Batch.sh collection
docker run -d --name plumber-redis-streams -p 8080:8080 \
    -e PLUMBER_RELAY_REDIS_STREAMS_ADDRESS=localhost:6379 \
    -e PLUMBER_RELAY_REDIS_STREAMS_STREAMS=stream1,stream2 \
    -e PLUMBER_RELAY_TYPE=redis-streams \
    -e PLUMBER_RELAY_TOKEN=$YOUR-BATCHSH-TOKEN-HERE \
    batchcorp/plumber 
Continuously relay messages from a Kafka topic (on Confluent) to a Batch.sh collection (via CLI)
export PLUMBER_RELAY_TYPE="kafka"
export PLUMBER_RELAY_TOKEN="$YOUR-BATCHSH-TOKEN-HERE"
export PLUMBER_RELAY_KAFKA_ADDRESS="pkc-4kgmg.us-west-2.aws.confluent.cloud:9092,pkc-5kgmg.us-west-2.aws.confluent.cloud:9092"
export PLUMBER_RELAY_KAFKA_TOPIC="$YOUR_TOPIC"
export PLUMBER_RELAY_KAFKA_INSECURE_TLS="true"
export PLUMBER_RELAY_KAFKA_USERNAME="$YOUR_CONFLUENT_API_KEY"
export PLUMBER_RELAY_KAFKA_PASSWORD="$YOUR_CONFLUENT_API_SECRET"
export PLUMBER_RELAY_KAFKA_SASL_TYPE="plain"

$ plumber relay
Continuously relay messages from an MQTT topic to a Batch.sh collection
docker run -d --name plumber-mqtt -p 8080:8080 \
    -e PLUMBER_RELAY_MQTT_ADDRESS=tcp://localhost:1883 \
    -e PLUMBER_RELAY_MQTT_TOPIC=iotdata \
    -e PLUMBER_RELAY_MQTT_QOS=1 \
    -e PLUMBER_RELAY_TYPE=mqtt \
    -e PLUMBER_RELAY_TOKEN=$YOUR-BATCHSH-TOKEN-HERE \
    batchcorp/plumber 

Change Data Capture

Continuously relay Postgres change events to a Batch.sh collection

See the documentation at https://docs.batch.sh/event-ingestion/change-data-capture/postgresql for instructions on setting up PostgreSQL CDC.

Continuously relay MongoDB change stream events to a Batch.sh collection
docker run -d --name plumber-cdc-mongo -p 8080:8080 \
    -e PLUMBER_RELAY_CDCMONGO_DSN=mongodb://mongo.mysite.com:27017 \
    -e PLUMBER_RELAY_TYPE=cdc-mongo \
    -e PLUMBER_RELAY_CDCMONGO_DATABASE=mydb \
    -e PLUMBER_RELAY_CDCMONGO_COLLECTION=customers \
    -e PLUMBER_RELAY_TOKEN=YOUR_BATCHSH_TOKEN_HERE \
    batchcorp/plumber 

For more advanced MongoDB usage, see the documentation at https://docs.batch.sh/event-ingestion/change-data-capture/mongodb

Advanced Usage

Decoding protobuf-encoded messages and viewing them live
$ plumber read rabbit --address="amqp://localhost" --exchange events --routing-key \# \
  --protobuf-dir ~/schemas --protobuf-root-message pkg.Message --follow
1: {"some-attribute": 123, "numbers" : [1, 2, 3]}
2: {"some-attribute": 424, "numbers" : [325]}
3: {"some-attribute": 49, "numbers" : [958, 288, 289, 290]}
4: ERROR: Cannot decode message as protobuf "Message"
5: {"some-attribute": 394, "numbers" : [4, 5, 6, 7, 8]}
^C

Writing protobuf messages with source jsonpb

NOTE: "jsonpb" is just a JSON representation of your protobuf event. When you use it as the --input-type, plumber will read the JSON blob and attempt to decode it into your specified root message, followed by writing the []byte slice to the message bus.

$ plumber write rabbit --exchange events --routing-key foo.bar  \
  --protobuf-dir ~/schemas --protobuf-root-message pkg.Message \
  --input-file ~/fakes/some-jsonpb-file.json --input-type jsonpb
Using Avro schemas when reading or writing
$ plumber write kafka --topic=orders --avro-schema=some_schema.avsc --input-file=your_data.json
$ plumber read kafka --topic=orders --avro-schema=some_schema.avsc
If your schemas are located in multiple places, you can specify `--protobuf-dir` multiple times. Treat it the same as you would `protoc -I`.
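
For example (a sketch extending the rabbit read command from the protobuf section above; ~/more-schemas is a hypothetical second schema directory):

$ plumber read rabbit --address="amqp://localhost" --exchange events --routing-key \# \
  --protobuf-dir ~/schemas --protobuf-dir ~/more-schemas \
  --protobuf-root-message pkg.Message --follow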
Prometheus Metrics

In relay mode, plumber launches an HTTP server that exposes Prometheus metrics at http://localhost:8080/metrics.

Prometheus metrics can be pulled from plumber by adding a scrape job to your prometheus.yml config:

scrape_configs:
- job_name: plumber
  scrape_interval: 5s
  static_configs:
  - targets:
    - your-hostname:8080

You may modify the listen address/port using the PLUMBER_RELAY_HTTP_LISTEN_ADDRESS environment variable or the --listen-address flag.
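
For example, to serve metrics on port 9090 instead (a sketch; both forms assume the flag and env var accept a standard host:port listen address):

export PLUMBER_RELAY_HTTP_LISTEN_ADDRESS=":9090"

$ plumber relay --listen-address=":9090"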

The following metrics are available in addition to the standard Go runtime metrics:

| Metric | Type | Description |
|--------|------|-------------|
| plumber_relay_rate | gauge | Current rate of messages being relayed to Batch.sh (5 second interval) |
| plumber_relay_total | counter | Total number of events relayed to Batch.sh |
| plumber_read_errors | counter | Number of errors when reading messages |
| plumber_grpc_errors | counter | Number of errors when making gRPC calls |
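
For example, to spot-check the relay counters from a shell (a sketch; assumes a relay is running locally on the default port):

$ curl -s http://localhost:8080/metrics | grep "^plumber_"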