Skip to content

Commit

Permalink
Revert "Destination Bigquery: Scaffolding for destinations v2 (#27268)"
Browse files (browse the repository at this point in the history)
This reverts commit ba3e39b.
  • Loading branch information
edgao committed Jun 30, 2023
1 parent 7e7c6f2 commit 348c577
Show file tree
Hide file tree
Showing 49 changed files with 106 additions and 3,624 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.commons.jackson.MoreMappers;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.base.TypingAndDedupingFlag;
import io.airbyte.protocol.models.v0.AirbyteRecordMessage;
import java.util.UUID;
import org.apache.avro.Schema;
Expand All @@ -33,14 +32,8 @@ public AvroRecordFactory(final Schema schema, final JsonAvroConverter converter)

public GenericData.Record getAvroRecord(final UUID id, final AirbyteRecordMessage recordMessage) throws JsonProcessingException {
final ObjectNode jsonRecord = MAPPER.createObjectNode();
if (TypingAndDedupingFlag.isDestinationV2()) {
jsonRecord.put(JavaBaseConstants.COLUMN_NAME_AB_RAW_ID, id.toString());
jsonRecord.put(JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT, recordMessage.getEmittedAt());
jsonRecord.put(JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT, (Long) null);
} else {
jsonRecord.put(JavaBaseConstants.COLUMN_NAME_AB_ID, id.toString());
jsonRecord.put(JavaBaseConstants.COLUMN_NAME_EMITTED_AT, recordMessage.getEmittedAt());
}
jsonRecord.put(JavaBaseConstants.COLUMN_NAME_AB_ID, id.toString());
jsonRecord.put(JavaBaseConstants.COLUMN_NAME_EMITTED_AT, recordMessage.getEmittedAt());
jsonRecord.setAll((ObjectNode) recordMessage.getData());

return converter.convertToGenericDataRecord(WRITER.writeValueAsBytes(jsonRecord), schema);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.DestinationConfig;
import io.airbyte.integrations.destination.record_buffer.BufferStorage;
import io.airbyte.integrations.destination.record_buffer.FileBuffer;
import io.airbyte.integrations.destination.record_buffer.InMemoryBuffer;
Expand All @@ -28,7 +27,6 @@
import org.apache.avro.file.SeekableByteArrayInput;
import org.apache.avro.generic.GenericData.Record;
import org.apache.avro.generic.GenericDatumReader;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

public class AvroSerializedBufferTest {
Expand All @@ -51,11 +49,6 @@ public class AvroSerializedBufferTest {
Field.of("nested_column", JsonSchemaType.OBJECT));
private static final ConfiguredAirbyteCatalog catalog = CatalogHelpers.createConfiguredAirbyteCatalog(STREAM, null, FIELDS);

@BeforeAll
public static void setup() {
DestinationConfig.initialize(Jsons.deserialize("{}"));
}

@Test
public void testSnappyAvroWriter() throws Exception {
final S3AvroFormatConfig config = new S3AvroFormatConfig(Jsons.jsonNode(Map.of("compression_codec", Map.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import com.amazonaws.util.IOUtils;
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.DestinationConfig;
import io.airbyte.integrations.destination.record_buffer.SerializableBuffer;
import io.airbyte.integrations.destination.s3.S3DestinationConfig;
import io.airbyte.protocol.models.Field;
Expand All @@ -34,7 +33,6 @@
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroReadSupport;
import org.apache.parquet.hadoop.ParquetReader;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

public class ParquetSerializedBufferTest {
Expand Down Expand Up @@ -62,11 +60,6 @@ public class ParquetSerializedBufferTest {
Field.of("datetime_with_timezone", JsonSchemaType.STRING_TIMESTAMP_WITH_TIMEZONE));
private static final ConfiguredAirbyteCatalog catalog = CatalogHelpers.createConfiguredAirbyteCatalog(STREAM, null, FIELDS);

@BeforeAll
public static void setup() {
DestinationConfig.initialize(Jsons.deserialize("{}"));
}

@Test
public void testUncompressedParquetWriter() throws Exception {
final S3DestinationConfig config = S3DestinationConfig.getS3DestinationConfig(Jsons.jsonNode(Map.of(
Expand Down
26 changes: 1 addition & 25 deletions airbyte-integrations/bases/base-java/run_with_normalization.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,34 +6,10 @@ set -o pipefail
destination_exit_code=$?
echo '{"type": "LOG","log":{"level":"INFO","message":"Destination process done (exit code '"$destination_exit_code"')"}}'

# store original args
args=$@

while [ $# -ne 0 ]; do
case "$1" in
--config)
CONFIG_FILE="$2"
shift 2
;;
*)
# move on
shift
;;
esac
done

# restore original args after shifts
set -- $args

USE_1S1T_FORMAT="false"
if [[ -s "$CONFIG_FILE" ]]; then
USE_1S1T_FORMAT=$(jq -r '.use_1s1t_format' "$CONFIG_FILE")
fi

if test "$1" != 'write'
then
normalization_exit_code=0
elif test "$NORMALIZATION_TECHNIQUE" = 'LEGACY' && test "$USE_1S1T_FORMAT" != "true"
elif test "$NORMALIZATION_TECHNIQUE" = 'LEGACY'
then
echo '{"type": "LOG","log":{"level":"INFO","message":"Starting in-connector normalization"}}'
# the args in a write command are `write --catalog foo.json --config bar.json`
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,6 @@ private void runInternal(final IntegrationConfig parsed) throws Exception {
case WRITE -> {
final JsonNode config = parseConfig(parsed.getConfigPath());
validateConfig(integration.spec().getConnectionSpecification(), config, "WRITE");
// save config to singleton
DestinationConfig.initialize(config);
final ConfiguredAirbyteCatalog catalog = parseConfig(parsed.getCatalogPath(), ConfiguredAirbyteCatalog.class);

final Procedure consumeWriteStreamCallable = () -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,4 @@ private JavaBaseConstants() {}
public static final String COLUMN_NAME_EMITTED_AT = "_airbyte_emitted_at";
public static final String COLUMN_NAME_DATA = "_airbyte_data";

// destination v2
public static final String COLUMN_NAME_AB_RAW_ID = "_airbyte_raw_id";
public static final String COLUMN_NAME_AB_LOADED_AT = "_airbyte_loaded_at";
public static final String COLUMN_NAME_AB_EXTRACTED_AT = "_airbyte_extracted_at";

public static final String AIRBYTE_NAMESPACE_SCHEMA = "airbyte";

}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import io.airbyte.commons.functional.CheckedFunction;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.AirbyteMessageConsumer;
Expand Down Expand Up @@ -93,13 +92,7 @@ public class BufferedStreamConsumer extends FailureTrackingAirbyteMessageConsume

private Instant nextFlushDeadline;
private final Duration bufferFlushFrequency;
private final String defaultNamespace;

/**
* Feel free to continue using this in non-1s1t destinations - it may be easier to use. However,
* 1s1t destinations should prefer the version which accepts a {@code defaultNamespace}.
*/
@Deprecated
public BufferedStreamConsumer(final Consumer<AirbyteMessage> outputRecordCollector,
final OnStartFunction onStart,
final BufferingStrategy bufferingStrategy,
Expand All @@ -112,27 +105,7 @@ public BufferedStreamConsumer(final Consumer<AirbyteMessage> outputRecordCollect
onClose,
catalog,
isValidRecord,
Duration.ofMinutes(15),
// This is purely for backwards compatibility. Many older destinations handle this internally.
// Starting with Destinations V2, we recommend passing in an explicit namespace.
null);
}

public BufferedStreamConsumer(final Consumer<AirbyteMessage> outputRecordCollector,
final OnStartFunction onStart,
final BufferingStrategy bufferingStrategy,
final OnCloseFunction onClose,
final ConfiguredAirbyteCatalog catalog,
final CheckedFunction<JsonNode, Boolean, Exception> isValidRecord,
final String defaultNamespace) {
this(outputRecordCollector,
onStart,
bufferingStrategy,
onClose,
catalog,
isValidRecord,
Duration.ofMinutes(15),
defaultNamespace);
Duration.ofMinutes(15));
}

/*
Expand All @@ -146,8 +119,7 @@ public BufferedStreamConsumer(final Consumer<AirbyteMessage> outputRecordCollect
final OnCloseFunction onClose,
final ConfiguredAirbyteCatalog catalog,
final CheckedFunction<JsonNode, Boolean, Exception> isValidRecord,
final Duration flushFrequency,
final String defaultNamespace) {
final Duration flushFrequency) {
this.outputRecordCollector = outputRecordCollector;
this.hasStarted = false;
this.hasClosed = false;
Expand All @@ -160,7 +132,6 @@ public BufferedStreamConsumer(final Consumer<AirbyteMessage> outputRecordCollect
this.bufferingStrategy = bufferingStrategy;
this.stateManager = new DefaultDestStateLifecycleManager();
this.bufferFlushFrequency = flushFrequency;
this.defaultNamespace = defaultNamespace;
}

@Override
Expand All @@ -186,12 +157,7 @@ protected void acceptTracked(final AirbyteMessage message) throws Exception {
Preconditions.checkState(hasStarted, "Cannot accept records until consumer has started");
if (message.getType() == Type.RECORD) {
final AirbyteRecordMessage record = message.getRecord();
// TODO maybe only do this in 1s1t mode?
if (Strings.isNullOrEmpty(record.getNamespace())) {
record.setNamespace(defaultNamespace);
}
final AirbyteStreamNameNamespacePair stream;
stream = AirbyteStreamNameNamespacePair.fromRecordMessage(record);
final AirbyteStreamNameNamespacePair stream = AirbyteStreamNameNamespacePair.fromRecordMessage(record);

// if stream is not part of list of streams to sync to then throw invalid stream exception
if (!streamNames.contains(stream)) {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -358,8 +358,7 @@ private BufferedStreamConsumer getConsumerWithFlushFrequency() {
onClose,
CATALOG,
isValidRecord,
Duration.ofSeconds(PERIODIC_BUFFER_FREQUENCY),
null);
Duration.ofSeconds(PERIODIC_BUFFER_FREQUENCY));
return flushFrequencyConsumer;
}

Expand Down
7 changes: 0 additions & 7 deletions airbyte-integrations/bases/base-typing-deduping/build.gradle

This file was deleted.

Loading

0 comments on commit 348c577

Please sign in to comment.