diff --git a/onadata/apps/viewer/models/export.py b/onadata/apps/viewer/models/export.py index 4088a21ee0..c128f1e903 100644 --- a/onadata/apps/viewer/models/export.py +++ b/onadata/apps/viewer/models/export.py @@ -59,6 +59,16 @@ def __unicode__(self): def __str__(self): return unicode(self).encode('utf-8') + class ExportConnectionError(Exception): + """ + ExportConnectionError exception class. + """ + def __unicode__(self): + return _(u"Export server is down.") + + def __str__(self): + return unicode(self).encode('utf-8') + XLS_EXPORT = 'xls' CSV_EXPORT = 'csv' KML_EXPORT = 'kml' diff --git a/onadata/apps/viewer/tasks.py b/onadata/apps/viewer/tasks.py index 0c703b0e01..8d2cfa7d95 100644 --- a/onadata/apps/viewer/tasks.py +++ b/onadata/apps/viewer/tasks.py @@ -5,6 +5,7 @@ import sys from datetime import timedelta +import librabbitmq from celery import task from django.conf import settings from django.shortcuts import get_object_or_404 @@ -46,6 +47,9 @@ def _get_export_details(username, id_string, export_id): def create_async_export(xform, export_type, query, force_xlsx, options=None): """ Starts asynchronous export tasks and returns an export object. + + Throws Export.ExportTypeError if export_type is not in EXPORT_TYPES. + Throws Export.ExportConnectionError if rabbitmq broker is down. """ username = xform.user.username id_string = xform.id_string @@ -89,7 +93,12 @@ def _create_export(xform, export_type, options): # start async export if export_type in export_types: - result = export_types[export_type].apply_async((), kwargs=options) + try: + result = export_types[export_type].apply_async((), kwargs=options) + except librabbitmq.ConnectionError as e: + export.delete() + report_exception("Error connecting to broker", e, sys.exc_info()) + raise Export.ExportConnectionError else: raise Export.ExportTypeError diff --git a/onadata/libs/tests/utils/test_api_export_tools.py b/onadata/libs/tests/utils/test_api_export_tools.py index 6d8420698f..671956073f 100644 --- a/onadata/libs/tests/utils/test_api_export_tools.py +++ b/onadata/libs/tests/utils/test_api_export_tools.py @@ -5,6 +5,7 @@ from collections import OrderedDict, defaultdict import mock +import librabbitmq from celery import current_app from celery.backends.amqp import BacklogLimitExceeded from django.conf import settings @@ -14,16 +15,17 @@ from onadata.apps.logger.models import XForm from onadata.apps.main.tests.test_base import TestBase from onadata.apps.viewer.models.export import Export -from onadata.libs.utils.api_export_tools import (get_async_response, - process_async_export, - response_for_format) +from onadata.libs.utils.api_export_tools import ( + get_async_response, process_async_export, response_for_format) from onadata.libs.utils.async_status import SUCCESSFUL, status_msg +from onadata.libs.exceptions import ServiceUnavailable class TestApiExportTools(TestBase): """ Test api_export_tools. """ + def _create_old_export(self, xform, export_type, options, filename=None): options = OrderedDict(sorted(options.items())) Export( @@ -86,8 +88,10 @@ def test_get_async_response_export_does_not_exist(self, AsyncResult): """ Test get_async_response export does not exist. """ + class MockAsyncResult(object): # pylint: disable=R0903 """Mock AsyncResult""" + def __init__(self): self.state = 'SUCCESS' self.result = 1 @@ -108,8 +112,10 @@ def test_get_async_response_export_backlog_limit(self, AsyncResult): """ Test get_async_response export backlog limit exceeded. """ + class MockAsyncResult(object): # pylint: disable=R0903 """Mock AsyncResult""" + def __init__(self): pass @@ -137,9 +143,43 @@ def test_response_for_format(self): self.assertIsNotNone(xform) self.assertIsInstance(response_for_format(xform).data, dict) self.assertIsInstance(response_for_format(xform, 'json').data, dict) - self.assertTrue(hasattr(response_for_format(xform, 'xls').data, - 'file')) + self.assertTrue( + hasattr(response_for_format(xform, 'xls').data, 'file')) xform.xls.storage.delete(xform.xls.name) with self.assertRaises(Http404): response_for_format(xform, 'xls') + + # pylint: disable=invalid-name + @mock.patch( + 'onadata.libs.utils.api_export_tools.viewer_task.create_async_export') + def test_process_async_export_connection_error(self, mock_task): + """ + Test process_async_export creates a new export. + """ + mock_task.side_effect = Export.ExportConnectionError + self._publish_transportation_form_and_submit_instance() + request = self.factory.post('/') + request.user = self.user + export_type = "csv" + options = defaultdict(dict) + + with self.assertRaises(ServiceUnavailable): + process_async_export( + request, self.xform, export_type, options=options) + + # pylint: disable=invalid-name + @mock.patch('onadata.libs.utils.api_export_tools.AsyncResult') + def test_get_async_response_connection_error(self, AsyncResult): + """ + Test get_async_response connection error. + """ + AsyncResult.side_effect = librabbitmq.ConnectionError + settings.CELERY_ALWAYS_EAGER = True + current_app.conf.CELERY_ALWAYS_EAGER = True + self._publish_transportation_form_and_submit_instance() + request = self.factory.post('/') + request.user = self.user + + with self.assertRaises(ServiceUnavailable): + get_async_response('job_uuid', request, self.xform) diff --git a/onadata/libs/utils/api_export_tools.py b/onadata/libs/utils/api_export_tools.py index 666ca0e53c..08d9a95ecc 100644 --- a/onadata/libs/utils/api_export_tools.py +++ b/onadata/libs/utils/api_export_tools.py @@ -4,9 +4,11 @@ """ import json import os +import sys from datetime import datetime import httplib2 +import librabbitmq from celery.backends.amqp import BacklogLimitExceeded from celery.result import AsyncResult from django.conf import settings @@ -17,8 +19,8 @@ from oauth2client import client as google_client from oauth2client.client import (HttpAccessTokenRefreshError, OAuth2WebServerFlow, TokenRevokeError) -from oauth2client.contrib.django_util.storage import ( - DjangoORMStorage as Storage) +from oauth2client.contrib.django_util.storage import \ + DjangoORMStorage as Storage from requests import ConnectionError from rest_framework import exceptions, status from rest_framework.response import Response @@ -36,6 +38,7 @@ FAILED, PENDING, SUCCESSFUL, async_status, celery_state_to_status) from onadata.libs.utils.common_tags import ( DATAVIEW_EXPORT, GROUPNAME_REMOVED_FLAG, OSM, SUBMISSION_TIME) +from onadata.libs.utils.common_tools import report_exception from onadata.libs.utils.export_tools import ( check_pending_export, generate_attachments_zip_export, generate_export, generate_external_export, generate_kml_export, generate_osm_export, @@ -86,8 +89,14 @@ def _get_export_type(export_type): # pylint: disable=too-many-arguments, too-many-locals, too-many-branches -def custom_response_handler(request, xform, query, export_type, token=None, - meta=None, dataview=False, filename=None): +def custom_response_handler(request, + xform, + query, + export_type, + token=None, + meta=None, + dataview=False, + filename=None): """ Returns a HTTP response with export file for download. """ @@ -112,7 +121,9 @@ def custom_response_handler(request, xform, query, export_type, token=None, query) except NoRecordsPermission: return Response( - data=json.dumps({"details": _("You don't have permission")}), + data=json.dumps({ + "details": _("You don't have permission") + }), status=status.HTTP_403_FORBIDDEN, content_type="application/json") @@ -130,7 +141,7 @@ def custom_response_handler(request, xform, query, export_type, token=None, return Response( data=json.dumps({ - "details": _("Sheets export only supported in async mode") + "details": _("Sheets export only supported in async mode") }), status=status.HTTP_403_FORBIDDEN, content_type="application/json") @@ -399,6 +410,7 @@ def process_async_export(request, xform, export_type, options=None): xform, export_type, query, False, options=options) } else: + print "Do not create a new export." export = newest_export_for(xform, export_type, options) if not export.filename: @@ -434,9 +446,12 @@ def _create_export_async(xform, if export: return export.task_id - export, async_result \ - = viewer_task.create_async_export(xform, export_type, query, - force_xlsx, options=options) + try: + export, async_result = viewer_task.create_async_export( + xform, export_type, query, force_xlsx, options=options) + except Export.ExportConnectionError: + raise ServiceUnavailable + return async_result.task_id @@ -485,9 +500,10 @@ def get_async_response(job_uuid, request, xform, count=0): resp.update(job.result) else: resp.update({'progress': str(job.result)}) - except ConnectionError as e: + except (librabbitmq.ConnectionError, ConnectionError) as e: + report_exception("Connection Error", e, sys.exc_info()) if count > 0: - raise ServiceUnavailable(unicode(e)) + raise ServiceUnavailable return get_async_response(job_uuid, request, xform, count + 1) except BacklogLimitExceeded: