diff --git a/modules/siddhi-core/src/main/java/io/siddhi/core/aggregation/IncrementalDataPurger.java b/modules/siddhi-core/src/main/java/io/siddhi/core/aggregation/IncrementalDataPurger.java
index e4702ac0e3..2a477043ed 100644
--- a/modules/siddhi-core/src/main/java/io/siddhi/core/aggregation/IncrementalDataPurger.java
+++ b/modules/siddhi-core/src/main/java/io/siddhi/core/aggregation/IncrementalDataPurger.java
@@ -393,8 +393,17 @@ private Map isSafeToPurgeTheDuration(long purgeTime, Table pare
                 purgingCheckState.put(IS_PARENT_TABLE_HAS_AGGREGATED_DATA, dataInParentTable != null
                         && dataInParentTable.length > 0);
             } catch (Exception e) {
-                LOG.error("Error occurred while checking whether the data is safe to purge from aggregation " +
-                        "tables for the aggregation " + aggregationDefinition.getId(), e);
+                if (e.getMessage() != null && e.getMessage().contains("deadlocked")) {
+                    errorMessage = "Deadlock observed while checking whether the data is safe to purge from " +
+                            "aggregation tables for the aggregation " + aggregationDefinition.getId() +
+                            ". If this occurred in an Active-Active deployment, this error can be ignored if the " +
+                            "other node does not report the same error";
+                } else {
+                    errorMessage = "Error occurred while checking whether the data is safe to purge from " +
+                            "aggregation tables for the aggregation " + aggregationDefinition.getId();
+                }
+                LOG.error(errorMessage, e);
                 purgingCheckState.put(IS_DATA_AVAILABLE_TO_PURGE, false);
                 purgingCheckState.put(IS_PARENT_TABLE_HAS_AGGREGATED_DATA, false);
                 errorMessage = "Error occurred while checking whether the data is safe to purge from aggregation tables" +
diff --git a/modules/siddhi-core/src/main/java/io/siddhi/core/aggregation/IncrementalExecutor.java b/modules/siddhi-core/src/main/java/io/siddhi/core/aggregation/IncrementalExecutor.java
index eecd654853..4cac0bff68 100644
--- a/modules/siddhi-core/src/main/java/io/siddhi/core/aggregation/IncrementalExecutor.java
+++ b/modules/siddhi-core/src/main/java/io/siddhi/core/aggregation/IncrementalExecutor.java
@@ -212,7 +212,7 @@ private void dispatchEvent(long startTimeOfNewAggregates, BaseIncrementalValueSt
                 tableEventChunk.add(event);
             }
             if (LOG.isDebugEnabled()) {
-                LOG.debug("Event dispatched by " + this.duration + " incremental executor: " + eventChunk.toString());
+                LOG.debug("Event dispatched by aggregation " + aggregatorName + " for duration " + this.duration);
             }
             if (isProcessingExecutor) {
                 executorService.execute(() -> {
@@ -223,6 +223,9 @@ private void dispatchEvent(long startTimeOfNewAggregates, BaseIncrementalValueSt
                                 "' when performing table writes of aggregation '" + this.aggregatorName +
                                 "' for duration '" + this.duration + "'. This should be investigated as this " +
                                 "can cause accuracy loss.", t);
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("Dropping Event chunk - \"" + eventChunk.toString() + "\"");
+                        }
                     } finally {
                         isProcessFinished.set(true);
                     }
@@ -236,7 +239,7 @@ private void dispatchEvent(long startTimeOfNewAggregates, BaseIncrementalValueSt
             }
         } catch (InterruptedException e) {
             LOG.error("Error occurred while waiting until table update task finishes for duration " +
-                    duration, e);
+                    duration + " in aggregation " + aggregatorName, e);
         }
     }
     if (getNextExecutor() != null) {
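Note on the deadlock check in IncrementalDataPurger: matching on the exception message is vendor-specific (the "deadlocked" wording matches SQL Server's error text) and requires a null guard. A more portable alternative, shown as a minimal sketch with a hypothetical isDeadlock helper that is not part of this change, walks the cause chain for an SQLException whose SQLState belongs to class 40 (transaction rollback; 40001 is the standard serialization-failure/deadlock code):

import java.sql.SQLException;

public final class DeadlockDetector {

    private DeadlockDetector() {
    }

    // Returns true if any cause in the chain is an SQLException whose
    // SQLState belongs to class 40 (transaction rollback), e.g. 40001,
    // which most databases raise when a transaction is chosen as a
    // deadlock victim.
    public static boolean isDeadlock(Throwable t) {
        for (Throwable cause = t; cause != null; cause = cause.getCause()) {
            if (cause instanceof SQLException) {
                String sqlState = ((SQLException) cause).getSQLState();
                if (sqlState != null && sqlState.startsWith("40")) {
                    return true;
                }
            }
        }
        return false;
    }
}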
diff --git a/modules/siddhi-core/src/main/java/io/siddhi/core/aggregation/IncrementalExecutorsInitialiser.java b/modules/siddhi-core/src/main/java/io/siddhi/core/aggregation/IncrementalExecutorsInitialiser.java
index be6dbf76a5..e3f416483d 100644
--- a/modules/siddhi-core/src/main/java/io/siddhi/core/aggregation/IncrementalExecutorsInitialiser.java
+++ b/modules/siddhi-core/src/main/java/io/siddhi/core/aggregation/IncrementalExecutorsInitialiser.java
@@ -64,6 +64,8 @@ public class IncrementalExecutorsInitialiser {
     private String timeZone;
     private boolean isInitialised;
+    private boolean isReadOnly;
+    private boolean isPersistedAggregation;

     public IncrementalExecutorsInitialiser(List<TimePeriod.Duration> incrementalDurations,
                                            Map<TimePeriod.Duration, Table> aggregationTables,
@@ -71,7 +73,8 @@ public IncrementalExecutorsInitialiser(List incrementalDura
                                            boolean isDistributed, String shardId, SiddhiAppContext siddhiAppContext,
                                            MetaStreamEvent metaStreamEvent, Map<String, Table> tableMap,
                                            Map<String, Window> windowMap,
-                                           Map<String, AggregationRuntime> aggregationMap, String timeZone) {
+                                           Map<String, AggregationRuntime> aggregationMap, String timeZone,
+                                           boolean isReadOnly, boolean isPersistedAggregation) {
         this.timeZone = timeZone;
         this.incrementalDurations = incrementalDurations;
         this.aggregationTables = aggregationTables;
@@ -88,15 +91,18 @@ public IncrementalExecutorsInitialiser(List incrementalDura
         this.aggregationMap = aggregationMap;

         this.isInitialised = false;
+        this.isReadOnly = isReadOnly;
+        this.isPersistedAggregation = isPersistedAggregation;
     }

     public synchronized void initialiseExecutors() {
-        if (this.isInitialised) {
+        // isReadOnly is only cleared when executors change from reading to processing state in a one-node deployment
+        if (this.isInitialised || isReadOnly) {
             return;
         }
         Event[] events;
         Long endOFLatestEventTimestamp = null;
+        Long lastData = null;

         // Get max(AGG_TIMESTAMP) from table corresponding to max duration
         Table tableForMaxDuration = aggregationTables.get(incrementalDurations.get(incrementalDurations.size() - 1));
@@ -108,54 +114,115 @@ public synchronized void initialiseExecutors() {
         // Get latest event timestamp in tableForMaxDuration and get the end time of the aggregation record
         events = onDemandQueryRuntime.execute();
         if (events != null) {
-            Long lastData = (Long) events[events.length - 1].getData(0);
+            lastData = (Long) events[events.length - 1].getData(0);
             endOFLatestEventTimestamp = IncrementalTimeConverterUtil
                     .getNextEmitTime(lastData, incrementalDurations.get(incrementalDurations.size() - 1), timeZone);
         }
-        for (int i = incrementalDurations.size() - 1; i > 0; i--) {
-            TimePeriod.Duration recreateForDuration = incrementalDurations.get(i);
-            Executor incrementalExecutor = incrementalExecutorMap.get(recreateForDuration);
-
-            // Get the table previous to the duration for which we need to recreate (e.g. if we want to recreate
-            // for minute duration, take the second table [provided that aggregation is done for seconds])
-            // This lookup is filtered by endOFLatestEventTimestamp
-            Table recreateFromTable = aggregationTables.get(incrementalDurations.get(i - 1));
-
-            onDemandQuery = getOnDemandQuery(recreateFromTable, false, endOFLatestEventTimestamp);
-            onDemandQuery.setType(OnDemandQuery.OnDemandQueryType.FIND);
-            onDemandQueryRuntime = OnDemandQueryParser.parse(onDemandQuery, null, siddhiAppContext,
-                    tableMap, windowMap, aggregationMap);
-            events = onDemandQueryRuntime.execute();
-
-            if (events != null) {
-                long referenceToNextLatestEvent = (Long) events[events.length - 1].getData(0);
-                endOFLatestEventTimestamp = IncrementalTimeConverterUtil
-                        .getNextEmitTime(referenceToNextLatestEvent, incrementalDurations.get(i - 1), timeZone);
-
-                ComplexEventChunk<StreamEvent> complexEventChunk = new ComplexEventChunk<>();
-                for (Event event : events) {
-                    StreamEvent streamEvent = streamEventFactory.newInstance();
-                    streamEvent.setOutputData(event.getData());
-                    complexEventChunk.add(streamEvent);
-                }
-                incrementalExecutor.execute(complexEventChunk);
-
-                if (i == 1) {
-                    TimePeriod.Duration rootDuration = incrementalDurations.get(0);
-                    Executor rootIncrementalExecutor = incrementalExecutorMap.get(rootDuration);
-                    long emitTimeOfLatestEventInTable = IncrementalTimeConverterUtil.getNextEmitTime(
-                            referenceToNextLatestEvent, rootDuration, timeZone);
-
-                    rootIncrementalExecutor.setEmitTime(emitTimeOfLatestEventInTable);
+
+        if (isPersistedAggregation) {
+            for (int i = incrementalDurations.size() - 1; i > 0; i--) {
+                if (lastData != null && !IncrementalTimeConverterUtil.
+                        isAggregationDataComplete(lastData, incrementalDurations.get(i), timeZone)) {
+                    recreateState(lastData, incrementalDurations.get(i),
+                            aggregationTables.get(incrementalDurations.get(i - 1)), i == 1);
+                } else if (lastData == null) {
+                    recreateState(null, incrementalDurations.get(i),
+                            aggregationTables.get(incrementalDurations.get(i - 1)), i == 1);
+                }
+                if (i > 1) {
+                    onDemandQuery = getOnDemandQuery(aggregationTables.get(incrementalDurations.get(i - 1)), true,
+                            endOFLatestEventTimestamp);
+                    onDemandQuery.setType(OnDemandQuery.OnDemandQueryType.FIND);
+                    onDemandQueryRuntime = OnDemandQueryParser.parse(onDemandQuery, null,
+                            siddhiAppContext, tableMap, windowMap, aggregationMap);
+                    events = onDemandQueryRuntime.execute();
+                    if (events != null) {
+                        lastData = (Long) events[events.length - 1].getData(0);
+                    }
+                }
+            }
+        } else {
+            for (int i = incrementalDurations.size() - 1; i > 0; i--) {
+                TimePeriod.Duration recreateForDuration = incrementalDurations.get(i);
+                Executor incrementalExecutor = incrementalExecutorMap.get(recreateForDuration);
+
+                // Get the table previous to the duration for which we need to recreate (e.g. if we want to recreate
+                // for minute duration, take the second table [provided that aggregation is done for seconds])
+                // This lookup is filtered by endOFLatestEventTimestamp
+                Table recreateFromTable = aggregationTables.get(incrementalDurations.get(i - 1));
+
+                onDemandQuery = getOnDemandQuery(recreateFromTable, false, endOFLatestEventTimestamp);
+                onDemandQuery.setType(OnDemandQuery.OnDemandQueryType.FIND);
+                onDemandQueryRuntime = OnDemandQueryParser.parse(onDemandQuery, null, siddhiAppContext,
+                        tableMap, windowMap, aggregationMap);
+                events = onDemandQueryRuntime.execute();
+                if (events != null) {
+                    long referenceToNextLatestEvent = (Long) events[events.length - 1].getData(0);
+                    endOFLatestEventTimestamp = IncrementalTimeConverterUtil
+                            .getNextEmitTime(referenceToNextLatestEvent, incrementalDurations.get(i - 1), timeZone);
+
+                    ComplexEventChunk<StreamEvent> complexEventChunk = new ComplexEventChunk<>();
+                    for (Event event : events) {
+                        StreamEvent streamEvent = streamEventFactory.newInstance();
+                        streamEvent.setOutputData(event.getData());
+                        complexEventChunk.add(streamEvent);
+                    }
+                    incrementalExecutor.execute(complexEventChunk);
+
+                    if (i == 1) {
+                        TimePeriod.Duration rootDuration = incrementalDurations.get(0);
+                        Executor rootIncrementalExecutor = incrementalExecutorMap.get(rootDuration);
+                        long emitTimeOfLatestEventInTable = IncrementalTimeConverterUtil.getNextEmitTime(
+                                referenceToNextLatestEvent, rootDuration, timeZone);
+
+                        rootIncrementalExecutor.setEmitTime(emitTimeOfLatestEventInTable);
+                    }
                 }
             }
         }
         this.isInitialised = true;
     }

+    private void recreateState(Long lastData, TimePeriod.Duration recreateForDuration,
+                               Table recreateFromTable, boolean isBeforeRoot) {
+        Long endOFLatestEventTimestamp = null;
+        Executor incrementalExecutor = incrementalExecutorMap.get(recreateForDuration);
+        if (lastData != null) {
+            endOFLatestEventTimestamp = IncrementalTimeConverterUtil
+                    .getNextEmitTime(lastData, recreateForDuration, timeZone);
+        }
+        OnDemandQuery onDemandQuery = getOnDemandQuery(recreateFromTable, false, endOFLatestEventTimestamp);
+        onDemandQuery.setType(OnDemandQuery.OnDemandQueryType.FIND);
+        OnDemandQueryRuntime onDemandQueryRuntime = OnDemandQueryParser.parse(onDemandQuery, null, siddhiAppContext,
+                tableMap, windowMap, aggregationMap);
+        Event[] events = onDemandQueryRuntime.execute();
+        if (events != null) {
+            long referenceToNextLatestEvent = (Long) events[events.length - 1].getData(0);
+            ComplexEventChunk<StreamEvent> complexEventChunk = new ComplexEventChunk<>();
+            for (Event event : events) {
+                StreamEvent streamEvent = streamEventFactory.newInstance();
+                streamEvent.setOutputData(event.getData());
+                complexEventChunk.add(streamEvent);
+            }
+            incrementalExecutor.execute(complexEventChunk);
+            if (isBeforeRoot) {
+                TimePeriod.Duration rootDuration = incrementalDurations.get(0);
+                Executor rootIncrementalExecutor = incrementalExecutorMap.get(rootDuration);
+                long emitTimeOfLatestEventInTable = IncrementalTimeConverterUtil.getNextEmitTime(
+                        referenceToNextLatestEvent, rootDuration, timeZone);
+
+                rootIncrementalExecutor.setEmitTime(emitTimeOfLatestEventInTable);
+            }
+        }
+    }
+
     private OnDemandQuery getOnDemandQuery(Table table, boolean isLargestGranularity, Long endOFLatestEventTimestamp) {
         Selector selector = Selector.selector();
         if (isLargestGranularity) {
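The initialiser now takes two paths: for persisted aggregations it calls recreateState only when isAggregationDataComplete reports the stored level as stale, while the in-memory path always replays each level from the next-finer table. In both cases the loop runs from the coarsest non-root duration down to the finest, and the i == 1 iteration additionally seeds the root executor's emit time. A minimal sketch of that ordering (duration names are illustrative strings, not the real TimePeriod.Duration values):

import java.util.Arrays;
import java.util.List;

public class RecreationOrderDemo {
    public static void main(String[] args) {
        List<String> durations = Arrays.asList("SECONDS", "MINUTES", "HOURS", "DAYS");
        // Mirrors the loops above: i runs from the coarsest duration down to
        // index 1; each level is rebuilt from the table one level finer.
        for (int i = durations.size() - 1; i > 0; i--) {
            System.out.println("Recreate " + durations.get(i) + " state from the "
                    + durations.get(i - 1) + " table"
                    + (i == 1 ? " (also sets the root executor's emit time)" : ""));
        }
    }
}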
diff --git a/modules/siddhi-core/src/main/java/io/siddhi/core/aggregation/persistedaggregation/PersistedIncrementalExecutor.java b/modules/siddhi-core/src/main/java/io/siddhi/core/aggregation/persistedaggregation/PersistedIncrementalExecutor.java
index 9ac69862b0..fc399cc7ff 100644
--- a/modules/siddhi-core/src/main/java/io/siddhi/core/aggregation/persistedaggregation/PersistedIncrementalExecutor.java
+++ b/modules/siddhi-core/src/main/java/io/siddhi/core/aggregation/persistedaggregation/PersistedIncrementalExecutor.java
@@ -57,6 +57,7 @@ public class PersistedIncrementalExecutor implements Executor {
     private Processor cudStreamProcessor;
     private boolean isProcessingExecutor;
     private LinkedBlockingQueue cudStreamProcessorQueue;
+    private String aggregatorName;

     public PersistedIncrementalExecutor(String aggregatorName, TimePeriod.Duration duration,
                                         List<ExpressionExecutor> processExpressionExecutors,
@@ -69,6 +70,7 @@ public PersistedIncrementalExecutor(String aggregatorName, TimePeriod.Duration d
         this.next = child;
         this.cudStreamProcessor = cudStreamProcessor;
+        this.aggregatorName = aggregatorName;
         this.timestampExpressionExecutor = processExpressionExecutors.remove(0);
         this.streamEventFactory = new StreamEventFactory(metaStreamEvent);
         setNextExecutor(child);
@@ -83,8 +85,8 @@ public PersistedIncrementalExecutor(String aggregatorName, TimePeriod.Duration d
     @Override
     public void execute(ComplexEventChunk<StreamEvent> streamEventChunk) {
         if (log.isDebugEnabled()) {
-            log.debug("Event Chunk received by " + this.duration + " incremental executor: " +
-                    streamEventChunk.toString() + " will be dropped since persisted aggregation has been scheduled ");
+            log.debug("Event Chunk received by the Aggregation " + aggregatorName + " for duration " + this.duration
+                    + " will be dropped since persisted aggregation has been scheduled");
         }
         streamEventChunk.reset();
         while (streamEventChunk.hasNext()) {
@@ -94,6 +96,7 @@ public void execute(ComplexEventChunk streamEventChunk) {
             try {
                 long timestamp = getTimestamp(streamEvent);
                 if (timestamp >= executorState.nextEmitTime) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Next EmitTime: " + executorState.nextEmitTime + ", Current Time: " + timestamp);
+                    }
                     long emittedTime = executorState.nextEmitTime;
                     long startedTime = executorState.startTimeOfAggregates;
                     executorState.startTimeOfAggregates = IncrementalTimeConverterUtil.getStartTimeOfAggregates(
@@ -120,8 +123,8 @@ private void dispatchEvent(long startTimeOfNewAggregates, long emittedTime, Stri
                 ZoneId.of(timeZone));
         ZonedDateTime endTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(emittedTime), ZoneId.of(timeZone));
-        log.info("Aggregation event dispatched for the duration " + duration + " to aggregate data from "
-                + startTime.toString() + " to " + endTime.toString() + " ");
+        log.info("Aggregation event dispatched for the duration " + duration + " for aggregation " + aggregatorName
+                + " to aggregate data from " + startTime + " to " + endTime);
         ComplexEventChunk<StreamEvent> complexEventChunk = new ComplexEventChunk<StreamEvent>();
         StreamEvent streamEvent = streamEventFactory.newInstance();
         streamEvent.setType(ComplexEvent.Type.CURRENT);
diff --git a/modules/siddhi-core/src/main/java/io/siddhi/core/util/IncrementalTimeConverterUtil.java b/modules/siddhi-core/src/main/java/io/siddhi/core/util/IncrementalTimeConverterUtil.java
index 7f7d70be03..93d25d424c 100644
--- a/modules/siddhi-core/src/main/java/io/siddhi/core/util/IncrementalTimeConverterUtil.java
+++ b/modules/siddhi-core/src/main/java/io/siddhi/core/util/IncrementalTimeConverterUtil.java
@@ -109,7 +109,7 @@ private static long getNextEmitTimeForHour(long currentTime, String timeZone) {
                     .of(zonedDateTime.getYear(), zonedDateTime.getMonthValue(), zonedDateTime.getDayOfMonth() + 1,
                             0, 0, 0, 0, ZoneId.of(timeZone)).toEpochSecond() * 1000;
             }
-        } else {
+        } else {
             return ZonedDateTime
                     .of(zonedDateTime.getYear(), zonedDateTime.getMonthValue(), zonedDateTime.getDayOfMonth(),
                             zonedDateTime.getHour() + 1, 0, 0, 0, ZoneId.of(timeZone)).toEpochSecond() * 1000;
@@ -229,4 +229,35 @@ public static int getMillisecondsPerDuration(TimePeriod.Duration duration) {
                     + ". Number of milliseconds are only defined for SECONDS, MINUTES, HOURS and DAYS");
         }
     }
+
+    public static boolean isAggregationDataComplete(long timestamp, TimePeriod.Duration duration, String timeZone) {
+        ZonedDateTime zonedDateTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(timestamp),
+                ZoneId.of(timeZone));
+        ZonedDateTime zonedCurrentDateTime = ZonedDateTime.ofInstant(Instant.now(), ZoneId.of(timeZone));
+        switch (duration) {
+            case SECONDS:
+                return false;
+            case MINUTES:
+                return zonedDateTime.getYear() == zonedCurrentDateTime.getYear() &&
+                        zonedDateTime.getMonthValue() == zonedCurrentDateTime.getMonthValue() &&
+                        zonedDateTime.getDayOfMonth() == zonedCurrentDateTime.getDayOfMonth() &&
+                        zonedDateTime.getHour() == zonedCurrentDateTime.getHour() &&
+                        zonedDateTime.getMinute() == (zonedCurrentDateTime.getMinute() - 1);
+            case HOURS:
+                return zonedDateTime.getYear() == zonedCurrentDateTime.getYear() &&
+                        zonedDateTime.getMonthValue() == zonedCurrentDateTime.getMonthValue() &&
+                        zonedDateTime.getDayOfMonth() == zonedCurrentDateTime.getDayOfMonth() &&
+                        zonedDateTime.getHour() == (zonedCurrentDateTime.getHour() - 1);
+            case DAYS:
+                return zonedDateTime.getYear() == zonedCurrentDateTime.getYear() &&
+                        zonedDateTime.getMonthValue() == zonedCurrentDateTime.getMonthValue() &&
+                        zonedDateTime.getDayOfMonth() == (zonedCurrentDateTime.getDayOfMonth() - 1);
+            case MONTHS:
+                return zonedDateTime.getYear() == zonedCurrentDateTime.getYear() &&
+                        zonedDateTime.getMonthValue() == (zonedCurrentDateTime.getMonthValue() - 1);
+            case YEARS:
+                return zonedDateTime.getYear() == (zonedCurrentDateTime.getYear() - 1);
+        }
+        return false;
+    }
 }
diff --git a/modules/siddhi-core/src/main/java/io/siddhi/core/util/SiddhiConstants.java b/modules/siddhi-core/src/main/java/io/siddhi/core/util/SiddhiConstants.java
index 82f9ee8bb6..a5d00c7631 100644
--- a/modules/siddhi-core/src/main/java/io/siddhi/core/util/SiddhiConstants.java
+++ b/modules/siddhi-core/src/main/java/io/siddhi/core/util/SiddhiConstants.java
@@ -79,6 +79,7 @@ public final class SiddhiConstants {
     public static final String ANNOTATION_IGNORE_EVENTS_OLDER_THAN_BUFFER = "IgnoreEventsOlderThanBuffer";
     public static final String ANNOTATION_ELEMENT_REF = "ref";
     public static final String ANNOTATION_ELEMENT_ENABLE = "enable";
+    public static final String ANNOTATION_ELEMENT_IS_READ_ONLY = "is.read.only";
     public static final String ANNOTATION_ELEMENT_IDLE_PERIOD = "idle.period";
     public static final String ANNOTATION_ELEMENT_INTERVAL = "interval";
     public static final String ANNOTATION_ELEMENT_INCLUDE = "include";
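isAggregationDataComplete treats a level as complete only when the latest persisted record falls in the period immediately preceding the current one; because the "previous - 1" comparison never matches across a larger boundary (e.g. minute 0 of an hour, or January for MONTHS), the check errs toward recreating state. A standalone sketch of the MINUTES branch, using a fixed "now" for reproducibility:

import java.time.ZoneId;
import java.time.ZonedDateTime;

public class CompletenessDemo {
    // Re-implementation of the MINUTES branch, for illustration only.
    static boolean isMinuteDataComplete(ZonedDateTime last, ZonedDateTime now) {
        return last.getYear() == now.getYear()
                && last.getMonthValue() == now.getMonthValue()
                && last.getDayOfMonth() == now.getDayOfMonth()
                && last.getHour() == now.getHour()
                && last.getMinute() == now.getMinute() - 1;
    }

    public static void main(String[] args) {
        ZonedDateTime now = ZonedDateTime.of(2023, 5, 10, 14, 30, 0, 0, ZoneId.of("UTC"));
        // Latest record covers 14:29 -> the minute level is up to date.
        System.out.println(isMinuteDataComplete(now.minusMinutes(1), now));  // true
        // Latest record covers 14:20 -> state must be recreated.
        System.out.println(isMinuteDataComplete(now.minusMinutes(10), now)); // false
    }
}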
diff --git a/modules/siddhi-core/src/main/java/io/siddhi/core/util/parser/AggregationParser.java b/modules/siddhi-core/src/main/java/io/siddhi/core/util/parser/AggregationParser.java
index bd4bbf406d..132196e44c 100644
--- a/modules/siddhi-core/src/main/java/io/siddhi/core/util/parser/AggregationParser.java
+++ b/modules/siddhi-core/src/main/java/io/siddhi/core/util/parser/AggregationParser.java
@@ -114,6 +114,7 @@ import static io.siddhi.core.util.SiddhiConstants.AGG_SHARD_ID_COL;
 import static io.siddhi.core.util.SiddhiConstants.AGG_START_TIMESTAMP_COL;
 import static io.siddhi.core.util.SiddhiConstants.ANNOTATION_ELEMENT_ENABLE;
+import static io.siddhi.core.util.SiddhiConstants.ANNOTATION_ELEMENT_IS_READ_ONLY;
 import static io.siddhi.core.util.SiddhiConstants.ANNOTATION_PARTITION_BY_ID;
 import static io.siddhi.core.util.SiddhiConstants.ANNOTATION_PERSISTED_AGGREGATION;
 import static io.siddhi.core.util.SiddhiConstants.EQUALS;
@@ -165,6 +166,7 @@ public static AggregationRuntime parse(AggregationDefinition aggregationDefiniti
         String timeZone = getTimeZone(siddhiAppContext);
         boolean isDebugEnabled = log.isDebugEnabled();
         boolean isPersistedAggregation = false;
+        boolean isReadOnly = false;

         if (!validateTimeZone(timeZone)) {
             throw new SiddhiAppCreationException(
@@ -183,6 +185,8 @@ public static AggregationRuntime parse(AggregationDefinition aggregationDefiniti
         if (aggregationProperties != null) {
             String persistedAggregationMode = aggregationProperties.getElement(ANNOTATION_ELEMENT_ENABLE);
             isPersistedAggregation = persistedAggregationMode == null || Boolean.parseBoolean(persistedAggregationMode);
+            String readOnlyMode = aggregationProperties.getElement(ANNOTATION_ELEMENT_IS_READ_ONLY);
+            isReadOnly = Boolean.parseBoolean(readOnlyMode);
         }

         if (isPersistedAggregation) {
@@ -449,7 +453,7 @@ public static AggregationRuntime parse(AggregationDefinition aggregationDefiniti
                         aggregationDurations, aggregationTables, siddhiQueryContext, aggregatorName,
                         shouldUpdateTimestamp, timeZone, isPersistedAggregation, incomingOutputStreamDefinition,
                         isDistributed, shardId, isProcessingOnExternalTime, aggregationDefinition,
-                        configManager, groupByVariableList);
+                        configManager, groupByVariableList, isReadOnly);

         isOptimisedLookup = isOptimisedLookup &&
                 aggregationTables.get(aggregationDurations.get(0)) instanceof QueryableProcessor;
@@ -473,7 +477,7 @@ public static AggregationRuntime parse(AggregationDefinition aggregationDefiniti
             //Recreate in-memory data from tables
             IncrementalExecutorsInitialiser incrementalExecutorsInitialiser = new IncrementalExecutorsInitialiser(
                     aggregationDurations, aggregationTables, incrementalExecutorMap, isDistributed, shardId,
-                    siddhiAppContext, processedMetaStreamEvent, tableMap, windowMap, aggregationMap, timeZone);
+                    siddhiAppContext, processedMetaStreamEvent, tableMap, windowMap, aggregationMap, timeZone,
+                    isReadOnly, isPersistedAggregation);

             IncrementalExecutor rootIncrementalExecutor = (IncrementalExecutor) incrementalExecutorMap.
                     get(aggregationDurations.get(0));
@@ -546,19 +550,23 @@ private static Map buildIncrementalExecutors(
             String aggregatorName, ExpressionExecutor shouldUpdateTimestamp, String timeZone,
             boolean isPersistedAggregation, StreamDefinition incomingOutputStreamDefinition, boolean isDistributed,
             String shardId, boolean isProcessingOnExternalTime, AggregationDefinition aggregationDefinition,
-            ConfigManager configManager, List<Variable> groupByVariableList) {
+            ConfigManager configManager, List<Variable> groupByVariableList,
+            boolean isReadOnly) {
         Map<TimePeriod.Duration, Executor> incrementalExecutorMap = new HashMap<>();
-        Map<TimePeriod.Duration, Processor> cudProcessors;
+        Map<TimePeriod.Duration, Processor> cudProcessors = new HashMap<>();
         // Create incremental executors
         Executor child;
         Executor root = null;
         if (isPersistedAggregation) {
-            cudProcessors = initAggregateQueryExecutor(incrementalDurations, processExpressionExecutorsMap,
-                    incomingOutputStreamDefinition, isDistributed, shardId, isProcessingOnExternalTime,
-                    siddhiQueryContext, aggregationDefinition, configManager, aggregationTables, groupByVariableList);
+            if (!isReadOnly) {
+                cudProcessors = initAggregateQueryExecutor(incrementalDurations, processExpressionExecutorsMap,
+                        incomingOutputStreamDefinition, isDistributed, shardId, isProcessingOnExternalTime,
+                        siddhiQueryContext, aggregationDefinition, configManager, aggregationTables,
+                        groupByVariableList);
+            }
             CudStreamProcessorQueueManager queueManager = new CudStreamProcessorQueueManager();
@@ -573,10 +581,7 @@ private static Map buildIncrementalExecutors(
                 // Add an object to the aggregationTable map in order to fill up the missing durations
                 aggregationTables.putIfAbsent(incrementalDurations.get(i), null);
-                boolean isRoot = false;
-                if (i == 0) {
-                    isRoot = true;
-                }
+                boolean isRoot = i == 0;
                 child = root;
                 TimePeriod.Duration duration = incrementalDurations.get(i);
                 Executor incrementalExecutor;
@@ -600,10 +605,7 @@ child, siddhiQueryContext, generateCUDMetaStreamEvent(isProcessingOnExternalTime
         } else {
             for (int i = incrementalDurations.size() - 1; i >= 0; i--) {
                 // Base incremental expression executors created using new meta
-                boolean isRoot = false;
-                if (i == 0) {
-                    isRoot = true;
-                }
+                boolean isRoot = i == 0;
                 child = root;
                 TimePeriod.Duration duration = incrementalDurations.get(i);
@@ -1068,6 +1070,12 @@ private static HashMap initDefaultTables(
         for (Variable groupByVariable : groupByVariableList) {
             primaryKeyAnnotation.element(null, groupByVariable.getAttributeName());
         }
+        if (streamDefinition.getAttributeList().contains(new Attribute(AGG_LAST_TIMESTAMP_COL,
+                Attribute.Type.LONG))) {
+            Annotation indexAnnotation = new Annotation(SiddhiConstants.ANNOTATION_INDEX);
+            indexAnnotation.element(null, AGG_LAST_TIMESTAMP_COL);
+            annotations.add(indexAnnotation);
+        }
         annotations.add(primaryKeyAnnotation);
         for (TimePeriod.Duration duration : aggregationDurations) {
             String tableId = aggregatorName + "_" + duration.toString();
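The new is.read.only element is read from the same annotation as enable, so a reader node can be declared next to the other persisted-aggregation settings. A hedged usage sketch: the annotation name below follows the ANNOTATION_PERSISTED_AGGREGATION constant (assumed to resolve to @PersistedAggregation), and the stream, store type, and datasource names are illustrative only:

import io.siddhi.core.SiddhiAppRuntime;
import io.siddhi.core.SiddhiManager;

public class ReadOnlyAggregationDemo {
    public static void main(String[] args) {
        // Illustrative app: this node only reads the aggregation tables;
        // a separate writer node would run the same app without is.read.only.
        String app =
                "define stream TradeStream (symbol string, price double); " +
                "@store(type='rdbms', datasource='AGG_DB') " +
                "@PersistedAggregation(enable='true', is.read.only='true') " +
                "define aggregation TradeAggregation " +
                "from TradeStream " +
                "select symbol, avg(price) as avgPrice " +
                "group by symbol " +
                "aggregate every sec ... year;";
        SiddhiManager siddhiManager = new SiddhiManager();
        SiddhiAppRuntime runtime = siddhiManager.createSiddhiAppRuntime(app);
        // With is.read.only='true', buildIncrementalExecutors skips the CUD
        // processors and initialiseExecutors() returns without replaying state.
        runtime.start();
    }
}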
@@ -1098,7 +1106,9 @@ private static Map initAggregateQueryExecutor(
                     "aggregation mode");
         }
         Database databaseType = getDatabaseType(configManager, datasourceName);
-
+        if (log.isDebugEnabled()) {
+            log.debug("Database type " + databaseType);
+        }
         SiddhiAppContext cudSiddhiAppContext = new SiddhiAppContext();
         SiddhiContext context = new SiddhiContext();
         context.setConfigManager(configManager);
@@ -1116,10 +1126,14 @@ private static Map initAggregateQueryExecutor(
         try {
             DBAggregationQueryConfigurationEntry dbAggregationQueryConfigurationEntry = DBAggregationQueryUtil.
                     lookupCurrentQueryConfigurationEntry(databaseType);
-
+            if (log.isDebugEnabled()) {
+                log.debug("CUD queries for aggregation " + aggregationDefinition.getId());
+            }
             for (int i = aggregationDurations.size() - 1; i > 0; i--) {
                 if (aggregationDurations.get(i).ordinal() >= 3) {
-                    log.debug(" Initializing cudProcessors for durations ");
+                    if (log.isDebugEnabled()) {
+                        log.debug("Initializing cudProcessors for duration " + aggregationDurations.get(i));
+                    }
                     String databaseSelectQuery = generateDatabaseQuery(processExpressionExecutorsMap.
                                     get(aggregationDurations.get(i)), dbAggregationQueryConfigurationEntry,
                             incomingOutputStreamDefinition, isDistributed, shardID, isProcessingOnExternalTime,
@@ -1127,6 +1141,9 @@ private static Map initAggregateQueryExecutor(
                             aggregationTables.get(aggregationDurations.get(i - 1)),
                             groupByVariableList, aggregationDurations.get(i));
                     StringConstant selectQuery = new StringConstant(databaseSelectQuery);
+                    if (log.isDebugEnabled()) {
+                        log.debug(selectQuery.getValue());
+                    }
                     ConstantExpressionExecutor selectExecutor =
                             new ConstantExpressionExecutor(selectQuery.getValue(), Attribute.Type.STRING);
                     Map cudInputStreamAttributesMap =
@@ -1240,6 +1257,10 @@ private static String generateDatabaseQuery(List expressionE
         if (isDistributed) {
             filterQueryBuilder.append(" AND ").append(AGG_SHARD_ID_COL).append(" = '").append(shardID).append("' ");
             groupByQueryBuilder.add(AGG_SHARD_ID_COL);
+            innerSelectT2ColumnJoiner.add(AGG_SHARD_ID_COL);
+            subSelectT2OnConditionBuilder.add(parentAggregationTable.getTableDefinition().getId() + "." +
+                    AGG_SHARD_ID_COL + EQUALS + INNER_SELECT_QUERY_REF_T3 + "." + AGG_SHARD_ID_COL);
+
             if (isProcessingOnExternalTime) {
                 subSelectT1ColumnJoiner.add(AGG_SHARD_ID_COL);
             }
@@ -1270,7 +1291,7 @@ private static String generateDatabaseQuery(List expressionE
                 if (groupByColumnNames.contains(variableExpressionExecutor.getAttribute().getName())) {
                     subSelectT2ColumnJoiner.add(INNER_SELECT_QUERY_REF_T3 + "." +
                             variableExpressionExecutor.getAttribute().getName() + SQL_AS +
-                            variableExpressionExecutor.getAttribute().getName()) ;
+                            variableExpressionExecutor.getAttribute().getName());
                     outerSelectColumnJoiner.add(SUB_SELECT_QUERY_REF_T1 + "." +
                             variableExpressionExecutor.getAttribute().getName() + SQL_AS +
                             attributeList.get(i).getName());
@@ -1289,11 +1310,11 @@ private static String generateDatabaseQuery(List expressionE
                             AGG_EXTERNAL_TIMESTAMP_COL);
                     subSelectT1ColumnJoiner.add(dbAggregationSelectFunctionTemplates.getTimeConversionFunction().
                             replace(PLACEHOLDER_COLUMN, AGG_EXTERNAL_TIMESTAMP_COL).replace(PLACEHOLDER_DURATION,
-                            dbAggregationTimeConversionDurationMapping.getDurationMapping(duration))
+                                    dbAggregationTimeConversionDurationMapping.getDurationMapping(duration))
                             + SQL_AS + AGG_EXTERNAL_TIMESTAMP_COL);
                     subSelectT2ColumnJoiner.add(dbAggregationSelectFunctionTemplates.getTimeConversionFunction().
                             replace(PLACEHOLDER_COLUMN, AGG_EXTERNAL_TIMESTAMP_COL).replace(PLACEHOLDER_DURATION,
-                            dbAggregationTimeConversionDurationMapping.getDurationMapping(duration))
+                                    dbAggregationTimeConversionDurationMapping.getDurationMapping(duration))
                             + SQL_AS + AGG_EXTERNAL_TIMESTAMP_COL);
                     onConditionBuilder.add(SUB_SELECT_QUERY_REF_T1 + "." + AGG_EXTERNAL_TIMESTAMP_COL + EQUALS +
                             SUB_SELECT_QUERY_REF_T2 + "." + AGG_EXTERNAL_TIMESTAMP_COL);
@@ -1338,7 +1359,7 @@ private static String generateDatabaseQuery(List expressionE
             groupByQueryBuilder.add(dbAggregationSelectFunctionTemplates.getTimeConversionFunction().
                     replace(PLACEHOLDER_COLUMN, AGG_EXTERNAL_TIMESTAMP_COL).replace(PLACEHOLDER_DURATION,
-                    dbAggregationTimeConversionDurationMapping.getDurationMapping(duration)));
+                            dbAggregationTimeConversionDurationMapping.getDurationMapping(duration)));
             groupByClause = dbAggregationSelectQueryTemplate.getGroupByClause().replace(PLACEHOLDER_COLUMNS,
                     groupByQueryBuilder.toString());
@@ -1399,7 +1420,7 @@ private static String generateDatabaseQuery(List expressionE
             completeQuery.add(insertIntoQueryBuilder.toString()).add(subSelectT1ColumnJoiner.toString()).add(innerFromClause).
                     add(SQL_WHERE + filterQueryBuilder).add(dbAggregationSelectQueryTemplate.getGroupByClause().
-                    replace(PLACEHOLDER_COLUMNS, groupByQueryBuilder.toString()));
+                            replace(PLACEHOLDER_COLUMNS, groupByQueryBuilder.toString()));
         }
         return completeQuery.toString();
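The distributed fix in generateDatabaseQuery adds the shard id to the inner T3 projection and to the T2 ON condition, so a node only merges rows it owns. A simplified, self-contained sketch of how those joiner pieces compose into the final query; the table and column names and the overall query shape are placeholders, not the SQL this method actually generates:

import java.util.StringJoiner;

public class ShardJoinDemo {
    public static void main(String[] args) {
        boolean isDistributed = true;
        String shardId = "node-1";

        StringJoiner innerSelectT2Columns = new StringJoiner(", ");
        StringJoiner t2OnCondition = new StringJoiner(" AND ");
        innerSelectT2Columns.add("AGG_TIMESTAMP").add("AGG_SUM_PRICE");
        t2OnCondition.add("PARENT_TABLE.AGG_TIMESTAMP = t3.AGG_TIMESTAMP");

        if (isDistributed) {
            // Mirrors the fix: without the shard id in both the projection
            // and the ON condition, rows from other shards would be merged.
            innerSelectT2Columns.add("SHARD_ID");
            t2OnCondition.add("PARENT_TABLE.SHARD_ID = t3.SHARD_ID");
        }

        String query = "SELECT " + innerSelectT2Columns
                + " FROM PARENT_TABLE JOIN ( ... ) t3 ON " + t2OnCondition
                + " WHERE PARENT_TABLE.SHARD_ID = '" + shardId + "'";
        System.out.println(query);
    }
}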