Better callback url base management (behind reverse proxy) #239

Open · wants to merge 2 commits into base: master
2 changes: 1 addition & 1 deletion README.md
@@ -181,7 +181,7 @@ Here's a summary of the options available.
| CHUNK_SIZE | '16384' | Chunk size when processing the data file |
| CHUNK_INSERT_ROWS | '250' | Number of records to send a request to datastore |
| DOWNLOAD_TIMEOUT | '30' | Download timeout for requesting the file |
| SSL_VERIFY | False | Do not validate SSL certificates when requesting the data file (*Warning*: Do not use this setting in production) |
| DATAPUSHER_SSL_VERIFY | True | Validate SSL certificates when requesting the data file; set to False to disable validation (*Warning*: do not disable this in production). Named differently from CKAN's SSL_VERIFY to prevent the value from being overwritten when CKAN settings are imported |
| TYPES | [messytables.StringType, messytables.DecimalType, messytables.IntegerType, messytables.DateUtilType] | [Messytables][] types used internally, can be modified to customize the type guessing |
| TYPE_MAPPING | {'String': 'text', 'Integer': 'numeric', 'Decimal': 'numeric', 'DateUtil': 'timestamp'} | Internal Messytables type mapping |

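For context, each of these options is read from the service configuration. A minimal sketch of how they are typically wired from environment variables, mirroring the deployment/datapusher_settings.py change later in this PR (the exact env var names are assumptions based on that file):

```python
import os

# Sketch only, not part of this diff: a typical env-driven settings module.
CHUNK_SIZE = int(os.environ.get('DATAPUSHER_CHUNK_SIZE', '16384'))
DOWNLOAD_TIMEOUT = int(os.environ.get('DATAPUSHER_DOWNLOAD_TIMEOUT', '30'))

# Deliberately NOT named SSL_VERIFY: CKAN defines its own SSL_VERIFY, so a
# shared name could be overwritten when CKAN settings are imported.
DATAPUSHER_SSL_VERIFY = os.environ.get('DATAPUSHER_SSL_VERIFY', True)
```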
78 changes: 58 additions & 20 deletions datapusher/jobs.py
@@ -29,6 +29,7 @@
else:
locale.setlocale(locale.LC_ALL, '')

DATAPUSHER_SSL_VERIFY = web.app.config.get('DATAPUSHER_SSL_VERIFY', True)
MAX_CONTENT_LENGTH = web.app.config.get('MAX_CONTENT_LENGTH') or 10485760
CHUNK_SIZE = web.app.config.get('CHUNK_SIZE') or 16384
CHUNK_INSERT_ROWS = web.app.config.get('CHUNK_INSERT_ROWS') or 250
@@ -37,12 +38,12 @@
if USE_PROXY:
DOWNLOAD_PROXY = web.app.config.get('DOWNLOAD_PROXY')

if web.app.config.get('SSL_VERIFY') in ['False', 'FALSE', '0', False, 0]:
SSL_VERIFY = False
if DATAPUSHER_SSL_VERIFY in ['False', 'FALSE', '0', False, 0]:
DATAPUSHER_SSL_VERIFY = False
else:
SSL_VERIFY = True
DATAPUSHER_SSL_VERIFY = True

if not SSL_VERIFY:
if not DATAPUSHER_SSL_VERIFY:
requests.packages.urllib3.disable_warnings()

_TYPE_MAPPING = {
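The membership test above exists because config values coming from the environment are strings, and any non-empty string, including 'False', is truthy in Python. A small self-contained illustration of the pitfall and of an equivalent check (parse_ssl_verify is an illustrative helper, not part of the codebase):

```python
# Non-empty strings are truthy, so a naive bool() check would get this wrong.
assert bool('False') is True

def parse_ssl_verify(value):
    # Mirrors the membership check used in jobs.py above.
    return value not in ['False', 'FALSE', '0', False, 0]

assert parse_ssl_verify('False') is False
assert parse_ssl_verify('0') is False
assert parse_ssl_verify(True) is True
```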
@@ -202,7 +203,7 @@ def delete_datastore_resource(resource_id, api_key, ckan_url):
try:
delete_url = get_url('datastore_delete', ckan_url)
response = requests.post(delete_url,
verify=SSL_VERIFY,
verify=DATAPUSHER_SSL_VERIFY,
data=json.dumps({'id': resource_id,
'force': True}),
headers={'Content-Type': 'application/json',
@@ -218,7 +219,7 @@ def datastore_resource_exists(resource_id, api_key, ckan_url):
try:
search_url = get_url('datastore_search', ckan_url)
response = requests.post(search_url,
verify=SSL_VERIFY,
verify=DATAPUSHER_SSL_VERIFY,
data=json.dumps({'id': resource_id,
'limit': 0}),
headers={'Content-Type': 'application/json',
@@ -251,7 +252,7 @@ def send_resource_to_datastore(resource, headers, records,

url = get_url('datastore_create', ckan_url)
r = requests.post(url,
verify=SSL_VERIFY,
verify=DATAPUSHER_SSL_VERIFY,
data=json.dumps(request, cls=DatastoreEncoder),
headers={'Content-Type': 'application/json',
'Authorization': api_key}
@@ -269,7 +270,7 @@ def update_resource(resource, api_key, ckan_url):
url = get_url('resource_update', ckan_url)
r = requests.post(
url,
verify=SSL_VERIFY,
verify=DATAPUSHER_SSL_VERIFY,
data=json.dumps(resource),
headers={'Content-Type': 'application/json',
'Authorization': api_key}
@@ -284,7 +285,7 @@ def get_resource(resource_id, ckan_url, api_key):
"""
url = get_url('resource_show', ckan_url)
r = requests.post(url,
verify=SSL_VERIFY,
verify=DATAPUSHER_SSL_VERIFY,
data=json.dumps({'id': resource_id}),
headers={'Content-Type': 'application/json',
'Authorization': api_key}
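A side note on the pattern above: every datastore/resource call site in this diff repeats the same verify=DATAPUSHER_SSL_VERIFY and JSON header boilerplate. A hedged sketch of a consolidating helper (ckan_action is hypothetical, not part of this PR) that would keep the SSL setting in one place:

```python
import json
import requests

def ckan_action(action_url, payload, api_key, ssl_verify=True):
    # One place to set verify= for every CKAN API POST.
    response = requests.post(
        action_url,
        verify=ssl_verify,
        data=json.dumps(payload),
        headers={'Content-Type': 'application/json',
                 'Authorization': api_key},
    )
    response.raise_for_status()
    # CKAN action API responses wrap the payload in a 'result' key.
    return response.json()['result']
```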
@@ -323,19 +324,38 @@ def push_to_datastore(task_id, input, dry_run=False):
:type dry_run: boolean

'''
logging.debug("Executing push_to_datastore job")
logging.debug("DATAPUSHER_SSL_VERIFY %s" % DATAPUSHER_SSL_VERIFY)

# This response_logger is used to report job activity to ckan
handler = util.StoringHandler(task_id, input)
logger = logging.getLogger(task_id)
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)
response_logger = logging.getLogger(task_id)
response_logger.addHandler(handler)
response_logger.setLevel(logging.DEBUG)

validate_input(input)

data = input['metadata']

ckan_url = data['ckan_url']

# See ckan/ckanext/datapusher/logic/action.py
# See https://github.com/ckan/ckan-service-provider/blob/master/ckanserviceprovider/web.py
callback_url = input.get('result_url')

resource_id = data['resource_id']
original_url = data.get('original_url', None)
original_base_url = data.get('original_base_url', '')

api_key = input.get('api_key')

logging.debug("callback_url: %s" % callback_url)
logging.debug("resource_id: %s" % resource_id)
logging.debug("api_key: %s............" % api_key[0:8])
logging.debug("ckan_url: %s" % ckan_url)
logging.debug("original_url: %s" % original_url)
logging.debug("original_base_url: %s" % original_base_url)

try:
resource = get_resource(resource_id, ckan_url, api_key)
except util.JobError as e:
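For readers unfamiliar with the logger split a few lines up: util.StoringHandler persists each log record under the task id so CKAN can poll job progress, while the module-level logging.debug calls stay in the service log. A rough sketch of the shape of such a handler, assuming an in-memory store for illustration (the real StoringHandler in ckan-service-provider writes to the service database):

```python
import logging
from collections import defaultdict

_JOB_LOGS = defaultdict(list)  # hypothetical store, keyed by task id

class StoringHandlerSketch(logging.Handler):
    """Illustrative stand-in for ckanserviceprovider's StoringHandler."""

    def __init__(self, task_id):
        super().__init__()
        self.task_id = task_id

    def emit(self, record):
        # Keep the formatted message so the job API can return it later.
        _JOB_LOGS[self.task_id].append(self.format(record))

response_logger = logging.getLogger('task-1234')
response_logger.addHandler(StoringHandlerSketch('task-1234'))
response_logger.setLevel(logging.DEBUG)
response_logger.info('Fetching from: %s', 'http://ckan:5000/...')
assert _JOB_LOGS['task-1234']
```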
@@ -345,27 +365,45 @@

# check if the resource url_type is a datastore
if resource.get('url_type') == 'datastore':
logger.info('Dump files are managed with the Datastore API')
response_logger.info('Dump files are managed with the Datastore API')
return

# check scheme
# this url is the external URL of the resource when CKAN is behind a
# reverse proxy, so its base has to be replaced with the internal ckan_url
url = resource.get('url')

# here we replace original_base_url with ckan_url in the url variable
# example:
# ckan_url: http://ckan:5000/
# original_base_url: https://ckanportal.mydomain.com
#
# url:
# before: https://ckanportal.mydomain.com/dataset/515fa5c0-058e-4b53-b5ed-74ff38aca428/resource/203de699-c2fc-40f7-a740-0369a2ebaa78/download/test.csv
# after: http://ckan:5000/dataset/515fa5c0-058e-4b53-b5ed-74ff38aca428/resource/203de699-c2fc-40f7-a740-0369a2ebaa78/download/test.csv
if original_base_url:
original_base_url_strip = original_base_url.rstrip("/")
ckan_url_strip = ckan_url.rstrip("/")
url = url.replace(original_base_url_strip, ckan_url_strip)

logging.debug("Verifying resource from url: %s" % url)

scheme = urlsplit(url).scheme
if scheme not in ('http', 'https', 'ftp'):
raise util.JobError(
'Only http, https, and ftp resources may be fetched.'
)

# fetch the resource data
logger.info('Fetching from: {0}'.format(url))
response_logger.info('Fetching from: {0}'.format(url))
headers = {}
if resource.get('url_type') == 'upload':
# If this is an uploaded file to CKAN, authenticate the request,
# otherwise we won't get file from private resources
headers['Authorization'] = api_key
try:
kwargs = {'headers': headers, 'timeout': DOWNLOAD_TIMEOUT,
'verify': SSL_VERIFY, 'stream': True}
'verify': DATAPUSHER_SSL_VERIFY, 'stream': True}
if USE_PROXY:
kwargs['proxies'] = {'http': DOWNLOAD_PROXY, 'https': DOWNLOAD_PROXY}
response = requests.get(url, **kwargs)
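The replace above is the core of this PR: behind a reverse proxy, the resource's stored URL carries the public base (original_base_url), which the datapusher container may not be able to reach, so it is swapped for the internal ckan_url before downloading. A self-contained sketch of that step using the example values from the comment (rewrite_resource_url is an illustrative helper, not a function defined in this diff):

```python
from urllib.parse import urlsplit

def rewrite_resource_url(url, original_base_url, ckan_url):
    # Swap the external (reverse-proxy) base for the internal CKAN base.
    if not original_base_url:
        return url
    return url.replace(original_base_url.rstrip('/'), ckan_url.rstrip('/'))

url = ('https://ckanportal.mydomain.com/dataset/515fa5c0-058e-4b53-b5ed-74ff38aca428'
       '/resource/203de699-c2fc-40f7-a740-0369a2ebaa78/download/test.csv')
rewritten = rewrite_resource_url(url, 'https://ckanportal.mydomain.com',
                                 'http://ckan:5000/')
assert rewritten.startswith('http://ckan:5000/dataset/')
assert urlsplit(rewritten).scheme in ('http', 'https', 'ftp')
```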
@@ -409,7 +447,7 @@ def push_to_datastore(task_id, input, dry_run=False):

if (resource.get('hash') == file_hash
and not data.get('ignore_hash')):
logger.info("The file hash hasn't changed: {hash}.".format(
response_logger.info("The file hash hasn't changed: {hash}.".format(
hash=file_hash))
return

@@ -481,7 +519,7 @@ def row_iterator():
the fields have significantly changed, it may also fail.
'''
if existing:
logger.info('Deleting "{res_id}" from datastore.'.format(
response_logger.info('Deleting "{res_id}" from datastore.'.format(
res_id=resource_id))
delete_datastore_resource(resource_id, api_key, ckan_url)

@@ -498,7 +536,7 @@ def row_iterator():
if type_override in list(_TYPE_MAPPING.values()):
h['type'] = type_override

logger.info('Determined headers and types: {headers}'.format(
response_logger.info('Determined headers and types: {headers}'.format(
headers=headers_dicts))

if dry_run:
@@ -508,12 +546,12 @@
for i, chunk in enumerate(chunky(result, CHUNK_INSERT_ROWS)):
records, is_it_the_last_chunk = chunk
count += len(records)
logger.info('Saving chunk {number} {is_last}'.format(
response_logger.info('Saving chunk {number} {is_last}'.format(
number=i, is_last='(last)' if is_it_the_last_chunk else ''))
send_resource_to_datastore(resource, headers_dicts, records,
is_it_the_last_chunk, api_key, ckan_url)

logger.info('Successfully pushed {n} entries to "{res_id}".'.format(
response_logger.info('Successfully pushed {n} entries to "{res_id}".'.format(
n=count, res_id=resource_id))

if data.get('set_url_type', False):
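For reference, the chunked save loop above relies on a chunky helper that yields (records, is_last_chunk) pairs so the final datastore_create request can be flagged as the last one. A plausible reimplementation for illustration (the real helper already exists in jobs.py and may differ in detail):

```python
import itertools

def chunky(iterable, n):
    # Yield (chunk, is_last) pairs of up to n items each.
    it = iter(iterable)
    chunk = list(itertools.islice(it, n))
    while chunk:
        next_chunk = list(itertools.islice(it, n))
        yield chunk, not next_chunk
        chunk = next_chunk

for i, (records, is_last) in enumerate(chunky(range(7), 3)):
    print('Saving chunk {0} {1}'.format(i, '(last)' if is_last else ''))
```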
4 changes: 2 additions & 2 deletions deployment/datapusher_settings.py
@@ -25,8 +25,8 @@
CHUNK_INSERT_ROWS = int(os.environ.get('DATAPUSHER_CHUNK_INSERT_ROWS', '250'))
DOWNLOAD_TIMEOUT = int(os.environ.get('DATAPUSHER_DOWNLOAD_TIMEOUT', '30'))

# Verify SSL
SSL_VERIFY = os.environ.get('DATAPUSHER_SSL_VERIFY', True)
# Verify SSL (named differently from CKAN's SSL_VERIFY to prevent the two values from overlapping)
DATAPUSHER_SSL_VERIFY = os.environ.get('DATAPUSHER_SSL_VERIFY', True)

# logging
#LOG_FILE = '/tmp/ckan_service.log'
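One caveat with the setting above: os.environ.get returns a string whenever the variable is set, so the default True and an env-supplied 'True' have different types, which is why jobs.py compensates with its membership check. If normalizing at the settings layer were preferred instead, a hedged sketch of an alternative (env_bool is hypothetical, not part of this PR):

```python
import os

def env_bool(name, default=True):
    # Parse an environment variable as a boolean, treating common
    # falsy spellings as False.
    value = os.environ.get(name)
    if value is None:
        return default
    return value.strip().lower() not in ('false', '0', 'no', 'off', '')

DATAPUSHER_SSL_VERIFY = env_bool('DATAPUSHER_SSL_VERIFY', True)
```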