From 41516f53ea293b9f91b77dcbc3aad4b25fc4cf09 Mon Sep 17 00:00:00 2001 From: adamschmidt Date: Tue, 28 Mar 2023 23:15:34 +1100 Subject: [PATCH 1/5] fix: Snowflake remote storage Signed-off-by: adamschmidt --- .../feast/infra/offline_stores/snowflake.py | 22 +++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/snowflake.py b/sdk/python/feast/infra/offline_stores/snowflake.py index 404927146a..e5c445c186 100644 --- a/sdk/python/feast/infra/offline_stores/snowflake.py +++ b/sdk/python/feast/infra/offline_stores/snowflake.py @@ -109,6 +109,9 @@ class SnowflakeOfflineStoreConfig(FeastConfigBaseModel): blob_export_location: Optional[str] = None """ Location (in S3, Google storage or Azure storage) where data is offloaded """ + convert_timestamp_columns: Optional[bool] = None + """ Convert timestamp columns on export to a Parquet-supported format """ + class Config: allow_population_by_field_name = True @@ -152,6 +155,21 @@ def pull_latest_from_table_or_query( + '"' ) + if config.offline_store.convert_timestamp_columns: + select_timestamps = list(map(lambda field_name: f"to_varchar({field_name}, 'YYYY-MM-DD\"T\"HH24:MI:SS.FFTZH:TZM') as {field_name}", timestamp_columns)) + inner_field_string = ( + '"' + + '", "'.join(join_key_columns + feature_name_columns) + + '", ' + + ", ".join(select_timestamps) + ) + else: + inner_field_string = ( + '"' + + '", "'.join(join_key_columns + feature_name_columns + timestamp_columns) + + '"' + ) + if data_source.snowflake_options.warehouse: config.offline_store.warehouse = data_source.snowflake_options.warehouse @@ -166,7 +184,7 @@ def pull_latest_from_table_or_query( {field_string} {f''', TRIM({repr(DUMMY_ENTITY_VAL)}::VARIANT,'"') AS "{DUMMY_ENTITY_ID}"''' if not join_key_columns else ""} FROM ( - SELECT {field_string}, + SELECT {inner_field_string}, ROW_NUMBER() OVER({partition_by_join_key_string} ORDER BY {timestamp_desc_string}) AS "_feast_row" FROM {from_expression} WHERE "{timestamp_field}" BETWEEN TIMESTAMP '{start_date}' AND TIMESTAMP '{end_date}' @@ -533,7 +551,7 @@ def to_remote_storage(self) -> List[str]: self.to_snowflake(table) query = f""" - COPY INTO '{self.config.offline_store.blob_export_location}/{table}' FROM "{self.config.offline_store.database}"."{self.config.offline_store.schema_}"."{table}"\n + COPY INTO '{self.export_path}/{table}' FROM "{self.config.offline_store.database}"."{self.config.offline_store.schema_}"."{table}"\n STORAGE_INTEGRATION = {self.config.offline_store.storage_integration_name}\n FILE_FORMAT = (TYPE = PARQUET) DETAILED_OUTPUT = TRUE From 934bfdf4f360aaa826ac864fe5bdd0dc6ee3a5dc Mon Sep 17 00:00:00 2001 From: adamschmidt Date: Wed, 29 Mar 2023 07:30:57 +1100 Subject: [PATCH 2/5] fix: lint Signed-off-by: adamschmidt --- sdk/python/feast/infra/offline_stores/snowflake.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/offline_stores/snowflake.py b/sdk/python/feast/infra/offline_stores/snowflake.py index e5c445c186..3f5886ba5a 100644 --- a/sdk/python/feast/infra/offline_stores/snowflake.py +++ b/sdk/python/feast/infra/offline_stores/snowflake.py @@ -156,7 +156,7 @@ def pull_latest_from_table_or_query( ) if config.offline_store.convert_timestamp_columns: - select_timestamps = list(map(lambda field_name: f"to_varchar({field_name}, 'YYYY-MM-DD\"T\"HH24:MI:SS.FFTZH:TZM') as {field_name}", timestamp_columns)) + select_timestamps = list(map(lambda field_name: f"to_varchar({field_name}, 'YYYY-MM-DD\"T\"HH24:MI:SS.FFTZH:TZM') as {field_name}", timestamp_columns)) inner_field_string = ( '"' + '", "'.join(join_key_columns + feature_name_columns) From ca5d4ce227011c5fe4aabe4c721ee2cc51e5f017 Mon Sep 17 00:00:00 2001 From: adamschmidt Date: Wed, 29 Mar 2023 11:46:28 +1100 Subject: [PATCH 3/5] fix: field string build Signed-off-by: adamschmidt --- .../feast/infra/offline_stores/snowflake.py | 15 ++++----------- 1 file changed, 4 insertions(+), 11 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/snowflake.py b/sdk/python/feast/infra/offline_stores/snowflake.py index 3f5886ba5a..e3d26fe89e 100644 --- a/sdk/python/feast/infra/offline_stores/snowflake.py +++ b/sdk/python/feast/infra/offline_stores/snowflake.py @@ -156,19 +156,12 @@ def pull_latest_from_table_or_query( ) if config.offline_store.convert_timestamp_columns: + select_fields = list(map(lambda field_name: f'"{field_name}"', join_key_columns + feature_name_columns)) select_timestamps = list(map(lambda field_name: f"to_varchar({field_name}, 'YYYY-MM-DD\"T\"HH24:MI:SS.FFTZH:TZM') as {field_name}", timestamp_columns)) - inner_field_string = ( - '"' - + '", "'.join(join_key_columns + feature_name_columns) - + '", ' - + ", ".join(select_timestamps) - ) + inner_field_string = ", ".join(select_fields, select_timestamps) else: - inner_field_string = ( - '"' - + '", "'.join(join_key_columns + feature_name_columns + timestamp_columns) - + '"' - ) + select_fields = list(map(lambda field_name: f'"{field_name}"', join_key_columns + feature_name_columns + timestamp_columns)) + inner_field_string = ", ".join(select_fields) if data_source.snowflake_options.warehouse: config.offline_store.warehouse = data_source.snowflake_options.warehouse From d1d4c0decaa2649eec2d8c04ebd5b38914800fa3 Mon Sep 17 00:00:00 2001 From: adamschmidt Date: Wed, 29 Mar 2023 11:51:04 +1100 Subject: [PATCH 4/5] fix: join typo Signed-off-by: adamschmidt --- sdk/python/feast/infra/offline_stores/snowflake.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/offline_stores/snowflake.py b/sdk/python/feast/infra/offline_stores/snowflake.py index e3d26fe89e..4b900bcdef 100644 --- a/sdk/python/feast/infra/offline_stores/snowflake.py +++ b/sdk/python/feast/infra/offline_stores/snowflake.py @@ -158,7 +158,7 @@ def pull_latest_from_table_or_query( if config.offline_store.convert_timestamp_columns: select_fields = list(map(lambda field_name: f'"{field_name}"', join_key_columns + feature_name_columns)) select_timestamps = list(map(lambda field_name: f"to_varchar({field_name}, 'YYYY-MM-DD\"T\"HH24:MI:SS.FFTZH:TZM') as {field_name}", timestamp_columns)) - inner_field_string = ", ".join(select_fields, select_timestamps) + inner_field_string = ", ".join(select_fields + select_timestamps) else: select_fields = list(map(lambda field_name: f'"{field_name}"', join_key_columns + feature_name_columns + timestamp_columns)) inner_field_string = ", ".join(select_fields) From bfa5b310ef5005ad4af579275866081f44f09451 Mon Sep 17 00:00:00 2001 From: adamschmidt Date: Wed, 29 Mar 2023 12:11:26 +1100 Subject: [PATCH 5/5] fix: formatting Signed-off-by: adamschmidt --- .../feast/infra/offline_stores/snowflake.py | 21 ++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/snowflake.py b/sdk/python/feast/infra/offline_stores/snowflake.py index 4b900bcdef..1dc18256fa 100644 --- a/sdk/python/feast/infra/offline_stores/snowflake.py +++ b/sdk/python/feast/infra/offline_stores/snowflake.py @@ -156,11 +156,26 @@ def pull_latest_from_table_or_query( ) if config.offline_store.convert_timestamp_columns: - select_fields = list(map(lambda field_name: f'"{field_name}"', join_key_columns + feature_name_columns)) - select_timestamps = list(map(lambda field_name: f"to_varchar({field_name}, 'YYYY-MM-DD\"T\"HH24:MI:SS.FFTZH:TZM') as {field_name}", timestamp_columns)) + select_fields = list( + map( + lambda field_name: f'"{field_name}"', + join_key_columns + feature_name_columns, + ) + ) + select_timestamps = list( + map( + lambda field_name: f"to_varchar({field_name}, 'YYYY-MM-DD\"T\"HH24:MI:SS.FFTZH:TZM') as {field_name}", + timestamp_columns, + ) + ) inner_field_string = ", ".join(select_fields + select_timestamps) else: - select_fields = list(map(lambda field_name: f'"{field_name}"', join_key_columns + feature_name_columns + timestamp_columns)) + select_fields = list( + map( + lambda field_name: f'"{field_name}"', + join_key_columns + feature_name_columns + timestamp_columns, + ) + ) inner_field_string = ", ".join(select_fields) if data_source.snowflake_options.warehouse: