Merge pull request #1740 from AnuGayan/upMaster
Improve persisted aggregation and logs
AnuGayan authored Aug 31, 2021
2 parents d39606e + b2e65df commit 326f4b8
Showing 7 changed files with 204 additions and 69 deletions.
@@ -393,8 +393,17 @@ private Map<String, Boolean> isSafeToPurgeTheDuration(long purgeTime, Table pare
purgingCheckState.put(IS_PARENT_TABLE_HAS_AGGREGATED_DATA, dataInParentTable != null
&& dataInParentTable.length > 0);
} catch (Exception e) {
LOG.error("Error occurred while checking whether the data is safe to purge from aggregation " +
"tables for the aggregation " + aggregationDefinition.getId(), e);
if (e.getMessage() != null && e.getMessage().contains("deadlocked")) {
errorMessage = "Deadlock observed while checking whether the data is safe to purge from aggregation " +
"tables for the aggregation " + aggregationDefinition.getId() +
". If this occurred in an Active-Active deployment, this error can be ignored if the other " +
"node does not report the same error";
} else {
errorMessage = "Error occurred while checking whether the data is safe to purge from aggregation " +
"tables for the aggregation " + aggregationDefinition.getId();

}
LOG.error(errorMessage, e);
purgingCheckState.put(IS_DATA_AVAILABLE_TO_PURGE, false);
purgingCheckState.put(IS_PARENT_TABLE_HAS_AGGREGATED_DATA, false);
errorMessage = "Error occurred while checking whether the data is safe to purge from aggregation tables" +
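The hunk above now distinguishes database deadlocks from other failures when the purge-safety check throws. A minimal, self-contained sketch of that message-selection pattern follows; the class and method names are hypothetical and not part of Siddhi:

// Minimal sketch of the deadlock-aware message selection used above when the
// purge-safety check fails. Class and method names are hypothetical and are
// not part of the Siddhi code base.
public final class PurgeErrorMessageSketch {

    static String buildErrorMessage(Exception e, String aggregationId) {
        // Guard against exceptions that carry no message before matching on it.
        if (e.getMessage() != null && e.getMessage().contains("deadlocked")) {
            return "Deadlock observed while checking whether the data is safe to purge from aggregation "
                    + "tables for the aggregation " + aggregationId
                    + ". In an Active-Active deployment this can be ignored if the other node does not "
                    + "report the same error.";
        }
        return "Error occurred while checking whether the data is safe to purge from aggregation "
                + "tables for the aggregation " + aggregationId;
    }

    public static void main(String[] args) {
        System.out.println(buildErrorMessage(new RuntimeException("Transaction (Process ID 52) was deadlocked"),
                "TradeAggregation"));
        System.out.println(buildErrorMessage(new RuntimeException("Connection refused"), "TradeAggregation"));
    }
}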
@@ -212,7 +212,7 @@ private void dispatchEvent(long startTimeOfNewAggregates, BaseIncrementalValueSt
tableEventChunk.add(event);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Event dispatched by " + this.duration + " incremental executor: " + eventChunk.toString());
LOG.debug("Event dispatched by aggregation " + aggregatorName + " for duration " + this.duration);
}
if (isProcessingExecutor) {
executorService.execute(() -> {
@@ -223,6 +223,9 @@ private void dispatchEvent(long startTimeOfNewAggregates, BaseIncrementalValueSt
"' when performing table writes of aggregation '" + this.aggregatorName +
"' for duration '" + this.duration + "'. This should be investigated as this " +
"can cause accuracy loss.", t);
if (LOG.isDebugEnabled()) {
LOG.debug("Dropping Event chunk - \"" + eventChunk.toString() + "\"");
}
} finally {
isProcessFinished.set(true);
}
@@ -236,7 +239,7 @@ private void dispatchEvent(long startTimeOfNewAggregates, BaseIncrementalValueSt
}
} catch (InterruptedException e) {
LOG.error("Error occurred while waiting until table update task finishes for duration " +
duration, e);
duration + "in aggregation " + aggregatorName, e);
}
}
if (getNextExecutor() != null) {
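The dispatch path above hands the table write to an executor service, waits on a completion flag, and now names the aggregation in its error and debug logs. A rough, self-contained sketch of that submit-and-wait pattern, with illustrative names only, might look like this:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Simplified sketch of submitting a table write to an executor and waiting
// for it to finish, mirroring the structure of the dispatch logic above.
// All names are illustrative only.
public final class SubmitAndWaitSketch {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        AtomicBoolean isProcessFinished = new AtomicBoolean(false);

        executorService.execute(() -> {
            try {
                // Perform the table write for the current duration here.
            } catch (Throwable t) {
                // Log the failure with the aggregation name and duration, and
                // optionally log the dropped event chunk at debug level.
            } finally {
                isProcessFinished.set(true);
            }
        });

        // Wait until the table update task finishes before moving on.
        while (!isProcessFinished.get()) {
            TimeUnit.MILLISECONDS.sleep(1);
        }
        executorService.shutdown();
    }
}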
@@ -64,14 +64,17 @@ public class IncrementalExecutorsInitialiser {
private String timeZone;

private boolean isInitialised;
private boolean isReadOnly;
private boolean isPersistedAggregation;

public IncrementalExecutorsInitialiser(List<TimePeriod.Duration> incrementalDurations,
Map<TimePeriod.Duration, Table> aggregationTables,
Map<TimePeriod.Duration, Executor> incrementalExecutorMap,
boolean isDistributed, String shardId, SiddhiAppContext siddhiAppContext,
MetaStreamEvent metaStreamEvent, Map<String, Table> tableMap,
Map<String, Window> windowMap,
Map<String, AggregationRuntime> aggregationMap, String timeZone) {
Map<String, AggregationRuntime> aggregationMap, String timeZone,
boolean isReadOnly, boolean isPersistedAggregation) {
this.timeZone = timeZone;
this.incrementalDurations = incrementalDurations;
this.aggregationTables = aggregationTables;
@@ -88,15 +91,18 @@ public IncrementalExecutorsInitialiser(List<TimePeriod.Duration> incrementalDura
this.aggregationMap = aggregationMap;

this.isInitialised = false;
this.isReadOnly = isReadOnly;
this.isPersistedAggregation = isPersistedAggregation;
}

public synchronized void initialiseExecutors() {
if (this.isInitialised) {
if (this.isInitialised || isReadOnly) {
// Only cleared when executors change from reading to processing state in one node deployment
return;
}
Event[] events;
Long endOFLatestEventTimestamp = null;
Long lastData = null;

// Get max(AGG_TIMESTAMP) from table corresponding to max duration
Table tableForMaxDuration = aggregationTables.get(incrementalDurations.get(incrementalDurations.size() - 1));
@@ -108,54 +114,115 @@ public synchronized void initialiseExecutors() {
// Get latest event timestamp in tableForMaxDuration and get the end time of the aggregation record
events = onDemandQueryRuntime.execute();
if (events != null) {
Long lastData = (Long) events[events.length - 1].getData(0);
lastData = (Long) events[events.length - 1].getData(0);
endOFLatestEventTimestamp = IncrementalTimeConverterUtil
.getNextEmitTime(lastData, incrementalDurations.get(incrementalDurations.size() - 1), timeZone);
}
for (int i = incrementalDurations.size() - 1; i > 0; i--) {
TimePeriod.Duration recreateForDuration = incrementalDurations.get(i);
Executor incrementalExecutor = incrementalExecutorMap.get(recreateForDuration);


// Get the table previous to the duration for which we need to recreate (e.g. if we want to recreate
// for minute duration, take the second table [provided that aggregation is done for seconds])
// This lookup is filtered by endOFLatestEventTimestamp
Table recreateFromTable = aggregationTables.get(incrementalDurations.get(i - 1));

onDemandQuery = getOnDemandQuery(recreateFromTable, false, endOFLatestEventTimestamp);
onDemandQuery.setType(OnDemandQuery.OnDemandQueryType.FIND);
onDemandQueryRuntime = OnDemandQueryParser.parse(onDemandQuery, null, siddhiAppContext,
tableMap, windowMap,
aggregationMap);
events = onDemandQueryRuntime.execute();

if (events != null) {
long referenceToNextLatestEvent = (Long) events[events.length - 1].getData(0);
endOFLatestEventTimestamp = IncrementalTimeConverterUtil
.getNextEmitTime(referenceToNextLatestEvent, incrementalDurations.get(i - 1), timeZone);

ComplexEventChunk<StreamEvent> complexEventChunk = new ComplexEventChunk<>();
for (Event event : events) {
StreamEvent streamEvent = streamEventFactory.newInstance();
streamEvent.setOutputData(event.getData());
complexEventChunk.add(streamEvent);

if (isPersistedAggregation) {
for (int i = incrementalDurations.size() - 1; i > 0; i--) {
if (lastData != null && !IncrementalTimeConverterUtil.
isAggregationDataComplete(lastData, incrementalDurations.get(i), timeZone)) {
recreateState(lastData, incrementalDurations.get(i),
aggregationTables.get(incrementalDurations.get(i - 1)), i == 1);
} else if (lastData == null) {
recreateState(null, incrementalDurations.get(i),
aggregationTables.get(incrementalDurations.get(i - 1)), i == 1);
}
if (i > 1) {
onDemandQuery = getOnDemandQuery(aggregationTables.get(incrementalDurations.get(i - 1)), true,
endOFLatestEventTimestamp);
onDemandQuery.setType(OnDemandQuery.OnDemandQueryType.FIND);
onDemandQueryRuntime = OnDemandQueryParser.parse(onDemandQuery, null,
siddhiAppContext, tableMap, windowMap, aggregationMap);
events = onDemandQueryRuntime.execute();
if (events != null) {
lastData = (Long) events[events.length - 1].getData(0);
}
}
incrementalExecutor.execute(complexEventChunk);
}
} else {
for (int i = incrementalDurations.size() - 1; i > 0; i--) {

TimePeriod.Duration recreateForDuration = incrementalDurations.get(i);
Executor incrementalExecutor = incrementalExecutorMap.get(recreateForDuration);


if (i == 1) {
TimePeriod.Duration rootDuration = incrementalDurations.get(0);
Executor rootIncrementalExecutor = incrementalExecutorMap.get(rootDuration);
long emitTimeOfLatestEventInTable = IncrementalTimeConverterUtil.getNextEmitTime(
referenceToNextLatestEvent, rootDuration, timeZone);
// Get the table previous to the duration for which we need to recreate (e.g. if we want to recreate
// for minute duration, take the second table [provided that aggregation is done for seconds])
// This lookup is filtered by endOFLatestEventTimestamp
Table recreateFromTable = aggregationTables.get(incrementalDurations.get(i - 1));

rootIncrementalExecutor.setEmitTime(emitTimeOfLatestEventInTable);
onDemandQuery = getOnDemandQuery(recreateFromTable, false, endOFLatestEventTimestamp);
onDemandQuery.setType(OnDemandQuery.OnDemandQueryType.FIND);
onDemandQueryRuntime = OnDemandQueryParser.parse(onDemandQuery, null, siddhiAppContext,
tableMap, windowMap,
aggregationMap);
events = onDemandQueryRuntime.execute();

if (events != null) {
long referenceToNextLatestEvent = (Long) events[events.length - 1].getData(0);
endOFLatestEventTimestamp = IncrementalTimeConverterUtil
.getNextEmitTime(referenceToNextLatestEvent, incrementalDurations.get(i - 1), timeZone);

ComplexEventChunk<StreamEvent> complexEventChunk = new ComplexEventChunk<>();
for (Event event : events) {
StreamEvent streamEvent = streamEventFactory.newInstance();
streamEvent.setOutputData(event.getData());
complexEventChunk.add(streamEvent);
}
incrementalExecutor.execute(complexEventChunk);

if (i == 1) {
TimePeriod.Duration rootDuration = incrementalDurations.get(0);
Executor rootIncrementalExecutor = incrementalExecutorMap.get(rootDuration);
long emitTimeOfLatestEventInTable = IncrementalTimeConverterUtil.getNextEmitTime(
referenceToNextLatestEvent, rootDuration, timeZone);

rootIncrementalExecutor.setEmitTime(emitTimeOfLatestEventInTable);

}
}
}
}
this.isInitialised = true;
}

private void recreateState(Long lastData, TimePeriod.Duration recreateForDuration,
Table recreateFromTable, boolean isBeforeRoot) {
Long endOFLatestEventTimestamp = null;
Executor incrementalExecutor = incrementalExecutorMap.get(recreateForDuration);
if (lastData != null) {
endOFLatestEventTimestamp = IncrementalTimeConverterUtil
.getNextEmitTime(lastData, recreateForDuration, timeZone);
}
OnDemandQuery onDemandQuery = getOnDemandQuery(recreateFromTable, false, endOFLatestEventTimestamp);
onDemandQuery.setType(OnDemandQuery.OnDemandQueryType.FIND);
OnDemandQueryRuntime onDemandQueryRuntime = OnDemandQueryParser.parse(onDemandQuery, null, siddhiAppContext,
tableMap, windowMap,
aggregationMap);
Event[] events = onDemandQueryRuntime.execute();
if (events != null) {
long referenceToNextLatestEvent = (Long) events[events.length - 1].getData(0);
ComplexEventChunk<StreamEvent> complexEventChunk = new ComplexEventChunk<>();
for (Event event : events) {
StreamEvent streamEvent = streamEventFactory.newInstance();
streamEvent.setOutputData(event.getData());
complexEventChunk.add(streamEvent);
}
incrementalExecutor.execute(complexEventChunk);
if (isBeforeRoot) {
TimePeriod.Duration rootDuration = incrementalDurations.get(0);
Executor rootIncrementalExecutor = incrementalExecutorMap.get(rootDuration);
long emitTimeOfLatestEventInTable = IncrementalTimeConverterUtil.getNextEmitTime(
referenceToNextLatestEvent, rootDuration, timeZone);

rootIncrementalExecutor.setEmitTime(emitTimeOfLatestEventInTable);

}
}
}

private OnDemandQuery getOnDemandQuery(Table table, boolean isLargestGranularity, Long endOFLatestEventTimestamp) {
Selector selector = Selector.selector();
if (isLargestGranularity) {
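For persisted aggregations, initialiseExecutors now walks the configured durations from the largest down to the level just above the root and calls recreateState from the next-finer table whenever the stored data for a duration is incomplete (or there is no data at all); read-only nodes skip initialisation entirely. A compressed, purely illustrative sketch of that ordering, not the actual Siddhi classes, is:

import java.util.Arrays;
import java.util.List;

// Purely illustrative sketch of the recreation order used for persisted
// aggregations: levels are visited from the largest duration down to the one
// just above the root, and each incomplete level is rebuilt from the table of
// the next-finer duration.
public final class RecreationOrderSketch {

    enum Duration { SECONDS, MINUTES, HOURS, DAYS }

    // Stand-in for IncrementalTimeConverterUtil.isAggregationDataComplete(...).
    static boolean isComplete(Duration duration) {
        return duration == Duration.DAYS;
    }

    public static void main(String[] args) {
        List<Duration> durations = Arrays.asList(Duration.values());
        for (int i = durations.size() - 1; i > 0; i--) {
            Duration current = durations.get(i);
            Duration source = durations.get(i - 1);
            if (!isComplete(current)) {
                System.out.println("Recreate " + current + " state from the " + source + " table"
                        + (i == 1 ? " and reset the root executor's emit time" : ""));
            }
        }
    }
}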
@@ -57,6 +57,7 @@ public class PersistedIncrementalExecutor implements Executor {
private Processor cudStreamProcessor;
private boolean isProcessingExecutor;
private LinkedBlockingQueue<QueuedCudStreamProcessor> cudStreamProcessorQueue;
private String aggregatorName;

public PersistedIncrementalExecutor(String aggregatorName, TimePeriod.Duration duration,
List<ExpressionExecutor> processExpressionExecutors,
@@ -69,6 +70,7 @@ public PersistedIncrementalExecutor(String aggregatorName, TimePeriod.Duration d
this.next = child;
this.cudStreamProcessor = cudStreamProcessor;

this.aggregatorName = aggregatorName;
this.timestampExpressionExecutor = processExpressionExecutors.remove(0);
this.streamEventFactory = new StreamEventFactory(metaStreamEvent);
setNextExecutor(child);
@@ -83,8 +85,8 @@ public PersistedIncrementalExecutor(String aggregatorName, TimePeriod.Duration d
@Override
public void execute(ComplexEventChunk streamEventChunk) {
if (log.isDebugEnabled()) {
log.debug("Event Chunk received by " + this.duration + " incremental executor: " +
streamEventChunk.toString() + " will be dropped since persisted aggregation has been scheduled ");
log.debug("Event Chunk received by the Aggregation " + aggregatorName + " for duration " + this.duration +
" will be dropped since persisted aggregation has been scheduled ");
}
streamEventChunk.reset();
while (streamEventChunk.hasNext()) {
@@ -94,6 +96,7 @@ public void execute(ComplexEventChunk streamEventChunk) {
try {
long timestamp = getTimestamp(streamEvent);
if (timestamp >= executorState.nextEmitTime) {
log.debug("Next EmitTime: " + executorState.nextEmitTime + ", Current Time: " + timestamp);
long emittedTime = executorState.nextEmitTime;
long startedTime = executorState.startTimeOfAggregates;
executorState.startTimeOfAggregates = IncrementalTimeConverterUtil.getStartTimeOfAggregates(
@@ -120,8 +123,8 @@ private void dispatchEvent(long startTimeOfNewAggregates, long emittedTime, Stri
ZoneId.of(timeZone));
ZonedDateTime endTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(emittedTime),
ZoneId.of(timeZone));
log.info("Aggregation event dispatched for the duration " + duration + " to aggregate data from "
+ startTime.toString() + " to " + endTime.toString() + " ");
log.info("Aggregation event dispatched for the duration " + duration + " for aggregation " + aggregatorName +
" to aggregate data from " + startTime + " to " + endTime + " ");
ComplexEventChunk complexEventChunk = new ComplexEventChunk();
StreamEvent streamEvent = streamEventFactory.newInstance();
streamEvent.setType(ComplexEvent.Type.CURRENT);
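The info log above renders the aggregation window boundaries as zoned timestamps in the configured time zone. A minimal sketch of that conversion, with illustrative values, could be:

import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;

// Minimal sketch: render an aggregation window [startedTime, emittedTime) as
// zoned timestamps for logging. The epoch values below are illustrative.
public final class WindowLogSketch {
    public static void main(String[] args) {
        String timeZone = "UTC";
        long startedTime = 1630368000000L;   // 2021-08-31T00:00:00Z
        long emittedTime = 1630371600000L;   // 2021-08-31T01:00:00Z
        ZonedDateTime start = ZonedDateTime.ofInstant(Instant.ofEpochMilli(startedTime), ZoneId.of(timeZone));
        ZonedDateTime end = ZonedDateTime.ofInstant(Instant.ofEpochMilli(emittedTime), ZoneId.of(timeZone));
        System.out.println("Aggregation event dispatched to aggregate data from " + start + " to " + end);
    }
}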
@@ -109,7 +109,7 @@ private static long getNextEmitTimeForHour(long currentTime, String timeZone) {
.of(zonedDateTime.getYear(), zonedDateTime.getMonthValue(), zonedDateTime.getDayOfMonth() + 1,
0, 0, 0, 0, ZoneId.of(timeZone)).toEpochSecond() * 1000;
}
} else {
} else {
return ZonedDateTime
.of(zonedDateTime.getYear(), zonedDateTime.getMonthValue(), zonedDateTime.getDayOfMonth(),
zonedDateTime.getHour() + 1, 0, 0, 0, ZoneId.of(timeZone)).toEpochSecond() * 1000;
@@ -229,4 +229,35 @@ public static int getMillisecondsPerDuration(TimePeriod.Duration duration) {
+ ".Number of milliseconds are only define for SECONDS, MINUTES, HOURS and DAYS");
}
}

public static boolean isAggregationDataComplete(long timestamp, TimePeriod.Duration duration, String timeZone) {
ZonedDateTime zonedDateTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(timestamp),
ZoneId.of(timeZone));
ZonedDateTime zonedCurrentDateTime = ZonedDateTime.ofInstant(Instant.now(), ZoneId.of(timeZone));
switch (duration) {
case SECONDS:
return false;
case MINUTES:
return zonedDateTime.getYear() == zonedCurrentDateTime.getYear() &&
zonedDateTime.getMonthValue() == zonedCurrentDateTime.getMonthValue() &&
zonedDateTime.getDayOfMonth() == zonedCurrentDateTime.getDayOfMonth() &&
zonedDateTime.getHour() == zonedCurrentDateTime.getHour() &&
zonedDateTime.getMinute() == (zonedCurrentDateTime.getMinute() - 1);
case HOURS:
return zonedDateTime.getYear() == zonedCurrentDateTime.getYear() &&
zonedDateTime.getMonthValue() == zonedCurrentDateTime.getMonthValue() &&
zonedDateTime.getDayOfMonth() == zonedCurrentDateTime.getDayOfMonth() &&
zonedDateTime.getHour() == (zonedCurrentDateTime.getHour() - 1);
case DAYS:
return zonedDateTime.getYear() == zonedCurrentDateTime.getYear() &&
zonedDateTime.getMonthValue() == zonedCurrentDateTime.getMonthValue() &&
zonedDateTime.getDayOfMonth() == (zonedCurrentDateTime.getDayOfMonth() - 1);
case MONTHS:
return zonedDateTime.getYear() == zonedCurrentDateTime.getYear() &&
zonedDateTime.getMonthValue() == (zonedCurrentDateTime.getMonthValue() - 1);
case YEARS:
return zonedDateTime.getYear() == (zonedCurrentDateTime.getYear() - 1);
}
return false;
}
}
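The new isAggregationDataComplete utility treats stored data as complete only when the latest record falls in the period immediately before the current one. The following standalone illustration of the HOURS case is a simplified re-implementation for clarity, not the Siddhi utility itself:

import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;

// Simplified illustration of the HOURS branch of the completeness check: the
// latest stored record must sit in the hour immediately before "now".
public final class HourCompletenessSketch {

    static boolean isHourDataComplete(long timestampMillis, String timeZone) {
        ZonedDateTime record = ZonedDateTime.ofInstant(Instant.ofEpochMilli(timestampMillis), ZoneId.of(timeZone));
        ZonedDateTime now = ZonedDateTime.now(ZoneId.of(timeZone));
        return record.getYear() == now.getYear()
                && record.getMonthValue() == now.getMonthValue()
                && record.getDayOfMonth() == now.getDayOfMonth()
                && record.getHour() == now.getHour() - 1;
    }

    public static void main(String[] args) {
        long previousHour = ZonedDateTime.now(ZoneId.of("UTC")).minusHours(1).toInstant().toEpochMilli();
        // Prints true except during the first hour of the day, where the simple
        // "hour minus one" comparison cannot match the previous day's last hour.
        System.out.println(isHourDataComplete(previousHour, "UTC"));
    }
}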