Skip to content

Commit

Permalink
[hotfix] Add checkstyle to module wrapper-siddhi (#1028)
Browse files Browse the repository at this point in the history
  • Loading branch information
tenthe authored Jan 3, 2023
1 parent 7e960ba commit 4993474
Show file tree
Hide file tree
Showing 56 changed files with 403 additions and 350 deletions.
9 changes: 8 additions & 1 deletion streampipes-wrapper-siddhi/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -65,5 +65,12 @@
</exclusions>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@

public class SiddhiAppConfig {

private List<String> queries;
private List<String> definitions;
private final List<String> queries;
private final List<String> definitions;
private SiddhiOutputConfig outputConfig;

public SiddhiAppConfig() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

public class SiddhiAppConfigBuilder {

private SiddhiAppConfig siddhiAppConfig;
private final SiddhiAppConfig siddhiAppConfig;

public static SiddhiAppConfigBuilder create(SiddhiOutputConfig outputConfig) {
return new SiddhiAppConfigBuilder(outputConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,19 @@
*/
package org.apache.streampipes.wrapper.siddhi;

import org.apache.streampipes.wrapper.siddhi.query.*;
import org.apache.streampipes.wrapper.siddhi.query.FromClause;
import org.apache.streampipes.wrapper.siddhi.query.GroupByClause;
import org.apache.streampipes.wrapper.siddhi.query.HavingClause;
import org.apache.streampipes.wrapper.siddhi.query.InsertIntoClause;
import org.apache.streampipes.wrapper.siddhi.query.LimitClause;
import org.apache.streampipes.wrapper.siddhi.query.OffsetClause;
import org.apache.streampipes.wrapper.siddhi.query.OrderByClause;
import org.apache.streampipes.wrapper.siddhi.query.SelectClause;
import org.apache.streampipes.wrapper.siddhi.query.SiddhiQuery;

public class SiddhiQueryBuilder {

private SiddhiQuery siddhiQuery;
private final SiddhiQuery siddhiQuery;

public static SiddhiQueryBuilder create(FromClause fromClause, InsertIntoClause insertIntoClause) {
return new SiddhiQueryBuilder(fromClause.toSiddhiEpl(), insertIntoClause.toSiddhiEpl());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
public class SiddhiConstants {

public static final String SELECT = "select";
public static final String INSERT = "insert";
public static final String FROM = "from";
public static final String OFFSET = "offset";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,6 @@
*/
package org.apache.streampipes.wrapper.siddhi.engine;

import io.siddhi.core.SiddhiAppRuntime;
import io.siddhi.core.SiddhiManager;
import io.siddhi.core.stream.input.InputHandler;
import io.siddhi.core.stream.output.StreamCallback;
import io.siddhi.query.api.definition.Attribute;
import io.siddhi.query.api.definition.StreamDefinition;
import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
import org.apache.streampipes.wrapper.routing.SpOutputCollector;
Expand All @@ -33,6 +27,13 @@
import org.apache.streampipes.wrapper.siddhi.manager.SpSiddhiManager;
import org.apache.streampipes.wrapper.siddhi.model.EventPropertyDef;
import org.apache.streampipes.wrapper.siddhi.utils.SiddhiUtils;

import io.siddhi.core.SiddhiAppRuntime;
import io.siddhi.core.SiddhiManager;
import io.siddhi.core.stream.input.InputHandler;
import io.siddhi.core.stream.output.StreamCallback;
import io.siddhi.query.api.definition.Attribute;
import io.siddhi.query.api.definition.StreamDefinition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -73,21 +74,21 @@ public void initializeEngine(SiddhiInvocationConfigGenerator<? extends EventProc

siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(settings.getSiddhiAppString());
settings.getSiddhiProcessorParams().getParams()
.getInEventTypes()
.forEach((key, value) -> {
String preparedKey = SiddhiUtils.prepareName(key);
siddhiInputHandlers.put(key, siddhiAppRuntime.getInputHandler(preparedKey));
});
.getInEventTypes()
.forEach((key, value) -> {
String preparedKey = SiddhiUtils.prepareName(key);
siddhiInputHandlers.put(key, siddhiAppRuntime.getInputHandler(preparedKey));
});

StreamCallback callback;
Map<String, StreamDefinition> streamDef = siddhiAppRuntime.getStreamDefinitionMap();
String outputKey = SiddhiUtils.getPreparedOutputTopicName(params);
List<Attribute> streamAttributes = streamDef.get(outputKey).getAttributeList();
if (!debugMode) {
callback = new SiddhiOutputStreamCallback(spOutputCollector,
runtimeContext,
streamAttributes,
settings.getSiddhiAppConfig().getOutputConfig());
runtimeContext,
streamAttributes,
settings.getSiddhiAppConfig().getOutputConfig());
} else {
callback = new SiddhiOutputStreamDebugCallback(debugCallback, settings.getSiddhiAppConfig().getOutputConfig());
}
Expand All @@ -101,10 +102,10 @@ public void processEvent(org.apache.streampipes.model.runtime.Event event) {
String sourceId = event.getSourceInfo().getSourceId();
InputHandler inputHandler = siddhiInputHandlers.get(sourceId);
List<String> eventKeys = this.typeInfo
.get(sourceId)
.stream()
.map(EventPropertyDef::getFieldName)
.collect(Collectors.toList());
.get(sourceId)
.stream()
.map(EventPropertyDef::getFieldName)
.collect(Collectors.toList());

inputHandler.send(SiddhiUtils.toObjArr(eventKeys, event.getRaw()));
} catch (InterruptedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,16 @@
import org.apache.streampipes.wrapper.siddhi.engine.callback.SiddhiDebugCallback;
import org.apache.streampipes.wrapper.siddhi.engine.generator.SiddhiInvocationConfigGenerator;
import org.apache.streampipes.wrapper.siddhi.utils.SiddhiUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class SiddhiEventEngine<B extends EventProcessorBindingParams> implements
EventProcessor<B>, SiddhiStatementGenerator<B> {
public abstract class SiddhiEventEngine<T extends EventProcessorBindingParams> implements
EventProcessor<T>, SiddhiStatementGenerator<T> {

private static final Logger LOG = LoggerFactory.getLogger(SiddhiEventEngine.class);

private SiddhiEngine siddhiEngine;
private final SiddhiEngine siddhiEngine;

public SiddhiEventEngine() {
this.siddhiEngine = new SiddhiEngine();
Expand All @@ -43,9 +44,10 @@ public SiddhiEventEngine(SiddhiDebugCallback debugCallback) {
}

@Override
public void onInvocation(B parameters, SpOutputCollector spOutputCollector, EventProcessorRuntimeContext runtimeContext) {
SiddhiInvocationConfigGenerator<B> siddhiConfigGenerator = new SiddhiInvocationConfigGenerator<>(parameters,
this::makeStatements);
public void onInvocation(T parameters, SpOutputCollector spOutputCollector,
EventProcessorRuntimeContext runtimeContext) {
SiddhiInvocationConfigGenerator<T> siddhiConfigGenerator = new SiddhiInvocationConfigGenerator<>(parameters,
this::makeStatements);
this.siddhiEngine.initializeEngine(siddhiConfigGenerator, spOutputCollector, runtimeContext);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
import org.apache.streampipes.wrapper.siddhi.SiddhiAppConfig;
import org.apache.streampipes.wrapper.siddhi.model.SiddhiProcessorParams;

public interface SiddhiStatementGenerator<B extends EventProcessorBindingParams> {
public interface SiddhiStatementGenerator<T extends EventProcessorBindingParams> {

SiddhiAppConfig makeStatements(SiddhiProcessorParams<B> siddhiParams, String finalInsertIntoStreamName);
SiddhiAppConfig makeStatements(SiddhiProcessorParams<T> siddhiParams, String finalInsertIntoStreamName);

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@
import org.apache.streampipes.wrapper.standalone.ProcessorParams;
import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;

public abstract class StreamPipesSiddhiProcessor extends StreamPipesDataProcessor implements SiddhiStatementGenerator<ProcessorParams> {
public abstract class StreamPipesSiddhiProcessor extends StreamPipesDataProcessor
implements SiddhiStatementGenerator<ProcessorParams> {

private SiddhiEngine siddhiEngine;
private final SiddhiEngine siddhiEngine;

public StreamPipesSiddhiProcessor() {
this.siddhiEngine = new SiddhiEngine();
Expand All @@ -39,8 +40,10 @@ public StreamPipesSiddhiProcessor(SiddhiDebugCallback debugCallback) {
}

@Override
public void onInvocation(ProcessorParams parameters, SpOutputCollector spOutputCollector, EventProcessorRuntimeContext runtimeContext) throws SpRuntimeException {
SiddhiInvocationConfigGenerator<ProcessorParams> siddhiConfigGenerator = new SiddhiInvocationConfigGenerator<>(parameters,
public void onInvocation(ProcessorParams parameters, SpOutputCollector spOutputCollector,
EventProcessorRuntimeContext runtimeContext) throws SpRuntimeException {
SiddhiInvocationConfigGenerator<ProcessorParams> siddhiConfigGenerator =
new SiddhiInvocationConfigGenerator<>(parameters,
this::makeStatements);
this.siddhiEngine.initializeEngine(siddhiConfigGenerator, spOutputCollector, runtimeContext);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,27 @@
*/
package org.apache.streampipes.wrapper.siddhi.engine.callback;

import io.siddhi.core.event.Event;
import io.siddhi.core.stream.output.StreamCallback;
import io.siddhi.query.api.definition.Attribute;
import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
import org.apache.streampipes.wrapper.routing.SpOutputCollector;
import org.apache.streampipes.wrapper.siddhi.output.SiddhiListOutputConfig;
import org.apache.streampipes.wrapper.siddhi.output.SiddhiOutputConfig;
import org.apache.streampipes.wrapper.siddhi.output.SiddhiOutputType;
import org.apache.streampipes.wrapper.siddhi.utils.SiddhiUtils;

import io.siddhi.core.event.Event;
import io.siddhi.core.stream.output.StreamCallback;
import io.siddhi.query.api.definition.Attribute;

import java.util.Arrays;
import java.util.List;

public class SiddhiOutputStreamCallback extends StreamCallback {

private SpOutputCollector collector;
private EventProcessorRuntimeContext runtimeContext;
private SiddhiOutputConfig outputConfig;
private final SpOutputCollector collector;
private final EventProcessorRuntimeContext runtimeContext;
private final SiddhiOutputConfig outputConfig;

private List<Attribute> streamAttributes;
private final List<Attribute> streamAttributes;

public SiddhiOutputStreamCallback(SpOutputCollector collector,
EventProcessorRuntimeContext runtimeContext,
Expand All @@ -50,17 +51,17 @@ public SiddhiOutputStreamCallback(SpOutputCollector collector,

private void sendEvents(List<Event> events) {
collector.collect(SiddhiUtils.toSpEvent(events,
((SiddhiListOutputConfig) outputConfig).getListFieldName(),
runtimeContext.getOutputSchemaInfo(),
runtimeContext.getOutputSourceInfo(),
streamAttributes));
((SiddhiListOutputConfig) outputConfig).getListFieldName(),
runtimeContext.getOutputSchemaInfo(),
runtimeContext.getOutputSourceInfo(),
streamAttributes));
}

private void sendEvent(Event event) {
collector.collect(SiddhiUtils.toSpEvent(event,
runtimeContext.getOutputSchemaInfo(),
runtimeContext.getOutputSourceInfo(),
streamAttributes));
runtimeContext.getOutputSchemaInfo(),
runtimeContext.getOutputSourceInfo(),
streamAttributes));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@
*/
package org.apache.streampipes.wrapper.siddhi.engine.callback;

import io.siddhi.core.event.Event;
import io.siddhi.core.stream.output.StreamCallback;

import org.apache.streampipes.wrapper.siddhi.output.SiddhiOutputConfig;
import org.apache.streampipes.wrapper.siddhi.output.SiddhiOutputType;

import io.siddhi.core.event.Event;
import io.siddhi.core.stream.output.StreamCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -30,8 +32,8 @@ public class SiddhiOutputStreamDebugCallback extends StreamCallback {

private static final Logger LOG = LoggerFactory.getLogger(SiddhiOutputStreamDebugCallback.class);

private SiddhiDebugCallback callback;
private SiddhiOutputConfig outputConfig;
private final SiddhiDebugCallback callback;
private final SiddhiOutputConfig outputConfig;

public SiddhiOutputStreamDebugCallback(SiddhiDebugCallback callback,
SiddhiOutputConfig outputConfig) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,26 @@
import org.apache.streampipes.wrapper.siddhi.constants.SiddhiConstants;
import org.apache.streampipes.wrapper.siddhi.model.EventPropertyDef;

import java.util.*;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

public class EventTypeGenerator<B extends EventProcessorBindingParams> {
public class EventTypeGenerator<T extends EventProcessorBindingParams> {

private B params;
private final T params;

public EventTypeGenerator(B params) {
public EventTypeGenerator(T params) {
this.params = params;
}

public List<EventPropertyDef> generateOutEventTypes() {
List<EventPropertyDef> sortedEventKeys = new ArrayList<>();
params.getOutEventType().forEach((key, value) -> {
sortedEventKeys.add(makeEventType(key, value));
sortedEventKeys.sort(Comparator.comparing(EventPropertyDef::getFieldName));
sortedEventKeys.add(makeEventType(key, value));
sortedEventKeys.sort(Comparator.comparing(EventPropertyDef::getFieldName));
});
return sortedEventKeys;
}
Expand Down Expand Up @@ -80,7 +84,7 @@ private String toType(Class<?> o) {
return SiddhiConstants.SIDDHI_DOUBLE_TYPE;
} else if (o.equals(Boolean.class)) {
return SiddhiConstants.SIDDHI_BOOLEAN_TYPE;
} else if (o.equals(String.class)){
} else if (o.equals(String.class)) {
return SiddhiConstants.SIDDHI_STRING_TYPE;
} else {
return SiddhiConstants.SIDDHI_OBJECT_TYPE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@
import java.util.ArrayList;
import java.util.List;

public class InputStreamNameGenerator<B extends EventProcessorBindingParams> {
public class InputStreamNameGenerator<T extends EventProcessorBindingParams> {

private B params;
private final T params;

public InputStreamNameGenerator(B params) {
public InputStreamNameGenerator(T params) {
this.params = params;
}

Expand Down
Loading

0 comments on commit 4993474

Please sign in to comment.