From 7da2c1f0d1eda386098adabc24711abeec78a26c Mon Sep 17 00:00:00 2001 From: Philipp Zehnder Date: Tue, 6 Dec 2022 17:26:06 +0100 Subject: [PATCH] [#820] enable checkstyle for three more modules --- .../pom.xml | 10 ++ .../jvm/ImageProcessingJvmInit.java | 30 +++--- .../processor/commons/ImageTransformer.java | 8 +- .../commons/PlainImageTransformer.java | 4 +- .../processor/commons/RequiredBoxStream.java | 19 ++-- .../GenericImageClassification.java | 22 +++-- .../GenericImageClassificationController.java | 44 +++++---- .../processor/imagecropper/ImageCropper.java | 5 +- .../imagecropper/ImageCropperController.java | 23 ++--- .../imagecropper/ImageCropperParameters.java | 3 +- .../imageenrichment/BoxCoordinates.java | 23 +++-- .../processor/imageenrichment/ColorUtil.java | 40 ++++---- .../imageenrichment/ImageEnricher.java | 19 ++-- .../ImageEnrichmentController.java | 21 +++-- .../ImageEnrichmentParameters.java | 10 +- .../jvm/processor/qrreader/QrCodeReader.java | 9 +- .../qrreader/QrCodeReaderController.java | 37 ++++---- .../qrreader/QrCodeReaderParameters.java | 2 +- .../pom.xml | 25 +++-- .../AbstractPatternDetectionProgram.java | 16 ++-- .../flink/PatternDetectionFlinkInit.java | 46 +++++----- .../detection/flink/config/ConfigKeys.java | 8 +- .../processor/absence/AbsenceController.java | 38 ++++---- .../processor/absence/AbsenceParameters.java | 49 +++++----- .../processor/absence/AbsenceProgram.java | 84 ++++++++--------- .../flink/processor/and/AndController.java | 46 +++++----- .../flink/processor/and/AndParameters.java | 3 +- .../flink/processor/and/AndProgram.java | 65 ++++++------- .../processor/common/TimestampExtractor.java | 3 +- .../peak/PeakDetectionCalculator.java | 15 +-- .../peak/PeakDetectionController.java | 48 +++++----- .../peak/PeakDetectionParameters.java | 6 +- .../processor/peak/PeakDetectionProgram.java | 30 +++--- .../peak/utils/SlidingBatchWindow.java | 2 +- .../flink/processor/sequence/Sequence.java | 9 +- .../sequence/SequenceController.java | 26 +++--- .../processor/sequence/SequenceProgram.java | 12 ++- .../processor/absence/TestAbsence.java | 24 +++-- .../detection/processor/and/TestAnd.java | 32 ++++--- .../pom.xml | 4 + .../flink/AbstractStatisticsProgram.java | 13 +-- .../statistics/flink/StatisticsFlinkInit.java | 38 ++++---- .../statistics/flink/config/ConfigKeys.java | 8 +- .../flink/extensions/MapKeySelector.java | 3 +- .../flink/extensions/SlidingBatchWindow.java | 8 +- .../extensions/SlidingEventTimeWindow.java | 22 ++--- .../extensions/TimestampMappingFunction.java | 4 +- .../summary/StatisticsSummaryCalculator.java | 9 +- .../summary/StatisticsSummaryController.java | 49 ++++++---- .../summary/StatisticsSummaryProgram.java | 3 +- .../StatisticsSummaryCalculatorWindow.java | 11 ++- .../StatisticsSummaryControllerWindow.java | 91 ++++++++++--------- .../StatisticsSummaryParametersWindow.java | 3 +- .../StatisticsSummaryParamsSerializable.java | 2 +- .../StatisticsSummaryProgramWindow.java | 37 ++++---- .../pom.xml | 4 + .../flink/AbstractTextMiningProgram.java | 11 ++- .../textmining/flink/TextMiningFlinkInit.java | 34 +++---- .../textmining/flink/config/ConfigKeys.java | 8 +- .../processor/language/LanguageDetection.java | 11 ++- .../language/LanguageDetectionController.java | 48 +++++----- .../language/LanguageDetectionParameters.java | 3 +- .../language/LanguageDetectionProgram.java | 5 +- .../wordcount/WordCountController.java | 45 +++++---- .../wordcount/WordCountParameters.java | 27 +++--- .../processor/wordcount/WordCountProgram.java | 13 +-- .../processor/wordcount/WordSplitter.java | 3 +- .../wordcount/WordToEventConverter.java | 11 ++- 68 files changed, 786 insertions(+), 658 deletions(-) diff --git a/streampipes-extensions/streampipes-processors-image-processing-jvm/pom.xml b/streampipes-extensions/streampipes-processors-image-processing-jvm/pom.xml index e16030151d..d841a255df 100644 --- a/streampipes-extensions/streampipes-processors-image-processing-jvm/pom.xml +++ b/streampipes-extensions/streampipes-processors-image-processing-jvm/pom.xml @@ -52,4 +52,14 @@ ddogleg + + + + + + org.apache.maven.plugins + maven-checkstyle-plugin + + + diff --git a/streampipes-extensions/streampipes-processors-image-processing-jvm/src/main/java/org/apache/streampipes/processors/imageprocessing/jvm/ImageProcessingJvmInit.java b/streampipes-extensions/streampipes-processors-image-processing-jvm/src/main/java/org/apache/streampipes/processors/imageprocessing/jvm/ImageProcessingJvmInit.java index 22f98973cd..879d2c6fd0 100644 --- a/streampipes-extensions/streampipes-processors-image-processing-jvm/src/main/java/org/apache/streampipes/processors/imageprocessing/jvm/ImageProcessingJvmInit.java +++ b/streampipes-extensions/streampipes-processors-image-processing-jvm/src/main/java/org/apache/streampipes/processors/imageprocessing/jvm/ImageProcessingJvmInit.java @@ -41,20 +41,20 @@ public SpServiceDefinition provideServiceDefinition() { "Processors Image Processing JVM", "", 8090) - .registerPipelineElements( - new ImageEnrichmentController(), - new ImageCropperController(), - new QrCodeReaderController(), - new GenericImageClassificationController()) - .registerMessagingFormats( - new JsonDataFormatFactory(), - new CborDataFormatFactory(), - new SmileDataFormatFactory(), - new FstDataFormatFactory()) - .registerMessagingProtocols( - new SpKafkaProtocolFactory(), - new SpJmsProtocolFactory(), - new SpMqttProtocolFactory()) - .build(); + .registerPipelineElements( + new ImageEnrichmentController(), + new ImageCropperController(), + new QrCodeReaderController(), + new GenericImageClassificationController()) + .registerMessagingFormats( + new JsonDataFormatFactory(), + new CborDataFormatFactory(), + new SmileDataFormatFactory(), + new FstDataFormatFactory()) + .registerMessagingProtocols( + new SpKafkaProtocolFactory(), + new SpJmsProtocolFactory(), + new SpMqttProtocolFactory()) + .build(); } } diff --git a/streampipes-extensions/streampipes-processors-image-processing-jvm/src/main/java/org/apache/streampipes/processors/imageprocessing/jvm/processor/commons/ImageTransformer.java b/streampipes-extensions/streampipes-processors-image-processing-jvm/src/main/java/org/apache/streampipes/processors/imageprocessing/jvm/processor/commons/ImageTransformer.java index 03fca60b97..199ded1048 100644 --- a/streampipes-extensions/streampipes-processors-image-processing-jvm/src/main/java/org/apache/streampipes/processors/imageprocessing/jvm/processor/commons/ImageTransformer.java +++ b/streampipes-extensions/streampipes-processors-image-processing-jvm/src/main/java/org/apache/streampipes/processors/imageprocessing/jvm/processor/commons/ImageTransformer.java @@ -22,6 +22,8 @@ import org.apache.streampipes.processors.imageprocessing.jvm.processor.imageenrichment.BoxCoordinates; import org.apache.streampipes.processors.imageprocessing.jvm.processor.imageenrichment.ImageEnrichmentParameters; +import javax.imageio.ImageIO; + import java.awt.image.BufferedImage; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -31,8 +33,6 @@ import java.util.Map; import java.util.Optional; -import javax.imageio.ImageIO; - public class ImageTransformer extends PlainImageTransformer { public ImageTransformer(Event in, ImageEnrichmentParameters params) { @@ -46,8 +46,8 @@ public Optional getImage() { public List> getAllBoxCoordinates() { List> allBoxes = in.getFieldBySelector(params.getBoxArray()) - .getAsList() - .parseAsCustomType(value -> value.getAsComposite().getRawValue()); + .getAsList() + .parseAsCustomType(value -> value.getAsComposite().getRawValue()); List> allBoxesMap = new ArrayList<>(); allBoxes.forEach(box -> { diff --git a/streampipes-extensions/streampipes-processors-image-processing-jvm/src/main/java/org/apache/streampipes/processors/imageprocessing/jvm/processor/commons/PlainImageTransformer.java b/streampipes-extensions/streampipes-processors-image-processing-jvm/src/main/java/org/apache/streampipes/processors/imageprocessing/jvm/processor/commons/PlainImageTransformer.java index 0549ad77af..7fdc755da0 100644 --- a/streampipes-extensions/streampipes-processors-image-processing-jvm/src/main/java/org/apache/streampipes/processors/imageprocessing/jvm/processor/commons/PlainImageTransformer.java +++ b/streampipes-extensions/streampipes-processors-image-processing-jvm/src/main/java/org/apache/streampipes/processors/imageprocessing/jvm/processor/commons/PlainImageTransformer.java @@ -20,6 +20,8 @@ import org.apache.streampipes.model.runtime.Event; import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams; +import javax.imageio.ImageIO; + import java.awt.image.BufferedImage; import java.io.ByteArrayInputStream; import java.io.IOException; @@ -27,8 +29,6 @@ import java.util.Base64; import java.util.Optional; -import javax.imageio.ImageIO; - public class PlainImageTransformer { protected Event in; diff --git a/streampipes-extensions/streampipes-processors-image-processing-jvm/src/main/java/org/apache/streampipes/processors/imageprocessing/jvm/processor/commons/RequiredBoxStream.java b/streampipes-extensions/streampipes-processors-image-processing-jvm/src/main/java/org/apache/streampipes/processors/imageprocessing/jvm/processor/commons/RequiredBoxStream.java index 84b216958e..93c0547385 100644 --- a/streampipes-extensions/streampipes-processors-image-processing-jvm/src/main/java/org/apache/streampipes/processors/imageprocessing/jvm/processor/commons/RequiredBoxStream.java +++ b/streampipes-extensions/streampipes-processors-image-processing-jvm/src/main/java/org/apache/streampipes/processors/imageprocessing/jvm/processor/commons/RequiredBoxStream.java @@ -22,7 +22,6 @@ import org.apache.streampipes.sdk.helpers.CollectedStreamRequirements; import org.apache.streampipes.sdk.helpers.EpRequirements; import org.apache.streampipes.sdk.helpers.Labels; -import org.apache.streampipes.sdk.utils.Datatypes; public class RequiredBoxStream { @@ -30,14 +29,14 @@ public class RequiredBoxStream { public static final String BOX_ARRAY_PROPERTY = "box-array-property"; public static CollectedStreamRequirements getBoxStream() { - return StreamRequirementsBuilder - .create() - .requiredPropertyWithUnaryMapping(EpRequirements.domainPropertyReq("https://image.com"), Labels - .withId(IMAGE_PROPERTY), - PropertyScope.NONE) - .requiredPropertyWithUnaryMapping(EpRequirements.domainPropertyReqList("https://streampipes.org/boundingboxes"), - Labels.withId(BOX_ARRAY_PROPERTY), - PropertyScope.NONE) - .build(); + return StreamRequirementsBuilder + .create() + .requiredPropertyWithUnaryMapping(EpRequirements.domainPropertyReq("https://image.com"), Labels + .withId(IMAGE_PROPERTY), + PropertyScope.NONE) + .requiredPropertyWithUnaryMapping(EpRequirements.domainPropertyReqList("https://streampipes.org/boundingboxes"), + Labels.withId(BOX_ARRAY_PROPERTY), + PropertyScope.NONE) + .build(); } } diff --git a/streampipes-extensions/streampipes-processors-image-processing-jvm/src/main/java/org/apache/streampipes/processors/imageprocessing/jvm/processor/genericclassification/GenericImageClassification.java b/streampipes-extensions/streampipes-processors-image-processing-jvm/src/main/java/org/apache/streampipes/processors/imageprocessing/jvm/processor/genericclassification/GenericImageClassification.java index 7e2d0aa6d2..72eebe85b1 100644 --- a/streampipes-extensions/streampipes-processors-image-processing-jvm/src/main/java/org/apache/streampipes/processors/imageprocessing/jvm/processor/genericclassification/GenericImageClassification.java +++ b/streampipes-extensions/streampipes-processors-image-processing-jvm/src/main/java/org/apache/streampipes/processors/imageprocessing/jvm/processor/genericclassification/GenericImageClassification.java @@ -17,6 +17,12 @@ */ package org.apache.streampipes.processors.imageprocessing.jvm.processor.genericclassification; +import org.apache.streampipes.model.runtime.Event; +import org.apache.streampipes.processors.imageprocessing.jvm.processor.commons.PlainImageTransformer; +import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext; +import org.apache.streampipes.wrapper.routing.SpOutputCollector; +import org.apache.streampipes.wrapper.runtime.EventProcessor; + import boofcv.abst.scene.ImageClassifier; import boofcv.factory.scene.ClassifierAndSource; import boofcv.factory.scene.FactoryImageClassifier; @@ -24,11 +30,6 @@ import boofcv.struct.image.GrayF32; import boofcv.struct.image.Planar; import deepboof.io.DeepBoofDataBaseOps; -import org.apache.streampipes.model.runtime.Event; -import org.apache.streampipes.processors.imageprocessing.jvm.processor.commons.PlainImageTransformer; -import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext; -import org.apache.streampipes.wrapper.routing.SpOutputCollector; -import org.apache.streampipes.wrapper.runtime.EventProcessor; import java.awt.image.BufferedImage; import java.io.File; @@ -46,10 +47,11 @@ public class GenericImageClassification implements EventProcessor categories; @Override - public void onInvocation(GenericImageClassificationParameters genericImageClassificationParameters, SpOutputCollector spOutputCollector, EventProcessorRuntimeContext runtimeContext) { + public void onInvocation(GenericImageClassificationParameters genericImageClassificationParameters, + SpOutputCollector spOutputCollector, EventProcessorRuntimeContext runtimeContext) { this.params = genericImageClassificationParameters; //this.cs = FactoryImageClassifier.vgg_cifar10(); // Test set 89.9% for 10 categories - ClassifierAndSource cs = FactoryImageClassifier.nin_imagenet(); // Test set 62.6% for 1000 categories + ClassifierAndSource cs = FactoryImageClassifier.nin_imagenet(); // Test set 62.6% for 1000 categories File path = DeepBoofDataBaseOps.downloadModel(cs.getSource(), new File("download_data")); @@ -65,8 +67,8 @@ public void onInvocation(GenericImageClassificationParameters genericImageClassi @Override public void onEvent(Event in, SpOutputCollector out) { PlainImageTransformer imageTransformer = new - PlainImageTransformer<>(in, - params); + PlainImageTransformer<>(in, + params); Optional imageOpt = imageTransformer.getImage(params.getImagePropertyName()); @@ -86,7 +88,7 @@ public int compare(ImageClassifier.Score o1, ImageClassifier.Score o2) { //Collections.reverse(scores); if (scores.size() > 0) { - System.out.println(scores.get(0).score +":" +categories.get(scores.get(0).category)); + System.out.println(scores.get(0).score + ":" + categories.get(scores.get(0).category)); //scores.forEach(score -> System.out.println(score.category +":" +categories.get(score.category) +":" +score)); in.addField("score", scores.get(0).score); in.addField("category", categories.get(scores.get(0).category)); diff --git a/streampipes-extensions/streampipes-processors-image-processing-jvm/src/main/java/org/apache/streampipes/processors/imageprocessing/jvm/processor/genericclassification/GenericImageClassificationController.java b/streampipes-extensions/streampipes-processors-image-processing-jvm/src/main/java/org/apache/streampipes/processors/imageprocessing/jvm/processor/genericclassification/GenericImageClassificationController.java index 414f18b545..339e4cd059 100644 --- a/streampipes-extensions/streampipes-processors-image-processing-jvm/src/main/java/org/apache/streampipes/processors/imageprocessing/jvm/processor/genericclassification/GenericImageClassificationController.java +++ b/streampipes-extensions/streampipes-processors-image-processing-jvm/src/main/java/org/apache/streampipes/processors/imageprocessing/jvm/processor/genericclassification/GenericImageClassificationController.java @@ -24,37 +24,45 @@ import org.apache.streampipes.sdk.builder.ProcessingElementBuilder; import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder; import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor; -import org.apache.streampipes.sdk.helpers.*; +import org.apache.streampipes.sdk.helpers.EpProperties; +import org.apache.streampipes.sdk.helpers.EpRequirements; +import org.apache.streampipes.sdk.helpers.Labels; +import org.apache.streampipes.sdk.helpers.Locales; +import org.apache.streampipes.sdk.helpers.OutputStrategies; import org.apache.streampipes.sdk.utils.Assets; import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor; import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer; -public class GenericImageClassificationController extends StandaloneEventProcessingDeclarer { +public class GenericImageClassificationController + extends StandaloneEventProcessingDeclarer { private static final String IMAGE = "image-mapping"; @Override public DataProcessorDescription declareModel() { - return ProcessingElementBuilder.create("org.apache.streampipes.processor.imageclassification.jvm.generic-image-classification") - .category(DataProcessorType.IMAGE_PROCESSING) - .withAssets(Assets.DOCUMENTATION) - .withLocales(Locales.EN) - .requiredStream(StreamRequirementsBuilder - .create() - .requiredPropertyWithUnaryMapping(EpRequirements - .domainPropertyReq("https://image.com"), Labels.withId(IMAGE), - PropertyScope.NONE) - .build()) - .outputStrategy(OutputStrategies.append( - EpProperties.doubleEp(Labels.empty(), "score", "https://schema.org/score"), - EpProperties.stringEp(Labels.empty(), "category", "https://schema.org/category") + return ProcessingElementBuilder.create( + "org.apache.streampipes.processor.imageclassification.jvm.generic-image-classification") + .category(DataProcessorType.IMAGE_PROCESSING) + .withAssets(Assets.DOCUMENTATION) + .withLocales(Locales.EN) + .requiredStream(StreamRequirementsBuilder + .create() + .requiredPropertyWithUnaryMapping(EpRequirements + .domainPropertyReq("https://image.com"), Labels.withId(IMAGE), + PropertyScope.NONE) + .build()) + .outputStrategy(OutputStrategies.append( + EpProperties.doubleEp(Labels.empty(), "score", "https://schema.org/score"), + EpProperties.stringEp(Labels.empty(), "category", "https://schema.org/category") - )) - .build(); + )) + .build(); } @Override - public ConfiguredEventProcessor onInvocation(DataProcessorInvocation graph, ProcessingElementParameterExtractor extractor) { + public ConfiguredEventProcessor onInvocation( + DataProcessorInvocation graph, + ProcessingElementParameterExtractor extractor) { String imageProperty = extractor.mappingPropertyValue(IMAGE); diff --git a/streampipes-extensions/streampipes-processors-image-processing-jvm/src/main/java/org/apache/streampipes/processors/imageprocessing/jvm/processor/imagecropper/ImageCropper.java b/streampipes-extensions/streampipes-processors-image-processing-jvm/src/main/java/org/apache/streampipes/processors/imageprocessing/jvm/processor/imagecropper/ImageCropper.java index 5e45bd10b9..cbb1d64f43 100644 --- a/streampipes-extensions/streampipes-processors-image-processing-jvm/src/main/java/org/apache/streampipes/processors/imageprocessing/jvm/processor/imagecropper/ImageCropper.java +++ b/streampipes-extensions/streampipes-processors-image-processing-jvm/src/main/java/org/apache/streampipes/processors/imageprocessing/jvm/processor/imagecropper/ImageCropper.java @@ -35,7 +35,8 @@ public class ImageCropper implements EventProcessor { private ImageCropperParameters params; @Override - public void onInvocation(ImageCropperParameters imageCropperParameters, SpOutputCollector spOutputCollector, EventProcessorRuntimeContext runtimeContext) { + public void onInvocation(ImageCropperParameters imageCropperParameters, SpOutputCollector spOutputCollector, + EventProcessorRuntimeContext runtimeContext) { this.params = imageCropperParameters; } @@ -52,7 +53,7 @@ public void onEvent(Event in, SpOutputCollector out) { BoxCoordinates boxCoordinates = imageTransformer.getBoxCoordinates(image, box); BufferedImage dest = image.getSubimage(boxCoordinates.getX(), boxCoordinates.getY(), boxCoordinates.getWidth(), - boxCoordinates.getHeight()); + boxCoordinates.getHeight()); Optional finalImage = imageTransformer.makeImage(dest); diff --git a/streampipes-extensions/streampipes-processors-image-processing-jvm/src/main/java/org/apache/streampipes/processors/imageprocessing/jvm/processor/imagecropper/ImageCropperController.java b/streampipes-extensions/streampipes-processors-image-processing-jvm/src/main/java/org/apache/streampipes/processors/imageprocessing/jvm/processor/imagecropper/ImageCropperController.java index 51109aef5e..97868ff413 100644 --- a/streampipes-extensions/streampipes-processors-image-processing-jvm/src/main/java/org/apache/streampipes/processors/imageprocessing/jvm/processor/imagecropper/ImageCropperController.java +++ b/streampipes-extensions/streampipes-processors-image-processing-jvm/src/main/java/org/apache/streampipes/processors/imageprocessing/jvm/processor/imagecropper/ImageCropperController.java @@ -40,25 +40,26 @@ public class ImageCropperController extends StandaloneEventProcessingDeclarer onInvocation(DataProcessorInvocation dataProcessorInvocation, ProcessingElementParameterExtractor extractor) { + public ConfiguredEventProcessor onInvocation(DataProcessorInvocation dataProcessorInvocation, + ProcessingElementParameterExtractor extractor) { String imageProperty = extractor.mappingPropertyValue(IMAGE_PROPERTY); String boxArray = extractor.mappingPropertyValue(RequiredBoxStream.BOX_ARRAY_PROPERTY); ImageCropperParameters params = new ImageCropperParameters(dataProcessorInvocation, imageProperty, - boxArray, "box_width", "box_height", "box_x", "box_y"); + boxArray, "box_width", "box_height", "box_x", "box_y"); return new ConfiguredEventProcessor<>(params, ImageCropper::new); } diff --git a/streampipes-extensions/streampipes-processors-image-processing-jvm/src/main/java/org/apache/streampipes/processors/imageprocessing/jvm/processor/imagecropper/ImageCropperParameters.java b/streampipes-extensions/streampipes-processors-image-processing-jvm/src/main/java/org/apache/streampipes/processors/imageprocessing/jvm/processor/imagecropper/ImageCropperParameters.java index 9148f8ef56..183a57e8da 100644 --- a/streampipes-extensions/streampipes-processors-image-processing-jvm/src/main/java/org/apache/streampipes/processors/imageprocessing/jvm/processor/imagecropper/ImageCropperParameters.java +++ b/streampipes-extensions/streampipes-processors-image-processing-jvm/src/main/java/org/apache/streampipes/processors/imageprocessing/jvm/processor/imagecropper/ImageCropperParameters.java @@ -22,7 +22,8 @@ public class ImageCropperParameters extends ImageEnrichmentParameters { - public ImageCropperParameters(DataProcessorInvocation graph, String imageProperty, String boxArray, String boxWidth, String boxHeight, String boxX, String boxY) { + public ImageCropperParameters(DataProcessorInvocation graph, String imageProperty, String boxArray, String boxWidth, + String boxHeight, String boxX, String boxY) { super(graph, imageProperty, boxArray, boxWidth, boxHeight, boxX, boxY); } } diff --git a/streampipes-extensions/streampipes-processors-image-processing-jvm/src/main/java/org/apache/streampipes/processors/imageprocessing/jvm/processor/imageenrichment/BoxCoordinates.java b/streampipes-extensions/streampipes-processors-image-processing-jvm/src/main/java/org/apache/streampipes/processors/imageprocessing/jvm/processor/imageenrichment/BoxCoordinates.java index 9a754538c9..bd645f46ed 100644 --- a/streampipes-extensions/streampipes-processors-image-processing-jvm/src/main/java/org/apache/streampipes/processors/imageprocessing/jvm/processor/imageenrichment/BoxCoordinates.java +++ b/streampipes-extensions/streampipes-processors-image-processing-jvm/src/main/java/org/apache/streampipes/processors/imageprocessing/jvm/processor/imageenrichment/BoxCoordinates.java @@ -26,21 +26,24 @@ public class BoxCoordinates { private float score; private String classesindex; - public static BoxCoordinates make(Float width, Float height, Float x, Float - y) { + public static BoxCoordinates make( + Float width, + Float height, + Float x, + Float y) { return new BoxCoordinates(Math.round(width), - Math.round(height), - Math.round(x), - Math.round(y)); + Math.round(height), + Math.round(x), + Math.round(y)); } public static BoxCoordinates make(Float width, Float height, Float x, Float y, float score, String classesindex) { return new BoxCoordinates(Math.round(width), - Math.round(height), - Math.round(x), - Math.round(y), - score, - classesindex); + Math.round(height), + Math.round(x), + Math.round(y), + score, + classesindex); } public BoxCoordinates(int width, int height, int x, int y) { diff --git a/streampipes-extensions/streampipes-processors-image-processing-jvm/src/main/java/org/apache/streampipes/processors/imageprocessing/jvm/processor/imageenrichment/ColorUtil.java b/streampipes-extensions/streampipes-processors-image-processing-jvm/src/main/java/org/apache/streampipes/processors/imageprocessing/jvm/processor/imageenrichment/ColorUtil.java index f098f9d6a2..9383b58714 100644 --- a/streampipes-extensions/streampipes-processors-image-processing-jvm/src/main/java/org/apache/streampipes/processors/imageprocessing/jvm/processor/imageenrichment/ColorUtil.java +++ b/streampipes-extensions/streampipes-processors-image-processing-jvm/src/main/java/org/apache/streampipes/processors/imageprocessing/jvm/processor/imageenrichment/ColorUtil.java @@ -17,30 +17,30 @@ package org.apache.streampipes.processors.imageprocessing.jvm.processor.imageenrichment; -import java.awt.*; +import java.awt.Color; -public enum ColorUtil { +public enum ColorUtil { - DARK_GREY(Color.DARK_GRAY), - BLACK(Color.BLACK), - RED(Color.RED), - PINK(Color.PINK), - ORANGE(Color.ORANGE), - YELLO(Color.YELLOW), - GREEN(Color.GREEN), - MAGENTA(Color.MAGENTA), - CYAN(Color.CYAN), - BLUE(Color.BLUE); + DARK_GREY(Color.DARK_GRAY), + BLACK(Color.BLACK), + RED(Color.RED), + PINK(Color.PINK), + ORANGE(Color.ORANGE), + YELLO(Color.YELLOW), + GREEN(Color.GREEN), + MAGENTA(Color.MAGENTA), + CYAN(Color.CYAN), + BLUE(Color.BLUE); - Color color; + Color color; - ColorUtil(Color color) { - this.color = color; - } + ColorUtil(Color color) { + this.color = color; + } - static public Color getColor(int num) { - int index = num % ColorUtil.values().length; - return ColorUtil.values()[index].color; - } + static public Color getColor(int num) { + int index = num % ColorUtil.values().length; + return ColorUtil.values()[index].color; + } } diff --git a/streampipes-extensions/streampipes-processors-image-processing-jvm/src/main/java/org/apache/streampipes/processors/imageprocessing/jvm/processor/imageenrichment/ImageEnricher.java b/streampipes-extensions/streampipes-processors-image-processing-jvm/src/main/java/org/apache/streampipes/processors/imageprocessing/jvm/processor/imageenrichment/ImageEnricher.java index 18fb1a1c6c..6e63180025 100644 --- a/streampipes-extensions/streampipes-processors-image-processing-jvm/src/main/java/org/apache/streampipes/processors/imageprocessing/jvm/processor/imageenrichment/ImageEnricher.java +++ b/streampipes-extensions/streampipes-processors-image-processing-jvm/src/main/java/org/apache/streampipes/processors/imageprocessing/jvm/processor/imageenrichment/ImageEnricher.java @@ -22,7 +22,11 @@ import org.apache.streampipes.wrapper.routing.SpOutputCollector; import org.apache.streampipes.wrapper.runtime.EventProcessor; -import java.awt.*; +import java.awt.BasicStroke; +import java.awt.Color; +import java.awt.FontMetrics; +import java.awt.Graphics2D; +import java.awt.Rectangle; import java.awt.geom.Rectangle2D; import java.awt.image.BufferedImage; import java.util.Base64; @@ -35,7 +39,8 @@ public class ImageEnricher implements EventProcessor private ImageEnrichmentParameters params; @Override - public void onInvocation(ImageEnrichmentParameters params, SpOutputCollector spOutputCollector, EventProcessorRuntimeContext runtimeContext) { + public void onInvocation(ImageEnrichmentParameters params, SpOutputCollector spOutputCollector, + EventProcessorRuntimeContext runtimeContext) { this.params = params; } @@ -44,7 +49,7 @@ public void onEvent(org.apache.streampipes.model.runtime.Event in, SpOutputColle ImageTransformer imageTransformer = new ImageTransformer(in, params); Optional imageOpt = - imageTransformer.getImage(); + imageTransformer.getImage(); if (imageOpt.isPresent()) { @@ -64,7 +69,7 @@ public void onEvent(org.apache.streampipes.model.runtime.Event in, SpOutputColle //Box graph.setStroke(new BasicStroke(5)); graph.draw(new Rectangle(boxCoordinates.getX(), boxCoordinates.getY(), boxCoordinates.getWidth(), - boxCoordinates.getHeight())); + boxCoordinates.getHeight())); //Label String str = boxCoordinates.getClassesindex() + ": " + boxCoordinates.getScore(); @@ -73,9 +78,9 @@ public void onEvent(org.apache.streampipes.model.runtime.Event in, SpOutputColle Rectangle2D rect = fm.getStringBounds(str, graph); graph.fillRect(boxCoordinates.getX(), - boxCoordinates.getY() - fm.getAscent(), - (int) rect.getWidth(), - (int) rect.getHeight()); + boxCoordinates.getY() - fm.getAscent(), + (int) rect.getWidth(), + (int) rect.getHeight()); graph.setColor(Color.white); graph.drawString(str, boxCoordinates.getX(), boxCoordinates.getY()); diff --git a/streampipes-extensions/streampipes-processors-image-processing-jvm/src/main/java/org/apache/streampipes/processors/imageprocessing/jvm/processor/imageenrichment/ImageEnrichmentController.java b/streampipes-extensions/streampipes-processors-image-processing-jvm/src/main/java/org/apache/streampipes/processors/imageprocessing/jvm/processor/imageenrichment/ImageEnrichmentController.java index 888b5733e0..95cdf17a17 100644 --- a/streampipes-extensions/streampipes-processors-image-processing-jvm/src/main/java/org/apache/streampipes/processors/imageprocessing/jvm/processor/imageenrichment/ImageEnrichmentController.java +++ b/streampipes-extensions/streampipes-processors-image-processing-jvm/src/main/java/org/apache/streampipes/processors/imageprocessing/jvm/processor/imageenrichment/ImageEnrichmentController.java @@ -38,23 +38,24 @@ public class ImageEnrichmentController extends StandaloneEventProcessingDeclarer @Override public DataProcessorDescription declareModel() { return ProcessingElementBuilder.create("org.apache.streampipes.processor.imageclassification.jvm.image-enricher") - .withAssets(Assets.DOCUMENTATION, Assets.ICON) - .withLocales(Locales.EN) - .category(DataProcessorType.IMAGE_PROCESSING) - .requiredStream(RequiredBoxStream.getBoxStream()) - .outputStrategy(OutputStrategies.fixed( - EpProperties.stringEp(Labels.empty(), "image", "https://image.com") - )) - .build(); + .withAssets(Assets.DOCUMENTATION, Assets.ICON) + .withLocales(Locales.EN) + .category(DataProcessorType.IMAGE_PROCESSING) + .requiredStream(RequiredBoxStream.getBoxStream()) + .outputStrategy(OutputStrategies.fixed( + EpProperties.stringEp(Labels.empty(), "image", "https://image.com") + )) + .build(); } @Override - public ConfiguredEventProcessor onInvocation(DataProcessorInvocation dataProcessorInvocation, ProcessingElementParameterExtractor extractor) { + public ConfiguredEventProcessor onInvocation( + DataProcessorInvocation dataProcessorInvocation, ProcessingElementParameterExtractor extractor) { String imageProperty = extractor.mappingPropertyValue(IMAGE_PROPERTY); String boxArray = extractor.mappingPropertyValue(RequiredBoxStream.BOX_ARRAY_PROPERTY); ImageEnrichmentParameters params = new ImageEnrichmentParameters(dataProcessorInvocation, imageProperty, - boxArray, "box_width", "box_height", "box_x", "box_y", "score", "classesindex"); + boxArray, "box_width", "box_height", "box_x", "box_y", "score", "classesindex"); return new ConfiguredEventProcessor<>(params, ImageEnricher::new); diff --git a/streampipes-extensions/streampipes-processors-image-processing-jvm/src/main/java/org/apache/streampipes/processors/imageprocessing/jvm/processor/imageenrichment/ImageEnrichmentParameters.java b/streampipes-extensions/streampipes-processors-image-processing-jvm/src/main/java/org/apache/streampipes/processors/imageprocessing/jvm/processor/imageenrichment/ImageEnrichmentParameters.java index 32b3642bb2..af8089b630 100644 --- a/streampipes-extensions/streampipes-processors-image-processing-jvm/src/main/java/org/apache/streampipes/processors/imageprocessing/jvm/processor/imageenrichment/ImageEnrichmentParameters.java +++ b/streampipes-extensions/streampipes-processors-image-processing-jvm/src/main/java/org/apache/streampipes/processors/imageprocessing/jvm/processor/imageenrichment/ImageEnrichmentParameters.java @@ -32,7 +32,9 @@ public class ImageEnrichmentParameters extends EventProcessorBindingParams { private String score; private String classesindex; - public ImageEnrichmentParameters(DataProcessorInvocation graph, String imageProperty, String boxArray, String boxWidth, String boxHeight, String boxX, String boxY, String score, String classesindex) { + public ImageEnrichmentParameters(DataProcessorInvocation graph, String imageProperty, String boxArray, + String boxWidth, String boxHeight, String boxX, String boxY, String score, + String classesindex) { super(graph); this.imageProperty = imageProperty; this.boxArray = boxArray; @@ -44,7 +46,8 @@ public ImageEnrichmentParameters(DataProcessorInvocation graph, String imageProp this.classesindex = classesindex; } - public ImageEnrichmentParameters(DataProcessorInvocation graph, String imageProperty, String boxArray, String boxWidth, String boxHeight, String boxX, String boxY) { + public ImageEnrichmentParameters(DataProcessorInvocation graph, String imageProperty, String boxArray, + String boxWidth, String boxHeight, String boxX, String boxY) { super(graph); this.imageProperty = imageProperty; this.boxArray = boxArray; @@ -54,7 +57,8 @@ public ImageEnrichmentParameters(DataProcessorInvocation graph, String imageProp this.boxY = boxY; } - public ImageEnrichmentParameters(DataProcessorInvocation graph, String imageProperty, String boxWidth, String boxHeight, String boxX, String boxY) { + public ImageEnrichmentParameters(DataProcessorInvocation graph, String imageProperty, String boxWidth, + String boxHeight, String boxX, String boxY) { super(graph); this.imageProperty = imageProperty; this.boxWidth = boxWidth; diff --git a/streampipes-extensions/streampipes-processors-image-processing-jvm/src/main/java/org/apache/streampipes/processors/imageprocessing/jvm/processor/qrreader/QrCodeReader.java b/streampipes-extensions/streampipes-processors-image-processing-jvm/src/main/java/org/apache/streampipes/processors/imageprocessing/jvm/processor/qrreader/QrCodeReader.java index c275809ba0..70b79a7d14 100644 --- a/streampipes-extensions/streampipes-processors-image-processing-jvm/src/main/java/org/apache/streampipes/processors/imageprocessing/jvm/processor/qrreader/QrCodeReader.java +++ b/streampipes-extensions/streampipes-processors-image-processing-jvm/src/main/java/org/apache/streampipes/processors/imageprocessing/jvm/processor/qrreader/QrCodeReader.java @@ -22,13 +22,13 @@ import boofcv.factory.fiducial.FactoryFiducial; import boofcv.io.image.ConvertBufferedImage; import boofcv.struct.image.GrayU8; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.streampipes.model.runtime.Event; import org.apache.streampipes.processors.imageprocessing.jvm.processor.commons.PlainImageTransformer; import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext; import org.apache.streampipes.wrapper.routing.SpOutputCollector; import org.apache.streampipes.wrapper.runtime.EventProcessor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.awt.image.BufferedImage; import java.util.List; @@ -42,7 +42,8 @@ public class QrCodeReader implements EventProcessor { private static final Logger LOG = LoggerFactory.getLogger(QrCodeReader.class); @Override - public void onInvocation(QrCodeReaderParameters qrCodeReaderParameters, SpOutputCollector spOutputCollector, EventProcessorRuntimeContext runtimeContext) { + public void onInvocation(QrCodeReaderParameters qrCodeReaderParameters, SpOutputCollector spOutputCollector, + EventProcessorRuntimeContext runtimeContext) { this.params = qrCodeReaderParameters; this.sendIfNoResult = qrCodeReaderParameters.getSendIfNoResult(); this.placeholderValue = qrCodeReaderParameters.getPlaceholderValue(); @@ -51,7 +52,7 @@ public void onInvocation(QrCodeReaderParameters qrCodeReaderParameters, SpOutput @Override public void onEvent(Event in, SpOutputCollector out) { PlainImageTransformer imageTransformer = new PlainImageTransformer<> - (in, params); + (in, params); Optional imageOpt = imageTransformer.getImage(params.getImagePropertyName()); if (imageOpt.isPresent()) { diff --git a/streampipes-extensions/streampipes-processors-image-processing-jvm/src/main/java/org/apache/streampipes/processors/imageprocessing/jvm/processor/qrreader/QrCodeReaderController.java b/streampipes-extensions/streampipes-processors-image-processing-jvm/src/main/java/org/apache/streampipes/processors/imageprocessing/jvm/processor/qrreader/QrCodeReaderController.java index b8cc6c330c..2226e3bbdb 100644 --- a/streampipes-extensions/streampipes-processors-image-processing-jvm/src/main/java/org/apache/streampipes/processors/imageprocessing/jvm/processor/qrreader/QrCodeReaderController.java +++ b/streampipes-extensions/streampipes-processors-image-processing-jvm/src/main/java/org/apache/streampipes/processors/imageprocessing/jvm/processor/qrreader/QrCodeReaderController.java @@ -17,8 +17,6 @@ */ package org.apache.streampipes.processors.imageprocessing.jvm.processor.qrreader; -import static org.apache.streampipes.processors.imageprocessing.jvm.processor.commons.RequiredBoxStream.IMAGE_PROPERTY; - import org.apache.streampipes.model.DataProcessorType; import org.apache.streampipes.model.graph.DataProcessorDescription; import org.apache.streampipes.model.graph.DataProcessorInvocation; @@ -36,6 +34,8 @@ import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor; import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer; +import static org.apache.streampipes.processors.imageprocessing.jvm.processor.commons.RequiredBoxStream.IMAGE_PROPERTY; + public class QrCodeReaderController extends StandaloneEventProcessingDeclarer { private static final String PLACEHOLDER_VALUE = "placeholder-value"; @@ -45,30 +45,31 @@ public class QrCodeReaderController extends StandaloneEventProcessingDeclarer onInvocation(DataProcessorInvocation dataProcessorInvocation, ProcessingElementParameterExtractor extractor) { + public ConfiguredEventProcessor onInvocation(DataProcessorInvocation dataProcessorInvocation, + ProcessingElementParameterExtractor extractor) { String imagePropertyName = extractor.mappingPropertyValue(IMAGE_PROPERTY); String placeholderValue = extractor.singleValueParameter(PLACEHOLDER_VALUE, String.class); Boolean sendIfNoResult = extractor.selectedSingleValue(SEND_IF_NO_RESULT, String.class) - .equals("Yes"); + .equals("Yes"); QrCodeReaderParameters params = new QrCodeReaderParameters(dataProcessorInvocation, - imagePropertyName, placeholderValue, sendIfNoResult); + imagePropertyName, placeholderValue, sendIfNoResult); return new ConfiguredEventProcessor<>(params, QrCodeReader::new); } diff --git a/streampipes-extensions/streampipes-processors-image-processing-jvm/src/main/java/org/apache/streampipes/processors/imageprocessing/jvm/processor/qrreader/QrCodeReaderParameters.java b/streampipes-extensions/streampipes-processors-image-processing-jvm/src/main/java/org/apache/streampipes/processors/imageprocessing/jvm/processor/qrreader/QrCodeReaderParameters.java index de9fd94546..675889ea3c 100644 --- a/streampipes-extensions/streampipes-processors-image-processing-jvm/src/main/java/org/apache/streampipes/processors/imageprocessing/jvm/processor/qrreader/QrCodeReaderParameters.java +++ b/streampipes-extensions/streampipes-processors-image-processing-jvm/src/main/java/org/apache/streampipes/processors/imageprocessing/jvm/processor/qrreader/QrCodeReaderParameters.java @@ -27,7 +27,7 @@ public class QrCodeReaderParameters extends EventProcessorBindingParams { private Boolean sendIfNoResult; public QrCodeReaderParameters(DataProcessorInvocation graph, String imagePropertyName, String - placeholderValue, Boolean sendIfNoResult) { + placeholderValue, Boolean sendIfNoResult) { super(graph); this.imagePropertyName = imagePropertyName; this.placeholderValue = placeholderValue; diff --git a/streampipes-extensions/streampipes-processors-pattern-detection-flink/pom.xml b/streampipes-extensions/streampipes-processors-pattern-detection-flink/pom.xml index f917cf3f87..4863278660 100644 --- a/streampipes-extensions/streampipes-processors-pattern-detection-flink/pom.xml +++ b/streampipes-extensions/streampipes-processors-pattern-detection-flink/pom.xml @@ -17,7 +17,8 @@ ~ --> - + streampipes-extensions org.apache.streampipes @@ -130,20 +131,26 @@ - + META-INF/spring.handlers - + META-INF/spring.factories - + META-INF/spring.schemas - - + + reference.conf - + org.apache.streampipes.processors.pattern.detection.flink.PatternDetectionFlinkInit @@ -154,6 +161,10 @@ + + org.apache.maven.plugins + maven-checkstyle-plugin + streampipes-processors-pattern-detection-flink diff --git a/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/AbstractPatternDetectionProgram.java b/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/AbstractPatternDetectionProgram.java index e02200c2ec..f10f350e36 100644 --- a/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/AbstractPatternDetectionProgram.java +++ b/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/AbstractPatternDetectionProgram.java @@ -17,8 +17,6 @@ */ package org.apache.streampipes.processors.pattern.detection.flink; -import org.apache.flink.streaming.api.TimeCharacteristic; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.streampipes.client.StreamPipesClient; import org.apache.streampipes.container.config.ConfigExtractor; import org.apache.streampipes.processors.pattern.detection.flink.config.ConfigKeys; @@ -27,7 +25,11 @@ import org.apache.streampipes.wrapper.flink.FlinkDeploymentConfig; import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams; -public abstract class AbstractPatternDetectionProgram extends FlinkDataProcessorRuntime { +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +public abstract class AbstractPatternDetectionProgram + extends FlinkDataProcessorRuntime { public AbstractPatternDetectionProgram(B params, ConfigExtractor configExtractor, @@ -39,10 +41,10 @@ public AbstractPatternDetectionProgram(B params, protected FlinkDeploymentConfig getDeploymentConfig(ConfigExtractor configExtractor) { SpConfig config = configExtractor.getConfig(); return new FlinkDeploymentConfig(config.getString( - ConfigKeys.FLINK_JAR_FILE_LOC), - config.getString(ConfigKeys.FLINK_HOST), - config.getInteger(ConfigKeys.FLINK_PORT), - config.getBoolean(ConfigKeys.DEBUG) + ConfigKeys.FLINK_JAR_FILE_LOC), + config.getString(ConfigKeys.FLINK_HOST), + config.getInteger(ConfigKeys.FLINK_PORT), + config.getBoolean(ConfigKeys.DEBUG) ); } diff --git a/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/PatternDetectionFlinkInit.java b/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/PatternDetectionFlinkInit.java index c78b186fbc..c4abe0e22c 100644 --- a/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/PatternDetectionFlinkInit.java +++ b/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/PatternDetectionFlinkInit.java @@ -36,7 +36,7 @@ public class PatternDetectionFlinkInit extends StandaloneModelSubmitter { - public static final String ServiceGroup = "org.apache.streampipes.processors.patterndetection.flink"; + public static final String SERVICE_GROUP = "org.apache.streampipes.processors.patterndetection.flink"; public static void main(String[] args) { new PatternDetectionFlinkInit().init(); @@ -44,28 +44,28 @@ public static void main(String[] args) { @Override public SpServiceDefinition provideServiceDefinition() { - return SpServiceDefinitionBuilder.create(ServiceGroup, - "Processors Pattern Detection Flink", - "", - 8090) - .registerPipelineElements(new PeakDetectionController(), - new SequenceController(), - new AbsenceController(), - new AndController()) - .registerMessagingFormats( - new JsonDataFormatFactory(), - new CborDataFormatFactory(), - new SmileDataFormatFactory(), - new FstDataFormatFactory()) - .registerMessagingProtocols( - new SpKafkaProtocolFactory(), - new SpJmsProtocolFactory(), - new SpMqttProtocolFactory()) - .addConfig(ConfigKeys.FLINK_HOST, "jobmanager", "Hostname of the Flink Jobmanager") - .addConfig(ConfigKeys.FLINK_PORT, 8081, "Port of the Flink Jobmanager") - .addConfig(ConfigKeys.DEBUG, false, "Debug/Mini cluster mode of Flink program") - .addConfig(ConfigKeys.FLINK_JAR_FILE_LOC, "./streampipes-processing-element-container.jar", "Jar file location") - .build(); + return SpServiceDefinitionBuilder.create(SERVICE_GROUP, + "Processors Pattern Detection Flink", + "", + 8090) + .registerPipelineElements(new PeakDetectionController(), + new SequenceController(), + new AbsenceController(), + new AndController()) + .registerMessagingFormats( + new JsonDataFormatFactory(), + new CborDataFormatFactory(), + new SmileDataFormatFactory(), + new FstDataFormatFactory()) + .registerMessagingProtocols( + new SpKafkaProtocolFactory(), + new SpJmsProtocolFactory(), + new SpMqttProtocolFactory()) + .addConfig(ConfigKeys.FLINK_HOST, "jobmanager", "Hostname of the Flink Jobmanager") + .addConfig(ConfigKeys.FLINK_PORT, 8081, "Port of the Flink Jobmanager") + .addConfig(ConfigKeys.DEBUG, false, "Debug/Mini cluster mode of Flink program") + .addConfig(ConfigKeys.FLINK_JAR_FILE_LOC, "./streampipes-processing-element-container.jar", "Jar file location") + .build(); } } diff --git a/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/config/ConfigKeys.java b/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/config/ConfigKeys.java index dc6f471d61..031be0b102 100644 --- a/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/config/ConfigKeys.java +++ b/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/config/ConfigKeys.java @@ -19,8 +19,8 @@ package org.apache.streampipes.processors.pattern.detection.flink.config; public class ConfigKeys { - public final static String FLINK_HOST = "SP_FLINK_HOST"; - public final static String FLINK_PORT = "SP_FLINK_PORT"; - public final static String DEBUG = "SP_FLINK_DEBUG"; - public final static String FLINK_JAR_FILE_LOC = "SP_FLINK_JAR_FILE_LOC"; + public static final String FLINK_HOST = "SP_FLINK_HOST"; + public static final String FLINK_PORT = "SP_FLINK_PORT"; + public static final String DEBUG = "SP_FLINK_DEBUG"; + public static final String FLINK_JAR_FILE_LOC = "SP_FLINK_JAR_FILE_LOC"; } diff --git a/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/processor/absence/AbsenceController.java b/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/processor/absence/AbsenceController.java index 2119071fc5..68753ac769 100644 --- a/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/processor/absence/AbsenceController.java +++ b/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/processor/absence/AbsenceController.java @@ -28,7 +28,11 @@ import org.apache.streampipes.sdk.builder.ProcessingElementBuilder; import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder; import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor; -import org.apache.streampipes.sdk.helpers.*; +import org.apache.streampipes.sdk.helpers.EpRequirements; +import org.apache.streampipes.sdk.helpers.Labels; +import org.apache.streampipes.sdk.helpers.Locales; +import org.apache.streampipes.sdk.helpers.Options; +import org.apache.streampipes.sdk.helpers.OutputStrategies; import org.apache.streampipes.sdk.utils.Assets; import org.apache.streampipes.wrapper.flink.FlinkDataProcessorDeclarer; import org.apache.streampipes.wrapper.flink.FlinkDataProcessorRuntime; @@ -44,22 +48,22 @@ public class AbsenceController extends FlinkDataProcessorDeclarer selectProperties = new ArrayList<>(); - private Integer timeWindowSize; - private TimeUnit timeUnit; - - public AbsenceParameters(DataProcessorInvocation graph, List selectProperties, Integer timeWindowSize, TimeUnit timeUnit) { - super(graph); - this.selectProperties = selectProperties; - this.timeWindowSize = timeWindowSize; - this.timeUnit = timeUnit; - } - - public List getSelectProperties() { - return selectProperties; - } - - public Integer getTimeWindowSize() { - return timeWindowSize; - } - - public TimeUnit getTimeUnit() { - return timeUnit; - } + private static final long serialVersionUID = 4319341875274736697L; + + private List selectProperties = new ArrayList<>(); + private Integer timeWindowSize; + private TimeUnit timeUnit; + + public AbsenceParameters(DataProcessorInvocation graph, List selectProperties, Integer timeWindowSize, + TimeUnit timeUnit) { + super(graph); + this.selectProperties = selectProperties; + this.timeWindowSize = timeWindowSize; + this.timeUnit = timeUnit; + } + + public List getSelectProperties() { + return selectProperties; + } + + public Integer getTimeWindowSize() { + return timeWindowSize; + } + + public TimeUnit getTimeUnit() { + return timeUnit; + } } diff --git a/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/processor/absence/AbsenceProgram.java b/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/processor/absence/AbsenceProgram.java index e3ece2d418..440080f070 100644 --- a/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/processor/absence/AbsenceProgram.java +++ b/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/processor/absence/AbsenceProgram.java @@ -17,6 +17,12 @@ */ package org.apache.streampipes.processors.pattern.detection.flink.processor.absence; +import org.apache.streampipes.client.StreamPipesClient; +import org.apache.streampipes.container.config.ConfigExtractor; +import org.apache.streampipes.model.runtime.Event; +import org.apache.streampipes.processors.pattern.detection.flink.AbstractPatternDetectionProgram; +import org.apache.streampipes.processors.pattern.detection.flink.processor.and.TimeUnitConverter; + import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.cep.CEP; @@ -30,11 +36,6 @@ import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.util.Collector; import org.apache.flink.util.OutputTag; -import org.apache.streampipes.client.StreamPipesClient; -import org.apache.streampipes.container.config.ConfigExtractor; -import org.apache.streampipes.model.runtime.Event; -import org.apache.streampipes.processors.pattern.detection.flink.AbstractPatternDetectionProgram; -import org.apache.streampipes.processors.pattern.detection.flink.processor.and.TimeUnitConverter; import java.util.List; import java.util.Map; @@ -52,66 +53,67 @@ public DataStream getApplicationLogic(DataStream... messageStream) Time time = TimeUnitConverter.toTime(params.getTimeUnit(), params.getTimeWindowSize()); - DataStream> stream1 = messageStream[0].flatMap(new FlatMapFunction>() { - @Override - public void flatMap(Event in, Collector> out) throws + DataStream> stream1 = + messageStream[0].flatMap(new FlatMapFunction>() { + @Override + public void flatMap(Event in, Collector> out) throws Exception { - out.collect(new Tuple2<>(true, in)); - } - }); - - DataStream> stream2 = messageStream[1].flatMap(new FlatMapFunction>() { - @Override - public void flatMap(Event in, Collector> out) throws + out.collect(new Tuple2<>(true, in)); + } + }); + + DataStream> stream2 = + messageStream[1].flatMap(new FlatMapFunction>() { + @Override + public void flatMap(Event in, Collector> out) throws Exception { - out.collect(new Tuple2<>(false, in)); - } - }); + out.collect(new Tuple2<>(false, in)); + } + }); DataStream> joinedStreams = stream2.union(stream1); Pattern, Tuple2> matchedEvents = - Pattern.>begin("start") - .where(new SimpleCondition>() { - @Override - public boolean filter(Tuple2 ride) throws Exception { - return ride.f0; - } - }) - .next("end") - .where(new SimpleCondition>() { - @Override - public boolean filter(Tuple2 ride) throws Exception { - return !ride.f0; - } - }); + Pattern.>begin("start") + .where(new SimpleCondition>() { + @Override + public boolean filter(Tuple2 ride) throws Exception { + return ride.f0; + } + }) + .next("end") + .where(new SimpleCondition>() { + @Override + public boolean filter(Tuple2 ride) throws Exception { + return !ride.f0; + } + }); PatternStream> patternStream = CEP.pattern(joinedStreams, matchedEvents - .within(time)); + .within(time)); - OutputTag> timedout = new OutputTag> - ("timedout") { + OutputTag> timedout = new OutputTag>("timedout") { }; SingleOutputStreamOperator> matched = patternStream.flatSelect( - timedout, - new TimedOut(), - new FlatSelectNothing<>() + timedout, + new TimedOut(), + new FlatSelectNothing<>() ); return matched.getSideOutput(timedout).flatMap(new FlatMapFunction, - Event>() { + Event>() { @Override public void flatMap(Tuple2 in, Collector out) throws - Exception { + Exception { out.collect(in.f1); } }); } public static class TimedOut implements PatternFlatTimeoutFunction, - Tuple2> { + Tuple2> { @Override public void timeout(Map>> map, long l, Collector> collector) throws Exception { diff --git a/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/processor/and/AndController.java b/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/processor/and/AndController.java index 28c2d681c7..c3d05a4445 100644 --- a/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/processor/and/AndController.java +++ b/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/processor/and/AndController.java @@ -27,7 +27,11 @@ import org.apache.streampipes.sdk.builder.ProcessingElementBuilder; import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder; import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor; -import org.apache.streampipes.sdk.helpers.*; +import org.apache.streampipes.sdk.helpers.EpRequirements; +import org.apache.streampipes.sdk.helpers.Labels; +import org.apache.streampipes.sdk.helpers.Locales; +import org.apache.streampipes.sdk.helpers.Options; +import org.apache.streampipes.sdk.helpers.OutputStrategies; import org.apache.streampipes.sdk.utils.Assets; import org.apache.streampipes.wrapper.flink.FlinkDataProcessorDeclarer; import org.apache.streampipes.wrapper.flink.FlinkDataProcessorRuntime; @@ -44,26 +48,26 @@ public class AndController extends FlinkDataProcessorDeclarer { @Override public DataProcessorDescription declareModel() { return ProcessingElementBuilder.create("org.apache.streampipes.processors.pattern-detection.flink.and") - .category(DataProcessorType.PATTERN_DETECT) - .withLocales(Locales.EN) - .withAssets(Assets.DOCUMENTATION, Assets.ICON) - .requiredStream(StreamRequirementsBuilder - .create() - .requiredPropertyWithNaryMapping(EpRequirements.anyProperty(), - Labels.withId(LEFT_MAPPING) - , PropertyScope.DIMENSION_PROPERTY) - .build()) - .requiredStream(StreamRequirementsBuilder - .create() - .requiredPropertyWithNaryMapping(EpRequirements.anyProperty(), - Labels.withId(RIGHT_MAPPING) - , PropertyScope.DIMENSION_PROPERTY) - .build()) - .requiredSingleValueSelection(Labels.withId(TIME_UNIT), Options.from("Seconds", - "Minutes", "Hours")) - .requiredIntegerParameter(Labels.withId(TIME_WINDOW)) - .outputStrategy(OutputStrategies.custom(true)) - .build(); + .category(DataProcessorType.PATTERN_DETECT) + .withLocales(Locales.EN) + .withAssets(Assets.DOCUMENTATION, Assets.ICON) + .requiredStream(StreamRequirementsBuilder + .create() + .requiredPropertyWithNaryMapping(EpRequirements.anyProperty(), + Labels.withId(LEFT_MAPPING) + , PropertyScope.DIMENSION_PROPERTY) + .build()) + .requiredStream(StreamRequirementsBuilder + .create() + .requiredPropertyWithNaryMapping(EpRequirements.anyProperty(), + Labels.withId(RIGHT_MAPPING) + , PropertyScope.DIMENSION_PROPERTY) + .build()) + .requiredSingleValueSelection(Labels.withId(TIME_UNIT), Options.from("Seconds", + "Minutes", "Hours")) + .requiredIntegerParameter(Labels.withId(TIME_WINDOW)) + .outputStrategy(OutputStrategies.custom(true)) + .build(); } diff --git a/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/processor/and/AndParameters.java b/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/processor/and/AndParameters.java index c017949301..bd8f9896ee 100644 --- a/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/processor/and/AndParameters.java +++ b/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/processor/and/AndParameters.java @@ -32,7 +32,8 @@ public class AndParameters extends EventProcessorBindingParams { private List rightMappings; - public AndParameters(DataProcessorInvocation invocationGraph, TimeUnit timeUnit, Integer timeWindow, List leftMappings, List rightMappings) { + public AndParameters(DataProcessorInvocation invocationGraph, TimeUnit timeUnit, Integer timeWindow, + List leftMappings, List rightMappings) { super(invocationGraph); this.timeUnit = timeUnit; this.timeWindow = timeWindow; diff --git a/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/processor/and/AndProgram.java b/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/processor/and/AndProgram.java index 547bb780af..18ff52a5bb 100644 --- a/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/processor/and/AndProgram.java +++ b/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/processor/and/AndProgram.java @@ -17,15 +17,16 @@ */ package org.apache.streampipes.processors.pattern.detection.flink.processor.and; +import org.apache.streampipes.client.StreamPipesClient; +import org.apache.streampipes.container.config.ConfigExtractor; +import org.apache.streampipes.model.runtime.Event; +import org.apache.streampipes.processors.pattern.detection.flink.AbstractPatternDetectionProgram; + import org.apache.flink.api.common.functions.JoinFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; -import org.apache.streampipes.client.StreamPipesClient; -import org.apache.streampipes.container.config.ConfigExtractor; -import org.apache.streampipes.model.runtime.Event; -import org.apache.streampipes.processors.pattern.detection.flink.AbstractPatternDetectionProgram; import java.util.List; @@ -46,33 +47,33 @@ public DataStream getApplicationLogic(DataStream... messageStream) Time time = TimeUnitConverter.toTime(params.getTimeUnit(), params.getTimeWindow()); return messageStream[0].join(messageStream[1]) - .where(new KeySelector() { - @Override - public String getKey(Event stringObjectMap) throws Exception { - StringBuilder builder = new StringBuilder(); - for (String key : leftMappings) { - builder.append(key); - } - return builder.toString(); - } - }).equalTo(new KeySelector() { - @Override - public String getKey(Event stringObjectMap) throws Exception { - StringBuilder builder = new StringBuilder(); - for (String key : rightMappings) { - builder.append(key); - } - return builder.toString(); - } - }).window(TumblingEventTimeWindows.of(time)) - .apply(new JoinFunction() { - @Override - public Event join(Event e1, Event e2) throws Exception { - Event map = new Event(); - e1.getFields().forEach((key, value) -> map.addField(value)); - e2.getFields().forEach((key, value) -> map.addField(value)); - return map; - } - }); + .where(new KeySelector() { + @Override + public String getKey(Event stringObjectMap) throws Exception { + StringBuilder builder = new StringBuilder(); + for (String key : leftMappings) { + builder.append(key); + } + return builder.toString(); + } + }).equalTo(new KeySelector() { + @Override + public String getKey(Event stringObjectMap) throws Exception { + StringBuilder builder = new StringBuilder(); + for (String key : rightMappings) { + builder.append(key); + } + return builder.toString(); + } + }).window(TumblingEventTimeWindows.of(time)) + .apply(new JoinFunction() { + @Override + public Event join(Event e1, Event e2) throws Exception { + Event map = new Event(); + e1.getFields().forEach((key, value) -> map.addField(value)); + e2.getFields().forEach((key, value) -> map.addField(value)); + return map; + } + }); } } diff --git a/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/processor/common/TimestampExtractor.java b/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/processor/common/TimestampExtractor.java index d584f1dd15..73375499a9 100644 --- a/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/processor/common/TimestampExtractor.java +++ b/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/processor/common/TimestampExtractor.java @@ -17,9 +17,10 @@ */ package org.apache.streampipes.processors.pattern.detection.flink.processor.common; -import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor; import org.apache.streampipes.model.runtime.Event; +import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor; + public class TimestampExtractor extends AscendingTimestampExtractor { private String timestampField; diff --git a/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/processor/peak/PeakDetectionCalculator.java b/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/processor/peak/PeakDetectionCalculator.java index cbe9633923..11b3006125 100644 --- a/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/processor/peak/PeakDetectionCalculator.java +++ b/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/processor/peak/PeakDetectionCalculator.java @@ -18,10 +18,11 @@ package org.apache.streampipes.processors.pattern.detection.flink.processor.peak; +import org.apache.streampipes.model.runtime.Event; + import org.apache.commons.math3.stat.descriptive.SummaryStatistics; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.util.Collector; -import org.apache.streampipes.model.runtime.Event; import java.util.Arrays; import java.util.List; @@ -39,7 +40,7 @@ public class PeakDetectionCalculator implements FlatMapFunction, Eve private Double influence; public PeakDetectionCalculator(String groupBy, String valueToObserve, Integer lag, Double - threshold, Double influence) { + threshold, Double influence) { this.groupBy = groupBy; this.valueToObserve = valueToObserve; this.lag = lag; @@ -50,16 +51,16 @@ public PeakDetectionCalculator(String groupBy, String valueToObserve, Integer la @Override public void flatMap(List in, Collector out) - throws Exception { + throws Exception { List y = in - .stream() - .map(m -> m.getFieldBySelector(valueToObserve).getAsPrimitive().getAsDouble()) - .collect(Collectors.toList()); + .stream() + .map(m -> m.getFieldBySelector(valueToObserve).getAsPrimitive().getAsDouble()) + .collect(Collectors.toList()); Integer[] signals = makeIntegerArray(y.size()); Double[] filteredY = makeDoubleArray(y.size()); - if (in.size() >= (lag+1)) { + if (in.size() >= (lag + 1)) { for (int i = 0; i < lag; i++) { filteredY[i] = y.get(i); } diff --git a/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/processor/peak/PeakDetectionController.java b/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/processor/peak/PeakDetectionController.java index 647e893896..f5a816131c 100644 --- a/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/processor/peak/PeakDetectionController.java +++ b/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/processor/peak/PeakDetectionController.java @@ -27,7 +27,11 @@ import org.apache.streampipes.sdk.builder.ProcessingElementBuilder; import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder; import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor; -import org.apache.streampipes.sdk.helpers.*; +import org.apache.streampipes.sdk.helpers.EpProperties; +import org.apache.streampipes.sdk.helpers.EpRequirements; +import org.apache.streampipes.sdk.helpers.Labels; +import org.apache.streampipes.sdk.helpers.Locales; +import org.apache.streampipes.sdk.helpers.OutputStrategies; import org.apache.streampipes.sdk.utils.Assets; import org.apache.streampipes.wrapper.flink.FlinkDataProcessorDeclarer; import org.apache.streampipes.wrapper.flink.FlinkDataProcessorRuntime; @@ -45,26 +49,26 @@ public class PeakDetectionController extends FlinkDataProcessorDeclarer getRuntime(DataProcess PeakDetectionParameters params = new PeakDetectionParameters(sepa, - valueToObserve, timestampMapping, groupBy, countWindowSize, lag, threshold, influence); + valueToObserve, timestampMapping, groupBy, countWindowSize, lag, threshold, influence); return new PeakDetectionProgram(params, configExtractor, streamPipesClient); } diff --git a/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/processor/peak/PeakDetectionParameters.java b/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/processor/peak/PeakDetectionParameters.java index 4fdd1f3e8e..472d31e213 100644 --- a/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/processor/peak/PeakDetectionParameters.java +++ b/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/processor/peak/PeakDetectionParameters.java @@ -41,9 +41,9 @@ public PeakDetectionParameters(DataProcessorInvocation graph) { } public PeakDetectionParameters(DataProcessorInvocation graph, String valueToObserve, String - timestampMapping, String groupBy, Integer countWindowSize, Integer lag, Double - threshold, Double - influence) { + timestampMapping, String groupBy, Integer countWindowSize, Integer lag, Double + threshold, Double + influence) { super(graph); this.valueToObserve = valueToObserve; this.timestampMapping = timestampMapping; diff --git a/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/processor/peak/PeakDetectionProgram.java b/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/processor/peak/PeakDetectionProgram.java index db83eaec65..eb5d138b5d 100644 --- a/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/processor/peak/PeakDetectionProgram.java +++ b/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/processor/peak/PeakDetectionProgram.java @@ -18,16 +18,18 @@ package org.apache.streampipes.processors.pattern.detection.flink.processor.peak; -import org.apache.flink.api.common.typeinfo.TypeHint; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.streampipes.client.StreamPipesClient; import org.apache.streampipes.container.config.ConfigExtractor; import org.apache.streampipes.model.runtime.Event; import org.apache.streampipes.processors.pattern.detection.flink.AbstractPatternDetectionProgram; import org.apache.streampipes.processors.pattern.detection.flink.processor.peak.utils.SlidingBatchWindow; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.streaming.api.datastream.DataStream; + + import java.util.List; /** @@ -52,16 +54,16 @@ protected DataStream getApplicationLogic(DataStream[] messageStrea Integer countWindowSize = params.getCountWindowSize(); return messageStream[0] - .keyBy(getKeySelector()) - .transform - ("sliding-batch-window-shift", - TypeInformation.of(new TypeHint>() { - }), new SlidingBatchWindow<>(countWindowSize)) - .flatMap(new PeakDetectionCalculator(groupBy, - valueToObserve, - lag, - threshold, - influence)); + .keyBy(getKeySelector()) + .transform + ("sliding-batch-window-shift", + TypeInformation.of(new TypeHint>() { + }), new SlidingBatchWindow<>(countWindowSize)) + .flatMap(new PeakDetectionCalculator(groupBy, + valueToObserve, + lag, + threshold, + influence)); } private KeySelector getKeySelector() { diff --git a/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/processor/peak/utils/SlidingBatchWindow.java b/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/processor/peak/utils/SlidingBatchWindow.java index 14643b275a..0aa096749c 100644 --- a/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/processor/peak/utils/SlidingBatchWindow.java +++ b/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/processor/peak/utils/SlidingBatchWindow.java @@ -29,7 +29,7 @@ * Created by riemer on 21.04.2017. */ public class SlidingBatchWindow extends AbstractStreamOperator> implements - OneInputStreamOperator> { + OneInputStreamOperator> { private Integer windowSize; private List currentEvents; diff --git a/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/processor/sequence/Sequence.java b/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/processor/sequence/Sequence.java index 6e3d9de4ee..e5e3c10cb9 100644 --- a/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/processor/sequence/Sequence.java +++ b/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/processor/sequence/Sequence.java @@ -18,10 +18,11 @@ package org.apache.streampipes.processors.pattern.detection.flink.processor.sequence; +import org.apache.streampipes.model.runtime.Event; + import org.apache.flink.api.common.state.ValueState; import org.apache.flink.streaming.api.functions.co.CoProcessFunction; import org.apache.flink.util.Collector; -import org.apache.streampipes.model.runtime.Event; public class Sequence extends CoProcessFunction { @@ -37,9 +38,9 @@ public Sequence(String timeUnit, Integer timeWindow) { //@Override //public void open(Configuration parameters) throws Exception { - // TODO: add RuntimeContext - //state = getRuntimeContext().getState(new ValueStateDescriptor<>("sequence-event-storage", - // EventStorage.class)); + // TODO: add RuntimeContext + //state = getRuntimeContext().getState(new ValueStateDescriptor<>("sequence-event-storage", + // EventStorage.class)); //} diff --git a/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/processor/sequence/SequenceController.java b/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/processor/sequence/SequenceController.java index 911d830ccd..83bed5fdca 100644 --- a/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/processor/sequence/SequenceController.java +++ b/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/main/java/org/apache/streampipes/processors/pattern/detection/flink/processor/sequence/SequenceController.java @@ -26,7 +26,11 @@ import org.apache.streampipes.sdk.builder.ProcessingElementBuilder; import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder; import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor; -import org.apache.streampipes.sdk.helpers.*; +import org.apache.streampipes.sdk.helpers.EpRequirements; +import org.apache.streampipes.sdk.helpers.Labels; +import org.apache.streampipes.sdk.helpers.Locales; +import org.apache.streampipes.sdk.helpers.Options; +import org.apache.streampipes.sdk.helpers.OutputStrategies; import org.apache.streampipes.sdk.utils.Assets; import org.apache.streampipes.wrapper.flink.FlinkDataProcessorDeclarer; import org.apache.streampipes.wrapper.flink.FlinkDataProcessorRuntime; @@ -39,16 +43,16 @@ public class SequenceController extends FlinkDataProcessorDeclarer { public SequenceProgram(SequenceParameters params, @@ -35,11 +36,12 @@ public SequenceProgram(SequenceParameters params, @Override protected DataStream getApplicationLogic(DataStream... dataStreams) { - return dataStreams[0].keyBy(getKeySelector()).connect(dataStreams[1].keyBy(getKeySelector())).process(new Sequence(params + return dataStreams[0].keyBy(getKeySelector()).connect(dataStreams[1].keyBy(getKeySelector())) + .process(new Sequence(params .getTimeUnit(), params - .getTimeWindow - ())); + .getTimeWindow + ())); } private KeySelector getKeySelector() { diff --git a/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/test/java/org/apache/streampipes/processors/pattern/detection/processor/absence/TestAbsence.java b/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/test/java/org/apache/streampipes/processors/pattern/detection/processor/absence/TestAbsence.java index 2a485ab075..2338227afa 100644 --- a/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/test/java/org/apache/streampipes/processors/pattern/detection/processor/absence/TestAbsence.java +++ b/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/test/java/org/apache/streampipes/processors/pattern/detection/processor/absence/TestAbsence.java @@ -17,10 +17,6 @@ */ package org.apache.streampipes.processors.pattern.detection.processor.absence; -import io.flinkspector.datastream.DataStreamTestBase; -import io.flinkspector.datastream.input.EventTimeInput; -import io.flinkspector.datastream.input.EventTimeInputBuilder; -import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.streampipes.container.config.ConfigExtractor; import org.apache.streampipes.model.runtime.Event; import org.apache.streampipes.processors.pattern.detection.flink.PatternDetectionFlinkInit; @@ -29,6 +25,11 @@ import org.apache.streampipes.processors.pattern.detection.flink.processor.absence.AbsenceProgram; import org.apache.streampipes.processors.pattern.detection.flink.processor.and.TimeUnit; import org.apache.streampipes.test.generator.InvocationGraphGenerator; + +import io.flinkspector.datastream.DataStreamTestBase; +import io.flinkspector.datastream.input.EventTimeInput; +import io.flinkspector.datastream.input.EventTimeInputBuilder; +import org.apache.flink.streaming.api.datastream.DataStream; import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; @@ -48,9 +49,9 @@ public class TestAbsence extends DataStreamTestBase { @Parameterized.Parameters public static Iterable data() { return Arrays.asList(new Object[][]{ - {10, TimeUnit.Seconds, Arrays.asList("id"), Arrays.asList("id"), true, 12}, - {10, TimeUnit.Seconds, Arrays.asList("id"), Arrays.asList("id"), false, 5}, - {5, TimeUnit.Seconds, Arrays.asList("id"), Arrays.asList("id"), true, 6}, + {10, TimeUnit.Seconds, Arrays.asList("id"), Arrays.asList("id"), true, 12}, + {10, TimeUnit.Seconds, Arrays.asList("id"), Arrays.asList("id"), false, 5}, + {5, TimeUnit.Seconds, Arrays.asList("id"), Arrays.asList("id"), true, 6}, }); } @@ -75,12 +76,15 @@ public static Iterable data() { @Test public void testAbsenceProgram() { - AbsenceParameters params = new AbsenceParameters(InvocationGraphGenerator.makeEmptyInvocation(new AbsenceController().declareModel()), Arrays.asList("id", "timestamp", "value"), timeWindow, timeUnit); + AbsenceParameters params = + new AbsenceParameters(InvocationGraphGenerator.makeEmptyInvocation(new AbsenceController().declareModel()), + Arrays.asList("id", "timestamp", "value"), timeWindow, timeUnit); - ConfigExtractor configExtractor = ConfigExtractor.from(PatternDetectionFlinkInit.ServiceGroup); + ConfigExtractor configExtractor = ConfigExtractor.from(PatternDetectionFlinkInit.SERVICE_GROUP); AbsenceProgram program = new AbsenceProgram(params, configExtractor, null); - DataStream stream = program.getApplicationLogic(createTestStream(makeInputData(1, makeMap(), 0)), createTestStream(makeInputData(waitForMs, makeMap(), 1))); + DataStream stream = program.getApplicationLogic(createTestStream(makeInputData(1, makeMap(), 0)), + createTestStream(makeInputData(waitForMs, makeMap(), 1))); assertStream(stream, equalTo(getOutput(shouldMatch))); } diff --git a/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/test/java/org/apache/streampipes/processors/pattern/detection/processor/and/TestAnd.java b/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/test/java/org/apache/streampipes/processors/pattern/detection/processor/and/TestAnd.java index c5402e20db..1ba3b4c438 100644 --- a/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/test/java/org/apache/streampipes/processors/pattern/detection/processor/and/TestAnd.java +++ b/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/test/java/org/apache/streampipes/processors/pattern/detection/processor/and/TestAnd.java @@ -16,11 +16,6 @@ * */ package org.apache.streampipes.processors.pattern.detection.processor.and; - -import io.flinkspector.datastream.DataStreamTestBase; -import io.flinkspector.datastream.input.EventTimeInput; -import io.flinkspector.datastream.input.EventTimeInputBuilder; -import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.streampipes.container.config.ConfigExtractor; import org.apache.streampipes.model.graph.DataProcessorDescription; import org.apache.streampipes.model.runtime.Event; @@ -31,6 +26,11 @@ import org.apache.streampipes.processors.pattern.detection.flink.processor.and.TimeUnit; import org.apache.streampipes.test.generator.InvocationGraphGenerator; import org.apache.streampipes.test.generator.grounding.EventGroundingGenerator; + +import io.flinkspector.datastream.DataStreamTestBase; +import io.flinkspector.datastream.input.EventTimeInput; +import io.flinkspector.datastream.input.EventTimeInputBuilder; +import org.apache.flink.streaming.api.datastream.DataStream; import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; @@ -51,12 +51,12 @@ public class TestAnd extends DataStreamTestBase { @Parameterized.Parameters public static Iterable data() { return Arrays.asList(new Object[][]{ - {2, TimeUnit.Seconds, Arrays.asList("id"), Arrays.asList("id"), true, 1, 1}, - {1, TimeUnit.Seconds, Arrays.asList("id"), Arrays.asList("id"), true, 1, 1}, - {10, TimeUnit.Seconds, Arrays.asList("id"), Arrays.asList("id"), false, 1, 12}, - {10, TimeUnit.Seconds, Arrays.asList("id"), Arrays.asList("id"), true, 3, 4}, - {1, TimeUnit.Seconds, Arrays.asList("id"), Arrays.asList("id"), false, 1, 2}, - {3600, TimeUnit.Seconds, Arrays.asList("id"), Arrays.asList("id"), true, 10, 3500}, + {2, TimeUnit.Seconds, Arrays.asList("id"), Arrays.asList("id"), true, 1, 1}, + {1, TimeUnit.Seconds, Arrays.asList("id"), Arrays.asList("id"), true, 1, 1}, + {10, TimeUnit.Seconds, Arrays.asList("id"), Arrays.asList("id"), false, 1, 12}, + {10, TimeUnit.Seconds, Arrays.asList("id"), Arrays.asList("id"), true, 3, 4}, + {1, TimeUnit.Seconds, Arrays.asList("id"), Arrays.asList("id"), false, 1, 2}, + {3600, TimeUnit.Seconds, Arrays.asList("id"), Arrays.asList("id"), true, 10, 3500}, }); } @@ -87,13 +87,15 @@ public void testAndProgram() { DataProcessorDescription description = new AndController().declareModel(); description.setSupportedGrounding(EventGroundingGenerator.makeDummyGrounding()); AndParameters params = - new AndParameters(InvocationGraphGenerator.makeEmptyInvocation(description), timeUnit, - timeWindow, leftMapping, rightMapping); + new AndParameters(InvocationGraphGenerator.makeEmptyInvocation(description), timeUnit, + timeWindow, leftMapping, rightMapping); - ConfigExtractor configExtractor = ConfigExtractor.from(PatternDetectionFlinkInit.ServiceGroup); + ConfigExtractor configExtractor = ConfigExtractor.from(PatternDetectionFlinkInit.SERVICE_GROUP); AndProgram program = new AndProgram(params, configExtractor, null); - DataStream stream = program.getApplicationLogic(createTestStream(makeInputData(delayFirstEvent, makeMap("field1"))), createTestStream(makeInputData(delaySecondEvent, makeMap("field2")))); + DataStream stream = + program.getApplicationLogic(createTestStream(makeInputData(delayFirstEvent, makeMap("field1"))), + createTestStream(makeInputData(delaySecondEvent, makeMap("field2")))); assertStream(stream, equalTo(getOutput(shouldMatch))); } diff --git a/streampipes-extensions/streampipes-processors-statistics-flink/pom.xml b/streampipes-extensions/streampipes-processors-statistics-flink/pom.xml index c984827a77..8981e91ac2 100644 --- a/streampipes-extensions/streampipes-processors-statistics-flink/pom.xml +++ b/streampipes-extensions/streampipes-processors-statistics-flink/pom.xml @@ -93,6 +93,10 @@ + + org.apache.maven.plugins + maven-checkstyle-plugin + streampipes-processors-statistics-flink diff --git a/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/AbstractStatisticsProgram.java b/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/AbstractStatisticsProgram.java index a3d8174c85..d32bd4322f 100644 --- a/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/AbstractStatisticsProgram.java +++ b/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/AbstractStatisticsProgram.java @@ -25,9 +25,10 @@ import org.apache.streampipes.wrapper.flink.FlinkDeploymentConfig; import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams; -public abstract class AbstractStatisticsProgram extends FlinkDataProcessorRuntime { +public abstract class AbstractStatisticsProgram + extends FlinkDataProcessorRuntime { - public AbstractStatisticsProgram(B params, + public AbstractStatisticsProgram(T params, ConfigExtractor configExtractor, StreamPipesClient streamPipesClient) { super(params, configExtractor, streamPipesClient); @@ -37,10 +38,10 @@ public AbstractStatisticsProgram(B params, protected FlinkDeploymentConfig getDeploymentConfig(ConfigExtractor configExtractor) { SpConfig config = configExtractor.getConfig(); return new FlinkDeploymentConfig(config.getString( - ConfigKeys.FLINK_JAR_FILE_LOC), - config.getString(ConfigKeys.FLINK_HOST), - config.getInteger(ConfigKeys.FLINK_PORT), - config.getBoolean(ConfigKeys.DEBUG) + ConfigKeys.FLINK_JAR_FILE_LOC), + config.getString(ConfigKeys.FLINK_HOST), + config.getInteger(ConfigKeys.FLINK_PORT), + config.getBoolean(ConfigKeys.DEBUG) ); } diff --git a/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/StatisticsFlinkInit.java b/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/StatisticsFlinkInit.java index 6bb4b84ee6..23b75d234d 100644 --- a/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/StatisticsFlinkInit.java +++ b/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/StatisticsFlinkInit.java @@ -41,25 +41,25 @@ public static void main(String[] args) { @Override public SpServiceDefinition provideServiceDefinition() { return SpServiceDefinitionBuilder.create("org.apache.streampipes.processors.statistics.flink", - "Processors Statistics Flink", - "", - 8090) - .registerPipelineElements(new StatisticsSummaryController(), - new StatisticsSummaryControllerWindow()) - .registerMessagingFormats( - new JsonDataFormatFactory(), - new CborDataFormatFactory(), - new SmileDataFormatFactory(), - new FstDataFormatFactory()) - .registerMessagingProtocols( - new SpKafkaProtocolFactory(), - new SpJmsProtocolFactory(), - new SpMqttProtocolFactory()) - .addConfig(ConfigKeys.FLINK_HOST, "jobmanager", "Hostname of the Flink Jobmanager") - .addConfig(ConfigKeys.FLINK_PORT, 8081, "Port of the Flink Jobmanager") - .addConfig(ConfigKeys.DEBUG, false, "Debug/Mini cluster mode of Flink program") - .addConfig(ConfigKeys.FLINK_JAR_FILE_LOC, "./streampipes-processing-element-container.jar", "Jar file location") - .build(); + "Processors Statistics Flink", + "", + 8090) + .registerPipelineElements(new StatisticsSummaryController(), + new StatisticsSummaryControllerWindow()) + .registerMessagingFormats( + new JsonDataFormatFactory(), + new CborDataFormatFactory(), + new SmileDataFormatFactory(), + new FstDataFormatFactory()) + .registerMessagingProtocols( + new SpKafkaProtocolFactory(), + new SpJmsProtocolFactory(), + new SpMqttProtocolFactory()) + .addConfig(ConfigKeys.FLINK_HOST, "jobmanager", "Hostname of the Flink Jobmanager") + .addConfig(ConfigKeys.FLINK_PORT, 8081, "Port of the Flink Jobmanager") + .addConfig(ConfigKeys.DEBUG, false, "Debug/Mini cluster mode of Flink program") + .addConfig(ConfigKeys.FLINK_JAR_FILE_LOC, "./streampipes-processing-element-container.jar", "Jar file location") + .build(); } } diff --git a/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/config/ConfigKeys.java b/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/config/ConfigKeys.java index c090a00b24..74f67f5e3e 100644 --- a/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/config/ConfigKeys.java +++ b/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/config/ConfigKeys.java @@ -19,8 +19,8 @@ package org.apache.streampipes.processors.statistics.flink.config; public class ConfigKeys { - public final static String FLINK_HOST = "SP_FLINK_HOST"; - public final static String FLINK_PORT = "SP_FLINK_PORT"; - public final static String DEBUG = "SP_FLINK_DEBUG"; - public final static String FLINK_JAR_FILE_LOC = "SP_FLINK_JAR_FILE_LOC"; + public static final String FLINK_HOST = "SP_FLINK_HOST"; + public static final String FLINK_PORT = "SP_FLINK_PORT"; + public static final String DEBUG = "SP_FLINK_DEBUG"; + public static final String FLINK_JAR_FILE_LOC = "SP_FLINK_JAR_FILE_LOC"; } diff --git a/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/extensions/MapKeySelector.java b/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/extensions/MapKeySelector.java index 420c6579ce..ac75284e63 100644 --- a/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/extensions/MapKeySelector.java +++ b/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/extensions/MapKeySelector.java @@ -18,9 +18,10 @@ package org.apache.streampipes.processors.statistics.flink.extensions; -import org.apache.flink.api.java.functions.KeySelector; import org.apache.streampipes.model.runtime.Event; +import org.apache.flink.api.java.functions.KeySelector; + import java.io.Serializable; public class MapKeySelector implements Serializable { diff --git a/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/extensions/SlidingBatchWindow.java b/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/extensions/SlidingBatchWindow.java index 67cca48f55..0bd24393d3 100644 --- a/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/extensions/SlidingBatchWindow.java +++ b/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/extensions/SlidingBatchWindow.java @@ -25,11 +25,11 @@ import java.util.ArrayList; import java.util.List; -public class SlidingBatchWindow extends AbstractStreamOperator> implements - OneInputStreamOperator> { +public class SlidingBatchWindow extends AbstractStreamOperator> implements + OneInputStreamOperator> { private Integer windowSize; - private List currentEvents; + private List currentEvents; public SlidingBatchWindow(Integer windowSize) { super(); @@ -38,7 +38,7 @@ public SlidingBatchWindow(Integer windowSize) { } @Override - public void processElement(StreamRecord in) throws Exception { + public void processElement(StreamRecord in) throws Exception { currentEvents.add(in.getValue()); if (currentEvents.size() > windowSize) { currentEvents.remove(0); diff --git a/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/extensions/SlidingEventTimeWindow.java b/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/extensions/SlidingEventTimeWindow.java index 6db2742d87..8086ae08a7 100644 --- a/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/extensions/SlidingEventTimeWindow.java +++ b/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/extensions/SlidingEventTimeWindow.java @@ -28,16 +28,16 @@ import java.util.List; import java.util.concurrent.TimeUnit; -public class SlidingEventTimeWindow extends AbstractUdfStreamOperator, - TimestampMappingFunction> - implements - OneInputStreamOperator>, Serializable { +public class SlidingEventTimeWindow extends AbstractUdfStreamOperator, + TimestampMappingFunction> + implements + OneInputStreamOperator>, Serializable { private Long timeWindowSizeInMillis; - private List currentEvents; + private List currentEvents; - public SlidingEventTimeWindow(Long time, TimeUnit timeUnit, TimestampMappingFunction - timestampMappingFunction) { + public SlidingEventTimeWindow(Long time, TimeUnit timeUnit, TimestampMappingFunction + timestampMappingFunction) { super(timestampMappingFunction); this.timeWindowSizeInMillis = toMilliseconds(time, timeUnit); this.currentEvents = new ArrayList<>(); @@ -49,7 +49,7 @@ private Long toMilliseconds(Long time, TimeUnit timeUnit) { } @Override - public void processElement(StreamRecord in) throws Exception { + public void processElement(StreamRecord in) throws Exception { Long currentTimestamp = userFunction.getTimestamp(in.getValue()); checkForRemoval(currentTimestamp); @@ -60,10 +60,10 @@ public void processElement(StreamRecord in) throws Exception { } private void checkForRemoval(Long currentTimestamp) { - Iterator it = currentEvents.iterator(); + Iterator it = currentEvents.iterator(); - while(it.hasNext()) { - IN next = it.next(); + while (it.hasNext()) { + T next = it.next(); if (removalRequired(userFunction.getTimestamp(next), currentTimestamp)) { it.remove(); } else { diff --git a/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/extensions/TimestampMappingFunction.java b/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/extensions/TimestampMappingFunction.java index d16a5ae7b7..4f6675f8dc 100644 --- a/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/extensions/TimestampMappingFunction.java +++ b/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/extensions/TimestampMappingFunction.java @@ -22,7 +22,7 @@ import java.io.Serializable; -public interface TimestampMappingFunction extends Function, Serializable { +public interface TimestampMappingFunction extends Function, Serializable { - Long getTimestamp(IN in); + Long getTimestamp(T in); } diff --git a/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/processor/stat/summary/StatisticsSummaryCalculator.java b/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/processor/stat/summary/StatisticsSummaryCalculator.java index 9aff0cb4c1..2c2b10b3c1 100644 --- a/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/processor/stat/summary/StatisticsSummaryCalculator.java +++ b/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/processor/stat/summary/StatisticsSummaryCalculator.java @@ -18,11 +18,12 @@ package org.apache.streampipes.processors.statistics.flink.processor.stat.summary; +import org.apache.streampipes.model.runtime.Event; + import org.apache.commons.lang3.StringUtils; import org.apache.commons.math3.stat.descriptive.SummaryStatistics; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.util.Collector; -import org.apache.streampipes.model.runtime.Event; import java.util.List; @@ -36,11 +37,11 @@ public StatisticsSummaryCalculator(List listPropertyMappings) { @Override public void flatMap(Event in, Collector out) throws - Exception { + Exception { - for (String property: listPropertyMappings) { + for (String property : listPropertyMappings) { List listValues = (in.getFieldBySelector(property).getAsList().castItems - (Double.class)); + (Double.class)); SummaryStatistics stats = new SummaryStatistics(); diff --git a/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/processor/stat/summary/StatisticsSummaryController.java b/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/processor/stat/summary/StatisticsSummaryController.java index 936501e9c4..f1e1f6e5ba 100644 --- a/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/processor/stat/summary/StatisticsSummaryController.java +++ b/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/processor/stat/summary/StatisticsSummaryController.java @@ -18,7 +18,6 @@ package org.apache.streampipes.processors.statistics.flink.processor.stat.summary; -import org.apache.commons.lang3.StringUtils; import org.apache.streampipes.client.StreamPipesClient; import org.apache.streampipes.commons.exceptions.SpRuntimeException; import org.apache.streampipes.container.api.ResolvesContainerProvidedOutputStrategy; @@ -30,18 +29,24 @@ import org.apache.streampipes.sdk.builder.ProcessingElementBuilder; import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder; import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor; -import org.apache.streampipes.sdk.helpers.*; +import org.apache.streampipes.sdk.helpers.EpProperties; +import org.apache.streampipes.sdk.helpers.EpRequirements; +import org.apache.streampipes.sdk.helpers.Labels; +import org.apache.streampipes.sdk.helpers.Locales; +import org.apache.streampipes.sdk.helpers.OutputStrategies; import org.apache.streampipes.sdk.utils.Assets; import org.apache.streampipes.sdk.utils.Datatypes; import org.apache.streampipes.vocabulary.Statistics; import org.apache.streampipes.wrapper.flink.FlinkDataProcessorDeclarer; import org.apache.streampipes.wrapper.flink.FlinkDataProcessorRuntime; +import org.apache.commons.lang3.StringUtils; + import java.util.List; public class StatisticsSummaryController extends FlinkDataProcessorDeclarer implements - ResolvesContainerProvidedOutputStrategy { + ResolvesContainerProvidedOutputStrategy { private static final String listPropertyMappingName = "list-property"; @@ -56,22 +61,23 @@ public class StatisticsSummaryController extends FlinkDataProcessorDeclarer getRuntime(DataProcessorInvocation graph, - ProcessingElementParameterExtractor extractor, - ConfigExtractor configExtractor, - StreamPipesClient streamPipesClient) { + public FlinkDataProcessorRuntime getRuntime( + DataProcessorInvocation graph, + ProcessingElementParameterExtractor extractor, + ConfigExtractor configExtractor, + StreamPipesClient streamPipesClient) { List listPropertyMappings = extractor.mappingPropertyValues(listPropertyMappingName); StatisticsSummaryParameters params = new StatisticsSummaryParameters(graph, listPropertyMappings); @@ -81,19 +87,22 @@ public FlinkDataProcessorRuntime getRuntime(DataPro } @Override - public EventSchema resolveOutputStrategy(DataProcessorInvocation processingElement, ProcessingElementParameterExtractor extractor) throws SpRuntimeException { + public EventSchema resolveOutputStrategy(DataProcessorInvocation processingElement, + ProcessingElementParameterExtractor extractor) throws SpRuntimeException { EventSchema eventSchema = processingElement.getInputStreams().get(0).getEventSchema(); List listPropertyMappings = extractor.mappingPropertyValues(listPropertyMappingName); - for (String property: listPropertyMappings) { + for (String property : listPropertyMappings) { String propertyPrefix = StringUtils.substringAfterLast(property, ":"); eventSchema.addEventProperty(EpProperties.doubleEp(Labels.empty(), propertyPrefix + "_" + MIN, Statistics.MIN)); eventSchema.addEventProperty(EpProperties.doubleEp(Labels.empty(), propertyPrefix + "_" + MAX, Statistics.MAX)); eventSchema.addEventProperty(EpProperties.doubleEp(Labels.empty(), propertyPrefix + "_" + SUM, Statistics.SUM)); - eventSchema.addEventProperty(EpProperties.doubleEp(Labels.empty(), propertyPrefix + "_" + STDDEV, Statistics.STDDEV)); - eventSchema.addEventProperty(EpProperties.doubleEp(Labels.empty(), propertyPrefix + "_" + VARIANCE, Statistics.VARIANCE)); + eventSchema.addEventProperty( + EpProperties.doubleEp(Labels.empty(), propertyPrefix + "_" + STDDEV, Statistics.STDDEV)); + eventSchema.addEventProperty( + EpProperties.doubleEp(Labels.empty(), propertyPrefix + "_" + VARIANCE, Statistics.VARIANCE)); eventSchema.addEventProperty(EpProperties.doubleEp(Labels.empty(), propertyPrefix + "_" + MEAN, Statistics.MEAN)); eventSchema.addEventProperty(EpProperties.doubleEp(Labels.empty(), propertyPrefix + "_" + N, Statistics.N)); } diff --git a/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/processor/stat/summary/StatisticsSummaryProgram.java b/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/processor/stat/summary/StatisticsSummaryProgram.java index 0c0edc5153..c7cf38d51a 100644 --- a/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/processor/stat/summary/StatisticsSummaryProgram.java +++ b/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/processor/stat/summary/StatisticsSummaryProgram.java @@ -18,12 +18,13 @@ package org.apache.streampipes.processors.statistics.flink.processor.stat.summary; -import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.streampipes.client.StreamPipesClient; import org.apache.streampipes.container.config.ConfigExtractor; import org.apache.streampipes.model.runtime.Event; import org.apache.streampipes.processors.statistics.flink.AbstractStatisticsProgram; +import org.apache.flink.streaming.api.datastream.DataStream; + public class StatisticsSummaryProgram extends AbstractStatisticsProgram { public StatisticsSummaryProgram(StatisticsSummaryParameters params, diff --git a/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/processor/stat/window/StatisticsSummaryCalculatorWindow.java b/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/processor/stat/window/StatisticsSummaryCalculatorWindow.java index 5e57e58e3f..e20dcb51e0 100644 --- a/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/processor/stat/window/StatisticsSummaryCalculatorWindow.java +++ b/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/processor/stat/window/StatisticsSummaryCalculatorWindow.java @@ -18,18 +18,19 @@ package org.apache.streampipes.processors.statistics.flink.processor.stat.window; +import org.apache.streampipes.model.runtime.Event; +import org.apache.streampipes.processors.statistics.flink.processor.stat.summary.StatisticsSummaryController; + import org.apache.commons.math3.stat.descriptive.SummaryStatistics; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.util.Collector; -import org.apache.streampipes.model.runtime.Event; -import org.apache.streampipes.processors.statistics.flink.processor.stat.summary.StatisticsSummaryController; import java.io.Serializable; import java.util.List; import java.util.stream.Collectors; public class StatisticsSummaryCalculatorWindow implements FlatMapFunction, Event>, - Serializable { + Serializable { private String partitionMapping; private String valueToObserveMapping; @@ -41,10 +42,10 @@ public StatisticsSummaryCalculatorWindow(String partitionMapping, String valueTo @Override public void flatMap(List in, Collector out) - throws Exception { + throws Exception { List listValues = (in.stream().map(m -> m.getFieldBySelector(valueToObserveMapping) .getAsPrimitive().getAsDouble()) - .collect(Collectors.toList())); + .collect(Collectors.toList())); SummaryStatistics stats = new SummaryStatistics(); diff --git a/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/processor/stat/window/StatisticsSummaryControllerWindow.java b/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/processor/stat/window/StatisticsSummaryControllerWindow.java index 720f8a1c01..8a4f60d3b5 100644 --- a/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/processor/stat/window/StatisticsSummaryControllerWindow.java +++ b/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/processor/stat/window/StatisticsSummaryControllerWindow.java @@ -27,7 +27,12 @@ import org.apache.streampipes.sdk.builder.ProcessingElementBuilder; import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder; import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor; -import org.apache.streampipes.sdk.helpers.*; +import org.apache.streampipes.sdk.helpers.EpProperties; +import org.apache.streampipes.sdk.helpers.EpRequirements; +import org.apache.streampipes.sdk.helpers.Labels; +import org.apache.streampipes.sdk.helpers.Locales; +import org.apache.streampipes.sdk.helpers.Options; +import org.apache.streampipes.sdk.helpers.OutputStrategies; import org.apache.streampipes.sdk.utils.Assets; import org.apache.streampipes.vocabulary.Statistics; import org.apache.streampipes.wrapper.flink.FlinkDataProcessorDeclarer; @@ -36,7 +41,7 @@ import java.util.concurrent.TimeUnit; public class StatisticsSummaryControllerWindow extends - FlinkDataProcessorDeclarer { + FlinkDataProcessorDeclarer { private static final String VALUE_TO_OBSERVE = "value-to-observe"; private static final String PARTITION_BY = "partition-by"; @@ -46,40 +51,42 @@ public class StatisticsSummaryControllerWindow extends @Override public DataProcessorDescription declareModel() { - return ProcessingElementBuilder.create("org.apache.streampipes.processors.statistics.flink.statistics-summary-window") - .withLocales(Locales.EN) - .withAssets(Assets.DOCUMENTATION, Assets.ICON) - .requiredStream(StreamRequirementsBuilder - .create() - .requiredPropertyWithUnaryMapping(EpRequirements.numberReq(), - Labels.withId(VALUE_TO_OBSERVE), PropertyScope.MEASUREMENT_PROPERTY) - .requiredPropertyWithUnaryMapping(EpRequirements.timestampReq(), - Labels.withId(TIMESTAMP_MAPPING), - PropertyScope.HEADER_PROPERTY) - .requiredPropertyWithUnaryMapping(EpRequirements.stringReq(), - Labels.withId(PARTITION_BY), PropertyScope.DIMENSION_PROPERTY) - .build()) - .requiredIntegerParameter(Labels.withId(TIME_WINDOW)) - .requiredSingleValueSelection(Labels.withId(TIME_SCALE), - Options.from("Hours", "Minutes", "Seconds")) - .outputStrategy(OutputStrategies.fixed( - EpProperties.timestampProperty("timestamp"), - EpProperties.stringEp(Labels.empty(), "id", "http://schema.org/id"), - EpProperties.doubleEp(Labels.empty(), StatisticsSummaryController.MEAN, Statistics.MEAN), - EpProperties.doubleEp(Labels.empty(), StatisticsSummaryController.MIN, Statistics.MIN), - EpProperties.doubleEp(Labels.empty(), StatisticsSummaryController.MAX, Statistics.MAX), - EpProperties.doubleEp(Labels.empty(), StatisticsSummaryController.SUM, Statistics.SUM), - EpProperties.doubleEp(Labels.empty(), StatisticsSummaryController.STDDEV, Statistics.STDDEV), - EpProperties.doubleEp(Labels.empty(), StatisticsSummaryController.VARIANCE, Statistics.VARIANCE), - EpProperties.doubleEp(Labels.empty(), StatisticsSummaryController.N, Statistics.N))) - .build(); + return ProcessingElementBuilder.create( + "org.apache.streampipes.processors.statistics.flink.statistics-summary-window") + .withLocales(Locales.EN) + .withAssets(Assets.DOCUMENTATION, Assets.ICON) + .requiredStream(StreamRequirementsBuilder + .create() + .requiredPropertyWithUnaryMapping(EpRequirements.numberReq(), + Labels.withId(VALUE_TO_OBSERVE), PropertyScope.MEASUREMENT_PROPERTY) + .requiredPropertyWithUnaryMapping(EpRequirements.timestampReq(), + Labels.withId(TIMESTAMP_MAPPING), + PropertyScope.HEADER_PROPERTY) + .requiredPropertyWithUnaryMapping(EpRequirements.stringReq(), + Labels.withId(PARTITION_BY), PropertyScope.DIMENSION_PROPERTY) + .build()) + .requiredIntegerParameter(Labels.withId(TIME_WINDOW)) + .requiredSingleValueSelection(Labels.withId(TIME_SCALE), + Options.from("Hours", "Minutes", "Seconds")) + .outputStrategy(OutputStrategies.fixed( + EpProperties.timestampProperty("timestamp"), + EpProperties.stringEp(Labels.empty(), "id", "http://schema.org/id"), + EpProperties.doubleEp(Labels.empty(), StatisticsSummaryController.MEAN, Statistics.MEAN), + EpProperties.doubleEp(Labels.empty(), StatisticsSummaryController.MIN, Statistics.MIN), + EpProperties.doubleEp(Labels.empty(), StatisticsSummaryController.MAX, Statistics.MAX), + EpProperties.doubleEp(Labels.empty(), StatisticsSummaryController.SUM, Statistics.SUM), + EpProperties.doubleEp(Labels.empty(), StatisticsSummaryController.STDDEV, Statistics.STDDEV), + EpProperties.doubleEp(Labels.empty(), StatisticsSummaryController.VARIANCE, Statistics.VARIANCE), + EpProperties.doubleEp(Labels.empty(), StatisticsSummaryController.N, Statistics.N))) + .build(); } @Override - public FlinkDataProcessorRuntime getRuntime(DataProcessorInvocation sepa, - ProcessingElementParameterExtractor extractor, - ConfigExtractor configExtractor, - StreamPipesClient streamPipesClient) { + public FlinkDataProcessorRuntime getRuntime( + DataProcessorInvocation sepa, + ProcessingElementParameterExtractor extractor, + ConfigExtractor configExtractor, + StreamPipesClient streamPipesClient) { String valueToObserve = extractor.mappingPropertyValue(VALUE_TO_OBSERVE); String timestampMapping = extractor.mappingPropertyValue(TIMESTAMP_MAPPING); @@ -93,19 +100,21 @@ public FlinkDataProcessorRuntime getRuntime(D if (scale.equals("Hours")) { timeUnit = TimeUnit.HOURS; - } - else if (scale.equals("Minutes")) { + } else if (scale.equals("Minutes")) { timeUnit = TimeUnit.MINUTES; - } - else { + } else { timeUnit = TimeUnit.SECONDS; } StatisticsSummaryParametersWindow params = new StatisticsSummaryParametersWindow(sepa, - valueToObserve, timestampMapping, groupBy, (long) timeWindowSize, timeUnit); - - StatisticsSummaryParamsSerializable serializableParams = new StatisticsSummaryParamsSerializable - (valueToObserve, timestampMapping, groupBy, (long) timeWindowSize, timeUnit); + valueToObserve, timestampMapping, groupBy, (long) timeWindowSize, timeUnit); + + StatisticsSummaryParamsSerializable serializableParams = new StatisticsSummaryParamsSerializable( + valueToObserve, + timestampMapping, + groupBy, + (long) timeWindowSize, + timeUnit); return new StatisticsSummaryProgramWindow(params, serializableParams, configExtractor, streamPipesClient); diff --git a/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/processor/stat/window/StatisticsSummaryParametersWindow.java b/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/processor/stat/window/StatisticsSummaryParametersWindow.java index dfa5e482dd..1e7215811a 100644 --- a/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/processor/stat/window/StatisticsSummaryParametersWindow.java +++ b/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/processor/stat/window/StatisticsSummaryParametersWindow.java @@ -32,14 +32,13 @@ public class StatisticsSummaryParametersWindow extends EventProcessorBindingPara private TimeUnit timeUnit; - public StatisticsSummaryParametersWindow(DataProcessorInvocation graph) { super(graph); } public StatisticsSummaryParametersWindow(DataProcessorInvocation graph, String valueToObserve, String timestampMapping, String groupBy, Long - timeWindowSize, TimeUnit timeUnit) { + timeWindowSize, TimeUnit timeUnit) { super(graph); this.valueToObserve = valueToObserve; this.timestampMapping = timestampMapping; diff --git a/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/processor/stat/window/StatisticsSummaryParamsSerializable.java b/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/processor/stat/window/StatisticsSummaryParamsSerializable.java index 3bde440bb4..b264b1ff1e 100644 --- a/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/processor/stat/window/StatisticsSummaryParamsSerializable.java +++ b/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/processor/stat/window/StatisticsSummaryParamsSerializable.java @@ -31,7 +31,7 @@ public class StatisticsSummaryParamsSerializable implements Serializable { public StatisticsSummaryParamsSerializable(String valueToObserve, String timestampMapping, String groupBy, Long timeWindowSize, TimeUnit - timeUnit) { + timeUnit) { this.valueToObserve = valueToObserve; this.timestampMapping = timestampMapping; this.groupBy = groupBy; diff --git a/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/processor/stat/window/StatisticsSummaryProgramWindow.java b/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/processor/stat/window/StatisticsSummaryProgramWindow.java index 2e42e2ac7a..381e3dd04a 100644 --- a/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/processor/stat/window/StatisticsSummaryProgramWindow.java +++ b/streampipes-extensions/streampipes-processors-statistics-flink/src/main/java/org/apache/streampipes/processors/statistics/flink/processor/stat/window/StatisticsSummaryProgramWindow.java @@ -18,10 +18,6 @@ package org.apache.streampipes.processors.statistics.flink.processor.stat.window; -import org.apache.flink.api.common.typeinfo.TypeHint; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.streaming.api.TimeCharacteristic; -import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.streampipes.client.StreamPipesClient; import org.apache.streampipes.container.config.ConfigExtractor; import org.apache.streampipes.model.runtime.Event; @@ -30,10 +26,15 @@ import org.apache.streampipes.processors.statistics.flink.extensions.SlidingEventTimeWindow; import org.apache.streampipes.processors.statistics.flink.extensions.TimestampMappingFunction; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.datastream.DataStream; + import java.util.List; public class StatisticsSummaryProgramWindow extends - AbstractStatisticsProgram { + AbstractStatisticsProgram { private StatisticsSummaryParamsSerializable serializableParams; @@ -50,24 +51,22 @@ public StatisticsSummaryProgramWindow(StatisticsSummaryParametersWindow params, protected DataStream getApplicationLogic(DataStream... messageStream) { StatisticsSummaryParamsSerializable sp = new - StatisticsSummaryParamsSerializable(serializableParams.getValueToObserve(), - serializableParams.getTimestampMapping(), serializableParams.getGroupBy(), - serializableParams.getTimeWindowSize(), serializableParams.getTimeUnit()); + StatisticsSummaryParamsSerializable(serializableParams.getValueToObserve(), + serializableParams.getTimestampMapping(), serializableParams.getGroupBy(), + serializableParams.getTimeWindowSize(), serializableParams.getTimeUnit()); DataStream output = messageStream[0] - .keyBy(new MapKeySelector(sp.getGroupBy()).getKeySelector()) - .transform - ("sliding-window-event-shift", - TypeInformation.of(new TypeHint>() { - }), new SlidingEventTimeWindow<>(sp.getTimeWindowSize(), sp.getTimeUnit(), - (TimestampMappingFunction) in -> - in.getFieldBySelector(sp.getTimestampMapping()) - .getAsPrimitive().getAsLong())) - .flatMap(new StatisticsSummaryCalculatorWindow(sp.getGroupBy(), sp.getValueToObserve())); + .keyBy(new MapKeySelector(sp.getGroupBy()).getKeySelector()) + .transform + ("sliding-window-event-shift", + TypeInformation.of(new TypeHint>() { + }), new SlidingEventTimeWindow<>(sp.getTimeWindowSize(), sp.getTimeUnit(), + (TimestampMappingFunction) in -> + in.getFieldBySelector(sp.getTimestampMapping()) + .getAsPrimitive().getAsLong())) + .flatMap(new StatisticsSummaryCalculatorWindow(sp.getGroupBy(), sp.getValueToObserve())); return output; } - - } diff --git a/streampipes-extensions/streampipes-processors-text-mining-flink/pom.xml b/streampipes-extensions/streampipes-processors-text-mining-flink/pom.xml index 3411d6e451..fd064512d4 100644 --- a/streampipes-extensions/streampipes-processors-text-mining-flink/pom.xml +++ b/streampipes-extensions/streampipes-processors-text-mining-flink/pom.xml @@ -135,6 +135,10 @@ + + org.apache.maven.plugins + maven-checkstyle-plugin + streampipes-processors-textmining-flink diff --git a/streampipes-extensions/streampipes-processors-text-mining-flink/src/main/java/org/apache/streampipes/processors/textmining/flink/AbstractTextMiningProgram.java b/streampipes-extensions/streampipes-processors-text-mining-flink/src/main/java/org/apache/streampipes/processors/textmining/flink/AbstractTextMiningProgram.java index 1fe58e5cb8..3a2ce7125c 100644 --- a/streampipes-extensions/streampipes-processors-text-mining-flink/src/main/java/org/apache/streampipes/processors/textmining/flink/AbstractTextMiningProgram.java +++ b/streampipes-extensions/streampipes-processors-text-mining-flink/src/main/java/org/apache/streampipes/processors/textmining/flink/AbstractTextMiningProgram.java @@ -25,7 +25,8 @@ import org.apache.streampipes.wrapper.flink.FlinkDeploymentConfig; import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams; -public abstract class AbstractTextMiningProgram extends FlinkDataProcessorRuntime { +public abstract class AbstractTextMiningProgram + extends FlinkDataProcessorRuntime { public AbstractTextMiningProgram(B params, ConfigExtractor configExtractor, @@ -37,10 +38,10 @@ public AbstractTextMiningProgram(B params, protected FlinkDeploymentConfig getDeploymentConfig(ConfigExtractor configExtractor) { SpConfig config = configExtractor.getConfig(); return new FlinkDeploymentConfig(config.getString( - ConfigKeys.FLINK_JAR_FILE_LOC), - config.getString(ConfigKeys.FLINK_HOST), - config.getInteger(ConfigKeys.FLINK_PORT), - config.getBoolean(ConfigKeys.DEBUG) + ConfigKeys.FLINK_JAR_FILE_LOC), + config.getString(ConfigKeys.FLINK_HOST), + config.getInteger(ConfigKeys.FLINK_PORT), + config.getBoolean(ConfigKeys.DEBUG) ); } diff --git a/streampipes-extensions/streampipes-processors-text-mining-flink/src/main/java/org/apache/streampipes/processors/textmining/flink/TextMiningFlinkInit.java b/streampipes-extensions/streampipes-processors-text-mining-flink/src/main/java/org/apache/streampipes/processors/textmining/flink/TextMiningFlinkInit.java index 630be9d809..8be14fee80 100644 --- a/streampipes-extensions/streampipes-processors-text-mining-flink/src/main/java/org/apache/streampipes/processors/textmining/flink/TextMiningFlinkInit.java +++ b/streampipes-extensions/streampipes-processors-text-mining-flink/src/main/java/org/apache/streampipes/processors/textmining/flink/TextMiningFlinkInit.java @@ -41,22 +41,22 @@ public static void main(String[] args) { public SpServiceDefinition provideServiceDefinition() { return SpServiceDefinitionBuilder.create("org.apache.streampipes.processors.textmining.flink", - "Processors Text Mining Flink", "", - 8090) - .registerPipelineElements(new WordCountController()) - .registerMessagingFormats( - new JsonDataFormatFactory(), - new CborDataFormatFactory(), - new SmileDataFormatFactory(), - new FstDataFormatFactory()) - .registerMessagingProtocols( - new SpKafkaProtocolFactory(), - new SpJmsProtocolFactory(), - new SpMqttProtocolFactory()) - .addConfig(ConfigKeys.FLINK_HOST, "jobmanager", "Hostname of the Flink Jobmanager") - .addConfig(ConfigKeys.FLINK_PORT, 8081, "Port of the Flink Jobmanager") - .addConfig(ConfigKeys.DEBUG, false, "Debug/Mini cluster mode of Flink program") - .addConfig(ConfigKeys.FLINK_JAR_FILE_LOC, "./streampipes-processing-element-container.jar", "Jar file location") - .build(); + "Processors Text Mining Flink", "", + 8090) + .registerPipelineElements(new WordCountController()) + .registerMessagingFormats( + new JsonDataFormatFactory(), + new CborDataFormatFactory(), + new SmileDataFormatFactory(), + new FstDataFormatFactory()) + .registerMessagingProtocols( + new SpKafkaProtocolFactory(), + new SpJmsProtocolFactory(), + new SpMqttProtocolFactory()) + .addConfig(ConfigKeys.FLINK_HOST, "jobmanager", "Hostname of the Flink Jobmanager") + .addConfig(ConfigKeys.FLINK_PORT, 8081, "Port of the Flink Jobmanager") + .addConfig(ConfigKeys.DEBUG, false, "Debug/Mini cluster mode of Flink program") + .addConfig(ConfigKeys.FLINK_JAR_FILE_LOC, "./streampipes-processing-element-container.jar", "Jar file location") + .build(); } } diff --git a/streampipes-extensions/streampipes-processors-text-mining-flink/src/main/java/org/apache/streampipes/processors/textmining/flink/config/ConfigKeys.java b/streampipes-extensions/streampipes-processors-text-mining-flink/src/main/java/org/apache/streampipes/processors/textmining/flink/config/ConfigKeys.java index e9ccd593b2..aca97b1c26 100644 --- a/streampipes-extensions/streampipes-processors-text-mining-flink/src/main/java/org/apache/streampipes/processors/textmining/flink/config/ConfigKeys.java +++ b/streampipes-extensions/streampipes-processors-text-mining-flink/src/main/java/org/apache/streampipes/processors/textmining/flink/config/ConfigKeys.java @@ -19,8 +19,8 @@ package org.apache.streampipes.processors.textmining.flink.config; public class ConfigKeys { - public final static String FLINK_HOST = "SP_FLINK_HOST"; - public final static String FLINK_PORT = "SP_FLINK_PORT"; - public final static String DEBUG = "SP_FLINK_DEBUG"; - public final static String FLINK_JAR_FILE_LOC = "SP_FLINK_JAR_FILE_LOC"; + public static final String FLINK_HOST = "SP_FLINK_HOST"; + public static final String FLINK_PORT = "SP_FLINK_PORT"; + public static final String DEBUG = "SP_FLINK_DEBUG"; + public static final String FLINK_JAR_FILE_LOC = "SP_FLINK_JAR_FILE_LOC"; } diff --git a/streampipes-extensions/streampipes-processors-text-mining-flink/src/main/java/org/apache/streampipes/processors/textmining/flink/processor/language/LanguageDetection.java b/streampipes-extensions/streampipes-processors-text-mining-flink/src/main/java/org/apache/streampipes/processors/textmining/flink/processor/language/LanguageDetection.java index 0b18119ecc..f164fdfc21 100644 --- a/streampipes-extensions/streampipes-processors-text-mining-flink/src/main/java/org/apache/streampipes/processors/textmining/flink/processor/language/LanguageDetection.java +++ b/streampipes-extensions/streampipes-processors-text-mining-flink/src/main/java/org/apache/streampipes/processors/textmining/flink/processor/language/LanguageDetection.java @@ -17,6 +17,8 @@ */ package org.apache.streampipes.processors.textmining.flink.processor.language; +import org.apache.streampipes.model.runtime.Event; + import com.optimaize.langdetect.LanguageDetector; import com.optimaize.langdetect.LanguageDetectorBuilder; import com.optimaize.langdetect.i18n.LdLocale; @@ -28,7 +30,6 @@ import com.optimaize.langdetect.text.TextObjectFactory; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.util.Collector; -import org.apache.streampipes.model.runtime.Event; import java.io.IOException; import java.util.List; @@ -51,17 +52,17 @@ public LanguageDetection(String fieldName) { } this.languageDetector = LanguageDetectorBuilder.create(NgramExtractors.standard()) - .withProfiles(languageProfiles) - .build(); + .withProfiles(languageProfiles) + .build(); this.textObjectFactory = CommonTextObjectFactories.forDetectingOnLargeText(); } @Override - public void flatMap(Event in, Collector out) { + public void flatMap(Event in, Collector out) { TextObject textObject = textObjectFactory.forText(in.getFieldBySelector(fieldName) - .getAsPrimitive().getAsString()); + .getAsPrimitive().getAsString()); com.google.common.base.Optional lang = languageDetector.detect(textObject); if (lang.isPresent()) { diff --git a/streampipes-extensions/streampipes-processors-text-mining-flink/src/main/java/org/apache/streampipes/processors/textmining/flink/processor/language/LanguageDetectionController.java b/streampipes-extensions/streampipes-processors-text-mining-flink/src/main/java/org/apache/streampipes/processors/textmining/flink/processor/language/LanguageDetectionController.java index 77d96ddeaa..846c72b8bf 100644 --- a/streampipes-extensions/streampipes-processors-text-mining-flink/src/main/java/org/apache/streampipes/processors/textmining/flink/processor/language/LanguageDetectionController.java +++ b/streampipes-extensions/streampipes-processors-text-mining-flink/src/main/java/org/apache/streampipes/processors/textmining/flink/processor/language/LanguageDetectionController.java @@ -26,7 +26,11 @@ import org.apache.streampipes.sdk.builder.ProcessingElementBuilder; import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder; import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor; -import org.apache.streampipes.sdk.helpers.*; +import org.apache.streampipes.sdk.helpers.EpProperties; +import org.apache.streampipes.sdk.helpers.EpRequirements; +import org.apache.streampipes.sdk.helpers.Labels; +import org.apache.streampipes.sdk.helpers.Locales; +import org.apache.streampipes.sdk.helpers.OutputStrategies; import org.apache.streampipes.sdk.utils.Assets; import org.apache.streampipes.wrapper.flink.FlinkDataProcessorDeclarer; import org.apache.streampipes.wrapper.flink.FlinkDataProcessorRuntime; @@ -39,30 +43,32 @@ public class LanguageDetectionController extends FlinkDataProcessorDeclarer getRuntime(DataProcessorInvocation graph, - ProcessingElementParameterExtractor extractor, - ConfigExtractor configExtractor, - StreamPipesClient streamPipesClient) { + public FlinkDataProcessorRuntime getRuntime( + DataProcessorInvocation graph, + ProcessingElementParameterExtractor extractor, + ConfigExtractor configExtractor, + StreamPipesClient streamPipesClient) { String fieldName = extractor.mappingPropertyValue(DETECTION_FIELD_KEY); - return new LanguageDetectionProgram(new LanguageDetectionParameters(graph, fieldName), configExtractor, streamPipesClient); + return new LanguageDetectionProgram(new LanguageDetectionParameters(graph, fieldName), configExtractor, + streamPipesClient); } } diff --git a/streampipes-extensions/streampipes-processors-text-mining-flink/src/main/java/org/apache/streampipes/processors/textmining/flink/processor/language/LanguageDetectionParameters.java b/streampipes-extensions/streampipes-processors-text-mining-flink/src/main/java/org/apache/streampipes/processors/textmining/flink/processor/language/LanguageDetectionParameters.java index ce0b26054e..020ea5df49 100644 --- a/streampipes-extensions/streampipes-processors-text-mining-flink/src/main/java/org/apache/streampipes/processors/textmining/flink/processor/language/LanguageDetectionParameters.java +++ b/streampipes-extensions/streampipes-processors-text-mining-flink/src/main/java/org/apache/streampipes/processors/textmining/flink/processor/language/LanguageDetectionParameters.java @@ -24,8 +24,7 @@ public class LanguageDetectionParameters extends EventProcessorBindingParams { private String fieldName; - public LanguageDetectionParameters(DataProcessorInvocation graph, String fieldName) - { + public LanguageDetectionParameters(DataProcessorInvocation graph, String fieldName) { super(graph); this.fieldName = fieldName; } diff --git a/streampipes-extensions/streampipes-processors-text-mining-flink/src/main/java/org/apache/streampipes/processors/textmining/flink/processor/language/LanguageDetectionProgram.java b/streampipes-extensions/streampipes-processors-text-mining-flink/src/main/java/org/apache/streampipes/processors/textmining/flink/processor/language/LanguageDetectionProgram.java index 460afd2cea..a5bc3ae497 100644 --- a/streampipes-extensions/streampipes-processors-text-mining-flink/src/main/java/org/apache/streampipes/processors/textmining/flink/processor/language/LanguageDetectionProgram.java +++ b/streampipes-extensions/streampipes-processors-text-mining-flink/src/main/java/org/apache/streampipes/processors/textmining/flink/processor/language/LanguageDetectionProgram.java @@ -17,12 +17,13 @@ */ package org.apache.streampipes.processors.textmining.flink.processor.language; -import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.streampipes.client.StreamPipesClient; import org.apache.streampipes.container.config.ConfigExtractor; import org.apache.streampipes.model.runtime.Event; import org.apache.streampipes.processors.textmining.flink.AbstractTextMiningProgram; +import org.apache.flink.streaming.api.datastream.DataStream; + public class LanguageDetectionProgram extends AbstractTextMiningProgram { public LanguageDetectionProgram(LanguageDetectionParameters params, @@ -34,6 +35,6 @@ public LanguageDetectionProgram(LanguageDetectionParameters params, @Override protected DataStream getApplicationLogic(DataStream... messageStream) { return messageStream[0] - .flatMap(new LanguageDetection(params.getFieldName())); + .flatMap(new LanguageDetection(params.getFieldName())); } } diff --git a/streampipes-extensions/streampipes-processors-text-mining-flink/src/main/java/org/apache/streampipes/processors/textmining/flink/processor/wordcount/WordCountController.java b/streampipes-extensions/streampipes-processors-text-mining-flink/src/main/java/org/apache/streampipes/processors/textmining/flink/processor/wordcount/WordCountController.java index 40d105adb3..14f7cc3845 100644 --- a/streampipes-extensions/streampipes-processors-text-mining-flink/src/main/java/org/apache/streampipes/processors/textmining/flink/processor/wordcount/WordCountController.java +++ b/streampipes-extensions/streampipes-processors-text-mining-flink/src/main/java/org/apache/streampipes/processors/textmining/flink/processor/wordcount/WordCountController.java @@ -27,7 +27,11 @@ import org.apache.streampipes.sdk.builder.ProcessingElementBuilder; import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder; import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor; -import org.apache.streampipes.sdk.helpers.*; +import org.apache.streampipes.sdk.helpers.EpProperties; +import org.apache.streampipes.sdk.helpers.EpRequirements; +import org.apache.streampipes.sdk.helpers.Labels; +import org.apache.streampipes.sdk.helpers.Locales; +import org.apache.streampipes.sdk.helpers.OutputStrategies; import org.apache.streampipes.sdk.utils.Assets; import org.apache.streampipes.wrapper.flink.FlinkDataProcessorDeclarer; import org.apache.streampipes.wrapper.flink.FlinkDataProcessorRuntime; @@ -42,24 +46,24 @@ public class WordCountController extends FlinkDataProcessorDeclarer getRuntime(DataProcessorIn String fieldName = extractor.mappingPropertyValue(WORD_COUNT_FIELD_KEY); Integer timeWindowValue = extractor.singleValueParameter(TIME_WINDOW_KEY, Integer.class); - return new WordCountProgram(new WordCountParameters(graph, fieldName, timeWindowValue), configExtractor, streamPipesClient); + return new WordCountProgram(new WordCountParameters(graph, fieldName, timeWindowValue), configExtractor, + streamPipesClient); } } diff --git a/streampipes-extensions/streampipes-processors-text-mining-flink/src/main/java/org/apache/streampipes/processors/textmining/flink/processor/wordcount/WordCountParameters.java b/streampipes-extensions/streampipes-processors-text-mining-flink/src/main/java/org/apache/streampipes/processors/textmining/flink/processor/wordcount/WordCountParameters.java index 13d1688b39..ea91058cc3 100644 --- a/streampipes-extensions/streampipes-processors-text-mining-flink/src/main/java/org/apache/streampipes/processors/textmining/flink/processor/wordcount/WordCountParameters.java +++ b/streampipes-extensions/streampipes-processors-text-mining-flink/src/main/java/org/apache/streampipes/processors/textmining/flink/processor/wordcount/WordCountParameters.java @@ -23,21 +23,20 @@ public class WordCountParameters extends EventProcessorBindingParams { - private String wordCountFieldName; - private Integer timeWindowValue; + private String wordCountFieldName; + private Integer timeWindowValue; - public WordCountParameters(DataProcessorInvocation graph, String wordCountFieldName, Integer timeWindowValue) - { - super(graph); - this.wordCountFieldName = wordCountFieldName; - this.timeWindowValue = timeWindowValue; - } + public WordCountParameters(DataProcessorInvocation graph, String wordCountFieldName, Integer timeWindowValue) { + super(graph); + this.wordCountFieldName = wordCountFieldName; + this.timeWindowValue = timeWindowValue; + } - public String getWordCountFieldName() { - return wordCountFieldName; - } + public String getWordCountFieldName() { + return wordCountFieldName; + } - public Integer getTimeWindowValue() { - return timeWindowValue; - } + public Integer getTimeWindowValue() { + return timeWindowValue; + } } diff --git a/streampipes-extensions/streampipes-processors-text-mining-flink/src/main/java/org/apache/streampipes/processors/textmining/flink/processor/wordcount/WordCountProgram.java b/streampipes-extensions/streampipes-processors-text-mining-flink/src/main/java/org/apache/streampipes/processors/textmining/flink/processor/wordcount/WordCountProgram.java index 883ddaeeff..9b11086cb0 100644 --- a/streampipes-extensions/streampipes-processors-text-mining-flink/src/main/java/org/apache/streampipes/processors/textmining/flink/processor/wordcount/WordCountProgram.java +++ b/streampipes-extensions/streampipes-processors-text-mining-flink/src/main/java/org/apache/streampipes/processors/textmining/flink/processor/wordcount/WordCountProgram.java @@ -18,12 +18,13 @@ package org.apache.streampipes.processors.textmining.flink.processor.wordcount; -import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.streampipes.client.StreamPipesClient; import org.apache.streampipes.container.config.ConfigExtractor; import org.apache.streampipes.model.runtime.Event; import org.apache.streampipes.processors.textmining.flink.AbstractTextMiningProgram; +import org.apache.flink.streaming.api.datastream.DataStream; + import java.io.Serializable; public class WordCountProgram extends AbstractTextMiningProgram implements Serializable { @@ -36,13 +37,13 @@ public WordCountProgram(WordCountParameters params, @Override protected DataStream getApplicationLogic( - DataStream... messageStream) { + DataStream... messageStream) { return messageStream[0] - .flatMap(new WordSplitter(bindingParams.getWordCountFieldName())) - .keyBy("word") - .sum("count") - .flatMap(new WordToEventConverter()); + .flatMap(new WordSplitter(bindingParams.getWordCountFieldName())) + .keyBy("word") + .sum("count") + .flatMap(new WordToEventConverter()); } } diff --git a/streampipes-extensions/streampipes-processors-text-mining-flink/src/main/java/org/apache/streampipes/processors/textmining/flink/processor/wordcount/WordSplitter.java b/streampipes-extensions/streampipes-processors-text-mining-flink/src/main/java/org/apache/streampipes/processors/textmining/flink/processor/wordcount/WordSplitter.java index aec34e8c56..8e23aaeabd 100644 --- a/streampipes-extensions/streampipes-processors-text-mining-flink/src/main/java/org/apache/streampipes/processors/textmining/flink/processor/wordcount/WordSplitter.java +++ b/streampipes-extensions/streampipes-processors-text-mining-flink/src/main/java/org/apache/streampipes/processors/textmining/flink/processor/wordcount/WordSplitter.java @@ -18,9 +18,10 @@ package org.apache.streampipes.processors.textmining.flink.processor.wordcount; +import org.apache.streampipes.model.runtime.Event; + import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.util.Collector; -import org.apache.streampipes.model.runtime.Event; public class WordSplitter implements FlatMapFunction { diff --git a/streampipes-extensions/streampipes-processors-text-mining-flink/src/main/java/org/apache/streampipes/processors/textmining/flink/processor/wordcount/WordToEventConverter.java b/streampipes-extensions/streampipes-processors-text-mining-flink/src/main/java/org/apache/streampipes/processors/textmining/flink/processor/wordcount/WordToEventConverter.java index 60bf50d8ef..ca0968eeaa 100644 --- a/streampipes-extensions/streampipes-processors-text-mining-flink/src/main/java/org/apache/streampipes/processors/textmining/flink/processor/wordcount/WordToEventConverter.java +++ b/streampipes-extensions/streampipes-processors-text-mining-flink/src/main/java/org/apache/streampipes/processors/textmining/flink/processor/wordcount/WordToEventConverter.java @@ -17,17 +17,18 @@ */ package org.apache.streampipes.processors.textmining.flink.processor.wordcount; +import org.apache.streampipes.model.runtime.Event; + import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.util.Collector; -import org.apache.streampipes.model.runtime.Event; public class WordToEventConverter implements FlatMapFunction { @Override public void flatMap(Word word, Collector collector) throws Exception { - Event event = new Event(); - event.addField("word", word.getWord()); - event.addField("count", word.getCount()); - collector.collect(event); + Event event = new Event(); + event.addField("word", word.getWord()); + event.addField("count", word.getCount()); + collector.collect(event); } }