diff --git a/core/src/main/resources/reference.conf b/core/src/main/resources/reference.conf
index 4db2f320b..bf1dff6ee 100644
--- a/core/src/main/resources/reference.conf
+++ b/core/src/main/resources/reference.conf
@@ -43,7 +43,7 @@ akka.kafka.consumer {
   # The stage will delay stopping the internal actor to allow processing of
   # messages already in the stream (required for successful committing).
-  # Prefer use of `DrainingControl` over a large stop-timeout.
+  # This can be set to 0 for streams using `DrainingControl`.
   stop-timeout = 30s
 
   # Duration to wait for `KafkaConsumer.close` to finish.
@@ -58,20 +58,11 @@ akka.kafka.consumer {
   # If commits take longer than this time a warning is logged
   commit-time-warning = 1s
 
-  # Not used anymore (since 1.0-RC1)
-  # wakeup-timeout = 3s
-
-  # Not used anymore (since 1.0-RC1)
-  # max-wakeups = 10
-
   # Not relevant for Kafka after version 2.1.0.
   # If set to a finite duration, the consumer will re-send the last committed offsets periodically
   # for all assigned partitions. See https://issues.apache.org/jira/browse/KAFKA-4682.
   commit-refresh-interval = infinite
 
-  # Not used anymore (since 1.0-RC1)
-  # wakeup-debug = true
-
   # Fully qualified config path which holds the dispatcher configuration
   # to be used by the KafkaConsumerActor. Some blocking may occur.
   use-dispatcher = "akka.kafka.default-dispatcher"
diff --git a/docs/src/main/paradox/consumer.md b/docs/src/main/paradox/consumer.md
index 89ad47677..6d3f3c374 100644
--- a/docs/src/main/paradox/consumer.md
+++ b/docs/src/main/paradox/consumer.md
@@ -39,30 +39,53 @@ These factory methods are part of the @scala[@scaladoc[Transactional API](akka.k
 
 ## Settings
 
-When creating a consumer stream you need to pass in `ConsumerSettings` (@scaladoc[API](akka.kafka.ConsumerSettings)) that define things like:
+When creating a consumer source you need to pass in `ConsumerSettings` (@scaladoc[API](akka.kafka.ConsumerSettings)) that define things like:
 
 * de-serializers for the keys and values
 * bootstrap servers of the Kafka cluster
 * group id for the consumer, note that offsets are always committed for a given consumer group
 * Kafka consumer tuning parameters
 
+Alpakka Kafka's defaults for all settings are defined in `reference.conf`, which is included in the library JAR.
+
+Important consumer settings
+: | Setting | Description |
+|-------------|----------------------------------------------|
+| stop-timeout | The stage will delay stopping the internal actor to allow processing of messages already in the stream (required for successful committing). This can be set to 0 for streams using `DrainingControl`. |
+| kafka-clients | Section for properties passed unchanged to the Kafka client (see @link:[Kafka's Consumer Configs](http://kafka.apache.org/documentation/#consumerconfigs) { open=new }) |
+| connection-checker | Configuration to let the stream fail when the connection to the Kafka broker fails. |
+
+reference.conf (HOCON)
+: @@ snip [snip](/core/src/main/resources/reference.conf) { #consumer-settings }
+
+The Kafka documentation [Consumer Configs](http://kafka.apache.org/documentation/#consumerconfigs) lists the settings, their defaults and importance. More detailed explanations are given in the @javadoc[KafkaConsumer API](org.apache.kafka.clients.consumer.KafkaConsumer) and constants are defined in @javadoc[ConsumerConfig API](org.apache.kafka.clients.consumer.ConsumerConfig).
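+
+To show how these settings come together, here is a sketch of building `ConsumerSettings` programmatically (the broker address, group id, and extra client property are placeholder values):
+
+```scala
+import akka.actor.ActorSystem
+import akka.kafka.ConsumerSettings
+import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.kafka.common.serialization.StringDeserializer
+import scala.concurrent.duration.Duration
+
+implicit val system: ActorSystem = ActorSystem()
+
+val consumerSettings =
+  ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
+    .withBootstrapServers("localhost:9092") // bootstrap servers of the Kafka cluster
+    .withGroupId("group1")                  // consumer group for which offsets are committed
+    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") // passed to the Kafka client
+    .withStopTimeout(Duration.Zero)         // 0 is fine for streams drained via `DrainingControl`
+```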
+
+
+### Programmatic construction
+
+Stream-specific settings like the de-serializers and consumer group ID should be set programmatically. Settings that apply to many consumers may be set in `application.conf` or use @ref:[config inheritance](#config-inheritance).
+
 Scala
 : @@ snip [snip](/tests/src/test/scala/docs/scaladsl/ConsumerExample.scala) { #settings }
 
 Java
 : @@ snip [snip](/tests/src/test/java/docs/javadsl/ConsumerExampleTest.java) { #settings }
 
-In addition to programmatic construction of the `ConsumerSettings` (@scaladoc[API](akka.kafka.ConsumerSettings)) it can also be created from configuration (`application.conf`).
-When creating `ConsumerSettings` with the `ActorSystem` (@scaladoc[API](akka.actor.ActorSystem)) settings it uses the config section `akka.kafka.consumer`. The format of these settings files are described in the [Typesafe Config Documentation](https://github.com/lightbend/config#using-hocon-the-json-superset).
+### Config inheritance
+
+`ConsumerSettings` (@scaladoc[API](akka.kafka.ConsumerSettings)) are created from configuration in `application.conf` (with defaults in `reference.conf`). The format of these settings files is described in the [HOCON Config Documentation](https://github.com/lightbend/config#using-hocon-the-json-superset).
 
-@@ snip [snip](/core/src/main/resources/reference.conf) { #consumer-settings }
+A recommended setup is to rely on config inheritance as below:
 
-`ConsumerSettings` (@scaladoc[API](akka.kafka.ConsumerSettings)) can also be created from any other `Config` section with the same layout as above.
+application.conf (HOCON)
+: @@ snip [app.conf](/tests/src/test/resources/application.conf) { #consumer-config-inheritance }
 
-The Kafka documentation [Consumer Configs](http://kafka.apache.org/documentation/#consumerconfigs) lists the settings, their defaults and importance. More detailed explanations are given in the @javadoc[KafkaConsumer API](org.apache.kafka.clients.consumer.KafkaConsumer) and constants are defined in @javadoc[ConsumerConfig API](org.apache.kafka.clients.consumer.ConsumerConfig).
+Read the settings that inherit the defaults from the `akka.kafka.consumer` section:
+
+Scala
+: @@ snip [read](/tests/src/test/scala/docs/scaladsl/ConsumerExample.scala) { #config-inheritance }
 
+Java
+: @@ snip [read](/tests/src/test/java/docs/javadsl/ConsumerExampleTest.java) { #config-inheritance }
 
 ## Offset Storage external to Kafka
 
@@ -133,16 +156,15 @@ When creating a `Committer.sink` you need to pass in `CommitterSettings` (@scala
 
 Table
 : | Setting | Description | Default Value |
-|-------------|----------------------------------------------|-----|
+|-------------|----------------------------------------------|---------------|
 | maxBatch | maximum number of messages to commit at once | 1000 |
 | maxInterval | maximum interval between commits | 10 seconds |
-| parallelism | parallelsim for async committing | 1 |
+| parallelism | maximum number of commit batches in flight | 100 |
 
 reference.conf
 : @@snip [snip](/core/src/main/resources/reference.conf) { #committer-settings }
 
-
-The bigger the values are, the less load you put on Kafka and the smaller are chances that committing offsets will become a bottleneck. However, increasing these values also means that in case of a failure you will have to re-process more messages.
+All commit batches are aggregated internally and passed on to Kafka very often (in every poll cycle); the Committer settings configure how the stream sends offsets to the internal actor, which communicates with the Kafka broker. Increasing these values means that in case of a failure you may have to re-process more messages.
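+
+In code, the same knobs can be turned on a `CommitterSettings` instance, as in this sketch (assuming an `ActorSystem` named `system` is in scope; the values shown simply repeat the defaults):
+
+```scala
+import akka.kafka.CommitterSettings
+import scala.concurrent.duration._
+
+val committerSettings =
+  CommitterSettings(system)
+    .withMaxBatch(1000L)         // maximum number of messages to commit at once
+    .withMaxInterval(10.seconds) // maximum interval between commits
+    .withParallelism(100)        // maximum number of commit batches in flight
+```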
 
 If you use Kafka older than version 2.1.0 and consume from a topic with low activity, and possibly no messages arrive for more than 24 hours, consider enabling periodical commit refresh (`akka.kafka.consumer.commit-refresh-interval` configuration parameters), otherwise offsets might expire in the Kafka storage. This has been fixed in Kafka 2.1.0 (See [KAFKA-4682](https://issues.apache.org/jira/browse/KAFKA-4682)).
@@ -163,7 +185,7 @@ These factory methods are part of the @scala[@scaladoc[Committer API](akka.kafka
 
 The `Consumer.commitWithMetadataSource` allows you to add metadata to the committed offset based on the last consumed record.
 
-Note that the first offset provided to the consumer during a partition assignment will not contain metadata. This offset can get committed due to a periodic commit refresh (`akka.kafka.consumer.commit-refresh-interval` configuration parmeters) and the commit will not contain metadata.
+Note that the first offset provided to the consumer during a partition assignment will not contain metadata. This offset can get committed due to a periodic commit refresh (`akka.kafka.consumer.commit-refresh-interval` configuration parameter) and the commit will not contain metadata.
 
 Scala
 : @@ snip [snip](/tests/src/test/scala/docs/scaladsl/ConsumerExample.scala) { #commitWithMetadata }
@@ -171,7 +193,10 @@ Scala
 Java
 : @@ snip [snip](/tests/src/test/java/docs/javadsl/ConsumerExampleTest.java) { #commitWithMetadata }
 
-If you commit the offset before processing the message you get "at-most-once" delivery semantics, this is provided by `Consumer.atMostOnceSource`. However, `atMostOnceSource` **commits the offset for each message and that is rather slow**, batching of commits is recommended.
+
+## Consume "at-most-once"
+
+If you commit the offset before processing the message you get "at-most-once" delivery semantics; this is provided by `Consumer.atMostOnceSource`. However, `atMostOnceSource` **commits the offset for each message and that is rather slow**, so batching of commits is recommended. If your "at-most-once" requirements are more relaxed, consider a `Consumer.plainSource` and enable Kafka's auto committing with `enable.auto.commit = true`.
 
 Scala
 : @@ snip [snip](/tests/src/test/scala/docs/scaladsl/ConsumerExample.scala) { #atMostOnce }
@@ -179,7 +204,10 @@ Scala
 Java
 : @@ snip [snip](/tests/src/test/java/docs/javadsl/ConsumerExampleTest.java) { #atMostOnce }
 
-Maintaining at-least-once delivery semantics requires care, many risks and solutions are covered in @ref:[At-Least-Once Delivery](atleastonce.md).
+
+## Consume "at-least-once"
+
+How to achieve at-least-once delivery semantics is covered in @ref:[At-Least-Once Delivery](atleastonce.md).
 
 ## Connecting Producer and Consumer
 
@@ -188,22 +216,12 @@ For cases when you need to read messages from one topic, transform or enrich the
 
 The `committableSink` accepts implementations `ProducerMessage.Envelope` (@scaladoc[API](akka.kafka.ProducerMessage$$Envelope)) that contain the offset to commit the consumption of the originating message (of type `ConsumerMessage.Committable` (@scaladoc[API](akka.kafka.ConsumerMessage$$Committable))).
 See @ref[Producing messages](producer.md#producing-messages) about different implementations of `Envelope` supported.
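+
+As an illustration, a `CommittableMessage` from `Consumer.committableSource` could be wrapped in such an envelope like this (a sketch; the target topic name is a placeholder):
+
+```scala
+import akka.kafka.{ConsumerMessage, ProducerMessage}
+import org.apache.kafka.clients.producer.ProducerRecord
+
+def toEnvelope(msg: ConsumerMessage.CommittableMessage[String, String]) =
+  ProducerMessage.single(
+    new ProducerRecord("target-topic", msg.record.key, msg.record.value),
+    msg.committableOffset // committed only after the produced record is acknowledged
+  )
+```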
-Note that there is a risk that something fails after publishing but before committing, so `committableSink` has "at-least-once" delivery semantics.
-
 Scala
 : @@ snip [snip](/tests/src/test/scala/docs/scaladsl/ConsumerExample.scala) { #consumerToProducerSink }
 
 Java
 : @@ snip [snip](/tests/src/test/java/docs/javadsl/ConsumerExampleTest.java) { #consumerToProducerSink }
 
-As `Producer.committableSink`'s committing of messages one-by-one is rather slow, prefer a flow together with batching of commits with `Committer.sink`.
-
-Scala
-: @@ snip [snip](/tests/src/test/scala/docs/scaladsl/ConsumerExample.scala) { #consumerToProducerFlow }
-
-Java
-: @@ snip [snip](/tests/src/test/java/docs/javadsl/ConsumerExampleTest.java) { #consumerToProducerFlow }
-
 @@@note
 
 There is a risk that something fails after publishing, but before committing, so `committableSink` has "at-least-once" delivery semantics.
@@ -213,6 +231,15 @@ To get delivery guarantees, please read about @ref[transactions](transactions.md
 
 @@@
 
+As `Producer.committableSink` commits messages one-by-one, which is rather slow, prefer a flow combined with `Committer.sink` to batch commits.
+
+Scala
+: @@ snip [snip](/tests/src/test/scala/docs/scaladsl/ConsumerExample.scala) { #consumerToProducerFlow }
+
+Java
+: @@ snip [snip](/tests/src/test/java/docs/javadsl/ConsumerExampleTest.java) { #consumerToProducerFlow }
+
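+To show how the pieces connect, this is a sketch of such a pipeline (assuming `consumerSettings`, `producerSettings`, an implicit `ActorSystem` named `system`, and a `Materializer` are in scope; the topic names are placeholders):
+
+```scala
+import akka.kafka.{CommitterSettings, ProducerMessage, Subscriptions}
+import akka.kafka.scaladsl.{Committer, Consumer, Producer}
+import akka.kafka.scaladsl.Consumer.DrainingControl
+import akka.stream.scaladsl.Keep
+import org.apache.kafka.clients.producer.ProducerRecord
+
+val control =
+  Consumer
+    .committableSource(consumerSettings, Subscriptions.topics("source-topic"))
+    .map { msg =>
+      ProducerMessage.single(
+        new ProducerRecord("target-topic", msg.record.key, msg.record.value),
+        msg.committableOffset) // the offset travels with the record as pass-through
+    }
+    .via(Producer.flexiFlow(producerSettings))
+    .map(_.passThrough) // emit the offsets of the published records ...
+    .toMat(Committer.sink(CommitterSettings(system)))(Keep.both) // ... and commit them in batches
+    .mapMaterializedValue(DrainingControl.apply) // supports graceful stream shutdown
+    .run()
+```
+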
 
 ## Source per partition
 
 `Consumer.plainPartitionedSource`
diff --git a/tests/src/test/java/docs/javadsl/ConsumerExampleTest.java b/tests/src/test/java/docs/javadsl/ConsumerExampleTest.java
index 970ff1f71..f15da6384 100644
--- a/tests/src/test/java/docs/javadsl/ConsumerExampleTest.java
+++ b/tests/src/test/java/docs/javadsl/ConsumerExampleTest.java
@@ -143,6 +143,16 @@ public CompletionStage<Done> storeProcessedOffset(long offset) {
       // ...
     }
   }
   // #plainSource
+
+  @Test
+  void configInheritance() throws Exception {
+    // #config-inheritance
+    Config config = system.settings().config().getConfig("our-kafka-consumer");
+    ConsumerSettings<String, String> consumerSettings =
+        ConsumerSettings.create(config, new StringDeserializer(), new StringDeserializer());
+    // #config-inheritance
+    assertEquals("kafka-host:9092", consumerSettings.getProperty("bootstrap.servers"));
+  }
+
   @Test
   void atMostOnce() throws Exception {
     ConsumerSettings<String, String> consumerSettings =
diff --git a/tests/src/test/resources/application.conf b/tests/src/test/resources/application.conf
index 16fa27481..bb5b16605 100644
--- a/tests/src/test/resources/application.conf
+++ b/tests/src/test/resources/application.conf
@@ -18,3 +18,10 @@ akka {
   }
 }
 
+# #consumer-config-inheritance
+our-kafka-consumer: ${akka.kafka.consumer} {
+  kafka-clients {
+    bootstrap.servers = "kafka-host:9092"
+  }
+}
+# #consumer-config-inheritance
diff --git a/tests/src/test/scala/docs/scaladsl/ConsumerExample.scala b/tests/src/test/scala/docs/scaladsl/ConsumerExample.scala
index 68382e5ad..858d73a27 100644
--- a/tests/src/test/scala/docs/scaladsl/ConsumerExample.scala
+++ b/tests/src/test/scala/docs/scaladsl/ConsumerExample.scala
@@ -110,6 +110,14 @@ class ConsumerExample extends DocsSpecBase with TestcontainersKafkaLike {
     consumerSettingsWithAutoCommit
   }
 
+  "ConsumerSettings" should "read settings that inherit defaults" in {
+    // #config-inheritance
+    val config = system.settings.config.getConfig("our-kafka-consumer")
+    val consumerSettings = ConsumerSettings(config, new StringDeserializer, new StringDeserializer)
+    // #config-inheritance
+    consumerSettings.getProperty("bootstrap.servers") shouldBe "kafka-host:9092"
+  }
+
   "Consume messages at-most-once" should "work" in assertAllStagesStopped {
     val consumerSettings = createSettings().withGroupId(createGroupId())
     val topic = createTopic()