From a3ea52c9f666d96b3cf3d1caa66723d05396987f Mon Sep 17 00:00:00 2001 From: Enno Runne <458526+ennru@users.noreply.github.com> Date: Thu, 23 Dec 2021 13:52:41 +0100 Subject: [PATCH 1/7] Drop Scala 2.12 --- .github/workflows/check-build-test.yml | 2 -- CONTRIBUTING.md | 9 +++--- README.md | 15 +++++----- build.sbt | 29 ++++--------------- core/src/main/scala/akka/kafka/Metadata.scala | 1 - .../kafka/internal/CommitCollectorStage.scala | 1 + .../internal/CommitObservationLogic.scala | 5 ++-- .../CommittingProducerSinkStage.scala | 1 + .../kafka/internal/KafkaConsumerActor.scala | 9 +++--- .../akka/kafka/internal/MessageBuilder.scala | 1 - .../akka/kafka/internal/SubSourceLogic.scala | 10 +++++-- .../internal/TransactionalProducerStage.scala | 1 + .../kafka/internal/TransactionalSources.scala | 1 - .../akka/kafka/javadsl/MetadataClient.scala | 3 +- .../scala/akka/kafka/TransactionsOps.scala | 6 ++-- .../akka/kafka/javadsl/ControlSpec.scala | 9 ++---- .../akka/kafka/scaladsl/ControlSpec.scala | 9 ++---- 17 files changed, 45 insertions(+), 67 deletions(-) diff --git a/.github/workflows/check-build-test.yml b/.github/workflows/check-build-test.yml index ea9c4862f..8d39aedd2 100644 --- a/.github/workflows/check-build-test.yml +++ b/.github/workflows/check-build-test.yml @@ -105,9 +105,7 @@ jobs: fail-fast: false matrix: include: - - { java-version: adopt@1.8, scala-version: 2.12.15, sbt-opts: '' } - { java-version: adopt@1.8, scala-version: 2.13.8, sbt-opts: '' } - - { java-version: adopt@1.11.0-9, scala-version: 2.12.15, sbt-opts: '-J-XX:+UnlockExperimentalVMOptions -J-XX:+UseJVMCICompiler' } - { java-version: adopt@1.11.0-9, scala-version: 2.13.8, sbt-opts: '-J-XX:+UnlockExperimentalVMOptions -J-XX:+UseJVMCICompiler' } steps: - name: Checkout diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index d49b51d69..699876197 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -1,4 +1,4 @@ -# Welcome! Thank you for contributing to the Alpakka Kafka connector! +# Welcome! Thank you for contributing to Alpakka Kafka! We follow the standard GitHub [fork & pull](https://help.github.com/articles/using-pull-requests/#fork--pull) approach to pull requests. Just fork the official repo, develop in a branch, and submit a PR! @@ -27,7 +27,7 @@ This is the process for committing code into master. 1. After the review you should fix the issues (review comments, CI failures) by pushing a new commit for new review, iterating until the reviewers give their thumbs up and CI tests pass. -1. When the branch conflicts with its merge target (either by way of git merge conflict or failing CI tests), do **not** merge the target branch into your feature branch. Instead rebase your branch onto the target branch and update it with `git push -f`. +1. When the branch conflicts with its merge target (either by way of git merge conflict or failing CI tests), do **not** merge the target branch into your feature branch. Instead, rebase your branch onto the target branch and update it with `git push -f`. ## Pull Request Requirements @@ -43,7 +43,7 @@ For a Pull Request to be considered at all it has to meet these requirements: 1. Regardless if the code introduces new features or fixes bugs or regressions, it must have comprehensive tests. -1. The code must be well documented in the Lightbend's standard documentation format (see the [Documentation](documentation) section below). +1. The code must be well documented in the Lightbend's standard documentation format (see the [Documentation](#documentation) section below). 1. 
The commit messages must properly describe the changes, see [further below](#creating-commits-and-writing-commit-messages). @@ -59,8 +59,7 @@ Prepare code snippets to be integrated by Paradox in the tests. Such example sho Use ScalaDoc if you see the need to describe the API usage better than the naming does. -Run `sbt docs/paradox` to generate reference docs while developing. Generated documentation can be -found in the `./docs/target/paradox/site/main` directory. +Run `sbt verifyDocs` to generate reference docs while developing. The generated documentation will open in your browser. ## External Dependencies diff --git a/README.md b/README.md index 1b1bd010d..cab9fcebe 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,4 @@ -Alpakka Kafka Connector [![scaladex-badge][]][scaladex] [![maven-central-badge][]][maven-central] [![gh-actions-badge][]][gh-actions] -======================= +Alpakka Kafka [![scaladex-badge][]][scaladex] [![maven-central-badge][]][maven-central] [![gh-actions-badge][]][gh-actions] [scaladex]: https://index.scala-lang.org/akka/alpakka-kafka/akka-stream-kafka/ [scaladex-badge]: https://index.scala-lang.org/akka/alpakka-kafka/akka-stream-kafka/latest.svg?target=_2.13 @@ -25,7 +24,7 @@ Documentation - **[Alpakka Kafka connector reference](https://doc.akka.io/docs/akka-stream-kafka/current/) documentation** -To keep up with the latest Alpakka releases check out [Alpakka releases](https://doc.akka.io/docs/alpakka/current/release-notes/index.html) and [Alpakka Kafka connector releases](https://doc.akka.io/docs/alpakka-kafka/current/release-notes/index.html). +To keep up with the latest Alpakka releases check out [Alpakka releases](https://github.com/akka/alpakka/releases) and [Alpakka Kafka releases](https://github.com/akka/alpakka-kafka/releases). Community @@ -41,7 +40,7 @@ In addition to that, you may enjoy following: - The [Akka Team Blog](https://akka.io/blog/) - [@akkateam](https://twitter.com/akkateam) on Twitter - Questions tagged [#alpakka on StackOverflow](https://stackoverflow.com/questions/tagged/alpakka) -- Questions tagged [**#alpakka-kafka** on StackOverflow](https://stackoverflow.com/questions/tagged/alpakka-kafka) +- Questions tagged [**#alpakka** on StackOverflow](https://stackoverflow.com/questions/tagged/alpakka) The Kafka connector was originally created as **Reactive Kafka** by [SoftwareMill logo](https://softwaremill.com). 
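[Review note] The build.sbt hunks that follow are the heart of this patch: the cross-build shrinks to Scala 2.13 only, the 2.12-specific scalac flags ("-Yno-adapted-args", "-Xfuture") disappear, and the scala-collection-compat shim is dropped. As a rough sketch of the settings this leaves behind — only the crossScalaVersions/scalaVersion lines are taken from the diff; the Scala213 value is assumed to be "2.13.8" to match the CI matrix above, and the kafka-clients version is purely illustrative:

    // Hypothetical build.sbt fragment after this patch; everything not shown
    // in the diff (project name, kafka-clients version) is a stand-in.
    val Scala213 = "2.13.8" // matches the CI matrix in check-build-test.yml

    lazy val core = (project in file("core"))
      .settings(
        scalaVersion := Scala213,
        // A single-element list: `+test` and `+publishLocal` now run 2.13 only.
        crossScalaVersions := Seq(Scala213),
        // scala-collection-compat can go: 2.13 ships scala.jdk.CollectionConverters
        // and MapView natively, so no compatibility shim is needed.
        libraryDependencies += "org.apache.kafka" % "kafka-clients" % "3.0.0" // illustrative
      )

With a single cross version left, sbt's `+`-prefixed commands become no-ops over one entry, which is why the workflow matrix above drops its 2.12 rows in the same commit.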
@@ -49,19 +48,21 @@ The Kafka connector was originally created as **Reactive Kafka** by [ url("https://opensource.org/licenses/Apache-2.0")), description := "Alpakka is a Reactive Enterprise Integration library for Java and Scala, based on Reactive Streams and Akka.", - crossScalaVersions := Seq(Scala212, Scala213), + crossScalaVersions := Seq(Scala213), scalaVersion := Scala213, crossVersion := CrossVersion.binary, javacOptions ++= Seq( @@ -92,9 +87,6 @@ val commonSettings = Def.settings( "-Ywarn-dead-code", "-Ywarn-numeric-widen" ) ++ { - if (scalaBinaryVersion.value == "2.13") Seq.empty - else Seq("-Yno-adapted-args", "-Xfuture") - } ++ { if (insideCI.value && !Nightly) Seq("-Xfatal-warnings") else Seq.empty }, @@ -199,10 +191,8 @@ lazy val core = project libraryDependencies ++= Seq( "com.typesafe.akka" %% "akka-stream" % akkaVersion, "com.typesafe.akka" %% "akka-discovery" % akkaVersion % Provided, - "org.apache.kafka" % "kafka-clients" % kafkaVersion, - "org.scala-lang.modules" %% "scala-collection-compat" % "2.6.0" + "org.apache.kafka" % "kafka-clients" % kafkaVersion ), - Compile / compile / scalacOptions += "-Wconf:msg=[import scala.collection.compat._]:s", mimaPreviousArtifacts := Set( organization.value %% name.value % previousStableVersion.value .getOrElse(throw new Error("Unable to determine previous version")) @@ -281,18 +271,9 @@ lazy val tests = project "org.slf4j" % "log4j-over-slf4j" % slf4jVersion % Test, // Schema registry uses Glassfish which uses java.util.logging "org.slf4j" % "jul-to-slf4j" % slf4jVersion % Test, - "org.mockito" % "mockito-core" % "4.2.0" % Test - ) ++ { - scalaBinaryVersion.value match { - case "2.13" => - Seq(scalapb) - case "2.12" => - Seq( - scalapb, - kafkaBrokerWithoutSlf4jLog4j - ) - } - }, + "org.mockito" % "mockito-core" % "4.2.0" % Test, + "com.thesamet.scalapb" %% "scalapb-runtime" % "0.10.11" % Test + ), resolvers ++= Seq( "Confluent Maven Repo" at "https://packages.confluent.io/maven/" ), diff --git a/core/src/main/scala/akka/kafka/Metadata.scala b/core/src/main/scala/akka/kafka/Metadata.scala index 151c7e9c7..b71da572b 100644 --- a/core/src/main/scala/akka/kafka/Metadata.scala +++ b/core/src/main/scala/akka/kafka/Metadata.scala @@ -12,7 +12,6 @@ import org.apache.kafka.clients.consumer.{OffsetAndMetadata, OffsetAndTimestamp} import org.apache.kafka.common.{PartitionInfo, TopicPartition} import scala.jdk.CollectionConverters._ -import scala.collection.compat._ import scala.util.Try /** diff --git a/core/src/main/scala/akka/kafka/internal/CommitCollectorStage.scala b/core/src/main/scala/akka/kafka/internal/CommitCollectorStage.scala index 37ea595b0..bb40ffebf 100644 --- a/core/src/main/scala/akka/kafka/internal/CommitCollectorStage.scala +++ b/core/src/main/scala/akka/kafka/internal/CommitCollectorStage.scala @@ -75,6 +75,7 @@ private final class CommitCollectorStageLogic( pushed = true } else pushOnNextPull = true } else scheduleCommit() + case _ => } if (!pushed) suspendContext() } diff --git a/core/src/main/scala/akka/kafka/internal/CommitObservationLogic.scala b/core/src/main/scala/akka/kafka/internal/CommitObservationLogic.scala index 8b62c1fcd..1f38e41a5 100644 --- a/core/src/main/scala/akka/kafka/internal/CommitObservationLogic.scala +++ b/core/src/main/scala/akka/kafka/internal/CommitObservationLogic.scala @@ -42,10 +42,9 @@ private[internal] trait CommitObservationLogic { self: GraphStageLogic => batch.filter(_.equals(gtp)), offsetAndMetadata.offset() ) - case unknownBatchImpl: CommittableOffsetBatch => + case unknownImpl => 
throw new IllegalArgumentException( - s"Unknown CommittableOffsetBatch, got [${unknownBatchImpl.getClass.getName}], " + - s"expected [${classOf[CommittableOffsetBatchImpl].getName}]" + s"Unknown Committable implementation, got [${unknownImpl.getClass.getName}]" ) } diff --git a/core/src/main/scala/akka/kafka/internal/CommittingProducerSinkStage.scala b/core/src/main/scala/akka/kafka/internal/CommittingProducerSinkStage.scala index fcbfcd677..e78119875 100644 --- a/core/src/main/scala/akka/kafka/internal/CommittingProducerSinkStage.scala +++ b/core/src/main/scala/akka/kafka/internal/CommittingProducerSinkStage.scala @@ -167,6 +167,7 @@ private final class CommittingProducerSinkStageLogic[K, V, IN <: Envelope[K, V, override protected def onTimer(timerKey: Any): Unit = timerKey match { case CommittingProducerSinkStage.CommitNow => commit(Interval) + case _ => } private def collectOffset(offset: Committable): Unit = diff --git a/core/src/main/scala/akka/kafka/internal/KafkaConsumerActor.scala b/core/src/main/scala/akka/kafka/internal/KafkaConsumerActor.scala index 57a272654..5b6c2c6ce 100644 --- a/core/src/main/scala/akka/kafka/internal/KafkaConsumerActor.scala +++ b/core/src/main/scala/akka/kafka/internal/KafkaConsumerActor.scala @@ -413,7 +413,7 @@ import scala.util.control.NonFatal pollTimeout = settings.pollTimeout.asJava offsetForTimesTimeout = settings.getOffsetForTimesTimeout positionTimeout = settings.getPositionTimeout - val progressTrackingFactory: () => ConsumerProgressTracking = ensureProgressTracker + val progressTrackingFactory: () => ConsumerProgressTracking = () => ensureProgressTracker() commitRefreshing = CommitRefreshing(settings.commitRefreshInterval, progressTrackingFactory) resetProtection = ConsumerResetProtection(log, settings.resetProtectionSettings, progressTrackingFactory) try { @@ -546,7 +546,7 @@ import scala.util.control.NonFatal // commit partitions that are currently assigned to the consumer. For high volume topics, this can lead to small // amounts of replayed data during a rebalance, but for low volume topics we can ensure that consumers never appear // 'stuck' because of out-of-order commits from slow consumers. 
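[Review note] The pair of lines just below is the canonical 2.12-to-2.13 collections fix in this series: `Map#filterKeys` is deprecated on 2.13 in favour of the same method on `MapView`, partly because the old version returned a lazily evaluated wrapper rather than a strict Map. A minimal, self-contained illustration of the idiom — the map contents and the assigned set are made-up stand-ins for `aggregatedOffsets` and `consumer.assignment()`:

    // Stand-alone sketch of the `.view.filterKeys(...).toMap` idiom; values invented.
    object FilterKeysDemo extends App {
      val aggregatedOffsets = Map("topic-0" -> 41L, "topic-1" -> 7L)
      val assigned: Set[String] = Set("topic-0")

      // 2.12 style; deprecated on 2.13, and it returned a lazy wrapper that
      // re-evaluated the predicate on every access:
      //   aggregatedOffsets.filterKeys(assigned.contains)

      // 2.13 style: filter through an explicit view, then force a strict Map.
      val assignedOffsetsToCommit: Map[String, Long] =
        aggregatedOffsets.view.filterKeys(assigned.contains).toMap

      println(assignedOffsetsToCommit) // Map(topic-0 -> 41)
    }

The explicit `.toMap` at the end matters here: the commit path below hands the result to a callback, so it must be a strict snapshot, not a view over a map that may change.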
- val assignedOffsetsToCommit = aggregatedOffsets.filterKeys(consumer.assignment().contains).toMap + val assignedOffsetsToCommit = aggregatedOffsets.view.filterKeys(consumer.assignment().contains).toMap progressTracker.commitRequested(assignedOffsetsToCommit) val replyTo = commitSenders // flush the data before calling `consumer.commitAsync` which might call the callback synchronously @@ -709,9 +709,9 @@ import scala.util.control.NonFatal ) case Metadata.GetCommittedOffset(partition) => - @nowarn("msg=deprecated:s") val resp = Metadata.CommittedOffset( + @nowarn("cat=deprecation") val resp = Metadata.CommittedOffset( Try { - @nowarn("msg=deprecated:s") val offset = consumer.committed(partition, settings.getMetadataRequestTimeout) + @nowarn("cat=deprecation") val offset = consumer.committed(partition, settings.getMetadataRequestTimeout) offset }, partition @@ -722,6 +722,7 @@ import scala.util.control.NonFatal private def stopFromMessage(msg: StopLike) = msg match { case Stop => sender() case StopFromStage(sourceStageId) => s"StageId [$sourceStageId]" + case other => s"unknown: [$other]" } /** diff --git a/core/src/main/scala/akka/kafka/internal/MessageBuilder.scala b/core/src/main/scala/akka/kafka/internal/MessageBuilder.scala index 720050ea8..81428dbe9 100644 --- a/core/src/main/scala/akka/kafka/internal/MessageBuilder.scala +++ b/core/src/main/scala/akka/kafka/internal/MessageBuilder.scala @@ -20,7 +20,6 @@ import org.apache.kafka.clients.consumer.{ConsumerRecord, OffsetAndMetadata} import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.requests.OffsetFetchResponse -import scala.collection.compat._ import scala.compat.java8.FutureConverters.FutureOps import scala.concurrent.Future import scala.jdk.CollectionConverters._ diff --git a/core/src/main/scala/akka/kafka/internal/SubSourceLogic.scala b/core/src/main/scala/akka/kafka/internal/SubSourceLogic.scala index c5a3d9d5c..a8d675d45 100644 --- a/core/src/main/scala/akka/kafka/internal/SubSourceLogic.scala +++ b/core/src/main/scala/akka/kafka/internal/SubSourceLogic.scala @@ -18,7 +18,7 @@ import akka.pattern.{ask, AskTimeoutException} import akka.stream.scaladsl.Source import akka.stream.stage.GraphStageLogic.StageActor import akka.stream.stage._ -import akka.stream.{ActorMaterializerHelper, Attributes, Outlet, SourceShape} +import akka.stream.{Attributes, Outlet, SourceShape} import akka.util.Timeout import org.apache.kafka.common.TopicPartition @@ -83,9 +83,10 @@ private class SubSourceLogic[K, V, Msg]( failStage(e) case (_, Terminated(ref)) if ref == consumerActor => failStage(new ConsumerFailed) + case _ => } consumerActor = { - val extendedActorSystem = ActorMaterializerHelper.downcast(materializer).system.asInstanceOf[ExtendedActorSystem] + val extendedActorSystem = materializer.system.asInstanceOf[ExtendedActorSystem] extendedActorSystem.systemActorOf(akka.kafka.KafkaConsumerActor.props(sourceActor.ref, settings), s"kafka-consumer-$actorNumber") } @@ -106,7 +107,8 @@ private class SubSourceLogic[K, V, Msg]( case (formerlyUnknown, offsetMap) => val updatedFormerlyUnknown = formerlyUnknown -- (partitionsToRevoke ++ partitionsInStartup ++ pendingPartitions) // Filter out the offsetMap so that we don't re-seek for partitions that have been revoked - seekAndEmitSubSources(updatedFormerlyUnknown, offsetMap.filterKeys(k => !partitionsToRevoke.contains(k)).toMap) + seekAndEmitSubSources(updatedFormerlyUnknown, + offsetMap.view.filterKeys(k => !partitionsToRevoke.contains(k)).toMap) } private val partitionAssignedCB = 
getAsyncCallback[Set[TopicPartition]] { assigned => @@ -174,6 +176,7 @@ private class SubSourceLogic[K, V, Msg]( partitionsToRevoke.flatMap(subSources.get).map(_.control).foreach(_.shutdown()) subSources --= partitionsToRevoke partitionsToRevoke = Set.empty + case _ => } private val subsourceCancelledCB: AsyncCallback[(TopicPartition, SubSourceCancellationStrategy)] = @@ -273,6 +276,7 @@ private class SubSourceLogic[K, V, Msg]( case (_, Terminated(ref)) if ref == consumerActor => onShutdown() completeStage() + case _ => } materializer.scheduleOnce( settings.stopTimeout, diff --git a/core/src/main/scala/akka/kafka/internal/TransactionalProducerStage.scala b/core/src/main/scala/akka/kafka/internal/TransactionalProducerStage.scala index 003213b19..5f5667f25 100644 --- a/core/src/main/scala/akka/kafka/internal/TransactionalProducerStage.scala +++ b/core/src/main/scala/akka/kafka/internal/TransactionalProducerStage.scala @@ -218,6 +218,7 @@ private final class TransactionalProducerStageLogic[K, V, P]( override protected def postSend(msg: Envelope[K, V, P]): Unit = msg.passThrough match { case o: ConsumerMessage.PartitionOffsetCommittedMarker => batchOffsets = batchOffsets.updated(o) + case _ => } override def onCompletionSuccess(): Unit = { diff --git a/core/src/main/scala/akka/kafka/internal/TransactionalSources.scala b/core/src/main/scala/akka/kafka/internal/TransactionalSources.scala index d1d76826e..04ca7369b 100644 --- a/core/src/main/scala/akka/kafka/internal/TransactionalSources.scala +++ b/core/src/main/scala/akka/kafka/internal/TransactionalSources.scala @@ -25,7 +25,6 @@ import akka.util.Timeout import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, OffsetAndMetadata} import org.apache.kafka.common.{IsolationLevel, TopicPartition} -import scala.collection.compat._ import scala.concurrent.duration.FiniteDuration import scala.concurrent.{Await, ExecutionContext, Future} diff --git a/core/src/main/scala/akka/kafka/javadsl/MetadataClient.scala b/core/src/main/scala/akka/kafka/javadsl/MetadataClient.scala index 371697488..09f3448b1 100644 --- a/core/src/main/scala/akka/kafka/javadsl/MetadataClient.scala +++ b/core/src/main/scala/akka/kafka/javadsl/MetadataClient.scala @@ -15,9 +15,8 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata import org.apache.kafka.common.{PartitionInfo, TopicPartition} import scala.compat.java8.FutureConverters._ -import scala.collection.compat._ -import scala.collection.JavaConverters._ import scala.concurrent.ExecutionContextExecutor +import scala.jdk.CollectionConverters._ class MetadataClient private (metadataClient: akka.kafka.scaladsl.MetadataClient) { diff --git a/tests/src/test/scala/akka/kafka/TransactionsOps.scala b/tests/src/test/scala/akka/kafka/TransactionsOps.scala index 93df9a05a..78ccb810f 100644 --- a/tests/src/test/scala/akka/kafka/TransactionsOps.scala +++ b/tests/src/test/scala/akka/kafka/TransactionsOps.scala @@ -120,8 +120,10 @@ trait TransactionsOps extends TestSuite with Matchers { case (_, value) => duplicates.contains(value) } .groupBy(_._2) // message - // workaround for Scala collection refactoring of `mapValues` to remain compat with 2.12/2.13 cross build - .map { case (k, v) => (k, v.map(_._1)) } // keep offset + .view + .mapValues { v => + v.map(_._1) + } // keep offset .filter { case (_, offsets) => offsets.distinct.size > 1 } diff --git a/tests/src/test/scala/akka/kafka/javadsl/ControlSpec.scala b/tests/src/test/scala/akka/kafka/javadsl/ControlSpec.scala index 64cf68c40..6f548e5ca 100644 --- 
a/tests/src/test/scala/akka/kafka/javadsl/ControlSpec.scala +++ b/tests/src/test/scala/akka/kafka/javadsl/ControlSpec.scala @@ -61,8 +61,7 @@ class ControlSpec extends AnyWordSpec with ScalaFutures with Matchers with LogCa ) val value = drainingControl.drainAndShutdown(ec).toScala.failed.futureValue value shouldBe a[RuntimeException] - // endWith to accustom Scala 2.11 and 2.12 - value.getMessage should endWith("expected") + value.getMessage should be("expected") control.shutdownCalled.get() should be(true) } @@ -75,8 +74,7 @@ class ControlSpec extends AnyWordSpec with ScalaFutures with Matchers with LogCa ) val value = drainingControl.drainAndShutdown(ec).toScala.failed.futureValue value shouldBe a[RuntimeException] - // endWith to accustom Scala 2.11 and 2.12 - value.getMessage should endWith("expected") + value.getMessage should be("expected") control.shutdownCalled.get() should be(true) } @@ -86,8 +84,7 @@ class ControlSpec extends AnyWordSpec with ScalaFutures with Matchers with LogCa val drainingControl = Consumer.createDrainingControl(control, Future.successful(Done).toJava) val value = drainingControl.drainAndShutdown(ec).toScala.failed.futureValue value shouldBe a[RuntimeException] - // endWith to accustom Scala 2.11 and 2.12 - value.getMessage should endWith("expected") + value.getMessage should be("expected") control.shutdownCalled.get() should be(true) } diff --git a/tests/src/test/scala/akka/kafka/scaladsl/ControlSpec.scala b/tests/src/test/scala/akka/kafka/scaladsl/ControlSpec.scala index c70afa176..f77da60ce 100644 --- a/tests/src/test/scala/akka/kafka/scaladsl/ControlSpec.scala +++ b/tests/src/test/scala/akka/kafka/scaladsl/ControlSpec.scala @@ -51,8 +51,7 @@ class ControlSpec extends AnyWordSpec with ScalaFutures with Matchers with LogCa val drainingControl = DrainingControl.apply(control, Future.failed(new RuntimeException("expected"))) val value = drainingControl.drainAndShutdown().failed.futureValue value shouldBe a[RuntimeException] - // endWith to accustom Scala 2.11 and 2.12 - value.getMessage should endWith("expected") + value.getMessage should be("expected") control.shutdownCalled.get() should be(true) } @@ -62,8 +61,7 @@ class ControlSpec extends AnyWordSpec with ScalaFutures with Matchers with LogCa val drainingControl = DrainingControl.apply(control, Future.failed(new RuntimeException("expected"))) val value = drainingControl.drainAndShutdown().failed.futureValue value shouldBe a[RuntimeException] - // endWith to accustom Scala 2.11 and 2.12 - value.getMessage should endWith("expected") + value.getMessage should be("expected") control.shutdownCalled.get() should be(true) } @@ -73,8 +71,7 @@ class ControlSpec extends AnyWordSpec with ScalaFutures with Matchers with LogCa val drainingControl = DrainingControl.apply(control, Future.successful(Done)) val value = drainingControl.drainAndShutdown().failed.futureValue value shouldBe a[RuntimeException] - // endWith to accustom Scala 2.11 and 2.12 - value.getMessage should endWith("expected") + value.getMessage should be("expected") control.shutdownCalled.get() should be(true) } } From 1684c2eb16f0f8d25c7b7932c554537d07ffa575 Mon Sep 17 00:00:00 2001 From: Enno Runne <458526+ennru@users.noreply.github.com> Date: Thu, 23 Dec 2021 14:22:29 +0100 Subject: [PATCH 2/7] Tackle remaining warnings --- build.sbt | 2 +- .../scala/akka/kafka/internal/BaseSingleSourceLogic.scala | 2 +- .../main/scala/akka/kafka/internal/KafkaConsumerActor.scala | 2 +- core/src/main/scala/akka/kafka/internal/MessageBuilder.scala | 1 + 
core/src/main/scala/akka/kafka/internal/SubSourceLogic.scala | 2 +- .../akka/kafka/internal/TransactionalProducerStage.scala | 4 ++-- 6 files changed, 7 insertions(+), 6 deletions(-) diff --git a/build.sbt b/build.sbt index 9ee01a88a..b87065a76 100644 --- a/build.sbt +++ b/build.sbt @@ -87,7 +87,7 @@ val commonSettings = Def.settings( "-Ywarn-dead-code", "-Ywarn-numeric-widen" ) ++ { - if (insideCI.value && !Nightly) Seq("-Xfatal-warnings") + if (insideCI.value && !Nightly) Seq("-Werror") else Seq.empty }, Compile / doc / scalacOptions := scalacOptions.value ++ Seq( diff --git a/core/src/main/scala/akka/kafka/internal/BaseSingleSourceLogic.scala b/core/src/main/scala/akka/kafka/internal/BaseSingleSourceLogic.scala index daf20bb3b..8885fd6bf 100644 --- a/core/src/main/scala/akka/kafka/internal/BaseSingleSourceLogic.scala +++ b/core/src/main/scala/akka/kafka/internal/BaseSingleSourceLogic.scala @@ -63,7 +63,7 @@ import scala.concurrent.{ExecutionContext, Future} } protected def messageHandling: PartialFunction[(ActorRef, Any), Unit] = { - case (_, msg: KafkaConsumerActor.Internal.Messages[K, V]) => + case (_, msg: KafkaConsumerActor.Internal.Messages[K @unchecked, V @unchecked]) => // might be more than one in flight when we assign/revoke tps if (msg.requestId == requestId) requested = false diff --git a/core/src/main/scala/akka/kafka/internal/KafkaConsumerActor.scala b/core/src/main/scala/akka/kafka/internal/KafkaConsumerActor.scala index 5b6c2c6ce..92b9b4b13 100644 --- a/core/src/main/scala/akka/kafka/internal/KafkaConsumerActor.scala +++ b/core/src/main/scala/akka/kafka/internal/KafkaConsumerActor.scala @@ -296,7 +296,7 @@ import scala.util.control.NonFatal } def expectSettings: Receive = LoggingReceive.withLabel("expectSettings") { - case s: ConsumerSettings[K, V] => + case s: ConsumerSettings[K @unchecked, V @unchecked] => applySettings(s) case scala.util.Failure(e) => diff --git a/core/src/main/scala/akka/kafka/internal/MessageBuilder.scala b/core/src/main/scala/akka/kafka/internal/MessageBuilder.scala index 81428dbe9..11afb21a8 100644 --- a/core/src/main/scala/akka/kafka/internal/MessageBuilder.scala +++ b/core/src/main/scala/akka/kafka/internal/MessageBuilder.scala @@ -170,6 +170,7 @@ private[kafka] final class CommittableOffsetBatchImpl( def updated(committable: Committable): CommittableOffsetBatch = committable match { case offset: CommittableOffset => updatedWithOffset(offset) case batch: CommittableOffsetBatch => updatedWithBatch(batch) + case _ => this } private[internal] def committerFor(groupTopicPartition: GroupTopicPartition) = diff --git a/core/src/main/scala/akka/kafka/internal/SubSourceLogic.scala b/core/src/main/scala/akka/kafka/internal/SubSourceLogic.scala index a8d675d45..4851babf7 100644 --- a/core/src/main/scala/akka/kafka/internal/SubSourceLogic.scala +++ b/core/src/main/scala/akka/kafka/internal/SubSourceLogic.scala @@ -424,7 +424,7 @@ private abstract class SubSourceStageLogic[K, V, Msg]( } protected def messageHandling: PartialFunction[(ActorRef, Any), Unit] = { - case (_, msg: KafkaConsumerActor.Internal.Messages[K, V]) => + case (_, msg: KafkaConsumerActor.Internal.Messages[K @unchecked, V @unchecked]) => requested = false buffer = buffer ++ msg.messages pump() diff --git a/core/src/main/scala/akka/kafka/internal/TransactionalProducerStage.scala b/core/src/main/scala/akka/kafka/internal/TransactionalProducerStage.scala index 5f5667f25..ba2ca0de2 100644 --- a/core/src/main/scala/akka/kafka/internal/TransactionalProducerStage.scala +++ 
b/core/src/main/scala/akka/kafka/internal/TransactionalProducerStage.scala @@ -14,7 +14,7 @@ import akka.kafka.internal.ProducerStage.ProducerCompletionState import akka.kafka.{ConsumerMessage, ProducerSettings} import akka.stream.stage._ import akka.stream.{Attributes, FlowShape} -import org.apache.kafka.clients.consumer.OffsetAndMetadata +import org.apache.kafka.clients.consumer.{ConsumerGroupMetadata, OffsetAndMetadata} import org.apache.kafka.clients.producer.ProducerConfig import org.apache.kafka.common.TopicPartition @@ -241,7 +241,7 @@ private final class TransactionalProducerStageLogic[K, V, P]( group, batch.offsets) val offsetMap = batch.offsetMap() - producer.sendOffsetsToTransaction(offsetMap.asJava, group) + producer.sendOffsetsToTransaction(offsetMap.asJava, new ConsumerGroupMetadata(group)) producer.commitTransaction() log.debug("Committed transaction for transactional id '{}' consumer group '{}' with offsets: {}", transactionalId, From 19c9eb841761aacccdc0c224d73b649943db3f18 Mon Sep 17 00:00:00 2001 From: Enno Runne <458526+ennru@users.noreply.github.com> Date: Thu, 23 Dec 2021 14:51:37 +0100 Subject: [PATCH 3/7] warning configuration --- build.sbt | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/build.sbt b/build.sbt index b87065a76..9103c869f 100644 --- a/build.sbt +++ b/build.sbt @@ -78,19 +78,15 @@ val commonSettings = Def.settings( "-Xlint:unchecked" ), scalacOptions ++= Seq( - "-deprecation", "-encoding", "UTF-8", // yes, this is 2 args - "-feature", - "-unchecked", - "-Xlint", - "-Ywarn-dead-code", - "-Ywarn-numeric-widen" + "-Wconf:cat=feature:w,cat=deprecation:w,cat=unchecked:w,cat=lint:w,cat=unused:w,cat=w-flag:w" ) ++ { if (insideCI.value && !Nightly) Seq("-Werror") else Seq.empty }, Compile / doc / scalacOptions := scalacOptions.value ++ Seq( + "-Wconf:cat=scaladoc:i", "-doc-title", "Alpakka Kafka", "-doc-version", From 3adbf9d32405a7744af89fbb94e1ea0d623d8df5 Mon Sep 17 00:00:00 2001 From: Enno Runne <458526+ennru@users.noreply.github.com> Date: Thu, 23 Dec 2021 15:20:35 +0100 Subject: [PATCH 4/7] adapt mock verification --- .../test/scala/akka/kafka/internal/ProducerSpec.scala | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/tests/src/test/scala/akka/kafka/internal/ProducerSpec.scala b/tests/src/test/scala/akka/kafka/internal/ProducerSpec.scala index 7de126fae..39a8fbd7a 100644 --- a/tests/src/test/scala/akka/kafka/internal/ProducerSpec.scala +++ b/tests/src/test/scala/akka/kafka/internal/ProducerSpec.scala @@ -19,7 +19,7 @@ import akka.stream.{ActorAttributes, Supervision} import akka.testkit.TestKit import akka.{Done, NotUsed} import com.typesafe.config.ConfigFactory -import org.apache.kafka.clients.consumer.OffsetAndMetadata +import org.apache.kafka.clients.consumer.{ConsumerGroupMetadata, OffsetAndMetadata} import org.apache.kafka.clients.producer._ import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.serialization.StringSerializer @@ -33,7 +33,6 @@ import org.scalatest.BeforeAndAfterAll import org.scalatest.flatspec.AnyFlatSpecLike import org.scalatest.matchers.should.Matchers -import scala.annotation.nowarn import scala.concurrent.duration._ import scala.concurrent.{Await, ExecutionContext, Future, Promise} import scala.jdk.CollectionConverters._ @@ -642,20 +641,18 @@ class ProducerMock[K, V](handler: ProducerMock.Handler[K, V])(implicit ec: Execu inOrder.verify(mock).beginTransaction() } - @nowarn("cat=deprecation") def verifyTxCommit(po: ConsumerMessage.PartitionOffset) = { val 
inOrder = Mockito.inOrder(mock) val offsets = Map(new TopicPartition(po.key.topic, po.key.partition) -> new OffsetAndMetadata(po.offset + 1)).asJava - inOrder.verify(mock).sendOffsetsToTransaction(offsets, po.key.groupId) + inOrder.verify(mock).sendOffsetsToTransaction(offsets, new ConsumerGroupMetadata(po.key.groupId)) inOrder.verify(mock).commitTransaction() inOrder.verify(mock).beginTransaction() } - @nowarn("cat=deprecation") def verifyTxCommitWhenShutdown(po: ConsumerMessage.PartitionOffset) = { val inOrder = Mockito.inOrder(mock) val offsets = Map(new TopicPartition(po.key.topic, po.key.partition) -> new OffsetAndMetadata(po.offset + 1)).asJava - inOrder.verify(mock).sendOffsetsToTransaction(offsets, po.key.groupId) + inOrder.verify(mock).sendOffsetsToTransaction(offsets, new ConsumerGroupMetadata(po.key.groupId)) inOrder.verify(mock).commitTransaction() } From affcb974378afa65e016d9b83ac074a64ce267f2 Mon Sep 17 00:00:00 2001 From: Enno Runne <458526+ennru@users.noreply.github.com> Date: Thu, 13 Jan 2022 13:27:29 +0100 Subject: [PATCH 5/7] ignore warning --- .../main/scala/akka/kafka/internal/KafkaConsumerActor.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/akka/kafka/internal/KafkaConsumerActor.scala b/core/src/main/scala/akka/kafka/internal/KafkaConsumerActor.scala index 92b9b4b13..4e9f2a764 100644 --- a/core/src/main/scala/akka/kafka/internal/KafkaConsumerActor.scala +++ b/core/src/main/scala/akka/kafka/internal/KafkaConsumerActor.scala @@ -708,13 +708,13 @@ import scala.util.control.NonFatal } ) - case Metadata.GetCommittedOffset(partition) => + case req: Metadata.GetCommittedOffset @nowarn("cat=deprecation") => @nowarn("cat=deprecation") val resp = Metadata.CommittedOffset( Try { - @nowarn("cat=deprecation") val offset = consumer.committed(partition, settings.getMetadataRequestTimeout) + @nowarn("cat=deprecation") val offset = consumer.committed(req.partition, settings.getMetadataRequestTimeout) offset }, - partition + req.partition ) resp } From 1aeef85666fe343a8afde7b265aebd7b8b2c8d34 Mon Sep 17 00:00:00 2001 From: Enno Runne <458526+ennru@users.noreply.github.com> Date: Thu, 13 Jan 2022 13:29:42 +0100 Subject: [PATCH 6/7] Title --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index cab9fcebe..dd7996489 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,5 @@ Alpakka Kafka [![scaladex-badge][]][scaladex] [![maven-central-badge][]][maven-central] [![gh-actions-badge][]][gh-actions] +============= [scaladex]: https://index.scala-lang.org/akka/alpakka-kafka/akka-stream-kafka/ [scaladex-badge]: https://index.scala-lang.org/akka/alpakka-kafka/akka-stream-kafka/latest.svg?target=_2.13 From 8c74d4d8002c3097fe610f19fd1abecdb23ef95b Mon Sep 17 00:00:00 2001 From: Enno Runne <458526+ennru@users.noreply.github.com> Date: Thu, 13 Jan 2022 17:25:03 +0100 Subject: [PATCH 7/7] Be more conservative on ignored matches --- .../scala/akka/kafka/internal/CommitCollectorStage.scala | 2 +- .../akka/kafka/internal/CommitObservationLogic.scala | 4 ++++ .../akka/kafka/internal/CommittingProducerSinkStage.scala | 2 +- .../main/scala/akka/kafka/internal/MessageBuilder.scala | 3 ++- .../main/scala/akka/kafka/internal/SubSourceLogic.scala | 8 +++++--- 5 files changed, 13 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/akka/kafka/internal/CommitCollectorStage.scala b/core/src/main/scala/akka/kafka/internal/CommitCollectorStage.scala index bb40ffebf..afabf89ec 100644 --- 
a/core/src/main/scala/akka/kafka/internal/CommitCollectorStage.scala +++ b/core/src/main/scala/akka/kafka/internal/CommitCollectorStage.scala @@ -75,7 +75,7 @@ private final class CommitCollectorStageLogic( pushed = true } else pushOnNextPull = true } else scheduleCommit() - case _ => + case _ => log.warning("unexpected timer [{}]", timerKey) } if (!pushed) suspendContext() } diff --git a/core/src/main/scala/akka/kafka/internal/CommitObservationLogic.scala b/core/src/main/scala/akka/kafka/internal/CommitObservationLogic.scala index 1f38e41a5..7a598429b 100644 --- a/core/src/main/scala/akka/kafka/internal/CommitObservationLogic.scala +++ b/core/src/main/scala/akka/kafka/internal/CommitObservationLogic.scala @@ -42,6 +42,10 @@ private[internal] trait CommitObservationLogic { self: GraphStageLogic => batch.filter(_.equals(gtp)), offsetAndMetadata.offset() ) + case null => + throw new IllegalArgumentException( + s"Unknown Committable implementation, got [null]" + ) case unknownImpl => throw new IllegalArgumentException( s"Unknown Committable implementation, got [${unknownImpl.getClass.getName}]" diff --git a/core/src/main/scala/akka/kafka/internal/CommittingProducerSinkStage.scala b/core/src/main/scala/akka/kafka/internal/CommittingProducerSinkStage.scala index e78119875..138fd4979 100644 --- a/core/src/main/scala/akka/kafka/internal/CommittingProducerSinkStage.scala +++ b/core/src/main/scala/akka/kafka/internal/CommittingProducerSinkStage.scala @@ -167,7 +167,7 @@ private final class CommittingProducerSinkStageLogic[K, V, IN <: Envelope[K, V, override protected def onTimer(timerKey: Any): Unit = timerKey match { case CommittingProducerSinkStage.CommitNow => commit(Interval) - case _ => + case _ => log.warning("unexpected timer [{}]", timerKey) } private def collectOffset(offset: Committable): Unit = diff --git a/core/src/main/scala/akka/kafka/internal/MessageBuilder.scala b/core/src/main/scala/akka/kafka/internal/MessageBuilder.scala index 11afb21a8..5576130b0 100644 --- a/core/src/main/scala/akka/kafka/internal/MessageBuilder.scala +++ b/core/src/main/scala/akka/kafka/internal/MessageBuilder.scala @@ -170,7 +170,8 @@ private[kafka] final class CommittableOffsetBatchImpl( def updated(committable: Committable): CommittableOffsetBatch = committable match { case offset: CommittableOffset => updatedWithOffset(offset) case batch: CommittableOffsetBatch => updatedWithBatch(batch) - case _ => this + case null => throw new IllegalArgumentException(s"unexpected Committable [null]") + case _ => throw new IllegalArgumentException(s"unexpected Committable [${committable.getClass}]") } private[internal] def committerFor(groupTopicPartition: GroupTopicPartition) = diff --git a/core/src/main/scala/akka/kafka/internal/SubSourceLogic.scala b/core/src/main/scala/akka/kafka/internal/SubSourceLogic.scala index 4851babf7..3e45df06d 100644 --- a/core/src/main/scala/akka/kafka/internal/SubSourceLogic.scala +++ b/core/src/main/scala/akka/kafka/internal/SubSourceLogic.scala @@ -83,7 +83,8 @@ private class SubSourceLogic[K, V, Msg]( failStage(e) case (_, Terminated(ref)) if ref == consumerActor => failStage(new ConsumerFailed) - case _ => + case (_, msg) => + log.warning("ignoring message [{}]", msg) } consumerActor = { val extendedActorSystem = materializer.system.asInstanceOf[ExtendedActorSystem] @@ -176,7 +177,7 @@ private class SubSourceLogic[K, V, Msg]( partitionsToRevoke.flatMap(subSources.get).map(_.control).foreach(_.shutdown()) subSources --= partitionsToRevoke partitionsToRevoke = Set.empty - case _ => + 
case _ => log.warning("unexpected timer [{}]", timerKey) } private val subsourceCancelledCB: AsyncCallback[(TopicPartition, SubSourceCancellationStrategy)] = @@ -276,7 +277,8 @@ private class SubSourceLogic[K, V, Msg]( case (_, Terminated(ref)) if ref == consumerActor => onShutdown() completeStage() - case _ => + case (_, msg) => + log.warning("ignoring message [{}]", msg) } materializer.scheduleOnce( settings.stopTimeout,
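[Review note] One pattern from PATCH 2/7 deserves a closing example: the `K @unchecked, V @unchecked` annotations added in BaseSingleSourceLogic, SubSourceLogic, and the `ConsumerSettings` match in KafkaConsumerActor. Type arguments are erased at runtime, so scalac warns that they cannot be checked in a type pattern, and once warnings are fatal (`-Werror`, enabled earlier in this series) that warning would break the build. A self-contained sketch of the idiom — `Box` and `handle` are invented stand-ins for the actor messages in this repo:

    // Mirrors the shape of the PATCH 2/7 change: the type arguments of a
    // generic message are erased, so they are matched as `@unchecked`.
    object UncheckedDemo extends App {
      final case class Box[K, V](key: K, value: V)

      def handle[K, V](msg: Any): Option[Box[K, V]] = msg match {
        // Only `Box` itself is tested at runtime; `@unchecked` tells scalac
        // not to warn about the untestable K and V arguments.
        case b: Box[K @unchecked, V @unchecked] => Some(b)
        case _ => None
      }

      println(handle[String, Int](Box("offset", 42))) // Some(Box(offset,42))
      println(handle[String, Int]("not a box"))       // None
    }

The annotation silences the erasure warning without an unsound blanket `@nowarn` on the whole method, which fits the theme of PATCH 7/7 above: prefer narrow, explicit handling (log or annotate the exact case) over silently swallowing whatever the compiler or the actor happens to deliver.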