Skip to content

Commit

Permalink
Re-use resolved pipeline for all requests with the same index
Browse files Browse the repository at this point in the history
  • Loading branch information
parkertimmins committed Oct 31, 2024
1 parent 8a5aa09 commit 38da866
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,6 @@ protected void doRun() throws IOException {

private boolean applyPipelines(Task task, BulkRequest bulkRequest, Executor executor, ActionListener<BulkResponse> listener)
throws IOException {
boolean hasIndexRequestsWithPipelines = false;
final Metadata metadata;
Map<String, ComponentTemplate> componentTemplateSubstitutions = bulkRequest.getComponentTemplateSubstitutions();
Map<String, ComposableIndexTemplate> indexTemplateSubstitutions = bulkRequest.getIndexTemplateSubstitutions();
Expand Down Expand Up @@ -228,13 +227,10 @@ private boolean applyPipelines(Task task, BulkRequest bulkRequest, Executor exec
metadata = clusterService.state().getMetadata();
}

for (DocWriteRequest<?> actionRequest : bulkRequest.requests) {
IndexRequest indexRequest = getIndexWriteRequest(actionRequest);
if (indexRequest != null) {
IngestService.resolvePipelinesAndUpdateIndexRequest(actionRequest, indexRequest, metadata);
hasIndexRequestsWithPipelines |= IngestService.hasPipeline(indexRequest);
}
// Resolve and set pipelines on each index request
boolean hasIndexRequestsWithPipelines = IngestService.resolveAndUpdateAllPipelines(bulkRequest.requests, metadata);

for (DocWriteRequest<?> actionRequest : bulkRequest.requests) {
if (actionRequest instanceof IndexRequest ir) {
if (ir.getAutoGeneratedTimestamp() != IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP) {
throw new IllegalArgumentException("autoGeneratedTimestamp should not be set externally");
Expand Down
66 changes: 48 additions & 18 deletions server/src/main/java/org/elasticsearch/ingest/IngestService.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.action.bulk.FailureStoreMetrics;
import org.elasticsearch.action.bulk.TransportAbstractBulkAction;
import org.elasticsearch.action.bulk.TransportBulkAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.ingest.DeletePipelineRequest;
Expand Down Expand Up @@ -267,6 +268,7 @@ private static Map<String, Processor.Factory> processorFactories(List<IngestPlug
* @param originalRequest Original write request received.
* @param indexRequest The {@link org.elasticsearch.action.index.IndexRequest} object to update.
* @param metadata Cluster metadata from where the pipeline information could be derived.
*/
public static void resolvePipelinesAndUpdateIndexRequest(
final DocWriteRequest<?> originalRequest,
Expand All @@ -276,46 +278,74 @@ public static void resolvePipelinesAndUpdateIndexRequest(
resolvePipelinesAndUpdateIndexRequest(originalRequest, indexRequest, metadata, System.currentTimeMillis());
}

private static boolean isRolloverOnWrite(Metadata metadata, final DocWriteRequest<?> request, IndexRequest indexRequest) {
/**
 * Resolves the stored pipelines for {@code indexRequest} and applies them to it, unless the request is
 * already flagged as pipeline-resolved, in which case nothing happens.
 *
 * @param originalRequest the write request forwarded to {@code resolveStoredPipelines}
 * @param indexRequest    the index request that is checked and, if needed, updated
 * @param metadata        metadata forwarded to the pipeline resolution
 * @param epochMillis     timestamp forwarded to the pipeline resolution
 */
static void resolvePipelinesAndUpdateIndexRequest(
    final DocWriteRequest<?> originalRequest,
    final IndexRequest indexRequest,
    final Metadata metadata,
    final long epochMillis
) {
    if (indexRequest.isPipelineResolved() == false) {
        final Pipelines resolved = resolveStoredPipelines(originalRequest, indexRequest, metadata, epochMillis);
        setPipelineOnRequest(indexRequest, resolved);
    }
}

/**
 * Resolves and sets the pipelines on every index request found in {@code requests}. The stored pipelines
 * are resolved at most once per distinct index name within a single call and re-used for every later
 * request that targets the same index name.
 *
 * @param requests the write requests to process; entries for which
 *                 {@code TransportAbstractBulkAction.getIndexWriteRequest} returns {@code null} are skipped
 * @param metadata metadata forwarded to the pipeline resolution
 * @return {@code true} if {@link #hasPipeline(IndexRequest)} is true for at least one of the index requests
 */
public static boolean resolveAndUpdateAllPipelines(List<DocWriteRequest<?>> requests, Metadata metadata) {
    // Read the clock once so that every resolution performed by this call is evaluated against the same
    // "now", instead of a slightly different timestamp per cache miss.
    final long epochMillis = System.currentTimeMillis();

    // NOTE(review): the cache is keyed by index name only, so the resolution runs with the first request
    // seen for an index and its result is re-used for the rest. This assumes resolveStoredPipelines does
    // not depend on any other per-request field -- confirm.
    final Map<String, Pipelines> storedPipelineCache = new HashMap<>();

    boolean hasIndexRequestsWithPipelines = false;
    for (DocWriteRequest<?> actionRequest : requests) {
        final IndexRequest indexRequest = TransportAbstractBulkAction.getIndexWriteRequest(actionRequest);
        if (indexRequest == null) {
            continue;
        }
        if (indexRequest.isPipelineResolved() == false) {
            // Resolve the pipeline from setting or templates if not cached
            final Pipelines pipelines = storedPipelineCache.computeIfAbsent(
                indexRequest.index(),
                index -> resolveStoredPipelines(actionRequest, indexRequest, metadata, epochMillis)
            );

            // Set pipeline on the index request
            setPipelineOnRequest(indexRequest, pipelines);
        }
        hasIndexRequestsWithPipelines |= hasPipeline(indexRequest);
    }
    return hasIndexRequestsWithPipelines;
}

/**
 * Tells whether the index name of {@code indexRequest} matches a data stream in {@code metadata} whose
 * backing indices report rollover-on-write. Returns {@code false} when no data stream has that name.
 */
private static boolean isRolloverOnWrite(Metadata metadata, IndexRequest indexRequest) {
    final DataStream targetStream = metadata.dataStreams().get(indexRequest.index());
    return targetStream != null && targetStream.getBackingIndices().isRolloverOnWrite();
}

static void resolvePipelinesAndUpdateIndexRequest(
static Pipelines resolveStoredPipelines(
final DocWriteRequest<?> originalRequest,
final IndexRequest indexRequest,
final Metadata metadata,
final long epochMillis
) {
if (indexRequest.isPipelineResolved()) {
return;
}

/*
* Here we look for the pipelines associated with the index if the index exists. If the index does not exist we fall back to using
* templates to find the pipelines.
*/

final Pipelines pipelines;
if (isRolloverOnWrite(metadata, originalRequest, indexRequest)) {
pipelines = resolvePipelinesFromIndexTemplates(indexRequest, metadata).orElse(Pipelines.NO_PIPELINES_DEFINED);
assert indexRequest.isPipelineResolved() == false;
if (isRolloverOnWrite(metadata, indexRequest)) {
return resolvePipelinesFromIndexTemplates(indexRequest, metadata).orElse(Pipelines.NO_PIPELINES_DEFINED);
} else {
pipelines = resolvePipelinesFromMetadata(originalRequest, indexRequest, metadata, epochMillis).or(
return resolvePipelinesFromMetadata(originalRequest, indexRequest, metadata, epochMillis).or(
() -> resolvePipelinesFromIndexTemplates(indexRequest, metadata)
).orElse(Pipelines.NO_PIPELINES_DEFINED);
}
}

/**
 * Applies already-resolved pipelines to {@code indexRequest} and marks the request as pipeline-resolved.
 *
 * @param indexRequest      the index request to update
 * @param resolvedPipelines the default and final pipeline to apply; the default pipeline is only used when
 *                          the request does not already carry a pipeline of its own
 */
static void setPipelineOnRequest(IndexRequest indexRequest, Pipelines resolvedPipelines) {
    // The pipeline coming as part of the request always has priority over the resolved one from metadata or templates
    String requestPipeline = indexRequest.getPipeline();
    if (requestPipeline != null) {
        indexRequest.setPipeline(requestPipeline);
    } else {
        indexRequest.setPipeline(resolvedPipelines.defaultPipeline);
    }
    // The final pipeline is always taken from the resolved pipelines, regardless of the request's own pipeline.
    indexRequest.setFinalPipeline(resolvedPipelines.finalPipeline);
    indexRequest.isPipelineResolved(true);
}

Expand Down Expand Up @@ -1521,7 +1551,7 @@ public static boolean hasPipeline(IndexRequest indexRequest) {
|| NOOP_PIPELINE_NAME.equals(indexRequest.getFinalPipeline()) == false;
}

private record Pipelines(String defaultPipeline, String finalPipeline) {
public record Pipelines(String defaultPipeline, String finalPipeline) {

private static final Pipelines NO_PIPELINES_DEFINED = new Pipelines(NOOP_PIPELINE_NAME, NOOP_PIPELINE_NAME);

Expand Down

0 comments on commit 38da866

Please sign in to comment.