diff --git a/ydb/core/tx/datashard/change_record_cdc_serializer.cpp b/ydb/core/tx/datashard/change_record_cdc_serializer.cpp index 803a775d1054..fcffe11d6771 100644 --- a/ydb/core/tx/datashard/change_record_cdc_serializer.cpp +++ b/ydb/core/tx/datashard/change_record_cdc_serializer.cpp @@ -182,8 +182,7 @@ class TJsonSerializer: public TBaseSerializer { case NScheme::NTypeIds::Yson: return YsonToJson(cell.AsBuf()); case NScheme::NTypeIds::Pg: - // TODO: support pg types - Y_ABORT("pg types are not supported"); + return NJson::TJsonValue(PgToString(cell.AsBuf(), type)); case NScheme::NTypeIds::Uuid: return NJson::TJsonValue(NUuid::UuidBytesToString(cell.Data())); default: diff --git a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp index 131e5e6dbf64..b1d5e9ba247d 100644 --- a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp +++ b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp @@ -9,6 +9,7 @@ #include #include #include +#include #include #include #include @@ -619,8 +620,12 @@ class TCdcChangeSenderMain schema.reserve(pqConfig.PartitionKeySchemaSize()); for (const auto& keySchema : pqConfig.GetPartitionKeySchema()) { - // TODO: support pg types - schema.push_back(NScheme::TTypeInfo(keySchema.GetTypeId())); + if (keySchema.GetTypeId() == NScheme::NTypeIds::Pg) { + schema.push_back(NScheme::TTypeInfo( + NPg::TypeDescFromPgTypeId(keySchema.GetTypeInfo().GetPgTypeId()))); + } else { + schema.push_back(NScheme::TTypeInfo(keySchema.GetTypeId())); + } } TSet partitions(schema); diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp index cfeefb3ed30b..0d30aa73b1ce 100644 --- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp +++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp @@ -838,7 +838,9 @@ Y_UNIT_TEST_SUITE(Cdc) { .SetEnableChangefeedDebeziumJsonFormat(true) .SetEnableTopicMessageMeta(true) .SetEnableChangefeedInitialScan(true) - .SetEnableUuidAsPrimaryKey(true); + .SetEnableUuidAsPrimaryKey(true) + .SetEnableTablePgTypes(true) + .SetEnablePgSyntax(true); Server = new TServer(settings); if (useRealThreads) { @@ -1972,6 +1974,13 @@ Y_UNIT_TEST_SUITE(Cdc) { {"datetime64_value", "Datetime64", false, false}, {"timestamp64_value", "Timestamp64", false, false}, {"interval64_value", "Interval64", false, false}, + {"pgint2_value", "pgint2", false, false}, + {"pgint4_value", "pgint4", false, false}, + {"pgint8_value", "pgint8", false, false}, + {"pgfloat4_value", "pgfloat4", false, false}, + {"pgfloat8_value", "pgfloat8", false, false}, + {"pgbytea_value", "pgbytea", false, false}, + {"pgtext_value", "pgtext", false, false}, }); TopicRunner::Read(table, Updates(NKikimrSchemeOp::ECdcStreamFormatJson), { R"(UPSERT INTO `/Root/Table` (key, int32_value) VALUES (1, -100500);)", @@ -1997,6 +2006,13 @@ Y_UNIT_TEST_SUITE(Cdc) { R"(UPSERT INTO `/Root/Table` (key, datetime64_value) VALUES (21, CAST(1597235696 AS Datetime64));)", R"(UPSERT INTO `/Root/Table` (key, timestamp64_value) VALUES (22, CAST(1597235696123456 AS Timestamp64));)", R"(UPSERT INTO `/Root/Table` (key, interval64_value) VALUES (23, CAST(-300500 AS Interval64));)", + R"(UPSERT INTO `/Root/Table` (key, pgint2_value) VALUES (24, -42ps);)", + R"(UPSERT INTO `/Root/Table` (key, pgint4_value) VALUES (25, -420p);)", + R"(UPSERT INTO `/Root/Table` (key, pgint8_value) VALUES (26, -4200pb);)", + R"(UPSERT INTO `/Root/Table` (key, pgfloat4_value) VALUES (27, 3.1415pf4);)", + R"(UPSERT INTO `/Root/Table` (key, pgfloat8_value) VALUES (28, 2.718pf8);)", + R"(UPSERT INTO `/Root/Table` (key, pgbytea_value) VALUES (29, 'lorem "ipsum"'pb);)", + R"(UPSERT INTO `/Root/Table` (key, pgtext_value) VALUES (30, 'lorem "ipsum"'p);)", }, { R"({"key":[1],"update":{"int32_value":-100500}})", R"({"key":[2],"update":{"uint32_value":100500}})", @@ -2021,6 +2037,13 @@ Y_UNIT_TEST_SUITE(Cdc) { R"({"key":[21],"update":{"datetime64_value":1597235696}})", R"({"key":[22],"update":{"timestamp64_value":1597235696123456}})", R"({"key":[23],"update":{"interval64_value":-300500}})", + R"({"key":[24],"update":{"pgint2_value":"-42"}})", + R"({"key":[25],"update":{"pgint4_value":"-420"}})", + R"({"key":[26],"update":{"pgint8_value":"-4200"}})", + R"({"key":[27],"update":{"pgfloat4_value":"3.1415"}})", + R"({"key":[28],"update":{"pgfloat8_value":"2.718"}})", + R"({"key":[29],"update":{"pgbytea_value":"\\x6c6f72656d2022697073756d22"}})", + R"({"key":[30],"update":{"pgtext_value":"lorem \"ipsum\""}})", }); } diff --git a/ydb/core/tx/datashard/export_common.cpp b/ydb/core/tx/datashard/export_common.cpp index 53cdc71f2134..152b8d171435 100644 --- a/ydb/core/tx/datashard/export_common.cpp +++ b/ydb/core/tx/datashard/export_common.cpp @@ -113,6 +113,12 @@ TString DyNumberToString(TStringBuf data) { return result; } +TString PgToString(TStringBuf data, const NScheme::TTypeInfo& typeInfo) { + const NPg::TConvertResult& pgResult = NPg::PgNativeTextFromNativeBinary(data, typeInfo.GetPgTypeDesc()); + Y_ABORT_UNLESS(pgResult.Error.Empty()); + return pgResult.Str; +} + bool DecimalToStream(const std::pair& loHi, IOutputStream& out, TString& err) { Y_UNUSED(err); using namespace NYql::NDecimal; diff --git a/ydb/core/tx/datashard/export_common.h b/ydb/core/tx/datashard/export_common.h index fac97dde1d77..04887f0489d8 100644 --- a/ydb/core/tx/datashard/export_common.h +++ b/ydb/core/tx/datashard/export_common.h @@ -41,6 +41,7 @@ TMaybe GenYdbPermissions( TString DecimalToString(const std::pair& loHi); TString DyNumberToString(TStringBuf data); +TString PgToString(TStringBuf data, const NScheme::TTypeInfo& typeInfo); bool DecimalToStream(const std::pair& loHi, IOutputStream& out, TString& err); bool DyNumberToStream(TStringBuf data, IOutputStream& out, TString& err); bool PgToStream(TStringBuf data, const NScheme::TTypeInfo& typeInfo, IOutputStream& out, TString& err); diff --git a/ydb/core/tx/datashard/ut_change_exchange/ya.make b/ydb/core/tx/datashard/ut_change_exchange/ya.make index d1621d16cf9d..d3a52ea72035 100644 --- a/ydb/core/tx/datashard/ut_change_exchange/ya.make +++ b/ydb/core/tx/datashard/ut_change_exchange/ya.make @@ -23,7 +23,7 @@ PEERDIR( library/cpp/regex/pcre library/cpp/svnversion ydb/core/kqp/ut/common - ydb/core/testlib/default + ydb/core/testlib/pg ydb/core/tx ydb/library/yql/public/udf/service/exception_policy ydb/public/lib/yson_value