-
Notifications
You must be signed in to change notification settings - Fork 45
/
Copy pathJavaNetSinkHttpClient.java
121 lines (98 loc) · 4.94 KB
/
JavaNetSinkHttpClient.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
package com.getindata.connectors.http.internal.sink.httpclient;
import java.net.http.HttpClient;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.annotation.VisibleForTesting;
import com.getindata.connectors.http.HttpPostRequestCallback;
import com.getindata.connectors.http.internal.HeaderPreprocessor;
import com.getindata.connectors.http.internal.SinkHttpClient;
import com.getindata.connectors.http.internal.SinkHttpClientResponse;
import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants;
import com.getindata.connectors.http.internal.sink.HttpSinkRequestEntry;
import com.getindata.connectors.http.internal.status.ComposeHttpStatusCodeChecker;
import com.getindata.connectors.http.internal.status.ComposeHttpStatusCodeChecker.ComposeHttpStatusCodeCheckerConfig;
import com.getindata.connectors.http.internal.status.HttpStatusCodeChecker;
import com.getindata.connectors.http.internal.utils.HttpHeaderUtils;
/**
 * A {@link SinkHttpClient} backed by Java 11's {@link HttpClient}. This implementation supports
 * HTTP traffic only.
 *
 * <p>Request headers are derived once, at construction time, from the connector properties and
 * are handed to the {@link RequestSubmitter} created by the supplied factory.
 */
@Slf4j
public class JavaNetSinkHttpClient implements SinkHttpClient {

    /** Headers flattened into an array by {@link HttpHeaderUtils#toHeaderAndValueArray}. */
    private final String[] headersAndValues;

    /** Headers built from the sink header properties; also passed to the request callback. */
    private final Map<String, String> headerMap;

    /** Decides whether a response status code counts as an error. */
    private final HttpStatusCodeChecker statusCodeChecker;

    /** Callback invoked once for every processed response. */
    private final HttpPostRequestCallback<HttpSinkRequestEntry> httpPostRequestCallback;

    /** Component that submits the request entries and returns the response futures. */
    private final RequestSubmitter requestSubmitter;

    /**
     * Creates the client and prepares its headers, status code checker and request submitter.
     *
     * @param properties connector properties used to build the header map, the status code
     *                   checker configuration and the request submitter
     * @param httpPostRequestCallback callback invoked for every processed response
     * @param headerPreprocessor preprocessor passed to the header map preparation
     * @param requestSubmitterFactory factory creating the {@link RequestSubmitter} for this client
     */
    public JavaNetSinkHttpClient(
            Properties properties,
            HttpPostRequestCallback<HttpSinkRequestEntry> httpPostRequestCallback,
            HeaderPreprocessor headerPreprocessor,
            RequestSubmitterFactory requestSubmitterFactory) {

        this.httpPostRequestCallback = httpPostRequestCallback;

        Map<String, String> preparedHeaders = HttpHeaderUtils.prepareHeaderMap(
            HttpConnectorConfigConstants.SINK_HEADER_PREFIX,
            properties,
            headerPreprocessor
        );
        this.headerMap = preparedHeaders;

        // TODO Inject this via constructor when implementing a response processor.
        // Processor will be injected and it will wrap statusChecker implementation.
        this.statusCodeChecker = new ComposeHttpStatusCodeChecker(
            ComposeHttpStatusCodeCheckerConfig.builder()
                .properties(properties)
                .whiteListPrefix(HttpConnectorConfigConstants.HTTP_ERROR_SINK_CODE_WHITE_LIST)
                .errorCodePrefix(HttpConnectorConfigConstants.HTTP_ERROR_SINK_CODES_LIST)
                .build()
        );

        String[] flattenedHeaders = HttpHeaderUtils.toHeaderAndValueArray(preparedHeaders);
        this.headersAndValues = flattenedHeaders;
        this.requestSubmitter =
            requestSubmitterFactory.createSubmitter(properties, flattenedHeaders);
    }

    /**
     * Submits the given entries to the endpoint and sorts them into successful and failed ones.
     *
     * @param requestEntries entries to submit
     * @param endpointUrl target endpoint URL
     * @return future completing with the successful/failed split once every response future
     *         returned by the submitter has completed
     */
    @Override
    public CompletableFuture<SinkHttpClientResponse> putRequests(
            List<HttpSinkRequestEntry> requestEntries,
            String endpointUrl) {

        CompletableFuture<List<JavaNetHttpResponseWrapper>> pendingResponses =
            submitRequests(requestEntries, endpointUrl);
        return pendingResponses.thenApply(
            completed -> prepareSinkHttpClientResponse(completed, endpointUrl));
    }

    /**
     * Hands the entries to the request submitter and gathers all of its response futures into a
     * single future holding the list of their results.
     */
    private CompletableFuture<List<JavaNetHttpResponseWrapper>> submitRequests(
            List<HttpSinkRequestEntry> requestEntries,
            String endpointUrl) {

        var inFlight = requestSubmitter.submit(endpointUrl, requestEntries);
        CompletableFuture<?>[] barrier = inFlight.toArray(new CompletableFuture<?>[0]);

        // join() is only reached after allOf() has completed, i.e. when every future is done.
        return CompletableFuture.allOf(barrier).thenApply(ignored ->
            inFlight.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList()));
    }

    /**
     * Invokes the post-request callback for every response and classifies each request entry.
     * An entry is failed when its wrapper carries no response or when the status code checker
     * reports the response status code as an error; otherwise it is successful.
     */
    private SinkHttpClientResponse prepareSinkHttpClientResponse(
            List<JavaNetHttpResponseWrapper> responses,
            String endpointUrl) {

        var delivered = new ArrayList<HttpSinkRequestEntry>();
        var rejected = new ArrayList<HttpSinkRequestEntry>();

        for (var wrapper : responses) {
            var entry = wrapper.getSinkRequestEntry();
            var maybeResponse = wrapper.getResponse();

            // The callback receives null when there is no response for the entry.
            httpPostRequestCallback.call(
                maybeResponse.orElse(null), entry, endpointUrl, headerMap);

            // TODO Add response processor here and orchestrate it with statusCodeChecker.
            boolean succeeded = maybeResponse.isPresent()
                && !statusCodeChecker.isErrorCode(maybeResponse.get().statusCode());
            if (succeeded) {
                delivered.add(entry);
            } else {
                rejected.add(entry);
            }
        }

        return new SinkHttpClientResponse(delivered, rejected);
    }

    /**
     * Returns a copy of the header/value array so that callers cannot modify the internal one.
     */
    @VisibleForTesting
    String[] getHeadersAndValues() {
        String[] source = this.headersAndValues;
        return Arrays.copyOf(source, source.length);
    }
}