Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HTTP-42 Add support for Batch request submission in HTTP Sink. #58

Merged
merged 13 commits into from
Jul 4, 2023

Conversation

kristoffSC
Copy link
Collaborator

@kristoffSC kristoffSC commented Jun 15, 2023

Description

This PR adds support for submitting events by HTTP Sink in batch mode, meaning that body of one HTTP PUT/POST requests will contain data for many processed events.

The data is represented as Json array, like so:

[
  {
    "id": 1,
    "first_name": "Ninette",
    "last_name": "Clee",
    "gender": "Female",
    "stock": "CDZI",
    "currency": "RUB",
    "tx_date": "2021-08-24 15:22:59"
  },
  {
    "id": 2,
    "first_name": "Rob",
    "last_name": "Zombie",
    "gender": "Male",
    "stock": "DGICA",
    "currency": "GBP",
    "tx_date": "2021-10-25 20:53:54"
  },
  {
    "id": 3,
    "first_name": "Adam",
    "last_name": "Jones",
    "gender": "Male",
    "stock": "DGICA",
    "currency": "PLN",
    "tx_date": "2021-10-26 20:53:54"
  }
]

This is a breaking change. Users will have to adapt to it or restore "single mode" by setting:
gid.connector.http.sink.writer.request.mode = single

Resolves #42

PR Checklist

@kristoffSC kristoffSC added the enhancement New feature or request label Jun 15, 2023
@kristoffSC kristoffSC self-assigned this Jun 15, 2023
@github-actions
Copy link
Contributor

File Coverage [97.87%] 🍏
JavaNetSinkHttpClient.java 100% 🍏
PerRequestSubmitter.java 95% 🍏
Total Project Coverage 93.68% 🍏

@kristoffSC kristoffSC force-pushed the HTTP-42-BatchRequest branch 2 times, most recently from 9467a83 to 01bb95d Compare June 20, 2023 21:32
@kristoffSC kristoffSC force-pushed the HTTP-42-BatchRequest branch from 01bb95d to 627b54a Compare June 20, 2023 21:38
@kristoffSC kristoffSC force-pushed the HTTP-42-BatchRequest branch from 148d155 to b327c31 Compare June 22, 2023 19:32
@kristoffSC kristoffSC force-pushed the HTTP-42-BatchRequest branch from 35c9a16 to ea9fff3 Compare June 27, 2023 12:45
@github-actions
Copy link
Contributor

File Coverage [89.64%] 🍏
SinkRequestSubmitMode.java 100% 🍏
AbstractRequestSubmitter.java 100% 🍏
JavaNetSinkHttpClient.java 100% 🍏
PerRequestRequestSubmitterFactory.java 100% 🍏
JavaNetHttpClientFactory.java 95.35% 🍏
BatchRequestSubmitter.java 94.29% 🍏
PerRequestSubmitter.java 93.33% 🍏
HttpSinkInternal.java 75.52% 🍏
BatchRequestSubmitterFactory.java 50%
Total Project Coverage 93.19% 🍏

@github-actions
Copy link
Contributor

File Coverage [93.44%] 🍏
SinkRequestSubmitMode.java 100% 🍏
AbstractRequestSubmitter.java 100% 🍏
BatchRequestSubmitterFactory.java 100% 🍏
JavaNetSinkHttpClient.java 100% 🍏
PerRequestRequestSubmitterFactory.java 100% 🍏
JavaNetHttpClientFactory.java 95.35% 🍏
BatchRequestSubmitter.java 93.78% 🍏
PerRequestSubmitter.java 93.41% 🍏
HttpSinkInternal.java 75.52% 🍏
Total Project Coverage 93.83% 🍏

@kristoffSC kristoffSC force-pushed the HTTP-42-BatchRequest branch from 7a865ea to bbcb439 Compare June 29, 2023 23:09
@github-actions
Copy link
Contributor

File Coverage [93.44%] 🍏
SinkRequestSubmitMode.java 100% 🍏
AbstractRequestSubmitter.java 100% 🍏
BatchRequestSubmitterFactory.java 100% 🍏
JavaNetSinkHttpClient.java 100% 🍏
PerRequestRequestSubmitterFactory.java 100% 🍏
JavaNetHttpClientFactory.java 95.35% 🍏
BatchRequestSubmitter.java 93.78% 🍏
PerRequestSubmitter.java 93.41% 🍏
HttpSinkInternal.java 75.52% 🍏
Total Project Coverage 93.83% 🍏

@kristoffSC kristoffSC force-pushed the HTTP-42-BatchRequest branch from 8996221 to 26db840 Compare June 30, 2023 10:31
@github-actions
Copy link
Contributor

@github-actions
Copy link
Contributor

@github-actions
Copy link
Contributor

@kristoffSC kristoffSC force-pushed the HTTP-42-BatchRequest branch from 6a60321 to 2f4b615 Compare June 30, 2023 12:24
@kristoffSC kristoffSC requested a review from swtwsk June 30, 2023 12:24
@kristoffSC kristoffSC changed the title [Draft]HTTP-42-BatchRequest HTTP-42-BatchRequest Jun 30, 2023
@kristoffSC kristoffSC changed the title HTTP-42-BatchRequest HTTP-42 Add support for Batch request submission in HTTP Sink. Jun 30, 2023
@kristoffSC kristoffSC marked this pull request as ready for review June 30, 2023 12:25
@kristoffSC kristoffSC requested a review from grzegorz8 June 30, 2023 12:25
@github-actions
Copy link
Contributor

import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants;
import com.getindata.connectors.http.internal.utils.ThreadUtils;

public abstract class AbstractRequestSubmitter implements RequestSubmitter {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is base class for PerReqeust and Batch submitter implementations.

import com.getindata.connectors.http.internal.sink.HttpSinkRequestEntry;

@Slf4j
public class BatchRequestSubmitter extends AbstractRequestSubmitter {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New thing - core logic for submitting requests as Json array in batch in one HTTP query.


this.httpClient = JavaNetHttpClientFactory.createClient(properties, httpClientExecutor);
this.httpPostRequestCallback = httpPostRequestCallback;
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

refactoring moved to BatchRequestSubmitterFactory and SingleRequestSubmitterFactory

publishingThreadPool
);
responseFutures.add(response);
}
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Logic for PerReqeust submmission - moved to PerRequestSubbitter class

import com.getindata.connectors.http.internal.sink.HttpSinkRequestEntry;

@Slf4j
public class PerRequestSubmitter extends AbstractRequestSubmitter {
Copy link
Collaborator Author

@kristoffSC kristoffSC Jul 2, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New class containing old logic from JavaNetSinkHttpClient - this is the original PerReqeust submission.

…TP sink #9 Java doc

Signed-off-by: Krzysztof Chmielewski <[email protected]>
@github-actions
Copy link
Contributor

github-actions bot commented Jul 2, 2023

Copy link
Contributor

@swtwsk swtwsk left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typos and a small incrementation bug.

Comment on lines 66 to 79
if (!previousReqeustMethod.equalsIgnoreCase(entry.method)) {
// break batch and submit
responseFutures.add(sendBatch(endpointUrl, reqeustBatch));
reqeustBatch.clear();
// start a new batch for new HTTP method.
reqeustBatch.add(entry);
} else {
reqeustBatch.add(entry);
if (++counter % httpReqeustBatchSize == 0) {
// batch is full, submit and start new batch.
responseFutures.add(sendBatch(endpointUrl, reqeustBatch));
reqeustBatch.clear();
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it could be simplified a bit:

Suggested change
if (!previousReqeustMethod.equalsIgnoreCase(entry.method)) {
// break batch and submit
responseFutures.add(sendBatch(endpointUrl, reqeustBatch));
reqeustBatch.clear();
// start a new batch for new HTTP method.
reqeustBatch.add(entry);
} else {
reqeustBatch.add(entry);
if (++counter % httpReqeustBatchSize == 0) {
// batch is full, submit and start new batch.
responseFutures.add(sendBatch(endpointUrl, reqeustBatch));
reqeustBatch.clear();
}
}
if (!previousReqeustMethod.equalsIgnoreCase(entry.method) || counter % httpReqeustBatchSize == 0) {
// break batch and submit
responseFutures.add(sendBatch(endpointUrl, reqeustBatch));
reqeustBatch.clear();
} else {
reqeustBatch.add(entry);
counter++;
}

Of course, it would mean that we will be sending the batch one iteration step later, but it should not be a problem anyway.

And btw, what's about counter? Wouldn't requestBatch.size() >= httpRequestBatchSize be sufficient? And, also, current version of the code has a small bug — counter is incremented only in the else branch, however in the if we're calling requestBatch.add(entry) too.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changed, please let me know wht do you think now.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

…TP sink #10 - changes after Code review - fix typos

Signed-off-by: Krzysztof Chmielewski <[email protected]>
@github-actions
Copy link
Contributor

github-actions bot commented Jul 3, 2023

@kristoffSC kristoffSC requested a review from swtwsk July 3, 2023 12:18
@github-actions
Copy link
Contributor

github-actions bot commented Jul 3, 2023

Copy link
Contributor

@swtwsk swtwsk left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mostly LGTM. There's still some typos to fix. And a question about sending empty request — feel free to fix that or keep, your call.

Comment on lines 61 to 62
String previousReqeustMethod = requestsToSubmit.get(0).method;
List<HttpSinkRequestEntry> reqeustBatch = new ArrayList<>(httpRequestBatchSize);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

still typos 😅

@kristoffSC kristoffSC force-pushed the HTTP-42-BatchRequest branch from 224a164 to ad9d58f Compare July 4, 2023 09:51
@github-actions
Copy link
Contributor

github-actions bot commented Jul 4, 2023

…TP sink #10 - changes after Code review - fix batch split

Signed-off-by: Krzysztof Chmielewski <[email protected]>
@kristoffSC kristoffSC force-pushed the HTTP-42-BatchRequest branch from ad9d58f to 4797cbf Compare July 4, 2023 10:06
@github-actions
Copy link
Contributor

github-actions bot commented Jul 4, 2023

@kristoffSC kristoffSC merged commit 9b94c92 into main Jul 4, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Sink Writer to write in batches
2 participants