Skip to content

Commit

Permalink
YQ-3722 RD fixed verify failed on empty fields (ydb-platform#10351)
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA authored and kardymonds committed Oct 16, 2024
1 parent 202cb01 commit 8c251d8
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 2 deletions.
4 changes: 2 additions & 2 deletions ydb/core/fq/libs/row_dispatcher/json_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <ydb/library/yql/minikql/mkql_alloc.h>
#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
#include <ydb/library/yql/minikql/mkql_terminator.h>
#include <ydb/library/yql/minikql/mkql_string_util.h>

#include <ydb/core/fq/libs/row_dispatcher/json_filter.h>
#include <ydb/core/fq/libs/actors/logging/log.h>
Expand Down Expand Up @@ -111,8 +112,7 @@ class TFilterInputConsumer : public NYql::NPureCalc::IConsumer<std::pair<ui64, c

size_t fieldId = 0;
for (const auto& column : values.second) {
NYql::NUdf::TStringValue str(column[rowId]);
items[FieldsPositions[fieldId++]] = NYql::NUdf::TUnboxedValuePod(std::move(str));
items[FieldsPositions[fieldId++]] = NKikimr::NMiniKQL::MakeString(column[rowId]);
}

Worker->Push(std::move(result));
Expand Down
28 changes: 28 additions & 0 deletions ydb/tests/fq/yds/test_row_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,34 @@ def test_filter(self, kikimr, client):
issues = str(client.describe_query(query_id).result.query.transient_issue)
assert "Row dispatcher will use the predicate: WHERE (`time` > 101" in issues, "Incorrect Issues: " + issues

@yq_v1
def test_filter_missing_fields(self, kikimr, client):
client.create_yds_connection(
YDS_CONNECTION, os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"), shared_reading=True
)
self.init_topics("test_filter")

sql = Rf'''
INSERT INTO {YDS_CONNECTION}.`{self.output_topic}`
SELECT Cast(time as String) FROM {YDS_CONNECTION}.`{self.input_topic}`
WITH (format=json_each_row, SCHEMA (time UInt64 NOT NULL, data String, event String NOT NULL))
WHERE data = "";'''

query_id = start_yds_query(kikimr, client, sql)
wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 1)

data = [
'{"time": 101, "event": "event1"}',
'{"time": 102, "data": null, "event": "event2"}',
]

self.write_stream(data)
expected = ['101', '102']
assert self.read_stream(len(expected), topic_path=self.output_topic) == expected

wait_actor_count(kikimr, "DQ_PQ_READ_ACTOR", 1)
stop_yds_query(client, query_id)

@yq_v1
def test_filter_use_unsupported_predicate(self, kikimr, client):
client.create_yds_connection(
Expand Down

0 comments on commit 8c251d8

Please sign in to comment.