From 1faa700fdac58d295c2b813ffbd47b9f7cadf2d6 Mon Sep 17 00:00:00 2001 From: Dmitry Kardymon Date: Thu, 3 Oct 2024 17:24:57 +0300 Subject: [PATCH] Use topic api in pq_async_io tests (#6156) --- ydb/tests/fq/pq_async_io/ut_helpers.cpp | 43 ++++++++++++------------- ydb/tests/fq/pq_async_io/ut_helpers.h | 3 +- ydb/tests/fq/pq_async_io/ya.make | 3 +- 3 files changed, 22 insertions(+), 27 deletions(-) diff --git a/ydb/tests/fq/pq_async_io/ut_helpers.cpp b/ydb/tests/fq/pq_async_io/ut_helpers.cpp index f5aaf34f6831..a26d2b367bee 100644 --- a/ydb/tests/fq/pq_async_io/ut_helpers.cpp +++ b/ydb/tests/fq/pq_async_io/ut_helpers.cpp @@ -148,12 +148,12 @@ void PQWrite( cfg.SetDatabase(GetDefaultPqDatabase()); cfg.SetLog(CreateLogBackend("cerr")); NYdb::TDriver driver(cfg); - NYdb::NPersQueue::TPersQueueClient client(driver); - NYdb::NPersQueue::TWriteSessionSettings sessionSettings; + NYdb::NTopic::TTopicClient client(driver); + NYdb::NTopic::TWriteSessionSettings sessionSettings; sessionSettings .Path(topic) .MessageGroupId("src_id") - .Codec(NYdb::NPersQueue::ECodec::RAW); + .Codec(NYdb::NTopic::ECodec::RAW); auto session = client.CreateSimpleBlockingWriteSession(sessionSettings); for (const TString& data : sequence) { UNIT_ASSERT_C(session->Write(data), "Failed to write message with body \"" << data << "\" to topic " << topic); @@ -175,17 +175,16 @@ std::vector PQReadUntil( cfg.SetDatabase(GetDefaultPqDatabase()); cfg.SetLog(CreateLogBackend("cerr")); NYdb::TDriver driver(cfg); - NYdb::NPersQueue::TPersQueueClient client(driver); - NYdb::NPersQueue::TReadSessionSettings sessionSettings; + NYdb::NTopic::TTopicClient client(driver); + NYdb::NTopic::TReadSessionSettings sessionSettings; sessionSettings .AppendTopics(topic) - .ConsumerName(DefaultPqConsumer) - .DisableClusterDiscovery(true); + .ConsumerName(DefaultPqConsumer); auto promise = NThreading::NewPromise(); std::vector result; - sessionSettings.EventHandlers_.SimpleDataHandlers([&](NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent& ev) { + sessionSettings.EventHandlers_.SimpleDataHandlers([&](NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent& ev) { for (const auto& message : ev.GetMessages()) { result.emplace_back(message.GetData()); } @@ -194,7 +193,7 @@ std::vector PQReadUntil( } }, false, false); - std::shared_ptr session = client.CreateReadSession(sessionSettings); + std::shared_ptr session = client.CreateReadSession(sessionSettings); UNIT_ASSERT(promise.GetFuture().Wait(timeout)); session->Close(TDuration::Zero()); session = nullptr; @@ -224,20 +223,18 @@ void PQCreateStream(const TString& streamName) } void AddReadRule(NYdb::TDriver& driver, const TString& streamName) { - NYdb::NPersQueue::TPersQueueClient client(driver); - - auto result = client.AddReadRule( - streamName, - NYdb::NPersQueue::TAddReadRuleSettings() - .ReadRule( - NYdb::NPersQueue::TReadRuleSettings() - .ConsumerName(DefaultPqConsumer) - .ServiceType("yandex-query") - .SupportedCodecs({ - NYdb::NPersQueue::ECodec::RAW - }) - ) - ).ExtractValueSync(); + NYdb::NTopic::TTopicClient client(driver); + + auto alterTopicSettings = + NYdb::NTopic::TAlterTopicSettings() + .BeginAddConsumer(DefaultPqConsumer) + .SetSupportedCodecs( + { + NYdb::NTopic::ECodec::RAW + }) + .EndAddConsumer(); + auto result = client.AlterTopic(streamName, alterTopicSettings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); } diff --git a/ydb/tests/fq/pq_async_io/ut_helpers.h b/ydb/tests/fq/pq_async_io/ut_helpers.h index 6e9f92007d2b..1fc4653a0430 100644 --- a/ydb/tests/fq/pq_async_io/ut_helpers.h +++ b/ydb/tests/fq/pq_async_io/ut_helpers.h @@ -9,8 +9,7 @@ #include #include -#include -#include +#include #include #include diff --git a/ydb/tests/fq/pq_async_io/ya.make b/ydb/tests/fq/pq_async_io/ya.make index c27e93ce4ce6..82f425209b20 100644 --- a/ydb/tests/fq/pq_async_io/ya.make +++ b/ydb/tests/fq/pq_async_io/ya.make @@ -7,8 +7,7 @@ SRCS( PEERDIR( ydb/library/yql/minikql/computation/llvm14 ydb/library/yql/providers/common/ut_helpers - ydb/public/sdk/cpp/client/ydb_datastreams - ydb/public/sdk/cpp/client/ydb_persqueue_public + ydb/public/sdk/cpp/client/ydb_topic ) YQL_LAST_ABI_VERSION()