Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Track Max & mean seconds to receive state messages #15586

Merged
merged 5 commits into from
Aug 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,7 @@ properties:
type: integer
recordsCommitted:
type: integer # if unset, committed records could not be computed
meanSecondsBeforeSourceStateMessageEmitted:
type: integer
gosusnp marked this conversation as resolved.
Show resolved Hide resolved
maxSecondsBeforeSourceStateMessageEmitted:
type: integer
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,10 @@ public static ImmutableMap<String, Object> generateJobAttemptMetadata(final Job
metadata.put("volume_rows", syncSummary.getRecordsSynced());
metadata.put("count_state_messages_from_source", syncSummary.getTotalStats().getSourceStateMessagesEmitted());
metadata.put("count_state_messages_from_destination", syncSummary.getTotalStats().getDestinationStateMessagesEmitted());
metadata.put("max_seconds_before_source_state_message_emitted",
syncSummary.getTotalStats().getMaxSecondsBeforeSourceStateMessageEmitted());
metadata.put("mean_seconds_before_source_state_message_emitted",
syncSummary.getTotalStats().getMeanSecondsBeforeSourceStateMessageEmitted());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ class JobTrackerTest {
.put("volume_mb", SYNC_BYTES_SYNC)
.put("count_state_messages_from_source", 3L)
.put("count_state_messages_from_destination", 1L)
.put("max_seconds_before_source_state_message_emitted", 5L)
.put("mean_seconds_before_source_state_message_emitted", 4L)
.build();
private static final ImmutableMap<String, Object> SYNC_CONFIG_METADATA = ImmutableMap.<String, Object>builder()
.put(JobTracker.CONFIG + ".source.key", JobTracker.SET)
Expand Down Expand Up @@ -496,6 +498,8 @@ private Attempt getAttemptMock() {
when(attempt.getOutput()).thenReturn(java.util.Optional.of(jobOutput));
when(syncStats.getSourceStateMessagesEmitted()).thenReturn(3L);
when(syncStats.getDestinationStateMessagesEmitted()).thenReturn(1L);
when(syncStats.getMaxSecondsBeforeSourceStateMessageEmitted()).thenReturn(5L);
when(syncStats.getMeanSecondsBeforeSourceStateMessageEmitted()).thenReturn(4L);
return attempt;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,9 @@ else if (hasFailed.get()) {
.withRecordsEmitted(messageTracker.getTotalRecordsEmitted())
.withBytesEmitted(messageTracker.getTotalBytesEmitted())
.withSourceStateMessagesEmitted(messageTracker.getTotalSourceStateMessagesEmitted())
.withDestinationStateMessagesEmitted(messageTracker.getTotalDestinationStateMessagesEmitted());
.withDestinationStateMessagesEmitted(messageTracker.getTotalDestinationStateMessagesEmitted())
.withMaxSecondsBeforeSourceStateMessageEmitted(messageTracker.getMaxSecondsToReceiveSourceStateMessage())
.withMeanSecondsBeforeSourceStateMessageEmitted(messageTracker.getMeanSecondsToReceiveSourceStateMessage());

if (outputStatus == ReplicationStatus.COMPLETED) {
totalSyncStats.setRecordsCommitted(totalSyncStats.getRecordsEmitted());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.joda.time.DateTime;
import org.joda.time.Seconds;

@Slf4j
public class AirbyteMessageTracker implements MessageTracker {
Expand All @@ -40,6 +42,8 @@ public class AirbyteMessageTracker implements MessageTracker {
private final AtomicReference<State> destinationOutputState;
private final AtomicLong totalSourceEmittedStateMessages;
private final AtomicLong totalDestinationEmittedStateMessages;
private Long maxSecondsToReceiveSourceStateMessage;
private Long meanSecondsToReceiveSourceStateMessage;
private final Map<Short, Long> streamToRunningCount;
private final HashFunction hashFunction;
private final BiMap<String, Short> streamNameToIndex;
Expand All @@ -49,6 +53,8 @@ public class AirbyteMessageTracker implements MessageTracker {
private final List<AirbyteTraceMessage> destinationErrorTraceMessages;
private final List<AirbyteTraceMessage> sourceErrorTraceMessages;
private final StateAggregator stateAggregator;
private DateTime firstRecordReceivedAt;
private final DateTime lastStateMessageReceivedAt;

private short nextStreamIndex;

Expand All @@ -74,6 +80,8 @@ protected AirbyteMessageTracker(final StateDeltaTracker stateDeltaTracker, final
this.destinationOutputState = new AtomicReference<>();
this.totalSourceEmittedStateMessages = new AtomicLong(0L);
this.totalDestinationEmittedStateMessages = new AtomicLong(0L);
this.maxSecondsToReceiveSourceStateMessage = 0L;
this.meanSecondsToReceiveSourceStateMessage = 0L;
this.streamToRunningCount = new HashMap<>();
this.streamNameToIndex = HashBiMap.create();
this.hashFunction = Hashing.murmur3_32_fixed();
Expand All @@ -85,6 +93,8 @@ protected AirbyteMessageTracker(final StateDeltaTracker stateDeltaTracker, final
this.destinationErrorTraceMessages = new ArrayList<>();
this.sourceErrorTraceMessages = new ArrayList<>();
this.stateAggregator = stateAggregator;
this.firstRecordReceivedAt = null;
this.lastStateMessageReceivedAt = null;
}

@Override
Expand All @@ -111,6 +121,10 @@ public void acceptFromDestination(final AirbyteMessage message) {
* total byte count for the record's stream.
*/
private void handleSourceEmittedRecord(final AirbyteRecordMessage recordMessage) {
if (firstRecordReceivedAt == null) {
firstRecordReceivedAt = DateTime.now();
}
Comment on lines +124 to +126
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 this is a global time, and not per-stream


final short streamIndex = getStreamIndex(recordMessage.getStream());

final long currentRunningCount = streamToRunningCount.getOrDefault(streamIndex, 0L);
Expand All @@ -131,6 +145,7 @@ private void handleSourceEmittedRecord(final AirbyteRecordMessage recordMessage)
* correctly.
*/
private void handleSourceEmittedState(final AirbyteStateMessage stateMessage) {
updateMaxAndMeanSecondsToReceiveStateMessage(DateTime.now());
sourceOutputState.set(new State().withState(stateMessage.getData()));
totalSourceEmittedStateMessages.incrementAndGet();
final int stateHash = getStateHashCode(stateMessage);
Expand Down Expand Up @@ -327,4 +342,48 @@ public Long getTotalDestinationStateMessagesEmitted() {
return totalDestinationEmittedStateMessages.get();
}

/**
 * Returns the largest number of seconds recorded so far between a source state message and the
 * timestamp it was measured against.
 *
 * @return the current maximum in whole seconds; the constructor initialises it to {@code 0L}
 */
@Override
public Long getMaxSecondsToReceiveSourceStateMessage() {
  return this.maxSecondsToReceiveSourceStateMessage;
}

/**
 * Returns the running mean of the seconds measured for each source state message.
 *
 * @return the current mean in whole seconds (fractions are dropped by {@code calculateMean});
 *         the constructor initialises it to {@code 0L}
 */
@Override
public Long getMeanSecondsToReceiveSourceStateMessage() {
  return this.meanSecondsToReceiveSourceStateMessage;
}

/**
 * Folds the wait time of a newly received source state message into the max and mean
 * statistics.
 *
 * @param stateMessageReceivedAt the time at which the source state message was received
 */
private void updateMaxAndMeanSecondsToReceiveStateMessage(final DateTime stateMessageReceivedAt) {
  final Long secondsSinceLastStateMessage = calculateSecondsSinceLastStateEmitted(stateMessageReceivedAt);

  maxSecondsToReceiveSourceStateMessage = Math.max(maxSecondsToReceiveSourceStateMessage, secondsSinceLastStateMessage);

  // handleSourceEmittedState calls this method BEFORE it increments totalSourceEmittedStateMessages,
  // so the counter does not yet include the message being processed. calculateMean expects the
  // count to include the new data point, hence the + 1.
  final long stateMessageCountIncludingCurrent = totalSourceEmittedStateMessages.get() + 1;

  // No special case for the first message is needed: with a count of 1 the previous mean gets a
  // weight of 0 and the result is the new data point itself. Avoiding a "mean == 0" sentinel also
  // keeps a genuine mean of 0 from being overwritten by the next sample.
  meanSecondsToReceiveSourceStateMessage =
      calculateMean(meanSecondsToReceiveSourceStateMessage, stateMessageCountIncludingCurrent, secondsSinceLastStateMessage);
}

/**
 * Computes how many whole seconds elapsed between a reference timestamp and the given state
 * message timestamp.
 *
 * <p>
 * The reference is {@code lastStateMessageReceivedAt} when it is set, otherwise
 * {@code firstRecordReceivedAt}. When neither is set (a state message arrived before any record
 * message) there is nothing to measure against and {@code 0} is returned.
 *
 * <p>
 * NOTE(review): in the visible code {@code lastStateMessageReceivedAt} is declared {@code final}
 * and initialised to {@code null}, so the first choice appears to never apply - confirm whether
 * it is meant to be updated on every state message.
 *
 * @param stateMessageReceivedAt the time at which the source state message was received
 * @return the elapsed whole seconds, or {@code 0} when no reference timestamp exists
 */
private Long calculateSecondsSinceLastStateEmitted(final DateTime stateMessageReceivedAt) {
  final DateTime intervalStart = lastStateMessageReceivedAt != null ? lastStateMessageReceivedAt : firstRecordReceivedAt;

  if (intervalStart == null) {
    // A state message arrived before any record message: no earlier timestamp to compare with.
    return 0L;
  }

  return (long) Seconds.secondsBetween(intervalStart, stateMessageReceivedAt).getSeconds();
}

/**
 * Updates a running mean with one additional data point.
 *
 * <p>
 * The previous mean is weighted by {@code totalCount - 1} and the new data point by 1, both
 * divided by {@code totalCount}. The floating-point result is cast to {@code long}, so any
 * fractional part is dropped.
 *
 * @param currentMean the mean of the data points seen before {@code newDataPoint}
 * @param totalCount the number of data points including {@code newDataPoint}
 * @param newDataPoint the value being added to the mean
 * @return the updated mean, truncated to a whole number
 */
@VisibleForTesting
protected Long calculateMean(final Long currentMean, final Long totalCount, final Long newDataPoint) {
  final long countIncludingNew = totalCount;
  final long sumOfPreviousPoints = currentMean * (countIncludingNew - 1);

  final double previousShare = (double) sumOfPreviousPoints / countIncludingNew;
  final double newPointShare = (double) newDataPoint.longValue() / countIncludingNew;

  return (long) (previousShare + newPointShare);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ public interface MessageTracker {

Long getTotalDestinationStateMessagesEmitted();

Long getMaxSecondsToReceiveSourceStateMessage();

Long getMeanSecondsToReceiveSourceStateMessage();

AirbyteTraceMessage getFirstDestinationErrorTraceMessage();

AirbyteTraceMessage getFirstSourceErrorTraceMessage();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,8 @@ void testPopulatesOutputOnSuccess() throws WorkerException {
when(messageTracker.getTotalDestinationStateMessagesEmitted()).thenReturn(1L);
when(messageTracker.getStreamToEmittedBytes()).thenReturn(Collections.singletonMap(STREAM1, 100L));
when(messageTracker.getStreamToEmittedRecords()).thenReturn(Collections.singletonMap(STREAM1, 12L));
when(messageTracker.getMaxSecondsToReceiveSourceStateMessage()).thenReturn(5L);
when(messageTracker.getMeanSecondsToReceiveSourceStateMessage()).thenReturn(4L);

final ReplicationWorker worker = new DefaultReplicationWorker(
JOB_ID,
Expand All @@ -467,6 +469,8 @@ void testPopulatesOutputOnSuccess() throws WorkerException {
.withBytesEmitted(100L)
.withSourceStateMessagesEmitted(3L)
.withDestinationStateMessagesEmitted(1L)
.withMaxSecondsBeforeSourceStateMessageEmitted(5L)
.withMeanSecondsBeforeSourceStateMessageEmitted(4L)
.withRecordsCommitted(12L)) // since success, should use emitted count
.withStreamStats(Collections.singletonList(
new StreamSyncStats()
Expand All @@ -476,7 +480,9 @@ void testPopulatesOutputOnSuccess() throws WorkerException {
.withRecordsEmitted(12L)
.withRecordsCommitted(12L) // since success, should use emitted count
.withSourceStateMessagesEmitted(null)
.withDestinationStateMessagesEmitted(null)))))
.withDestinationStateMessagesEmitted(null)
.withMaxSecondsBeforeSourceStateMessageEmitted(null)
.withMeanSecondsBeforeSourceStateMessageEmitted(null)))))
.withOutputCatalog(syncInput.getCatalog())
.withState(new State().withState(expectedState));

Expand Down Expand Up @@ -548,6 +554,8 @@ void testPopulatesStatsOnFailureIfAvailable() throws Exception {
when(messageTracker.getStreamToEmittedBytes()).thenReturn(Collections.singletonMap(STREAM1, 100L));
when(messageTracker.getStreamToEmittedRecords()).thenReturn(Collections.singletonMap(STREAM1, 12L));
when(messageTracker.getStreamToCommittedRecords()).thenReturn(Optional.of(Collections.singletonMap(STREAM1, 6L)));
when(messageTracker.getMaxSecondsToReceiveSourceStateMessage()).thenReturn(10L);
when(messageTracker.getMeanSecondsToReceiveSourceStateMessage()).thenReturn(8L);

final ReplicationWorker worker = new DefaultReplicationWorker(
JOB_ID,
Expand All @@ -565,6 +573,8 @@ void testPopulatesStatsOnFailureIfAvailable() throws Exception {
.withBytesEmitted(100L)
.withSourceStateMessagesEmitted(3L)
.withDestinationStateMessagesEmitted(2L)
.withMaxSecondsBeforeSourceStateMessageEmitted(10L)
.withMeanSecondsBeforeSourceStateMessageEmitted(8L)
.withRecordsCommitted(6L);
final List<StreamSyncStats> expectedStreamStats = Collections.singletonList(
new StreamSyncStats()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,4 +324,13 @@ void testErrorTraceMessageFailureWithNoTraceErrors() throws Exception {
assertEquals(messageTracker.errorTraceMessageFailure(Long.valueOf(123), 1), null);
}

/**
 * Checks that calculateMean folds a new data point into an existing mean and truncates the
 * result to a whole number.
 */
@Test
void testCalculateMean() throws Exception {
  // Three state messages averaged 5; a 4th data point of 9 gives (5 * 3 + 9) / 4 = 6.
  final Long meanAfterFourthMessage = messageTracker.calculateMean(5L, 4L, 9L);
  assertEquals(6L, meanAfterFourthMessage);

  // Five state messages averaged 10; a 6th data point of 12 gives (10 * 5 + 12) / 6 = 10.33,
  // which is truncated to 10.
  final Long meanAfterSixthMessage = messageTracker.calculateMean(10L, 6L, 12L);
  assertEquals(10L, meanAfterSixthMessage);
}

}