fix(gms): filter out runs of a dataJob without any run-events #11223

Merged
DataJobRunsResolver.java
@@ -4,9 +4,7 @@
import com.linkedin.common.urn.Urn;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.concurrency.GraphQLConcurrencyUtils;
import com.linkedin.datahub.graphql.generated.DataProcessInstance;
import com.linkedin.datahub.graphql.generated.DataProcessInstanceResult;
import com.linkedin.datahub.graphql.generated.Entity;
import com.linkedin.datahub.graphql.generated.*;
import com.linkedin.datahub.graphql.types.dataprocessinst.mappers.DataProcessInstanceMapper;
import com.linkedin.entity.EntityResponse;
import com.linkedin.entity.client.EntityClient;
@@ -33,13 +31,17 @@
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** GraphQL Resolver used for fetching a list of Task Runs associated with a Data Job */
public class DataJobRunsResolver
implements DataFetcher<CompletableFuture<DataProcessInstanceResult>> {

private static final String PARENT_TEMPLATE_URN_SEARCH_INDEX_FIELD_NAME = "parentTemplate";
private static final String CREATED_TIME_SEARCH_INDEX_FIELD_NAME = "created";
private static final String HAS_RUN_EVENTS_FIELD_NAME = "hasRunEvents";
private static final Logger log = LoggerFactory.getLogger(DataJobRunsResolver.class);

private final EntityClient _entityClient;

@@ -117,7 +119,12 @@ private Filter buildTaskRunsEntityFilter(final String entityUrn) {
new Criterion()
.setField(PARENT_TEMPLATE_URN_SEARCH_INDEX_FIELD_NAME)
.setCondition(Condition.EQUAL)
.setValue(entityUrn)));
.setValue(entityUrn),
new Criterion()
.setField(HAS_RUN_EVENTS_FIELD_NAME)
.setCondition(Condition.EQUAL)
.setValue(Boolean.TRUE.toString())));

final Filter filter = new Filter();
filter.setOr(
new ConjunctiveCriterionArray(ImmutableList.of(new ConjunctiveCriterion().setAnd(array))));
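With the added criterion, the resolver now returns only process instances whose search document has hasRunEvents set to true, in addition to matching the data job's parentTemplate urn; instances ingested without any DataProcessInstanceRunEvent therefore no longer show up as runs. The hasRunEvents field itself is introduced by the @Searchable annotation on the run-event aspect further down, and backfilled for pre-existing instances by the upgrade step below.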
BackfillDataProcessInstancesConfig.java
@@ -0,0 +1,43 @@
package com.linkedin.datahub.upgrade.config;

import com.linkedin.datahub.upgrade.system.NonBlockingSystemUpgrade;
import com.linkedin.datahub.upgrade.system.dataprocessinstances.BackfillDataProcessInstances;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.search.elasticsearch.ElasticSearchService;
import io.datahubproject.metadata.context.OperationContext;
import org.opensearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;

@Configuration
@Conditional(SystemUpdateCondition.NonBlockingSystemUpdateCondition.class)
public class BackfillDataProcessInstancesConfig {

@Bean
public NonBlockingSystemUpgrade backfillProcessInstancesHasRunEvents(
final OperationContext opContext,
EntityService<?> entityService,
ElasticSearchService elasticSearchService,
RestHighLevelClient restHighLevelClient,
@Value("${systemUpdate.processInstanceHasRunEvents.enabled}") final boolean enabled,
@Value("${systemUpdate.processInstanceHasRunEvents.reprocess.enabled}")
boolean reprocessEnabled,
@Value("${systemUpdate.processInstanceHasRunEvents.batchSize}") final Integer batchSize,
@Value("${systemUpdate.processInstanceHasRunEvents.delayMs}") final Integer delayMs,
@Value("${systemUpdate.processInstanceHasRunEvents.totalDays}") Integer totalDays,
@Value("${systemUpdate.processInstanceHasRunEvents.windowDays}") Integer windowDays) {
return new BackfillDataProcessInstances(
opContext,
entityService,
elasticSearchService,
restHighLevelClient,
enabled,
reprocessEnabled,
batchSize,
delayMs,
totalDays,
windowDays);
}
}
BackfillDataProcessInstances.java
@@ -0,0 +1,54 @@
package com.linkedin.datahub.upgrade.system.dataprocessinstances;

import com.google.common.collect.ImmutableList;
import com.linkedin.datahub.upgrade.UpgradeStep;
import com.linkedin.datahub.upgrade.system.NonBlockingSystemUpgrade;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.search.elasticsearch.ElasticSearchService;
import io.datahubproject.metadata.context.OperationContext;
import java.util.List;
import org.opensearch.client.RestHighLevelClient;

public class BackfillDataProcessInstances implements NonBlockingSystemUpgrade {

private final List<UpgradeStep> _steps;

public BackfillDataProcessInstances(
OperationContext opContext,
EntityService<?> entityService,
ElasticSearchService elasticSearchService,
RestHighLevelClient restHighLevelClient,
boolean enabled,
boolean reprocessEnabled,
Integer batchSize,
Integer batchDelayMs,
Integer totalDays,
Integer windowDays) {
if (enabled) {
_steps =
ImmutableList.of(
new BackfillDataProcessInstancesHasRunEventsStep(
opContext,
entityService,
elasticSearchService,
restHighLevelClient,
reprocessEnabled,
batchSize,
batchDelayMs,
totalDays,
windowDays));
} else {
_steps = ImmutableList.of();
}
}

@Override
public String id() {
return "BackfillDataProcessInstances";
}

@Override
public List<UpgradeStep> steps() {
return _steps;
}
}
BackfillDataProcessInstancesHasRunEventsStep.java
@@ -0,0 +1,213 @@
package com.linkedin.datahub.upgrade.system.dataprocessinstances;

import static com.linkedin.metadata.Constants.*;

import com.google.common.base.Throwables;
import com.linkedin.common.urn.Urn;
import com.linkedin.datahub.upgrade.UpgradeContext;
import com.linkedin.datahub.upgrade.UpgradeStep;
import com.linkedin.datahub.upgrade.UpgradeStepResult;
import com.linkedin.datahub.upgrade.impl.DefaultUpgradeStepResult;
import com.linkedin.metadata.boot.BootstrapStep;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.search.elasticsearch.ElasticSearchService;
import com.linkedin.metadata.utils.elasticsearch.IndexConvention;
import com.linkedin.upgrade.DataHubUpgradeState;
import io.datahubproject.metadata.context.OperationContext;
import java.io.IOException;
import java.net.URISyntaxException;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;
import lombok.extern.slf4j.Slf4j;
import org.codehaus.jackson.node.JsonNodeFactory;
import org.codehaus.jackson.node.ObjectNode;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.search.aggregations.Aggregation;
import org.opensearch.search.aggregations.AggregationBuilders;
import org.opensearch.search.aggregations.bucket.composite.CompositeAggregation;
import org.opensearch.search.aggregations.bucket.composite.CompositeAggregationBuilder;
import org.opensearch.search.aggregations.bucket.composite.TermsValuesSourceBuilder;
import org.opensearch.search.builder.SearchSourceBuilder;

@Slf4j
public class BackfillDataProcessInstancesHasRunEventsStep implements UpgradeStep {

private static final String UPGRADE_ID = "BackfillDataProcessInstancesHasRunEvents";
private static final Urn UPGRADE_ID_URN = BootstrapStep.getUpgradeUrn(UPGRADE_ID);

private final OperationContext opContext;
private final EntityService<?> entityService;
private final ElasticSearchService elasticSearchService;
private final RestHighLevelClient restHighLevelClient;

private final boolean reprocessEnabled;
private final Integer batchSize;
private final Integer batchDelayMs;

private final Integer totalDays;
private final Integer windowDays;

public BackfillDataProcessInstancesHasRunEventsStep(
OperationContext opContext,
EntityService<?> entityService,
ElasticSearchService elasticSearchService,
RestHighLevelClient restHighLevelClient,
boolean reprocessEnabled,
Integer batchSize,
Integer batchDelayMs,
Integer totalDays,
Integer windowDays) {
this.opContext = opContext;
this.entityService = entityService;
this.elasticSearchService = elasticSearchService;
this.restHighLevelClient = restHighLevelClient;
this.reprocessEnabled = reprocessEnabled;
this.batchSize = batchSize;
this.batchDelayMs = batchDelayMs;
this.totalDays = totalDays;
this.windowDays = windowDays;
}

@SuppressWarnings("BusyWait")
@Override
public Function<UpgradeContext, UpgradeStepResult> executable() {
return (context) -> {
TermsValuesSourceBuilder termsValuesSourceBuilder =
new TermsValuesSourceBuilder("urn").field("urn");
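// one aggregation bucket per distinct process instance urn in the run-events index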

ObjectNode json = JsonNodeFactory.instance.objectNode();
json.put("hasRunEvents", true);
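// partial document that will be merged into each matching process instance's search document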

IndexConvention indexConvention = opContext.getSearchContext().getIndexConvention();

String runEventsIndexName =
indexConvention.getTimeseriesAspectIndexName(
DATA_PROCESS_INSTANCE_ENTITY_NAME, DATA_PROCESS_INSTANCE_RUN_EVENT_ASPECT_NAME);

DataHubUpgradeState upgradeState = DataHubUpgradeState.SUCCEEDED;

Instant now = Instant.now();
Instant overallStart = now.minus(totalDays, ChronoUnit.DAYS);
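// walk backwards from now in windowDays-sized windows until totalDays are covered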
for (int i = 0; ; i++) {
Instant windowEnd = now.minus(i * windowDays, ChronoUnit.DAYS);
if (!windowEnd.isAfter(overallStart)) {
break;
}
Instant windowStart = windowEnd.minus(windowDays, ChronoUnit.DAYS);
if (windowStart.isBefore(overallStart)) {
// last iteration, cap at overallStart
windowStart = overallStart;
}

QueryBuilder queryBuilder =
QueryBuilders.boolQuery()
.must(
QueryBuilders.rangeQuery("@timestamp")
.gte(windowStart.toString())
.lt(windowEnd.toString()));

CompositeAggregationBuilder aggregationBuilder =
AggregationBuilders.composite("aggs", List.of(termsValuesSourceBuilder))
.size(batchSize);
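// the composite aggregation returns distinct urns in pages of batchSize buckets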

while (true) {
SearchRequest searchRequest = new SearchRequest(runEventsIndexName);
searchRequest.source(
new SearchSourceBuilder()
.size(0)
.aggregation(aggregationBuilder)
.query(queryBuilder));

SearchResponse response;

try {
response = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
} catch (IOException e) {
log.error(Throwables.getStackTraceAsString(e));
log.error("Error querying index {}", runEventsIndexName);
upgradeState = DataHubUpgradeState.FAILED;
break;
}
List<Aggregation> aggregations = response.getAggregations().asList();
if (aggregations.isEmpty()) {
break;
}
CompositeAggregation aggregation = (CompositeAggregation) aggregations.get(0);
Set<Urn> urns = new HashSet<>();
for (CompositeAggregation.Bucket bucket : aggregation.getBuckets()) {
for (Object value : bucket.getKey().values()) {
try {
urns.add(Urn.createFromString(String.valueOf(value)));
} catch (URISyntaxException e) {
log.warn("Ignoring invalid urn {}", value);
}
}
}
if (!urns.isEmpty()) {
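// keep only urns that still exist, then flag each as having run events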
urns = entityService.exists(opContext, urns);
urns.forEach(
urn ->
elasticSearchService.upsertDocument(
opContext,
DATA_PROCESS_INSTANCE_ENTITY_NAME,
json.toString(),
indexConvention.getEntityDocumentId(urn)));
}
if (aggregation.afterKey() == null) {
break;
}
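// resume the next page from the last bucket returned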
aggregationBuilder.aggregateAfter(aggregation.afterKey());
if (batchDelayMs > 0) {
log.info("Sleeping for {} ms", batchDelayMs);
try {
Thread.sleep(batchDelayMs);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}
BootstrapStep.setUpgradeResult(context.opContext(), UPGRADE_ID_URN, entityService);
return new DefaultUpgradeStepResult(id(), upgradeState);
};
}

@Override
public String id() {
return UPGRADE_ID;
}

/**
* Returns whether the upgrade should proceed if the step fails after exceeding the maximum
* retries.
*/
@Override
public boolean isOptional() {
return true;
}

/** Returns whether the upgrade should be skipped. */
@Override
public boolean skip(UpgradeContext context) {
if (reprocessEnabled) {
return false;
}

boolean previouslyRun =
entityService.exists(
context.opContext(), UPGRADE_ID_URN, DATA_HUB_UPGRADE_RESULT_ASPECT_NAME, true);
if (previouslyRun) {
log.info("{} was already run. Skipping.", id());
}
return previouslyRun;
}
}
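The core of the step is composite-aggregation pagination: run events are bucketed by urn within each time window, pages of buckets are fetched until afterKey() returns null, and every urn that still exists gets {"hasRunEvents": true} upserted onto its search document. Below is a minimal, self-contained sketch of just that pagination pattern; the host, index name, and page size are illustrative assumptions rather than values from this PR.

import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.http.HttpHost;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.RestClient;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.search.aggregations.AggregationBuilders;
import org.opensearch.search.aggregations.bucket.composite.CompositeAggregation;
import org.opensearch.search.aggregations.bucket.composite.CompositeAggregationBuilder;
import org.opensearch.search.aggregations.bucket.composite.TermsValuesSourceBuilder;
import org.opensearch.search.builder.SearchSourceBuilder;

public class CompositeUrnPaginationSketch {

  public static void main(String[] args) throws IOException {
    // Illustrative connection; the upgrade step receives its client via Spring instead.
    try (RestHighLevelClient client =
        new RestHighLevelClient(RestClient.builder(new HttpHost("localhost", 9200, "http")))) {

      // One terms source on the keyword field "urn", mirroring the step above.
      CompositeAggregationBuilder agg =
          AggregationBuilders.composite(
                  "aggs", List.of(new TermsValuesSourceBuilder("urn").field("urn")))
              .size(100); // page size, the analogue of batchSize

      while (true) {
        // size(0): we only want the aggregation buckets, not the matching documents.
        SearchRequest request =
            new SearchRequest("run_events_index") // assumed index name
                .source(new SearchSourceBuilder().size(0).aggregation(agg));

        SearchResponse response = client.search(request, RequestOptions.DEFAULT);
        CompositeAggregation result = response.getAggregations().get("aggs");

        // Each bucket key holds one distinct urn; the real step upserts
        // {"hasRunEvents": true} onto the corresponding entity document here.
        result.getBuckets().forEach(b -> System.out.println(b.getKey().get("urn")));

        // afterKey() is null once the final page has been consumed.
        Map<String, Object> afterKey = result.afterKey();
        if (afterKey == null) {
          break;
        }
        agg.aggregateAfter(afterKey); // resume from the last bucket of this page
      }
    }
  }
}

The real step additionally scopes each search with a @timestamp range query, which keeps any one composite cursor from having to span the full backfill range in a single pass.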
DataProcessInstanceRunEvent.pdl
@@ -15,6 +15,9 @@ import com.linkedin.common.Urn
record DataProcessInstanceRunEvent includes TimeseriesAspectBase, ExternalReference {

@TimeseriesField = {}
@Searchable = {
"hasValuesFieldName": "hasRunEvents"
}
status: enum DataProcessRunStatus {
/**
* The status that the data processing run is in.
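The hasValuesFieldName directive is what creates the flag used above: annotating the timeseries status field this way adds a boolean hasRunEvents field to the process instance's search document, set when a run event is indexed for it. Instances written before this change never had the flag set, which is why the backfill step walks the run-events index and upserts it explicitly.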
application.yaml
@@ -387,7 +387,15 @@ systemUpdate:
batchSize: ${SYSTEM_UPDATE_SCHEMA_FIELDS_DOC_IDS_BATCH_SIZE:500}
delayMs: ${SYSTEM_UPDATE_SCHEMA_FIELDS_DOC_IDS_DELAY_MS:5000}
limit: ${SYSTEM_UPDATE_SCHEMA_FIELDS_DOC_IDS_LIMIT:0}

processInstanceHasRunEvents:
enabled: ${SYSTEM_UPDATE_PROCESS_INSTANCE_HAS_RUN_EVENTS_ENABLED:true}
batchSize: ${SYSTEM_UPDATE_PROCESS_INSTANCE_HAS_RUN_EVENTS_BATCH_SIZE:100}
delayMs: ${SYSTEM_UPDATE_PROCESS_INSTANCE_HAS_RUN_EVENTS_DELAY_MS:1000}
totalDays: ${SYSTEM_UPDATE_PROCESS_INSTANCE_HAS_RUN_EVENTS_TOTAL_DAYS:90}
windowDays: ${SYSTEM_UPDATE_PROCESS_INSTANCE_HAS_RUN_EVENTS_WINDOW_DAYS:1}
reprocess:
enabled: ${SYSTEM_UPDATE_PROCESS_INSTANCE_HAS_RUN_EVENTS_REPROCESS:false}

structuredProperties:
enabled: ${ENABLE_STRUCTURED_PROPERTIES_HOOK:true} # applies structured properties mappings
writeEnabled: ${ENABLE_STRUCTURED_PROPERTIES_WRITE:true} # write structured property values
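With these defaults the backfill covers the last 90 days of run events in one-day windows, aggregating 100 urns per page with a 1000 ms pause between pages. Because the step records a DataHubUpgradeResult aspect on completion, it normally runs only once; setting SYSTEM_UPDATE_PROCESS_INSTANCE_HAS_RUN_EVENTS_REPROCESS=true makes skip() return false and forces a re-run.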