-
Notifications
You must be signed in to change notification settings - Fork 45
/
Copy pathPerRequestSubmitter.java
80 lines (68 loc) · 2.7 KB
/
PerRequestSubmitter.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
package com.getindata.connectors.http.internal.sink.httpclient;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpClient.Version;
import java.net.http.HttpRequest.BodyPublishers;
import java.net.http.HttpRequest.Builder;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
import com.getindata.connectors.http.internal.sink.HttpSinkRequestEntry;
/**
 * Request submitter that issues one separate HTTP request for each processed sink entry.
 *
 * <p>The HTTP client, the header array, the request timeout and the publishing executor used
 * below are not declared in this class; they are members inherited from
 * {@link AbstractRequestSubmitter}.
 */
@Slf4j
public class PerRequestSubmitter extends AbstractRequestSubmitter {

    /**
     * Creates the submitter by forwarding all arguments to the superclass constructor.
     *
     * @param properties       configuration passed through to the superclass
     * @param headersAndValues header array passed through to the superclass; later handed
     *                         to the request builder's {@code headers(...)} when non-empty
     * @param httpClient       client used to send the requests asynchronously
     */
    public PerRequestSubmitter(
            Properties properties,
            String[] headersAndValues,
            HttpClient httpClient) {

        super(properties, headersAndValues, httpClient);
    }

    /**
     * Sends every entry as its own asynchronous HTTP request to the given endpoint.
     *
     * @param endpointUrl     target URL, parsed once with {@link URI#create(String)}
     * @param requestToSubmit entries to send, one request per entry
     * @return one future per entry, in iteration order of {@code requestToSubmit}
     */
    @Override
    public List<CompletableFuture<JavaNetHttpResponseWrapper>> submit(
            String endpointUrl,
            List<HttpSinkRequestEntry> requestToSubmit) {

        var targetUri = URI.create(endpointUrl);
        var pendingResponses = new ArrayList<CompletableFuture<JavaNetHttpResponseWrapper>>();

        for (var sinkEntry : requestToSubmit) {
            HttpRequest preparedRequest = buildHttpRequest(sinkEntry, targetUri);
            pendingResponses.add(sendAndWrap(preparedRequest));
        }

        return pendingResponses;
    }

    /**
     * Sends a single prepared request and wraps its outcome.
     *
     * <p>A failed send is logged and replaced by a {@code null} response, so the returned
     * future still yields a wrapper (constructed with a {@code null} response) instead of
     * completing exceptionally because of the send failure.
     */
    private CompletableFuture<JavaNetHttpResponseWrapper> sendAndWrap(HttpRequest preparedRequest) {
        var inFlight = httpClient.sendAsync(
            preparedRequest.getHttpRequest(),
            HttpResponse.BodyHandlers.ofString());

        var failureTolerant = inFlight.exceptionally(ex -> {
            // TODO (carried over from the original): this callback was noted to run on a
            // ForkJoinPool thread; refactor this someday.
            log.error("Request fatally failed because of an exception", ex);
            return null;
        });

        // The wrapping step is explicitly scheduled on the publishing thread pool.
        return failureTolerant.thenApplyAsync(
            res -> new JavaNetHttpResponseWrapper(preparedRequest, res),
            publishingThreadPool);
    }

    /**
     * Builds the HTTP/1.1 request for one entry and pairs it with the entry's payload and
     * method in the project's {@code HttpRequest} holder.
     *
     * @param requestEntry entry providing the HTTP method and the body bytes
     * @param endpointUri  URI the request is addressed to
     * @return holder containing the built request, a single-element list with the entry's
     *         payload, and the entry's method
     */
    private HttpRequest buildHttpRequest(HttpSinkRequestEntry requestEntry, URI endpointUri) {
        var requestTimeout = Duration.ofSeconds(httpRequestTimeOutSeconds);

        Builder builder = java.net.http.HttpRequest
            .newBuilder()
            .uri(endpointUri)
            .version(Version.HTTP_1_1)
            .timeout(requestTimeout)
            .method(requestEntry.method, BodyPublishers.ofByteArray(requestEntry.element));

        // Headers are only applied when at least one was configured.
        if (headersAndValues.length > 0) {
            builder.headers(headersAndValues);
        }

        var builtRequest = builder.build();
        return new HttpRequest(
            builtRequest,
            List.of(requestEntry.element),
            requestEntry.method);
    }
}