From 495c689d3da9f0c25b771084a719a9c153558508 Mon Sep 17 00:00:00 2001 From: Dan Halperin Date: Thu, 3 Mar 2016 18:40:12 -0800 Subject: [PATCH] BigtableIO.Read: use PBegin, rather than PInput Sources should start from the beginning of a pipeline. --- .../google/cloud/dataflow/sdk/io/bigtable/BigtableIO.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableIO.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableIO.java index c3f233f249..562d253d77 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableIO.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableIO.java @@ -41,9 +41,9 @@ import com.google.cloud.dataflow.sdk.transforms.PTransform; import com.google.cloud.dataflow.sdk.util.DataflowReleaseInfo; import com.google.cloud.dataflow.sdk.values.KV; +import com.google.cloud.dataflow.sdk.values.PBegin; import com.google.cloud.dataflow.sdk.values.PCollection; import com.google.cloud.dataflow.sdk.values.PDone; -import com.google.cloud.dataflow.sdk.values.PInput; import com.google.common.base.MoreObjects; import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.FutureCallback; @@ -176,7 +176,7 @@ public static Write write() { * @see BigtableIO */ @Experimental - public static class Read extends PTransform> { + public static class Read extends PTransform> { /** * Returns a new {@link BigtableIO.Read} that will read from the Cloud Bigtable cluster * indicated by the given options, and using any other specified customizations. @@ -241,14 +241,14 @@ public String getTableId() { } @Override - public PCollection apply(PInput input) { + public PCollection apply(PBegin input) { BigtableSource source = new BigtableSource(getBigtableService(), tableId, filter, ByteKeyRange.ALL_KEYS, null); return input.getPipeline().apply(com.google.cloud.dataflow.sdk.io.Read.from(source)); } @Override - public void validate(PInput input) { + public void validate(PBegin input) { checkArgument(options != null, "BigtableOptions not specified"); checkArgument(!tableId.isEmpty(), "Table ID not specified"); try {