Skip to content

Commit

Permalink
Merge branch 'master' into ddavydov/#1047-oncall-source-notion-fix-sc…
Browse files Browse the repository at this point in the history
…hema
  • Loading branch information
davydov-d authored Jan 11, 2023
2 parents f43810e + 592c656 commit e6bc273
Show file tree
Hide file tree
Showing 111 changed files with 2,755 additions and 559 deletions.
8 changes: 4 additions & 4 deletions .github/workflows/platform-workflow-labels.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,31 +6,31 @@ on:

jobs:
label_issues:
if: ${{ github.event.action == 'labeled' && github.event.label.name == 'team/platform-workflow' }}
if: ${{ github.event.action == 'labeled' && github.event.label.name == 'team/platform-move' }}
runs-on: ubuntu-latest
permissions:
issues: write
steps:
- name: Label Issue
uses: andymckay/[email protected]
with:
add-labels: "platform-workflow/requires-grooming"
add-labels: "platform-move/requires-grooming"
repo-token: ${{ secrets.GITHUB_TOKEN }}
- name: Add Issue To Project
uses: actions/[email protected]
with:
project-url: https://github.com/orgs/airbytehq/projects/25
github-token: ${{ secrets.PLATFORM_WORKFLOW_PROJECT_AUTOMATION }}
unlabel_issues:
if: ${{ github.event.action == 'unlabeled' && github.event.label.name == 'team/platform-workflow' }}
if: ${{ github.event.action == 'unlabeled' && github.event.label.name == 'team/platform-move' }}
runs-on: ubuntu-latest
permissions:
issues: write
steps:
- name: Unlabel Issue
uses: andymckay/[email protected]
with:
remove-labels: "platform-workflow/requires-grooming"
remove-labels: "platform-move/requires-grooming"
repo-token: ${{ secrets.GITHUB_TOKEN }}
- name: Remove Issue From Project
uses: monry/[email protected]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ def stream_slices(self, sync_mode: SyncMode, stream_state: Mapping[str, Any]) ->
"""
Partition the daterange into slices of size = step.
The start of the window is the minimum datetime between start_datetime - looback_window and the stream_state's datetime
The start of the window is the minimum datetime between start_datetime - lookback_window and the stream_state's datetime
The end of the window is the minimum datetime between the start of the window and end_datetime.
:param sync_mode:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -792,7 +792,7 @@ def test_validation_type_missing_required_fields():
min_datetime: "{{ config['start_time'] + day_delta(2) }}"
end_datetime: "{{ config['end_time'] }}"
cursor_field: "created"
lookback_window: "5d"
lookback_window: "P5D"
start_time_option:
inject_into: request_parameter
field_name: created[gte]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import datadog.trace.api.Trace;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.features.FeatureFlags;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.io.LineGobbler;
import io.airbyte.commons.json.Jsons;
Expand Down Expand Up @@ -56,23 +56,27 @@ public class DefaultAirbyteSource implements AirbyteSource {
private Process sourceProcess = null;
private Iterator<AirbyteMessage> messageIterator = null;
private Integer exitValue = null;
private final boolean logConnectorMessages = new EnvVariableFeatureFlags().logConnectorMessages();
private final FeatureFlags featureFlags;

public DefaultAirbyteSource(final IntegrationLauncher integrationLauncher) {
this(integrationLauncher, new DefaultAirbyteStreamFactory(CONTAINER_LOG_MDC_BUILDER));
public DefaultAirbyteSource(final IntegrationLauncher integrationLauncher, final FeatureFlags featureFlags) {
this(integrationLauncher, new DefaultAirbyteStreamFactory(CONTAINER_LOG_MDC_BUILDER), featureFlags);
}

public DefaultAirbyteSource(final IntegrationLauncher integrationLauncher, final AirbyteStreamFactory streamFactory) {
this(integrationLauncher, streamFactory, new HeartbeatMonitor(HEARTBEAT_FRESH_DURATION));
public DefaultAirbyteSource(final IntegrationLauncher integrationLauncher,
final AirbyteStreamFactory streamFactory,
final FeatureFlags featureFlags) {
this(integrationLauncher, streamFactory, new HeartbeatMonitor(HEARTBEAT_FRESH_DURATION), featureFlags);
}

@VisibleForTesting
DefaultAirbyteSource(final IntegrationLauncher integrationLauncher,
final AirbyteStreamFactory streamFactory,
final HeartbeatMonitor heartbeatMonitor) {
final HeartbeatMonitor heartbeatMonitor,
final FeatureFlags featureFlags) {
this.integrationLauncher = integrationLauncher;
this.streamFactory = streamFactory;
this.heartbeatMonitor = heartbeatMonitor;
this.featureFlags = featureFlags;
}

@Trace(operationName = WORKER_OPERATION_NAME)
Expand Down Expand Up @@ -167,7 +171,7 @@ public void cancel() throws Exception {
}

private void logInitialStateAsJSON(final WorkerSourceConfig sourceConfig) {
if (!logConnectorMessages) {
if (!featureFlags.logConnectorMessages()) {
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import datadog.trace.api.Trace;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.features.FeatureFlags;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.FailureReason;
import io.airbyte.config.State;
Expand Down Expand Up @@ -66,7 +66,7 @@ public class AirbyteMessageTracker implements MessageTracker {
private final List<AirbyteTraceMessage> destinationErrorTraceMessages;
private final List<AirbyteTraceMessage> sourceErrorTraceMessages;
private final StateAggregator stateAggregator;
private final boolean logConnectorMessages = new EnvVariableFeatureFlags().logConnectorMessages();
private final FeatureFlags featureFlags;

// These variables support SYNC level estimates and are meant for sources where stream level
// estimates are not possible e.g. CDC sources.
Expand All @@ -91,16 +91,18 @@ private enum ConnectorType {
DESTINATION
}

public AirbyteMessageTracker() {
public AirbyteMessageTracker(final FeatureFlags featureFlags) {
this(new StateDeltaTracker(STATE_DELTA_TRACKER_MEMORY_LIMIT_BYTES),
new DefaultStateAggregator(new EnvVariableFeatureFlags().useStreamCapableState()),
new StateMetricsTracker(STATE_METRICS_TRACKER_MESSAGE_LIMIT));
new DefaultStateAggregator(featureFlags.useStreamCapableState()),
new StateMetricsTracker(STATE_METRICS_TRACKER_MESSAGE_LIMIT),
featureFlags);
}

@VisibleForTesting
protected AirbyteMessageTracker(final StateDeltaTracker stateDeltaTracker,
final StateAggregator stateAggregator,
final StateMetricsTracker stateMetricsTracker) {
final StateMetricsTracker stateMetricsTracker,
final FeatureFlags featureFlags) {
this.sourceOutputState = new AtomicReference<>();
this.destinationOutputState = new AtomicReference<>();
this.streamToRunningCount = new HashMap<>();
Expand All @@ -115,6 +117,7 @@ protected AirbyteMessageTracker(final StateDeltaTracker stateDeltaTracker,
this.destinationErrorTraceMessages = new ArrayList<>();
this.sourceErrorTraceMessages = new ArrayList<>();
this.stateAggregator = stateAggregator;
this.featureFlags = featureFlags;
}

@Trace(operationName = WORKER_OPERATION_NAME)
Expand Down Expand Up @@ -542,7 +545,7 @@ public Boolean getUnreliableStateTimingMetrics() {
}

private void logMessageAsJSON(final String caller, final AirbyteMessage message) {
if (!logConnectorMessages) {
if (!featureFlags.logConnectorMessages()) {
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public class AirbyteIntegrationLauncher implements IntegrationLauncher {

/**
* If true, launcher will use a separated isolated pool to run the job.
*
* <p>
* At this moment, we put custom connector jobs into an isolated pool.
*/
private final boolean useIsolatedPool;
Expand All @@ -56,13 +56,14 @@ public AirbyteIntegrationLauncher(final String jobId,
final String imageName,
final ProcessFactory processFactory,
final ResourceRequirements resourceRequirement,
final boolean useIsolatedPool) {
final boolean useIsolatedPool,
final FeatureFlags featureFlags) {
this.jobId = jobId;
this.attempt = attempt;
this.imageName = imageName;
this.processFactory = processFactory;
this.resourceRequirement = resourceRequirement;
this.featureFlags = new EnvVariableFeatureFlags();
this.featureFlags = featureFlags;
this.useIsolatedPool = useIsolatedPool;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.workers.general;

import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.config.JobSyncConfig.NamespaceDefinitionType;
import io.airbyte.config.ReplicationOutput;
import io.airbyte.config.StandardSyncInput;
Expand Down Expand Up @@ -65,7 +66,7 @@ public class ReplicationWorkerPerformanceTest {
public void executeOneSync() throws InterruptedException {
final var perSource = new LimitedAirbyteSource();
final var perDestination = new EmptyAirbyteDestination();
final var messageTracker = new AirbyteMessageTracker();
final var messageTracker = new AirbyteMessageTracker(new EnvVariableFeatureFlags());
final var connectorConfigUpdater = Mockito.mock(ConnectorConfigUpdater.class);
final var metricReporter = new WorkerMetricReporter(new NotImplementedMetricClient(), "test-image:0.01");
final var dstNamespaceMapper = new NamespacingMapper(NamespaceDefinitionType.DESTINATION, "", "");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.features.FeatureFlags;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.logging.LoggingHelper.Color;
Expand Down Expand Up @@ -80,6 +82,8 @@ class DefaultAirbyteSourceTest {

private static Path logJobRoot;

private static final FeatureFlags featureFlags = new EnvVariableFeatureFlags();

static {
try {
logJobRoot = Files.createTempDirectory(Path.of("/tmp"), "mdc_test");
Expand Down Expand Up @@ -137,7 +141,7 @@ void testSuccessfulLifecycle() throws Exception {

when(heartbeatMonitor.isBeating()).thenReturn(true).thenReturn(false);

final AirbyteSource source = new DefaultAirbyteSource(integrationLauncher, streamFactory, heartbeatMonitor);
final AirbyteSource source = new DefaultAirbyteSource(integrationLauncher, streamFactory, heartbeatMonitor, featureFlags);
source.start(SOURCE_CONFIG, jobRoot);

final List<AirbyteMessage> messages = Lists.newArrayList();
Expand Down Expand Up @@ -173,7 +177,7 @@ void testTaggedLogs() throws Exception {
when(heartbeatMonitor.isBeating()).thenReturn(true).thenReturn(false);

final AirbyteSource source = new DefaultAirbyteSource(integrationLauncher, streamFactory,
heartbeatMonitor);
heartbeatMonitor, featureFlags);
source.start(SOURCE_CONFIG, jobRoot);

final List<AirbyteMessage> messages = Lists.newArrayList();
Expand All @@ -198,7 +202,7 @@ void testTaggedLogs() throws Exception {

@Test
void testNonzeroExitCodeThrows() throws Exception {
final AirbyteSource tap = new DefaultAirbyteSource(integrationLauncher, streamFactory, heartbeatMonitor);
final AirbyteSource tap = new DefaultAirbyteSource(integrationLauncher, streamFactory, heartbeatMonitor, featureFlags);
tap.start(SOURCE_CONFIG, jobRoot);

when(process.exitValue()).thenReturn(1);
Expand All @@ -208,7 +212,7 @@ void testNonzeroExitCodeThrows() throws Exception {

@Test
void testIgnoredExitCodes() throws Exception {
final AirbyteSource tap = new DefaultAirbyteSource(integrationLauncher, streamFactory, heartbeatMonitor);
final AirbyteSource tap = new DefaultAirbyteSource(integrationLauncher, streamFactory, heartbeatMonitor, featureFlags);
tap.start(SOURCE_CONFIG, jobRoot);
when(process.isAlive()).thenReturn(false);

Expand All @@ -220,7 +224,7 @@ void testIgnoredExitCodes() throws Exception {

@Test
void testGetExitValue() throws Exception {
final AirbyteSource source = new DefaultAirbyteSource(integrationLauncher, streamFactory, heartbeatMonitor);
final AirbyteSource source = new DefaultAirbyteSource(integrationLauncher, streamFactory, heartbeatMonitor, featureFlags);
source.start(SOURCE_CONFIG, jobRoot);

when(process.isAlive()).thenReturn(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.FailureReason;
import io.airbyte.config.State;
Expand Down Expand Up @@ -49,7 +50,7 @@ class AirbyteMessageTrackerTest {
@BeforeEach
void setup() {
final StateMetricsTracker stateMetricsTracker = new StateMetricsTracker(10L * 1024L * 1024L);
this.messageTracker = new AirbyteMessageTracker(mStateDeltaTracker, mStateAggregator, stateMetricsTracker);
this.messageTracker = new AirbyteMessageTracker(mStateDeltaTracker, mStateAggregator, stateMetricsTracker, new EnvVariableFeatureFlags());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.features.FeatureFlags;
import io.airbyte.config.EnvConfigs;
import io.airbyte.config.WorkerEnvConstants;
import io.airbyte.workers.WorkerConfigs;
Expand Down Expand Up @@ -49,14 +50,16 @@ class AirbyteIntegrationLauncherTest {
CONFIG, "{}",
CATALOG, "{}",
"state", "{}");

private static final FeatureFlags featureFlags = new EnvVariableFeatureFlags();
private static final Map<String, String> JOB_METADATA = Map.of(
WorkerEnvConstants.WORKER_CONNECTOR_IMAGE, FAKE_IMAGE,
WorkerEnvConstants.WORKER_JOB_ID, JOB_ID,
WorkerEnvConstants.WORKER_JOB_ATTEMPT, String.valueOf(JOB_ATTEMPT),
EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, String.valueOf(new EnvVariableFeatureFlags().useStreamCapableState()),
EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA, String.valueOf(new EnvVariableFeatureFlags().autoDetectSchema()),
EnvVariableFeatureFlags.APPLY_FIELD_SELECTION, String.valueOf(new EnvVariableFeatureFlags().applyFieldSelection()),
EnvVariableFeatureFlags.FIELD_SELECTION_WORKSPACES, new EnvVariableFeatureFlags().fieldSelectionWorkspaces());
EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, String.valueOf(featureFlags.useStreamCapableState()),
EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA, String.valueOf(featureFlags.autoDetectSchema()),
EnvVariableFeatureFlags.APPLY_FIELD_SELECTION, String.valueOf(featureFlags.applyFieldSelection()),
EnvVariableFeatureFlags.FIELD_SELECTION_WORKSPACES, featureFlags.fieldSelectionWorkspaces());

private WorkerConfigs workerConfigs;
@Mock
Expand All @@ -66,7 +69,8 @@ class AirbyteIntegrationLauncherTest {
@BeforeEach
void setUp() {
workerConfigs = new WorkerConfigs(new EnvConfigs());
launcher = new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, FAKE_IMAGE, processFactory, workerConfigs.getResourceRequirements(), false);
launcher = new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, FAKE_IMAGE, processFactory, workerConfigs.getResourceRequirements(), false,
featureFlags);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
package io.airbyte.commons.features;

/**
* Interface that describe which features are activated in airbyte. Currently the only
* Interface that describe which features are activated in airbyte. Currently, the only
* implementation relies on env. Ideally it should be on some DB.
*/
public interface FeatureFlags {
Expand Down
1 change: 1 addition & 0 deletions airbyte-config/init/src/main/resources/icons/teradata.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@
- name: Local CSV
destinationDefinitionId: 8be1cf83-fde1-477f-a4ad-318d23c9f3c6
dockerRepository: airbyte/destination-csv
dockerImageTag: 0.2.10
dockerImageTag: 1.0.0
documentationUrl: https://docs.airbyte.com/integrations/destinations/local-csv
icon: file-csv.svg
releaseStage: alpha
Expand Down Expand Up @@ -429,3 +429,10 @@
icon: databend.svg
documentationUrl: https://docs.airbyte.com/integrations/destinations/databend
releaseStage: alpha
- name: Teradata Vantage
destinationDefinitionId: 58e6f9da-904e-11ed-a1eb-0242ac120002
dockerRepository: airbyte/destination-teradata
dockerImageTag: 0.1.0
icon: teradata.svg
documentationUrl: https://docs.airbyte.io/integrations/destinations/teradata
releaseStage: alpha
Loading

0 comments on commit e6bc273

Please sign in to comment.