Export missing data (#26)
* Resolved: missing data is now exported as -99.9

* Resolved issue #25: export of missing data (see the sketch below)
Midms27 authored Dec 10, 2021
1 parent f901f7f commit 035569d
Showing 3 changed files with 182 additions and 18 deletions.
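
In short, each export query now generates the full series of expected timestamps with generate_series and COALESCEs missing readings to the sentinel value -99.9, so gaps show up explicitly instead of as absent rows. Below is a minimal sketch of that pattern, simplified from the raw_data branch in the diff; the hard-coded station, variables, window and interval are hypothetical, and a configured Django environment with the raw_data and wx_variable tables is assumed.

```python
from django.db import connection

# Hypothetical parameters for illustration only.
params = {
    "station_id": 1,
    "variable_ids": (10, 11),      # must be a tuple so psycopg2 expands IN %(variable_ids)s
    "start_datetime": "2021-11-01 00:00:00+00",
    "end_datetime": "2021-11-02 00:00:00+00",
    "data_interval": 300,          # seconds between expected records
}

# Same shape as the new raw_data query: generate every expected timestamp,
# LEFT JOIN the observations onto it, and COALESCE missing values to '-99.9'.
sql = """
    WITH observed AS (
        SELECT data.datetime,
               data.variable_id,
               COALESCE(data.measured::varchar, '-99.9') AS value
        FROM raw_data data
        JOIN wx_variable var ON data.variable_id = var.id AND var.id IN %(variable_ids)s
        WHERE data.datetime >= %(start_datetime)s
          AND data.datetime < %(end_datetime)s
          AND data.station_id = %(station_id)s
    )
    SELECT generated_time, variable.id, COALESCE(observed.value, '-99.9') AS value
    FROM generate_series(%(start_datetime)s::timestamptz,
                         %(end_datetime)s::timestamptz - INTERVAL '1 second',
                         %(data_interval)s * INTERVAL '1 second') AS generated_time
    JOIN wx_variable variable ON variable.id IN %(variable_ids)s
    LEFT JOIN observed ON observed.datetime = generated_time
                      AND observed.variable_id = variable.id
    ORDER BY generated_time, variable.id
"""

with connection.cursor() as cursor:
    cursor.execute(sql, params)
    rows = cursor.fetchall()  # one row per (timestamp, variable); gaps come back as '-99.9'
```
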
35 changes: 35 additions & 0 deletions README.md
@@ -1,5 +1,22 @@
# Surface

## Run Docker

docker-compose up

## Run and Stop the project

docker-compose -f docker-compose-dev.yml up

docker-compose -f docker-compose-dev.yml stop

## Production env

In the api folder, add the production env file; you can find an example in "./api/production.env.example". Fill in the values of the variables; for hosts and ports you can put 0.

## Generate Docker Images
docker-compose -f docker-compose-dev.yml build

## Running with Docker

docker-compose -f docker-compose-dev.yml up postgres cache redis api
@@ -8,6 +25,14 @@ docker-compose -f docker-compose-dev.yml up postgres cache redis api

docker-compose -f docker-compose-dev.yml exec api bash load_initial_data.sh

### If you're using Windows

docker-compose -f docker-compose-dev.yml exec api bash

python manage.py migrate

python manage.py loaddata /surface/fixtures/*

## Create superuser

docker-compose -f docker-compose-dev.yml exec api python manage.py createsuperuser
@@ -17,3 +42,13 @@ docker-compose -f docker-compose-dev.yml exec api python manage.py createsuperuser
docker-compose -f docker-compose-prd.yml -p surface_new exec api bash load_initial_data.sh

docker-compose -f docker-compose-prd.yml -p surface_new exec api python manage.py collectstatic --noinput

## Loading data

docker-compose -f docker-compose-dev.yml exec postgres pg_restore -U dba -d surface_db /data/shared/dump_surface_20211114.dump

docker-compose -f docker-compose-dev.yml exec postgres psql -U dba -d surface_db -c "\COPY raw_data FROM '/data/shared/dump_raw_data_20211130.csv' WITH DELIMITER ',' CSV HEADER;"

## Access DB manually

docker-compose -f docker-compose-dev.yml exec postgres psql -U dba -d surface_db
154 changes: 140 additions & 14 deletions api/wx/tasks.py
@@ -21,6 +21,7 @@
from django.core.cache import cache
from django.db import connection


from tempestas_api import settings
from wx.decoders.flash import read_data as read_data_flash
from wx.decoders.hobo import read_file as read_file_hobo
@@ -898,13 +899,13 @@ def export_data(station_id, source, start_date, end_date, variable_ids, file_id)
current_end_datetime = datetime_list[i + 1]

with connection.cursor() as cursor:

if source == 'raw_data':
cursor.execute(f'''

query_raw_data = '''
WITH processed_data AS (
SELECT datetime
,var.id as variable_id
,CASE WHEN var.variable_type ilike 'code' THEN data.code ELSE data.measured::varchar END AS value
,COALESCE(CASE WHEN var.variable_type ilike 'code' THEN data.code ELSE data.measured::varchar END, '-99.9') AS value
FROM raw_data data
JOIN wx_variable var ON data.variable_id = var.id AND var.id IN %(variable_ids)s
WHERE data.datetime >= %(start_datetime)s
@@ -913,22 +914,147 @@ def export_data(station_id, source, start_date, end_date, variable_ids, file_id)
)
SELECT (generated_time + interval '%(utc_offset)s minutes') at time zone 'utc' as datetime
,variable.id
,value
,COALESCE(value, '-99.9')
FROM generate_series(%(start_datetime)s, %(end_datetime)s - INTERVAL '1 seconds', INTERVAL '%(data_interval)s seconds') generated_time
JOIN wx_variable variable ON variable.id IN %(variable_ids)s
LEFT JOIN processed_data ON datetime = generated_time AND variable.id = variable_id
''', {'utc_offset': station.utc_offset_minutes, 'variable_ids': variable_ids,
'''

logging.info(query_raw_data, {'utc_offset': station.utc_offset_minutes, 'variable_ids': variable_ids,
'start_datetime': current_start_datetime, 'end_datetime': current_end_datetime,
'station_id': station_id, 'data_interval': current_datafile.interval_in_seconds})

cursor.execute(query_raw_data, {'utc_offset': station.utc_offset_minutes, 'variable_ids': variable_ids,
'start_datetime': current_start_datetime, 'end_datetime': current_end_datetime,
'station_id': station_id, 'data_interval': current_datafile.interval_in_seconds})
else:
cursor.execute(f'''
SELECT {date_source}, var.id, {measured_source}
FROM {source} data
JOIN wx_variable var ON data.variable_id = var.id AND var.id in %s
WHERE data.{datetime_variable} >= %s
AND data.{datetime_variable} < %s
AND data.station_id = %s
''', (variable_ids, current_start_datetime, current_end_datetime, station_id,))

elif source == 'hourly_summary':

query_hourly = '''
WITH processed_data AS (
SELECT datetime ,var.id as variable_id
,COALESCE(CASE WHEN var.sampling_operation_id in (1,2) THEN data.avg_value::real
WHEN var.sampling_operation_id = 3 THEN data.min_value
WHEN var.sampling_operation_id = 4 THEN data.max_value
WHEN var.sampling_operation_id = 6 THEN data.sum_value
ELSE data.sum_value END, '-99.9') as value
FROM hourly_summary data
JOIN wx_variable var ON data.variable_id = var.id AND var.id IN %(variable_ids)s
WHERE data.datetime >= %(start_datetime)s
AND data.datetime < %(end_datetime)s
AND data.station_id = %(station_id)s
)
SELECT (generated_time + interval '%(utc_offset)s minutes') at time zone 'utc' as datetime
,variable.id
,COALESCE(value, '-99.9')
FROM generate_series(%(start_datetime)s, %(end_datetime)s - INTERVAL '1 seconds', INTERVAL '1 hours') generated_time
JOIN wx_variable variable ON variable.id IN %(variable_ids)s
LEFT JOIN processed_data ON datetime = generated_time AND variable.id = variable_id
'''

logging.info(query_hourly,{'utc_offset': station.utc_offset_minutes, 'variable_ids': variable_ids,
'start_datetime': current_start_datetime, 'end_datetime': current_end_datetime,
'station_id': station_id})

cursor.execute(query_hourly,{'utc_offset': station.utc_offset_minutes, 'variable_ids': variable_ids,
'start_datetime': current_start_datetime, 'end_datetime': current_end_datetime,
'station_id': station_id})

elif source == 'daily_summary':

query_daily = '''
WITH processed_data AS (
SELECT day ,var.id as variable_id
,COALESCE(CASE WHEN var.sampling_operation_id in (1,2) THEN data.avg_value::real
WHEN var.sampling_operation_id = 3 THEN data.min_value
WHEN var.sampling_operation_id = 4 THEN data.max_value
WHEN var.sampling_operation_id = 6 THEN data.sum_value
ELSE data.sum_value END, '-99.9') as value
FROM daily_summary data
JOIN wx_variable var ON data.variable_id = var.id AND var.id IN %(variable_ids)s
WHERE data.day >= %(start_datetime)s
AND data.day < %(end_datetime)s
AND data.station_id = %(station_id)s
)
SELECT (generated_time) as datetime
,variable.id
,COALESCE(value, '-99.9')
FROM generate_series(%(start_datetime)s, %(end_datetime)s - INTERVAL '1 seconds', INTERVAL '1 days') generated_time
JOIN wx_variable variable ON variable.id IN %(variable_ids)s
LEFT JOIN processed_data ON day = generated_time AND variable.id = variable_id
'''

logging.info(query_daily, {'variable_ids': variable_ids,
'start_datetime': current_start_datetime, 'end_datetime': current_end_datetime,
'station_id': station_id})

cursor.execute(query_daily, {'variable_ids': variable_ids,
'start_datetime': current_start_datetime, 'end_datetime': current_end_datetime,
'station_id': station_id})

elif source == 'monthly_summary':

query_monthly = '''
WITH processed_data AS (
SELECT date ,var.id as variable_id
,COALESCE(CASE WHEN var.sampling_operation_id in (1,2) THEN data.avg_value::real
WHEN var.sampling_operation_id = 3 THEN data.min_value
WHEN var.sampling_operation_id = 4 THEN data.max_value
WHEN var.sampling_operation_id = 6 THEN data.sum_value
ELSE data.sum_value END, '-99.9') as value
FROM monthly_summary data
JOIN wx_variable var ON data.variable_id = var.id AND var.id IN %(variable_ids)s
WHERE data.date >= %(start_datetime)s
AND data.date < %(end_datetime)s
AND data.station_id = %(station_id)s
)
SELECT (generated_time) as datetime
,variable.id
,COALESCE(value, '-99.9')
FROM generate_series(%(start_datetime)s, %(end_datetime)s , INTERVAL '1 months') generated_time
JOIN wx_variable variable ON variable.id IN %(variable_ids)s
LEFT JOIN processed_data ON date = generated_time AND variable.id = variable_id
'''

logging.info(query_monthly, {'variable_ids': variable_ids,
'start_datetime': current_start_datetime, 'end_datetime': current_end_datetime,
'station_id': station_id})

cursor.execute(query_monthly, {'variable_ids': variable_ids,
'start_datetime': current_start_datetime, 'end_datetime': current_end_datetime,
'station_id': station_id})

elif source == 'yearly_summary':

query_yearly = '''
WITH processed_data AS (
SELECT date ,var.id as variable_id
,COALESCE(CASE WHEN var.sampling_operation_id in (1,2) THEN data.avg_value::real
WHEN var.sampling_operation_id = 3 THEN data.min_value
WHEN var.sampling_operation_id = 4 THEN data.max_value
WHEN var.sampling_operation_id = 6 THEN data.sum_value
ELSE data.sum_value END, '-99.9') as value
FROM yearly_summary data
JOIN wx_variable var ON data.variable_id = var.id AND var.id IN %(variable_ids)s
WHERE data.date >= %(start_datetime)s
AND data.date < %(end_datetime)s
AND data.station_id = %(station_id)s
)
SELECT (generated_time) as datetime
,variable.id
,COALESCE(value, '-99.9')
FROM generate_series(%(start_datetime)s, %(end_datetime)s , INTERVAL '1 years') generated_time
JOIN wx_variable variable ON variable.id IN %(variable_ids)s
LEFT JOIN processed_data ON date = generated_time AND variable.id = variable_id
'''

logging.info(query_yearly, {'variable_ids': variable_ids,
'start_datetime': current_start_datetime, 'end_datetime': current_end_datetime,
'station_id': station_id})

cursor.execute(query_yearly, {'variable_ids': variable_ids,
'start_datetime': current_start_datetime, 'end_datetime': current_end_datetime,
'station_id': station_id})

query_result = query_result + cursor.fetchall()
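
For context, a hedged usage sketch of the task above. The call signature matches the def shown in the hunk header; whether it is queued through Celery (as the imports at the top of this module suggest) or called inline depends on how export_data is registered, so treat the .delay call as an assumption rather than the documented interface.

```python
from datetime import datetime

from wx.tasks import export_data

# Hypothetical identifiers; real values depend on the deployment's database.
station_id = 4
variable_ids = (0, 10, 30)             # wx_variable primary keys
start_date = datetime(2021, 11, 1)
end_date = datetime(2021, 11, 30)
file_id = 123                          # assumed to reference the record tracking the export file

# If export_data is registered as a Celery task, it would normally be queued:
export_data.delay(station_id, 'hourly_summary', start_date, end_date, variable_ids, file_id)

# For debugging it can also be called synchronously, e.g. from `manage.py shell`:
# export_data(station_id, 'hourly_summary', start_date, end_date, variable_ids, file_id)
```
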

11 changes: 7 additions & 4 deletions docker-compose-dev.yml
@@ -15,6 +15,8 @@ services:
postgres:
container_name: surface-database
image: timescale/timescaledb-postgis:2.3.0-pg13
ports:
- "5432:5432"
restart: unless-stopped
env_file:
- api/production.env
@@ -41,6 +43,7 @@
- ./api:/surface
- ./data/media:/data/media
- ./data/shared:/data/shared
- ./data/exported_data:/data/exported_data
- ./data/documents:/data/documents
ports:
- "8080:8000"
@@ -59,14 +62,14 @@
dockerfile: Dockerfile
context: api
container_name: surface-celery-worker
command: celery -A tempestas_api worker -l info
command: /home/surface/.local/bin/celery -A tempestas_api worker -l info
env_file:
- api/production.env
restart: unless-stopped
volumes:
- ./api:/surface
- ./data/media/documents/ingest:/data/documents/ingest
- ./data/media/exported_data:/data/exported_data
- ./data/documents/ingest:/data/documents/ingest
- ./data/exported_data:/data/exported_data
- ./data/shared:/data/shared
depends_on:
- api
@@ -83,7 +86,7 @@ services:
context: api
container_name: surface-celery-beat
restart: unless-stopped
command: celery -A tempestas_api beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler
command: /home/surface/.local/bin/celery -A tempestas_api beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler
env_file:
- api/production.env
volumes:
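
The compose changes above publish Postgres on the host (ports 5432:5432) and mount ./data/exported_data into the API and worker containers. Below is a quick, hedged way to sanity-check the newly exposed port from the host with psycopg2; the user and database names come from the README commands, while the password placeholder stands in for whatever api/production.env defines.

```python
import psycopg2

# Connect to the postgres service now published on localhost:5432.
conn = psycopg2.connect(
    host="localhost",
    port=5432,
    user="dba",
    dbname="surface_db",
    password="<value from api/production.env>",  # placeholder, not a real credential
)

with conn, conn.cursor() as cur:
    cur.execute("SELECT count(*) FROM raw_data;")
    print(cur.fetchone())

conn.close()
```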
