From 7396ee98c383df8742396de53eec65d652a6eb6a Mon Sep 17 00:00:00 2001 From: Jonathan Gray Date: Tue, 6 Feb 2024 16:52:27 +0000 Subject: [PATCH 1/2] Add a select from the newly created foreign table, so we error if the file is not there or otherwise cant be accessed --- .../flowetl/operators/create_foreign_staging_table_operator.py | 1 + 1 file changed, 1 insertion(+) diff --git a/flowetl/flowetl/flowetl/operators/create_foreign_staging_table_operator.py b/flowetl/flowetl/flowetl/operators/create_foreign_staging_table_operator.py index 14221075f9..2b498949aa 100644 --- a/flowetl/flowetl/flowetl/operators/create_foreign_staging_table_operator.py +++ b/flowetl/flowetl/flowetl/operators/create_foreign_staging_table_operator.py @@ -66,6 +66,7 @@ def __init__( escape '{{{{ params.escape }}}}' {{% if params.encoding is defined %}}, {{{{ params.encoding }}}} {{% endif %}} ); + SELECT EXISTS(SELECT * FROM {{{{ staging_table }}}} LIMIT 1); """ fields_string = ",\n\t".join( f"{field_name} {field_type.upper()}" From a0eb43265b9ac387a696f6b40110a6d09fce0396 Mon Sep 17 00:00:00 2001 From: Jonathan Gray Date: Wed, 7 Feb 2024 16:47:00 +0000 Subject: [PATCH 2/2] Add a sensor which checks for a minimum number of rows --- CHANGELOG.md | 2 ++ .../flowetl/flowetl/sensors/n_rows_present_sensor.py | 12 ++++++++++++ 2 files changed, 14 insertions(+) create mode 100644 flowetl/flowetl/flowetl/sensors/n_rows_present_sensor.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 47ed7b176c..8679006a0b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,8 +7,10 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). ## [Unreleased] ### Added +- FlowETL sensor `NRowsPresentSensor` which checks for a specified minimum number of rows. ### Changed +- `ForeignStagingTableOperator` will now error if the underlying file cannot be read or the command returns an error. [#5763](https://github.com/Flowminder/FlowKit/issues/5763) ### Fixed diff --git a/flowetl/flowetl/flowetl/sensors/n_rows_present_sensor.py b/flowetl/flowetl/flowetl/sensors/n_rows_present_sensor.py new file mode 100644 index 0000000000..389d340832 --- /dev/null +++ b/flowetl/flowetl/flowetl/sensors/n_rows_present_sensor.py @@ -0,0 +1,12 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. + +from flowetl.mixins.fixed_sql_with_params_mixin import fixed_sql_operator_with_params + +NRowsPresentSensor = fixed_sql_operator_with_params( + class_name="NRowsPresentSensor", + sql="SELECT EXISTS(SELECT * FROM {{ staging_table }} LIMIT 1 (OFFSET {{ params.minimum_rows }} - 1));", + is_sensor=True, + params=["minimum_rows"], +)