diff --git a/.env_template b/.env_template index 2fff70a0f..ff902a1dc 100644 --- a/.env_template +++ b/.env_template @@ -59,6 +59,17 @@ R5_HOST=172.17.0.1 R5_AUTHORIZATION=YWRtaW46YWRtaW4 # if you want to use R5 with authentication, you need to set this variable to the base64 encoded string (!remove equal sign (=) in the end) USE_PYGEOS=0 + +RABBITMQ_DEFAULT_USER=guest +RABBITMQ_DEFAULT_PASS=Q34TWVTWE5YBDB +RABBITMQ_DEFAULT_VHOST=goat +CELERY_BROKER_URL=amqp://${RABBITMQ_DEFAULT_USER}:${RABBITMQ_DEFAULT_PASS}@rabbit:5672/${RABBITMQ_DEFAULT_VHOST} +CELERY_RESULT_BACKEND=redis://redis/0 +RABBIT_OUTER_PORT=15674 +FLOWER_OUTER_PORT=55550 +CELERY_QUEUES="goat-read-heatmap-worker" + + # CELERY_BROKER_URL=redis://redis:6379/0 CELERY_BROKER_URL=amqp://guest:guest@rabbit:5672/goat # Names of nodes to start @@ -85,7 +96,7 @@ CELERYD_CHDIR="/app" # Extra command-line arguments to the worker # CELERYD_OPTS="--time-limit=1200 --concurrency=1" -CELERYD_OPTS="--concurrency=1" +CELERYD_OPTS="--concurrency=1 --queues=${CELERY_QUEUES}" # Configure node-specific settings by appending node name to arguments: #CELERYD_OPTS="--time-limit=300 -c 8 -c:worker2 4 -c:worker3 2 -Ofair:worker1" diff --git a/app/api/.dockerignore b/app/api/.dockerignore new file mode 100644 index 000000000..94f75cc4a --- /dev/null +++ b/app/api/.dockerignore @@ -0,0 +1 @@ +**/cache/ \ No newline at end of file diff --git a/app/api/src/crud/crud_read_heatmap.py b/app/api/src/crud/crud_read_heatmap.py index ab293ddd2..cbd7b5d60 100644 --- a/app/api/src/crud/crud_read_heatmap.py +++ b/app/api/src/crud/crud_read_heatmap.py @@ -15,15 +15,14 @@ from src.core import heatmap as heatmap_core from src.core import heatmap_cython from src.core.config import settings -from src.db.session import legacy_engine +from src.db.session import legacy_engine, async_session from src.schemas.heatmap import HeatmapMode, HeatmapSettings, HeatmapType from src.schemas.isochrone import IsochroneDTO, IsochroneMode from src.utils import create_h3_grid, print_warning, timing class CRUDBaseHeatmap: - def __init__(self, db=None, current_user=None): - self.db = db + def __init__(self, current_user=None): self.current_user = current_user self.travel_time_base_path = os.path.join(settings.CACHE_DIR, "traveltime_matrices") self.connectivity_base_path = os.path.join(settings.CACHE_DIR, "connectivity_matrices") diff --git a/app/api/src/endpoints/v1/indicators.py b/app/api/src/endpoints/v1/indicators.py index b52c02ff4..a4b6dae04 100644 --- a/app/api/src/endpoints/v1/indicators.py +++ b/app/api/src/endpoints/v1/indicators.py @@ -2,7 +2,7 @@ import time from typing import Any, List, Optional, Union -from fastapi import APIRouter, Body, Depends, HTTPException, Query +from fastapi import APIRouter, Body, Depends, HTTPException, Query, status from fastapi.encoders import jsonable_encoder from sqlalchemy import func, text from sqlalchemy.ext.asyncio.session import AsyncSession @@ -20,6 +20,7 @@ SQLReturnTypes, ) from src.schemas.heatmap import HeatmapSettings, ReturnTypeHeatmap +from src.schemas.workers import TaskResultRequest from src.schemas.heatmap import request_examples as heatmap_request_examples from src.schemas.heatmap import request_examples_ from src.schemas.indicators import ( @@ -29,6 +30,9 @@ oev_gueteklasse_config_example, ) from src.utils import return_geojson_or_geobuf +from src.workers.read_heatmap import read_heatmap_task +from src.workers.celery_app import celery_app +from celery.result import AsyncResult router = APIRouter() @@ -36,24 +40,54 @@ @router.post("/heatmap") async def calculate_heatmap( *, - db: AsyncSession = Depends(deps.get_db), + # db: AsyncSession = Depends(deps.get_db), current_user: models.User = Depends(deps.get_current_active_user), heatmap_settings: HeatmapSettings = Body(..., examples=heatmap_request_examples), - return_type: ReturnTypeHeatmap = Query(..., description="Return type of the response"), + # return_type: ReturnTypeHeatmap = Query(..., description="Return type of the response"), ): """ Calculate a heatmap. """ - start_time = time.time() - result = await crud.read_heatmap(db=db, current_user=current_user).read_heatmap2( - heatmap_settings=heatmap_settings + current_user = json.loads(current_user.json()) + heatmap_settings = json.loads(heatmap_settings.json()) + task = read_heatmap_task.delay( + current_user=current_user, + heatmap_settings=heatmap_settings, ) - end_time = time.time() - print(f"Time to calculate heatmap: {round(end_time - start_time,2)}") - if return_type.value == "geobuf": - result = return_geojson_or_geobuf(result, "geobuf") - return result - + return {"task_id": task.id} + # start_time = time.time() + # result = await crud.read_heatmap(db=db, current_user=current_user).read_heatmap2( + # heatmap_settings=heatmap_settings + # ) + # end_time = time.time() + # print(f"Time to calculate heatmap: {round(end_time - start_time,2)}") + # if return_type.value == "geobuf": + # result = return_geojson_or_geobuf(result, "geobuf") + # return result + +@router.post("/heatmap/result") +async def get_heatmap_result( + current_user: models.User = Depends(deps.get_current_active_user), + body: TaskResultRequest = Body(..., example={"task_id": "f7f0f0f0-0f0f-0f0f-0f0f-0f0f0f0f0f0f"}), + return_type: ReturnTypeHeatmap = Query(..., description="Return type of the response"), +): + result = AsyncResult(body.task_id, app=celery_app) + if result.ready(): + if return_type.value == "geobuf": + result = return_geojson_or_geobuf(result.get(), "geobuf") + return result.get() + + elif result.failed(): + raise HTTPException(status_code=500, detail="Task failed") + else: + content = { + "task-status": result.status, + "details": "Task is still running, please try again later", + } + return JSONResponse(status_code=status.HTTP_102_PROCESSING, content=content) + + + @router.get("/connectivity", response_class=JSONResponse) async def read_connectivity_heatmap( diff --git a/app/api/src/schemas/__init__.py b/app/api/src/schemas/__init__.py index 1b26d6699..e936e9a9b 100644 --- a/app/api/src/schemas/__init__.py +++ b/app/api/src/schemas/__init__.py @@ -29,3 +29,4 @@ UserStudyAreaList, UserUpdate, ) +from .workers import TaskResultRequest \ No newline at end of file diff --git a/app/api/src/schemas/workers.py b/app/api/src/schemas/workers.py new file mode 100644 index 000000000..95d853310 --- /dev/null +++ b/app/api/src/schemas/workers.py @@ -0,0 +1,4 @@ +from pydantic import BaseModel + +class TaskResultRequest(BaseModel): + task_id: str \ No newline at end of file diff --git a/app/api/src/workers/celery_app.py b/app/api/src/workers/celery_app.py index 1f4bbb596..d8f01b52b 100644 --- a/app/api/src/workers/celery_app.py +++ b/app/api/src/workers/celery_app.py @@ -4,7 +4,7 @@ celery_app = Celery( "worker", - include=["src.workers.heatmap_active_mobility", "src.workers.heatmap_motorized_transport"], + include=["src.workers.heatmap_active_mobility", "src.workers.heatmap_motorized_transport", "src.workers.read_heatmap"], task_create_missing_queues=True, ) celery_app.conf.update(settings.CELERY_CONFIG) @@ -15,6 +15,7 @@ "src.workers.heatmap_motorized_transport.*": { "queue": "goat-motorized-transport-heatmap-worker" }, + "src.workers.read_heatmap.*": {"queue": "goat-read-heatmap-worker"}, } ) diff --git a/app/api/src/workers/method_connector.py b/app/api/src/workers/method_connector.py index b3002554e..4cd122fb9 100644 --- a/app/api/src/workers/method_connector.py +++ b/app/api/src/workers/method_connector.py @@ -7,12 +7,14 @@ from src.core.config import settings from src.core.opportunity import Opportunity from src.crud.crud_compute_heatmap import CRUDComputeHeatmap +from src.crud.crud_read_heatmap import CRUDReadHeatmap from src.db import models from src.db.session import legacy_engine from src.schemas.data_preparation import ( OpportunityMatrixParametersSingleBulk, TravelTimeMatrixParametersSingleBulk, ) +from src.schemas.heatmap import HeatmapSettings from src.schemas.isochrone import IsochroneMode @@ -124,3 +126,12 @@ async def create_opportunity_matrices_async(user, parameters): async def create_connectivity_matrices_async(current_super_user, parameters): crud_compute_heatmap = CRUDComputeHeatmap(current_user=current_super_user) await crud_compute_heatmap.compute_connectivity_matrix(**parameters) + + +async def crud_read_heatmap_async(current_user, heatmap_settings): + current_user = models.User(**current_user) + heatmap_settings = HeatmapSettings(**heatmap_settings) + crud_read_heatmap = CRUDReadHeatmap(current_user=current_user) + heatmap = await crud_read_heatmap.read_heatmap2(heatmap_settings=heatmap_settings) + return heatmap + \ No newline at end of file diff --git a/app/api/src/workers/read_heatmap.py b/app/api/src/workers/read_heatmap.py new file mode 100644 index 000000000..9deea1375 --- /dev/null +++ b/app/api/src/workers/read_heatmap.py @@ -0,0 +1,11 @@ +import asyncio + +from src.workers.celery_app import celery_app +from src.workers.method_connector import crud_read_heatmap_async + +@celery_app.task +def read_heatmap_task(current_user, heatmap_settings): + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + heatmap = loop.run_until_complete(crud_read_heatmap_async(current_user, heatmap_settings)) + return heatmap diff --git a/docker-compose.yaml b/docker-compose.yaml index ba617083c..0423a6cc9 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -156,7 +156,6 @@ services: - SERVER_HOST=http://${DOMAIN_NAME} volumes: - ./app/api:/app - # - ${PWD}/app/api/celeryd:/etc/default/celeryd command: bash -c "/etc/init.d/celeryd start; tail -f /dev/null" networks: proxy: @@ -165,12 +164,12 @@ services: image: mher/flower environment: - CELERY_BROKER_URL=${CELERY_BROKER_URL} + - FLOWER_OUTER_PORT=${FLOWER_OUTER_PORT} ports: - - 55550:5555 + - "${FLOWER_OUTER_PORT}:5555" depends_on: - rabbit - # env_file: - # - .env + networks: proxy: @@ -178,7 +177,13 @@ services: rabbit: image: rabbitmq:3-management-alpine ports: - - 15672:15672 + - "${RABBIT_OUTER_PORT}:15672" + environment: + - RABBITMQ_DEFAULT_USER=${RABBITMQ_DEFAULT_USER} + - RABBITMQ_DEFAULT_PASS=${RABBITMQ_DEFAULT_PASS} + - RABBITMQ_DEFAULT_VHOST=${RABBITMQ_DEFAULT_VHOST} + - RABBIT_OUTER_PORT=${RABBIT_OUTER_PORT} + networks: proxy: