diff --git a/datasets/new_york_taxi_trips/pipelines/_images/run_csv_transform_kub/csv_transform.py b/datasets/new_york_taxi_trips/pipelines/_images/run_csv_transform_kub/csv_transform.py index 3cd8c05eb..1fe72e229 100644 --- a/datasets/new_york_taxi_trips/pipelines/_images/run_csv_transform_kub/csv_transform.py +++ b/datasets/new_york_taxi_trips/pipelines/_images/run_csv_transform_kub/csv_transform.py @@ -39,6 +39,7 @@ def main( target_gcs_bucket: str, target_gcs_path: str, pipeline_name: str, + start_year: str, input_headers: typing.List[str], data_dtypes: dict, output_headers: typing.List[str], @@ -59,6 +60,7 @@ def main( target_gcs_bucket, target_gcs_path, pipeline_name, + int(start_year), input_headers, data_dtypes, output_headers, @@ -80,11 +82,12 @@ def execute_pipeline( target_gcs_bucket: str, target_gcs_path: str, pipeline_name: str, + start_year: int, input_headers: typing.List[str], data_dtypes: dict, output_headers: typing.List[str], ) -> None: - for year_number in range(datetime.now().year, (datetime.now().year - 6), -1): + for year_number in range(datetime.now().year, (start_year - 1), -1): process_year_data( source_url=source_url, year_number=int(year_number), @@ -532,20 +535,21 @@ def upload_file_to_gcs( logging.getLogger().setLevel(logging.INFO) main( - source_url=os.environ["SOURCE_URL"], - source_file=pathlib.Path(os.environ["SOURCE_FILE"]).expanduser(), - target_file=pathlib.Path(os.environ["TARGET_FILE"]).expanduser(), - project_id=os.environ["PROJECT_ID"], - dataset_id=os.environ["DATASET_ID"], - table_id=os.environ["TABLE_ID"], - data_file_year_field=os.environ["DATA_FILE_YEAR_FIELD"], - data_file_month_field=os.environ["DATA_FILE_MONTH_FIELD"], - schema_path=os.environ["SCHEMA_PATH"], - chunksize=os.environ["CHUNKSIZE"], - target_gcs_bucket=os.environ["TARGET_GCS_BUCKET"], - target_gcs_path=os.environ["TARGET_GCS_PATH"], - pipeline_name=os.environ["PIPELINE_NAME"], - input_headers=json.loads(os.environ["INPUT_CSV_HEADERS"]), - data_dtypes=json.loads(os.environ["DATA_DTYPES"]), - output_headers=json.loads(os.environ["OUTPUT_CSV_HEADERS"]), + source_url=os.environ.get("SOURCE_URL", ""), + source_file=pathlib.Path(os.environ.get("SOURCE_FILE", "")).expanduser(), + target_file=pathlib.Path(os.environ.get("TARGET_FILE", "")).expanduser(), + project_id=os.environ.get("PROJECT_ID", ""), + dataset_id=os.environ.get("DATASET_ID", ""), + table_id=os.environ.get("TABLE_ID", ""), + data_file_year_field=os.environ.get("DATA_FILE_YEAR_FIELD", ""), + data_file_month_field=os.environ.get("DATA_FILE_MONTH_FIELD", ""), + schema_path=os.environ.get("SCHEMA_PATH", ""), + chunksize=os.environ.get("CHUNKSIZE", ""), + target_gcs_bucket=os.environ.get("TARGET_GCS_BUCKET", ""), + target_gcs_path=os.environ.get("TARGET_GCS_PATH", ""), + pipeline_name=os.environ.get("PIPELINE_NAME", ""), + start_year=os.environ.get("START_YEAR", "2009"), + input_headers=json.loads(os.environ.get("INPUT_CSV_HEADERS", "")), + data_dtypes=json.loads(os.environ.get("DATA_DTYPES", "")), + output_headers=json.loads(os.environ.get("OUTPUT_CSV_HEADERS", "")), ) diff --git a/datasets/new_york_taxi_trips/pipelines/new_york_taxi_trips/new_york_taxi_trips_dag.py b/datasets/new_york_taxi_trips/pipelines/new_york_taxi_trips/new_york_taxi_trips_dag.py index 500ac9e96..94b4c1dd5 100644 --- a/datasets/new_york_taxi_trips/pipelines/new_york_taxi_trips/new_york_taxi_trips_dag.py +++ b/datasets/new_york_taxi_trips/pipelines/new_york_taxi_trips/new_york_taxi_trips_dag.py @@ -37,7 +37,7 @@ location="us-central1-c", body={ "name": "new-york-taxi-trips", - "initial_node_count": 2, + "initial_node_count": 3, "network": "{{ var.value.vpc_network }}", "node_config": { "machine_type": "e2-standard-4", @@ -70,10 +70,11 @@ "DATA_FILE_YEAR_FIELD": "data_file_year", "DATA_FILE_MONTH_FIELD": "data_file_month", "SCHEMA_PATH": "{{ var.json.new_york_taxi_trips.container_registry.green_trips_schema_path }}", - "CHUNKSIZE": "{{ var.json.new_york_taxi_trips.container_registry.green_trips_chunk_size }}", + "CHUNKSIZE": "500000", "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}", "TARGET_GCS_PATH": "{{ var.json.new_york_taxi_trips.container_registry.green_trips_target_gcs_path }}", "PIPELINE_NAME": "tlc_green_trips", + "START_YEAR": "2013", "INPUT_CSV_HEADERS": '["vendor_id", "pickup_datetime", "dropoff_datetime", "store_and_fwd_flag", "rate_code",\n "pickup_location_id", "dropoff_location_id", "passenger_count", "trip_distance", "fare_amount",\n "extra", "mta_tax", "tip_amount", "tolls_amount", "ehail_fee",\n "imp_surcharge", "total_amount", "payment_type", "trip_type", "congestion_surcharge" ]', "DATA_DTYPES": '{ "vendor_id": "str",\n "pickup_datetime": "datetime64[ns]",\n "dropoff_datetime": "datetime64[ns]",\n "store_and_fwd_flag": "str",\n "rate_code": "str",\n "pickup_location_id": "str",\n "dropoff_location_id": "str",\n "passenger_count": "str",\n "trip_distance": "float64",\n "fare_amount": "float64",\n "extra": "float64",\n "mta_tax": "float64",\n "tip_amount": "float64",\n "tolls_amount": "float64",\n "ehail_fee": "float64",\n "imp_surcharge": "float64",\n "total_amount": "float64",\n "payment_type": "str",\n "trip_type": "str",\n "congestion_surcharge": "float64" }', "OUTPUT_CSV_HEADERS": '[ "vendor_id", "pickup_datetime", "dropoff_datetime", "store_and_fwd_flag", "rate_code",\n "passenger_count", "trip_distance", "fare_amount", "extra", "mta_tax",\n "tip_amount", "tolls_amount", "ehail_fee", "total_amount", "payment_type",\n "distance_between_service", "time_between_service", "trip_type", "imp_surcharge", "pickup_location_id",\n "dropoff_location_id", "data_file_year", "data_file_month" ]', @@ -106,14 +107,20 @@ "DATASET_ID": "{{ var.json.new_york_taxi_trips.container_registry.yellow_trips_dataset_id }}", "TABLE_ID": "{{ var.json.new_york_taxi_trips.container_registry.yellow_trips_table_id }}", "SCHEMA_PATH": "{{ var.json.new_york_taxi_trips.container_registry.yellow_trips_schema_path }}", - "CHUNKSIZE": "{{ var.json.new_york_taxi_trips.container_registry.yellow_trips_chunk_size }}", + "CHUNKSIZE": "500000", "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}", "TARGET_GCS_PATH": "{{ var.json.new_york_taxi_trips.container_registry.yellow_trips_target_gcs_path }}", "PIPELINE_NAME": "tlc_yellow_trips", + "START_YEAR": "2011", "INPUT_CSV_HEADERS": '[ "vendor_id", "pickup_datetime", "dropoff_datetime", "passenger_count", "trip_distance",\n "rate_code", "store_and_fwd_flag", "pickup_location_id", "dropoff_location_id",\n "payment_type", "fare_amount", "extra", "mta_tax", "tip_amount",\n "tolls_amount", "imp_surcharge", "total_amount", "congestion_surcharge" ]', "DATA_DTYPES": '{ "vendor_id": "str",\n "pickup_datetime": "datetime64[ns]",\n "dropoff_datetime": "datetime64[ns]",\n "passenger_count": "str",\n "trip_distance": "float64",\n "rate_code": "str",\n "store_and_fwd_flag": "str",\n "pickup_location_id": "str",\n "dropoff_location_id": "str",\n "payment_type": "str",\n "fare_amount": "float64",\n "extra": "float64",\n "mta_tax": "float64",\n "tip_amount": "float64",\n "tolls_amount": "float64",\n "imp_surcharge": "float64",\n "total_amount": "float64",\n "congestion_surcharge": "float64" }', "OUTPUT_CSV_HEADERS": '[ "vendor_id", "pickup_datetime", "dropoff_datetime", "passenger_count", "trip_distance",\n "rate_code", "store_and_fwd_flag", "payment_type", "fare_amount", "extra",\n "mta_tax", "tip_amount", "tolls_amount", "imp_surcharge", "total_amount",\n "pickup_location_id", "dropoff_location_id", "data_file_year", "data_file_month" ]', }, + resources={ + "request_memory": "12G", + "request_cpu": "1", + "request_ephemeral_storage": "16G", + }, ) delete_cluster = kubernetes_engine.GKEDeleteClusterOperator( task_id="delete_cluster", diff --git a/datasets/new_york_taxi_trips/pipelines/new_york_taxi_trips/pipeline.yaml b/datasets/new_york_taxi_trips/pipelines/new_york_taxi_trips/pipeline.yaml index 4ee775731..a620bddbc 100644 --- a/datasets/new_york_taxi_trips/pipelines/new_york_taxi_trips/pipeline.yaml +++ b/datasets/new_york_taxi_trips/pipelines/new_york_taxi_trips/pipeline.yaml @@ -41,7 +41,7 @@ dag: location: "us-central1-c" body: name: new-york-taxi-trips - initial_node_count: 2 + initial_node_count: 3 network: "{{ var.value.vpc_network }}" node_config: machine_type: e2-standard-4 @@ -70,10 +70,11 @@ dag: DATA_FILE_YEAR_FIELD: "data_file_year" DATA_FILE_MONTH_FIELD: "data_file_month" SCHEMA_PATH: "{{ var.json.new_york_taxi_trips.container_registry.green_trips_schema_path }}" - CHUNKSIZE: "{{ var.json.new_york_taxi_trips.container_registry.green_trips_chunk_size }}" + CHUNKSIZE: "500000" TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" TARGET_GCS_PATH: "{{ var.json.new_york_taxi_trips.container_registry.green_trips_target_gcs_path }}" PIPELINE_NAME: "tlc_green_trips" + START_YEAR: "2013" INPUT_CSV_HEADERS: >- ["vendor_id", "pickup_datetime", "dropoff_datetime", "store_and_fwd_flag", "rate_code", "pickup_location_id", "dropoff_location_id", "passenger_count", "trip_distance", "fare_amount", @@ -132,10 +133,11 @@ dag: DATASET_ID: "{{ var.json.new_york_taxi_trips.container_registry.yellow_trips_dataset_id }}" TABLE_ID: "{{ var.json.new_york_taxi_trips.container_registry.yellow_trips_table_id }}" SCHEMA_PATH: "{{ var.json.new_york_taxi_trips.container_registry.yellow_trips_schema_path }}" - CHUNKSIZE: "{{ var.json.new_york_taxi_trips.container_registry.yellow_trips_chunk_size }}" + CHUNKSIZE: "500000" TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" TARGET_GCS_PATH: "{{ var.json.new_york_taxi_trips.container_registry.yellow_trips_target_gcs_path }}" PIPELINE_NAME: "tlc_yellow_trips" + START_YEAR: "2011" INPUT_CSV_HEADERS: >- [ "vendor_id", "pickup_datetime", "dropoff_datetime", "passenger_count", "trip_distance", "rate_code", "store_and_fwd_flag", "pickup_location_id", "dropoff_location_id", @@ -165,6 +167,10 @@ dag: "rate_code", "store_and_fwd_flag", "payment_type", "fare_amount", "extra", "mta_tax", "tip_amount", "tolls_amount", "imp_surcharge", "total_amount", "pickup_location_id", "dropoff_location_id", "data_file_year", "data_file_month" ] + resources: + request_memory: "12G" + request_cpu: "1" + request_ephemeral_storage: "16G" - operator: "GKEDeleteClusterOperator" args: task_id: "delete_cluster"