From c7fcdef483062e7ade99f297ee1e1665a59c7c4a Mon Sep 17 00:00:00 2001 From: Parker Timmins Date: Thu, 31 Oct 2024 22:07:49 -0500 Subject: [PATCH] yaml rest test --- .../test/ingest/310_reroute_processor.yml | 104 ++++++++++++++++++ .../bulk/TransportAbstractBulkAction.java | 2 +- 2 files changed, 105 insertions(+), 1 deletion(-) diff --git a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/310_reroute_processor.yml b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/310_reroute_processor.yml index 53229290da03e..d0ada4a6030b4 100644 --- a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/310_reroute_processor.yml +++ b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/310_reroute_processor.yml @@ -252,3 +252,107 @@ teardown: - match: { _source.existing-field : true } - match: { _source.added-in-pipeline-before-reroute : true } - match: { _source.added-in-pipeline-after-reroute : true } + +--- +"Test data stream with lazy rollover obtains pipeline from template": + # Start with pipeline that reroute + - do: + ingest.put_pipeline: + id: "demo-reroute" + body: > + { + "processors": [ + { + "reroute" : {"namespace": "foo"} + } + ] + } + - match: { acknowledged: true } + + # Set pipeline in template + - do: + indices.put_index_template: + name: demo_1 + body: + index_patterns: [ "demo*" ] + priority: 1 + data_stream: { } + template: + settings: + index.default_pipeline: "demo-reroute" + - match: { acknowledged: true } + + - do: + indices.create_data_stream: + name: demo-dataset-default + - match: { acknowledged: true } + + - do: + index: + index: demo-dataset-default + body: + '@timestamp': '2020-12-12' + some-field: 1 + - do: + indices.refresh: + index: demo-dataset-foo + + # document is rerouted + - do: + search: + index: demo-dataset-foo + body: { query: { match_all: { } } } + - length: { hits.hits: 1 } + - match: { hits.hits.0._source.some-field: 1 } + + # add higher priority template without reroute + - do: + indices.put_index_template: + name: demo_2 + body: + index_patterns: [ "demo*" ] + priority: 2 # higher priority + data_stream: { } + - match: { acknowledged: true } + + - do: + index: + index: demo-dataset-default + body: + '@timestamp': '2020-12-12' + some-field: 2 + - do: + indices.refresh: + index: demo-dataset-foo + + # still reroute because datastream hasn't rolled over + - do: + search: + index: demo-dataset-foo + body: { query: { match_all: { } } } + - length: { hits.hits: 2 } + + # perform lazy rollover + - do: + indices.rollover: + alias: demo-dataset-default + lazy: true + + - do: + index: + index: demo-dataset-default + body: + '@timestamp': '2020-12-12' + some-field: 3 + - do: + indices.refresh: + index: demo-dataset-default + + # now absence of pipeline resolved from template, so no reroute + - do: + search: + index: demo-dataset-default + body: { query: { match_all: { } } } + - length: { hits.hits: 1 } + - match: { hits.hits.0._source.some-field: 3 } + diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java index 506c4e88f2eca..af5b4048eec21 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java @@ -235,7 +235,7 @@ private boolean applyPipelines(Task task, BulkRequest bulkRequest, Executor exec if (indexRequest.isPipelineResolved() == false) { var pipeline = resolvedPipelineCache.computeIfAbsent( indexRequest.index(), - // TODO perhaps this should be update to use `threadPool.absoluteTimeInMillis()`, but leaving as is for now. + // TODO perhaps this should use `threadPool.absoluteTimeInMillis()`, but leaving as is for now. (index) -> IngestService.resolveStoredPipelines(actionRequest, indexRequest, metadata, System.currentTimeMillis()) ); IngestService.setPipelineOnRequest(indexRequest, pipeline);