Skip to content

Commit

Permalink
Fix update or insert in InMemoryTable using EventChuck
Browse files Browse the repository at this point in the history
  • Loading branch information
suhothayan committed Sep 18, 2019
1 parent a02b34b commit d34aeb3
Show file tree
Hide file tree
Showing 23 changed files with 385 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.siddhi.core.event.stream.StreamEventCloner;
import io.siddhi.core.event.stream.StreamEventFactory;
import io.siddhi.core.executor.VariableExpressionExecutor;
import io.siddhi.core.query.processor.ProcessingMode;
import io.siddhi.core.table.holder.EventHolder;
import io.siddhi.core.table.holder.IndexEventHolderForCache;
import io.siddhi.core.table.record.RecordTableHandler;
Expand All @@ -37,6 +38,7 @@
import io.siddhi.core.util.collection.operator.Operator;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.parser.EventHolderPasser;
import io.siddhi.core.util.parser.ExpressionParser;
import io.siddhi.core.util.parser.OperatorParser;
import io.siddhi.query.api.annotation.Annotation;
import io.siddhi.query.api.annotation.Element;
Expand Down Expand Up @@ -150,13 +152,16 @@ public void updateOrAddAndTrimUptoMaxSize(ComplexEventChunk<StateEvent> updateOr
readWriteLock.writeLock().lock();
TableState state = stateHolder.getState();
try {
ComplexEventChunk<StreamEvent> failedEvents = ((Operator) compiledCondition).tryUpdate(
InMemoryCompiledCondition inMemoryCompiledCondition = (InMemoryCompiledCondition) compiledCondition;
ComplexEventChunk<StateEvent> failedEvents = ((Operator) inMemoryCompiledCondition.
getOperatorCompiledCondition()).tryUpdate(
updateOrAddingEventChunkForCache,
state.getEventHolder(),
(InMemoryCompiledUpdateSet) compiledUpdateSet,
addingStreamEventExtractor);
if (failedEvents != null && failedEvents.getFirst() != null) {
state.getEventHolder().add(failedEvents);
state.getEventHolder().add(reduceEventsForUpdateOrInsert(
addingStreamEventExtractor, inMemoryCompiledCondition, failedEvents));
}
if (this.size() > maxSize) {
this.deleteEntriesUsingCachePolicy(this.size() - maxSize);
Expand Down Expand Up @@ -204,15 +209,13 @@ ComplexEvent generateEventWithRequiredFields(ComplexEvent event,
protected abstract StreamEvent addRequiredFields(ComplexEvent event, SiddhiAppContext siddhiAppContext,
boolean cacheExpiryEnabled);

public CacheCompiledConditionWithRouteToCache generateCacheCompileCondition(Expression condition,
MatchingMetaInfoHolder storeMatchingMetaInfoHolder,
SiddhiQueryContext siddhiQueryContext,
List<VariableExpressionExecutor>
storeVariableExpressionExecutors) {
public CacheCompiledConditionWithRouteToCache generateCacheCompileCondition(
Expression condition, MatchingMetaInfoHolder storeMatchingMetaInfoHolder,
SiddhiQueryContext siddhiQueryContext, List<VariableExpressionExecutor> storeVariableExpressionExecutors) {
boolean routeToCache = checkConditionToRouteToCache(condition, storeMatchingMetaInfoHolder);
MetaStateEvent metaStateEvent = new MetaStateEvent(storeMatchingMetaInfoHolder.getMetaStateEvent().
getMetaStreamEvents().length);
for (MetaStreamEvent referenceMetaStreamEvent: storeMatchingMetaInfoHolder.getMetaStateEvent().
for (MetaStreamEvent referenceMetaStreamEvent : storeMatchingMetaInfoHolder.getMetaStateEvent().
getMetaStreamEvents()) {
metaStateEvent.addEvent(referenceMetaStreamEvent);
}
Expand Down Expand Up @@ -259,7 +262,7 @@ private boolean checkConditionToRouteToCache(Expression condition, MatchingMetaI
return false;
}
List<Element> keys = primaryKeys.getElements();
for (Element element: keys) {
for (Element element : keys) {
primaryKeysArray.add(element.getValue());
}
recursivelyCheckConditionToRouteToCache(condition, primaryKeysArray, matchingMetaInfoHolder);
Expand Down Expand Up @@ -300,7 +303,7 @@ private boolean checkIfVariableMatchesTable(Variable variable, MatchingMetaInfoH
return true;
}

for (MetaStreamEvent streamEvent: matchingMetaInfoHolder.getMetaStateEvent().getMetaStreamEvents()) {
for (MetaStreamEvent streamEvent : matchingMetaInfoHolder.getMetaStateEvent().getMetaStreamEvents()) {
if (streamEvent.getInputReferenceId() != null &&
streamEvent.getInputReferenceId().equalsIgnoreCase(variable.getStreamId())) {
if (streamEvent.getInputDefinitions().get(0).getId().equalsIgnoreCase(tableDefinition.getId())) {
Expand All @@ -317,8 +320,15 @@ public CompiledCondition compileCondition(Expression condition, MatchingMetaInfo
boolean updateCachePolicyAttribute) {
TableState state = stateHolder.getState();
try {
return OperatorParser.constructOperatorForCache(state.getEventHolder(), condition, matchingMetaInfoHolder,
variableExpressionExecutors, tableMap, siddhiQueryContext, updateCachePolicyAttribute, this);
return new InMemoryCompiledCondition(OperatorParser.constructOperatorForCache(state.getEventHolder(),
condition, matchingMetaInfoHolder, variableExpressionExecutors, tableMap, siddhiQueryContext,
updateCachePolicyAttribute, this),
ExpressionParser.parseExpression(condition, matchingMetaInfoHolder.getMetaStateEvent(),
matchingMetaInfoHolder.getCurrentState(), tableMap, variableExpressionExecutors,
false, 0, ProcessingMode.BATCH,
false, siddhiQueryContext),
matchingMetaInfoHolder.getStoreEventIndex()
);
} finally {
stateHolder.returnState(state);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright (c) 2017, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.siddhi.core.table;

import io.siddhi.core.executor.ExpressionExecutor;
import io.siddhi.core.util.collection.operator.CompiledCondition;

/**
* Compiled condition created for {@link InMemoryTable}
*/
public class InMemoryCompiledCondition implements CompiledCondition {
private CompiledCondition operatorCompiledCondition;
private ExpressionExecutor updateOrInsertExpressionExecutor;
private int storeEventIndex;

public InMemoryCompiledCondition(CompiledCondition operatorCompiledCondition,
ExpressionExecutor updateOrInsertExpressionExecutor, int storeEventIndex) {
this.operatorCompiledCondition = operatorCompiledCondition;
this.updateOrInsertExpressionExecutor = updateOrInsertExpressionExecutor;
this.storeEventIndex = storeEventIndex;
}

public CompiledCondition getOperatorCompiledCondition() {
return operatorCompiledCondition;
}

public ExpressionExecutor getUpdateOrInsertExpressionExecutor() {
return updateOrInsertExpressionExecutor;
}

public int getStoreEventIndex() {
return storeEventIndex;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ public void delete(ComplexEventChunk<StateEvent> deletingEventChunk, CompiledCon
readWriteLock.writeLock().lock();
TableState state = stateHolder.getState();
try {
((Operator) compiledCondition).delete(deletingEventChunk, state.eventHolder);
((Operator) ((InMemoryCompiledCondition) compiledCondition).getOperatorCompiledCondition()).
delete(deletingEventChunk, state.eventHolder);
} finally {
stateHolder.returnState(state);
readWriteLock.writeLock().unlock();
Expand All @@ -108,8 +109,8 @@ public void update(ComplexEventChunk<StateEvent> updatingEventChunk, CompiledCon
readWriteLock.writeLock().lock();
TableState state = stateHolder.getState();
try {
((Operator) compiledCondition).update(updatingEventChunk, state.eventHolder,
(InMemoryCompiledUpdateSet) compiledUpdateSet);
((Operator) ((InMemoryCompiledCondition) compiledCondition).getOperatorCompiledCondition()).
update(updatingEventChunk, state.eventHolder, (InMemoryCompiledUpdateSet) compiledUpdateSet);
} finally {
stateHolder.returnState(state);
readWriteLock.writeLock().unlock();
Expand All @@ -124,27 +125,57 @@ public void updateOrAdd(ComplexEventChunk<StateEvent> updateOrAddingEventChunk,
AddingStreamEventExtractor addingStreamEventExtractor) {
readWriteLock.writeLock().lock();
TableState state = stateHolder.getState();
InMemoryCompiledCondition inMemoryCompiledCondition = (InMemoryCompiledCondition) compiledCondition;
try {
ComplexEventChunk<StreamEvent> failedEvents = ((Operator) compiledCondition).tryUpdate(
updateOrAddingEventChunk,
state.eventHolder,
(InMemoryCompiledUpdateSet) compiledUpdateSet,
addingStreamEventExtractor);
ComplexEventChunk<StateEvent> failedEvents =
((Operator) inMemoryCompiledCondition.getOperatorCompiledCondition()).
tryUpdate(updateOrAddingEventChunk, state.eventHolder,
(InMemoryCompiledUpdateSet) compiledUpdateSet,
addingStreamEventExtractor);
if (failedEvents != null && failedEvents.getFirst() != null) {
state.eventHolder.add(failedEvents);
state.eventHolder.add(reduceEventsForUpdateOrInsert(
addingStreamEventExtractor, inMemoryCompiledCondition, failedEvents));
}
} finally {
stateHolder.returnState(state);
readWriteLock.writeLock().unlock();
}
}

protected ComplexEventChunk<StreamEvent> reduceEventsForUpdateOrInsert(
AddingStreamEventExtractor addingStreamEventExtractor,
InMemoryCompiledCondition inMemoryCompiledCondition,
ComplexEventChunk<StateEvent> failedEvents) {
ComplexEventChunk<StreamEvent> toInsertEventChunk = new ComplexEventChunk<>(failedEvents.isBatch());
failedEvents.reset();
while (failedEvents.hasNext()) {
StateEvent failedEvent = failedEvents.next();
boolean updated = false;
toInsertEventChunk.reset();
while (toInsertEventChunk.hasNext()) {
StreamEvent toInsertEvent = toInsertEventChunk.next();
failedEvent.setEvent(inMemoryCompiledCondition.getStoreEventIndex(), toInsertEvent);
if ((Boolean) inMemoryCompiledCondition.
getUpdateOrInsertExpressionExecutor().execute(failedEvent)) {
toInsertEvent.setOutputData(addingStreamEventExtractor.
getAddingStreamEvent(failedEvent).getOutputData());
updated = true;
}
}
if (!updated) {
toInsertEventChunk.add(addingStreamEventExtractor.getAddingStreamEvent(failedEvent));
}
}
return toInsertEventChunk;
}

@Override
public boolean contains(StateEvent matchingEvent, CompiledCondition compiledCondition) {
readWriteLock.readLock().lock();
TableState state = stateHolder.getState();
try {
return ((Operator) compiledCondition).contains(matchingEvent, state.eventHolder);
return ((Operator) ((InMemoryCompiledCondition) compiledCondition).getOperatorCompiledCondition()).
contains(matchingEvent, state.eventHolder);
} finally {
stateHolder.returnState(state);
readWriteLock.readLock().unlock();
Expand All @@ -166,7 +197,8 @@ public StreamEvent find(CompiledCondition compiledCondition, StateEvent matching
TableState state = stateHolder.getState();
readWriteLock.readLock().lock();
try {
return ((Operator) compiledCondition).find(matchingEvent, state.eventHolder, tableStreamEventCloner);
return ((Operator) ((InMemoryCompiledCondition) compiledCondition).getOperatorCompiledCondition()).
find(matchingEvent, state.eventHolder, tableStreamEventCloner);
} finally {
stateHolder.returnState(state);
readWriteLock.readLock().unlock();
Expand All @@ -179,8 +211,14 @@ public CompiledCondition compileCondition(Expression condition, MatchingMetaInfo
Map<String, Table> tableMap, SiddhiQueryContext siddhiQueryContext) {
TableState state = stateHolder.getState();
try {
return OperatorParser.constructOperator(state.eventHolder, condition, matchingMetaInfoHolder,
variableExpressionExecutors, tableMap, siddhiQueryContext);
return new InMemoryCompiledCondition(OperatorParser.constructOperator(state.eventHolder, condition,
matchingMetaInfoHolder, variableExpressionExecutors, tableMap, siddhiQueryContext),
ExpressionParser.parseExpression(condition, matchingMetaInfoHolder.getMetaStateEvent(),
matchingMetaInfoHolder.getCurrentState(), tableMap, variableExpressionExecutors,
false, 0, ProcessingMode.BATCH,
false, siddhiQueryContext),
matchingMetaInfoHolder.getStoreEventIndex()
);
} finally {
stateHolder.returnState(state);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,12 +120,12 @@ public void update(ComplexEventChunk<StateEvent> updatingEventChunk, Object stor
}

@Override
public ComplexEventChunk<StreamEvent> tryUpdate(ComplexEventChunk<StateEvent> updatingOrAddingEventChunk,
Object storeEvents, InMemoryCompiledUpdateSet compiledUpdateSet,
AddingStreamEventExtractor addingStreamEventExtractor) {
public ComplexEventChunk<StateEvent> tryUpdate(ComplexEventChunk<StateEvent> updatingOrAddingEventChunk,
Object storeEvents, InMemoryCompiledUpdateSet compiledUpdateSet,
AddingStreamEventExtractor addingStreamEventExtractor) {

updatingOrAddingEventChunk.reset();
ComplexEventChunk<StreamEvent> failedEventChunk = new ComplexEventChunk<StreamEvent>
ComplexEventChunk<StateEvent> failedEventChunk = new ComplexEventChunk<StateEvent>
(updatingOrAddingEventChunk.isBatch());
while (updatingOrAddingEventChunk.hasNext()) {
StateEvent updateOrAddingEvent = updatingOrAddingEventChunk.next();
Expand All @@ -144,7 +144,8 @@ public ComplexEventChunk<StreamEvent> tryUpdate(ComplexEventChunk<StateEvent> up
}
}
if (!updated) {
failedEventChunk.add(addingStreamEventExtractor.getAddingStreamEvent(updateOrAddingEvent));
updatingOrAddingEventChunk.remove();
failedEventChunk.add(updateOrAddingEvent);
}
} finally {
updateOrAddingEvent.setEvent(storeEventPosition, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,13 +124,13 @@ public void update(ComplexEventChunk<StateEvent> updatingEventChunk, Object stor
}

@Override
public ComplexEventChunk<StreamEvent> tryUpdate(ComplexEventChunk<StateEvent> updatingOrAddingEventChunk, Object
storeEvents,
InMemoryCompiledUpdateSet compiledUpdateSet,
AddingStreamEventExtractor addingStreamEventExtractor) {
public ComplexEventChunk<StateEvent> tryUpdate(ComplexEventChunk<StateEvent> updatingOrAddingEventChunk,
Object storeEvents,
InMemoryCompiledUpdateSet compiledUpdateSet,
AddingStreamEventExtractor addingStreamEventExtractor) {
ComplexEventChunk<StreamEvent> storeEventChunk = (ComplexEventChunk<StreamEvent>) storeEvents;
updatingOrAddingEventChunk.reset();
ComplexEventChunk<StreamEvent> failedEventChunk = new ComplexEventChunk<StreamEvent>
ComplexEventChunk<StateEvent> failedEventChunk = new ComplexEventChunk<StateEvent>
(updatingOrAddingEventChunk.isBatch());
while (updatingOrAddingEventChunk.hasNext()) {
StateEvent overwritingOrAddingEvent = updatingOrAddingEventChunk.next();
Expand All @@ -150,7 +150,8 @@ public ComplexEventChunk<StreamEvent> tryUpdate(ComplexEventChunk<StateEvent> up
}
}
if (!updated) {
failedEventChunk.add(addingStreamEventExtractor.getAddingStreamEvent(overwritingOrAddingEvent));
updatingOrAddingEventChunk.remove();
failedEventChunk.add(overwritingOrAddingEvent);
}
} finally {
overwritingOrAddingEvent.setEvent(storeEventPosition, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,11 @@ public void update(ComplexEventChunk<StateEvent> updatingEventChunk, Object stor
}

@Override
public ComplexEventChunk<StreamEvent> tryUpdate(ComplexEventChunk<StateEvent> updatingOrAddingEventChunk,
Object storeEvents,
InMemoryCompiledUpdateSet compiledUpdateSet,
AddingStreamEventExtractor addingStreamEventExtractor) {
ComplexEventChunk<StreamEvent> failedEventChunk = new ComplexEventChunk<StreamEvent>
(updatingOrAddingEventChunk.isBatch());
public ComplexEventChunk<StateEvent> tryUpdate(ComplexEventChunk<StateEvent> updatingOrAddingEventChunk,
Object storeEvents,
InMemoryCompiledUpdateSet compiledUpdateSet,
AddingStreamEventExtractor addingStreamEventExtractor) {
ComplexEventChunk<StateEvent> failedEventChunk = new ComplexEventChunk<>(updatingOrAddingEventChunk.isBatch());
updatingOrAddingEventChunk.reset();
while (updatingOrAddingEventChunk.hasNext()) {
StateEvent overwritingOrAddingEvent = updatingOrAddingEventChunk.next();
Expand All @@ -108,7 +107,8 @@ public ComplexEventChunk<StreamEvent> tryUpdate(ComplexEventChunk<StateEvent> up
update((IndexedEventHolder) storeEvents, compiledUpdateSet, overwritingOrAddingEvent,
foundEventChunk);
} else {
failedEventChunk.add(addingStreamEventExtractor.getAddingStreamEvent(overwritingOrAddingEvent));
updatingOrAddingEventChunk.remove();
failedEventChunk.add(overwritingOrAddingEvent);
}
}
return failedEventChunk;
Expand Down
Loading

0 comments on commit d34aeb3

Please sign in to comment.