Skip to content

Commit

Permalink
HTTP-42-BatchRequest - add support for batch request processing in HT…
Browse files Browse the repository at this point in the history
…TP sink #8 tests

Signed-off-by: Krzysztof Chmielewski <[email protected]>
  • Loading branch information
kristoffSC committed Jun 29, 2023
1 parent b78ff43 commit 7a865ea
Show file tree
Hide file tree
Showing 7 changed files with 284 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,10 @@
import org.apache.flink.util.concurrent.ExecutorThreadFactory;

import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants;
import com.getindata.connectors.http.internal.utils.JavaNetHttpClientFactory;
import com.getindata.connectors.http.internal.utils.ThreadUtils;

public abstract class AbstractRequestSubmitter implements RequestSubmitter {

// TODO Add this property to config. Make sure to add note in README.md that will describe that
// any value greater than one will break order of messages.
protected static final int HTTP_CLIENT_THREAD_POOL_SIZE = 1;

protected static final int HTTP_CLIENT_PUBLISHING_THREAD_POOL_SIZE = 1;

protected static final String DEFAULT_REQUEST_TIMEOUT_SECONDS = "30";
Expand All @@ -32,7 +27,10 @@ public abstract class AbstractRequestSubmitter implements RequestSubmitter {

protected final HttpClient httpClient;

public AbstractRequestSubmitter(Properties properties, String[] headersAndValues) {
public AbstractRequestSubmitter(
Properties properties,
String[] headersAndValues,
HttpClient httpClient) {

this.headersAndValues = headersAndValues;
this.publishingThreadPool =
Expand All @@ -46,12 +44,6 @@ public AbstractRequestSubmitter(Properties properties, String[] headersAndValues
DEFAULT_REQUEST_TIMEOUT_SECONDS)
);

ExecutorService httpClientExecutor =
Executors.newFixedThreadPool(
HTTP_CLIENT_THREAD_POOL_SIZE,
new ExecutorThreadFactory(
"http-sink-client-request-worker", ThreadUtils.LOGGING_EXCEPTION_HANDLER));

this.httpClient = JavaNetHttpClientFactory.createClient(properties, httpClientExecutor);
this.httpClient = httpClient;
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.getindata.connectors.http.internal.sink.httpclient;

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpClient.Version;
import java.net.http.HttpRequest;
import java.net.http.HttpRequest.BodyPublisher;
Expand All @@ -10,11 +11,13 @@
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;

import lombok.extern.slf4j.Slf4j;
import org.apache.flink.annotation.VisibleForTesting;

import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants;
import com.getindata.connectors.http.internal.sink.HttpSinkRequestEntry;
Expand All @@ -24,8 +27,12 @@ public class BatchRequestSubmitter extends AbstractRequestSubmitter {

private final int httpReqeustBatchSize;

public BatchRequestSubmitter(Properties properties, String[] headersAndValue) {
super(properties, headersAndValue);
public BatchRequestSubmitter(
Properties properties,
String[] headersAndValue,
HttpClient httpClient) {

super(properties, headersAndValue, httpClient);

this.httpReqeustBatchSize = Integer.parseInt(
properties.getProperty(HttpConnectorConfigConstants.SINK_HTTP_BATCH_REQUEST_SIZE)
Expand All @@ -34,35 +41,52 @@ public BatchRequestSubmitter(Properties properties, String[] headersAndValue) {

@Override
public List<CompletableFuture<JavaNetHttpResponseWrapper>> submit(
String endpointUrl,
List<HttpSinkRequestEntry> requestToSubmit) {
String endpointUrl,
List<HttpSinkRequestEntry> requestsToSubmit) {

if (requestsToSubmit.isEmpty()) {
return Collections.emptyList();
}

var responseFutures = new ArrayList<CompletableFuture<JavaNetHttpResponseWrapper>>();

int counter = 0;
String previousReqeustMethod = requestToSubmit.get(0).method;
String previousReqeustMethod = requestsToSubmit.get(0).method;
List<HttpSinkRequestEntry> reqeustBatch = new ArrayList<>(httpReqeustBatchSize);
for (var entry : requestToSubmit) {
reqeustBatch.add(entry);
if (++counter % httpReqeustBatchSize == 0
|| !previousReqeustMethod.equalsIgnoreCase(entry.method)) {
for (var entry : requestsToSubmit) {
if (!previousReqeustMethod.equalsIgnoreCase(entry.method)) {
// break batch and submit
responseFutures.add(sendBatch(endpointUrl, reqeustBatch));
reqeustBatch.clear();
// start a new batch for new HTTP method.
reqeustBatch.add(entry);
} else {
reqeustBatch.add(entry);
if (++counter % httpReqeustBatchSize == 0) {
// batch is full, submit and start new batch.
responseFutures.add(sendBatch(endpointUrl, reqeustBatch));
reqeustBatch.clear();
}
}
previousReqeustMethod = entry.method;
}

if (!reqeustBatch.isEmpty()) {
// submit anything that left
responseFutures.add(sendBatch(endpointUrl, reqeustBatch));
}

return responseFutures;
}

@VisibleForTesting
int getBatchSize() {
return httpReqeustBatchSize;
}

private CompletableFuture<JavaNetHttpResponseWrapper> sendBatch(
String endpointUrl,
List<HttpSinkRequestEntry> reqeustBatch) {

System.out.println("aaaaa " + new String(reqeustBatch.get(0).element));
var endpointUri = URI.create(endpointUrl);
return httpClient
.sendAsync(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,35 @@
package com.getindata.connectors.http.internal.sink.httpclient;

import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.flink.util.StringUtils;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;

import com.getindata.connectors.http.internal.config.ConfigException;
import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants;
import com.getindata.connectors.http.internal.utils.JavaNetHttpClientFactory;
import com.getindata.connectors.http.internal.utils.ThreadUtils;

public class BatchRequestSubmitterFactory implements RequestSubmitterFactory {

// TODO Add this property to config. Make sure to add note in README.md that will describe that
// any value greater than one will break order of messages.
int HTTP_CLIENT_THREAD_POOL_SIZE = 1;

private final String maxBatchSize;

public BatchRequestSubmitterFactory(int maxBatchSize) {
if (maxBatchSize < 1) {
throw new IllegalArgumentException(
"Batch Request submitter batch size must be greater than zero.");
}
this.maxBatchSize = String.valueOf(maxBatchSize);
}

@Override
public RequestSubmitter createSubmitter(Properties properties, String[] headersAndValues) {
public BatchRequestSubmitter createSubmitter(Properties properties, String[] headersAndValues) {
String batchRequestSize =
properties.getProperty(HttpConnectorConfigConstants.SINK_HTTP_BATCH_REQUEST_SIZE);
if (StringUtils.isNullOrWhitespaceOnly(batchRequestSize)) {
Expand Down Expand Up @@ -45,6 +58,19 @@ public RequestSubmitter createSubmitter(Properties properties, String[] headersA
);
}
}
return new BatchRequestSubmitter(properties, headersAndValues);

ExecutorService httpClientExecutor =
Executors.newFixedThreadPool(
HTTP_CLIENT_THREAD_POOL_SIZE,
new ExecutorThreadFactory(
"http-sink-client-batch-request-worker",
ThreadUtils.LOGGING_EXCEPTION_HANDLER)
);

return new BatchRequestSubmitter(
properties,
headersAndValues,
JavaNetHttpClientFactory.createClient(properties, httpClientExecutor)
);
}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,33 @@
package com.getindata.connectors.http.internal.sink.httpclient;

import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.flink.util.concurrent.ExecutorThreadFactory;

import com.getindata.connectors.http.internal.utils.JavaNetHttpClientFactory;
import com.getindata.connectors.http.internal.utils.ThreadUtils;

public class PerRequestRequestSubmitterFactory implements RequestSubmitterFactory {

// TODO Add this property to config. Make sure to add note in README.md that will describe that
// any value greater than one will break order of messages.
int HTTP_CLIENT_THREAD_POOL_SIZE = 1;

@Override
public RequestSubmitter createSubmitter(Properties properties, String[] headersAndValues) {
return new PerRequestSubmitter(properties, headersAndValues);

ExecutorService httpClientExecutor =
Executors.newFixedThreadPool(
HTTP_CLIENT_THREAD_POOL_SIZE,
new ExecutorThreadFactory(
"http-sink-client-per-request-worker", ThreadUtils.LOGGING_EXCEPTION_HANDLER));

return new PerRequestSubmitter(
properties,
headersAndValues,
JavaNetHttpClientFactory.createClient(properties, httpClientExecutor)
);
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.getindata.connectors.http.internal.sink.httpclient;

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpClient.Version;
import java.net.http.HttpRequest;
import java.net.http.HttpRequest.BodyPublishers;
Expand All @@ -19,8 +20,12 @@
@Slf4j
public class PerRequestSubmitter extends AbstractRequestSubmitter {

public PerRequestSubmitter(Properties properties, String[] headersAndValues) {
super(properties, headersAndValues);
public PerRequestSubmitter(
Properties properties,
String[] headersAndValues,
HttpClient httpClient) {

super(properties, headersAndValues, httpClient);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package com.getindata.connectors.http.internal.sink.httpclient;

import java.util.Properties;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;

import com.getindata.connectors.http.internal.config.ConfigException;
import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants;

class BatchRequestSubmitterFactoryTest {

@ParameterizedTest
@ValueSource(ints = {0, -1})
public void shouldThrowIfInvalidDefaultSize(int invalidArgument) {
assertThrows(
IllegalArgumentException.class,
() -> new BatchRequestSubmitterFactory(invalidArgument)
);
}

@Test
public void shouldCreateSubmitterWithDefaultBatchSize() {

int defaultBatchSize = 10;
BatchRequestSubmitter submitter = new BatchRequestSubmitterFactory(defaultBatchSize)
.createSubmitter(new Properties(), new String[0]);

assertThat(submitter.getBatchSize()).isEqualTo(defaultBatchSize);
}

@ParameterizedTest
@ValueSource(strings = {"1", "2"})
public void shouldCreateSubmitterWithCustomBatchSize(String batchSize) {

Properties properties = new Properties();
properties.setProperty(
HttpConnectorConfigConstants.SINK_HTTP_BATCH_REQUEST_SIZE,
batchSize
);

BatchRequestSubmitter submitter = new BatchRequestSubmitterFactory(10)
.createSubmitter(properties, new String[0]);

assertThat(submitter.getBatchSize()).isEqualTo(Integer.valueOf(batchSize));
}

@ParameterizedTest
@ValueSource(strings = {"0", "-1"})
public void shouldThrowIfBatchSizeToSmall(String invalidBatchSize) {

Properties properties = new Properties();
properties.setProperty(
HttpConnectorConfigConstants.SINK_HTTP_BATCH_REQUEST_SIZE,
invalidBatchSize
);

BatchRequestSubmitterFactory factory = new BatchRequestSubmitterFactory(10);

assertThrows(
ConfigException.class,
() -> factory.createSubmitter(properties, new String[0])
);
}

@ParameterizedTest
@ValueSource(strings = {"1.1", "2,2", "hello"})
public void shouldThrowIfInvalidBatchSize(String invalidBatchSize) {

Properties properties = new Properties();
properties.setProperty(
HttpConnectorConfigConstants.SINK_HTTP_BATCH_REQUEST_SIZE,
invalidBatchSize
);

BatchRequestSubmitterFactory factory = new BatchRequestSubmitterFactory(10);

assertThrows(
ConfigException.class,
() -> factory.createSubmitter(properties, new String[0])
);
}



}
Loading

0 comments on commit 7a865ea

Please sign in to comment.