-
Notifications
You must be signed in to change notification settings - Fork 45
/
Copy pathBatchRequestSubmitter.java
140 lines (115 loc) · 5.19 KB
/
BatchRequestSubmitter.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
package com.getindata.connectors.http.internal.sink.httpclient;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpClient.Version;
import java.net.http.HttpRequest.BodyPublisher;
import java.net.http.HttpRequest.BodyPublishers;
import java.net.http.HttpRequest.Builder;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.annotation.VisibleForTesting;
import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants;
import com.getindata.connectors.http.internal.sink.HttpSinkRequestEntry;
/**
* This implementation groups received events in batches and submits each batch as individual HTTP
* requests. Batch is created based on batch size or based on HTTP method type.
*/
@Slf4j
public class BatchRequestSubmitter extends AbstractRequestSubmitter {
private static final byte[] BATCH_START_BYTES = "[".getBytes(StandardCharsets.UTF_8);
private static final byte[] BATCH_END_BYTES = "]".getBytes(StandardCharsets.UTF_8);
private static final byte[] BATCH_ELEMENT_DELIM_BYTES = ",".getBytes(StandardCharsets.UTF_8);
private final int httpRequestBatchSize;
public BatchRequestSubmitter(
Properties properties,
String[] headersAndValue,
HttpClient httpClient) {
super(properties, headersAndValue, httpClient);
this.httpRequestBatchSize = Integer.parseInt(
properties.getProperty(HttpConnectorConfigConstants.SINK_HTTP_BATCH_REQUEST_SIZE)
);
}
@Override
public List<CompletableFuture<JavaNetHttpResponseWrapper>> submit(
String endpointUrl,
List<HttpSinkRequestEntry> requestsToSubmit) {
if (requestsToSubmit.isEmpty()) {
return Collections.emptyList();
}
var responseFutures = new ArrayList<CompletableFuture<JavaNetHttpResponseWrapper>>();
String previousReqeustMethod = requestsToSubmit.get(0).method;
List<HttpSinkRequestEntry> requestBatch = new ArrayList<>(httpRequestBatchSize);
for (var entry : requestsToSubmit) {
if (requestBatch.size() == httpRequestBatchSize
|| !previousReqeustMethod.equalsIgnoreCase(entry.method)) {
// break batch and submit
responseFutures.add(sendBatch(endpointUrl, requestBatch));
requestBatch.clear();
}
requestBatch.add(entry);
previousReqeustMethod = entry.method;
}
// submit anything that left
responseFutures.add(sendBatch(endpointUrl, requestBatch));
return responseFutures;
}
@VisibleForTesting
int getBatchSize() {
return httpRequestBatchSize;
}
private CompletableFuture<JavaNetHttpResponseWrapper> sendBatch(
String endpointUrl,
List<HttpSinkRequestEntry> reqeustBatch) {
HttpRequest httpRequest = buildHttpRequest(reqeustBatch, URI.create(endpointUrl));
return httpClient
.sendAsync(
httpRequest.getHttpRequest(),
HttpResponse.BodyHandlers.ofString())
.exceptionally(ex -> {
// TODO This will be executed on a ForkJoinPool Thread... refactor this someday.
log.error("Request fatally failed because of an exception", ex);
return null;
})
.thenApplyAsync(
res -> new JavaNetHttpResponseWrapper(httpRequest, res),
publishingThreadPool
);
}
private HttpRequest buildHttpRequest(List<HttpSinkRequestEntry> reqeustBatch, URI endpointUri) {
try {
var method = reqeustBatch.get(0).method;
List<byte[]> elements = new ArrayList<>(reqeustBatch.size());
BodyPublisher publisher;
// By default, Java's BodyPublishers.ofByteArrays(elements) will just put Jsons
// into the HTTP body without any context.
// What we do here is we pack every Json/byteArray into Json Array hence '[' and ']'
// at the end, and we separate every element with comma.
elements.add(BATCH_START_BYTES);
for (HttpSinkRequestEntry entry : reqeustBatch) {
elements.add(entry.element);
elements.add(BATCH_ELEMENT_DELIM_BYTES);
}
elements.set(elements.size() - 1, BATCH_END_BYTES);
publisher = BodyPublishers.ofByteArrays(elements);
Builder requestBuilder = java.net.http.HttpRequest
.newBuilder()
.uri(endpointUri)
.version(Version.HTTP_1_1)
.timeout(Duration.ofSeconds(httpRequestTimeOutSeconds))
.method(method, publisher);
if (headersAndValues.length != 0) {
requestBuilder.headers(headersAndValues);
}
return new HttpRequest(requestBuilder.build(), elements, method);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}