Use Scala 2.13 (drop Scala 2.12) #1448

Merged · 7 commits · Jan 13, 2022
Changes from all commits
2 changes: 0 additions & 2 deletions .github/workflows/check-build-test.yml
@@ -105,9 +105,7 @@ jobs:
fail-fast: false
matrix:
include:
- { java-version: [email protected], scala-version: 2.12.15, sbt-opts: '' }
- { java-version: [email protected], scala-version: 2.13.8, sbt-opts: '' }
- { java-version: [email protected], scala-version: 2.12.15, sbt-opts: '-J-XX:+UnlockExperimentalVMOptions -J-XX:+UseJVMCICompiler' }
- { java-version: [email protected], scala-version: 2.13.8, sbt-opts: '-J-XX:+UnlockExperimentalVMOptions -J-XX:+UseJVMCICompiler' }
steps:
- name: Checkout
9 changes: 4 additions & 5 deletions CONTRIBUTING.md
@@ -1,4 +1,4 @@
# Welcome! Thank you for contributing to the Alpakka Kafka connector!
# Welcome! Thank you for contributing to Alpakka Kafka!

We follow the standard GitHub [fork & pull](https://help.github.com/articles/using-pull-requests/#fork--pull) approach to pull requests. Just fork the official repo, develop in a branch, and submit a PR!

@@ -27,7 +27,7 @@ This is the process for committing code into master.

1. After the review you should fix the issues (review comments, CI failures) by pushing a new commit for new review, iterating until the reviewers give their thumbs up and CI tests pass.

1. When the branch conflicts with its merge target (either by way of git merge conflict or failing CI tests), do **not** merge the target branch into your feature branch. Instead rebase your branch onto the target branch and update it with `git push -f`.
1. When the branch conflicts with its merge target (either by way of git merge conflict or failing CI tests), do **not** merge the target branch into your feature branch. Instead, rebase your branch onto the target branch and update it with `git push -f`.

## Pull Request Requirements

@@ -43,7 +43,7 @@ For a Pull Request to be considered at all it has to meet these requirements:

1. Regardless if the code introduces new features or fixes bugs or regressions, it must have comprehensive tests.

1. The code must be well documented in the Lightbend's standard documentation format (see the [Documentation](documentation) section below).
1. The code must be well documented in the Lightbend's standard documentation format (see the [Documentation](#documentation) section below).

1. The commit messages must properly describe the changes, see [further below](#creating-commits-and-writing-commit-messages).

@@ -59,8 +59,7 @@ Prepare code snippets to be integrated by Paradox in the tests. Such example sho

Use ScalaDoc if you see the need to describe the API usage better than the naming does.

Run `sbt docs/paradox` to generate reference docs while developing. Generated documentation can be
found in the `./docs/target/paradox/site/main` directory.
Run `sbt verifyDocs` to generate reference docs while developing. The generated documentation will open in your browser.


## External Dependencies
16 changes: 9 additions & 7 deletions README.md
@@ -1,5 +1,5 @@
Alpakka Kafka Connector [![scaladex-badge][]][scaladex] [![maven-central-badge][]][maven-central] [![gh-actions-badge][]][gh-actions]
=======================
Alpakka Kafka [![scaladex-badge][]][scaladex] [![maven-central-badge][]][maven-central] [![gh-actions-badge][]][gh-actions]
=============

[scaladex]: https://index.scala-lang.org/akka/alpakka-kafka/akka-stream-kafka/
[scaladex-badge]: https://index.scala-lang.org/akka/alpakka-kafka/akka-stream-kafka/latest.svg?target=_2.13
@@ -25,7 +25,7 @@ Documentation

- **[Alpakka Kafka connector reference](https://doc.akka.io/docs/akka-stream-kafka/current/) documentation**

To keep up with the latest Alpakka releases check out [Alpakka releases](https://doc.akka.io/docs/alpakka/current/release-notes/index.html) and [Alpakka Kafka connector releases](https://doc.akka.io/docs/alpakka-kafka/current/release-notes/index.html).
To keep up with the latest Alpakka releases check out [Alpakka releases](https://github.com/akka/alpakka/releases) and [Alpakka Kafka releases](https://github.com/akka/alpakka-kafka/releases).


Community
@@ -41,27 +41,29 @@ In addition to that, you may enjoy following:
- The [Akka Team Blog](https://akka.io/blog/)
- [@akkateam](https://twitter.com/akkateam) on Twitter
- Questions tagged [#alpakka on StackOverflow](https://stackoverflow.com/questions/tagged/alpakka)
- Questions tagged [**#alpakka-kafka** on StackOverflow](https://stackoverflow.com/questions/tagged/alpakka-kafka)
- Questions tagged [**#alpakka** on StackOverflow](https://stackoverflow.com/questions/tagged/alpakka)

The Kafka connector was originally created as **Reactive Kafka** by [<img src="https://files.softwaremill.com/logo/logo.svg" alt="SoftwareMill logo" height="25">](https://softwaremill.com).


Contributing
------------

[Lightbend](https://www.lightbend.com/) is committed to Alpakka and the Akka team works on it.
[Lightbend](https://www.lightbend.com/) is the steward of Akka and Alpakka.

Contributions are *very* welcome! Lightbend appreciates community contributions by both those new to Alpakka and those more experienced.

Contributions are *very* welcome! The Akka team appreciates community contributions by both those new to Alpakka and those more experienced.
Alpakka depends on the community to keep up with the ever-growing number of technologies with which to integrate. Please step up and share the successful Akka Stream integrations you implement with the Alpakka community.

If you find an issue that you'd like to see fixed, the quickest way to make that happen is to implement the fix and submit a pull request.

Refer to the [CONTRIBUTING.md](CONTRIBUTING.md) file for more details about the workflow, and general hints on how to prepare your pull request.

You can also ask for clarifications or guidance in GitHub issues directly.

Caveat Emptor
-------------

Alpakka components are not always binary compatible between releases. API changes that are not backward compatible might be introduced as we refine and simplify based on your feedback. A module may be dropped in any release without prior deprecation.

Support for Alpakka Kafka is available via [Akka Platform Subscription](https://www.lightbend.com/akka-platform#subscription).
Support for the Alpakka Kafka connector is available via Lightbend's [Akka Platform subscription](https://www.lightbend.com/akka-platform#subscription).
39 changes: 8 additions & 31 deletions build.sbt
@@ -6,8 +6,6 @@ name := "akka-stream-kafka"

val Nightly = sys.env.get("EVENT_NAME").contains("schedule")

// align with versions in .github/workflows/check-build-test.yml
val Scala212 = "2.12.15"
// align ignore-prefixes in scripts/link-validator.conf
val Scala213 = "2.13.8"

@@ -26,9 +26,6 @@ val slf4jVersion = "1.7.32"
// that depends on the same Kafka version, as is defined above
// See https://mvnrepository.com/artifact/io.confluent/kafka-avro-serializer?repo=confluent-packages
val confluentAvroSerializerVersion = "7.0.1"
val scalapb = "com.thesamet.scalapb" %% "scalapb-runtime" % "0.10.11"
val kafkaBrokerWithoutSlf4jLog4j = "org.apache.kafka" %% "kafka" % kafkaVersion % Provided exclude ("org.slf4j", "slf4j-log4j12")

val confluentLibsExclusionRules = Seq(
ExclusionRule("log4j", "log4j"),
ExclusionRule("org.slf4j", "slf4j-log4j12"),
@@ -75,30 +70,23 @@ val commonSettings = Def.settings(
startYear := Some(2014),
licenses := Seq("Apache-2.0" -> url("https://opensource.org/licenses/Apache-2.0")),
description := "Alpakka is a Reactive Enterprise Integration library for Java and Scala, based on Reactive Streams and Akka.",
crossScalaVersions := Seq(Scala212, Scala213),
crossScalaVersions := Seq(Scala213),
scalaVersion := Scala213,
crossVersion := CrossVersion.binary,
javacOptions ++= Seq(
"-Xlint:deprecation",
"-Xlint:unchecked"
),
scalacOptions ++= Seq(
"-deprecation",
"-encoding",
"UTF-8", // yes, this is 2 args
"-feature",
"-unchecked",
"-Xlint",
"-Ywarn-dead-code",
"-Ywarn-numeric-widen"
"-Wconf:cat=feature:w,cat=deprecation:w,cat=unchecked:w,cat=lint:w,cat=unused:w,cat=w-flag:w"
) ++ {
if (scalaBinaryVersion.value == "2.13") Seq.empty
else Seq("-Yno-adapted-args", "-Xfuture")
} ++ {
if (insideCI.value && !Nightly) Seq("-Xfatal-warnings")
if (insideCI.value && !Nightly) Seq("-Werror")
else Seq.empty
},
Compile / doc / scalacOptions := scalacOptions.value ++ Seq(
"-Wconf:cat=scaladoc:i",
"-doc-title",
"Alpakka Kafka",
"-doc-version",
@@ -199,10 +187,8 @@ lazy val core = project
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-stream" % akkaVersion,
"com.typesafe.akka" %% "akka-discovery" % akkaVersion % Provided,
"org.apache.kafka" % "kafka-clients" % kafkaVersion,
"org.scala-lang.modules" %% "scala-collection-compat" % "2.6.0"
"org.apache.kafka" % "kafka-clients" % kafkaVersion
),
Compile / compile / scalacOptions += "-Wconf:msg=[import scala.collection.compat._]:s",
mimaPreviousArtifacts := Set(
organization.value %% name.value % previousStableVersion.value
.getOrElse(throw new Error("Unable to determine previous version"))
@@ -281,18 +267,9 @@ lazy val tests = project
"org.slf4j" % "log4j-over-slf4j" % slf4jVersion % Test,
// Schema registry uses Glassfish which uses java.util.logging
"org.slf4j" % "jul-to-slf4j" % slf4jVersion % Test,
"org.mockito" % "mockito-core" % "4.2.0" % Test
) ++ {
scalaBinaryVersion.value match {
case "2.13" =>
Seq(scalapb)
case "2.12" =>
Seq(
scalapb,
kafkaBrokerWithoutSlf4jLog4j
)
}
},
"org.mockito" % "mockito-core" % "4.2.0" % Test,
"com.thesamet.scalapb" %% "scalapb-runtime" % "0.10.11" % Test
),
resolvers ++= Seq(
"Confluent Maven Repo" at "https://packages.confluent.io/maven/"
),
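A note on the scalac changes above: dropping 2.12 lets the build replace the long list of individual warning flags (`-Xlint`, `-Ywarn-dead-code`, `-Ywarn-numeric-widen`, the 2.12-only `-Yno-adapted-args`/`-Xfuture`) with a single 2.13 `-Wconf` filter, and `-Xfatal-warnings` with its 2.13 spelling `-Werror`. A minimal sketch of the same setup in an isolated sbt build (the project name is hypothetical; the options are real 2.13 compiler flags):

```scala
// build.sbt: minimal sketch of 2.13-only warning configuration.
// -Wconf maps warning categories to actions (w = warn, s = silent,
// e = error, i = info); -Werror escalates remaining warnings on CI.
lazy val example = (project in file("."))
  .settings(
    scalaVersion := "2.13.8",
    scalacOptions ++= Seq(
      "-deprecation",
      "-feature",
      "-unchecked",
      "-Wconf:cat=deprecation:w,cat=unused:w,cat=lint:w"
    ),
    scalacOptions ++= (if (insideCI.value) Seq("-Werror") else Seq.empty)
  )
```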
1 change: 0 additions & 1 deletion core/src/main/scala/akka/kafka/Metadata.scala
@@ -12,7 +12,6 @@ import org.apache.kafka.clients.consumer.{OffsetAndMetadata, OffsetAndTimestamp}
import org.apache.kafka.common.{PartitionInfo, TopicPartition}

import scala.jdk.CollectionConverters._
import scala.collection.compat._
import scala.util.Try

/**
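Dropping the `scala.collection.compat._` import here (and in `MessageBuilder.scala` and the transactional source further down) follows directly from removing 2.12: the compat shim only back-ports 2.13 collection APIs, so a 2.13-only build needs nothing beyond the standard library. A tiny self-contained sketch:

```scala
import scala.jdk.CollectionConverters._

object CompatFreeExample extends App {
  // On 2.13 the Java/Scala converters and the newer collection APIs
  // ship in the standard library; no scala-collection-compat needed.
  val partitions: java.util.List[String] = java.util.Arrays.asList("tp-0", "tp-1")
  println(partitions.asScala.toList) // List(tp-0, tp-1)
}
```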
@@ -63,7 +63,7 @@ import scala.concurrent.{ExecutionContext, Future}
}

protected def messageHandling: PartialFunction[(ActorRef, Any), Unit] = {
case (_, msg: KafkaConsumerActor.Internal.Messages[K, V]) =>
case (_, msg: KafkaConsumerActor.Internal.Messages[K @unchecked, V @unchecked]) =>
// might be more than one in flight when we assign/revoke tps
if (msg.requestId == requestId)
requested = false
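The `K @unchecked, V @unchecked` annotations acknowledge that the type arguments of `Messages[K, V]` are erased at runtime, which the 2.13 compiler flags as an unchecked type pattern, fatal once `-Werror` is in effect. A standalone sketch of the same pattern (the `Messages` class here is a stand-in, not the Alpakka one):

```scala
object UncheckedPatternExample extends App {
  final case class Messages[K, V](requestId: Int, messages: List[(K, V)])

  def handle(msg: Any): Unit = msg match {
    // K and V cannot be verified at runtime (erasure); @unchecked on each
    // type argument silences the warning that -Werror would make fatal.
    case m: Messages[String @unchecked, Int @unchecked] =>
      println(s"request ${m.requestId}: ${m.messages.size} message(s)")
    case other =>
      println(s"ignoring [$other]")
  }

  handle(Messages[String, Int](1, List("key" -> 42)))
}
```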
@@ -75,6 +75,7 @@ private final class CommitCollectorStageLogic(
pushed = true
} else pushOnNextPull = true
} else scheduleCommit()
case _ => log.warning("unexpected timer [{}]", timerKey)
}
if (!pushed) suspendContext()
}
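The added `case _ =>` matters because `onTimer` receives its key as `Any`: a match listing only the expected key throws `scala.MatchError` on anything else and fails the stage. A minimal sketch (a plain function, not a real `GraphStageLogic`):

```scala
object TimerFallbackExample extends App {
  case object CommitNow

  def onTimer(timerKey: Any): Unit = timerKey match {
    case CommitNow => println("commit triggered")
    // without this catch-all, an unexpected key escapes as a MatchError
    case other => println(s"unexpected timer [$other]")
  }

  onTimer(CommitNow)
  onTimer("stray-key")
}
```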
@@ -42,10 +42,13 @@ private[internal] trait CommitObservationLogic { self: GraphStageLogic =>
batch.filter(_.equals(gtp)),
offsetAndMetadata.offset()
)
case unknownBatchImpl: CommittableOffsetBatch =>
case null =>
throw new IllegalArgumentException(
s"Unknown CommittableOffsetBatch, got [${unknownBatchImpl.getClass.getName}], " +
s"expected [${classOf[CommittableOffsetBatchImpl].getName}]"
s"Unknown Committable implementation, got [null]"
)
case unknownImpl =>
throw new IllegalArgumentException(
s"Unknown Committable implementation, got [${unknownImpl.getClass.getName}]"
)

}
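The split into `case null` and `case unknownImpl` is deliberate: an untyped catch-all calls `.getClass` on the matched value, which would raise a `NullPointerException` for null, while the previous typed pattern (`case unknownBatchImpl: CommittableOffsetBatch`) never matched null at all and let it escape as a `scala.MatchError`. A reduced sketch:

```scala
object NullSafeMatchExample extends App {
  sealed trait Committable
  final class OffsetBatch extends Committable

  def describe(c: Committable): String = c match {
    case _: OffsetBatch => "known batch implementation"
    // type patterns never match null, so null needs its own case before
    // the catch-all below calls .getClass
    case null => throw new IllegalArgumentException("got [null]")
    case unknown =>
      throw new IllegalArgumentException(s"got [${unknown.getClass.getName}]")
  }

  println(describe(new OffsetBatch))
}
```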
@@ -167,6 +167,7 @@ private final class CommittingProducerSinkStageLogic[K, V, IN <: Envelope[K, V,

override protected def onTimer(timerKey: Any): Unit = timerKey match {
case CommittingProducerSinkStage.CommitNow => commit(Interval)
case _ => log.warning("unexpected timer [{}]", timerKey)
}

private def collectOffset(offset: Committable): Unit =
15 changes: 8 additions & 7 deletions core/src/main/scala/akka/kafka/internal/KafkaConsumerActor.scala
@@ -296,7 +296,7 @@ import scala.util.control.NonFatal
}

def expectSettings: Receive = LoggingReceive.withLabel("expectSettings") {
case s: ConsumerSettings[K, V] =>
case s: ConsumerSettings[K @unchecked, V @unchecked] =>
applySettings(s)

case scala.util.Failure(e) =>
@@ -413,7 +413,7 @@ import scala.util.control.NonFatal
pollTimeout = settings.pollTimeout.asJava
offsetForTimesTimeout = settings.getOffsetForTimesTimeout
positionTimeout = settings.getPositionTimeout
val progressTrackingFactory: () => ConsumerProgressTracking = ensureProgressTracker
val progressTrackingFactory: () => ConsumerProgressTracking = () => ensureProgressTracker()
commitRefreshing = CommitRefreshing(settings.commitRefreshInterval, progressTrackingFactory)
resetProtection = ConsumerResetProtection(log, settings.resetProtectionSettings, progressTrackingFactory)
try {
@@ -546,7 +546,7 @@ import scala.util.control.NonFatal
// commit partitions that are currently assigned to the consumer. For high volume topics, this can lead to small
// amounts of replayed data during a rebalance, but for low volume topics we can ensure that consumers never appear
// 'stuck' because of out-of-order commits from slow consumers.
val assignedOffsetsToCommit = aggregatedOffsets.filterKeys(consumer.assignment().contains).toMap
val assignedOffsetsToCommit = aggregatedOffsets.view.filterKeys(consumer.assignment().contains).toMap
progressTracker.commitRequested(assignedOffsetsToCommit)
val replyTo = commitSenders
// flush the data before calling `consumer.commitAsync` which might call the callback synchronously
@@ -708,20 +708,21 @@ import scala.util.control.NonFatal
}
)

case Metadata.GetCommittedOffset(partition) =>
@nowarn("msg=deprecated:s") val resp = Metadata.CommittedOffset(
case req: Metadata.GetCommittedOffset @nowarn("cat=deprecation") =>
@nowarn("cat=deprecation") val resp = Metadata.CommittedOffset(
Try {
@nowarn("msg=deprecated:s") val offset = consumer.committed(partition, settings.getMetadataRequestTimeout)
@nowarn("cat=deprecation") val offset = consumer.committed(req.partition, settings.getMetadataRequestTimeout)
offset
},
partition
req.partition
)
resp
}

private def stopFromMessage(msg: StopLike) = msg match {
case Stop => sender()
case StopFromStage(sourceStageId) => s"StageId [$sourceStageId]"
case other => s"unknown: [$other]"
}

/**
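Two routine 2.13 migrations appear in `KafkaConsumerActor.scala`: `Map#filterKeys` is deprecated (the supported spelling filters a `.view` and materialises with `.toMap`), and zero-argument methods no longer eta-expand implicitly, hence the explicit `() => ensureProgressTracker()`. A sketch of both (all names here are illustrative):

```scala
object Scala213MigrationExample extends App {
  val offsets = Map("tp-0" -> 10L, "tp-1" -> 20L, "tp-2" -> 30L)
  val assigned = Set("tp-0", "tp-2")

  // 2.13: Map#filterKeys is deprecated; filter a view, then materialise.
  val assignedOffsets: Map[String, Long] =
    offsets.view.filterKeys(assigned.contains).toMap
  println(assignedOffsets) // Map(tp-0 -> 10, tp-2 -> 30)

  // 2.13: wrap nullary method calls in an explicit function literal
  // rather than relying on automatic eta-expansion.
  def ensureTracker(): String = "tracker"
  val factory: () => String = () => ensureTracker()
  println(factory())
}
```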
3 changes: 2 additions & 1 deletion core/src/main/scala/akka/kafka/internal/MessageBuilder.scala
@@ -20,7 +20,6 @@ import org.apache.kafka.clients.consumer.{ConsumerRecord, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.requests.OffsetFetchResponse

import scala.collection.compat._
import scala.compat.java8.FutureConverters.FutureOps
import scala.concurrent.Future
import scala.jdk.CollectionConverters._
@@ -171,6 +170,8 @@ private[kafka] final class CommittableOffsetBatchImpl(
def updated(committable: Committable): CommittableOffsetBatch = committable match {
case offset: CommittableOffset => updatedWithOffset(offset)
case batch: CommittableOffsetBatch => updatedWithBatch(batch)
case null => throw new IllegalArgumentException(s"unexpected Committable [null]")
case _ => throw new IllegalArgumentException(s"unexpected Committable [${committable.getClass}]")
}

private[internal] def committerFor(groupTopicPartition: GroupTopicPartition) =
14 changes: 10 additions & 4 deletions core/src/main/scala/akka/kafka/internal/SubSourceLogic.scala
@@ -18,7 +18,7 @@ import akka.pattern.{ask, AskTimeoutException}
import akka.stream.scaladsl.Source
import akka.stream.stage.GraphStageLogic.StageActor
import akka.stream.stage._
import akka.stream.{ActorMaterializerHelper, Attributes, Outlet, SourceShape}
import akka.stream.{Attributes, Outlet, SourceShape}
import akka.util.Timeout
import org.apache.kafka.common.TopicPartition

@@ -83,9 +83,11 @@ private class SubSourceLogic[K, V, Msg](
failStage(e)
case (_, Terminated(ref)) if ref == consumerActor =>
failStage(new ConsumerFailed)
case (_, msg) =>
log.warning("ignoring message [{}]", msg)
}
consumerActor = {
val extendedActorSystem = ActorMaterializerHelper.downcast(materializer).system.asInstanceOf[ExtendedActorSystem]
val extendedActorSystem = materializer.system.asInstanceOf[ExtendedActorSystem]
extendedActorSystem.systemActorOf(akka.kafka.KafkaConsumerActor.props(sourceActor.ref, settings),
s"kafka-consumer-$actorNumber")
}
@@ -106,7 +108,8 @@ private class SubSourceLogic[K, V, Msg](
case (formerlyUnknown, offsetMap) =>
val updatedFormerlyUnknown = formerlyUnknown -- (partitionsToRevoke ++ partitionsInStartup ++ pendingPartitions)
// Filter out the offsetMap so that we don't re-seek for partitions that have been revoked
seekAndEmitSubSources(updatedFormerlyUnknown, offsetMap.filterKeys(k => !partitionsToRevoke.contains(k)).toMap)
seekAndEmitSubSources(updatedFormerlyUnknown,
offsetMap.view.filterKeys(k => !partitionsToRevoke.contains(k)).toMap)
}

private val partitionAssignedCB = getAsyncCallback[Set[TopicPartition]] { assigned =>
@@ -174,6 +177,7 @@ private class SubSourceLogic[K, V, Msg](
partitionsToRevoke.flatMap(subSources.get).map(_.control).foreach(_.shutdown())
subSources --= partitionsToRevoke
partitionsToRevoke = Set.empty
case _ => log.warning("unexpected timer [{}]", timerKey)
}

private val subsourceCancelledCB: AsyncCallback[(TopicPartition, SubSourceCancellationStrategy)] =
@@ -273,6 +277,8 @@ private class SubSourceLogic[K, V, Msg](
case (_, Terminated(ref)) if ref == consumerActor =>
onShutdown()
completeStage()
case (_, msg) =>
log.warning("ignoring message [{}]", msg)
}
materializer.scheduleOnce(
settings.stopTimeout,
@@ -420,7 +426,7 @@ private abstract class SubSourceStageLogic[K, V, Msg](
}

protected def messageHandling: PartialFunction[(ActorRef, Any), Unit] = {
case (_, msg: KafkaConsumerActor.Internal.Messages[K, V]) =>
case (_, msg: KafkaConsumerActor.Internal.Messages[K @unchecked, V @unchecked]) =>
requested = false
buffer = buffer ++ msg.messages
pump()
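In `SubSourceLogic.scala`, swapping `ActorMaterializerHelper.downcast(materializer).system` for `materializer.system` works because `Materializer` has exposed its `ActorSystem` directly since Akka 2.6, making the deprecated downcast helper unnecessary. A minimal sketch, assuming Akka 2.6.x on the classpath:

```scala
import akka.actor.ActorSystem
import akka.stream.Materializer

object MaterializerSystemExample extends App {
  val system: ActorSystem = ActorSystem("example")
  val materializer: Materializer = Materializer(system)

  // Akka 2.6: the ActorSystem is available straight off the Materializer.
  println(materializer.system.name) // example

  system.terminate()
}
```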
@@ -14,7 +14,7 @@ import akka.kafka.internal.ProducerStage.ProducerCompletionState
import akka.kafka.{ConsumerMessage, ProducerSettings}
import akka.stream.stage._
import akka.stream.{Attributes, FlowShape}
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.clients.consumer.{ConsumerGroupMetadata, OffsetAndMetadata}
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.TopicPartition

@@ -218,6 +218,7 @@ private final class TransactionalProducerStageLogic[K, V, P](

override protected def postSend(msg: Envelope[K, V, P]): Unit = msg.passThrough match {
case o: ConsumerMessage.PartitionOffsetCommittedMarker => batchOffsets = batchOffsets.updated(o)
case _ =>
}

override def onCompletionSuccess(): Unit = {
@@ -240,7 +241,7 @@ group,
group,
batch.offsets)
val offsetMap = batch.offsetMap()
producer.sendOffsetsToTransaction(offsetMap.asJava, group)
producer.sendOffsetsToTransaction(offsetMap.asJava, new ConsumerGroupMetadata(group))
producer.commitTransaction()
log.debug("Committed transaction for transactional id '{}' consumer group '{}' with offsets: {}",
transactionalId,
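The transactional producer change moves off the deprecated group-id `String` overload of `sendOffsetsToTransaction` onto the `ConsumerGroupMetadata` overload available since Kafka 2.5. Wrapping the plain group id preserves the old semantics; passing a consumer's own `groupMetadata()` would enable the stronger fencing introduced with KIP-447. A compile-only sketch (no broker interaction):

```scala
import org.apache.kafka.clients.consumer.{ConsumerGroupMetadata, OffsetAndMetadata}
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.common.TopicPartition

import scala.jdk.CollectionConverters._

object TransactionalOffsetsExample {
  // Commits consumed offsets as part of the producer's open transaction.
  def commitWithOffsets(producer: KafkaProducer[String, String],
                        offsets: Map[TopicPartition, OffsetAndMetadata],
                        group: String): Unit = {
    // replaces the deprecated sendOffsetsToTransaction(offsets, group: String)
    producer.sendOffsetsToTransaction(offsets.asJava, new ConsumerGroupMetadata(group))
    producer.commitTransaction()
  }
}
```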
@@ -25,7 +25,6 @@ import akka.util.Timeout
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, OffsetAndMetadata}
import org.apache.kafka.common.{IsolationLevel, TopicPartition}

import scala.collection.compat._
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{Await, ExecutionContext, Future}
