STORM-4076 KafkaTridentSpoutEmitters can poll all partitions at once instead of one at a time #3679

Merged (5 commits) on Sep 7, 2024
2 changes: 1 addition & 1 deletion docs/FAQ.md
@@ -76,7 +76,7 @@ A trident-spout is actually run within a storm _bolt_. The storm-spout of a trid

You should only store static data, and as little of it as possible, into the metadata record (note: maybe you _can_ store more interesting things; you shouldn't, though)

-### How often is the 'emitPartitionBatchNew' function called?
+### How often is the 'emitBatchNew' function called?

Since the MBC is the actual spout, all the tuples in a batch are just members of its tupletree. That means storm's "max spout pending" config effectively defines the number of concurrent batches trident runs. The MBC emits a new batch if it has fewer than max-spout-pending tuples pending and if at least one [trident batch interval]({{page.git-blob-base}}/conf/defaults.yaml#L115)'s worth of seconds has passed since the last batch.

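To make those two knobs concrete, here is a minimal configuration sketch using the standard `org.apache.storm.Config` setters (the values are illustrative, not recommendations):

```java
import org.apache.storm.Config;

public class TridentBatchTuning {
    public static Config tunedConfig() {
        Config conf = new Config();
        // "max spout pending": at most 3 Trident batches in flight at once.
        conf.setMaxSpoutPending(3);
        // Trident batch interval: wait at least 500 ms between batch emits.
        conf.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 500);
        return conf;
    }
}
```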
KafkaTridentOpaqueSpoutEmitter.java
@@ -19,6 +19,8 @@
import java.io.Serializable;
import java.util.List;
import java.util.Map;
+import java.util.Set;

import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout;
import org.apache.storm.trident.topology.TransactionAttempt;
@@ -37,11 +39,13 @@ public KafkaTridentOpaqueSpoutEmitter(KafkaTridentSpoutEmitter<K, V> emitter) {
}

@Override
-public Map<String, Object> emitPartitionBatch(TransactionAttempt tx, TridentCollector collector,
-        KafkaTridentSpoutTopicPartition partition, Map<String, Object> lastPartitionMeta) {
-    return emitter.emitPartitionBatchNew(tx, collector, partition, lastPartitionMeta);
+public Map<KafkaTridentSpoutTopicPartition, Map<String, Object>> emitBatchNew(TransactionAttempt tx,
+        TridentCollector collector, Set<KafkaTridentSpoutTopicPartition> partitions,
+        Map<KafkaTridentSpoutTopicPartition, Map<String, Object>> lastBatchMetaMap) {
+    return emitter.emitBatchNew(tx, collector, partitions, lastBatchMetaMap);
}


@Override
public void refreshPartitions(List<KafkaTridentSpoutTopicPartition> partitionResponsibilities) {
emitter.refreshPartitions(partitionResponsibilities);
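The practical effect of this wrapper change shows at the call site: Trident hands the emitter its whole partition assignment in one call, where it previously called once per partition. A rough caller-side sketch, with hypothetical names (`emitOneBatch`, `assignedPartitions`, `lastMetas`) and assumed import paths:

```java
import java.util.Map;
import java.util.Set;
import org.apache.storm.kafka.spout.trident.KafkaTridentOpaqueSpoutEmitter;
import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutTopicPartition;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.topology.TransactionAttempt;

class BatchCallSiteSketch {
    // One call per batch: all assigned partitions are served from a single
    // underlying consumer.poll(...), and the returned map carries one fresh
    // metadata entry per partition to feed back in as lastMetas next time.
    static <K, V> Map<KafkaTridentSpoutTopicPartition, Map<String, Object>> emitOneBatch(
            KafkaTridentOpaqueSpoutEmitter<K, V> wrapper,
            TransactionAttempt tx, TridentCollector collector,
            Set<KafkaTridentSpoutTopicPartition> assignedPartitions,
            Map<KafkaTridentSpoutTopicPartition, Map<String, Object>> lastMetas) {
        return wrapper.emitBatchNew(tx, collector, assignedPartitions, lastMetas);
    }
}
```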
KafkaTridentSpoutEmitter.java
@@ -27,6 +27,7 @@

import com.google.common.annotations.VisibleForTesting;
import java.io.Serializable;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -163,54 +164,57 @@ && isFirstPollOffsetStrategyIgnoringCommittedOffsets()) {
/**
* Emit a new batch.
*/
-public Map<String, Object> emitPartitionBatchNew(TransactionAttempt tx, TridentCollector collector,
-        KafkaTridentSpoutTopicPartition currBatchPartition, Map<String, Object> lastBatch) {
+public Map<KafkaTridentSpoutTopicPartition, Map<String, Object>> emitBatchNew(TransactionAttempt tx,
+        TridentCollector collector, Set<KafkaTridentSpoutTopicPartition> partitions,
+        Map<KafkaTridentSpoutTopicPartition, Map<String, Object>> lastBatchMetaMap) {

LOG.debug("Processing batch: [transaction = {}], [currBatchPartition = {}], [lastBatchMetadata = {}], [collector = {}]",
tx, currBatchPartition, lastBatch, collector);
LOG.debug("Processing batch: [transaction = {}], [currBatchPartitions = {}], [lastBatchMetadata = {}], [collector = {}]",
tx, partitions, lastBatchMetaMap, collector);

-final TopicPartition currBatchTp = currBatchPartition.getTopicPartition();

-throwIfEmittingForUnassignedPartition(currBatchTp);

-KafkaTridentSpoutBatchMetadata lastBatchMeta = lastBatch == null ? null : KafkaTridentSpoutBatchMetadata.fromMap(lastBatch);
-KafkaTridentSpoutBatchMetadata currentBatch = lastBatchMeta;
-Collection<TopicPartition> pausedTopicPartitions = Collections.emptySet();

-try {
-    // pause other topic-partitions to only poll from current topic-partition
-    pausedTopicPartitions = pauseTopicPartitions(currBatchTp);
+Map<KafkaTridentSpoutTopicPartition, Map<String, Object>> partitionToBatchMeta = new HashMap<>();

-seek(currBatchTp, lastBatchMeta);

-final List<ConsumerRecord<K, V>> records = consumer.poll(pollTimeoutMs).records(currBatchTp);
-LOG.debug("Polled [{}] records from Kafka.", records.size());
+seekAllPartitions(partitions, lastBatchMetaMap);

+ConsumerRecords<K, V> poll = consumer.poll(Duration.ofMillis(pollTimeoutMs));
+for (KafkaTridentSpoutTopicPartition partition : partitions) {
+    final List<ConsumerRecord<K, V>> records = poll.records(partition.getTopicPartition());
if (!records.isEmpty()) {
for (ConsumerRecord<K, V> record : records) {
emitTuple(collector, record);
}
// build new metadata based on emitted records
-currentBatch = new KafkaTridentSpoutBatchMetadata(
-    records.get(0).offset(),
-    records.get(records.size() - 1).offset(),
-    topologyContext.getStormId());
+partitionToBatchMeta.put(partition, new KafkaTridentSpoutBatchMetadata(
+    records.get(0).offset(),
+    records.get(records.size() - 1).offset(),
+    topologyContext.getStormId()).toMap());
} else {
//Build new metadata based on the consumer position.
//We want the next emit to start at the current consumer position,
//so make a meta that indicates that position - 1 is the last emitted offset
//This helps us avoid cases like STORM-3279, and simplifies the seek logic.
-long lastEmittedOffset = consumer.position(currBatchTp) - 1;
-currentBatch = new KafkaTridentSpoutBatchMetadata(lastEmittedOffset, lastEmittedOffset, topologyContext.getStormId());
+long lastEmittedOffset = consumer.position(partition.getTopicPartition()) - 1;
+partitionToBatchMeta.put(partition, new KafkaTridentSpoutBatchMetadata(lastEmittedOffset, lastEmittedOffset,
+    topologyContext.getStormId()).toMap());
}
-} finally {
-    consumer.resume(pausedTopicPartitions);
-    LOG.trace("Resumed topic-partitions {}", pausedTopicPartitions);
}
LOG.debug("Emitted batch: [transaction = {}], [currBatchPartition = {}], [lastBatchMetadata = {}], "
+ "[currBatchMetadata = {}], [collector = {}]", tx, currBatchPartition, lastBatch, currentBatch, collector);
for (KafkaTridentSpoutTopicPartition kttp : partitionToBatchMeta.keySet()) {
LOG.debug("Emitted batch: [transaction = {}], [currBatchPartition = {}], [lastBatchMetadata = {}], "
+ "[currBatchMetadata = {}], [collector = {}]", tx, kttp, lastBatchMetaMap.get(kttp),
partitionToBatchMeta.get(kttp), collector);
}
return partitionToBatchMeta;
}

-    return currentBatch.toMap();
+private void seekAllPartitions(Collection<KafkaTridentSpoutTopicPartition> partitions,
+        Map<KafkaTridentSpoutTopicPartition, Map<String, Object>> lastPartitionMetaMap) {
+
+    for (KafkaTridentSpoutTopicPartition partition : partitions) {
+        TopicPartition currentBatchTp = partition.getTopicPartition();
+        throwIfEmittingForUnassignedPartition(currentBatchTp);
+        Map<String, Object> lastBatch = lastPartitionMetaMap.get(partition);
+        KafkaTridentSpoutBatchMetadata lastBatchMeta = lastBatch == null ? null : KafkaTridentSpoutBatchMetadata.fromMap(lastBatch);
+        seek(currentBatchTp, lastBatchMeta);
+    }
+}

private boolean isFirstPollOffsetStrategyIgnoringCommittedOffsets() {
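The heart of the change is the seek-all-then-poll pattern above: seek every assigned partition first, poll once, then split the result per partition, replacing the old pause/seek/poll/resume cycle per partition. A standalone sketch of that pattern against the plain Kafka consumer API (only the `Consumer` calls are real; the class, method, and parameter names are illustrative):

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

class SeekThenPollSketch {
    // Seek each assigned partition to its resume offset, poll once, and let
    // ConsumerRecords.records(tp) split the batch back out per partition.
    static Map<TopicPartition, List<ConsumerRecord<String, String>>> pollAllPartitions(
            Consumer<String, String> consumer,
            Map<TopicPartition, Long> resumeOffsets,
            long pollTimeoutMs) {
        resumeOffsets.forEach(consumer::seek); // no pause/resume bookkeeping needed
        ConsumerRecords<String, String> polled = consumer.poll(Duration.ofMillis(pollTimeoutMs));
        Map<TopicPartition, List<ConsumerRecord<String, String>>> byPartition = new HashMap<>();
        for (TopicPartition tp : resumeOffsets.keySet()) {
            byPartition.put(tp, polled.records(tp)); // empty list if tp returned nothing
        }
        return byPartition;
    }
}
```

Polling once per batch instead of once per partition also cuts broker round trips when a task owns many partitions, which is the behavior the PR title describes.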
(transactional Kafka Trident spout emitter, implementing IPartitionedTridentSpout.Emitter)
@@ -19,6 +19,8 @@
import java.io.Serializable;
import java.util.List;
import java.util.Map;
+import java.util.Set;

import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.spout.IPartitionedTridentSpout;
import org.apache.storm.trident.topology.TransactionAttempt;
@@ -42,9 +44,10 @@ public List<KafkaTridentSpoutTopicPartition> getOrderedPartitions(List<Map<Strin
}

@Override
-public Map<String, Object> emitPartitionBatchNew(TransactionAttempt tx, TridentCollector collector,
-        KafkaTridentSpoutTopicPartition partition, Map<String, Object> lastPartitionMeta) {
-    return emitter.emitPartitionBatchNew(tx, collector, partition, lastPartitionMeta);
+public Map<KafkaTridentSpoutTopicPartition, Map<String, Object>> emitBatchNew(TransactionAttempt tx,
+        TridentCollector collector, Set<KafkaTridentSpoutTopicPartition> partitions,
+        Map<KafkaTridentSpoutTopicPartition, Map<String, Object>> lastPartitionMetaMap) {
+    return emitter.emitBatchNew(tx, collector, partitions, lastPartitionMetaMap);
}

@Override
@@ -53,7 +56,7 @@ public void refreshPartitions(List<KafkaTridentSpoutTopicPartition> partitionRes
}

@Override
-public void emitPartitionBatch(TransactionAttempt tx, TridentCollector collector,
+public void reEmitPartitionBatch(TransactionAttempt tx, TridentCollector collector,
KafkaTridentSpoutTopicPartition partition, Map<String, Object> partitionMeta) {
emitter.reEmitPartitionBatch(tx, collector, partition, partitionMeta);
}
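The rename from `emitPartitionBatch` to `reEmitPartitionBatch` makes the transactional contract explicit: replaying a batch must reproduce exactly the tuples recorded in `partitionMeta`, not read new data. A hedged sketch of what such a replay involves, assuming a metadata scheme that records a fixed offset range (all names here are illustrative, not the emitter's actual code):

```java
import java.time.Duration;
import java.util.List;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.tuple.Values;

class ReplaySketch {
    // Re-emit the previously recorded offset range [firstOffset, lastOffset].
    static void reEmitRange(Consumer<String, String> consumer, TridentCollector collector,
            TopicPartition tp, long firstOffset, long lastOffset) {
        consumer.seek(tp, firstOffset); // rewind to the recorded start of the batch
        while (true) {
            List<ConsumerRecord<String, String>> records =
                consumer.poll(Duration.ofMillis(200)).records(tp);
            if (records.isEmpty()) {
                return; // sketch only: stop if nothing more is readable
            }
            for (ConsumerRecord<String, String> record : records) {
                if (record.offset() > lastOffset) {
                    return; // past the recorded batch boundary
                }
                collector.emit(new Values(record.value()));
            }
        }
    }
}
```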
(KafkaTridentSpoutEmitter emit test)
@@ -25,6 +25,7 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

+import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -107,10 +108,15 @@ private KafkaTridentSpoutEmitter<String, String> createEmitter(FirstPollOffsetSt

private Map<String, Object> doEmitNewBatchTest(FirstPollOffsetStrategy firstPollOffsetStrategy, TridentCollector collectorMock, TopicPartition tp, Map<String, Object> previousBatchMeta) {
KafkaTridentSpoutEmitter<String, String> emitter = createEmitter(firstPollOffsetStrategy);

TransactionAttempt txid = new TransactionAttempt(10L, 0);
+return emitBatchNew(emitter,txid,collectorMock,tp, previousBatchMeta);
}

+private Map<String, Object> emitBatchNew(KafkaTridentSpoutEmitter<String, String> emitter, TransactionAttempt txid, TridentCollector collectorMock, TopicPartition tp, Map<String, Object> previousBatchMeta) {
KafkaTridentSpoutTopicPartition kttp = new KafkaTridentSpoutTopicPartition(tp);
-return emitter.emitPartitionBatchNew(txid, collectorMock, kttp, previousBatchMeta);
+Map<KafkaTridentSpoutTopicPartition, Map<String, Object>> lastBatchMetaMap = new HashMap<>();
+lastBatchMetaMap.put(kttp, previousBatchMeta);
+return emitter.emitBatchNew(txid, collectorMock, Collections.singleton(kttp), lastBatchMetaMap).get(kttp);
}

@Test
@@ -166,7 +172,7 @@ public void testEmitEmptyBatches() {
//Emit 10 empty batches, simulating no new records being present in Kafka
for (int i = 0; i < 10; i++) {
TransactionAttempt txid = new TransactionAttempt((long) i, 0);
-lastBatchMeta = emitter.emitPartitionBatchNew(txid, collectorMock, kttp, lastBatchMeta);
+lastBatchMeta = emitBatchNew(emitter, txid, collectorMock, partition, lastBatchMeta);
KafkaTridentSpoutBatchMetadata deserializedMeta = KafkaTridentSpoutBatchMetadata.fromMap(lastBatchMeta);
assertThat("Since the first poll strategy is LATEST, the meta should indicate that the last message has already been emitted", deserializedMeta.getFirstOffset(), is(lastOffsetInKafka));
assertThat("Since the first poll strategy is LATEST, the meta should indicate that the last message has already been emitted", deserializedMeta.getLastOffset(), is(lastOffsetInKafka));
@@ -176,8 +182,8 @@ public void testEmitEmptyBatches() {
int numNewRecords = 10;
List<ConsumerRecord<String, String>> newRecords = SpoutWithMockedConsumerSetupHelper.createRecords(partition, firstNewRecordOffset, numNewRecords);
newRecords.forEach(consumer::addRecord);
-lastBatchMeta = emitter.emitPartitionBatchNew(new TransactionAttempt(11L, 0), collectorMock, kttp, lastBatchMeta);

+TransactionAttempt txid = new TransactionAttempt(11L, 0);
+lastBatchMeta = emitBatchNew(emitter,txid, collectorMock, partition, lastBatchMeta);
verify(collectorMock, times(numNewRecords)).emit(emitCaptor.capture());
List<List<Object>> emits = emitCaptor.getAllValues();
assertThat(emits.get(0).get(0), is(firstNewRecordOffset));
@@ -228,8 +234,7 @@ public void testEmitEmptyFirstBatch() {
KafkaTridentSpoutBatchMetadata preRedeployLastMeta = new KafkaTridentSpoutBatchMetadata(firstEmittedOffset, firstEmittedOffset + emittedRecords - 1, "an old topology");
KafkaTridentSpoutEmitter<String, String> emitter = createEmitter(FirstPollOffsetStrategy.LATEST);
TransactionAttempt txid = new TransactionAttempt(0L, 0);
-KafkaTridentSpoutTopicPartition kttp = new KafkaTridentSpoutTopicPartition(partition);
-Map<String, Object> meta = emitter.emitPartitionBatchNew(txid, collectorMock, kttp, preRedeployLastMeta.toMap());
+Map<String, Object> meta = emitBatchNew(emitter,txid, collectorMock, partition, preRedeployLastMeta.toMap());

verify(collectorMock, never()).emit(anyList());
KafkaTridentSpoutBatchMetadata deserializedMeta = KafkaTridentSpoutBatchMetadata.fromMap(meta);
@@ -240,7 +245,7 @@ public void testEmitEmptyFirstBatch() {
int numNewRecords = 10;
List<ConsumerRecord<String, String>> newRecords = SpoutWithMockedConsumerSetupHelper.createRecords(partition, firstNewRecordOffset, numNewRecords);
newRecords.forEach(consumer::addRecord);
-meta = emitter.emitPartitionBatchNew(txid, collectorMock, kttp, meta);
+meta = emitBatchNew(emitter,txid, collectorMock, partition, meta);

verify(collectorMock, times(numNewRecords)).emit(emitCaptor.capture());
List<List<Object>> emits = emitCaptor.getAllValues();
@@ -264,8 +269,7 @@ public void testUnconditionalStrategyWhenSpoutWorkerIsRestarted(FirstPollOffsetS
KafkaTridentSpoutBatchMetadata preExecutorRestartLastMeta = new KafkaTridentSpoutBatchMetadata(preRestartEmittedOffset, preRestartEmittedOffset + lastBatchEmittedRecords - 1, topologyId);
KafkaTridentSpoutEmitter<String, String> emitter = createEmitter(firstPollOffsetStrategy);
TransactionAttempt txid = new TransactionAttempt(0L, 0);
-KafkaTridentSpoutTopicPartition kttp = new KafkaTridentSpoutTopicPartition(partition);
-Map<String, Object> meta = emitter.emitPartitionBatchNew(txid, collectorMock, kttp, preExecutorRestartLastMeta.toMap());
+Map<String, Object> meta = emitBatchNew(emitter,txid, collectorMock, partition, preExecutorRestartLastMeta.toMap());

long firstEmittedOffset = preRestartEmittedOffset + lastBatchEmittedRecords;
int emittedRecords = recordsInKafka - preRestartEmittedRecords;
@@ -288,8 +292,7 @@ public void testEarliestStrategyWhenTopologyIsRedeployed() {
KafkaTridentSpoutBatchMetadata preExecutorRestartLastMeta = new KafkaTridentSpoutBatchMetadata(preRestartEmittedOffset, preRestartEmittedOffset + preRestartEmittedRecords - 1, "Some older topology");
KafkaTridentSpoutEmitter<String, String> emitter = createEmitter(FirstPollOffsetStrategy.EARLIEST);
TransactionAttempt txid = new TransactionAttempt(0L, 0);
-KafkaTridentSpoutTopicPartition kttp = new KafkaTridentSpoutTopicPartition(partition);
-Map<String, Object> meta = emitter.emitPartitionBatchNew(txid, collectorMock, kttp, preExecutorRestartLastMeta.toMap());
+Map<String, Object> meta = emitBatchNew(emitter, txid, collectorMock, partition, preExecutorRestartLastMeta.toMap());

verify(collectorMock, times(recordsInKafka)).emit(emitCaptor.capture());
List<List<Object>> emits = emitCaptor.getAllValues();
@@ -311,7 +314,7 @@ public void testLatestStrategyWhenTopologyIsRedeployed() {
KafkaTridentSpoutEmitter<String, String> emitter = createEmitter(FirstPollOffsetStrategy.LATEST);
TransactionAttempt txid = new TransactionAttempt(0L, 0);
KafkaTridentSpoutTopicPartition kttp = new KafkaTridentSpoutTopicPartition(partition);
-Map<String, Object> meta = emitter.emitPartitionBatchNew(txid, collectorMock, kttp, preExecutorRestartLastMeta.toMap());
+Map<String, Object> meta = emitBatchNew(emitter,txid, collectorMock, partition, preExecutorRestartLastMeta.toMap());

verify(collectorMock, never()).emit(anyList());
}
@@ -337,12 +340,11 @@ public void testTimeStampStrategyWhenTopologyIsRedeployed() {
HashMap<TopicPartition, List<ConsumerRecord<String, String>>> topicPartitionMap = new HashMap<>();
List<ConsumerRecord<String, String>> newRecords = SpoutWithMockedConsumerSetupHelper.createRecords(partition, timeStampStartOffset, recordsInKafka);
topicPartitionMap.put(partition, newRecords);
-when(kafkaConsumer.poll(pollTimeout)).thenReturn(new ConsumerRecords<>(topicPartitionMap));
+when(kafkaConsumer.poll(Duration.ofMillis(pollTimeout))).thenReturn(new ConsumerRecords<>(topicPartitionMap));

KafkaTridentSpoutEmitter<String, String> emitter = createEmitter(kafkaConsumer, FirstPollOffsetStrategy.TIMESTAMP);
TransactionAttempt txid = new TransactionAttempt(0L, 0);
-KafkaTridentSpoutTopicPartition kttp = new KafkaTridentSpoutTopicPartition(partition);
-Map<String, Object> meta = emitter.emitPartitionBatchNew(txid, collectorMock, kttp, preExecutorRestartLastMeta.toMap());
+Map<String, Object> meta = emitBatchNew(emitter,txid, collectorMock, partition, preExecutorRestartLastMeta.toMap());

verify(collectorMock, times(recordsInKafka)).emit(emitCaptor.capture());
verify(kafkaConsumer, times(1)).seek(partition, timeStampStartOffset);
(Trident spout Emitter interface, org.apache.storm.trident.spout)
@@ -15,6 +15,8 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Set;

import org.apache.storm.task.TopologyContext;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.topology.TransactionAttempt;
@@ -67,12 +69,13 @@ interface Coordinator<PartitionsT> {

interface Emitter<PartitionsT, PartitionT extends ISpoutPartition, M> {
/**
-* Emit a batch of tuples for a partition/transaction.
+* Emit a batch of tuples for a list of partitions/transactions.
*
-* <p>Return the metadata describing this batch that will be used as lastPartitionMeta for defining the
-* parameters of the next batch.
+* <p>Return the map of metadata describing this batch that will be used as lastPartitionMeta for defining the
+* parameters of the next batch for each partition.
*/
-M emitPartitionBatch(TransactionAttempt tx, TridentCollector collector, PartitionT partition, M lastPartitionMeta);
+Map<PartitionT, M> emitBatchNew(TransactionAttempt tx, TridentCollector collector,
+        Set<PartitionT> partitions, Map<PartitionT, M> lastBatchMetaMap);

/**
* This method is called when this task is responsible for a new set of partitions. Should be used to manage things like connections
Expand Down