From 8c251d8233e5fc5bc3845d6c59ab986e5de12978 Mon Sep 17 00:00:00 2001 From: Pisarenko Grigoriy Date: Mon, 14 Oct 2024 10:07:16 +0300 Subject: [PATCH] YQ-3722 RD fixed verify failed on empty fields (#10351) --- .../fq/libs/row_dispatcher/json_filter.cpp | 4 +-- ydb/tests/fq/yds/test_row_dispatcher.py | 28 +++++++++++++++++++ 2 files changed, 30 insertions(+), 2 deletions(-) diff --git a/ydb/core/fq/libs/row_dispatcher/json_filter.cpp b/ydb/core/fq/libs/row_dispatcher/json_filter.cpp index d43bc2fbf365..2013d3fb5595 100644 --- a/ydb/core/fq/libs/row_dispatcher/json_filter.cpp +++ b/ydb/core/fq/libs/row_dispatcher/json_filter.cpp @@ -4,6 +4,7 @@ #include #include #include +#include #include #include @@ -111,8 +112,7 @@ class TFilterInputConsumer : public NYql::NPureCalc::IConsumerPush(std::move(result)); diff --git a/ydb/tests/fq/yds/test_row_dispatcher.py b/ydb/tests/fq/yds/test_row_dispatcher.py index 083d6fa5aa28..61b760f3d98b 100644 --- a/ydb/tests/fq/yds/test_row_dispatcher.py +++ b/ydb/tests/fq/yds/test_row_dispatcher.py @@ -285,6 +285,34 @@ def test_filter(self, kikimr, client): issues = str(client.describe_query(query_id).result.query.transient_issue) assert "Row dispatcher will use the predicate: WHERE (`time` > 101" in issues, "Incorrect Issues: " + issues + @yq_v1 + def test_filter_missing_fields(self, kikimr, client): + client.create_yds_connection( + YDS_CONNECTION, os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"), shared_reading=True + ) + self.init_topics("test_filter") + + sql = Rf''' + INSERT INTO {YDS_CONNECTION}.`{self.output_topic}` + SELECT Cast(time as String) FROM {YDS_CONNECTION}.`{self.input_topic}` + WITH (format=json_each_row, SCHEMA (time UInt64 NOT NULL, data String, event String NOT NULL)) + WHERE data = "";''' + + query_id = start_yds_query(kikimr, client, sql) + wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 1) + + data = [ + '{"time": 101, "event": "event1"}', + '{"time": 102, "data": null, "event": "event2"}', + ] + + self.write_stream(data) + expected = ['101', '102'] + assert self.read_stream(len(expected), topic_path=self.output_topic) == expected + + wait_actor_count(kikimr, "DQ_PQ_READ_ACTOR", 1) + stop_yds_query(client, query_id) + @yq_v1 def test_filter_use_unsupported_predicate(self, kikimr, client): client.create_yds_connection(