PDOK-16629 use pydantic #133

Merged · 9 commits · Nov 18, 2024
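This PR replaces the dict/OrderedDict-based table definitions in generate.py with typed pydantic models imported from geopackage_validator.models. That module is not shown in this view, so the following is only a sketch of what it might contain; every field name and type is inferred from how generate.py constructs the objects in the diff below, and should be read as an assumption rather than the actual implementation.

# Hypothetical sketch of geopackage_validator/models.py (not part of the diff
# shown here). Field names and types are inferred from call sites in generate.py.
from typing import Optional, Tuple

from pydantic import BaseModel


class ColumnDefinition(BaseModel):
    name: str
    type: str


class IndexDefinition(BaseModel):
    columns: Tuple[str, ...]  # get_index_definitions sorts on this
    unique: bool = False


class ColumnMapping(BaseModel):
    src: str  # referencing column ("from")
    dst: str  # referenced column ("to")


class ForeignKeyDefinition(BaseModel):
    table: str
    columns: Tuple[ColumnMapping, ...]


class TableDefinition(BaseModel):
    name: str
    geometry_column: str
    columns: Tuple[ColumnDefinition, ...]
    indexes: Optional[Tuple[IndexDefinition, ...]] = None
    foreign_keys: Optional[Tuple[ForeignKeyDefinition, ...]] = None


class TablesDefinition(BaseModel):
    geopackage_validator_version: str
    projection: int
    tables: Tuple[TableDefinition, ...]

Tuples rather than lists would keep the generated definitions immutable and give them a stable order, which matches the explicit sorting generate.py applies to indexes, foreign keys, and tables.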
.github/workflows/pytest.yml: 6 changes (1 addition, 5 deletions)
@@ -10,15 +10,11 @@ jobs:
     strategy:
       matrix:
         os: [ubuntu-24.04, ubuntu-22.04, ubuntu-20.04] # no ubuntugis @ ubuntu-24.04
-        python-version: ['3.11', '3.10', '3.9', '3.8', '3.7'] # , '3.6'] <- 3.6 needs setup.cfg
+        python-version: ['3.11', '3.10', '3.9', '3.8']
         gdal-version: ['3.8', '3.6', '3.4']
         exclude:
           - os: ubuntu-24.04
             python-version: '3.9'
-          - os: ubuntu-24.04
-            python-version: '3.7'
-          - os: ubuntu-24.04
-            python-version: '3.7'
           - os: ubuntu-24.04
             gdal-version: '3.6'
           - os: ubuntu-24.04
Dockerfile: 1 change (1 addition, 0 deletions)
@@ -1,6 +1,7 @@
 ARG GDAL_VERSION=3.9.1

 FROM ghcr.io/osgeo/gdal:alpine-normal-${GDAL_VERSION} AS base
+# docker run ghcr.io/osgeo/gdal:alpine-normal-3.9.1 python3 --version > Python 3.11.9

 LABEL maintainer="Roel van den Berg <[email protected]>"
geopackage_validator/cli.py: 42 changes (27 additions, 15 deletions)
@@ -2,10 +2,10 @@
 """Main CLI entry for the Geopackage validator tool."""
 # Setup logging before package imports.
 import logging
-from datetime import datetime
-from pathlib import Path
 import sys
 import time
+from datetime import datetime
+from pathlib import Path

 import click
 import click_log
@@ -302,6 +302,13 @@ def geopackage_validator_command(
     is_flag=True,
     help="Output yaml",
 )
+@click.option(
+    "--with-indexes-and-fks",
+    default=False,
+    required=False,
+    is_flag=True,
+    help="Include indexes (and unique constraints) and foreign keys in the definitions",
+)
 @click.option(
     "--s3-endpoint-no-protocol",
     envvar="S3_ENDPOINT_NO_PROTOCOL",
Expand Down Expand Up @@ -367,17 +374,18 @@ def geopackage_validator_command(
)
@click_log.simple_verbosity_option(logger)
def geopackage_validator_command_generate_table_definitions(
gpkg_path,
yaml,
s3_endpoint_no_protocol,
s3_access_key,
s3_secret_key,
s3_bucket,
s3_key,
s3_secure,
s3_virtual_hosting,
s3_signing_region,
s3_no_sign_request,
gpkg_path: Path,
yaml: bool,
with_indexes_and_fks: bool,
s3_endpoint_no_protocol: str,
s3_access_key: str,
s3_secret_key: str,
s3_bucket: str,
s3_key: str,
s3_secure: bool,
s3_virtual_hosting: bool,
s3_signing_region: str,
s3_no_sign_request: bool,
):
gpkg_path_not_exists = s3_endpoint_no_protocol is None and (
gpkg_path is None
@@ -399,7 +407,9 @@ def geopackage_validator_command_generate_table_definitions(
                 s3_signing_region=s3_signing_region,
                 s3_no_sign_request=s3_no_sign_request,
             )
-            definitionlist = generate.generate_definitions_for_path(gpkg_path)
+            definitionlist = generate.generate_definitions_for_path(
+                gpkg_path, with_indexes_and_fks
+            )
         else:
             with s3.minio_resource(
                 s3_endpoint_no_protocol,
@@ -409,7 +419,9 @@ def geopackage_validator_command_generate_table_definitions(
                 s3_key,
                 s3_secure,
             ) as localfilename:
-                definitionlist = generate.generate_definitions_for_path(localfilename)
+                definitionlist = generate.generate_definitions_for_path(
+                    localfilename, with_indexes_and_fks
+                )
         output.print_output(definitionlist, yaml)
     except Exception:
         logger.exception("Error while generating table definitions")
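For reference, a minimal sketch of what the new flag does end to end, calling the same functions the command wires together; "example.gpkg" is a placeholder path, not a file from this PR.

# Hedged example, not part of the PR: the programmatic equivalent of running the
# table-definitions command above with --yaml and --with-indexes-and-fks.
from geopackage_validator import generate, output

definitions = generate.generate_definitions_for_path(
    "example.gpkg", with_indexes_and_fks=True  # placeholder path
)
output.print_output(definitions, True)  # second argument toggles YAML output, as --yaml does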
geopackage_validator/generate.py: 154 changes (124 additions, 30 deletions)
@@ -1,20 +1,24 @@
 import logging
-from typing import Dict, List, Union
-from collections import OrderedDict
+from typing import List, Optional, Dict

 from osgeo import ogr
-from osgeo.ogr import DataSource
+from osgeo.ogr import DataSource, Layer

-from geopackage_validator import utils
 from geopackage_validator import __version__
+from geopackage_validator import utils
+from geopackage_validator.models import (
+    ColumnDefinition,
+    ColumnMapping,
+    ForeignKeyDefinition,
+    IndexDefinition,
+    TableDefinition,
+    TablesDefinition,
+)
+from geopackage_validator.utils import group_by

 logger = logging.getLogger(__name__)

-ColumnDefinition = List[Dict[str, str]]
-TableDefinition = Dict[str, Union[int, Dict[str, ColumnDefinition]]]


-def columns_definition(table, geometry_column) -> ColumnDefinition:
+def column_definitions(table, geometry_column) -> List[ColumnDefinition]:
     layer_definition = table.GetLayerDefn()

     assert layer_definition, f'Invalid Layer {"" if not table else table.GetName()}'
@@ -28,27 +32,106 @@ def columns_definition(table, geometry_column) -> ColumnDefinition:
         for column_id in range(field_count)
     ]

-    fid_column = fid_column_definition(table)
+    fid_columns = fid_column_definition(table)

-    return fid_column + [geometry_column] + columns
+    return fid_columns + [geometry_column] + columns


-def fid_column_definition(table) -> ColumnDefinition:
+def fid_column_definition(table) -> List[ColumnDefinition]:
     name = table.GetFIDColumn()
     if not name:
         return []
-    return [{"name": name, "type": "INTEGER"}]
+    return [ColumnDefinition(name=name, type="INTEGER")]


+def get_index_definitions(
+    dataset: DataSource, table_name: str
+) -> List[IndexDefinition]:
+    index_definitions: List[IndexDefinition] = []
+    index_list = dataset.ExecuteSQL(
+        f"select name, \"unique\", origin from pragma_index_list('{table_name}');"
+    )
+    pk_in_index_list = False
+    for index_listing in index_list:
+        pk_in_index_list = pk_in_index_list or index_listing["origin"] == "pk"
+        index_definitions.append(
+            IndexDefinition(
+                columns=tuple(get_index_column_names(dataset, index_listing["name"])),
+                unique=bool(int(index_listing["unique"])),
+            )
+        )
+    dataset.ReleaseResultSet(index_list)
+    index_definitions = sorted(index_definitions, key=lambda d: d.columns)

+    if not pk_in_index_list:
+        pk_index = get_pk_index(dataset, table_name)
+        if pk_index is not None:
+            index_definitions.insert(0, pk_index)

+    return index_definitions


+def get_pk_index(dataset: DataSource, table_name: str) -> Optional[IndexDefinition]:
+    pk_columns = dataset.ExecuteSQL(
+        f"select name from pragma_table_info('{table_name}') where pk;"
+    )
+    column_names = tuple(r["name"] for r in pk_columns)
+    if len(column_names) == 0:
+        return None
+    return IndexDefinition(columns=column_names, unique=True)


-def generate_table_definitions(dataset: DataSource) -> TableDefinition:
+def get_index_column_names(dataset: DataSource, index_name: str) -> List[str]:
+    index_info = dataset.ExecuteSQL(
+        f"select name from pragma_index_info('{index_name}');"
+    )
+    column_names: List[str] = [r["name"] for r in index_info]
+    dataset.ReleaseResultSet(index_info)
+    return column_names


+def get_foreign_key_definitions(dataset, table_name) -> List[ForeignKeyDefinition]:
+    foreign_key_list = dataset.ExecuteSQL(
+        f'select id, seq, "table", "from", "to" from pragma_foreign_key_list(\'{table_name}\');'
+    )
+    foreign_key_definitions: List[ForeignKeyDefinition] = []
+    for foreign_key_listing in group_by(foreign_key_list, lambda r: r["id"]):
+        table: str = ""
+        columns: Dict[str, str] = {}
+        for column_reference in foreign_key_listing:
+            table = column_reference["table"]
+            to = column_reference["to"]
+            if to is None:
+                pk_index = get_pk_index(dataset, column_reference["table"])
+                to = pk_index.columns[int(column_reference["seq"])]
+            columns[column_reference["from"]] = to
+        foreign_key_definitions.append(
+            ForeignKeyDefinition(
+                table=table,
+                columns=tuple(
+                    ColumnMapping(src=c[0], dst=c[1]) for c in columns.items()
+                ),
+            )
+        )
+    foreign_key_definitions = sorted(
+        foreign_key_definitions, key=lambda fk: (fk.table, (c.src for c in fk.columns))
+    )
+    dataset.ReleaseResultSet(foreign_key_list)
+    return foreign_key_definitions


+def generate_table_definitions(
+    dataset: DataSource, with_indexes_and_fks: bool = False
+) -> TablesDefinition:
     projections = set()
     table_geometry_types = {
         table_name: geometry_type_name
         for table_name, _, geometry_type_name in utils.dataset_geometry_tables(dataset)
     }

-    table_list = []
+    table_list: List[TableDefinition] = []
     for table in dataset:
+        table: Layer
         geo_column_name = table.GetGeometryColumn()
         if geo_column_name == "":
             continue
@@ -58,35 +141,46 @@ def generate_table_definitions(dataset: DataSource) -> TableDefinition:
             "name": geo_column_name,
             "type": table_geometry_types[table_name],
         }
+        columns = tuple(column_definitions(table, geometry_column))

+        indexes = None
+        foreign_keys = None
+        if with_indexes_and_fks:
+            indexes = tuple(get_index_definitions(dataset, table_name))
+            foreign_keys = tuple(get_foreign_key_definitions(dataset, table_name))

         table_list.append(
-            OrderedDict(
-                [
-                    ("name", table_name),
-                    ("geometry_column", geo_column_name),
-                    ("columns", columns_definition(table, geometry_column)),
-                ]
+            TableDefinition(
+                name=table_name,
+                geometry_column=geo_column_name,
+                columns=columns,
+                indexes=indexes,
+                foreign_keys=foreign_keys,
             )
         )

         projections.add(table.GetSpatialRef().GetAuthorityCode(None))

     assert len(projections) == 1, "Expected one projection per geopackage."

-    result = OrderedDict(
-        [
-            ("geopackage_validator_version", __version__),
-            ("projection", int(projections.pop())),
-            ("tables", table_list),
-        ]
+    result = TablesDefinition(
+        geopackage_validator_version=__version__,
+        projection=int(projections.pop()),
+        tables=tuple(sorted(table_list, key=lambda t: t.name)),
     )

     return result


-def generate_definitions_for_path(gpkg_path: str) -> TableDefinition:
+def get_datasource_for_path(gpkg_path: str, error_handler=None) -> DataSource:
     """Starts the geopackage validation."""
     utils.check_gdal_version()
+    return utils.open_dataset(gpkg_path, error_handler)

-    dataset = utils.open_dataset(gpkg_path)

-    return generate_table_definitions(dataset)
+def generate_definitions_for_path(
+    gpkg_path: str, with_indexes_and_fks: bool = False
+) -> TablesDefinition:
+    return generate_table_definitions(
+        get_datasource_for_path(gpkg_path), with_indexes_and_fks
+    )
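Note that get_foreign_key_definitions groups the rows of pragma_foreign_key_list by foreign key id using a group_by helper imported from geopackage_validator.utils, whose implementation is outside this diff. Since the PRAGMA returns one row per referencing column, ordered by id and seq, a plausible minimal sketch (an assumption, not the repository's code) is consecutive grouping:

# Hypothetical sketch of utils.group_by; the real helper lives in
# geopackage_validator/utils.py and is not shown in this diff.
from itertools import groupby
from typing import Callable, Iterable, Iterator, List, TypeVar

T = TypeVar("T")
K = TypeVar("K")


def group_by(rows: Iterable[T], key: Callable[[T], K]) -> Iterator[List[T]]:
    """Yield lists of consecutive rows that share the same key value."""
    for _, group in groupby(rows, key=key):
        yield list(group)

With multi-column foreign keys this yields one list per constraint, so each ForeignKeyDefinition collects all of its column pairs before being appended.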