Skip to content

Commit

Permalink
Merge pull request #1580 from suhothayan/master
Browse files Browse the repository at this point in the history
Improve session window documentation
  • Loading branch information
mohanvive authored Dec 5, 2019
2 parents 1ab8dde + af44cef commit 8e03def
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,51 +62,63 @@
@Extension(
name = "session",
namespace = "",
description = "This is a session window that holds events that belong to a specific session. The events " +
"that belong to a specific session are identified by a grouping attribute (i.e., a session key). A " +
"session gap period is specified to determine the time period after which the session is considered " +
"to be expired. A new event that arrives with a specific value for the session key is matched with" +
" the session window with the same session key.\n " +
"There can be out of order and late arrival of events, these events can arrive after the session is " +
"expired, to include those events to the matching session key specify a " +
"latency time period that is less than the session gap period." +
"To have aggregate functions with session windows, the events need to be grouped by the " +
"session key via a 'group by' clause.",
description = "Holds events that belong to a session. Events belong to a specific session are " +
"identified by a session key, and a session gap is determines the time period after which " +
"the session is considered to be expired. " +
"To have meaningful aggregation on session windows, the events need to be aggregated based on " +
"session key via a `group by` clause.",
parameters = {
@Parameter(name = "window.session",
description = "The time period for which the session considered is valid. This is specified" +
" in seconds, minutes, or milliseconds (i.e., 'min', 'sec', or 'ms'.",
@Parameter(name = "session.gap",
description = "The time period after which the session is considered to be expired.",
type = {DataType.INT, DataType.LONG, DataType.TIME}),
@Parameter(name = "window.key",
description = "The grouping attribute for events.",
@Parameter(name = "session.key",
description = "The session identification attribute. Used to group events belonging to a " +
"specific session.",
type = {DataType.STRING}, optional = true, defaultValue = "default-key", dynamic = true),
@Parameter(name = "window.allowed.latency",
description = "This specifies the time period for which the session window is valid after " +
"the expiration of the session. The time period specified here should be less than " +
"the session time gap (which is specified via the 'window.session' parameter).",
@Parameter(name = "allowed.latency",
description = "The time period for which the session window is valid after " +
"the expiration of the session, to accept late event arrivals. " +
"This time period should be less than " +
"the `session.gap` parameter.",
type = {DataType.INT, DataType.LONG, DataType.TIME}, optional = true, defaultValue = "0")
},
parameterOverloads = {
@ParameterOverload(parameterNames = {"window.session"}),
@ParameterOverload(parameterNames = {"window.session", "window.key"}),
@ParameterOverload(parameterNames = {"window.session", "window.key", "window.allowed.latency"})
@ParameterOverload(parameterNames = {"session.gap"}),
@ParameterOverload(parameterNames = {"session.gap", "session.key"}),
@ParameterOverload(parameterNames = {"session.gap", "session.key", "allowed.latency"})

},
examples = {
@Example(
syntax = "define stream PurchaseEventStream "
+ "(user string, item_number int, price float, quantity int);\n"
+ "\n"
+ "@info(name='query0) \n"
+ "@info(name='query1) \n"
+ "from PurchaseEventStream#window.session(5 sec, user) \n"
+ "select user, sum(quantity) as totalQuantity, sum(price) as totalPrice \n"
+ "group by user \n"
+ "insert into OutputStream;",
description = "From the events arriving at the PurchaseEventStream, " +
"a session window with 5 seconds session gap is processed based on 'user' attribute " +
"as the session group identification key. All events falling into the same session " +
"are aggregated based on `user` attribute, and outputted to the OutputStream."
),
@Example(
syntax = "define stream PurchaseEventStream "
+ "(user string, item_number int, price float, quantity int);\n"
+ "\n"
+ "@info(name='query2) \n"
+ "from PurchaseEventStream#window.session(5 sec, user, 2 sec) \n"
+ "select * \n"
+ "insert all events into OutputStream;",
description = "This query processes events that arrive at the PurchaseEvent input stream. " +
"The 'user' attribute is the session key, and the session gap is 5 " +
"seconds. '2 sec' is specified as the allowed latency. Therefore, events with the " +
"matching user name that arrive 2 seconds after the expiration of the session are " +
"also considered when performing aggregations for the session identified by the given" +
" user name."
+ "select user, sum(quantity) as totalQuantity, sum(price) as totalPrice \n"
+ "group by user \n"
+ "insert into OutputStream;",
description = "From the events arriving at the PurchaseEventStream, " +
"a session window with 5 seconds session gap is processed based on 'user' attribute " +
"as the session group identification key. This session window is kept active for " +
"2 seconds after the session expiry to capture late (out of order) event arrivals. " +
"If the event timestamp falls in to the last session the session is reactivated. " +
"Then all events falling into the same session " +
"are aggregated based on `user` attribute, and outputted to the OutputStream."
)
}
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,9 @@ public void testSessionWindow1() {
} catch (SiddhiAppCreationException e) {
AssertJUnit.assertEquals("There is no parameterOverload for 'session' that matches attribute types " +
"'<LONG, STRING, LONG, INT>'. Supported parameter overloads are " +
"(<INT|LONG|TIME> window.session), " +
"(<INT|LONG|TIME> window.session, <STRING> window.key), (<INT|LONG|TIME> window.session, " +
"<STRING> window.key, <INT|LONG|TIME> window.allowed.latency).", e.getCause().getMessage());
"(<INT|LONG|TIME> session.gap), " +
"(<INT|LONG|TIME> session.gap, <STRING> session.key), (<INT|LONG|TIME> session.gap, " +
"<STRING> session.key, <INT|LONG|TIME> allowed.latency).", e.getCause().getMessage());
throw e;
} finally {
if (siddhiAppRuntime != null) {
Expand Down Expand Up @@ -120,7 +120,7 @@ public void testSessionWindow2() {
siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(purchaseEventStream + query);

} catch (SiddhiAppCreationException e) {
AssertJUnit.assertEquals("The 'session' expects input parameter 'window.session' at position '0' " +
AssertJUnit.assertEquals("The 'session' expects input parameter 'session.gap' at position '0' " +
"to be static, but found a dynamic attribute.", e.getCause().getMessage());
throw e;
} finally {
Expand Down Expand Up @@ -153,9 +153,9 @@ public void testSessionWindow3() {

} catch (SiddhiAppCreationException e) {
AssertJUnit.assertEquals("There is no parameterOverload for 'session' that matches attribute types " +
"'<STRING, STRING, LONG>'. Supported parameter overloads are (<INT|LONG|TIME> window.session), " +
"(<INT|LONG|TIME> window.session, <STRING> window.key), (<INT|LONG|TIME> window.session, " +
"<STRING> window.key, <INT|LONG|TIME> window.allowed.latency).", e.getCause().getMessage());
"'<STRING, STRING, LONG>'. Supported parameter overloads are (<INT|LONG|TIME> session.gap), " +
"(<INT|LONG|TIME> session.gap, <STRING> session.key), (<INT|LONG|TIME> session.gap, " +
"<STRING> session.key, <INT|LONG|TIME> allowed.latency).", e.getCause().getMessage());
throw e;
} finally {
if (siddhiAppRuntime != null) {
Expand Down Expand Up @@ -220,9 +220,9 @@ public void testSessionWindow5() {

} catch (SiddhiAppCreationException e) {
AssertJUnit.assertEquals("There is no parameterOverload for 'session' that matches attribute types " +
"'<LONG, INT, LONG>'. Supported parameter overloads are (<INT|LONG|TIME> window.session), " +
"(<INT|LONG|TIME> window.session, <STRING> window.key), (<INT|LONG|TIME> window.session, " +
"<STRING> window.key, <INT|LONG|TIME> window.allowed.latency).", e.getCause().getMessage());
"'<LONG, INT, LONG>'. Supported parameter overloads are (<INT|LONG|TIME> session.gap), " +
"(<INT|LONG|TIME> session.gap, <STRING> session.key), (<INT|LONG|TIME> session.gap, " +
"<STRING> session.key, <INT|LONG|TIME> allowed.latency).", e.getCause().getMessage());
throw e;
} finally {
if (siddhiAppRuntime != null) {
Expand Down Expand Up @@ -253,7 +253,7 @@ public void testSessionWindow6() {
siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(purchaseEventStream + query);

} catch (SiddhiAppCreationException e) {
AssertJUnit.assertEquals("The 'session' expects input parameter 'window.allowed.latency' at position " +
AssertJUnit.assertEquals("The 'session' expects input parameter 'allowed.latency' at position " +
"'2' to be static, but found a dynamic attribute.", e.getCause().getMessage());
throw e;
} finally {
Expand Down Expand Up @@ -287,10 +287,10 @@ public void testSessionWindow7() {
} catch (SiddhiAppCreationException e) {
AssertJUnit.assertEquals("There is no parameterOverload for 'session' that matches attribute " +
"types '<LONG, STRING, STRING>'. Supported parameter overloads are " +
"(<INT|LONG|TIME> window.session), " +
"(<INT|LONG|TIME> window.session, <STRING> window.key), " +
"(<INT|LONG|TIME> window.session, <STRING> window.key, " +
"<INT|LONG|TIME> window.allowed.latency).",
"(<INT|LONG|TIME> session.gap), " +
"(<INT|LONG|TIME> session.gap, <STRING> session.key), " +
"(<INT|LONG|TIME> session.gap, <STRING> session.key, " +
"<INT|LONG|TIME> allowed.latency).",
e.getCause().getMessage());
throw e;
} finally {
Expand Down Expand Up @@ -324,9 +324,9 @@ public void testSessionWindow8() {

} catch (SiddhiAppCreationException e) {
AssertJUnit.assertEquals("There is no parameterOverload for 'session' that matches attribute types" +
" '<LONG, INT>'. Supported parameter overloads are (<INT|LONG|TIME> window.session), " +
"(<INT|LONG|TIME> window.session, <STRING> window.key), (<INT|LONG|TIME> window.session, " +
"<STRING> window.key, <INT|LONG|TIME> window.allowed.latency).", e.getCause().getMessage());
" '<LONG, INT>'. Supported parameter overloads are (<INT|LONG|TIME> session.gap), " +
"(<INT|LONG|TIME> session.gap, <STRING> session.key), (<INT|LONG|TIME> session.gap, " +
"<STRING> session.key, <INT|LONG|TIME> allowed.latency).", e.getCause().getMessage());
throw e;
} finally {
if (siddhiAppRuntime != null) {
Expand Down Expand Up @@ -391,9 +391,9 @@ public void testSessionWindow10() {

} catch (SiddhiAppCreationException e) {
AssertJUnit.assertEquals("There is no parameterOverload for 'session' that matches attribute types " +
"'<LONG, LONG>'. Supported parameter overloads are (<INT|LONG|TIME> window.session), " +
"(<INT|LONG|TIME> window.session, <STRING> window.key), (<INT|LONG|TIME> window.session, " +
"<STRING> window.key, <INT|LONG|TIME> window.allowed.latency).", e.getCause().getMessage());
"'<LONG, LONG>'. Supported parameter overloads are (<INT|LONG|TIME> session.gap), " +
"(<INT|LONG|TIME> session.gap, <STRING> session.key), (<INT|LONG|TIME> session.gap, " +
"<STRING> session.key, <INT|LONG|TIME> allowed.latency).", e.getCause().getMessage());
throw e;
} finally {
if (siddhiAppRuntime != null) {
Expand Down

0 comments on commit 8e03def

Please sign in to comment.