Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't Merge: Geocoding additions in progress #1

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
.DS_Store
.idea
93 changes: 93 additions & 0 deletions python/s3Job.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
from __future__ import print_function
import os
import string
import boto3
import json
import urllib
import re
from random import SystemRandom


reporter_dict = {
"WA": "1",
Expand All @@ -13,6 +17,84 @@
}


def submit_create_es_domain(client, host):
    """Submit a Batch job that creates an Elasticsearch domain named *host*.

    The actual domain creation is performed inside the container by
    ``es.py`` with the ``create`` action; the AWS account id is read from
    the ``AWS_ACCOUNT`` environment variable.

    :param client: boto3 Batch client used to submit the job
    :param host: name for the new Elasticsearch domain
    :return: the submitted job's id (str)
    """
    override = {
        'command': [
            'es.py', host,
            '-j', 'create',
            '-a', os.getenv('AWS_ACCOUNT'),
        ]
    }

    result = client.submit_job(
        jobName='LoadElasticsearchTBD',
        jobQueue='National-Voter-File-Job-Queue',
        jobDefinition='Geocoder',
        containerOverrides=override,
    )

    return result['jobId']


def submit_load_elasticsearch(client, host, bucket, state_name, depends_on):
    """Submit a Batch job that loads TIGER data into Elasticsearch.

    Runs ``es_tiger_loader.py`` inside the container, pointing it at the
    Elasticsearch endpoint *host* and the S3 *bucket* holding the data
    for *state_name*. The job is chained behind *depends_on* so it only
    starts once the domain-creation job has finished.

    :param client: boto3 Batch client used to submit the job
    :param host: Elasticsearch domain name to load into
    :param bucket: S3 bucket containing the TIGER data
    :param state_name: state whose data should be loaded
    :param depends_on: Batch dependency spec, e.g. ``{'jobId': ...}``
    :return: the submitted job's id (str)
    """
    override = {
        'command': ['es_tiger_loader.py', state_name, '-e', host, '-b', bucket]
    }

    result = client.submit_job(
        jobName='LoadElasticsearchTBD',
        jobQueue='National-Voter-File-Job-Queue',
        jobDefinition='Geocoder',
        containerOverrides=override,
        dependsOn=depends_on,
    )

    return result['jobId']


def submit_run_geocoder_job(client, host, bucket, key, state_name, depends_on):
    """Submit a Batch job that runs ``run.py`` to geocode the file at *key*.

    - Passes the bucket where output will be stored via ``-b``
    - Can include the full S3 path to the key because the path is split and
      placed under data/ during processing

    :param client: boto3 Batch client used to submit the job
    :param host: Elasticsearch domain name used for geocoding lookups
    :param bucket: S3 bucket where geocoder output will be stored
    :param key: S3 key of the input file to geocode
    :param state_name: state the input file belongs to
    :param depends_on: Batch dependency spec, e.g. ``{'jobId': ...}``
    :return: the submitted job's id (str)
    """
    # Use os.path.splitext so a key without an extension yields
    # '<key>_output' instead of the broken '_output.<key>' the previous
    # manual split('.') logic produced. For 'file.csv' this still gives
    # 'file_output.csv', identical to before.
    root, ext = os.path.splitext(key)
    output_file = root + '_output' + ext

    command = {'command': ['run.py', key, '-o', output_file, '-s', state_name, '-b', bucket, '-e', host]}

    job_submit_result = client.submit_job(
        # NOTE(review): jobName looks copy-pasted from the ES-load job;
        # consider a distinct name for the geocode step.
        jobName='LoadElasticsearchTBD',
        jobQueue='National-Voter-File-Job-Queue',
        jobDefinition='Geocoder',
        containerOverrides=command,
        dependsOn=depends_on
    )

    job_id = job_submit_result['jobId']
    return job_id


def submit_delete_elasticsearch(client, host, depends_on):
    """Submit a Batch job that tears down the Elasticsearch domain *host*.

    The delete is performed inside the container by ``es.py -j delete`` and
    is chained behind *depends_on* so the domain survives until the geocode
    job has finished.

    Bug fix: the previous version also called
    ``client.delete_elasticsearch_domain(DomainName=host)`` directly — but
    ``client`` here is the *Batch* client (see ``run_geocoder_tasks``), which
    has no such method, and a synchronous delete before the dependent job
    runs would defeat the ``dependsOn`` chaining anyway. The stray call is
    removed; the submitted job alone performs the delete.

    :param client: boto3 Batch client used to submit the job
    :param host: Elasticsearch domain name to delete
    :param depends_on: Batch dependency spec, e.g. ``{'jobId': ...}``
    :return: the submitted job's id (str)
    """
    command = {'command': ['es.py', host, '-j', 'delete', '-a', os.getenv('AWS_ACCOUNT')]}

    job_submit_result = client.submit_job(
        jobName='LoadElasticsearchTBD',
        jobQueue='National-Voter-File-Job-Queue',
        jobDefinition='Geocoder',
        containerOverrides=command,
        dependsOn=depends_on
    )

    job_id = job_submit_result['jobId']
    return job_id


def submit_file_copy_job(client, bucket, key):
s3_path = "s3://%s/%s" % (bucket, key)
command = {"command": ["sh", "-cxv", "aws s3 cp %s /work; chmod go+rw /work/%s" % (s3_path, key)]}
Expand Down Expand Up @@ -70,8 +152,19 @@ def submit_load_job(batch_client, input_file, state_name, report_date, reporter,
return job_submit_result['jobId']


def run_geocoder_tasks(batch_client, es_client, bucket, key, state_name):
    """Chain the four Batch jobs that geocode one voter file.

    Creates a randomly-named Elasticsearch domain, loads TIGER data into it,
    geocodes the file at *key*, then deletes the domain — each step made
    dependent on the previous via Batch ``dependsOn``.

    :param batch_client: boto3 Batch client used to submit all jobs
    :param es_client: boto3 ES client — currently unused here; presumably
        intended for domain management (TODO: confirm and wire up or drop)
    :param bucket: S3 bucket holding the input file and receiving output
    :param key: S3 key of the input file to geocode
    :param state_name: state the input file belongs to
    :return: dict mapping step name -> submitted job id, so callers can
        track the chain (previously nothing was returned)
    """
    # SystemRandom: OS-entropy randomness so concurrent runs don't collide
    # on the domain name.
    rand_str = ''.join(SystemRandom().choice(string.ascii_lowercase + string.digits) for _ in range(6))
    es_host = 'geocoder-{}'.format(rand_str)
    create_es_job = submit_create_es_domain(batch_client, es_host)
    # nvf-tiger-2016 bucket is static, can be moved
    load_job = submit_load_elasticsearch(batch_client, es_host, 'nvf-tiger-2016', state_name, {'jobId': create_es_job})
    geocode_job = submit_run_geocoder_job(batch_client, es_host, bucket, key, state_name, {'jobId': load_job})
    delete_job = submit_delete_elasticsearch(batch_client, es_host, {'jobId': geocode_job})
    return {
        'create': create_es_job,
        'load': load_job,
        'geocode': geocode_job,
        'delete': delete_job,
    }


def lambda_handler(event, context):
batch_client = boto3.client('batch')
es_client = boto3.client('es')
""":type: pyboto3.batch"""

s3 = boto3.resource('s3')
Expand Down