HTTP-42 Add support for Batch request submission in HTTP Sink. #58
…TP sink. Signed-off-by: Krzysztof Chmielewski <[email protected]>
…TP sink #2 Signed-off-by: Krzysztof Chmielewski <[email protected]>
…TP sink #3 Signed-off-by: Krzysztof Chmielewski <[email protected]>
…TP sink #4 Signed-off-by: Krzysztof Chmielewski <[email protected]>
…TP sink #5 tests Signed-off-by: Krzysztof Chmielewski <[email protected]>
…TP sink #6 tests Signed-off-by: Krzysztof Chmielewski <[email protected]>
…TP sink #7 tests Signed-off-by: Krzysztof Chmielewski <[email protected]>
…TP sink #8 tests Signed-off-by: Krzysztof Chmielewski <[email protected]>
…TP sink #9 tests Signed-off-by: Krzysztof Chmielewski <[email protected]>
…TP - doc. Signed-off-by: Krzysztof Chmielewski <[email protected]>
import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants;
import com.getindata.connectors.http.internal.utils.ThreadUtils;

public abstract class AbstractRequestSubmitter implements RequestSubmitter {
This is the base class for the PerRequest and Batch submitter implementations.
import com.getindata.connectors.http.internal.sink.HttpSinkRequestEntry;

@Slf4j
public class BatchRequestSubmitter extends AbstractRequestSubmitter {
New thing: the core logic for submitting requests in a batch, as a JSON array in one HTTP request.
this.httpClient = JavaNetHttpClientFactory.createClient(properties, httpClientExecutor);
this.httpPostRequestCallback = httpPostRequestCallback;
Refactoring: this was moved to BatchRequestSubmitterFactory and SingleRequestSubmitterFactory.
publishingThreadPool
);
responseFutures.add(response);
}
Logic for PerRequest submission, moved to the PerRequestSubmitter class.
import com.getindata.connectors.http.internal.sink.HttpSinkRequestEntry;

@Slf4j
public class PerRequestSubmitter extends AbstractRequestSubmitter {
New class containing the old logic from JavaNetSinkHttpClient; this is the original PerRequest submission.
…TP sink #9 Java doc Signed-off-by: Krzysztof Chmielewski <[email protected]>
Typos and a small incrementation bug.
src/main/java/com/getindata/connectors/http/internal/sink/httpclient/BatchRequestSubmitter.java
if (!previousReqeustMethod.equalsIgnoreCase(entry.method)) {
    // break batch and submit
    responseFutures.add(sendBatch(endpointUrl, reqeustBatch));
    reqeustBatch.clear();
    // start a new batch for new HTTP method.
    reqeustBatch.add(entry);
} else {
    reqeustBatch.add(entry);
    if (++counter % httpReqeustBatchSize == 0) {
        // batch is full, submit and start new batch.
        responseFutures.add(sendBatch(endpointUrl, reqeustBatch));
        reqeustBatch.clear();
    }
}
I think it could be simplified a bit:
if (!previousReqeustMethod.equalsIgnoreCase(entry.method) || counter % httpReqeustBatchSize == 0) {
    // break batch and submit
    responseFutures.add(sendBatch(endpointUrl, reqeustBatch));
    reqeustBatch.clear();
} else {
    reqeustBatch.add(entry);
    counter++;
}
Of course, this means we would send the batch one iteration step later, but that should not be a problem.
And by the way, what about counter? Wouldn't requestBatch.size() >= httpRequestBatchSize be sufficient? Also, the current version of the code has a small bug: counter is incremented only in the else branch, but the if branch calls requestBatch.add(entry) too.
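A minimal sketch of the size-based split suggested above, assuming a simplified stand-in for HttpSinkRequestEntry. The names BatchSplitSketch, Entry, and split are hypothetical and this is not the code that was merged. It drops the separate counter and relies on the current batch's size, so an entry that starts a new batch is always counted:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; not part of the connector.
final class BatchSplitSketch {

    // Simplified stand-in for HttpSinkRequestEntry (assumption): method plus payload.
    static final class Entry {
        final String method;
        final String body;

        Entry(String method, String body) {
            this.method = method;
            this.body = body;
        }
    }

    // Splits entries, in order, into batches that each hold one HTTP method
    // and at most maxBatchSize entries. An empty input yields no batches.
    static List<List<Entry>> split(List<Entry> entries, int maxBatchSize) {
        if (maxBatchSize < 1) {
            throw new IllegalArgumentException("maxBatchSize must be at least 1");
        }
        List<List<Entry>> batches = new ArrayList<>();
        List<Entry> current = new ArrayList<>();
        for (Entry entry : entries) {
            boolean methodChanged = !current.isEmpty()
                && !current.get(0).method.equalsIgnoreCase(entry.method);
            if (methodChanged || current.size() >= maxBatchSize) {
                // Close the current batch before adding the new entry.
                batches.add(current);
                current = new ArrayList<>();
            }
            current.add(entry);
        }
        if (!current.isEmpty()) {
            // Flush the trailing, possibly partial, batch.
            batches.add(current);
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Entry> entries = Arrays.asList(
            new Entry("POST", "a"), new Entry("POST", "b"), new Entry("POST", "c"),
            new Entry("PUT", "d"), new Entry("PUT", "e"));
        for (List<Entry> batch : split(entries, 2)) {
            System.out.println(batch.get(0).method + " x" + batch.size());
        }
    }
}
```

Because a batch is only emitted when it is non-empty, this shape also avoids sending an empty request for an empty input.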
Changed, please let me know what you think now.
@swtwsk ^
...ain/java/com/getindata/connectors/http/internal/table/sink/Slf4jHttpPostRequestCallback.java
…TP sink #10 - changes after Code review - fix typos Signed-off-by: Krzysztof Chmielewski <[email protected]>
Mostly LGTM. There are still some typos to fix. And a question about sending an empty request: feel free to fix that or keep it, your call.
String previousReqeustMethod = requestsToSubmit.get(0).method;
List<HttpSinkRequestEntry> reqeustBatch = new ArrayList<>(httpRequestBatchSize);
still typos 😅
src/main/java/com/getindata/connectors/http/internal/sink/httpclient/BatchRequestSubmitter.java
…TP sink #10 - changes after Code review - fix batch split Signed-off-by: Krzysztof Chmielewski <[email protected]>
Description
This PR adds support for submitting events from the HTTP Sink in batch mode, meaning that the body of one HTTP PUT/POST request will contain data for many processed events.
The data is represented as a JSON array.
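For illustration only: a batch body could be assembled from already-serialized events roughly as sketched here. The payloads are made up, and JsonArrayBodySketch and toJsonArray are hypothetical names, not the connector's API. The sketch assumes each element is already valid JSON.

```java
import java.util.Arrays;
import java.util.List;
import java.util.StringJoiner;

// Hypothetical illustration only; not part of the connector.
final class JsonArrayBodySketch {

    // Joins already-serialized JSON values into one JSON array string.
    // Assumption: every element is valid JSON on its own.
    static String toJsonArray(List<String> serializedEvents) {
        StringJoiner joiner = new StringJoiner(",", "[", "]");
        for (String event : serializedEvents) {
            joiner.add(event);
        }
        return joiner.toString();
    }

    public static void main(String[] args) {
        // Made-up payloads, used only to show the shape of the request body.
        List<String> events = Arrays.asList("{\"id\":1}", "{\"id\":2}");
        System.out.println(toJsonArray(events)); // prints [{"id":1},{"id":2}]
    }
}
```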
This is a breaking change. Users will have to adapt to it or restore "single mode" by setting:
gid.connector.http.sink.writer.request.mode = single
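A rough sketch of how such a setting might be resolved from job properties. Only the property key and the value single come from this description. The value batch, the batch default, and the RequestModeSketch, Mode, and resolve names are assumptions made for the example.

```java
import java.util.Locale;
import java.util.Properties;

// Hypothetical illustration only; not part of the connector.
final class RequestModeSketch {

    // Property key as given in the PR description.
    static final String REQUEST_MODE_KEY = "gid.connector.http.sink.writer.request.mode";

    // "single" is named in the description; "batch" is an assumed counterpart.
    enum Mode { SINGLE, BATCH }

    // Resolves the submission mode, assuming batch is the default when unset.
    static Mode resolve(Properties properties) {
        String raw = properties.getProperty(REQUEST_MODE_KEY, "batch");
        return Mode.valueOf(raw.trim().toUpperCase(Locale.ROOT));
    }

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty(REQUEST_MODE_KEY, "single");
        System.out.println(resolve(properties)); // prints SINGLE
    }
}
```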
Resolves #42
PR Checklist