Make flowmachine table cache owner #5189

Merged: 11 commits, Jun 20, 2022
1 change: 0 additions & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
@@ -32,7 +32,6 @@ defaults:
FLOWDB_INGESTION_DIR: /home/circleci/project/flowdb/tests/data
POSTGRES_PASSWORD: flowflow
POSTGRES_USER: flowdb
FLOWMACHINE_FLOWDB_USER: flowmachine
FLOWAPI_FLOWDB_USER: flowapi
FLOWMACHINE_FLOWDB_PASSWORD: foo
FLOWAPI_FLOWDB_PASSWORD: foo
4 changes: 3 additions & 1 deletion CHANGELOG.md
@@ -15,9 +15,11 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
- `Query.explain` will now explain the query even where it is already stored. [#1285](https://github.com/Flowminder/FlowKit/issues/1285)
- `unstored_dependencies_graph` no longer blocks until dependencies are in a determinate state. [#4949](https://github.com/Flowminder/FlowKit/issues/4949)
- In and out flows no longer return location columns with to/from suffix.
- FlowDB now always creates a role named `flowmachine`.
- Flowmachine will set the state of a query being stored to cancelled if interrupted while the store is running.

### Fixed
- FlowDB trigger to alter ownership of cache tables is now triggered when a flowmachine query is `store`d. [#4714](https://github.com/Flowminder/FlowKit/issues/4714)
- Flowmachine now makes the built-in `flowmachine` role the owner of cache tables as a post-action when a query is `store`d. [#4714](https://github.com/Flowminder/FlowKit/issues/4714)
- TopupBalance now returns the weighted mode when requested instead of weighted median [#1412](https://github.com/Flowminder/FlowKit/issues/1412)
- Fixed in and out flow geojson for multicolumn location types [#5132](https://github.com/Flowminder/FlowKit/issues/5132)

1 change: 0 additions & 1 deletion docker-compose.yml
@@ -57,7 +57,6 @@ services:
environment:
POSTGRES_USER: ${POSTGRES_USER:?Must set POSTGRES_USER env var}
POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:?Must set POSTGRES_PASSWORD env var}
FLOWMACHINE_FLOWDB_USER: ${FLOWMACHINE_FLOWDB_USER:?Must set FLOWMACHINE_FLOWDB_USER env var}
FLOWMACHINE_FLOWDB_PASSWORD: ${FLOWMACHINE_FLOWDB_PASSWORD:?Must set FLOWMACHINE_FLOWDB_PASSWORD env var}
FLOWAPI_FLOWDB_USER: ${FLOWAPI_FLOWDB_USER:?Must set FLOWAPI_FLOWDB_USER env var}
FLOWAPI_FLOWDB_PASSWORD: ${FLOWAPI_FLOWDB_PASSWORD:?Must set FLOWAPI_FLOWDB_PASSWORD env var}
3 changes: 1 addition & 2 deletions docs/source/administrator/deployment.md
@@ -21,7 +21,6 @@ FlowDB is distributed as a docker container. To run it, you will need to provide
| ----------- | -------------- | ----- |
| FLOWAPI_FLOWDB_USER | Database user used by FlowAPI | Role with _read_ access to tables under the cache and geography schemas |
| FLOWAPI_FLOWDB_PASSWORD | Password for the FlowAPI database user | |
| FLOWMACHINE_FLOWDB_USER | Database user for FlowMachine | Role with _write_ access to tables under the cache schema, and _read_ access to events, infrastructure, cache and geography schemas |
| FLOWMACHINE_FLOWDB_PASSWORD | Password for flowmachine user | |
| FLOWDB_POSTGRES_PASSWORD | Postgres superuser password for flowdb | Username `flowdb`, user with super user access to flowdb database |

@@ -232,7 +231,7 @@ Once you have FlowAuth, FlowDB, and FlowETL running, you are ready to add FlowMa

##### FlowMachine

The FlowMachine server requires one additional secret: `REDIS_PASSWORD`, the password for an accompanying redis database. This secret should also be provided to redis. FlowMachine also uses the `FLOWMACHINE_FLOWDB_USER` and `FLOWMACHINE_FLOWDB_PASSWORD` secrets defined for FlowDB.
The FlowMachine server requires one additional secret: `REDIS_PASSWORD`, the password for an accompanying redis database. This secret should also be provided to redis. FlowMachine also uses the `FLOWMACHINE_FLOWDB_PASSWORD` secret defined for FlowDB.

You may also set the following environment variables:

2 changes: 0 additions & 2 deletions flowdb.Dockerfile
@@ -134,8 +134,6 @@ ARG FLOWDB_RELEASE_DATE='3000-12-12'
ENV FLOWDB_RELEASE_DATE=$FLOWDB_RELEASE_DATE

# Default users

ENV FLOWMACHINE_FLOWDB_USER=flowmachine
ENV FLOWAPI_FLOWDB_USER=flowapi

# Default location table
71 changes: 21 additions & 50 deletions flowdb/bin/build/9000_last_create_roles.sh
@@ -18,7 +18,7 @@ set -e
# This user is mainly designed for ingesting
# data into the events table.
#
# * $FLOWMACHINE_FLOWDB_USER: for users that need access to raw data.
# * flowmachine: for users that need access to raw data.
# They can read all data available in the database and
# can write to most tables. Can't write to the tables under
# the events schema.
@@ -38,12 +38,6 @@ then
exit 1
fi

if [ -e /run/secrets/FLOWMACHINE_FLOWDB_USER ];
then
echo "Using secrets for flowmachine user."
FLOWMACHINE_FLOWDB_USER=$(< /run/secrets/FLOWMACHINE_FLOWDB_USER)
fi

if [ -e /run/secrets/FLOWMACHINE_FLOWDB_PASSWORD ];
then
echo "Using secrets for flowmachine user password."
@@ -66,9 +60,9 @@ psql --dbname="$POSTGRES_DB" -c "REVOKE CONNECT ON DATABASE $POSTGRES_DB FROM PU

if [[ $FLOWMACHINE_FLOWDB_PASSWORD ]]
then
psql --dbname="$POSTGRES_DB" -c "CREATE ROLE $FLOWMACHINE_FLOWDB_USER WITH LOGIN PASSWORD '$FLOWMACHINE_FLOWDB_PASSWORD';"
psql --dbname="$POSTGRES_DB" -c "CREATE ROLE flowmachine WITH LOGIN PASSWORD '$FLOWMACHINE_FLOWDB_PASSWORD';"
else
echo "No password supplied for '$FLOWMACHINE_FLOWDB_USER' user: $FLOWMACHINE_FLOWDB_PASSWORD"
echo "No password supplied for 'flowmachine' user: $FLOWMACHINE_FLOWDB_PASSWORD"
exit 1
fi

@@ -84,9 +78,9 @@ fi
# Roles can connect and create tables and
# schemas.
#
psql --dbname="$POSTGRES_DB" -c "GRANT CONNECT ON DATABASE $POSTGRES_DB TO $FLOWMACHINE_FLOWDB_USER;"
psql --dbname="$POSTGRES_DB" -c "GRANT CONNECT ON DATABASE $POSTGRES_DB TO flowmachine;"
psql --dbname="$POSTGRES_DB" -c "GRANT CONNECT ON DATABASE $POSTGRES_DB TO $FLOWAPI_FLOWDB_USER;"
psql --dbname="$POSTGRES_DB" -c "GRANT CREATE ON DATABASE $POSTGRES_DB TO $FLOWMACHINE_FLOWDB_USER;"
psql --dbname="$POSTGRES_DB" -c "GRANT CREATE ON DATABASE $POSTGRES_DB TO flowmachine;"
psql --dbname="$POSTGRES_DB" -c "GRANT CREATE ON DATABASE $POSTGRES_DB TO $FLOWAPI_FLOWDB_USER;"

#
@@ -95,65 +89,65 @@ psql --dbname="$POSTGRES_DB" -c "GRANT CREATE ON DATABASE $POSTGRES_DB TO $FLOWA
declare -a schema_list_permissive=("cache" "geography" "population" "elevation")
for schema in "${schema_list_permissive[@]}"
do
echo "Granting permissions to $FLOWMACHINE_FLOWDB_USER on $schema."
echo "Granting permissions to flowmachine on $schema."
psql --dbname="$POSTGRES_DB" -c "
BEGIN;
ALTER DEFAULT PRIVILEGES
IN SCHEMA $schema
GRANT ALL ON TABLES TO $FLOWMACHINE_FLOWDB_USER;
GRANT ALL ON TABLES TO flowmachine;

ALTER DEFAULT PRIVILEGES FOR ROLE $FLOWMACHINE_FLOWDB_USER
ALTER DEFAULT PRIVILEGES FOR ROLE flowmachine
IN SCHEMA $schema
GRANT ALL ON TABLES TO $FLOWMACHINE_FLOWDB_USER;
GRANT ALL ON TABLES TO flowmachine;

GRANT ALL PRIVILEGES
ON ALL TABLES
IN SCHEMA $schema TO $FLOWMACHINE_FLOWDB_USER;
IN SCHEMA $schema TO flowmachine;

GRANT USAGE
ON SCHEMA $schema
TO $FLOWMACHINE_FLOWDB_USER;
TO flowmachine;

GRANT ALL
ON ALL TABLES
IN SCHEMA $schema TO $FLOWMACHINE_FLOWDB_USER;
IN SCHEMA $schema TO flowmachine;

GRANT CREATE
ON SCHEMA $schema
TO $FLOWMACHINE_FLOWDB_USER;
TO flowmachine;
COMMIT;
"
done

declare -a schema_list_restricted=("events" "dfs" "infrastructure" "routing" "interactions" "etl")
for schema in "${schema_list_restricted[@]}"
do
echo "Restricting permissions to $FLOWMACHINE_FLOWDB_USER on $schema."
echo "Restricting permissions to flowmachine on $schema."
psql --dbname="$POSTGRES_DB" -c "
BEGIN;
GRANT USAGE
ON SCHEMA $schema
TO $FLOWMACHINE_FLOWDB_USER;
TO flowmachine;

GRANT SELECT
ON ALL TABLES
IN SCHEMA $schema TO $FLOWMACHINE_FLOWDB_USER;
IN SCHEMA $schema TO flowmachine;

ALTER DEFAULT PRIVILEGES
IN SCHEMA $schema
GRANT SELECT ON SEQUENCES TO $FLOWMACHINE_FLOWDB_USER;
GRANT SELECT ON SEQUENCES TO flowmachine;

ALTER DEFAULT PRIVILEGES
IN SCHEMA $schema
GRANT SELECT ON TABLES TO $FLOWMACHINE_FLOWDB_USER;
GRANT SELECT ON TABLES TO flowmachine;
END;
"
done

echo "Give $FLOWMACHINE_FLOWDB_USER role read and update access to cache_touches sequence"
echo "Give flowmachine role read and update access to cache_touches sequence"
psql --dbname="$POSTGRES_DB" -c "
BEGIN;
GRANT USAGE, SELECT, UPDATE ON SEQUENCE cache.cache_touches TO $FLOWMACHINE_FLOWDB_USER;
GRANT USAGE, SELECT, UPDATE ON SEQUENCE cache.cache_touches TO flowmachine;
COMMIT;
"

@@ -166,7 +160,7 @@ psql --dbname="$POSTGRES_DB" -c "
ALTER DEFAULT PRIVILEGES
IN SCHEMA cache
GRANT SELECT ON TABLES TO $FLOWAPI_FLOWDB_USER;
ALTER DEFAULT PRIVILEGES FOR ROLE $FLOWMACHINE_FLOWDB_USER
ALTER DEFAULT PRIVILEGES FOR ROLE flowmachine
IN SCHEMA cache
GRANT SELECT ON TABLES TO $FLOWAPI_FLOWDB_USER;
END;
@@ -185,27 +179,4 @@ psql --dbname="$POSTGRES_DB" -c "
IN SCHEMA geography
GRANT SELECT ON TABLES TO $FLOWAPI_FLOWDB_USER;
GRANT USAGE ON SCHEMA geography TO $FLOWAPI_FLOWDB_USER;
"

# Create event trigger to change owner of tables under the cache schema
# Note that we hardcode the schema and username because event trigger functions cannot take params

psql --dbname="$POSTGRES_DB" -c "
CREATE OR REPLACE FUNCTION trg_create_in_cache_set_owner_to_flowmachine()
RETURNS event_trigger
LANGUAGE plpgsql
AS \$\$
DECLARE
obj record;
BEGIN
FOR obj IN SELECT * FROM pg_event_trigger_ddl_commands() WHERE command_tag IN ('CREATE TABLE', 'CREATE TABLE AS') AND schema_name='cache' LOOP
EXECUTE format('ALTER TABLE %s OWNER TO $FLOWMACHINE_FLOWDB_USER', obj.object_identity);
END LOOP;
END;
\$\$;

CREATE EVENT TRIGGER trg_create_in_cache_set_owner_to_flowmachine
ON ddl_command_end
WHEN tag IN ('CREATE TABLE', 'CREATE TABLE AS')
EXECUTE PROCEDURE trg_create_in_cache_set_owner_to_flowmachine();
"
4 changes: 2 additions & 2 deletions flowdb/tests/test_roles.py
@@ -9,7 +9,7 @@
The database has three different roles:

* `flowdb`: database super user.
* `$FLOWMACHINE_FLOWDB_USER`: FlowMachine user with read access to raw tables.
* `flowmachine`: FlowMachine user with read access to raw tables.
* `$FLOWAPI_FLOWDB_USER`: has read access to public tables only and
reference tables.
"""
@@ -40,7 +40,7 @@ def test_tables(env):
"cache.blah": f"""
CREATE TABLE IF NOT EXISTS
cache.blah();
ALTER TABLE cache.blah OWNER TO {env["FLOWMACHINE_FLOWDB_USER"]};
ALTER TABLE cache.blah OWNER TO flowmachine;
""",
"geography.admin0": """
CREATE TABLE IF NOT EXISTS
1 change: 0 additions & 1 deletion flowetl/deployment_example/docker-stack-flowkit.yml
@@ -23,7 +23,6 @@ services:
environment: &base-flowdb-environment
POSTGRES_USER: ${FLOWDB_ADMIN_USER:?Must set FLOWDB_ADMIN_USER env var}
POSTGRES_PASSWORD: ${FLOWDB_ADMIN_PASSWORD:?Must set FLOWDB_ADMIN_PASSWORD env var}
FLOWMACHINE_FLOWDB_USER: ${FLOWMACHINE_FLOWDB_USER:?Must set FLOWMACHINE_FLOWDB_USER env var}
FLOWMACHINE_FLOWDB_PASSWORD: ${FLOWMACHINE_FLOWDB_PASSWORD:?Must set FLOWMACHINE_FLOWDB_PASSWORD env var}
FLOWAPI_FLOWDB_USER: ${FLOWAPI_FLOWDB_USER:?Must set FLOWAPI_FLOWDB_USER env var}
FLOWAPI_FLOWDB_PASSWORD: ${FLOWAPI_FLOWDB_PASSWORD:?Must set FLOWAPI_FLOWDB_PASSWORD env var}
1 change: 0 additions & 1 deletion flowetl/tests/integration/conftest.py
@@ -106,7 +106,6 @@ def container_env(ensure_required_env_vars_are_set):
"POSTGRES_USER": "flowdb",
"POSTGRES_PASSWORD": "flowflow",
"POSTGRES_DB": "flowdb",
"FLOWMACHINE_FLOWDB_USER": "flowmachine",
"FLOWAPI_FLOWDB_USER": "flowapi",
"FLOWMACHINE_FLOWDB_PASSWORD": "flowmachine",
"FLOWAPI_FLOWDB_PASSWORD": "flowapi",
4 changes: 4 additions & 0 deletions flowmachine.Dockerfile
@@ -23,3 +23,7 @@ RUN apt-get update && \
apt purge -y --auto-remove && \
rm -rf /var/lib/apt/lists/*
CMD ["pipenv", "run", "flowmachine"]
# FlowDB has a default role named flowmachine for use with the flowmachine server.
# When starting the container with a different user, that user must be a member
# of the flowmachine role.
ENV FLOWMACHINE_FLOWDB_USER=flowmachine
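
The Dockerfile comment above says that a non-default database user must be in the `flowmachine` role. As a minimal sketch of what granting that membership could look like, assuming an administrator issues a plain PostgreSQL `GRANT role TO user` statement (the PR itself does not show this step), the helper below builds the statement. `quote_ident`, `membership_statement`, and the `analyst` user name are hypothetical and used only for illustration.

```python
def quote_ident(name: str) -> str:
    """Quote a PostgreSQL identifier, doubling any embedded double quotes."""
    return '"' + name.replace('"', '""') + '"'


def membership_statement(user: str, role: str = "flowmachine") -> str:
    """Build a GRANT statement that makes `user` a member of `role`.

    Hypothetical helper: FlowKit does not ship this function.
    """
    return f"GRANT {quote_ident(role)} TO {quote_ident(user)};"


if __name__ == "__main__":
    # "analyst" is an assumed user name, not one defined by FlowKit.
    print(membership_statement("analyst"))
```

PostgreSQL only lets a non-superuser run `ALTER TABLE ... OWNER TO flowmachine` when that user is a member of the target role, which is presumably why the membership matters here.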
50 changes: 29 additions & 21 deletions flowmachine/flowmachine/core/cache.py
@@ -181,33 +181,41 @@ def write_query_to_cache(
     """
     logger.debug(f"Trying to switch '{query.query_id}' to executing state.")
     q_state_machine = QueryStateMachine(redis, query.query_id, connection.conn_id)
-    current_state, this_thread_is_owner = q_state_machine.execute()
-    if this_thread_is_owner:
-        logger.debug(f"In charge of executing '{query.query_id}'.")
-        try:
-            query_ddl_ops = ddl_ops_func(name, schema)
-        except Exception as exc:
-            q_state_machine.raise_error()
-            logger.error(f"Error generating SQL. Error was {exc}")
-            raise exc
-        logger.debug("Made SQL.")
-        con = connection.engine
-        with con.begin():
+    this_thread_is_owner = False
+    try:
+        current_state, this_thread_is_owner = q_state_machine.execute()
+        if this_thread_is_owner:
+            logger.debug(f"In charge of executing '{query.query_id}'.")
             try:
-                plan_time = run_ops_list_and_return_execution_time(query_ddl_ops, con)
-                logger.debug("Executed queries.")
+                query_ddl_ops = ddl_ops_func(name, schema)
             except Exception as exc:
-                q_state_machine.raise_error()
-                logger.error(f"Error executing SQL. Error was {exc}")
+                logger.error(f"Error generating SQL. Error was {exc}")
                 raise exc
-            if schema == "cache":
+            logger.debug("Made SQL.")
+            con = connection.engine
+            with con.begin():
                 try:
-                    write_cache_metadata(connection, query, compute_time=plan_time)
+                    plan_time = run_ops_list_and_return_execution_time(
+                        query_ddl_ops, con
+                    )
+                    logger.debug("Executed queries.")
                 except Exception as exc:
-                    q_state_machine.raise_error()
-                    logger.error(f"Error writing cache metadata. Error was {exc}")
+                    logger.error(f"Error executing SQL. Error was {exc}")
                     raise exc
-        q_state_machine.finish()
+                if schema == "cache":
+                    try:
+                        write_cache_metadata(connection, query, compute_time=plan_time)
+                    except Exception as exc:
+                        logger.error(f"Error writing cache metadata. Error was {exc}")
+                        raise exc
+            q_state_machine.finish()
+    except Exception as exc:
+        if this_thread_is_owner:
+            q_state_machine.raise_error()
+        raise exc
+    finally:
+        if this_thread_is_owner and not q_state_machine.is_finished_executing:
+            q_state_machine.cancel()

     q_state_machine.wait_until_complete(sleep_duration=sleep_duration)
     if q_state_machine.is_completed:
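
The control flow in this hunk can be hard to follow from the diff alone, so here is a minimal, self-contained sketch of the same pattern: ordinary exceptions move the query to an errored state, while an interrupt (which is not an `Exception` subclass) falls through to `finally` and cancels it. `ToyStateMachine`, `State`, `run_guarded`, and `outcome` are hypothetical stand-ins that keep state in memory; they are not the real `QueryStateMachine`, which is backed by redis.

```python
from enum import Enum


class State(Enum):
    QUEUED = "queued"
    EXECUTING = "executing"
    COMPLETED = "completed"
    ERRORED = "errored"
    CANCELLED = "cancelled"


class ToyStateMachine:
    """Hypothetical in-memory stand-in for QueryStateMachine."""

    def __init__(self):
        self.state = State.QUEUED

    def execute(self):
        self.state = State.EXECUTING
        return self.state, True  # (new state, caller owns the execution)

    def finish(self):
        self.state = State.COMPLETED

    def raise_error(self):
        self.state = State.ERRORED

    def cancel(self):
        self.state = State.CANCELLED

    @property
    def is_finished_executing(self):
        return self.state in (State.COMPLETED, State.ERRORED, State.CANCELLED)


def run_guarded(machine, work):
    """Run `work`, leaving `machine` in a determinate state whatever happens."""
    owner = False
    try:
        _, owner = machine.execute()
        if owner:
            work()
            machine.finish()
    except Exception:
        if owner:
            machine.raise_error()
        raise
    finally:
        # KeyboardInterrupt derives from BaseException, so it skips the
        # handler above and is only seen here.
        if owner and not machine.is_finished_executing:
            machine.cancel()


def outcome(work):
    """Return the final state after running `work`, swallowing any error."""
    machine = ToyStateMachine()
    try:
        run_guarded(machine, work)
    except BaseException:
        pass
    return machine.state
```

Setting the ownership flag to `False` before the `try` matters: if `execute()` itself fails, neither the handler nor the `finally` clause touches a query that this caller never owned.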
3 changes: 3 additions & 0 deletions flowmachine/flowmachine/core/query.py
@@ -529,12 +529,15 @@ def _make_sql(self, name: str, schema: Union[str, None] = None) -> List[str]:
Q = f"""EXPLAIN (ANALYZE TRUE, TIMING FALSE, FORMAT JSON) CREATE TABLE {full_name} AS
(SELECT {self.column_names_as_string_list} FROM ({self._make_query()}) _)"""
queries.append(Q)
# Make the flowmachine role the owner so the server can clean up cache tables
queries.append(f"ALTER TABLE {full_name} OWNER TO flowmachine;")
for ix in self.index_cols:
queries.append(
"CREATE INDEX ON {tbl} ({ixen})".format(
tbl=full_name, ixen=",".join(ix) if isinstance(ix, list) else ix
)
)

return queries

def to_sql(
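
To make the ordering of the generated statements concrete, the sketch below mimics the shape of the list that `_make_sql` returns after this change: create the table, hand ownership to `flowmachine`, then build indexes. It is a simplified illustration, not the FlowMachine implementation: `build_store_statements` and its parameters are hypothetical, and the `EXPLAIN (ANALYZE ...)` wrapper used by the real method is left out.

```python
from typing import List, Sequence, Union


def build_store_statements(
    full_name: str,
    select_sql: str,
    index_cols: Sequence[Union[str, List[str]]],
    owner: str = "flowmachine",
) -> List[str]:
    """Return the DDL statements for storing a query, in execution order."""
    statements = [f"CREATE TABLE {full_name} AS ({select_sql})"]
    # Ownership is transferred straight after creation, before any indexes.
    statements.append(f"ALTER TABLE {full_name} OWNER TO {owner};")
    for ix in index_cols:
        # An entry is either one column name or a list of columns.
        cols = ",".join(ix) if isinstance(ix, list) else ix
        statements.append(f"CREATE INDEX ON {full_name} ({cols})")
    return statements


if __name__ == "__main__":
    # Table and column names here are made up for the example.
    for stmt in build_store_statements(
        "cache.x1234", "SELECT 1 AS dummy", ["dummy", ["a", "b"]]
    ):
        print(stmt)
```

Because the ownership change is just another entry in the list, it runs in the same transaction as the table creation in `write_query_to_cache`.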
20 changes: 20 additions & 0 deletions flowmachine/tests/test_cache.py
@@ -5,6 +5,7 @@
"""
Tests for query caching functions.
"""
from typing import List

import pytest

@@ -14,7 +15,9 @@
get_obj_or_stub,
)
from flowmachine.core.context import get_db
from flowmachine.core.errors.flowmachine_errors import QueryCancelledException
from flowmachine.core.query import Query
from flowmachine.core.query_state import QueryState
from flowmachine.features import daily_location, ModalLocation, Flows


@@ -358,3 +361,20 @@ def test_retrieve_all():
assert dl1.query_id in from_cache
assert hl1.query_id in from_cache
assert flow.query_id in from_cache


def test_interrupt_cancels():
class Dummy(Query):
@property
def column_names(self) -> List[str]:
return ["dummy"]

def _make_query(self):
raise KeyboardInterrupt

interrupted_dummy = Dummy()
try:
interrupted_dummy.store().result()
except KeyboardInterrupt:
pass
assert interrupted_dummy.query_state == QueryState.CANCELLED
2 changes: 1 addition & 1 deletion quick_start.sh
@@ -150,7 +150,7 @@ else
echo "Worked examples ready."
fi
echo "All containers ready!"
echo "Access FlowDB using 'PGHOST=$FLOWDB_HOST PGPORT=$FLOWDB_PORT PGDATABASE=flowdb PGUSER=$FLOWMACHINE_FLOWDB_USER PGPASSWORD=$FLOWMACHINE_FLOWDB_PASSWORD psql'"
echo "Access FlowDB using 'PGHOST=$FLOWDB_HOST PGPORT=$FLOWDB_PORT PGDATABASE=flowdb PGUSER=flowmachine PGPASSWORD=$FLOWMACHINE_FLOWDB_PASSWORD psql'"
echo "Access FlowAPI using FlowClient at http://localhost:$FLOWAPI_PORT"
echo "View the FlowAPI spec at http://localhost:$FLOWAPI_PORT/api/0/spec/redoc"
echo "Generate FlowAPI access tokens using FlowAuth with user TEST_USER and password DUMMY_PASSWORD at http://localhost:$FLOWAUTH_PORT"
3 changes: 0 additions & 3 deletions secrets_quickstart/flowdb.yml
@@ -8,8 +8,6 @@ secrets:
external: true
FLOWAPI_FLOWDB_PASSWORD: # Password for the FlowAPI database user
external: true
FLOWMACHINE_FLOWDB_USER: # Database user for FlowMachine
external: true
FLOWMACHINE_FLOWDB_PASSWORD: # Password for FlowDB
external: true
FLOWDB_POSTGRES_PASSWORD: # Postgres superuser password for flowdb
@@ -35,7 +33,6 @@ services:
- FLOWDB_POSTGRES_PASSWORD
- FLOWAPI_FLOWDB_USER
- FLOWAPI_FLOWDB_PASSWORD
- FLOWMACHINE_FLOWDB_USER
- FLOWMACHINE_FLOWDB_PASSWORD
tty: true
volumes:
3 changes: 2 additions & 1 deletion secrets_quickstart/flowmachine.yml
@@ -6,6 +6,8 @@ version: '3.7'
secrets:
REDIS_PASSWORD: # Redis password
external: true
FLOWMACHINE_FLOWDB_USER: # Database user for FlowMachine
external: true
networks:
flowdb:
redis:
@@ -29,7 +31,6 @@ services:
DB_CONNECTION_POOL_SIZE: ${DB_CONNECTION_POOL_SIZE:-5}
DB_CONNECTION_POOL_OVERFLOW: ${DB_CONNECTION_POOL_OVERFLOW:-1}
secrets:
- FLOWMACHINE_FLOWDB_USER
- FLOWMACHINE_FLOWDB_PASSWORD
- REDIS_PASSWORD
networks:
1 change: 0 additions & 1 deletion secrets_quickstart/secrets-quickstart.sh
@@ -72,7 +72,6 @@ declare -A hard_secrets
hard_secrets=(
[POSTGRES_USER]="flowdb"
[FLOWAUTH_ADMIN_USERNAME]="admin"
[FLOWMACHINE_FLOWDB_USER]="flowmachine"
[FLOWAPI_FLOWDB_USER]="flowapi"
[FLOWAPI_IDENTIFIER]="flowapi_server"
[FLOWETL_POSTGRES_USER]="flowetl"