diff --git a/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java index 19ffb12859183..db509afb68da9 100644 --- a/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java @@ -532,6 +532,8 @@ protected void doRun() { } final ConcreteIndices concreteIndices = new ConcreteIndices(clusterState, indexNameExpressionResolver); Metadata metadata = clusterState.metadata(); + // go over all the requests and create a ShardId -> Operations mapping + Map> requestsByShard = new HashMap<>(); for (int i = 0; i < bulkRequest.requests.size(); i++) { DocWriteRequest docWriteRequest = bulkRequest.requests.get(i); // the request can only be null because we set it to null in the previous step, so it gets ignored @@ -587,6 +589,12 @@ protected void doRun() { default: throw new AssertionError("request type not supported: [" + docWriteRequest.opType() + "]"); } + + ShardId shardId = clusterService.operationRouting() + .indexShards(clusterState, concreteIndex.getName(), docWriteRequest.id(), docWriteRequest.routing()) + .shardId(); + List shardRequests = requestsByShard.computeIfAbsent(shardId, shard -> new ArrayList<>()); + shardRequests.add(new BulkItemRequest(i, docWriteRequest)); } catch (OpenSearchParseException | IllegalArgumentException | RoutingMissingException e) { BulkItemResponse.Failure failure = new BulkItemResponse.Failure(concreteIndex.getName(), docWriteRequest.id(), e); BulkItemResponse bulkItemResponse = new BulkItemResponse(i, docWriteRequest.opType(), failure); @@ -596,21 +604,6 @@ protected void doRun() { } } - // first, go over all the requests and create a ShardId -> Operations mapping - Map> requestsByShard = new HashMap<>(); - for (int i = 0; i < bulkRequest.requests.size(); i++) { - DocWriteRequest request = bulkRequest.requests.get(i); - if (request == null) { - continue; - } - String concreteIndex = concreteIndices.getConcreteIndex(request.index()).getName(); - ShardId shardId = clusterService.operationRouting() - .indexShards(clusterState, concreteIndex, request.id(), request.routing()) - .shardId(); - List shardRequests = requestsByShard.computeIfAbsent(shardId, shard -> new ArrayList<>()); - shardRequests.add(new BulkItemRequest(i, request)); - } - if (requestsByShard.isEmpty()) { BulkItemResponse[] response = responses.toArray(new BulkItemResponse[responses.length()]); long tookMillis = buildTookInMillis(startTimeNanos);