changing dags for HON game
jay-dhanwant-yral committed Aug 24, 2024
1 parent 366bbc6 commit c89bc87
Showing 3 changed files with 137 additions and 4 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/ci.yml
@@ -68,3 +68,7 @@ jobs:
--environment data-pipeline-orchestrator \
--location us-central1 \
--source ./gcs_to_bigquery_metadata/
+gcloud composer environments storage dags import \
+--environment data-pipeline-orchestrator \
+--location us-central1 \
+--source ./user_video_metrics/
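
A quick way to confirm the new DAG folder actually reached the Composer environment after this CI step runs (a minimal sketch, not part of the commit; it assumes the gcloud CLI is installed and authenticated for the project):

import subprocess

# List the DAG files currently in the data-pipeline-orchestrator Composer bucket.
result = subprocess.run(
    [
        "gcloud", "composer", "environments", "storage", "dags", "list",
        "--environment", "data-pipeline-orchestrator",
        "--location", "us-central1",
    ],
    capture_output=True,
    text=True,
    check=True,
)

# After a successful import, the listing should include
# user_video_metrics/ds__user_video_metrics.py.
print(result.stdout)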
8 changes: 4 additions & 4 deletions global_popular_videos_l7d/ds__global_popular_videos_l7d.py
@@ -39,10 +39,10 @@ def send_alert_to_google_chat():
video_id,
like_perc,
watch_perc,
-AVG(like_perc) OVER() AS mean_like_perc,
-STDDEV(like_perc) OVER() AS stddev_like_perc,
-AVG(watch_perc) OVER() AS mean_watch_perc,
-STDDEV(watch_perc) OVER() AS stddev_watch_perc
+AVG(like_perc) OVER() AS mean_like_perc, -- global mean like_perc
+STDDEV(like_perc) OVER() AS stddev_like_perc, -- global stddev like_perc
+AVG(watch_perc) OVER() AS mean_watch_perc, -- global mean watch_perc
+STDDEV(watch_perc) OVER() AS stddev_watch_perc -- global stddev watch_perc
FROM
stats
),
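
The only change in this hunk is documentation: the four window aggregates now carry comments spelling out that the empty OVER() makes them global statistics over the whole stats result set. The surrounding query is not shown in this hunk, so what follows is an assumption about intent rather than code from the file: global mean and stddev columns like these are the usual ingredients for z-score normalisation of each video's metrics.

# Hypothetical illustration only; z_score and the sample numbers are made up.
def z_score(value: float, mean: float, stddev: float) -> float:
    """Standardise a metric against the global distribution; a zero stddev maps to 0."""
    return 0.0 if stddev == 0 else (value - mean) / stddev

# A video watched more than average but liked slightly less than average:
like_z = z_score(0.12, mean=0.15, stddev=0.05)    # -0.6
watch_z = z_score(0.70, mean=0.55, stddev=0.20)   #  0.75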
129 changes: 129 additions & 0 deletions user_video_metrics/ds__user_video_metrics.py
@@ -0,0 +1,129 @@
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python_operator import PythonOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryExecuteQueryOperator
from datetime import datetime
from google.cloud import bigquery
import requests

def send_alert_to_google_chat(context=None):
    # Airflow passes the task context to on_failure_callback; accepting it here
    # (and ignoring it) lets the callback be invoked without a TypeError.
    webhook_url = "https://chat.googleapis.com/v1/spaces/AAAAkUFdZaw/messages?key=AIzaSyDdI0hCZtE6vySjMm-WEfRq3CPzqKqqsHI&token=VC5HDNQgqVLbhRVQYisn_IO2WUAvrDeRV9_FTizccic"
    message = {
        "text": "DAG user_video_metrics_dag failed."
    }
    requests.post(webhook_url, json=message)

def check_table_exists():
    client = bigquery.Client()
    query = """
        SELECT COUNT(*)
        FROM `hot-or-not-feed-intelligence.yral_ds.INFORMATION_SCHEMA.TABLES`
        WHERE table_name = 'user_video_metrics'
    """
    query_job = client.query(query)
    results = query_job.result()
    for row in results:
        return row[0] > 0

def create_initial_query():
    return """
    CREATE OR REPLACE TABLE `hot-or-not-feed-intelligence.yral_ds.user_video_metrics` AS
    SELECT
        user_id,
        AVG(CASE WHEN liked THEN 1 ELSE 0 END) AS user_like_avg,
        STDDEV(CASE WHEN liked THEN 1 ELSE 0 END) AS user_like_stddev,
        SUM(CASE WHEN liked THEN 1 ELSE 0 END) AS total_likes,
        AVG(CASE WHEN shared THEN 1 ELSE 0 END) AS user_share_avg,
        STDDEV(CASE WHEN shared THEN 1 ELSE 0 END) AS user_share_stddev,
        SUM(CASE WHEN shared THEN 1 ELSE 0 END) AS total_shares,
        AVG(mean_percentage_watched) AS user_watch_percentage_avg,
        STDDEV(mean_percentage_watched) AS user_watch_percentage_stddev,
        COUNT(mean_percentage_watched) AS total_watches,
        MAX(last_watched_timestamp) AS last_update_timestamp
    FROM
        `hot-or-not-feed-intelligence.yral_ds.userVideoRelation`
    GROUP BY user_id;
    """

def create_incremental_query():
    return """
    MERGE `hot-or-not-feed-intelligence.yral_ds.user_video_metrics` T
    USING (
        SELECT
            user_id,
            AVG(CASE WHEN liked THEN 1 ELSE 0 END) AS user_like_avg,
            STDDEV(CASE WHEN liked THEN 1 ELSE 0 END) AS user_like_stddev,
            SUM(CASE WHEN liked THEN 1 ELSE 0 END) AS total_likes,
            AVG(CASE WHEN shared THEN 1 ELSE 0 END) AS user_share_avg,
            STDDEV(CASE WHEN shared THEN 1 ELSE 0 END) AS user_share_stddev,
            SUM(CASE WHEN shared THEN 1 ELSE 0 END) AS total_shares,
            AVG(mean_percentage_watched) AS user_watch_percentage_avg,
            STDDEV(mean_percentage_watched) AS user_watch_percentage_stddev,
            COUNT(mean_percentage_watched) AS total_watches,
            MAX(last_watched_timestamp) AS last_update_timestamp
        FROM
            `hot-or-not-feed-intelligence.yral_ds.userVideoRelation`
        WHERE
            last_watched_timestamp > (SELECT MAX(last_update_timestamp) FROM `hot-or-not-feed-intelligence.yral_ds.user_video_metrics`)
        GROUP BY user_id
    ) S -- new batch metrics
    ON T.user_id = S.user_id
    WHEN MATCHED THEN
        UPDATE SET
            T.user_like_avg = (T.user_like_avg * T.total_watches + S.user_like_avg * S.total_watches) / (T.total_watches + S.total_watches),
            T.user_like_stddev = SQRT(
                (
                    (T.total_watches - 1) * POW(T.user_like_stddev, 2) +
                    (S.total_watches - 1) * POW(S.user_like_stddev, 2) +
                    (T.total_watches * S.total_watches / (T.total_watches + S.total_watches)) * POW(T.user_like_avg - S.user_like_avg, 2)
                ) / (T.total_watches + S.total_watches - 1)
            ),
            T.total_likes = T.total_likes + S.total_likes,
            T.user_share_avg = (T.user_share_avg * T.total_watches + S.user_share_avg * S.total_watches) / (T.total_watches + S.total_watches),
            T.user_share_stddev = SQRT(
                (
                    (T.total_watches - 1) * POW(T.user_share_stddev, 2) +
                    (S.total_watches - 1) * POW(S.user_share_stddev, 2) +
                    (T.total_watches * S.total_watches / (T.total_watches + S.total_watches)) * POW(T.user_share_avg - S.user_share_avg, 2)
                ) / (T.total_watches + S.total_watches - 1)
            ),
            T.total_shares = T.total_shares + S.total_shares,
            T.user_watch_percentage_avg = (T.user_watch_percentage_avg * T.total_watches + S.user_watch_percentage_avg * S.total_watches) / (T.total_watches + S.total_watches),
            T.user_watch_percentage_stddev = SQRT(
                (
                    (T.total_watches - 1) * POW(T.user_watch_percentage_stddev, 2) +
                    (S.total_watches - 1) * POW(S.user_watch_percentage_stddev, 2) +
                    (T.total_watches * S.total_watches / (T.total_watches + S.total_watches)) * POW(T.user_watch_percentage_avg - S.user_watch_percentage_avg, 2)
                ) / (T.total_watches + S.total_watches - 1)
            ),
            T.total_watches = T.total_watches + S.total_watches,
            T.last_update_timestamp = S.last_update_timestamp
    WHEN NOT MATCHED THEN
        INSERT (user_id, user_like_avg, user_like_stddev, total_likes, user_share_avg, user_share_stddev, total_shares, user_watch_percentage_avg, user_watch_percentage_stddev, total_watches, last_update_timestamp)
        VALUES (S.user_id, S.user_like_avg, S.user_like_stddev, S.total_likes, S.user_share_avg, S.user_share_stddev, S.total_shares, S.user_watch_percentage_avg, S.user_watch_percentage_stddev, S.total_watches, S.last_update_timestamp)
    """

def run_query():
    if check_table_exists():
        query = create_incremental_query()
    else:
        query = create_initial_query()

    client = bigquery.Client()
    query_job = client.query(query)
    query_job.result()

default_args = {
    'owner': 'airflow',
    'start_date': days_ago(1),
    'retries': 1,
}

with DAG('user_video_metrics_dag', default_args=default_args, schedule_interval='*/15 * * * *', catchup=False) as dag:
    run_query_task = PythonOperator(
        task_id='run_query_task',
        python_callable=run_query,
        on_failure_callback=send_alert_to_google_chat
    )
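
The UPDATE branch of the MERGE in create_incremental_query folds the new batch (S) into the stored per-user statistics (T) with the standard two-sample pooling identities, using total_watches as the weight on both sides: the combined mean is the count-weighted average, and the combined variance adds the two sums of squared deviations plus a between-group term before re-applying Bessel's correction. A minimal sketch (not part of the commit; combine_stats and the sample values are made up) that checks the same arithmetic against a direct computation:

import statistics

def combine_stats(n_t, mean_t, std_t, n_s, mean_s, std_s):
    # Pool counts, means and sample stddevs of two disjoint groups,
    # mirroring the expressions in the MERGE's UPDATE SET clause.
    n = n_t + n_s
    mean = (mean_t * n_t + mean_s * n_s) / n
    var = (
        (n_t - 1) * std_t ** 2
        + (n_s - 1) * std_s ** 2
        + (n_t * n_s / n) * (mean_t - mean_s) ** 2
    ) / (n - 1)
    return n, mean, var ** 0.5

existing = [0.2, 0.5, 0.9, 0.4]   # e.g. stored mean_percentage_watched samples
batch = [0.7, 0.1, 0.6]           # e.g. rows newer than last_update_timestamp
n, mean, std = combine_stats(
    len(existing), statistics.mean(existing), statistics.stdev(existing),
    len(batch), statistics.mean(batch), statistics.stdev(batch),
)
assert abs(mean - statistics.mean(existing + batch)) < 1e-9
assert abs(std - statistics.stdev(existing + batch)) < 1e-9

One consequence of this design is that only the running aggregates need to be stored: rows older than last_update_timestamp never have to be re-read on subsequent 15-minute runs.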
