
Revise Committing docs #881

Merged · 3 commits · Sep 2, 2019
11 changes: 1 addition & 10 deletions core/src/main/resources/reference.conf
@@ -43,7 +43,7 @@ akka.kafka.consumer {

# The stage will delay stopping the internal actor to allow processing of
# messages already in the stream (required for successful committing).
# Prefer use of `DrainingControl` over a large stop-timeout.
# This can be set to 0 for streams using `DrainingControl`.
stop-timeout = 30s

# Duration to wait for `KafkaConsumer.close` to finish.
@@ -58,20 +58,11 @@ akka.kafka.consumer {
# If commits take longer than this time a warning is logged
commit-time-warning = 1s

# Not used anymore (since 1.0-RC1)
# wakeup-timeout = 3s

# Not used anymore (since 1.0-RC1)
# max-wakeups = 10

# Not relevant for Kafka after version 2.1.0.
# If set to a finite duration, the consumer will re-send the last committed offsets periodically
# for all assigned partitions. See https://issues.apache.org/jira/browse/KAFKA-4682.
commit-refresh-interval = infinite

# Not used anymore (since 1.0-RC1)
# wakeup-debug = true

# Fully qualified config path which holds the dispatcher configuration
# to be used by the KafkaConsumerActor. Some blocking may occur.
use-dispatcher = "akka.kafka.default-dispatcher"
73 changes: 50 additions & 23 deletions docs/src/main/paradox/consumer.md
@@ -39,30 +39,53 @@ These factory methods are part of the @scala[@scaladoc[Transactional API](akka.k

## Settings

When creating a consumer stream you need to pass in `ConsumerSettings` (@scaladoc[API](akka.kafka.ConsumerSettings)) that define things like:
When creating a consumer source you need to pass in `ConsumerSettings` (@scaladoc[API](akka.kafka.ConsumerSettings)) that define things like:

* de-serializers for the keys and values
* bootstrap servers of the Kafka cluster
* group id for the consumer; note that offsets are always committed for a given consumer group
* Kafka consumer tuning parameters

Alpakka Kafka's defaults for all settings are defined in `reference.conf` which is included in the library JAR.

Important consumer settings
: | Setting | Description |
|-------------|----------------------------------------------|
| stop-timeout | The stage will delay stopping the internal actor to allow processing of messages already in the stream (required for successful committing). This can be set to 0 for streams using `DrainingControl`. |
| kafka-clients | Section for properties passed unchanged to the Kafka client (see @link:[Kafka's Consumer Configs](http://kafka.apache.org/documentation/#consumerconfigs) { open=new } ) |
| connection-checker | Configuration to let the stream fail if the connection to the Kafka broker fails. |

reference.conf (HOCON)
: @@ snip [snip](/core/src/main/resources/reference.conf) { #consumer-settings }

The Kafka documentation [Consumer Configs](http://kafka.apache.org/documentation/#consumerconfigs) lists the settings, their defaults and importance. More detailed explanations are given in the @javadoc[KafkaConsumer API](org.apache.kafka.clients.consumer.KafkaConsumer) and constants are defined in @javadoc[ConsumerConfig API](org.apache.kafka.clients.consumer.ConsumerConfig).


### Programmatic construction

Stream-specific settings like the de-serializers and consumer group ID should be set programmatically. Settings that apply to many consumers may be set in `application.conf` or use @ref:[config inheritance](#config-inheritance).

Scala
: @@ snip [snip](/tests/src/test/scala/docs/scaladsl/ConsumerExample.scala) { #settings }

Java
: @@ snip [snip](/tests/src/test/java/docs/javadsl/ConsumerExampleTest.java) { #settings }
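
For orientation, a programmatic construction along the lines of the snippets above might look like this minimal sketch (the bootstrap server, group id, and property value are placeholders):

```scala
import akka.actor.ActorSystem
import akka.kafka.ConsumerSettings
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer

implicit val system: ActorSystem = ActorSystem("example")

// Picks up defaults from the `akka.kafka.consumer` section of the actor system's configuration
val consumerSettings =
  ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
    .withBootstrapServers("localhost:9092")
    .withGroupId("group1")
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
```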

In addition to programmatic construction of the `ConsumerSettings` (@scaladoc[API](akka.kafka.ConsumerSettings)) it can also be created from configuration (`application.conf`).

When creating `ConsumerSettings` with the `ActorSystem` (@scaladoc[API](akka.actor.ActorSystem)) settings it uses the config section `akka.kafka.consumer`. The format of these settings files are described in the [Typesafe Config Documentation](https://github.com/lightbend/config#using-hocon-the-json-superset).
### Config inheritance

`ConsumerSettings` (@scaladoc[API](akka.kafka.ConsumerSettings)) are created from configuration in `application.conf` (with defaults in `reference.conf`). The format of these settings files is described in the [HOCON Config Documentation](https://github.com/lightbend/config#using-hocon-the-json-superset). A recommended setup is to rely on config inheritance as below:

@@ snip [snip](/core/src/main/resources/reference.conf) { #consumer-settings }
application.conf (HOCON)
: @@ snip [app.conf](/tests/src/test/resources/application.conf) { #consumer-config-inheritance }

`ConsumerSettings` (@scaladoc[API](akka.kafka.ConsumerSettings)) can also be created from any other `Config` section with the same layout as above.
Read the settings that inherit the defaults from the `akka.kafka.consumer` section:

The Kafka documentation [Consumer Configs](http://kafka.apache.org/documentation/#consumerconfigs) lists the settings, their defaults and importance. More detailed explanations are given in the @javadoc[KafkaConsumer API](org.apache.kafka.clients.consumer.KafkaConsumer) and constants are defined in @javadoc[ConsumerConfig API](org.apache.kafka.clients.consumer.ConsumerConfig).
Scala
: @@ snip [read](/tests/src/test/scala/docs/scaladsl/ConsumerExample.scala) { #config-inheritance }

Java
: @@ snip [read](/tests/src/test/java/docs/javadsl/ConsumerExampleTest.java) { #config-inheritance }

## Offset Storage external to Kafka

@@ -133,16 +156,15 @@ When creating a `Committer.sink` you need to pass in `CommitterSettings` (@scala

Table
: | Setting | Description | Default Value |
|-------------|----------------------------------------------|-----|
|-------------|----------------------------------------------|---------------|
| maxBatch | maximum number of messages to commit at once | 1000 |
| maxInterval | maximum interval between commits | 10 seconds |
| parallelism | parallelsim for async committing | 1 |
| parallelism | maximum number of commit batches in flight | 100 |

reference.conf
: @@snip [snip](/core/src/main/resources/reference.conf) { #committer-settings }


The bigger the values are, the less load you put on Kafka and the smaller are chances that committing offsets will become a bottleneck. However, increasing these values also means that in case of a failure you will have to re-process more messages.
All commit batches are aggregated internally and passed on to Kafka frequently (in every poll cycle); the Committer settings configure how the stream sends the offsets to the internal actor that communicates with the Kafka broker. Increasing these values means that in case of a failure you may have to re-process more messages.
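
A programmatic counterpart to the table might look like the following sketch (the values shown mirror the defaults from the table rather than a tuning recommendation, and an `ActorSystem` named `system` is assumed to be in scope):

```scala
import akka.kafka.CommitterSettings
import scala.concurrent.duration._

// Starts from the defaults in the `akka.kafka.committer` config section and overrides selectively
val committerSettings =
  CommitterSettings(system)
    .withMaxBatch(1000L)
    .withMaxInterval(10.seconds)
    .withParallelism(100)
```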

If you use a Kafka version older than 2.1.0 and consume from a topic with low activity, where possibly no messages arrive for more than 24 hours, consider enabling periodical commit refresh (the `akka.kafka.consumer.commit-refresh-interval` configuration parameter); otherwise offsets might expire in the Kafka storage. This has been fixed in Kafka 2.1.0 (see [KAFKA-4682](https://issues.apache.org/jira/browse/KAFKA-4682)).
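
For such a low-activity topic on an older broker, the refresh can also be enabled programmatically; a sketch, assuming the `withCommitRefreshInterval` setter is available and reusing the `consumerSettings` value from above (the 6-hour value is arbitrary, it only needs to stay well below Kafka's 24-hour offset retention):

```scala
import scala.concurrent.duration._

// Re-send committed offsets periodically so they do not expire on pre-2.1.0 brokers
val refreshingSettings = consumerSettings.withCommitRefreshInterval(6.hours)
```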

@@ -163,23 +185,29 @@ These factory methods are part of the @scala[@scaladoc[Committer API](akka.kafka

The `Consumer.commitWithMetadataSource` allows you to add metadata to the committed offset based on the last consumed record.

Note that the first offset provided to the consumer during a partition assignment will not contain metadata. This offset can get committed due to a periodic commit refresh (`akka.kafka.consumer.commit-refresh-interval` configuration parmeters) and the commit will not contain metadata.
Note that the first offset provided to the consumer during a partition assignment will not contain metadata. This offset can get committed due to a periodic commit refresh (the `akka.kafka.consumer.commit-refresh-interval` configuration parameter) and the commit will not contain metadata.

Scala
: @@ snip [snip](/tests/src/test/scala/docs/scaladsl/ConsumerExample.scala) { #commitWithMetadata }

Java
: @@ snip [snip](/tests/src/test/java/docs/javadsl/ConsumerExampleTest.java) { #commitWithMetadata }
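
The metadata is derived from each consumed record by a function passed to the source; a sketch, reusing `consumerSettings` from above (the topic name and metadata choice are illustrative):

```scala
import akka.kafka.Subscriptions
import akka.kafka.scaladsl.Consumer
import org.apache.kafka.clients.consumer.ConsumerRecord

// Store each record's timestamp as metadata alongside the committed offset
val sourceWithMetadata =
  Consumer.commitWithMetadataSource(
    consumerSettings,
    Subscriptions.topics("topic1"),
    (record: ConsumerRecord[String, String]) => record.timestamp().toString
  )
```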

If you commit the offset before processing the message you get "at-most-once" delivery semantics, this is provided by `Consumer.atMostOnceSource`. However, `atMostOnceSource` **commits the offset for each message and that is rather slow**, batching of commits is recommended.

## Consume "at-most-once"

If you commit the offset before processing the message you get "at-most-once" delivery semantics. This is provided by `Consumer.atMostOnceSource`. However, `atMostOnceSource` **commits the offset for each message and that is rather slow**; batching of commits is recommended. If your "at-most-once" requirements are more relaxed, consider a `Consumer.plainSource` and enable Kafka's auto committing with `enable.auto.commit = true`.

Scala
: @@ snip [snip](/tests/src/test/scala/docs/scaladsl/ConsumerExample.scala) { #atMostOnce }

Java
: @@ snip [snip](/tests/src/test/java/docs/javadsl/ConsumerExampleTest.java) { #atMostOnce }
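
For the relaxed variant mentioned above, the auto-commit settings are plain Kafka client properties; a sketch, reusing `consumerSettings` (the property values are illustrative, not recommendations):

```scala
import akka.kafka.Subscriptions
import akka.kafka.scaladsl.Consumer
import org.apache.kafka.clients.consumer.ConsumerConfig

// Let the Kafka client commit in the background; records may be lost if processing fails after a commit
val autoCommitSettings = consumerSettings
  .withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
  .withProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000")

val relaxedAtMostOnce =
  Consumer.plainSource(autoCommitSettings, Subscriptions.topics("topic1"))
```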

Maintaining at-least-once delivery semantics requires care, many risks and solutions are covered in @ref:[At-Least-Once Delivery](atleastonce.md).

## Consume "at-least-once"

How to achieve at-least-once delivery semantics is covered in @ref:[At-Least-Once Delivery](atleastonce.md).


## Connecting Producer and Consumer
@@ -188,22 +216,12 @@ For cases when you need to read messages from one topic, transform or enrich the

The `committableSink` accepts implementations `ProducerMessage.Envelope` (@scaladoc[API](akka.kafka.ProducerMessage$$Envelope)) that contain the offset to commit the consumption of the originating message (of type `ConsumerMessage.Committable` (@scaladoc[API](akka.kafka.ConsumerMessage$$Committable))). See @ref[Producing messages](producer.md#producing-messages) about different implementations of `Envelope` supported.

Note that there is a risk that something fails after publishing but before committing, so `committableSink` has "at-least-once" delivery semantics.

Scala
: @@ snip [snip](/tests/src/test/scala/docs/scaladsl/ConsumerExample.scala) { #consumerToProducerSink }

Java
: @@ snip [snip](/tests/src/test/java/docs/javadsl/ConsumerExampleTest.java) { #consumerToProducerSink }

As `Producer.committableSink`'s committing of messages one-by-one is rather slow, prefer a flow together with batching of commits with `Committer.sink`.

Scala
: @@ snip [snip](/tests/src/test/scala/docs/scaladsl/ConsumerExample.scala) { #consumerToProducerFlow }

Java
: @@ snip [snip](/tests/src/test/java/docs/javadsl/ConsumerExampleTest.java) { #consumerToProducerFlow }

@@@note

There is a risk that something fails after publishing, but before committing, so `committableSink` has "at-least-once" delivery semantics.
@@ -213,6 +231,15 @@ To get delivery guarantees, please read about @ref[transactions](transactions.md
@@@


As `Producer.committableSink` commits messages one by one, which is rather slow, prefer a producer flow combined with batching of commits via `Committer.sink`.

Scala
: @@ snip [snip](/tests/src/test/scala/docs/scaladsl/ConsumerExample.scala) { #consumerToProducerFlow }

Java
: @@ snip [snip](/tests/src/test/java/docs/javadsl/ConsumerExampleTest.java) { #consumerToProducerFlow }
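
Roughly, such a pipeline passes the committable offset through the producer stage and commits it in batches afterwards; a sketch along those lines, assuming `consumerSettings`, `producerSettings`, `committerSettings`, and an implicit materializer are in scope (topic names are placeholders):

```scala
import akka.kafka.{ProducerMessage, Subscriptions}
import akka.kafka.scaladsl.{Committer, Consumer, Producer}
import akka.kafka.scaladsl.Consumer.DrainingControl
import akka.stream.scaladsl.Keep
import org.apache.kafka.clients.producer.ProducerRecord

val control =
  Consumer
    .committableSource(consumerSettings, Subscriptions.topics("source-topic"))
    .map { msg =>
      // Carry the committable offset through the producer stage as the pass-through value
      ProducerMessage.single(
        new ProducerRecord[String, String]("target-topic", msg.record.key, msg.record.value),
        msg.committableOffset
      )
    }
    .via(Producer.flexiFlow(producerSettings))
    .map(_.passThrough)
    .toMat(Committer.sink(committerSettings))(Keep.both)
    .mapMaterializedValue(DrainingControl.apply)
    .run()

// Later: stop consuming, wait for in-flight commits, then shut down
// control.drainAndShutdown()
```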


## Source per partition

`Consumer.plainPartitionedSource`
10 changes: 10 additions & 0 deletions tests/src/test/java/docs/javadsl/ConsumerExampleTest.java
@@ -143,6 +143,16 @@ public CompletionStage<Done> storeProcessedOffset(long offset) { // ... }
}
// #plainSource

@Test
void configInheritance() throws Exception {
// #config-inheritance
Config config = system.settings().config().getConfig("our-kafka-consumer");
ConsumerSettings<String, String> consumerSettings =
ConsumerSettings.create(config, new StringDeserializer(), new StringDeserializer());
// #config-inheritance
assertEquals("kafka-host:9092", consumerSettings.getProperty("bootstrap.servers"));
}

@Test
void atMostOnce() throws Exception {
ConsumerSettings<String, String> consumerSettings =
7 changes: 7 additions & 0 deletions tests/src/test/resources/application.conf
@@ -18,3 +18,10 @@ akka {
}
}

# #consumer-config-inheritance
our-kafka-consumer: ${akka.kafka.consumer} {
kafka-clients {
bootstrap.servers = "kafka-host:9092"
}
}
# #consumer-config-inheritance
8 changes: 8 additions & 0 deletions tests/src/test/scala/docs/scaladsl/ConsumerExample.scala
@@ -110,6 +110,14 @@ class ConsumerExample extends DocsSpecBase with TestcontainersKafkaLike {
consumerSettingsWithAutoCommit
}

"ConsumerSettings" should "read from settings that inherit default" in {
// #config-inheritance
val config = system.settings.config.getConfig("our-kafka-consumer")
val consumerSettings = ConsumerSettings(config, new StringDeserializer, new StringDeserializer)
// #config-inheritance
consumerSettings.getProperty("bootstrap.servers") shouldBe "kafka-host:9092"
}

"Consume messages at-most-once" should "work" in assertAllStagesStopped {
val consumerSettings = createSettings().withGroupId(createGroupId())
val topic = createTopic()
Expand Down