Skip to content

Commit

Permalink
Commit refreshing: fix bug from aggregating offsets (#975)
Browse files Browse the repository at this point in the history
* Commit refreshing: fix bug introduced #862
* Run RetentionPeriodSpec against Kafka 2.0.0
  • Loading branch information
ennru authored and seglo committed Nov 21, 2019
1 parent f10f2dc commit 5485946
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 19 deletions.
5 changes: 1 addition & 4 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -294,20 +294,17 @@ lazy val tests = project
Test / unmanagedSources / excludeFilter := {
if (scalaBinaryVersion.value == "2.13") {
HiddenFileFilter ||
"RetentionPeriodSpec.scala" ||
"MultiConsumerSpec.scala" ||
"ReconnectSpec.scala" ||
"EmbeddedKafkaSampleSpec.scala" ||
"TransactionsSpec.scala" ||
"SerializationSpec.scala" ||
"PartitionExamples.scala" ||
"TransactionsExample.scala" ||
"ConnectionCheckerSpec.scala" ||
"EmbeddedKafkaWithSchemaRegistryTest.java" ||
"AssignmentTest.java" ||
"ProducerExampleTest.java" ||
"SerializationTest.java" ||
"TransactionsExampleTest.java"
"SerializationTest.java"
} else (Test / unmanagedSources / excludeFilter).value
}
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,9 @@ import scala.util.control.NonFatal

private def commitAggregatedOffsets(): Unit = if (commitMaps.nonEmpty && !rebalanceInProgress) {
val replyTo = commitSenders
commit(aggregateOffsets(commitMaps), msg => replyTo.foreach(_ ! msg))
val aggregatedOffsets = aggregateOffsets(commitMaps)
commitRefreshing.add(aggregatedOffsets)
commit(aggregatedOffsets, msg => replyTo.foreach(_ ! msg))
commitMaps = List.empty
commitSenders = Vector.empty
}
Expand Down
2 changes: 1 addition & 1 deletion docs/src/main/paradox/testing-testcontainers.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ Java
: @@snip [snip](/tests/src/test/java/docs/javadsl/TestkitTestcontainersTest.java) { #testcontainers-settings }

<!-- NOTE: Can't use paradox to link to `KafkaContainer` because it shares the same package name as the main artifact `org.testcontainers.containers`, but is published separately https://static.javadoc.io/org.testcontainers/kafka/version/ -->
To see what options are available for configuring testcontainers using `configureKakfa` and `configureZooKeeper` in @scaladoc[KafkaTestkitTestcontainersSettings](akka.kafka.testkit.KafkaTestkitTestcontainersSettings) see the API docs for [`KafkaContainer`](https://static.javadoc.io/org.testcontainers/kafka/$testcontainers.version$/org/testcontainers/containers/KafkaContainer.html) and @javadoc[GenericContainer](org.testcontainers.containers.GenericContainer).
To see what options are available for configuring testcontainers using `configureKafka` and `configureZooKeeper` in @scaladoc[KafkaTestkitTestcontainersSettings](akka.kafka.testkit.KafkaTestkitTestcontainersSettings) see the API docs for [`KafkaContainer`](https://static.javadoc.io/org.testcontainers/kafka/$testcontainers.version$/org/testcontainers/containers/KafkaContainer.html) and @javadoc[GenericContainer](org.testcontainers.containers.GenericContainer).

## Testing with a Docker Kafka cluster from Java code

Expand Down
32 changes: 19 additions & 13 deletions tests/src/test/scala/akka/kafka/scaladsl/RetentionPeriodSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,26 +9,32 @@ import java.util.concurrent.ConcurrentLinkedQueue

import akka.Done
import akka.kafka._
import akka.kafka.testkit.scaladsl.EmbeddedKafkaLike
import akka.kafka.testkit.KafkaTestkitTestcontainersSettings
import akka.kafka.testkit.scaladsl.TestcontainersKafkaPerClassLike
import akka.stream.scaladsl.Keep
import akka.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped
import akka.stream.testkit.scaladsl.TestSink
import net.manub.embeddedkafka.EmbeddedKafkaConfig

import scala.jdk.CollectionConverters._
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.jdk.CollectionConverters._

class RetentionPeriodSpec extends SpecBase(kafkaPort = KafkaPorts.RetentionPeriodSpec) with EmbeddedKafkaLike {

override def createKafkaConfig: EmbeddedKafkaConfig =
EmbeddedKafkaConfig(kafkaPort,
zooKeeperPort,
Map(
"offsets.topic.replication.factor" -> "1",
"offsets.retention.minutes" -> "1",
"offsets.retention.check.interval.ms" -> "100"
))
class RetentionPeriodSpec extends SpecBase with TestcontainersKafkaPerClassLike {

override val testcontainersSettings = KafkaTestkitTestcontainersSettings(system)
// The bug commit refreshing circumvents was fixed in Kafka 2.1.0
// https://issues.apache.org/jira/browse/KAFKA-4682
// Confluent Platform 5.0.0 bundles Kafka 2.0.0
// https://docs.confluent.io/current/installation/versions-interoperability.html
.withConfluentPlatformVersion("5.0.0")
.withInternalTopicsReplicationFactor(1)
.withConfigureKafka { brokerContainers =>
brokerContainers.foreach {
_.withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "false")
.withEnv("KAFKA_OFFSETS_RETENTION_MINUTES", "1")
.withEnv("KAFKA_OFFSETS_RETENTION_CHECK_INTERVAL_MS", "100")
}
}

"After retention period (1 min) consumer" must {

Expand Down

0 comments on commit 5485946

Please sign in to comment.