Skip to content

Commit

Permalink
support collect currentFetchEventTimeLag metric
Browse files Browse the repository at this point in the history
  • Loading branch information
Tan-JiaLiang committed Apr 30, 2024
1 parent 51d015b commit 872f2d1
Show file tree
Hide file tree
Showing 13 changed files with 165 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,13 @@ public void collect(T record, long timestamp) {
}
}

@Override
public void collect(T record, long timestamp, long fetchTime) {
if (!isEndOfStreamReached(record)) {
sourceOutput.collect(record, timestamp, fetchTime);
}
}

/**
* Judge and handle the eof record.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,11 @@ public void collect(E record, long timestamp) {
collect(record);
}

@Override
public void collect(E record, long timestamp, long fetchTime) {
collect(record);
}

@Override
public void emitWatermark(Watermark watermark) {
throw new UnsupportedOperationException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,9 @@ public void collect(E record) {}
@Override
public void collect(E record, long timestamp) {}

@Override
public void collect(E record, long timestamp, long fetchTime) {}

@Override
public void emitWatermark(Watermark watermark) {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,23 @@ public interface SourceOutput<T> extends WatermarkOutput {
* @param timestamp the timestamp of the record.
*/
void collect(T record, long timestamp);

/**
* Emit a record with a timestamp and pass its fetchTime.
*
* <p>Use this method if you capture the fetchTime and the source system has timestamps attached
* to records. Typical examples would be Logs, PubSubs, or Message Queues, like Kafka or
* Kinesis, which store a timestamp with each event. If the source system does not have a notion
* of records with timestamps, use {@link TimestampAssigner#NO_TIMESTAMP} instead.
*
* <p>The events typically still pass through a {@link TimestampAssigner}, which may decide to
* either use this source-provided timestamp, or replace it with a timestamp stored within the
* event (for example if the event was a JSON object one could configure aTimestampAssigner that
* extracts one of the object's fields and uses that as a timestamp).
*
* @param record the record to emit.
* @param timestamp the timestamp of the record.
* @param fetchTime the timestamp of the record fetched.
*/
void collect(T record, long timestamp, long fetchTime);
}
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,11 @@ public void collect(E record, long timestamp) {
collect(record);
}

@Override
public void collect(E record, long timestamp, long fetchTime) {
collect(record);
}

@Override
public void emitWatermark(Watermark watermark) {
throw new UnsupportedOperationException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ public class InternalSourceReaderMetricGroup extends ProxyMetricGroup<MetricGrou
private boolean firstWatermark = true;
private long currentMaxDesiredWatermark;
private boolean firstDesiredWatermark = true;
private boolean firstFetchEventTimeLag = true;
private long currentFetchEventTimeLag;

private InternalSourceReaderMetricGroup(
MetricGroup parentMetricGroup,
Expand Down Expand Up @@ -116,6 +118,15 @@ public void idlingStarted() {
}
}

public void setCurrentFetchEventTimeLag(long currentFetchEventTimeLag) {
this.currentFetchEventTimeLag = currentFetchEventTimeLag;
if (firstFetchEventTimeLag) {
parentMetricGroup.gauge(
MetricNames.CURRENT_FETCH_EVENT_TIME_LAG, this::getFetchTimeLag);
firstFetchEventTimeLag = false;
}
}

/**
* Called when a watermark was emitted.
*
Expand Down Expand Up @@ -166,6 +177,11 @@ long getEmitTimeLag() {
: UNDEFINED;
}

@VisibleForTesting
public long getFetchTimeLag() {
return this.currentFetchEventTimeLag;
}

long getWatermarkLag() {
return getLastEmitTime() - lastWatermark;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.runtime.metrics.groups.InternalSourceReaderMetricGroup;
import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException;
Expand All @@ -40,17 +41,20 @@
public class NoOpTimestampsAndWatermarks<T> implements TimestampsAndWatermarks<T> {

private final TimestampAssigner<T> timestamps;
private final InternalSourceReaderMetricGroup metricGroup;

/** Creates a new {@link NoOpTimestampsAndWatermarks} with the given TimestampAssigner. */
public NoOpTimestampsAndWatermarks(TimestampAssigner<T> timestamps) {
public NoOpTimestampsAndWatermarks(
TimestampAssigner<T> timestamps, InternalSourceReaderMetricGroup metricGroup) {
this.timestamps = checkNotNull(timestamps);
this.metricGroup = checkNotNull(metricGroup);
}

@Override
public ReaderOutput<T> createMainOutput(
PushingAsyncDataInput.DataOutput<T> output, WatermarkUpdateListener watermarkEmitted) {
checkNotNull(output);
return new TimestampsOnlyOutput<>(output, timestamps);
return new TimestampsOnlyOutput<>(output, timestamps, metricGroup);
}

@Override
Expand Down Expand Up @@ -81,14 +85,17 @@ private static final class TimestampsOnlyOutput<T> implements ReaderOutput<T> {

private final PushingAsyncDataInput.DataOutput<T> output;
private final TimestampAssigner<T> timestampAssigner;
private final InternalSourceReaderMetricGroup metricGroup;
private final StreamRecord<T> reusingRecord;

private TimestampsOnlyOutput(
PushingAsyncDataInput.DataOutput<T> output,
TimestampAssigner<T> timestampAssigner) {
TimestampAssigner<T> timestampAssigner,
InternalSourceReaderMetricGroup metricGroup) {

this.output = output;
this.timestampAssigner = timestampAssigner;
this.metricGroup = metricGroup;
this.reusingRecord = new StreamRecord<>(null);
}

Expand All @@ -99,10 +106,19 @@ public void collect(T record) {

@Override
public void collect(T record, long timestamp) {
collect(record, timestamp, TimestampAssigner.NO_TIMESTAMP);
}

@Override
public void collect(T record, long timestamp, long fetchTime) {
try {
output.emitRecord(
reusingRecord.replace(
record, timestampAssigner.extractTimestamp(record, timestamp)));
long assignedTimestamp = timestampAssigner.extractTimestamp(record, timestamp);
if (fetchTime != TimestampAssigner.NO_TIMESTAMP
&& assignedTimestamp != TimestampAssigner.NO_TIMESTAMP) {
metricGroup.setCurrentFetchEventTimeLag(fetchTime - assignedTimestamp);
}

output.emitRecord(reusingRecord.replace(record, assignedTimestamp));
} catch (ExceptionInChainedOperatorException e) {
throw e;
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.flink.api.common.eventtime.WatermarkOutputMultiplexer;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.runtime.metrics.groups.InternalSourceReaderMetricGroup;
import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;

Expand Down Expand Up @@ -57,6 +58,8 @@ public class ProgressiveTimestampsAndWatermarks<T> implements TimestampsAndWater

private final ProcessingTimeService timeService;

private final InternalSourceReaderMetricGroup metricGroup;

private final long periodicWatermarkInterval;

@Nullable private SplitLocalOutputs<T> currentPerSplitOutputs;
Expand All @@ -70,12 +73,14 @@ public ProgressiveTimestampsAndWatermarks(
WatermarkGeneratorSupplier<T> watermarksFactory,
WatermarkGeneratorSupplier.Context watermarksContext,
ProcessingTimeService timeService,
InternalSourceReaderMetricGroup metricGroup,
Duration periodicWatermarkInterval) {

this.timestampAssigner = timestampAssigner;
this.watermarksFactory = watermarksFactory;
this.watermarksContext = watermarksContext;
this.timeService = timeService;
this.metricGroup = metricGroup;

long periodicWatermarkIntervalMillis;
try {
Expand Down Expand Up @@ -115,15 +120,17 @@ public ReaderOutput<T> createMainOutput(
watermarkUpdateListener,
timestampAssigner,
watermarksFactory,
watermarksContext);
watermarksContext,
metricGroup);

currentMainOutput =
new StreamingReaderOutput<>(
output,
idlenessManager.getMainOutput(),
timestampAssigner,
watermarkGenerator,
currentPerSplitOutputs);
currentPerSplitOutputs,
metricGroup);

return currentMainOutput;
}
Expand Down Expand Up @@ -174,9 +181,16 @@ private static final class StreamingReaderOutput<T> extends SourceOutputWithWate
WatermarkOutput watermarkOutput,
TimestampAssigner<T> timestampAssigner,
WatermarkGenerator<T> watermarkGenerator,
SplitLocalOutputs<T> splitLocalOutputs) {

super(output, watermarkOutput, watermarkOutput, timestampAssigner, watermarkGenerator);
SplitLocalOutputs<T> splitLocalOutputs,
InternalSourceReaderMetricGroup metricGroup) {

super(
output,
watermarkOutput,
watermarkOutput,
timestampAssigner,
watermarkGenerator,
metricGroup);
this.splitLocalOutputs = splitLocalOutputs;
}

Expand Down Expand Up @@ -208,20 +222,23 @@ private static final class SplitLocalOutputs<T> {
private final WatermarkGeneratorSupplier<T> watermarksFactory;
private final WatermarkGeneratorSupplier.Context watermarkContext;
private final WatermarkUpdateListener watermarkUpdateListener;
private final InternalSourceReaderMetricGroup metricGroup;

private SplitLocalOutputs(
PushingAsyncDataInput.DataOutput<T> recordOutput,
WatermarkOutput watermarkOutput,
WatermarkUpdateListener watermarkUpdateListener,
TimestampAssigner<T> timestampAssigner,
WatermarkGeneratorSupplier<T> watermarksFactory,
WatermarkGeneratorSupplier.Context watermarkContext) {
WatermarkGeneratorSupplier.Context watermarkContext,
InternalSourceReaderMetricGroup metricGroup) {

this.recordOutput = recordOutput;
this.timestampAssigner = timestampAssigner;
this.watermarksFactory = watermarksFactory;
this.watermarkContext = watermarkContext;
this.watermarkUpdateListener = watermarkUpdateListener;
this.metricGroup = metricGroup;

this.watermarkMultiplexer = new WatermarkOutputMultiplexer(watermarkOutput);
this.localOutputs =
Expand Down Expand Up @@ -251,7 +268,8 @@ SourceOutput<T> createOutputForSplit(String splitId) {
onEventOutput,
periodicOutput,
timestampAssigner,
watermarks);
watermarks,
metricGroup);

localOutputs.put(splitId, localOutput);
return localOutput;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@
package org.apache.flink.streaming.api.operators.source;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.metrics.groups.InternalSourceReaderMetricGroup;
import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException;
Expand Down Expand Up @@ -68,6 +71,8 @@ public class SourceOutputWithWatermarks<T> implements SourceOutput<T> {

private final WatermarkOutput periodicWatermarkOutput;

private final InternalSourceReaderMetricGroup metricGroup;

private final StreamRecord<T> reusingRecord;

/**
Expand All @@ -79,13 +84,15 @@ protected SourceOutputWithWatermarks(
WatermarkOutput onEventWatermarkOutput,
WatermarkOutput periodicWatermarkOutput,
TimestampAssigner<T> timestampAssigner,
WatermarkGenerator<T> watermarkGenerator) {
WatermarkGenerator<T> watermarkGenerator,
InternalSourceReaderMetricGroup metricGroup) {

this.recordsOutput = checkNotNull(recordsOutput);
this.onEventWatermarkOutput = checkNotNull(onEventWatermarkOutput);
this.periodicWatermarkOutput = checkNotNull(periodicWatermarkOutput);
this.timestampAssigner = checkNotNull(timestampAssigner);
this.watermarkGenerator = checkNotNull(watermarkGenerator);
this.metricGroup = checkNotNull(metricGroup);
this.reusingRecord = new StreamRecord<>(null);
}

Expand All @@ -103,8 +110,17 @@ public final void collect(T record) {

@Override
public final void collect(T record, long timestamp) {
collect(record, timestamp, TimestampAssigner.NO_TIMESTAMP);
}

@Override
public void collect(T record, long timestamp, long fetchTime) {
try {
final long assignedTimestamp = timestampAssigner.extractTimestamp(record, timestamp);
if (fetchTime != TimestampAssigner.NO_TIMESTAMP
&& assignedTimestamp != TimestampAssigner.NO_TIMESTAMP) {
metricGroup.setCurrentFetchEventTimeLag(fetchTime - assignedTimestamp);
}

// IMPORTANT: The event must be emitted before the watermark generator is called.
recordsOutput.emitRecord(reusingRecord.replace(record, assignedTimestamp));
Expand Down Expand Up @@ -144,6 +160,11 @@ public final void emitPeriodicWatermark() {
watermarkGenerator.onPeriodicEmit(periodicWatermarkOutput);
}

@VisibleForTesting
public final InternalSourceReaderMetricGroup getMetricGroup() {
return metricGroup;
}

// ------------------------------------------------------------------------
// Factories
// ------------------------------------------------------------------------
Expand All @@ -157,13 +178,15 @@ public static <E> SourceOutputWithWatermarks<E> createWithSeparateOutputs(
WatermarkOutput onEventWatermarkOutput,
WatermarkOutput periodicWatermarkOutput,
TimestampAssigner<E> timestampAssigner,
WatermarkGenerator<E> watermarkGenerator) {
WatermarkGenerator<E> watermarkGenerator,
InternalSourceReaderMetricGroup metricGroup) {

return new SourceOutputWithWatermarks<>(
recordsOutput,
onEventWatermarkOutput,
periodicWatermarkOutput,
timestampAssigner,
watermarkGenerator);
watermarkGenerator,
metricGroup);
}
}
Loading

0 comments on commit 872f2d1

Please sign in to comment.