From e77022031aefdb5caed962be90491abe1a54739a Mon Sep 17 00:00:00 2001
From: gkodukula <95613561+gkodukula@users.noreply.github.com>
Date: Mon, 17 Oct 2022 20:44:42 +0530
Subject: [PATCH] feat: Onboard New FEC dataset (#513)

---
 .../run_csv_transform_kub/csv_transform.py | 22 +++++++++-------------
 1 file changed, 9 insertions(+), 13 deletions(-)

diff --git a/datasets/fec/pipelines/_images/run_csv_transform_kub/csv_transform.py b/datasets/fec/pipelines/_images/run_csv_transform_kub/csv_transform.py
index d4d00cc9a..c3ff99928 100644
--- a/datasets/fec/pipelines/_images/run_csv_transform_kub/csv_transform.py
+++ b/datasets/fec/pipelines/_images/run_csv_transform_kub/csv_transform.py
@@ -37,9 +37,9 @@ def main(
     target_file: pathlib.Path,
     target_gcs_bucket: str,
     target_gcs_path: str,
-    csv_headers: typing.List[str],
     chunksize: str,
     pipeline_name: str,
+    csv_headers: typing.List[str],
 ) -> None:
 
     logging.info(
@@ -52,6 +52,7 @@ def main(
         "individuals" not in pipeline_name
         and "other_committee_tx_2020" not in pipeline_name
     ):
+        logging.info(f"Downloading file from {source_url}...")
         download_file(source_url, source_file_zip_file)
         unzip_file(source_file_zip_file, source_file_path)
 
@@ -76,15 +77,6 @@ def main(
         df.drop(df[df["cmte_id"] == "C00622357"].index, inplace=True)
     elif "committee_contributions_20" in pipeline_name:
         df.columns = csv_headers
-        df = df.rename(columns=lambda x: x.strip())
-
-    elif "candidate_committe_20" in pipeline_name:
-        pass
-
-    elif "committee_20" in pipeline_name:
-        df.drop(df[df["cmte_id"] == "C00622357"].index, inplace=True)
-
-    elif "committee_contributions_20" in pipeline_name:
         df["transaction_dt"] = df["transaction_dt"].astype(str)
         date_for_length(df, "transaction_dt")
         df = resolve_date_format(df, "transaction_dt", pipeline_name)
@@ -102,6 +94,8 @@ def main(
     else:
         df.columns = csv_headers
         pass
+
+    logging.info(f"Saving to output file.. {target_file}")
     try:
         save_to_new_file(df, file_path=str(target_file))
     except Exception as e:
@@ -117,9 +111,8 @@ def main(
     else:
         logging.info(f"Downloading file gs://{source_bucket}/{source_object}")
         download_blob(source_bucket, source_object, source_file)
-
     process_source_file(
-        source_file, target_file, chunksize, csv_headers, pipeline_name
+        source_file, target_file, chunksize, pipeline_name, csv_headers
     )
     logging.info(
         f"Uploading output file to.. gs://{target_gcs_bucket}/{target_gcs_path}"
@@ -132,6 +125,7 @@ def process_source_file(
     target_file: str,
     chunksize: str,
     pipeline_name: str,
+    csv_headers: typing.List[str],
 ) -> None:
     logging.info(f"Opening source file {source_file}")
     csv.field_size_limit(512 << 10)
@@ -149,6 +143,7 @@ def process_source_file(
                     target_file,
                     chunk_number,
                     pipeline_name,
+                    csv_headers,
                 )
                 data = []
                 chunk_number += 1
@@ -158,6 +153,7 @@ def process_source_file(
                 target_file,
                 chunk_number,
                 pipeline_name,
+                csv_headers,
             )
 
 
@@ -351,7 +347,7 @@ def upload_file_to_gcs(file_path: pathlib.Path, gcs_bucket: str, gcs_path: str)
         target_file=pathlib.Path(os.environ["TARGET_FILE"]).expanduser(),
         target_gcs_bucket=os.environ.get("TARGET_GCS_BUCKET", ""),
         target_gcs_path=os.environ.get("TARGET_GCS_PATH", ""),
-        csv_headers=json.loads(os.environ["CSV_HEADERS"]),
         chunksize=os.environ.get("CHUNKSIZE", ""),
         pipeline_name=os.environ.get("PIPELINE_NAME", ""),
+        csv_headers=json.loads(os.environ["CSV_HEADERS"]),
     )
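
Note on the fix above: the old code called process_source_file with csv_headers before
pipeline_name, while the callee expected the opposite order, so the two string arguments
were silently swapped; this patch moves csv_headers to the end of every signature and
call site so the positional chain lines up. Below is a minimal, self-contained sketch of
the same call-chain pattern (stand-in function bodies, not the actual FEC helpers),
showing how keyword arguments make this kind of reordering harmless:

import typing


def process_dataframe_chunk(
    data: typing.List[typing.List[str]],
    target_file: str,
    chunk_number: int,
    pipeline_name: str,
    csv_headers: typing.List[str],
) -> None:
    # Stand-in for the real chunk handler: just report what was received.
    print(f"chunk {chunk_number} -> {target_file} [{pipeline_name}] {csv_headers}")


def process_source_file(
    source_file: str,
    target_file: str,
    chunksize: str,
    pipeline_name: str,
    csv_headers: typing.List[str],
) -> None:
    # Keyword arguments bind by name, so a later reordering of the callee's
    # parameters cannot silently swap pipeline_name and csv_headers the way
    # the old positional call did.
    process_dataframe_chunk(
        data=[["example", "row"]],
        target_file=target_file,
        chunk_number=1,
        pipeline_name=pipeline_name,
        csv_headers=csv_headers,
    )


process_source_file(
    source_file="in.csv",
    target_file="out.csv",
    chunksize="1000",
    pipeline_name="committee_contributions_2020",
    csv_headers=["cmte_id", "transaction_dt"],
)

With keyword calls, a signature reorder keeps binding correctly, and a renamed or removed
parameter fails loudly with a TypeError instead of producing a subtly wrong DataFrame.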