From 526ce3e374da996812a9d78ababcc6c39235bc0b Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Tue, 7 Jan 2025 21:04:11 +0000 Subject: [PATCH 1/5] Upgrade version of io.debezium --- sdks/java/io/debezium/build.gradle | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sdks/java/io/debezium/build.gradle b/sdks/java/io/debezium/build.gradle index e3b88e22607d..5c5c3bcc17c3 100644 --- a/sdks/java/io/debezium/build.gradle +++ b/sdks/java/io/debezium/build.gradle @@ -60,9 +60,9 @@ dependencies { permitUnusedDeclared "org.apache.kafka:connect-json:2.5.0" // BEAM-11761 // Debezium dependencies - implementation group: 'io.debezium', name: 'debezium-core', version: '1.3.1.Final' - testImplementation group: 'io.debezium', name: 'debezium-connector-mysql', version: '1.3.1.Final' - testImplementation group: 'io.debezium', name: 'debezium-connector-postgres', version: '1.3.1.Final' + implementation group: 'io.debezium', name: 'debezium-core', version: '1.9.8.Final' + testImplementation group: 'io.debezium', name: 'debezium-connector-mysql', version: '1.9.8.Final' + testImplementation group: 'io.debezium', name: 'debezium-connector-postgres', version: '1.9.8.Final' } test { From d76865855b10b8ad05a8cecf472c8f1e56f0d0ee Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Tue, 7 Jan 2025 21:25:01 +0000 Subject: [PATCH 2/5] Update exception type --- .../debezium/DebeziumReadSchemaTransformTest.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformTest.java b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformTest.java index c4b5d2d1f890..60d2fdd779a6 100644 --- a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformTest.java +++ b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformTest.java @@ -20,6 +20,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertThrows; +import io.debezium.DebeziumException; import java.time.Duration; import java.util.Arrays; import java.util.stream.Collectors; @@ -28,7 +29,6 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionRowTuple; import org.apache.beam.sdk.values.Row; -import org.apache.kafka.connect.errors.ConnectException; import org.hamcrest.Matchers; import org.junit.ClassRule; import org.junit.Test; @@ -130,9 +130,9 @@ public void testNoProblem() { @Test public void testWrongUser() { Pipeline readPipeline = Pipeline.create(); - ConnectException ex = + DebeziumException ex = assertThrows( - ConnectException.class, + DebeziumException.class, () -> { PCollectionRowTuple.empty(readPipeline) .apply( @@ -151,9 +151,9 @@ public void testWrongUser() { @Test public void testWrongPassword() { Pipeline readPipeline = Pipeline.create(); - ConnectException ex = + DebeziumException ex = assertThrows( - ConnectException.class, + DebeziumException.class, () -> { PCollectionRowTuple.empty(readPipeline) .apply( @@ -172,9 +172,9 @@ public void testWrongPassword() { @Test public void testWrongPort() { Pipeline readPipeline = Pipeline.create(); - ConnectException ex = + DebeziumException ex = assertThrows( - ConnectException.class, + DebeziumException.class, () -> { PCollectionRowTuple.empty(readPipeline) .apply(makePtransform(userName, password, database, 12345, "localhost")) From 1bb59f1fde2137e4191e3af2b05b55a0bbfa8396 Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Tue, 7 Jan 2025 21:34:12 +0000 Subject: [PATCH 3/5] Update version in expected string --- .../org/apache/beam/io/debezium/DebeziumIOMySqlConnectorIT.java | 2 +- .../org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOMySqlConnectorIT.java b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOMySqlConnectorIT.java index 12ba57bad45d..ab4f71de676d 100644 --- a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOMySqlConnectorIT.java +++ b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOMySqlConnectorIT.java @@ -210,7 +210,7 @@ public void testDebeziumIOMySql() { .withMaxNumberOfRecords(30) .withCoder(StringUtf8Coder.of())); String expected = - "{\"metadata\":{\"connector\":\"mysql\",\"version\":\"1.3.1.Final\",\"name\":\"dbserver1\"," + "{\"metadata\":{\"connector\":\"mysql\",\"version\":\"1.9.8.Final\",\"name\":\"dbserver1\"," + "\"database\":\"inventory\",\"schema\":\"mysql-bin.000003\",\"table\":\"addresses\"},\"before\":null," + "\"after\":{\"fields\":{\"zip\":\"76036\",\"city\":\"Euless\"," + "\"street\":\"3183 Moore Avenue\",\"id\":10,\"state\":\"Texas\",\"customer_id\":1001," diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java index de6c14722898..7fb30a1099aa 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java @@ -17,7 +17,6 @@ */ package org.apache.beam.sdk.io.gcp.pubsub; -import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; import com.google.auth.Credentials; From a70073b35f1e74c36e658f57914319c5e70ba63e Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Wed, 8 Jan 2025 18:20:07 +0000 Subject: [PATCH 4/5] More progress towards update --- CHANGES.md | 1 + sdks/java/io/debezium/build.gradle | 6 +++--- .../main/java/org/apache/beam/io/debezium/DebeziumIO.java | 3 ++- .../io/debezium/DebeziumReadSchemaTransformProvider.java | 8 +++++++- .../apache/beam/io/debezium/KafkaSourceConsumerFn.java | 8 ++++---- .../beam/io/debezium/DebeziumReadSchemaTransformTest.java | 3 ++- 6 files changed, 19 insertions(+), 10 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index d5cbb76fb3d5..5f25c4d1b9bf 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -71,6 +71,7 @@ ## Breaking Changes * AWS V1 I/Os have been removed (Java). As part of this, x-lang Python Kinesis I/O has been updated to consume the V2 IO and it also no longer supports setting producer_properties ([#33430](https://github.com/apache/beam/issues/33430)). +* Debezium IO (Java) has been upgraded from depending on version 1.3.1.Final of io.debezium to 2.7.4.Final. This may cause some breaking changes since the libraries do not maintain full compatibility ([#33526](https://github.com/apache/beam/issues/33526)). ## Deprecations diff --git a/sdks/java/io/debezium/build.gradle b/sdks/java/io/debezium/build.gradle index 5c5c3bcc17c3..1993f8898c8a 100644 --- a/sdks/java/io/debezium/build.gradle +++ b/sdks/java/io/debezium/build.gradle @@ -60,9 +60,9 @@ dependencies { permitUnusedDeclared "org.apache.kafka:connect-json:2.5.0" // BEAM-11761 // Debezium dependencies - implementation group: 'io.debezium', name: 'debezium-core', version: '1.9.8.Final' - testImplementation group: 'io.debezium', name: 'debezium-connector-mysql', version: '1.9.8.Final' - testImplementation group: 'io.debezium', name: 'debezium-connector-postgres', version: '1.9.8.Final' + implementation group: 'io.debezium', name: 'debezium-core', version: '2.7.4.Final' + testImplementation group: 'io.debezium', name: 'debezium-connector-mysql', version: '2.7.4.Final' + testImplementation group: 'io.debezium', name: 'debezium-connector-postgres', version: '2.7.4.Final' } test { diff --git a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumIO.java b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumIO.java index 30ad8a5f9f74..95abb5673974 100644 --- a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumIO.java +++ b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumIO.java @@ -554,10 +554,11 @@ public Map getConfigurationMap() { configuration.computeIfAbsent(entry.getKey(), k -> entry.getValue()); } - // Set default Database History impl. if not provided + // Set default Database History impl and kafka topic prefix if not provided configuration.computeIfAbsent( "database.history", k -> KafkaSourceConsumerFn.DebeziumSDFDatabaseHistory.class.getName()); + configuration.computeIfAbsent("topic.prefix", k -> "beam-debezium-connector"); String stringProperties = Joiner.on('\n').withKeyValueSeparator(" -> ").join(configuration); LOG.debug("---------------- Connector configuration: {}", stringProperties); diff --git a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformProvider.java b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformProvider.java index 9f227708e5e6..3d088121cd84 100644 --- a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformProvider.java +++ b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformProvider.java @@ -110,7 +110,13 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { connectorConfiguration .withConnectionProperty("table.include.list", configuration.getTable()) .withConnectionProperty("include.schema.changes", "false") - .withConnectionProperty("database.server.name", "beam-pipeline-server"); + // add random unique name/identifier for server to identify connector + .withConnectionProperty("database.server.name", "beam-pipeline-server") + .withConnectionProperty("database.server.id", "579676") + .withConnectionProperty( + "schema.history.internal", "io.debezium.storage.file.history.FileSchemaHistory") + .withConnectionProperty( + "schema.history.internal.file.filename", "data/schema_history.dat"); if (configuration.getDatabase().equals("POSTGRES")) { LOG.info( "As Database is POSTGRES, we set the `database.dbname` property to {}.", diff --git a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaSourceConsumerFn.java b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaSourceConsumerFn.java index 0c9632a69926..dd1cbcf18ba6 100644 --- a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaSourceConsumerFn.java +++ b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaSourceConsumerFn.java @@ -22,9 +22,9 @@ import io.debezium.document.Document; import io.debezium.document.DocumentReader; import io.debezium.document.DocumentWriter; -import io.debezium.relational.history.AbstractDatabaseHistory; -import io.debezium.relational.history.DatabaseHistoryException; +import io.debezium.relational.history.AbstractSchemaHistory; import io.debezium.relational.history.HistoryRecord; +import io.debezium.relational.history.SchemaHistoryException; import java.io.IOException; import java.io.Serializable; import java.lang.reflect.InvocationTargetException; @@ -430,7 +430,7 @@ public IsBounded isBounded() { } } - public static class DebeziumSDFDatabaseHistory extends AbstractDatabaseHistory { + public static class DebeziumSDFDatabaseHistory extends AbstractSchemaHistory { private List history; public DebeziumSDFDatabaseHistory() { @@ -453,7 +453,7 @@ public void start() { } @Override - protected void storeRecord(HistoryRecord record) throws DatabaseHistoryException { + protected void storeRecord(HistoryRecord record) throws SchemaHistoryException { LOG.debug("------------- Adding history! {}", record); history.add(DocumentWriter.defaultWriter().writeAsBytes(record.document())); diff --git a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformTest.java b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformTest.java index 60d2fdd779a6..de34259bffb2 100644 --- a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformTest.java +++ b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformTest.java @@ -124,7 +124,8 @@ public void testNoProblem() { result.getSchema().getFields().stream() .map(field -> field.getName()) .collect(Collectors.toList()), - Matchers.containsInAnyOrder("before", "after", "source", "op", "ts_ms", "transaction")); + Matchers.containsInAnyOrder( + "before", "after", "source", "op", "ts_ms", "ts_us", "ts_ns", "transaction")); } @Test From 3d77c8a64129b1c36e71c39f5bf902a56c6a904f Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Wed, 8 Jan 2025 19:13:12 +0000 Subject: [PATCH 5/5] Upgrade antlr --- .gitignore | 3 +++ .../groovy/org/apache/beam/gradle/BeamModulePlugin.groovy | 4 ++-- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/.gitignore b/.gitignore index 2bad81975ba0..84141bc6d05e 100644 --- a/.gitignore +++ b/.gitignore @@ -34,6 +34,9 @@ sdks/java/maven-archetypes/examples-java8/src/main/resources/archetype-resources sdks/java/maven-archetypes/gcp-bom-examples/src/main/resources/archetype-resources/src/ sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/sample.txt +# Ignore generated debezium data +sdks/java/io/debezium/data/ + # Ignore files generated by the Python build process. **/*.pyc **/*.pyo diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 7b791ef9aa8e..1aea4f00ca09 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -664,8 +664,8 @@ class BeamModulePlugin implements Plugin { activemq_junit : "org.apache.activemq.tooling:activemq-junit:$activemq_version", activemq_kahadb_store : "org.apache.activemq:activemq-kahadb-store:$activemq_version", activemq_mqtt : "org.apache.activemq:activemq-mqtt:$activemq_version", - antlr : "org.antlr:antlr4:4.7", - antlr_runtime : "org.antlr:antlr4-runtime:4.7", + antlr : "org.antlr:antlr4:4.10", + antlr_runtime : "org.antlr:antlr4-runtime:4.10", args4j : "args4j:args4j:2.33", auto_value_annotations : "com.google.auto.value:auto-value-annotations:$autovalue_version", avro : "org.apache.avro:avro:1.11.4",