
Commit d270a12

tests, logs and refactoring
anand-h-codes committed Aug 25, 2024
1 parent ca66775 commit d270a12
Showing 9 changed files with 49 additions and 100 deletions.
docs/FAQ.md: 2 changes (1 addition & 1 deletion)
@@ -76,7 +76,7 @@ A trident-spout is actually run within a storm _bolt_. The storm-spout of a trid

You should only store static data, and as little of it as possible, into the metadata record (note: maybe you _can_ store more interesting things; you shouldn't, though)
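As a rough illustration (the field names below are assumptions, not a prescribed schema), a metadata record in that spirit carries little more than a pair of offsets and an identifier:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only: a small, static metadata record.
// Nothing here changes per tuple, and nothing large is stored.
Map<String, Object> batchMeta = new HashMap<>();
batchMeta.put("firstOffset", 100L);               // first offset emitted in the batch
batchMeta.put("lastOffset", 109L);                // last offset emitted in the batch
batchMeta.put("topologyId", "example-topology");  // identifies which deployment emitted it
```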

### How often is the 'emitPartitionBatchNew' function called?
### How often is the 'emitBatchNew' function called?

Since the MBC is the actual spout, all the tuples in a batch are just members of its tuple tree. That means storm's "max spout pending" config effectively defines the number of concurrent batches trident runs. The MBC emits a new batch if it has fewer than max-spout-pending tuples pending and if at least one [trident batch interval]({{page.git-blob-base}}/conf/defaults.yaml#L115)'s worth of seconds has passed since the last batch.
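Put as a condition, the gating described above looks roughly like the following sketch (names are illustrative, not the MBC's actual fields):

```java
// Rough sketch of when a new batch may be emitted; purely illustrative.
static boolean canEmitNewBatch(int pendingBatches, int maxSpoutPending,
                               long lastBatchTimeMs, long tridentBatchIntervalMs) {
    boolean underPendingLimit = pendingBatches < maxSpoutPending;
    boolean intervalElapsed =
            System.currentTimeMillis() - lastBatchTimeMs >= tridentBatchIntervalMs;
    return underPendingLimit && intervalElapsed;
}
```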

@@ -39,10 +39,10 @@ public KafkaTridentOpaqueSpoutEmitter(KafkaTridentSpoutEmitter<K, V> emitter) {
}

@Override
public Map<KafkaTridentSpoutTopicPartition, Map<String, Object>> emitPartitionBatch(TransactionAttempt tx,
TridentCollector collector, Set<KafkaTridentSpoutTopicPartition> partitions, Map<KafkaTridentSpoutTopicPartition,
Map<String, Object>> lastPartitionMetaMap) {
return emitter.emitPartitionBatchNew(tx, collector, partitions, lastPartitionMetaMap);
public Map<KafkaTridentSpoutTopicPartition, Map<String, Object>> emitNewBatch(TransactionAttempt tx,
TridentCollector collector, Set<KafkaTridentSpoutTopicPartition> partitions,
Map<KafkaTridentSpoutTopicPartition, Map<String, Object>> lastBatchMetaMap) {
return emitter.emitNewBatch(tx, collector, partitions, lastBatchMetaMap);
}


@@ -164,13 +164,16 @@ && isFirstPollOffsetStrategyIgnoringCommittedOffsets()) {
/**
* Emit a new batch.
*/
public Map<KafkaTridentSpoutTopicPartition, Map<String, Object>> emitPartitionBatchNew(TransactionAttempt tx,
TridentCollector collector, Collection<KafkaTridentSpoutTopicPartition> partitions, Map<KafkaTridentSpoutTopicPartition,
Map<String, Object>> lastPartitionMetaMap) {
public Map<KafkaTridentSpoutTopicPartition, Map<String, Object>> emitNewBatch(TransactionAttempt tx,
TridentCollector collector, Set<KafkaTridentSpoutTopicPartition> partitions,
Map<KafkaTridentSpoutTopicPartition, Map<String, Object>> lastBatchMetaMap) {

LOG.debug("Processing batch: [transaction = {}], [currBatchPartitions = {}], [lastBatchMetadata = {}], [collector = {}]",
tx, partitions, lastBatchMetaMap, collector);

Map<KafkaTridentSpoutTopicPartition, Map<String, Object>> ret = new HashMap<>();

seekAllPartitions(partitions, lastPartitionMetaMap);
seekAllPartitions(partitions, lastBatchMetaMap);

ConsumerRecords<K, V> poll = consumer.poll(Duration.ofMillis(pollTimeoutMs));
for (KafkaTridentSpoutTopicPartition partition : partitions) {
@@ -194,58 +197,11 @@ public Map<KafkaTridentSpoutTopicPartition, Map<String, Object>> emitPartitionBa
topologyContext.getStormId()).toMap());
}
}

return ret;
}

public Map<String, Object> emitPartitionBatchNew(TransactionAttempt tx, TridentCollector collector,
KafkaTridentSpoutTopicPartition currBatchPartition, Map<String, Object> lastBatch) {

LOG.debug("Processing batch: [transaction = {}], [currBatchPartition = {}], [lastBatchMetadata = {}], [collector = {}]",
tx, currBatchPartition, lastBatch, collector);

final TopicPartition currBatchTp = currBatchPartition.getTopicPartition();

throwIfEmittingForUnassignedPartition(currBatchTp);

KafkaTridentSpoutBatchMetadata lastBatchMeta = lastBatch == null ? null : KafkaTridentSpoutBatchMetadata.fromMap(lastBatch);
KafkaTridentSpoutBatchMetadata currentBatch = lastBatchMeta;
Collection<TopicPartition> pausedTopicPartitions = Collections.emptySet();

try {
// pause other topic-partitions to only poll from current topic-partition
pausedTopicPartitions = pauseTopicPartitions(currBatchTp);

seek(currBatchTp, lastBatchMeta);

final List<ConsumerRecord<K, V>> records = consumer.poll(pollTimeoutMs).records(currBatchTp);
LOG.debug("Polled [{}] records from Kafka.", records.size());

if (!records.isEmpty()) {
for (ConsumerRecord<K, V> record : records) {
emitTuple(collector, record);
}
// build new metadata based on emitted records
currentBatch = new KafkaTridentSpoutBatchMetadata(
records.get(0).offset(),
records.get(records.size() - 1).offset(),
topologyContext.getStormId());
} else {
//Build new metadata based on the consumer position.
//We want the next emit to start at the current consumer position,
//so make a meta that indicates that position - 1 is the last emitted offset
//This helps us avoid cases like STORM-3279, and simplifies the seek logic.
long lastEmittedOffset = consumer.position(currBatchTp) - 1;
currentBatch = new KafkaTridentSpoutBatchMetadata(lastEmittedOffset, lastEmittedOffset, topologyContext.getStormId());
}
} finally {
consumer.resume(pausedTopicPartitions);
LOG.trace("Resumed topic-partitions {}", pausedTopicPartitions);
for (KafkaTridentSpoutTopicPartition kttp : ret.keySet()) {
LOG.debug("Emitted batch: [transaction = {}], [currBatchPartition = {}], [lastBatchMetadata = {}], "
+ "[currBatchMetadata = {}], [collector = {}]", tx, kttp, lastBatchMetaMap.get(kttp), ret.get(kttp), collector);
}
LOG.debug("Emitted batch: [transaction = {}], [currBatchPartition = {}], [lastBatchMetadata = {}], "
+ "[currBatchMetadata = {}], [collector = {}]", tx, currBatchPartition, lastBatch, currentBatch, collector);

return currentBatch.toMap();
return ret;
}

private void seekAllPartitions(Collection<KafkaTridentSpoutTopicPartition> partitions,
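The reworked emitter above boils down to a seek-all, poll-once, split-per-partition pattern. A simplified, stand-alone sketch of that pattern against the plain Kafka consumer API (not the emitter's actual code; method and variable names are assumptions):

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

// Illustrative helper: seek every partition to its resume offset, issue a single
// poll, then split the returned records back out per partition.
static Map<TopicPartition, List<ConsumerRecord<String, String>>> pollAllPartitions(
        Consumer<String, String> consumer,
        Map<TopicPartition, Long> resumeOffsets,
        long pollTimeoutMs) {
    resumeOffsets.forEach(consumer::seek);                   // position all partitions first
    ConsumerRecords<String, String> polled =
            consumer.poll(Duration.ofMillis(pollTimeoutMs)); // one poll serves every partition
    Map<TopicPartition, List<ConsumerRecord<String, String>>> recordsByPartition = new HashMap<>();
    for (TopicPartition tp : resumeOffsets.keySet()) {
        recordsByPartition.put(tp, polled.records(tp));
    }
    return recordsByPartition;
}
```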
@@ -44,16 +44,10 @@ public List<KafkaTridentSpoutTopicPartition> getOrderedPartitions(List<Map<Strin
}

@Override
public Map<String, Object> emitPartitionBatchNew(TransactionAttempt tx, TridentCollector collector,
KafkaTridentSpoutTopicPartition partition, Map<String, Object> lastPartitionMeta) {
return emitter.emitPartitionBatchNew(tx, collector, partition, lastPartitionMeta);
}

@Override
public Map<KafkaTridentSpoutTopicPartition, Map<String, Object>> emitPartitionBatchNew(TransactionAttempt tx,
public Map<KafkaTridentSpoutTopicPartition, Map<String, Object>> emitBatchNew(TransactionAttempt tx,
TridentCollector collector, Set<KafkaTridentSpoutTopicPartition> partitions,
Map<KafkaTridentSpoutTopicPartition, Map<String, Object>> lastPartitionMetaMap) {
return emitter.emitPartitionBatchNew(tx, collector, partitions, lastPartitionMetaMap);
return emitter.emitNewBatch(tx, collector, partitions, lastPartitionMetaMap);
}

@Override
@@ -62,8 +56,8 @@ public void refreshPartitions(List<KafkaTridentSpoutTopicPartition> partitionRes
}

@Override
public void emitPartitionBatch(TransactionAttempt tx, TridentCollector collector,
KafkaTridentSpoutTopicPartition partition, Map<String, Object> partitionMeta) {
public void reEmitPartitionBatch(TransactionAttempt tx, TridentCollector collector,
KafkaTridentSpoutTopicPartition partition, Map<String, Object> partitionMeta) {
emitter.reEmitPartitionBatch(tx, collector, partition, partitionMeta);
}

@@ -25,6 +25,7 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -107,10 +108,15 @@ private KafkaTridentSpoutEmitter<String, String> createEmitter(FirstPollOffsetSt

private Map<String, Object> doEmitNewBatchTest(FirstPollOffsetStrategy firstPollOffsetStrategy, TridentCollector collectorMock, TopicPartition tp, Map<String, Object> previousBatchMeta) {
KafkaTridentSpoutEmitter<String, String> emitter = createEmitter(firstPollOffsetStrategy);

TransactionAttempt txid = new TransactionAttempt(10L, 0);
return emitNewBatch(emitter,txid,collectorMock,tp, previousBatchMeta);
}

private Map<String, Object> emitNewBatch(KafkaTridentSpoutEmitter<String, String> emitter, TransactionAttempt txid, TridentCollector collectorMock, TopicPartition tp, Map<String, Object> previousBatchMeta) {
KafkaTridentSpoutTopicPartition kttp = new KafkaTridentSpoutTopicPartition(tp);
return emitter.emitPartitionBatchNew(txid, collectorMock, kttp, previousBatchMeta);
Map<KafkaTridentSpoutTopicPartition, Map<String, Object>> lastBatchMetaMap = new HashMap<>();
lastBatchMetaMap.put(kttp, previousBatchMeta);
return emitter.emitNewBatch(txid, collectorMock, Collections.singleton(kttp), lastBatchMetaMap).get(kttp);
}

@Test
@@ -166,7 +172,7 @@ public void testEmitEmptyBatches() {
//Emit 10 empty batches, simulating no new records being present in Kafka
for (int i = 0; i < 10; i++) {
TransactionAttempt txid = new TransactionAttempt((long) i, 0);
lastBatchMeta = emitter.emitPartitionBatchNew(txid, collectorMock, kttp, lastBatchMeta);
lastBatchMeta = emitNewBatch(emitter, txid, collectorMock, partition, lastBatchMeta);
KafkaTridentSpoutBatchMetadata deserializedMeta = KafkaTridentSpoutBatchMetadata.fromMap(lastBatchMeta);
assertThat("Since the first poll strategy is LATEST, the meta should indicate that the last message has already been emitted", deserializedMeta.getFirstOffset(), is(lastOffsetInKafka));
assertThat("Since the first poll strategy is LATEST, the meta should indicate that the last message has already been emitted", deserializedMeta.getLastOffset(), is(lastOffsetInKafka));
@@ -176,8 +182,8 @@ public void testEmitEmptyBatches() {
int numNewRecords = 10;
List<ConsumerRecord<String, String>> newRecords = SpoutWithMockedConsumerSetupHelper.createRecords(partition, firstNewRecordOffset, numNewRecords);
newRecords.forEach(consumer::addRecord);
lastBatchMeta = emitter.emitPartitionBatchNew(new TransactionAttempt(11L, 0), collectorMock, kttp, lastBatchMeta);

TransactionAttempt txid = new TransactionAttempt(11L, 0);
lastBatchMeta = emitNewBatch(emitter,txid, collectorMock, partition, lastBatchMeta);
verify(collectorMock, times(numNewRecords)).emit(emitCaptor.capture());
List<List<Object>> emits = emitCaptor.getAllValues();
assertThat(emits.get(0).get(0), is(firstNewRecordOffset));
@@ -228,8 +234,7 @@ public void testEmitEmptyFirstBatch() {
KafkaTridentSpoutBatchMetadata preRedeployLastMeta = new KafkaTridentSpoutBatchMetadata(firstEmittedOffset, firstEmittedOffset + emittedRecords - 1, "an old topology");
KafkaTridentSpoutEmitter<String, String> emitter = createEmitter(FirstPollOffsetStrategy.LATEST);
TransactionAttempt txid = new TransactionAttempt(0L, 0);
KafkaTridentSpoutTopicPartition kttp = new KafkaTridentSpoutTopicPartition(partition);
Map<String, Object> meta = emitter.emitPartitionBatchNew(txid, collectorMock, kttp, preRedeployLastMeta.toMap());
Map<String, Object> meta = emitNewBatch(emitter,txid, collectorMock, partition, preRedeployLastMeta.toMap());

verify(collectorMock, never()).emit(anyList());
KafkaTridentSpoutBatchMetadata deserializedMeta = KafkaTridentSpoutBatchMetadata.fromMap(meta);
@@ -240,7 +245,7 @@ public void testEmitEmptyFirstBatch() {
int numNewRecords = 10;
List<ConsumerRecord<String, String>> newRecords = SpoutWithMockedConsumerSetupHelper.createRecords(partition, firstNewRecordOffset, numNewRecords);
newRecords.forEach(consumer::addRecord);
meta = emitter.emitPartitionBatchNew(txid, collectorMock, kttp, meta);
meta = emitNewBatch(emitter,txid, collectorMock, partition, meta);

verify(collectorMock, times(numNewRecords)).emit(emitCaptor.capture());
List<List<Object>> emits = emitCaptor.getAllValues();
@@ -264,8 +269,7 @@ public void testUnconditionalStrategyWhenSpoutWorkerIsRestarted(FirstPollOffsetS
KafkaTridentSpoutBatchMetadata preExecutorRestartLastMeta = new KafkaTridentSpoutBatchMetadata(preRestartEmittedOffset, preRestartEmittedOffset + lastBatchEmittedRecords - 1, topologyId);
KafkaTridentSpoutEmitter<String, String> emitter = createEmitter(firstPollOffsetStrategy);
TransactionAttempt txid = new TransactionAttempt(0L, 0);
KafkaTridentSpoutTopicPartition kttp = new KafkaTridentSpoutTopicPartition(partition);
Map<String, Object> meta = emitter.emitPartitionBatchNew(txid, collectorMock, kttp, preExecutorRestartLastMeta.toMap());
Map<String, Object> meta = emitNewBatch(emitter,txid, collectorMock, partition, preExecutorRestartLastMeta.toMap());

long firstEmittedOffset = preRestartEmittedOffset + lastBatchEmittedRecords;
int emittedRecords = recordsInKafka - preRestartEmittedRecords;
@@ -288,8 +292,7 @@ public void testEarliestStrategyWhenTopologyIsRedeployed() {
KafkaTridentSpoutBatchMetadata preExecutorRestartLastMeta = new KafkaTridentSpoutBatchMetadata(preRestartEmittedOffset, preRestartEmittedOffset + preRestartEmittedRecords - 1, "Some older topology");
KafkaTridentSpoutEmitter<String, String> emitter = createEmitter(FirstPollOffsetStrategy.EARLIEST);
TransactionAttempt txid = new TransactionAttempt(0L, 0);
KafkaTridentSpoutTopicPartition kttp = new KafkaTridentSpoutTopicPartition(partition);
Map<String, Object> meta = emitter.emitPartitionBatchNew(txid, collectorMock, kttp, preExecutorRestartLastMeta.toMap());
Map<String, Object> meta = emitNewBatch(emitter, txid, collectorMock, partition, preExecutorRestartLastMeta.toMap());

verify(collectorMock, times(recordsInKafka)).emit(emitCaptor.capture());
List<List<Object>> emits = emitCaptor.getAllValues();
@@ -311,7 +314,7 @@ public void testLatestStrategyWhenTopologyIsRedeployed() {
KafkaTridentSpoutEmitter<String, String> emitter = createEmitter(FirstPollOffsetStrategy.LATEST);
TransactionAttempt txid = new TransactionAttempt(0L, 0);
KafkaTridentSpoutTopicPartition kttp = new KafkaTridentSpoutTopicPartition(partition);
Map<String, Object> meta = emitter.emitPartitionBatchNew(txid, collectorMock, kttp, preExecutorRestartLastMeta.toMap());
Map<String, Object> meta = emitNewBatch(emitter,txid, collectorMock, partition, preExecutorRestartLastMeta.toMap());

verify(collectorMock, never()).emit(anyList());
}
@@ -337,12 +340,11 @@ public void testTimeStampStrategyWhenTopologyIsRedeployed() {
HashMap<TopicPartition, List<ConsumerRecord<String, String>>> topicPartitionMap = new HashMap<>();
List<ConsumerRecord<String, String>> newRecords = SpoutWithMockedConsumerSetupHelper.createRecords(partition, timeStampStartOffset, recordsInKafka);
topicPartitionMap.put(partition, newRecords);
when(kafkaConsumer.poll(pollTimeout)).thenReturn(new ConsumerRecords<>(topicPartitionMap));
when(kafkaConsumer.poll(Duration.ofMillis(pollTimeout))).thenReturn(new ConsumerRecords<>(topicPartitionMap));

KafkaTridentSpoutEmitter<String, String> emitter = createEmitter(kafkaConsumer, FirstPollOffsetStrategy.TIMESTAMP);
TransactionAttempt txid = new TransactionAttempt(0L, 0);
KafkaTridentSpoutTopicPartition kttp = new KafkaTridentSpoutTopicPartition(partition);
Map<String, Object> meta = emitter.emitPartitionBatchNew(txid, collectorMock, kttp, preExecutorRestartLastMeta.toMap());
Map<String, Object> meta = emitNewBatch(emitter,txid, collectorMock, partition, preExecutorRestartLastMeta.toMap());

verify(collectorMock, times(recordsInKafka)).emit(emitCaptor.capture());
verify(kafkaConsumer, times(1)).seek(partition, timeStampStartOffset);
@@ -71,11 +71,11 @@ interface Emitter<PartitionsT, PartitionT extends ISpoutPartition, M> {
/**
* Emit a batch of tuples for a list of partitions/transactions.
*
* <p>Return the metadata describing this batch that will be used as lastPartitionMeta for defining the
* <p>Return the map of metadata describing this batch that will be used as lastPartitionMeta for defining the
* parameters of the next batch for each partition.
*/
Map<PartitionT, M> emitPartitionBatch(TransactionAttempt tx, TridentCollector collector,
Set<PartitionT> partitions, Map<PartitionT, M> lastPartitionMetaMap);
Map<PartitionT, M> emitNewBatch(TransactionAttempt tx, TridentCollector collector,
Set<PartitionT> partitions, Map<PartitionT, M> lastBatchMetaMap);

/**
* This method is called when this task is responsible for a new set of partitions. Should be used to manage things like connections
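In practice the round-trip is simply that the map returned for one batch is handed back as lastBatchMetaMap for the next. A hedged usage sketch against the opaque Kafka emitter from this change (the emitter, collector, and partitions variables are assumed to exist already):

```java
// Assumed already constructed: emitter (the opaque Kafka adapter), collector,
// and the Set<KafkaTridentSpoutTopicPartition> this task owns.
Map<KafkaTridentSpoutTopicPartition, Map<String, Object>> lastBatchMetaMap = new HashMap<>();
for (long batchId = 0; batchId < 3; batchId++) {
    TransactionAttempt tx = new TransactionAttempt(batchId, 0);
    // Each call resumes from wherever the previous batch's metadata left off.
    lastBatchMetaMap = emitter.emitNewBatch(tx, collector, partitions, lastBatchMetaMap);
}
```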
@@ -58,13 +58,11 @@ interface Emitter<PartitionsT, PartitionT extends ISpoutPartition, X> {
List<PartitionT> getOrderedPartitions(PartitionsT allPartitionInfo);

/**
* Emit a batch of tuples for a partition/transaction that's never been emitted before. Return the metadata that can be used to
* Emit a new batch of tuples for the given partitions, one that has never been emitted before. Return the metadata that can be used to
* reconstruct this partition/batch in the future.
*/
X emitPartitionBatchNew(TransactionAttempt tx, TridentCollector collector, PartitionT partition, X lastPartitionMeta);

Map<PartitionT, X> emitPartitionBatchNew(TransactionAttempt tx, TridentCollector collector, Set<PartitionT> partitions,
Map<PartitionT, X> lastPartitionMetaMap);
Map<PartitionT, X> emitBatchNew(TransactionAttempt tx, TridentCollector collector, Set<PartitionT> partitions,
Map<PartitionT, X> lastPartitionMetaMap);

/**
* This method is called when this task is responsible for a new set of partitions. Should be used to manage things like connections
@@ -76,7 +74,7 @@ Map<PartitionT, X> emitPartitionBatchNew(TransactionAttempt tx, TridentCollector
* Emit a batch of tuples for a partition/transaction that has been emitted before, using the metadata created when it was first
* emitted.
*/
void emitPartitionBatch(TransactionAttempt tx, TridentCollector collector, PartitionT partition, X partitionMeta);
void reEmitPartitionBatch(TransactionAttempt tx, TridentCollector collector, PartitionT partition, X partitionMeta);

/**
* Get the partitions assigned to the given task.
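The division of labour between emitBatchNew and reEmitPartitionBatch can be seen in a toy, non-Storm sketch: a new batch reads forward and records the range it covered, while a replay re-reads exactly the recorded range. Everything below (the int[] metadata, the in-memory source) is invented for illustration.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy sketch, not Storm code: per-partition data plus range metadata of the
// form [startInclusive, endExclusive].
class BatchReplaySketch {
    private final Map<Integer, List<String>> source = new HashMap<>();

    /** New batch: read forward from where the last metadata ended, record the new range. */
    Map<Integer, int[]> emitBatchNew(Set<Integer> partitions, Map<Integer, int[]> lastMeta,
                                     List<String> out) {
        Map<Integer, int[]> newMeta = new HashMap<>();
        for (Integer partition : partitions) {
            int start = lastMeta.containsKey(partition) ? lastMeta.get(partition)[1] : 0;
            List<String> values = source.getOrDefault(partition, List.of());
            int end = values.size();
            out.addAll(values.subList(start, end));
            newMeta.put(partition, new int[]{start, end});
        }
        return newMeta;
    }

    /** Replay: emit exactly the range recorded when this batch was first emitted. */
    void reEmitPartitionBatch(Integer partition, int[] meta, List<String> out) {
        out.addAll(source.getOrDefault(partition, List.of()).subList(meta[0], meta[1]));
    }
}
```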