Skip to content

Commit

Permalink
Merge 607a6d6 into 1738d8c
Browse files Browse the repository at this point in the history
  • Loading branch information
niksaveliev authored Oct 10, 2024
2 parents 1738d8c + 607a6d6 commit 0672d17
Showing 1 changed file with 15 additions and 4 deletions.
19 changes: 15 additions & 4 deletions ydb/services/persqueue_v1/actors/read_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1724,12 +1724,22 @@ template <typename TServerMessage>
i64 TFormedReadResponse<TServerMessage>::ApplyResponse(TServerMessage&& resp) {
constexpr bool UseMigrationProtocol = std::is_same_v<TServerMessage, PersQueue::V1::MigrationStreamingReadServerMessage>;

auto* partition_data = UseMigrationProtocol
? resp.mutable_data_batch()->mutable_partition_data(0)
: resp.mutable_read_response()->mutable_partition_data(0);

Y_ABORT_UNLESS(partition_data != nullptr && partition_data->batches_size() > 0);

for (auto& batch : *partition_data->mutable_batches()) {
if (batch.codec() == Ydb::PersQueue::V1::CODEC_UNSPECIFIED) {
batch.set_codec(Ydb::PersQueue::V1::CODEC_RAW);
}
}

if constexpr (UseMigrationProtocol) {
Y_ABORT_UNLESS(resp.data_batch().partition_data_size() == 1);
Response.mutable_data_batch()->add_partition_data()->Swap(resp.mutable_data_batch()->mutable_partition_data(0));
Response.mutable_data_batch()->add_partition_data()->Swap(partition_data);
} else {
Y_ABORT_UNLESS(resp.read_response().partition_data_size() == 1);
Response.mutable_read_response()->add_partition_data()->Swap(resp.mutable_read_response()->mutable_partition_data(0));
Response.mutable_read_response()->add_partition_data()->Swap(partition_data);
}

Response.set_status(Ydb::StatusIds::SUCCESS);
Expand All @@ -1739,6 +1749,7 @@ i64 TFormedReadResponse<TServerMessage>::ApplyResponse(TServerMessage&& resp) {
return ByteSize - prev;
}


template <typename TServerMessage>
i64 TFormedReadResponse<TServerMessage>::ApplyDirectReadResponse(TEvPQProxy::TEvDirectReadResponse::TPtr& ev) {

Expand Down

0 comments on commit 0672d17

Please sign in to comment.