Skip to content

Commit

Permalink
Merge branch 'main' of github.com:go-bazzinga/ds-dags
Browse files Browse the repository at this point in the history
  • Loading branch information
jay-dhanwant-yral committed Aug 13, 2024
2 parents fdea54f + 79055ec commit f366de0
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 40 deletions.
84 changes: 44 additions & 40 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,54 +13,58 @@ jobs:
runs-on: ubuntu-latest

steps:
- name: Checkout code
uses: actions/checkout@v2
- name: Checkout code
uses: actions/checkout@v2

- name: Authenticate to Google Cloud
uses: google-github-actions/auth@v1
with:
credentials_json: ${{ secrets.GCP_SA_KEY }}
- name: Authenticate to Google Cloud
uses: google-github-actions/auth@v1
with:
credentials_json: ${{ secrets.GCP_SA_KEY }}

- name: Set up Cloud SDK
uses: google-github-actions/setup-gcloud@v1
with:
version: 'latest'
- name: Set up Cloud SDK
uses: google-github-actions/setup-gcloud@v1
with:
version: "latest"

- name: Import DAGs to Cloud Composer
run: |
gcloud composer environments storage dags import \
- name: Import DAGs to Cloud Composer
run: |
gcloud composer environments storage dags import \
--environment data-pipeline-orchestrator \
--location us-central1 \
--source ./demo_spark_run/
gcloud composer environments storage dags import \
--environment data-pipeline-orchestrator \
--location us-central1 \
--source ./event_count_update/
gcloud composer environments storage dags import \
--environment data-pipeline-orchestrator \
--location us-central1 \
--source ./video_embedding/
gcloud composer environments storage dags import \
--environment data-pipeline-orchestrator \
--location us-central1 \
--source ./user_video_relation/
gcloud composer environments storage dags import \
--environment data-pipeline-orchestrator \
--location us-central1 \
--source ./demo_spark_run/
gcloud composer environments storage dags import \
--source ./user_base_facts/
gcloud composer environments storage dags import \
--environment data-pipeline-orchestrator \
--location us-central1 \
--source ./event_count_update/
gcloud composer environments storage dags import \
--source ./global_popular_videos_l7d/
gcloud composer environments storage dags import \
--environment data-pipeline-orchestrator \
--location us-central1 \
--source ./video_embedding/
gcloud composer environments storage dags import \
--source ./global_popular_videos_l90d/
gcloud composer environments storage dags import \
--environment data-pipeline-orchestrator \
--location us-central1 \
--source ./user_video_relation/
gcloud composer environments storage dags import \
--environment data-pipeline-orchestrator \
--location us-central1 \
--source ./user_base_facts/
gcloud composer environments storage dags import \
--environment data-pipeline-orchestrator \
--location us-central1 \
--source ./global_popular_videos_l7d/
gcloud composer environments storage dags import \
--environment data-pipeline-orchestrator \
--location us-central1 \
--source ./global_popular_videos_l90d/
gcloud composer environments storage dags import \
--environment data-pipeline-orchestrator \
--location us-central1 \
--source ./local_popular_videos_l90d/
gcloud composer environments storage dags import \
--environment data-pipeline-orchestrator \
--location us-central1 \
--source ./local_popular_videos_l7d/
--source ./local_popular_videos_l90d/
gcloud composer environments storage dags import \
--environment data-pipeline-orchestrator \
--location us-central1 \
--source ./local_popular_videos_l7d/
gcloud composer environments storage dags import \
--environment data-pipeline-orchestrator \
--location us-central1 \
--source ./gcs_to_bigquery_metadata/
46 changes: 46 additions & 0 deletions gcs_to_bigquery_metadata/gcs_to_bigquery_metadata.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import (
BigQueryExecuteQueryOperator,
BigQueryGetDataOperator,
)
from airflow.operators.python_operator import PythonOperator
from airflow.providers.google.cloud.hooks.gcs import GCSHook
from airflow.contrib.hooks.bigquery_hook import BigQueryHook
from airflow.utils.dates import days_ago
from datetime import datetime, timedelta
import requests

# Defaults applied to every task created under this DAG.
default_args = dict(
    owner="airflow",  # owner label shown in the Airflow UI
    retries=0,        # fail fast: no automatic task retries
    # depends_on_past=True,  # (disabled) would gate each run on the previous one
)


with DAG(
    "gcs_to_bigquery_metadata_dag",
    default_args=default_args,
    description="DAG for metadata enrichment",
    schedule_interval=None,  # no schedule: manual / externally triggered only
    max_active_runs=1,  # Ensures only one active run at a time
    start_date=days_ago(1),
    catchup=False,
) as dag:

    def enrich_metadata(**kwargs):
        """List objects in the ``yral-videos`` GCS bucket and print each
        object's name and metadata.

        Only the first 10 listed objects are inspected — this looks like a
        sampling/debugging limit; TODO confirm whether the full listing
        should be processed before relying on this task in production.

        Args:
            **kwargs: Airflow task context (unused).
        """
        hook = GCSHook()
        obj_list = hook.list("yral-videos")

        for obj_mp4 in obj_list[:10]:
            print(obj_mp4)
            obj = hook.get_metadata("yral-videos", obj_mp4)
            print(obj)

    # NOTE(review): provide_context=True is a deprecated no-op on Airflow 2.x
    # (the context is always passed); kept only for Airflow 1.x compatibility.
    enrich_objects = PythonOperator(
        task_id="enrich_objects",
        provide_context=True,
        python_callable=enrich_metadata,
    )

0 comments on commit f366de0

Please sign in to comment.