Skip to content

Commit

Permalink
YQ-3975 RD fixed fault for parsing errors without filter (#12707)
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA authored Dec 19, 2024
1 parent 3317b75 commit 01d95e5
Show file tree
Hide file tree
Showing 11 changed files with 96 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -154,16 +154,7 @@ class TTopicFilters : public ITopicFilters {
continue;
}

if (filterHandler.GetPurecalcFilter()) {
PushToFilter(filterHandler, offsets, columnIndex, values, numberRows);
continue;
}

// Clients without filters
LOG_ROW_DISPATCHER_TRACE("Add " << numberRows << " rows to client " << consumer->GetFilterId() << " without filtering");
for (ui64 rowId = 0; rowId < numberRows; ++rowId) {
consumer->OnFilteredData(rowId);
}
PushToFilter(filterHandler, offsets, columnIndex, values, numberRows);
}
Stats.AddFilterLatency(TInstant::Now() - startFilter);
}
Expand Down Expand Up @@ -193,7 +184,9 @@ class TTopicFilters : public ITopicFilters {
LOG_ROW_DISPATCHER_TRACE("Create filter with id " << filter->GetFilterId());

IPurecalcFilter::TPtr purecalcFilter;
if (filter->GetWhereFilter()) {
if (const auto& predicate = filter->GetWhereFilter()) {
LOG_ROW_DISPATCHER_TRACE("Create purecalc filter for predicate '" << predicate << "' (filter id: " << filter->GetFilterId() << ")");

auto filterStatus = CreatePurecalcFilter(filter);
if (filterStatus.IsFail()) {
return filterStatus;
Expand Down Expand Up @@ -225,9 +218,6 @@ class TTopicFilters : public ITopicFilters {

private:
void PushToFilter(const TFilterHandler& filterHandler, const TVector<ui64>& offsets, const TVector<ui64>& columnIndex, const TVector<const TVector<NYql::NUdf::TUnboxedValue>*>& values, ui64 numberRows) {
const auto filter = filterHandler.GetPurecalcFilter();
Y_ENSURE(filter, "Expected initialized filter");

const auto consumer = filterHandler.GetConsumer();
const auto& columnIds = consumer->GetColumnIds();

Expand All @@ -246,8 +236,13 @@ class TTopicFilters : public ITopicFilters {
}
}

LOG_ROW_DISPATCHER_TRACE("Pass " << numberRows << " rows to purecalc filter (client id: " << consumer->GetFilterId() << ")");
filter->FilterData(result, numberRows);
if (const auto filter = filterHandler.GetPurecalcFilter()) {
LOG_ROW_DISPATCHER_TRACE("Pass " << numberRows << " rows to purecalc filter (filter id: " << consumer->GetFilterId() << ")");
filter->FilterData(result, numberRows);
} else if (numberRows) {
LOG_ROW_DISPATCHER_TRACE("Add " << numberRows << " rows to client " << consumer->GetFilterId() << " without filtering");
consumer->OnFilteredBatch(0, numberRows - 1);
}
}

private:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ class IFilteredDataConsumer : public IPurecalcFilterConsumer {
virtual const TVector<ui64>& GetColumnIds() const = 0;
virtual TMaybe<ui64> GetNextMessageOffset() const = 0;

virtual void OnFilteredBatch(ui64 firstRow, ui64 lastRow) = 0; // inclusive interval [firstRow, lastRow]

virtual void OnFilterStarted() = 0;
virtual void OnFilteringError(TStatus status) = 0;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,13 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
Client->StartClientSession();
}

void OnFilteredBatch(ui64 firstRow, ui64 lastRow) override {
LOG_ROW_DISPATCHER_TRACE("OnFilteredBatch, rows [" << firstRow << ", " << lastRow << "]");
for (ui64 rowId = firstRow; rowId <= lastRow; ++rowId) {
OnFilteredData(rowId);
}
}

void OnFilteredData(ui64 rowId) override {
const ui64 offset = Self.Offsets->at(rowId);
if (const auto nextOffset = Client->GetNextMessageOffset(); nextOffset && offset < *nextOffset) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -414,23 +414,23 @@ class TJsonParser : public TTopicParserBase {

simdjson::ondemand::document_stream documents;
CHECK_JSON_ERROR(Parser.iterate_many(values, size, simdjson::ondemand::DEFAULT_BATCH_SIZE).get(documents)) {
return TStatus::Fail(EStatusId::BAD_REQUEST, TStringBuilder() << "Failed to parse message batch from offset " << Buffer.Offsets.front() << ", json documents was corrupted: " << simdjson::error_message(error));
return TStatus::Fail(EStatusId::BAD_REQUEST, TStringBuilder() << "Failed to parse message batch from offset " << Buffer.Offsets.front() << ", json documents was corrupted: " << simdjson::error_message(error) << " Current data batch: " << TruncateString(std::string_view(values, size)));
}

size_t rowId = 0;
for (auto document : documents) {
if (Y_UNLIKELY(rowId >= Buffer.NumberValues)) {
return TStatus::Fail(EStatusId::INTERNAL_ERROR, TStringBuilder() << "Failed to parse json messages, expected " << Buffer.NumberValues << " json rows from offset " << Buffer.Offsets.front() << " but got " << rowId + 1);
return TStatus::Fail(EStatusId::INTERNAL_ERROR, TStringBuilder() << "Failed to parse json messages, expected " << Buffer.NumberValues << " json rows from offset " << Buffer.Offsets.front() << " but got " << rowId + 1 << " (expected one json row for each offset from topic API in json each row format, maybe initial data was corrupted or messages is not in json format), current data batch: " << TruncateString(std::string_view(values, size)));
}

const ui64 offset = Buffer.Offsets[rowId];
CHECK_JSON_ERROR(document.error()) {
return TStatus::Fail(EStatusId::BAD_REQUEST, TStringBuilder() << "Failed to parse json message for offset " << offset << ", json document was corrupted: " << simdjson::error_message(error));
return TStatus::Fail(EStatusId::BAD_REQUEST, TStringBuilder() << "Failed to parse json message for offset " << offset << ", json document was corrupted: " << simdjson::error_message(error) << " Current data batch: " << TruncateString(std::string_view(values, size)));
}

for (auto item : document.get_object()) {
CHECK_JSON_ERROR(item.error()) {
return TStatus::Fail(EStatusId::BAD_REQUEST, TStringBuilder() << "Failed to parse json message for offset " << offset << ", json item was corrupted: " << simdjson::error_message(error));
return TStatus::Fail(EStatusId::BAD_REQUEST, TStringBuilder() << "Failed to parse json message for offset " << offset << ", json item was corrupted: " << simdjson::error_message(error) << " Current data batch: " << TruncateString(std::string_view(values, size)));
}

const auto it = ColumnsIndex.find(item.escaped_key().value());
Expand All @@ -445,7 +445,7 @@ class TJsonParser : public TTopicParserBase {
}

if (Y_UNLIKELY(rowId != Buffer.NumberValues)) {
return TStatus::Fail(EStatusId::INTERNAL_ERROR, TStringBuilder() << "Failed to parse json messages, expected " << Buffer.NumberValues << " json rows from offset " << Buffer.Offsets.front() << " but got " << rowId);
return TStatus::Fail(EStatusId::INTERNAL_ERROR, TStringBuilder() << "Failed to parse json messages, expected " << Buffer.NumberValues << " json rows from offset " << Buffer.Offsets.front() << " but got " << rowId << " (expected one json row for each offset from topic API in json each row format, maybe initial data was corrupted or messages is not in json format), current data batch: " << TruncateString(std::string_view(values, size)));
}

const ui64 firstOffset = Buffer.Offsets.front();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ class TOptionalCell : public TBaseFixture::ICell {

void Validate(const NYql::NUdf::TUnboxedValue& parsedValue) const override {
if (!parsedValue) {
UNIT_FAIL("Unexpected NULL value for optional cell");
return;
}
Value->Validate(parsedValue.GetOptionalValue());
Expand Down Expand Up @@ -166,7 +167,7 @@ void TBaseFixture::SetUp(NUnitTest::TTestContext&) {
TAutoPtr<NKikimr::TAppPrepare> app = new NKikimr::TAppPrepare();
Runtime.SetLogBackend(NActors::CreateStderrBackend());
Runtime.SetLogPriority(NKikimrServices::FQ_ROW_DISPATCHER, NActors::NLog::PRI_TRACE);
Runtime.SetDispatchTimeout(TDuration::Seconds(5));
Runtime.SetDispatchTimeout(WAIT_TIMEOUT);
Runtime.Initialize(app->Unwrap());

// Init tls context
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

namespace NFq::NRowDispatcher::NTests {

static constexpr TDuration WAIT_TIMEOUT = TDuration::Seconds(20);

class TBaseFixture : public NUnitTest::TBaseFixture, public TTypeParser {
public:
// Helper classes for checking serialized rows in multi type format
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ class TFormatHadlerFixture : public TBaseFixture {
FormatHandler = CreateTestFormatHandler(config, settings);
}

TStatus MakeClient(const TVector<TSchemaColumn>& columns, const TString& whereFilter, TCallback callback, ui64 expectedFilteredRows = 1) {
[[nodiscard]] TStatus MakeClient(const TVector<TSchemaColumn>& columns, const TString& whereFilter, TCallback callback, ui64 expectedFilteredRows = 1) {
ClientIds.emplace_back(ClientIds.size(), 0, 0, 0);

auto client = MakeIntrusive<TClientDataConsumer>(ClientIds.back(), columns, whereFilter, callback, expectedFilteredRows);
Expand Down Expand Up @@ -202,6 +202,30 @@ class TFormatHadlerFixture : public TBaseFixture {
FormatHandler->RemoveClient(clientId);
}

public:
static TCallback EmptyCheck() {
return [&](TQueue<std::pair<TRope, TVector<ui64>>>&& data) {};
}

static TCallback OneBatchCheck(std::function<void(TRope&& messages, TVector<ui64>&& offsets)> callback) {
return [callback](TQueue<std::pair<TRope, TVector<ui64>>>&& data) {
UNIT_ASSERT_VALUES_EQUAL(data.size(), 1);
auto [messages, offsets] = data.front();

UNIT_ASSERT(!offsets.empty());
callback(std::move(messages), std::move(offsets));
};
}

TCallback OneRowCheck(ui64 offset, const TRow& row) const {
return OneBatchCheck([this, offset, row](TRope&& messages, TVector<ui64>&& offsets) {
UNIT_ASSERT_VALUES_EQUAL(offsets.size(), 1);
UNIT_ASSERT_VALUES_EQUAL(offsets.front(), offset);

CheckMessageBatch(messages, TBatch().AddRow(row));
});
}

private:
void ExtractClientsData() {
for (auto& client : Clients) {
Expand Down Expand Up @@ -233,33 +257,13 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) {
CheckSuccess(MakeClient(
{commonColumn, {"col_first", "[DataType; String]"}},
"WHERE col_first = \"str_first__large__\"",
[&](TQueue<std::pair<TRope, TVector<ui64>>>&& data) {
UNIT_ASSERT_VALUES_EQUAL(data.size(), 1);

auto [messages, offsets] = data.front();
UNIT_ASSERT_VALUES_EQUAL(offsets.size(), 1);
UNIT_ASSERT_VALUES_EQUAL(offsets.front(), firstOffset + 1);

CheckMessageBatch(messages, TBatch().AddRow(
TRow().AddString("event2").AddString("str_first__large__")
));
}
OneRowCheck(firstOffset + 1, TRow().AddString("event2").AddString("str_first__large__"))
));

CheckSuccess(MakeClient(
{commonColumn, {"col_second", "[DataType; String]"}},
"WHERE col_second = \"str_second\"",
[&](TQueue<std::pair<TRope, TVector<ui64>>>&& data) {
UNIT_ASSERT_VALUES_EQUAL(data.size(), 1);

auto [messages, offsets] = data.front();
UNIT_ASSERT_VALUES_EQUAL(offsets.size(), 1);
UNIT_ASSERT_VALUES_EQUAL(offsets.front(), firstOffset);

CheckMessageBatch(messages, TBatch().AddRow(
TRow().AddString("event1").AddString("str_second")
));
}
OneRowCheck(firstOffset, TRow().AddString("event1").AddString("str_second"))
));

ParseMessages({
Expand Down Expand Up @@ -288,14 +292,10 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) {
R"({"col_a": false, "col_b": {"X": "Y"}})"
};

CheckSuccess(MakeClient(schema, "WHERE FALSE", [&](TQueue<std::pair<TRope, TVector<ui64>>>&&) {}, 0));

auto trueChacker = [&](TQueue<std::pair<TRope, TVector<ui64>>>&& data) {
UNIT_ASSERT_VALUES_EQUAL(data.size(), 1);
auto [messages, offsets] = data.front();
CheckSuccess(MakeClient(schema, "WHERE FALSE", EmptyCheck(), 0));

const auto trueChacker = OneBatchCheck([&](TRope&& messages, TVector<ui64>&& offsets) {
TBatch expectedBatch;
UNIT_ASSERT(!offsets.empty());
for (ui64 offset : offsets) {
UNIT_ASSERT(offset - firstOffset < testData.size());
expectedBatch.AddRow(
Expand All @@ -304,7 +304,7 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) {
}

CheckMessageBatch(messages, expectedBatch);
};
});
CheckSuccess(MakeClient(schema, "WHERE TRUE", trueChacker, 3));
CheckSuccess(MakeClient(schema, "", trueChacker, 2));

Expand All @@ -323,7 +323,7 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) {
Y_UNIT_TEST_F(ClientValidation, TFormatHadlerFixture) {
const TVector<TSchemaColumn> schema = {{"data", "[DataType; String]"}};
const TString filter = "WHERE FALSE";
const auto callback = [&](TQueue<std::pair<TRope, TVector<ui64>>>&&) {};
const auto callback = EmptyCheck();
CheckSuccess(MakeClient(schema, filter, callback, 0));

CheckError(
Expand All @@ -349,27 +349,12 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) {
const ui64 firstOffset = 42;
const TSchemaColumn commonColumn = {"com_col", "[DataType; String]"};

CheckSuccess(MakeClient(
{commonColumn, {"col_first", "[OptionalType; [DataType; Uint8]]"}},
"WHERE TRUE",
[&](TQueue<std::pair<TRope, TVector<ui64>>>&& data) {},
0
));
CheckSuccess(MakeClient({commonColumn, {"col_first", "[OptionalType; [DataType; Uint8]]"}}, "WHERE TRUE", EmptyCheck(), 0));

CheckSuccess(MakeClient(
{commonColumn, {"col_second", "[DataType; String]"}},
"WHERE col_second = \"str_second\"",
[&](TQueue<std::pair<TRope, TVector<ui64>>>&& data) {
UNIT_ASSERT_VALUES_EQUAL(data.size(), 1);

auto [messages, offsets] = data.front();
UNIT_ASSERT_VALUES_EQUAL(offsets.size(), 1);
UNIT_ASSERT_VALUES_EQUAL(offsets.front(), firstOffset);

CheckMessageBatch(messages, TBatch().AddRow(
TRow().AddString("event1").AddString("str_second")
));
}
OneRowCheck(firstOffset, TRow().AddString("event1").AddString("str_second"))
));

CheckClientError(
Expand All @@ -379,6 +364,26 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) {
TStringBuilder() << "Failed to parse json string at offset " << firstOffset << ", got parsing error for column 'col_first' with type [OptionalType; [DataType; Uint8]]"
);
}

Y_UNIT_TEST_F(ClientErrorWithEmptyFilter, TFormatHadlerFixture) {
const ui64 firstOffset = 42;
const TSchemaColumn commonColumn = {"com_col", "[DataType; String]"};

CheckSuccess(MakeClient({commonColumn, {"col_first", "[DataType; String]"}}, "", EmptyCheck(), 0));

CheckSuccess(MakeClient(
{commonColumn, {"col_second", "[DataType; String]"}},
"WHERE col_second = \"str_second\"",
OneRowCheck(firstOffset, TRow().AddString("event1").AddString("str_second"))
));

CheckClientError(
{GetMessage(firstOffset, R"({"com_col": "event1", "col_second": "str_second"})")},
ClientIds[0],
EStatusId::PRECONDITION_FAILED,
TStringBuilder() << "Failed to parse json messages, found 1 missing values from offset " << firstOffset << " in non optional column 'col_first' with type [DataType; String]"
);
}
}

} // namespace NFq::NRowDispatcher::NTests
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,13 @@ class TFiterFixture : public TBaseFixture {
}
}

void OnFilteredBatch(ui64 firstRow, ui64 lastRow) override {
UNIT_ASSERT_C(Started, "Unexpected data for not started filter");
for (ui64 rowId = firstRow; rowId <= lastRow; ++rowId) {
Callback(rowId);
}
}

void OnFilteredData(ui64 rowId) override {
UNIT_ASSERT_C(Started, "Unexpected data for not started filter");
Callback(rowId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -414,9 +414,9 @@ Y_UNIT_TEST_SUITE(TestJsonParser) {
Y_UNIT_TEST_F(JsonStructureValidation, TJsonParserFixture) {
CheckSuccess(MakeParser({{"a1", "[OptionalType; [DataType; String]]"}}));
CheckColumnError(R"({"a1": Yelse})", 0, EStatusId::BAD_REQUEST, TStringBuilder() << "Failed to parse json string at offset " << FIRST_OFFSET << ", got parsing error for column 'a1' with type [OptionalType; [DataType; String]] subissue: { <main>: Error: Failed to determine json value type, current token: 'Yelse', error: TAPE_ERROR: The JSON document has an improper structure: missing or superfluous commas, braces, missing keys, etc. }");
CheckBatchError(R"({"a1": "st""r"})", EStatusId::BAD_REQUEST, TStringBuilder() << "Failed to parse json message for offset " << FIRST_OFFSET + 1 << ", json item was corrupted: TAPE_ERROR: The JSON document has an improper structure: missing or superfluous commas, braces, missing keys, etc.");
CheckBatchError(R"({"a1": "x"} {"a1": "y"})", EStatusId::INTERNAL_ERROR, TStringBuilder() << "Failed to parse json messages, expected 1 json rows from offset " << FIRST_OFFSET + 2 << " but got 2");
CheckBatchError(R"({)", EStatusId::INTERNAL_ERROR, TStringBuilder() << "Failed to parse json messages, expected 1 json rows from offset " << FIRST_OFFSET + 3 << " but got 0");
CheckBatchError(R"({"a1": "st""r"})", EStatusId::BAD_REQUEST, TStringBuilder() << "Failed to parse json message for offset " << FIRST_OFFSET + 1 << ", json item was corrupted: TAPE_ERROR: The JSON document has an improper structure: missing or superfluous commas, braces, missing keys, etc. Current data batch: {\"a1\": \"st\"\"r\"}");
CheckBatchError(R"({"a1": "x"} {"a1": "y"})", EStatusId::INTERNAL_ERROR, TStringBuilder() << "Failed to parse json messages, expected 1 json rows from offset " << FIRST_OFFSET + 2 << " but got 2 (expected one json row for each offset from topic API in json each row format, maybe initial data was corrupted or messages is not in json format), current data batch: {\"a1\": \"x\"} {\"a1\": \"y\"}");
CheckBatchError(R"({)", EStatusId::INTERNAL_ERROR, TStringBuilder() << "Failed to parse json messages, expected 1 json rows from offset " << FIRST_OFFSET + 3 << " but got 0 (expected one json row for each offset from topic API in json each row format, maybe initial data was corrupted or messages is not in json format), current data batch: {");
}
}

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/fq/libs/row_dispatcher/topic_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -709,7 +709,7 @@ void TTopicSession::StartClientSession(TClientsInfo& info) {

void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
const auto& source = ev->Get()->Record.GetSource();
LOG_ROW_DISPATCHER_INFO("New client: read actor id " << ev->Sender.ToString() << ", predicate: " << source.GetPredicate() << ", offset: " << ev->Get()->Record.GetOffset());
LOG_ROW_DISPATCHER_INFO("New client: read actor id " << ev->Sender.ToString() << ", predicate: '" << source.GetPredicate() << "', offset: " << ev->Get()->Record.GetOffset());

if (!CheckNewClient(ev)) {
return;
Expand Down
5 changes: 3 additions & 2 deletions ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ namespace {
using namespace NKikimr;
using namespace NYql::NDq;

const ui64 TimeoutBeforeStartSessionSec = 3;
const ui64 GrabTimeoutSec = 4 * TimeoutBeforeStartSessionSec;
constexpr ui64 TimeoutBeforeStartSessionSec = 3;
constexpr ui64 GrabTimeoutSec = 4 * TimeoutBeforeStartSessionSec;
static_assert(GrabTimeoutSec <= WAIT_TIMEOUT.Seconds());

class TFixture : public NTests::TBaseFixture {
public:
Expand Down

0 comments on commit 01d95e5

Please sign in to comment.