Skip to content

Commit

Permalink
Merge pull request #49 from UW-Macrostrat/ingestion-fixes
Browse files Browse the repository at this point in the history
Map ingestion fixes
  • Loading branch information
davenquinn authored Jan 10, 2025
2 parents 80e82ed + cfe2451 commit 4ba4df9
Show file tree
Hide file tree
Showing 6 changed files with 283 additions and 18 deletions.
1 change: 1 addition & 0 deletions .idea/tileserver.iml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ ENV PIP_DISABLE_PIP_VERSION_CHECK=1 POETRY_VIRTUALENVS_CREATE=false

RUN python3 -m venv /poetry-env
RUN /poetry-env/bin/pip install -U pip setuptools
RUN /poetry-env/bin/pip install poetry
RUN /poetry-env/bin/pip install poetry==1.8.4


WORKDIR /app/
Expand Down
13 changes: 8 additions & 5 deletions macrostrat_tileserver/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
connect_to_db,
register_table_catalog,
)
from timvt.settings import PostgresSettings
from timvt.layer import FunctionRegistry
from titiler.core.errors import DEFAULT_STATUS_CODES, add_exception_handlers
from titiler.core.factory import TilerFactory
Expand All @@ -30,6 +29,7 @@
from pathlib import Path
from time import time

from .map_ingestion import register_map_ingestion_routes

from typing import Any, Optional
from buildpg import asyncpg
Expand Down Expand Up @@ -100,9 +100,8 @@ async def startup_event():

# Apply fixtures
# apply_fixtures(db_settings.database_url)
await register_table_catalog(app, schemas=["sources"])
# await register_table_catalog(app, schemas=["sources"])
prepare_image_tile_subsystem()
print("Application started.")


@app.on_event("startup")
Expand Down Expand Up @@ -138,6 +137,10 @@ async def shutdown_event():

app.add_middleware(CompressionMiddleware, minimum_size=0)

# Map ingestion
register_map_ingestion_routes(app)


MapnikLayerFactory(app)

cog = TilerFactory()
Expand All @@ -148,8 +151,8 @@ async def shutdown_event():

# Register endpoints.
mvt_tiler = CachedVectorTilerFactory(
with_tables_metadata=True,
with_functions_metadata=True, # add Functions metadata endpoints (/functions.json, /{function_name}.json)
with_tables_metadata=False,
with_functions_metadata=False, # add Functions metadata endpoints (/functions.json, /{function_name}.json)
with_viewer=False,
)

Expand Down
33 changes: 22 additions & 11 deletions macrostrat_tileserver/map_bounds/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,33 @@ async def rgeom(
y: int,
):
"""Get a tile from the tileserver."""
pool = request.app.state.pool
return await get_rgeom(request.app.state.pool, z=z, x=x, y=y)


@router.get("/bounds/{slug}/{z}/{x}/{y}")
async def rgeom_slug(
    request: Request,
    slug: str,
    z: int,
    x: int,
    y: int,
):
    """Get a bounds tile restricted to the source whose slug is *slug*."""
    pool = request.app.state.pool
    # Keyword order (z, x, y, slug) is kept as-is; everything here is
    # forwarded to the layer query as bound parameters.
    tile_params = {"z": z, "x": x, "y": y, "slug": slug}
    return await get_rgeom(pool, where="slug = :slug", **tile_params)


async def get_rgeom(pool, *, where="is_finalized = true", **params):
async with pool.acquire() as con:
data = await run_layer_query(
con,
"bounds",
z=z,
x=x,
y=y,
)
data = await run_layer_query(con, "bounds", where=where, **params)
kwargs = {}
kwargs.setdefault("media_type", MimeTypes.pbf.value)
return Response(data, **kwargs)


async def run_layer_query(con, layer_name, **params):
query = get_layer_sql(layer_name)
async def run_layer_query(con, layer_name, *, where="true", **params):
query = get_layer_sql(layer_name, where=where)
q, p = render(query, layer_name=layer_name, **params)

# Overcomes a shortcoming in buildpg that deems casting to an array as unsafe
Expand All @@ -43,7 +53,7 @@ async def run_layer_query(con, layer_name, **params):
return await con.fetchval(q, *p)


def get_layer_sql(layer: str):
def get_layer_sql(layer: str, *, where="true"):
query = __here__ / "queries" / (layer + ".sql")
q = query.read_text()
q = q.strip()
Expand All @@ -52,6 +62,7 @@ def get_layer_sql(layer: str):

# Replace the envelope with the function call. Kind of awkward.
q = q.replace(":envelope", "tile_utils.envelope(:x, :y, :z)")
q = q.replace(":where", where)

# Wrap with MVT creation
return f"WITH feature_query AS ({q}) SELECT ST_AsMVT(feature_query, :layer_name) FROM feature_query"
3 changes: 2 additions & 1 deletion macrostrat_tileserver/map_bounds/queries/bounds.sql
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ WITH tile AS (
), sources AS (
SELECT
source_id,
is_finalized,
name,
slug,
scale,
Expand All @@ -15,7 +16,7 @@ WITH tile AS (
FROM maps.sources, tile
WHERE
rgeom is NOT NULL
AND status_code = 'active'
AND :where
AND ST_Intersects(rgeom, envelope_4326)
)
SELECT * FROM sources z
Expand Down
249 changes: 249 additions & 0 deletions macrostrat_tileserver/map_ingestion/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,249 @@
from pathlib import Path

from buildpg import render, Renderer
from fastapi import APIRouter, Request, Response
from timvt.resources.enums import MimeTypes
from titiler.core.models.mapbox import TileJSON
from asyncpg import UndefinedTableError
from enum import Enum
from macrostrat.utils import get_logger
from macrostrat.database.utils import format as format_sql

# When True, run_layer_query logs every generated SQL statement together with
# its parameters (at debug level) before executing it.
print_sql_statements = False

# Router that collects the map-ingestion endpoints defined in this module; it
# is mounted on the application by register_map_ingestion_routes().
router = APIRouter()
log = get_logger("uvicorn.error")

# Directory containing this module.
__here__ = Path(__file__).parent


class FeatureType(str, Enum):
    """Geometry classes of the per-map ingestion tables.

    Each member's value is the suffix of the matching table name
    (``sources.<slug>_<value>``) and is also used as the MVT layer name.
    """

    polygons = "polygons"
    lines = "lines"
    points = "points"

    def __str__(self):
        # Since Python 3.11, format() and f-strings on a (str, Enum) member
        # follow Enum.__str__ and render as "FeatureType.polygons". Table and
        # layer names in this module are built with f-strings, so pin the
        # textual form to the plain value on every Python version.
        return str(self.value)


@router.get(
    "/{slug}/tilejson.json",
    response_model=TileJSON,
    responses={200: {"description": "Return a tilejson"}},
    response_model_exclude_none=True,
)
async def tilejson(
    request: Request,
    slug: str,
):
    """Return a TileJSON document for the ingested map *slug*.

    The bounds are computed from the ``geom`` column of whichever of the
    ``sources.<slug>_polygons`` / ``_lines`` / ``_points`` tables exist.
    Responds with 404 when none of them exists.
    """
    url_path = request.url_for(
        "tile", **{"slug": slug, "z": "{z}", "x": "{x}", "y": "{y}"}
    )
    tile_endpoint = str(url_path)

    pool = request.app.state.pool
    async with pool.acquire() as con:
        # ``slug`` comes straight from the URL, so it must never reach the SQL
        # text unchecked. Each candidate table is first confirmed through the
        # parameterized catalog lookup and then emitted as a quoted
        # identifier. Skipping missing tables also keeps a map that lacks,
        # say, a points table from failing with a server error.
        selects = []
        for layer in FeatureType:
            table_name = f"{slug}_{layer.value}"
            try:
                await get_table_columns(con, table_name, schema="sources")
            except UndefinedTableError:
                continue
            selects.append(
                f"SELECT geom FROM sources.{_wrap_with_quotes(table_name)}"
            )

        if not selects:
            return Response(status_code=404, content=f"No tables found for {slug}")

        # UNION ALL: duplicate geometries cannot change the overall extent, so
        # there is no reason to pay for de-duplication.
        bounds_query = "\nUNION ALL\n".join(selects)
        sql = get_bounds(bounds_query, geometry_column="geom")
        bounds = await con.fetchval(sql)

    return {
        "minzoom": 0,
        "maxzoom": 18,
        "name": slug,
        "bounds": bounds,
        "tiles": [tile_endpoint],
    }


@router.get("/{slug}/{z}/{x}/{y}")
async def tile(
    request: Request,
    slug: str,
    z: int,
    x: int,
    y: int,
):
    """Get a tile from the tileserver."""
    pool = request.app.state.pool

    # Every feature type contributes one encoded layer; the encoded layers are
    # appended back to back. A feature type whose table is missing is skipped.
    payload = b""
    found_any = False
    for feature_type in FeatureType:
        try:
            layer_bytes = await get_layer(pool, slug, feature_type, z=z, x=x, y=y)
        except UndefinedTableError:
            continue
        payload += layer_bytes
        found_any = True

    if not found_any:
        return Response(status_code=404, content=f"No tables found for {slug}")

    return Response(payload, media_type=MimeTypes.pbf.value)


async def get_layer(pool, slug, layer: FeatureType, **params):
    """Render one feature-type layer of an ingested map as an MVT blob.

    Args:
        pool: Connection pool; one connection is acquired for the call.
        slug: Map slug; the data is read from ``sources.<slug>_<layer>``.
        layer: Which feature type (polygons, lines or points) to render.
        **params: Bound query parameters, forwarded to ``run_layer_query``
            (the tile endpoint passes ``z``, ``x`` and ``y``).

    Returns:
        Whatever ``run_layer_query`` returns for the layer query.

    Raises:
        UndefinedTableError: If the table has no columns in the catalog,
            i.e. it does not exist.
    """
    # Use the enum *value* explicitly: since Python 3.11 an f-string renders a
    # (str, Enum) member as "FeatureType.polygons", which would corrupt both
    # the table name and the layer name.
    layer_key = layer.value if isinstance(layer, Enum) else str(layer)
    table_name = f"{slug}_{layer_key}"
    alias = "s"

    async with pool.acquire() as con:
        # Parameterized catalog lookup; raises UndefinedTableError when the
        # table is absent, so nothing below runs for an unknown slug.
        column_dict = await get_table_columns(con, table_name, schema="sources")
        log.debug("Columns: %s", column_dict)
        columns = [
            format_column(k, v, cast_empty_strings=True, table_alias=alias)
            for k, v in column_dict.items()
            if k != "geom"
        ]
        columns.append(f"tile_layers.tile_geom({alias}.geom, :envelope) AS geometry")

        joins = None
        if layer_key == FeatureType.polygons.value:
            joins = [
                f"LEFT JOIN macrostrat.intervals i0 ON {alias}.b_interval = i0.id",
                f"LEFT JOIN macrostrat.intervals i1 ON {alias}.t_interval = i1.id",
            ]

            b_age = "i0.age_bottom"
            t_age = "i1.age_top"
            # Eventually we will allow b_age and t_age to be set directly
            # b_age = "coalesce(s.b_age, i0.age_bottom)"
            # t_age = "coalesce(s.t_age, i1.age_top)"
            columns += [
                b_age + "::float AS b_age",
                t_age + "::float AS t_age",
                _color_subquery(b_age, t_age, "color"),
            ]

        return await run_layer_query(
            con,
            # The name was matched exactly against the catalog above, so emit
            # it as a quoted identifier instead of raw interpolated text.
            f"sources.{_wrap_with_quotes(table_name)}",
            columns,
            joins=joins,
            table_alias=alias,
            layer_name=layer_key,
            **params,
        )


# Catalog ``data_type`` names that format_column treats as strings when asked
# to turn empty strings into NULL.
string_data_types = ["character varying", "text"]


def format_column(
    col, data_type, table_alias=None, cast_empty_strings=False, name=None
):
    """Build one ``<expression> AS <name>`` entry for a SELECT list.

    The column is double-quoted and, when ``table_alias`` is given, prefixed
    with it. With ``cast_empty_strings`` set, columns whose ``data_type`` is
    listed in ``string_data_types`` are wrapped so that empty strings become
    NULL. The output name defaults to the quoted column name.
    """
    quoted = _wrap_with_quotes(col)
    output_name = quoted if name is None else name
    expression = quoted if table_alias is None else f"{table_alias}.{quoted}"
    if cast_empty_strings and data_type in string_data_types:
        expression = f"NULLIF({expression}, '')::text"
    return f"{expression} AS {output_name}"


def _color_subquery(b_age, t_age, alias):
return f"""(
SELECT interval_color
FROM macrostrat.intervals
WHERE age_top <= {t_age} AND age_bottom >= {b_age}
ORDER BY age_bottom - age_top
LIMIT 1
) AS {alias}"""


async def run_layer_query(
    con,
    table_name,
    columns,
    *,
    joins=None,
    layer_name="default",
    table_alias=None,
    **params,
):
    """Select *columns* from *table_name* and return the result as one MVT value.

    The SELECT is assembled from the given column expressions, optional table
    alias and optional JOIN clauses, wrapped by ``extend_sql`` and executed
    with ``layer_name`` plus ``**params`` as bound parameters.
    """
    select_list = ", ".join(columns)
    sql = f"SELECT {select_list} FROM {table_name}"
    if table_alias:
        sql = f"{sql} AS {table_alias}"
    if joins:
        sql = "\n".join([sql, *joins])

    sql = extend_sql(sql)
    bound = {"layer_name": layer_name, **params}

    if print_sql_statements:
        log.debug(
            "Running query:\n%s\nParameters: %s",
            format_sql(sql, reindent=True),
            bound,
        )

    statement, args = render(sql, **bound)
    return await con.fetchval(statement, *args)


def _wrap_with_quotes(col):
if col[0] == '"' and col[-1] == '"':
col = col[1:-1]
if '"' in col:
col = col.replace('"', '""')
return '"' + col + '"'


def extend_sql(sql):
    """Wrap a feature query so that it yields a single ``ST_AsMVT`` value.

    Surrounding whitespace and one trailing semicolon are removed, the
    ``:envelope`` placeholder is expanded to the tile-envelope function call,
    and the query becomes the ``feature_query`` CTE of the MVT statement.
    """
    body = sql.strip()
    body = body[:-1] if body.endswith(";") else body

    # Replace the envelope with the function call. Kind of awkward.
    body = body.replace(":envelope", "tile_utils.envelope(:x, :y, :z)")

    # Wrap with MVT creation
    return (
        f"WITH feature_query AS ({body}) "
        "SELECT ST_AsMVT(feature_query, :layer_name, 4096, 'geometry') "
        "FROM feature_query"
    )


def get_bounds(base_query, geometry_column="geometry"):
    """Return SQL computing ``[xmin, ymin, xmax, ymax]`` over *base_query*.

    ``base_query`` is embedded as a subquery; ``geometry_column`` names the
    column of that subquery whose bounding boxes are combined.
    """
    extent_sql = f"""WITH b AS (
SELECT ST_Union(a.{geometry_column}::box2d)::box2d env
FROM ({base_query}) a
)
SELECT ARRAY[ST_XMin(env), ST_YMin(env), ST_XMax(env), ST_YMax(env)]
FROM b;
"""
    return extent_sql


async def get_table_columns(con, table, schema="sources"):
    """Return ``{column_name: data_type}`` for ``schema.table``.

    The names are passed as bound parameters to a lookup against
    ``information_schema.columns``.

    Raises:
        UndefinedTableError: If the lookup returns no columns.
    """
    lookup_sql = """
SELECT column_name, data_type
FROM information_schema.columns
WHERE table_name = :table
AND table_schema = :schema;
"""

    statement, args = render(lookup_sql, table=table, schema=schema)
    rows = await con.fetch(statement, *args)
    if not rows:
        raise UndefinedTableError(f"Table {schema}.{table} not found")

    return {row[0]: row[1] for row in rows}


def register_map_ingestion_routes(app):
    """Mount this module's router on *app* under the ``/ingestion`` prefix."""
    app.include_router(router, prefix="/ingestion", tags=["Map ingestion"])

0 comments on commit 4ba4df9

Please sign in to comment.