Skip to content

Commit

Permalink
TransportBulkAction.doRun() (#16950)
Browse files Browse the repository at this point in the history
Signed-off-by: kkewwei <[email protected]>
Signed-off-by: kkewwei <[email protected]>
(cherry picked from commit fccd6c5)
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
github-actions[bot] committed Jan 10, 2025
1 parent 781aa6b commit 89b7609
Showing 1 changed file with 8 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,8 @@ protected void doRun() {
}
final ConcreteIndices concreteIndices = new ConcreteIndices(clusterState, indexNameExpressionResolver);
Metadata metadata = clusterState.metadata();
// go over all the requests and create a ShardId -> Operations mapping
Map<ShardId, List<BulkItemRequest>> requestsByShard = new HashMap<>();
for (int i = 0; i < bulkRequest.requests.size(); i++) {
DocWriteRequest<?> docWriteRequest = bulkRequest.requests.get(i);
// the request can only be null because we set it to null in the previous step, so it gets ignored
Expand Down Expand Up @@ -592,6 +594,12 @@ protected void doRun() {
default:
throw new AssertionError("request type not supported: [" + docWriteRequest.opType() + "]");
}

ShardId shardId = clusterService.operationRouting()
.indexShards(clusterState, concreteIndex.getName(), docWriteRequest.id(), docWriteRequest.routing())
.shardId();
List<BulkItemRequest> shardRequests = requestsByShard.computeIfAbsent(shardId, shard -> new ArrayList<>());
shardRequests.add(new BulkItemRequest(i, docWriteRequest));
} catch (OpenSearchParseException | IllegalArgumentException | RoutingMissingException e) {
BulkItemResponse.Failure failure = new BulkItemResponse.Failure(concreteIndex.getName(), docWriteRequest.id(), e);
BulkItemResponse bulkItemResponse = new BulkItemResponse(i, docWriteRequest.opType(), failure);
Expand All @@ -601,21 +609,6 @@ protected void doRun() {
}
}

// first, go over all the requests and create a ShardId -> Operations mapping
Map<ShardId, List<BulkItemRequest>> requestsByShard = new HashMap<>();
for (int i = 0; i < bulkRequest.requests.size(); i++) {
DocWriteRequest<?> request = bulkRequest.requests.get(i);
if (request == null) {
continue;
}
String concreteIndex = concreteIndices.getConcreteIndex(request.index()).getName();
ShardId shardId = clusterService.operationRouting()
.indexShards(clusterState, concreteIndex, request.id(), request.routing())
.shardId();
List<BulkItemRequest> shardRequests = requestsByShard.computeIfAbsent(shardId, shard -> new ArrayList<>());
shardRequests.add(new BulkItemRequest(i, request));
}

if (requestsByShard.isEmpty()) {
BulkItemResponse[] response = responses.toArray(new BulkItemResponse[responses.length()]);
long tookMillis = buildTookInMillis(startTimeNanos);
Expand Down

0 comments on commit 89b7609

Please sign in to comment.