Skip to content

Commit

Permalink
feat: Onboard New FEC dataset (#513)
Browse files Browse the repository at this point in the history
  • Loading branch information
gkodukula authored Oct 17, 2022
1 parent e20e157 commit e770220
Showing 1 changed file with 9 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ def main(
target_file: pathlib.Path,
target_gcs_bucket: str,
target_gcs_path: str,
csv_headers: typing.List[str],
chunksize: str,
pipeline_name: str,
csv_headers: typing.List[str],
) -> None:

logging.info(
Expand All @@ -52,6 +52,7 @@ def main(
"individuals" not in pipeline_name
and "other_committee_tx_2020" not in pipeline_name
):
logging.info(f"Downloading file from {source_url}...")
download_file(source_url, source_file_zip_file)
unzip_file(source_file_zip_file, source_file_path)

Expand All @@ -76,15 +77,6 @@ def main(
df.drop(df[df["cmte_id"] == "C00622357"].index, inplace=True)
elif "committee_contributions_20" in pipeline_name:
df.columns = csv_headers
df = df.rename(columns=lambda x: x.strip())

elif "candidate_committe_20" in pipeline_name:
pass

elif "committee_20" in pipeline_name:
df.drop(df[df["cmte_id"] == "C00622357"].index, inplace=True)

elif "committee_contributions_20" in pipeline_name:
df["transaction_dt"] = df["transaction_dt"].astype(str)
date_for_length(df, "transaction_dt")
df = resolve_date_format(df, "transaction_dt", pipeline_name)
Expand All @@ -102,6 +94,8 @@ def main(
else:
df.columns = csv_headers
pass

logging.info(f"Saving to output file.. {target_file}")
try:
save_to_new_file(df, file_path=str(target_file))
except Exception as e:
Expand All @@ -117,9 +111,8 @@ def main(
else:
logging.info(f"Downloading file gs://{source_bucket}/{source_object}")
download_blob(source_bucket, source_object, source_file)

process_source_file(
source_file, target_file, chunksize, csv_headers, pipeline_name
source_file, target_file, chunksize, pipeline_name, csv_headers
)
logging.info(
f"Uploading output file to.. gs://{target_gcs_bucket}/{target_gcs_path}"
Expand All @@ -132,6 +125,7 @@ def process_source_file(
target_file: str,
chunksize: str,
pipeline_name: str,
csv_headers: typing.List[str],
) -> None:
logging.info(f"Opening source file {source_file}")
csv.field_size_limit(512 << 10)
Expand All @@ -149,6 +143,7 @@ def process_source_file(
target_file,
chunk_number,
pipeline_name,
csv_headers,
)
data = []
chunk_number += 1
Expand All @@ -158,6 +153,7 @@ def process_source_file(
target_file,
chunk_number,
pipeline_name,
csv_headers,
)


Expand Down Expand Up @@ -351,7 +347,7 @@ def upload_file_to_gcs(file_path: pathlib.Path, gcs_bucket: str, gcs_path: str)
target_file=pathlib.Path(os.environ["TARGET_FILE"]).expanduser(),
target_gcs_bucket=os.environ.get("TARGET_GCS_BUCKET", ""),
target_gcs_path=os.environ.get("TARGET_GCS_PATH", ""),
csv_headers=json.loads(os.environ["CSV_HEADERS"]),
chunksize=os.environ.get("CHUNKSIZE", ""),
pipeline_name=os.environ.get("PIPELINE_NAME", ""),
csv_headers=json.loads(os.environ["CSV_HEADERS"]),
)

0 comments on commit e770220

Please sign in to comment.