From 47c4c1c3d8138b5891472e099fed5d71d932c393 Mon Sep 17 00:00:00 2001 From: Marco Ziccardi Date: Wed, 25 May 2016 20:07:38 +0200 Subject: [PATCH] Implement modifyAckDeadline methods, add javadoc and tests (#1022) --- .../java/com/google/cloud/pubsub/PubSub.java | 64 +++++++++++++ .../com/google/cloud/pubsub/PubSubImpl.java | 16 +++- .../cloud/pubsub/spi/DefaultPubSubRpc.java | 4 +- .../pubsub/testing/LocalPubsubHelper.java | 8 +- .../google/cloud/pubsub/LocalSystemTest.java | 2 +- .../google/cloud/pubsub/PubSubImplTest.java | 96 +++++++++++++++++++ 6 files changed, 178 insertions(+), 12 deletions(-) diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java index b98315e77a75..e6b496be8e5f 100644 --- a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java @@ -403,14 +403,78 @@ interface MessageConsumer extends AutoCloseable { Future nackAsync(String subscription, Iterable ackIds); + /** + * Modifies the acknowledge deadline of the given messages. {@code deadline} must be >= 0 and is + * the new deadline with respect to the time the modify request was received by the Pub/Sub + * service. For example, if {@code deadline} is 10 and {@code unit} is {@link TimeUnit#SECONDS}, + * the new ack deadline will expire 10 seconds after the modify request was received by the + * service. Specifying 0 may be used to make the message available for another pull request + * (corresponds to calling {@link #nack(String, String, String...)}). + * + * @param subscription the subscription whose messages need to update their acknowledge deadline + * @param deadline the new deadline, relative to the time the modify request is received by the + * Pub/Sub service + * @param unit time unit for the {@code deadline} parameter + * @param ackId the ack id of the first message for which the acknowledge deadline must be + * modified + * @param ackIds other ack ids of messages for which the acknowledge deadline must be modified + * @throws PubSubException upon failure, or if the subscription was not found + */ void modifyAckDeadline(String subscription, int deadline, TimeUnit unit, String ackId, String... ackIds); + /** + * Sends a request to modify the acknowledge deadline of the given messages. {@code deadline} + * must be >= 0 and is the new deadline with respect to the time the modify request was received + * by the Pub/Sub service. For example, if {@code deadline} is 10 and {@code unit} is + * {@link TimeUnit#SECONDS}, the new ack deadline will expire 10 seconds after the modify request + * was received by the service. Specifying 0 may be used to make the message available for another + * pull request (corresponds to calling {@link #nackAsync(String, Iterable)}). The method returns + * a {@code Future} object that can be used to wait for the modify operation to be completed. + * + * @param subscription the subscription whose messages need to update their acknowledge deadline + * @param deadline the new deadline, relative to the time the modify request is received by the + * Pub/Sub service + * @param unit time unit for the {@code deadline} parameter + * @param ackId the ack id of the first message for which the acknowledge deadline must be + * modified + * @param ackIds other ack ids of messages for which the acknowledge deadline must be modified + */ Future modifyAckDeadlineAsync(String subscription, int deadline, TimeUnit unit, String ackId, String... ackIds); + /** + * Modifies the acknowledge deadline of the given messages. {@code deadline} must be >= 0 and is + * the new deadline with respect to the time the modify request was received by the Pub/Sub + * service. For example, if {@code deadline} is 10 and {@code unit} is {@link TimeUnit#SECONDS}, + * the new ack deadline will expire 10 seconds after the modify request was received by the + * service. Specifying 0 may be used to make the message available for another pull request + * (corresponds to calling {@link #nack(String, Iterable)}). + * + * @param subscription the subscription whose messages need to update their acknowledge deadline + * @param deadline the new deadline, relative to the time the modify request is received by the + * Pub/Sub service + * @param unit time unit for the {@code deadline} parameter + * @param ackIds the ack ids of messages for which the acknowledge deadline must be modified + * @throws PubSubException upon failure, or if the subscription was not found + */ void modifyAckDeadline(String subscription, int deadline, TimeUnit unit, Iterable ackIds); + /** + * Sends a request to modify the acknowledge deadline of the given messages. {@code deadline} + * must be >= 0 and is the new deadline with respect to the time the modify request was received + * by the Pub/Sub service. For example, if {@code deadline} is 10 and {@code unit} is + * {@link TimeUnit#SECONDS}, the new ack deadline will expire 10 seconds after the modify request + * was received by the service. Specifying 0 may be used to make the message available for another + * pull request (corresponds to calling {@link #nackAsync(String, Iterable)}). The method returns + * a {@code Future} object that can be used to wait for the modify operation to be completed. + * + * @param subscription the subscription whose messages need to update their acknowledge deadline + * @param deadline the new deadline, relative to the time the modify request is received by the + * Pub/Sub service + * @param unit time unit for the {@code deadline} parameter + * @param ackIds the ack ids of messages for which the acknowledge deadline must be modified + */ Future modifyAckDeadlineAsync(String subscription, int deadline, TimeUnit unit, Iterable ackIds); diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java index 948f9904b31c..ec1ca920d91d 100644 --- a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java @@ -16,9 +16,9 @@ package com.google.cloud.pubsub; -import static com.google.api.client.util.Preconditions.checkArgument; import static com.google.cloud.pubsub.PubSub.ListOption.OptionType.PAGE_SIZE; import static com.google.cloud.pubsub.PubSub.ListOption.OptionType.PAGE_TOKEN; +import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.util.concurrent.Futures.lazyTransform; import com.google.cloud.AsyncPage; @@ -47,6 +47,7 @@ import com.google.pubsub.v1.ListTopicSubscriptionsResponse; import com.google.pubsub.v1.ListTopicsRequest; import com.google.pubsub.v1.ListTopicsResponse; +import com.google.pubsub.v1.ModifyAckDeadlineRequest; import com.google.pubsub.v1.ModifyPushConfigRequest; import com.google.pubsub.v1.PublishRequest; import com.google.pubsub.v1.PublishResponse; @@ -504,25 +505,30 @@ public Future nackAsync(String subscription, Iterable ackIds) { @Override public void modifyAckDeadline(String subscription, int deadline, TimeUnit unit, String ackId, String... ackIds) { - + get(modifyAckDeadlineAsync(subscription, deadline, unit, Lists.asList(ackId, ackIds))); } @Override public Future modifyAckDeadlineAsync(String subscription, int deadline, TimeUnit unit, String ackId, String... ackIds) { - return null; + return modifyAckDeadlineAsync(subscription, deadline, unit, Lists.asList(ackId, ackIds)); } @Override public void modifyAckDeadline(String subscription, int deadline, TimeUnit unit, Iterable ackIds) { - + get(modifyAckDeadlineAsync(subscription, deadline, unit, ackIds)); } @Override public Future modifyAckDeadlineAsync(String subscription, int deadline, TimeUnit unit, Iterable ackIds) { - return null; + ModifyAckDeadlineRequest request = ModifyAckDeadlineRequest.newBuilder() + .setSubscription(SubscriberApi.formatSubscriptionName(options().projectId(), subscription)) + .setAckDeadlineSeconds((int) TimeUnit.SECONDS.convert(deadline, unit)) + .addAllAckIds(ackIds) + .build(); + return lazyTransform(rpc.modify(request), EMPTY_TO_VOID_FUNCTION); } static Map optionMap(Option... options) { diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/spi/DefaultPubSubRpc.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/spi/DefaultPubSubRpc.java index 1745b03cf7d7..7df6cb2632f5 100644 --- a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/spi/DefaultPubSubRpc.java +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/spi/DefaultPubSubRpc.java @@ -54,13 +54,13 @@ import com.google.pubsub.v1.Subscription; import com.google.pubsub.v1.Topic; -import org.joda.time.Duration; - import io.grpc.ManagedChannel; import io.grpc.Status.Code; import io.grpc.netty.NegotiationType; import io.grpc.netty.NettyChannelBuilder; +import org.joda.time.Duration; + import java.io.IOException; import java.util.Set; import java.util.concurrent.Future; diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/testing/LocalPubsubHelper.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/testing/LocalPubsubHelper.java index 5a6cf6574211..4ec128709e62 100644 --- a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/testing/LocalPubsubHelper.java +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/testing/LocalPubsubHelper.java @@ -23,6 +23,10 @@ import com.google.cloud.RetryParams; import com.google.cloud.pubsub.PubSubOptions; +import io.grpc.ManagedChannel; +import io.grpc.netty.NegotiationType; +import io.grpc.netty.NettyChannelBuilder; + import java.io.IOException; import java.net.MalformedURLException; import java.net.URL; @@ -31,10 +35,6 @@ import java.util.List; import java.util.UUID; -import io.grpc.ManagedChannel; -import io.grpc.netty.NegotiationType; -import io.grpc.netty.NettyChannelBuilder; - /** * A class that runs a Pubsub emulator instance for use in tests. */ diff --git a/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/LocalSystemTest.java b/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/LocalSystemTest.java index 618a2b9bd926..ed1bef26c7cb 100644 --- a/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/LocalSystemTest.java +++ b/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/LocalSystemTest.java @@ -47,7 +47,7 @@ public static void startServer() throws IOException, InterruptedException { @AfterClass public static void stopServer() throws Exception { - pubsub.options().rpc().close(); + pubsub.close(); pubsubHelper.reset(); pubsubHelper.stop(); } diff --git a/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/PubSubImplTest.java b/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/PubSubImplTest.java index f9817d72b6b0..3c5b811307f5 100644 --- a/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/PubSubImplTest.java +++ b/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/PubSubImplTest.java @@ -47,6 +47,7 @@ import com.google.pubsub.v1.ListTopicSubscriptionsResponse; import com.google.pubsub.v1.ListTopicsRequest; import com.google.pubsub.v1.ListTopicsResponse; +import com.google.pubsub.v1.ModifyAckDeadlineRequest; import com.google.pubsub.v1.ModifyPushConfigRequest; import com.google.pubsub.v1.PublishRequest; import com.google.pubsub.v1.PublishResponse; @@ -61,6 +62,7 @@ import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; public class PubSubImplTest { @@ -1187,6 +1189,100 @@ public void testListTopicSubscriptionsAsyncWithOptions() Iterables.toArray(page.values(), SubscriptionId.class)); } + @Test + public void testModifyAckDeadlineOneMessage() { + pubsub = options.service(); + ModifyAckDeadlineRequest request = ModifyAckDeadlineRequest.newBuilder() + .setAckDeadlineSeconds(10) + .setSubscription(SUBSCRIPTION_NAME_PB) + .addAckIds("ackId") + .build(); + Future response = Futures.immediateFuture(Empty.getDefaultInstance()); + EasyMock.expect(pubsubRpcMock.modify(request)).andReturn(response); + EasyMock.replay(pubsubRpcMock); + pubsub.modifyAckDeadline(SUBSCRIPTION, 10, TimeUnit.SECONDS, "ackId"); + } + + @Test + public void testModifyAckDeadlineOneMessageAsync() + throws ExecutionException, InterruptedException { + pubsub = options.service(); + ModifyAckDeadlineRequest request = ModifyAckDeadlineRequest.newBuilder() + .setAckDeadlineSeconds(10) + .setSubscription(SUBSCRIPTION_NAME_PB) + .addAckIds("ackId") + .build(); + Future response = Futures.immediateFuture(Empty.getDefaultInstance()); + EasyMock.expect(pubsubRpcMock.modify(request)).andReturn(response); + EasyMock.replay(pubsubRpcMock); + Future future = + pubsub.modifyAckDeadlineAsync(SUBSCRIPTION, 10, TimeUnit.SECONDS, "ackId"); + assertNull(future.get()); + } + + @Test + public void testModifyAckDeadlineMoreMessages() { + pubsub = options.service(); + ModifyAckDeadlineRequest request = ModifyAckDeadlineRequest.newBuilder() + .setAckDeadlineSeconds(10) + .setSubscription(SUBSCRIPTION_NAME_PB) + .addAllAckIds(ImmutableList.of("ackId1", "ackId2")) + .build(); + Future response = Futures.immediateFuture(Empty.getDefaultInstance()); + EasyMock.expect(pubsubRpcMock.modify(request)).andReturn(response); + EasyMock.replay(pubsubRpcMock); + pubsub.modifyAckDeadline(SUBSCRIPTION, 10, TimeUnit.SECONDS, "ackId1", "ackId2"); + } + + @Test + public void testModifyAckDeadlineMoreMessagesAsync() + throws ExecutionException, InterruptedException { + pubsub = options.service(); + ModifyAckDeadlineRequest request = ModifyAckDeadlineRequest.newBuilder() + .setAckDeadlineSeconds(10) + .setSubscription(SUBSCRIPTION_NAME_PB) + .addAllAckIds(ImmutableList.of("ackId1", "ackId2")) + .build(); + Future response = Futures.immediateFuture(Empty.getDefaultInstance()); + EasyMock.expect(pubsubRpcMock.modify(request)).andReturn(response); + EasyMock.replay(pubsubRpcMock); + Future future = + pubsub.modifyAckDeadlineAsync(SUBSCRIPTION, 10, TimeUnit.SECONDS, "ackId1", "ackId2"); + assertNull(future.get()); + } + + @Test + public void testModifyAckDeadlineMessageList() { + pubsub = options.service(); + List ackIds = ImmutableList.of("ackId1", "ackId2"); + ModifyAckDeadlineRequest request = ModifyAckDeadlineRequest.newBuilder() + .setAckDeadlineSeconds(10) + .setSubscription(SUBSCRIPTION_NAME_PB) + .addAllAckIds(ackIds) + .build(); + Future response = Futures.immediateFuture(Empty.getDefaultInstance()); + EasyMock.expect(pubsubRpcMock.modify(request)).andReturn(response); + EasyMock.replay(pubsubRpcMock); + pubsub.modifyAckDeadline(SUBSCRIPTION, 10, TimeUnit.SECONDS, ackIds); + } + + @Test + public void testModifyAckDeadlineMessageListAsync() + throws ExecutionException, InterruptedException { + pubsub = options.service(); + List ackIds = ImmutableList.of("ackId1", "ackId2"); + ModifyAckDeadlineRequest request = ModifyAckDeadlineRequest.newBuilder() + .setAckDeadlineSeconds(10) + .setSubscription(SUBSCRIPTION_NAME_PB) + .addAllAckIds(ackIds) + .build(); + Future response = Futures.immediateFuture(Empty.getDefaultInstance()); + EasyMock.expect(pubsubRpcMock.modify(request)).andReturn(response); + EasyMock.replay(pubsubRpcMock); + Future future = pubsub.modifyAckDeadlineAsync(SUBSCRIPTION, 10, TimeUnit.SECONDS, ackIds); + assertNull(future.get()); + } + @Test public void testClose() throws Exception { pubsub = options.service();