A simple Apache Kafka client that can consume from and produce to a topic, with support for protobuf messages.
To consume from a topic, pass the consume command, the bootstrap servers and the topic. This consumes all partitions of that topic and prints the value of each message to the console.
$ kafka-client consume broker1:9092,broker2:9092,broker3:9092 Topic
To stop consuming, just press Ctrl-C.
If you want to store the messages in a file, use the --output flag to indicate the output file. If only the --output flag is used, the file will contain the raw bytes of each message's key and value. This is equivalent to also passing the --raw flag. It is useful if you want to save some messages and later produce them to a different (or the same) topic.
$ kafka-client consume broker1:9092,broker2:9092,broker3:9092 Topic --output messages.bin
If you want to save the message values to the file as human-readable text, use the --text flag in conjunction with the --output flag.
$ kafka-client consume broker1:9092,broker2:9092,broker3:9092 Topic --output messages.json --text
To see all the supported flags of the consume command, use the help consume command:
$ kafka-client help consume
consume command uses bootstrap_servers to get the brokers of the Kafka cluster
and consume messages from the indicated topic printing them to stdout unless a
filename is provided by the --output flag.
Usage:
kafka-client consume bootstrap_servers topic [flags]
Examples:
kafka-client consume localhost:9092 my_topic
Flags:
-h, --help help for consume
--import-path strings directory from which proto sources can be imported. (default [.])
-o, --output string write to file instead of stdout.
--proto string write the message as JSON using the given protobuf message type.
--proto-file strings the name of a proto source file. Imports will be resolved using the given --import-path flags. Multiple proto files can be specified by specifying multiple --proto-file flags. (default [*.proto])
-r, --raw write the message as raw bytes (default true if an output file is given).
-t, --text write the message as text (default true if no output file is given).
Global Flags:
-c, --client-id string client ID to sent to Kafka (default "kafka-client")
--config string config file (default is $HOME/.kafka-client.yaml)
-d, --duration duration time to wait before exiting
-q, --quiet enable quiet mode
To produce to a topic, pass the produce command, the bootstrap servers and the topic. The client reads from stdin and produces one message to the topic for every line.
$ kafka-client produce broker1:9092,broker2:9092,broker3:9092 Topic
To stop producing, just press Ctrl-C.
If you want to read the messages from a file, use the --input flag to indicate the input file. If only the --input flag is used, the client expects the file to be in the same format as the one generated by the --output flag of the consume command. This is equivalent to also passing the --raw flag. It is useful if you want to produce messages that were previously consumed from a topic using the --output flag.
$ kafka-client produce broker1:9092,broker2:9092,broker3:9092 Topic --input messages.bin
If the file contains text messages, one message per line, use the --text flag in conjunction with the --input flag.
$ kafka-client produce broker1:9092,broker2:9092,broker3:9092 Topic --input messages.json --text
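As a minimal sketch of preparing such a file, the snippet below writes three single-line JSON documents to messages.json, the file name used in the example above. The payload fields are made up for illustration and are not required by kafka-client. The final command is left as a comment because it needs a reachable cluster.

```shell
# Write one message per line; each line is meant to become one Kafka message.
# The JSON fields are illustrative only.
printf '%s\n' \
  '{"id": 1, "text": "first"}' \
  '{"id": 2, "text": "second"}' \
  '{"id": 3, "text": "third"}' > messages.json

# Sanity check: the line count is the number of messages that would be produced.
wc -l < messages.json

# Then produce the file as text (requires a reachable cluster):
# kafka-client produce broker1:9092,broker2:9092,broker3:9092 Topic --input messages.json --text
```

Any line-oriented text works the same way; JSON is used here only because the example file is named messages.json.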
It is also possible to throttle message production using the --period flag, which indicates the time to wait between messages.
$ kafka-client produce broker1:9092,broker2:9092,broker3:9092 Topic --input messages.bin --period 250ms
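A rough back-of-the-envelope estimate of what a period means for total run time can be sketched as follows. It assumes a text file with one message per line, that the client waits the full period between every two consecutive messages, and that sending itself takes negligible time, so N messages need about (N - 1) times the period. The file name sample-messages.txt is hypothetical.

```shell
period_ms=250   # same value as --period 250ms

# Build a small sample file so the sketch is self-contained (8 lines).
printf 'msg %s\n' 1 2 3 4 5 6 7 8 > sample-messages.txt

# Count the messages; the arithmetic expansion strips any padding wc may add.
count=$(( $(wc -l < sample-messages.txt) ))

# One wait between each pair of consecutive messages.
estimate_ms=$(( (count - 1) * period_ms ))
echo "$count messages, about $estimate_ms ms"
```

The real duration also depends on broker latency and on how the client schedules the waits, so treat the figure as a lower bound rather than a prediction.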
To see all the supported flags of the produce command, use the help produce command:
$ kafka-client help produce
produce command uses bootstrap_servers to get the brokers of the Kafka cluster
and produces messages to the indicated topic reading them from stdin unless a
filename is provided by the --input flag.
Usage:
kafka-client produce bootstrap_servers topic [flags]
Examples:
kafka-client produce localhost:9092 my_topic
Flags:
-h, --help help for produce
--import-path strings directory from which proto sources can be imported. (default [.])
-i, --input string read from file instead of stdin.
-p, --period duration time to wait between producing two messages.
--proto string read the message as JSON using the given protobuf message type.
--proto-file strings the name of a proto source file. Imports will be resolved using the given --import-path flags. Multiple proto files can be specified by specifying multiple --proto-file flags. (default [*.proto])
-r, --raw read the message as raw bytes (default true if an input file is given).
-t, --text read the message as text (default true if no input file is given).
Global Flags:
-c, --client-id string client ID to sent to Kafka (default "kafka-client")
--config string config file (default is $HOME/.kafka-client.yaml)
-d, --duration duration time to wait before exiting
-q, --quiet enable quiet mode
The client can also bridge two topics, consuming messages from one topic and sending them to another, using the bridge command.
$ kafka-client bridge broker1:9092,broker2:9092,broker3:9092 topic1 broker4:9092,broker5:9092,broker6:9092 topic2
The production of messages can also be throttled with the --period flag.
$ kafka-client bridge broker1:9092,broker2:9092,broker3:9092 topic1 broker4:9092,broker5:9092,broker6:9092 topic2 --period 250ms
To see all the supported flags of the bridge command, use the help bridge command:
$ kafka-client help bridge
bridge command consumes messages from from_topic, using from_bootstrap_servers
to get the brokers of the source Kafka cluster, and produces those messages to
to_topic, using to_bootstrap_servers to get the brokers of the destination
Kafka cluster.
Usage:
kafka-client bridge source_bootstrap_servers source_topic destination_bootstrap_servers destination_topic [flags]
Examples:
kafka-client bridge localhost:9092 from_topic localhost:9092 to_topic
Flags:
-h, --help help for bridge
-p, --period duration time to wait between producing two messages.
Global Flags:
-c, --client-id string client ID to sent to Kafka (default "kafka-client")
--config string config file (default is $HOME/.kafka-client.yaml)
-d, --duration duration time to wait before exiting
-q, --quiet enable quiet mode
The consume and produce commands support decoding/encoding messages using protobuf.
To use the protobuf support you must:
- Use the --import-path flag to indicate the paths where the protobuf files are located.
- Use the --proto-file flag to indicate the protobuf file to compile.
- Use the --proto flag with the fully qualified name of the message to use to decode/encode the message.
Example:
$ kafka-client consume broker1:9092 topic --import-path protos/ --proto-file mymessages.proto --proto=mymessages.MyMessage
When using the --proto flag, the protobuf messages will be written or read using the JSON representation.
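To make the three flags concrete, here is a minimal sketch of a proto source that would match the example command. The package and message names come from the example; the two fields are hypothetical and exist only to give the message some content.

```shell
# Create the directory that --import-path points at.
mkdir -p protos

# Write a hypothetical mymessages.proto; the fields are illustrative only.
cat > protos/mymessages.proto <<'EOF'
syntax = "proto3";

package mymessages;

message MyMessage {
  int32 id = 1;
  string text = 2;
}
EOF

# The fully qualified name passed to --proto is <package>.<message>:
# kafka-client consume broker1:9092 topic --import-path protos/ --proto-file mymessages.proto --proto=mymessages.MyMessage
```

Under the standard proto3 JSON mapping, a message of this type would be rendered along the lines of {"id": 1, "text": "hello"}; the exact formatting produced by kafka-client is not specified here.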
kafka-client supports a configuration file where you can configure Kafka clusters and default values for some of the flags. By default, the command expects the configuration file to be at $HOME/.kafka-client.yaml, but a different file can be selected with the --config global flag.
The name of a configured cluster can be used anywhere a broker list is expected.
Example of configuration file:
client-id: my-client-id
import-path:
- /home/me/myprotos/
proto-file:
- mymessages.proto
- myothermessages.proto
clusters:
  cluster1: broker1:9092,broker2:9092,broker3:9092
  cluster2: broker4:9092,broker5:9092,broker6:9092
This is an example of using a cluster name from the configuration to consume from a topic:
$ kafka-client consume cluster1 Topic --proto=mymessages.MyMessage
Note that even though we specify the protobuf message type mymessages.MyMessage, we specify neither the import-path nor the proto-file, because both are set in the configuration file.
Environment variables can also be used as a source of configuration. Each variable name is the KAFKA_CLIENT prefix followed by the option name, all uppercase, with dots and hyphens replaced by underscores. For example, the environment variable that configures the client-id is KAFKA_CLIENT_CLIENT_ID.
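The naming rule can be sketched as a small shell helper. The function name env_name is hypothetical and not part of kafka-client; it only applies the transformation described above so the variable name for any option can be checked quickly.

```shell
# env_name OPTION
# Print the environment variable name for a configuration option:
# KAFKA_CLIENT prefix, uppercase, dots and hyphens replaced by underscores.
env_name() {
  printf 'KAFKA_CLIENT_%s\n' \
    "$(printf '%s' "$1" | tr '[:lower:]' '[:upper:]' | tr '.-' '__')"
}

env_name client-id
env_name proto-file
```

With the name in hand, a value can be set for a single invocation, for example by prefixing a command with KAFKA_CLIENT_CLIENT_ID=my-client-id.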
The following flags are configurable via the configuration file or environment variables: client-id, duration, import-path, period, proto-file and quiet. In addition, the clusters key is used to configure Kafka clusters.
Finally, kafka-client provides shell autocompletion. It uses the clusters defined in the configuration file when autocompleting the brokers, and it connects to the brokers to retrieve the existing topics when autocompleting the topic.
To see the supported shells, execute kafka-client help completion. For instructions on setting up autocompletion for a particular shell, execute kafka-client help completion <shell>.
For example:
$ kafka-client help completion bash
Generate the autocompletion script for the bash shell.
This script depends on the 'bash-completion' package.
If it is not installed already, you can install it via your OS's package manager.
To load completions in your current shell session:
source <(kafka-client completion bash)
To load completions for every new session, execute once:
#### Linux:
kafka-client completion bash > /etc/bash_completion.d/kafka-client
#### macOS:
kafka-client completion bash > $(brew --prefix)/etc/bash_completion.d/kafka-client
You will need to start a new shell for this setup to take effect.
Usage:
kafka-client completion bash
Flags:
-h, --help help for bash
--no-descriptions disable completion descriptions
Global Flags:
-c, --client-id string client ID to sent to Kafka (default "kafka-client")
--config string config file (default is $HOME/.kafka-client.yaml)
-d, --duration duration time to wait before exiting
-q, --quiet enable quiet mode