STORM-4076 KafkaTridentSpoutEmitters can poll all partitions at once instead of one at a time (#3679)

* change Kafka Trident consumer poll strategy
anand-h-codes authored Sep 7, 2024
1 parent 14f0944 commit 9227ad9
Showing 9 changed files with 122 additions and 81 deletions.
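In short: the old emitter built each Trident batch by pausing all other assigned topic-partitions and issuing one poll per partition, so a batch spanning N partitions cost N network round trips; the new emitter seeks every partition up front and serves the whole batch from a single poll. A schematic sketch of the two strategies against the plain Kafka consumer API (the class, timeout, and printing are invented for illustration; the real logic lives in KafkaTridentSpoutEmitter below):

```java
import java.time.Duration;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

class PollStrategySketch {

    /** Old strategy: pause everything else and poll one partition at a time. */
    static void pollOneAtATime(KafkaConsumer<String, String> consumer, Set<TopicPartition> partitions) {
        for (TopicPartition tp : partitions) {
            Set<TopicPartition> others = new HashSet<>(partitions);
            others.remove(tp);
            consumer.pause(others);
            try {
                // Each partition costs its own network round trip per batch.
                List<ConsumerRecord<String, String>> records = consumer.poll(Duration.ofMillis(200)).records(tp);
                records.forEach(r -> System.out.println(tp + " -> " + r.offset()));
            } finally {
                consumer.resume(others);
            }
        }
    }

    /** New strategy: a single poll serves every assigned partition. */
    static void pollAllAtOnce(KafkaConsumer<String, String> consumer, Set<TopicPartition> partitions) {
        ConsumerRecords<String, String> batch = consumer.poll(Duration.ofMillis(200));
        for (TopicPartition tp : partitions) {
            batch.records(tp).forEach(r -> System.out.println(tp + " -> " + r.offset()));
        }
    }
}
```

The payoff grows with partition count, since the per-partition pause/resume bookkeeping and the per-partition polls disappear entirely.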
2 changes: 1 addition & 1 deletion docs/FAQ.md
@@ -76,7 +76,7 @@ A trident-spout is actually run within a storm _bolt_. The storm-spout of a trid

You should only store static data, and as little of it as possible, into the metadata record (note: maybe you _can_ store more interesting things; you shouldn't, though)

-### How often is the 'emitPartitionBatchNew' function called?
+### How often is the 'emitBatchNew' function called?

Since the MBC is the actual spout, all the tuples in a batch are just members of its tupletree. That means storm's "max spout pending" config effectively defines the number of concurrent batches trident runs. The MBC emits a new batch if it has fewer than max-spout-pending tuples pending and if at least one [trident batch interval]({{page.git-blob-base}}/conf/defaults.yaml#L115)'s worth of seconds has passed since the last batch.

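For reference, the two knobs this FAQ entry mentions can also be set programmatically. A minimal sketch, assuming the standard org.apache.storm.Config setter and key names (the values 10 and 500 are arbitrary examples):

```java
import org.apache.storm.Config;

class TridentBatchTuning {
    static Config batchTuning() {
        Config conf = new Config();
        // "max spout pending": effectively the number of concurrent Trident batches.
        conf.setMaxSpoutPending(10);
        // The trident batch interval from conf/defaults.yaml, in milliseconds.
        conf.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 500);
        return conf;
    }
}
```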
@@ -19,6 +19,8 @@
import java.io.Serializable;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout;
import org.apache.storm.trident.topology.TransactionAttempt;
@@ -37,11 +39,13 @@ public KafkaTridentOpaqueSpoutEmitter(KafkaTridentSpoutEmitter<K, V> emitter) {
}

@Override
-public Map<String, Object> emitPartitionBatch(TransactionAttempt tx, TridentCollector collector,
-    KafkaTridentSpoutTopicPartition partition, Map<String, Object> lastPartitionMeta) {
-    return emitter.emitPartitionBatchNew(tx, collector, partition, lastPartitionMeta);
+public Map<KafkaTridentSpoutTopicPartition, Map<String, Object>> emitBatchNew(TransactionAttempt tx,
+    TridentCollector collector, Set<KafkaTridentSpoutTopicPartition> partitions,
+    Map<KafkaTridentSpoutTopicPartition, Map<String, Object>> lastBatchMetaMap) {
+    return emitter.emitBatchNew(tx, collector, partitions, lastBatchMetaMap);
}

@Override
public void refreshPartitions(List<KafkaTridentSpoutTopicPartition> partitionResponsibilities) {
emitter.refreshPartitions(partitionResponsibilities);
@@ -27,6 +27,7 @@

import com.google.common.annotations.VisibleForTesting;
import java.io.Serializable;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -163,54 +164,57 @@ && isFirstPollOffsetStrategyIgnoringCommittedOffsets()) {
/**
* Emit a new batch.
*/
-public Map<String, Object> emitPartitionBatchNew(TransactionAttempt tx, TridentCollector collector,
-    KafkaTridentSpoutTopicPartition currBatchPartition, Map<String, Object> lastBatch) {
+public Map<KafkaTridentSpoutTopicPartition, Map<String, Object>> emitBatchNew(TransactionAttempt tx,
+    TridentCollector collector, Set<KafkaTridentSpoutTopicPartition> partitions,
+    Map<KafkaTridentSpoutTopicPartition, Map<String, Object>> lastBatchMetaMap) {

-    LOG.debug("Processing batch: [transaction = {}], [currBatchPartition = {}], [lastBatchMetadata = {}], [collector = {}]",
-        tx, currBatchPartition, lastBatch, collector);
+    LOG.debug("Processing batch: [transaction = {}], [currBatchPartitions = {}], [lastBatchMetadata = {}], [collector = {}]",
+        tx, partitions, lastBatchMetaMap, collector);

-    final TopicPartition currBatchTp = currBatchPartition.getTopicPartition();
-
-    throwIfEmittingForUnassignedPartition(currBatchTp);
-
-    KafkaTridentSpoutBatchMetadata lastBatchMeta = lastBatch == null ? null : KafkaTridentSpoutBatchMetadata.fromMap(lastBatch);
-    KafkaTridentSpoutBatchMetadata currentBatch = lastBatchMeta;
-    Collection<TopicPartition> pausedTopicPartitions = Collections.emptySet();
-
-    try {
-        // pause other topic-partitions to only poll from current topic-partition
-        pausedTopicPartitions = pauseTopicPartitions(currBatchTp);
+    Map<KafkaTridentSpoutTopicPartition, Map<String, Object>> partitionToBatchMeta = new HashMap<>();

-        seek(currBatchTp, lastBatchMeta);
+    seekAllPartitions(partitions, lastBatchMetaMap);

-        final List<ConsumerRecord<K, V>> records = consumer.poll(pollTimeoutMs).records(currBatchTp);
-        LOG.debug("Polled [{}] records from Kafka.", records.size());
+    ConsumerRecords<K, V> poll = consumer.poll(Duration.ofMillis(pollTimeoutMs));
+    for (KafkaTridentSpoutTopicPartition partition : partitions) {
+        final List<ConsumerRecord<K, V>> records = poll.records(partition.getTopicPartition());
        if (!records.isEmpty()) {
            for (ConsumerRecord<K, V> record : records) {
                emitTuple(collector, record);
            }
            // build new metadata based on emitted records
-            currentBatch = new KafkaTridentSpoutBatchMetadata(
-                records.get(0).offset(),
-                records.get(records.size() - 1).offset(),
-                topologyContext.getStormId());
+            partitionToBatchMeta.put(partition, new KafkaTridentSpoutBatchMetadata(
+                records.get(0).offset(),
+                records.get(records.size() - 1).offset(),
+                topologyContext.getStormId()).toMap());
        } else {
            //Build new metadata based on the consumer position.
            //We want the next emit to start at the current consumer position,
            //so make a meta that indicates that position - 1 is the last emitted offset
            //This helps us avoid cases like STORM-3279, and simplifies the seek logic.
-            long lastEmittedOffset = consumer.position(currBatchTp) - 1;
-            currentBatch = new KafkaTridentSpoutBatchMetadata(lastEmittedOffset, lastEmittedOffset, topologyContext.getStormId());
+            long lastEmittedOffset = consumer.position(partition.getTopicPartition()) - 1;
+            partitionToBatchMeta.put(partition, new KafkaTridentSpoutBatchMetadata(lastEmittedOffset, lastEmittedOffset,
+                topologyContext.getStormId()).toMap());
        }
-    } finally {
-        consumer.resume(pausedTopicPartitions);
-        LOG.trace("Resumed topic-partitions {}", pausedTopicPartitions);
    }
-    LOG.debug("Emitted batch: [transaction = {}], [currBatchPartition = {}], [lastBatchMetadata = {}], "
-        + "[currBatchMetadata = {}], [collector = {}]", tx, currBatchPartition, lastBatch, currentBatch, collector);
-
-    return currentBatch.toMap();
+    for (KafkaTridentSpoutTopicPartition kttp : partitionToBatchMeta.keySet()) {
+        LOG.debug("Emitted batch: [transaction = {}], [currBatchPartition = {}], [lastBatchMetadata = {}], "
+            + "[currBatchMetadata = {}], [collector = {}]", tx, kttp, lastBatchMetaMap.get(kttp),
+            partitionToBatchMeta.get(kttp), collector);
+    }
+    return partitionToBatchMeta;
}

+private void seekAllPartitions(Collection<KafkaTridentSpoutTopicPartition> partitions,
+    Map<KafkaTridentSpoutTopicPartition, Map<String, Object>> lastPartitionMetaMap) {
+
+    for (KafkaTridentSpoutTopicPartition partition : partitions) {
+        TopicPartition currentBatchTp = partition.getTopicPartition();
+        throwIfEmittingForUnassignedPartition(currentBatchTp);
+        Map<String, Object> lastBatch = lastPartitionMetaMap.get(partition);
+        KafkaTridentSpoutBatchMetadata lastBatchMeta = lastBatch == null ? null : KafkaTridentSpoutBatchMetadata.fromMap(lastBatch);
+        seek(currentBatchTp, lastBatchMeta);
+    }
+}

private boolean isFirstPollOffsetStrategyIgnoringCommittedOffsets() {
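The empty-partition branch above records position - 1 as both the first and last offset of the batch metadata. A small worked sketch of that convention (BatchMeta is a hypothetical stand-in for KafkaTridentSpoutBatchMetadata, keeping only the two offsets):

```java
// Hypothetical stand-in for KafkaTridentSpoutBatchMetadata (offsets only).
record BatchMeta(long firstOffset, long lastOffset) {}

class EmptyBatchConvention {
    static BatchMeta metaForEmptyBatch(long consumerPosition) {
        // Nothing was polled: pretend position - 1 was already emitted, so the next
        // seek lands exactly at the current position (the STORM-3279 guard above).
        long lastEmittedOffset = consumerPosition - 1;
        return new BatchMeta(lastEmittedOffset, lastEmittedOffset);
    }

    public static void main(String[] args) {
        // The consumer sits at offset 42 and Kafka returned no records:
        BatchMeta meta = metaForEmptyBatch(42);
        // The next batch starts at lastOffset + 1 == 42, so nothing is skipped or re-read.
        System.out.println(meta.lastOffset() + 1); // prints 42
    }
}
```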
@@ -19,6 +19,8 @@
import java.io.Serializable;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.spout.IPartitionedTridentSpout;
import org.apache.storm.trident.topology.TransactionAttempt;
@@ -42,9 +44,10 @@ public List<KafkaTridentSpoutTopicPartition> getOrderedPartitions(List<Map<Strin
}

@Override
-public Map<String, Object> emitPartitionBatchNew(TransactionAttempt tx, TridentCollector collector,
-    KafkaTridentSpoutTopicPartition partition, Map<String, Object> lastPartitionMeta) {
-    return emitter.emitPartitionBatchNew(tx, collector, partition, lastPartitionMeta);
+public Map<KafkaTridentSpoutTopicPartition, Map<String, Object>> emitBatchNew(TransactionAttempt tx,
+    TridentCollector collector, Set<KafkaTridentSpoutTopicPartition> partitions,
+    Map<KafkaTridentSpoutTopicPartition, Map<String, Object>> lastPartitionMetaMap) {
+    return emitter.emitBatchNew(tx, collector, partitions, lastPartitionMetaMap);
}

@Override
@@ -53,7 +56,7 @@ public void refreshPartitions(List<KafkaTridentSpoutTopicPartition> partitionRes
}

@Override
-public void emitPartitionBatch(TransactionAttempt tx, TridentCollector collector,
+public void reEmitPartitionBatch(TransactionAttempt tx, TridentCollector collector,
KafkaTridentSpoutTopicPartition partition, Map<String, Object> partitionMeta) {
emitter.reEmitPartitionBatch(tx, collector, partition, partitionMeta);
}
@@ -25,6 +25,7 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

+import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -121,10 +122,15 @@ private KafkaTridentSpoutEmitter<String, String> createEmitter(FirstPollOffsetSt

private Map<String, Object> doEmitNewBatchTest(FirstPollOffsetStrategy firstPollOffsetStrategy, TridentCollector collectorMock, TopicPartition tp, Map<String, Object> previousBatchMeta) {
KafkaTridentSpoutEmitter<String, String> emitter = createEmitter(firstPollOffsetStrategy);

TransactionAttempt txid = new TransactionAttempt(10L, 0);
+    return emitBatchNew(emitter, txid, collectorMock, tp, previousBatchMeta);
+}
+
+private Map<String, Object> emitBatchNew(KafkaTridentSpoutEmitter<String, String> emitter, TransactionAttempt txid, TridentCollector collectorMock, TopicPartition tp, Map<String, Object> previousBatchMeta) {
    KafkaTridentSpoutTopicPartition kttp = new KafkaTridentSpoutTopicPartition(tp);
-    return emitter.emitPartitionBatchNew(txid, collectorMock, kttp, previousBatchMeta);
+    Map<KafkaTridentSpoutTopicPartition, Map<String, Object>> lastBatchMetaMap = new HashMap<>();
+    lastBatchMetaMap.put(kttp, previousBatchMeta);
+    return emitter.emitBatchNew(txid, collectorMock, Collections.singleton(kttp), lastBatchMetaMap).get(kttp);
}

@Test
@@ -180,7 +186,7 @@ public void testEmitEmptyBatches() {
//Emit 10 empty batches, simulating no new records being present in Kafka
for (int i = 0; i < 10; i++) {
TransactionAttempt txid = new TransactionAttempt((long) i, 0);
-lastBatchMeta = emitter.emitPartitionBatchNew(txid, collectorMock, kttp, lastBatchMeta);
+lastBatchMeta = emitBatchNew(emitter, txid, collectorMock, partition, lastBatchMeta);
KafkaTridentSpoutBatchMetadata deserializedMeta = KafkaTridentSpoutBatchMetadata.fromMap(lastBatchMeta);
assertThat("Since the first poll strategy is LATEST, the meta should indicate that the last message has already been emitted", deserializedMeta.getFirstOffset(), is(lastOffsetInKafka));
assertThat("Since the first poll strategy is LATEST, the meta should indicate that the last message has already been emitted", deserializedMeta.getLastOffset(), is(lastOffsetInKafka));
@@ -190,8 +196,8 @@
int numNewRecords = 10;
List<ConsumerRecord<String, String>> newRecords = SpoutWithMockedConsumerSetupHelper.createRecords(partition, firstNewRecordOffset, numNewRecords);
newRecords.forEach(consumer::addRecord);
-lastBatchMeta = emitter.emitPartitionBatchNew(new TransactionAttempt(11L, 0), collectorMock, kttp, lastBatchMeta);
+TransactionAttempt txid = new TransactionAttempt(11L, 0);
+lastBatchMeta = emitBatchNew(emitter, txid, collectorMock, partition, lastBatchMeta);
verify(collectorMock, times(numNewRecords)).emit(emitCaptor.capture());
List<List<Object>> emits = emitCaptor.getAllValues();
assertThat(emits.get(0).get(0), is(firstNewRecordOffset));
@@ -242,8 +248,7 @@ public void testEmitEmptyFirstBatch() {
KafkaTridentSpoutBatchMetadata preRedeployLastMeta = new KafkaTridentSpoutBatchMetadata(firstEmittedOffset, firstEmittedOffset + emittedRecords - 1, "an old topology");
KafkaTridentSpoutEmitter<String, String> emitter = createEmitter(FirstPollOffsetStrategy.LATEST);
TransactionAttempt txid = new TransactionAttempt(0L, 0);
-KafkaTridentSpoutTopicPartition kttp = new KafkaTridentSpoutTopicPartition(partition);
-Map<String, Object> meta = emitter.emitPartitionBatchNew(txid, collectorMock, kttp, preRedeployLastMeta.toMap());
+Map<String, Object> meta = emitBatchNew(emitter, txid, collectorMock, partition, preRedeployLastMeta.toMap());

verify(collectorMock, never()).emit(anyList());
KafkaTridentSpoutBatchMetadata deserializedMeta = KafkaTridentSpoutBatchMetadata.fromMap(meta);
Expand All @@ -254,7 +259,7 @@ public void testEmitEmptyFirstBatch() {
int numNewRecords = 10;
List<ConsumerRecord<String, String>> newRecords = SpoutWithMockedConsumerSetupHelper.createRecords(partition, firstNewRecordOffset, numNewRecords);
newRecords.forEach(consumer::addRecord);
-meta = emitter.emitPartitionBatchNew(txid, collectorMock, kttp, meta);
+meta = emitBatchNew(emitter, txid, collectorMock, partition, meta);

verify(collectorMock, times(numNewRecords)).emit(emitCaptor.capture());
List<List<Object>> emits = emitCaptor.getAllValues();
Expand All @@ -278,8 +283,7 @@ public void testUnconditionalStrategyWhenSpoutWorkerIsRestarted(FirstPollOffsetS
KafkaTridentSpoutBatchMetadata preExecutorRestartLastMeta = new KafkaTridentSpoutBatchMetadata(preRestartEmittedOffset, preRestartEmittedOffset + lastBatchEmittedRecords - 1, topologyId);
KafkaTridentSpoutEmitter<String, String> emitter = createEmitter(firstPollOffsetStrategy);
TransactionAttempt txid = new TransactionAttempt(0L, 0);
-KafkaTridentSpoutTopicPartition kttp = new KafkaTridentSpoutTopicPartition(partition);
-Map<String, Object> meta = emitter.emitPartitionBatchNew(txid, collectorMock, kttp, preExecutorRestartLastMeta.toMap());
+Map<String, Object> meta = emitBatchNew(emitter, txid, collectorMock, partition, preExecutorRestartLastMeta.toMap());

long firstEmittedOffset = preRestartEmittedOffset + lastBatchEmittedRecords;
int emittedRecords = recordsInKafka - preRestartEmittedRecords;
@@ -302,8 +306,7 @@ public void testEarliestStrategyWhenTopologyIsRedeployed() {
KafkaTridentSpoutBatchMetadata preExecutorRestartLastMeta = new KafkaTridentSpoutBatchMetadata(preRestartEmittedOffset, preRestartEmittedOffset + preRestartEmittedRecords - 1, "Some older topology");
KafkaTridentSpoutEmitter<String, String> emitter = createEmitter(FirstPollOffsetStrategy.EARLIEST);
TransactionAttempt txid = new TransactionAttempt(0L, 0);
-KafkaTridentSpoutTopicPartition kttp = new KafkaTridentSpoutTopicPartition(partition);
-Map<String, Object> meta = emitter.emitPartitionBatchNew(txid, collectorMock, kttp, preExecutorRestartLastMeta.toMap());
+Map<String, Object> meta = emitBatchNew(emitter, txid, collectorMock, partition, preExecutorRestartLastMeta.toMap());

verify(collectorMock, times(recordsInKafka)).emit(emitCaptor.capture());
List<List<Object>> emits = emitCaptor.getAllValues();
@@ -325,7 +328,7 @@ public void testLatestStrategyWhenTopologyIsRedeployed() {
KafkaTridentSpoutEmitter<String, String> emitter = createEmitter(FirstPollOffsetStrategy.LATEST);
TransactionAttempt txid = new TransactionAttempt(0L, 0);
KafkaTridentSpoutTopicPartition kttp = new KafkaTridentSpoutTopicPartition(partition);
-Map<String, Object> meta = emitter.emitPartitionBatchNew(txid, collectorMock, kttp, preExecutorRestartLastMeta.toMap());
+Map<String, Object> meta = emitBatchNew(emitter, txid, collectorMock, partition, preExecutorRestartLastMeta.toMap());

verify(collectorMock, never()).emit(anyList());
}
@@ -351,12 +354,11 @@ public void testTimeStampStrategyWhenTopologyIsRedeployed() {
HashMap<TopicPartition, List<ConsumerRecord<String, String>>> topicPartitionMap = new HashMap<>();
List<ConsumerRecord<String, String>> newRecords = SpoutWithMockedConsumerSetupHelper.createRecords(partition, timeStampStartOffset, recordsInKafka);
topicPartitionMap.put(partition, newRecords);
-when(kafkaConsumer.poll(pollTimeout)).thenReturn(new ConsumerRecords<>(topicPartitionMap));
+when(kafkaConsumer.poll(Duration.ofMillis(pollTimeout))).thenReturn(new ConsumerRecords<>(topicPartitionMap));

KafkaTridentSpoutEmitter<String, String> emitter = createEmitter(kafkaConsumer, adminClient, FirstPollOffsetStrategy.TIMESTAMP);
TransactionAttempt txid = new TransactionAttempt(0L, 0);
-KafkaTridentSpoutTopicPartition kttp = new KafkaTridentSpoutTopicPartition(partition);
-Map<String, Object> meta = emitter.emitPartitionBatchNew(txid, collectorMock, kttp, preExecutorRestartLastMeta.toMap());
+Map<String, Object> meta = emitBatchNew(emitter, txid, collectorMock, partition, preExecutorRestartLastMeta.toMap());

verify(collectorMock, times(recordsInKafka)).emit(emitCaptor.capture());
verify(kafkaConsumer, times(1)).seek(partition, timeStampStartOffset);
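The test changes above track the move from the long-based poll to poll(Duration), which has been the preferred overload since the Kafka 2.0 client deprecated poll(long); the stub must now match a Duration argument or it will never fire. A minimal Mockito sketch of that stubbing pattern, with an invented topic name and timeout:

```java
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

class PollStubSketch {
    @SuppressWarnings("unchecked")
    static KafkaConsumer<String, String> stubbedConsumer() {
        KafkaConsumer<String, String> consumer = mock(KafkaConsumer.class);
        TopicPartition tp = new TopicPartition("test-topic", 0);
        HashMap<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>();
        records.put(tp, Collections.singletonList(new ConsumerRecord<>("test-topic", 0, 0L, "key", "value")));
        // Stubbing poll(long) would no longer match: production code now calls
        // poll(Duration.ofMillis(...)), so the stub must use the same Duration.
        when(consumer.poll(Duration.ofMillis(200))).thenReturn(new ConsumerRecords<>(records));
        return consumer;
    }
}
```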
@@ -15,6 +15,8 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+
import org.apache.storm.task.TopologyContext;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.topology.TransactionAttempt;
@@ -67,12 +69,13 @@ interface Coordinator<PartitionsT> {

interface Emitter<PartitionsT, PartitionT extends ISpoutPartition, M> {
/**
-* Emit a batch of tuples for a partition/transaction.
+* Emit a batch of tuples for a list of partitions/transactions.
*
-* <p>Return the metadata describing this batch that will be used as lastPartitionMeta for defining the
-* parameters of the next batch.
+* <p>Return the map of metadata describing this batch that will be used as lastPartitionMeta for defining the
+* parameters of the next batch for each partition.
*/
-M emitPartitionBatch(TransactionAttempt tx, TridentCollector collector, PartitionT partition, M lastPartitionMeta);
+Map<PartitionT, M> emitBatchNew(TransactionAttempt tx, TridentCollector collector,
+    Set<PartitionT> partitions, Map<PartitionT, M> lastBatchMetaMap);

/**
* This method is called when this task is responsible for a new set of partitions. Should be used to manage things like connections
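To make the reshaped Emitter contract concrete, a toy sketch of an implementation (the partition and metadata types are invented, and the TransactionAttempt and TridentCollector parameters are omitted for brevity; the real implementations are the Kafka emitters above):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Invented stand-ins for ISpoutPartition and per-partition batch metadata.
class MyPartition { final int id; MyPartition(int id) { this.id = id; } }
record MyMeta(long lastEmittedOffset) {}

class SketchEmitter {
    // Shape of the new contract: one call covers every partition in the batch and
    // returns the next batch's lastBatchMetaMap, rather than one call per partition.
    Map<MyPartition, MyMeta> emitBatchNew(Set<MyPartition> partitions,
                                          Map<MyPartition, MyMeta> lastBatchMetaMap) {
        Map<MyPartition, MyMeta> nextMeta = new HashMap<>();
        for (MyPartition p : partitions) {
            MyMeta last = lastBatchMetaMap.get(p); // null on the first batch for p
            long start = last == null ? 0 : last.lastEmittedOffset() + 1;
            // ... emit tuples for p starting at offset `start` ...
            nextMeta.put(p, new MyMeta(start)); // pretend exactly one record was emitted
        }
        return nextMeta;
    }
}
```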
