Skip to content

Commit

Permalink
#1 Initial SDK implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
gunnarmorling committed Jun 13, 2023
1 parent 5c7f283 commit 5089dfd
Show file tree
Hide file tree
Showing 46 changed files with 2,031 additions and 1 deletion.
7 changes: 7 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,10 @@ bin

# VSCode
.vscode

# Apache Maven
dependency-reduced-pom.xml
target

# Misc.
.DS_Store
37 changes: 37 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# Decodable Java SDK

This repository contains a software development kit (SDK) for implementing Apache Flink jobs and running them on Decodable.
It integrates Flink with the Decodable environment, for instance providing easy access to Decodable streams.

## Structure

* _sdk_: The Decodable SDK
* _examples_: Examples for using the SDK

## Installation

tbd.

## Usage

tbd.

## Building the SDK

Gradle is used for building the SDK.

Run the following to produce the SDK binary:

```bash
./gradlew build
```

Run the following to install the SDK JAR into your local Maven repository:

```bash
./gradlew publishToMavenLocal
```

## License

This code base is available under the Apache License, version 2.
168 changes: 168 additions & 0 deletions examples/apache-maven/custom-pipelines-hello-world/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
SPDX-License-Identifier: Apache-2.0
Copyright Decodable, Inc.
Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>co.decodable.examples</groupId>
<artifactId>custom-pipelines-hello-world</artifactId>
<version>0.1</version>
<packaging>jar</packaging>

<name>Decodable SDK Example</name>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.15.4</flink.version>
<target.java.version>11</target.java.version>
<scala.binary.version>2.12</scala.binary.version>
<maven.compiler.source>${target.java.version}</maven.compiler.source>
<maven.compiler.target>${target.java.version}</maven.compiler.target>
<log4j.version>2.17.1</log4j.version>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>co.decodable</groupId>
<artifactId>decodable-sdk-java</artifactId>
<version>1.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.15.2</version>
</dependency>


<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<version>5.9.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>redpanda</artifactId>
<version>1.18.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<version>1.18.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>4.2.0</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>${target.java.version}</source>
<target>${target.java.version}</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<executions>
<!-- Run shade goal on package phase -->
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>org.apache.flink:flink-shaded-force-shading</exclude>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>org.apache.logging.log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>co.decodable.examples.cpdemo.DataStreamJob</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>

<pluginManagement>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>3.1.2</version>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* Copyright Decodable, Inc.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package co.decodable.examples.cpdemo;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

import co.decodable.sdk.DecodableStreamSink;
import co.decodable.sdk.DecodableStreamSource;

public class DataStreamJob {

public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DecodableStreamSource<String> source =
DecodableStreamSource.builder()
.withStreamName("purchase-orders")
.build();

DecodableStreamSink<String> sink =
DecodableStreamSink.builder()
.withStreamName("purchase-orders-processed")
.build();

DataStream<String> stream =
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Purchase Orders Source")
.map(new NameConverter());

stream.sinkTo(sink);

env.execute("Purchase Order Processor");
}

public static class NameConverter extends RichMapFunction<String, String> {

private static final long serialVersionUID = 1L;

private transient ObjectMapper mapper;

@Override
public void open(Configuration parameters) throws Exception {
mapper = new ObjectMapper();
}

@Override
public String map(String value) throws Exception {
ObjectNode purchaseOrder = (ObjectNode) mapper.readTree(value);
purchaseOrder.put("customer_name", purchaseOrder.get("customer_name").asText().toUpperCase());
return mapper.writeValueAsString(purchaseOrder);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
################################################################################
# SPDX-License-Identifier: Apache-2.0
#
# Copyright Decodable, Inc.
#
# Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
################################################################################

rootLogger.level = INFO
rootLogger.appenderRef.console.ref = ConsoleAppender

appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
Loading

0 comments on commit 5089dfd

Please sign in to comment.