Skip to content

Commit

Permalink
Revert "Support PG types in CDC (#9337)" (#9526)
Browse files Browse the repository at this point in the history
  • Loading branch information
maximyurchuk authored Sep 19, 2024
1 parent 8d954a5 commit 56fbcfc
Show file tree
Hide file tree
Showing 6 changed files with 6 additions and 41 deletions.
3 changes: 2 additions & 1 deletion ydb/core/tx/datashard/change_record_cdc_serializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,8 @@ class TJsonSerializer: public TBaseSerializer {
case NScheme::NTypeIds::Yson:
return YsonToJson(cell.AsBuf());
case NScheme::NTypeIds::Pg:
return NJson::TJsonValue(PgToString(cell.AsBuf(), type));
// TODO: support pg types
Y_ABORT("pg types are not supported");
case NScheme::NTypeIds::Uuid:
return NJson::TJsonValue(NUuid::UuidBytesToString(cell.Data()));
default:
Expand Down
10 changes: 2 additions & 8 deletions ydb/core/tx/datashard/change_sender_cdc_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
#include <ydb/core/persqueue/partition_key_range/partition_key_range.h>
#include <ydb/core/persqueue/writer/source_id_encoding.h>
#include <ydb/core/persqueue/writer/writer.h>
#include <ydb/core/scheme/protos/type_info.pb.h>
#include <ydb/core/tx/scheme_cache/helpers.h>
#include <ydb/core/tx/scheme_cache/scheme_cache.h>
#include <ydb/library/actors/core/actor_bootstrapped.h>
Expand Down Expand Up @@ -620,13 +619,8 @@ class TCdcChangeSenderMain

schema.reserve(pqConfig.PartitionKeySchemaSize());
for (const auto& keySchema : pqConfig.GetPartitionKeySchema()) {
if (keySchema.GetTypeId() == NScheme::NTypeIds::Pg) {
schema.push_back(NScheme::TTypeInfo(
keySchema.GetTypeId(),
NPg::TypeDescFromPgTypeId(keySchema.GetTypeInfo().GetPgTypeId())));
} else {
schema.push_back(NScheme::TTypeInfo(keySchema.GetTypeId()));
}
// TODO: support pg types
schema.push_back(NScheme::TTypeInfo(keySchema.GetTypeId()));
}

TSet<TPQPartitionInfo, TPQPartitionInfo::TLess> partitions(schema);
Expand Down
25 changes: 1 addition & 24 deletions ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -838,9 +838,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
.SetEnableChangefeedDebeziumJsonFormat(true)
.SetEnableTopicMessageMeta(true)
.SetEnableChangefeedInitialScan(true)
.SetEnableUuidAsPrimaryKey(true)
.SetEnableTablePgTypes(true)
.SetEnablePgSyntax(true);
.SetEnableUuidAsPrimaryKey(true);

Server = new TServer(settings);
if (useRealThreads) {
Expand Down Expand Up @@ -1974,13 +1972,6 @@ Y_UNIT_TEST_SUITE(Cdc) {
{"datetime64_value", "Datetime64", false, false},
{"timestamp64_value", "Timestamp64", false, false},
{"interval64_value", "Interval64", false, false},
{"pgint2_value", "pgint2", false, false},
{"pgint4_value", "pgint4", false, false},
{"pgint8_value", "pgint8", false, false},
{"pgfloat4_value", "pgfloat4", false, false},
{"pgfloat8_value", "pgfloat8", false, false},
{"pgbytea_value", "pgbytea", false, false},
{"pgtext_value", "pgtext", false, false},
});
TopicRunner::Read(table, Updates(NKikimrSchemeOp::ECdcStreamFormatJson), {
R"(UPSERT INTO `/Root/Table` (key, int32_value) VALUES (1, -100500);)",
Expand All @@ -2006,13 +1997,6 @@ Y_UNIT_TEST_SUITE(Cdc) {
R"(UPSERT INTO `/Root/Table` (key, datetime64_value) VALUES (21, CAST(1597235696 AS Datetime64));)",
R"(UPSERT INTO `/Root/Table` (key, timestamp64_value) VALUES (22, CAST(1597235696123456 AS Timestamp64));)",
R"(UPSERT INTO `/Root/Table` (key, interval64_value) VALUES (23, CAST(-300500 AS Interval64));)",
R"(UPSERT INTO `/Root/Table` (key, pgint2_value) VALUES (24, -42ps);)",
R"(UPSERT INTO `/Root/Table` (key, pgint4_value) VALUES (25, -420p);)",
R"(UPSERT INTO `/Root/Table` (key, pgint8_value) VALUES (26, -4200pb);)",
R"(UPSERT INTO `/Root/Table` (key, pgfloat4_value) VALUES (27, 3.1415pf4);)",
R"(UPSERT INTO `/Root/Table` (key, pgfloat8_value) VALUES (28, 2.718pf8);)",
R"(UPSERT INTO `/Root/Table` (key, pgbytea_value) VALUES (29, 'lorem "ipsum"'pb);)",
R"(UPSERT INTO `/Root/Table` (key, pgtext_value) VALUES (30, 'lorem "ipsum"'p);)",
}, {
R"({"key":[1],"update":{"int32_value":-100500}})",
R"({"key":[2],"update":{"uint32_value":100500}})",
Expand All @@ -2037,13 +2021,6 @@ Y_UNIT_TEST_SUITE(Cdc) {
R"({"key":[21],"update":{"datetime64_value":1597235696}})",
R"({"key":[22],"update":{"timestamp64_value":1597235696123456}})",
R"({"key":[23],"update":{"interval64_value":-300500}})",
R"({"key":[24],"update":{"pgint2_value":"-42"}})",
R"({"key":[25],"update":{"pgint4_value":"-420"}})",
R"({"key":[26],"update":{"pgint8_value":"-4200"}})",
R"({"key":[27],"update":{"pgfloat4_value":"3.1415"}})",
R"({"key":[28],"update":{"pgfloat8_value":"2.718"}})",
R"({"key":[29],"update":{"pgbytea_value":"\\x6c6f72656d2022697073756d22"}})",
R"({"key":[30],"update":{"pgtext_value":"lorem \"ipsum\""}})",
});
}

Expand Down
6 changes: 0 additions & 6 deletions ydb/core/tx/datashard/export_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,12 +113,6 @@ TString DyNumberToString(TStringBuf data) {
return result;
}

TString PgToString(TStringBuf data, const NScheme::TTypeInfo& typeInfo) {
const NPg::TConvertResult& pgResult = NPg::PgNativeTextFromNativeBinary(data, typeInfo.GetPgTypeDesc());
Y_ABORT_UNLESS(pgResult.Error.Empty());
return pgResult.Str;
}

bool DecimalToStream(const std::pair<ui64, i64>& loHi, IOutputStream& out, TString& err) {
Y_UNUSED(err);
using namespace NYql::NDecimal;
Expand Down
1 change: 0 additions & 1 deletion ydb/core/tx/datashard/export_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ TMaybe<Ydb::Scheme::ModifyPermissionsRequest> GenYdbPermissions(

TString DecimalToString(const std::pair<ui64, i64>& loHi);
TString DyNumberToString(TStringBuf data);
TString PgToString(TStringBuf data, const NScheme::TTypeInfo& typeInfo);
bool DecimalToStream(const std::pair<ui64, i64>& loHi, IOutputStream& out, TString& err);
bool DyNumberToStream(TStringBuf data, IOutputStream& out, TString& err);
bool PgToStream(TStringBuf data, const NScheme::TTypeInfo& typeInfo, IOutputStream& out, TString& err);
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/datashard/ut_change_exchange/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ PEERDIR(
library/cpp/regex/pcre
library/cpp/svnversion
ydb/core/kqp/ut/common
ydb/core/testlib/pg
ydb/core/testlib/default
ydb/core/tx
ydb/library/yql/public/udf/service/exception_policy
ydb/public/lib/yson_value
Expand Down

0 comments on commit 56fbcfc

Please sign in to comment.