# usvaccinecsvupload.py
"""Airflow DAG that uploads the US state vaccinations CSV into a Snowflake
stage and then copies it into the target table."""
from airflow.models import DAG, Variable
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
from datetime import timedelta
import snowflake.connector as sf
default_arguments = {
    'owner': 'airflow',
    'start_date': days_ago(0),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
etl_dag = DAG(
    'US_VACCINE_CSV_UPLOAD',
    default_args=default_arguments,
    schedule_interval=None,  # manual trigger only
)
# Credentials are read from Airflow Variables. Note that this connection is
# opened at DAG-parse time, so it is re-created every time the scheduler
# parses the file.
conn = sf.connect(
    user=Variable.get("USER"),
    password=Variable.get("PSWD"),
    account=Variable.get("ACCOUNT"),
    warehouse=Variable.get("WRH"),
    database=Variable.get("US_DATABASE"),
    schema=Variable.get("US_VACCINATIONS_SCHEMA"),
)
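# A minimal alternative sketch, not the original author's approach: in a
# deployed environment it is usually safer to open the connection inside each
# task via Airflow's SnowflakeHook, assuming a Snowflake Connection named
# 'snowflake_default' has been configured in the Airflow UI:
#
#     from airflow.contrib.hooks.snowflake_hook import SnowflakeHook
#
#     def get_conn():
#         return SnowflakeHook(snowflake_conn_id='snowflake_default').get_conn()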
def validation_connection(conn, **kwargs):
    """Print the target database and schema to confirm the connection works."""
    print('This is my Database: ' + str(conn.database))
    print('\nThis is my Schema: ' + str(conn.schema))
    print('\nConnection SUCCESS!')
def carga_stage(conn, **kwargs):
    """PUT the local CSV file into the Snowflake internal stage."""
    csv_file = '/mnt/c/csv/USVaccination/us_state_vaccinations.csv'
    sql = 'put file://{0} @{1} auto_compress=true'.format(csv_file, Variable.get("US_STAGE"))
    curs = conn.cursor()
    curs.execute(sql)
    print('UPLOAD TO STAGE SUCCESS')
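# Note: PUT reads the file from the machine executing the task, so the CSV
# path above must exist on the Airflow worker. auto_compress=true gzips the
# file on upload, which is why the COPY below reads the .csv.gz name.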
def carga_table(conn, **kwargs):
    """COPY the staged, gzipped CSV into the target Snowflake table."""
    sql2 = ("copy into {0} from @{1}/us_state_vaccinations.csv.gz "
            "FILE_FORMAT=(TYPE=csv FIELD_DELIMITER=',' SKIP_HEADER=1) "
            "ON_ERROR='CONTINUE'").format(Variable.get("US_VACCINE_TABLE"),
                                          Variable.get("US_STAGE"))
    curs = conn.cursor()
    curs.execute(sql2)
    print('UPLOAD TABLE SUCCESS')
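# A minimal verification sketch, an assumption rather than part of the
# original DAG: after the COPY, the row count of the target table can be
# checked with the same connection:
#
#     def verify_load(conn, **kwargs):
#         curs = conn.cursor()
#         curs.execute('select count(*) from {0}'.format(Variable.get("US_VACCINE_TABLE")))
#         print('Rows loaded: {0}'.format(curs.fetchone()[0]))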
# Connection validation task
validation = PythonOperator(task_id='CONNECTION_SUCCESS',
                            provide_context=True,
                            python_callable=validation_connection,
                            op_kwargs={"conn": conn},
                            dag=etl_dag)
# Stage upload task
upload_stage = PythonOperator(task_id='UPLOAD_STAGE',
                              python_callable=carga_stage,
                              op_kwargs={"conn": conn},
                              dag=etl_dag)
# Table load task
upload_table = PythonOperator(task_id='UPLOAD_TABLE_SNOWFLAKE',
                              python_callable=carga_table,
                              op_kwargs={"conn": conn},
                              dag=etl_dag)
validation >> upload_stage >> upload_table
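# Task order: validate the connection, then PUT the file to the stage, then
# COPY it into the table. With schedule_interval=None the DAG only runs when
# triggered manually, e.g. with the Airflow 1.x CLI (an assumption about the
# target version):
#
#     airflow trigger_dag US_VACCINE_CSV_UPLOAD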