Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support PG types in CDC #9337

Merged
merged 1 commit into from
Sep 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions ydb/core/tx/datashard/change_record_cdc_serializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,7 @@ class TJsonSerializer: public TBaseSerializer {
case NScheme::NTypeIds::Yson:
return YsonToJson(cell.AsBuf());
case NScheme::NTypeIds::Pg:
// TODO: support pg types
Y_ABORT("pg types are not supported");
return NJson::TJsonValue(PgToString(cell.AsBuf(), type));
case NScheme::NTypeIds::Uuid:
return NJson::TJsonValue(NUuid::UuidBytesToString(cell.Data()));
default:
Expand Down
10 changes: 8 additions & 2 deletions ydb/core/tx/datashard/change_sender_cdc_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <ydb/core/persqueue/partition_key_range/partition_key_range.h>
#include <ydb/core/persqueue/writer/source_id_encoding.h>
#include <ydb/core/persqueue/writer/writer.h>
#include <ydb/core/scheme/protos/type_info.pb.h>
#include <ydb/core/tx/scheme_cache/helpers.h>
#include <ydb/core/tx/scheme_cache/scheme_cache.h>
#include <ydb/library/actors/core/actor_bootstrapped.h>
Expand Down Expand Up @@ -619,8 +620,13 @@ class TCdcChangeSenderMain

schema.reserve(pqConfig.PartitionKeySchemaSize());
for (const auto& keySchema : pqConfig.GetPartitionKeySchema()) {
// TODO: support pg types
schema.push_back(NScheme::TTypeInfo(keySchema.GetTypeId()));
if (keySchema.GetTypeId() == NScheme::NTypeIds::Pg) {
schema.push_back(NScheme::TTypeInfo(
keySchema.GetTypeId(),
NPg::TypeDescFromPgTypeId(keySchema.GetTypeInfo().GetPgTypeId())));
} else {
schema.push_back(NScheme::TTypeInfo(keySchema.GetTypeId()));
}
}

TSet<TPQPartitionInfo, TPQPartitionInfo::TLess> partitions(schema);
Expand Down
25 changes: 24 additions & 1 deletion ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -838,7 +838,9 @@ Y_UNIT_TEST_SUITE(Cdc) {
.SetEnableChangefeedDebeziumJsonFormat(true)
.SetEnableTopicMessageMeta(true)
.SetEnableChangefeedInitialScan(true)
.SetEnableUuidAsPrimaryKey(true);
.SetEnableUuidAsPrimaryKey(true)
.SetEnableTablePgTypes(true)
.SetEnablePgSyntax(true);

Server = new TServer(settings);
if (useRealThreads) {
Expand Down Expand Up @@ -1972,6 +1974,13 @@ Y_UNIT_TEST_SUITE(Cdc) {
{"datetime64_value", "Datetime64", false, false},
{"timestamp64_value", "Timestamp64", false, false},
{"interval64_value", "Interval64", false, false},
{"pgint2_value", "pgint2", false, false},
{"pgint4_value", "pgint4", false, false},
{"pgint8_value", "pgint8", false, false},
{"pgfloat4_value", "pgfloat4", false, false},
{"pgfloat8_value", "pgfloat8", false, false},
{"pgbytea_value", "pgbytea", false, false},
{"pgtext_value", "pgtext", false, false},
});
TopicRunner::Read(table, Updates(NKikimrSchemeOp::ECdcStreamFormatJson), {
R"(UPSERT INTO `/Root/Table` (key, int32_value) VALUES (1, -100500);)",
Expand All @@ -1997,6 +2006,13 @@ Y_UNIT_TEST_SUITE(Cdc) {
R"(UPSERT INTO `/Root/Table` (key, datetime64_value) VALUES (21, CAST(1597235696 AS Datetime64));)",
R"(UPSERT INTO `/Root/Table` (key, timestamp64_value) VALUES (22, CAST(1597235696123456 AS Timestamp64));)",
R"(UPSERT INTO `/Root/Table` (key, interval64_value) VALUES (23, CAST(-300500 AS Interval64));)",
R"(UPSERT INTO `/Root/Table` (key, pgint2_value) VALUES (24, -42ps);)",
R"(UPSERT INTO `/Root/Table` (key, pgint4_value) VALUES (25, -420p);)",
R"(UPSERT INTO `/Root/Table` (key, pgint8_value) VALUES (26, -4200pb);)",
R"(UPSERT INTO `/Root/Table` (key, pgfloat4_value) VALUES (27, 3.1415pf4);)",
R"(UPSERT INTO `/Root/Table` (key, pgfloat8_value) VALUES (28, 2.718pf8);)",
R"(UPSERT INTO `/Root/Table` (key, pgbytea_value) VALUES (29, 'lorem "ipsum"'pb);)",
R"(UPSERT INTO `/Root/Table` (key, pgtext_value) VALUES (30, 'lorem "ipsum"'p);)",
}, {
R"({"key":[1],"update":{"int32_value":-100500}})",
R"({"key":[2],"update":{"uint32_value":100500}})",
Expand All @@ -2021,6 +2037,13 @@ Y_UNIT_TEST_SUITE(Cdc) {
R"({"key":[21],"update":{"datetime64_value":1597235696}})",
R"({"key":[22],"update":{"timestamp64_value":1597235696123456}})",
R"({"key":[23],"update":{"interval64_value":-300500}})",
R"({"key":[24],"update":{"pgint2_value":"-42"}})",
R"({"key":[25],"update":{"pgint4_value":"-420"}})",
R"({"key":[26],"update":{"pgint8_value":"-4200"}})",
R"({"key":[27],"update":{"pgfloat4_value":"3.1415"}})",
R"({"key":[28],"update":{"pgfloat8_value":"2.718"}})",
R"({"key":[29],"update":{"pgbytea_value":"\\x6c6f72656d2022697073756d22"}})",
R"({"key":[30],"update":{"pgtext_value":"lorem \"ipsum\""}})",
});
}

Expand Down
6 changes: 6 additions & 0 deletions ydb/core/tx/datashard/export_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,12 @@ TString DyNumberToString(TStringBuf data) {
return result;
}

TString PgToString(TStringBuf data, const NScheme::TTypeInfo& typeInfo) {
const NPg::TConvertResult& pgResult = NPg::PgNativeTextFromNativeBinary(data, typeInfo.GetPgTypeDesc());
Y_ABORT_UNLESS(pgResult.Error.Empty());
return pgResult.Str;
}

bool DecimalToStream(const std::pair<ui64, i64>& loHi, IOutputStream& out, TString& err) {
Y_UNUSED(err);
using namespace NYql::NDecimal;
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/datashard/export_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ TMaybe<Ydb::Scheme::ModifyPermissionsRequest> GenYdbPermissions(

TString DecimalToString(const std::pair<ui64, i64>& loHi);
TString DyNumberToString(TStringBuf data);
TString PgToString(TStringBuf data, const NScheme::TTypeInfo& typeInfo);
bool DecimalToStream(const std::pair<ui64, i64>& loHi, IOutputStream& out, TString& err);
bool DyNumberToStream(TStringBuf data, IOutputStream& out, TString& err);
bool PgToStream(TStringBuf data, const NScheme::TTypeInfo& typeInfo, IOutputStream& out, TString& err);
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/datashard/ut_change_exchange/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ PEERDIR(
library/cpp/regex/pcre
library/cpp/svnversion
ydb/core/kqp/ut/common
ydb/core/testlib/default
ydb/core/testlib/pg
ydb/core/tx
ydb/library/yql/public/udf/service/exception_policy
ydb/public/lib/yson_value
Expand Down
Loading