Skip to content

Commit

Permalink
Add back Scala 2.12 (#1538)
Browse files Browse the repository at this point in the history
* because it has ripple effect to Alpakka and then Cassandra plugin
* we need to coordinate this decision across all Akka projects, later

Reverts parts of #1448
  • Loading branch information
patriknw authored Sep 20, 2022
1 parent 92c1a24 commit 36d335b
Show file tree
Hide file tree
Showing 44 changed files with 66 additions and 64 deletions.
10 changes: 5 additions & 5 deletions .github/workflows/check-build-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,9 @@ jobs:
- name: Cache Coursier cache
uses: coursier/cache-action@v6

- name: Compile all code with fatal warnings for Java 11 and Scala 2.13
- name: Compile all code with fatal warnings for Java 11 and all Scala versions
# Run locally with: env CI=true sbt 'clean ; Test/compile ; It/compile'
run: sbt "; Test/compile; It/compile"
run: sbt "++ Test/compile; It/compile"

check-docs:
name: Check Docs
Expand Down Expand Up @@ -105,8 +105,8 @@ jobs:
fail-fast: false
matrix:
include:
- { java-version: [email protected], scala-version: 2.13.8, sbt-opts: '' }
- { java-version: [email protected], scala-version: 2.13.8, sbt-opts: '-J-XX:+UnlockExperimentalVMOptions -J-XX:+UseJVMCICompiler' }
- { java-version: [email protected], sbt-opts: '' }
- { java-version: [email protected], sbt-opts: '-J-XX:+UnlockExperimentalVMOptions -J-XX:+UseJVMCICompiler' }
steps:
- name: Checkout
uses: actions/checkout@v2
Expand All @@ -128,7 +128,7 @@ jobs:
uses: coursier/cache-action@v6

- name: Run tests with Scala ${{ matrix.scala-version }} and Java ${{ matrix.java-version }}
run: sbt "++${{ matrix.scala-version }} test" ${{ matrix.sbt-opts }}
run: sbt ${{ matrix.sbt-opts }} "test"

- name: Print logs on failure
if: ${{ failure() }}
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,4 @@ jobs:
echo $GUSTAV_KEY | base64 -di > .github/id_rsa
chmod 600 .github/id_rsa
ssh-add .github/id_rsa
sbt "++2.13.8 docs/publishRsync"
sbt "docs/publishRsync"
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import javax.management.{Attribute, MBeanServerConnection, ObjectName}

import scala.concurrent.duration.{FiniteDuration, _}
import scala.concurrent.{ExecutionContext, Future}
import scala.jdk.CollectionConverters._
import scala.collection.JavaConverters._

private[benchmarks] trait InflightMetrics {
import InflightMetrics._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import org.apache.kafka.clients.consumer.{OffsetAndMetadata, OffsetCommitCallbac
import org.apache.kafka.common.TopicPartition

import scala.annotation.tailrec
import scala.jdk.CollectionConverters._
import scala.collection.JavaConverters._

object KafkaConsumerBenchmarks extends LazyLogging {
val pollTimeoutMs: Duration = Duration.ofMillis(50L)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import akka.kafka.benchmarks.app.RunTestCommand
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}

import scala.jdk.CollectionConverters._
import scala.collection.JavaConverters._

case class KafkaConsumerTestFixture(topic: String, msgCount: Int, consumer: KafkaConsumer[Array[Byte], String]) {
def close(): Unit = consumer.close()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import org.apache.kafka.clients.producer.{Callback, ProducerRecord, RecordMetada
import org.apache.kafka.common.TopicPartition

import scala.annotation.tailrec
import scala.jdk.CollectionConverters._
import scala.collection.JavaConverters._
import scala.concurrent.duration.FiniteDuration

object KafkaTransactionBenchmarks extends LazyLogging {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import org.apache.kafka.common.serialization.{
StringSerializer
}

import scala.jdk.CollectionConverters._
import scala.collection.JavaConverters._

case class KafkaTransactionTestFixture(sourceTopic: String,
sinkTopic: String,
Expand Down
7 changes: 4 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ val Nightly = sys.env.get("EVENT_NAME").contains("schedule")
// align ignore-prefixes in scripts/link-validator.conf
// align in release.yml
val Scala213 = "2.13.8"
val Scala212 = "2.12.16"

val AkkaBinaryVersionForDocs = "2.6"
val akkaVersion = "2.6.19"
Expand Down Expand Up @@ -71,7 +72,7 @@ val commonSettings = Def.settings(
startYear := Some(2014),
licenses := Seq(("BUSL-1.1", url("https://raw.githubusercontent.com/akka/alpakka-kafka/master/LICENSE"))), // FIXME change s/master/v3.1.0/ when released
description := "Alpakka is a Reactive Enterprise Integration library for Java and Scala, based on Reactive Streams and Akka.",
crossScalaVersions := Seq(Scala213),
crossScalaVersions := Seq(Scala213, Scala212),
scalaVersion := Scala213,
crossVersion := CrossVersion.binary,
javacOptions ++= Seq(
Expand All @@ -81,9 +82,9 @@ val commonSettings = Def.settings(
scalacOptions ++= Seq(
"-encoding",
"UTF-8", // yes, this is 2 args
"-Wconf:cat=feature:w,cat=deprecation:w,cat=unchecked:w,cat=lint:w,cat=unused:w,cat=w-flag:w"
"-Wconf:cat=feature:w,cat=deprecation&msg=.*JavaConverters.*:s,cat=unchecked:w,cat=lint:w,cat=unused:w,cat=w-flag:w"
) ++ {
if (insideCI.value && !Nightly) Seq("-Werror")
if (insideCI.value && !Nightly && scalaVersion.value != Scala212) Seq("-Werror")
else Seq.empty
},
Compile / doc / scalacOptions := scalacOptions.value ++ Seq(
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/akka/kafka/ConsumerMessage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ object ConsumerMessage {
* Create an offset batch out of a list of offsets.
*/
def createCommittableOffsetBatch[T <: Committable](offsets: java.util.List[T]): CommittableOffsetBatch = {
import scala.jdk.CollectionConverters._
import scala.collection.JavaConverters._
CommittableOffsetBatch(offsets.asScala.toList)
}

Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/akka/kafka/ConsumerSettings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import com.typesafe.config.Config
import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.serialization.Deserializer

import scala.jdk.CollectionConverters._
import scala.collection.JavaConverters._
import scala.compat.java8.OptionConverters._
import scala.compat.java8.FutureConverters._
import scala.concurrent.{ExecutionContext, Future}
Expand Down
10 changes: 5 additions & 5 deletions core/src/main/scala/akka/kafka/Metadata.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import akka.actor.NoSerializationVerificationNeeded
import org.apache.kafka.clients.consumer.{OffsetAndMetadata, OffsetAndTimestamp}
import org.apache.kafka.common.{PartitionInfo, TopicPartition}

import scala.jdk.CollectionConverters._
import scala.collection.JavaConverters._
import scala.util.Try

/**
Expand Down Expand Up @@ -40,7 +40,7 @@ object Metadata {
def getResponse: Optional[java.util.Map[String, java.util.List[PartitionInfo]]] =
response
.map { m =>
Optional.of(m.view.mapValues(_.asJava).toMap.asJava)
Optional.of(m.iterator.map { case (k, v) => k -> v.asJava }.toMap.asJava)
}
.getOrElse(Optional.empty())
}
Expand Down Expand Up @@ -90,7 +90,7 @@ object Metadata {
def getResponse: Optional[java.util.Map[TopicPartition, java.lang.Long]] =
response
.map { m =>
Optional.of(m.view.mapValues(Long.box).toMap.asJava)
Optional.of(m.iterator.map { case (k, v) => k -> Long.box(v) }.toMap.asJava)
}
.getOrElse(Optional.empty())
}
Expand Down Expand Up @@ -120,7 +120,7 @@ object Metadata {
def getResponse: Optional[java.util.Map[TopicPartition, java.lang.Long]] =
response
.map { m =>
Optional.of(m.view.mapValues(Long.box).toMap.asJava)
Optional.of(m.iterator.map { case (k, v) => k -> Long.box(v) }.toMap.asJava)
}
.getOrElse(Optional.empty())
}
Expand Down Expand Up @@ -164,7 +164,7 @@ object Metadata {
* Warning: KafkaConsumer documentation states that this method may block indefinitely if the partition does not exist.
*/
def createGetOffsetForTimes(timestampsToSearch: java.util.Map[TopicPartition, java.lang.Long]): GetOffsetsForTimes =
GetOffsetsForTimes(timestampsToSearch.asScala.view.mapValues(_.toLong).toMap)
GetOffsetsForTimes(timestampsToSearch.asScala.iterator.map { case (k, v) => k -> v.toLong }.toMap)

/**
* [[org.apache.kafka.clients.consumer.KafkaConsumer#committed()]]
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/akka/kafka/ProducerMessage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import akka.NotUsed
import org.apache.kafka.clients.producer.{ProducerRecord, RecordMetadata}

import scala.collection.immutable
import scala.jdk.CollectionConverters._
import scala.collection.JavaConverters._

/**
* Classes that are used in both [[javadsl.Producer]] and
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/akka/kafka/ProducerSettings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import com.typesafe.config.Config
import org.apache.kafka.clients.producer.{KafkaProducer, Producer, ProducerConfig}
import org.apache.kafka.common.serialization.Serializer

import scala.jdk.CollectionConverters._
import scala.collection.JavaConverters._
import scala.compat.java8.OptionConverters._
import scala.concurrent.duration._
import akka.util.JavaDurationConverters._
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/akka/kafka/Subscriptions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import akka.kafka.internal.PartitionAssignmentHelpers.EmptyPartitionAssignmentHa
import org.apache.kafka.common.TopicPartition

import scala.annotation.varargs
import scala.jdk.CollectionConverters._
import scala.collection.JavaConverters._

sealed trait Subscription {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import akka.annotation.InternalApi
import com.typesafe.config.{Config, ConfigObject}

import scala.annotation.tailrec
import scala.jdk.CollectionConverters._
import scala.collection.JavaConverters._
import scala.concurrent.duration.Duration
import akka.util.JavaDurationConverters._

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import akka.annotation.InternalApi
import org.apache.kafka.clients.consumer.{Consumer, ConsumerRecords, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition

import scala.jdk.CollectionConverters._
import scala.collection.JavaConverters._

/**
* Maintain our own OffsetAndTimestamp which can tolerate negative timestamps, which happen for old clients that
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import akka.kafka.internal.KafkaConsumerActor.Internal.Seek
import org.apache.kafka.clients.consumer.{ConsumerRecord, ConsumerRecords, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition

import scala.jdk.CollectionConverters._
import scala.collection.JavaConverters._

/**
* Added as part of https://github.com/akka/alpakka-kafka/issues/1286 to avoid reprocessing data in case of Kafka
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import akka.stream.stage.GraphStageLogic
import akka.util.Timeout
import org.apache.kafka.common.{Metric, MetricName}

import scala.jdk.CollectionConverters._
import scala.collection.JavaConverters._
import scala.compat.java8.FutureConverters.{CompletionStageOps, FutureOps}
import scala.concurrent.{ExecutionContext, Future, Promise}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import org.apache.kafka.common.errors.RebalanceInProgressException
import org.apache.kafka.common.{Metric, MetricName, TopicPartition}

import scala.annotation.nowarn
import scala.jdk.CollectionConverters._
import scala.collection.JavaConverters._
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.{Success, Try}
Expand Down Expand Up @@ -546,7 +546,8 @@ import scala.util.control.NonFatal
// commit partitions that are currently assigned to the consumer. For high volume topics, this can lead to small
// amounts of replayed data during a rebalance, but for low volume topics we can ensure that consumers never appear
// 'stuck' because of out-of-order commits from slow consumers.
val assignedOffsetsToCommit = aggregatedOffsets.view.filterKeys(consumer.assignment().contains).toMap
val assignedOffsetsToCommit =
aggregatedOffsets.iterator.filter { case (k, _) => consumer.assignment().contains(k) }.toMap
progressTracker.commitRequested(assignedOffsetsToCommit)
val replyTo = commitSenders
// flush the data before calling `consumer.commitAsync` which might call the callback synchronously
Expand Down
5 changes: 3 additions & 2 deletions core/src/main/scala/akka/kafka/internal/MessageBuilder.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import org.apache.kafka.common.requests.OffsetFetchResponse

import scala.compat.java8.FutureConverters.FutureOps
import scala.concurrent.Future
import scala.jdk.CollectionConverters._
import scala.collection.JavaConverters._

/** Internal API */
@InternalApi
Expand Down Expand Up @@ -165,7 +165,8 @@ private[kafka] final class CommittableOffsetBatchImpl(
private val committers: Map[GroupTopicPartition, KafkaAsyncConsumerCommitterRef],
override val batchSize: Long
) extends CommittableOffsetBatch {
def offsets: Map[GroupTopicPartition, Long] = offsetsAndMetadata.view.mapValues(_.offset() - 1L).toMap
def offsets: Map[GroupTopicPartition, Long] =
offsetsAndMetadata.iterator.map { case (k, v) => k -> (v.offset() - 1L) }.toMap

def updated(committable: Committable): CommittableOffsetBatch = committable match {
case offset: CommittableOffset => updatedWithOffset(offset)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import akka.kafka.{AutoSubscription, RestrictedConsumer, TopicPartitionsAssigned
import akka.stream.stage.AsyncCallback
import org.apache.kafka.common.TopicPartition

import scala.jdk.CollectionConverters._
import scala.collection.JavaConverters._

/**
* Internal API.
Expand Down
5 changes: 3 additions & 2 deletions core/src/main/scala/akka/kafka/internal/SubSourceLogic.scala
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,9 @@ private class SubSourceLogic[K, V, Msg](
case (formerlyUnknown, offsetMap) =>
val updatedFormerlyUnknown = formerlyUnknown -- (partitionsToRevoke ++ partitionsInStartup ++ pendingPartitions)
// Filter out the offsetMap so that we don't re-seek for partitions that have been revoked
seekAndEmitSubSources(updatedFormerlyUnknown,
offsetMap.view.filterKeys(k => !partitionsToRevoke.contains(k)).toMap)
seekAndEmitSubSources(updatedFormerlyUnknown, offsetMap.iterator.filterNot {
case (k, _) => partitionsToRevoke.contains(k)
}.toMap)
}

private val partitionAssignedCB = getAsyncCallback[Set[TopicPartition]] { assigned =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import org.apache.kafka.common.TopicPartition

import scala.concurrent.Future
import scala.concurrent.duration._
import scala.jdk.CollectionConverters._
import scala.collection.JavaConverters._

/**
* INTERNAL API
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ private[internal] abstract class TransactionalSourceLogic[K, V, Msg](shape: Sour

private def drainHandling: PartialFunction[(ActorRef, Any), Unit] = {
case (sender, Committed(offsets)) =>
inFlightRecords.committed(offsets.view.mapValues(_.offset() - 1).toMap)
inFlightRecords.committed(offsets.iterator.map { case (k, v) => k -> (v.offset() - 1L) }.toMap)
sender.tell(Done, sourceActor.ref)
case (sender, CommittingFailure) => {
log.info("Committing failed, resetting in flight offsets")
Expand Down Expand Up @@ -409,7 +409,7 @@ private final class TransactionalSubSourceStageLogic[K, V](

private def drainHandling: PartialFunction[(ActorRef, Any), Unit] = {
case (sender, Committed(offsets)) =>
inFlightRecords.committed(offsets.view.mapValues(_.offset() - 1).toMap)
inFlightRecords.committed(offsets.iterator.map { case (k, v) => k -> (v.offset() - 1L) }.toMap)
sender ! Done
case (sender, CommittingFailure) => {
log.info("Committing failed, resetting in flight offsets")
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/akka/kafka/javadsl/Consumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import akka.{Done, NotUsed}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.{Metric, MetricName, TopicPartition}

import scala.jdk.CollectionConverters._
import scala.collection.JavaConverters._
import scala.compat.java8.FutureConverters._
import scala.concurrent.duration.FiniteDuration

Expand Down
8 changes: 4 additions & 4 deletions core/src/main/scala/akka/kafka/javadsl/MetadataClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import org.apache.kafka.common.{PartitionInfo, TopicPartition}

import scala.compat.java8.FutureConverters._
import scala.concurrent.ExecutionContextExecutor
import scala.jdk.CollectionConverters._
import scala.collection.JavaConverters._

class MetadataClient private (metadataClient: akka.kafka.scaladsl.MetadataClient) {

Expand All @@ -26,7 +26,7 @@ class MetadataClient private (metadataClient: akka.kafka.scaladsl.MetadataClient
metadataClient
.getBeginningOffsets(partitions.asScala.toSet)
.map { beginningOffsets =>
beginningOffsets.view.mapValues(Long.box).toMap.asJava
beginningOffsets.iterator.map { case (k, v) => k -> Long.box(v) }.toMap.asJava
}(ExecutionContexts.parasitic)
.toJava

Expand All @@ -42,7 +42,7 @@ class MetadataClient private (metadataClient: akka.kafka.scaladsl.MetadataClient
metadataClient
.getEndOffsets(partitions.asScala.toSet)
.map { endOffsets =>
endOffsets.view.mapValues(Long.box).toMap.asJava
endOffsets.iterator.map { case (k, v) => k -> Long.box(v) }.toMap.asJava
}(ExecutionContexts.parasitic)
.toJava

Expand All @@ -56,7 +56,7 @@ class MetadataClient private (metadataClient: akka.kafka.scaladsl.MetadataClient
metadataClient
.listTopics()
.map { topics =>
topics.view.mapValues(partitionsInfo => partitionsInfo.asJava).toMap.asJava
topics.view.iterator.map { case (k, v) => k -> v.asJava }.toMap.asJava
}(ExecutionContexts.parasitic)
.toJava

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import akka.kafka.ProducerMessage
import org.apache.kafka.clients.producer.{ProducerRecord, RecordMetadata}
import org.apache.kafka.common.TopicPartition

import scala.jdk.CollectionConverters._
import scala.collection.JavaConverters._
import scala.collection.immutable

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.{Deserializer, Serializer, StringDeserializer, StringSerializer}
import org.slf4j.Logger

import scala.jdk.CollectionConverters._
import scala.collection.JavaConverters._

/**
* Common functions for scaladsl and javadsl Testkit.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import org.testcontainers.containers.GenericContainer
import org.testcontainers.utility.DockerImageName

import scala.compat.java8.OptionConverters._
import scala.jdk.CollectionConverters._
import scala.collection.JavaConverters._

object TestcontainersKafka {
trait Spec extends KafkaSpec {
Expand Down
Loading

0 comments on commit 36d335b

Please sign in to comment.