diff --git a/modules/siddhi-core/src/main/java/io/siddhi/core/query/processor/stream/window/SessionWindowProcessor.java b/modules/siddhi-core/src/main/java/io/siddhi/core/query/processor/stream/window/SessionWindowProcessor.java index cadef43678..f69b6060e8 100644 --- a/modules/siddhi-core/src/main/java/io/siddhi/core/query/processor/stream/window/SessionWindowProcessor.java +++ b/modules/siddhi-core/src/main/java/io/siddhi/core/query/processor/stream/window/SessionWindowProcessor.java @@ -62,34 +62,30 @@ @Extension( name = "session", namespace = "", - description = "This is a session window that holds events that belong to a specific session. The events " + - "that belong to a specific session are identified by a grouping attribute (i.e., a session key). A " + - "session gap period is specified to determine the time period after which the session is considered " + - "to be expired. A new event that arrives with a specific value for the session key is matched with" + - " the session window with the same session key.\n " + - "There can be out of order and late arrival of events, these events can arrive after the session is " + - "expired, to include those events to the matching session key specify a " + - "latency time period that is less than the session gap period." + - "To have aggregate functions with session windows, the events need to be grouped by the " + - "session key via a 'group by' clause.", + description = "Holds events that belong to a session. Events belong to a specific session are " + + "identified by a session key, and a session gap is determines the time period after which " + + "the session is considered to be expired. " + + "To have meaningful aggregation on session windows, the events need to be aggregated based on " + + "session key via a `group by` clause.", parameters = { - @Parameter(name = "window.session", - description = "The time period for which the session considered is valid. This is specified" + - " in seconds, minutes, or milliseconds (i.e., 'min', 'sec', or 'ms'.", + @Parameter(name = "session.gap", + description = "The time period after which the session is considered to be expired.", type = {DataType.INT, DataType.LONG, DataType.TIME}), - @Parameter(name = "window.key", - description = "The grouping attribute for events.", + @Parameter(name = "session.key", + description = "The session identification attribute. Used to group events belonging to a " + + "specific session.", type = {DataType.STRING}, optional = true, defaultValue = "default-key", dynamic = true), - @Parameter(name = "window.allowed.latency", - description = "This specifies the time period for which the session window is valid after " + - "the expiration of the session. The time period specified here should be less than " + - "the session time gap (which is specified via the 'window.session' parameter).", + @Parameter(name = "allowed.latency", + description = "The time period for which the session window is valid after " + + "the expiration of the session, to accept late event arrivals. " + + "This time period should be less than " + + "the `session.gap` parameter.", type = {DataType.INT, DataType.LONG, DataType.TIME}, optional = true, defaultValue = "0") }, parameterOverloads = { - @ParameterOverload(parameterNames = {"window.session"}), - @ParameterOverload(parameterNames = {"window.session", "window.key"}), - @ParameterOverload(parameterNames = {"window.session", "window.key", "window.allowed.latency"}) + @ParameterOverload(parameterNames = {"session.gap"}), + @ParameterOverload(parameterNames = {"session.gap", "session.key"}), + @ParameterOverload(parameterNames = {"session.gap", "session.key", "allowed.latency"}) }, examples = { @@ -97,16 +93,32 @@ syntax = "define stream PurchaseEventStream " + "(user string, item_number int, price float, quantity int);\n" + "\n" - + "@info(name='query0) \n" + + "@info(name='query1) \n" + + "from PurchaseEventStream#window.session(5 sec, user) \n" + + "select user, sum(quantity) as totalQuantity, sum(price) as totalPrice \n" + + "group by user \n" + + "insert into OutputStream;", + description = "From the events arriving at the PurchaseEventStream, " + + "a session window with 5 seconds session gap is processed based on 'user' attribute " + + "as the session group identification key. All events falling into the same session " + + "are aggregated based on `user` attribute, and outputted to the OutputStream." + ), + @Example( + syntax = "define stream PurchaseEventStream " + + "(user string, item_number int, price float, quantity int);\n" + + "\n" + + "@info(name='query2) \n" + "from PurchaseEventStream#window.session(5 sec, user, 2 sec) \n" - + "select * \n" - + "insert all events into OutputStream;", - description = "This query processes events that arrive at the PurchaseEvent input stream. " + - "The 'user' attribute is the session key, and the session gap is 5 " + - "seconds. '2 sec' is specified as the allowed latency. Therefore, events with the " + - "matching user name that arrive 2 seconds after the expiration of the session are " + - "also considered when performing aggregations for the session identified by the given" + - " user name." + + "select user, sum(quantity) as totalQuantity, sum(price) as totalPrice \n" + + "group by user \n" + + "insert into OutputStream;", + description = "From the events arriving at the PurchaseEventStream, " + + "a session window with 5 seconds session gap is processed based on 'user' attribute " + + "as the session group identification key. This session window is kept active for " + + "2 seconds after the session expiry to capture late (out of order) event arrivals. " + + "If the event timestamp falls in to the last session the session is reactivated. " + + "Then all events falling into the same session " + + "are aggregated based on `user` attribute, and outputted to the OutputStream." ) } ) diff --git a/modules/siddhi-core/src/test/java/io/siddhi/core/window/SessionWindowTestCase.java b/modules/siddhi-core/src/test/java/io/siddhi/core/window/SessionWindowTestCase.java index 14db486f53..de3d5a3d2f 100644 --- a/modules/siddhi-core/src/test/java/io/siddhi/core/window/SessionWindowTestCase.java +++ b/modules/siddhi-core/src/test/java/io/siddhi/core/window/SessionWindowTestCase.java @@ -89,9 +89,9 @@ public void testSessionWindow1() { } catch (SiddhiAppCreationException e) { AssertJUnit.assertEquals("There is no parameterOverload for 'session' that matches attribute types " + "''. Supported parameter overloads are " + - "( window.session), " + - "( window.session, window.key), ( window.session, " + - " window.key, window.allowed.latency).", e.getCause().getMessage()); + "( session.gap), " + + "( session.gap, session.key), ( session.gap, " + + " session.key, allowed.latency).", e.getCause().getMessage()); throw e; } finally { if (siddhiAppRuntime != null) { @@ -120,7 +120,7 @@ public void testSessionWindow2() { siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(purchaseEventStream + query); } catch (SiddhiAppCreationException e) { - AssertJUnit.assertEquals("The 'session' expects input parameter 'window.session' at position '0' " + + AssertJUnit.assertEquals("The 'session' expects input parameter 'session.gap' at position '0' " + "to be static, but found a dynamic attribute.", e.getCause().getMessage()); throw e; } finally { @@ -153,9 +153,9 @@ public void testSessionWindow3() { } catch (SiddhiAppCreationException e) { AssertJUnit.assertEquals("There is no parameterOverload for 'session' that matches attribute types " + - "''. Supported parameter overloads are ( window.session), " + - "( window.session, window.key), ( window.session, " + - " window.key, window.allowed.latency).", e.getCause().getMessage()); + "''. Supported parameter overloads are ( session.gap), " + + "( session.gap, session.key), ( session.gap, " + + " session.key, allowed.latency).", e.getCause().getMessage()); throw e; } finally { if (siddhiAppRuntime != null) { @@ -220,9 +220,9 @@ public void testSessionWindow5() { } catch (SiddhiAppCreationException e) { AssertJUnit.assertEquals("There is no parameterOverload for 'session' that matches attribute types " + - "''. Supported parameter overloads are ( window.session), " + - "( window.session, window.key), ( window.session, " + - " window.key, window.allowed.latency).", e.getCause().getMessage()); + "''. Supported parameter overloads are ( session.gap), " + + "( session.gap, session.key), ( session.gap, " + + " session.key, allowed.latency).", e.getCause().getMessage()); throw e; } finally { if (siddhiAppRuntime != null) { @@ -253,7 +253,7 @@ public void testSessionWindow6() { siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(purchaseEventStream + query); } catch (SiddhiAppCreationException e) { - AssertJUnit.assertEquals("The 'session' expects input parameter 'window.allowed.latency' at position " + + AssertJUnit.assertEquals("The 'session' expects input parameter 'allowed.latency' at position " + "'2' to be static, but found a dynamic attribute.", e.getCause().getMessage()); throw e; } finally { @@ -287,10 +287,10 @@ public void testSessionWindow7() { } catch (SiddhiAppCreationException e) { AssertJUnit.assertEquals("There is no parameterOverload for 'session' that matches attribute " + "types ''. Supported parameter overloads are " + - "( window.session), " + - "( window.session, window.key), " + - "( window.session, window.key, " + - " window.allowed.latency).", + "( session.gap), " + + "( session.gap, session.key), " + + "( session.gap, session.key, " + + " allowed.latency).", e.getCause().getMessage()); throw e; } finally { @@ -324,9 +324,9 @@ public void testSessionWindow8() { } catch (SiddhiAppCreationException e) { AssertJUnit.assertEquals("There is no parameterOverload for 'session' that matches attribute types" + - " ''. Supported parameter overloads are ( window.session), " + - "( window.session, window.key), ( window.session, " + - " window.key, window.allowed.latency).", e.getCause().getMessage()); + " ''. Supported parameter overloads are ( session.gap), " + + "( session.gap, session.key), ( session.gap, " + + " session.key, allowed.latency).", e.getCause().getMessage()); throw e; } finally { if (siddhiAppRuntime != null) { @@ -391,9 +391,9 @@ public void testSessionWindow10() { } catch (SiddhiAppCreationException e) { AssertJUnit.assertEquals("There is no parameterOverload for 'session' that matches attribute types " + - "''. Supported parameter overloads are ( window.session), " + - "( window.session, window.key), ( window.session, " + - " window.key, window.allowed.latency).", e.getCause().getMessage()); + "''. Supported parameter overloads are ( session.gap), " + + "( session.gap, session.key), ( session.gap, " + + " session.key, allowed.latency).", e.getCause().getMessage()); throw e; } finally { if (siddhiAppRuntime != null) {