diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 3a3edbb..a84f468 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -13,54 +13,58 @@ jobs:
     runs-on: ubuntu-latest
     steps:
-      - name: Checkout code
-        uses: actions/checkout@v2
+      - name: Checkout code
+        uses: actions/checkout@v2

-      - name: Authenticate to Google Cloud
-        uses: google-github-actions/auth@v1
-        with:
-          credentials_json: ${{ secrets.GCP_SA_KEY }}
+      - name: Authenticate to Google Cloud
+        uses: google-github-actions/auth@v1
+        with:
+          credentials_json: ${{ secrets.GCP_SA_KEY }}

-      - name: Set up Cloud SDK
-        uses: google-github-actions/setup-gcloud@v1
-        with:
-          version: 'latest'
+      - name: Set up Cloud SDK
+        uses: google-github-actions/setup-gcloud@v1
+        with:
+          version: "latest"

-      - name: Import DAGs to Cloud Composer
-        run: |
-          gcloud composer environments storage dags import \
+      - name: Import DAGs to Cloud Composer
+        run: |
+          gcloud composer environments storage dags import \
+            --environment data-pipeline-orchestrator \
+            --location us-central1 \
+            --source ./demo_spark_run/
+          gcloud composer environments storage dags import \
+            --environment data-pipeline-orchestrator \
+            --location us-central1 \
+            --source ./event_count_update/
+          gcloud composer environments storage dags import \
+            --environment data-pipeline-orchestrator \
+            --location us-central1 \
+            --source ./video_embedding/
+          gcloud composer environments storage dags import \
+            --environment data-pipeline-orchestrator \
+            --location us-central1 \
+            --source ./user_video_relation/
+          gcloud composer environments storage dags import \
             --environment data-pipeline-orchestrator \
             --location us-central1 \
-            --source ./demo_spark_run/
-          gcloud composer environments storage dags import \
+            --source ./user_base_facts/
+          gcloud composer environments storage dags import \
             --environment data-pipeline-orchestrator \
             --location us-central1 \
-            --source ./event_count_update/
-          gcloud composer environments storage dags import \
+            --source ./global_popular_videos_l7d/
+          gcloud composer environments storage dags import \
             --environment data-pipeline-orchestrator \
             --location us-central1 \
-            --source ./video_embedding/
-          gcloud composer environments storage dags import \
+            --source ./global_popular_videos_l90d/
+          gcloud composer environments storage dags import \
             --environment data-pipeline-orchestrator \
             --location us-central1 \
-            --source ./user_video_relation/
-          gcloud composer environments storage dags import \
-            --environment data-pipeline-orchestrator \
-            --location us-central1 \
-            --source ./user_base_facts/
-          gcloud composer environments storage dags import \
-            --environment data-pipeline-orchestrator \
-            --location us-central1 \
-            --source ./global_popular_videos_l7d/
-          gcloud composer environments storage dags import \
-            --environment data-pipeline-orchestrator \
-            --location us-central1 \
-            --source ./global_popular_videos_l90d/
-          gcloud composer environments storage dags import \
-            --environment data-pipeline-orchestrator \
-            --location us-central1 \
-            --source ./local_popular_videos_l90d/
-          gcloud composer environments storage dags import \
-            --environment data-pipeline-orchestrator \
-            --location us-central1 \
-            --source ./local_popular_videos_l7d/
+            --source ./local_popular_videos_l90d/
+          gcloud composer environments storage dags import \
+            --environment data-pipeline-orchestrator \
+            --location us-central1 \
+            --source ./local_popular_videos_l7d/
+          gcloud composer environments storage dags import \
+            --environment data-pipeline-orchestrator \
+            --location us-central1 \
+            --source ./gcs_to_bigquery_metadata/
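The updated step now repeats the same `gcloud composer environments storage dags import` invocation ten times, varying only `--source`. A minimal sketch of an equivalent loop for the `run:` block, assuming the directory names listed above (`dag_dir` is an illustrative variable name, not part of this change):

    # Upload each DAG directory to the Composer environment's dags/ folder.
    for dag_dir in demo_spark_run event_count_update video_embedding \
        user_video_relation user_base_facts global_popular_videos_l7d \
        global_popular_videos_l90d local_popular_videos_l90d \
        local_popular_videos_l7d gcs_to_bigquery_metadata; do
      gcloud composer environments storage dags import \
        --environment data-pipeline-orchestrator \
        --location us-central1 \
        --source "./${dag_dir}/"
    done

With this shape, adding a future DAG means appending one name to the list rather than copying four lines.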
diff --git a/gcs_to_bigquery_metadata/gcs_to_bigquery_metadata.py b/gcs_to_bigquery_metadata/gcs_to_bigquery_metadata.py
new file mode 100644
index 0000000..b7d6f76
--- /dev/null
+++ b/gcs_to_bigquery_metadata/gcs_to_bigquery_metadata.py
@@ -0,0 +1,37 @@
+from airflow import DAG
+from airflow.operators.python import PythonOperator
+from airflow.providers.google.cloud.hooks.gcs import GCSHook
+from airflow.utils.dates import days_ago
+
+default_args = {
+    "owner": "airflow",
+    "retries": 0,
+    # "depends_on_past": True,
+}
+
+
+with DAG(
+    "gcs_to_bigquery_metadata_dag",
+    default_args=default_args,
+    description="DAG for metadata enrichment",
+    schedule_interval=None,
+    max_active_runs=1,  # Ensures only one active run at a time
+    start_date=days_ago(1),
+    catchup=False,
+) as dag:
+
+    def enrich_metadata(**kwargs):
+        """List objects in the yral-videos bucket and log their GCS metadata."""
+        hook = GCSHook()
+        obj_list = hook.list("yral-videos")
+
+        # Inspect only the first 10 objects while the enrichment logic is validated.
+        for obj_mp4 in obj_list[:10]:
+            print(obj_mp4)
+            obj = hook.get_metadata("yral-videos", obj_mp4)
+            print(obj)
+
+    enrich_objects = PythonOperator(
+        task_id="enrich_objects",
+        python_callable=enrich_metadata,
+    )
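Because `gcs_to_bigquery_metadata_dag` is declared with `schedule_interval=None`, it never runs on its own and has to be triggered explicitly. A minimal sketch of a manual trigger through the Composer environment's Airflow CLI, reusing the environment and location from the workflow above:

    gcloud composer environments run data-pipeline-orchestrator \
      --location us-central1 \
      dags trigger -- gcs_to_bigquery_metadata_dag

The `--` separates gcloud's own flags from the arguments handed to the Airflow `dags trigger` subcommand.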