diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle index 4801dc25fb1..7a2c59a34b7 100644 --- a/runners/google-cloud-dataflow-java/build.gradle +++ b/runners/google-cloud-dataflow-java/build.gradle @@ -94,6 +94,7 @@ dependencies { // io-kafka is only used in PTransform override so it is optional provided project(":sdks:java:io:kafka") implementation project(":sdks:java:io:google-cloud-platform") + implementation project(":runners:core-java") implementation library.java.avro implementation library.java.bigdataoss_util implementation library.java.commons_codec diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java index c79897b5e5c..94d0c902e84 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java @@ -29,7 +29,9 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; +import org.apache.beam.model.pipeline.v1.MetricsApi.BoundedTrie; import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.runners.core.metrics.BoundedTrieData; import org.apache.beam.sdk.metrics.BoundedTrieResult; import org.apache.beam.sdk.metrics.DistributionResult; import org.apache.beam.sdk.metrics.GaugeResult; @@ -55,7 +57,11 @@ "rawtypes", // TODO(https://github.com/apache/beam/issues/20447) }) class DataflowMetrics extends MetricResults { + private static final Logger LOG = LoggerFactory.getLogger(DataflowMetrics.class); + // TODO (rosinha): Remove this once bounded_trie is available in metrics proto Dataflow + // java client. + public static final String BOUNDED_TRIE = "bounded_trie"; /** * Client for the Dataflow service. 
This can be used to query the service for information about * the job. @@ -104,13 +110,13 @@ public MetricQueryResults queryMetrics(MetricsFilter filter) { ImmutableList> distributions = ImmutableList.of(); ImmutableList> gauges = ImmutableList.of(); ImmutableList> stringSets = ImmutableList.of(); - ImmutableList> boudedTries = ImmutableList.of(); + ImmutableList> boundedTries = ImmutableList.of(); JobMetrics jobMetrics; try { jobMetrics = getJobMetrics(); } catch (IOException e) { LOG.warn("Unable to query job metrics.\n"); - return MetricQueryResults.create(counters, distributions, gauges, stringSets, boudedTries); + return MetricQueryResults.create(counters, distributions, gauges, stringSets, boundedTries); } metricUpdates = firstNonNull(jobMetrics.getMetrics(), Collections.emptyList()); return populateMetricQueryResults(metricUpdates, filter); @@ -134,6 +140,7 @@ private static class DataflowMetricResultExtractor { private final ImmutableList.Builder> distributionResults; private final ImmutableList.Builder> gaugeResults; private final ImmutableList.Builder> stringSetResults; + private final ImmutableList.Builder> boundedTrieResults; private final boolean isStreamingJob; DataflowMetricResultExtractor(boolean isStreamingJob) { @@ -141,6 +148,7 @@ private static class DataflowMetricResultExtractor { distributionResults = ImmutableList.builder(); gaugeResults = ImmutableList.builder(); stringSetResults = ImmutableList.builder(); + boundedTrieResults = ImmutableList.builder(); /* In Dataflow streaming jobs, only ATTEMPTED metrics are available. * In Dataflow batch jobs, only COMMITTED metrics are available, but * we must provide ATTEMPTED, so we use COMMITTED as a good approximation. 
@@ -169,6 +177,11 @@ public void addMetricResult( // stringset metric StringSetResult value = getStringSetValue(committed); stringSetResults.add(MetricResult.create(metricKey, !isStreamingJob, value)); + } else if (committed.get(BOUNDED_TRIE) != null && attempted.get(BOUNDED_TRIE) != null) { + // TODO (rosinha): This is dummy code. Update this once the Dataflow MetricUpdate + // Google client API is updated. + BoundedTrieResult value = getBoundedTrieValue(committed); + boundedTrieResults.add(MetricResult.create(metricKey, !isStreamingJob, value)); + } else { // This is exceptionally unexpected. We expect matching user metrics to only have the // value types provided by the Metrics API. @@ -196,6 +209,15 @@ private StringSetResult getStringSetValue(MetricUpdate metricUpdate) { return StringSetResult.create(ImmutableSet.copyOf(((Collection) metricUpdate.getSet()))); } + private BoundedTrieResult getBoundedTrieValue(MetricUpdate metricUpdate) { + if (metricUpdate.get(BOUNDED_TRIE) == null) { + return BoundedTrieResult.empty(); + } + BoundedTrie bTrie = (BoundedTrie) metricUpdate.get(BOUNDED_TRIE); + BoundedTrieData trieData = BoundedTrieData.fromProto(bTrie); + return BoundedTrieResult.create(trieData.extractResult().getResult()); + } + private DistributionResult getDistributionValue(MetricUpdate metricUpdate) { if (metricUpdate.getDistribution() == null) { return DistributionResult.IDENTITY_ELEMENT; @@ -220,9 +242,13 @@ public Iterable> getGaugeResults() { return gaugeResults.build(); } - public Iterable> geStringSetResults() { + public Iterable> getStringSetResults() { return stringSetResults.build(); } + + public Iterable> getBoundedTrieResults() { + return boundedTrieResults.build(); + } } private static class DataflowMetricQueryResultsFactory { @@ -388,8 +414,8 @@ public MetricQueryResults build() { extractor.getCounterResults(), extractor.getDistributionResults(), extractor.getGaugeResults(), - extractor.geStringSetResults(), - ImmutableList.of()); + 
extractor.getStringSetResults(), + extractor.getBoundedTrieResults()); } } } diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java index 745b065ea84..90a63554ef3 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java @@ -41,11 +41,13 @@ import java.io.IOException; import java.math.BigDecimal; import java.util.Set; +import org.apache.beam.runners.core.metrics.BoundedTrieData; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.runners.dataflow.util.DataflowTemplateJob; import org.apache.beam.sdk.PipelineResult.State; import org.apache.beam.sdk.extensions.gcp.auth.TestCredential; import org.apache.beam.sdk.extensions.gcp.storage.NoopPathValidator; +import org.apache.beam.sdk.metrics.BoundedTrieResult; import org.apache.beam.sdk.metrics.DistributionResult; import org.apache.beam.sdk.metrics.MetricQueryResults; import org.apache.beam.sdk.metrics.MetricsFilter; @@ -196,6 +198,13 @@ private MetricUpdate makeStringSetMetricUpdate( return setStructuredName(update, name, namespace, step, tentative); } + private MetricUpdate makeBoundedTrieMetricUpdate( + String name, String namespace, String step, BoundedTrieData data, boolean tentative) { + MetricUpdate update = new MetricUpdate(); + update.set(DataflowMetrics.BOUNDED_TRIE, data.toProto()); + return setStructuredName(update, name, namespace, step, tentative); + } + @Test public void testSingleCounterUpdates() throws IOException { AppliedPTransform myStep = mock(AppliedPTransform.class); @@ -286,6 +295,64 @@ public void testSingleStringSetUpdates() throws IOException { StringSetResult.create(ImmutableSet.of("ab", "cd"))))); } + @Test 
+ public void testSingleBoundedTrieUpdates() throws IOException { + AppliedPTransform myStep = mock(AppliedPTransform.class); + when(myStep.getFullName()).thenReturn("myStepName"); + BiMap, String> transformStepNames = HashBiMap.create(); + transformStepNames.put(myStep, "s2"); + + JobMetrics jobMetrics = new JobMetrics(); + DataflowPipelineJob job = mock(DataflowPipelineJob.class); + DataflowPipelineOptions options = mock(DataflowPipelineOptions.class); + when(options.isStreaming()).thenReturn(false); + when(job.getDataflowOptions()).thenReturn(options); + when(job.getState()).thenReturn(State.RUNNING); + when(job.getJobId()).thenReturn(JOB_ID); + when(job.getTransformStepNames()).thenReturn(transformStepNames); + + // The parser relies on the fact that one tentative and one committed metric update exist in + // the job metrics results. + MetricUpdate mu1 = + makeBoundedTrieMetricUpdate( + "counterName", + "counterNamespace", + "s2", + new BoundedTrieData(ImmutableList.of("ab", "cd")), + false); + MetricUpdate mu1Tentative = + makeBoundedTrieMetricUpdate( + "counterName", + "counterNamespace", + "s2", + new BoundedTrieData(ImmutableList.of("ab", "cd")), + true); + jobMetrics.setMetrics(ImmutableList.of(mu1, mu1Tentative)); + DataflowClient dataflowClient = mock(DataflowClient.class); + when(dataflowClient.getJobMetrics(JOB_ID)).thenReturn(jobMetrics); + + DataflowMetrics dataflowMetrics = new DataflowMetrics(job, dataflowClient); + MetricQueryResults result = dataflowMetrics.allMetrics(); + assertThat( + result.getBoundedTries(), + containsInAnyOrder( + attemptedMetricsResult( + "counterNamespace", + "counterName", + "myStepName", + BoundedTrieResult.create( + ImmutableSet.of(ImmutableList.of("ab", "cd", String.valueOf(false))))))); + assertThat( + result.getBoundedTries(), + containsInAnyOrder( + committedMetricsResult( + "counterNamespace", + "counterName", + "myStepName", + BoundedTrieResult.create( + ImmutableSet.of(ImmutableList.of("ab", "cd", 
String.valueOf(false))))))); + } + @Test public void testIgnoreDistributionButGetCounterUpdates() throws IOException { AppliedPTransform myStep = mock(AppliedPTransform.class); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java index cf07e5f7240..d62198e8321 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java @@ -519,7 +519,12 @@ public Iterable extractMetricUpdates(boolean isFinalUpdate) { .transform( update -> MetricsToCounterUpdateConverter.fromStringSet( - update.getKey(), true, update.getUpdate()))); + update.getKey(), true, update.getUpdate())), + FluentIterable.from(updates.boundedTrieUpdates()) + .transform( + update -> + MetricsToCounterUpdateConverter.fromBoundedTrie( + update.getKey(), update.getUpdate()))); }); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsToCounterUpdateConverter.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsToCounterUpdateConverter.java index 4866d201122..42cc4ac8bf8 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsToCounterUpdateConverter.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsToCounterUpdateConverter.java @@ -27,6 +27,7 @@ import com.google.api.services.dataflow.model.IntegerGauge; import com.google.api.services.dataflow.model.StringList; import java.util.ArrayList; +import org.apache.beam.runners.core.metrics.BoundedTrieData; 
import org.apache.beam.runners.core.metrics.DistributionData; import org.apache.beam.runners.core.metrics.StringSetData; import org.apache.beam.sdk.metrics.MetricKey; @@ -111,6 +112,16 @@ public static CounterUpdate fromStringSet( .setStringList(stringList); } + public static CounterUpdate fromBoundedTrie(MetricKey key, BoundedTrieData boundedTrieData) { + // BoundedTrie uses SET kind metric aggregation which tracks unique strings. + CounterStructuredNameAndMetadata name = structuredNameAndMetadata(key, Kind.SET); + // TODO (rosinha): Update this once the CounterUpdate API is updated in the Dataflow client. + return new CounterUpdate() + .setStructuredNameAndMetadata(name) + .setCumulative(false) + .set("bounded_trie", boundedTrieData.toProto()); + } + public static CounterUpdate fromDistribution( MetricKey key, boolean isCumulative, DistributionData update) { CounterStructuredNameAndMetadata name = structuredNameAndMetadata(key, Kind.DISTRIBUTION); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java index 0135e91eab9..8ade66061ba 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java @@ -217,7 +217,8 @@ public Iterable extractUpdates() { return counterUpdates() .append(distributionUpdates()) .append(gaugeUpdates()) - .append(stringSetUpdates()); + .append(stringSetUpdates()) + .append(boundedTrieUpdates()); } private FluentIterable counterUpdates() { @@ -277,6 +278,20 @@ private FluentIterable stringSetUpdates() { .filter(Predicates.notNull()); } + private FluentIterable boundedTrieUpdates() { + return 
FluentIterable.from(boundedTries.entries()) + .transform( + new Function, CounterUpdate>() { + @Override + public @Nullable CounterUpdate apply( + @Nonnull Map.Entry entry) { + return MetricsToCounterUpdateConverter.fromBoundedTrie( + MetricKey.create(stepName, entry.getKey()), entry.getValue().getCumulative()); + } + }) + .filter(Predicates.notNull()); + } + private FluentIterable distributionUpdates() { return FluentIterable.from(distributions.entries()) .transform( diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContextTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContextTest.java index d3fa69e2c31..051f164fa61 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContextTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContextTest.java @@ -32,6 +32,8 @@ import com.google.api.services.dataflow.model.DistributionUpdate; import com.google.api.services.dataflow.model.StringList; import java.util.Arrays; +import org.apache.beam.model.pipeline.v1.MetricsApi; +import org.apache.beam.runners.core.metrics.BoundedTrieData; import org.apache.beam.runners.core.metrics.ExecutionStateSampler; import org.apache.beam.runners.core.metrics.ExecutionStateTracker; import org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState; @@ -40,6 +42,7 @@ import org.apache.beam.runners.dataflow.worker.counters.NameContext; import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler.NoopProfileScope; import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler.ProfileScope; +import org.apache.beam.sdk.metrics.BoundedTrie; import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.Distribution; import 
org.apache.beam.sdk.metrics.MetricName; @@ -47,6 +50,7 @@ import org.apache.beam.sdk.metrics.MetricsContainer; import org.apache.beam.sdk.metrics.StringSet; import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.hamcrest.Matchers; import org.junit.Test; import org.junit.runner.RunWith; @@ -193,6 +197,42 @@ public void extractMetricUpdatesStringSet() { assertThat(executionContext.extractMetricUpdates(false), containsInAnyOrder(expected)); } + @Test + public void extractMetricUpdatesBoundedTrie() { + BatchModeExecutionContext executionContext = + BatchModeExecutionContext.forTesting(PipelineOptionsFactory.create(), "testStage"); + DataflowOperationContext operationContext = + executionContext.createOperationContext(NameContextsForTests.nameContextForTest()); + + BoundedTrie boundedTrie = + operationContext + .metricsContainer() + .getBoundedTrie(MetricName.named("namespace", "some-bounded-trie")); + boundedTrie.add("ab"); + boundedTrie.add("cd"); + + BoundedTrieData trieData = new BoundedTrieData(); + trieData.add(ImmutableList.of("ab")); + trieData.add(ImmutableList.of("cd")); + MetricsApi.BoundedTrie expectedTrie = trieData.toProto(); + + final CounterUpdate expected = + new CounterUpdate() + .setStructuredNameAndMetadata( + new CounterStructuredNameAndMetadata() + .setName( + new CounterStructuredName() + .setOrigin("USER") + .setOriginNamespace("namespace") + .setName("some-bounded-trie") + .setOriginalStepName("originalName")) + .setMetadata(new CounterMetadata().setKind(Kind.SET.toString()))) + .setCumulative(false) + .set("bounded_trie", expectedTrie); + + assertThat(executionContext.extractMetricUpdates(false), containsInAnyOrder(expected)); + } + @Test public void extractMsecCounters() { BatchModeExecutionContext executionContext = diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java index 4b758aa6cd4..3073bf907f9 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java @@ -50,8 +50,10 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; +import org.apache.beam.runners.core.metrics.BoundedTrieData; import org.apache.beam.runners.dataflow.worker.MetricsToCounterUpdateConverter.Kind; import org.apache.beam.runners.dataflow.worker.MetricsToCounterUpdateConverter.Origin; +import org.apache.beam.sdk.metrics.BoundedTrie; import org.apache.beam.sdk.metrics.Distribution; import org.apache.beam.sdk.metrics.Gauge; import org.apache.beam.sdk.metrics.LabeledMetricNameUtils; @@ -61,6 +63,7 @@ import org.apache.beam.sdk.metrics.NoOpHistogram; import org.apache.beam.sdk.metrics.StringSet; import org.apache.beam.sdk.util.HistogramData; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; import org.hamcrest.collection.IsEmptyIterable; import org.hamcrest.collection.IsMapContaining; @@ -325,6 +328,72 @@ public void testStringSetUpdateExtraction() { assertThat(updates, containsInAnyOrder(name1Update)); } + @Test + public void testBoundedTrieUpdateExtraction() { + BoundedTrie boundedTrie = c1.getBoundedTrie(name1); + boundedTrie.add("ab"); + boundedTrie.add("cd", "ef"); + boundedTrie.add("gh"); + boundedTrie.add("gh"); + + BoundedTrieData expectedName1 = new BoundedTrieData(); + 
expectedName1.add(ImmutableList.of("ab")); + expectedName1.add(ImmutableList.of("cd", "ef")); + expectedName1.add(ImmutableList.of("gh")); + expectedName1.add(ImmutableList.of("gh")); + + CounterUpdate name1Update = + new CounterUpdate() + .setStructuredNameAndMetadata( + new CounterStructuredNameAndMetadata() + .setName( + new CounterStructuredName() + .setOrigin(Origin.USER.toString()) + .setOriginNamespace("ns") + .setName("name1") + .setOriginalStepName("s1")) + .setMetadata(new CounterMetadata().setKind(Kind.SET.toString()))) + .setCumulative(false) + .set("bounded_trie", expectedName1.toProto()); + + Iterable updates = StreamingStepMetricsContainer.extractMetricUpdates(registry); + assertThat(updates, containsInAnyOrder(name1Update)); + + boundedTrie = c2.getBoundedTrie(name2); + boundedTrie.add("ij"); + boundedTrie.add("kl", "mn"); + boundedTrie.add("mn"); + + BoundedTrieData expectedName2 = new BoundedTrieData(); + expectedName2.add(ImmutableList.of("ij")); + expectedName2.add(ImmutableList.of("kl", "mn")); + expectedName2.add(ImmutableList.of("mn")); + + CounterUpdate name2Update = + new CounterUpdate() + .setStructuredNameAndMetadata( + new CounterStructuredNameAndMetadata() + .setName( + new CounterStructuredName() + .setOrigin(Origin.USER.toString()) + .setOriginNamespace("ns") + .setName("name2") + .setOriginalStepName("s2")) + .setMetadata(new CounterMetadata().setKind(Kind.SET.toString()))) + .setCumulative(false) + .set("bounded_trie", expectedName2.toProto()); + + updates = StreamingStepMetricsContainer.extractMetricUpdates(registry); + assertThat(updates, containsInAnyOrder(name1Update, name2Update)); + + c1.getBoundedTrie(name1).add("op"); + expectedName1.add(ImmutableList.of("op")); + name1Update.set("bounded_trie", expectedName1.toProto()); + + updates = StreamingStepMetricsContainer.extractMetricUpdates(registry); + assertThat(updates, containsInAnyOrder(name1Update, name2Update)); + } + @Test public void testPerWorkerMetrics() { 
StreamingStepMetricsContainer.setEnablePerWorkerMetrics(false);