diff --git a/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/application.conf b/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/application.conf
index 46f5b084fc..33f1792576 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/application.conf
+++ b/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/application.conf
@@ -13,22 +13,22 @@
# See the License for the specific language governing permissions and
# limitations under the License.
{
- "appId" : "HBaseAuditLogApp",
+ "appId" : "EaglePolicyEngine",
"mode" : "LOCAL",
"siteId" : "testsite",
"topology" : {
"name" : "alertUnitTopology_1",
"numOfTotalWorkers" : 2,
"numOfSpoutTasks" : 1,
- "numOfRouterBolts" : 4,
- "numOfAlertBolts" : 10,
+ "numOfRouterBolts" : 1,
+ "numOfAlertBolts" : 1,
"numOfPublishExecutors" : 1,
"numOfPublishTasks" : 1,
"messageTimeoutSecs": 3600,
"localMode" : "true"
},
"spout" : {
- "kafkaBrokerZkQuorum": "server.eagle.apache.org:2181",
+ "kafkaBrokerZkQuorum": "127.0.0.1:2000",
"kafkaBrokerZkBasePath": "/kafka",
"stormKafkaUseSameZkQuorumWithKafkaBroker": true,
"stormKafkaTransactionZkQuorum": "",
@@ -36,7 +36,7 @@
"stormKafkaEagleConsumer": "eagle_consumer"
},
"zkConfig" : {
- "zkQuorum" : "server.eagle.apache.org:2181",
+ "zkQuorum" : "127.0.0.1:2000",
"zkRoot" : "/alert"
},
"metadataService": {
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java
index 4338964242..dbce70fcdb 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java
@@ -65,6 +65,7 @@ public class CorrelationSpout extends BaseRichSpout implements SpoutSpecListener
private SpoutSpec cachedSpoutSpec;
+
private transient KafkaSpoutMetric kafkaSpoutMetric;
@SuppressWarnings("rawtypes")
diff --git a/eagle-examples/eagle-app-example/src/main/java/org/apache/eagle/app/example/ExampleStormApplication.java b/eagle-examples/eagle-app-example/src/main/java/org/apache/eagle/app/example/ExampleStormApplication.java
index f1f4e5894e..1bf9983cf4 100644
--- a/eagle-examples/eagle-app-example/src/main/java/org/apache/eagle/app/example/ExampleStormApplication.java
+++ b/eagle-examples/eagle-app-example/src/main/java/org/apache/eagle/app/example/ExampleStormApplication.java
@@ -35,7 +35,7 @@ public class ExampleStormApplication extends StormApplication{
@Override
public StormTopology execute(Config config, StormEnvironment environment) {
TopologyBuilder builder = new TopologyBuilder();
- builder.setSpout("metric_spout", new RandomEventSpout(), config.getInt("spoutNum"));
+ builder.setSpout("metric_spout", new RandomEventSpout(), config.hasPath("spoutNum") ? config.getInt("spoutNum") : 1);
builder.setBolt("sink_1",environment.getStreamSink("SAMPLE_STREAM_1",config)).fieldsGrouping("metric_spout",new Fields("metric"));
builder.setBolt("sink_2",environment.getStreamSink("SAMPLE_STREAM_2",config)).fieldsGrouping("metric_spout",new Fields("metric"));
return builder.createTopology();
diff --git a/eagle-examples/eagle-app-example/src/main/resources/application.conf b/eagle-examples/eagle-app-example/src/main/resources/application.conf
new file mode 100644
index 0000000000..9d5c3c5669
--- /dev/null
+++ b/eagle-examples/eagle-app-example/src/main/resources/application.conf
@@ -0,0 +1,45 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+{
+ "appId" : "ExampleMonitorApp",
+ "mode" : "LOCAL",
+ "siteId" : "testsite",
+ "topology" : {
+ "numOfTotalWorkers" : 2,
+ "numOfSpoutTasks" : 2,
+ "numOfParserTasks" : 2,
+ "numOfJoinTasks" : 2,
+ "numOfSinkTasks" : 2
+ },
+ "dataSourceConfig": {
+ "topic" : "nonexistingtopic",
+ "zkConnection" : "127.0.0.1:2000",
+ "txZkServers" : "127.0.0.1:2000",
+ "schemeCls" : "storm.kafka.StringScheme"
+ },
+ "eagleService": {
+ "host": "localhost",
+ "port": 9090
+ "username": "admin",
+ "password": "secret"
+ },
+ "dataSinkConfig": {
+ "topic" : "myexampletopic",
+ "brokerList" : "127.0.0.1:2000",
+ "serializerClass" : "kafka.serializer.StringEncoder",
+ "keySerializerClass" : "kafka.serializer.StringEncoder"
+ }
+}
diff --git a/eagle-flink/README.md b/eagle-flink/README.md
new file mode 100644
index 0000000000..4e1595f1d9
--- /dev/null
+++ b/eagle-flink/README.md
@@ -0,0 +1,22 @@
+## Design goals
+
+### 1. execute rules on one or multiple streams
+
+### 2. dynamically inject new rules on existing streams
+
+### 3. reuse streams as much as possible
+
+## Primivite operations
+
+### 1. rules on single stream keyed by some fields
+ avg(cpu) > 0.8 [1m] group by host
+
+ sum(failed_requests) > 60 [1m] group by host
+
+ avg(failure_ratio) > 0.1 [1m] group by host
+
+### 2. rules on multiple streams joined by some fields
+
+
+
+
diff --git a/eagle-flink/pom.xml b/eagle-flink/pom.xml
new file mode 100644
index 0000000000..6f6db82b8c
--- /dev/null
+++ b/eagle-flink/pom.xml
@@ -0,0 +1,55 @@
+
+
+
+ eagle-parent
+ org.apache.eagle
+ 1.0.0-SNAPSHOT
+
+ 4.0.0
+
+ eagle-flink
+
+ 1.10.0
+ 2.11
+
+
+
+ org.projectlombok
+ lombok
+ 1.16.20
+ provided
+
+
+ org.apache.flink
+ flink-streaming-java_${scala.binary.version}
+ ${flink.version}
+
+
+ com.fasterxml.jackson.core
+ jackson-databind
+
+
+ com.google.guava
+ guava
+
+
+ org.slf4j
+ slf4j-log4j12
+
+
+ log4j
+ log4j
+
+
+ org.wso2.siddhi
+ siddhi-core
+
+
+ junit
+ junit
+ test
+
+
+
\ No newline at end of file
diff --git a/eagle-flink/src/main/java/org/apache/eagle/flink/AlertDeduplication.java b/eagle-flink/src/main/java/org/apache/eagle/flink/AlertDeduplication.java
new file mode 100644
index 0000000000..acb31ba3dc
--- /dev/null
+++ b/eagle-flink/src/main/java/org/apache/eagle/flink/AlertDeduplication.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.flink;
+
+import org.apache.commons.collections.ListUtils;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
+import java.util.List;
+import java.util.Objects;
+
+public class AlertDeduplication {
+ private String outputStreamId;
+ private String dedupIntervalMin;
+ private List dedupFields;
+
+ public String getOutputStreamId() {
+ return outputStreamId;
+ }
+
+ public void setOutputStreamId(String outputStreamId) {
+ this.outputStreamId = outputStreamId;
+ }
+
+ public String getDedupIntervalMin() {
+ return dedupIntervalMin;
+ }
+
+ public void setDedupIntervalMin(String dedupIntervalMin) {
+ this.dedupIntervalMin = dedupIntervalMin;
+ }
+
+ public List getDedupFields() {
+ return dedupFields;
+ }
+
+ public void setDedupFields(List dedupFields) {
+ this.dedupFields = dedupFields;
+ }
+
+ @Override
+ public int hashCode() {
+ return new HashCodeBuilder()
+ .append(outputStreamId)
+ .append(dedupFields)
+ .append(dedupIntervalMin)
+ .build();
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ if (that == this) {
+ return true;
+ }
+ if (!(that instanceof AlertDeduplication)) {
+ return false;
+ }
+ AlertDeduplication another = (AlertDeduplication) that;
+ if (ListUtils.isEqualList(another.dedupFields, this.dedupFields)
+ && Objects.equals(another.dedupIntervalMin, this.dedupIntervalMin)
+ && Objects.equals(another.outputStreamId, this.outputStreamId)) {
+ return true;
+ }
+ return false;
+ }
+
+
+}
diff --git a/eagle-flink/src/main/java/org/apache/eagle/flink/AlertDefinition.java b/eagle-flink/src/main/java/org/apache/eagle/flink/AlertDefinition.java
new file mode 100644
index 0000000000..108b283877
--- /dev/null
+++ b/eagle-flink/src/main/java/org/apache/eagle/flink/AlertDefinition.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.flink;
+
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
+import java.util.Objects;
+
+public class AlertDefinition {
+ private TemplateType templateType = TemplateType.TEXT;
+ private String subject;
+ private String body;
+
+ private AlertSeverity severity;
+ private String category;
+
+ public String getBody() {
+ return body;
+ }
+
+ public void setBody(String templateResource) {
+ this.body = templateResource;
+ }
+
+ public TemplateType getTemplateType() {
+ return templateType;
+ }
+
+ public void setTemplateType(TemplateType type) {
+ this.templateType = type;
+ }
+
+ public String getSubject() {
+ return subject;
+ }
+
+ public void setSubject(String subject) {
+ this.subject = subject;
+ }
+
+ public AlertSeverity getSeverity() {
+ return severity;
+ }
+
+ public void setSeverity(AlertSeverity severity) {
+ this.severity = severity;
+ }
+
+ public String getCategory() {
+ return category;
+ }
+
+ public void setCategory(String category) {
+ this.category = category;
+ }
+
+ public enum TemplateType {
+ TEXT,
+ // FILE,
+ // HTTP
+ }
+
+ @Override
+ public int hashCode() {
+ return new HashCodeBuilder()
+ .append(templateType)
+ .append(this.body)
+ .append(this.category)
+ .append(this.severity)
+ .append(this.subject)
+ .build();
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ if (that == this) {
+ return true;
+ }
+ if (!(that instanceof AlertDefinition)) {
+ return false;
+ }
+ AlertDefinition another = (AlertDefinition) that;
+ if (Objects.equals(another.templateType, this.templateType)
+ && Objects.equals(another.body, this.body)
+ && Objects.equals(another.category, this.category)
+ && Objects.equals(another.severity, this.severity)
+ && Objects.equals(another.subject, this.subject)) {
+ return true;
+ }
+ return false;
+ }
+}
\ No newline at end of file
diff --git a/eagle-flink/src/main/java/org/apache/eagle/flink/AlertPublishEvent.java b/eagle-flink/src/main/java/org/apache/eagle/flink/AlertPublishEvent.java
new file mode 100644
index 0000000000..a4c48e9c6b
--- /dev/null
+++ b/eagle-flink/src/main/java/org/apache/eagle/flink/AlertPublishEvent.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.flink;
+
+import com.google.common.base.Preconditions;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Use as final rich alert event.
+ */
+public class AlertPublishEvent {
+ private String alertId;
+ private String siteId;
+ private List appIds;
+ private String policyId;
+ private String policyValue;
+ private long alertTimestamp;
+ private Map alertData;
+ private String alertSubject;
+ private String alertBody;
+ private String streamId;
+ private String createdBy;
+ private long createdTime;
+
+ public static final String ALERT_ID_KEY = "alertId";
+ public static final String SITE_ID_KEY = "siteId";
+ public static final String APP_IDS_KEY = "appIds";
+ public static final String POLICY_ID_KEY = "policyId";
+ public static final String POLICY_VALUE_KEY = "policyValue";
+ public static final String ALERT_CATEGORY = "category";
+ public static final String ALERT_SEVERITY = "severity";
+ public static final String ALERT_HOST = "host";
+
+ public String getAlertId() {
+ return alertId;
+ }
+
+ public void setAlertId(String alertId) {
+ this.alertId = alertId;
+ }
+
+ public List getAppIds() {
+ return appIds;
+ }
+
+ public void setAppIds(List appIds) {
+ this.appIds = appIds;
+ }
+
+ public String getPolicyValue() {
+ return policyValue;
+ }
+
+ public void setPolicyValue(String policyValue) {
+ this.policyValue = policyValue;
+ }
+
+ public long getAlertTimestamp() {
+ return alertTimestamp;
+ }
+
+ public void setAlertTimestamp(long alertTimestamp) {
+ this.alertTimestamp = alertTimestamp;
+ }
+
+ public String getSiteId() {
+ return siteId;
+ }
+
+ public void setSiteId(String siteId) {
+ this.siteId = siteId;
+ }
+
+
+ public String getPolicyId() {
+ return policyId;
+ }
+
+ public void setPolicyId(String policyId) {
+ this.policyId = policyId;
+ }
+
+ public Map getAlertData() {
+ return alertData;
+ }
+
+ public void setAlertData(Map alertData) {
+ this.alertData = alertData;
+ }
+
+ public static AlertPublishEvent createAlertPublishEvent(AlertStreamEvent event) {
+ Preconditions.checkNotNull(event.getAlertId(), "alertId is not initialized before being published: " + event.toString());
+ AlertPublishEvent alertEvent = new AlertPublishEvent();
+ alertEvent.setAlertId(event.getAlertId());
+ alertEvent.setPolicyId(event.getPolicyId());
+ alertEvent.setAlertTimestamp(event.getCreatedTime());
+ alertEvent.setStreamId(event.getStreamId());
+ alertEvent.setCreatedBy(event.getCreatedBy());
+ alertEvent.setCreatedTime(event.getCreatedTime());
+ alertEvent.setAlertSubject(event.getSubject());
+ alertEvent.setAlertBody(event.getBody());
+ if (event.getContext() != null && !event.getContext().isEmpty()) {
+ if (event.getContext().containsKey(SITE_ID_KEY)) {
+ alertEvent.setSiteId(event.getContext().get(SITE_ID_KEY).toString());
+ }
+ if (event.getContext().containsKey(POLICY_VALUE_KEY)) {
+ alertEvent.setPolicyValue(event.getContext().get(POLICY_VALUE_KEY).toString());
+ }
+ if (event.getContext().containsKey(APP_IDS_KEY)) {
+ alertEvent.setAppIds((List) event.getContext().get(APP_IDS_KEY));
+ }
+ }
+ alertEvent.setAlertData(event.getDataMap());
+ return alertEvent;
+ }
+
+ public String toString() {
+ return String.format("%s %s alertId=%s, siteId=%s, policyId=%s, alertData=%s",
+ DateTimeUtil.millisecondsToHumanDateWithSeconds(alertTimestamp),
+ DateTimeUtil.CURRENT_TIME_ZONE.getID(),
+ alertId,
+ siteId,
+ policyId,
+ alertData == null ? "" : alertData.toString());
+ }
+
+ public String getAlertSubject() {
+ return alertSubject;
+ }
+
+ public void setAlertSubject(String alertSubject) {
+ this.alertSubject = alertSubject;
+ }
+
+ public String getAlertBody() {
+ return alertBody;
+ }
+
+ public void setAlertBody(String alertBody) {
+ this.alertBody = alertBody;
+ }
+
+ public String getStreamId() {
+ return streamId;
+ }
+
+ public void setStreamId(String streamId) {
+ this.streamId = streamId;
+ }
+
+ public String getCreatedBy() {
+ return createdBy;
+ }
+
+ public void setCreatedBy(String createdBy) {
+ this.createdBy = createdBy;
+ }
+
+ public long getCreatedTime() {
+ return createdTime;
+ }
+
+ public void setCreatedTime(long createdTime) {
+ this.createdTime = createdTime;
+ }
+}
\ No newline at end of file
diff --git a/eagle-flink/src/main/java/org/apache/eagle/flink/AlertSeverity.java b/eagle-flink/src/main/java/org/apache/eagle/flink/AlertSeverity.java
new file mode 100644
index 0000000000..45452b86b1
--- /dev/null
+++ b/eagle-flink/src/main/java/org/apache/eagle/flink/AlertSeverity.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.flink;
+
+import java.io.Serializable;
+
+public enum AlertSeverity{
+ UNKNOWN, OK, WARNING, CRITICAL, FATAL
+}
\ No newline at end of file
diff --git a/eagle-flink/src/main/java/org/apache/eagle/flink/AlertSink.java b/eagle-flink/src/main/java/org/apache/eagle/flink/AlertSink.java
new file mode 100644
index 0000000000..267ae67ee4
--- /dev/null
+++ b/eagle-flink/src/main/java/org/apache/eagle/flink/AlertSink.java
@@ -0,0 +1,17 @@
+package org.apache.eagle.flink;
+
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AlertSink implements SinkFunction {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(AlertSink.class);
+
+ @Override
+ public void invoke(AlertStreamEvent value, Context context) {
+ LOG.info(value.toString());
+ }
+}
\ No newline at end of file
diff --git a/eagle-flink/src/main/java/org/apache/eagle/flink/AlertStreamCallback.java b/eagle-flink/src/main/java/org/apache/eagle/flink/AlertStreamCallback.java
new file mode 100644
index 0000000000..7aaf947f3b
--- /dev/null
+++ b/eagle-flink/src/main/java/org/apache/eagle/flink/AlertStreamCallback.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.flink;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.wso2.siddhi.core.event.Event;
+import org.wso2.siddhi.core.stream.output.StreamCallback;
+
+/**
+ * Get called back by Siddhi runtime when policy is evaluated to be true, then
+ * it will forward the alert to next processor in Flink
+ */
+public class AlertStreamCallback extends StreamCallback {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AlertStreamCallback.class);
+ private final String outputStream;
+ private final PolicyHandlerContext context;
+ private final StreamDefinition definition;
+
+ private int currentIndex;
+
+ public AlertStreamCallback(String outputStream,
+ StreamDefinition streamDefinition,
+ PolicyHandlerContext context,
+ int currentIndex) {
+ this.outputStream = outputStream;
+ this.context = context;
+ this.definition = streamDefinition;
+ this.currentIndex = currentIndex;
+ }
+
+ /**
+ * Possibly more than one event will be triggered for alerting.
+ */
+ @Override
+ public void receive(Event[] events) {
+ LOG.info("Generated {} alerts in {}, index of definiton {} ", events.length, context.getPolicyEvaluatorId(), currentIndex);
+ for (Event e : events) {
+ org.apache.flink.util.Collector eagleCollector = (org.apache.flink.util.Collector) e.getData()[0];
+ AlertStreamEvent event = new AlertStreamEvent();
+ event.setSiteId("");
+ event.setTimestamp(e.getTimestamp());
+ // remove collector from event
+ Object[] payload = new Object[e.getData().length - 1];
+ System.arraycopy(e.getData(), 1, payload, 0, e.getData().length - 1);
+ event.setData(payload);
+ event.setStreamId(outputStream);
+ event.setPolicyId("");
+ if (this.context.getPolicyEvaluator() != null) {
+ event.setCreatedBy(context.getPolicyEvaluator().getName());
+ }
+ event.setCreatedTime(System.currentTimeMillis());
+ event.setSchema(definition);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Generate new alert event: {}", event);
+ }
+ eagleCollector.collect(event);
+ }
+ }
+}
diff --git a/eagle-flink/src/main/java/org/apache/eagle/flink/AlertStreamCollector.java b/eagle-flink/src/main/java/org/apache/eagle/flink/AlertStreamCollector.java
new file mode 100755
index 0000000000..2ea5c21b98
--- /dev/null
+++ b/eagle-flink/src/main/java/org/apache/eagle/flink/AlertStreamCollector.java
@@ -0,0 +1,26 @@
+package org.apache.eagle.flink;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+public interface AlertStreamCollector extends Collector {
+ /**
+ * No need to be thread-safe, but should be called on in synchronous like in Storm bolt execute method.
+ */
+ void flush();
+
+ void close();
+}
\ No newline at end of file
diff --git a/eagle-flink/src/main/java/org/apache/eagle/flink/AlertStreamEvent.java b/eagle-flink/src/main/java/org/apache/eagle/flink/AlertStreamEvent.java
new file mode 100644
index 0000000000..ca9c0f8051
--- /dev/null
+++ b/eagle-flink/src/main/java/org/apache/eagle/flink/AlertStreamEvent.java
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.flink;
+
+import java.util.*;
+
+/**
+ * streamId stands for alert type instead of source event streamId.
+ */
+public class AlertStreamEvent extends StreamEvent {
+ private static final long serialVersionUID = 2392131134670106397L;
+
+ private String siteId;
+ private String alertId;
+ private String policyId;
+ private StreamDefinition schema;
+ private String createdBy;
+ private long createdTime;
+ private String category;
+ private boolean duplicationChecked = false;
+
+ // ----------------------
+ // Lazy Alert Fields
+ // ----------------------
+
+ // Dynamical context like app related fields
+ private Map context;
+ // Alert content like subject and body
+ private String subject;
+ private String body;
+
+ public AlertStreamEvent() {
+ }
+
+ public AlertStreamEvent(AlertStreamEvent event) {
+ this.siteId = event.getSiteId();
+ this.alertId = event.getAlertId();
+ this.policyId = event.policyId;
+ this.schema = event.schema;
+ this.createdBy = event.createdBy;
+ this.createdTime = event.createdTime;
+ this.setTimestamp(event.getTimestamp());
+ this.setData(new Object[event.data.length]);
+ System.arraycopy(event.data, 0, this.data, 0, event.data.length);
+ this.setStreamId(event.getStreamId());
+ this.setMetaVersion(event.getMetaVersion());
+ }
+
+ public void setPolicyId(String policyId) {
+ this.policyId = policyId;
+ }
+
+ public String getPolicyId() {
+ return policyId;
+ }
+
+ @Override
+ public String toString() {
+ if(this.getData() == null){
+ return "";
+ }
+ List dataStrings = new ArrayList<>(this.getData().length);
+ for (Object obj : this.getData()) {
+ if (obj != null) {
+ dataStrings.add(obj.toString());
+ } else {
+ dataStrings.add(null);
+ }
+ }
+
+ return String.format("Alert {site=%s, stream=%s,timestamp=%s,data=%s, policyId=%s, createdBy=%s, metaVersion=%s}",
+ this.getSiteId(),
+ this.getStreamId(), DateTimeUtil.millisecondsToHumanDateWithMilliseconds(this.getTimestamp()),
+ this.getDataMap(), this.getPolicyId(), this.getCreatedBy(), this.getMetaVersion());
+ }
+
+ public String getCreatedBy() {
+ return createdBy;
+ }
+
+ public void setCreatedBy(String createdBy) {
+ this.createdBy = createdBy;
+ }
+
+ public StreamDefinition getSchema() {
+ return schema;
+ }
+
+ public void setSchema(StreamDefinition schema) {
+ this.schema = schema;
+ }
+
+ public long getCreatedTime() {
+ return createdTime;
+ }
+
+ public void setCreatedTime(long createdTime) {
+ this.createdTime = createdTime;
+ }
+
+ public Map getDataMap() {
+ Map event = new HashMap<>();
+ for (StreamColumn column : schema.getColumns()) {
+ Object obj = this.getData()[schema.getColumnIndex(column.getName())];
+ if (obj == null) {
+ event.put(column.getName(), null);
+ continue;
+ }
+ event.put(column.getName(), obj);
+ }
+ return event;
+ }
+
+ public Map getContext() {
+ return context;
+ }
+
+ public void setContext(Map context) {
+ this.context = context;
+ }
+
+ public String getAlertId() {
+ ensureAlertId();
+ return alertId;
+ }
+
+ public void setAlertId(String alertId){
+ this.alertId = alertId;
+ }
+
+ public void ensureAlertId() {
+ if (this.alertId == null) {
+ this.alertId = UUID.randomUUID().toString();
+ }
+ }
+
+ public String getSubject() {
+ return subject;
+ }
+
+ public void setSubject(String subject) {
+ this.subject = subject;
+ }
+
+ public String getBody() {
+ return body;
+ }
+
+ public void setBody(String body) {
+ this.body = body;
+ }
+
+ public String getCategory() {
+ return category;
+ }
+
+ public void setCategory(String category) {
+ this.category = category;
+ }
+
+ public String getSiteId() {
+ return siteId;
+ }
+
+ public void setSiteId(String siteId) {
+ this.siteId = siteId;
+ }
+
+ public boolean isDuplicationChecked() {
+ return duplicationChecked;
+ }
+
+ public void setDuplicationChecked(boolean duplicationChecked) {
+ this.duplicationChecked = duplicationChecked;
+ }
+}
\ No newline at end of file
diff --git a/eagle-flink/src/main/java/org/apache/eagle/flink/Collector.java b/eagle-flink/src/main/java/org/apache/eagle/flink/Collector.java
new file mode 100755
index 0000000000..294e25c980
--- /dev/null
+++ b/eagle-flink/src/main/java/org/apache/eagle/flink/Collector.java
@@ -0,0 +1,27 @@
+package org.apache.eagle.flink;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@FunctionalInterface
+public interface Collector {
+ /**
+ * Must make sure thread-safe.
+ *
+ * @param t
+ */
+ void emit(T t);
+}
\ No newline at end of file
diff --git a/eagle-flink/src/main/java/org/apache/eagle/flink/CompositePolicyHandler.java b/eagle-flink/src/main/java/org/apache/eagle/flink/CompositePolicyHandler.java
new file mode 100644
index 0000000000..032a8ec2b6
--- /dev/null
+++ b/eagle-flink/src/main/java/org/apache/eagle/flink/CompositePolicyHandler.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.flink;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Created on 7/27/16.
+ */
+public class CompositePolicyHandler implements PolicyStreamHandler {
+ private static final Logger LOG = LoggerFactory.getLogger(CompositePolicyHandler.class);
+
+ private PolicyStreamHandler policyHandler;
+ private PolicyStreamHandler stateHandler;
+ private List handlers = new ArrayList<>();
+
+ private Collector collector;
+
+ private Map sds;
+
+ public CompositePolicyHandler(Map sds) {
+ this.sds = sds;
+ }
+
+ @Override
+ public void prepare(PolicyHandlerContext context) throws Exception {
+ this.collector = collector;
+ // TODO: create two handlers
+ policyHandler = PolicyStreamHandlers.createHandler(context.getPolicyDefinition().getDefinition(), sds);
+// policyHandler.prepare(collector, context);
+ handlers.add(policyHandler);
+
+ if (context.getPolicyDefinition().getStateDefinition() != null) {
+ stateHandler = PolicyStreamHandlers.createStateHandler(context.getPolicyDefinition().getStateDefinition().type, sds);
+// stateHandler.prepare(collector, context);
+ handlers.add(stateHandler);
+ }
+ }
+
+ @Override
+ public void send(StreamEvent event, Collector collector) throws Exception {
+ // policyHandler.send(event);
+ send(event, 0);
+ }
+
+ // send event to index of stream handler
+ public void send(StreamEvent event, int idx) throws Exception {
+ if (handlers.size() > idx) {
+ handlers.get(idx).send(event, null);
+ } else if (event instanceof AlertStreamEvent) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Emit new alert event: {}", event);
+ }
+ collector.emit((AlertStreamEvent) event); // for alert stream events, emit if no handler found.
+ } else {
+ // nothing found. LOG, and throw exception
+ LOG.error("non-alert-stream-event {} send with index {}, but the handler is not found!", event, idx);
+ throw new Exception(String.format("event %s send with idx %d can not found expecting handler!", event, idx));
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ for (PolicyStreamHandler handler : handlers) {
+ try {
+ handler.close();
+ } catch (Exception e) {
+ LOG.error("close handler {} failed, continue to run.", handler);
+ }
+ }
+ }
+
+}
diff --git a/eagle-flink/src/main/java/org/apache/eagle/flink/DateTimeUtil.java b/eagle-flink/src/main/java/org/apache/eagle/flink/DateTimeUtil.java
new file mode 100644
index 0000000000..a25ffbaa63
--- /dev/null
+++ b/eagle-flink/src/main/java/org/apache/eagle/flink/DateTimeUtil.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.flink;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.TimeZone;
+
+/**
+ * be aware that SimpleDateFormat instantiation is expensive, so if that's under a tight loop, probably we need
+ * a thread local SimpleDateFormat object.
+ */
+public class DateTimeUtil {
+ public static final long ONESECOND = 1L * 1000L;
+ public static final long ONEMINUTE = 1L * 60L * 1000L;
+ public static final long ONEHOUR = 1L * 60L * 60L * 1000L;
+ public static final long ONEDAY = 24L * 60L * 60L * 1000L;
+ public static TimeZone CURRENT_TIME_ZONE;
+
+ static {
+ Config config = ConfigFactory.load();
+ CURRENT_TIME_ZONE = TimeZone.getTimeZone((config.hasPath("service.timezone")
+ ? config.getString("service.timezone") : "UTC"));
+ }
+
+ public static Date humanDateToDate(String date) throws ParseException {
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ sdf.setTimeZone(CURRENT_TIME_ZONE);
+ return sdf.parse(date);
+ }
+
+ public static long getCurrentTimestamp() {
+ return System.currentTimeMillis();
+ }
+
+ public static String secondsToHumanDate(long seconds) {
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ sdf.setTimeZone(CURRENT_TIME_ZONE);
+ Date t = new Date();
+ t.setTime(seconds * 1000);
+ return sdf.format(t);
+ }
+
+ public static String secondsToHumanDate(long seconds, TimeZone timeZone) {
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ sdf.setTimeZone(timeZone);
+ Date t = new Date();
+ t.setTime(seconds * 1000);
+ return sdf.format(t);
+ }
+
+ public static String millisecondsToHumanDateWithMilliseconds(long milliseconds) {
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
+ sdf.setTimeZone(CURRENT_TIME_ZONE);
+ Date t = new Date();
+ t.setTime(milliseconds);
+ return sdf.format(t);
+ }
+
+ public static String millisecondsToHumanDateWithSeconds(long milliseconds) {
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ sdf.setTimeZone(CURRENT_TIME_ZONE);
+ Date t = new Date();
+ t.setTime(milliseconds);
+ return sdf.format(t);
+ }
+
+ public static String millisecondsToHumanDateWithSecondsAndTimezone(long milliseconds) {
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss z");
+ sdf.setTimeZone(CURRENT_TIME_ZONE);
+ Date t = new Date();
+ t.setTime(milliseconds);
+ return sdf.format(t);
+ }
+
+ public static long humanDateToSeconds(String date) throws ParseException {
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ sdf.setTimeZone(CURRENT_TIME_ZONE);
+ Date d = sdf.parse(date);
+ return d.getTime() / 1000;
+ }
+
+ public static long humanDateToSeconds(String date, TimeZone timeZone) throws ParseException {
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ sdf.setTimeZone(timeZone);
+ Date d = sdf.parse(date);
+ return d.getTime() / 1000;
+ }
+
+ public static long humanDateToMilliseconds(String date) throws ParseException {
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
+ sdf.setTimeZone(CURRENT_TIME_ZONE);
+ Date d = sdf.parse(date);
+ return d.getTime();
+ }
+
+ public static long humanDateToMilliseconds(String date, TimeZone timeZone) throws ParseException {
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
+ sdf.setTimeZone(timeZone);
+ Date d = sdf.parse(date);
+ return d.getTime();
+ }
+
+
+ public static long humanDateToMillisecondsWithoutException(String date) {
+ try {
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
+ sdf.setTimeZone(CURRENT_TIME_ZONE);
+ Date d = sdf.parse(date);
+ return d.getTime();
+ } catch (ParseException ex) {
+ return 0L;
+ }
+ }
+
+ public static long humanDateToSecondsWithoutException(String date) {
+ try {
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ sdf.setTimeZone(CURRENT_TIME_ZONE);
+ Date d = sdf.parse(date);
+ return (d.getTime() / 1000);
+ } catch (ParseException ex) {
+ return 0L;
+ }
+ }
+
+ /**
+ * this could be accurate only when timezone is UTC
+ * for the timezones other than UTC, there is possibly issue, for example
+ * assume timezone is GMT+8 in China
+ * When user time is "2014-07-15 05:00:00", it will be converted to timestamp first,
+ * internally it would be "2014-07-14 21:00:00" in UTC timezone. When rounded down to day, the internal time would
+ * be changed to "2014-07-14 00:00:00", and that means the user time is "2014-07-14 08:00:00".
+ * But originally user wants to round it to "2014-07-15 00:00:00"
+ *
+ * @param timeInMillis the seconds elapsed since 1970-01-01 00:00:00
+ */
+ public static long roundDown(int field, long timeInMillis) {
+ switch (field) {
+ case Calendar.DAY_OF_MONTH:
+ case Calendar.DAY_OF_WEEK:
+ case Calendar.DAY_OF_YEAR:
+ return (timeInMillis - timeInMillis % (24 * 60 * 60 * 1000));
+ case Calendar.HOUR:
+ return (timeInMillis - timeInMillis % (60 * 60 * 1000));
+ case Calendar.MINUTE:
+ return (timeInMillis - timeInMillis % (60 * 1000));
+ case Calendar.SECOND:
+ return (timeInMillis - timeInMillis % (1000));
+ default:
+ return 0L;
+ }
+ }
+
+ public static String getCalendarFieldName(int field) {
+ switch (field) {
+ case Calendar.DAY_OF_MONTH:
+ return "DAY_OF_MONTH";
+ case Calendar.DAY_OF_WEEK:
+ return "DAY_OF_WEEK";
+ case Calendar.DAY_OF_YEAR:
+ return "DAY_OF_YEAR";
+ case Calendar.HOUR:
+ return "HOUR";
+ case Calendar.MINUTE:
+ return "MINUTE";
+ case Calendar.SECOND:
+ return "SECOND";
+ default:
+ throw new IllegalArgumentException("Unknown field code: " + field);
+ }
+ }
+
+ public static String format(long milliseconds, String format) {
+ SimpleDateFormat sdf = new SimpleDateFormat(format);
+ sdf.setTimeZone(CURRENT_TIME_ZONE);
+ Date t = new Date();
+ t.setTime(milliseconds);
+ return sdf.format(t);
+ }
+
+ //For mapr
+ //exp: 2015-06-06T10:44:22.800Z
+ public static long maprhumanDateToMilliseconds(String date) throws ParseException {
+ date = date.replace('T', ' ');
+ date = date.replace('Z', ' ');
+ date = date.replace('.', ',');
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS ");
+ sdf.setTimeZone(CURRENT_TIME_ZONE);
+ Date d = sdf.parse(date);
+ return d.getTime();
+ }
+
+ public static long parseTimeStrToMilliseconds(String timeStr) throws ParseException {
+ try {
+ return Long.valueOf(timeStr);
+ } catch (Exception ex) {
+ return humanDateToSeconds(timeStr) * ONESECOND;
+ }
+ }
+}
diff --git a/eagle-flink/src/main/java/org/apache/eagle/flink/MyStreamCounter.java b/eagle-flink/src/main/java/org/apache/eagle/flink/MyStreamCounter.java
new file mode 100644
index 0000000000..a92345e036
--- /dev/null
+++ b/eagle-flink/src/main/java/org/apache/eagle/flink/MyStreamCounter.java
@@ -0,0 +1,20 @@
+package org.apache.eagle.flink;
+
+import org.apache.eagle.flink.StreamCounter;
+
+public class MyStreamCounter implements StreamCounter {
+ @Override
+ public void incr(String scopeName) {
+
+ }
+
+ @Override
+ public void incrBy(String scopeName, int length) {
+
+ }
+
+ @Override
+ public void scope(String scopeName) {
+
+ }
+}
diff --git a/eagle-flink/src/main/java/org/apache/eagle/flink/PartitionedEvent.java b/eagle-flink/src/main/java/org/apache/eagle/flink/PartitionedEvent.java
new file mode 100644
index 0000000000..f26b4215ce
--- /dev/null
+++ b/eagle-flink/src/main/java/org/apache/eagle/flink/PartitionedEvent.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.flink;
+
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * This is a critical data structure across spout, router bolt and alert bolt
+ * partition[StreamPartition] defines how one incoming data stream is partitioned, sorted
+ * partitionKey[long] is java hash value of groupby fields. The groupby fields are defined in StreamPartition
+ * event[StreamEvent] is actual data.
+ */
+public class PartitionedEvent implements Serializable {
+ private static final long serialVersionUID = -3840016190614238593L;
+ private StreamPartition partition;
+ private long partitionKey;
+ private StreamEvent event;
+
+ public PartitionedEvent() {
+ this.event = null;
+ this.partition = null;
+ this.partitionKey = 0L;
+ }
+
+ public PartitionedEvent(StreamEvent event, StreamPartition partition, int partitionKey) {
+ this.event = event;
+ this.partition = partition;
+ this.partitionKey = partitionKey;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (obj instanceof PartitionedEvent) {
+ PartitionedEvent another = (PartitionedEvent) obj;
+ return !(this.partitionKey != another.getPartitionKey()
+ || !Objects.equals(this.event, another.getEvent())
+ || !Objects.equals(this.partition, another.getPartition()));
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return new HashCodeBuilder()
+ .append(partitionKey)
+ .append(event)
+ .append(partition)
+ .build();
+ }
+
+ public StreamEvent getEvent() {
+ return event;
+ }
+
+ public void setEvent(StreamEvent event) {
+ this.event = event;
+ }
+
+ public StreamPartition getPartition() {
+ return partition;
+ }
+
+ public void setPartition(StreamPartition partition) {
+ this.partition = partition;
+ }
+
+ public void setPartitionKey(long partitionKey) {
+ this.partitionKey = partitionKey;
+ }
+
+ public long getPartitionKey() {
+ return this.partitionKey;
+ }
+
+ public String toString() {
+ return String.format("PartitionedEvent[partition=%s,event=%s,key=%s", partition, event, partitionKey);
+ }
+
+ public long getTimestamp() {
+ return (event != null) ? event.getTimestamp() : 0L;
+ }
+
+ public String getStreamId() {
+ return (event != null) ? event.getStreamId() : null;
+ }
+
+ public Object[] getData() {
+ return event != null ? event.getData() : null;
+ }
+
+ public boolean isSortRequired() {
+ return isPartitionRequired();
+ }
+
+ public boolean isPartitionRequired() {
+ return this.getPartition() != null;
+ }
+
+ public PartitionedEvent copy() {
+ PartitionedEvent copied = new PartitionedEvent();
+ copied.setEvent(this.getEvent());
+ copied.setPartition(this.partition);
+ copied.setPartitionKey(this.partitionKey);
+ return copied;
+ }
+
+}
\ No newline at end of file
diff --git a/eagle-flink/src/main/java/org/apache/eagle/flink/PolicyChangeListener.java b/eagle-flink/src/main/java/org/apache/eagle/flink/PolicyChangeListener.java
new file mode 100644
index 0000000000..2a1d26d61e
--- /dev/null
+++ b/eagle-flink/src/main/java/org/apache/eagle/flink/PolicyChangeListener.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.flink;
+
+import java.util.Collection;
+import java.util.List;
+
+import java.util.List;
+import java.util.Map;
+
+public interface PolicyChangeListener {
+ void onPolicyChange(String version,
+ List added,
+ List removed,
+ List modified, Map sds);
+}
\ No newline at end of file
diff --git a/eagle-flink/src/main/java/org/apache/eagle/flink/PolicyDefinition.java b/eagle-flink/src/main/java/org/apache/eagle/flink/PolicyDefinition.java
new file mode 100644
index 0000000000..55346618cd
--- /dev/null
+++ b/eagle-flink/src/main/java/org/apache/eagle/flink/PolicyDefinition.java
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.flink;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.ListUtils;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
+import java.io.Serializable;
+import java.util.*;
+
+/**
+ * @since Apr 5, 2016.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class PolicyDefinition implements Serializable {
+ private static final long serialVersionUID = 377581499339572414L;
+ // unique identifier
+ private String name;
+ private String description;
+ private List inputStreams = new ArrayList<>();
+ private List outputStreams = new ArrayList<>();
+ private String siteId = "default";
+
+ private Definition definition;
+ private Definition stateDefinition;
+ private PolicyStatus policyStatus = PolicyStatus.ENABLED;
+ private AlertDefinition alertDefinition;
+ private List alertDeduplications = new ArrayList<>();
+
+ // one stream only have one partition in one policy, since we don't support stream alias
+ private List partitionSpec = new ArrayList();
+ private boolean dedicated;
+
+ // runtime configuration for policy, these are user-invisible
+ private int parallelismHint = 1;
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public String getDescription() {
+ return description;
+ }
+
+ public void setDescription(String description) {
+ this.description = description;
+ }
+
+ public List getInputStreams() {
+ return inputStreams;
+ }
+
+ public void setInputStreams(List inputStreams) {
+ this.inputStreams = inputStreams;
+ }
+
+ public List getOutputStreams() {
+ return outputStreams;
+ }
+
+ public void setOutputStreams(List outputStreams) {
+ this.outputStreams = outputStreams;
+ }
+
+ public Definition getDefinition() {
+ return definition;
+ }
+
+ public Definition getStateDefinition() {
+ return stateDefinition;
+ }
+
+ public void setStateDefinition(Definition stateDefinition) {
+ this.stateDefinition = stateDefinition;
+ }
+
+ public void setDefinition(Definition definition) {
+ this.definition = definition;
+ }
+
+ public List getPartitionSpec() {
+ return partitionSpec;
+ }
+
+ public void setPartitionSpec(List partitionSpec) {
+ this.partitionSpec = partitionSpec;
+ }
+
+ public void addPartition(StreamPartition par) {
+ this.partitionSpec.add(par);
+ }
+
+ public boolean isDedicated() {
+ return dedicated;
+ }
+
+ public void setDedicated(boolean dedicated) {
+ this.dedicated = dedicated;
+ }
+
+ public int getParallelismHint() {
+ return parallelismHint;
+ }
+
+ public void setParallelismHint(int parallelism) {
+ this.parallelismHint = parallelism;
+ }
+
+ public PolicyStatus getPolicyStatus() {
+ return policyStatus;
+ }
+
+ public void setPolicyStatus(PolicyStatus policyStatus) {
+ this.policyStatus = policyStatus;
+ }
+
+ public List getAlertDeduplications() {
+ return alertDeduplications;
+ }
+
+ public void setAlertDeduplications(List alertDeduplications) {
+ this.alertDeduplications = alertDeduplications;
+ }
+
+ public AlertDefinition getAlertDefinition() {
+ return alertDefinition;
+ }
+
+ public void setAlertDefinition(AlertDefinition alertDefinition) {
+ this.alertDefinition = alertDefinition;
+ }
+
+ public AlertSeverity getAlertSeverity() {
+ return alertDefinition == null ? null : alertDefinition.getSeverity();
+ }
+
+ public String getAlertCategory() {
+ return alertDefinition == null ? null : alertDefinition.getCategory();
+ }
+
+ public String getSiteId() {
+ return siteId;
+ }
+
+ public void setSiteId(String siteId) {
+ this.siteId = siteId;
+ }
+
+ @Override
+ public int hashCode() {
+ return new HashCodeBuilder()
+ .append(siteId)
+ .append(name)
+ .append(inputStreams)
+ .append(outputStreams)
+ .append(definition)
+ .append(partitionSpec)
+ .append(policyStatus)
+ .append(parallelismHint)
+ .append(alertDefinition)
+ .append(alertDeduplications)
+ .build();
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ if (that == this) {
+ return true;
+ }
+
+ if (!(that instanceof PolicyDefinition)) {
+ return false;
+ }
+
+ PolicyDefinition another = (PolicyDefinition) that;
+
+ if (Objects.equals(another.siteId, this.siteId)
+ && Objects.equals(another.name, this.name)
+ && Objects.equals(another.description, this.description)
+ && CollectionUtils.isEqualCollection(another.inputStreams, this.inputStreams)
+ && CollectionUtils.isEqualCollection(another.outputStreams, this.outputStreams)
+ && (another.definition != null && another.definition.equals(this.definition))
+ && Objects.equals(this.definition, another.definition)
+ && CollectionUtils.isEqualCollection(another.partitionSpec, this.partitionSpec)
+ && another.policyStatus.equals(this.policyStatus)
+ && another.parallelismHint == this.parallelismHint
+ && Objects.equals(another.alertDefinition, alertDefinition)
+ && CollectionUtils.isEqualCollection(another.alertDeduplications, alertDeduplications)) {
+ return true;
+ }
+ return false;
+ }
+
+ @JsonIgnoreProperties(ignoreUnknown = true)
+ public static class Definition implements Serializable {
+ private static final long serialVersionUID = -622366527887848346L;
+
+ public String type;
+ public String value;
+ public String handlerClass;
+ public Map properties = new HashMap<>();
+
+ private List inputStreams = new ArrayList();
+ private List outputStreams = new ArrayList();
+
+ public Definition(String type, String value) {
+ this.type = type;
+ this.value = value;
+ }
+
+ public Definition() {
+ this.type = null;
+ this.value = null;
+ }
+
+ @Override
+ public int hashCode() {
+ return new HashCodeBuilder().append(type).append(value).build();
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ if (that == this) {
+ return true;
+ }
+ if (!(that instanceof Definition)) {
+ return false;
+ }
+ Definition another = (Definition) that;
+ if (another.type.equals(this.type)
+ && another.value.equals(this.value)
+ && ListUtils.isEqualList(another.inputStreams, this.inputStreams)
+ && ListUtils.isEqualList(another.outputStreams, this.outputStreams)) {
+ return true;
+ }
+ return false;
+ }
+
+ public String getType() {
+ return type;
+ }
+
+ public void setType(String type) {
+ this.type = type;
+ }
+
+ public String getValue() {
+ return value;
+ }
+
+ public void setValue(String value) {
+ this.value = value;
+ }
+
+ public void setInputStreams(List inputStreams) {
+ this.inputStreams = inputStreams;
+ }
+
+ public void setOutputStreams(List outputStreams) {
+ this.outputStreams = outputStreams;
+ }
+
+ public List getInputStreams() {
+ return inputStreams;
+ }
+
+ public List getOutputStreams() {
+ return outputStreams;
+ }
+
+ public String getHandlerClass() {
+ return handlerClass;
+ }
+
+ public void setHandlerClass(String handlerClass) {
+ this.handlerClass = handlerClass;
+ }
+
+ public Map getProperties() {
+ return properties;
+ }
+
+ public void setProperties(Map properties) {
+ this.properties = properties;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("{type=\"%s\",value=\"%s\", inputStreams=\"%s\", outputStreams=\"%s\" }", type, value, inputStreams, outputStreams);
+ }
+ }
+
+ public static enum PolicyStatus {
+ ENABLED, DISABLED
+ }
+
+ @Override
+ public String toString() {
+ return String.format("{site=\"%s\", name=\"%s\",definition=%s}", this.getSiteId(), this.getName(), this.getDefinition() == null ? "null" : this.getDefinition().toString());
+ }
+}
diff --git a/eagle-flink/src/main/java/org/apache/eagle/flink/PolicyGroupEvaluator.java b/eagle-flink/src/main/java/org/apache/eagle/flink/PolicyGroupEvaluator.java
new file mode 100644
index 0000000000..842ea83e61
--- /dev/null
+++ b/eagle-flink/src/main/java/org/apache/eagle/flink/PolicyGroupEvaluator.java
@@ -0,0 +1,41 @@
+package org.apache.eagle.flink;
+
+import java.io.Serializable;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * policy group refers to the policies which belong to the same MonitoredStream
+ * 3 lifecycle steps are involved in PolicyGroupEvaluator
+ * Step 1: create object. Be aware that in distributed environment, this object may be serialized and transferred across network
+ * Step 2: init. This normally is invoked only once before nextEvent is invoked
+ * Step 3: nextEvent
+ * Step 4: close
+ */
+public interface PolicyGroupEvaluator extends PolicyChangeListener, Serializable {
+ void init(StreamContext context, AlertStreamCollector collector);
+
+ /**
+ * Evaluate event.
+ */
+ void nextEvent(PartitionedEvent event);
+
+ String getName();
+
+ void close();
+}
\ No newline at end of file
diff --git a/eagle-flink/src/main/java/org/apache/eagle/flink/PolicyGroupEvaluatorImpl.java b/eagle-flink/src/main/java/org/apache/eagle/flink/PolicyGroupEvaluatorImpl.java
new file mode 100644
index 0000000000..2fb643f207
--- /dev/null
+++ b/eagle-flink/src/main/java/org/apache/eagle/flink/PolicyGroupEvaluatorImpl.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.flink;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class PolicyGroupEvaluatorImpl implements PolicyGroupEvaluator {
+ private static final long serialVersionUID = -5499413193675985288L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(PolicyGroupEvaluatorImpl.class);
+
+ private AlertStreamCollector collector;
+ // mapping from policy name to PolicyDefinition
+ private volatile Map policyDefinitionMap = new HashMap<>();
+ // mapping from policy name to PolicyStreamHandler
+ private volatile Map policyStreamHandlerMap = new HashMap<>();
+ private String policyEvaluatorId;
+ private StreamContext context;
+
+ public PolicyGroupEvaluatorImpl(String policyEvaluatorId) {
+ this.policyEvaluatorId = policyEvaluatorId;
+ }
+
+ public void init(StreamContext context, AlertStreamCollector collector) {
+ this.collector = collector;
+ this.policyStreamHandlerMap = new HashMap<>();
+ this.context = context;
+ Thread.currentThread().setName(policyEvaluatorId);
+ }
+
+ public void nextEvent(PartitionedEvent event) {
+ this.context.counter().incr("receive_count");
+ dispatch(event);
+ }
+
+ @Override
+ public String getName() {
+ return this.policyEvaluatorId;
+ }
+
+ public void close() {
+ for (PolicyStreamHandler handler : policyStreamHandlerMap.values()) {
+ try {
+ handler.close();
+ } catch (Exception e) {
+ LOG.error("Failed to close handler {}", handler.toString(), e);
+ }
+ }
+ }
+
+ /**
+ * fixme make selection of PolicyStreamHandler to be more efficient.
+ *
+ * @param partitionedEvent PartitionedEvent
+ */
+ private void dispatch(PartitionedEvent partitionedEvent) {
+ boolean handled = false;
+ for (Map.Entry policyStreamHandler : policyStreamHandlerMap.entrySet()) {
+ if (isAcceptedByPolicy(partitionedEvent, policyDefinitionMap.get(policyStreamHandler.getKey()))) {
+ try {
+ handled = true;
+ this.context.counter().incr("eval_count");
+ policyStreamHandler.getValue().send(partitionedEvent.getEvent(), null);
+ } catch (Exception e) {
+ this.context.counter().incr("fail_count");
+ LOG.error("{} failed to handle {}", policyStreamHandler.getValue(), partitionedEvent.getEvent(), e);
+ }
+ }
+ }
+ if (!handled) {
+ this.context.counter().incr("drop_count");
+ LOG.warn("Drop stream non-matched event {}, which should not be sent to evaluator", partitionedEvent);
+ } else {
+ this.context.counter().incr("accept_count");
+ }
+ }
+
+ private static boolean isAcceptedByPolicy(PartitionedEvent event, PolicyDefinition policy) {
+ return policy.getPartitionSpec().contains(event.getPartition())
+ && (policy.getInputStreams().contains(event.getEvent().getStreamId())
+ || policy.getDefinition().getInputStreams().contains(event.getEvent().getStreamId()));
+ }
+
+
+ @Override
+ public void onPolicyChange(String version, List added, List removed, List modified, Map sds) {
+ Map copyPolicies = new HashMap<>(policyDefinitionMap);
+ Map copyHandlers = new HashMap<>(policyStreamHandlerMap);
+ for (PolicyDefinition pd : added) {
+ inplaceAdd(copyPolicies, copyHandlers, pd, sds);
+ }
+ for (PolicyDefinition pd : removed) {
+ inplaceRemove(copyPolicies, copyHandlers, pd);
+ }
+ for (PolicyDefinition pd : modified) {
+ inplaceRemove(copyPolicies, copyHandlers, pd);
+ inplaceAdd(copyPolicies, copyHandlers, pd, sds);
+ }
+
+ // logging
+ LOG.info("{} with {} Policy metadata updated with added={}, removed={}, modified={}", policyEvaluatorId, version, added, removed, modified);
+
+ // switch reference
+ this.policyDefinitionMap = copyPolicies;
+ this.policyStreamHandlerMap = copyHandlers;
+ }
+
+ private void inplaceAdd(Map policies, Map handlers, PolicyDefinition policy, Map sds) {
+ if (handlers.containsKey(policy.getName())) {
+ LOG.error("metadata calculation error, try to add existing PolicyDefinition " + policy);
+ } else {
+ policies.put(policy.getName(), policy);
+ CompositePolicyHandler handler = new CompositePolicyHandler(sds);
+ try {
+ PolicyHandlerContext handlerContext = new PolicyHandlerContext();
+ handlerContext.setPolicyCounter(this.context.counter());
+ handlerContext.setPolicyDefinition(policy);
+ handlerContext.setPolicyEvaluator(this);
+ handlerContext.setPolicyEvaluatorId(policyEvaluatorId);
+ handlerContext.setConfig(this.context.config());
+ handler.prepare(handlerContext);
+ handlers.put(policy.getName(), handler);
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ policies.remove(policy.getName());
+ handlers.remove(policy.getName());
+ }
+ }
+ }
+
+ private void inplaceRemove(Map policies, Map handlers, PolicyDefinition policy) {
+ if (handlers.containsKey(policy.getName())) {
+ PolicyStreamHandler handler = handlers.get(policy.getName());
+ try {
+ handler.close();
+ } catch (Exception e) {
+ LOG.error("Failed to close handler {}", handler, e);
+ } finally {
+ policies.remove(policy.getName());
+ handlers.remove(policy.getName());
+ LOG.info("Removed policy: {}", policy);
+ }
+ } else {
+ LOG.error("metadata calculation error, try to remove nonexisting PolicyDefinition: " + policy);
+ }
+ }
+
+
+ public CompositePolicyHandler getPolicyHandler(String policy) {
+ return policyStreamHandlerMap.get(policy);
+ }
+
+}
\ No newline at end of file
diff --git a/eagle-flink/src/main/java/org/apache/eagle/flink/PolicyHandlerContext.java b/eagle-flink/src/main/java/org/apache/eagle/flink/PolicyHandlerContext.java
new file mode 100644
index 0000000000..fc53069e17
--- /dev/null
+++ b/eagle-flink/src/main/java/org/apache/eagle/flink/PolicyHandlerContext.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.flink;
+
+import com.typesafe.config.Config;
+
+public class PolicyHandlerContext {
+ private PolicyDefinition policyDefinition;
+ private PolicyGroupEvaluator policyEvaluator;
+ private StreamCounter policyCounter;
+ private String policyEvaluatorId;
+ private Config config;
+
+ public PolicyDefinition getPolicyDefinition() {
+ return policyDefinition;
+ }
+
+ public void setPolicyDefinition(PolicyDefinition policyDefinition) {
+ this.policyDefinition = policyDefinition;
+ }
+
+ public PolicyGroupEvaluator getPolicyEvaluator() {
+ return policyEvaluator;
+ }
+
+ public void setPolicyEvaluator(PolicyGroupEvaluator policyEvaluator) {
+ this.policyEvaluator = policyEvaluator;
+ }
+
+ public void setPolicyCounter(StreamCounter metric) {
+ this.policyCounter = metric;
+ }
+
+ public StreamCounter getPolicyCounter() {
+ return policyCounter;
+ }
+
+ public String getPolicyEvaluatorId() {
+ return policyEvaluatorId;
+ }
+
+ public void setPolicyEvaluatorId(String policyEvaluatorId) {
+ this.policyEvaluatorId = policyEvaluatorId;
+ }
+
+ public Config getConfig() {
+ return config;
+ }
+
+ public void setConfig(Config config) {
+ this.config = config;
+ }
+}
\ No newline at end of file
diff --git a/eagle-flink/src/main/java/org/apache/eagle/flink/PolicyStreamHandler.java b/eagle-flink/src/main/java/org/apache/eagle/flink/PolicyStreamHandler.java
new file mode 100755
index 0000000000..6f55d9d7f6
--- /dev/null
+++ b/eagle-flink/src/main/java/org/apache/eagle/flink/PolicyStreamHandler.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.flink;
+
+public interface PolicyStreamHandler {
+ void prepare(PolicyHandlerContext context) throws Exception;
+
+ void send(StreamEvent event, Collector collector) throws Exception;
+
+ void close() throws Exception;
+}
\ No newline at end of file
diff --git a/eagle-flink/src/main/java/org/apache/eagle/flink/PolicyStreamHandlers.java b/eagle-flink/src/main/java/org/apache/eagle/flink/PolicyStreamHandlers.java
new file mode 100644
index 0000000000..7b1be47fb9
--- /dev/null
+++ b/eagle-flink/src/main/java/org/apache/eagle/flink/PolicyStreamHandlers.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.flink;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * TODO/FIXME: to support multiple stage definition in single policy. The methods in this class is not good to understand now.(Hard code of 0/1).
+ */
+public class PolicyStreamHandlers {
+ private static final Logger LOG = LoggerFactory.getLogger(PolicyStreamHandlers.class);
+
+ public static final String SIDDHI_ENGINE = "siddhi";
+ public static final String NO_DATA_ALERT_ENGINE = "nodataalert";
+ public static final String ABSENCE_ALERT_ENGINE = "absencealert";
+ public static final String CUSTOMIZED_ENGINE = "Custom";
+
+ public static PolicyStreamHandler createHandler(PolicyDefinition.Definition definition, Map sds) {
+ if (SIDDHI_ENGINE.equals(definition.getType())) {
+ return new SiddhiPolicyHandler(sds, 0);// // FIXME: 8/2/16
+ }
+ throw new IllegalArgumentException("Illegal policy stream handler type " + definition.getType());
+ }
+
+ public static PolicyStreamHandler createStateHandler(String type, Map sds) {
+ if (SIDDHI_ENGINE.equals(type)) {
+ return new SiddhiPolicyStateHandler(sds, 1); //// FIXME: 8/2/16
+ }
+ throw new IllegalArgumentException("Illegal policy state handler type " + type);
+ }
+}
\ No newline at end of file
diff --git a/eagle-flink/src/main/java/org/apache/eagle/flink/SiddhiDefinitionAdapter.java b/eagle-flink/src/main/java/org/apache/eagle/flink/SiddhiDefinitionAdapter.java
new file mode 100644
index 0000000000..c404c0e254
--- /dev/null
+++ b/eagle-flink/src/main/java/org/apache/eagle/flink/SiddhiDefinitionAdapter.java
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.flink;
+
+import com.google.common.base.Preconditions;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.wso2.siddhi.query.api.definition.AbstractDefinition;
+import org.wso2.siddhi.query.api.definition.Attribute;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@Slf4j
+public class SiddhiDefinitionAdapter {
+ private static final String DEFINE_STREAM_TEMPLATE = "define stream %s ( %s );";
+ private static final String OUTPUT_STREAM_TEMPLATE = "select %s insert into %s;";
+
+ public static String buildSiddhiOutpuStreamClause(String outStreamId, List columns){
+ List siddhiColumns = new ArrayList<>();
+ for (StreamColumn column : columns) {
+ siddhiColumns.add(column.getName());
+ }
+ return String.format(OUTPUT_STREAM_TEMPLATE, StringUtils.join(siddhiColumns, ","), outStreamId);
+ }
+
+ public static String buildSiddhiDefineStreamClause(String inStreamId, List columns){
+ List siddhiColumns = new ArrayList<>();
+ for (StreamColumn column : columns) {
+ siddhiColumns.add(String.format("%s %s", column.getName(),
+ convertToSiddhiAttributeType(column.getType()).toString().toLowerCase()));
+ }
+ return String.format(DEFINE_STREAM_TEMPLATE, inStreamId, StringUtils.join(siddhiColumns, ","));
+ }
+
+ public static String buildStreamDefinition(StreamDefinition streamDefinition) {
+ List columns = new ArrayList<>();
+ Preconditions.checkNotNull(streamDefinition, "StreamDefinition is null");
+ if (streamDefinition.getColumns() != null) {
+ for (StreamColumn column : streamDefinition.getColumns()) {
+ columns.add(String.format("%s %s", column.getName(), convertToSiddhiAttributeType(column.getType()).toString().toLowerCase()));
+ }
+ } else {
+ log.warn("No columns found for stream {}" + streamDefinition.getStreamId());
+ }
+ return String.format(DEFINE_STREAM_TEMPLATE, streamDefinition.getStreamId(), StringUtils.join(columns, ","));
+ }
+
+ public static Attribute.Type convertToSiddhiAttributeType(StreamColumn.ColumnType type) {
+ if (_EAGLE_SIDDHI_TYPE_MAPPING.containsKey(type)) {
+ return _EAGLE_SIDDHI_TYPE_MAPPING.get(type);
+ }
+
+ throw new IllegalArgumentException("Unknown stream type: " + type);
+ }
+
+ public static Class> convertToJavaAttributeType(StreamColumn.ColumnType type) {
+ if (_EAGLE_JAVA_TYPE_MAPPING.containsKey(type)) {
+ return _EAGLE_JAVA_TYPE_MAPPING.get(type);
+ }
+
+ throw new IllegalArgumentException("Unknown stream type: " + type);
+ }
+
+ public static StreamColumn.ColumnType convertFromJavaAttributeType(Class> type) {
+ if (_JAVA_EAGLE_TYPE_MAPPING.containsKey(type)) {
+ return _JAVA_EAGLE_TYPE_MAPPING.get(type);
+ }
+
+ throw new IllegalArgumentException("Unknown stream type: " + type);
+ }
+
+ public static StreamColumn.ColumnType convertFromSiddhiAttributeType(Attribute.Type type) {
+ if (_SIDDHI_EAGLE_TYPE_MAPPING.containsKey(type)) {
+ return _SIDDHI_EAGLE_TYPE_MAPPING.get(type);
+ }
+
+ throw new IllegalArgumentException("Unknown siddhi type: " + type);
+ }
+
+ /**
+ * Build Siddhi execution plan off a single input stream and output stream
+ * A Siddhi execution plan consists of three parts: input stream definitions, output stream definitions and policy
+ * So the evaluation flow is:
+ * input stream -> policy evaluation -> output stream
+ */
+ public static String buildSiddhiExecutionPlan(StreamDefinition inStreamDef,
+ String policy, StreamDefinition outStreamDef) {
+ StringBuilder builder = new StringBuilder();
+ List modifiedIn = new ArrayList<>(inStreamDef.getColumns());
+ StreamColumn iCollectorCol = StreamColumn.builder().name("__collector__").
+ type(StreamColumn.ColumnType.OBJECT).build();
+ modifiedIn.add(0, iCollectorCol);
+ builder.append(SiddhiDefinitionAdapter.buildSiddhiDefineStreamClause(inStreamDef.getStreamId(), modifiedIn));
+ builder.append("\n");
+
+ // concatenate policy and output stream definition
+ // ex: "from sampleStream_1[name == \"cpu\" and value > 50.0] select __collector__, name, host, flag, value insert into outputStream;"
+ builder.append("from ");
+ builder.append(inStreamDef.getStreamId());
+ builder.append(" [");
+ builder.append(policy);
+ builder.append("] ");
+
+ List modifiedOut = new ArrayList<>(inStreamDef.getColumns());
+ StreamColumn oCollectorCol = StreamColumn.builder().name("__collector__").
+ type(StreamColumn.ColumnType.OBJECT).build();
+ modifiedOut.add(0, oCollectorCol);
+ builder.append(SiddhiDefinitionAdapter.buildSiddhiOutpuStreamClause(outStreamDef.getStreamId(), modifiedOut));
+ log.debug("Generated siddhi execution plan: {}", builder.toString());
+ return builder.toString();
+ }
+
+ public static String buildSiddhiExecutionPlan(PolicyDefinition policyDefinition, Map sds) {
+ StringBuilder builder = new StringBuilder();
+ PolicyDefinition.Definition coreDefinition = policyDefinition.getDefinition();
+ // init if not present
+ List inputStreams = coreDefinition.getInputStreams();
+ if (inputStreams == null || inputStreams.isEmpty()) {
+ inputStreams = policyDefinition.getInputStreams();
+ }
+
+ for (String inputStream : inputStreams) {
+ StreamDefinition sd = sds.get(inputStream);
+ List columns = sd.getColumns();
+ columns = new ArrayList<>(columns);
+ StreamColumn collectorCol = StreamColumn.builder().name("__collector__").
+ type(StreamColumn.ColumnType.OBJECT).build();
+ columns.add(0, collectorCol);
+ builder.append(SiddhiDefinitionAdapter.buildSiddhiDefineStreamClause(sd.getStreamId(), columns));
+ builder.append("\n");
+ }
+ builder.append(coreDefinition.value);
+ log.debug("Generated siddhi execution plan: {} from definition: {}", builder.toString(), coreDefinition);
+ return builder.toString();
+ }
+
+ public static String buildSiddhiExecutionPlan(String policyDefinition, Map inputStreamDefinitions) {
+ StringBuilder builder = new StringBuilder();
+ for (Map.Entry entry: inputStreamDefinitions.entrySet()) {
+ builder.append(SiddhiDefinitionAdapter.buildStreamDefinition(entry.getValue()));
+ builder.append("\n");
+ }
+ builder.append(policyDefinition);
+ log.debug("Generated siddhi execution plan: {}", builder.toString());
+ return builder.toString();
+ }
+
+ /**
+ * public enum Type {
+ * STRING, INT, LONG, FLOAT, DOUBLE, BOOL, OBJECT
+ * }.
+ */
+ private static final Map _EAGLE_SIDDHI_TYPE_MAPPING = new HashMap<>();
+ private static final Map> _EAGLE_JAVA_TYPE_MAPPING = new HashMap<>();
+ private static final Map, StreamColumn.ColumnType> _JAVA_EAGLE_TYPE_MAPPING = new HashMap<>();
+ private static final Map _SIDDHI_EAGLE_TYPE_MAPPING = new HashMap<>();
+
+ static {
+ _EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.ColumnType.STRING, Attribute.Type.STRING);
+ _EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.ColumnType.INT, Attribute.Type.INT);
+ _EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.ColumnType.LONG, Attribute.Type.LONG);
+ _EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.ColumnType.FLOAT, Attribute.Type.FLOAT);
+ _EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.ColumnType.DOUBLE, Attribute.Type.DOUBLE);
+ _EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.ColumnType.BOOL, Attribute.Type.BOOL);
+ _EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.ColumnType.OBJECT, Attribute.Type.OBJECT);
+
+ _EAGLE_JAVA_TYPE_MAPPING.put(StreamColumn.ColumnType.STRING, String.class);
+ _EAGLE_JAVA_TYPE_MAPPING.put(StreamColumn.ColumnType.INT, Integer.class);
+ _EAGLE_JAVA_TYPE_MAPPING.put(StreamColumn.ColumnType.LONG, Long.class);
+ _EAGLE_JAVA_TYPE_MAPPING.put(StreamColumn.ColumnType.FLOAT, Float.class);
+ _EAGLE_JAVA_TYPE_MAPPING.put(StreamColumn.ColumnType.DOUBLE, Double.class);
+ _EAGLE_JAVA_TYPE_MAPPING.put(StreamColumn.ColumnType.BOOL, Boolean.class);
+ _EAGLE_JAVA_TYPE_MAPPING.put(StreamColumn.ColumnType.OBJECT, Object.class);
+
+ _JAVA_EAGLE_TYPE_MAPPING.put(String.class, StreamColumn.ColumnType.STRING);
+ _JAVA_EAGLE_TYPE_MAPPING.put(Integer.class, StreamColumn.ColumnType.INT);
+ _JAVA_EAGLE_TYPE_MAPPING.put(Long.class, StreamColumn.ColumnType.LONG);
+ _JAVA_EAGLE_TYPE_MAPPING.put(Float.class, StreamColumn.ColumnType.FLOAT);
+ _JAVA_EAGLE_TYPE_MAPPING.put(Double.class, StreamColumn.ColumnType.DOUBLE);
+ _JAVA_EAGLE_TYPE_MAPPING.put(Boolean.class, StreamColumn.ColumnType.BOOL);
+ _JAVA_EAGLE_TYPE_MAPPING.put(Object.class, StreamColumn.ColumnType.OBJECT);
+
+ _SIDDHI_EAGLE_TYPE_MAPPING.put(Attribute.Type.STRING, StreamColumn.ColumnType.STRING);
+ _SIDDHI_EAGLE_TYPE_MAPPING.put(Attribute.Type.INT, StreamColumn.ColumnType.INT);
+ _SIDDHI_EAGLE_TYPE_MAPPING.put(Attribute.Type.LONG, StreamColumn.ColumnType.LONG);
+ _SIDDHI_EAGLE_TYPE_MAPPING.put(Attribute.Type.FLOAT, StreamColumn.ColumnType.FLOAT);
+ _SIDDHI_EAGLE_TYPE_MAPPING.put(Attribute.Type.DOUBLE, StreamColumn.ColumnType.DOUBLE);
+ _SIDDHI_EAGLE_TYPE_MAPPING.put(Attribute.Type.BOOL, StreamColumn.ColumnType.BOOL);
+ _SIDDHI_EAGLE_TYPE_MAPPING.put(Attribute.Type.OBJECT, StreamColumn.ColumnType.OBJECT);
+ }
+
+ public static StreamDefinition convertFromSiddiDefinition(AbstractDefinition siddhiDefinition) {
+ StreamDefinition.StreamDefinitionBuilder builder = StreamDefinition.builder();
+ builder.streamId(siddhiDefinition.getId());
+ List columns = new ArrayList<>(siddhiDefinition.getAttributeNameArray().length);
+ for (Attribute attribute : siddhiDefinition.getAttributeList()) {
+ StreamColumn.StreamColumnBuilder colBuilder = StreamColumn.builder();
+ colBuilder.type(convertFromSiddhiAttributeType(attribute.getType()));
+ colBuilder.name(attribute.getName());
+ columns.add(colBuilder.build());
+ }
+ builder.columns(columns);
+ builder.timeseries(true);
+ builder.description("Auto-generated stream schema from siddhi for " + siddhiDefinition.getId());
+ return builder.build();
+ }
+}
\ No newline at end of file
diff --git a/eagle-flink/src/main/java/org/apache/eagle/flink/SiddhiPolicyFlinkProcessor.java b/eagle-flink/src/main/java/org/apache/eagle/flink/SiddhiPolicyFlinkProcessor.java
new file mode 100644
index 0000000000..d04836082c
--- /dev/null
+++ b/eagle-flink/src/main/java/org/apache/eagle/flink/SiddhiPolicyFlinkProcessor.java
@@ -0,0 +1,93 @@
+package org.apache.eagle.flink;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.util.Collector;
+import org.wso2.siddhi.core.ExecutionPlanRuntime;
+import org.wso2.siddhi.core.SiddhiManager;
+import org.wso2.siddhi.core.stream.input.InputHandler;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A Flink processor to evaluate Siddhi policy
+ *
+ * The steps to evaluate Siddhi policy should follow Flink's lifecycle methods,
+ * Note: this processor must be serialiable as Flink runtime may persist it for failover
+ * 1. setup phase. Initialize Siddhi runtime in open() method
+ * 2. event process phase.
+ * In Siddhi stream callback, invoke Flink collector
+ * 3. cleanup phase
+ */
+@RequiredArgsConstructor
+@Slf4j
+public class SiddhiPolicyFlinkProcessor extends KeyedProcessFunction {
+ private final StreamDefinition inStreamDef;
+ private final String policy;
+ private final StreamDefinition outStreamDef;
+
+ private volatile SiddhiManager siddhiManager;
+ private ExecutionPlanRuntime executionRuntime;
+
+ /**
+ * setup phase
+ */
+ @Override
+ public void open(Configuration parameters) throws Exception{
+ this.siddhiManager = new SiddhiManager();
+ String plan = SiddhiDefinitionAdapter.buildSiddhiExecutionPlan(inStreamDef, policy, outStreamDef);
+ log.info("Siddhi execution plan: {}", plan);
+ try {
+ this.executionRuntime = siddhiManager.createExecutionPlanRuntime(plan);
+ log.info("Created siddhi runtime {}", executionRuntime.getName());
+ } catch (Exception parserException) {
+ log.error("Failed to create siddhi runtime for input stream: {}, output stream: {}, siddhi plan: \n\n{}\n",
+ inStreamDef.getStreamId(), outStreamDef.getStreamId(), plan, parserException);
+ throw parserException;
+ }
+
+ // fixme what to set up for PolicyHandlerContext
+ PolicyHandlerContext context = new PolicyHandlerContext();
+ context.setPolicyDefinition(null);
+ context.setPolicyCounter(new MyStreamCounter());
+
+ // add output stream callback
+ this.executionRuntime.addCallback(outStreamDef.getStreamId(),
+ new AlertStreamCallback(outStreamDef.getStreamId(), outStreamDef, context, 0));
+ this.executionRuntime.start();
+ }
+
+ /**
+ * event process phase
+ * input StreamEvent and output AlertStreamEvent
+ * Note: in order for Siddhi runtime's callback can use @param out to collect output, we should add @param out
+ * into @param value. In Siddhi callback, it can get original event so the collector.
+ * In fact, the Object[] data's first element is reserved for this purpose.
+ */
+ @Override
+ public void processElement(StreamEvent value, Context ctx, Collector out) throws Exception {
+ String streamId = value.getStreamId();
+ InputHandler inputHandler = executionRuntime.getInputHandler(streamId);
+ if (inputHandler != null) {
+ // add collector to existing event data
+ Object[] modified = new Object[value.getData().length+1];
+ modified[0] = out;
+ System.arraycopy(value.getData(), 0, modified, 1, value.getData().length);
+ inputHandler.send(value.getTimestamp(), modified);
+ log.debug("sent event to siddhi stream {} ", streamId);
+ } else {
+ log.warn("No input handler found for stream {}", streamId);
+ }
+ }
+
+ /**
+ * cleanup phse
+ */
+ @Override
+ public void close() throws Exception {
+ this.executionRuntime.shutdown();
+ }
+}
diff --git a/eagle-flink/src/main/java/org/apache/eagle/flink/SiddhiPolicyHandler.java b/eagle-flink/src/main/java/org/apache/eagle/flink/SiddhiPolicyHandler.java
new file mode 100755
index 0000000000..53281a54be
--- /dev/null
+++ b/eagle-flink/src/main/java/org/apache/eagle/flink/SiddhiPolicyHandler.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.flink;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.wso2.siddhi.core.ExecutionPlanRuntime;
+import org.wso2.siddhi.core.SiddhiManager;
+import org.wso2.siddhi.core.stream.input.InputHandler;
+
+import java.util.List;
+import java.util.Map;
+
+public class SiddhiPolicyHandler implements PolicyStreamHandler {
+ private static final Logger LOG = LoggerFactory.getLogger(SiddhiPolicyHandler.class);
+ private ExecutionPlanRuntime executionRuntime;
+ private SiddhiManager siddhiManager;
+ private Map sds;
+ private PolicyDefinition policy;
+ private PolicyHandlerContext context;
+
+ private int currentIndex = 0; // the index of current definition statement inside the policy definition
+
+ public SiddhiPolicyHandler(Map sds, int index) {
+ this.sds = sds;
+ this.currentIndex = index;
+ }
+
+ protected String generateExecutionPlan(PolicyDefinition policyDefinition, Map sds) throws StreamNotDefinedException {
+ return SiddhiDefinitionAdapter.buildSiddhiExecutionPlan(policyDefinition,sds);
+ }
+
+ @Override
+ public void prepare(PolicyHandlerContext context) throws Exception {
+ LOG.info("Initializing handler for policy {}", context.getPolicyDefinition());
+ this.policy = context.getPolicyDefinition();
+ this.siddhiManager = new SiddhiManager();
+ String plan = generateExecutionPlan(policy, sds);
+ LOG.info("Siddhi execution plan: {}", plan);
+ try {
+ this.executionRuntime = siddhiManager.createExecutionPlanRuntime(plan);
+ LOG.info("Created siddhi runtime {}", executionRuntime.getName());
+ } catch (Exception parserException) {
+ LOG.error("Failed to create siddhi runtime for policy: {}, siddhi plan: \n\n{}\n", context.getPolicyDefinition().getName(), plan, parserException);
+ throw parserException;
+ }
+
+ // add output stream callback
+ List outputStreams = getOutputStreams(policy);
+ for (final String outputStream : outputStreams) {
+ if (executionRuntime.getStreamDefinitionMap().containsKey(outputStream)) {
+ StreamDefinition streamDefinition = SiddhiDefinitionAdapter.convertFromSiddiDefinition(executionRuntime.getStreamDefinitionMap().get(outputStream));
+ this.executionRuntime.addCallback(outputStream,
+ new AlertStreamCallback(outputStream, streamDefinition,
+ context, currentIndex));
+ } else {
+ throw new IllegalStateException("Undefined output stream " + outputStream);
+ }
+ }
+ this.executionRuntime.start();
+ this.context = context;
+ LOG.info("Initialized policy handler for policy: {}", policy.getName());
+ }
+
+ protected List getOutputStreams(PolicyDefinition policy) {
+ return policy.getOutputStreams().isEmpty() ? policy.getDefinition().getOutputStreams() : policy.getOutputStreams();
+ }
+
+ public void send(StreamEvent event, Collector collector) throws Exception {
+ context.getPolicyCounter().incr(String.format("%s.%s", this.context.getPolicyDefinition().getName(), "receive_count"));
+ String streamId = event.getStreamId();
+ InputHandler inputHandler = executionRuntime.getInputHandler(streamId);
+ if (inputHandler != null) {
+ context.getPolicyCounter().incr(String.format("%s.%s", this.context.getPolicyDefinition().getName(), "eval_count"));
+ // wrap event with collector
+// Object[] wrapper = new Object[event.getData().length + 1];
+// wrapper[0] = collector;
+// System.arraycopy(event.getData(), 0, wrapper, 1, event.getData().length);
+ event.getData()[0] = collector;
+ inputHandler.send(event.getTimestamp(), event.getData());
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("sent event to siddhi stream {} ", streamId);
+ }
+ } else {
+ context.getPolicyCounter().incr(String.format("%s.%s", this.context.getPolicyDefinition().getName(), "drop_count"));
+ LOG.warn("No input handler found for stream {}", streamId);
+ }
+ }
+
+ public void close() throws Exception {
+ LOG.info("Closing handler for policy {}", this.policy.getName());
+ this.executionRuntime.shutdown();
+ LOG.info("Shutdown siddhi runtime {}", this.executionRuntime.getName());
+ this.siddhiManager.shutdown();
+ LOG.info("Shutdown siddhi manager {}", this.siddhiManager);
+ LOG.info("Closed handler for policy {}", this.policy.getName());
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("SiddhiPolicyHandler for policy: ");
+ sb.append(this.policy == null ? "" : this.policy.getName());
+ return sb.toString();
+ }
+
+}
diff --git a/eagle-flink/src/main/java/org/apache/eagle/flink/SiddhiPolicyStateHandler.java b/eagle-flink/src/main/java/org/apache/eagle/flink/SiddhiPolicyStateHandler.java
new file mode 100644
index 0000000000..3d67e55a69
--- /dev/null
+++ b/eagle-flink/src/main/java/org/apache/eagle/flink/SiddhiPolicyStateHandler.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.flink;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Created on 7/27/16.
+ */
+public class SiddhiPolicyStateHandler extends SiddhiPolicyHandler {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SiddhiPolicyStateHandler.class);
+
+ public SiddhiPolicyStateHandler(Map sds, int index) {
+ super(sds, index);
+ }
+
+ @Override
+ protected String generateExecutionPlan(PolicyDefinition policyDefinition, Map sds) throws StreamNotDefinedException {
+ StringBuilder builder = new StringBuilder();
+ PolicyDefinition.Definition stateDefiniton = policyDefinition.getStateDefinition();
+ List inputStreams = stateDefiniton.getInputStreams();
+ for (String inputStream : inputStreams) { // the state stream follow the output stream of the policy definition
+ builder.append(SiddhiDefinitionAdapter.buildStreamDefinition(sds.get(inputStream)));
+ builder.append("\n");
+ }
+ builder.append(stateDefiniton.value);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Generated siddhi state execution plan: {} from definiton: {}", builder.toString(), stateDefiniton);
+ }
+ return builder.toString();
+ }
+
+ @Override
+ protected List getOutputStreams(PolicyDefinition policy) {
+ return policy.getStateDefinition().getOutputStreams();
+ }
+
+ // more validation on prepare
+
+}
diff --git a/eagle-flink/src/main/java/org/apache/eagle/flink/StreamColumn.java b/eagle-flink/src/main/java/org/apache/eagle/flink/StreamColumn.java
new file mode 100644
index 0000000000..80babbb28a
--- /dev/null
+++ b/eagle-flink/src/main/java/org/apache/eagle/flink/StreamColumn.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.flink;
+
+import lombok.Builder;
+import lombok.Data;
+import java.io.Serializable;
+
+@Builder
+@Data
+public class StreamColumn implements Serializable {
+ private String name;
+ private ColumnType type;
+ private Object defaultValue;
+ private boolean required;
+ private String description;
+ private String nodataExpression;
+
+ public enum ColumnType{
+ STRING,
+ INT,
+ LONG,
+ FLOAT,
+ DOUBLE,
+ BOOL,
+ OBJECT
+ }
+}
\ No newline at end of file
diff --git a/eagle-flink/src/main/java/org/apache/eagle/flink/StreamContext.java b/eagle-flink/src/main/java/org/apache/eagle/flink/StreamContext.java
new file mode 100644
index 0000000000..e1afc20a10
--- /dev/null
+++ b/eagle-flink/src/main/java/org/apache/eagle/flink/StreamContext.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.flink;
+
+import com.typesafe.config.Config;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+public interface StreamContext {
+ StreamCounter counter();
+
+ Config config();
+}
\ No newline at end of file
diff --git a/eagle-flink/src/main/java/org/apache/eagle/flink/StreamContextImpl.java b/eagle-flink/src/main/java/org/apache/eagle/flink/StreamContextImpl.java
new file mode 100644
index 0000000000..010f3a4c32
--- /dev/null
+++ b/eagle-flink/src/main/java/org/apache/eagle/flink/StreamContextImpl.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.flink;
+
+import com.typesafe.config.Config;
+
+public class StreamContextImpl implements StreamContext {
+ private final Config config;
+ private final StreamCounter counter;
+
+ public StreamContextImpl(Config config, MyStreamCounter counter) {
+ this.counter = counter;
+ this.config = config;
+ }
+
+ @Override
+ public StreamCounter counter() {
+ return this.counter;
+ }
+
+ @Override
+ public Config config() {
+ return this.config;
+ }
+}
\ No newline at end of file
diff --git a/eagle-flink/src/main/java/org/apache/eagle/flink/StreamCounter.java b/eagle-flink/src/main/java/org/apache/eagle/flink/StreamCounter.java
new file mode 100644
index 0000000000..94497e9813
--- /dev/null
+++ b/eagle-flink/src/main/java/org/apache/eagle/flink/StreamCounter.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.flink;
+
+public interface StreamCounter {
+ void incr(String scopeName);
+
+ void incrBy(String scopeName, int length);
+
+ void scope(String scopeName);
+}
diff --git a/eagle-flink/src/main/java/org/apache/eagle/flink/StreamDefinition.java b/eagle-flink/src/main/java/org/apache/eagle/flink/StreamDefinition.java
new file mode 100644
index 0000000000..6872685743
--- /dev/null
+++ b/eagle-flink/src/main/java/org/apache/eagle/flink/StreamDefinition.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.flink;
+
+import lombok.Builder;
+import lombok.Data;
+import java.io.Serializable;
+import java.util.List;
+
+@Builder(toBuilder = true)
+@Data
+public class StreamDefinition implements Serializable {
+ // Stream unique ID
+ private String streamId;
+
+ // Stream description
+ private String description;
+
+ // Is validateable or not
+ private boolean validate;
+
+ // Is timeseries-based stream or not
+ private boolean timeseries;
+
+ // Stream data source ID
+ private String dataSource;
+
+ private String group;
+
+ private String streamSource;
+
+ // Tenant (Site) ID
+ private String siteId;
+
+ private List columns;
+
+ public int getColumnIndex(String column) {
+ int i = 0;
+ for (StreamColumn col : this.getColumns()) {
+ if (col.getName().equals(column)) {
+ return i;
+ }
+ i++;
+ }
+ return -1;
+ }
+
+}
\ No newline at end of file
diff --git a/eagle-flink/src/main/java/org/apache/eagle/flink/StreamEvent.java b/eagle-flink/src/main/java/org/apache/eagle/flink/StreamEvent.java
new file mode 100644
index 0000000000..33ccbdfaf5
--- /dev/null
+++ b/eagle-flink/src/main/java/org/apache/eagle/flink/StreamEvent.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.flink;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.flink.util.Collector;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * @since Apr 5, 2016.
+ */
+public class StreamEvent implements Serializable {
+ private static final long serialVersionUID = 2765116509856609763L;
+
+ protected String streamId;
+ protected Object[] data;
+ protected long timestamp;
+ protected String metaVersion;
+
+ public StreamEvent() {
+ }
+
+ public StreamEvent(String streamId, long timestamp, Object[] data) {
+ this.setStreamId(streamId);
+ this.setTimestamp(timestamp);
+ this.setData(data);
+ }
+
+ public StreamEvent(String streamId, long timestamp, Object[] data, String metaVersion) {
+ this.setStreamId(streamId);
+ this.setTimestamp(timestamp);
+ this.setData(data);
+ this.setMetaVersion(metaVersion);
+ }
+
+ public Collector getFlinkCollector(){
+ return (Collector)(data[0]);
+ }
+
+ public String getStreamId() {
+ return streamId;
+ }
+
+ public void setStreamId(String streamId) {
+ this.streamId = streamId;
+ }
+
+ public long getKey(){
+ return timestamp;
+ }
+
+ public void setData(Object[] data) {
+ this.data = data;
+ }
+
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ public void setTimestamp(long timestamp) {
+ this.timestamp = timestamp;
+ }
+
+ public String getMetaVersion() {
+ return metaVersion;
+ }
+
+ public void setMetaVersion(String metaVersion) {
+ this.metaVersion = metaVersion;
+ }
+
+ @Override
+ public int hashCode() {
+ return new HashCodeBuilder().append(streamId).append(timestamp).append(data).append(metaVersion).build();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this) {
+ return true;
+ }
+ if (obj instanceof StreamEvent) {
+ StreamEvent another = (StreamEvent) obj;
+ return Objects.equals(this.streamId, another.streamId) && this.timestamp == another.timestamp
+ && Arrays.deepEquals(this.data, another.data) && Objects.equals(this.metaVersion, another.metaVersion);
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ List dataStrings = new ArrayList<>();
+ if (this.getData() != null) {
+ for (Object obj : this.getData()) {
+ if (obj != null) {
+ dataStrings.add(obj.toString());
+ } else {
+ dataStrings.add(null);
+ }
+ }
+ }
+ return String.format("StreamEvent[stream=%S,timestamp=%s,data=[%s],metaVersion=%s]",
+ this.getStreamId(),
+ DateTimeUtil.millisecondsToHumanDateWithMilliseconds(this.getTimestamp()),
+ StringUtils.join(dataStrings, ","),
+ this.getMetaVersion());
+ }
+
+ public static StreamEventBuilder builder() {
+ return new StreamEventBuilder();
+ }
+
+ /**
+ * @return cloned new event object.
+ */
+ public StreamEvent copy() {
+ StreamEvent newEvent = new StreamEvent();
+ newEvent.setTimestamp(this.getTimestamp());
+ newEvent.setData(this.getData());
+ newEvent.setStreamId(this.getStreamId());
+ newEvent.setMetaVersion(this.getMetaVersion());
+ return newEvent;
+ }
+
+ public void copyFrom(StreamEvent event) {
+ this.setTimestamp(event.getTimestamp());
+ this.setData(event.getData());
+ this.setStreamId(event.getStreamId());
+ this.setMetaVersion(event.getMetaVersion());
+ }
+
+ public Object[] getData() {
+ return data;
+ }
+
+ public Object[] getData(StreamDefinition streamDefinition, List column) {
+ ArrayList