-
Notifications
You must be signed in to change notification settings - Fork 45
/
Copy pathPerRequestRequestSubmitterFactory.java
33 lines (25 loc) · 1.25 KB
/
PerRequestRequestSubmitterFactory.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.getindata.connectors.http.internal.sink.httpclient;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import com.getindata.connectors.http.internal.utils.JavaNetHttpClientFactory;
import com.getindata.connectors.http.internal.utils.ThreadUtils;
/**
 * {@link RequestSubmitterFactory} implementation that produces {@link PerRequestSubmitter}
 * instances.
 *
 * <p>Every call to {@link #createSubmitter(Properties, String[])} builds a new fixed-size
 * executor and passes it to {@link JavaNetHttpClientFactory#createClient} to obtain the HTTP
 * client given to the submitter.
 */
public class PerRequestRequestSubmitterFactory implements RequestSubmitterFactory {

    // Number of threads in the executor handed to the HTTP client.
    // TODO Add this property to config. Make sure to add a note in README.md describing that
    // any value greater than one will break the order of messages.
    int HTTP_CLIENT_THREAD_POOL_SIZE = 1;

    /**
     * Creates a {@link PerRequestSubmitter} with its own HTTP client.
     *
     * @param properties       passed both to the HTTP client factory and to the submitter
     * @param headersAndValues passed unchanged to the submitter
     * @return a new {@link PerRequestSubmitter}
     */
    @Override
    public RequestSubmitter createSubmitter(Properties properties, String[] headersAndValues) {
        // NOTE(review): the executor is not shut down in this class; presumably its lifecycle
        // is managed by the client or the submitter - confirm.
        ExecutorService workerPool = newWorkerPool();

        return new PerRequestSubmitter(
            properties,
            headersAndValues,
            JavaNetHttpClientFactory.createClient(properties, workerPool)
        );
    }

    /**
     * Builds the fixed-size executor (of {@code HTTP_CLIENT_THREAD_POOL_SIZE} threads) that is
     * supplied to the HTTP client factory.
     */
    private ExecutorService newWorkerPool() {
        ExecutorThreadFactory workerThreads = new ExecutorThreadFactory(
            "http-sink-client-per-request-worker",
            ThreadUtils.LOGGING_EXCEPTION_HANDLER
        );
        return Executors.newFixedThreadPool(HTTP_CLIENT_THREAD_POOL_SIZE, workerThreads);
    }
}