Skip to content

Commit

Permalink
Optimize CPU usage when reading blobs (always use the count limit) (#12349)
Browse files Browse the repository at this point in the history
  • Loading branch information
nshestakov authored Dec 7, 2024
1 parent 817dc99 commit 751da8a
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 63 deletions.
2 changes: 1 addition & 1 deletion ydb/core/persqueue/blob.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,7 @@ void TBatch::UnpackToType1(TVector<TClientBlob> *blobs) const {
ui32 sourceIdCount = 0;
TVector<TString> sourceIds;

static NScheme::TTypeCodecs ui32Codecs(NScheme::NTypeIds::Uint32), ui64Codecs(NScheme::NTypeIds::Uint64), stringCodecs(NScheme::NTypeIds::String);
static const NScheme::TTypeCodecs ui32Codecs(NScheme::NTypeIds::Uint32), ui64Codecs(NScheme::NTypeIds::Uint64), stringCodecs(NScheme::NTypeIds::String);
//read order
{
auto chunk = NScheme::IChunkDecoder::ReadChunk(GetChunk(data, dataEnd), &ui32Codecs);
Expand Down
18 changes: 11 additions & 7 deletions ydb/core/persqueue/partition_read.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -425,15 +425,16 @@ TReadAnswer TReadInfo::FormAnswer(
size -= lastBlobSize;
}
lastBlobSize = 0;
return (size >= Size || cnt >= Count);
return cnt >= Count;
}
return !AppData()->PQConfig.GetTopicsAreFirstClassCitizen() && (size >= Size || cnt >= Count);
// For backward compatibility, we keep the behavior for older clients for non-FirstClassCitizen
return !AppData()->PQConfig.GetTopicsAreFirstClassCitizen() && cnt >= Count;
};

Y_ABORT_UNLESS(blobs.size() == Blobs.size());
response->Check();
bool needStop = false;
for (ui32 pos = 0; pos < blobs.size() && !needStop; ++pos) {
for (ui32 pos = 0; pos < blobs.size() && !needStop && size < Size; ++pos) {
Y_ABORT_UNLESS(Blobs[pos].Offset == blobs[pos].Offset, "Mismatch %" PRIu64 " vs %" PRIu64, Blobs[pos].Offset, blobs[pos].Offset);
Y_ABORT_UNLESS(Blobs[pos].Count == blobs[pos].Count, "Mismatch %" PRIu32 " vs %" PRIu32, Blobs[pos].Count, blobs[pos].Count);

Expand Down Expand Up @@ -470,7 +471,7 @@ TReadAnswer TReadInfo::FormAnswer(
Y_ABORT_UNLESS(offset < Offset || partNo <= PartNo);
TKey key(TKeyPrefix::TypeData, TPartitionId(0), offset, partNo, count, internalPartsCount, false);
ui64 firstHeaderOffset = GetFirstHeaderOffset(key, blobValue);
for (TBlobIterator it(key, blobValue); it.IsValid(); it.Next()) {
for (TBlobIterator it(key, blobValue); it.IsValid() && !needStop; it.Next()) {
TBatch batch = it.GetBatch();
auto& header = batch.Header;
batch.Unpack();
Expand All @@ -489,8 +490,8 @@ TReadAnswer TReadInfo::FormAnswer(
continue;


PQ_LOG_D("FormAnswer processing batch offset "
<< (offset - header.GetCount()) << " totakecount " << count << " count " << header.GetCount() << " size " << header.GetPayloadSize() << " from pos " << pos << " cbcount " << batch.Blobs.size());
PQ_LOG_D("FormAnswer processing batch offset " << (offset - header.GetCount()) << " totakecount " << count << " count " << header.GetCount()
<< " size " << header.GetPayloadSize() << " from pos " << pos << " cbcount " << batch.Blobs.size());

for (size_t i = pos; i < batch.Blobs.size(); ++i) {
TClientBlob &res = batch.Blobs[i];
Expand All @@ -514,7 +515,10 @@ TReadAnswer TReadInfo::FormAnswer(
++PartNo;
}

needStop = updateUsage(res);
if (updateUsage(res)) {
needStop = true;
break;
}
}
}
}
Expand Down
58 changes: 19 additions & 39 deletions ydb/core/persqueue/ut/pq_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1174,13 +1174,9 @@ Y_UNIT_TEST(TestWritePQBigMessage) {
CmdWrite(0, "sourceid0", data, tc, false, {}, true);
PQGetPartInfo(0, 27, tc);

Cerr << ">>>>> 1" << Endl << Flush;
CmdRead(0, 0, Max<i32>(), Max<i32>(), 13, false, tc);
Cerr << ">>>>> 2" << Endl << Flush;
CmdRead(0, 0, Max<i32>(), Max<i32>(), 1, false, tc);
CmdRead(0, 1, Max<i32>(), Max<i32>(), 25, false, tc);
Cerr << ">>>>> 3" << Endl << Flush;
CmdRead(0, 24, Max<i32>(), Max<i32>(), 2, false, tc);
Cerr << ">>>>> 4" << Endl << Flush;
CmdRead(0, 26, Max<i32>(), Max<i32>(), 1, false, tc);

activeZone = false;
Expand Down Expand Up @@ -1606,27 +1602,19 @@ Y_UNIT_TEST(TestPQRead) {
CmdRead(0, 26, Max<i32>(), Max<i32>(), 0, true, tc);

CmdRead(0, 0, Max<i32>(), Max<i32>(), 25, false, tc);
CmdRead(0, 0, 10, 100_MB, 15, false, tc);
CmdRead(0, 9, 1, 100_MB, 6, false, tc);
CmdRead(0, 0, 10, 100_MB, 10, false, tc);
CmdRead(0, 9, 1, 100_MB, 1, false, tc);
CmdRead(0, 23, 3, 100_MB, 3, false, tc);

Cerr << ">>>>> CmdRead 1" << Endl << Flush;
CmdRead(0, 3, 1000, 511_KB, 4, false, tc);
Cerr << ">>>>> CmdRead 2" << Endl << Flush;
CmdRead(0, 3, 1000, 511_KB, 4, false, tc);
Cerr << ">>>>> CmdRead 3" << Endl << Flush;
CmdRead(0, 3, 1000, 1_KB, 4, false, tc); //at least one message will be readed always
Cerr << ">>>>> CmdRead 4" << Endl << Flush;
CmdRead(0, 25, 1000, 1_KB, 1, false, tc); //at least one message will be readed always, from head

activeZone = true;
Cerr << ">>>>> CmdRead 5" << Endl << Flush;
CmdRead(0, 9, 1000, 3_MB, 6, false, tc);
Cerr << ">>>>> CmdRead 6" << Endl << Flush;
CmdRead(0, 9, 1000, 3_MB - 10_KB, 6, false, tc);
Cerr << ">>>>> CmdRead 7" << Endl << Flush;
CmdRead(0, 25, 1000, 512_KB, 1, false, tc); //from head
Cerr << ">>>>> CmdRead 8" << Endl << Flush;
CmdRead(0, 24, 1000, 512_KB, 1, false, tc); //from head

CmdRead(0, 23, 1000, 98_MB, 3, false, tc);
Expand Down Expand Up @@ -1715,16 +1703,16 @@ Y_UNIT_TEST(TestPQReadAhead) {
PQGetPartInfo(0, 22, tc);
activeZone = true;

Cerr << ">>>>> 1" << Endl << Flush;
CmdRead(0, 0, 1, 100_MB, 12, false, tc);
Cerr << ">>>>> 2" << Endl << Flush;
CmdRead(0, 1, 1, 100_MB, 11, false, tc);
Cerr << ">>>>> 3" << Endl << Flush;
CmdRead(0, 2, 1, 100_MB, 10, false, tc);
Cerr << ">>>>> 4" << Endl << Flush;
CmdRead(0, 3, 1, 100_MB, 9, false, tc);
Cerr << ">>>>> 5" << Endl << Flush;
CmdRead(0, 4, 10, 100_MB, 16, false, tc);
CmdRead(0, 0, 1, 100_MB, 1, false, tc);
CmdRead(0, 1, 1, 100_MB, 1, false, tc);
CmdRead(0, 2, 1, 100_MB, 1, false, tc);
CmdRead(0, 3, 1, 100_MB, 1, false, tc);
CmdRead(0, 4, 10, 100_MB, 10, false, tc);

CmdRead(0, 0, Max<i32>(), 100_KB, 12, false, tc);
CmdRead(0, 1, Max<i32>(), 100_KB, 11, false, tc);
CmdRead(0, 2, Max<i32>(), 100_KB, 10, false, tc);
CmdRead(0, 3, Max<i32>(), 100_KB, 9, false, tc);
});
}

Expand Down Expand Up @@ -2040,7 +2028,7 @@ Y_UNIT_TEST(TestPQCacheSizeManagement) {

TAutoPtr<IEventHandle> handle;
for (ui32 i = 0; i < 10; ++i) {
CmdRead(0, 0, 1, 100_MB, 7, false, tc);
CmdRead(0, 0, 1, 100_MB, 1, false, tc);
PQTabletRestart(tc);
}
});
Expand Down Expand Up @@ -2091,28 +2079,20 @@ Y_UNIT_TEST(TestMaxTimeLagRewind) {
}
const auto ts = tc.Runtime->GetCurrentTime();

Cerr << ">>>>> 1" << Endl << Flush;
CmdRead(0, 0, 1, Max<i32>(), 7, false, tc, {0});
Cerr << ">>>>> 2" << Endl << Flush;
CmdRead(0, 0, 1, Max<i32>(), 7, false, tc, {21}, TDuration::Minutes(3).MilliSeconds());
Cerr << ">>>>> 3" << Endl << Flush;
CmdRead(0, 22, 1, Max<i32>(), 6, false, tc, {22}, TDuration::Minutes(3).MilliSeconds());
Cerr << ">>>>> 4" << Endl << Flush;
CmdRead(0, 0, 1, Max<i32>(), 1, false, tc, {0});
CmdRead(0, 0, 1, Max<i32>(), 1, false, tc, {21}, TDuration::Minutes(3).MilliSeconds());
CmdRead(0, 22, 1, Max<i32>(), 1, false, tc, {22}, TDuration::Minutes(3).MilliSeconds());
CmdRead(0, 4, 1, Max<i32>(), 1, false, tc, {34}, 1000);

Cerr << ">>>>> 5" << Endl << Flush;
CmdRead(0, 0, 1, Max<i32>(), 7, false, tc, {21}, 0,
CmdRead(0, 0, 1, Max<i32>(), 1, false, tc, {21}, 0,
(ts - TDuration::Minutes(3)).MilliSeconds());
Cerr << ">>>>> 6" << Endl << Flush;
CmdRead(0, 22, 1, Max<i32>(), 6, false, tc, {22}, 0,
CmdRead(0, 22, 1, Max<i32>(), 1, false, tc, {22}, 0,
(ts - TDuration::Minutes(3)).MilliSeconds());
Cerr << ">>>>> 7" << Endl << Flush;
CmdRead(0, 4, 1, Max<i32>(), 1, false, tc, {34}, 0,
(ts - TDuration::Seconds(1)).MilliSeconds());

PQTabletPrepare({.readFromTimestampsMs=(ts - TDuration::Seconds(1)).MilliSeconds()},
{{"aaa", true}}, tc);
Cerr << ">>>>> 8" << Endl << Flush;
CmdRead(0, 0, 1, Max<i32>(), 1, false, tc, {34});

});
Expand Down
60 changes: 60 additions & 0 deletions ydb/services/datastreams/datastreams_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2353,6 +2353,66 @@ Y_UNIT_TEST_SUITE(DataStreams) {

}

Y_UNIT_TEST(TestGetRecordsWithCount) {
    // Scenario: put a series of large records into a one-shard stream, remembering
    // a wall-clock mark taken shortly before each put. Then, for every mark, open an
    // AT_TIMESTAMP shard iterator and check that GetRecords with Limit(1) returns
    // exactly one record whose sequence number equals the index of that mark.
    TInsecureDatastreamsTestServer testServer;
    const TString streamName = TStringBuilder() << "stream_" << Y_UNIT_TEST_NAME;

    // Create the stream with a single shard.
    {
        auto createResult = testServer.DataStreamsClient->CreateStream(
            streamName,
            NYDS_V1::TCreateStreamSettings().ShardCount(1)).ExtractValueSync();
        UNIT_ASSERT_VALUES_EQUAL(createResult.IsTransportError(), false);
        UNIT_ASSERT_VALUES_EQUAL_C(createResult.GetStatus(), EStatus::SUCCESS, createResult.GetIssues().ToString());
    }

    const ui32 totalRecords = 16;
    std::vector<ui64> marksMs; // one wall-clock mark (milliseconds) per written record

    // Write phase.
    {
        // 1_MB payload per record. NOTE(review): the original author's remark says the
        // messages are made big so that compaction gets completed - confirm the intent.
        std::string payload;
        payload.resize(1_MB);
        std::iota(payload.begin(), payload.end(), 1);

        std::random_device seedSource;
        std::mt19937 rng{seedSource()};

        for (ui32 ordinal = 1; ordinal <= totalRecords; ++ordinal) {
            // Re-shuffle the payload bytes before every put.
            std::shuffle(payload.begin(), payload.end(), rng);
            {
                TString partitionKey = Sprintf("%04u", ordinal);
                NYDS_V1::TDataRecord record{{payload.begin(), payload.end()}, partitionKey, ""};

                // The mark is taken first and the put happens only after a pause
                // (with another pause after the put), so that the record's
                // WriteTimestampMs falls between two neighboring marks.
                marksMs.push_back(TInstant::Now().MilliSeconds());
                Sleep(TDuration::MilliSeconds(500));

                auto putResult = testServer.DataStreamsClient->PutRecord(streamName, record).ExtractValueSync();
                UNIT_ASSERT_VALUES_EQUAL(putResult.IsTransportError(), false);
                UNIT_ASSERT_VALUES_EQUAL_C(putResult.GetStatus(), EStatus::SUCCESS, putResult.GetIssues().ToString());
            }
            Sleep(TDuration::MilliSeconds(500));
        }
    }

    // Read phase: one iterator and one single-record read per remembered mark.
    for (ui32 index = 0; index < totalRecords; ++index) {
        TString shardIterator;

        {
            auto iteratorResult = testServer.DataStreamsClient->GetShardIterator(
                streamName,
                "shard-000000",
                YDS_V1::ShardIteratorType::AT_TIMESTAMP,
                NYDS_V1::TGetShardIteratorSettings().Timestamp(marksMs[index])).ExtractValueSync();
            UNIT_ASSERT_VALUES_EQUAL(iteratorResult.IsTransportError(), false);
            UNIT_ASSERT_VALUES_EQUAL_C(iteratorResult.GetStatus(), EStatus::SUCCESS, iteratorResult.GetIssues().ToString());
            shardIterator = iteratorResult.GetResult().shard_iterator();
        }

        {
            auto readResult = testServer.DataStreamsClient->GetRecords(
                shardIterator,
                NYDS_V1::TGetRecordsSettings().Limit(1)).ExtractValueSync();
            UNIT_ASSERT_VALUES_EQUAL(readResult.IsTransportError(), false);
            UNIT_ASSERT_VALUES_EQUAL_C(readResult.GetStatus(), EStatus::SUCCESS, readResult.GetIssues().ToString());

            // Limit(1) must be honored: exactly one record, and it must be the one
            // whose sequence number matches the mark we started from.
            const auto& response = readResult.GetResult();
            UNIT_ASSERT_VALUES_EQUAL(response.records().size(), 1);
            UNIT_ASSERT_VALUES_EQUAL(std::stoi(response.records().begin()->sequence_number()), index);
        }
    }
}

Y_UNIT_TEST(TestGetRecordsStreamWithMultipleShards) {
TInsecureDatastreamsTestServer testServer;
const TString streamName = TStringBuilder() << "stream_" << Y_UNIT_TEST_NAME;
Expand Down
24 changes: 8 additions & 16 deletions ydb/services/persqueue_v1/persqueue_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1161,14 +1161,12 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
ui32 totalMsg = 0;
ui64 nextReadId = 1;
Sleep(TDuration::Seconds(3));
Cerr << ">>>>> 1" << Endl << Flush;
setup.DoWrite(pqClient->GetDriver(), "acc/topic1", 1_MB, 50);

Cerr << "First read\n";
setup.DoRead(assignId, nextReadId, totalMsg, 40);
setup.DoRead(assignId, nextReadId, totalMsg, 43);
setup.DoRead(assignId, nextReadId, totalMsg, 42);

Cerr << ">>>>> 3" << Endl << Flush;
Topic::StreamReadMessage::FromClient req;
req.mutable_read_request()->set_bytes_size(40_MB);
if (!setup.ControlStream->Write(req)) {
Expand All @@ -1177,14 +1175,11 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
Cerr << "Second read\n";
setup.DoRead(assignId, nextReadId, totalMsg, 50);

Cerr << ">>>>> 5" << Endl << Flush;
Sleep(TDuration::Seconds(1));
Cerr << ">>>>> 6" << Endl << Flush;
auto cachedData = RequestCacheData(runtime, new TEvPQ::TEvGetFullDirectReadData());
UNIT_ASSERT_VALUES_EQUAL(cachedData->Data.size(), 1);
UNIT_ASSERT_VALUES_EQUAL(cachedData->Data.begin()->second.StagedReads.size(), 0);
UNIT_ASSERT_VALUES_EQUAL(cachedData->Data.begin()->second.Reads.size(), 0);
Cerr << ">>>>> 7" << Endl << Flush;
}

Y_UNIT_TEST(DirectReadBadCases) {
Expand Down Expand Up @@ -2822,7 +2817,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
server.AnnoyingClient->WriteToPQ({DEFAULT_TOPIC_NAME, 0, "source1", i}, value);

Cerr << ">>>>> 1" << Endl << Flush;
auto info0 = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 0, 16, "user"}, 23);
auto info0 = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 0, 16, "user"}, 16);
Cerr << ">>>>> 2" << Endl << Flush;
auto info16 = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 16, 16, "user"}, 16);

Expand All @@ -2834,9 +2829,9 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
server.AnnoyingClient->WriteToPQ({DEFAULT_TOPIC_NAME, 0, "source1", 32+i}, value);

Cerr << ">>>>> 3" << Endl << Flush;
info0 = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 0, 16, "user"}, 23);
info0 = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 0, 16, "user"}, 16);
Cerr << ">>>>> 4" << Endl << Flush;
info16 = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 16, 16, "user"}, 22);
info16 = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 16, 16, "user"}, 16);

ui32 fromDisk = info0.BlobsFromDisk + info16.BlobsFromDisk;
ui32 fromCache = info0.BlobsFromCache + info16.BlobsFromCache;
Expand Down Expand Up @@ -2896,16 +2891,13 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
server.AnnoyingClient->WriteToPQ({secondTopic, 0, "source1", i}, mb);
}

Cerr << ">>>>> 1" << Endl << Flush;
auto info1 = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 0, 1, "user1"}, 7);
Cerr << ">>>>> 2" << Endl << Flush;
auto info2 = server.AnnoyingClient->ReadFromPQ({secondTopic, 0, 0, 1, "user1"}, 7);
Cerr << ">>>>> 3" << Endl << Flush;
auto info1 = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 0, 1, "user1"}, 1);
auto info2 = server.AnnoyingClient->ReadFromPQ({secondTopic, 0, 0, 1, "user1"}, 1);

UNIT_ASSERT_VALUES_EQUAL(info1.BlobsFromCache, 1);
UNIT_ASSERT_VALUES_EQUAL(info2.BlobsFromCache, 1);
UNIT_ASSERT_VALUES_EQUAL(info1.Values.size(), 7);
UNIT_ASSERT_VALUES_EQUAL(info2.Values.size(), 7);
UNIT_ASSERT_VALUES_EQUAL(info1.Values.size(), 1);
UNIT_ASSERT_VALUES_EQUAL(info2.Values.size(), 1);
UNIT_ASSERT_VALUES_EQUAL(info1.Values[0].size(), valueSize);
UNIT_ASSERT_VALUES_EQUAL(info2.Values[0].size(), valueSize);
UNIT_ASSERT(info1.Values[0] == value1);
Expand Down

0 comments on commit 751da8a

Please sign in to comment.