Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

migrates to zipkin-reporter 3.2 BytesMessageSender #214

Merged
merged 3 commits into from
Jan 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion benchmarks/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<artifactId>zipkin-gcp-parent</artifactId>
<groupId>io.zipkin.gcp</groupId>
<version>2.1.2-SNAPSHOT</version>
<version>2.2.0-SNAPSHOT</version>
</parent>

<artifactId>benchmarks</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion collector-pubsub/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
<parent>
<artifactId>zipkin-gcp-parent</artifactId>
<groupId>io.zipkin.gcp</groupId>
<version>2.1.2-SNAPSHOT</version>
<version>2.2.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion encoder-stackdriver-brave/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
<parent>
<artifactId>zipkin-gcp-parent</artifactId>
<groupId>io.zipkin.gcp</groupId>
<version>2.1.2-SNAPSHOT</version>
<version>2.2.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion encoder-stackdriver-zipkin/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
<parent>
<artifactId>zipkin-gcp-parent</artifactId>
<groupId>io.zipkin.gcp</groupId>
<version>2.1.2-SNAPSHOT</version>
<version>2.2.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion module/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
<parent>
<groupId>io.zipkin.gcp</groupId>
<artifactId>zipkin-gcp-parent</artifactId>
<version>2.1.2-SNAPSHOT</version>
<version>2.2.0-SNAPSHOT</version>
</parent>

<artifactId>zipkin-module-gcp</artifactId>
Expand Down
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

<groupId>io.zipkin.gcp</groupId>
<artifactId>zipkin-gcp-parent</artifactId>
<version>2.1.2-SNAPSHOT</version>
<version>2.2.0-SNAPSHOT</version>
<packaging>pom</packaging>

<modules>
Expand Down Expand Up @@ -75,7 +75,7 @@
<zipkin.groupId>io.zipkin.zipkin2</zipkin.groupId>
<!-- when updating, update docker/Dockerfile and storage/src/test/java/zipkin2/storage/kafka/IT* -->
<zipkin.version>3.0.2</zipkin.version>
<zipkin-reporter.version>3.1.1</zipkin-reporter.version>
<zipkin-reporter.version>3.2.1</zipkin-reporter.version>
<spring-boot.version>3.2.1</spring-boot.version>
<!-- armeria.groupId allows you to test feature branches with jitpack -->
<armeria.groupId>com.linecorp.armeria</armeria.groupId>
Expand Down
2 changes: 1 addition & 1 deletion propagation-stackdriver/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
<parent>
<artifactId>zipkin-gcp-parent</artifactId>
<groupId>io.zipkin.gcp</groupId>
<version>2.1.2-SNAPSHOT</version>
<version>2.2.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion sender-pubsub/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
<parent>
<artifactId>zipkin-gcp-parent</artifactId>
<groupId>io.zipkin.gcp</groupId>
<version>2.1.2-SNAPSHOT</version>
<version>2.2.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
148 changes: 28 additions & 120 deletions sender-pubsub/src/main/java/zipkin2/reporter/pubsub/PubSubSender.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,30 +13,22 @@
*/
package zipkin2.reporter.pubsub;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.core.InstantiatingExecutorProvider;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import zipkin2.reporter.BytesMessageEncoder;
import zipkin2.reporter.Call;
import zipkin2.reporter.Callback;
import zipkin2.reporter.CheckResult;
import zipkin2.reporter.BytesMessageSender;
import zipkin2.reporter.ClosedSenderException;
import zipkin2.reporter.Encoding;
import zipkin2.reporter.Sender;

public class PubSubSender extends Sender {
public class PubSubSender extends BytesMessageSender.Base {

public static PubSubSender create(String topic) {
return newBuilder().topic(topic).build();
Expand Down Expand Up @@ -109,7 +101,6 @@ public PubSubSender build() {
if (topic == null) throw new NullPointerException("topic == null");

if (executorProvider == null) executorProvider = defaultExecutorProvider();
;

if (publisher == null) {
try {
Expand Down Expand Up @@ -146,69 +137,50 @@ public Builder toBuilder() {

final String topic;
final int messageMaxBytes;
final Encoding encoding;
final Publisher publisher;
final ExecutorProvider executorProvider;
final TopicAdminClient topicAdminClient;

volatile boolean closeCalled;

PubSubSender(Builder builder) {
this.topic = builder.topic;
this.messageMaxBytes = builder.messageMaxBytes;
this.encoding = builder.encoding;
this.publisher = builder.publisher;
this.executorProvider = builder.executorProvider;
this.topicAdminClient = builder.topicAdminClient;
super(builder.encoding);
topic = builder.topic;
messageMaxBytes = builder.messageMaxBytes;
publisher = builder.publisher;
executorProvider = builder.executorProvider;
topicAdminClient = builder.topicAdminClient;
}

/**
* If no permissions given sent back ok, f permissions and topic exist ok, if topic does not exist error
*
* @return
*/
@Override
public CheckResult check() {
try {
Topic topic = topicAdminClient.getTopic(TopicName.parse(this.topic));
return CheckResult.OK;
} catch (ApiException e) {
return CheckResult.failed(e);
}
}

@Override public Encoding encoding() {
return encoding;
}

@Override
public int messageMaxBytes() {
@Override public int messageMaxBytes() {
return messageMaxBytes;
}

@Override
public int messageSizeInBytes(List<byte[]> bytes) {
return encoding().listSizeInBytes(bytes);
}

@Override
public Call<Void> sendSpans(List<byte[]> byteList) {
if (closeCalled) throw new IllegalStateException("closed");
@Override public void send(List<byte[]> byteList) throws IOException {
if (closeCalled) throw new ClosedSenderException();

byte[] messageBytes = BytesMessageEncoder.forEncoding(encoding()).encode(byteList);
PubsubMessage pubsubMessage =
PubsubMessage message =
PubsubMessage.newBuilder().setData(ByteString.copyFrom(messageBytes)).build();

return new PubSubCall(pubsubMessage);
try {
publisher.publish(message).get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new InterruptedIOException(e.getMessage());
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause instanceof RuntimeException) throw (RuntimeException) cause;
if (cause instanceof Error) throw (Error) cause;
throw new RuntimeException(cause);
}
}

/**
* Shutdown on Publisher is not async thus moving the synchronized block to another function in order not to block until the shutdown is over
*
* @throws IOException
* Shutdown on Publisher is not async thus moving the synchronized block to another function in
* order not to block until the shutdown is over.
*/
@Override
public void close() throws IOException {
@Override public void close() {
if (!setClosed()) {
return;
}
Expand All @@ -227,68 +199,4 @@ private synchronized boolean setClosed() {
@Override public final String toString() {
return "PubSubSender{topic=" + topic + "}";
}

class PubSubCall extends Call.Base<Void> {
private final PubsubMessage message;
volatile ApiFuture<String> future;

public PubSubCall(PubsubMessage message) {
this.message = message;
}

@Override
protected Void doExecute() throws IOException {
try {
publisher.publish(message).get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
return null;
}

@Override
protected void doEnqueue(Callback<Void> callback) {
future = publisher.publish(message);
ApiFutures.addCallback(future, new ApiFutureCallbackAdapter(callback),
executorProvider.getExecutor());
if (future.isCancelled()) throw new IllegalStateException("cancelled sending spans");
}

@Override
protected void doCancel() {
Future<String> maybeFuture = future;
if (maybeFuture != null) maybeFuture.cancel(true);
}

@Override
protected boolean doIsCanceled() {
Future<String> maybeFuture = future;
return maybeFuture != null && maybeFuture.isCancelled();
}

@Override
public Call<Void> clone() {
PubsubMessage clone = PubsubMessage.newBuilder(message).build();
return new PubSubCall(clone);
}
}

static final class ApiFutureCallbackAdapter implements ApiFutureCallback<String> {

final Callback<Void> callback;

public ApiFutureCallbackAdapter(Callback<Void> callback) {
this.callback = callback;
}

@Override
public void onFailure(Throwable t) {
callback.onError(t);
}

@Override
public void onSuccess(String result) {
callback.onSuccess(null);
}
}
}
Loading