Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#1865]: create processor to parse date time strings #2207

Merged
merged 21 commits into from
Dec 8, 2023
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.streampipes.processors.transformation.jvm.processor.booloperator.timekeeping.BooleanTimekeepingProcessor;
import org.apache.streampipes.processors.transformation.jvm.processor.booloperator.timer.BooleanTimerProcessor;
import org.apache.streampipes.processors.transformation.jvm.processor.csvmetadata.CsvMetadataEnrichmentProcessor;
import org.apache.streampipes.processors.transformation.jvm.processor.datetime.DateTimeFromStringProcessor;
import org.apache.streampipes.processors.transformation.jvm.processor.fieldrename.FiledRenameProcessor;
import org.apache.streampipes.processors.transformation.jvm.processor.hasher.FieldHasherProcessor;
import org.apache.streampipes.processors.transformation.jvm.processor.mapper.FieldMapperProcessor;
Expand Down Expand Up @@ -67,6 +68,7 @@ public List<IStreamPipesPipelineElement<?>> pipelineElements() {
new TimestampExtractorProcessor(),
new BooleanCounterProcessor(),
new BooleanInverterProcessor(),
new DateTimeFromStringProcessor(),
new BooleanTimekeepingProcessor(),
new BooleanTimerProcessor(),
new CsvMetadataEnrichmentProcessor(),
Expand All @@ -84,7 +86,7 @@ public List<IStreamPipesPipelineElement<?>> pipelineElements() {
new BooleanOperatorProcessor(),
new FiledRenameProcessor(),
new RoundProcessor()
);
);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.streampipes.processors.transformation.jvm.processor.datetime;

import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
import org.apache.streampipes.model.DataProcessorType;
import org.apache.streampipes.model.graph.DataProcessorDescription;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.schema.PropertyScope;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
import org.apache.streampipes.sdk.helpers.EpProperties;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.helpers.Options;
import org.apache.streampipes.sdk.helpers.OutputStrategies;
import org.apache.streampipes.sdk.utils.Assets;
import org.apache.streampipes.vocabulary.SO;
import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;

import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Data processor that parses a date-time string taken from a mapped event
 * property and appends two fields to the event:
 * <ul>
 *   <li>{@link #OUTPUT_TIMESTAMP_RUNTIME_NAME}: the parsed instant as epoch milliseconds</li>
 *   <li>{@link #OUTPUT_TIMEZONE_RUNTIME_NAME}: the time zone id selected in the configuration</li>
 * </ul>
 * Strings that carry their own zone or offset are parsed as-is; strings without
 * zone information are interpreted in the user-selected time zone.
 */
public class DateTimeFromStringProcessor extends StreamPipesDataProcessor {

  /** Label/mapping id of the input property that holds the date-time string. */
  public static final String FIELD_ID = "inputField";
  /** Runtime name of the appended epoch-millisecond field. */
  public static final String OUTPUT_TIMESTAMP_RUNTIME_NAME = "timestringInMillis";
  /** Runtime name of the appended time zone field. */
  public static final String OUTPUT_TIMEZONE_RUNTIME_NAME = "timeZone";
  /** Label id of the single-value selection holding the user's time zone. */
  public static final String INPUT_TIMEZONE_KEY = "inputTimeZone";

  // Both values are populated in onInvocation() and read for every event.
  private String streamInputDateTimeFieldName;
  private String selectedTimeZone;

  /**
   * Describes the processor: one required string property (mapped via
   * {@link #FIELD_ID}), a single-value selection offering all zone ids known
   * to the JVM, and an append output strategy that adds the timestamp and the
   * time zone fields.
   *
   * @return the description of this processing element
   */
  @Override
  public DataProcessorDescription declareModel() {
    return ProcessingElementBuilder
        .create("org.apache.streampipes.processors.transformation.jvm.datetime", 0)
        .category(DataProcessorType.STRING_OPERATOR, DataProcessorType.TIME)
        .withLocales(Locales.EN)
        .withAssets(Assets.DOCUMENTATION, Assets.ICON)
        .requiredStream(StreamRequirementsBuilder.create()
            .requiredPropertyWithUnaryMapping(
                EpRequirements.stringReq(),
                Labels.withId(FIELD_ID),
                PropertyScope.NONE
            )
            .build())
        .requiredSingleValueSelection(
            Labels.withId(INPUT_TIMEZONE_KEY),
            Options.from(getTimeZoneOptions()),
            true
        )
        .outputStrategy(
            OutputStrategies.append(
                EpProperties.timestampProperty(OUTPUT_TIMESTAMP_RUNTIME_NAME),
                EpProperties.stringEp(
                    // The label of the input time zone selection is reused for the output field.
                    Labels.withId(INPUT_TIMEZONE_KEY),
                    OUTPUT_TIMEZONE_RUNTIME_NAME,
                    SO.SCHEDULE_TIMEZONE
                )
            )
        )
        .build();
  }

  /**
   * Reads the mapped input field selector and the selected time zone from the
   * invocation parameters and stores them for use in {@link #onEvent}.
   */
  @Override
  public void onInvocation(
      ProcessorParams parameters,
      SpOutputCollector spOutputCollector,
      EventProcessorRuntimeContext runtimeContext
  ) throws SpRuntimeException {
    ProcessingElementParameterExtractor extractor = parameters.extractor();
    this.streamInputDateTimeFieldName = extractor.mappingPropertyValue(FIELD_ID);
    this.selectedTimeZone = extractor.selectedSingleValue(INPUT_TIMEZONE_KEY, String.class);
  }

  /**
   * Parses the configured string field of the event, appends the resulting
   * epoch-millisecond timestamp and the selected time zone, and forwards the
   * event.
   *
   * @throws RuntimeException if the field value cannot be parsed as a date-time
   */
  @Override
  public void onEvent(Event event, SpOutputCollector collector) {
    String dateTimeString = event.getFieldBySelector(streamInputDateTimeFieldName)
        .getAsPrimitive()
        .getAsString();
    ZonedDateTime zdt = parseDateTime(dateTimeString, DateTimeFormatter.ISO_DATE_TIME);

    // The ZonedDateTime itself is not added to the event. Instead, the instant is
    // emitted as epoch milliseconds together with a time zone id, so that the
    // zoned time can be reconstituted downstream from these two fields.
    event.addField(OUTPUT_TIMESTAMP_RUNTIME_NAME, zdt.toInstant().toEpochMilli());
    // NOTE(review): the user-selected zone is emitted even when the input string
    // carried its own zone/offset (in which case the selection was not used for
    // parsing) - confirm whether zdt.getZone() should be emitted instead.
    event.addField(OUTPUT_TIMEZONE_RUNTIME_NAME, selectedTimeZone);

    collector.collect(event);
  }

  @Override
  public void onDetach() {
    // No resources to release.
  }

  /**
   * Converts a date-time string into a {@link ZonedDateTime}.
   * <p>
   * The string is first parsed with {@link ZonedDateTime#parse(CharSequence)},
   * which succeeds when it contains zone or offset information. Otherwise it
   * is parsed as a {@link LocalDateTime} with {@code dtf} and placed in the
   * user-selected time zone.
   *
   * @param dateTimeString the text to parse
   * @param dtf            formatter used for the zone-less fallback parse
   * @return the parsed date-time
   * @throws RuntimeException if both parse attempts fail; the fallback failure
   *                          is attached as the cause and the first failure as
   *                          a suppressed exception
   */
  private ZonedDateTime parseDateTime(String dateTimeString, DateTimeFormatter dtf) {
    try {
      return ZonedDateTime.parse(dateTimeString);
    } catch (DateTimeParseException zonedFailure) {
      try {
        LocalDateTime ldt = LocalDateTime.parse(dateTimeString, dtf);
        return ldt.atZone(ZoneId.of(selectedTimeZone));
      } catch (DateTimeParseException localFailure) {
        // Keep both underlying failures so the reason for the rejection is not lost.
        localFailure.addSuppressed(zonedFailure);
        throw new RuntimeException("Could not parse DateTime String: " + dateTimeString, localFailure);
      }
    }
  }

  /**
   * Returns all zone ids available to the JVM in natural (lexicographic) order,
   * for use as the selection options of the time zone parameter.
   */
  private static String[] getTimeZoneOptions() {
    List<String> timeZones = new ArrayList<>(ZoneId.getAvailableZoneIds());
    Collections.sort(timeZones);
    return timeZones.toArray(new String[0]);
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->

## Datetime From String

<p align="center">
<img src="icon.png" width="150px;" class="pe-image-documentation"/>
</p>

***

## Overview

The "Datetime From String" processor is a handy tool that helps convert human-readable datetime information into a
format that machines can understand. This is particularly useful when dealing with data that includes dates and times.

### Why Use This Processor?

In the context of event streams, you may encounter dates and times formatted for human readability but not necessarily
optimized for computer processing. The "Datetime From String" processor addresses this by facilitating the conversion
of human-readable datetime information within your continuous stream of events.

***

## How It Works

When you input a data stream into this processor containing a datetime in ISO 8601 format (such as
"2023-11-24T15:30:00"), it undergoes a transformation. The processor parses it into a ZonedDateTime and emits it in a
computer-friendly format: a UNIX timestamp in milliseconds.

### Example

Let's say you have an event stream with a property containing values like "2023-11-24T15:30:00" and you want to make
sure your computer understands it. You can use this processor to convert it into a format that's machine-friendly.

***

## Getting Started

To use this processor, you need one thing in your data:

1. **Datetime String**: This is the name of the event property that contains the human-readable datetime string in ISO 8601 format, like "2023-11-24T15:30:00".


### Configuration

The only thing you need to configure is the time zone.
1. **Time Zone**: Specify the time zone that applies to your datetime if it doesn't already have this information. This ensures that the processor understands the context of your datetime.

## Output

After the conversion happens, the processor adds two new fields to your data stream:

* **timestringInMillis**: This is the transformed datetime in a format that computers can easily work with (UNIX timestamp in milliseconds).
* **timeZone**: The name of the time zone selected in the configuration. Together with `timestringInMillis`, it can be used to reconstitute the actual time.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

org.apache.streampipes.processors.transformation.jvm.datetime.title=Datetime From String
org.apache.streampipes.processors.transformation.jvm.datetime.description=Converts a human-readable timestamp from a textual (string) representation into a computer-readable object

inputField.title=DateTime String
inputField.description=The event property that contains the ISO 8601 formatted timestamp string, e.g., '2023-11-29T18:30:22'

inputTimeZone.title=Time Zone
inputTimeZone.description=The time zone for which the string applies (if the string already contains the time zone information, the user time zone will be ignored)

Loading
Loading