Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve session window documentation #1580

Merged
merged 4 commits into from
Dec 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,51 +62,63 @@
@Extension(
name = "session",
namespace = "",
description = "This is a session window that holds events that belong to a specific session. The events " +
"that belong to a specific session are identified by a grouping attribute (i.e., a session key). A " +
"session gap period is specified to determine the time period after which the session is considered " +
"to be expired. A new event that arrives with a specific value for the session key is matched with" +
" the session window with the same session key.\n " +
"There can be out of order and late arrival of events, these events can arrive after the session is " +
"expired, to include those events to the matching session key specify a " +
"latency time period that is less than the session gap period." +
"To have aggregate functions with session windows, the events need to be grouped by the " +
"session key via a 'group by' clause.",
description = "Holds events that belong to a session. Events belong to a specific session are " +
"identified by a session key, and a session gap is determines the time period after which " +
"the session is considered to be expired. " +
"To have meaningful aggregation on session windows, the events need to be aggregated based on " +
"session key via a `group by` clause.",
parameters = {
@Parameter(name = "window.session",
description = "The time period for which the session considered is valid. This is specified" +
" in seconds, minutes, or milliseconds (i.e., 'min', 'sec', or 'ms'.",
@Parameter(name = "session.gap",
description = "The time period after which the session is considered to be expired.",
type = {DataType.INT, DataType.LONG, DataType.TIME}),
@Parameter(name = "window.key",
description = "The grouping attribute for events.",
@Parameter(name = "session.key",
description = "The session identification attribute. Used to group events belonging to a " +
"specific session.",
type = {DataType.STRING}, optional = true, defaultValue = "default-key", dynamic = true),
@Parameter(name = "window.allowed.latency",
description = "This specifies the time period for which the session window is valid after " +
"the expiration of the session. The time period specified here should be less than " +
"the session time gap (which is specified via the 'window.session' parameter).",
@Parameter(name = "allowed.latency",
description = "The time period for which the session window is valid after " +
"the expiration of the session, to accept late event arrivals. " +
"This time period should be less than " +
"the `session.gap` parameter.",
type = {DataType.INT, DataType.LONG, DataType.TIME}, optional = true, defaultValue = "0")
},
parameterOverloads = {
@ParameterOverload(parameterNames = {"window.session"}),
@ParameterOverload(parameterNames = {"window.session", "window.key"}),
@ParameterOverload(parameterNames = {"window.session", "window.key", "window.allowed.latency"})
@ParameterOverload(parameterNames = {"session.gap"}),
@ParameterOverload(parameterNames = {"session.gap", "session.key"}),
@ParameterOverload(parameterNames = {"session.gap", "session.key", "allowed.latency"})

},
examples = {
@Example(
syntax = "define stream PurchaseEventStream "
+ "(user string, item_number int, price float, quantity int);\n"
+ "\n"
+ "@info(name='query0) \n"
+ "@info(name='query1) \n"
+ "from PurchaseEventStream#window.session(5 sec, user) \n"
+ "select user, sum(quantity) as totalQuantity, sum(price) as totalPrice \n"
+ "group by user \n"
+ "insert into OutputStream;",
description = "From the events arriving at the PurchaseEventStream, " +
"a session window with 5 seconds session gap is processed based on 'user' attribute " +
"as the session group identification key. All events falling into the same session " +
"are aggregated based on `user` attribute, and outputted to the OutputStream."
),
@Example(
syntax = "define stream PurchaseEventStream "
+ "(user string, item_number int, price float, quantity int);\n"
+ "\n"
+ "@info(name='query2) \n"
+ "from PurchaseEventStream#window.session(5 sec, user, 2 sec) \n"
+ "select * \n"
+ "insert all events into OutputStream;",
description = "This query processes events that arrive at the PurchaseEvent input stream. " +
"The 'user' attribute is the session key, and the session gap is 5 " +
"seconds. '2 sec' is specified as the allowed latency. Therefore, events with the " +
"matching user name that arrive 2 seconds after the expiration of the session are " +
"also considered when performing aggregations for the session identified by the given" +
" user name."
+ "select user, sum(quantity) as totalQuantity, sum(price) as totalPrice \n"
+ "group by user \n"
+ "insert into OutputStream;",
description = "From the events arriving at the PurchaseEventStream, " +
"a session window with 5 seconds session gap is processed based on 'user' attribute " +
"as the session group identification key. This session window is kept active for " +
"2 seconds after the session expiry to capture late (out of order) event arrivals. " +
"If the event timestamp falls in to the last session the session is reactivated. " +
"Then all events falling into the same session " +
"are aggregated based on `user` attribute, and outputted to the OutputStream."
)
}
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,9 @@ public void testSessionWindow1() {
} catch (SiddhiAppCreationException e) {
AssertJUnit.assertEquals("There is no parameterOverload for 'session' that matches attribute types " +
"'<LONG, STRING, LONG, INT>'. Supported parameter overloads are " +
"(<INT|LONG|TIME> window.session), " +
"(<INT|LONG|TIME> window.session, <STRING> window.key), (<INT|LONG|TIME> window.session, " +
"<STRING> window.key, <INT|LONG|TIME> window.allowed.latency).", e.getCause().getMessage());
"(<INT|LONG|TIME> session.gap), " +
"(<INT|LONG|TIME> session.gap, <STRING> session.key), (<INT|LONG|TIME> session.gap, " +
"<STRING> session.key, <INT|LONG|TIME> allowed.latency).", e.getCause().getMessage());
throw e;
} finally {
if (siddhiAppRuntime != null) {
Expand Down Expand Up @@ -120,7 +120,7 @@ public void testSessionWindow2() {
siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(purchaseEventStream + query);

} catch (SiddhiAppCreationException e) {
AssertJUnit.assertEquals("The 'session' expects input parameter 'window.session' at position '0' " +
AssertJUnit.assertEquals("The 'session' expects input parameter 'session.gap' at position '0' " +
"to be static, but found a dynamic attribute.", e.getCause().getMessage());
throw e;
} finally {
Expand Down Expand Up @@ -153,9 +153,9 @@ public void testSessionWindow3() {

} catch (SiddhiAppCreationException e) {
AssertJUnit.assertEquals("There is no parameterOverload for 'session' that matches attribute types " +
"'<STRING, STRING, LONG>'. Supported parameter overloads are (<INT|LONG|TIME> window.session), " +
"(<INT|LONG|TIME> window.session, <STRING> window.key), (<INT|LONG|TIME> window.session, " +
"<STRING> window.key, <INT|LONG|TIME> window.allowed.latency).", e.getCause().getMessage());
"'<STRING, STRING, LONG>'. Supported parameter overloads are (<INT|LONG|TIME> session.gap), " +
"(<INT|LONG|TIME> session.gap, <STRING> session.key), (<INT|LONG|TIME> session.gap, " +
"<STRING> session.key, <INT|LONG|TIME> allowed.latency).", e.getCause().getMessage());
throw e;
} finally {
if (siddhiAppRuntime != null) {
Expand Down Expand Up @@ -220,9 +220,9 @@ public void testSessionWindow5() {

} catch (SiddhiAppCreationException e) {
AssertJUnit.assertEquals("There is no parameterOverload for 'session' that matches attribute types " +
"'<LONG, INT, LONG>'. Supported parameter overloads are (<INT|LONG|TIME> window.session), " +
"(<INT|LONG|TIME> window.session, <STRING> window.key), (<INT|LONG|TIME> window.session, " +
"<STRING> window.key, <INT|LONG|TIME> window.allowed.latency).", e.getCause().getMessage());
"'<LONG, INT, LONG>'. Supported parameter overloads are (<INT|LONG|TIME> session.gap), " +
"(<INT|LONG|TIME> session.gap, <STRING> session.key), (<INT|LONG|TIME> session.gap, " +
"<STRING> session.key, <INT|LONG|TIME> allowed.latency).", e.getCause().getMessage());
throw e;
} finally {
if (siddhiAppRuntime != null) {
Expand Down Expand Up @@ -253,7 +253,7 @@ public void testSessionWindow6() {
siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(purchaseEventStream + query);

} catch (SiddhiAppCreationException e) {
AssertJUnit.assertEquals("The 'session' expects input parameter 'window.allowed.latency' at position " +
AssertJUnit.assertEquals("The 'session' expects input parameter 'allowed.latency' at position " +
"'2' to be static, but found a dynamic attribute.", e.getCause().getMessage());
throw e;
} finally {
Expand Down Expand Up @@ -287,10 +287,10 @@ public void testSessionWindow7() {
} catch (SiddhiAppCreationException e) {
AssertJUnit.assertEquals("There is no parameterOverload for 'session' that matches attribute " +
"types '<LONG, STRING, STRING>'. Supported parameter overloads are " +
"(<INT|LONG|TIME> window.session), " +
"(<INT|LONG|TIME> window.session, <STRING> window.key), " +
"(<INT|LONG|TIME> window.session, <STRING> window.key, " +
"<INT|LONG|TIME> window.allowed.latency).",
"(<INT|LONG|TIME> session.gap), " +
"(<INT|LONG|TIME> session.gap, <STRING> session.key), " +
"(<INT|LONG|TIME> session.gap, <STRING> session.key, " +
"<INT|LONG|TIME> allowed.latency).",
e.getCause().getMessage());
throw e;
} finally {
Expand Down Expand Up @@ -324,9 +324,9 @@ public void testSessionWindow8() {

} catch (SiddhiAppCreationException e) {
AssertJUnit.assertEquals("There is no parameterOverload for 'session' that matches attribute types" +
" '<LONG, INT>'. Supported parameter overloads are (<INT|LONG|TIME> window.session), " +
"(<INT|LONG|TIME> window.session, <STRING> window.key), (<INT|LONG|TIME> window.session, " +
"<STRING> window.key, <INT|LONG|TIME> window.allowed.latency).", e.getCause().getMessage());
" '<LONG, INT>'. Supported parameter overloads are (<INT|LONG|TIME> session.gap), " +
"(<INT|LONG|TIME> session.gap, <STRING> session.key), (<INT|LONG|TIME> session.gap, " +
"<STRING> session.key, <INT|LONG|TIME> allowed.latency).", e.getCause().getMessage());
throw e;
} finally {
if (siddhiAppRuntime != null) {
Expand Down Expand Up @@ -391,9 +391,9 @@ public void testSessionWindow10() {

} catch (SiddhiAppCreationException e) {
AssertJUnit.assertEquals("There is no parameterOverload for 'session' that matches attribute types " +
"'<LONG, LONG>'. Supported parameter overloads are (<INT|LONG|TIME> window.session), " +
"(<INT|LONG|TIME> window.session, <STRING> window.key), (<INT|LONG|TIME> window.session, " +
"<STRING> window.key, <INT|LONG|TIME> window.allowed.latency).", e.getCause().getMessage());
"'<LONG, LONG>'. Supported parameter overloads are (<INT|LONG|TIME> session.gap), " +
"(<INT|LONG|TIME> session.gap, <STRING> session.key), (<INT|LONG|TIME> session.gap, " +
"<STRING> session.key, <INT|LONG|TIME> allowed.latency).", e.getCause().getMessage());
throw e;
} finally {
if (siddhiAppRuntime != null) {
Expand Down