Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scala3 builds #1603

Merged
merged 1 commit into from
Mar 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 26 additions & 10 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ val Nightly = sys.env.get("EVENT_NAME").contains("schedule")
// align in release.yml
val Scala213 = "2.13.10"
val Scala212 = "2.12.17"
val Scala3 = "3.1.3"
val Scala2Versions = Seq(Scala213, Scala212)
val ScalaVersions = Scala2Versions :+ Scala3

val Scala3Settings = Seq(crossScalaVersions := ScalaVersions)

val AkkaBinaryVersionForDocs = "2.7"
val akkaVersion = "2.7.0"
Expand All @@ -19,7 +24,7 @@ val kafkaVersion = "3.3.1"
val KafkaVersionForDocs = "33"
// This should align with the ScalaTest version used in the Akka 2.7.x testkit
// https://github.com/akka/akka/blob/main/project/Dependencies.scala#L41
val scalatestVersion = "3.1.4"
val scalatestVersion = "3.2.12"
val testcontainersVersion = "1.17.6"
val slf4jVersion = "1.7.36"
// this depends on Kafka, and should be upgraded to such latest version
Expand Down Expand Up @@ -76,7 +81,7 @@ val commonSettings = Def.settings(
startYear := Some(2014),
licenses := Seq(("BUSL-1.1", url("https://raw.githubusercontent.com/akka/alpakka-kafka/master/LICENSE"))), // FIXME change s/master/v4.0.1/ when released
description := "Alpakka is a Reactive Enterprise Integration library for Java and Scala, based on Reactive Streams and Akka.",
crossScalaVersions := Seq(Scala213, Scala212),
crossScalaVersions := Scala2Versions,
scalaVersion := Scala213,
crossVersion := CrossVersion.binary,
javacOptions ++= Seq(
Expand All @@ -90,7 +95,7 @@ val commonSettings = Def.settings(
"8",
"-Wconf:cat=feature:w,cat=deprecation&msg=.*JavaConverters.*:s,cat=unchecked:w,cat=lint:w,cat=unused:w,cat=w-flag:w"
) ++ {
if (insideCI.value && !Nightly && scalaVersion.value != Scala212) Seq("-Werror")
if (insideCI.value && !Nightly && scalaVersion.value != Scala212 && scalaVersion.value != Scala3) Seq("-Werror")
else Seq.empty
},
Compile / doc / scalacOptions := scalacOptions.value ++ Seq(
Expand All @@ -101,20 +106,27 @@ val commonSettings = Def.settings(
version.value,
"-sourcepath",
(ThisBuild / baseDirectory).value.toString,
"-skip-packages",
"akka.pattern:scala", // for some reason Scaladoc creates this
"-doc-source-url", {
val branch = if (isSnapshot.value) "master" else s"v${version.value}"
s"https://github.com/akka/alpakka-kafka/tree/${branch}€{FILE_PATH_EXT}#L€{FILE_LINE}"
},
"-doc-canonical-base-url",
"https://doc.akka.io/api/alpakka-kafka/current/"
),
) ++ {
if (scalaBinaryVersion.value.startsWith("3")) {
Seq("-skip-packages:akka.pattern") // different usage in scala3
} else {
Seq("-skip-packages", "akka.pattern") // for some reason Scaladoc creates this
}
},
// make use of https://github.com/scala/scala/pull/8663
Compile / doc / scalacOptions ++= Seq(
"-jdk-api-doc-base",
s"https://docs.oracle.com/en/java/javase/${JavaDocLinkVersion}/docs/api/java.base/"
),
Compile / doc / scalacOptions ++= {
if (scalaBinaryVersion.value.startsWith("3")) {
Seq(s"-external-mappings:https://docs.oracle.com/en/java/javase/${JavaDocLinkVersion}/docs/api/java.base/") // different usage in scala3
} else {
Seq("-jdk-api-doc-base", s"https://docs.oracle.com/en/java/javase/${JavaDocLinkVersion}/docs/api/java.base/")
}
},
Compile / doc / scalacOptions -= "-Xfatal-warnings",
// show full stack traces and test case durations
testOptions += Tests.Argument(TestFrameworks.ScalaTest, "-oDF"),
Expand Down Expand Up @@ -208,6 +220,7 @@ lazy val core = project
),
mimaBinaryIssueFilters += ProblemFilters.exclude[Problem]("akka.kafka.internal.*")
)
.settings(Scala3Settings)

lazy val testkit = project
.dependsOn(core)
Expand All @@ -231,6 +244,7 @@ lazy val testkit = project
),
mimaBinaryIssueFilters += ProblemFilters.exclude[Problem]("akka.kafka.testkit.internal.*")
)
.settings(Scala3Settings)

lazy val clusterSharding = project
.in(file("./cluster-sharding"))
Expand All @@ -249,6 +263,8 @@ lazy val clusterSharding = project
.getOrElse(throw new Error("Unable to determine previous version"))
)
)
.configs(IntegrationTest) // make CI not fail
.settings(Scala3Settings)

lazy val tests = project
.dependsOn(core, testkit, clusterSharding)
Expand Down
4 changes: 4 additions & 0 deletions core/src/main/mima-filters/4.0.0.backwards.excludes
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Scala3 requirement
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.kafka.ConsumerMessage#PartitionOffset.copy")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.kafka.ConsumerMessage#PartitionOffset.copy$default$1")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.kafka.ConsumerMessage#PartitionOffset.copy$default$2")
3 changes: 0 additions & 3 deletions core/src/main/scala/akka/kafka/ConsumerMessage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,6 @@ object ConsumerMessage {
override def _2: Long = offset

override def canEqual(that: Any): Boolean = that.isInstanceOf[PartitionOffset]

def copy(key: GroupTopicPartition = this.key, offset: Long = this.offset): PartitionOffset =
PartitionOffset(key, offset)
sebastian-alfers marked this conversation as resolved.
Show resolved Hide resolved
}

object PartitionOffset extends AbstractFunction2[GroupTopicPartition, Long, PartitionOffset] {
Expand Down
40 changes: 40 additions & 0 deletions core/src/main/scala/akka/kafka/ProducerMessage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -212,11 +212,32 @@ object ProducerMessage {
def passThrough: PassThrough = message.passThrough
}

// Companion for the Result case class. When a case class has a private
// constructor, Scala 3 also makes the synthesized `apply` private
// (unlike Scala 2), so a public factory is declared here explicitly to
// keep the existing creation API available to callers.
object Result {

/**
* Provide access to constructor which Scala3 makes private.
*
* Builds a [[Result]] from the metadata the Kafka producer returned
* for a send and the original message that produced it.
*/
def apply[K, V, PassThrough](metadata: RecordMetadata,
message: Message[K, V, PassThrough]): Result[K, V, PassThrough] =
new Result(metadata, message)
}

/**
* One element of a multi-message send result: a produced record paired
* with the [[RecordMetadata]] returned for it.
*
* The constructor is private (for Scala 3 binary/source compatibility);
* create instances via the companion object's `apply`.
*/
final case class MultiResultPart[K, V] private (
metadata: RecordMetadata,
record: ProducerRecord[K, V]
)

// Companion for MultiResultPart. Scala 3 mirrors a case class's private
// constructor onto the synthesized `apply`, so a public factory is
// provided explicitly to preserve the existing creation API.
object MultiResultPart {

/**
* Provide access to constructor which Scala3 makes private.
*
* Builds a [[MultiResultPart]] from the producer-returned metadata and
* the record that was sent.
*/
def apply[K, V](
metadata: RecordMetadata,
record: ProducerRecord[K, V]
): MultiResultPart[K, V] = new MultiResultPart[K, V](metadata, record)
}

/**
* [[Results]] implementation emitted when all messages in a [[MultiMessage]] have been
* successfully published.
Expand All @@ -233,11 +254,30 @@ object ProducerMessage {
def getParts(): java.util.Collection[MultiResultPart[K, V]] = parts.asJavaCollection
}

// Companion for MultiResult. Scala 3 makes the synthesized `apply` of a
// case class with a private constructor private as well, so this public
// factory keeps the existing creation API available.
object MultiResult {

/**
* Provide access to constructor which Scala3 makes private.
*
* Builds a [[MultiResult]] from the per-record result parts and the
* pass-through value carried alongside the original multi-message.
*/
def apply[K, V, PassThrough](parts: immutable.Seq[MultiResultPart[K, V]],
passThrough: PassThrough): MultiResult[K, V, PassThrough] =
new MultiResult(parts, passThrough)
}

/**
* [[Results]] implementation emitted when a [[PassThroughMessage]] has passed
* through the flow.
*
* Carries only the pass-through value — no record was produced. The
* constructor is private; use the companion object's `apply`.
*/
final case class PassThroughResult[K, V, PassThrough] private (passThrough: PassThrough)
extends Results[K, V, PassThrough]

// Companion for PassThroughResult. Provides a public factory because
// Scala 3 makes the synthesized `apply` private when the case class
// constructor is private.
object PassThroughResult {

/**
* Provide access to constructor which Scala3 makes private.
*
* Wraps the pass-through value of a message that produced no record.
*/
def apply[K, V, PassThrough](passThrough: PassThrough): PassThroughResult[K, V, PassThrough] =
new PassThroughResult(passThrough)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ private class DefaultProducerStageLogic[K, V, P, IN <: Envelope[K, V, P], OUT <:

case passthrough: PassThroughMessage[K, V, P] =>
postSend(passthrough)
val future = Future.successful(PassThroughResult[K, V, P](in.passThrough)).asInstanceOf[Future[OUT]]
val future = Future.successful(PassThroughResult(in.passThrough)).asInstanceOf[Future[OUT]]
push(stage.out, future)

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ package akka.kafka.internal
import akka.annotation.InternalApi
import akka.dispatch.ExecutionContexts
import akka.kafka.ProducerSettings
import akka.stream.Materializer
import akka.stream.stage._
import akka.util.JavaDurationConverters._
import org.apache.kafka.clients.producer.Producer

import scala.concurrent.ExecutionContext
import scala.util.control.NonFatal
import scala.util.{Failure, Success}

Expand All @@ -37,8 +39,7 @@ private[kafka] object DeferredProducer {
* INTERNAL API
*/
@InternalApi
private[kafka] trait DeferredProducer[K, V] {
self: GraphStageLogic with StageIdLogging =>
private[kafka] trait DeferredProducer[K, V] extends GraphStageLogic with StageIdLogging {

import DeferredProducer._

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import org.apache.kafka.common.errors.{
TimeoutException
}
import org.apache.kafka.common.{Metric, MetricName, TopicPartition}

import scala.annotation.nowarn
import scala.collection.JavaConverters._
import scala.concurrent.{ExecutionContext, Future}
Expand Down Expand Up @@ -749,7 +748,6 @@ import scala.util.control.NonFatal
with NoSerializationVerificationNeeded {
override def onPartitionsAssigned(partitions: java.util.Collection[TopicPartition]): Unit
override def onPartitionsRevoked(partitions: java.util.Collection[TopicPartition]): Unit
override def onPartitionsLost(partitions: java.util.Collection[TopicPartition]): Unit

def postStop(): Unit = ()
}
Expand Down
2 changes: 0 additions & 2 deletions core/src/main/scala/akka/kafka/internal/MessageBuilder.scala
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,6 @@ private[kafka] final class CommittableOffsetBatchImpl(
val metadata = newOffset match {
case offset: CommittableOffsetMetadata =>
offset.metadata
case _ =>
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CommittableOffsetMetadata is the only subtype of the sealed trait, so it should be safe to remove this case — Scala 3 flagged it as unreachable.

OffsetFetchResponse.NO_METADATA
}

val newOffsets =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ object DiscoverySupport {
}

private def checkClassOrThrow(system: ActorSystemImpl): Unit =
system.dynamicAccess.getClassFor("akka.discovery.Discovery$") match {
system.dynamicAccess.getClassFor[AnyRef]("akka.discovery.Discovery$") match {
case Failure(_: ClassNotFoundException | _: NoClassDefFoundError) =>
throw new IllegalStateException(
s"Akka Discovery is being used but the `akka-discovery` library is not on the classpath, it must be added explicitly. See https://doc.akka.io/docs/alpakka-kafka/current/discovery.html"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ object ProducerResultFactory {
def multiResult[K, V, PassThrough](
parts: java.util.Collection[ProducerMessage.MultiResultPart[K, V]],
passThrough: PassThrough
): ProducerMessage.MultiResult[K, V, PassThrough] = ProducerMessage.MultiResult(parts.asScala.toList, passThrough)
): ProducerMessage.MultiResult[K, V, PassThrough] =
ProducerMessage.MultiResult(parts.asScala.toList, passThrough)

def passThroughResult[K, V, PassThrough](
passThrough: PassThrough
Expand Down