Skip to content

Commit

Permalink
Metrics for incremental bulk splits (#116765) (#117280)
Browse files Browse the repository at this point in the history
Backport of PR #116765. Add metrics to track incremental bulk request splits due to indexing pressure. Resolves ES-9612
  • Loading branch information
ankikuma authored Nov 21, 2024
1 parent 1cd95b5 commit 45d5990
Show file tree
Hide file tree
Showing 10 changed files with 390 additions and 6 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/116765.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 116765
summary: Metrics for incremental bulk splits
area: Distributed
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
.put(super.nodeSettings(nodeOrdinal, otherSettings))
.put(IndexingPressure.SPLIT_BULK_LOW_WATERMARK.getKey(), "512B")
.put(IndexingPressure.SPLIT_BULK_LOW_WATERMARK_SIZE.getKey(), "2048B")
.put(IndexingPressure.SPLIT_BULK_HIGH_WATERMARK.getKey(), "2KB")
.put(IndexingPressure.SPLIT_BULK_HIGH_WATERMARK.getKey(), "4KB")
.put(IndexingPressure.SPLIT_BULK_HIGH_WATERMARK_SIZE.getKey(), "1024B")
.build();
}
Expand Down Expand Up @@ -162,6 +162,8 @@ public void testIncrementalBulkLowWatermarkBackOff() throws Exception {

IndexRequest indexRequest = indexRequest(index);
long total = indexRequest.ramBytesUsed();
long lowWaterMarkSplits = indexingPressure.stats().getLowWaterMarkSplits();
long highWaterMarkSplits = indexingPressure.stats().getHighWaterMarkSplits();
while (total < 2048) {
refCounted.incRef();
handler.addItems(List.of(indexRequest), refCounted::decRef, () -> nextPage.set(true));
Expand All @@ -176,6 +178,8 @@ public void testIncrementalBulkLowWatermarkBackOff() throws Exception {
handler.addItems(List.of(indexRequest(index)), refCounted::decRef, () -> nextPage.set(true));

assertBusy(() -> assertThat(indexingPressure.stats().getCurrentCombinedCoordinatingAndPrimaryBytes(), equalTo(0L)));
assertBusy(() -> assertThat(indexingPressure.stats().getLowWaterMarkSplits(), equalTo(lowWaterMarkSplits + 1)));
assertThat(indexingPressure.stats().getHighWaterMarkSplits(), equalTo(highWaterMarkSplits));

PlainActionFuture<BulkResponse> future = new PlainActionFuture<>();
handler.lastItems(List.of(indexRequest), refCounted::decRef, future);
Expand All @@ -193,6 +197,8 @@ public void testIncrementalBulkHighWatermarkBackOff() throws Exception {
IncrementalBulkService incrementalBulkService = internalCluster().getInstance(IncrementalBulkService.class, nodeName);
IndexingPressure indexingPressure = internalCluster().getInstance(IndexingPressure.class, nodeName);
ThreadPool threadPool = internalCluster().getInstance(ThreadPool.class, nodeName);
long lowWaterMarkSplits = indexingPressure.stats().getLowWaterMarkSplits();
long highWaterMarkSplits = indexingPressure.stats().getHighWaterMarkSplits();

AbstractRefCounted refCounted = AbstractRefCounted.of(() -> {});
AtomicBoolean nextPage = new AtomicBoolean(false);
Expand All @@ -218,6 +224,8 @@ public void testIncrementalBulkHighWatermarkBackOff() throws Exception {
handlerNoThrottle.addItems(requestsNoThrottle, refCounted::decRef, () -> nextPage.set(true));
assertTrue(nextPage.get());
nextPage.set(false);
assertThat(indexingPressure.stats().getHighWaterMarkSplits(), equalTo(highWaterMarkSplits));
assertThat(indexingPressure.stats().getLowWaterMarkSplits(), equalTo(lowWaterMarkSplits));

ArrayList<DocWriteRequest<?>> requestsThrottle = new ArrayList<>();
// Test that a request larger than SPLIT_BULK_HIGH_WATERMARK_SIZE (1KB) is throttled
Expand All @@ -236,6 +244,8 @@ public void testIncrementalBulkHighWatermarkBackOff() throws Exception {

// Wait until we are ready for the next page
assertBusy(() -> assertTrue(nextPage.get()));
assertBusy(() -> assertThat(indexingPressure.stats().getHighWaterMarkSplits(), equalTo(highWaterMarkSplits + 1)));
assertThat(indexingPressure.stats().getLowWaterMarkSplits(), equalTo(lowWaterMarkSplits));

for (IncrementalBulkService.Handler h : handlers) {
refCounted.incRef();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,35 +9,48 @@

package org.elasticsearch.monitor.metrics;

import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.bulk.IncrementalBulkService;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.core.AbstractRefCounted;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexingPressure;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.telemetry.Measurement;
import org.elasticsearch.telemetry.TestTelemetryPlugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

import static org.elasticsearch.index.IndexingPressure.MAX_COORDINATING_BYTES;
import static org.elasticsearch.index.IndexingPressure.MAX_PRIMARY_BYTES;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThan;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0)
public class NodeIndexingMetricsIT extends ESIntegTestCase {
Expand Down Expand Up @@ -453,6 +466,211 @@ public void testPrimaryDocumentRejectionMetricsFluctuatingOverTime() throws Exce
}
}

// Borrowed this test from IncrementalBulkIT and added test for metrics to it
public void testIncrementalBulkLowWatermarkSplitMetrics() throws Exception {
final String nodeName = internalCluster().startNode(
Settings.builder()
.put(IndexingPressure.SPLIT_BULK_LOW_WATERMARK.getKey(), "512B")
.put(IndexingPressure.SPLIT_BULK_LOW_WATERMARK_SIZE.getKey(), "2048B")
.put(IndexingPressure.SPLIT_BULK_HIGH_WATERMARK.getKey(), "4KB")
.put(IndexingPressure.SPLIT_BULK_HIGH_WATERMARK_SIZE.getKey(), "1024B")
.build()
);
ensureStableCluster(1);

String index = "test";
createIndex(index);

IncrementalBulkService incrementalBulkService = internalCluster().getInstance(IncrementalBulkService.class, nodeName);
IndexingPressure indexingPressure = internalCluster().getInstance(IndexingPressure.class, nodeName);
final TestTelemetryPlugin testTelemetryPlugin = internalCluster().getInstance(PluginsService.class, nodeName)
.filterPlugins(TestTelemetryPlugin.class)
.findFirst()
.orElseThrow();
testTelemetryPlugin.resetMeter();

IncrementalBulkService.Handler handler = incrementalBulkService.newBulkRequest();

AbstractRefCounted refCounted = AbstractRefCounted.of(() -> {});
AtomicBoolean nextPage = new AtomicBoolean(false);

IndexRequest indexRequest = indexRequest(index);
long total = indexRequest.ramBytesUsed();
while (total < 2048) {
refCounted.incRef();
handler.addItems(List.of(indexRequest), refCounted::decRef, () -> nextPage.set(true));
assertTrue(nextPage.get());
nextPage.set(false);
indexRequest = indexRequest(index);
total += indexRequest.ramBytesUsed();
}

assertThat(indexingPressure.stats().getCurrentCombinedCoordinatingAndPrimaryBytes(), greaterThan(0L));
assertThat(indexingPressure.stats().getLowWaterMarkSplits(), equalTo(0L));

testTelemetryPlugin.collect();
assertThat(
getSingleRecordedMetric(
testTelemetryPlugin::getLongAsyncCounterMeasurement,
"es.indexing.coordinating.low_watermark_splits.total"
).getLong(),
equalTo(0L)
);
assertThat(
getSingleRecordedMetric(
testTelemetryPlugin::getLongAsyncCounterMeasurement,
"es.indexing.coordinating.high_watermark_splits.total"
).getLong(),
equalTo(0L)
);

refCounted.incRef();
handler.addItems(List.of(indexRequest(index)), refCounted::decRef, () -> nextPage.set(true));

assertBusy(() -> assertThat(indexingPressure.stats().getCurrentCombinedCoordinatingAndPrimaryBytes(), equalTo(0L)));
assertBusy(() -> assertThat(indexingPressure.stats().getLowWaterMarkSplits(), equalTo(1L)));
assertThat(indexingPressure.stats().getHighWaterMarkSplits(), equalTo(0L));

testTelemetryPlugin.collect();
assertThat(
getLatestRecordedMetric(
testTelemetryPlugin::getLongAsyncCounterMeasurement,
"es.indexing.coordinating.low_watermark_splits.total"
).getLong(),
equalTo(1L)
);
assertThat(
getLatestRecordedMetric(
testTelemetryPlugin::getLongAsyncCounterMeasurement,
"es.indexing.coordinating.high_watermark_splits.total"
).getLong(),
equalTo(0L)
);

PlainActionFuture<BulkResponse> future = new PlainActionFuture<>();
handler.lastItems(List.of(indexRequest), refCounted::decRef, future);

BulkResponse bulkResponse = safeGet(future);
assertNoFailures(bulkResponse);
assertFalse(refCounted.hasReferences());
}

// Borrowed this test from IncrementalBulkIT and added test for metrics to it
public void testIncrementalBulkHighWatermarkSplitMetrics() throws Exception {
final String nodeName = internalCluster().startNode(
Settings.builder()
.put(IndexingPressure.SPLIT_BULK_LOW_WATERMARK.getKey(), "512B")
.put(IndexingPressure.SPLIT_BULK_LOW_WATERMARK_SIZE.getKey(), "2048B")
.put(IndexingPressure.SPLIT_BULK_HIGH_WATERMARK.getKey(), "4KB")
.put(IndexingPressure.SPLIT_BULK_HIGH_WATERMARK_SIZE.getKey(), "1024B")
.build()
);
ensureStableCluster(1);

String index = "test";
createIndex(index);

IncrementalBulkService incrementalBulkService = internalCluster().getInstance(IncrementalBulkService.class, nodeName);
IndexingPressure indexingPressure = internalCluster().getInstance(IndexingPressure.class, nodeName);
ThreadPool threadPool = internalCluster().getInstance(ThreadPool.class, nodeName);
final TestTelemetryPlugin testTelemetryPlugin = internalCluster().getInstance(PluginsService.class, nodeName)
.filterPlugins(TestTelemetryPlugin.class)
.findFirst()
.orElseThrow();
testTelemetryPlugin.resetMeter();

AbstractRefCounted refCounted = AbstractRefCounted.of(() -> {});
AtomicBoolean nextPage = new AtomicBoolean(false);

ArrayList<IncrementalBulkService.Handler> handlers = new ArrayList<>();
for (int i = 0; i < 4; ++i) {
ArrayList<DocWriteRequest<?>> requests = new ArrayList<>();
add512BRequests(requests, index);
IncrementalBulkService.Handler handler = incrementalBulkService.newBulkRequest();
handlers.add(handler);
refCounted.incRef();
handler.addItems(requests, refCounted::decRef, () -> nextPage.set(true));
assertTrue(nextPage.get());
nextPage.set(false);
}

// Test that a request smaller than SPLIT_BULK_HIGH_WATERMARK_SIZE (1KB) is not throttled
ArrayList<DocWriteRequest<?>> requestsNoThrottle = new ArrayList<>();
add512BRequests(requestsNoThrottle, index);
IncrementalBulkService.Handler handlerNoThrottle = incrementalBulkService.newBulkRequest();
handlers.add(handlerNoThrottle);
refCounted.incRef();
handlerNoThrottle.addItems(requestsNoThrottle, refCounted::decRef, () -> nextPage.set(true));
assertTrue(nextPage.get());
nextPage.set(false);
assertThat(indexingPressure.stats().getHighWaterMarkSplits(), equalTo(0L));

testTelemetryPlugin.collect();
assertThat(
getSingleRecordedMetric(
testTelemetryPlugin::getLongAsyncCounterMeasurement,
"es.indexing.coordinating.low_watermark_splits.total"
).getLong(),
equalTo(0L)
);
assertThat(
getSingleRecordedMetric(
testTelemetryPlugin::getLongAsyncCounterMeasurement,
"es.indexing.coordinating.high_watermark_splits.total"
).getLong(),
equalTo(0L)
);

ArrayList<DocWriteRequest<?>> requestsThrottle = new ArrayList<>();
// Test that a request larger than SPLIT_BULK_HIGH_WATERMARK_SIZE (1KB) is throttled
add512BRequests(requestsThrottle, index);
add512BRequests(requestsThrottle, index);

CountDownLatch finishLatch = new CountDownLatch(1);
blockWritePool(threadPool, finishLatch);
IncrementalBulkService.Handler handlerThrottled = incrementalBulkService.newBulkRequest();
refCounted.incRef();
handlerThrottled.addItems(requestsThrottle, refCounted::decRef, () -> nextPage.set(true));
assertFalse(nextPage.get());
finishLatch.countDown();

handlers.add(handlerThrottled);

// Wait until we are ready for the next page
assertBusy(() -> assertTrue(nextPage.get()));
assertBusy(() -> assertThat(indexingPressure.stats().getHighWaterMarkSplits(), equalTo(1L)));
assertThat(indexingPressure.stats().getLowWaterMarkSplits(), equalTo(0L));

testTelemetryPlugin.collect();
assertThat(
getLatestRecordedMetric(
testTelemetryPlugin::getLongAsyncCounterMeasurement,
"es.indexing.coordinating.low_watermark_splits.total"
).getLong(),
equalTo(0L)
);
assertThat(
getLatestRecordedMetric(
testTelemetryPlugin::getLongAsyncCounterMeasurement,
"es.indexing.coordinating.high_watermark_splits.total"
).getLong(),
equalTo(1L)
);

for (IncrementalBulkService.Handler h : handlers) {
refCounted.incRef();
PlainActionFuture<BulkResponse> future = new PlainActionFuture<>();
h.lastItems(List.of(indexRequest(index)), refCounted::decRef, future);
BulkResponse bulkResponse = safeGet(future);
assertNoFailures(bulkResponse);
}

assertBusy(() -> assertThat(indexingPressure.stats().getCurrentCombinedCoordinatingAndPrimaryBytes(), equalTo(0L)));
refCounted.decRef();
assertFalse(refCounted.hasReferences());
testTelemetryPlugin.collect();
}

private static Measurement getSingleRecordedMetric(Function<String, List<Measurement>> metricGetter, String name) {
final List<Measurement> measurements = metricGetter.apply(name);
assertFalse("Indexing metric is not recorded", measurements.isEmpty());
Expand All @@ -470,4 +688,47 @@ private static boolean doublesEquals(double expected, double actual) {
final double eps = .0000001;
return Math.abs(expected - actual) < eps;
}

private static IndexRequest indexRequest(String index) {
IndexRequest indexRequest = new IndexRequest();
indexRequest.index(index);
indexRequest.source(Map.of("field", randomAlphaOfLength(10)));
return indexRequest;
}

private static void add512BRequests(ArrayList<DocWriteRequest<?>> requests, String index) {
long total = 0;
while (total < 512) {
IndexRequest indexRequest = indexRequest(index);
requests.add(indexRequest);
total += indexRequest.ramBytesUsed();
}
assertThat(total, lessThan(1024L));
}

private static void blockWritePool(ThreadPool threadPool, CountDownLatch finishLatch) {
final var threadCount = threadPool.info(ThreadPool.Names.WRITE).getMax();
final var startBarrier = new CyclicBarrier(threadCount + 1);
final var blockingTask = new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
fail(e);
}

@Override
protected void doRun() {
safeAwait(startBarrier);
safeAwait(finishLatch);
}

@Override
public boolean isForceExecution() {
return true;
}
};
for (int i = 0; i < threadCount; i++) {
threadPool.executor(ThreadPool.Names.WRITE).execute(blockingTask);
}
safeAwait(startBarrier);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ static TransportVersion def(int id) {
public static final TransportVersion FAST_REFRESH_RCO_2 = def(8_795_00_0);
public static final TransportVersion ESQL_ENRICH_RUNTIME_WARNINGS = def(8_796_00_0);
public static final TransportVersion INGEST_PIPELINE_CONFIGURATION_AS_MAP = def(8_797_00_0);
public static final TransportVersion INDEXING_PRESSURE_THROTTLING_STATS = def(8_798_00_0);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Loading

0 comments on commit 45d5990

Please sign in to comment.