Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Disable retries of failed requests #6

Merged
merged 1 commit into from
Jul 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -169,13 +169,18 @@ As a result, you should see a table with joined records like so:
The `msg` column shows parameters used with REST call for given JOIN record.

## TODO

### HTTP TableLookup Source
- Implement caches.
- Add support for other Flink types. Currently, STRING type is only fully supported.
- Think about Retry Policy for Http Request
- Use Flink Format [7] to parse Json response
- Add Configurable Timeout value
- Check other `//TODO`'s.

### HTTP Sink
- Make `HttpSink` retry the failed requests. Currently, it does not retry those at all, only adds their count to the `numRecordsSendErrors` metric. It should be thoroughly thought over how to do it efficiently and then implemented.

###
[1] https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/sql/queries/joins/#lookup-join
</br>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public HttpSinkWriter(
this.numRecordsSendErrorsCounter = metrics.getNumRecordsSendErrorsCounter();
}

// TODO: Reintroduce retries by adding backoff policy
@Override
protected void submitRequestEntries(
List<HttpSinkRequestEntry> requestEntries, Consumer<List<HttpSinkRequestEntry>> requestResult
Expand All @@ -63,15 +64,25 @@ protected void submitRequestEntries(
var failedRequestsNumber = requestEntries.size();
log.error("Http Sink fatally failed to write all {} requests", failedRequestsNumber);
numRecordsSendErrorsCounter.inc(failedRequestsNumber);
requestResult.accept(requestEntries);

// TODO: Make `HttpSink` retry the failed requests. Currently, it does not retry those at all,
// only adds their count to the `numRecordsSendErrors` metric. It is due to the fact we do not have
// a clear image how we want to do it, so it would be both efficient and correct.
// requestResult.accept(requestEntries);
} else if (response.getFailedRequests().size() > 0) {
var failedRequestsNumber = response.getFailedRequests().size();
log.error("Http Sink failed to write and will retry {} requests", failedRequestsNumber);
numRecordsSendErrorsCounter.inc(failedRequestsNumber);
requestResult.accept(response.getFailedRequests());
} else {
requestResult.accept(Collections.emptyList());

// TODO: Make `HttpSink` retry the failed requests. Currently, it does not retry those at all,
// only adds their count to the `numRecordsSendErrors` metric. It is due to the fact we do not have
// a clear image how we want to do it, so it would be both efficient and correct.
// requestResult.accept(response.getFailedRequests());
// } else {
// requestResult.accept(Collections.emptyList());
// }
}
requestResult.accept(Collections.emptyList());
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,13 @@
import com.getindata.connectors.http.sink.httpclient.JavaNetSinkHttpClient;
import com.github.tomakehurst.wiremock.WireMockServer;
import com.github.tomakehurst.wiremock.http.Fault;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.reporter.MetricReporter;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
Expand All @@ -26,10 +33,54 @@ public class HttpSinkConnectionTest {
private static final List<String> messages =
messageIds.stream().map(i -> "{\"http-sink-id\":" + i + "}").collect(Collectors.toList());

private StreamExecutionEnvironment env;
private WireMockServer wireMockServer;

// must be public because of the reflection
public static class SendErrorsTestReporter implements MetricReporter {
static volatile List<Counter> numRecordsSendErrors = null;

public static long getCount() {
return numRecordsSendErrors.stream().map(Counter::getCount).reduce(0L, Long::sum);
}

public static void reset() {
numRecordsSendErrors = new ArrayList<>();
}

@Override
public void open(MetricConfig metricConfig) {
}

@Override
public void close() {
}

@Override
public void notifyOfAddedMetric(
Metric metric, String s, MetricGroup metricGroup
) {
if ("numRecordsSendErrors".equals(s)) {
numRecordsSendErrors.add((Counter) metric);
}
}

@Override
public void notifyOfRemovedMetric(Metric metric, String s, MetricGroup metricGroup) {
}
}

@BeforeEach
public void setUp() {
SendErrorsTestReporter.reset();

env = StreamExecutionEnvironment.getExecutionEnvironment(new Configuration() {
{
this.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "sendErrorsTestReporter." +
ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, SendErrorsTestReporter.class.getName());
}
});

wireMockServer = new WireMockServer(SERVER_PORT);
wireMockServer.start();
}
Expand All @@ -48,7 +99,6 @@ public void testConnection() throws Exception {
.withStatus(200)
.withBody("{}")));

var env = StreamExecutionEnvironment.getExecutionEnvironment();
var source = env.fromCollection(messages);
var httpSink = HttpSink.<String>builder()
.setEndpointUrl("http://localhost:" + SERVER_PORT + "/myendpoint")
Expand Down Expand Up @@ -97,7 +147,6 @@ public void testServerErrorConnection() throws Exception {
.willReturn(aResponse().withStatus(200))
.willSetStateTo("Cause Success"));

var env = StreamExecutionEnvironment.getExecutionEnvironment();
var source = env.fromCollection(List.of(messages.get(0)));
var httpSink = HttpSink.<String>builder()
.setEndpointUrl("http://localhost:" + SERVER_PORT + "/myendpoint")
Expand All @@ -109,9 +158,11 @@ public void testServerErrorConnection() throws Exception {
source.sinkTo(httpSink);
env.execute("Http Sink test failed connection");

var postedRequests = wireMockServer.findAll(postRequestedFor(urlPathEqualTo("/myendpoint")));
assertEquals(2, postedRequests.size());
assertEquals(postedRequests.get(0).getBodyAsString(), postedRequests.get(1).getBodyAsString());
assertEquals(1, SendErrorsTestReporter.getCount());
// TODO: reintroduce along with the retries
// var postedRequests = wireMockServer.findAll(postRequestedFor(urlPathEqualTo("/myendpoint")));
// assertEquals(2, postedRequests.size());
// assertEquals(postedRequests.get(0).getBodyAsString(), postedRequests.get(1).getBodyAsString());
}

@Test
Expand All @@ -120,7 +171,7 @@ public void testFailedConnection() throws Exception {
.withHeader("Content-Type", equalTo("application/json"))
.inScenario("Retry Scenario")
.whenScenarioStateIs(STARTED)
.willReturn(serverError().withFault(Fault.EMPTY_RESPONSE))
.willReturn(aResponse().withFault(Fault.EMPTY_RESPONSE))
.willSetStateTo("Cause Success"));
wireMockServer.stubFor(any(urlPathEqualTo("/myendpoint"))
.withHeader("Content-Type", equalTo("application/json"))
Expand All @@ -129,7 +180,6 @@ public void testFailedConnection() throws Exception {
.willReturn(aResponse().withStatus(200))
.willSetStateTo("Cause Success"));

var env = StreamExecutionEnvironment.getExecutionEnvironment();
var source = env.fromCollection(List.of(messages.get(0)));
var httpSink = HttpSink.<String>builder()
.setEndpointUrl("http://localhost:" + SERVER_PORT + "/myendpoint")
Expand All @@ -141,8 +191,9 @@ public void testFailedConnection() throws Exception {
source.sinkTo(httpSink);
env.execute("Http Sink test failed connection");

var postedRequests = wireMockServer.findAll(postRequestedFor(urlPathEqualTo("/myendpoint")));
assertEquals(2, postedRequests.size());
assertEquals(postedRequests.get(0).getBodyAsString(), postedRequests.get(1).getBodyAsString());
assertEquals(1, SendErrorsTestReporter.getCount());
// var postedRequests = wireMockServer.findAll(postRequestedFor(urlPathEqualTo("/myendpoint")));
// assertEquals(2, postedRequests.size());
// assertEquals(postedRequests.get(0).getBodyAsString(), postedRequests.get(1).getBodyAsString());
}
}