-
Notifications
You must be signed in to change notification settings - Fork 25k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add BulkRequest support to High Level Rest client #23312
Add BulkRequest support to High Level Rest client #23312
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
left a few comments
parameters.withRefreshPolicy(bulkRequest.getRefreshPolicy()); | ||
|
||
// Bulk API only supports newline delimited JSON | ||
XContentType xContentType = XContentType.JSON; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it also supports smile. should we validate that the format of all the index/update requests is the same and use that one? I wouldn't want to make things too complicated though given that most likely nobody uses smile here. People may also theoretically be using cbor or yaml in the single index requests now using the index api, as they don't have to go through REST.
if (opType == DocWriteRequest.OpType.INDEX || opType == DocWriteRequest.OpType.CREATE) { | ||
IndexRequest indexRequest = (IndexRequest) request; | ||
BytesReference indexSource = indexRequest.source(); | ||
XContentType indexXContentType = indexRequest.getContentType(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
interesting idea, to convert all the index requests to the target format, that way we can choose to always use either json or smile, without validating anything. Maybe that's better than what I suggested above. Although maybe tricky cause if a user sends cbor or smile, maybe they really expect cbor or smile to be stored in lucene rather than the json obtained from the conversion, but that is not possible when going through REST. I am on the fence. Maybe we should rather do some validation before we run the request, make sure that all of the requests have the same content-type and that can only be either json or smile. That way if something is not expected users are notified with an error. Thoughts?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I always forgot about Smile... I think it makes sense to check that all requests have the same content type and that it's either JSON or Smile. I updated the code and added tests for that, the first content type found dictates what's expected for the other requests.
assertEquals(errors[i] ? RestStatus.INTERNAL_SERVER_ERROR : RestStatus.CREATED, bulkItemResponse.status()); | ||
} else if (requestOpType == DocWriteRequest.OpType.UPDATE) { | ||
assertEquals(errors[i], bulkItemResponse.isFailed()); | ||
assertEquals(errors[i] ? RestStatus.INTERNAL_SERVER_ERROR : RestStatus.OK, bulkItemResponse.status()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
are these internal server errors ok? this seems fishy.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
They are ElasticsearchException and ElasticsearchException almost always return INTERNAL_SERVER_ERROR as the status.
They come from shard info failures for the bulk item request, but the type is lost because the bulk response (and associated bulk item responses) are parsed back by the high level rest client.
I checked the status, I'm not sure that the test should check more than that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok let's try and fix this separately.
@javanna Round 2, ready |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks @tlrx left a couple more comments. it's close though.
if (bulkContentType == null) { | ||
bulkContentType = requestContentType; | ||
} else { | ||
match = (requestContentType == bulkContentType); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wish we could throw exception here already rather than setting a flag. maybe add a static method that throws the exception so we don't worry about copying the message around?
// Bulk API only supports newline delimited JSON or Smile. Before executing | ||
// the bulk, we need to check that all requests have the same content-type | ||
// and this content-type is supported by the Bulk API. | ||
List<XContentType> allowedContentTypes = Arrays.asList(XContentType.JSON, XContentType.SMILE); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add a short method that checks the content type and get rid of the list? If this class becomes too big I am good with getting the bulk part out. some of this methods are going to be useful for msearch too at some point.
bulkContentType = requestContentType; | ||
} else { | ||
match = (requestContentType == bulkContentType); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this code block is repeated and always deals with index requests I think. We can probably factor it out to a method.
@javanna Thanks again. Would you like to have another look, please? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
left a few minors, LGTM otherwise
return requestContentType; | ||
} | ||
if (requestContentType != xContentType) { | ||
throw new IllegalStateException("Mismatching content-type found for request with content-type [" + requestContentType |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe we should throw illegal argument rather than illegal state.
|
||
/** | ||
* Ensure that the {@link IndexRequest}'s content type is supported by the Bulk API and that it conforms | ||
* to the current {@link BulkRequest}'s content type (if it's known at the time of this method get called). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
mention the return type in the docs?
* Ensure that the {@link IndexRequest}'s content type is supported by the Bulk API and that it conforms | ||
* to the current {@link BulkRequest}'s content type (if it's known at the time of this method get called). | ||
*/ | ||
static XContentType ensureBulkContentType(IndexRequest indexRequest, @Nullable XContentType xContentType) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
enforceSameContentType or enforceContentType ?
assertEquals(errors[i] ? RestStatus.INTERNAL_SERVER_ERROR : RestStatus.CREATED, bulkItemResponse.status()); | ||
} else if (requestOpType == DocWriteRequest.OpType.UPDATE) { | ||
assertEquals(errors[i], bulkItemResponse.isFailed()); | ||
assertEquals(errors[i] ? RestStatus.INTERNAL_SERVER_ERROR : RestStatus.OK, bulkItemResponse.status()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok let's try and fix this separately.
This commit adds support for BulkRequest execution in the High Level Rest client.
2e2dd2b
to
d823ca7
Compare
Thanks @javanna ! |
* master: (54 commits) Keep the pipeline handler queue small initially Do not create String instances in 'Strings' methods accepting StringBuilder (elastic#22907) Tests: fix AwsS3ServiceImplTests Remove abstract InternalMetricsAggregation class (elastic#23326) Add BulkRequest support to High Level Rest client (elastic#23312) Wrap getCredentials() in a doPrivileged() block (elastic#23297) Respect promises on pipelined responses Align REST specs for HEAD requests Remove unnecessary result sorting in SearchPhaseController (elastic#23321) Fix SamplerAggregatorTests to have stable and predictable docIds Tests: Ensure multi node integ tests wait on first node Relocate a comment in HttpPipeliningHandler Add comments to HttpPipeliningHandler [TEST] Fix incorrect test cluster name in cluster health doc tests Build: Change location in zip of license and notice inclusion for plugins (elastic#23316) Script: Fix value of `ctx._now` to be current epoch time in milliseconds (elastic#23175) Build: Rework integ test setup and shutdown to ensure stop runs when desired (elastic#23304) Handle long overflow when adding paths' totals Don't set local node on cluster state used for node join validation (elastic#23311) Ensure that releasing listener is called ...
This commit adds support for BulkRequest execution in the High Level Rest client.