Skip to content

Commit

Permalink
Merge pull request #21 from synadia-io/jetstream-options
Browse files Browse the repository at this point in the history
JetStreamOption support
  • Loading branch information
scottf authored Jan 9, 2025
2 parents 37c74b8 + 0d56866 commit 5afe11c
Show file tree
Hide file tree
Showing 47 changed files with 481 additions and 165 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@
import io.synadia.flink.examples.support.ExampleConnectionListener;
import io.synadia.flink.examples.support.ExampleErrorListener;
import io.synadia.flink.examples.support.Publisher;
import io.synadia.flink.utils.PropertiesUtils;
import io.synadia.flink.v0.sink.NatsSink;
import io.synadia.flink.v0.sink.NatsSinkBuilder;
import io.synadia.flink.v0.source.NatsSource;
import io.synadia.flink.v0.source.NatsSourceBuilder;
import io.synadia.flink.v0.utils.PropertiesUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@
import io.nats.client.api.AckPolicy;
import io.nats.client.api.ConsumerConfiguration;
import io.nats.client.api.StreamConfiguration;
import io.synadia.flink.utils.PropertiesUtils;
import io.synadia.flink.v0.payload.StringPayloadDeserializer;
import io.synadia.flink.v0.payload.StringPayloadSerializer;
import io.synadia.flink.v0.sink.NatsSink;
import io.synadia.flink.v0.sink.NatsSinkBuilder;
import io.synadia.flink.v0.source.NatsJetStreamSource;
import io.synadia.flink.v0.source.NatsJetStreamSourceBuilder;
import io.synadia.flink.v0.utils.PropertiesUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// Copyright (c) 2023-2024 Synadia Communications Inc. All Rights Reserved.
// See LICENSE and NOTICE file for details.

package io.synadia.flink.v0.emitter;

import io.nats.client.Message;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2023 Synadia Communications Inc. All Rights Reserved.
// Copyright (c) 2023-2024 Synadia Communications Inc. All Rights Reserved.
// See LICENSE and NOTICE file for details.

package io.synadia.flink.v0.enumerator;
Expand All @@ -13,7 +13,7 @@
import java.util.List;
import java.util.Queue;

import static io.synadia.flink.utils.MiscUtils.generatePrefixedId;
import static io.synadia.flink.v0.utils.MiscUtils.generatePrefixedId;
import static org.apache.flink.util.Preconditions.checkNotNull;

public class NatsSourceEnumerator implements SplitEnumerator<NatsSubjectSplit, Collection<NatsSubjectSplit>> {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2023 Synadia Communications Inc. All Rights Reserved.
// Copyright (c) 2023-2024 Synadia Communications Inc. All Rights Reserved.
// See LICENSE and NOTICE file for details.

package io.synadia.flink.v0.enumerator;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2023 Synadia Communications Inc. All Rights Reserved.
// Copyright (c) 2023-2024 Synadia Communications Inc. All Rights Reserved.
// See LICENSE and NOTICE file for details.

package io.synadia.flink.v0.enumerator;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2023 Synadia Communications Inc. All Rights Reserved.
// Copyright (c) 2023-2024 Synadia Communications Inc. All Rights Reserved.
// See LICENSE and NOTICE file for details.

package io.synadia.flink.v0.payload;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2023 Synadia Communications Inc. All Rights Reserved.
// Copyright (c) 2023-2024 Synadia Communications Inc. All Rights Reserved.
// See LICENSE and NOTICE file for details.

package io.synadia.flink.v0.payload;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2023 Synadia Communications Inc. All Rights Reserved.
// Copyright (c) 2023-2024 Synadia Communications Inc. All Rights Reserved.
// See LICENSE and NOTICE file for details.

package io.synadia.flink.v0.payload;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2023 Synadia Communications Inc. All Rights Reserved.
// Copyright (c) 2023-2024 Synadia Communications Inc. All Rights Reserved.
// See LICENSE and NOTICE file for details.

package io.synadia.flink.v0.payload;
Expand Down
44 changes: 44 additions & 0 deletions src/main/java/io/synadia/flink/v0/sink/NatsJetStreamSink.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright (c) 2024 Synadia Communications Inc. All Rights Reserved.
// See LICENSE and NOTICE file for details.

package io.synadia.flink.v0.sink;

import io.synadia.flink.v0.payload.PayloadSerializer;
import io.synadia.flink.v0.sink.writer.NatsJetStreamSinkWriter;
import io.synadia.flink.v0.utils.ConnectionFactory;
import org.apache.flink.api.connector.sink2.SinkWriter;

import java.io.IOException;
import java.util.List;

/**
* Flink Sink to publish data to one or more NATS subjects
* @param <InputT> the type of object from the source to convert for publishing
*/
public class NatsJetStreamSink<InputT> extends NatsSink<InputT> {

NatsJetStreamSink(List<String> subjects,
PayloadSerializer<InputT> payloadSerializer,
ConnectionFactory connectionFactory)
{
super(subjects, payloadSerializer, connectionFactory);
}

/**
* {@inheritDoc}
*/
@Override
public SinkWriter<InputT> createWriter(InitContext context) throws IOException {
return new NatsJetStreamSinkWriter<>(id, subjects, payloadSerializer, connectionFactory, context);
}

@Override
public String toString() {
return "NatsJetStreamSink{" +
"id='" + id + '\'' +
", subjects=" + subjects +
", payloadSerializer=" + payloadSerializer.getClass().getCanonicalName() +
", connectionFactory=" + connectionFactory +
'}';
}
}
108 changes: 108 additions & 0 deletions src/main/java/io/synadia/flink/v0/sink/NatsJetStreamSinkBuilder.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Copyright (c) 2024 Synadia Communications Inc. All Rights Reserved.
// See LICENSE and NOTICE file for details.

package io.synadia.flink.v0.sink;

import io.synadia.flink.v0.payload.PayloadSerializer;
import io.synadia.flink.v0.source.NatsSinkOrSourceBuilder;
import io.synadia.flink.v0.utils.Constants;
import io.synadia.flink.v0.utils.PropertiesUtils;

import java.util.Properties;

import static io.synadia.flink.v0.utils.Constants.PAYLOAD_SERIALIZER;
import static io.synadia.flink.v0.utils.Constants.SINK_PREFIX;

/**
* Builder to construct {@link NatsJetStreamSink}.
*
* <p>The following example shows the minimum setup to create a NatsSink that writes String values
* to one or more NATS subjects.
*
* <pre>{@code
* NatsJetStreamSink<String> sink = NatsJetStreamSink
* .<String>builder
* .subjects("subject1", "subject2")
* .connectionPropertiesFile("/path/to/jnats_client_connection.properties")
* .build();
* }</pre>
*
* @see NatsSink
* @param <InputT> type of the records written
*/
public class NatsJetStreamSinkBuilder<InputT> extends NatsSinkOrSourceBuilder<NatsJetStreamSinkBuilder<InputT>> {
private PayloadSerializer<InputT> payloadSerializer;
private String payloadSerializerClass;

@Override
protected NatsJetStreamSinkBuilder<InputT> getThis() {
return this;
}

public NatsJetStreamSinkBuilder() {
super(SINK_PREFIX);
}

/**
* Set the payload serializer for the sink.
* @param payloadSerializer the serializer.
* @return the builder
*/
public NatsJetStreamSinkBuilder<InputT> payloadSerializer(PayloadSerializer<InputT> payloadSerializer) {
this.payloadSerializer = payloadSerializer;
this.payloadSerializerClass = null;
return this;
}

/**
* Set the fully qualified name of the desired class payload serializer for the sink.
* @param payloadSerializerClass the serializer class name.
* @return the builder
*/
public NatsJetStreamSinkBuilder<InputT> payloadSerializerClass(String payloadSerializerClass) {
this.payloadSerializer = null;
this.payloadSerializerClass = payloadSerializerClass;
return this;
}

/**
* Set sink properties from a properties object
* See the readme and {@link Constants} for property keys
* @param properties the properties object
* @return the builder
*/
public NatsJetStreamSinkBuilder<InputT> sinkProperties(Properties properties) {
baseProperties(properties);

String s = PropertiesUtils.getStringProperty(properties, PAYLOAD_SERIALIZER, prefixes);
if (s != null) {
payloadSerializerClass(s);
}

return this;
}

/**
* Build a NatsSink. Subject and
* @return the sink
*/
public NatsJetStreamSink<InputT> build() {
if (payloadSerializer == null) {
if (payloadSerializerClass == null) {
throw new IllegalStateException("Valid payload serializer class must be provided.");
}

// so much can go wrong here... ClassNotFoundException, ClassCastException
try {
//noinspection unchecked
payloadSerializer = (PayloadSerializer<InputT>) Class.forName(payloadSerializerClass).getDeclaredConstructor().newInstance();
}
catch (Exception e) {
throw new IllegalStateException("Valid payload serializer class must be provided.", e);
}
}

baseBuild();
return new NatsJetStreamSink<>(subjects, payloadSerializer, createConnectionFactory());
}
}
14 changes: 7 additions & 7 deletions src/main/java/io/synadia/flink/v0/sink/NatsSink.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
// Copyright (c) 2023 Synadia Communications Inc. All Rights Reserved.
// Copyright (c) 2023-2024 Synadia Communications Inc. All Rights Reserved.
// See LICENSE and NOTICE file for details.

package io.synadia.flink.v0.sink;

import io.nats.client.NUID;
import io.synadia.flink.utils.ConnectionFactory;
import io.synadia.flink.v0.payload.PayloadSerializer;
import io.synadia.flink.v0.sink.writer.NatsSinkWriter;
import io.synadia.flink.v0.utils.ConnectionFactory;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;

Expand All @@ -18,12 +18,12 @@
* @param <InputT> the type of object from the source to convert for publishing
*/
public class NatsSink<InputT> implements Sink<InputT> {
private final String id;
private final List<String> subjects;
private final PayloadSerializer<InputT> payloadSerializer;
private final ConnectionFactory connectionFactory;
protected final String id;
protected final List<String> subjects;
protected final PayloadSerializer<InputT> payloadSerializer;
protected final ConnectionFactory connectionFactory;

NatsSink(List<String> subjects,
protected NatsSink(List<String> subjects,
PayloadSerializer<InputT> payloadSerializer,
ConnectionFactory connectionFactory)
{
Expand Down
12 changes: 6 additions & 6 deletions src/main/java/io/synadia/flink/v0/sink/NatsSinkBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@

package io.synadia.flink.v0.sink;

import io.synadia.flink.utils.Constants;
import io.synadia.flink.utils.PropertiesUtils;
import io.synadia.flink.v0.payload.PayloadSerializer;
import io.synadia.flink.v0.source.NatsSinkOrSourceBuilder;
import io.synadia.flink.v0.utils.Constants;
import io.synadia.flink.v0.utils.PropertiesUtils;

import java.util.Properties;

import static io.synadia.flink.utils.Constants.PAYLOAD_SERIALIZER;
import static io.synadia.flink.utils.Constants.SINK_PREFIX;
import static io.synadia.flink.v0.utils.Constants.PAYLOAD_SERIALIZER;
import static io.synadia.flink.v0.utils.Constants.SINK_PREFIX;

/**
* Builder to construct {@link NatsSink}.
Expand All @@ -28,9 +28,9 @@
* }</pre>
*
* @see NatsSink
* @param <InputT> type of the records written to Kafka
* @param <InputT> type of the records written
*/
public class NatsSinkBuilder<InputT> extends NatsSinkOrSourceBuilder<InputT, NatsSinkBuilder<InputT>> {
public class NatsSinkBuilder<InputT> extends NatsSinkOrSourceBuilder<NatsSinkBuilder<InputT>> {
private PayloadSerializer<InputT> payloadSerializer;
private String payloadSerializerClass;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright (c) 2024 Synadia Communications Inc. All Rights Reserved.
// See LICENSE and NOTICE file for details.

package io.synadia.flink.v0.sink.writer;

import io.nats.client.JetStreamApiException;
import io.synadia.flink.v0.payload.PayloadSerializer;
import io.synadia.flink.v0.utils.ConnectionFactory;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.util.FlinkRuntimeException;

import java.io.IOException;
import java.util.List;

/**
* This class is responsible to publish to one or more JetStream subjects
* @param <InputT> The type of the input elements.
*/
public class NatsJetStreamSinkWriter<InputT> extends NatsSinkWriter<InputT> {

public NatsJetStreamSinkWriter(String sinkId,
List<String> subjects,
PayloadSerializer<InputT> payloadSerializer,
ConnectionFactory connectionFactory,
Sink.InitContext sinkInitContext) throws IOException
{
super(sinkId, subjects, payloadSerializer, connectionFactory, sinkInitContext);
}

@Override
public void write(InputT element, Context context) throws IOException, InterruptedException {
byte[] payload = payloadSerializer.getBytes(element);
for (String subject : subjects) {
try {
ctx.js.publish(subject, null, payload);
}
catch (JetStreamApiException e) {
throw new FlinkRuntimeException(e);
}
}
}

@Override
public String toString() {
return "NatsJetStreamSinkWriter{" +
"id='" + id + '\'' +
", subjects=" + subjects +
'}';
}
}
Loading

0 comments on commit 5afe11c

Please sign in to comment.