Skip to content

Commit

Permalink
haoch#65 addRouteOperator state add
Browse files Browse the repository at this point in the history
  • Loading branch information
pranjal0811 committed Apr 29, 2020
1 parent 57bce89 commit 882cb43
Showing 1 changed file with 78 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,26 @@

package org.apache.flink.streaming.siddhi.router;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.siddhi.control.ControlEvent;
import org.apache.flink.streaming.siddhi.control.MetadataControlEvent;
import org.apache.flink.streaming.siddhi.control.OperationControlEvent;
import org.apache.flink.streaming.siddhi.schema.SiddhiStreamSchema;
import org.apache.flink.streaming.siddhi.utils.SiddhiExecutionPlanner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashMap;
Expand All @@ -38,18 +49,78 @@
public class AddRouteOperator extends AbstractStreamOperator<Tuple2<StreamRoute, Object>>
implements OneInputStreamOperator<Tuple2<StreamRoute, Object>, Tuple2<StreamRoute, Object>> {

private Map<String, Set<String>> inputStreamToExecutionPlans = new HashMap<>();
private static final Logger LOGGER = LoggerFactory.getLogger(AddRouteOperator.class);

private Map<String, List<String>> executionPlanIdToPartitionKeys = new HashMap<>();
private transient Map<String, Set<String>> inputStreamToExecutionPlans;

private Map<String, Boolean> executionPlanEnabled = new HashMap<>();
private transient Map<String, List<String>> executionPlanIdToPartitionKeys;

private transient Map<String, Boolean> executionPlanEnabled;

private Map<String, SiddhiStreamSchema<?>> dataStreamSchemas;

private transient ListState<Tuple2<String,Object>> addRouteState;

private static final String ADD_ROUTE_OPERATOR_STATE = "add_route_operator_state";

public AddRouteOperator(Map<String, SiddhiStreamSchema<?>> dataStreamSchemas) {
this.dataStreamSchemas = new HashMap<>(dataStreamSchemas);
}

@Override
public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<Tuple2<StreamRoute, Object>>> output) {
super.setup(containingTask, config, output);
inputStreamToExecutionPlans = new HashMap<>();
executionPlanIdToPartitionKeys = new HashMap<>();
executionPlanEnabled = new HashMap<>();
}

@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
super.snapshotState(context);
addRouteState.clear();

addRouteState.add(Tuple2.of("inputStreamToExecutionPlans",inputStreamToExecutionPlans));
addRouteState.add(Tuple2.of("executionPlanIdToPartitionKeys",executionPlanIdToPartitionKeys));
addRouteState.add(Tuple2.of("executionPlanEnabled",executionPlanEnabled));

}

public void restoreState() throws Exception{
for(Tuple2<String,Object> element : addRouteState.get()){
switch(element.f0){
case "inputStreamToExecutionPlans":
inputStreamToExecutionPlans = (HashMap<String,Set<String>>)element.f1;
break;
case "executionPlanIdToPartitionKeys":
executionPlanIdToPartitionKeys = (HashMap<String,List<String>>)element.f1;
break;
case "executionPlanEnabled":
executionPlanEnabled = (HashMap<String,Boolean>)element.f1;
break;
default:
LOGGER.warn("No valid state tuple");
}
}
LOGGER.info("AddRouteOperator states restored successfully.....");
}

@Override
public void initializeState(StateInitializationContext context) throws Exception {
super.initializeState(context);
ListStateDescriptor<Tuple2<String, Object>> descriptor =
new ListStateDescriptor<>(ADD_ROUTE_OPERATOR_STATE,
TypeInformation.of(new TypeHint<Tuple2<String, Object>>() {}));

if(addRouteState == null){
addRouteState = context.getOperatorStateStore().getUnionListState(descriptor);
}

if (context.isRestored()) {
restoreState();
}

}
@Override
public void processElement(StreamRecord<Tuple2<StreamRoute, Object>> element) throws Exception {
StreamRoute streamRoute = element.getValue().f0;
Expand Down Expand Up @@ -81,13 +152,16 @@ public void processElement(StreamRecord<Tuple2<StreamRoute, Object>> element) th
String[] fieldNames = schema.getFieldNames();
Object[] row = schema.getStreamSerializer().getRow(value);
streamRoute.setPartitionKey(-1);
long partitionValue = 0;
for (String partitionKey : partitionKeys) {
long partitionValue = 0;
for (int i = 0; i < fieldNames.length; ++i) {
if (partitionKey.equals(fieldNames[i])) {
partitionValue += row[i].hashCode();
}
}
}

if(partitionValue != 0) {
streamRoute.setPartitionKey(Math.abs(partitionValue));
}

Expand Down

0 comments on commit 882cb43

Please sign in to comment.