diff --git a/modules/siddhi-core/src/main/java/io/siddhi/core/util/parser/JoinInputStreamParser.java b/modules/siddhi-core/src/main/java/io/siddhi/core/util/parser/JoinInputStreamParser.java index 837cff78d2..d1f9f8b287 100644 --- a/modules/siddhi-core/src/main/java/io/siddhi/core/util/parser/JoinInputStreamParser.java +++ b/modules/siddhi-core/src/main/java/io/siddhi/core/util/parser/JoinInputStreamParser.java @@ -422,7 +422,13 @@ private static FindableProcessor insertJoinProcessorsAndGetFindable(JoinProcesso ((MetaStreamEvent) streamRuntime.getMetaComplexEvent()), expressionExecutors, configReader, outputExpectsExpiredEvents, true, false, inputStream, siddhiQueryContext); - lastProcessor = windowProcessor; + if (lastProcessor != null) { + prevLastProcessor = lastProcessor; + prevLastProcessor.setNextProcessor(windowProcessor); + lastProcessor = windowProcessor; + } else { + lastProcessor = windowProcessor; + } } catch (Throwable t) { throw new SiddhiAppCreationException(t); } diff --git a/modules/siddhi-core/src/test/java/io/siddhi/core/query/table/JoinTableTestCase.java b/modules/siddhi-core/src/test/java/io/siddhi/core/query/table/JoinTableTestCase.java index 007b61d768..e732c74a91 100644 --- a/modules/siddhi-core/src/test/java/io/siddhi/core/query/table/JoinTableTestCase.java +++ b/modules/siddhi-core/src/test/java/io/siddhi/core/query/table/JoinTableTestCase.java @@ -742,6 +742,7 @@ public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) { stockStream.send(new Object[]{"WSO2", 55.6f, 100L}); stockStream.send(new Object[]{"IBM", 75.6f, 10L}); checkStockStream.send(new Object[]{"WSO2"}); + checkStockStream.send(new Object[]{"IBM"}); Thread.sleep(500);