Skip to content

Commit

Permalink
Handle connection errors on creating exports async.
Browse files Browse the repository at this point in the history
Fix #1219.
  • Loading branch information
ukanga committed Feb 7, 2018
1 parent 74ff169 commit b864769
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 17 deletions.
10 changes: 10 additions & 0 deletions onadata/apps/viewer/models/export.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,16 @@ def __unicode__(self):
def __str__(self):
return unicode(self).encode('utf-8')

class ExportConnectionError(Exception):
"""
ExportConnectionError exception class.
"""
def __unicode__(self):
return _(u"Export server is down.")

def __str__(self):
return unicode(self).encode('utf-8')

XLS_EXPORT = 'xls'
CSV_EXPORT = 'csv'
KML_EXPORT = 'kml'
Expand Down
11 changes: 10 additions & 1 deletion onadata/apps/viewer/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import sys
from datetime import timedelta

import librabbitmq
from celery import task
from django.conf import settings
from django.shortcuts import get_object_or_404
Expand Down Expand Up @@ -46,6 +47,9 @@ def _get_export_details(username, id_string, export_id):
def create_async_export(xform, export_type, query, force_xlsx, options=None):
"""
Starts asynchronous export tasks and returns an export object.
Throws Export.ExportTypeError if export_type is not in EXPORT_TYPES.
Throws Export.ExportConnectionError if rabbitmq broker is down.
"""
username = xform.user.username
id_string = xform.id_string
Expand Down Expand Up @@ -89,7 +93,12 @@ def _create_export(xform, export_type, options):

# start async export
if export_type in export_types:
result = export_types[export_type].apply_async((), kwargs=options)
try:
result = export_types[export_type].apply_async((), kwargs=options)
except librabbitmq.ConnectionError as e:
export.delete()
report_exception("Error connecting to broker", e, sys.exc_info())
raise Export.ExportConnectionError
else:
raise Export.ExportTypeError

Expand Down
50 changes: 45 additions & 5 deletions onadata/libs/tests/utils/test_api_export_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from collections import OrderedDict, defaultdict

import mock
import librabbitmq
from celery import current_app
from celery.backends.amqp import BacklogLimitExceeded
from django.conf import settings
Expand All @@ -14,16 +15,17 @@
from onadata.apps.logger.models import XForm
from onadata.apps.main.tests.test_base import TestBase
from onadata.apps.viewer.models.export import Export
from onadata.libs.utils.api_export_tools import (get_async_response,
process_async_export,
response_for_format)
from onadata.libs.utils.api_export_tools import (
get_async_response, process_async_export, response_for_format)
from onadata.libs.utils.async_status import SUCCESSFUL, status_msg
from onadata.libs.exceptions import ServiceUnavailable


class TestApiExportTools(TestBase):
"""
Test api_export_tools.
"""

def _create_old_export(self, xform, export_type, options, filename=None):
options = OrderedDict(sorted(options.items()))
Export(
Expand Down Expand Up @@ -86,8 +88,10 @@ def test_get_async_response_export_does_not_exist(self, AsyncResult):
"""
Test get_async_response export does not exist.
"""

class MockAsyncResult(object): # pylint: disable=R0903
"""Mock AsyncResult"""

def __init__(self):
self.state = 'SUCCESS'
self.result = 1
Expand All @@ -108,8 +112,10 @@ def test_get_async_response_export_backlog_limit(self, AsyncResult):
"""
Test get_async_response export backlog limit exceeded.
"""

class MockAsyncResult(object): # pylint: disable=R0903
"""Mock AsyncResult"""

def __init__(self):
pass

Expand Down Expand Up @@ -137,9 +143,43 @@ def test_response_for_format(self):
self.assertIsNotNone(xform)
self.assertIsInstance(response_for_format(xform).data, dict)
self.assertIsInstance(response_for_format(xform, 'json').data, dict)
self.assertTrue(hasattr(response_for_format(xform, 'xls').data,
'file'))
self.assertTrue(
hasattr(response_for_format(xform, 'xls').data, 'file'))

xform.xls.storage.delete(xform.xls.name)
with self.assertRaises(Http404):
response_for_format(xform, 'xls')

# pylint: disable=invalid-name
@mock.patch(
'onadata.libs.utils.api_export_tools.viewer_task.create_async_export')
def test_process_async_export_connection_error(self, mock_task):
"""
Test process_async_export creates a new export.
"""
mock_task.side_effect = Export.ExportConnectionError
self._publish_transportation_form_and_submit_instance()
request = self.factory.post('/')
request.user = self.user
export_type = "csv"
options = defaultdict(dict)

with self.assertRaises(ServiceUnavailable):
process_async_export(
request, self.xform, export_type, options=options)

# pylint: disable=invalid-name
@mock.patch('onadata.libs.utils.api_export_tools.AsyncResult')
def test_get_async_response_connection_error(self, AsyncResult):
"""
Test get_async_response connection error.
"""
AsyncResult.side_effect = librabbitmq.ConnectionError
settings.CELERY_ALWAYS_EAGER = True
current_app.conf.CELERY_ALWAYS_EAGER = True
self._publish_transportation_form_and_submit_instance()
request = self.factory.post('/')
request.user = self.user

with self.assertRaises(ServiceUnavailable):
get_async_response('job_uuid', request, self.xform)
38 changes: 27 additions & 11 deletions onadata/libs/utils/api_export_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
"""
import json
import os
import sys
from datetime import datetime

import httplib2
import librabbitmq
from celery.backends.amqp import BacklogLimitExceeded
from celery.result import AsyncResult
from django.conf import settings
Expand All @@ -17,8 +19,8 @@
from oauth2client import client as google_client
from oauth2client.client import (HttpAccessTokenRefreshError,
OAuth2WebServerFlow, TokenRevokeError)
from oauth2client.contrib.django_util.storage import (
DjangoORMStorage as Storage)
from oauth2client.contrib.django_util.storage import \
DjangoORMStorage as Storage
from requests import ConnectionError
from rest_framework import exceptions, status
from rest_framework.response import Response
Expand All @@ -36,6 +38,7 @@
FAILED, PENDING, SUCCESSFUL, async_status, celery_state_to_status)
from onadata.libs.utils.common_tags import (
DATAVIEW_EXPORT, GROUPNAME_REMOVED_FLAG, OSM, SUBMISSION_TIME)
from onadata.libs.utils.common_tools import report_exception
from onadata.libs.utils.export_tools import (
check_pending_export, generate_attachments_zip_export, generate_export,
generate_external_export, generate_kml_export, generate_osm_export,
Expand Down Expand Up @@ -86,8 +89,14 @@ def _get_export_type(export_type):


# pylint: disable=too-many-arguments, too-many-locals, too-many-branches
def custom_response_handler(request, xform, query, export_type, token=None,
meta=None, dataview=False, filename=None):
def custom_response_handler(request,
xform,
query,
export_type,
token=None,
meta=None,
dataview=False,
filename=None):
"""
Returns a HTTP response with export file for download.
"""
Expand All @@ -112,7 +121,9 @@ def custom_response_handler(request, xform, query, export_type, token=None,
query)
except NoRecordsPermission:
return Response(
data=json.dumps({"details": _("You don't have permission")}),
data=json.dumps({
"details": _("You don't have permission")
}),
status=status.HTTP_403_FORBIDDEN,
content_type="application/json")

Expand All @@ -130,7 +141,7 @@ def custom_response_handler(request, xform, query, export_type, token=None,

return Response(
data=json.dumps({
"details": _("Sheets export only supported in async mode")
"details": _("Sheets export only supported in async mode")
}),
status=status.HTTP_403_FORBIDDEN,
content_type="application/json")
Expand Down Expand Up @@ -399,6 +410,7 @@ def process_async_export(request, xform, export_type, options=None):
xform, export_type, query, False, options=options)
}
else:
print "Do not create a new export."
export = newest_export_for(xform, export_type, options)

if not export.filename:
Expand Down Expand Up @@ -434,9 +446,12 @@ def _create_export_async(xform,
if export:
return export.task_id

export, async_result \
= viewer_task.create_async_export(xform, export_type, query,
force_xlsx, options=options)
try:
export, async_result = viewer_task.create_async_export(
xform, export_type, query, force_xlsx, options=options)
except Export.ExportConnectionError:
raise ServiceUnavailable

return async_result.task_id


Expand Down Expand Up @@ -485,9 +500,10 @@ def get_async_response(job_uuid, request, xform, count=0):
resp.update(job.result)
else:
resp.update({'progress': str(job.result)})
except ConnectionError as e:
except (librabbitmq.ConnectionError, ConnectionError) as e:
report_exception("Connection Error", e, sys.exc_info())
if count > 0:
raise ServiceUnavailable(unicode(e))
raise ServiceUnavailable

return get_async_response(job_uuid, request, xform, count + 1)
except BacklogLimitExceeded:
Expand Down

0 comments on commit b864769

Please sign in to comment.