From 7f450b5dd991d54235bcfa77b945c775d7593e9e Mon Sep 17 00:00:00 2001 From: Kexiang Wang Date: Fri, 28 Jun 2024 17:14:28 -0400 Subject: [PATCH 01/14] feat(cdc): support sql server cdc --- Cargo.lock | 131 ++--- ci/docker-compose.yml | 2 + ci/scripts/e2e-source-test.sh | 9 + .../sql_server_cdc/sql_server_cdc.slt | 256 ++++++++++ .../sql_server_cdc/sql_server_cdc_insert.sql | 24 + .../sql_server_cdc/sql_server_cdc_prepare.sql | 72 +++ .../connector/api/source/SourceTypeE.java | 3 + .../source/SourceValidateHandler.java | 8 + .../source/common/DbzConnectorConfig.java | 35 ++ .../source/common/DbzSourceUtils.java | 29 +- .../source/common/SqlServerValidator.java | 310 ++++++++++++ .../source/common/ValidatorUtils.java | 4 + .../src/main/resources/sql_server.properties | 24 + .../main/resources/validate_sql.properties | 7 + .../risingwave-source-cdc/pom.xml | 4 + java/pom.xml | 5 + proto/catalog.proto | 2 +- proto/connector_service.proto | 1 + risedev.yml | 1 + src/common/src/types/datetime.rs | 126 ++--- src/common/src/types/decimal.rs | 16 +- src/common/src/types/from_sql.rs | 20 +- src/common/src/types/ordered_float.rs | 6 +- src/common/src/types/timestamptz.rs | 8 +- src/compute/tests/cdc_tests.rs | 2 + src/connector/Cargo.toml | 11 +- src/connector/src/error.rs | 1 + src/connector/src/macros.rs | 3 +- src/connector/src/parser/mod.rs | 2 + src/connector/src/parser/sql_server.rs | 297 +++++++++++ src/connector/src/parser/unified/debezium.rs | 7 + src/connector/src/parser/unified/json.rs | 1 - src/connector/src/sink/sqlserver.rs | 51 +- src/connector/src/source/base.rs | 5 +- .../src/source/cdc/enumerator/mod.rs | 14 +- src/connector/src/source/cdc/external/mod.rs | 40 ++ .../src/source/cdc/external/sql_server.rs | 467 ++++++++++++++++++ src/connector/src/source/cdc/mod.rs | 3 + src/connector/src/source/cdc/source/reader.rs | 5 +- src/connector/src/source/cdc/split.rs | 55 ++- src/connector/src/source/reader/reader.rs | 2 +- src/connector/src/with_options.rs | 10 + 
src/frontend/src/handler/create_source.rs | 39 +- src/frontend/src/handler/create_table.rs | 12 +- src/license/src/feature.rs | 1 + 45 files changed, 1869 insertions(+), 262 deletions(-) create mode 100644 e2e_test/source/cdc_inline/sql_server_cdc/sql_server_cdc.slt create mode 100644 e2e_test/source/cdc_inline/sql_server_cdc/sql_server_cdc_insert.sql create mode 100644 e2e_test/source/cdc_inline/sql_server_cdc/sql_server_cdc_prepare.sql create mode 100644 java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/SqlServerValidator.java create mode 100644 java/connector-node/risingwave-connector-service/src/main/resources/sql_server.properties create mode 100644 src/connector/src/parser/sql_server.rs create mode 100644 src/connector/src/source/cdc/external/sql_server.rs diff --git a/Cargo.lock b/Cargo.lock index 220d030807e14..48022ff5efd27 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4274,70 +4274,6 @@ version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a357d28ed41a50f9c765dbfe56cbc04a64e53e5fc58ba79fbc34c10ef3df831f" -[[package]] -name = "encoding" -version = "0.2.33" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6b0d943856b990d12d3b55b359144ff341533e516d94098b1d3fc1ac666d36ec" -dependencies = [ - "encoding-index-japanese", - "encoding-index-korean", - "encoding-index-simpchinese", - "encoding-index-singlebyte", - "encoding-index-tradchinese", -] - -[[package]] -name = "encoding-index-japanese" -version = "1.20141219.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "04e8b2ff42e9a05335dbf8b5c6f7567e5591d0d916ccef4e0b1710d32a0d0c91" -dependencies = [ - "encoding_index_tests", -] - -[[package]] -name = "encoding-index-korean" -version = "1.20141219.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4dc33fb8e6bcba213fe2f14275f0963fd16f0a02c878e3095ecfdf5bee529d81" 
-dependencies = [ - "encoding_index_tests", -] - -[[package]] -name = "encoding-index-simpchinese" -version = "1.20141219.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d87a7194909b9118fc707194baa434a4e3b0fb6a5a757c73c3adb07aa25031f7" -dependencies = [ - "encoding_index_tests", -] - -[[package]] -name = "encoding-index-singlebyte" -version = "1.20141219.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3351d5acffb224af9ca265f435b859c7c01537c0849754d3db3fdf2bfe2ae84a" -dependencies = [ - "encoding_index_tests", -] - -[[package]] -name = "encoding-index-tradchinese" -version = "1.20141219.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd0e20d5688ce3cab59eb3ef3a2083a5c77bf496cb798dc6fcdb75f323890c18" -dependencies = [ - "encoding_index_tests", -] - -[[package]] -name = "encoding_index_tests" -version = "0.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a246d82be1c9d791c5dfde9a2bd045fc3cbba3fa2b11ad558f27d01712f00569" - [[package]] name = "encoding_rs" version = "0.8.33" @@ -7915,6 +7851,15 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" +[[package]] +name = "openssl-src" +version = "300.3.1+3.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7259953d42a81bf137fbbd73bd30a8e1914d6dce43c2b90ed575783a22608b91" +dependencies = [ + "cc", +] + [[package]] name = "openssl-sys" version = "0.9.103" @@ -7923,6 +7868,7 @@ checksum = "7f9e8deee91df40a943c71b917e5874b951d32a802526c85721ce3b776c929d6" dependencies = [ "cc", "libc", + "openssl-src", "pkg-config", "vcpkg", ] @@ -7999,6 +7945,20 @@ dependencies = [ "tokio-stream", ] +[[package]] +name = "opentls" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"6f561874f8d6ecfb674fc08863414040c93cc90c0b6963fe679895fab8b65560" +dependencies = [ + "futures-util", + "log", + "openssl", + "openssl-probe", + "openssl-sys", + "url", +] + [[package]] name = "ordered-float" version = "2.10.0" @@ -11891,18 +11851,6 @@ dependencies = [ "windows-sys 0.52.0", ] -[[package]] -name = "rustls" -version = "0.20.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b80e3dec595989ea8510028f30c408a4630db12c9cbb8de34203b89d6577e99" -dependencies = [ - "log", - "ring 0.16.20", - "sct", - "webpki", -] - [[package]] name = "rustls" version = "0.21.11" @@ -13775,8 +13723,7 @@ dependencies = [ [[package]] name = "tiberius" version = "0.12.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dc6e2bf3e4b5be181a2a2ceff4b9b12e2684010d436a6958bd564fbc8094d44d" +source = "git+https://github.com/risingwavelabs/tiberius.git?rev=f834f2deeb9e2fb08afaf73865f330cf31a3876a#f834f2deeb9e2fb08afaf73865f330cf31a3876a" dependencies = [ "async-trait", "asynchronous-codec", @@ -13785,19 +13732,18 @@ dependencies = [ "bytes", "chrono", "connection-string", - "encoding", + "encoding_rs", "enumflags2", "futures-util", "num-traits", "once_cell", + "opentls", "pin-project-lite", "pretty-hex", "rust_decimal", - "rustls-native-certs 0.6.3", - "rustls-pemfile 1.0.4", "thiserror", "time", - "tokio-rustls 0.23.4", + "tokio", "tokio-util", "tracing", "uuid", @@ -14008,17 +13954,6 @@ dependencies = [ "rand", ] -[[package]] -name = "tokio-rustls" -version = "0.23.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c43ee83903113e03984cb9e5cebe6c04a5116269e900e3ddba8f068a62adda59" -dependencies = [ - "rustls 0.20.9", - "tokio", - "webpki", -] - [[package]] name = "tokio-rustls" version = "0.24.1" @@ -15337,16 +15272,6 @@ dependencies = [ "wasm-bindgen", ] -[[package]] -name = "webpki" -version = "0.22.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum 
= "ed63aea5ce73d0ff405984102c42de94fc55a6b75765d621c65262469b3c9b53" -dependencies = [ - "ring 0.17.5", - "untrusted 0.9.0", -] - [[package]] name = "webpki-roots" version = "0.25.2" diff --git a/ci/docker-compose.yml b/ci/docker-compose.yml index 83cb000566d4a..78ad69c0995ab 100644 --- a/ci/docker-compose.yml +++ b/ci/docker-compose.yml @@ -74,6 +74,7 @@ services: image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240731 depends_on: - mysql + - sqlserver-server - db - message_queue - schemaregistry @@ -215,6 +216,7 @@ services: environment: ACCEPT_EULA: 'Y' SA_PASSWORD: 'SomeTestOnly@SA' + MSSQL_AGENT_ENABLED: "true" starrocks-fe-server: container_name: starrocks-fe-server diff --git a/ci/scripts/e2e-source-test.sh b/ci/scripts/e2e-source-test.sh index c4b4713af81cc..7b45c619eb6cf 100755 --- a/ci/scripts/e2e-source-test.sh +++ b/ci/scripts/e2e-source-test.sh @@ -63,6 +63,14 @@ echo "--- starting risingwave cluster" RUST_LOG="debug,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \ risedev ci-start ci-1cn-1fe-with-recovery +echo "--- Install sql server client" +curl https://packages.microsoft.com/keys/microsoft.asc | sudo apt-key add - +curl https://packages.microsoft.com/config/ubuntu/20.04/prod.list | sudo tee /etc/apt/sources.list.d/msprod.list +apt-get update -y +ACCEPT_EULA=Y DEBIAN_FRONTEND=noninteractive apt-get install -y mssql-tools unixodbc-dev +export PATH="/opt/mssql-tools/bin/:$PATH" +sleep 2 + echo "--- mongodb cdc test" # install the mongo shell wget http://archive.ubuntu.com/ubuntu/pool/main/o/openssl/libssl1.1_1.1.1f-1ubuntu2_amd64.deb @@ -79,6 +87,7 @@ risedev slt './e2e_test/source/cdc/mongodb/**/*.slt' echo "--- inline cdc test" export MYSQL_HOST=mysql MYSQL_TCP_PORT=3306 MYSQL_PWD=123456 +export SQLCMDSERVER=sqlserver-server SQLCMDUSER=SA SQLCMDPASSWORD="SomeTestOnly@SA" SQLCMDDBNAME=mydb risedev slt './e2e_test/source/cdc_inline/**/*.slt' echo "--- opendal source test" diff --git 
a/e2e_test/source/cdc_inline/sql_server_cdc/sql_server_cdc.slt b/e2e_test/source/cdc_inline/sql_server_cdc/sql_server_cdc.slt new file mode 100644 index 0000000000000..33080c86316d2 --- /dev/null +++ b/e2e_test/source/cdc_inline/sql_server_cdc/sql_server_cdc.slt @@ -0,0 +1,256 @@ + +control substitution on + +# ------------ data prepare stage ------------ +system ok +sqlcmd -C -i e2e_test/source/cdc_inline/sql_server_cdc/sql_server_cdc_prepare.sql -b + +# ------------ validate stage ------------ +# TODO(kexiang): add more tests here +# invalid username +# invalid password +# invalid table name +# invalid primary key +# column name mismatch +# column data type mismatch +# format & encode provided and match with debezium json, this is okay +# format & encode provided but mismatch with debezium json, this is not allowed + + +statement error +CREATE TABLE orders ( + order_id INT PRIMARY KEY, + order_date BIGINT, + customer_name VARCHAR, + price DECIMAL, + product_id INT, + order_status SMALLINT +) WITH ( + connector = 'sqlserver-cdc', + hostname = '${SQLCMDSERVER:sqlserver-server}', + port = '${SQLCMDPORT:1433}', + username = '${SQLCMDUSER:$USER}', + password = '${SQLCMDPASSWORD}', + table.name = 'orders', + database.name = 'mydb', +); + +statement error +CREATE TABLE single_type ( + id INT, + c_time time, + PRIMARY KEY (id) +) WITH ( + connector = 'sqlserver-cdc', + hostname = '${SQLCMDSERVER:sqlserver-server}', + port = '${SQLCMDPORT:1433}', + username = '${SQLCMDUSER:$USER}', + password = '${SQLCMDPASSWORD}', + table.name = 'single_type', + database.name = 'mydb', +); + +statement error +CREATE TABLE sqlserver_all_data_types ( + id INT PRIMARY KEY, + c_bit BOOLEAN, + c_tinyint SMALLINT, + c_smallint SMALLINT, + c_int INTEGER, + c_bigint BIGINT, + c_decimal DECIMAL, + c_real REAL, + c_float FLOAT, + c_varchar VARCHAR, + c_varbinary BYTEA, + c_date DATE, + c_time TIME, + c_datetime2 TIMESTAMP, + c_datetimeoffset TIMESTAMPTZ +) WITH ( + connector = 'sqlserver-cdc', + 
hostname = '${SQLCMDSERVER:sqlserver-server}', + port = '${SQLCMDPORT:1433}', + username = '${SQLCMDUSER:$USER}', + password = '${SQLCMDPASSWORD}', + table.name = 'sqlserver_all_data_types', + database.name = 'mydb', +); + + +# ------------ Create source/table/mv stage ------------ +# create a cdc source job, which format fixed to `FORMAT PLAIN ENCODE JSON` +statement ok +CREATE SOURCE mssql_source WITH ( + connector = 'sqlserver-cdc', + hostname = '${SQLCMDSERVER:sqlserver-server}', + port = '${SQLCMDPORT:1433}', + username = '${SQLCMDUSER:$USER}', + password = '${SQLCMDPASSWORD}', + database.name = 'mydb', +); + +statement error Should not create MATERIALIZED VIEW or SELECT directly on shared CDC source +create materialized view mv as select * from mssql_source; + +statement error The upstream table name must contain schema name prefix* +CREATE TABLE shared_orders ( + order_id INT, + order_date BIGINT, + customer_name VARCHAR, + price DECIMAL, + product_id INT, + order_status SMALLINT, + PRIMARY KEY (order_id) +) from mssql_source table 'orders'; + +statement ok +CREATE TABLE shared_orders ( + order_id INT, + order_date BIGINT, + customer_name VARCHAR, + price DECIMAL, + product_id INT, + order_status SMALLINT, + PRIMARY KEY (order_id) +) from mssql_source table 'dbo.orders'; + +statement ok +CREATE TABLE shared_single_type ( + id INT, + c_time time, + PRIMARY KEY (id) +) from mssql_source table 'dbo.single_type'; + +statement ok +CREATE TABLE shared_sqlserver_all_data_types ( + id INT, + c_bit BOOLEAN, + c_tinyint SMALLINT, + c_smallint SMALLINT, + c_int INTEGER, + c_bigint BIGINT, + c_decimal DECIMAL, + c_real REAL, + c_float FLOAT, + c_varchar VARCHAR, + c_varbinary BYTEA, + c_date DATE, + c_time TIME, + c_datetime2 TIMESTAMP, + c_datetimeoffset TIMESTAMPTZ, + PRIMARY KEY (id) +) from mssql_source table 'dbo.sqlserver_all_data_types'; + +statement ok +create materialized view shared_orders_cnt as select count(*) as cnt from shared_orders; + +statement ok 
+create materialized view shared_single_type_cnt as select count(*) as cnt from shared_single_type; + +statement ok +create materialized view shared_sqlserver_all_data_types_cnt as select count(*) as cnt from shared_sqlserver_all_data_types; + +# sleep to ensure the data in mssql tables is consumed from Debezium message instead of backfill. +sleep 20s + +# ------------ check stage ------------ +query I +select cnt from shared_orders_cnt; +---- +3 + +query I +select cnt from shared_single_type_cnt; +---- +1 + +query I +select cnt from shared_sqlserver_all_data_types_cnt; +---- +3 + +query III +select * from shared_orders order by order_id; +---- +1 1558430840000 Bob 11 1 1 +2 1558430840001 Alice 21 2 1 +3 1558430840002 Alice 19 2 1 + +query I +SELECT * from shared_single_type order by id; +---- +3 23:59:59.999 + +query TTTTTTT +SELECT * from shared_sqlserver_all_data_types order by id; +---- +1 f 0 0 0 0 0 0 0 (empty) NULL 2001-01-01 00:00:00 2001-01-01 00:00:00 2001-01-01 00:00:00+00:00 +2 t 255 -32768 -2147483648 -9223372036854775808 -10 -10000 -10000 aa \xff 1990-01-01 13:59:59.123 2000-01-01 11:00:00.123 1990-01-01 00:00:01.123+00:00 +3 t 127 32767 2147483647 9223372036854775807 -10 10000 10000 zzzz \xffffffff 2999-12-31 23:59:59.999 2099-12-31 23:59:59.999 2999-12-31 23:59:59.999+00:00 + +# ------------ kill cluster ------------ +# system ok +# risedev kill + +sleep 30s + +# ------------ add rows stage ------------ +system ok +sqlcmd -C -i e2e_test/source/cdc_inline/sql_server_cdc/sql_server_cdc_insert.sql -b + +# sleep 10s + +# ------------ recover cluster ------------ +# system ok +# risedev dev ci-1cn-1fe-with-recovery + +sleep 20s + +# ------------ check after recovery stage ------------ + +query I +select cnt from shared_orders_cnt; +---- +6 + +query I +select cnt from shared_single_type_cnt; +---- +2 + +query I +select cnt from shared_sqlserver_all_data_types_cnt; +---- +6 + + +query III +select * from shared_orders order by order_id; +---- +1 
1558430840000 Bob 11 1 1 +2 1558430840001 Alice 21 2 1 +3 1558430840002 Alice 19 2 1 +11 1558430840000 Bob 11 1 1 +12 1558430840001 Alice 21 2 1 +13 1558430840002 Alice 19 2 1 + +query I +SELECT * from shared_single_type order by id; +---- +3 23:59:59.999 +13 23:59:59.999 + +query TTTTTTT +SELECT * from shared_sqlserver_all_data_types order by id; +---- +1 f 0 0 0 0 0 0 0 (empty) NULL 2001-01-01 00:00:00 2001-01-01 00:00:00 2001-01-01 00:00:00+00:00 +2 t 255 -32768 -2147483648 -9223372036854775808 -10 -10000 -10000 aa \xff 1990-01-01 13:59:59.123 2000-01-01 11:00:00.123 1990-01-01 00:00:01.123+00:00 +3 t 127 32767 2147483647 9223372036854775807 -10 10000 10000 zzzz \xffffffff 2999-12-31 23:59:59.999 2099-12-31 23:59:59.999 2999-12-31 23:59:59.999+00:00 +11 f 0 0 0 0 0 0 0 (empty) NULL 2001-01-01 00:00:00 2001-01-01 00:00:00 2001-01-01 00:00:00+00:00 +12 t 255 -32768 -2147483648 -9223372036854775808 -10 -10000 -10000 aa \xff 1990-01-01 13:59:59.123 2000-01-01 11:00:00.123 1990-01-01 00:00:01.123+00:00 +13 t 127 32767 2147483647 9223372036854775807 -10 10000 10000 zzzz \xffffffff 2999-12-31 23:59:59.999 2099-12-31 23:59:59.999 2999-12-31 23:59:59.999+00:00 + +# ------------ drop stage ------------ +statement ok +drop source mssql_source cascade; \ No newline at end of file diff --git a/e2e_test/source/cdc_inline/sql_server_cdc/sql_server_cdc_insert.sql b/e2e_test/source/cdc_inline/sql_server_cdc/sql_server_cdc_insert.sql new file mode 100644 index 0000000000000..e5089aa86d020 --- /dev/null +++ b/e2e_test/source/cdc_inline/sql_server_cdc/sql_server_cdc_insert.sql @@ -0,0 +1,24 @@ + +INSERT INTO + orders ( + order_id, + order_date, + customer_name, + price, + product_id, + order_status + ) +VALUES + (11, 1558430840000, 'Bob', 10.50, 1, 1), + (12, 1558430840001, 'Alice', 20.50, 2, 1), + (13, 1558430840002, 'Alice', 18.50, 2, 1); + + +INSERT INTO single_type VALUES (13, '23:59:59.999') + + +INSERT INTO sqlserver_all_data_types VALUES (11, 'False', 0, 0, 0, 0, 0, 0, 0, 
'', NULL, '2001-01-01', '00:00:00', '2001-01-01 00:00:00', '2001-01-01 00:00:00'); + +INSERT INTO sqlserver_all_data_types VALUES (12, 'True', 255, -32768, -2147483648, -9223372036854775808, -10.0, -9999.999999, -10000.0, 'aa', 0xff, '1990-01-01', '13:59:59.123', '2000-01-01 11:00:00.123', '1990-01-01 00:00:01.123'); + +INSERT INTO sqlserver_all_data_types VALUES (13, 'True', 127, 32767, 2147483647, 9223372036854775807, -10.0, 9999.999999, 10000.0, 'zzzz', 0xffffffff, '2999-12-31', '23:59:59.999', '2099-12-31 23:59:59.999', '2999-12-31 23:59:59.999') diff --git a/e2e_test/source/cdc_inline/sql_server_cdc/sql_server_cdc_prepare.sql b/e2e_test/source/cdc_inline/sql_server_cdc/sql_server_cdc_prepare.sql new file mode 100644 index 0000000000000..bec368ab00d4f --- /dev/null +++ b/e2e_test/source/cdc_inline/sql_server_cdc/sql_server_cdc_prepare.sql @@ -0,0 +1,72 @@ +EXEC sys.sp_cdc_enable_db; + +CREATE TABLE orders ( + order_id INT PRIMARY KEY, + order_date BIGINT, + customer_name NVARCHAR(200), + price DECIMAL, + product_id INT, + order_status SMALLINT +); + +EXEC sys.sp_cdc_enable_table + @source_schema = 'dbo', + @source_name = 'orders', + @role_name = NULL; + + +INSERT INTO + orders ( + order_id, + order_date, + customer_name, + price, + product_id, + order_status + ) +VALUES + (1, 1558430840000, 'Bob', 10.50, 1, 1), + (2, 1558430840001, 'Alice', 20.50, 2, 1), + (3, 1558430840002, 'Alice', 18.50, 2, 1); + +CREATE TABLE single_type ( + id INT PRIMARY KEY, + c_time time, +); + +EXEC sys.sp_cdc_enable_table + @source_schema = 'dbo', + @source_name = 'single_type', + @role_name = NULL; + +INSERT INTO single_type VALUES (3, '23:59:59.999') + + +CREATE TABLE sqlserver_all_data_types ( + id INT PRIMARY KEY, + c_bit bit, + c_tinyint tinyint, + c_smallint smallint, + c_int int, + c_bigint bigint, + c_decimal DECIMAL(28), + c_real real, + c_float float, + c_varchar varchar(4), + c_varbinary varbinary(4), + c_date date, + c_time time, + c_datetime2 datetime2, + c_datetimeoffset 
datetimeoffset +); + +EXEC sys.sp_cdc_enable_table + @source_schema = 'dbo', + @source_name = 'sqlserver_all_data_types', + @role_name = NULL; + +INSERT INTO sqlserver_all_data_types VALUES (1, 'False', 0, 0, 0, 0, 0, 0, 0, '', NULL, '2001-01-01', '00:00:00', '2001-01-01 00:00:00', '2001-01-01 00:00:00'); + +INSERT INTO sqlserver_all_data_types VALUES (2, 'True', 255, -32768, -2147483648, -9223372036854775808, -10.0, -9999.999999, -10000.0, 'aa', 0xff, '1990-01-01', '13:59:59.123', '2000-01-01 11:00:00.123', '1990-01-01 00:00:01.123'); + +INSERT INTO sqlserver_all_data_types VALUES (3, 'True', 127, 32767, 2147483647, 9223372036854775807, -10.0, 9999.999999, 10000.0, 'zzzz', 0xffffffff, '2999-12-31', '23:59:59.999', '2099-12-31 23:59:59.999', '2999-12-31 23:59:59.999') diff --git a/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/source/SourceTypeE.java b/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/source/SourceTypeE.java index 0c9858ab4fd5d..21a548bcb825f 100644 --- a/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/source/SourceTypeE.java +++ b/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/source/SourceTypeE.java @@ -21,6 +21,7 @@ public enum SourceTypeE { POSTGRES, CITUS, MONGODB, + SQL_SERVER, INVALID; public static SourceTypeE valueOf(ConnectorServiceProto.SourceType type) { @@ -33,6 +34,8 @@ public static SourceTypeE valueOf(ConnectorServiceProto.SourceType type) { return SourceTypeE.CITUS; case MONGODB: return SourceTypeE.MONGODB; + case SQL_SERVER: + return SourceTypeE.SQL_SERVER; default: return SourceTypeE.INVALID; } diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/SourceValidateHandler.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/SourceValidateHandler.java index 309ab8db7af4e..458ed8a6d7a3f 100644 --- 
a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/SourceValidateHandler.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/SourceValidateHandler.java @@ -157,6 +157,14 @@ public static void validateSource(ConnectorServiceProto.ValidateSourceRequest re var validator = new MongoDbValidator(props); validator.validateDbConfig(); break; + case SQL_SERVER: + ensureRequiredProps(props, isCdcSourceJob); + ensurePropNotBlank(props, DbzConnectorConfig.SQL_SERVER_SCHEMA_NAME); + try (var sqlServerValidator = + new SqlServerValidator(props, tableSchema, isCdcSourceJob)) { + sqlServerValidator.validateAll(); + } + break; default: LOG.warn("Unknown source type"); throw ValidatorUtils.invalidArgument("Unknown source type"); diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzConnectorConfig.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzConnectorConfig.java index faae0048649b0..fb8aa62916f60 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzConnectorConfig.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzConnectorConfig.java @@ -55,11 +55,15 @@ public class DbzConnectorConfig { public static final String PG_PUB_CREATE = "publication.create.enable"; public static final String PG_SCHEMA_NAME = "schema.name"; + /* Sql Server configs */ + public static final String SQL_SERVER_SCHEMA_NAME = "schema.name"; + /* RisingWave configs */ private static final String DBZ_CONFIG_FILE = "debezium.properties"; private static final String MYSQL_CONFIG_FILE = "mysql.properties"; private static final String POSTGRES_CONFIG_FILE = "postgres.properties"; private static final String MONGODB_CONFIG_FILE = "mongodb.properties"; + private 
static final String SQL_SERVER_CONFIG_FILE = "sql_server.properties"; private static final String DBZ_PROPERTY_PREFIX = "debezium."; @@ -249,7 +253,38 @@ public DbzConnectorConfig( mongodbProps.setProperty("name", connectorName); dbzProps.putAll(mongodbProps); + } else if (source == SourceTypeE.SQL_SERVER) { + var sqlServerProps = initiateDbConfig(SQL_SERVER_CONFIG_FILE, substitutor); + // disable snapshot locking at all + sqlServerProps.setProperty("snapshot.locking.mode", "none"); + if (isCdcBackfill) { + // if startOffset is specified, we should continue + // reading changes from the given offset + if (null != startOffset && !startOffset.isBlank()) { + // skip the initial snapshot for cdc backfill + sqlServerProps.setProperty("snapshot.mode", "recovery"); + sqlServerProps.setProperty( + ConfigurableOffsetBackingStore.OFFSET_STATE_VALUE, startOffset); + } else { + sqlServerProps.setProperty("snapshot.mode", "no_data"); + } + } else { + // if snapshot phase is finished and offset is specified, we will continue reading + // changes from the given offset + if (snapshotDone && null != startOffset && !startOffset.isBlank()) { + sqlServerProps.setProperty("snapshot.mode", "recovery"); + sqlServerProps.setProperty( + ConfigurableOffsetBackingStore.OFFSET_STATE_VALUE, startOffset); + } + } + dbzProps.putAll(sqlServerProps); + if (isCdcSourceJob) { + // remove table filtering for the shared Sql Server source, since we + // allow user to ingest tables in different schemas + LOG.info("Disable table filtering for the shared Sql Server source"); + dbzProps.remove("table.include.list"); + } } else { throw new RuntimeException("unsupported source type: " + source); } diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzSourceUtils.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzSourceUtils.java index 83c6d59fac921..13ca00261287b 100644 --- 
a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzSourceUtils.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzSourceUtils.java @@ -19,6 +19,7 @@ import java.sql.DriverManager; import java.sql.SQLException; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.concurrent.TimeUnit; import javax.management.JMException; @@ -112,6 +113,9 @@ public static boolean waitForStreamingRunning( } else if (sourceType == SourceTypeE.POSTGRES) { return waitForStreamingRunningInner( "postgres", dbServerName, waitStreamingStartTimeout); + } else if (sourceType == SourceTypeE.SQL_SERVER) { + return waitForStreamingRunningInner( + "sql_server", dbServerName, waitStreamingStartTimeout); } else { LOG.info("Unsupported backfill source, just return true for {}", dbServerName); return true; @@ -162,12 +166,23 @@ private static boolean isStreamingRunning(String connector, String server, Strin private static ObjectName getStreamingMetricsObjectName( String connector, String server, String context) throws MalformedObjectNameException { - return new ObjectName( - "debezium." - + connector - + ":type=connector-metrics,context=" - + context - + ",server=" - + server); + if (Objects.equals(connector, "sql_server")) { + // TODO: fulfill the task id here, by WKX + return new ObjectName( + "debezium." + + connector + + ":type=connector-metrics,task=0,context=" + + context + + ",server=" + + server); + } else { + return new ObjectName( + "debezium." 
+ + connector + + ":type=connector-metrics,context=" + + context + + ",server=" + + server); + } } } diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/SqlServerValidator.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/SqlServerValidator.java new file mode 100644 index 0000000000000..1c0e8176add74 --- /dev/null +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/SqlServerValidator.java @@ -0,0 +1,310 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package com.risingwave.connector.source.common; + +import com.risingwave.connector.api.TableSchema; +import com.risingwave.connector.api.source.SourceTypeE; +import com.risingwave.proto.Data; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SqlServerValidator extends DatabaseValidator implements AutoCloseable { + static final Logger LOG = LoggerFactory.getLogger(SqlServerValidator.class); + + private final TableSchema tableSchema; + + private final Connection jdbcConnection; + + private final String user; + private final String dbName; + private final String schemaName; + private final String tableName; + + // Whether the properties to validate is shared by multiple tables. + // If true, we will skip validation check for table + private final boolean isCdcSourceJob; + + public SqlServerValidator( + Map userProps, TableSchema tableSchema, boolean isCdcSourceJob) + throws SQLException { + this.tableSchema = tableSchema; + + var dbHost = userProps.get(DbzConnectorConfig.HOST); + var dbPort = userProps.get(DbzConnectorConfig.PORT); + var dbName = userProps.get(DbzConnectorConfig.DB_NAME); + var user = userProps.get(DbzConnectorConfig.USER); + var password = userProps.get(DbzConnectorConfig.PASSWORD); + + var jdbcUrl = ValidatorUtils.getJdbcUrl(SourceTypeE.SQL_SERVER, dbHost, dbPort, dbName); + this.jdbcConnection = DriverManager.getConnection(jdbcUrl, user, password); + + this.dbName = dbName; + this.user = user; + this.schemaName = userProps.get(DbzConnectorConfig.SQL_SERVER_SCHEMA_NAME); + this.tableName = userProps.get(DbzConnectorConfig.TABLE_NAME); + this.isCdcSourceJob = isCdcSourceJob; + } + + @Override + public void validateDbConfig() { + try (var stmt = + jdbcConnection.prepareStatement( + ValidatorUtils.getSql("sqlserver.db.cdc.enabled"))) { + // check whether cdc has been enabled + var res = stmt.executeQuery(); + 
while (res.next()) { + if (!res.getString(1).equals(dbName)) { + throw ValidatorUtils.invalidArgument( + "Sql Server's DB_NAME() '" + + res.getString(1) + + "' does not match db_name'" + + dbName + + "'."); + } + if (res.getInt(2) != 1) { + throw ValidatorUtils.invalidArgument( + "Sql Server's '" + + dbName + + "' has not enabled CDC.\nPlease modify the config your Sql Server with 'EXEC sys.sp_cdc_enable_db'."); + } + } + } catch (SQLException e) { + throw ValidatorUtils.internalError(e.getMessage()); + } + if (isCdcSourceJob) { + try (var stmt = + jdbcConnection.prepareStatement( + ValidatorUtils.getSql("sqlserver.sql.agent.enabled"))) { + // check whether sql server agent is enabled. It's required to run + // fn_cdc_get_max_lsn + var res = stmt.executeQuery(); + while (res.next()) { + if (res.wasNull()) { + throw ValidatorUtils.invalidArgument( + "Sql Server's sql server agent is not activated.\nYou can check it by running `SELECT servicename, startup_type_desc, status_desc FROM sys.dm_server_services WHERE servicename LIKE 'SQL Server Agent%'` in Sql Server."); + } + } + } catch (SQLException e) { + throw ValidatorUtils.internalError(e.getMessage()); + } + } + } + + @Override + public void validateUserPrivilege() { + try { + validatePrivileges(); + } catch (SQLException e) { + throw ValidatorUtils.internalError(e.getMessage()); + } + } + + @Override + public void validateTable() { + try { + validateTableSchema(); + } catch (SQLException e) { + throw ValidatorUtils.internalError(e.getMessage()); + } + } + + @Override + boolean isCdcSourceJob() { + return isCdcSourceJob; + } + + private void validateTableSchema() throws SQLException { + if (isCdcSourceJob) { + return; + } + // check whether table exist + try (var stmt = jdbcConnection.prepareStatement(ValidatorUtils.getSql("sqlserver.table"))) { + stmt.setString(1, schemaName); + stmt.setString(2, tableName); + var res = stmt.executeQuery(); + while (res.next()) { + if (res.getInt(1) == 0) { + throw 
ValidatorUtils.invalidArgument( + String.format( + "Sql Server table '%s'.'%s' doesn't exist", + schemaName, tableName)); + } + } + } + + // check cdc enabled + try (var stmt = + jdbcConnection.prepareStatement( + ValidatorUtils.getSql("sqlserver.table.cdc.enabled"))) { + stmt.setString(1, schemaName); + stmt.setString(2, tableName); + var res = stmt.executeQuery(); + while (res.next()) { + if (res.getInt(1) != 1) { + throw ValidatorUtils.invalidArgument( + "Table '" + + schemaName + + "." + + tableName + + "' has not enabled CDC.\nPlease ensure CDC is enabled."); + } + } + } + + // check primary key + try (var stmt = jdbcConnection.prepareStatement(ValidatorUtils.getSql("sqlserver.pk"))) { + stmt.setString(1, this.schemaName); + stmt.setString(2, this.tableName); + var res = stmt.executeQuery(); + var pkFields = new HashSet(); + while (res.next()) { + var name = res.getString(1); + pkFields.add(name); + } + + if (!isPrimaryKeyMatch(tableSchema, pkFields)) { + throw ValidatorUtils.invalidArgument("Primary key mismatch"); + } + } + + // Check whether source schema match table schema on upstream + // All columns defined must exist in upstream database + try (var stmt = + jdbcConnection.prepareStatement(ValidatorUtils.getSql("sqlserver.table_schema"))) { + stmt.setString(1, this.schemaName); + stmt.setString(2, this.tableName); + var res = stmt.executeQuery(); + + // Field names in lower case -> data type + Map schema = new HashMap<>(); + while (res.next()) { + var field = res.getString(1); + var dataType = res.getString(2); + schema.put(field.toLowerCase(), dataType); + } + + for (var e : tableSchema.getColumnTypes().entrySet()) { + // skip validate internal columns + if (e.getKey().startsWith(ValidatorUtils.INTERNAL_COLUMN_PREFIX)) { + continue; + } + var dataType = schema.get(e.getKey().toLowerCase()); + if (dataType == null) { + throw ValidatorUtils.invalidArgument( + "Column '" + e.getKey() + "' not found in the upstream database"); + } + if 
(!isDataTypeCompatible(dataType, e.getValue())) { + throw ValidatorUtils.invalidArgument( + "Incompatible data type of column " + e.getKey()); + } + } + } + } + + private void validatePrivileges() throws SQLException { + if (isCdcSourceJob) { + return; + } + + try (var stmt = + jdbcConnection.prepareStatement(ValidatorUtils.getSql("sqlserver.has.perms"))) { + stmt.setString(1, this.schemaName); + stmt.setString(2, this.tableName); + var res = stmt.executeQuery(); + while (res.next()) { + if (res.getInt(1) != 1) { + throw ValidatorUtils.invalidArgument( + "Sql Server user '" + + user + + "' must have select privilege on table '" + + schemaName + + "." + + tableName + + "''s CDC table."); + } + } + } + } + + @Override + public void close() throws Exception { + if (null != jdbcConnection) { + jdbcConnection.close(); + } + } + + public static boolean isPrimaryKeyMatch(TableSchema sourceSchema, Set pkFields) { + if (sourceSchema.getPrimaryKeys().size() != pkFields.size()) { + return false; + } + for (var colName : sourceSchema.getPrimaryKeys()) { + if (!pkFields.contains(colName)) { + return false; + } + } + return true; + } + + private boolean isDataTypeCompatible(String ssDataType, Data.DataType.TypeName typeName) { + // TODO: add more data type compatibility check, by WKX + int val = typeName.getNumber(); + switch (ssDataType) { + case "bit": + return val == Data.DataType.TypeName.BOOLEAN_VALUE; + case "tinyint": + case "smallint": + return Data.DataType.TypeName.INT16_VALUE <= val + && val <= Data.DataType.TypeName.INT64_VALUE; + case "integer": + return Data.DataType.TypeName.INT32_VALUE <= val + && val <= Data.DataType.TypeName.INT64_VALUE; + case "bigint": + return val == Data.DataType.TypeName.INT64_VALUE; + case "float": + case "real": + return val == Data.DataType.TypeName.FLOAT_VALUE + || val == Data.DataType.TypeName.DOUBLE_VALUE; + case "boolean": + return val == Data.DataType.TypeName.BOOLEAN_VALUE; + case "double": + case "double precision": + return val 
== Data.DataType.TypeName.DOUBLE_VALUE; + case "decimal": + case "numeric": + return val == Data.DataType.TypeName.DECIMAL_VALUE; + case "varchar": + case "character varying": + return val == Data.DataType.TypeName.VARCHAR_VALUE; + case "varbinary": + return val == Data.DataType.TypeName.BYTEA_VALUE; + case "date": + return val == Data.DataType.TypeName.DATE_VALUE; + case "time": + return val == Data.DataType.TypeName.TIME_VALUE; + case "datetime": + case "datetime2": + case "smalldatetime": + return val == Data.DataType.TypeName.TIMESTAMP_VALUE; + case "datetimeoffset": + return val == Data.DataType.TypeName.TIMESTAMPTZ_VALUE; + default: + return true; // true for other uncovered types + } + } +} diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/ValidatorUtils.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/ValidatorUtils.java index 20d631a3267c9..4b79280e62daf 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/ValidatorUtils.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/ValidatorUtils.java @@ -66,6 +66,10 @@ public static String getJdbcUrl( case POSTGRES: case CITUS: return String.format("jdbc:postgresql://%s:%s/%s", host, port, database); + case SQL_SERVER: + return String.format( + "jdbc:sqlserver://%s:%s;databaseName=%s;encrypt=false", + host, port, database); default: throw ValidatorUtils.invalidArgument("Unknown source type: " + sourceType); } diff --git a/java/connector-node/risingwave-connector-service/src/main/resources/sql_server.properties b/java/connector-node/risingwave-connector-service/src/main/resources/sql_server.properties new file mode 100644 index 0000000000000..0e0c55c939ef5 --- /dev/null +++ b/java/connector-node/risingwave-connector-service/src/main/resources/sql_server.properties @@ -0,0 
+1,24 @@ +# configs for sql server connector +connector.class=io.debezium.connector.sqlserver.SqlServerConnector +# default snapshot mode to initial +snapshot.mode=${debezium.snapshot.mode:-initial} +database.hostname=${hostname} +database.port=${port} +database.user=${username} +database.password=${password} +database.names=${database.name} +table.include.list=${schema.name}.${table.name:-*} +# only read table schema of the captured tables in the specified database +schema.history.internal.store.only.captured.tables.ddl=true +schema.history.internal.store.only.captured.databases.ddl=true +# default to disable schema change events +include.schema.changes=${debezium.include.schema.changes:-false} +# default heartbeat interval 5 mins +heartbeat.interval.ms=${debezium.heartbeat.interval.ms:-300000} +# In sharing cdc source mode, we will subscribe to multiple tables in the given database, +# so here we set ${table.name} to a default value `RW_CDC_Sharing` just for display. +name=${hostname}:${port}:${database.name}.${schema.name}.${table.name:-RW_CDC_Sharing} +# In sharing cdc mode, transaction metadata will be enabled in frontend. +# For sql server, it's always false actually. 
+provide.transaction.metadata=${transactional:-false} +database.encrypt=false diff --git a/java/connector-node/risingwave-connector-service/src/main/resources/validate_sql.properties b/java/connector-node/risingwave-connector-service/src/main/resources/validate_sql.properties index 769c3cb1c8fb0..54214b6b46190 100644 --- a/java/connector-node/risingwave-connector-service/src/main/resources/validate_sql.properties +++ b/java/connector-node/risingwave-connector-service/src/main/resources/validate_sql.properties @@ -37,5 +37,12 @@ GROUP BY r1.rolname \ ), \ tmp AS (SELECT DISTINCT(UNNEST(m)) AS members FROM base) \ SELECT ARRAY_AGG(members) AS members FROM tmp +sqlserver.db.cdc.enabled=SELECT name, is_cdc_enabled FROM sys.databases WHERE name = DB_NAME() +sqlserver.table=SELECT count(*) FROM information_schema.TABLES WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? +sqlserver.table.cdc.enabled=SELECT COUNT(*) FROM cdc.change_tables AS ct INNER JOIN sys.tables AS t ON ct.source_object_id = t.object_id INNER JOIN sys.schemas AS s ON t.schema_id = s.schema_id WHERE s.name = ? AND t.name = ? +sqlserver.pk=SELECT k.column_name FROM information_schema.table_constraints t INNER JOIN information_schema.key_column_usage k ON t.constraint_name = k.constraint_name AND t.table_name = k.table_name WHERE t.constraint_type = 'PRIMARY KEY' AND t.table_schema = ? AND t.table_name = ? +sqlserver.table_schema=SELECT COLUMN_NAME, DATA_TYPE FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? ORDER BY ORDINAL_POSITION +sqlserver.has.perms=SELECT HAS_PERMS_BY_NAME('cdc.' + ct.capture_instance + '_CT', 'OBJECT', 'SELECT') FROM cdc.change_tables AS ct INNER JOIN sys.tables AS t ON ct.source_object_id = t.object_id INNER JOIN sys.schemas AS s ON t.schema_id = s.schema_id WHERE s.name = ? AND t.name = ? 
+sqlserver.sql.agent.enabled=SELECT sys.fn_cdc_get_max_lsn() citus.distributed_table=select citus_table_type from citus_tables where table_name=?::regclass postgres.rds.role.check=SELECT r.rolname, r.rolsuper, r.rolinherit, r.rolcreaterole, r.rolcreatedb, r.rolcanlogin, r.rolconnlimit, r.rolvaliduntil, ARRAY(SELECT b.rolname FROM pg_catalog.pg_auth_members m JOIN pg_catalog.pg_roles b ON (m.roleid = b.oid) WHERE m.member = r.oid) as memberof , r.rolreplication , r.rolbypassrls FROM pg_catalog.pg_roles r WHERE r.rolname = ? diff --git a/java/connector-node/risingwave-source-cdc/pom.xml b/java/connector-node/risingwave-source-cdc/pom.xml index 839de9c1f319d..fb1519f6a183c 100644 --- a/java/connector-node/risingwave-source-cdc/pom.xml +++ b/java/connector-node/risingwave-source-cdc/pom.xml @@ -47,6 +47,10 @@ io.debezium debezium-connector-mongodb + + io.debezium + debezium-connector-sqlserver + diff --git a/java/pom.xml b/java/pom.xml index 58f33e9da56d9..c0521bd6dfc4a 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -173,6 +173,11 @@ debezium-connector-mongodb ${debezium.version} + + io.debezium + debezium-connector-sqlserver + ${debezium.version} + org.postgresql postgresql diff --git a/proto/catalog.proto b/proto/catalog.proto index e60c99814d9ec..d407dbe3936a4 100644 --- a/proto/catalog.proto +++ b/proto/catalog.proto @@ -77,7 +77,7 @@ message StreamSourceInfo { // // Currently, the following sources can be shared: // - // - Direct CDC sources (mysql & postgresql) + // - Direct CDC sources (mysql & postgresql & sqlserver) // - MQ sources (Kafka) bool cdc_source_job = 13; // Only used when `cdc_source_job` is `true`. 
diff --git a/proto/connector_service.proto b/proto/connector_service.proto index cf549a8e2e493..964d227452548 100644 --- a/proto/connector_service.proto +++ b/proto/connector_service.proto @@ -173,6 +173,7 @@ enum SourceType { POSTGRES = 2; CITUS = 3; MONGODB = 4; + SQL_SERVER = 5; } message GetEventStreamRequest { diff --git a/risedev.yml b/risedev.yml index b4b558face0e5..4918560354112 100644 --- a/risedev.yml +++ b/risedev.yml @@ -36,6 +36,7 @@ profile: # - use: grafana # visualization - use: meta-node + # meta-backend: etcd - use: compute-node - use: frontend diff --git a/src/common/src/types/datetime.rs b/src/common/src/types/datetime.rs index 140ad98c621cd..72bdc1b72ef48 100644 --- a/src/common/src/types/datetime.rs +++ b/src/common/src/types/datetime.rs @@ -26,7 +26,7 @@ use bytes::BytesMut; use chrono::{ DateTime, Datelike, Days, Duration, NaiveDate, NaiveDateTime, NaiveTime, Timelike, Weekday, }; -use postgres_types::{accepts, to_sql_checked, FromSql, IsNull, ToSql, Type}; +use postgres_types::{accepts, to_sql_checked, IsNull, Type}; use risingwave_common_estimate_size::ZeroHeapSize; use thiserror::Error; @@ -41,7 +41,7 @@ const LEAP_DAYS: &[i32] = &[0, 31, 29, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31]; const NORMAL_DAYS: &[i32] = &[0, 31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31]; macro_rules! impl_chrono_wrapper { - ($variant_name:ident, $chrono:ty) => { + ($variant_name:ident, $chrono:ty, $pg_type:ident) => { #[derive(Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)] #[repr(transparent)] pub struct $variant_name(pub $chrono); @@ -67,105 +67,43 @@ macro_rules! 
impl_chrono_wrapper { } impl ZeroHeapSize for $variant_name {} - }; -} - -impl_chrono_wrapper!(Date, NaiveDate); -impl_chrono_wrapper!(Timestamp, NaiveDateTime); -impl_chrono_wrapper!(Time, NaiveTime); - -impl ToSql for Date { - accepts!(DATE); - - to_sql_checked!(); - - fn to_sql( - &self, - ty: &Type, - out: &mut BytesMut, - ) -> std::result::Result> - where - Self: Sized, - { - self.0.to_sql(ty, out) - } -} - -impl<'a> FromSql<'a> for Date { - fn from_sql( - ty: &Type, - raw: &'a [u8], - ) -> std::result::Result> { - let instant = NaiveDate::from_sql(ty, raw)?; - Ok(Self::from(instant)) - } - - fn accepts(ty: &Type) -> bool { - matches!(*ty, Type::DATE) - } -} - -impl ToSql for Time { - accepts!(TIME); - - to_sql_checked!(); - - fn to_sql( - &self, - ty: &Type, - out: &mut BytesMut, - ) -> std::result::Result> - where - Self: Sized, - { - self.0.to_sql(ty, out) - } -} -impl<'a> FromSql<'a> for Time { - fn from_sql( - ty: &Type, - raw: &'a [u8], - ) -> std::result::Result> { - let instant = NaiveTime::from_sql(ty, raw)?; - Ok(Self::from(instant)) - } + impl postgres_types::ToSql for $variant_name { + accepts!($pg_type); - fn accepts(ty: &Type) -> bool { - matches!(*ty, Type::TIME) - } -} + to_sql_checked!(); -impl ToSql for Timestamp { - accepts!(TIMESTAMP); + fn to_sql( + &self, + ty: &Type, + out: &mut BytesMut, + ) -> std::result::Result> + where + Self: Sized, + { + self.0.to_sql(ty, out) + } + } - to_sql_checked!(); + impl<'a> postgres_types::FromSql<'a> for $variant_name { + fn from_sql( + ty: &Type, + raw: &'a [u8], + ) -> std::result::Result> { + let instant = <$chrono as postgres_types::FromSql>::from_sql(ty, raw)?; + Ok(Self::from(instant)) + } - fn to_sql( - &self, - ty: &Type, - out: &mut BytesMut, - ) -> std::result::Result> - where - Self: Sized, - { - self.0.to_sql(ty, out) - } + fn accepts(ty: &Type) -> bool { + matches!(*ty, Type::$pg_type) + } + } + }; } -impl<'a> FromSql<'a> for Timestamp { - fn from_sql( - ty: &Type, - raw: &'a [u8], - ) -> 
std::result::Result> { - let instant = NaiveDateTime::from_sql(ty, raw)?; - Ok(Self::from(instant)) - } - - fn accepts(ty: &Type) -> bool { - matches!(*ty, Type::TIMESTAMP) - } -} +impl_chrono_wrapper!(Date, NaiveDate, DATE); +impl_chrono_wrapper!(Timestamp, NaiveDateTime, TIMESTAMP); +impl_chrono_wrapper!(Time, NaiveTime, TIME); /// Parse a date from varchar. /// diff --git a/src/common/src/types/decimal.rs b/src/common/src/types/decimal.rs index 9523157239e2e..8a9055bbc1ef0 100644 --- a/src/common/src/types/decimal.rs +++ b/src/common/src/types/decimal.rs @@ -21,7 +21,7 @@ use bytes::{BufMut, BytesMut}; use num_traits::{ CheckedAdd, CheckedDiv, CheckedMul, CheckedNeg, CheckedRem, CheckedSub, Num, One, Zero, }; -use postgres_types::{accepts, to_sql_checked, FromSql, IsNull, ToSql, Type}; +use postgres_types::{accepts, to_sql_checked, IsNull, ToSql}; use risingwave_common_estimate_size::ZeroHeapSize; use rust_decimal::prelude::FromStr; use rust_decimal::{Decimal as RustDecimal, Error, MathematicalOps as _, RoundingStrategy}; @@ -96,7 +96,7 @@ impl ToSql for Decimal { fn to_sql( &self, - ty: &Type, + ty: &postgres_types::Type, out: &mut BytesMut, ) -> Result> where @@ -132,9 +132,9 @@ impl ToSql for Decimal { } } -impl<'a> FromSql<'a> for Decimal { +impl<'a> postgres_types::FromSql<'a> for Decimal { fn from_sql( - ty: &Type, + ty: &postgres_types::Type, raw: &'a [u8], ) -> Result> { let mut rdr = Cursor::new(raw); @@ -145,15 +145,17 @@ impl<'a> FromSql<'a> for Decimal { 0xC000 => Ok(Self::NaN), 0xD000 => Ok(Self::PositiveInf), 0xF000 => Ok(Self::NegativeInf), - _ => RustDecimal::from_sql(ty, raw).map(Self::Normalized), + _ => ::from_sql(ty, raw).map(Self::Normalized), } } - fn accepts(ty: &Type) -> bool { - matches!(*ty, Type::NUMERIC) + fn accepts(ty: &postgres_types::Type) -> bool { + matches!(*ty, postgres_types::Type::NUMERIC) } } + + macro_rules! 
impl_convert_int { ($T:ty) => { impl core::convert::From<$T> for Decimal { diff --git a/src/common/src/types/from_sql.rs b/src/common/src/types/from_sql.rs index ba1d49892c602..2ca58f760f745 100644 --- a/src/common/src/types/from_sql.rs +++ b/src/common/src/types/from_sql.rs @@ -23,16 +23,16 @@ impl<'a> FromSql<'a> for ScalarImpl { raw: &'a [u8], ) -> Result> { Ok(match *ty { - Type::BOOL => ScalarImpl::from(bool::from_sql(ty, raw)?), - Type::INT2 => ScalarImpl::from(i16::from_sql(ty, raw)?), - Type::INT4 => ScalarImpl::from(i32::from_sql(ty, raw)?), - Type::INT8 => ScalarImpl::from(i64::from_sql(ty, raw)?), - Type::FLOAT4 => ScalarImpl::from(f32::from_sql(ty, raw)?), - Type::FLOAT8 => ScalarImpl::from(f64::from_sql(ty, raw)?), - Type::DATE => ScalarImpl::from(Date::from_sql(ty, raw)?), - Type::TIME => ScalarImpl::from(Time::from_sql(ty, raw)?), - Type::TIMESTAMP => ScalarImpl::from(Timestamp::from_sql(ty, raw)?), - Type::TIMESTAMPTZ => ScalarImpl::from(Timestamptz::from_sql(ty, raw)?), + Type::BOOL => ScalarImpl::from(::from_sql(ty, raw)?), + Type::INT2 => ScalarImpl::from(::from_sql(ty, raw)?), + Type::INT4 => ScalarImpl::from(::from_sql(ty, raw)?), + Type::INT8 => ScalarImpl::from(::from_sql(ty, raw)?), + Type::FLOAT4 => ScalarImpl::from(::from_sql(ty, raw)?), + Type::FLOAT8 => ScalarImpl::from(::from_sql(ty, raw)?), + Type::DATE => ScalarImpl::from(::from_sql(ty, raw)?), + Type::TIME => ScalarImpl::from(