diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 59ca6721ef36..480a2d202c61 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -446,7 +446,7 @@ class BeamModulePlugin implements Plugin { def errorprone_version = "2.3.4" def google_clients_version = "1.31.0" def google_cloud_bigdataoss_version = "2.2.2" - def google_cloud_pubsublite_version = "0.13.2" + def google_cloud_pubsublite_version = "1.0.4" def google_code_gson_version = "2.8.6" def google_oauth_clients_version = "1.31.0" // Try to keep grpc_version consistent with gRPC version in google_cloud_platform_libraries_bom diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessages.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessages.java index b0cc68193184..bf6a28863b01 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessages.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessages.java @@ -24,25 +24,36 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; /** Common util functions for converting between PubsubMessage proto and {@link PubsubMessage}. 
*/ -public class PubsubMessages { +public final class PubsubMessages { + private PubsubMessages() {} + + public static com.google.pubsub.v1.PubsubMessage toProto(PubsubMessage input) { + Map attributes = input.getAttributeMap(); + com.google.pubsub.v1.PubsubMessage.Builder message = + com.google.pubsub.v1.PubsubMessage.newBuilder() + .setData(ByteString.copyFrom(input.getPayload())); + // TODO(BEAM-8085) this should not be null + if (attributes != null) { + message.putAllAttributes(attributes); + } + String messageId = input.getMessageId(); + if (messageId != null) { + message.setMessageId(messageId); + } + return message.build(); + } + + public static PubsubMessage fromProto(com.google.pubsub.v1.PubsubMessage input) { + return new PubsubMessage( + input.getData().toByteArray(), input.getAttributesMap(), input.getMessageId()); + } + // Convert the PubsubMessage to a PubsubMessage proto, then return its serialized representation. public static class ParsePayloadAsPubsubMessageProto implements SerializableFunction { @Override public byte[] apply(PubsubMessage input) { - Map attributes = input.getAttributeMap(); - com.google.pubsub.v1.PubsubMessage.Builder message = - com.google.pubsub.v1.PubsubMessage.newBuilder() - .setData(ByteString.copyFrom(input.getPayload())); - // TODO(BEAM-8085) this should not be null - if (attributes != null) { - message.putAllAttributes(attributes); - } - String messageId = input.getMessageId(); - if (messageId != null) { - message.setMessageId(messageId); - } - return message.build().toByteArray(); + return toProto(input).toByteArray(); } } @@ -54,8 +65,7 @@ public PubsubMessage apply(byte[] input) { try { com.google.pubsub.v1.PubsubMessage message = com.google.pubsub.v1.PubsubMessage.parseFrom(input); - return new PubsubMessage( - message.getData().toByteArray(), message.getAttributesMap(), message.getMessageId()); + return fromProto(message); } catch (InvalidProtocolBufferException e) { throw new RuntimeException("Could not decode 
Pubsub message", e); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/CloudPubsubChecks.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/CloudPubsubChecks.java deleted file mode 100644 index 6dc15166666a..000000000000 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/CloudPubsubChecks.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.gcp.pubsublite; - -import static com.google.cloud.pubsublite.cloudpubsub.MessageTransforms.toCpsPublishTransformer; - -import com.google.cloud.pubsublite.Message; -import com.google.cloud.pubsublite.proto.PubSubMessage; -import org.apache.beam.sdk.transforms.MapElements; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.TypeDescriptor; - -/** - * A class providing a conversion validity check between Cloud Pub/Sub and Pub/Sub Lite message - * types. 
- */ -public final class CloudPubsubChecks { - private CloudPubsubChecks() {} - - /** - * Ensure that all messages that pass through can be converted to Cloud Pub/Sub messages using the - * standard transformation methods in the client library. - * - * <p>
Will fail the pipeline if a message has multiple attributes per key. - */ - public static PTransform, PCollection> - ensureUsableAsCloudPubsub() { - return MapElements.into(TypeDescriptor.of(PubSubMessage.class)) - .via( - message -> { - Object unused = toCpsPublishTransformer().transform(Message.fromProto(message)); - return message; - }); - } -} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/CloudPubsubTransforms.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/CloudPubsubTransforms.java new file mode 100644 index 000000000000..1140c11c2767 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/CloudPubsubTransforms.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.gcp.pubsublite; + +import static com.google.cloud.pubsublite.cloudpubsub.MessageTransforms.fromCpsPublishTransformer; +import static com.google.cloud.pubsublite.cloudpubsub.MessageTransforms.toCpsPublishTransformer; +import static com.google.cloud.pubsublite.cloudpubsub.MessageTransforms.toCpsSubscribeTransformer; + +import com.google.cloud.pubsublite.Message; +import com.google.cloud.pubsublite.cloudpubsub.KeyExtractor; +import com.google.cloud.pubsublite.proto.PubSubMessage; +import com.google.cloud.pubsublite.proto.SequencedMessage; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessages; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TypeDescriptor; + +/** A class providing transforms between Cloud Pub/Sub and Pub/Sub Lite message types. */ +public final class CloudPubsubTransforms { + private CloudPubsubTransforms() {} + /** + * Ensure that all messages that pass through can be converted to Cloud Pub/Sub messages using the + * standard transformation methods in the client library. + * + * <p>
Will fail the pipeline if a message has multiple attributes per key. + */ + public static PTransform, PCollection> + ensureUsableAsCloudPubsub() { + return new PTransform, PCollection>() { + @Override + public PCollection expand(PCollection input) { + return input.apply( + MapElements.into(TypeDescriptor.of(PubSubMessage.class)) + .via( + message -> { + Object unused = + toCpsPublishTransformer().transform(Message.fromProto(message)); + return message; + })); + } + }; + } + + /** + * Transform messages read from Pub/Sub Lite to their equivalent Cloud Pub/Sub Message that would + * have been read from PubsubIO. + * + * <p>
Will fail the pipeline if a message has multiple attributes per map key. + */ + public static PTransform, PCollection> + toCloudPubsubMessages() { + return new PTransform, PCollection>() { + @Override + public PCollection expand(PCollection input) { + return input.apply( + MapElements.into(TypeDescriptor.of(PubsubMessage.class)) + .via( + message -> + PubsubMessages.fromProto( + toCpsSubscribeTransformer() + .transform( + com.google.cloud.pubsublite.SequencedMessage.fromProto( + message))))); + } + }; + } + + /** + * Transform messages publishable using PubsubIO to their equivalent Pub/Sub Lite publishable + * message. + */ + public static PTransform, PCollection> + fromCloudPubsubMessages() { + return new PTransform, PCollection>() { + @Override + public PCollection expand(PCollection input) { + return input.apply( + MapElements.into(TypeDescriptor.of(PubSubMessage.class)) + .via( + message -> + fromCpsPublishTransformer(KeyExtractor.DEFAULT) + .transform(PubsubMessages.toProto(message)) + .toProto())); + } + }; + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/ManagedBacklogReaderFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/ManagedBacklogReaderFactory.java new file mode 100644 index 000000000000..de0cf433ff33 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/ManagedBacklogReaderFactory.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.pubsublite; + +import java.io.Serializable; + +/** + * A ManagedBacklogReaderFactory produces TopicBacklogReaders and tears down any produced readers + * when it is itself closed. + * + * <p>
close() should never be called on produced readers. + */ +public interface ManagedBacklogReaderFactory extends AutoCloseable, Serializable { + TopicBacklogReader newReader(SubscriptionPartition subscriptionPartition); + + @Override + void close(); +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/ManagedBacklogReaderFactoryImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/ManagedBacklogReaderFactoryImpl.java new file mode 100644 index 000000000000..9a337bfdb784 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/ManagedBacklogReaderFactoryImpl.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.gcp.pubsublite; + +import com.google.api.gax.rpc.ApiException; +import com.google.cloud.pubsublite.Offset; +import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse; +import java.util.HashMap; +import java.util.Map; +import javax.annotation.concurrent.GuardedBy; +import org.apache.beam.sdk.transforms.SerializableFunction; + +public class ManagedBacklogReaderFactoryImpl implements ManagedBacklogReaderFactory { + private final SerializableFunction newReader; + + @GuardedBy("this") + private final Map readers = new HashMap<>(); + + ManagedBacklogReaderFactoryImpl( + SerializableFunction newReader) { + this.newReader = newReader; + } + + private static final class NonCloseableTopicBacklogReader implements TopicBacklogReader { + private final TopicBacklogReader underlying; + + NonCloseableTopicBacklogReader(TopicBacklogReader underlying) { + this.underlying = underlying; + } + + @Override + public ComputeMessageStatsResponse computeMessageStats(Offset offset) throws ApiException { + return underlying.computeMessageStats(offset); + } + + @Override + public void close() { + throw new IllegalArgumentException( + "Cannot call close() on a reader returned from ManagedBacklogReaderFactory."); + } + } + + @Override + public synchronized TopicBacklogReader newReader(SubscriptionPartition subscriptionPartition) { + return new NonCloseableTopicBacklogReader( + readers.computeIfAbsent(subscriptionPartition, newReader::apply)); + } + + @Override + public synchronized void close() { + readers.values().forEach(TopicBacklogReader::close); + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRange.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRange.java new file mode 100644 index 000000000000..b39d87e6e1f0 --- /dev/null +++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRange.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.pubsublite; + +import com.google.auto.value.AutoValue; +import org.apache.beam.sdk.coders.DefaultCoder; +import org.apache.beam.sdk.io.range.OffsetRange; + +@AutoValue +@DefaultCoder(OffsetByteRangeCoder.class) +abstract class OffsetByteRange { + abstract OffsetRange getRange(); + + abstract long getByteCount(); + + static OffsetByteRange of(OffsetRange range, long byteCount) { + return new AutoValue_OffsetByteRange(range, byteCount); + } + + static OffsetByteRange of(OffsetRange range) { + return of(range, 0); + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRangeCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRangeCoder.java new file mode 100644 index 000000000000..076cda13e193 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRangeCoder.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation 
(ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.pubsublite; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import org.apache.beam.sdk.coders.AtomicCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderProvider; +import org.apache.beam.sdk.coders.CoderProviders; +import org.apache.beam.sdk.coders.DelegateCoder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.io.range.OffsetRange; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.TypeDescriptor; + +public class OffsetByteRangeCoder extends AtomicCoder { + private static final Coder CODER = + DelegateCoder.of( + KvCoder.of(OffsetRange.Coder.of(), VarLongCoder.of()), + OffsetByteRangeCoder::toKv, + OffsetByteRangeCoder::fromKv); + + private static KV toKv(OffsetByteRange value) { + return KV.of(value.getRange(), value.getByteCount()); + } + + private static OffsetByteRange fromKv(KV kv) { + return OffsetByteRange.of(kv.getKey(), kv.getValue()); + } + + @Override + public void encode(OffsetByteRange value, OutputStream outStream) throws IOException { + CODER.encode(value, outStream); + } + + @Override + 
public OffsetByteRange decode(InputStream inStream) throws IOException { + return CODER.decode(inStream); + } + + public static CoderProvider getCoderProvider() { + return CoderProviders.forCoder( + TypeDescriptor.of(OffsetByteRange.class), new OffsetByteRangeCoder()); + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRangeTracker.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRangeTracker.java index 608af8fea189..da9aaaa03ac3 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRangeTracker.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRangeTracker.java @@ -26,8 +26,6 @@ import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; import org.apache.beam.sdk.io.range.OffsetRange; -import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; -import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.HasProgress; import org.apache.beam.sdk.transforms.splittabledofn.SplitResult; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Stopwatch; import org.joda.time.Duration; @@ -44,35 +42,33 @@ * received. IMPORTANT: minTrackingTime must be strictly smaller than the SDF read timeout when it * would return ProcessContinuation.resume(). 
*/ -class OffsetByteRangeTracker extends RestrictionTracker - implements HasProgress { - private final TopicBacklogReader backlogReader; +class OffsetByteRangeTracker extends TrackerWithProgress { + private final TopicBacklogReader unownedBacklogReader; private final Duration minTrackingTime; private final long minBytesReceived; private final Stopwatch stopwatch; - private OffsetRange range; + private OffsetByteRange range; private @Nullable Long lastClaimed; - private long byteCount = 0; public OffsetByteRangeTracker( - OffsetRange range, - TopicBacklogReader backlogReader, + OffsetByteRange range, + TopicBacklogReader unownedBacklogReader, Stopwatch stopwatch, Duration minTrackingTime, long minBytesReceived) { - checkArgument(range.getTo() == Long.MAX_VALUE); - this.backlogReader = backlogReader; + checkArgument( + range.getRange().getTo() == Long.MAX_VALUE, + "May only construct OffsetByteRangeTracker with an unbounded range with no progress."); + checkArgument( + range.getByteCount() == 0L, + "May only construct OffsetByteRangeTracker with an unbounded range with no progress."); + this.unownedBacklogReader = unownedBacklogReader; this.minTrackingTime = minTrackingTime; this.minBytesReceived = minBytesReceived; this.stopwatch = stopwatch.reset().start(); this.range = range; } - @Override - public void finalize() { - this.backlogReader.close(); - } - @Override public IsBounded isBounded() { return IsBounded.UNBOUNDED; @@ -87,32 +83,32 @@ public boolean tryClaim(OffsetByteProgress position) { position.lastOffset().value(), lastClaimed); checkArgument( - toClaim >= range.getFrom(), + toClaim >= range.getRange().getFrom(), "Trying to claim offset %s before start of the range %s", toClaim, range); // split() has already been called, truncating this range. No more offsets may be claimed. 
- if (range.getTo() != Long.MAX_VALUE) { - boolean isRangeEmpty = range.getTo() == range.getFrom(); - boolean isValidClosedRange = nextOffset() == range.getTo(); + if (range.getRange().getTo() != Long.MAX_VALUE) { + boolean isRangeEmpty = range.getRange().getTo() == range.getRange().getFrom(); + boolean isValidClosedRange = nextOffset() == range.getRange().getTo(); checkState( isRangeEmpty || isValidClosedRange, "Violated class precondition: offset range improperly split. Please report a beam bug."); return false; } lastClaimed = toClaim; - byteCount += position.batchBytes(); + range = OffsetByteRange.of(range.getRange(), range.getByteCount() + position.batchBytes()); return true; } @Override - public OffsetRange currentRestriction() { + public OffsetByteRange currentRestriction() { return range; } private long nextOffset() { checkState(lastClaimed == null || lastClaimed < Long.MAX_VALUE); - return lastClaimed == null ? currentRestriction().getFrom() : lastClaimed + 1; + return lastClaimed == null ? currentRestriction().getRange().getFrom() : lastClaimed + 1; } /** @@ -124,29 +120,33 @@ private boolean receivedEnough() { if (duration.isLongerThan(minTrackingTime)) { return true; } - if (byteCount >= minBytesReceived) { + if (currentRestriction().getByteCount() >= minBytesReceived) { return true; } return false; } @Override - public @Nullable SplitResult trySplit(double fractionOfRemainder) { + public @Nullable SplitResult trySplit(double fractionOfRemainder) { // Cannot split a bounded range. This should already be completely claimed. 
- if (range.getTo() != Long.MAX_VALUE) { + if (range.getRange().getTo() != Long.MAX_VALUE) { return null; } if (!receivedEnough()) { return null; } - range = new OffsetRange(currentRestriction().getFrom(), nextOffset()); - return SplitResult.of(this.range, new OffsetRange(nextOffset(), Long.MAX_VALUE)); + range = + OffsetByteRange.of( + new OffsetRange(currentRestriction().getRange().getFrom(), nextOffset()), + range.getByteCount()); + return SplitResult.of( + this.range, OffsetByteRange.of(new OffsetRange(nextOffset(), Long.MAX_VALUE), 0)); } @Override @SuppressWarnings("unboxing.of.nullable") public void checkDone() throws IllegalStateException { - if (range.getFrom() == range.getTo()) { + if (range.getRange().getFrom() == range.getRange().getTo()) { return; } checkState( @@ -155,18 +155,18 @@ public void checkDone() throws IllegalStateException { range); long lastClaimedNotNull = checkNotNull(lastClaimed); checkState( - lastClaimedNotNull >= range.getTo() - 1, + lastClaimedNotNull >= range.getRange().getTo() - 1, "Last attempted offset was %s in range %s, claiming work in [%s, %s) was not attempted", lastClaimedNotNull, range, lastClaimedNotNull + 1, - range.getTo()); + range.getRange().getTo()); } @Override public Progress getProgress() { ComputeMessageStatsResponse stats = - this.backlogReader.computeMessageStats(Offset.of(nextOffset())); - return Progress.from(byteCount, stats.getMessageBytes()); + this.unownedBacklogReader.computeMessageStats(Offset.of(nextOffset())); + return Progress.from(range.getByteCount(), stats.getMessageBytes()); } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PerServerPublisherCache.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PerServerPublisherCache.java index 623e20c09b45..d7526d88e089 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PerServerPublisherCache.java +++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PerServerPublisherCache.java @@ -27,4 +27,8 @@ final class PerServerPublisherCache { private PerServerPublisherCache() {} static final PublisherCache PUBLISHER_CACHE = new PublisherCache(); + + static { + Runtime.getRuntime().addShutdownHook(new Thread(PUBLISHER_CACHE::close)); + } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PerSubscriptionPartitionSdf.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PerSubscriptionPartitionSdf.java index a9f7a439f0da..fdf792029863 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PerSubscriptionPartitionSdf.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PerSubscriptionPartitionSdf.java @@ -17,13 +17,12 @@ */ package org.apache.beam.sdk.io.gcp.pubsublite; -import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical; -import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull; +import static com.google.cloud.pubsublite.internal.wire.ApiServiceUtils.blockingShutdown; import com.google.cloud.pubsublite.Offset; +import com.google.cloud.pubsublite.internal.ExtractStatus; import com.google.cloud.pubsublite.internal.wire.Committer; import com.google.cloud.pubsublite.proto.SequencedMessage; -import java.util.concurrent.ExecutionException; import org.apache.beam.sdk.io.range.OffsetRange; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.SerializableBiFunction; @@ -35,31 +34,35 @@ class PerSubscriptionPartitionSdf extends DoFn { private final Duration maxSleepTime; + private final ManagedBacklogReaderFactory backlogReaderFactory; private final SubscriptionPartitionProcessorFactory processorFactory; private final SerializableFunction offsetReaderFactory; - private final 
SerializableBiFunction< - SubscriptionPartition, OffsetRange, RestrictionTracker> + private final SerializableBiFunction trackerFactory; private final SerializableFunction committerFactory; PerSubscriptionPartitionSdf( Duration maxSleepTime, + ManagedBacklogReaderFactory backlogReaderFactory, SerializableFunction offsetReaderFactory, - SerializableBiFunction< - SubscriptionPartition, - OffsetRange, - RestrictionTracker> + SerializableBiFunction trackerFactory, SubscriptionPartitionProcessorFactory processorFactory, SerializableFunction committerFactory) { this.maxSleepTime = maxSleepTime; + this.backlogReaderFactory = backlogReaderFactory; this.processorFactory = processorFactory; this.offsetReaderFactory = offsetReaderFactory; this.trackerFactory = trackerFactory; this.committerFactory = committerFactory; } + @Teardown + public void teardown() { + backlogReaderFactory.close(); + } + @GetInitialWatermarkEstimatorState public Instant getInitialWatermarkState() { return Instant.EPOCH; @@ -72,7 +75,7 @@ public MonotonicallyIncreasing newWatermarkEstimator(@WatermarkEstimatorState In @ProcessElement public ProcessContinuation processElement( - RestrictionTracker tracker, + RestrictionTracker tracker, @Element SubscriptionPartition subscriptionPartition, OutputReceiver receiver) throws Exception { @@ -83,38 +86,44 @@ public ProcessContinuation processElement( processor .lastClaimed() .ifPresent( - lastClaimedOffset -> - /* TODO(boyuanzz): When default dataflow can use finalizers, undo this. - finalizer.afterBundleCommit( - Instant.ofEpochMilli(Long.MAX_VALUE), - () -> */ { + lastClaimedOffset -> { Committer committer = committerFactory.apply(subscriptionPartition); committer.startAsync().awaitRunning(); // Commit the next-to-deliver offset. 
try { committer.commitOffset(Offset.of(lastClaimedOffset.value() + 1)).get(); - } catch (ExecutionException e) { - throw toCanonical(checkArgumentNotNull(e.getCause())).underlying; } catch (Exception e) { - throw toCanonical(e).underlying; + throw ExtractStatus.toCanonical(e).underlying; } - committer.stopAsync().awaitTerminated(); + blockingShutdown(committer); }); return result; } } @GetInitialRestriction - public OffsetRange getInitialRestriction(@Element SubscriptionPartition subscriptionPartition) { + public OffsetByteRange getInitialRestriction( + @Element SubscriptionPartition subscriptionPartition) { try (InitialOffsetReader reader = offsetReaderFactory.apply(subscriptionPartition)) { Offset offset = reader.read(); - return new OffsetRange(offset.value(), Long.MAX_VALUE /* open interval */); + return OffsetByteRange.of( + new OffsetRange(offset.value(), Long.MAX_VALUE /* open interval */)); } } @NewTracker - public RestrictionTracker newTracker( - @Element SubscriptionPartition subscriptionPartition, @Restriction OffsetRange range) { - return trackerFactory.apply(subscriptionPartition, range); + public TrackerWithProgress newTracker( + @Element SubscriptionPartition subscriptionPartition, @Restriction OffsetByteRange range) { + return trackerFactory.apply(backlogReaderFactory.newReader(subscriptionPartition), range); + } + + @GetSize + public double getSize( + @Element SubscriptionPartition subscriptionPartition, + @Restriction OffsetByteRange restriction) { + if (restriction.getRange().getTo() != Long.MAX_VALUE) { + return restriction.getByteCount(); + } + return newTracker(subscriptionPartition, restriction).getProgress().getWorkRemaining(); } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PublisherCache.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PublisherCache.java index f8dc24baa7d6..3dbdec69db99 100644 --- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PublisherCache.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PublisherCache.java @@ -23,52 +23,50 @@ import com.google.api.core.ApiService.State; import com.google.api.gax.rpc.ApiException; import com.google.cloud.pubsublite.MessageMetadata; -import com.google.cloud.pubsublite.internal.CloseableMonitor; import com.google.cloud.pubsublite.internal.Publisher; +import com.google.cloud.pubsublite.internal.wire.SystemExecutors; import com.google.errorprone.annotations.concurrent.GuardedBy; import java.util.HashMap; -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; /** A map of working publishers by PublisherOptions. */ -class PublisherCache { - private final CloseableMonitor monitor = new CloseableMonitor(); - - private final Executor listenerExecutor = Executors.newSingleThreadExecutor(); - - @GuardedBy("monitor.monitor") +class PublisherCache implements AutoCloseable { + @GuardedBy("this") private final HashMap> livePublishers = new HashMap<>(); - Publisher get(PublisherOptions options) throws ApiException { + private synchronized void evict(PublisherOptions options) { + livePublishers.remove(options); + } + + synchronized Publisher get(PublisherOptions options) throws ApiException { checkArgument(options.usesCache()); - try (CloseableMonitor.Hold h = monitor.enter()) { - Publisher publisher = livePublishers.get(options); - if (publisher != null) { - return publisher; - } - publisher = Publishers.newPublisher(options); - livePublishers.put(options, publisher); - publisher.addListener( - new Listener() { - @Override - public void failed(State s, Throwable t) { - try (CloseableMonitor.Hold h = monitor.enter()) { - livePublishers.remove(options); - } - } - }, - listenerExecutor); - 
publisher.startAsync().awaitRunning(); + Publisher publisher = livePublishers.get(options); + if (publisher != null) { return publisher; } + publisher = Publishers.newPublisher(options); + livePublishers.put(options, publisher); + publisher.addListener( + new Listener() { + @Override + public void failed(State s, Throwable t) { + evict(options); + } + }, + SystemExecutors.getFuturesExecutor()); + publisher.startAsync().awaitRunning(); + return publisher; } @VisibleForTesting - void set(PublisherOptions options, Publisher toCache) { - try (CloseableMonitor.Hold h = monitor.enter()) { - livePublishers.put(options, toCache); - } + synchronized void set(PublisherOptions options, Publisher toCache) { + livePublishers.put(options, toCache); + } + + @Override + public synchronized void close() { + livePublishers.forEach(((options, publisher) -> publisher.stopAsync())); + livePublishers.clear(); } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/Publishers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/Publishers.java index 34012f72db15..67ea6cf6062d 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/Publishers.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/Publishers.java @@ -17,17 +17,27 @@ */ package org.apache.beam.sdk.io.gcp.pubsublite; +import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical; import static com.google.cloud.pubsublite.internal.UncheckedApiPreconditions.checkArgument; +import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultMetadata; +import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultSettings; import com.google.api.gax.rpc.ApiException; import com.google.cloud.pubsublite.AdminClient; import com.google.cloud.pubsublite.AdminClientSettings; import 
com.google.cloud.pubsublite.MessageMetadata; -import com.google.cloud.pubsublite.TopicPath; +import com.google.cloud.pubsublite.Partition; +import com.google.cloud.pubsublite.cloudpubsub.PublisherSettings; import com.google.cloud.pubsublite.internal.Publisher; import com.google.cloud.pubsublite.internal.wire.PartitionCountWatchingPublisherSettings; +import com.google.cloud.pubsublite.internal.wire.PubsubContext; import com.google.cloud.pubsublite.internal.wire.PubsubContext.Framework; +import com.google.cloud.pubsublite.internal.wire.RoutingMetadata; import com.google.cloud.pubsublite.internal.wire.SinglePartitionPublisherBuilder; +import com.google.cloud.pubsublite.v1.AdminServiceClient; +import com.google.cloud.pubsublite.v1.AdminServiceSettings; +import com.google.cloud.pubsublite.v1.PublisherServiceClient; +import com.google.cloud.pubsublite.v1.PublisherServiceSettings; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeToken; class Publishers { @@ -35,6 +45,38 @@ class Publishers { private Publishers() {} + private static AdminClient newAdminClient(PublisherOptions options) throws ApiException { + try { + return AdminClient.create( + AdminClientSettings.newBuilder() + .setServiceClient( + AdminServiceClient.create( + addDefaultSettings( + options.topicPath().location().extractRegion(), + AdminServiceSettings.newBuilder()))) + .setRegion(options.topicPath().location().extractRegion()) + .build()); + } catch (Throwable t) { + throw toCanonical(t).underlying; + } + } + + private static PublisherServiceClient newServiceClient( + PublisherOptions options, Partition partition) { + PublisherServiceSettings.Builder settingsBuilder = PublisherServiceSettings.newBuilder(); + settingsBuilder = + addDefaultMetadata( + PubsubContext.of(FRAMEWORK), + RoutingMetadata.of(options.topicPath(), partition), + settingsBuilder); + try { + return PublisherServiceClient.create( + addDefaultSettings(options.topicPath().location().extractRegion(), 
settingsBuilder)); + } catch (Throwable t) { + throw toCanonical(t).underlying; + } + } + @SuppressWarnings("unchecked") static Publisher newPublisher(PublisherOptions options) throws ApiException { SerializableSupplier supplier = options.publisherSupplier(); @@ -44,20 +86,18 @@ static Publisher newPublisher(PublisherOptions options) throws checkArgument(token.isSupertypeOf(supplied.getClass())); return (Publisher) supplied; } - - TopicPath topic = options.topicPath(); - PartitionCountWatchingPublisherSettings.Builder publisherSettings = - PartitionCountWatchingPublisherSettings.newBuilder() - .setTopic(topic) - .setPublisherFactory( - partition -> - SinglePartitionPublisherBuilder.newBuilder() - .setTopic(topic) - .setPartition(partition) - .build()) - .setAdminClient( - AdminClient.create( - AdminClientSettings.newBuilder().setRegion(topic.location().region()).build())); - return publisherSettings.build().instantiate(); + return PartitionCountWatchingPublisherSettings.newBuilder() + .setTopic(options.topicPath()) + .setPublisherFactory( + partition -> + SinglePartitionPublisherBuilder.newBuilder() + .setTopic(options.topicPath()) + .setPartition(partition) + .setServiceClient(newServiceClient(options, partition)) + .setBatchingSettings(PublisherSettings.DEFAULT_BATCHING_SETTINGS) + .build()) + .setAdminClient(newAdminClient(options)) + .build() + .instantiate(); } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteIO.java index ca1f2be41699..b93ac61f33be 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteIO.java @@ -107,7 +107,7 @@ public static PTransform, PCollection> * } */ public static PTransform, PDone> write(PublisherOptions 
options) { - return new PTransform, PDone>("PubsubLiteIO") { + return new PTransform, PDone>() { @Override public PDone expand(PCollection input) { PubsubLiteSink sink = new PubsubLiteSink(options); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteSink.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteSink.java index d3acdfa35e05..d0e3afa2ac07 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteSink.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteSink.java @@ -28,16 +28,14 @@ import com.google.cloud.pubsublite.internal.CheckedApiException; import com.google.cloud.pubsublite.internal.ExtractStatus; import com.google.cloud.pubsublite.internal.Publisher; +import com.google.cloud.pubsublite.internal.wire.SystemExecutors; import com.google.cloud.pubsublite.proto.PubSubMessage; import com.google.errorprone.annotations.concurrent.GuardedBy; import java.util.ArrayDeque; import java.util.Deque; -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; import java.util.function.Consumer; import org.apache.beam.sdk.io.gcp.pubsublite.PublisherOrError.Kind; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors; /** A sink which publishes messages to Pub/Sub Lite. 
*/ @SuppressWarnings({ @@ -56,8 +54,6 @@ class PubsubLiteSink extends DoFn { @GuardedBy("this") private transient Deque errorsSinceLastFinish; - private static final Executor executor = Executors.newCachedThreadPool(); - PubsubLiteSink(PublisherOptions options) { this.options = options; } @@ -89,7 +85,7 @@ public void failed(State s, Throwable t) { onFailure.accept(t); } }, - MoreExecutors.directExecutor()); + SystemExecutors.getFuturesExecutor()); if (!options.usesCache()) { publisher.startAsync(); } @@ -130,7 +126,7 @@ public void onFailure(Throwable t) { onFailure.accept(t); } }, - executor); + SystemExecutors.getFuturesExecutor()); } // Intentionally don't flush on bundle finish to allow multi-sink client reuse. diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscribeTransform.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscribeTransform.java index 9875880584ca..b6a9f5d59090 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscribeTransform.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscribeTransform.java @@ -23,6 +23,7 @@ import com.google.api.gax.rpc.ApiException; import com.google.cloud.pubsublite.AdminClient; import com.google.cloud.pubsublite.AdminClientSettings; +import com.google.cloud.pubsublite.Offset; import com.google.cloud.pubsublite.Partition; import com.google.cloud.pubsublite.TopicPath; import com.google.cloud.pubsublite.internal.wire.Committer; @@ -31,8 +32,6 @@ import java.util.List; import java.util.function.Consumer; import java.util.stream.Collectors; -import org.apache.beam.sdk.io.range.OffsetRange; -import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; @@ -54,10 +53,11 @@ private void 
checkSubscription(SubscriptionPartition subscriptionPartition) thro checkArgument(subscriptionPartition.subscription().equals(options.subscriptionPath())); } - private Subscriber newSubscriber(Partition partition, Consumer> consumer) { + private Subscriber newSubscriber( + Partition partition, Offset initialOffset, Consumer> consumer) { try { return options - .getSubscriberFactory(partition) + .getSubscriberFactory(partition, initialOffset) .newSubscriber( messages -> consumer.accept( @@ -71,23 +71,31 @@ private Subscriber newSubscriber(Partition partition, Consumer tracker, + RestrictionTracker tracker, OutputReceiver receiver) throws ApiException { checkSubscription(subscriptionPartition); return new SubscriptionPartitionProcessorImpl( tracker, receiver, - consumer -> newSubscriber(subscriptionPartition.partition(), consumer), + consumer -> + newSubscriber( + subscriptionPartition.partition(), + Offset.of(tracker.currentRestriction().getRange().getFrom()), + consumer), options.flowControlSettings()); } - private RestrictionTracker newRestrictionTracker( - SubscriptionPartition subscriptionPartition, OffsetRange initial) { + private TopicBacklogReader newBacklogReader(SubscriptionPartition subscriptionPartition) { checkSubscription(subscriptionPartition); + return options.getBacklogReader(subscriptionPartition.partition()); + } + + private TrackerWithProgress newRestrictionTracker( + TopicBacklogReader backlogReader, OffsetByteRange initial) { return new OffsetByteRangeTracker( initial, - options.getBacklogReader(subscriptionPartition.partition()), + backlogReader, Stopwatch.createUnstarted(), options.minBundleTimeout(), LongMath.saturatedMultiply(options.flowControlSettings().bytesOutstanding(), 10)); @@ -107,7 +115,7 @@ private TopicPath getTopicPath() { try (AdminClient admin = AdminClient.create( AdminClientSettings.newBuilder() - .setRegion(options.subscriptionPath().location().region()) + .setRegion(options.subscriptionPath().location().extractRegion()) 
.build())) { return TopicPath.parse(admin.getSubscription(options.subscriptionPath()).get().getTopic()); } catch (Throwable t) { @@ -118,25 +126,15 @@ private TopicPath getTopicPath() { @Override public PCollection expand(PBegin input) { PCollection subscriptionPartitions; - if (options.partitions().isEmpty()) { - subscriptionPartitions = - input.apply(new SubscriptionPartitionLoader(getTopicPath(), options.subscriptionPath())); - } else { - subscriptionPartitions = - input.apply( - Create.of( - options.partitions().stream() - .map( - partition -> - SubscriptionPartition.of(options.subscriptionPath(), partition)) - .collect(Collectors.toList()))); - } + subscriptionPartitions = + input.apply(new SubscriptionPartitionLoader(getTopicPath(), options.subscriptionPath())); return subscriptionPartitions.apply( ParDo.of( new PerSubscriptionPartitionSdf( // Ensure we read for at least 5 seconds more than the bundle timeout. options.minBundleTimeout().plus(Duration.standardSeconds(5)), + new ManagedBacklogReaderFactoryImpl(this::newBacklogReader), this::newInitialOffsetReader, this::newRestrictionTracker, this::newPartitionProcessor, diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriberOptions.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriberOptions.java index 0d3afe2c60da..a9625be608fd 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriberOptions.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriberOptions.java @@ -23,6 +23,7 @@ import com.google.api.gax.rpc.ApiException; import com.google.auto.value.AutoValue; +import com.google.cloud.pubsublite.Offset; import com.google.cloud.pubsublite.Partition; import com.google.cloud.pubsublite.SubscriptionPath; import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings; @@ -35,13 +36,13 @@ import 
com.google.cloud.pubsublite.internal.wire.RoutingMetadata; import com.google.cloud.pubsublite.internal.wire.SubscriberBuilder; import com.google.cloud.pubsublite.internal.wire.SubscriberFactory; +import com.google.cloud.pubsublite.proto.Cursor; +import com.google.cloud.pubsublite.proto.SeekRequest; import com.google.cloud.pubsublite.v1.CursorServiceClient; import com.google.cloud.pubsublite.v1.CursorServiceSettings; import com.google.cloud.pubsublite.v1.SubscriberServiceClient; import com.google.cloud.pubsublite.v1.SubscriberServiceSettings; import java.io.Serializable; -import java.util.Set; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; @@ -68,11 +69,6 @@ public abstract class SubscriberOptions implements Serializable { /** Per-partition flow control parameters for this subscription. */ public abstract FlowControlSettings flowControlSettings(); - /** - * A set of partitions. If empty, continuously poll the set of partitions using an admin client. - */ - public abstract Set partitions(); - /** * The minimum wall time to pass before allowing bundle closure. 
* @@ -108,7 +104,6 @@ public abstract class SubscriberOptions implements Serializable { public static Builder newBuilder() { Builder builder = new AutoValue_SubscriberOptions.Builder(); return builder - .setPartitions(ImmutableSet.of()) .setFlowControlSettings(DEFAULT_FLOW_CONTROL) .setMinBundleTimeout(MIN_BUNDLE_TIMEOUT); } @@ -119,20 +114,19 @@ private SubscriberServiceClient newSubscriberServiceClient(Partition partition) throws ApiException { try { SubscriberServiceSettings.Builder settingsBuilder = SubscriberServiceSettings.newBuilder(); - settingsBuilder = addDefaultMetadata( PubsubContext.of(FRAMEWORK), RoutingMetadata.of(subscriptionPath(), partition), settingsBuilder); return SubscriberServiceClient.create( - addDefaultSettings(subscriptionPath().location().region(), settingsBuilder)); + addDefaultSettings(subscriptionPath().location().extractRegion(), settingsBuilder)); } catch (Throwable t) { throw toCanonical(t).underlying; } } - SubscriberFactory getSubscriberFactory(Partition partition) { + SubscriberFactory getSubscriberFactory(Partition partition, Offset initialOffset) { SubscriberFactory factory = subscriberFactory(); if (factory != null) { return factory; @@ -143,6 +137,10 @@ SubscriberFactory getSubscriberFactory(Partition partition) { .setSubscriptionPath(subscriptionPath()) .setPartition(partition) .setServiceClient(newSubscriberServiceClient(partition)) + .setInitialLocation( + SeekRequest.newBuilder() + .setCursor(Cursor.newBuilder().setOffset(initialOffset.value())) + .build()) .build(); } @@ -150,7 +148,7 @@ private CursorServiceClient newCursorServiceClient() throws ApiException { try { return CursorServiceClient.create( addDefaultSettings( - subscriptionPath().location().region(), CursorServiceSettings.newBuilder())); + subscriptionPath().location().extractRegion(), CursorServiceSettings.newBuilder())); } catch (Throwable t) { throw toCanonical(t).underlying; } @@ -189,7 +187,7 @@ InitialOffsetReader getInitialOffsetReader(Partition 
partition) { return new InitialOffsetReaderImpl( CursorClient.create( CursorClientSettings.newBuilder() - .setRegion(subscriptionPath().location().region()) + .setRegion(subscriptionPath().location().extractRegion()) .build()), subscriptionPath(), partition); @@ -201,8 +199,6 @@ public abstract static class Builder { public abstract Builder setSubscriptionPath(SubscriptionPath path); // Optional parameters - public abstract Builder setPartitions(Set partitions); - public abstract Builder setFlowControlSettings(FlowControlSettings flowControlSettings); public abstract Builder setMinBundleTimeout(Duration minBundleTimeout); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriptionPartitionProcessorFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriptionPartitionProcessorFactory.java index 6bf362380ffe..530c180ebd88 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriptionPartitionProcessorFactory.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriptionPartitionProcessorFactory.java @@ -19,7 +19,6 @@ import com.google.cloud.pubsublite.proto.SequencedMessage; import java.io.Serializable; -import org.apache.beam.sdk.io.range.OffsetRange; import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; @@ -28,6 +27,6 @@ interface SubscriptionPartitionProcessorFactory extends Serializable { SubscriptionPartitionProcessor newProcessor( SubscriptionPartition subscriptionPartition, - RestrictionTracker tracker, + RestrictionTracker tracker, OutputReceiver receiver); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriptionPartitionProcessorImpl.java 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriptionPartitionProcessorImpl.java index 8d2a137a27dc..a086d18b2f65 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriptionPartitionProcessorImpl.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriptionPartitionProcessorImpl.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.io.gcp.pubsublite; +import static com.google.cloud.pubsublite.internal.wire.ApiServiceUtils.blockingShutdown; + import com.google.api.core.ApiService.Listener; import com.google.api.core.ApiService.State; import com.google.cloud.pubsublite.Offset; @@ -24,9 +26,8 @@ import com.google.cloud.pubsublite.internal.CheckedApiException; import com.google.cloud.pubsublite.internal.ExtractStatus; import com.google.cloud.pubsublite.internal.wire.Subscriber; -import com.google.cloud.pubsublite.proto.Cursor; +import com.google.cloud.pubsublite.internal.wire.SystemExecutors; import com.google.cloud.pubsublite.proto.FlowControlRequest; -import com.google.cloud.pubsublite.proto.SeekRequest; import com.google.cloud.pubsublite.proto.SequencedMessage; import com.google.protobuf.util.Timestamps; import java.util.List; @@ -36,19 +37,17 @@ import java.util.concurrent.TimeoutException; import java.util.function.Consumer; import java.util.function.Function; -import org.apache.beam.sdk.io.range.OffsetRange; import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation; import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.SettableFuture; import org.joda.time.Duration; import 
org.joda.time.Instant; class SubscriptionPartitionProcessorImpl extends Listener implements SubscriptionPartitionProcessor { - private final RestrictionTracker tracker; + private final RestrictionTracker tracker; private final OutputReceiver receiver; private final Subscriber subscriber; private final SettableFuture completionFuture = SettableFuture.create(); @@ -57,7 +56,7 @@ class SubscriptionPartitionProcessorImpl extends Listener @SuppressWarnings("methodref.receiver.bound.invalid") SubscriptionPartitionProcessorImpl( - RestrictionTracker tracker, + RestrictionTracker tracker, OutputReceiver receiver, Function>, Subscriber> subscriberFactory, FlowControlSettings flowControlSettings) { @@ -70,23 +69,15 @@ class SubscriptionPartitionProcessorImpl extends Listener @Override @SuppressWarnings("argument.type.incompatible") public void start() throws CheckedApiException { - this.subscriber.addListener(this, MoreExecutors.directExecutor()); + this.subscriber.addListener(this, SystemExecutors.getFuturesExecutor()); this.subscriber.startAsync(); this.subscriber.awaitRunning(); try { - this.subscriber - .seek( - SeekRequest.newBuilder() - .setCursor(Cursor.newBuilder().setOffset(tracker.currentRestriction().getFrom())) - .build()) - .get(); this.subscriber.allowFlow( FlowControlRequest.newBuilder() .setAllowedBytes(flowControlSettings.bytesOutstanding()) .setAllowedMessages(flowControlSettings.messagesOutstanding()) .build()); - } catch (ExecutionException e) { - throw ExtractStatus.toCanonical(e.getCause()); } catch (Throwable t) { throw ExtractStatus.toCanonical(t); } @@ -125,7 +116,7 @@ public void failed(State from, Throwable failure) { @Override public void close() { - subscriber.stopAsync().awaitTerminated(); + blockingShutdown(subscriber); } @Override diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/TopicBacklogReaderSettings.java 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/TopicBacklogReaderSettings.java index 8c1dd9439e51..79db0f19f5dd 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/TopicBacklogReaderSettings.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/TopicBacklogReaderSettings.java @@ -62,7 +62,7 @@ Builder setTopicPathFromSubscriptionPath(SubscriptionPath subscriptionPath) try (AdminClient adminClient = AdminClient.create( AdminClientSettings.newBuilder() - .setRegion(subscriptionPath.location().region()) + .setRegion(subscriptionPath.location().extractRegion()) .build())) { return setTopicPath( TopicPath.parse(adminClient.getSubscription(subscriptionPath).get().getTopic())); @@ -81,7 +81,9 @@ Builder setTopicPathFromSubscriptionPath(SubscriptionPath subscriptionPath) TopicBacklogReader instantiate() throws ApiException { TopicStatsClientSettings settings = - TopicStatsClientSettings.newBuilder().setRegion(topicPath().location().region()).build(); + TopicStatsClientSettings.newBuilder() + .setRegion(topicPath().location().extractRegion()) + .build(); TopicBacklogReader impl = new TopicBacklogReaderImpl(TopicStatsClient.create(settings), topicPath(), partition()); return new LimitingTopicBacklogReader(impl, Ticker.systemTicker()); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/TrackerWithProgress.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/TrackerWithProgress.java new file mode 100644 index 000000000000..7f0d0309a597 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/TrackerWithProgress.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.pubsublite; + +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.HasProgress; + +public abstract class TrackerWithProgress + extends RestrictionTracker implements HasProgress {} diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRangeTrackerTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRangeTrackerTest.java index f34ebb6a745e..5a31f4fc686d 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRangeTrackerTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRangeTrackerTest.java @@ -49,7 +49,7 @@ public class OffsetByteRangeTrackerTest { private static final double IGNORED_FRACTION = -10000000.0; private static final long MIN_BYTES = 1000; private static final OffsetRange RANGE = new OffsetRange(123L, Long.MAX_VALUE); - private final TopicBacklogReader reader = mock(TopicBacklogReader.class); + private final TopicBacklogReader unownedBacklogReader = mock(TopicBacklogReader.class); @Spy Ticker ticker; private 
OffsetByteRangeTracker tracker; @@ -60,14 +60,18 @@ public void setUp() { when(ticker.read()).thenReturn(0L); tracker = new OffsetByteRangeTracker( - RANGE, reader, Stopwatch.createUnstarted(ticker), Duration.millis(500), MIN_BYTES); + OffsetByteRange.of(RANGE, 0), + unownedBacklogReader, + Stopwatch.createUnstarted(ticker), + Duration.millis(500), + MIN_BYTES); } @Test public void progressTracked() { assertTrue(tracker.tryClaim(OffsetByteProgress.of(Offset.of(123), 10))); assertTrue(tracker.tryClaim(OffsetByteProgress.of(Offset.of(124), 11))); - when(reader.computeMessageStats(Offset.of(125))) + when(unownedBacklogReader.computeMessageStats(Offset.of(125))) .thenReturn(ComputeMessageStatsResponse.newBuilder().setMessageBytes(1000).build()); Progress progress = tracker.getProgress(); assertEquals(21, progress.getWorkCompleted(), .0001); @@ -76,7 +80,7 @@ public void progressTracked() { @Test public void getProgressStatsFailure() { - when(reader.computeMessageStats(Offset.of(123))) + when(unownedBacklogReader.computeMessageStats(Offset.of(123))) .thenThrow(new CheckedApiException(Code.INTERNAL).underlying); assertThrows(ApiException.class, tracker::getProgress); } @@ -86,11 +90,15 @@ public void getProgressStatsFailure() { public void claimSplitSuccess() { assertTrue(tracker.tryClaim(OffsetByteProgress.of(Offset.of(1_000), MIN_BYTES))); assertTrue(tracker.tryClaim(OffsetByteProgress.of(Offset.of(10_000), MIN_BYTES))); - SplitResult splits = tracker.trySplit(IGNORED_FRACTION); - assertEquals(RANGE.getFrom(), splits.getPrimary().getFrom()); - assertEquals(10_001, splits.getPrimary().getTo()); - assertEquals(10_001, splits.getResidual().getFrom()); - assertEquals(Long.MAX_VALUE, splits.getResidual().getTo()); + SplitResult splits = tracker.trySplit(IGNORED_FRACTION); + OffsetByteRange primary = splits.getPrimary(); + assertEquals(RANGE.getFrom(), primary.getRange().getFrom()); + assertEquals(10_001, primary.getRange().getTo()); + assertEquals(MIN_BYTES * 2, 
primary.getByteCount()); + OffsetByteRange residual = splits.getResidual(); + assertEquals(10_001, residual.getRange().getFrom()); + assertEquals(Long.MAX_VALUE, residual.getRange().getTo()); + assertEquals(0, residual.getByteCount()); assertEquals(splits.getPrimary(), tracker.currentRestriction()); tracker.checkDone(); assertNull(tracker.trySplit(IGNORED_FRACTION)); @@ -100,10 +108,10 @@ public void claimSplitSuccess() { @SuppressWarnings({"dereference.of.nullable", "argument.type.incompatible"}) public void splitWithoutClaimEmpty() { when(ticker.read()).thenReturn(100000000000000L); - SplitResult splits = tracker.trySplit(IGNORED_FRACTION); - assertEquals(RANGE.getFrom(), splits.getPrimary().getFrom()); - assertEquals(RANGE.getFrom(), splits.getPrimary().getTo()); - assertEquals(RANGE, splits.getResidual()); + SplitResult splits = tracker.trySplit(IGNORED_FRACTION); + assertEquals(RANGE.getFrom(), splits.getPrimary().getRange().getFrom()); + assertEquals(RANGE.getFrom(), splits.getPrimary().getRange().getTo()); + assertEquals(RANGE, splits.getResidual().getRange()); assertEquals(splits.getPrimary(), tracker.currentRestriction()); tracker.checkDone(); assertNull(tracker.trySplit(IGNORED_FRACTION)); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/PerSubscriptionPartitionSdfTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/PerSubscriptionPartitionSdfTest.java index 598037eef5f3..0a4e3e7458f5 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/PerSubscriptionPartitionSdfTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/PerSubscriptionPartitionSdfTest.java @@ -28,6 +28,7 @@ import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; import static 
org.mockito.Mockito.when; import static org.mockito.MockitoAnnotations.initMocks; @@ -51,6 +52,8 @@ import org.apache.beam.sdk.transforms.SerializableBiFunction; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.Progress; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.math.DoubleMath; import org.joda.time.Duration; import org.junit.Before; import org.junit.Test; @@ -65,22 +68,24 @@ public class PerSubscriptionPartitionSdfTest { private static final Duration MAX_SLEEP_TIME = Duration.standardMinutes(10).plus(Duration.millis(10)); - private static final OffsetRange RESTRICTION = new OffsetRange(1, Long.MAX_VALUE); + private static final OffsetByteRange RESTRICTION = + OffsetByteRange.of(new OffsetRange(1, Long.MAX_VALUE), 0); private static final SubscriptionPartition PARTITION = SubscriptionPartition.of(example(SubscriptionPath.class), example(Partition.class)); @Mock SerializableFunction offsetReaderFactory; + @Mock ManagedBacklogReaderFactory backlogReaderFactory; + @Mock TopicBacklogReader backlogReader; + @Mock - SerializableBiFunction< - SubscriptionPartition, OffsetRange, RestrictionTracker> - trackerFactory; + SerializableBiFunction trackerFactory; @Mock SubscriptionPartitionProcessorFactory processorFactory; @Mock SerializableFunction committerFactory; @Mock InitialOffsetReader initialOffsetReader; - @Spy RestrictionTracker tracker; + @Spy TrackerWithProgress tracker; @Mock OutputReceiver output; @Mock SubscriptionPartitionProcessor processor; @@ -98,9 +103,11 @@ public void setUp() { when(trackerFactory.apply(any(), any())).thenReturn(tracker); when(committerFactory.apply(any())).thenReturn(committer); when(tracker.currentRestriction()).thenReturn(RESTRICTION); + when(backlogReaderFactory.newReader(any())).thenReturn(backlogReader); sdf = new PerSubscriptionPartitionSdf( MAX_SLEEP_TIME, 
+ backlogReaderFactory, offsetReaderFactory, trackerFactory, processorFactory, @@ -110,9 +117,10 @@ public void setUp() { @Test public void getInitialRestrictionReadSuccess() { when(initialOffsetReader.read()).thenReturn(example(Offset.class)); - OffsetRange range = sdf.getInitialRestriction(PARTITION); - assertEquals(example(Offset.class).value(), range.getFrom()); - assertEquals(Long.MAX_VALUE, range.getTo()); + OffsetByteRange range = sdf.getInitialRestriction(PARTITION); + assertEquals(example(Offset.class).value(), range.getRange().getFrom()); + assertEquals(Long.MAX_VALUE, range.getRange().getTo()); + assertEquals(0, range.getByteCount()); verify(offsetReaderFactory).apply(PARTITION); } @@ -125,7 +133,13 @@ public void getInitialRestrictionReadFailure() { @Test public void newTrackerCallsFactory() { assertSame(tracker, sdf.newTracker(PARTITION, RESTRICTION)); - verify(trackerFactory).apply(PARTITION, RESTRICTION); + verify(trackerFactory).apply(backlogReader, RESTRICTION); + } + + @Test + public void tearDownClosesBacklogReaderFactory() { + sdf.teardown(); + verify(backlogReaderFactory).close(); } @Test @@ -159,12 +173,48 @@ public void process() throws Exception { order2.verify(committer).awaitTerminated(); } + private static final class NoopManagedBacklogReaderFactory + implements ManagedBacklogReaderFactory { + @Override + public TopicBacklogReader newReader(SubscriptionPartition subscriptionPartition) { + return null; + } + + @Override + public void close() {} + } + @Test @SuppressWarnings("return.type.incompatible") public void dofnIsSerializable() throws Exception { ObjectOutputStream output = new ObjectOutputStream(new ByteArrayOutputStream()); output.writeObject( new PerSubscriptionPartitionSdf( - MAX_SLEEP_TIME, x -> null, (x, y) -> null, (x, y, z) -> null, (x) -> null)); + MAX_SLEEP_TIME, + new NoopManagedBacklogReaderFactory(), + x -> null, + (x, y) -> null, + (x, y, z) -> null, + (x) -> null)); + } + + @Test + public void 
getProgressUnboundedRangeDelegates() { + Progress progress = Progress.from(0, 0.2); + when(tracker.getProgress()).thenReturn(progress); + assertTrue( + DoubleMath.fuzzyEquals( + progress.getWorkRemaining(), sdf.getSize(PARTITION, RESTRICTION), .0001)); + verify(tracker).getProgress(); + } + + @Test + public void getProgressBoundedReturnsBytes() { + assertTrue( + DoubleMath.fuzzyEquals( + 123.0, + sdf.getSize(PARTITION, OffsetByteRange.of(new OffsetRange(87, 8000), 123)), + .0001)); + verifyNoInteractions(tracker); } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/ReadWriteIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/ReadWriteIT.java new file mode 100644 index 000000000000..e2429423dd0c --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/ReadWriteIT.java @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.gcp.pubsublite; + +import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull; +import static org.junit.Assert.fail; + +import com.google.cloud.pubsublite.AdminClient; +import com.google.cloud.pubsublite.AdminClientSettings; +import com.google.cloud.pubsublite.BacklogLocation; +import com.google.cloud.pubsublite.CloudZone; +import com.google.cloud.pubsublite.Message; +import com.google.cloud.pubsublite.ProjectId; +import com.google.cloud.pubsublite.SubscriptionName; +import com.google.cloud.pubsublite.SubscriptionPath; +import com.google.cloud.pubsublite.TopicName; +import com.google.cloud.pubsublite.TopicPath; +import com.google.cloud.pubsublite.proto.PubSubMessage; +import com.google.cloud.pubsublite.proto.SequencedMessage; +import com.google.cloud.pubsublite.proto.Subscription; +import com.google.cloud.pubsublite.proto.Subscription.DeliveryConfig.DeliveryRequirement; +import com.google.cloud.pubsublite.proto.Topic; +import com.google.cloud.pubsublite.proto.Topic.PartitionConfig.Capacity; +import com.google.errorprone.annotations.concurrent.GuardedBy; +import com.google.protobuf.ByteString; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Deque; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.StreamingOptions; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.testing.TestPipelineOptions; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.FlatMapElements; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.PTransform; +import 
org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; +import org.joda.time.Duration; +import org.junit.After; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@RunWith(JUnit4.class) +public class ReadWriteIT { + private static final Logger LOG = LoggerFactory.getLogger(ReadWriteIT.class); + private static final CloudZone ZONE = CloudZone.parse("us-central1-b"); + private static final int MESSAGE_COUNT = 90; + + @Rule public transient TestPipeline pipeline = TestPipeline.create(); + + private static ProjectId getProject(PipelineOptions options) { + return ProjectId.of(checkArgumentNotNull(options.as(GcpOptions.class).getProject())); + } + + private static String randomName() { + return "beam_it_resource_" + ThreadLocalRandom.current().nextLong(); + } + + private static AdminClient newAdminClient() { + return AdminClient.create(AdminClientSettings.newBuilder().setRegion(ZONE.region()).build()); + } + + private final Deque cleanupActions = new ArrayDeque<>(); + + private TopicPath createTopic(ProjectId id) throws Exception { + TopicPath toReturn = + TopicPath.newBuilder() + .setProject(id) + .setLocation(ZONE) + .setName(TopicName.of(randomName())) + .build(); + Topic.Builder topic = Topic.newBuilder().setName(toReturn.toString()); + topic + .getPartitionConfigBuilder() + .setCount(2) + .setCapacity(Capacity.newBuilder().setPublishMibPerSec(4).setSubscribeMibPerSec(4)); + topic.getRetentionConfigBuilder().setPerPartitionBytes(30 * (1L << 30)); + cleanupActions.addLast( + () -> { + try (AdminClient client = newAdminClient()) { + client.deleteTopic(toReturn).get(); + } catch (Throwable t) { + LOG.error("Failed to clean up topic.", t); + } + }); + try (AdminClient client = newAdminClient()) { + 
client.createTopic(topic.build()).get(); + } + return toReturn; + } + + private SubscriptionPath createSubscription(TopicPath topic) throws Exception { + SubscriptionPath toReturn = + SubscriptionPath.newBuilder() + .setProject(topic.project()) + .setLocation(ZONE) + .setName(SubscriptionName.of(randomName())) + .build(); + Subscription.Builder subscription = Subscription.newBuilder().setName(toReturn.toString()); + subscription + .getDeliveryConfigBuilder() + .setDeliveryRequirement(DeliveryRequirement.DELIVER_IMMEDIATELY); + subscription.setTopic(topic.toString()); + cleanupActions.addLast( + () -> { + try (AdminClient client = newAdminClient()) { + client.deleteSubscription(toReturn).get(); + } catch (Throwable t) { + LOG.error("Failed to clean up subscription.", t); + } + }); + try (AdminClient client = newAdminClient()) { + client.createSubscription(subscription.build(), BacklogLocation.BEGINNING).get(); + } + return toReturn; + } + + @After + public void tearDown() { + while (!cleanupActions.isEmpty()) { + cleanupActions.removeLast().run(); + } + } + + // Workaround for BEAM-12867 + // TODO(BEAM-12867): Remove this. 
+ private static class CustomCreate extends PTransform, PCollection> { + @Override + public PCollection expand(PCollection input) { + return input.apply( + "createIndexes", + FlatMapElements.via( + new SimpleFunction>() { + @Override + public Iterable apply(Void input) { + return IntStream.range(0, MESSAGE_COUNT).boxed().collect(Collectors.toList()); + } + })); + } + } + + public static void writeMessages(TopicPath topicPath, Pipeline pipeline) { + PCollection trigger = pipeline.apply(Create.of((Void) null)); + PCollection indexes = trigger.apply("createIndexes", new CustomCreate()); + PCollection messages = + indexes.apply( + "createMessages", + MapElements.via( + new SimpleFunction( + index -> + Message.builder() + .setData(ByteString.copyFromUtf8(index.toString())) + .build() + .toProto()) {})); + // Add UUIDs to messages for later deduplication. + messages = messages.apply("addUuids", PubsubLiteIO.addUuids()); + messages.apply( + "writeMessages", + PubsubLiteIO.write(PublisherOptions.newBuilder().setTopicPath(topicPath).build())); + } + + public static PCollection readMessages( + SubscriptionPath subscriptionPath, Pipeline pipeline) { + PCollection messages = + pipeline.apply( + "readMessages", + PubsubLiteIO.read( + SubscriberOptions.newBuilder() + .setSubscriptionPath(subscriptionPath) + // setMinBundleTimeout INTENDED FOR TESTING ONLY + // This sacrifices efficiency to make tests run faster. Do not use this in a + // real pipeline! + .setMinBundleTimeout(Duration.standardSeconds(5)) + .build())); + // Deduplicate messages based on the uuids added in PubsubLiteIO.addUuids() when writing. + return messages.apply( + "dedupeMessages", PubsubLiteIO.deduplicate(UuidDeduplicationOptions.newBuilder().build())); + } + + // This static out of band communication is needed to retain serializability. 
+ @GuardedBy("ReadWriteIT.class") + private static final List received = new ArrayList<>(); + + private static synchronized void addMessageReceived(SequencedMessage message) { + received.add(message); + } + + private static synchronized List getTestQuickstartReceived() { + return ImmutableList.copyOf(received); + } + + private static PTransform, PCollection> + collectTestQuickstart() { + return MapElements.via( + new SimpleFunction() { + @Override + public Void apply(SequencedMessage input) { + addMessageReceived(input); + return null; + } + }); + } + + @Test + public void testReadWrite() throws Exception { + pipeline.getOptions().as(StreamingOptions.class).setStreaming(true); + pipeline.getOptions().as(TestPipelineOptions.class).setBlockOnRun(false); + + TopicPath topic = createTopic(getProject(pipeline.getOptions())); + SubscriptionPath subscription = createSubscription(topic); + + // Publish some messages + writeMessages(topic, pipeline); + + // Read some messages. They should be deduplicated by the time we see them, so there should be + // exactly MESSAGE_COUNT messages, one for every index in [0,MESSAGE_COUNT). 
+ PCollection messages = readMessages(subscription, pipeline); + messages.apply("messageReceiver", collectTestQuickstart()); + pipeline.run(); + LOG.info("Running!"); + for (int round = 0; round < 120; ++round) { + Thread.sleep(1000); + Map receivedCounts = new HashMap<>(); + for (SequencedMessage message : getTestQuickstartReceived()) { + int id = Integer.parseInt(message.getMessage().getData().toStringUtf8()); + receivedCounts.put(id, receivedCounts.getOrDefault(id, 0) + 1); + } + LOG.info("Performing comparison round {}.\n", round); + boolean done = true; + List missing = new ArrayList<>(); + for (int id = 0; id < MESSAGE_COUNT; id++) { + int idCount = receivedCounts.getOrDefault(id, 0); + if (idCount == 0) { + missing.add(id); + done = false; + } + if (idCount > 1) { + fail(String.format("Failed to deduplicate message with id %s.", id)); + } + } + LOG.info("Still missing messages: {}.\n", missing); + if (done) { + return; + } + } + fail( + String.format( + "Failed to receive all messages after 2 minutes. 
Received %s messages.", + getTestQuickstartReceived().size())); + } +} diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriptionPartitionProcessorImplTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriptionPartitionProcessorImplTest.java index dbf3b939d083..3d74375897a0 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriptionPartitionProcessorImplTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriptionPartitionProcessorImplTest.java @@ -31,7 +31,6 @@ import static org.mockito.Mockito.when; import static org.mockito.MockitoAnnotations.initMocks; -import com.google.api.core.ApiFutures; import com.google.api.gax.rpc.ApiException; import com.google.api.gax.rpc.StatusCode.Code; import com.google.cloud.pubsublite.Offset; @@ -40,7 +39,6 @@ import com.google.cloud.pubsublite.internal.wire.Subscriber; import com.google.cloud.pubsublite.proto.Cursor; import com.google.cloud.pubsublite.proto.FlowControlRequest; -import com.google.cloud.pubsublite.proto.SeekRequest; import com.google.cloud.pubsublite.proto.SequencedMessage; import com.google.protobuf.util.Timestamps; import java.util.List; @@ -64,7 +62,7 @@ @RunWith(JUnit4.class) @SuppressWarnings("initialization.fields.uninitialized") public class SubscriptionPartitionProcessorImplTest { - @Spy RestrictionTracker tracker; + @Spy RestrictionTracker tracker; @Mock OutputReceiver receiver; @Mock Function>, Subscriber> subscriberFactory; @@ -83,6 +81,10 @@ private static SequencedMessage messageWithOffset(long offset) { .build(); } + private OffsetByteRange initialRange() { + return OffsetByteRange.of(new OffsetRange(example(Offset.class).value(), Long.MAX_VALUE)); + } + @Before public void setUp() { initMocks(this); @@ -100,17 +102,10 @@ public void setUp() { @Test public void lifecycle() throws Exception { 
- when(tracker.currentRestriction()) - .thenReturn(new OffsetRange(example(Offset.class).value(), Long.MAX_VALUE)); - when(subscriber.seek(any())).thenReturn(ApiFutures.immediateFuture(example(Offset.class))); + when(tracker.currentRestriction()).thenReturn(initialRange()); processor.start(); verify(subscriber).startAsync(); verify(subscriber).awaitRunning(); - verify(subscriber) - .seek( - SeekRequest.newBuilder() - .setCursor(Cursor.newBuilder().setOffset(example(Offset.class).value())) - .build()); verify(subscriber) .allowFlow( FlowControlRequest.newBuilder() @@ -123,29 +118,15 @@ public void lifecycle() throws Exception { } @Test - public void lifecycleSeekThrows() throws Exception { - when(tracker.currentRestriction()) - .thenReturn(new OffsetRange(example(Offset.class).value(), Long.MAX_VALUE)); - when(subscriber.seek(any())) - .thenReturn(ApiFutures.immediateFailedFuture(new CheckedApiException(Code.OUT_OF_RANGE))); + public void lifecycleFlowControlThrows() throws Exception { + when(tracker.currentRestriction()).thenReturn(initialRange()); doThrow(new CheckedApiException(Code.OUT_OF_RANGE)).when(subscriber).allowFlow(any()); assertThrows(CheckedApiException.class, () -> processor.start()); } - @Test - public void lifecycleFlowControlThrows() { - when(tracker.currentRestriction()) - .thenReturn(new OffsetRange(example(Offset.class).value(), Long.MAX_VALUE)); - when(subscriber.seek(any())) - .thenReturn(ApiFutures.immediateFailedFuture(new CheckedApiException(Code.OUT_OF_RANGE))); - assertThrows(CheckedApiException.class, () -> processor.start()); - } - @Test public void lifecycleSubscriberAwaitThrows() throws Exception { - when(tracker.currentRestriction()) - .thenReturn(new OffsetRange(example(Offset.class).value(), Long.MAX_VALUE)); - when(subscriber.seek(any())).thenReturn(ApiFutures.immediateFuture(example(Offset.class))); + when(tracker.currentRestriction()).thenReturn(initialRange()); processor.start(); doThrow(new 
CheckedApiException(Code.INTERNAL).underlying).when(subscriber).awaitTerminated(); assertThrows(ApiException.class, () -> processor.close()); @@ -155,21 +136,19 @@ public void lifecycleSubscriberAwaitThrows() throws Exception { @Test public void subscriberFailureFails() throws Exception { - when(tracker.currentRestriction()) - .thenReturn(new OffsetRange(example(Offset.class).value(), Long.MAX_VALUE)); - when(subscriber.seek(any())).thenReturn(ApiFutures.immediateFuture(example(Offset.class))); + when(tracker.currentRestriction()).thenReturn(initialRange()); processor.start(); subscriber.fail(new CheckedApiException(Code.OUT_OF_RANGE)); ApiException e = - assertThrows(ApiException.class, () -> processor.waitForCompletion(Duration.ZERO)); + assertThrows( + // Longer wait is needed due to listener asynchrony. + ApiException.class, () -> processor.waitForCompletion(Duration.standardSeconds(1))); assertEquals(Code.OUT_OF_RANGE, e.getStatusCode().getCode()); } @Test public void allowFlowFailureFails() throws Exception { - when(tracker.currentRestriction()) - .thenReturn(new OffsetRange(example(Offset.class).value(), Long.MAX_VALUE)); - when(subscriber.seek(any())).thenReturn(ApiFutures.immediateFuture(example(Offset.class))); + when(tracker.currentRestriction()).thenReturn(initialRange()); processor.start(); when(tracker.tryClaim(any())).thenReturn(true); doThrow(new CheckedApiException(Code.OUT_OF_RANGE)).when(subscriber).allowFlow(any());