Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CDAP-21119] Add metrics for error classification #15815

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ private ProgramRunId getProgramRunId(ApplicationId applicationId) throws Excepti
return programRunId;
}

/**
* Responds with the list of {@link io.cdap.cdap.proto.ErrorClassificationResponse} for
* a failed preview run.
*/
@POST
@Path("/namespaces/{namespace-id}/previews/{preview-id}/classify")
public void classifyRunIdLogs(HttpRequest request, HttpResponder responder,
Expand All @@ -110,7 +114,7 @@ public void classifyRunIdLogs(HttpRequest request, HttpResponder responder,
try (CloseableIterator<LogEvent> logIter = logReader.getLog(loggingContext,
readRange.getFromMillis(), readRange.getToMillis(), filter)) {
// the iterator is closed by the BodyProducer passed to the HttpResponder
errorLogsClassifier.classify(logIter, responder);
errorLogsClassifier.classify(logIter, responder, namespaceId, null, "preview", previewId);
} catch (Exception ex) {
LOG.debug("Exception while classifying logs for logging context {}", loggingContext, ex);
responder.sendStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1102,6 +1102,13 @@ public static final class Tag {

// For operations
public static final String OPERATION_RUN = "operation";

// For error classification
public static final String ERROR_CATEGORY = "ectgry";
public static final String ERROR_TYPE = "etpe";
public static final String DEPENDENCY = "edep";
public static final String ERROR_CODE_TYPE = "ecdtpe";
public static final String ERROR_CODE = "ecd";
}

/**
Expand Down Expand Up @@ -1177,6 +1184,8 @@ public static final class Program {
public static final String APPLICATION_COUNT = "application.count";
public static final String NAMESPACE_COUNT = "namespace.count";
public static final String APPLICATION_PLUGIN_COUNT = "application.plugin.count";
public static final String FAILED_RUNS_CLASSIFICATION_COUNT =
"program.failed.runs.classified.count";
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright © 2025 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.logging;

import java.util.Objects;

/**
* Cache key for {@link io.cdap.cdap.proto.ErrorClassificationResponse}.
*/
public final class ErrorClassificationResponseCacheKey {
private final String namespace;
private final String program;
private final String appId;
private final String runId;

/**
* Constructor for {@link ErrorClassificationResponseCacheKey}.
*/
public ErrorClassificationResponseCacheKey(String namespace, String program,
String appId, String runId) {
this.namespace = namespace;
this.program = program;
this.appId = appId;
this.runId = runId;
}

@Override
public boolean equals(Object o) {
if (!(o instanceof ErrorClassificationResponseCacheKey)) {
return false;
}
ErrorClassificationResponseCacheKey that = (ErrorClassificationResponseCacheKey) o;
return Objects.equals(this.namespace, that.namespace)
&& Objects.equals(this.program, that.program) && Objects.equals(this.appId, that.appId)
&& Objects.equals(this.runId, that.runId);
}

@Override
public int hashCode() {
return Objects.hash(namespace, program, appId, runId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,23 @@

import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.classic.spi.IThrowableProxy;
import com.google.common.base.Strings;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.gson.Gson;
import com.google.inject.Inject;
import io.cdap.cdap.api.dataset.lib.CloseableIterator;
import io.cdap.cdap.api.exception.ErrorType;
import io.cdap.cdap.api.exception.FailureDetailsProvider;
import io.cdap.cdap.api.exception.WrappedStageException;
import io.cdap.cdap.api.metrics.MetricsCollectionService;
import io.cdap.cdap.api.metrics.MetricsContext;
import io.cdap.cdap.common.conf.Constants.Logging;
import io.cdap.cdap.common.conf.Constants.Metrics;
import io.cdap.cdap.logging.read.LogEvent;
import io.cdap.cdap.proto.ErrorClassificationResponse;
import io.cdap.http.HttpResponder;
Expand All @@ -38,7 +45,8 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import joptsimple.internal.Strings;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;

/**
* Classifies error logs and returns {@link ErrorClassificationResponse}.
Expand All @@ -51,7 +59,7 @@ public class ErrorLogsClassifier {
"io.cdap.cdap.runtime.spi.provisioner.dataproc.DataprocRuntimeException";
private static final ImmutableList<String> ALLOWLIST_CLASSES =
ImmutableList.<String>builder().add(DATAPROC_RUNTIME_EXCEPTION).build();
private static final LoadingCache<String, Boolean> cache = CacheBuilder.newBuilder()
private static final LoadingCache<String, Boolean> CLASS_CACHE = CacheBuilder.newBuilder()
.maximumSize(5000)
.build(new CacheLoader<String, Boolean>() {
@Override
Expand All @@ -64,24 +72,59 @@ public Boolean load(String className) {
}
}
});
private final Cache<ErrorClassificationResponseCacheKey,
List<ErrorClassificationResponse>> responseCache;
private final MetricsCollectionService metricsCollectionService;

private static boolean isFailureDetailsProviderInstance(String className) {
try {
return cache.get(className);
return CLASS_CACHE.get(className);
} catch (Exception e) {
return false; // Handle any unexpected errors
}
}

/**
 * Creates an {@link ErrorLogsClassifier} with an empty response cache.
 *
 * <p>The response cache holds at most 1000 entries and drops an entry 7 days after it
 * was written.
 *
 * @param metricsCollectionService service stored for later use when emitting metrics
 */
@Inject
public ErrorLogsClassifier(MetricsCollectionService metricsCollectionService) {
  this.responseCache = CacheBuilder.newBuilder()
      .maximumSize(1000)
      .expireAfterWrite(7, TimeUnit.DAYS)
      .build();
  this.metricsCollectionService = metricsCollectionService;
}

/**
* Classifies error logs and returns {@link ErrorClassificationResponse}.
*
* @param logIter Logs Iterator that can be closed.
* @param responder The HttpResponder.
* @param namespace The namespace of the program.
* @param program The name of the program; may be {@code null}.
* @param appId The name of the application.
* @param runId The run id of the program.
*/
public void classify(CloseableIterator<LogEvent> logIter, HttpResponder responder) {
public void classify(CloseableIterator<LogEvent> logIter, HttpResponder responder,
    String namespace, @Nullable String program, String appId, String runId) throws Exception {
  ErrorClassificationResponseCacheKey cacheKey =
      new ErrorClassificationResponseCacheKey(namespace, program, appId, runId);
  List<ErrorClassificationResponse> cached = responseCache.getIfPresent(cacheKey);

  if (cached == null) {
    // Cache miss: the helper classifies the logs and writes the HTTP response itself,
    // so only the result needs to be remembered here.
    List<ErrorClassificationResponse> computed = getErrorClassificationResponse(
        namespace, program, appId, runId, logIter, responder);
    responseCache.put(cacheKey, computed);
  } else {
    // Cache hit: answer straight from the cache without reading the log iterator.
    responder.sendJson(HttpResponseStatus.OK, GSON.toJson(cached));
  }
}

private List<ErrorClassificationResponse> getErrorClassificationResponse(String namespace,
@Nullable String program, String appId, String runId, CloseableIterator<LogEvent> logIter,
HttpResponder responder) {
Map<String, ErrorClassificationResponse> responseMap = new HashMap<>();
Set<ErrorClassificationResponse> responseSet = new HashSet<>();

while (logIter.hasNext()) {
ILoggingEvent logEvent = logIter.next().getLoggingEvent();
Map<String, String> mdc = logEvent.getMDCPropertyMap();
Expand All @@ -97,6 +140,25 @@ public void classify(CloseableIterator<LogEvent> logIter, HttpResponder responde
List<ErrorClassificationResponse> responses = new ArrayList<>(responseMap.values());
responses.addAll(responseSet);
responder.sendJson(HttpResponseStatus.OK, GSON.toJson(responses));

// emit metric
MetricsContext metricsContext = metricsCollectionService.getContext(ImmutableMap.of(
Metrics.Tag.NAMESPACE, namespace,
Metrics.Tag.PROGRAM, program,
Metrics.Tag.APP, appId,
Metrics.Tag.RUN_ID, runId));

for (ErrorClassificationResponse response : responses) {
MetricsContext context = metricsContext.childContext(ImmutableMap.of(
Metrics.Tag.ERROR_CATEGORY, response.getErrorCategory(),
Metrics.Tag.ERROR_TYPE, response.getErrorType(),
Metrics.Tag.DEPENDENCY, response.getDependency(),
Metrics.Tag.ERROR_CODE_TYPE, response.getErrorCodeType(),
Metrics.Tag.ERROR_CODE, response.getErrorCode()
));
context.gauge(Metrics.Program.FAILED_RUNS_CLASSIFICATION_COUNT, 1);
}
return responses;
}

private void populateResponse(IThrowableProxy throwableProxy,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ private void ensureVisibilityOnProgram(String namespace, String application, Str
accessEnforcer.enforce(programId, authenticationContext.getPrincipal(), StandardPermission.GET);
}

/**
* Responds with the list of {@link io.cdap.cdap.proto.ErrorClassificationResponse} for
* a failed program run.
*/
@POST
@Path("/namespaces/{namespace-id}/apps/{app-id}/{program-type}/{program-id}/runs/{run-id}/classify")
public void classifyRunIdLogs(HttpRequest request, HttpResponder responder,
Expand All @@ -127,7 +131,7 @@ public void classifyRunIdLogs(HttpRequest request, HttpResponder responder,
try (CloseableIterator<LogEvent> logIter = logReader.getLog(loggingContext,
readRange.getFromMillis(), readRange.getToMillis(), filter)) {
// the iterator is closed by the BodyProducer passed to the HttpResponder
errorLogsClassifier.classify(logIter, responder);
errorLogsClassifier.classify(logIter, responder, namespaceId, programId, appId, runId);
} catch (Exception ex) {
LOG.debug("Exception while classifying logs for logging context {}", loggingContext, ex);
responder.sendStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,24 @@

import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.classic.spi.IThrowableProxy;
import com.google.common.collect.Iterators;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import io.cdap.cdap.api.dataset.lib.CloseableIterator;
import io.cdap.cdap.api.exception.ProgramFailureException;
import io.cdap.cdap.api.exception.WrappedStageException;
import io.cdap.cdap.api.metrics.MetricValue;
import io.cdap.cdap.api.metrics.MetricValues;
import io.cdap.cdap.api.metrics.MetricsCollectionService;
import io.cdap.cdap.common.conf.Constants;
import io.cdap.cdap.common.conf.Constants.Metrics;
import io.cdap.cdap.logging.read.LogEvent;
import io.cdap.cdap.logging.read.LogOffset;
import io.cdap.cdap.metrics.collect.AggregatedMetricsCollectionService;
import io.cdap.cdap.proto.ErrorClassificationResponse;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
Expand All @@ -44,10 +51,9 @@ public class ErrorLogsClassifierTest {

private static final Gson GSON = new Gson();
private final MockResponder responder = new MockResponder();
private final ErrorLogsClassifier classifier = new ErrorLogsClassifier();

@Test
public void testClassifyLogs() {
public void testClassifyLogs() throws Exception {
LogEvent logEvent1 = new LogEvent(getEvent1(), LogOffset.LATEST_OFFSET);
LogEvent logEvent2 = new LogEvent(getEvent2(), LogOffset.LATEST_OFFSET);
List<LogEvent> events = new ArrayList<>();
Expand All @@ -70,10 +76,15 @@ public void close() {
// no-op
}
};
classifier.classify(closeableIterator, responder);
List<MetricValues> metricValuesList = new ArrayList<>();
MetricsCollectionService mockMetricsCollectionService = getMockCollectionService(metricValuesList);
mockMetricsCollectionService.startAndWait();
ErrorLogsClassifier classifier = new ErrorLogsClassifier(mockMetricsCollectionService);
classifier.classify(closeableIterator, responder, "namespace", "program", "app", "run");
Type listType = new TypeToken<List<ErrorClassificationResponse>>() {}.getType();
List<ErrorClassificationResponse> responses =
GSON.fromJson(responder.getResponseContentAsString(), listType);
mockMetricsCollectionService.stopAndWait();
Assert.assertEquals(1, responses.size());
Assert.assertEquals("stageName", responses.get(0).getStageName());
Assert.assertEquals("errorCategory-'stageName'", responses.get(0).getErrorCategory());
Expand All @@ -85,6 +96,9 @@ public void close() {
Assert.assertEquals("errorCode", responses.get(0).getErrorCode());
Assert.assertEquals("supportedDocumentationUrl",
responses.get(0).getSupportedDocumentationUrl());
Assert.assertSame(1, metricValuesList.size());
Assert.assertTrue(containsMetric(metricValuesList.get(0),
Metrics.Program.FAILED_RUNS_CLASSIFICATION_COUNT));
}

private ILoggingEvent getEvent1() {
Expand Down Expand Up @@ -120,4 +134,22 @@ private ILoggingEvent getEvent2() {
Mockito.when(event.getMDCPropertyMap()).thenReturn(map);
return event;
}

/**
 * Builds an {@link AggregatedMetricsCollectionService} whose {@code publish} appends
 * every published {@link MetricValues} to the supplied collection.
 *
 * @param collection sink that receives everything the service publishes
 * @return the collecting service
 */
private MetricsCollectionService getMockCollectionService(Collection<MetricValues> collection) {
  MetricsCollectionService collectingService = new AggregatedMetricsCollectionService(1000L) {
    @Override
    protected void publish(Iterator<MetricValues> metrics) {
      // Drain the published batch into the caller's collection.
      while (metrics.hasNext()) {
        collection.add(metrics.next());
      }
    }
  };
  return collectingService;
}

/**
 * Tells whether any metric in the given {@link MetricValues} has the given name.
 *
 * @param metricValues the metric values to search
 * @param metricName the metric name to look for
 * @return {@code true} if at least one metric's name equals {@code metricName}
 */
private boolean containsMetric(MetricValues metricValues, String metricName) {
  Iterator<? extends MetricValue> remaining = metricValues.getMetrics().iterator();
  while (remaining.hasNext()) {
    String candidateName = remaining.next().getName();
    if (candidateName.equals(metricName)) {
      return true;
    }
  }
  return false;
}
}
Loading