From 07ae962817c2a96f1a5e36dd40dc39522411502f Mon Sep 17 00:00:00 2001
From: Steven Gerrits
Date: Wed, 18 Dec 2024 10:42:03 +0000
Subject: [PATCH] dec r4-export

---
 nginx.conf                    |  28 +++-
 vespadb/observations/views.py | 260 ++++++++++++++++++++++------------
 2 files changed, 198 insertions(+), 90 deletions(-)

diff --git a/nginx.conf b/nginx.conf
index 48ac6e7..641d390 100644
--- a/nginx.conf
+++ b/nginx.conf
@@ -8,13 +8,28 @@ http {
     include mime.types;
     default_type application/octet-stream;
 
+    # Global timeout settings
+    proxy_connect_timeout 300;
+    proxy_send_timeout 300;
+    proxy_read_timeout 300;
+    send_timeout 300;
+
     sendfile on;
     keepalive_timeout 65;
 
+    # Buffering settings for large responses
+    proxy_buffering on;
+    proxy_buffer_size 16k;
+    proxy_buffers 8 16k;
+    proxy_busy_buffers_size 32k;
+
     server {
         listen 80;
         server_name uat.vespadb.be;
 
+        # Increase client body size limit if needed
+        client_max_body_size 20M;
+
         location /static/ {
             alias /workspaces/vespadb/collected_static/;
         }
@@ -29,6 +44,10 @@ http {
             proxy_set_header X-Real-IP $remote_addr;
             proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
             proxy_set_header X-Forwarded-Proto $scheme;
+
+            proxy_connect_timeout 300s;
+            proxy_send_timeout 300s;
+            proxy_read_timeout 300s;
         }
     }
 
@@ -36,6 +55,9 @@ http {
         listen 80;
         server_name data.vespawatch.be;
 
+        # Increase client body size limit if needed
+        client_max_body_size 20M;
+
         location /static/ {
             alias /workspaces/vespadb/collected_static/;
         }
@@ -50,6 +72,10 @@ http {
             proxy_set_header X-Real-IP $remote_addr;
             proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
             proxy_set_header X-Forwarded-Proto $scheme;
+
+            proxy_connect_timeout 300s;
+            proxy_send_timeout 300s;
+            proxy_read_timeout 300s;
         }
     }
-}
+}
\ No newline at end of file
diff --git a/vespadb/observations/views.py b/vespadb/observations/views.py
index c1e587c..8dcd2db 100644
--- a/vespadb/observations/views.py
+++ b/vespadb/observations/views.py
@@ -11,7 +11,18 @@
 import os
 import logging
 from tenacity import retry, stop_after_attempt, wait_exponential
-
+from tenacity import (
+    retry,
+    stop_after_attempt,
+    wait_exponential,
+    retry_if_exception_type,
+    before_log,
+    after_log,
+)
+from typing import Generator, Optional
+from django.db import OperationalError, connection, transaction
+from django.core.exceptions import ValidationError
+import psycopg2
 from django.http import FileResponse
 import os
 import tempfile
@@ -75,6 +86,13 @@
     "notes", "eradication_result", "wn_id", "wn_validation_status", "nest_status"
 ]
 BATCH_SIZE = 1000
+class ExportError(Exception):
+    """Custom exception for export-related errors."""
+    pass
+
+class QueryTimeoutError(Exception):
+    """Custom exception for query timeout errors."""
+    pass
 
 class ObservationsViewSet(ModelViewSet):  # noqa: PLR0904
     """ViewSet for the Observation model."""
@@ -719,17 +737,131 @@ def _prepare_row_data(
             logger.error(f"Error preparing row data for observation {observation.id}: {str(e)}")
             return [""] * len(CSV_HEADERS)  # Return empty row in case of error
 
+    @retry(
+        stop=stop_after_attempt(3),
+        wait=wait_exponential(multiplier=1, min=4, max=10),
+        retry=retry_if_exception_type((OperationalError, psycopg2.OperationalError)),
+        before=before_log(logger, logging.INFO),
+        after=after_log(logger, logging.INFO)
+    )
+    def get_queryset_count(self, queryset: QuerySet) -> int:
+        """Get queryset count with retry logic."""
+        try:
+            with transaction.atomic(), connection.cursor() as cursor:
+                cursor.execute('SET statement_timeout TO 30000')  # 30 seconds timeout
+                return int(queryset.count())
+        except (OperationalError, psycopg2.OperationalError) as e:
+            logger.error(f"Error getting queryset count: {str(e)}")
+            raise QueryTimeoutError("Query timed out while getting count") from e
+
+    def get_chunk_with_retries(
+        self,
+        queryset: QuerySet,
+        start: int,
+        batch_size: int,
+        max_retries: int = 3
+    ) -> Optional[List[Observation]]:
+        """Get a chunk of data with retries and error handling."""
+        for attempt in range(max_retries):
+            try:
+                with transaction.atomic(), connection.cursor() as cursor:
+                    cursor.execute('SET statement_timeout TO 30000')
+                    chunk = list(
+                        queryset.select_related(
+                            'province',
+                            'municipality',
+                            'reserved_by'
+                        )[start:start + batch_size]
+                    )
+                    return chunk
+            except (OperationalError, psycopg2.OperationalError) as e:
+                if attempt == max_retries - 1:
+                    logger.error(f"Failed to get chunk after {max_retries} attempts: {str(e)}")
+                    return None
+                wait_time = (2 ** attempt) * 1  # Exponential backoff
+                logger.warning(f"Retry {attempt + 1}/{max_retries} after {wait_time}s")
+                time.sleep(wait_time)
+        return None
+
+    def create_csv_generator(
+        self,
+        queryset: QuerySet,
+        is_admin: bool,
+        user_municipality_ids: Set[str],
+        batch_size: int = BATCH_SIZE
+    ) -> Generator[str, None, None]:
+        """Create a generator for CSV streaming with improved error handling."""
+        buffer = io.StringIO()
+        writer = csv.writer(buffer)
+
+        # Write headers
+        writer.writerow(CSV_HEADERS)
+        yield buffer.getvalue()
+        buffer.seek(0)
+        buffer.truncate(0)
+
+        total_processed = 0
+        successful_writes = 0
+        error_count = 0
+
+        try:
+            total_count = self.get_queryset_count(queryset)
+
+            # Process in chunks
+            start = 0
+            while True:
+                chunk = self.get_chunk_with_retries(queryset, start, batch_size)
+                if not chunk:
+                    break
+
+                for observation in chunk:
+                    try:
+                        row_data = self._prepare_row_data(
+                            observation,
+                            is_admin,
+                            user_municipality_ids
+                        )
+                        writer.writerow(row_data)
+                        successful_writes += 1
+                    except Exception as e:
+                        error_count += 1
+                        logger.error(f"Error processing observation {observation.id}: {str(e)}")
+                        if error_count > total_count * 0.1:  # If more than 10% errors
+                            raise ExportError("Too many errors during export")
+                        continue
+
+                data = buffer.getvalue()
+                yield data
+                buffer.seek(0)
+                buffer.truncate(0)
+
+                total_processed += len(chunk)
+                progress = (total_processed / total_count) * 100 if total_count else 0
+                logger.info(
+                    f"Export progress: {progress:.1f}% ({total_processed}/{total_count}). "
+                    f"Successful: {successful_writes}, Errors: {error_count}"
+                )
+
+                start += batch_size
+
+        except Exception as e:
+            logger.exception("Error in CSV generator")
+            raise ExportError(f"Export failed: {str(e)}") from e
+        finally:
+            buffer.close()
 
     @method_decorator(ratelimit(key="ip", rate="60/m", method="GET", block=True))
     @action(detail=False, methods=["get"], permission_classes=[AllowAny])
-    def export(self, request: HttpRequest) -> Union[FileResponse, JsonResponse]:
+    def export(self, request: HttpRequest) -> Union[StreamingHttpResponse, JsonResponse]:
         """
-        Export observations as CSV using batch processing with improved error handling.
+        Export observations as CSV using streaming response with improved error handling
+        and performance optimizations.
         """
-        temp_file = None
+        export_format = request.query_params.get("export_format", "csv").lower()
+
         try:
-            # Validate export format
-            if request.query_params.get("export_format", "csv").lower() != "csv":
+            # Input validation
+            if export_format != "csv":
                 return JsonResponse({"error": "Only CSV export is supported"}, status=400)
 
             # Get user permissions
@@ -740,95 +872,45 @@ def export(self, request: HttpRequest) -> Union[FileResponse, JsonResponse]:
             user_municipality_ids = set()
             is_admin = False
 
-            # Create temporary file
-            temp_file = tempfile.NamedTemporaryFile(mode='w+', newline='', delete=False, suffix='.csv')
-            writer = csv.writer(temp_file)
-            writer.writerow(CSV_HEADERS)
-
-            # Get filtered queryset with timeout protection
-            total_count = None
-            try:
-                with transaction.atomic(), connection.cursor() as cursor:
-                    cursor.execute('SET statement_timeout TO 30000')  # 30 seconds timeout
-                    queryset = self.filter_queryset(self.get_queryset())
-                    total_count = queryset.count()
-            except Exception as e:
-                logger.error(f"Error getting total count: {str(e)}")
-                # Continue with None total_count
-
-            # Process in batches with progress tracking
-            total_processed = 0
-            successful_records = 0
-            offset = 0
-            batch_size = 1000
-
-            while True:
-                try:
-                    # Get batch with timeout protection
-                    with transaction.atomic(), connection.cursor() as cursor:
-                        cursor.execute('SET statement_timeout TO 30000')
-                        batch = list(queryset[offset:offset + batch_size])
-                    if not batch:  # No more records
-                        break
-
-                    # Process batch with retry logic
-                    successful_writes = self.write_batch_to_file(writer, batch, is_admin, user_municipality_ids)
-                    successful_records += successful_writes
-
-                    batch_count = len(batch)
-                    total_processed += batch_count
-                    offset += batch_size
-
-                    # Log progress if we know the total
-                    if total_count:
-                        progress = (total_processed / total_count) * 100
-                        logger.info(f"Export progress: {progress:.1f}% ({total_processed}/{total_count})")
-                    else:
-                        logger.info(f"Processed {total_processed} records")
-
-                except Exception as e:
-                    logger.error(f"Error processing batch at offset {offset}: {str(e)}")
-                    offset += batch_size  # Skip problematic batch
-                    continue
-
-            # Ensure all data is written
-            temp_file.flush()
+            # Get filtered queryset
+            queryset = self.filter_queryset(self.get_queryset())
 
-            # Create response
-            try:
-                response = FileResponse(
-                    open(temp_file.name, 'rb'),
-                    content_type='text/csv',
-                    as_attachment=True,
-                    filename=f"observations_export_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
-                )
-
-                # Log export statistics
-                if total_count:
-                    logger.info(f"Export completed: {successful_records} successful records out of {total_count} total")
-                else:
-                    logger.info(f"Export completed: {successful_records} successful records")
-
-                # Clean up temp file after sending
-                response.close = lambda: os.unlink(temp_file.name)
-                return response
-
-            except Exception as e:
-                logger.exception("Error creating response")
-                if temp_file and os.path.exists(temp_file.name):
-                    os.unlink(temp_file.name)
-                return JsonResponse({"error": "Error creating export file"}, status=500)
+            # Create streaming response
+            response = StreamingHttpResponse(
+                streaming_content=self.create_csv_generator(
+                    queryset=queryset,
+                    is_admin=is_admin,
+                    user_municipality_ids=user_municipality_ids
+                ),
+                content_type='text/csv'
+            )
+
+            # Set headers
+            filename = f"observations_export_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
+            response['Content-Disposition'] = f'attachment; filename="{filename}"'
+            response['X-Accel-Buffering'] = 'no'  # Disable nginx buffering
+
+            return response
+        except QueryTimeoutError:
+            logger.exception("Query timeout during export")
+            return JsonResponse(
+                {"error": "Export timed out. Please try with a smaller date range or fewer filters."},
+                status=503
+            )
+        except ExportError as e:
+            logger.exception("Export error")
+            return JsonResponse(
+                {"error": f"Export failed: {str(e)}. Please try again or contact support."},
+                status=500
+            )
         except Exception as e:
-            logger.exception("Export failed")
-            # Clean up temp file in case of error
-            if temp_file and os.path.exists(temp_file.name):
-                os.unlink(temp_file.name)
+            logger.exception("Unexpected error during export")
             return JsonResponse(
-                {"error": "Export failed. Please try again or contact support."},
+                {"error": "An unexpected error occurred. Please try again or contact support."},
                 status=500
             )
-    
+
     def get_status(self, observation: Observation) -> str:
         """Determine observation status based on eradication data."""
         logger.debug("Getting status for observation %s", observation.eradication_result)
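
As a quick way to exercise the new streaming endpoint, a client sketch along these lines could be used; it is not part of the patch. It assumes the viewset is routed at /observations/export/ on a local development server, so the base URL, port, and route prefix are assumptions to be adjusted to the project's actual router. It streams the response with requests so the full CSV never has to fit in memory, mirroring what StreamingHttpResponse does on the server side, and counts lines as a rough proxy for rows.

# Hypothetical smoke test for the streaming CSV export (not part of this patch).
# Assumes the endpoint is served at http://localhost:8000/observations/export/.
import requests

EXPORT_URL = "http://localhost:8000/observations/export/"  # assumed route

def count_exported_rows(url: str = EXPORT_URL) -> int:
    """Stream the CSV export and count data rows without buffering the whole body."""
    with requests.get(url, params={"export_format": "csv"}, stream=True, timeout=300) as response:
        response.raise_for_status()
        rows = 0
        header_skipped = False
        for line in response.iter_lines(decode_unicode=True):
            if not line:
                continue  # skip empty chunks
            if not header_skipped:
                header_skipped = True  # first non-empty line is the CSV header
                continue
            # Note: line count only approximates row count if quoted fields
            # (e.g. "notes") contain embedded newlines.
            rows += 1
        return rows

if __name__ == "__main__":
    print(f"Exported rows: {count_exported_rows()}")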