From 71d473315dda328502e72cdabad4569dbec3794f Mon Sep 17 00:00:00 2001 From: Jarvis Date: Mon, 26 Jun 2023 22:12:49 +0800 Subject: [PATCH 1/3] [Fea] support avro format - added avro format and UT - kafka support avro format and e2e test --- .../connector-kafka/pom.xml | 5 + .../seatunnel/kafka/config/MessageFormat.java | 3 +- .../DefaultSeaTunnelRowSerializer.java | 3 + .../kafka/sink/KafkaSinkFactory.java | 5 +- .../seatunnel/kafka/source/KafkaSource.java | 3 + .../e2e/connector/kafka/KafkaIT.java | 752 +++++++++--------- .../fake_source_to_kafka_avro_format.conf | 76 ++ .../resources/avro/kafka_avro_to_console.conf | 88 ++ seatunnel-formats/pom.xml | 1 + .../seatunnel-format-avro/pom.xml | 45 ++ .../avro/AvroDeserializationSchema.java | 57 ++ .../format/avro/AvroSerializationSchema.java | 64 ++ .../format/avro/AvroToRowConverter.java | 199 +++++ .../format/avro/RowToAvroConverter.java | 237 ++++++ .../avro/exception/AvroFormatErrorCode.java | 44 + .../SeaTunnelAvroFormatException.java | 29 + .../format/avro/AvroConverterTest.java | 174 ++++ .../avro/AvroSerializationSchemaTest.java | 165 ++++ 18 files changed, 1586 insertions(+), 364 deletions(-) create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/fake_source_to_kafka_avro_format.conf create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/kafka_avro_to_console.conf create mode 100644 seatunnel-formats/seatunnel-format-avro/pom.xml create mode 100644 seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroDeserializationSchema.java create mode 100644 seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroSerializationSchema.java create mode 100644 seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroToRowConverter.java create mode 100644 seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/RowToAvroConverter.java create mode 100644 seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/exception/AvroFormatErrorCode.java create mode 100644 seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/exception/SeaTunnelAvroFormatException.java create mode 100644 seatunnel-formats/seatunnel-format-avro/src/test/java/org/apache/seatunnel/format/avro/AvroConverterTest.java create mode 100644 seatunnel-formats/seatunnel-format-avro/src/test/java/org/apache/seatunnel/format/avro/AvroSerializationSchemaTest.java diff --git a/seatunnel-connectors-v2/connector-kafka/pom.xml b/seatunnel-connectors-v2/connector-kafka/pom.xml index 0ce4bba6b17..60dc3049289 100644 --- a/seatunnel-connectors-v2/connector-kafka/pom.xml +++ b/seatunnel-connectors-v2/connector-kafka/pom.xml @@ -61,6 +61,11 @@ seatunnel-format-compatible-debezium-json ${project.version} + + org.apache.seatunnel + seatunnel-format-avro + ${project.version} + diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormat.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormat.java index 65b5cc27699..04d85c509b2 100644 --- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormat.java +++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormat.java @@ -21,5 +21,6 @@ public enum MessageFormat { JSON, TEXT, CANAL_JSON, - COMPATIBLE_DEBEZIUM_JSON + COMPATIBLE_DEBEZIUM_JSON, + AVRO } diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java index 06005de0035..9007e21e671 100644 --- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java +++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java @@ -24,6 +24,7 @@ import org.apache.seatunnel.common.exception.CommonErrorCode; import org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormat; import org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorException; +import org.apache.seatunnel.format.avro.AvroSerializationSchema; import org.apache.seatunnel.format.compatible.debezium.json.CompatibleDebeziumJsonDeserializationSchema; import org.apache.seatunnel.format.compatible.debezium.json.CompatibleDebeziumJsonSerializationSchema; import org.apache.seatunnel.format.json.JsonSerializationSchema; @@ -221,6 +222,8 @@ private static SerializationSchema createSerializationSchema( return new CanalJsonSerializationSchema(rowType); case COMPATIBLE_DEBEZIUM_JSON: return new CompatibleDebeziumJsonSerializationSchema(rowType, isKey); + case AVRO: + return new AvroSerializationSchema(rowType); default: throw new SeaTunnelJsonFormatException( CommonErrorCode.UNSUPPORTED_DATA_TYPE, "Unsupported format: " + format); diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java index 450ba3f1cde..61dd15890c6 100644 --- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java +++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java @@ -43,7 +43,10 @@ public OptionRule optionRule() { .conditional( Config.FORMAT, Arrays.asList( - MessageFormat.JSON, MessageFormat.CANAL_JSON, MessageFormat.TEXT), + MessageFormat.JSON, + MessageFormat.CANAL_JSON, + MessageFormat.TEXT, + MessageFormat.AVRO), Config.TOPIC) .optional( Config.KAFKA_CONFIG, diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java index 741d7521643..b2aaecc6e72 100644 --- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java +++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java @@ -45,6 +45,7 @@ import org.apache.seatunnel.connectors.seatunnel.kafka.config.StartMode; import org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorException; 
import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSourceState; +import org.apache.seatunnel.format.avro.AvroDeserializationSchema; import org.apache.seatunnel.format.json.JsonDeserializationSchema; import org.apache.seatunnel.format.json.canal.CanalJsonDeserializationSchema; import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
@@ -266,6 +267,9 @@ private void setDeserialization(Config config) { .setIgnoreParseErrors(true) .build(); break; + case AVRO: + deserializationSchema = new AvroDeserializationSchema(typeInfo); + break; default: throw new SeaTunnelJsonFormatException( CommonErrorCode.UNSUPPORTED_DATA_TYPE, "Unsupported format: " + format);
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java index 922798c3ded..348b1e26f46 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
@@ -17,6 +17,7 @@ package org.apache.seatunnel.e2e.connector.kafka; +import org.apache.seatunnel.format.text.TextSerializationSchema; import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper; import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
@@ -35,7 +36,6 @@ import org.apache.seatunnel.e2e.common.TestSuiteBase; import org.apache.seatunnel.e2e.common.container.TestContainer; import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer; -import org.apache.seatunnel.format.text.TextSerializationSchema; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -80,366 +80,394 @@ @Slf4j @DisabledOnContainer( - value = {}, - disabledReason = "Override TestSuiteBase @DisabledOnContainer") + value = {}, + disabledReason = "Override TestSuiteBase @DisabledOnContainer") public class KafkaIT extends TestSuiteBase implements TestResource { - private static final String KAFKA_IMAGE_NAME = "confluentinc/cp-kafka:7.0.9"; - - private static final String KAFKA_HOST = "kafkaCluster"; - - private static final MessageFormat DEFAULT_FORMAT = MessageFormat.JSON; - - private static final String DEFAULT_FIELD_DELIMITER = ","; - - private KafkaProducer producer; - - private KafkaContainer kafkaContainer; - - @BeforeAll - @Override - public void startUp() throws Exception { - kafkaContainer = - new KafkaContainer(DockerImageName.parse(KAFKA_IMAGE_NAME)) - .withNetwork(NETWORK) - .withNetworkAliases(KAFKA_HOST) - .withLogConsumer( - new Slf4jLogConsumer( - DockerLoggerFactory.getLogger(KAFKA_IMAGE_NAME))); - Startables.deepStart(Stream.of(kafkaContainer)).join(); - log.info("Kafka container started"); - Awaitility.given() - .ignoreExceptions() - .atLeast(100, TimeUnit.MILLISECONDS) - .pollInterval(500, TimeUnit.MILLISECONDS) - .atMost(180, TimeUnit.SECONDS) - .untilAsserted(this::initKafkaProducer); - - log.info("Write 100 records to topic test_topic_source"); - DefaultSeaTunnelRowSerializer serializer = - DefaultSeaTunnelRowSerializer.create( - "test_topic_source", - SEATUNNEL_ROW_TYPE, - DEFAULT_FORMAT, - DEFAULT_FIELD_DELIMITER); - generateTestData(serializer::serializeRow, 0, 100); - } - - @AfterAll - @Override - public void tearDown() throws Exception
{ - if (producer != null) { - producer.close(); - } - if (kafkaContainer != null) { - kafkaContainer.close(); - } - } - - @TestTemplate - public void testSinkKafka(TestContainer container) throws IOException, InterruptedException { - Container.ExecResult execResult = container.executeJob("/kafka_sink_fake_to_kafka.conf"); - Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); - - String topicName = "test_topic"; - Map data = getKafkaConsumerData(topicName); - ObjectMapper objectMapper = new ObjectMapper(); - String key = data.keySet().iterator().next(); - ObjectNode objectNode = objectMapper.readValue(key, ObjectNode.class); - Assertions.assertTrue(objectNode.has("c_map")); - Assertions.assertTrue(objectNode.has("c_string")); - Assertions.assertEquals(10, data.size()); - } - - @TestTemplate - public void testTextFormatSinkKafka(TestContainer container) - throws IOException, InterruptedException { - Container.ExecResult execResult = - container.executeJob("/textFormatIT/fake_source_to_text_sink_kafka.conf"); - Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); - - String topicName = "test_text_topic"; - Map data = getKafkaConsumerData(topicName); - Assertions.assertEquals(10, data.size()); - } - - @TestTemplate - public void testDefaultRandomSinkKafka(TestContainer container) - throws IOException, InterruptedException { - Container.ExecResult execResult = - container.executeJob("/kafka_default_sink_fake_to_kafka.conf"); - Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); - - String topicName = "topic_default_sink_test"; - List data = getKafkaConsumerListData(topicName); - Assertions.assertEquals(10, data.size()); - } - - @TestTemplate - public void testExtractTopicFunction(TestContainer container) - throws IOException, InterruptedException { - Container.ExecResult execResult = container.executeJob("/extractTopic_fake_to_kafka.conf"); - Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); - - String topicName = "test_extract_topic"; - Map data = getKafkaConsumerData(topicName); - ObjectMapper objectMapper = new ObjectMapper(); - String key = data.keySet().iterator().next(); - ObjectNode objectNode = objectMapper.readValue(key, ObjectNode.class); - Assertions.assertTrue(objectNode.has("c_map")); - Assertions.assertTrue(objectNode.has("c_string")); - Assertions.assertEquals(10, data.size()); - } - - @TestTemplate - public void testSourceKafkaTextToConsole(TestContainer container) - throws IOException, InterruptedException { - TextSerializationSchema serializer = - TextSerializationSchema.builder() - .seaTunnelRowType(SEATUNNEL_ROW_TYPE) - .delimiter(",") - .build(); - generateTestData( - row -> new ProducerRecord<>("test_topic_text", null, serializer.serialize(row)), - 0, - 100); - Container.ExecResult execResult = - container.executeJob("/textFormatIT/kafka_source_text_to_console.conf"); - Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); - } - - @TestTemplate - public void testSourceKafkaJsonToConsole(TestContainer container) - throws IOException, InterruptedException { - DefaultSeaTunnelRowSerializer serializer = - DefaultSeaTunnelRowSerializer.create( - "test_topic_json", - SEATUNNEL_ROW_TYPE, - DEFAULT_FORMAT, - DEFAULT_FIELD_DELIMITER); - generateTestData(row -> serializer.serializeRow(row), 0, 100); - Container.ExecResult execResult = - container.executeJob("/jsonFormatIT/kafka_source_json_to_console.conf"); - Assertions.assertEquals(0, 
execResult.getExitCode(), execResult.getStderr()); - } - - @TestTemplate - public void testSourceKafkaJsonFormatErrorHandleWaySkipToConsole(TestContainer container) - throws IOException, InterruptedException { - DefaultSeaTunnelRowSerializer serializer = - DefaultSeaTunnelRowSerializer.create( - "test_topic_error_message", - SEATUNNEL_ROW_TYPE, - DEFAULT_FORMAT, - DEFAULT_FIELD_DELIMITER); - generateTestData(row -> serializer.serializeRow(row), 0, 100); - Container.ExecResult execResult = - container.executeJob( - "/kafka/kafkasource_format_error_handle_way_skip_to_console.conf"); - Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); - } - - @TestTemplate - public void testSourceKafkaJsonFormatErrorHandleWayFailToConsole(TestContainer container) - throws IOException, InterruptedException { - DefaultSeaTunnelRowSerializer serializer = - DefaultSeaTunnelRowSerializer.create( - "test_topic_error_message", - SEATUNNEL_ROW_TYPE, - DEFAULT_FORMAT, - DEFAULT_FIELD_DELIMITER); - generateTestData(row -> serializer.serializeRow(row), 0, 100); - Container.ExecResult execResult = - container.executeJob( - "/kafka/kafkasource_format_error_handle_way_fail_to_console.conf"); - Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); - } - - @TestTemplate - public void testSourceKafka(TestContainer container) throws IOException, InterruptedException { - testKafkaLatestToConsole(container); - testKafkaEarliestToConsole(container); - testKafkaSpecificOffsetsToConsole(container); - testKafkaTimestampToConsole(container); - } - - @TestTemplate - public void testSourceKafkaStartConfig(TestContainer container) - throws IOException, InterruptedException { - DefaultSeaTunnelRowSerializer serializer = - DefaultSeaTunnelRowSerializer.create( - "test_topic_group", - SEATUNNEL_ROW_TYPE, - DEFAULT_FORMAT, - DEFAULT_FIELD_DELIMITER); - generateTestData(row -> serializer.serializeRow(row), 100, 150); - testKafkaGroupOffsetsToConsole(container); - } - - public void testKafkaLatestToConsole(TestContainer container) - throws IOException, InterruptedException { - Container.ExecResult execResult = - container.executeJob("/kafka/kafkasource_latest_to_console.conf"); - Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); - } - - public void testKafkaEarliestToConsole(TestContainer container) - throws IOException, InterruptedException { - Container.ExecResult execResult = - container.executeJob("/kafka/kafkasource_earliest_to_console.conf"); - Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); - } - - public void testKafkaSpecificOffsetsToConsole(TestContainer container) - throws IOException, InterruptedException { - Container.ExecResult execResult = - container.executeJob("/kafka/kafkasource_specific_offsets_to_console.conf"); - Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); - } - - public void testKafkaGroupOffsetsToConsole(TestContainer container) - throws IOException, InterruptedException { - Container.ExecResult execResult = - container.executeJob("/kafka/kafkasource_group_offset_to_console.conf"); - Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); - } - - public void testKafkaTimestampToConsole(TestContainer container) - throws IOException, InterruptedException { - Container.ExecResult execResult = - container.executeJob("/kafka/kafkasource_timestamp_to_console.conf"); - Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); - } - - private 
void initKafkaProducer() { - Properties props = new Properties(); - String bootstrapServers = kafkaContainer.getBootstrapServers(); - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); - props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); - props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); - producer = new KafkaProducer<>(props); - } - - private Properties kafkaConsumerConfig() { - Properties props = new Properties(); - props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers()); - props.put(ConsumerConfig.GROUP_ID_CONFIG, "seatunnel-kafka-sink-group"); - props.put( - ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, - OffsetResetStrategy.EARLIEST.toString().toLowerCase()); - props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - return props; - } - - @SuppressWarnings("checkstyle:Indentation") - private void generateTestData(ProducerRecordConverter converter, int start, int end) { - for (int i = start; i < end; i++) { - SeaTunnelRow row = - new SeaTunnelRow( - new Object[] { - Long.valueOf(i), - Collections.singletonMap("key", Short.parseShort("1")), - new Byte[] {Byte.parseByte("1")}, - "string", - Boolean.FALSE, - Byte.parseByte("1"), - Short.parseShort("1"), - Integer.parseInt("1"), - Long.parseLong("1"), - Float.parseFloat("1.1"), - Double.parseDouble("1.1"), - BigDecimal.valueOf(11, 1), - "test".getBytes(), - LocalDate.now(), - LocalDateTime.now() - }); - ProducerRecord producerRecord = converter.convert(row); - producer.send(producerRecord); - } - } - - private static final SeaTunnelRowType SEATUNNEL_ROW_TYPE = - new SeaTunnelRowType( - new String[] { - "id", - "c_map", - "c_array", - "c_string", - "c_boolean", - "c_tinyint", - "c_smallint", - "c_int", - "c_bigint", - "c_float", - "c_double", - "c_decimal", - "c_bytes", - "c_date", - "c_timestamp" - }, - new SeaTunnelDataType[] { - BasicType.LONG_TYPE, - new MapType(BasicType.STRING_TYPE, BasicType.SHORT_TYPE), - ArrayType.BYTE_ARRAY_TYPE, - BasicType.STRING_TYPE, - BasicType.BOOLEAN_TYPE, - BasicType.BYTE_TYPE, - BasicType.SHORT_TYPE, - BasicType.INT_TYPE, - BasicType.LONG_TYPE, - BasicType.FLOAT_TYPE, - BasicType.DOUBLE_TYPE, - new DecimalType(2, 1), - PrimitiveByteArrayType.INSTANCE, - LocalTimeType.LOCAL_DATE_TYPE, - LocalTimeType.LOCAL_DATE_TIME_TYPE - }); - - private Map getKafkaConsumerData(String topicName) { - Map data = new HashMap<>(); - try (KafkaConsumer consumer = new KafkaConsumer<>(kafkaConsumerConfig())) { - consumer.subscribe(Arrays.asList(topicName)); - Map offsets = - consumer.endOffsets(Arrays.asList(new TopicPartition(topicName, 0))); - Long endOffset = offsets.entrySet().iterator().next().getValue(); - Long lastProcessedOffset = -1L; - - do { - ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); - for (ConsumerRecord record : records) { - if (lastProcessedOffset < record.offset()) { - data.put(record.key(), record.value()); - } - lastProcessedOffset = record.offset(); - } - } while (lastProcessedOffset < endOffset - 1); - } - return data; - } - - private List getKafkaConsumerListData(String topicName) { - List data = new ArrayList<>(); - try (KafkaConsumer consumer = new KafkaConsumer<>(kafkaConsumerConfig())) { - consumer.subscribe(Arrays.asList(topicName)); - Map offsets = - consumer.endOffsets(Arrays.asList(new TopicPartition(topicName, 0))); - Long endOffset = 
offsets.entrySet().iterator().next().getValue(); - Long lastProcessedOffset = -1L; - - do { - ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); - for (ConsumerRecord record : records) { - if (lastProcessedOffset < record.offset()) { - data.add(record.value()); - } - lastProcessedOffset = record.offset(); - } - } while (lastProcessedOffset < endOffset - 1); - } - return data; - } - - interface ProducerRecordConverter { - ProducerRecord convert(SeaTunnelRow row); - } + private static final String KAFKA_IMAGE_NAME = "confluentinc/cp-kafka:7.0.9"; + + private static final String KAFKA_HOST = "kafkaCluster"; + + private static final MessageFormat DEFAULT_FORMAT = MessageFormat.JSON; + + private static final String DEFAULT_FIELD_DELIMITER = ","; + + private KafkaProducer producer; + + private KafkaContainer kafkaContainer; + + @BeforeAll + @Override + public void startUp() throws Exception { + kafkaContainer = + new KafkaContainer(DockerImageName.parse(KAFKA_IMAGE_NAME)) + .withNetwork(NETWORK) + .withNetworkAliases(KAFKA_HOST) + .withLogConsumer( + new Slf4jLogConsumer( + DockerLoggerFactory.getLogger(KAFKA_IMAGE_NAME))); + Startables.deepStart(Stream.of(kafkaContainer)).join(); + log.info("Kafka container started"); + Awaitility.given() + .ignoreExceptions() + .atLeast(100, TimeUnit.MILLISECONDS) + .pollInterval(500, TimeUnit.MILLISECONDS) + .atMost(180, TimeUnit.SECONDS) + .untilAsserted(this::initKafkaProducer); + + log.info("Write 100 records to topic test_topic_source"); + DefaultSeaTunnelRowSerializer serializer = + DefaultSeaTunnelRowSerializer.create( + "test_topic_source", + SEATUNNEL_ROW_TYPE, + DEFAULT_FORMAT, + DEFAULT_FIELD_DELIMITER); + generateTestData(serializer::serializeRow, 0, 100); + } + + @AfterAll + @Override + public void tearDown() throws Exception { + if (producer != null) { + producer.close(); + } + if (kafkaContainer != null) { + kafkaContainer.close(); + } + } + + @TestTemplate + public void testSinkKafka(TestContainer container) throws IOException, + InterruptedException { + Container.ExecResult execResult = + container.executeJob("/kafka_sink_fake_to_kafka.conf"); + Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); + + String topicName = "test_topic"; + Map data = getKafkaConsumerData(topicName); + ObjectMapper objectMapper = new ObjectMapper(); + String key = data.keySet().iterator().next(); + ObjectNode objectNode = objectMapper.readValue(key, ObjectNode.class); + Assertions.assertTrue(objectNode.has("c_map")); + Assertions.assertTrue(objectNode.has("c_string")); + Assertions.assertEquals(10, data.size()); + } + + @TestTemplate + public void testTextFormatSinkKafka(TestContainer container) + throws IOException, InterruptedException { + Container.ExecResult execResult = + container.executeJob("/textFormatIT/fake_source_to_text_sink_kafka.conf"); + Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); + + String topicName = "test_text_topic"; + Map data = getKafkaConsumerData(topicName); + Assertions.assertEquals(10, data.size()); + } + + @TestTemplate + public void testDefaultRandomSinkKafka(TestContainer container) + throws IOException, InterruptedException { + Container.ExecResult execResult = + container.executeJob("/kafka_default_sink_fake_to_kafka.conf"); + Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); + + String topicName = "topic_default_sink_test"; + List data = getKafkaConsumerListData(topicName); + Assertions.assertEquals(10, data.size()); + } + + 
@TestTemplate + public void testExtractTopicFunction(TestContainer container) + throws IOException, InterruptedException { + Container.ExecResult execResult = + container.executeJob("/extractTopic_fake_to_kafka.conf"); + Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); + + String topicName = "test_extract_topic"; + Map data = getKafkaConsumerData(topicName); + ObjectMapper objectMapper = new ObjectMapper(); + String key = data.keySet().iterator().next(); + ObjectNode objectNode = objectMapper.readValue(key, ObjectNode.class); + Assertions.assertTrue(objectNode.has("c_map")); + Assertions.assertTrue(objectNode.has("c_string")); + Assertions.assertEquals(10, data.size()); + } + + @TestTemplate + public void testSourceKafkaTextToConsole(TestContainer container) + throws IOException, InterruptedException { + TextSerializationSchema serializer = + TextSerializationSchema.builder() + .seaTunnelRowType(SEATUNNEL_ROW_TYPE) + .delimiter(",") + .build(); + generateTestData( + row -> new ProducerRecord<>("test_topic_text", null, + serializer.serialize(row)), + 0, + 100); + Container.ExecResult execResult = + container.executeJob("/textFormatIT/kafka_source_text_to_console.conf"); + Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); + } + + @TestTemplate + public void testSourceKafkaJsonToConsole(TestContainer container) + throws IOException, InterruptedException { + DefaultSeaTunnelRowSerializer serializer = + DefaultSeaTunnelRowSerializer.create( + "test_topic_json", + SEATUNNEL_ROW_TYPE, + DEFAULT_FORMAT, + DEFAULT_FIELD_DELIMITER); + generateTestData(row -> serializer.serializeRow(row), 0, 100); + Container.ExecResult execResult = + container.executeJob("/jsonFormatIT/kafka_source_json_to_console.conf"); + Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); + } + + @TestTemplate + public void testSourceKafkaJsonFormatErrorHandleWaySkipToConsole(TestContainer container) + throws IOException, InterruptedException { + DefaultSeaTunnelRowSerializer serializer = + DefaultSeaTunnelRowSerializer.create( + "test_topic_error_message", + SEATUNNEL_ROW_TYPE, + DEFAULT_FORMAT, + DEFAULT_FIELD_DELIMITER); + generateTestData(row -> serializer.serializeRow(row), 0, 100); + Container.ExecResult execResult = + container.executeJob( + "/kafka/kafkasource_format_error_handle_way_skip_to_console.conf"); + Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); + } + + @TestTemplate + public void testSourceKafkaJsonFormatErrorHandleWayFailToConsole(TestContainer container) + throws IOException, InterruptedException { + DefaultSeaTunnelRowSerializer serializer = + DefaultSeaTunnelRowSerializer.create( + "test_topic_error_message", + SEATUNNEL_ROW_TYPE, + DEFAULT_FORMAT, + DEFAULT_FIELD_DELIMITER); + generateTestData(row -> serializer.serializeRow(row), 0, 100); + Container.ExecResult execResult = + container.executeJob( + "/kafka/kafkasource_format_error_handle_way_fail_to_console.conf"); + Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); + } + + @TestTemplate + public void testSourceKafka(TestContainer container) throws IOException, + InterruptedException { + testKafkaLatestToConsole(container); + testKafkaEarliestToConsole(container); + testKafkaSpecificOffsetsToConsole(container); + testKafkaTimestampToConsole(container); + } + + @TestTemplate + public void testSourceKafkaStartConfig(TestContainer container) + throws IOException, InterruptedException { + DefaultSeaTunnelRowSerializer 
serializer = + DefaultSeaTunnelRowSerializer.create( + "test_topic_group", + SEATUNNEL_ROW_TYPE, + DEFAULT_FORMAT, + DEFAULT_FIELD_DELIMITER); + generateTestData(row -> serializer.serializeRow(row), 100, 150); + testKafkaGroupOffsetsToConsole(container); + } + + @TestTemplate + public void testFakeSourceToKafkaAvroFormat(TestContainer container) + throws IOException, InterruptedException { + Container.ExecResult execResult = + container.executeJob("/avro/fake_source_to_kafka_avro_format.conf"); + Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); + } + + @TestTemplate + public void testKafkaAvroToConsole(TestContainer container) throws IOException, InterruptedException { + DefaultSeaTunnelRowSerializer serializer = + DefaultSeaTunnelRowSerializer.create( + "test_avro_topic", + SEATUNNEL_ROW_TYPE, + MessageFormat.AVRO, + DEFAULT_FIELD_DELIMITER); + generateTestData(row -> serializer.serializeRow(row), 0, 100); + Container.ExecResult execResult = + container.executeJob( + "/avro/kafka_avro_to_console.conf"); + Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); + } + + public void testKafkaLatestToConsole(TestContainer container) + throws IOException, InterruptedException { + Container.ExecResult execResult = + container.executeJob("/kafka/kafkasource_latest_to_console.conf"); + Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); + } + + public void testKafkaEarliestToConsole(TestContainer container) + throws IOException, InterruptedException { + Container.ExecResult execResult = + container.executeJob("/kafka/kafkasource_earliest_to_console.conf"); + Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); + } + + public void testKafkaSpecificOffsetsToConsole(TestContainer container) + throws IOException, InterruptedException { + Container.ExecResult execResult = + container.executeJob("/kafka/kafkasource_specific_offsets_to_console.conf"); + Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); + } + + public void testKafkaGroupOffsetsToConsole(TestContainer container) + throws IOException, InterruptedException { + Container.ExecResult execResult = + container.executeJob("/kafka/kafkasource_group_offset_to_console.conf"); + Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); + } + + public void testKafkaTimestampToConsole(TestContainer container) + throws IOException, InterruptedException { + Container.ExecResult execResult = + container.executeJob("/kafka/kafkasource_timestamp_to_console.conf"); + Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); + } + + private void initKafkaProducer() { + Properties props = new Properties(); + String bootstrapServers = kafkaContainer.getBootstrapServers(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); + producer = new KafkaProducer<>(props); + } + + private Properties kafkaConsumerConfig() { + Properties props = new Properties(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers()); + props.put(ConsumerConfig.GROUP_ID_CONFIG, "seatunnel-kafka-sink-group"); + props.put( + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, + OffsetResetStrategy.EARLIEST.toString().toLowerCase()); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + return props; + } + + @SuppressWarnings("checkstyle:Indentation") + private void generateTestData(ProducerRecordConverter converter, int start, int end) { + for (int i = start; i < end; i++) { + SeaTunnelRow row = + new SeaTunnelRow( + new Object[]{ + Long.valueOf(i), + Collections.singletonMap("key", Short.parseShort("1")), + new Byte[]{Byte.parseByte("1")}, + "string", + Boolean.FALSE, + Byte.parseByte("1"), + Short.parseShort("1"), + Integer.parseInt("1"), + Long.parseLong("1"), + Float.parseFloat("1.1"), + Double.parseDouble("1.1"), + BigDecimal.valueOf(11, 1), + "test".getBytes(), + LocalDate.now(), + LocalDateTime.now() + }); + ProducerRecord producerRecord = converter.convert(row); + producer.send(producerRecord); + } + } + + private static final SeaTunnelRowType SEATUNNEL_ROW_TYPE = + new SeaTunnelRowType( + new String[]{ + "id", + "c_map", + "c_array", + "c_string", + "c_boolean", + "c_tinyint", + "c_smallint", + "c_int", + "c_bigint", + "c_float", + "c_double", + "c_decimal", + "c_bytes", + "c_date", + "c_timestamp" + }, + new SeaTunnelDataType[]{ + BasicType.LONG_TYPE, + new MapType(BasicType.STRING_TYPE, BasicType.SHORT_TYPE), + ArrayType.BYTE_ARRAY_TYPE, + BasicType.STRING_TYPE, + BasicType.BOOLEAN_TYPE, + BasicType.BYTE_TYPE, + BasicType.SHORT_TYPE, + BasicType.INT_TYPE, + BasicType.LONG_TYPE, + BasicType.FLOAT_TYPE, + BasicType.DOUBLE_TYPE, + new DecimalType(2, 1), + PrimitiveByteArrayType.INSTANCE, + LocalTimeType.LOCAL_DATE_TYPE, + LocalTimeType.LOCAL_DATE_TIME_TYPE + }); + + private Map getKafkaConsumerData(String topicName) { + Map data = new HashMap<>(); + try (KafkaConsumer consumer = new KafkaConsumer<>(kafkaConsumerConfig())) { + consumer.subscribe(Arrays.asList(topicName)); + Map offsets = + consumer.endOffsets(Arrays.asList(new TopicPartition(topicName, 0))); + Long endOffset = offsets.entrySet().iterator().next().getValue(); + Long lastProcessedOffset = -1L; + + do { + ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); + for (ConsumerRecord record : records) { + if (lastProcessedOffset < record.offset()) { + data.put(record.key(), record.value()); + } + lastProcessedOffset = record.offset(); + } + } while (lastProcessedOffset < endOffset - 1); + } + return data; + } + + private List getKafkaConsumerListData(String topicName) { + List data = new ArrayList<>(); + try (KafkaConsumer consumer = new KafkaConsumer<>(kafkaConsumerConfig())) { + consumer.subscribe(Arrays.asList(topicName)); + Map offsets = + consumer.endOffsets(Arrays.asList(new TopicPartition(topicName, 0))); + Long endOffset = offsets.entrySet().iterator().next().getValue(); + Long lastProcessedOffset = -1L; + + do { + ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); + for (ConsumerRecord record : records) { + if (lastProcessedOffset < record.offset()) { + data.add(record.value()); + } + lastProcessedOffset = record.offset(); + } + } while (lastProcessedOffset < endOffset - 1); + } + return data; + } + + interface ProducerRecordConverter { + ProducerRecord convert(SeaTunnelRow row); + } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/fake_source_to_kafka_avro_format.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/fake_source_to_kafka_avro_format.conf new file mode 100644 index 00000000000..c6f5d6944f6 --- /dev/null +++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/fake_source_to_kafka_avro_format.conf @@ -0,0 +1,76 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + execution.parallelism = 1 + job.mode = "BATCH" + + #spark config + spark.app.name = "SeaTunnel" + spark.executor.instances = 1 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local +} + +source { + FakeSource { + schema = { + fields { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_bytes = bytes + c_date = date + c_decimal = "decimal(38, 18)" + c_timestamp = timestamp + c_row = { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_bytes = bytes + c_date = date + c_decimal = "decimal(38, 18)" + c_timestamp = timestamp + } + } + } + result_table_name = "fake" + } +} + +sink { + Kafka { + bootstrap.servers = "kafkaCluster:9092" + topic = "test_avro_topic" + format = avro + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/kafka_avro_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/kafka_avro_to_console.conf new file mode 100644 index 00000000000..1b2dadf6a93 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/kafka_avro_to_console.conf @@ -0,0 +1,88 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +env { + execution.parallelism = 1 + job.mode = "BATCH" + + # You can set spark configuration here + spark.app.name = "SeaTunnel" + spark.executor.instances = 1 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local +} + +source { + Kafka { + bootstrap.servers = "kafkaCluster:9092" + topic = "test_avro_topic" + result_table_name = "kafka_table" + kafka.auto.offset.reset = "earliest" + format_error_handle_way = skip + schema = { + fields { + id = bigint + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_decimal = "decimal(2, 1)" + c_bytes = bytes + c_date = date + c_timestamp = timestamp + } + } + } +} + +sink { + Console { + source_table_name = "kafka_table" + } + Assert { + source_table_name = "kafka_table" + rules = + { + field_rules = [ + { + field_name = id + field_type = long + field_value = [ + { + rule_type = NOT_NULL + }, + { + rule_type = MIN + rule_value = 0 + }, + { + rule_type = MAX + rule_value = 99 + } + ] + } + ] + } + } +} \ No newline at end of file diff --git a/seatunnel-formats/pom.xml b/seatunnel-formats/pom.xml index 983a8629ce8..8320a84bbac 100644 --- a/seatunnel-formats/pom.xml +++ b/seatunnel-formats/pom.xml @@ -30,6 +30,7 @@ seatunnel-format-json seatunnel-format-text seatunnel-format-compatible-debezium-json + seatunnel-format-avro diff --git a/seatunnel-formats/seatunnel-format-avro/pom.xml b/seatunnel-formats/seatunnel-format-avro/pom.xml new file mode 100644 index 00000000000..7e50a113b75 --- /dev/null +++ b/seatunnel-formats/seatunnel-format-avro/pom.xml @@ -0,0 +1,45 @@ + + + + 4.0.0 + + org.apache.seatunnel + seatunnel-formats + ${revision} + + + seatunnel-format-avro + SeaTunnel : Formats : Avro + + + 1.11.1 + + + + + org.apache.seatunnel + seatunnel-api + ${project.version} + + + org.apache.avro + avro + ${avro.version} + + + + diff --git a/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroDeserializationSchema.java b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroDeserializationSchema.java new file mode 100644 index 00000000000..e554e17e901 --- /dev/null +++ b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroDeserializationSchema.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.format.avro; + +import org.apache.seatunnel.api.serialization.DeserializationSchema; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; + +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.BinaryDecoder; +import org.apache.avro.io.DatumReader; +import org.apache.avro.io.DecoderFactory; + +import java.io.IOException; + +public class AvroDeserializationSchema implements DeserializationSchema { + + private static final long serialVersionUID = -7907358485475741366L; + + private final SeaTunnelRowType rowType; + private final AvroToRowConverter converter; + private final DatumReader reader; + + public AvroDeserializationSchema(SeaTunnelRowType rowType) { + this.rowType = rowType; + this.converter = new AvroToRowConverter(); + this.reader = converter.getReader(); + } + + @Override + public SeaTunnelRow deserialize(byte[] message) throws IOException { + BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(message, null); + GenericRecord record = reader.read(null, decoder); + return converter.converter(record, rowType); + } + + @Override + public SeaTunnelDataType getProducedType() { + return this.rowType; + } +} diff --git a/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroSerializationSchema.java b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroSerializationSchema.java new file mode 100644 index 00000000000..02f6bbc8eec --- /dev/null +++ b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroSerializationSchema.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.format.avro; + +import org.apache.seatunnel.api.serialization.SerializationSchema; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.format.avro.exception.AvroFormatErrorCode; +import org.apache.seatunnel.format.avro.exception.SeaTunnelAvroFormatException; + +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.BinaryEncoder; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.io.EncoderFactory; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; + +public class AvroSerializationSchema implements SerializationSchema { + + private static final long serialVersionUID = 4438784443025715370L; + + private final ByteArrayOutputStream out; + private final BinaryEncoder encoder; + private final RowToAvroConverter converter; + private final DatumWriter writer; + + public AvroSerializationSchema(SeaTunnelRowType rowType) { + this.out = new ByteArrayOutputStream(); + this.encoder = EncoderFactory.get().binaryEncoder(out, null); + this.converter = new RowToAvroConverter(rowType); + this.writer = this.converter.getWriter(); + } + + @Override + public byte[] serialize(SeaTunnelRow element) { + GenericRecord record = converter.convertRowToGenericRecord(element); + try { + out.reset(); + writer.write(record, encoder); + encoder.flush(); + return out.toByteArray(); + } catch (IOException e) { + throw new SeaTunnelAvroFormatException( + AvroFormatErrorCode.SERIALIZATION_ERROR, e.toString()); + } + } +} diff --git a/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroToRowConverter.java b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroToRowConverter.java new file mode 100644 index 00000000000..bcc4726072d --- /dev/null +++ b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroToRowConverter.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.format.avro; + +import org.apache.seatunnel.api.table.type.ArrayType; +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.format.avro.exception.AvroFormatErrorCode; +import org.apache.seatunnel.format.avro.exception.SeaTunnelAvroFormatException; + +import org.apache.avro.Conversions; +import org.apache.avro.LogicalTypes; +import org.apache.avro.Schema; +import org.apache.avro.data.TimeConversions; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumReader; + +import java.io.Serializable; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.nio.ByteBuffer; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.util.List; + +public class AvroToRowConverter implements Serializable { + + private static final long serialVersionUID = 8177020083886379563L; + + private final DatumReader reader; + + public AvroToRowConverter() { + this.reader = createReader(); + } + + public DatumReader getReader() { + return reader; + } + + private DatumReader createReader() { + GenericDatumReader datumReader = new GenericDatumReader<>(); + datumReader.getData().addLogicalTypeConversion(new Conversions.DecimalConversion()); + datumReader.getData().addLogicalTypeConversion(new TimeConversions.DateConversion()); + datumReader + .getData() + .addLogicalTypeConversion(new TimeConversions.LocalTimestampMillisConversion()); + return datumReader; + } + + public SeaTunnelRow converter(GenericRecord record, SeaTunnelRowType rowType) { + String[] fieldNames = rowType.getFieldNames(); + + Object[] values = new Object[fieldNames.length]; + for (int i = 0; i < fieldNames.length; i++) { + if (!record.hasField(fieldNames[i])) { + values[i] = null; + continue; + } + values[i] = + convertField( + rowType.getFieldType(i), + record.getSchema().getField(fieldNames[i]), + record.get(fieldNames[i])); + } + return new SeaTunnelRow(values); + } + + private Object convertField(SeaTunnelDataType dataType, Schema.Field field, Object val) { + switch (dataType.getSqlType()) { + case MAP: + case STRING: + case BOOLEAN: + case SMALLINT: + case INT: + case BIGINT: + case FLOAT: + case DOUBLE: + case NULL: + case BYTES: + return val; + case TINYINT: + Class typeClass = dataType.getTypeClass(); + if (typeClass == Byte.class) { + Integer integer = (Integer) val; + return integer.byteValue(); + } + return val; + case ARRAY: + BasicType basicType = ((ArrayType) dataType).getElementType(); + List list = (List) val; + return convertArray(list, basicType); + case DECIMAL: + LogicalTypes.Decimal decimal = + (LogicalTypes.Decimal) field.schema().getLogicalType(); + ByteBuffer buffer = (ByteBuffer) val; + byte[] bytes = buffer.array(); + return new BigDecimal(new BigInteger(bytes), decimal.getScale()); + case DATE: + return LocalDate.ofEpochDay((Long) val); + case TIMESTAMP: + return LocalDateTime.ofInstant( + Instant.ofEpochMilli((Long) val), ZoneOffset.of("+8")); + case ROW: + SeaTunnelRowType subRow = (SeaTunnelRowType) dataType; + return converter((GenericRecord) val, subRow); + default: + String errorMsg = + String.format( + "SeaTunnel file connector is not supported for this data type [%s]", + dataType.getSqlType()); + throw new 
SeaTunnelAvroFormatException( + AvroFormatErrorCode.UNSUPPORTED_DATA_TYPE, errorMsg); + } + } + + protected static Object convertArray(List val, SeaTunnelDataType dataType) { + if (val == null) { + return null; + } + int length = val.size(); + switch (dataType.getSqlType()) { + case STRING: + String[] strings = new String[length]; + for (int i = 0; i < strings.length; i++) { + strings[i] = val.get(i).toString(); + } + return strings; + case BOOLEAN: + Boolean[] booleans = new Boolean[length]; + for (int i = 0; i < booleans.length; i++) { + booleans[i] = (Boolean) val.get(i); + } + return booleans; + case BYTES: + Byte[] bytes = new Byte[length]; + for (int i = 0; i < bytes.length; i++) { + bytes[i] = (Byte) val.get(i); + } + return bytes; + case SMALLINT: + Short[] shorts = new Short[length]; + for (int i = 0; i < shorts.length; i++) { + shorts[i] = (Short) val.get(i); + } + return shorts; + case INT: + Integer[] integers = new Integer[length]; + for (int i = 0; i < integers.length; i++) { + integers[i] = (Integer) val.get(i); + } + return integers; + case BIGINT: + Long[] longs = new Long[length]; + for (int i = 0; i < longs.length; i++) { + longs[i] = (Long) val.get(i); + } + return longs; + case FLOAT: + Float[] floats = new Float[length]; + for (int i = 0; i < floats.length; i++) { + floats[i] = (Float) val.get(i); + } + return floats; + case DOUBLE: + Double[] doubles = new Double[length]; + for (int i = 0; i < doubles.length; i++) { + doubles[i] = (Double) val.get(i); + } + return doubles; + default: + String errorMsg = + String.format( + "SeaTunnel avro array format is not supported for this data type [%s]", + dataType.getSqlType()); + throw new SeaTunnelAvroFormatException( + AvroFormatErrorCode.UNSUPPORTED_DATA_TYPE, errorMsg); + } + } +} diff --git a/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/RowToAvroConverter.java b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/RowToAvroConverter.java new file mode 100644 index 00000000000..21a39f5952a --- /dev/null +++ b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/RowToAvroConverter.java @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.format.avro; + +import org.apache.seatunnel.api.table.type.ArrayType; +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.DecimalType; +import org.apache.seatunnel.api.table.type.MapType; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.format.avro.exception.AvroFormatErrorCode; +import org.apache.seatunnel.format.avro.exception.SeaTunnelAvroFormatException; + +import org.apache.avro.Conversions; +import org.apache.avro.LogicalTypes; +import org.apache.avro.Schema; +import org.apache.avro.data.TimeConversions; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.GenericRecordBuilder; +import org.apache.avro.io.DatumWriter; + +import lombok.extern.slf4j.Slf4j; + +import java.io.Serializable; +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.util.ArrayList; +import java.util.List; + +public class RowToAvroConverter implements Serializable { + + private static final long serialVersionUID = -576124379280229724L; + + private final Schema schema; + private final SeaTunnelRowType rowType; + private final DatumWriter writer; + + public RowToAvroConverter(SeaTunnelRowType rowType) { + this.schema = buildAvroSchemaWithRowType(rowType); + this.rowType = rowType; + this.writer = createWriter(); + } + + private DatumWriter createWriter() { + GenericDatumWriter datumWriter = new GenericDatumWriter<>(schema); + datumWriter.getData().addLogicalTypeConversion(new Conversions.DecimalConversion()); + datumWriter.getData().addLogicalTypeConversion(new TimeConversions.DateConversion()); + datumWriter + .getData() + .addLogicalTypeConversion(new TimeConversions.LocalTimestampMillisConversion()); + return datumWriter; + } + + public Schema getSchema() { + return schema; + } + + public DatumWriter getWriter() { + return writer; + } + + public GenericRecord convertRowToGenericRecord(SeaTunnelRow element) { + GenericRecordBuilder builder = new GenericRecordBuilder(schema); + String[] fieldNames = rowType.getFieldNames(); + for (int i = 0; i < fieldNames.length; i++) { + String fieldName = rowType.getFieldName(i); + Object value = element.getField(i); + builder.set(fieldName.toLowerCase(), resolveObject(value, rowType.getFieldType(i))); + } + return builder.build(); + } + + private Schema buildAvroSchemaWithRowType(SeaTunnelRowType seaTunnelRowType) { + List fields = new ArrayList<>(); + SeaTunnelDataType[] fieldTypes = seaTunnelRowType.getFieldTypes(); + String[] fieldNames = seaTunnelRowType.getFieldNames(); + for (int i = 0; i < fieldNames.length; i++) { + fields.add(generateField(fieldNames[i], fieldTypes[i])); + } + return Schema.createRecord("SeaTunnelRecord", null, null, false, fields); + } + + private Schema.Field generateField(String fieldName, SeaTunnelDataType seaTunnelDataType) { + return new Schema.Field( + fieldName, + seaTunnelDataType2AvroDataType(fieldName, seaTunnelDataType), + null, + null); + } + + private Schema seaTunnelDataType2AvroDataType( + String fieldName, SeaTunnelDataType seaTunnelDataType) { + + switch (seaTunnelDataType.getSqlType()) { + case STRING: + return Schema.create(Schema.Type.STRING); + case BYTES: + return Schema.create(Schema.Type.BYTES); + 
case TINYINT: + case SMALLINT: + case INT: + return Schema.create(Schema.Type.INT); + case BIGINT: + return Schema.create(Schema.Type.LONG); + case FLOAT: + return Schema.create(Schema.Type.FLOAT); + case DOUBLE: + return Schema.create(Schema.Type.DOUBLE); + case BOOLEAN: + return Schema.create(Schema.Type.BOOLEAN); + case MAP: + SeaTunnelDataType valueType = ((MapType) seaTunnelDataType).getValueType(); + return Schema.createMap(seaTunnelDataType2AvroDataType(fieldName, valueType)); + case ARRAY: + BasicType elementType = ((ArrayType) seaTunnelDataType).getElementType(); + return Schema.createArray(seaTunnelDataType2AvroDataType(fieldName, elementType)); + case ROW: + SeaTunnelDataType[] fieldTypes = + ((SeaTunnelRowType) seaTunnelDataType).getFieldTypes(); + String[] fieldNames = ((SeaTunnelRowType) seaTunnelDataType).getFieldNames(); + List subField = new ArrayList<>(); + for (int i = 0; i < fieldNames.length; i++) { + subField.add(generateField(fieldNames[i], fieldTypes[i])); + } + return Schema.createRecord(fieldName, null, null, false, subField); + case DECIMAL: + int precision = ((DecimalType) seaTunnelDataType).getPrecision(); + int scale = ((DecimalType) seaTunnelDataType).getScale(); + LogicalTypes.Decimal decimal = LogicalTypes.decimal(precision, scale); + return decimal.addToSchema(Schema.create(Schema.Type.BYTES)); + case TIMESTAMP: + return LogicalTypes.localTimestampMillis() + .addToSchema(Schema.create(Schema.Type.LONG)); + case DATE: + return LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT)); + case NULL: + return Schema.create(Schema.Type.NULL); + default: + String errorMsg = + String.format( + "SeaTunnel file connector is not supported for this data type [%s]", + seaTunnelDataType.getSqlType()); + throw new SeaTunnelAvroFormatException( + AvroFormatErrorCode.UNSUPPORTED_DATA_TYPE, errorMsg); + } + } + + private Object resolveObject(Object data, SeaTunnelDataType seaTunnelDataType) { + if (data == null) { + return null; + } + switch (seaTunnelDataType.getSqlType()) { + case STRING: + case SMALLINT: + case INT: + case BIGINT: + case FLOAT: + case DOUBLE: + case BOOLEAN: + case MAP: + return data; + case TINYINT: + Class typeClass = seaTunnelDataType.getTypeClass(); + if (typeClass == Byte.class) { + if (data instanceof Byte) { + Byte aByte = (Byte) data; + return Byte.toUnsignedInt(aByte); + } + } + return data; + case DECIMAL: + BigDecimal decimal = (BigDecimal) data; + return ByteBuffer.wrap(decimal.unscaledValue().toByteArray()); + case DATE: + LocalDate localDate = (LocalDate) data; + return localDate.toEpochDay(); + case BYTES: + return ByteBuffer.wrap((byte[]) data); + case ARRAY: + // BasicType basicType = ((ArrayType) + // seaTunnelDataType).getElementType(); + // return Util.convertArray((Object[]) data, basicType); + BasicType basicType = ((ArrayType) seaTunnelDataType).getElementType(); + List records = new ArrayList<>(((Object[]) data).length); + for (Object object : (Object[]) data) { + Object resolvedObject = resolveObject(object, basicType); + records.add(resolvedObject); + } + return records; + case ROW: + SeaTunnelRow seaTunnelRow = (SeaTunnelRow) data; + SeaTunnelDataType[] fieldTypes = + ((SeaTunnelRowType) seaTunnelDataType).getFieldTypes(); + String[] fieldNames = ((SeaTunnelRowType) seaTunnelDataType).getFieldNames(); + Schema recordSchema = + buildAvroSchemaWithRowType((SeaTunnelRowType) seaTunnelDataType); + GenericRecordBuilder recordBuilder = new GenericRecordBuilder(recordSchema); + for (int i = 0; i < fieldNames.length; i++) { + 
recordBuilder.set( + fieldNames[i].toLowerCase(), + resolveObject(seaTunnelRow.getField(i), fieldTypes[i])); + } + return recordBuilder.build(); + case TIMESTAMP: + LocalDateTime dateTime = (LocalDateTime) data; + // NOTE: epoch millis are derived with a fixed UTC+8 offset; this assumes the incoming LocalDateTime was produced in that zone + return (dateTime).toInstant(ZoneOffset.of("+8")).toEpochMilli(); + default: + String errorMsg = + String.format( + "SeaTunnel avro format is not supported for this data type [%s]", + seaTunnelDataType.getSqlType()); + throw new SeaTunnelAvroFormatException( + AvroFormatErrorCode.UNSUPPORTED_DATA_TYPE, errorMsg); + } + } +} diff --git a/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/exception/AvroFormatErrorCode.java b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/exception/AvroFormatErrorCode.java new file mode 100644 index 00000000000..6f60a479328 --- /dev/null +++ b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/exception/AvroFormatErrorCode.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.format.avro.exception; + +import org.apache.seatunnel.common.exception.SeaTunnelErrorCode; + +public enum AvroFormatErrorCode implements SeaTunnelErrorCode { + UNSUPPORTED_DATA_TYPE("AVRO-01", "Unsupported data type."), + SERIALIZATION_ERROR("AVRO-02", "Serialization error."), + FILED_NOT_EXIST("AVRO-03", "Field does not exist."); + + private final String code; + private final String description; + + AvroFormatErrorCode(String code, String description) { + this.code = code; + this.description = description; + } + + @Override + public String getCode() { + return code; + } + + @Override + public String getDescription() { + return description; + } +} diff --git a/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/exception/SeaTunnelAvroFormatException.java b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/exception/SeaTunnelAvroFormatException.java new file mode 100644 index 00000000000..93c45323b49 --- /dev/null +++ b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/exception/SeaTunnelAvroFormatException.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.format.avro.exception; + +import org.apache.seatunnel.common.exception.SeaTunnelErrorCode; +import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException; + +public class SeaTunnelAvroFormatException extends SeaTunnelRuntimeException { + + public SeaTunnelAvroFormatException( + SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage) { + super(seaTunnelErrorCode, errorMessage); + } +} diff --git a/seatunnel-formats/seatunnel-format-avro/src/test/java/org/apache/seatunnel/format/avro/AvroConverterTest.java b/seatunnel-formats/seatunnel-format-avro/src/test/java/org/apache/seatunnel/format/avro/AvroConverterTest.java new file mode 100644 index 00000000000..66d847dcc3a --- /dev/null +++ b/seatunnel-formats/seatunnel-format-avro/src/test/java/org/apache/seatunnel/format/avro/AvroConverterTest.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.format.avro; + +import org.apache.seatunnel.api.table.type.ArrayType; +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.DecimalType; +import org.apache.seatunnel.api.table.type.LocalTimeType; +import org.apache.seatunnel.api.table.type.MapType; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; + +import org.apache.avro.generic.GenericRecord; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.math.BigDecimal; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.util.HashMap; +import java.util.Map; + +class AvroConverterTest { + + private SeaTunnelRow buildSeaTunnelRow() { + SeaTunnelRow subSeaTunnelRow = new SeaTunnelRow(14); + Map map = new HashMap(); + map.put("k1", "v1"); + map.put("k2", "v2"); + String[] strArray = new String[] {"l1", "l2"}; + byte byteVal = 100; + LocalDate localDate = LocalDate.of(2023, 1, 1); + + BigDecimal bigDecimal = new BigDecimal("61592600349703735722.724745739637773662"); + LocalDateTime localDateTime = LocalDateTime.of(2023, 1, 1, 6, 30, 40); + + subSeaTunnelRow.setField(0, map); + subSeaTunnelRow.setField(1, strArray); + subSeaTunnelRow.setField(2, "strVal"); + subSeaTunnelRow.setField(3, true); + subSeaTunnelRow.setField(4, 1); + subSeaTunnelRow.setField(5, 2); + subSeaTunnelRow.setField(6, 3); + subSeaTunnelRow.setField(7, Long.MAX_VALUE - 1); + subSeaTunnelRow.setField(8, 33.333F); + subSeaTunnelRow.setField(9, 123.456); + subSeaTunnelRow.setField(10, byteVal); + subSeaTunnelRow.setField(11, localDate); + subSeaTunnelRow.setField(12, bigDecimal); + subSeaTunnelRow.setField(13, localDateTime); + + SeaTunnelRow seaTunnelRow = new SeaTunnelRow(15); + seaTunnelRow.setField(0, map); + seaTunnelRow.setField(1, strArray); + seaTunnelRow.setField(2, "strVal"); + seaTunnelRow.setField(3, true); + seaTunnelRow.setField(4, 1); + seaTunnelRow.setField(5, 2); + seaTunnelRow.setField(6, 3); + seaTunnelRow.setField(7, Long.MAX_VALUE - 1); + seaTunnelRow.setField(8, 33.333F); + seaTunnelRow.setField(9, 123.456); + seaTunnelRow.setField(10, byteVal); + seaTunnelRow.setField(11, localDate); + seaTunnelRow.setField(12, bigDecimal); + seaTunnelRow.setField(13, localDateTime); + seaTunnelRow.setField(14, subSeaTunnelRow); + return seaTunnelRow; + } + + private SeaTunnelRowType buildSeaTunnelRowType() { + String[] subField = { + "c_map", + "c_array", + "c_string", + "c_boolean", + "c_tinyint", + "c_smallint", + "c_int", + "c_bigint", + "c_float", + "c_double", + "c_bytes", + "c_date", + "c_decimal", + "c_timestamp" + }; + SeaTunnelDataType[] subFieldTypes = { + new MapType<>(BasicType.STRING_TYPE, BasicType.STRING_TYPE), + ArrayType.STRING_ARRAY_TYPE, + BasicType.STRING_TYPE, + BasicType.BOOLEAN_TYPE, + BasicType.INT_TYPE, + BasicType.INT_TYPE, + BasicType.INT_TYPE, + BasicType.LONG_TYPE, + BasicType.FLOAT_TYPE, + BasicType.DOUBLE_TYPE, + BasicType.BYTE_TYPE, + LocalTimeType.LOCAL_DATE_TYPE, + new DecimalType(38, 18), + LocalTimeType.LOCAL_DATE_TIME_TYPE + }; + SeaTunnelRowType subRow = new SeaTunnelRowType(subField, subFieldTypes); + + String[] fieldNames = { + "c_map", + "c_array", + "c_string", + "c_boolean", + "c_tinyint", + "c_smallint", + "c_int", + "c_bigint", + "c_float", + "c_double", + "c_bytes", + "c_date", + "c_decimal", + "c_timestamp", + "c_row" + }; + SeaTunnelDataType[] fieldTypes = 
{ + new MapType<>(BasicType.STRING_TYPE, BasicType.STRING_TYPE), + ArrayType.STRING_ARRAY_TYPE, + BasicType.STRING_TYPE, + BasicType.BOOLEAN_TYPE, + BasicType.INT_TYPE, + BasicType.INT_TYPE, + BasicType.INT_TYPE, + BasicType.LONG_TYPE, + BasicType.FLOAT_TYPE, + BasicType.DOUBLE_TYPE, + BasicType.BYTE_TYPE, + LocalTimeType.LOCAL_DATE_TYPE, + new DecimalType(38, 18), + LocalTimeType.LOCAL_DATE_TIME_TYPE, + subRow + }; + SeaTunnelRowType rowType = new SeaTunnelRowType(fieldNames, fieldTypes); + return rowType; + } + + @Test + public void testConverter() { + + SeaTunnelRowType rowType = buildSeaTunnelRowType(); + SeaTunnelRow seaTunnelRow = buildSeaTunnelRow(); + RowToAvroConverter rowToAvroConverter = new RowToAvroConverter(rowType); + GenericRecord record = rowToAvroConverter.convertRowToGenericRecord(seaTunnelRow); + + AvroToRowConverter avroToRowConverter = new AvroToRowConverter(); + SeaTunnelRow converterRow = avroToRowConverter.converter(record, rowType); + + Assertions.assertEquals(converterRow, seaTunnelRow); + } +} diff --git a/seatunnel-formats/seatunnel-format-avro/src/test/java/org/apache/seatunnel/format/avro/AvroSerializationSchemaTest.java b/seatunnel-formats/seatunnel-format-avro/src/test/java/org/apache/seatunnel/format/avro/AvroSerializationSchemaTest.java new file mode 100644 index 00000000000..3291317bf8b --- /dev/null +++ b/seatunnel-formats/seatunnel-format-avro/src/test/java/org/apache/seatunnel/format/avro/AvroSerializationSchemaTest.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.format.avro; + +import org.apache.seatunnel.api.table.type.ArrayType; +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.DecimalType; +import org.apache.seatunnel.api.table.type.LocalTimeType; +import org.apache.seatunnel.api.table.type.MapType; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; + +import org.junit.jupiter.api.Test; + +import java.math.BigDecimal; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.util.HashMap; +import java.util.Map; + +class AvroSerializationSchemaTest { + + private SeaTunnelRow buildSeaTunnelRow() { + SeaTunnelRow subSeaTunnelRow = new SeaTunnelRow(14); + Map map = new HashMap(); + map.put("k1", "v1"); + map.put("k2", "v2"); + String[] strArray = new String[] {"l1", "l2"}; + byte byteVal = 100; + LocalDate localDate = LocalDate.of(2023, 1, 1); + BigDecimal bigDecimal = new BigDecimal("61592600349703735722.724745739637773662"); + LocalDateTime localDateTime = LocalDateTime.of(2023, 1, 1, 6, 30, 40); + + subSeaTunnelRow.setField(0, map); + subSeaTunnelRow.setField(1, strArray); + subSeaTunnelRow.setField(2, "strVal"); + subSeaTunnelRow.setField(3, true); + subSeaTunnelRow.setField(4, 1); + subSeaTunnelRow.setField(5, 2); + subSeaTunnelRow.setField(6, 3); + subSeaTunnelRow.setField(7, Long.MAX_VALUE - 1); + subSeaTunnelRow.setField(8, 33.333F); + subSeaTunnelRow.setField(9, 123.456); + subSeaTunnelRow.setField(10, byteVal); + subSeaTunnelRow.setField(11, localDate); + subSeaTunnelRow.setField(12, bigDecimal); + subSeaTunnelRow.setField(13, localDateTime); + + SeaTunnelRow seaTunnelRow = new SeaTunnelRow(15); + seaTunnelRow.setField(0, map); + seaTunnelRow.setField(1, strArray); + seaTunnelRow.setField(2, "strVal"); + seaTunnelRow.setField(3, true); + seaTunnelRow.setField(4, 1); + seaTunnelRow.setField(5, 2); + seaTunnelRow.setField(6, 3); + seaTunnelRow.setField(7, Long.MAX_VALUE - 1); + seaTunnelRow.setField(8, 33.333F); + seaTunnelRow.setField(9, 123.456); + seaTunnelRow.setField(10, byteVal); + seaTunnelRow.setField(11, localDate); + seaTunnelRow.setField(12, bigDecimal); + seaTunnelRow.setField(13, localDateTime); + seaTunnelRow.setField(14, subSeaTunnelRow); + return seaTunnelRow; + } + + private SeaTunnelRowType buildSeaTunnelRowType() { + String[] subField = { + "c_map", + "c_array", + "c_string", + "c_boolean", + "c_tinyint", + "c_smallint", + "c_int", + "c_bigint", + "c_float", + "c_double", + "c_bytes", + "c_date", + "c_decimal", + "c_timestamp" + }; + SeaTunnelDataType[] subFieldTypes = { + new MapType<>(BasicType.STRING_TYPE, BasicType.STRING_TYPE), + ArrayType.STRING_ARRAY_TYPE, + BasicType.STRING_TYPE, + BasicType.BOOLEAN_TYPE, + BasicType.INT_TYPE, + BasicType.INT_TYPE, + BasicType.INT_TYPE, + BasicType.LONG_TYPE, + BasicType.FLOAT_TYPE, + BasicType.DOUBLE_TYPE, + BasicType.BYTE_TYPE, + LocalTimeType.LOCAL_DATE_TYPE, + new DecimalType(38, 18), + LocalTimeType.LOCAL_DATE_TIME_TYPE + }; + SeaTunnelRowType subRow = new SeaTunnelRowType(subField, subFieldTypes); + + String[] fieldNames = { + "c_map", + "c_array", + "c_string", + "c_boolean", + "c_tinyint", + "c_smallint", + "c_int", + "c_bigint", + "c_float", + "c_double", + "c_bytes", + "c_date", + "c_decimal", + "c_timestamp", + "c_row" + }; + SeaTunnelDataType[] fieldTypes = { + new MapType<>(BasicType.STRING_TYPE, BasicType.STRING_TYPE), + 
ArrayType.STRING_ARRAY_TYPE, + BasicType.STRING_TYPE, + BasicType.BOOLEAN_TYPE, + BasicType.INT_TYPE, + BasicType.INT_TYPE, + BasicType.INT_TYPE, + BasicType.LONG_TYPE, + BasicType.FLOAT_TYPE, + BasicType.DOUBLE_TYPE, + BasicType.BYTE_TYPE, + LocalTimeType.LOCAL_DATE_TYPE, + new DecimalType(38, 18), + LocalTimeType.LOCAL_DATE_TIME_TYPE, + subRow + }; + SeaTunnelRowType rowType = new SeaTunnelRowType(fieldNames, fieldTypes); + return rowType; + } + + @Test + public void testSerialization() { + SeaTunnelRowType rowType = buildSeaTunnelRowType(); + SeaTunnelRow seaTunnelRow = buildSeaTunnelRow(); + AvroSerializationSchema serializationSchema = new AvroSerializationSchema(rowType); + byte[] serialize = serializationSchema.serialize(seaTunnelRow); + assert serialize.length > 0; + } +} From 07eb66d854c4421ed8939d0e2b7db8f5f5cf1b48 Mon Sep 17 00:00:00 2001 From: Jarvis Date: Tue, 27 Jun 2023 08:35:58 +0800 Subject: [PATCH 2/3] [Feature] fix code style and update avro version Signed-off-by: Jarvis --- docs/en/connector-v2/sink/Kafka.md | 4 +- docs/en/connector-v2/source/kafka.md | 2 +- .../seatunnel/kafka/source/KafkaSource.java | 1 + seatunnel-dist/release-docs/LICENSE | 3 +- .../e2e/connector/kafka/KafkaIT.java | 774 +++++++++--------- .../resources/avro/kafka_avro_to_console.conf | 1 + .../seatunnel-format-avro/pom.xml | 2 +- .../avro/AvroDeserializationSchema.java | 5 +- .../format/avro/AvroSerializationSchema.java | 48 +- .../format/avro/AvroToRowConverter.java | 15 +- .../format/avro/RowToAvroConverter.java | 9 +- tools/dependencies/known-dependencies.txt | 1 + 12 files changed, 429 insertions(+), 436 deletions(-) diff --git a/docs/en/connector-v2/sink/Kafka.md b/docs/en/connector-v2/sink/Kafka.md index 4dbd3a84ce7..c3bfa4a5dff 100644 --- a/docs/en/connector-v2/sink/Kafka.md +++ b/docs/en/connector-v2/sink/Kafka.md @@ -32,7 +32,7 @@ By default, we will use 2pc to guarantee the message is sent to kafka exactly on Kafka Topic. -Currently two formats are supported: +Currently, two formats are supported: 1. Fill in the name of the topic. @@ -108,7 +108,7 @@ Kafka distinguishes different transactions by different transactionId. This para ### format -Data format. The default format is json. Optional text format. The default field separator is ",". +Data format. The default format is json. Optional formats are text and avro. The default field separator is ",". If you customize the delimiter, add the "field_delimiter" option. ### field_delimiter diff --git a/docs/en/connector-v2/source/kafka.md b/docs/en/connector-v2/source/kafka.md index 06f60af6d87..a45b61f2858 100644 --- a/docs/en/connector-v2/source/kafka.md +++ b/docs/en/connector-v2/source/kafka.md @@ -73,7 +73,7 @@ The structure of the data, including field names and field types. ## format -Data format. The default format is json. Optional text format. The default field separator is ", ". +Data format. The default format is json. Optional formats are text and avro. The default field separator is ", ". If you customize the delimiter, add the "field_delimiter" option.
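For reference, a Kafka source definition using the new avro format could look like the sketch below. It follows the kafka_avro_to_console.conf e2e config added in this patch; the plugin name, broker address, and schema fields are illustrative placeholders rather than required values. The schema block matters because the Avro deserialization schema is built from the declared row type, so the listed fields should mirror the records actually written to the topic.

```hocon
source {
  Kafka {
    # placeholder broker address; point this at your own cluster
    bootstrap.servers = "kafkaCluster:9092"
    topic = "test_avro_topic"
    result_table_name = "kafka_table"
    kafka.auto.offset.reset = "earliest"
    # the new format value introduced by this patch
    format = avro
    format_error_handle_way = skip
    # illustrative fields; must match the Avro records on the topic
    schema = {
      fields {
        id = bigint
        c_string = string
      }
    }
  }
}
```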
## format_error_handle_way diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java index b2aaecc6e72..e4ebdd78f3e 100644 --- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java +++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java @@ -269,6 +269,7 @@ private void setDeserialization(Config config) { break; case AVRO: deserializationSchema = new AvroDeserializationSchema(typeInfo); + break; default: throw new SeaTunnelJsonFormatException( CommonErrorCode.UNSUPPORTED_DATA_TYPE, "Unsupported format: " + format); diff --git a/seatunnel-dist/release-docs/LICENSE b/seatunnel-dist/release-docs/LICENSE index 5d19a35a874..2894b67dfbf 100644 --- a/seatunnel-dist/release-docs/LICENSE +++ b/seatunnel-dist/release-docs/LICENSE @@ -220,7 +220,8 @@ The text of each license is the standard Apache 2.0 license. (Apache License 2.0) aircompressor (io.airlift:aircompressor:0.10 - http://github.com/airlift/aircompressor) (Apache License, Version 2.0) Apache Yetus - Audience Annotations (org.apache.yetus:audience-annotations:0.11.0 - https://yetus.apache.org/audience-annotations) (The Apache Software License, Version 2.0) Apache Avro (org.apache.avro:avro:1.8.2 - http://avro.apache.org) - (Apache License, Version 2.0) Apache Commons Codec (commons-codec:commons-codec:1.13 - https://commons.apache.org/proper/commons-codec/) + (The Apache Software License, Version 2.0) Apache Avro (org.apache.avro:avro:1.10.2 - http://avro.apache.org) + (Apache License, Version 2.0) Apache Commons Codec (commons-codec:commons-codec:1.13 - https://commons.apache.org/proper/commons-codec/) (Apache License, Version 2.0) Apache Commons Collections (org.apache.commons:commons-collections4:4.4 - https://commons.apache.org/proper/commons-collections/) (Apache License, Version 2.0) Apache Commons Compress (org.apache.commons:commons-compress:1.20 - https://commons.apache.org/proper/commons-compress/) (The Apache Software License, Version 2.0) Commons Lang (commons-lang:commons-lang:2.6 - http://commons.apache.org/lang/) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java index 348b1e26f46..7e2bf30c93f 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java @@ -17,7 +17,6 @@ package org.apache.seatunnel.e2e.connector.kafka; -import org.apache.seatunnel.format.text.TextSerializationSchema; import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper; import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode; @@ -36,6 +35,7 @@ import org.apache.seatunnel.e2e.common.TestSuiteBase; import org.apache.seatunnel.e2e.common.container.TestContainer; import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer; +import org.apache.seatunnel.format.text.TextSerializationSchema; import 
org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -80,394 +80,388 @@ @Slf4j @DisabledOnContainer( - value = {}, - disabledReason = "Override TestSuiteBase @DisabledOnContainer") + value = {}, + disabledReason = "Override TestSuiteBase @DisabledOnContainer") public class KafkaIT extends TestSuiteBase implements TestResource { - private static final String KAFKA_IMAGE_NAME = "confluentinc/cp-kafka:7.0.9"; - - private static final String KAFKA_HOST = "kafkaCluster"; - - private static final MessageFormat DEFAULT_FORMAT = MessageFormat.JSON; - - private static final String DEFAULT_FIELD_DELIMITER = ","; - - private KafkaProducer producer; - - private KafkaContainer kafkaContainer; - - @BeforeAll - @Override - public void startUp() throws Exception { - kafkaContainer = - new KafkaContainer(DockerImageName.parse(KAFKA_IMAGE_NAME)) - .withNetwork(NETWORK) - .withNetworkAliases(KAFKA_HOST) - .withLogConsumer( - new Slf4jLogConsumer( - DockerLoggerFactory.getLogger(KAFKA_IMAGE_NAME))); - Startables.deepStart(Stream.of(kafkaContainer)).join(); - log.info("Kafka container started"); - Awaitility.given() - .ignoreExceptions() - .atLeast(100, TimeUnit.MILLISECONDS) - .pollInterval(500, TimeUnit.MILLISECONDS) - .atMost(180, TimeUnit.SECONDS) - .untilAsserted(this::initKafkaProducer); - - log.info("Write 100 records to topic test_topic_source"); - DefaultSeaTunnelRowSerializer serializer = - DefaultSeaTunnelRowSerializer.create( - "test_topic_source", - SEATUNNEL_ROW_TYPE, - DEFAULT_FORMAT, - DEFAULT_FIELD_DELIMITER); - generateTestData(serializer::serializeRow, 0, 100); - } - - @AfterAll - @Override - public void tearDown() throws Exception { - if (producer != null) { - producer.close(); - } - if (kafkaContainer != null) { - kafkaContainer.close(); - } - } - - @TestTemplate - public void testSinkKafka(TestContainer container) throws IOException, - InterruptedException { - Container.ExecResult execResult = - container.executeJob("/kafka_sink_fake_to_kafka.conf"); - Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); - - String topicName = "test_topic"; - Map data = getKafkaConsumerData(topicName); - ObjectMapper objectMapper = new ObjectMapper(); - String key = data.keySet().iterator().next(); - ObjectNode objectNode = objectMapper.readValue(key, ObjectNode.class); - Assertions.assertTrue(objectNode.has("c_map")); - Assertions.assertTrue(objectNode.has("c_string")); - Assertions.assertEquals(10, data.size()); - } - - @TestTemplate - public void testTextFormatSinkKafka(TestContainer container) - throws IOException, InterruptedException { - Container.ExecResult execResult = - container.executeJob("/textFormatIT/fake_source_to_text_sink_kafka.conf"); - Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); - - String topicName = "test_text_topic"; - Map data = getKafkaConsumerData(topicName); - Assertions.assertEquals(10, data.size()); - } - - @TestTemplate - public void testDefaultRandomSinkKafka(TestContainer container) - throws IOException, InterruptedException { - Container.ExecResult execResult = - container.executeJob("/kafka_default_sink_fake_to_kafka.conf"); - Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); - - String topicName = "topic_default_sink_test"; - List data = getKafkaConsumerListData(topicName); - Assertions.assertEquals(10, data.size()); - } - - @TestTemplate - public void testExtractTopicFunction(TestContainer container) - throws IOException, 
InterruptedException { - Container.ExecResult execResult = - container.executeJob("/extractTopic_fake_to_kafka.conf"); - Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); - - String topicName = "test_extract_topic"; - Map data = getKafkaConsumerData(topicName); - ObjectMapper objectMapper = new ObjectMapper(); - String key = data.keySet().iterator().next(); - ObjectNode objectNode = objectMapper.readValue(key, ObjectNode.class); - Assertions.assertTrue(objectNode.has("c_map")); - Assertions.assertTrue(objectNode.has("c_string")); - Assertions.assertEquals(10, data.size()); - } - - @TestTemplate - public void testSourceKafkaTextToConsole(TestContainer container) - throws IOException, InterruptedException { - TextSerializationSchema serializer = - TextSerializationSchema.builder() - .seaTunnelRowType(SEATUNNEL_ROW_TYPE) - .delimiter(",") - .build(); - generateTestData( - row -> new ProducerRecord<>("test_topic_text", null, - serializer.serialize(row)), - 0, - 100); - Container.ExecResult execResult = - container.executeJob("/textFormatIT/kafka_source_text_to_console.conf"); - Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); - } - - @TestTemplate - public void testSourceKafkaJsonToConsole(TestContainer container) - throws IOException, InterruptedException { - DefaultSeaTunnelRowSerializer serializer = - DefaultSeaTunnelRowSerializer.create( - "test_topic_json", - SEATUNNEL_ROW_TYPE, - DEFAULT_FORMAT, - DEFAULT_FIELD_DELIMITER); - generateTestData(row -> serializer.serializeRow(row), 0, 100); - Container.ExecResult execResult = - container.executeJob("/jsonFormatIT/kafka_source_json_to_console.conf"); - Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); - } - - @TestTemplate - public void testSourceKafkaJsonFormatErrorHandleWaySkipToConsole(TestContainer container) - throws IOException, InterruptedException { - DefaultSeaTunnelRowSerializer serializer = - DefaultSeaTunnelRowSerializer.create( - "test_topic_error_message", - SEATUNNEL_ROW_TYPE, - DEFAULT_FORMAT, - DEFAULT_FIELD_DELIMITER); - generateTestData(row -> serializer.serializeRow(row), 0, 100); - Container.ExecResult execResult = - container.executeJob( - "/kafka/kafkasource_format_error_handle_way_skip_to_console.conf"); - Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); - } - - @TestTemplate - public void testSourceKafkaJsonFormatErrorHandleWayFailToConsole(TestContainer container) - throws IOException, InterruptedException { - DefaultSeaTunnelRowSerializer serializer = - DefaultSeaTunnelRowSerializer.create( - "test_topic_error_message", - SEATUNNEL_ROW_TYPE, - DEFAULT_FORMAT, - DEFAULT_FIELD_DELIMITER); - generateTestData(row -> serializer.serializeRow(row), 0, 100); - Container.ExecResult execResult = - container.executeJob( - "/kafka/kafkasource_format_error_handle_way_fail_to_console.conf"); - Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); - } - - @TestTemplate - public void testSourceKafka(TestContainer container) throws IOException, - InterruptedException { - testKafkaLatestToConsole(container); - testKafkaEarliestToConsole(container); - testKafkaSpecificOffsetsToConsole(container); - testKafkaTimestampToConsole(container); - } - - @TestTemplate - public void testSourceKafkaStartConfig(TestContainer container) - throws IOException, InterruptedException { - DefaultSeaTunnelRowSerializer serializer = - DefaultSeaTunnelRowSerializer.create( - "test_topic_group", - SEATUNNEL_ROW_TYPE, - 
DEFAULT_FORMAT, - DEFAULT_FIELD_DELIMITER); - generateTestData(row -> serializer.serializeRow(row), 100, 150); - testKafkaGroupOffsetsToConsole(container); - } - - @TestTemplate - public void testFakeSourceToKafkaAvroFormat(TestContainer container) - throws IOException, InterruptedException { - Container.ExecResult execResult = - container.executeJob("/avro/fake_source_to_kafka_avro_format.conf"); - Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); - } - - @TestTemplate - public void testKafkaAvroToConsole(TestContainer container) throws IOException, InterruptedException { - DefaultSeaTunnelRowSerializer serializer = - DefaultSeaTunnelRowSerializer.create( - "test_avro_topic", - SEATUNNEL_ROW_TYPE, - MessageFormat.AVRO, - DEFAULT_FIELD_DELIMITER); - generateTestData(row -> serializer.serializeRow(row), 0, 100); - Container.ExecResult execResult = - container.executeJob( - "/avro/kafka_avro_to_console.conf"); - Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); - } - - public void testKafkaLatestToConsole(TestContainer container) - throws IOException, InterruptedException { - Container.ExecResult execResult = - container.executeJob("/kafka/kafkasource_latest_to_console.conf"); - Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); - } - - public void testKafkaEarliestToConsole(TestContainer container) - throws IOException, InterruptedException { - Container.ExecResult execResult = - container.executeJob("/kafka/kafkasource_earliest_to_console.conf"); - Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); - } - - public void testKafkaSpecificOffsetsToConsole(TestContainer container) - throws IOException, InterruptedException { - Container.ExecResult execResult = - container.executeJob("/kafka/kafkasource_specific_offsets_to_console.conf"); - Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); - } - - public void testKafkaGroupOffsetsToConsole(TestContainer container) - throws IOException, InterruptedException { - Container.ExecResult execResult = - container.executeJob("/kafka/kafkasource_group_offset_to_console.conf"); - Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); - } - - public void testKafkaTimestampToConsole(TestContainer container) - throws IOException, InterruptedException { - Container.ExecResult execResult = - container.executeJob("/kafka/kafkasource_timestamp_to_console.conf"); - Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); - } - - private void initKafkaProducer() { - Properties props = new Properties(); - String bootstrapServers = kafkaContainer.getBootstrapServers(); - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); - props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); - props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); - producer = new KafkaProducer<>(props); - } - - private Properties kafkaConsumerConfig() { - Properties props = new Properties(); - props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers()); - props.put(ConsumerConfig.GROUP_ID_CONFIG, "seatunnel-kafka-sink-group"); - props.put( - ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, - OffsetResetStrategy.EARLIEST.toString().toLowerCase()); - props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - return 
props; - } - - @SuppressWarnings("checkstyle:Indentation") - private void generateTestData(ProducerRecordConverter converter, int start, int end) { - for (int i = start; i < end; i++) { - SeaTunnelRow row = - new SeaTunnelRow( - new Object[]{ - Long.valueOf(i), - Collections.singletonMap("key", Short.parseShort("1")), - new Byte[]{Byte.parseByte("1")}, - "string", - Boolean.FALSE, - Byte.parseByte("1"), - Short.parseShort("1"), - Integer.parseInt("1"), - Long.parseLong("1"), - Float.parseFloat("1.1"), - Double.parseDouble("1.1"), - BigDecimal.valueOf(11, 1), - "test".getBytes(), - LocalDate.now(), - LocalDateTime.now() - }); - ProducerRecord producerRecord = converter.convert(row); - producer.send(producerRecord); - } - } - - private static final SeaTunnelRowType SEATUNNEL_ROW_TYPE = - new SeaTunnelRowType( - new String[]{ - "id", - "c_map", - "c_array", - "c_string", - "c_boolean", - "c_tinyint", - "c_smallint", - "c_int", - "c_bigint", - "c_float", - "c_double", - "c_decimal", - "c_bytes", - "c_date", - "c_timestamp" - }, - new SeaTunnelDataType[]{ - BasicType.LONG_TYPE, - new MapType(BasicType.STRING_TYPE, BasicType.SHORT_TYPE), - ArrayType.BYTE_ARRAY_TYPE, - BasicType.STRING_TYPE, - BasicType.BOOLEAN_TYPE, - BasicType.BYTE_TYPE, - BasicType.SHORT_TYPE, - BasicType.INT_TYPE, - BasicType.LONG_TYPE, - BasicType.FLOAT_TYPE, - BasicType.DOUBLE_TYPE, - new DecimalType(2, 1), - PrimitiveByteArrayType.INSTANCE, - LocalTimeType.LOCAL_DATE_TYPE, - LocalTimeType.LOCAL_DATE_TIME_TYPE - }); - - private Map getKafkaConsumerData(String topicName) { - Map data = new HashMap<>(); - try (KafkaConsumer consumer = new KafkaConsumer<>(kafkaConsumerConfig())) { - consumer.subscribe(Arrays.asList(topicName)); - Map offsets = - consumer.endOffsets(Arrays.asList(new TopicPartition(topicName, 0))); - Long endOffset = offsets.entrySet().iterator().next().getValue(); - Long lastProcessedOffset = -1L; - - do { - ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); - for (ConsumerRecord record : records) { - if (lastProcessedOffset < record.offset()) { - data.put(record.key(), record.value()); - } - lastProcessedOffset = record.offset(); - } - } while (lastProcessedOffset < endOffset - 1); - } - return data; - } - - private List getKafkaConsumerListData(String topicName) { - List data = new ArrayList<>(); - try (KafkaConsumer consumer = new KafkaConsumer<>(kafkaConsumerConfig())) { - consumer.subscribe(Arrays.asList(topicName)); - Map offsets = - consumer.endOffsets(Arrays.asList(new TopicPartition(topicName, 0))); - Long endOffset = offsets.entrySet().iterator().next().getValue(); - Long lastProcessedOffset = -1L; - - do { - ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); - for (ConsumerRecord record : records) { - if (lastProcessedOffset < record.offset()) { - data.add(record.value()); - } - lastProcessedOffset = record.offset(); - } - } while (lastProcessedOffset < endOffset - 1); - } - return data; - } - - interface ProducerRecordConverter { - ProducerRecord convert(SeaTunnelRow row); - } + private static final String KAFKA_IMAGE_NAME = "confluentinc/cp-kafka:7.0.9"; + + private static final String KAFKA_HOST = "kafkaCluster"; + + private static final MessageFormat DEFAULT_FORMAT = MessageFormat.JSON; + + private static final String DEFAULT_FIELD_DELIMITER = ","; + + private KafkaProducer producer; + + private KafkaContainer kafkaContainer; + + @BeforeAll + @Override + public void startUp() throws Exception { + kafkaContainer = + new 
KafkaContainer(DockerImageName.parse(KAFKA_IMAGE_NAME)) + .withNetwork(NETWORK) + .withNetworkAliases(KAFKA_HOST) + .withLogConsumer( + new Slf4jLogConsumer( + DockerLoggerFactory.getLogger(KAFKA_IMAGE_NAME))); + Startables.deepStart(Stream.of(kafkaContainer)).join(); + log.info("Kafka container started"); + Awaitility.given() + .ignoreExceptions() + .atLeast(100, TimeUnit.MILLISECONDS) + .pollInterval(500, TimeUnit.MILLISECONDS) + .atMost(180, TimeUnit.SECONDS) + .untilAsserted(this::initKafkaProducer); + + log.info("Write 100 records to topic test_topic_source"); + DefaultSeaTunnelRowSerializer serializer = + DefaultSeaTunnelRowSerializer.create( + "test_topic_source", + SEATUNNEL_ROW_TYPE, + DEFAULT_FORMAT, + DEFAULT_FIELD_DELIMITER); + generateTestData(serializer::serializeRow, 0, 100); + } + + @AfterAll + @Override + public void tearDown() throws Exception { + if (producer != null) { + producer.close(); + } + if (kafkaContainer != null) { + kafkaContainer.close(); + } + } + + @TestTemplate + public void testSinkKafka(TestContainer container) throws IOException, InterruptedException { + Container.ExecResult execResult = container.executeJob("/kafka_sink_fake_to_kafka.conf"); + Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); + + String topicName = "test_topic"; + Map data = getKafkaConsumerData(topicName); + ObjectMapper objectMapper = new ObjectMapper(); + String key = data.keySet().iterator().next(); + ObjectNode objectNode = objectMapper.readValue(key, ObjectNode.class); + Assertions.assertTrue(objectNode.has("c_map")); + Assertions.assertTrue(objectNode.has("c_string")); + Assertions.assertEquals(10, data.size()); + } + + @TestTemplate + public void testTextFormatSinkKafka(TestContainer container) + throws IOException, InterruptedException { + Container.ExecResult execResult = + container.executeJob("/textFormatIT/fake_source_to_text_sink_kafka.conf"); + Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); + + String topicName = "test_text_topic"; + Map data = getKafkaConsumerData(topicName); + Assertions.assertEquals(10, data.size()); + } + + @TestTemplate + public void testDefaultRandomSinkKafka(TestContainer container) + throws IOException, InterruptedException { + Container.ExecResult execResult = + container.executeJob("/kafka_default_sink_fake_to_kafka.conf"); + Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); + + String topicName = "topic_default_sink_test"; + List data = getKafkaConsumerListData(topicName); + Assertions.assertEquals(10, data.size()); + } + + @TestTemplate + public void testExtractTopicFunction(TestContainer container) + throws IOException, InterruptedException { + Container.ExecResult execResult = container.executeJob("/extractTopic_fake_to_kafka.conf"); + Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); + + String topicName = "test_extract_topic"; + Map data = getKafkaConsumerData(topicName); + ObjectMapper objectMapper = new ObjectMapper(); + String key = data.keySet().iterator().next(); + ObjectNode objectNode = objectMapper.readValue(key, ObjectNode.class); + Assertions.assertTrue(objectNode.has("c_map")); + Assertions.assertTrue(objectNode.has("c_string")); + Assertions.assertEquals(10, data.size()); + } + + @TestTemplate + public void testSourceKafkaTextToConsole(TestContainer container) + throws IOException, InterruptedException { + TextSerializationSchema serializer = + TextSerializationSchema.builder() + 
.seaTunnelRowType(SEATUNNEL_ROW_TYPE) + .delimiter(",") + .build(); + generateTestData( + row -> new ProducerRecord<>("test_topic_text", null, serializer.serialize(row)), + 0, + 100); + Container.ExecResult execResult = + container.executeJob("/textFormatIT/kafka_source_text_to_console.conf"); + Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); + } + + @TestTemplate + public void testSourceKafkaJsonToConsole(TestContainer container) + throws IOException, InterruptedException { + DefaultSeaTunnelRowSerializer serializer = + DefaultSeaTunnelRowSerializer.create( + "test_topic_json", + SEATUNNEL_ROW_TYPE, + DEFAULT_FORMAT, + DEFAULT_FIELD_DELIMITER); + generateTestData(row -> serializer.serializeRow(row), 0, 100); + Container.ExecResult execResult = + container.executeJob("/jsonFormatIT/kafka_source_json_to_console.conf"); + Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); + } + + @TestTemplate + public void testSourceKafkaJsonFormatErrorHandleWaySkipToConsole(TestContainer container) + throws IOException, InterruptedException { + DefaultSeaTunnelRowSerializer serializer = + DefaultSeaTunnelRowSerializer.create( + "test_topic_error_message", + SEATUNNEL_ROW_TYPE, + DEFAULT_FORMAT, + DEFAULT_FIELD_DELIMITER); + generateTestData(row -> serializer.serializeRow(row), 0, 100); + Container.ExecResult execResult = + container.executeJob( + "/kafka/kafkasource_format_error_handle_way_skip_to_console.conf"); + Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); + } + + @TestTemplate + public void testSourceKafkaJsonFormatErrorHandleWayFailToConsole(TestContainer container) + throws IOException, InterruptedException { + DefaultSeaTunnelRowSerializer serializer = + DefaultSeaTunnelRowSerializer.create( + "test_topic_error_message", + SEATUNNEL_ROW_TYPE, + DEFAULT_FORMAT, + DEFAULT_FIELD_DELIMITER); + generateTestData(row -> serializer.serializeRow(row), 0, 100); + Container.ExecResult execResult = + container.executeJob( + "/kafka/kafkasource_format_error_handle_way_fail_to_console.conf"); + Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); + } + + @TestTemplate + public void testSourceKafka(TestContainer container) throws IOException, InterruptedException { + testKafkaLatestToConsole(container); + testKafkaEarliestToConsole(container); + testKafkaSpecificOffsetsToConsole(container); + testKafkaTimestampToConsole(container); + } + + @TestTemplate + public void testSourceKafkaStartConfig(TestContainer container) + throws IOException, InterruptedException { + DefaultSeaTunnelRowSerializer serializer = + DefaultSeaTunnelRowSerializer.create( + "test_topic_group", + SEATUNNEL_ROW_TYPE, + DEFAULT_FORMAT, + DEFAULT_FIELD_DELIMITER); + generateTestData(row -> serializer.serializeRow(row), 100, 150); + testKafkaGroupOffsetsToConsole(container); + } + + @TestTemplate + public void testFakeSourceToKafkaAvroFormat(TestContainer container) + throws IOException, InterruptedException { + Container.ExecResult execResult = + container.executeJob("/avro/fake_source_to_kafka_avro_format.conf"); + Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); + } + + @TestTemplate + public void testKafkaAvroToConsole(TestContainer container) + throws IOException, InterruptedException { + DefaultSeaTunnelRowSerializer serializer = + DefaultSeaTunnelRowSerializer.create( + "test_avro_topic", + SEATUNNEL_ROW_TYPE, + MessageFormat.AVRO, + DEFAULT_FIELD_DELIMITER); + generateTestData(row -> 
serializer.serializeRow(row), 0, 100); + Container.ExecResult execResult = container.executeJob("/avro/kafka_avro_to_console.conf"); + Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); + } + + public void testKafkaLatestToConsole(TestContainer container) + throws IOException, InterruptedException { + Container.ExecResult execResult = + container.executeJob("/kafka/kafkasource_latest_to_console.conf"); + Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); + } + + public void testKafkaEarliestToConsole(TestContainer container) + throws IOException, InterruptedException { + Container.ExecResult execResult = + container.executeJob("/kafka/kafkasource_earliest_to_console.conf"); + Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); + } + + public void testKafkaSpecificOffsetsToConsole(TestContainer container) + throws IOException, InterruptedException { + Container.ExecResult execResult = + container.executeJob("/kafka/kafkasource_specific_offsets_to_console.conf"); + Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); + } + + public void testKafkaGroupOffsetsToConsole(TestContainer container) + throws IOException, InterruptedException { + Container.ExecResult execResult = + container.executeJob("/kafka/kafkasource_group_offset_to_console.conf"); + Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); + } + + public void testKafkaTimestampToConsole(TestContainer container) + throws IOException, InterruptedException { + Container.ExecResult execResult = + container.executeJob("/kafka/kafkasource_timestamp_to_console.conf"); + Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); + } + + private void initKafkaProducer() { + Properties props = new Properties(); + String bootstrapServers = kafkaContainer.getBootstrapServers(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); + producer = new KafkaProducer<>(props); + } + + private Properties kafkaConsumerConfig() { + Properties props = new Properties(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers()); + props.put(ConsumerConfig.GROUP_ID_CONFIG, "seatunnel-kafka-sink-group"); + props.put( + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, + OffsetResetStrategy.EARLIEST.toString().toLowerCase()); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + return props; + } + + @SuppressWarnings("checkstyle:Indentation") + private void generateTestData(ProducerRecordConverter converter, int start, int end) { + for (int i = start; i < end; i++) { + SeaTunnelRow row = + new SeaTunnelRow( + new Object[] { + Long.valueOf(i), + Collections.singletonMap("key", Short.parseShort("1")), + new Byte[] {Byte.parseByte("1")}, + "string", + Boolean.FALSE, + Byte.parseByte("1"), + Short.parseShort("1"), + Integer.parseInt("1"), + Long.parseLong("1"), + Float.parseFloat("1.1"), + Double.parseDouble("1.1"), + BigDecimal.valueOf(11, 1), + "test".getBytes(), + LocalDate.now(), + LocalDateTime.now() + }); + ProducerRecord producerRecord = converter.convert(row); + producer.send(producerRecord); + } + } + + private static final SeaTunnelRowType SEATUNNEL_ROW_TYPE = + new 
SeaTunnelRowType( + new String[] { + "id", + "c_map", + "c_array", + "c_string", + "c_boolean", + "c_tinyint", + "c_smallint", + "c_int", + "c_bigint", + "c_float", + "c_double", + "c_decimal", + "c_bytes", + "c_date", + "c_timestamp" + }, + new SeaTunnelDataType[] { + BasicType.LONG_TYPE, + new MapType(BasicType.STRING_TYPE, BasicType.SHORT_TYPE), + ArrayType.BYTE_ARRAY_TYPE, + BasicType.STRING_TYPE, + BasicType.BOOLEAN_TYPE, + BasicType.BYTE_TYPE, + BasicType.SHORT_TYPE, + BasicType.INT_TYPE, + BasicType.LONG_TYPE, + BasicType.FLOAT_TYPE, + BasicType.DOUBLE_TYPE, + new DecimalType(2, 1), + PrimitiveByteArrayType.INSTANCE, + LocalTimeType.LOCAL_DATE_TYPE, + LocalTimeType.LOCAL_DATE_TIME_TYPE + }); + + private Map getKafkaConsumerData(String topicName) { + Map data = new HashMap<>(); + try (KafkaConsumer consumer = new KafkaConsumer<>(kafkaConsumerConfig())) { + consumer.subscribe(Arrays.asList(topicName)); + Map offsets = + consumer.endOffsets(Arrays.asList(new TopicPartition(topicName, 0))); + Long endOffset = offsets.entrySet().iterator().next().getValue(); + Long lastProcessedOffset = -1L; + + do { + ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); + for (ConsumerRecord record : records) { + if (lastProcessedOffset < record.offset()) { + data.put(record.key(), record.value()); + } + lastProcessedOffset = record.offset(); + } + } while (lastProcessedOffset < endOffset - 1); + } + return data; + } + + private List getKafkaConsumerListData(String topicName) { + List data = new ArrayList<>(); + try (KafkaConsumer consumer = new KafkaConsumer<>(kafkaConsumerConfig())) { + consumer.subscribe(Arrays.asList(topicName)); + Map offsets = + consumer.endOffsets(Arrays.asList(new TopicPartition(topicName, 0))); + Long endOffset = offsets.entrySet().iterator().next().getValue(); + Long lastProcessedOffset = -1L; + + do { + ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); + for (ConsumerRecord record : records) { + if (lastProcessedOffset < record.offset()) { + data.add(record.value()); + } + lastProcessedOffset = record.offset(); + } + } while (lastProcessedOffset < endOffset - 1); + } + return data; + } + + interface ProducerRecordConverter { + ProducerRecord convert(SeaTunnelRow row); + } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/kafka_avro_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/kafka_avro_to_console.conf index 1b2dadf6a93..3fc1a57c3d7 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/kafka_avro_to_console.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/kafka_avro_to_console.conf @@ -33,6 +33,7 @@ source { topic = "test_avro_topic" result_table_name = "kafka_table" kafka.auto.offset.reset = "earliest" + format = avro format_error_handle_way = skip schema = { fields { diff --git a/seatunnel-formats/seatunnel-format-avro/pom.xml b/seatunnel-formats/seatunnel-format-avro/pom.xml index 7e50a113b75..232af650c59 100644 --- a/seatunnel-formats/seatunnel-format-avro/pom.xml +++ b/seatunnel-formats/seatunnel-format-avro/pom.xml @@ -26,7 +26,7 @@ SeaTunnel : Formats : Avro - 1.11.1 + 1.10.2 diff --git a/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroDeserializationSchema.java b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroDeserializationSchema.java index 
e554e17e901..3d2c3fba595 100644 --- a/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroDeserializationSchema.java +++ b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroDeserializationSchema.java @@ -24,7 +24,6 @@ import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.BinaryDecoder; -import org.apache.avro.io.DatumReader; import org.apache.avro.io.DecoderFactory; import java.io.IOException; @@ -35,18 +34,16 @@ public class AvroDeserializationSchema implements DeserializationSchema reader; public AvroDeserializationSchema(SeaTunnelRowType rowType) { this.rowType = rowType; this.converter = new AvroToRowConverter(); - this.reader = converter.getReader(); } @Override public SeaTunnelRow deserialize(byte[] message) throws IOException { BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(message, null); - GenericRecord record = reader.read(null, decoder); + GenericRecord record = this.converter.getReader().read(null, decoder); return converter.converter(record, rowType); } diff --git a/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroSerializationSchema.java b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroSerializationSchema.java index 02f6bbc8eec..92d37bec601 100644 --- a/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroSerializationSchema.java +++ b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroSerializationSchema.java @@ -34,31 +34,31 @@ public class AvroSerializationSchema implements SerializationSchema { - private static final long serialVersionUID = 4438784443025715370L; + private static final long serialVersionUID = 4438784443025715370L; - private final ByteArrayOutputStream out; - private final BinaryEncoder encoder; - private final RowToAvroConverter converter; - private final DatumWriter writer; + private final ByteArrayOutputStream out; + private final BinaryEncoder encoder; + private final RowToAvroConverter converter; + private final DatumWriter writer; - public AvroSerializationSchema(SeaTunnelRowType rowType) { - this.out = new ByteArrayOutputStream(); - this.encoder = EncoderFactory.get().binaryEncoder(out, null); - this.converter = new RowToAvroConverter(rowType); - this.writer = this.converter.getWriter(); - } + public AvroSerializationSchema(SeaTunnelRowType rowType) { + this.out = new ByteArrayOutputStream(); + this.encoder = EncoderFactory.get().binaryEncoder(out, null); + this.converter = new RowToAvroConverter(rowType); + this.writer = this.converter.getWriter(); + } - @Override - public byte[] serialize(SeaTunnelRow element) { - GenericRecord record = converter.convertRowToGenericRecord(element); - try { - out.reset(); - writer.write(record, encoder); - encoder.flush(); - return out.toByteArray(); - } catch (IOException e) { - throw new SeaTunnelAvroFormatException( - AvroFormatErrorCode.SERIALIZATION_ERROR, e.toString()); - } - } + @Override + public byte[] serialize(SeaTunnelRow element) { + GenericRecord record = converter.convertRowToGenericRecord(element); + try { + out.reset(); + writer.write(record, encoder); + encoder.flush(); + return out.toByteArray(); + } catch (IOException e) { + throw new SeaTunnelAvroFormatException( + AvroFormatErrorCode.SERIALIZATION_ERROR, e.toString()); + } + } } diff --git 
a/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroToRowConverter.java b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroToRowConverter.java index bcc4726072d..898793c2413 100644 --- a/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroToRowConverter.java +++ b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroToRowConverter.java @@ -47,13 +47,14 @@ public class AvroToRowConverter implements Serializable { private static final long serialVersionUID = 8177020083886379563L; - private final DatumReader reader; + private DatumReader reader = null; - public AvroToRowConverter() { - this.reader = createReader(); - } + public AvroToRowConverter() {} public DatumReader getReader() { + if (reader == null) { + reader = createReader(); + } return reader; } @@ -63,7 +64,7 @@ private DatumReader createReader() { datumReader.getData().addLogicalTypeConversion(new TimeConversions.DateConversion()); datumReader .getData() - .addLogicalTypeConversion(new TimeConversions.LocalTimestampMillisConversion()); + .addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion()); return datumReader; } @@ -72,7 +73,7 @@ public SeaTunnelRow converter(GenericRecord record, SeaTunnelRowType rowType) { Object[] values = new Object[fieldNames.length]; for (int i = 0; i < fieldNames.length; i++) { - if (!record.hasField(fieldNames[i])) { + if (record.getSchema().getField(fieldNames[i]) == null) { values[i] = null; continue; } @@ -126,7 +127,7 @@ private Object convertField(SeaTunnelDataType dataType, Schema.Field field, O default: String errorMsg = String.format( - "SeaTunnel file connector is not supported for this data type [%s]", + "SeaTunnel avro format is not supported for this data type [%s]", dataType.getSqlType()); throw new SeaTunnelAvroFormatException( AvroFormatErrorCode.UNSUPPORTED_DATA_TYPE, errorMsg); diff --git a/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/RowToAvroConverter.java b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/RowToAvroConverter.java index 21a39f5952a..c237afdd860 100644 --- a/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/RowToAvroConverter.java +++ b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/RowToAvroConverter.java @@ -37,8 +37,6 @@ import org.apache.avro.generic.GenericRecordBuilder; import org.apache.avro.io.DatumWriter; -import lombok.extern.slf4j.Slf4j; - import java.io.Serializable; import java.math.BigDecimal; import java.nio.ByteBuffer; @@ -68,7 +66,7 @@ private DatumWriter createWriter() { datumWriter.getData().addLogicalTypeConversion(new TimeConversions.DateConversion()); datumWriter .getData() - .addLogicalTypeConversion(new TimeConversions.LocalTimestampMillisConversion()); + .addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion()); return datumWriter; } @@ -150,8 +148,7 @@ private Schema seaTunnelDataType2AvroDataType( LogicalTypes.Decimal decimal = LogicalTypes.decimal(precision, scale); return decimal.addToSchema(Schema.create(Schema.Type.BYTES)); case TIMESTAMP: - return LogicalTypes.localTimestampMillis() - .addToSchema(Schema.create(Schema.Type.LONG)); + return LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG)); case DATE: return 
LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT)); case NULL: @@ -159,7 +156,7 @@ private Schema seaTunnelDataType2AvroDataType( default: String errorMsg = String.format( - "SeaTunnel file connector is not supported for this data type [%s]", + "SeaTunnel avro format is not supported for this data type [%s]", seaTunnelDataType.getSqlType()); throw new SeaTunnelAvroFormatException( AvroFormatErrorCode.UNSUPPORTED_DATA_TYPE, errorMsg); diff --git a/tools/dependencies/known-dependencies.txt b/tools/dependencies/known-dependencies.txt index 3a1e736b68b..f9cb4a009f1 100755 --- a/tools/dependencies/known-dependencies.txt +++ b/tools/dependencies/known-dependencies.txt @@ -38,3 +38,4 @@ listenablefuture-9999.0-empty-to-avoid-conflict-with-guava.jar accessors-smart-2.4.7.jar asm-9.1.jar json-smart-2.4.7.jar +avro-1.10.2.jar \ No newline at end of file From 45937247673dfc01111de27f212c6d54881d4103 Mon Sep 17 00:00:00 2001 From: jarvis Date: Mon, 10 Jul 2023 19:56:26 +0800 Subject: [PATCH 3/3] [Feature] update actions config to run CI on fork repo Signed-off-by: jarvis --- .github/workflows/backend.yml | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/.github/workflows/backend.yml b/.github/workflows/backend.yml index d719051013b..c1ea9bb4fe1 100644 --- a/.github/workflows/backend.yml +++ b/.github/workflows/backend.yml @@ -18,9 +18,9 @@ name: Backend on: push: + pull_request: branches: - dev - pull_request: paths-ignore: - 'docs/**' - '**/*.md' @@ -32,7 +32,7 @@ concurrency: jobs: license-header: - if: github.repository == 'apache/seatunnel' + if: github.repository == '${{github.actor}}/seatunnel' name: License header runs-on: ubuntu-latest timeout-minutes: 10 @@ -44,7 +44,7 @@ jobs: uses: apache/skywalking-eyes@985866ce7e324454f61e22eb2db2e998db09d6f3 code-style: - if: github.repository == 'apache/seatunnel' + if: github.repository == '${{github.actor}}/seatunnel' name: Code style runs-on: ubuntu-latest timeout-minutes: 10 @@ -56,7 +56,7 @@ jobs: run: ./mvnw --batch-mode --quiet --no-snapshot-updates clean spotless:check dead-link: - if: github.repository == 'apache/seatunnel' + if: github.repository == '${{github.actor}}/seatunnel' name: Dead links runs-on: ubuntu-latest timeout-minutes: 30 @@ -69,7 +69,7 @@ jobs: done sanity-check: - if: github.repository == 'apache/seatunnel' + if: github.repository == '${{github.actor}}/seatunnel' name: Sanity check results needs: [ license-header, code-style, dead-link ] runs-on: ubuntu-latest @@ -83,8 +83,7 @@ jobs: changes: runs-on: ubuntu-latest - # To prevent error when there's no base branch - if: github.repository == 'apache/seatunnel' + if: github.repository == '${{github.actor}}/seatunnel' timeout-minutes: 10 outputs: api: ${{ steps.filter.outputs.api }}