Skip to content

Commit

Permalink
Use topic api in pq_async_io tests (#6156)
Browse files Browse the repository at this point in the history
  • Loading branch information
kardymonds authored Oct 3, 2024
1 parent d9828e0 commit 1faa700
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 27 deletions.
43 changes: 20 additions & 23 deletions ydb/tests/fq/pq_async_io/ut_helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,12 +148,12 @@ void PQWrite(
cfg.SetDatabase(GetDefaultPqDatabase());
cfg.SetLog(CreateLogBackend("cerr"));
NYdb::TDriver driver(cfg);
NYdb::NPersQueue::TPersQueueClient client(driver);
NYdb::NPersQueue::TWriteSessionSettings sessionSettings;
NYdb::NTopic::TTopicClient client(driver);
NYdb::NTopic::TWriteSessionSettings sessionSettings;
sessionSettings
.Path(topic)
.MessageGroupId("src_id")
.Codec(NYdb::NPersQueue::ECodec::RAW);
.Codec(NYdb::NTopic::ECodec::RAW);
auto session = client.CreateSimpleBlockingWriteSession(sessionSettings);
for (const TString& data : sequence) {
UNIT_ASSERT_C(session->Write(data), "Failed to write message with body \"" << data << "\" to topic " << topic);
Expand All @@ -175,17 +175,16 @@ std::vector<TString> PQReadUntil(
cfg.SetDatabase(GetDefaultPqDatabase());
cfg.SetLog(CreateLogBackend("cerr"));
NYdb::TDriver driver(cfg);
NYdb::NPersQueue::TPersQueueClient client(driver);
NYdb::NPersQueue::TReadSessionSettings sessionSettings;
NYdb::NTopic::TTopicClient client(driver);
NYdb::NTopic::TReadSessionSettings sessionSettings;
sessionSettings
.AppendTopics(topic)
.ConsumerName(DefaultPqConsumer)
.DisableClusterDiscovery(true);
.ConsumerName(DefaultPqConsumer);

auto promise = NThreading::NewPromise();
std::vector<TString> result;

sessionSettings.EventHandlers_.SimpleDataHandlers([&](NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent& ev) {
sessionSettings.EventHandlers_.SimpleDataHandlers([&](NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent& ev) {
for (const auto& message : ev.GetMessages()) {
result.emplace_back(message.GetData());
}
Expand All @@ -194,7 +193,7 @@ std::vector<TString> PQReadUntil(
}
}, false, false);

std::shared_ptr<NYdb::NPersQueue::IReadSession> session = client.CreateReadSession(sessionSettings);
std::shared_ptr<NYdb::NTopic::IReadSession> session = client.CreateReadSession(sessionSettings);
UNIT_ASSERT(promise.GetFuture().Wait(timeout));
session->Close(TDuration::Zero());
session = nullptr;
Expand Down Expand Up @@ -224,20 +223,18 @@ void PQCreateStream(const TString& streamName)
}

void AddReadRule(NYdb::TDriver& driver, const TString& streamName) {
NYdb::NPersQueue::TPersQueueClient client(driver);

auto result = client.AddReadRule(
streamName,
NYdb::NPersQueue::TAddReadRuleSettings()
.ReadRule(
NYdb::NPersQueue::TReadRuleSettings()
.ConsumerName(DefaultPqConsumer)
.ServiceType("yandex-query")
.SupportedCodecs({
NYdb::NPersQueue::ECodec::RAW
})
)
).ExtractValueSync();
NYdb::NTopic::TTopicClient client(driver);

auto alterTopicSettings =
NYdb::NTopic::TAlterTopicSettings()
.BeginAddConsumer(DefaultPqConsumer)
.SetSupportedCodecs(
{
NYdb::NTopic::ECodec::RAW
})
.EndAddConsumer();
auto result = client.AlterTopic(streamName, alterTopicSettings).ExtractValueSync();

UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
}
Expand Down
3 changes: 1 addition & 2 deletions ydb/tests/fq/pq_async_io/ut_helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@
#include <ydb/library/yql/dq/actors/protos/dq_events.pb.h>
#include <ydb/library/yql/minikql/mkql_alloc.h>

#include <ydb/public/sdk/cpp/client/ydb_persqueue_public/persqueue.h>
#include <ydb/public/sdk/cpp/client/ydb_datastreams/datastreams.h>
#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>
#include <ydb/core/testlib/basics/runtime.h>

#include <library/cpp/testing/unittest/registar.h>
Expand Down
3 changes: 1 addition & 2 deletions ydb/tests/fq/pq_async_io/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@ SRCS(
PEERDIR(
ydb/library/yql/minikql/computation/llvm14
ydb/library/yql/providers/common/ut_helpers
ydb/public/sdk/cpp/client/ydb_datastreams
ydb/public/sdk/cpp/client/ydb_persqueue_public
ydb/public/sdk/cpp/client/ydb_topic
)

YQL_LAST_ABI_VERSION()
Expand Down

0 comments on commit 1faa700

Please sign in to comment.