Skip to content

Commit

Permalink
Add acceptance tests for source close timeout (#8217)
Browse files Browse the repository at this point in the history
* add test connectors and bump versions

* add failure timeout acceptance test

* run gw format

* include exception in runtime exception

* mark as disabled and add comment
  • Loading branch information
lmossman authored Dec 16, 2021
1 parent b018769 commit 464c485
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.1.0
LABEL io.airbyte.version=0.1.1
LABEL io.airbyte.name=airbyte/destination-e2e-test
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-e2e-test/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.1.0
LABEL io.airbyte.version=0.1.1
LABEL io.airbyte.name=airbyte/source-e2e-test
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaPrimitive;
import java.time.Instant;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
import org.slf4j.Logger;
Expand Down Expand Up @@ -53,7 +54,7 @@ public AutoCloseableIterator<AirbyteMessage> read(final JsonNode config, final C
final Predicate<Long> anotherRecordPredicate =
config.has("max_records") ? recordNumber -> recordNumber < config.get("max_records").asLong() : recordNumber -> true;

final long sleepTime = config.has("message_interval") ? config.get("message_interval").asLong() : 3000L;
final Optional<Long> sleepTime = Optional.ofNullable(config.get("message_interval")).map(JsonNode::asLong);

final AtomicLong i = new AtomicLong();

Expand All @@ -63,11 +64,13 @@ public AutoCloseableIterator<AirbyteMessage> read(final JsonNode config, final C
protected AirbyteMessage computeNext() {
if (anotherRecordPredicate.test(i.get())) {
if (i.get() != 0) {
try {
LOGGER.info("sleeping for {} ms", sleepTime);
sleep(sleepTime);
} catch (final InterruptedException e) {
throw new RuntimeException();
if (sleepTime.isPresent()) {
try {
LOGGER.info("sleeping for {} ms", sleepTime.get());
sleep(sleepTime.get());
} catch (final InterruptedException e) {
throw new RuntimeException(e);
}
}
}
i.incrementAndGet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
},
{
"title": "Infinite Feed",
"required": ["type", "max_records", "message_interval"],
"additionalProperties": false,
"required": ["type", "max_records"],
"additionalProperties": true,
"properties": {
"type": {
"type": "string",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.airbyte.api.client.model.AirbyteStreamAndConfiguration;
import io.airbyte.api.client.model.AirbyteStreamConfiguration;
import io.airbyte.api.client.model.AttemptInfoRead;
import io.airbyte.api.client.model.AttemptStatus;
import io.airbyte.api.client.model.CheckConnectionRead;
import io.airbyte.api.client.model.ConnectionCreate;
import io.airbyte.api.client.model.ConnectionIdRequestBody;
Expand Down Expand Up @@ -86,6 +87,7 @@
import java.nio.file.Path;
import java.sql.SQLException;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -102,6 +104,7 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -130,8 +133,8 @@ public class AcceptanceTests {
// assume env file is one directory level up from airbyte-tests.
private final static File ENV_FILE = Path.of(System.getProperty("user.dir")).getParent().resolve(".env").toFile();

private static final String SOURCE_E2E_TEST_CONNECTOR_VERSION = "0.1.0";
private static final String DESTINATION_E2E_TEST_CONNECTOR_VERSION = "0.1.0";
private static final String SOURCE_E2E_TEST_CONNECTOR_VERSION = "0.1.1";
private static final String DESTINATION_E2E_TEST_CONNECTOR_VERSION = "0.1.1";

private static final Charset UTF8 = StandardCharsets.UTF_8;
private static final boolean IS_KUBE = System.getenv().containsKey("KUBE");
Expand Down Expand Up @@ -860,6 +863,77 @@ public void testBackpressure() throws Exception {
}
}

// This test is disabled because it takes a couple minutes to run, as it is testing timeouts.
// It should be re-enabled when the @SlowIntegrationTest can be applied to it.
// See relevant issue: https://github.com/airbytehq/airbyte/issues/8397
@Test
@Order(17)
@Disabled
public void testFailureTimeout() throws Exception {
final SourceDefinitionRead sourceDefinition = apiClient.getSourceDefinitionApi().createSourceDefinition(new SourceDefinitionCreate()
.name("E2E Test Source")
.dockerRepository("airbyte/source-e2e-test")
.dockerImageTag(SOURCE_E2E_TEST_CONNECTOR_VERSION)
.documentationUrl(URI.create("https://example.com")));

final DestinationDefinitionRead destinationDefinition = apiClient.getDestinationDefinitionApi()
.createDestinationDefinition(new DestinationDefinitionCreate()
.name("E2E Test Destination")
.dockerRepository("airbyte/destination-e2e-test")
.dockerImageTag(DESTINATION_E2E_TEST_CONNECTOR_VERSION)
.documentationUrl(URI.create("https://example.com")));

final SourceRead source = createSource(
"E2E Test Source -" + UUID.randomUUID(),
workspaceId,
sourceDefinition.getSourceDefinitionId(),
Jsons.jsonNode(ImmutableMap.builder()
.put("type", "INFINITE_FEED")
.put("max_records", 1000)
.put("message_interval", 100)
.build()));

// Destination fails after processing 5 messages, so the job should fail after the graceful close
// timeout of 1 minute
final DestinationRead destination = createDestination(
"E2E Test Destination -" + UUID.randomUUID(),
workspaceId,
destinationDefinition.getDestinationDefinitionId(),
Jsons.jsonNode(ImmutableMap.builder()
.put("type", "FAILING")
.put("num_messages", 5)
.build()));

final String connectionName = "test-connection";
final UUID sourceId = source.getSourceId();
final UUID destinationId = destination.getDestinationId();
final AirbyteCatalog catalog = discoverSourceSchema(sourceId);

final UUID connectionId =
createConnection(connectionName, sourceId, destinationId, Collections.emptyList(), catalog, null)
.getConnectionId();

final JobInfoRead connectionSyncRead1 = apiClient.getConnectionApi()
.syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));

// wait to get out of pending.
final JobRead runningJob = waitForJob(apiClient.getJobsApi(), connectionSyncRead1.getJob(), Sets.newHashSet(JobStatus.PENDING));

// wait for job for max of 3 minutes, by which time the job attempt should have failed
waitForJob(apiClient.getJobsApi(), runningJob, Sets.newHashSet(JobStatus.RUNNING), Duration.ofMinutes(3));

final JobIdRequestBody jobId = new JobIdRequestBody().id(runningJob.getId());
final JobInfoRead jobInfo = apiClient.getJobsApi().getJobInfo(jobId);
final AttemptInfoRead attemptInfoRead = jobInfo.getAttempts().get(jobInfo.getAttempts().size() - 1);

// assert that the job attempt failed, and cancel the job regardless of status to prevent retries
try {
assertEquals(AttemptStatus.FAILED, attemptInfoRead.getAttempt().getStatus());
} finally {
apiClient.getJobsApi().cancelJob(jobId);
}
}

private AirbyteCatalog discoverSourceSchema(final UUID sourceId) throws ApiException {
return apiClient.getSourceApi().discoverSchemaForSource(new SourceIdRequestBody().sourceId(sourceId)).getCatalog();
}
Expand Down Expand Up @@ -1199,14 +1273,23 @@ private static void waitForSuccessfulJob(final JobsApi jobsApi, final JobRead or
assertEquals(JobStatus.SUCCEEDED, job.getStatus());
}

@SuppressWarnings("BusyWait")
private static JobRead waitForJob(final JobsApi jobsApi, final JobRead originalJob, final Set<JobStatus> jobStatuses)
throws InterruptedException, ApiException {
return waitForJob(jobsApi, originalJob, jobStatuses, Duration.ofMinutes(6));
}

@SuppressWarnings("BusyWait")
private static JobRead waitForJob(final JobsApi jobsApi, final JobRead originalJob, final Set<JobStatus> jobStatuses, final Duration maxWaitTime)
throws InterruptedException, ApiException {
JobRead job = originalJob;
int count = 0;
while (count < 400 && jobStatuses.contains(job.getStatus())) {

final Instant waitStart = Instant.now();
while (jobStatuses.contains(job.getStatus())) {
if (Duration.between(waitStart, Instant.now()).compareTo(maxWaitTime) > 0) {
LOGGER.info("Max wait time of {} has been reached. Stopping wait.", maxWaitTime);
break;
}
sleep(1000);
count++;

job = jobsApi.getJobInfo(new JobIdRequestBody().id(job.getId())).getJob();
LOGGER.info("waiting: job id: {} config type: {} status: {}", job.getId(), job.getConfigType(), job.getStatus());
Expand Down

0 comments on commit 464c485

Please sign in to comment.