Add back Scala 2.12 #1538

Merged · 4 commits · Sep 20, 2022

Changes from all commits
10 changes: 5 additions & 5 deletions .github/workflows/check-build-test.yml
@@ -66,9 +66,9 @@ jobs:
- name: Cache Coursier cache
uses: coursier/cache-action@v6

-- name: Compile all code with fatal warnings for Java 11 and Scala 2.13
+- name: Compile all code with fatal warnings for Java 11 and all Scala versions
# Run locally with: env CI=true sbt 'clean ; Test/compile ; It/compile'
-run: sbt "; Test/compile; It/compile"
+run: sbt "++ Test/compile; It/compile"

check-docs:
name: Check Docs
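
A note on the sbt commands in this workflow: ++<version> pins the sbt session to a single Scala version, while a leading + re-runs a task for every entry in crossScalaVersions (which this PR extends with 2.12 in build.sbt below); the test matrix below likewise drops its explicit scala-version axis. Hypothetical invocations, not from this PR, illustrating the two forms:

    sbt "++2.12.16 test"   # pin the session to Scala 2.12.16, then run tests
    sbt "+Test/compile"    # run Test/compile once per crossScalaVersions entry
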
@@ -105,8 +105,8 @@ jobs:
fail-fast: false
matrix:
include:
-- { java-version: [email protected], scala-version: 2.13.8, sbt-opts: '' }
-- { java-version: [email protected], scala-version: 2.13.8, sbt-opts: '-J-XX:+UnlockExperimentalVMOptions -J-XX:+UseJVMCICompiler' }
+- { java-version: [email protected], sbt-opts: '' }
+- { java-version: [email protected], sbt-opts: '-J-XX:+UnlockExperimentalVMOptions -J-XX:+UseJVMCICompiler' }
steps:
- name: Checkout
uses: actions/checkout@v2
@@ -128,7 +128,7 @@ jobs:
uses: coursier/cache-action@v6

- name: Run tests with Scala ${{ matrix.scala-version }} and Java ${{ matrix.java-version }}
-run: sbt "++${{ matrix.scala-version }} test" ${{ matrix.sbt-opts }}
+run: sbt ${{ matrix.sbt-opts }} "test"

- name: Print logs on failure
if: ${{ failure() }}
2 changes: 1 addition & 1 deletion .github/workflows/release.yml
@@ -56,4 +56,4 @@ jobs:
echo $GUSTAV_KEY | base64 -di > .github/id_rsa
chmod 600 .github/id_rsa
ssh-add .github/id_rsa
-sbt "++2.13.8 docs/publishRsync"
+sbt "docs/publishRsync"
@@ -18,7 +18,7 @@ import javax.management.{Attribute, MBeanServerConnection, ObjectName}

import scala.concurrent.duration.{FiniteDuration, _}
import scala.concurrent.{ExecutionContext, Future}
-import scala.jdk.CollectionConverters._
+import scala.collection.JavaConverters._

private[benchmarks] trait InflightMetrics {
import InflightMetrics._
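
This import swap, repeated across the diff, is the heart of restoring 2.12 support: scala.jdk.CollectionConverters._ exists only since Scala 2.13, while scala.collection.JavaConverters._ compiles on both 2.12 and 2.13 (it is merely deprecated on 2.13, which the -Wconf change in build.sbt below silences). A minimal, self-contained sketch of the converters, independent of this PR:

    import scala.collection.JavaConverters._

    object ConvertersSketch extends App {
      // Java -> Scala: asScala wraps the Java collection without copying
      val javaList: java.util.List[String] = java.util.Arrays.asList("a", "b")
      val scalaList: List[String] = javaList.asScala.toList

      // Scala -> Java: asJava wraps the Scala collection for Java APIs
      val backToJava: java.util.List[String] = scalaList.asJava
      println(s"$scalaList / $backToJava")
    }
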
@@ -14,7 +14,7 @@ import org.apache.kafka.clients.consumer.{OffsetAndMetadata, OffsetCommitCallbac
import org.apache.kafka.common.TopicPartition

import scala.annotation.tailrec
-import scala.jdk.CollectionConverters._
+import scala.collection.JavaConverters._

object KafkaConsumerBenchmarks extends LazyLogging {
val pollTimeoutMs: Duration = Duration.ofMillis(50L)
@@ -9,7 +9,7 @@ import akka.kafka.benchmarks.app.RunTestCommand
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}

-import scala.jdk.CollectionConverters._
+import scala.collection.JavaConverters._

case class KafkaConsumerTestFixture(topic: String, msgCount: Int, consumer: KafkaConsumer[Array[Byte], String]) {
def close(): Unit = consumer.close()
@@ -13,7 +13,7 @@ import org.apache.kafka.clients.producer.{Callback, ProducerRecord, RecordMetada
import org.apache.kafka.common.TopicPartition

import scala.annotation.tailrec
-import scala.jdk.CollectionConverters._
+import scala.collection.JavaConverters._
import scala.concurrent.duration.FiniteDuration

object KafkaTransactionBenchmarks extends LazyLogging {
@@ -18,7 +18,7 @@ import org.apache.kafka.common.serialization.{
StringSerializer
}

-import scala.jdk.CollectionConverters._
+import scala.collection.JavaConverters._

case class KafkaTransactionTestFixture(sourceTopic: String,
sinkTopic: String,
7 changes: 4 additions & 3 deletions build.sbt
@@ -9,6 +9,7 @@ val Nightly = sys.env.get("EVENT_NAME").contains("schedule")
// align ignore-prefixes in scripts/link-validator.conf
// align in release.yml
val Scala213 = "2.13.8"
+val Scala212 = "2.12.16"

val AkkaBinaryVersionForDocs = "2.6"
val akkaVersion = "2.6.19"
@@ -71,7 +72,7 @@ val commonSettings = Def.settings(
startYear := Some(2014),
licenses := Seq(("BUSL-1.1", url("https://raw.githubusercontent.com/akka/alpakka-kafka/master/LICENSE"))), // FIXME change s/master/v3.1.0/ when released
description := "Alpakka is a Reactive Enterprise Integration library for Java and Scala, based on Reactive Streams and Akka.",
-crossScalaVersions := Seq(Scala213),
+crossScalaVersions := Seq(Scala213, Scala212),
scalaVersion := Scala213,
crossVersion := CrossVersion.binary,
javacOptions ++= Seq(
@@ -81,9 +82,9 @@
scalacOptions ++= Seq(
"-encoding",
"UTF-8", // yes, this is 2 args
-"-Wconf:cat=feature:w,cat=deprecation:w,cat=unchecked:w,cat=lint:w,cat=unused:w,cat=w-flag:w"
+"-Wconf:cat=feature:w,cat=deprecation&msg=.*JavaConverters.*:s,cat=unchecked:w,cat=lint:w,cat=unused:w,cat=w-flag:w"
Review comment (Member Author):
to avoid all warnings for JavaConverters, but not silence everything
https://www.scala-lang.org/2021/01/12/configuring-and-suppressing-warnings.html

) ++ {
-if (insideCI.value && !Nightly) Seq("-Werror")
+if (insideCI.value && !Nightly && scalaVersion.value != Scala212) Seq("-Werror")
else Seq.empty
},
Compile / doc / scalacOptions := scalacOptions.value ++ Seq(
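
Reading the new -Wconf value together with the author's comment: each comma-separated entry is a set of filters followed by an action, & combines filters, :w warns and :s silences. The added cat=deprecation&msg=.*JavaConverters.*:s therefore suppresses only deprecation warnings whose message mentions JavaConverters, leaving every other category as a warning; -Werror still fails CI on anything that slips through, except on Scala 2.12, where it is now skipped (presumably because the filter set is tuned to 2.13's warning categories). An illustrative sbt sketch of the same idea, not the PR's exact setting:

    // build.sbt sketch (illustrative)
    scalacOptions ++= Seq(
      // warn on these categories...
      "-Wconf:cat=feature:w,cat=unchecked:w," +
        // ...but silence (:s) deprecations that mention JavaConverters
        "cat=deprecation&msg=.*JavaConverters.*:s"
    )
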
2 changes: 1 addition & 1 deletion core/src/main/scala/akka/kafka/ConsumerMessage.scala
@@ -182,7 +182,7 @@ object ConsumerMessage {
* Create an offset batch out of a list of offsets.
*/
def createCommittableOffsetBatch[T <: Committable](offsets: java.util.List[T]): CommittableOffsetBatch = {
-import scala.jdk.CollectionConverters._
+import scala.collection.JavaConverters._
CommittableOffsetBatch(offsets.asScala.toList)
}

2 changes: 1 addition & 1 deletion core/src/main/scala/akka/kafka/ConsumerSettings.scala
@@ -15,7 +15,7 @@ import com.typesafe.config.Config
import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.serialization.Deserializer

-import scala.jdk.CollectionConverters._
+import scala.collection.JavaConverters._
import scala.compat.java8.OptionConverters._
import scala.compat.java8.FutureConverters._
import scala.concurrent.{ExecutionContext, Future}
10 changes: 5 additions & 5 deletions core/src/main/scala/akka/kafka/Metadata.scala
@@ -11,7 +11,7 @@ import akka.actor.NoSerializationVerificationNeeded
import org.apache.kafka.clients.consumer.{OffsetAndMetadata, OffsetAndTimestamp}
import org.apache.kafka.common.{PartitionInfo, TopicPartition}

-import scala.jdk.CollectionConverters._
+import scala.collection.JavaConverters._
import scala.util.Try

/**
@@ -40,7 +40,7 @@ object Metadata {
def getResponse: Optional[java.util.Map[String, java.util.List[PartitionInfo]]] =
response
.map { m =>
-Optional.of(m.view.mapValues(_.asJava).toMap.asJava)
+Optional.of(m.iterator.map { case (k, v) => k -> v.asJava }.toMap.asJava)
}
.getOrElse(Optional.empty())
}
@@ -90,7 +90,7 @@ object Metadata {
def getResponse: Optional[java.util.Map[TopicPartition, java.lang.Long]] =
response
.map { m =>
-Optional.of(m.view.mapValues(Long.box).toMap.asJava)
+Optional.of(m.iterator.map { case (k, v) => k -> Long.box(v) }.toMap.asJava)
}
.getOrElse(Optional.empty())
}
@@ -120,7 +120,7 @@ object Metadata {
def getResponse: Optional[java.util.Map[TopicPartition, java.lang.Long]] =
response
.map { m =>
-Optional.of(m.view.mapValues(Long.box).toMap.asJava)
+Optional.of(m.iterator.map { case (k, v) => k -> Long.box(v) }.toMap.asJava)
}
.getOrElse(Optional.empty())
}
@@ -164,7 +164,7 @@ object Metadata {
* Warning: KafkaConsumer documentation states that this method may block indefinitely if the partition does not exist.
*/
def createGetOffsetForTimes(timestampsToSearch: java.util.Map[TopicPartition, java.lang.Long]): GetOffsetsForTimes =
-GetOffsetsForTimes(timestampsToSearch.asScala.view.mapValues(_.toLong).toMap)
+GetOffsetsForTimes(timestampsToSearch.asScala.iterator.map { case (k, v) => k -> v.toLong }.toMap)

/**
* [[org.apache.kafka.clients.consumer.KafkaConsumer#committed()]]
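
The Metadata.scala rewrites above all follow one pattern: Map.view.mapValues only compiles on Scala 2.13, so each call site switches to an iterator-based transform that behaves the same on 2.12 and 2.13 (eager, returning a strict Map). A standalone sketch of the equivalence:

    object MapValuesSketch extends App {
      val offsets: Map[String, Long] = Map("tp-0" -> 42L, "tp-1" -> 7L)

      // Scala 2.13 only: offsets.view.mapValues(Long.box).toMap
      // Cross-version equivalent, as used throughout this PR:
      val boxed: Map[String, java.lang.Long] =
        offsets.iterator.map { case (k, v) => k -> Long.box(v) }.toMap

      println(boxed)
    }
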
2 changes: 1 addition & 1 deletion core/src/main/scala/akka/kafka/ProducerMessage.scala
@@ -9,7 +9,7 @@ import akka.NotUsed
import org.apache.kafka.clients.producer.{ProducerRecord, RecordMetadata}

import scala.collection.immutable
-import scala.jdk.CollectionConverters._
+import scala.collection.JavaConverters._

/**
* Classes that are used in both [[javadsl.Producer]] and
2 changes: 1 addition & 1 deletion core/src/main/scala/akka/kafka/ProducerSettings.scala
@@ -14,7 +14,7 @@ import com.typesafe.config.Config
import org.apache.kafka.clients.producer.{KafkaProducer, Producer, ProducerConfig}
import org.apache.kafka.common.serialization.Serializer

-import scala.jdk.CollectionConverters._
+import scala.collection.JavaConverters._
import scala.compat.java8.OptionConverters._
import scala.concurrent.duration._
import akka.util.JavaDurationConverters._
2 changes: 1 addition & 1 deletion core/src/main/scala/akka/kafka/Subscriptions.scala
@@ -12,7 +12,7 @@ import akka.kafka.internal.PartitionAssignmentHelpers.EmptyPartitionAssignmentHa
import org.apache.kafka.common.TopicPartition

import scala.annotation.varargs
-import scala.jdk.CollectionConverters._
+import scala.collection.JavaConverters._

sealed trait Subscription {

@@ -11,7 +11,7 @@ import akka.annotation.InternalApi
import com.typesafe.config.{Config, ConfigObject}

import scala.annotation.tailrec
-import scala.jdk.CollectionConverters._
+import scala.collection.JavaConverters._
import scala.concurrent.duration.Duration
import akka.util.JavaDurationConverters._

@@ -8,7 +8,7 @@ import akka.annotation.InternalApi
import org.apache.kafka.clients.consumer.{Consumer, ConsumerRecords, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition

-import scala.jdk.CollectionConverters._
+import scala.collection.JavaConverters._

/**
* Maintain our own OffsetAndTimestamp which can tolerate negative timestamps, which happen for old clients that
@@ -15,7 +15,7 @@ import akka.kafka.internal.KafkaConsumerActor.Internal.Seek
import org.apache.kafka.clients.consumer.{ConsumerRecord, ConsumerRecords, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition

-import scala.jdk.CollectionConverters._
+import scala.collection.JavaConverters._

/**
* Added as part of https://github.com/akka/alpakka-kafka/issues/1286 to avoid reprocessing data in case of Kafka
@@ -17,7 +17,7 @@ import akka.stream.stage.GraphStageLogic
import akka.util.Timeout
import org.apache.kafka.common.{Metric, MetricName}

-import scala.jdk.CollectionConverters._
+import scala.collection.JavaConverters._
import scala.compat.java8.FutureConverters.{CompletionStageOps, FutureOps}
import scala.concurrent.{ExecutionContext, Future, Promise}

@@ -31,7 +31,7 @@ import org.apache.kafka.common.errors.RebalanceInProgressException
import org.apache.kafka.common.{Metric, MetricName, TopicPartition}

import scala.annotation.nowarn
-import scala.jdk.CollectionConverters._
+import scala.collection.JavaConverters._
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.{Success, Try}
@@ -546,7 +546,8 @@ import scala.util.control.NonFatal
// commit partitions that are currently assigned to the consumer. For high volume topics, this can lead to small
// amounts of replayed data during a rebalance, but for low volume topics we can ensure that consumers never appear
// 'stuck' because of out-of-order commits from slow consumers.
-val assignedOffsetsToCommit = aggregatedOffsets.view.filterKeys(consumer.assignment().contains).toMap
+val assignedOffsetsToCommit =
+  aggregatedOffsets.iterator.filter { case (k, _) => consumer.assignment().contains(k) }.toMap
progressTracker.commitRequested(assignedOffsetsToCommit)
val replyTo = commitSenders
// flush the data before calling `consumer.commitAsync` which might call the callback synchronously
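
The same cross-version trick covers key filtering: .view.filterKeys is also 2.13-only, and an iterator-based filter is the portable equivalent. A sketch with illustrative values (the commit above filters aggregatedOffsets by consumer.assignment()):

    object FilterKeysSketch extends App {
      val assigned: Set[String] = Set("tp-0")
      val aggregatedOffsets: Map[String, Long] = Map("tp-0" -> 1L, "tp-1" -> 2L)

      // Scala 2.13 only: aggregatedOffsets.view.filterKeys(assigned.contains).toMap
      // Cross-version equivalent, as in the commit above:
      val assignedOnly: Map[String, Long] =
        aggregatedOffsets.iterator.filter { case (k, _) => assigned.contains(k) }.toMap

      println(assignedOnly) // Map(tp-0 -> 1)
    }
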
5 changes: 3 additions & 2 deletions core/src/main/scala/akka/kafka/internal/MessageBuilder.scala
@@ -22,7 +22,7 @@ import org.apache.kafka.common.requests.OffsetFetchResponse

import scala.compat.java8.FutureConverters.FutureOps
import scala.concurrent.Future
-import scala.jdk.CollectionConverters._
+import scala.collection.JavaConverters._

/** Internal API */
@InternalApi
@@ -165,7 +165,8 @@ private[kafka] final class CommittableOffsetBatchImpl(
private val committers: Map[GroupTopicPartition, KafkaAsyncConsumerCommitterRef],
override val batchSize: Long
) extends CommittableOffsetBatch {
-def offsets: Map[GroupTopicPartition, Long] = offsetsAndMetadata.view.mapValues(_.offset() - 1L).toMap
+def offsets: Map[GroupTopicPartition, Long] =
+  offsetsAndMetadata.iterator.map { case (k, v) => k -> (v.offset() - 1L) }.toMap

def updated(committable: Committable): CommittableOffsetBatch = committable match {
case offset: CommittableOffset => updatedWithOffset(offset)
@@ -13,7 +13,7 @@ import akka.kafka.{AutoSubscription, RestrictedConsumer, TopicPartitionsAssigned
import akka.stream.stage.AsyncCallback
import org.apache.kafka.common.TopicPartition

-import scala.jdk.CollectionConverters._
+import scala.collection.JavaConverters._

/**
* Internal API.
5 changes: 3 additions & 2 deletions core/src/main/scala/akka/kafka/internal/SubSourceLogic.scala
@@ -108,8 +108,9 @@ private class SubSourceLogic[K, V, Msg](
case (formerlyUnknown, offsetMap) =>
val updatedFormerlyUnknown = formerlyUnknown -- (partitionsToRevoke ++ partitionsInStartup ++ pendingPartitions)
// Filter out the offsetMap so that we don't re-seek for partitions that have been revoked
-seekAndEmitSubSources(updatedFormerlyUnknown,
-                      offsetMap.view.filterKeys(k => !partitionsToRevoke.contains(k)).toMap)
+seekAndEmitSubSources(updatedFormerlyUnknown, offsetMap.iterator.filterNot {
+  case (k, _) => partitionsToRevoke.contains(k)
+}.toMap)
}

private val partitionAssignedCB = getAsyncCallback[Set[TopicPartition]] { assigned =>
@@ -20,7 +20,7 @@ import org.apache.kafka.common.TopicPartition

import scala.concurrent.Future
import scala.concurrent.duration._
-import scala.jdk.CollectionConverters._
+import scala.collection.JavaConverters._

/**
* INTERNAL API
@@ -113,7 +113,7 @@ private[internal] abstract class TransactionalSourceLogic[K, V, Msg](shape: Sour

private def drainHandling: PartialFunction[(ActorRef, Any), Unit] = {
case (sender, Committed(offsets)) =>
-inFlightRecords.committed(offsets.view.mapValues(_.offset() - 1).toMap)
+inFlightRecords.committed(offsets.iterator.map { case (k, v) => k -> (v.offset() - 1L) }.toMap)
sender.tell(Done, sourceActor.ref)
case (sender, CommittingFailure) => {
log.info("Committing failed, resetting in flight offsets")
@@ -409,7 +409,7 @@ private final class TransactionalSubSourceStageLogic[K, V](

private def drainHandling: PartialFunction[(ActorRef, Any), Unit] = {
case (sender, Committed(offsets)) =>
-inFlightRecords.committed(offsets.view.mapValues(_.offset() - 1).toMap)
+inFlightRecords.committed(offsets.iterator.map { case (k, v) => k -> (v.offset() - 1L) }.toMap)
sender ! Done
case (sender, CommittingFailure) => {
log.info("Committing failed, resetting in flight offsets")
2 changes: 1 addition & 1 deletion core/src/main/scala/akka/kafka/javadsl/Consumer.scala
@@ -19,7 +19,7 @@ import akka.{Done, NotUsed}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.{Metric, MetricName, TopicPartition}

-import scala.jdk.CollectionConverters._
+import scala.collection.JavaConverters._
import scala.compat.java8.FutureConverters._
import scala.concurrent.duration.FiniteDuration

8 changes: 4 additions & 4 deletions core/src/main/scala/akka/kafka/javadsl/MetadataClient.scala
@@ -16,7 +16,7 @@ import org.apache.kafka.common.{PartitionInfo, TopicPartition}

import scala.compat.java8.FutureConverters._
import scala.concurrent.ExecutionContextExecutor
-import scala.jdk.CollectionConverters._
+import scala.collection.JavaConverters._

class MetadataClient private (metadataClient: akka.kafka.scaladsl.MetadataClient) {

@@ -26,7 +26,7 @@ class MetadataClient private (metadataClient: akka.kafka.scaladsl.MetadataClient
metadataClient
.getBeginningOffsets(partitions.asScala.toSet)
.map { beginningOffsets =>
-beginningOffsets.view.mapValues(Long.box).toMap.asJava
+beginningOffsets.iterator.map { case (k, v) => k -> Long.box(v) }.toMap.asJava
}(ExecutionContexts.parasitic)
.toJava

@@ -42,7 +42,7 @@ class MetadataClient private (metadataClient: akka.kafka.scaladsl.MetadataClient
metadataClient
.getEndOffsets(partitions.asScala.toSet)
.map { endOffsets =>
-endOffsets.view.mapValues(Long.box).toMap.asJava
+endOffsets.iterator.map { case (k, v) => k -> Long.box(v) }.toMap.asJava
}(ExecutionContexts.parasitic)
.toJava

@@ -56,7 +56,7 @@ class MetadataClient private (metadataClient: akka.kafka.scaladsl.MetadataClient
metadataClient
.listTopics()
.map { topics =>
-topics.view.mapValues(partitionsInfo => partitionsInfo.asJava).toMap.asJava
+topics.view.iterator.map { case (k, v) => k -> v.asJava }.toMap.asJava
}(ExecutionContexts.parasitic)
.toJava

@@ -10,7 +10,7 @@ import akka.kafka.ProducerMessage
import org.apache.kafka.clients.producer.{ProducerRecord, RecordMetadata}
import org.apache.kafka.common.TopicPartition

-import scala.jdk.CollectionConverters._
+import scala.collection.JavaConverters._
import scala.collection.immutable

/**
@@ -18,7 +18,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.{Deserializer, Serializer, StringDeserializer, StringSerializer}
import org.slf4j.Logger

-import scala.jdk.CollectionConverters._
+import scala.collection.JavaConverters._

/**
* Common functions for scaladsl and javadsl Testkit.
@@ -12,7 +12,7 @@ import org.testcontainers.containers.GenericContainer
import org.testcontainers.utility.DockerImageName

import scala.compat.java8.OptionConverters._
-import scala.jdk.CollectionConverters._
+import scala.collection.JavaConverters._

object TestcontainersKafka {
trait Spec extends KafkaSpec {
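
Worth noting what the diff leaves alone: the scala.compat.java8 imports (OptionConverters, FutureConverters) stay, because they come from the scala-java8-compat library, which cross-builds for 2.12 and 2.13; only the 2.13-only scala.jdk converters had to go. A minimal sketch, assuming scala-java8-compat is on the classpath as it already is for these modules:

    import scala.compat.java8.OptionConverters._

    object OptionConvertersSketch extends App {
      val opt: Option[String] = Some("x")
      val javaOptional: java.util.Optional[String] = opt.asJava // Scala -> Java
      val roundTrip: Option[String] = javaOptional.asScala      // Java -> Scala
      println(roundTrip == opt) // true
    }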