fix exp #241

Merged · 1 commit · Oct 28, 2024
18 changes: 17 additions & 1 deletion vespadb/observations/utils.py
@@ -4,7 +4,7 @@
 import time
 from functools import wraps
 from django.db import connection, OperationalError
-from typing import Callable, TypeVar, Any, cast
+from typing import Callable, TypeVar, Any, cast, Iterable, List

 F = TypeVar("F", bound=Callable[..., Any])
@@ -55,3 +55,19 @@ def wrapper(*args: Any, **kwargs: Any) -> Any:
                 raise
         return cast(F, wrapper)
     return decorator
+
+def retry_query(queryset: Iterable[Any], retries: int = 3, delay: int = 5) -> List[Any]:
+    """Execute a query with retries to handle intermittent database connection errors."""
+    for attempt in range(retries):
+        try:
+            # list() forces evaluation; a lazy QuerySet re-runs the query on each attempt.
+            return list(queryset)
+        except OperationalError:
+            if attempt < retries - 1:
+                time.sleep(delay)
+                connection.close()
+            else:
+                # Raise a more informative error once retries are exhausted.
+                raise OperationalError(f"Database connection failed after {retries} attempts")
+
+    # Unreachable in practice; present only to satisfy type checkers.
+    return []
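
A minimal usage sketch of retry_query (a hypothetical caller, not part of this diff). Because a Django QuerySet is lazy, passing it in unevaluated lets each retry re-run the query after the stale connection is closed:

    # Hypothetical example; the filter is illustrative only.
    from vespadb.observations.models import Observation
    from vespadb.observations.utils import retry_query

    observations = retry_query(
        Observation.objects.filter(public_domain=True),
        retries=3,
        delay=5,
    )
    print(f"fetched {len(observations)} observations")
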
127 changes: 81 additions & 46 deletions vespadb/observations/views.py
@@ -4,9 +4,10 @@
 import datetime
 import io
 import json
-import time
 import logging
-from typing import TYPE_CHECKING, Any
+import csv
+from typing import TYPE_CHECKING, Any, Generator, Union

 from django.contrib.gis.db.models.functions import Transform
 from django.contrib.gis.geos import GEOSGeometry
@@ -17,8 +18,8 @@
 from django.db import transaction
 from django.db.models import CharField, OuterRef, QuerySet, Subquery, Value
 from django.db.models.functions import Coalesce
-from django.db.utils import IntegrityError, OperationalError
-from django.http import HttpResponse, JsonResponse
+from django.db.utils import IntegrityError
+from django.http import HttpResponse, JsonResponse, StreamingHttpResponse, HttpRequest
 from django.db import connection
 from django.utils.decorators import method_decorator
 from django.utils.timezone import now
@@ -43,7 +44,7 @@
 from vespadb.observations.cache import invalidate_geojson_cache, invalidate_observation_cache
 from vespadb.observations.filters import ObservationFilter
 from vespadb.observations.helpers import parse_and_convert_to_utc
-from vespadb.observations.utils import db_retry
+from vespadb.observations.utils import retry_query
 from vespadb.observations.models import Municipality, Observation, Province
 from vespadb.observations.serializers import (
     MunicipalitySerializer,
@@ -60,8 +61,15 @@
 BBOX_LENGTH = 4
 GEOJSON_REDIS_CACHE_EXPIRATION = 900 # 15 minutes
 GET_REDIS_CACHE_EXPIRATION = 86400 # 1 day
-
-
+BATCH_SIZE = 150
+CSV_HEADERS = [
+    "id", "created_datetime", "modified_datetime", "location", "source",
+    "nest_height", "nest_size", "nest_location", "nest_type",
+    "observation_datetime", "modified_by", "created_by", "province",
+    "eradication_date", "municipality", "images", "public_domain",
+    "municipality_name", "modified_by_first_name", "created_by_first_name",
+    "wn_notes", "eradication_result", "wn_id", "wn_validation_status"
+]
 class ObservationsViewSet(ModelViewSet): # noqa: PLR0904
     """ViewSet for the Observation model."""

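A quick sanity check on the new constants (a hypothetical snippet, not part of this diff): CSV_HEADERS and the field list used by serialize_observation in the next hunk line up one-to-one, with the *_id suffixes on the serializer side reading FK columns for the plain header names.

    # Hypothetical check; FIELDS is copied from serialize_observation below.
    FIELDS = [
        "id", "created_datetime", "modified_datetime", "location", "source",
        "nest_height", "nest_size", "nest_location", "nest_type",
        "observation_datetime", "modified_by_id", "created_by_id", "province_id",
        "eradication_date", "municipality_id", "images", "public_domain",
        "municipality_name", "modified_by_first_name", "created_by_first_name",
        "wn_notes", "eradication_result", "wn_id", "wn_validation_status",
    ]
    assert len(FIELDS) == len(CSV_HEADERS) == 24
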
@@ -635,49 +643,76 @@ def save_observations(self, valid_data: list[dict[str, Any]]) -> Response:
     )
     @method_decorator(ratelimit(key="ip", rate="60/m", method="GET", block=True))
     @action(detail=False, methods=["get"], permission_classes=[AllowAny])
-    def export(self, request: Request) -> Response:
-        retries = 3
-        delay = 5
-
-        for attempt in range(retries):
-            try:
-                export_format = request.query_params.get("export_format", "csv").lower()
-                queryset = self.filter_queryset(self.get_queryset())
-
-                serialized_data = []
-                errors = []
-
-                for obj in queryset.iterator(chunk_size=100):
-                    try:
-                        serialized_obj = self.get_serializer(obj).data
-                        serialized_data.append(serialized_obj)
-                    except Exception as inner_error:
-                        errors.append({
-                            "id": obj.id,
-                            "error": str(inner_error)
-                        })
-
-                if export_format == "csv":
-                    return self.export_as_csv(serialized_data)
-                return JsonResponse({"data": serialized_data, "errors": errors}, safe=False)
-
-            except OperationalError:
-                if attempt < retries - 1:
-                    time.sleep(delay)
-                    connection.close()
-                else:
-                    return JsonResponse({"error": "Database connection failed after retries"}, safe=False)
-            except Exception as e:
-                return JsonResponse({"errors": str(e)}, safe=False)
-
-    def export_as_csv(self, data: list[dict[str, Any]]) -> HttpResponse:
-        """Export the data as a CSV file."""
-        response = HttpResponse(content_type="text/csv")
-        writer = csv.DictWriter(response, fieldnames=data[0].keys())
-        writer.writeheader()
-        writer.writerows(data)
-        return response
+    def export(self, request: HttpRequest) -> Union[StreamingHttpResponse, JsonResponse]:
+        """
+        Export observations data as CSV in a memory-efficient, streamable format.
+
+        Handles large datasets by streaming data in chunks to avoid memory overload.
+        Only CSV export is supported; the JSON format is no longer available.
+        """
+        if request.query_params.get("export_format", "csv").lower() != "csv":
+            return JsonResponse({"error": "Only CSV export is supported"}, status=400)
+
+        # Filter the queryset according to the request parameters.
+        queryset = self.filter_queryset(self.get_queryset())
+
+        # Stream the CSV rows instead of building the whole file in memory.
+        response = StreamingHttpResponse(
+            self.generate_csv_rows(queryset), content_type="text/csv"
+        )
+        response["Content-Disposition"] = 'attachment; filename="observations_export.csv"'
+        return response
+
+    def generate_csv_rows(self, queryset: QuerySet) -> Generator[bytes, None, None]:
+        """
+        Yield rows of CSV data one at a time, handling large datasets efficiently.
+
+        Converts each observation to a list of values, handling missing or
+        misconfigured data gracefully, and writes CSV on the fly to avoid
+        excessive memory use.
+        """
+        # Yield the CSV header row first.
+        yield self._csv_line(CSV_HEADERS)
+
+        # Iterate over the queryset in chunks to keep memory usage low.
+        for obj in queryset.iterator(chunk_size=500):
+            yield self._csv_line(self.serialize_observation(obj))
+
+    def serialize_observation(self, obj: Observation) -> list[str]:
+        """
+        Serialize an observation to a list of values in the same order as CSV_HEADERS.
+
+        Handles potential data misconfigurations, such as missing attributes or
+        inconsistent formats, to keep the export robust.
+        """
+        try:
+            return [
+                str(getattr(obj, field, "")) or "" for field in [
+                    "id", "created_datetime", "modified_datetime", "location", "source",
+                    "nest_height", "nest_size", "nest_location", "nest_type",
+                    "observation_datetime", "modified_by_id", "created_by_id", "province_id",
+                    "eradication_date", "municipality_id", "images", "public_domain",
+                    "municipality_name", "modified_by_first_name", "created_by_first_name",
+                    "wn_notes", "eradication_result", "wn_id", "wn_validation_status"
+                ]
+            ]
+        except Exception as e:
+            # Log the failure and emit an empty row of the correct width.
+            logger.exception(f"Error serializing observation {obj.id}: {e}")
+            return [""] * len(CSV_HEADERS)
+
+    def _csv_line(self, row: list[str]) -> bytes:
+        """
+        Convert a list of strings into a single CSV-encoded line.
+
+        Ensures each row is CSV-compatible and byte-encoded for StreamingHttpResponse.
+        """
+        buffer = io.StringIO()
+        writer = csv.writer(buffer)
+        writer.writerow(row)
+        return buffer.getvalue().encode("utf-8")

 @require_GET
 def search_address(request: Request) -> JsonResponse:
     """
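
A minimal sketch for verifying the streaming behaviour from the client side (the host and route are assumptions based on the DRF action name, not part of this diff):

    import requests

    # Stream the export instead of loading the whole body at once,
    # mirroring how StreamingHttpResponse emits one CSV line per observation.
    with requests.get(
        "https://example.org/observations/export/?export_format=csv",
        stream=True,
        timeout=60,
    ) as resp:
        resp.raise_for_status()
        for line in resp.iter_lines(decode_unicode=True):
            print(line)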