Skip to content

Commit

Permalink
feat: add metrics (#21)
Browse files Browse the repository at this point in the history
  • Loading branch information
zoercai authored Jul 5, 2021
1 parent aaf2ece commit 5c0ec6d
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.beam.sdk.io.gcp.spanner.cdc.ChangeStreamSourceDescriptor;
import org.apache.beam.sdk.io.gcp.spanner.cdc.DetectNewPartitionsDoFn;
import org.apache.beam.sdk.io.gcp.spanner.cdc.PipelineInitializer;
import org.apache.beam.sdk.io.gcp.spanner.cdc.PostProcessingMetricsDoFn;
import org.apache.beam.sdk.io.gcp.spanner.cdc.ReadChangeStreamPartitionDoFn;
import org.apache.beam.sdk.io.gcp.spanner.cdc.actions.ActionFactory;
import org.apache.beam.sdk.io.gcp.spanner.cdc.dao.DaoFactory;
Expand Down Expand Up @@ -1427,6 +1428,7 @@ && getInclusiveStartAt().toSqlTimestamp().after(getExclusiveEndAt().toSqlTimesta
final ReadChangeStreamPartitionDoFn readChangeStreamPartitionDoFn =
new ReadChangeStreamPartitionDoFn(
getSpannerConfig(), daoFactory, mapperFactory, actionFactory);
final PostProcessingMetricsDoFn postProcessingMetricsDoFn = new PostProcessingMetricsDoFn();

// FIXME: Remove the partitionMetadataDAO as a parameter
// TODO: See if we can have a DAO for the admin operations
Expand All @@ -1440,7 +1442,8 @@ && getInclusiveStartAt().toSqlTimestamp().after(getExclusiveEndAt().toSqlTimesta
return input
.apply("Generate change stream sources", Create.of(sources))
.apply("Detect new partitions", ParDo.of(detectNewPartitionsDoFn))
.apply("Read change stream partition", ParDo.of(readChangeStreamPartitionDoFn));
.apply("Read change stream partition", ParDo.of(readChangeStreamPartitionDoFn))
.apply("Post processing metrics", ParDo.of(postProcessingMetricsDoFn));

// TODO: We need to perform cleanup after everything has terminated (delete metadata table)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.gcp.spanner.cdc;

import org.apache.beam.sdk.io.gcp.spanner.SpannerIO.ReadChangeStream;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Metrics;

public class CdcMetrics {

public static final Counter PARTITIONS_DETECTED_COUNTER =
Metrics.counter(ReadChangeStream.class, "partitions_detected");

public static final Counter PARTITION_SPLIT_COUNTER =
Metrics.counter(ReadChangeStream.class, "partition_split_count");

public static final Counter PARTITION_MERGE_COUNTER =
Metrics.counter(ReadChangeStream.class, "partition_merge_count");

public static final Distribution PARTITION_CREATED_TO_SCHEDULED_MS =
Metrics.distribution(ReadChangeStream.class, "partition_created_to_scheduled_ms");

// TODO(zoc): add this correctly
public static final Distribution WATERMARK_TO_LATEST_RECORD_COMMIT_TIMESTAMP_MS =
Metrics.distribution(ReadChangeStream.class, "watermark_to_latest_record_commit_timestamp_ms");

public static final Distribution RECORD_COMMIT_TIMESTAMP_TO_EMITTED_MS =
Metrics.distribution(ReadChangeStream.class, "record_commit_timestamp_to_emitted_ms");
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
*/
package org.apache.beam.sdk.io.gcp.spanner.cdc;

import static org.apache.beam.sdk.io.gcp.spanner.cdc.CdcMetrics.PARTITIONS_DETECTED_COUNTER;
import static org.apache.beam.sdk.io.gcp.spanner.cdc.CdcMetrics.PARTITION_CREATED_TO_SCHEDULED_MS;

import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.ResultSet;
import com.google.cloud.spanner.Statement;
Expand Down Expand Up @@ -132,10 +135,16 @@ public ProcessContinuation processElement(
while (resultSet.next()) {
// TODO(hengfeng): change the log level in this file.
LOG.debug("Reading record currentIndex:" + currentIndex);
PARTITIONS_DETECTED_COUNTER.inc();

if (!tracker.tryClaim(currentIndex)) {
return ProcessContinuation.stop();
}
PartitionMetadata metadata = buildPartitionMetadata(resultSet);
PARTITION_CREATED_TO_SCHEDULED_MS
.update(new Duration(metadata.getCreatedAt().toDate().getTime(),
Instant.now().getMillis()).getMillis());

LOG.debug(
String.format(
"Get partition metadata currentIndex:%d meta:%s", currentIndex, metadata));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.gcp.spanner.cdc;

import static org.apache.beam.sdk.io.gcp.spanner.cdc.CdcMetrics.RECORD_COMMIT_TIMESTAMP_TO_EMITTED_MS;

import java.io.Serializable;
import org.apache.beam.sdk.io.gcp.spanner.cdc.model.DataChangeRecord;
import org.apache.beam.sdk.transforms.DoFn;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class PostProcessingMetricsDoFn extends DoFn<DataChangeRecord, DataChangeRecord>
implements Serializable {

@ProcessElement
public void processElement(@Element DataChangeRecord dataChangeRecord,
OutputReceiver<DataChangeRecord> receiver) {
RECORD_COMMIT_TIMESTAMP_TO_EMITTED_MS.update(
new Duration(new Instant(dataChangeRecord.getCommitTimestamp().toSqlTimestamp().getTime()),
new Instant()).getMillis());
receiver.output(dataChangeRecord);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ public ProcessContinuation processElement(
+ partition.getPartitionToken()
+ " with restriction "
+ tracker.currentRestriction());

switch (tracker.currentRestriction().getMode()) {
case QUERY_CHANGE_STREAM:
return queryChangeStream(partition, tracker, receiver, watermarkEstimator);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package org.apache.beam.sdk.io.gcp.spanner.cdc.actions;

import static org.apache.beam.sdk.io.gcp.spanner.cdc.CdcMetrics.PARTITION_MERGE_COUNTER;
import static org.apache.beam.sdk.io.gcp.spanner.cdc.CdcMetrics.PARTITION_SPLIT_COUNTER;
import static org.apache.beam.sdk.io.gcp.spanner.cdc.model.PartitionMetadata.State.CREATED;
import static org.apache.beam.sdk.io.gcp.spanner.cdc.model.PartitionMetadata.State.FINISHED;

Expand Down Expand Up @@ -67,6 +69,8 @@ public Optional<ProcessContinuation> run(
for (ChildPartition childPartition : record.getChildPartitions()) {
if (isSplit(childPartition)) {
LOG.info("Processing child partition split event");
PARTITION_SPLIT_COUNTER.inc();

final PartitionMetadata row =
toPartitionMetadata(
record.getStartTimestamp(),
Expand All @@ -79,6 +83,8 @@ public Optional<ProcessContinuation> run(
partitionMetadataDao.insert(row);
} else {
LOG.info("Processing child partition merge event");
PARTITION_MERGE_COUNTER.inc();

partitionMetadataDao.runInTransaction(
transaction -> {
final long finishedParents =
Expand Down

0 comments on commit 5c0ec6d

Please sign in to comment.