Skip to content

Commit

Permalink
Support samza portable UDF metrics. (#25265)
Browse files Browse the repository at this point in the history
  • Loading branch information
ryucc authored Feb 9, 2023
1 parent 9fcb3a5 commit ea1625a
Showing 1 changed file with 7 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,9 @@ public void onProgress(BeamFnApi.ProcessBundleProgressResponse progress) {}
public void onCompleted(BeamFnApi.ProcessBundleResponse response) {
response.getMonitoringInfosList().stream()
.filter(monitoringInfo -> !monitoringInfo.getPayload().isEmpty())
.forEach(this::parseAndUpdateMetric);
.map(this::parseAndUpdateMetric)
.distinct()
.forEach(samzaMetricsContainer::updateMetrics);
}

/**
Expand All @@ -113,8 +115,9 @@ public void onCompleted(BeamFnApi.ProcessBundleResponse response) {
*
* @see
* org.apache.beam.runners.core.metrics.MonitoringInfoMetricName#of(MetricsApi.MonitoringInfo)
* @return the final transformUniqueName for the metric
*/
private void parseAndUpdateMetric(MetricsApi.MonitoringInfo monitoringInfo) {
private String parseAndUpdateMetric(MetricsApi.MonitoringInfo monitoringInfo) {
String pTransformId =
monitoringInfo.getLabelsOrDefault(MonitoringInfoConstants.Labels.PTRANSFORM, stepName);
String transformUniqueName = transformIdToUniqueName.getOrDefault(pTransformId, pTransformId);
Expand Down Expand Up @@ -148,7 +151,8 @@ private void parseAndUpdateMetric(MetricsApi.MonitoringInfo monitoringInfo) {
break;

default:
LOG.warn("Unsupported metric type {}", monitoringInfo.getType());
LOG.debug("Unsupported metric type {}", monitoringInfo.getType());
}
return transformUniqueName;
}
}

0 comments on commit ea1625a

Please sign in to comment.