Skip to content

Commit

Permalink
Merge pull request #2029 from metemaddar/send-connectivity-to-workers
Browse files Browse the repository at this point in the history
Give Heatmap Tasks to Workers 💪
  • Loading branch information
majkshkurti authored Mar 17, 2023
2 parents 7a45192 + 3d89e63 commit 8eb0d17
Show file tree
Hide file tree
Showing 10 changed files with 100 additions and 22 deletions.
13 changes: 12 additions & 1 deletion .env_template
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,17 @@ R5_HOST=172.17.0.1
R5_AUTHORIZATION=YWRtaW46YWRtaW4 # if you want to use R5 with authentication, you need to set this variable to the base64 encoded string (!remove equal sign (=) in the end)
USE_PYGEOS=0


RABBITMQ_DEFAULT_USER=guest
RABBITMQ_DEFAULT_PASS=Q34TWVTWE5YBDB
RABBITMQ_DEFAULT_VHOST=goat
CELERY_BROKER_URL=amqp://${RABBITMQ_DEFAULT_USER}:${RABBITMQ_DEFAULT_PASS}@rabbit:5672/${RABBITMQ_DEFAULT_VHOST}
CELERY_RESULT_BACKEND=redis://redis/0
RABBIT_OUTER_PORT=15674
FLOWER_OUTER_PORT=55550
CELERY_QUEUES="goat-read-heatmap-worker"


# CELERY_BROKER_URL=redis://redis:6379/0
CELERY_BROKER_URL=amqp://guest:guest@rabbit:5672/goat
# Names of nodes to start
Expand All @@ -85,7 +96,7 @@ CELERYD_CHDIR="/app"

# Extra command-line arguments to the worker
# CELERYD_OPTS="--time-limit=1200 --concurrency=1"
CELERYD_OPTS="--concurrency=1"
CELERYD_OPTS="--concurrency=1 --queues=${CELERY_QUEUES}"

# Configure node-specific settings by appending node name to arguments:
#CELERYD_OPTS="--time-limit=300 -c 8 -c:worker2 4 -c:worker3 2 -Ofair:worker1"
Expand Down
1 change: 1 addition & 0 deletions app/api/.dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
**/cache/
5 changes: 2 additions & 3 deletions app/api/src/crud/crud_read_heatmap.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,14 @@
from src.core import heatmap as heatmap_core
from src.core import heatmap_cython
from src.core.config import settings
from src.db.session import legacy_engine
from src.db.session import legacy_engine, async_session
from src.schemas.heatmap import HeatmapMode, HeatmapSettings, HeatmapType
from src.schemas.isochrone import IsochroneDTO, IsochroneMode
from src.utils import create_h3_grid, print_warning, timing


class CRUDBaseHeatmap:
def __init__(self, db=None, current_user=None):
self.db = db
def __init__(self, current_user=None):
self.current_user = current_user
self.travel_time_base_path = os.path.join(settings.CACHE_DIR, "traveltime_matrices")
self.connectivity_base_path = os.path.join(settings.CACHE_DIR, "connectivity_matrices")
Expand Down
58 changes: 46 additions & 12 deletions app/api/src/endpoints/v1/indicators.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import time
from typing import Any, List, Optional, Union

from fastapi import APIRouter, Body, Depends, HTTPException, Query
from fastapi import APIRouter, Body, Depends, HTTPException, Query, status
from fastapi.encoders import jsonable_encoder
from sqlalchemy import func, text
from sqlalchemy.ext.asyncio.session import AsyncSession
Expand All @@ -20,6 +20,7 @@
SQLReturnTypes,
)
from src.schemas.heatmap import HeatmapSettings, ReturnTypeHeatmap
from src.schemas.workers import TaskResultRequest
from src.schemas.heatmap import request_examples as heatmap_request_examples
from src.schemas.heatmap import request_examples_
from src.schemas.indicators import (
Expand All @@ -29,31 +30,64 @@
oev_gueteklasse_config_example,
)
from src.utils import return_geojson_or_geobuf
from src.workers.read_heatmap import read_heatmap_task
from src.workers.celery_app import celery_app
from celery.result import AsyncResult

router = APIRouter()


@router.post("/heatmap")
async def calculate_heatmap(
*,
db: AsyncSession = Depends(deps.get_db),
# db: AsyncSession = Depends(deps.get_db),
current_user: models.User = Depends(deps.get_current_active_user),
heatmap_settings: HeatmapSettings = Body(..., examples=heatmap_request_examples),
return_type: ReturnTypeHeatmap = Query(..., description="Return type of the response"),
# return_type: ReturnTypeHeatmap = Query(..., description="Return type of the response"),
):
"""
Calculate a heatmap.
"""
start_time = time.time()
result = await crud.read_heatmap(db=db, current_user=current_user).read_heatmap2(
heatmap_settings=heatmap_settings
current_user = json.loads(current_user.json())
heatmap_settings = json.loads(heatmap_settings.json())
task = read_heatmap_task.delay(
current_user=current_user,
heatmap_settings=heatmap_settings,
)
end_time = time.time()
print(f"Time to calculate heatmap: {round(end_time - start_time,2)}")
if return_type.value == "geobuf":
result = return_geojson_or_geobuf(result, "geobuf")
return result

return {"task_id": task.id}
# start_time = time.time()
# result = await crud.read_heatmap(db=db, current_user=current_user).read_heatmap2(
# heatmap_settings=heatmap_settings
# )
# end_time = time.time()
# print(f"Time to calculate heatmap: {round(end_time - start_time,2)}")
# if return_type.value == "geobuf":
# result = return_geojson_or_geobuf(result, "geobuf")
# return result

@router.post("/heatmap/result")
async def get_heatmap_result(
current_user: models.User = Depends(deps.get_current_active_user),
body: TaskResultRequest = Body(..., example={"task_id": "f7f0f0f0-0f0f-0f0f-0f0f-0f0f0f0f0f0f"}),
return_type: ReturnTypeHeatmap = Query(..., description="Return type of the response"),
):
result = AsyncResult(body.task_id, app=celery_app)
if result.ready():
if return_type.value == "geobuf":
result = return_geojson_or_geobuf(result.get(), "geobuf")
return result.get()

elif result.failed():
raise HTTPException(status_code=500, detail="Task failed")
else:
content = {
"task-status": result.status,
"details": "Task is still running, please try again later",
}
return JSONResponse(status_code=status.HTTP_102_PROCESSING, content=content)




@router.get("/connectivity", response_class=JSONResponse)
async def read_connectivity_heatmap(
Expand Down
1 change: 1 addition & 0 deletions app/api/src/schemas/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,4 @@
UserStudyAreaList,
UserUpdate,
)
from .workers import TaskResultRequest
4 changes: 4 additions & 0 deletions app/api/src/schemas/workers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from pydantic import BaseModel

class TaskResultRequest(BaseModel):
task_id: str
3 changes: 2 additions & 1 deletion app/api/src/workers/celery_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

celery_app = Celery(
"worker",
include=["src.workers.heatmap_active_mobility", "src.workers.heatmap_motorized_transport"],
include=["src.workers.heatmap_active_mobility", "src.workers.heatmap_motorized_transport", "src.workers.read_heatmap"],
task_create_missing_queues=True,
)
celery_app.conf.update(settings.CELERY_CONFIG)
Expand All @@ -15,6 +15,7 @@
"src.workers.heatmap_motorized_transport.*": {
"queue": "goat-motorized-transport-heatmap-worker"
},
"src.workers.read_heatmap.*": {"queue": "goat-read-heatmap-worker"},
}
)

Expand Down
11 changes: 11 additions & 0 deletions app/api/src/workers/method_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@
from src.core.config import settings
from src.core.opportunity import Opportunity
from src.crud.crud_compute_heatmap import CRUDComputeHeatmap
from src.crud.crud_read_heatmap import CRUDReadHeatmap
from src.db import models
from src.db.session import legacy_engine
from src.schemas.data_preparation import (
OpportunityMatrixParametersSingleBulk,
TravelTimeMatrixParametersSingleBulk,
)
from src.schemas.heatmap import HeatmapSettings
from src.schemas.isochrone import IsochroneMode


Expand Down Expand Up @@ -124,3 +126,12 @@ async def create_opportunity_matrices_async(user, parameters):
async def create_connectivity_matrices_async(current_super_user, parameters):
crud_compute_heatmap = CRUDComputeHeatmap(current_user=current_super_user)
await crud_compute_heatmap.compute_connectivity_matrix(**parameters)


async def crud_read_heatmap_async(current_user, heatmap_settings):
current_user = models.User(**current_user)
heatmap_settings = HeatmapSettings(**heatmap_settings)
crud_read_heatmap = CRUDReadHeatmap(current_user=current_user)
heatmap = await crud_read_heatmap.read_heatmap2(heatmap_settings=heatmap_settings)
return heatmap

11 changes: 11 additions & 0 deletions app/api/src/workers/read_heatmap.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import asyncio

from src.workers.celery_app import celery_app
from src.workers.method_connector import crud_read_heatmap_async

@celery_app.task
def read_heatmap_task(current_user, heatmap_settings):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
heatmap = loop.run_until_complete(crud_read_heatmap_async(current_user, heatmap_settings))
return heatmap
15 changes: 10 additions & 5 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,6 @@ services:
- SERVER_HOST=http://${DOMAIN_NAME}
volumes:
- ./app/api:/app
# - ${PWD}/app/api/celeryd:/etc/default/celeryd
command: bash -c "/etc/init.d/celeryd start; tail -f /dev/null"
networks:
proxy:
Expand All @@ -165,20 +164,26 @@ services:
image: mher/flower
environment:
- CELERY_BROKER_URL=${CELERY_BROKER_URL}
- FLOWER_OUTER_PORT=${FLOWER_OUTER_PORT}
ports:
- 55550:5555
- "${FLOWER_OUTER_PORT}:5555"
depends_on:
- rabbit
# env_file:
# - .env


networks:
proxy:

rabbit:
image: rabbitmq:3-management-alpine
ports:
- 15672:15672
- "${RABBIT_OUTER_PORT}:15672"
environment:
- RABBITMQ_DEFAULT_USER=${RABBITMQ_DEFAULT_USER}
- RABBITMQ_DEFAULT_PASS=${RABBITMQ_DEFAULT_PASS}
- RABBITMQ_DEFAULT_VHOST=${RABBITMQ_DEFAULT_VHOST}
- RABBIT_OUTER_PORT=${RABBIT_OUTER_PORT}


networks:
proxy:
Expand Down

0 comments on commit 8eb0d17

Please sign in to comment.