Offset batch: remove intermediary structure (#870)
ennru authored Aug 23, 2019
1 parent c2b9470 commit 05ecc0c
Showing 4 changed files with 17 additions and 26 deletions.
2 changes: 1 addition & 1 deletion build.sbt
@@ -100,7 +100,7 @@ val commonSettings = Def.settings(
   Seq(
     "-doc-source-url", {
       val branch = if (isSnapshot.value) "master" else s"v${version.value}"
-      s"https://github.com/akka/alpakka-kafka/tree/${branch}€{FILE_PATH_EXT}#€{FILE_LINE}"
+      s"https://github.com/akka/alpakka-kafka/tree/${branch}€{FILE_PATH_EXT}#L€{FILE_LINE}"
     },
     "-doc-canonical-base-url",
     "https://doc.akka.io/api/alpakka-kafka/current/"
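
Why one character matters here: scaladoc expands the €{FILE_PATH_EXT} and €{FILE_LINE} variables when it renders -doc-source-url links, and GitHub line anchors use the form #L42, not #42. The old pattern produced links like .../ConsumerMessage.scala#42 that opened the file without jumping to the line; with the added L, the generated anchor .../ConsumerMessage.scala#L42 lands on the right line.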
5 changes: 4 additions & 1 deletion core/src/main/scala/akka/kafka/ConsumerMessage.scala
@@ -12,6 +12,7 @@ import akka.Done
 import akka.annotation.{DoNotInherit, InternalApi}
 import akka.kafka.internal.{CommittableOffsetBatchImpl, CommittedMarker}
 import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.common.TopicPartition

 import scala.concurrent.Future
 import scala.runtime.AbstractFunction2
@@ -137,7 +138,9 @@ object ConsumerMessage {
       groupId: String,
       topic: String,
       partition: Int
-  )
+  ) {
+    def topicPartition: TopicPartition = new TopicPartition(topic, partition)
+  }

 object CommittableOffsetBatch {
   val empty: CommittableOffsetBatch = new CommittableOffsetBatchImpl(Map.empty, Map.empty, 0)
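
For context, not part of the diff: the new method gives any holder of a GroupTopicPartition a one-step conversion to the Kafka client's TopicPartition, which the commit path below uses as map keys. A minimal sketch with a simplified copy of the case class (the real one lives in akka.kafka.ConsumerMessage):

import org.apache.kafka.common.TopicPartition

// Simplified stand-in for akka.kafka.ConsumerMessage.GroupTopicPartition.
final case class GroupTopicPartition(groupId: String, topic: String, partition: Int) {
  // Drops the consumer group; keeps only what the Kafka client APIs need.
  def topicPartition: TopicPartition = new TopicPartition(topic, partition)
}

object TopicPartitionExample extends App {
  val gtp = GroupTopicPartition("group-1", "orders", 3)
  println(gtp.topicPartition) // prints: orders-3
}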
30 changes: 9 additions & 21 deletions core/src/main/scala/akka/kafka/internal/CommittableSources.scala
@@ -7,12 +7,7 @@ package akka.kafka.internal

 import akka.actor.ActorRef
 import akka.annotation.InternalApi
-import akka.kafka.ConsumerMessage.{
-  CommittableMessage,
-  CommittableOffset,
-  CommittableOffsetBatch,
-  PartitionOffsetMetadata
-}
+import akka.kafka.ConsumerMessage.{CommittableMessage, CommittableOffset, CommittableOffsetBatch, GroupTopicPartition}
 import akka.kafka._
 import akka.kafka.internal.KafkaConsumerActor.Internal.{Commit, CommitSingle}
 import akka.kafka.scaladsl.Consumer.Control
@@ -26,7 +21,6 @@ import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, Offset
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.requests.OffsetFetchResponse

-import scala.collection.immutable
 import scala.concurrent.duration.FiniteDuration
 import scala.concurrent.{ExecutionContext, Future}
@@ -139,26 +133,20 @@ private[kafka] class KafkaAsyncConsumerCommitterRef(consumerActor: ActorRef, com
       )
     )

-  private def commit(offsets: immutable.Seq[PartitionOffsetMetadata]): Future[Done] = {
-    val offsetsMap: Map[TopicPartition, OffsetAndMetadata] = offsets.map { offset =>
-      new TopicPartition(offset.key.topic, offset.key.partition) ->
-      new OffsetAndMetadata(offset.offset + 1, offset.metadata)
-    }.toMap
-    sendCommit(Commit(offsetsMap))
-  }
-
   def commit(batch: CommittableOffsetBatch): Future[Done] = batch match {
     case b: CommittableOffsetBatchImpl =>
-      val futures = b.offsetsAndMetadata.groupBy(_._1.groupId).map {
+      val groupIdOffsetMaps: Map[String, Map[GroupTopicPartition, OffsetAndMetadata]] =
+        b.offsetsAndMetadata.groupBy(_._1.groupId)
+      val futures = groupIdOffsetMaps.map {
         case (groupId, offsetsMap) =>
           val committer = b.committers.getOrElse(
             groupId,
             throw new IllegalStateException(s"Unknown committer, got [$groupId]")
           )
-          val offsets: immutable.Seq[PartitionOffsetMetadata] = offsetsMap.map {
-            case (ctp, offset) => PartitionOffsetMetadata(ctp, offset.offset(), offset.metadata())
-          }.toList
-          committer.commit(offsets)
+          val offsets: Map[TopicPartition, OffsetAndMetadata] = offsetsMap.map {
+            case (gtp, offset) => gtp.topicPartition -> offset
+          }
+          committer.sendCommit(Commit(offsets))
       }
       Future.sequence(futures).map(_ => Done)
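
The heart of the change, annotated: a batch can span several consumer groups, so its offsets are grouped by groupId, each group's committer is looked up, and the GroupTopicPartition keys collapse to plain TopicPartitions for the commit. That removes the old round trip through PartitionOffsetMetadata, which was built from OffsetAndMetadata values only to be turned straight back into OffsetAndMetadata. A hedged, self-contained sketch of the grouping step, with the committer reduced to a stub that prints what it would send:

import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition

final case class GroupTopicPartition(groupId: String, topic: String, partition: Int) {
  def topicPartition: TopicPartition = new TopicPartition(topic, partition)
}

// Stand-in for KafkaAsyncConsumerCommitterRef: records the commit instead of asking an actor.
final class CommitterStub(name: String) {
  def sendCommit(offsets: Map[TopicPartition, OffsetAndMetadata]): Unit =
    println(s"$name commits " + offsets.map { case (tp, om) => s"$tp -> ${om.offset()}" }.mkString(", "))
}

object CommitBatchExample extends App {
  def commitBatch(
      offsetsAndMetadata: Map[GroupTopicPartition, OffsetAndMetadata],
      committers: Map[String, CommitterStub]
  ): Unit =
    // Group the batch by consumer group, as in the diff above.
    offsetsAndMetadata.groupBy(_._1.groupId).foreach {
      case (groupId, offsetsMap) =>
        val committer = committers.getOrElse(
          groupId,
          throw new IllegalStateException(s"Unknown committer, got [$groupId]")
        )
        // No intermediary structure: convert the keys and commit directly.
        val offsets: Map[TopicPartition, OffsetAndMetadata] =
          offsetsMap.map { case (gtp, om) => gtp.topicPartition -> om }
        committer.sendCommit(offsets)
    }

  commitBatch(
    Map(
      GroupTopicPartition("g1", "orders", 0) -> new OffsetAndMetadata(43L, ""),
      GroupTopicPartition("g2", "orders", 0) -> new OffsetAndMetadata(11L, "")
    ),
    Map("g1" -> new CommitterStub("g1"), "g2" -> new CommitterStub("g2"))
  )
}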

@@ -169,7 +157,7 @@ private[kafka] class KafkaAsyncConsumerCommitterRef(consumerActor: ActorRef, com
     )
   }

-  private[this] def sendCommit(msg: AnyRef): Future[Done] = {
+  private def sendCommit(msg: AnyRef): Future[Done] = {
     import akka.pattern.ask
     consumerActor
       .ask(msg)(Timeout(commitTimeout))
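
Side note on the visibility change: sendCommit is now called on another instance of the same class (committer.sendCommit(...) above). Scala's private[this] is object-private and forbids exactly that, while plain private allows access from any instance of the class. A tiny illustration with hypothetical names:

class Mailbox {
  // With `private[this] def deliver`, the call in forwardTo would not compile.
  private def deliver(msg: String): Unit = println(s"delivered: $msg")
  def forwardTo(other: Mailbox, msg: String): Unit = other.deliver(msg)
}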
6 changes: 3 additions & 3 deletions core/src/main/scala/akka/kafka/internal/MessageBuilder.scala
@@ -162,7 +162,7 @@ private[kafka] final class CommittableOffsetBatchImpl(
     val committers: Map[String, KafkaAsyncConsumerCommitterRef],
     override val batchSize: Long
 ) extends CommittableOffsetBatch {
-  def offsets: Map[GroupTopicPartition, Long] = offsetsAndMetadata.view.mapValues(_.offset()).toMap
+  def offsets: Map[GroupTopicPartition, Long] = offsetsAndMetadata.view.mapValues(_.offset() - 1L).toMap

   def updated(committable: Committable): CommittableOffsetBatch = committable match {
     case offset: CommittableOffset => updatedWithOffset(offset)
@@ -180,7 +180,7 @@ }
    }

    val newOffsets =
-     offsetsAndMetadata.updated(key, new OffsetAndMetadata(committableOffset.partitionOffset.offset, metadata))
+     offsetsAndMetadata.updated(key, new OffsetAndMetadata(committableOffset.partitionOffset.offset + 1L, metadata))

    val committer = committableOffset match {
      case c: CommittableOffsetImpl => c.committer
@@ -238,7 +238,7 @@ private[kafka] final class CommittableOffsetBatchImpl(
     offsets.asJava

   override def toString(): String =
-    s"CommittableOffsetBatch(batchSize=$batchSize, ${offsetsAndMetadata.mkString("->")})"
+    s"CommittableOffsetBatch(batchSize=$batchSize, ${offsets.mkString(", ")})"

   override def commitScaladsl(): Future[Done] =
     if (batchSize == 0L)
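
The paired + 1L / - 1L encode Kafka's commit convention: a committed offset names the next record to read, not the last record processed. After this change the batch stores ready-to-commit OffsetAndMetadata values (last processed offset plus one) and shifts back only when reporting last-processed positions through offsets. A small sketch of the invariant:

import org.apache.kafka.clients.consumer.OffsetAndMetadata

object OffsetConventionExample extends App {
  val lastProcessed = 41L
  // Stored form: ready to hand to the Kafka client for committing, hence + 1.
  val stored = new OffsetAndMetadata(lastProcessed + 1L, "some metadata")
  // Reported form: shift back to the last processed offset.
  val reported = stored.offset() - 1L
  assert(reported == lastProcessed)
  println(s"stored=${stored.offset()}, reported=$reported") // stored=42, reported=41
}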
