From c025070ae2e7d097c66d3a73aad79ff395a0f2e3 Mon Sep 17 00:00:00 2001
From: Steven Gerrits
Date: Mon, 28 Oct 2024 16:32:39 +0000
Subject: [PATCH] fix export: stream the CSV instead of building it in memory

Replace the retrying in-memory export view with a StreamingHttpResponse
that generates CSV rows on the fly, and move the database retry logic
into a reusable retry_query helper in utils. JSON export is dropped.

---
 vespadb/observations/utils.py |  18 ++++-
 vespadb/observations/views.py | 129 +++++++++++++++++++++++-----------
 2 files changed, 100 insertions(+), 47 deletions(-)

diff --git a/vespadb/observations/utils.py b/vespadb/observations/utils.py
index f7d2258..81a6a17 100644
--- a/vespadb/observations/utils.py
+++ b/vespadb/observations/utils.py
@@ -4,7 +4,7 @@
 import time
 from functools import wraps
 from django.db import connection, OperationalError
-from typing import Callable, TypeVar, Any, cast
+from typing import Callable, TypeVar, Any, cast, Iterable, List
 
 F = TypeVar("F", bound=Callable[..., Any])
 
@@ -55,3 +55,19 @@ def wrapper(*args: Any, **kwargs: Any) -> Any:
                     raise
         return cast(F, wrapper)
     return decorator
+
+def retry_query(queryset: Iterable[Any], retries: int = 3, delay: int = 5) -> List[Any]:
+    """Execute a query with retries to handle intermittent database connection errors."""
+    for attempt in range(retries):
+        try:
+            return list(queryset)
+        except OperationalError as err:
+            if attempt < retries - 1:
+                time.sleep(delay)
+                connection.close()
+            else:
+                # Raise a more informative error once retries are exhausted
+                raise OperationalError(f"Database connection failed after {retries} attempts") from err
+
+    # Unreachable, but keeps type checkers happy about the return type.
+    return []
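
Note for review: retry_query is imported in views.py below but not called in any of
the visible hunks. A minimal usage sketch (the Municipality queryset is illustrative;
any lazy queryset works, since evaluation only happens inside list()):

    from vespadb.observations.models import Municipality
    from vespadb.observations.utils import retry_query

    # The queryset stays lazy until retry_query evaluates it with list(),
    # so a dropped connection surfaces as OperationalError and is retried.
    municipalities = retry_query(Municipality.objects.all(), retries=3, delay=5)
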
"wn_validation_status" +] class ObservationsViewSet(ModelViewSet): # noqa: PLR0904 """ViewSet for the Observation model.""" @@ -635,49 +643,76 @@ def save_observations(self, valid_data: list[dict[str, Any]]) -> Response: ) @method_decorator(ratelimit(key="ip", rate="60/m", method="GET", block=True)) @action(detail=False, methods=["get"], permission_classes=[AllowAny]) - def export(self, request: Request) -> Response: - retries = 3 - delay = 5 - - for attempt in range(retries): - try: - export_format = request.query_params.get("export_format", "csv").lower() - queryset = self.filter_queryset(self.get_queryset()) + def export(self, request: HttpRequest) -> Union[StreamingHttpResponse, JsonResponse]: + """ + Export observations data as CSV in a memory-efficient, streamable format. + + Handles large datasets by streaming data in chunks to avoid memory overload. + Only supports CSV export; JSON format is no longer available. + """ + if request.query_params.get("export_format", "csv").lower() != "csv": + return JsonResponse({"error": "Only CSV export is supported"}, status=400) - serialized_data = [] - errors = [] + # Filter queryset + queryset = self.filter_queryset(self.get_queryset()) - for obj in queryset.iterator(chunk_size=100): - try: - serialized_obj = self.get_serializer(obj).data - serialized_data.append(serialized_obj) - except Exception as inner_error: - errors.append({ - "id": obj.id, - "error": str(inner_error) - }) - - if export_format == "csv": - return self.export_as_csv(serialized_data) - return JsonResponse({"data": serialized_data, "errors": errors}, safe=False) - - except OperationalError: - if attempt < retries - 1: - time.sleep(delay) - connection.close() - else: - return JsonResponse({"error": "Database connection failed after retries"}, safe=False) - except Exception as e: - return JsonResponse({"errors": str(e)}, safe=False) - - def export_as_csv(self, data: list[dict[str, Any]]) -> HttpResponse: - """Export the data as a CSV file.""" - response = HttpResponse(content_type="text/csv") - writer = csv.DictWriter(response, fieldnames=data[0].keys()) - writer.writeheader() - writer.writerows(data) + # Define response with streaming CSV data + response = StreamingHttpResponse( + self.generate_csv_rows(queryset), content_type="text/csv" + ) + response["Content-Disposition"] = 'attachment; filename="observations_export.csv"' return response + def generate_csv_rows(self, queryset: QuerySet) -> Generator[bytes, None, None]: + """ + Generator that yields rows of CSV data, handling large datasets efficiently. + + Converts each observation to a dictionary row, handling missing or misconfigured + data gracefully, and writes to CSV format on-the-fly to avoid memory overuse. + """ + # Yield CSV header row + yield self._csv_line(CSV_HEADERS) + + # Iterate over queryset in chunks to avoid high memory usage + for obj in queryset.iterator(chunk_size=500): + row = self.serialize_observation(obj) + yield self._csv_line(row) + + + def serialize_observation(self, obj: Observation) -> list[str]: + """ + Serialize observation to a list of values in the same order as headers. + + Handles potential data misconfigurations, such as missing attributes or + inconsistent formats, to ensure robust data handling. 
+ """ + try: + return [ + str(getattr(obj, field, "")) or "" for field in [ + "id", "created_datetime", "modified_datetime", "location", "source", + "nest_height", "nest_size", "nest_location", "nest_type", + "observation_datetime", "modified_by_id", "created_by_id", "province_id", + "eradication_date", "municipality_id", "images", "public_domain", + "municipality_name", "modified_by_first_name", "created_by_first_name", + "wn_notes", "eradication_result", "wn_id", "wn_validation_status" + ] + ] + except Exception as e: + # Log and handle any serialization issues + logger.exception(f"Error serializing observation {obj.id}: {e}") + return [""] * len(CSV_HEADERS) + + def _csv_line(self, row: list[str]) -> bytes: + """ + Converts a list of strings into a CSV-encoded line. + + Ensures each row is CSV-compatible and byte-encoded for StreamingHttpResponse. + """ + buffer = io.StringIO() + writer = csv.writer(buffer) + writer.writerow(row) + return buffer.getvalue().encode("utf-8") + @require_GET def search_address(request: Request) -> JsonResponse: """