Skip to content

Commit

Permalink
Drop deprecated AWS SnsIO.writeAsync (closes apache#26710)
Browse files Browse the repository at this point in the history
  • Loading branch information
Moritz Mack committed May 16, 2023
1 parent 96989dc commit 65142af
Show file tree
Hide file tree
Showing 12 changed files with 2 additions and 819 deletions.
3 changes: 2 additions & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@
* Passing a tag into MultiProcessShared is now required in the Python SDK ([#26168](https://github.com/apache/beam/issues/26168)).
* CloudDebuggerOptions is removed (deprecated in Beam v2.47.0) for Dataflow runner as the Google Cloud Debugger service is [shutting down](https://cloud.google.com/debugger/docs/deprecations). (Java) ([#25959](https://github.com/apache/beam/issues/25959)).
* AWS 2 client providers (deprecated in Beam [v2.38.0](#2380---2022-04-20)) are finally removed ([#26681](https://github.com/apache/beam/issues/26681)).

* AWS 2 SnsIO.writeAsync (deprecated in Beam v2.37.0 due to risk of data loss) was finally removed ([#26710](https://github.com/apache/beam/issues/26710)).
*
## Deprecations

* X behavior is deprecated and will be removed in X versions ([#X](https://github.com/apache/beam/issues/X)).
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,8 @@
package org.apache.beam.sdk.io.aws2.sns;

import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;

import com.google.auto.value.AutoValue;
import java.net.URI;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.aws2.common.ClientBuilderFactory;
Expand All @@ -45,7 +42,6 @@
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.http.SdkHttpResponse;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sns.SnsAsyncClient;
import software.amazon.awssdk.services.sns.SnsClient;
import software.amazon.awssdk.services.sns.model.InvalidParameterException;
import software.amazon.awssdk.services.sns.model.NotFoundException;
Expand Down Expand Up @@ -114,16 +110,6 @@ public static <T> Write<T> write() {
.build();
}

/**
* @deprecated Please use {@link SnsIO#write()} to avoid the risk of data loss.
* @see <a href="https://github.com/apache/beam/issues/21366">Issue #21366</a>, <a
* href="https://issues.apache.org/jira/browse/BEAM-13203">BEAM-13203</a>
*/
@Deprecated
public static <T> WriteAsync<T> writeAsync() {
return new AutoValue_SnsIO_WriteAsync.Builder<T>().build();
}

/** Implementation of {@link #write}. */
@AutoValue
public abstract static class Write<T>
Expand Down Expand Up @@ -305,152 +291,4 @@ public void tearDown() {
}
}
}

/**
* Implementation of {@link #writeAsync}.
*
* @deprecated Please use {@link SnsIO#write()} to avoid the risk of data loss.
* @see <a href="https://github.com/apache/beam/issues/21366">Issue #21366</a>, <a
* href="https://issues.apache.org/jira/browse/BEAM-13203">BEAM-13203</a>
*/
@Deprecated
@AutoValue
public abstract static class WriteAsync<T>
extends PTransform<PCollection<T>, PCollection<SnsResponse<T>>> {

abstract @Nullable SnsAsyncClientProvider getSnsClientProvider();

/** SerializableFunction to create PublishRequest. */
abstract @Nullable SerializableFunction<T, PublishRequest> getPublishRequestFn();

/** Coder for element T. */
abstract @Nullable Coder<T> getCoder();

abstract Builder<T> builder();

@AutoValue.Builder
abstract static class Builder<T> {
abstract Builder<T> setSnsClientProvider(SnsAsyncClientProvider asyncClientProvider);

abstract Builder<T> setCoder(Coder<T> elementCoder);

abstract Builder<T> setPublishRequestFn(
SerializableFunction<T, PublishRequest> publishRequestFn);

abstract WriteAsync<T> build();
}

/**
* Specify a Coder for SNS PublishRequest object.
*
* @param elementCoder Coder
*/
public WriteAsync<T> withCoder(Coder<T> elementCoder) {
checkNotNull(elementCoder, "elementCoder cannot be null");
return builder().setCoder(elementCoder).build();
}

/**
* Specify a function for converting a message into PublishRequest object.
*
* @param publishRequestFn publishRequestFn
*/
public WriteAsync<T> withPublishRequestFn(
SerializableFunction<T, PublishRequest> publishRequestFn) {
checkNotNull(publishRequestFn, "publishRequestFn cannot be null");
return builder().setPublishRequestFn(publishRequestFn).build();
}

/**
* Allows to specify custom {@link SnsAsyncClientProvider}. {@link SnsAsyncClientProvider}
* creates new {@link SnsAsyncClientProvider} which is later used for writing to a SNS topic.
*/
public WriteAsync<T> withSnsClientProvider(SnsAsyncClientProvider asyncClientProvider) {
checkNotNull(asyncClientProvider, "asyncClientProvider cannot be null");
return builder().setSnsClientProvider(asyncClientProvider).build();
}

/**
* Specify credential details and region to be used to write to SNS. If you need more
* sophisticated credential protocol, then you should look at {@link
* WriteAsync#withSnsClientProvider(SnsAsyncClientProvider)}.
*/
public WriteAsync<T> withSnsClientProvider(
AwsCredentialsProvider credentialsProvider, String region) {
checkNotNull(credentialsProvider, "credentialsProvider cannot be null");
checkNotNull(region, "region cannot be null");
return withSnsClientProvider(credentialsProvider, region, null);
}

/**
* Specify credential details and region to be used to write to SNS. If you need more
* sophisticated credential protocol, then you should look at {@link
* WriteAsync#withSnsClientProvider(SnsAsyncClientProvider)}.
*
* <p>The {@code serviceEndpoint} sets an alternative service host.
*/
public WriteAsync<T> withSnsClientProvider(
AwsCredentialsProvider credentialsProvider, String region, URI serviceEndpoint) {
checkNotNull(credentialsProvider, "credentialsProvider cannot be null");
checkNotNull(region, "region cannot be null");
return withSnsClientProvider(
new BasicSnsAsyncClientProvider(credentialsProvider, region, serviceEndpoint));
}

@Override
public PCollection<SnsResponse<T>> expand(PCollection<T> input) {
checkArgument(getSnsClientProvider() != null, "withSnsClientProvider() needs to called");
checkArgument(getPublishRequestFn() != null, "withPublishRequestFn() needs to called");
checkArgument(getCoder() != null, "withElementCoder() needs to called");

return input
.apply(ParDo.of(new SnsWriteAsyncFn<>(this)))
.setCoder(SnsResponseCoder.of(getCoder()));
}

private static class SnsWriteAsyncFn<T> extends DoFn<T, SnsResponse<T>> {

private static final Logger LOG = LoggerFactory.getLogger(SnsWriteAsyncFn.class);

private final WriteAsync<T> spec;
private transient SnsAsyncClient client;

SnsWriteAsyncFn(WriteAsync<T> spec) {
this.spec = spec;
}

@Setup
public void setup() {
this.client = spec.getSnsClientProvider().getSnsAsyncClient();
}

@SuppressWarnings("FutureReturnValueIgnored")
@ProcessElement
public void processElement(ProcessContext context) {
PublishRequest publishRequest = spec.getPublishRequestFn().apply(context.element());
client.publish(publishRequest).whenComplete(getPublishResponse(context));
}

private BiConsumer<? super PublishResponse, ? super Throwable> getPublishResponse(
DoFn<T, SnsResponse<T>>.ProcessContext context) {
return (response, ex) -> {
if (ex == null) {
SnsResponse<T> snsResponse = SnsResponse.of(context.element(), response);
context.output(snsResponse);
} else {
LOG.error("Error while publishing request to SNS", ex);
throw new SnsWriteException("Error while publishing request to SNS", ex);
}
};
}
}
}

/** Exception class for SNS write exceptions. */
protected static class SnsWriteException extends RuntimeException {

SnsWriteException(String message, Throwable error) {
super(message, error);
}
}
}

This file was deleted.

Loading

0 comments on commit 65142af

Please sign in to comment.