Update or insert into InMemoryTable using event chunk that has updates within the chunk fails #1243

Closed
sybernix opened this issue Apr 29, 2019 · 0 comments · Fixed by #1503

Description:
Update or insert into any table is expected to process the incoming events sequentially: if an incoming event matches a previously received event in the table, it should replace that entry; otherwise it should be added. However, if we send an event chunk as follows

Event[] events = new Event[3];
events[0] = new Event(System.currentTimeMillis(), new Object[]{"WSO2", 55, 100L});
events[1] = new Event(System.currentTimeMillis(), new Object[]{"IBM", 55, 100L});
events[2] = new Event(System.currentTimeMillis(), new Object[]{"WSO2", 155, 200L});
updateStockStream.send(events);

and the query on UpdateStockStream is as follows,

from UpdateStockStream
update or insert into StockTable
on StockTable.symbol == symbol ;

the resulting InMemoryTable entries are as follows:
WSO2 55 100
IBM 55 100
WSO2 155 200

The last event, "WSO2 155 200", has been added as a new entry instead of replacing the first entry, "WSO2 55 100".
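
To make the expected behaviour concrete, here is a minimal sketch of the sequential semantics described above. It is a hypothetical model, not Siddhi's InMemoryTable code: the class `UpsertSketch`, the method `updateOrInsert`, and the use of a `LinkedHashMap` keyed by symbol are assumptions made for illustration only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the expected semantics; not Siddhi's implementation.
class UpsertSketch {

    // Applies each event of the chunk in arrival order. The first attribute
    // (symbol) stands in for the "on StockTable.symbol == symbol" condition.
    static List<Object[]> updateOrInsert(Map<String, Object[]> table, Object[][] chunk) {
        for (Object[] event : chunk) {
            String symbol = (String) event[0];
            // put() replaces the row when the symbol is already present,
            // including a row inserted earlier in this same chunk;
            // otherwise it appends a new row.
            table.put(symbol, event);
        }
        return new ArrayList<>(table.values());
    }

    public static void main(String[] args) {
        Object[][] chunk = {
            {"WSO2", 55, 100L},
            {"IBM", 55, 100L},
            {"WSO2", 155, 200L},
        };
        for (Object[] row : updateOrInsert(new LinkedHashMap<>(), chunk)) {
            System.out.println(Arrays.toString(row));
        }
        // prints:
        // [WSO2, 155, 200]
        // [IBM, 55, 100]
    }
}
```

Under this model the chunk leaves two rows in the table, which is the outcome the report expects from StockTable.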

Affected Siddhi Version:
5.0.1

OS, DB, other environment details and versions:
Mac OS Mojave 10.14.3

Steps to reproduce:

@Test
public void updateOrInsertTableTest13() throws InterruptedException, SQLException {
    log.info("updateOrInsertTableTest13");
    SiddhiManager siddhiManager = new SiddhiManager();
    String streams = "" +
            "define stream UpdateStockStream (symbol string, price int, volume long); " +
            "define stream SearchStream (symbol string); " +
            "@PrimaryKey(\"symbol\")" +
            //"@Index(\"volume\")" +
            "define table StockTable (symbol string, price int, volume long); ";
    String query = "" +
            "@info(name = 'query1') " +
            "from UpdateStockStream " +
            "update or insert into StockTable " +
            "   on StockTable.symbol == symbol ;" +
            "" +
            "@info(name = 'query2') " +
            "from SearchStream#window.length(1) join StockTable on StockTable.symbol == SearchStream.symbol " +
            "select StockTable.symbol as symbol, price, volume " +
            "insert into OutStream;";

    SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(streams + query);
    InputHandler updateStockStream = siddhiAppRuntime.getInputHandler("UpdateStockStream");
    InputHandler searchStream = siddhiAppRuntime.getInputHandler("SearchStream");
    siddhiAppRuntime.addCallback("query2", new QueryCallback() {
        @Override
        public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
            EventPrinter.print(timeStamp, inEvents, removeEvents);
            if (inEvents != null) {
                for (Event event : inEvents) {
                    int inEventCount = actualEventCount.incrementAndGet();
                    switch (inEventCount) {
                        case 1:
                            Assert.assertEquals(event.getData(), new Object[]{"WSO2", 155, 200L});
                            break;
                        case 2:
                            Assert.assertEquals(event.getData(), new Object[]{"IBM", 155, 200L});
                            break;
                        default:
                            Assert.assertSame(inEventCount, 2);
                    }
                }
            }
        }

    });
    siddhiAppRuntime.start();
    Event[] events = new Event[4];
    events[0] = new Event(System.currentTimeMillis(), new Object[]{"WSO2", 55, 100L});
    events[1] = new Event(System.currentTimeMillis(), new Object[]{"IBM", 55, 100L});
    events[2] = new Event(System.currentTimeMillis(), new Object[]{"WSO2", 155, 200L});
    events[3] = new Event(System.currentTimeMillis(), new Object[]{"IBM", 155, 200L});
    updateStockStream.send(events);
    searchStream.send(new Object[]{"WSO2"});
    searchStream.send(new Object[]{"IBM"});
    waitTillVariableCountMatches(2, Duration.ONE_MINUTE);
    siddhiAppRuntime.shutdown();
}

suhothayan added a commit to suhothayan/siddhi that referenced this issue Sep 18, 2019

mohanvive changed the title from "Update or insert into InMemoryTable using event chuck that has updates within the chunk fails" to "Update or insert into InMemoryTable using event chunk that has updates within the chunk fails" Sep 24, 2019