From e4b9db619c565b67aa4103bf68c7ad805aaeb76e Mon Sep 17 00:00:00 2001 From: Jonathan Gray Date: Tue, 31 May 2022 11:17:50 +0100 Subject: [PATCH 01/11] Mandate flowmachine username --- flowdb/bin/build/9000_last_create_roles.sh | 44 +++++++++++----------- flowdb/tests/test_roles.py | 2 +- quick_start.sh | 2 +- 3 files changed, 24 insertions(+), 24 deletions(-) diff --git a/flowdb/bin/build/9000_last_create_roles.sh b/flowdb/bin/build/9000_last_create_roles.sh index cf3cf258f7..5b793fd3c9 100644 --- a/flowdb/bin/build/9000_last_create_roles.sh +++ b/flowdb/bin/build/9000_last_create_roles.sh @@ -18,7 +18,7 @@ set -e # This user is mainly designed for ingesting # data into the events table. # -# * $FLOWMACHINE_FLOWDB_USER: for users that need access to raw data. +# * flowmachine: for users that need access to raw data. # They can read all data available in the database and # can write to most tables. Can't write to the tables under # the events schema. @@ -66,9 +66,9 @@ psql --dbname="$POSTGRES_DB" -c "REVOKE CONNECT ON DATABASE $POSTGRES_DB FROM PU if [[ $FLOWMACHINE_FLOWDB_PASSWORD ]] then - psql --dbname="$POSTGRES_DB" -c "CREATE ROLE $FLOWMACHINE_FLOWDB_USER WITH LOGIN PASSWORD '$FLOWMACHINE_FLOWDB_PASSWORD';" + psql --dbname="$POSTGRES_DB" -c "CREATE ROLE flowmachine WITH LOGIN PASSWORD '$FLOWMACHINE_FLOWDB_PASSWORD';" else - echo "No password supplied for '$FLOWMACHINE_FLOWDB_USER' user: $FLOWMACHINE_FLOWDB_PASSWORD" + echo "No password supplied for 'flowmachine' user: $FLOWMACHINE_FLOWDB_PASSWORD" exit 1 fi @@ -84,9 +84,9 @@ fi # Roles can connect and create tables and # schemas. # -psql --dbname="$POSTGRES_DB" -c "GRANT CONNECT ON DATABASE $POSTGRES_DB TO $FLOWMACHINE_FLOWDB_USER;" +psql --dbname="$POSTGRES_DB" -c "GRANT CONNECT ON DATABASE $POSTGRES_DB TO flowmachine;" psql --dbname="$POSTGRES_DB" -c "GRANT CONNECT ON DATABASE $POSTGRES_DB TO $FLOWAPI_FLOWDB_USER;" -psql --dbname="$POSTGRES_DB" -c "GRANT CREATE ON DATABASE $POSTGRES_DB TO $FLOWMACHINE_FLOWDB_USER;" +psql --dbname="$POSTGRES_DB" -c "GRANT CREATE ON DATABASE $POSTGRES_DB TO flowmachine;" psql --dbname="$POSTGRES_DB" -c "GRANT CREATE ON DATABASE $POSTGRES_DB TO $FLOWAPI_FLOWDB_USER;" # @@ -95,32 +95,32 @@ psql --dbname="$POSTGRES_DB" -c "GRANT CREATE ON DATABASE $POSTGRES_DB TO $FLOWA declare -a schema_list_permissive=("cache" "geography" "population" "elevation") for schema in "${schema_list_permissive[@]}" do - echo "Granting permissions to $FLOWMACHINE_FLOWDB_USER on $schema." + echo "Granting permissions to flowmachine on $schema." psql --dbname="$POSTGRES_DB" -c " BEGIN; ALTER DEFAULT PRIVILEGES IN SCHEMA $schema - GRANT ALL ON TABLES TO $FLOWMACHINE_FLOWDB_USER; + GRANT ALL ON TABLES TO flowmachine; - ALTER DEFAULT PRIVILEGES FOR ROLE $FLOWMACHINE_FLOWDB_USER + ALTER DEFAULT PRIVILEGES FOR ROLE flowmachine IN SCHEMA $schema - GRANT ALL ON TABLES TO $FLOWMACHINE_FLOWDB_USER; + GRANT ALL ON TABLES TO flowmachine; GRANT ALL PRIVILEGES ON ALL TABLES - IN SCHEMA $schema TO $FLOWMACHINE_FLOWDB_USER; + IN SCHEMA $schema TO flowmachine; GRANT USAGE ON SCHEMA $schema - TO $FLOWMACHINE_FLOWDB_USER; + TO flowmachine; GRANT ALL ON ALL TABLES - IN SCHEMA $schema TO $FLOWMACHINE_FLOWDB_USER; + IN SCHEMA $schema TO flowmachine; GRANT CREATE ON SCHEMA $schema - TO $FLOWMACHINE_FLOWDB_USER; + TO flowmachine; COMMIT; " done @@ -128,32 +128,32 @@ done declare -a schema_list_restricted=("events" "dfs" "infrastructure" "routing" "interactions" "etl") for schema in "${schema_list_restricted[@]}" do - echo "Restricting permissions to $FLOWMACHINE_FLOWDB_USER on $schema." + echo "Restricting permissions to flowmachine on $schema." psql --dbname="$POSTGRES_DB" -c " BEGIN; GRANT USAGE ON SCHEMA $schema - TO $FLOWMACHINE_FLOWDB_USER; + TO flowmachine; GRANT SELECT ON ALL TABLES - IN SCHEMA $schema TO $FLOWMACHINE_FLOWDB_USER; + IN SCHEMA $schema TO flowmachine; ALTER DEFAULT PRIVILEGES IN SCHEMA $schema - GRANT SELECT ON SEQUENCES TO $FLOWMACHINE_FLOWDB_USER; + GRANT SELECT ON SEQUENCES TO flowmachine; ALTER DEFAULT PRIVILEGES IN SCHEMA $schema - GRANT SELECT ON TABLES TO $FLOWMACHINE_FLOWDB_USER; + GRANT SELECT ON TABLES TO flowmachine; END; " done -echo "Give $FLOWMACHINE_FLOWDB_USER role read and update access to cache_touches sequence" +echo "Give flowmachine role read and update access to cache_touches sequence" psql --dbname="$POSTGRES_DB" -c " BEGIN; - GRANT USAGE, SELECT, UPDATE ON SEQUENCE cache.cache_touches TO $FLOWMACHINE_FLOWDB_USER; + GRANT USAGE, SELECT, UPDATE ON SEQUENCE cache.cache_touches TO flowmachine; COMMIT; " @@ -166,7 +166,7 @@ psql --dbname="$POSTGRES_DB" -c " ALTER DEFAULT PRIVILEGES IN SCHEMA cache GRANT SELECT ON TABLES TO $FLOWAPI_FLOWDB_USER; - ALTER DEFAULT PRIVILEGES FOR ROLE $FLOWMACHINE_FLOWDB_USER + ALTER DEFAULT PRIVILEGES FOR ROLE flowmachine IN SCHEMA cache GRANT SELECT ON TABLES TO $FLOWAPI_FLOWDB_USER; END; @@ -199,7 +199,7 @@ psql --dbname="$POSTGRES_DB" -c " obj record; BEGIN FOR obj IN SELECT * FROM pg_event_trigger_ddl_commands() WHERE command_tag IN ('CREATE TABLE', 'CREATE TABLE AS') AND schema_name='cache' LOOP - EXECUTE format('ALTER TABLE %s OWNER TO $FLOWMACHINE_FLOWDB_USER', obj.object_identity); + EXECUTE format('ALTER TABLE %s OWNER TO flowmachine', obj.object_identity); END LOOP; END; \$\$; diff --git a/flowdb/tests/test_roles.py b/flowdb/tests/test_roles.py index 5ed23207d5..e370799457 100644 --- a/flowdb/tests/test_roles.py +++ b/flowdb/tests/test_roles.py @@ -9,7 +9,7 @@ The database has three different roles: * `flowdb`: database super user. - * `$FLOWMACHINE_FLOWDB_USER`: FlowMachine user with read access to raw tables. + * `flowmachine`: FlowMachine user with read access to raw tables. * `$FLOWAPI_FLOWDB_USER`: has read access to public tables only and reference tables. """ diff --git a/quick_start.sh b/quick_start.sh index 37dc01a753..bc33f53774 100755 --- a/quick_start.sh +++ b/quick_start.sh @@ -150,7 +150,7 @@ else echo "Worked examples ready." fi echo "All containers ready!" - echo "Access FlowDB using 'PGHOST=$FLOWDB_HOST PGPORT=$FLOWDB_PORT PGDATABASE=flowdb PGUSER=$FLOWMACHINE_FLOWDB_USER PGPASSWORD=$FLOWMACHINE_FLOWDB_PASSWORD psql'" + echo "Access FlowDB using 'PGHOST=$FLOWDB_HOST PGPORT=$FLOWDB_PORT PGDATABASE=flowdb PGUSER=flowmachine PGPASSWORD=$FLOWMACHINE_FLOWDB_PASSWORD psql'" echo "Access FlowAPI using FlowClient at http://localhost:$FLOWAPI_PORT" echo "View the FlowAPI spec at http://localhost:$FLOWAPI_PORT/api/0/spec/redoc" echo "Generate FlowAPI access tokens using FlowAuth with user TEST_USER and password DUMMY_PASSWORD at http://localhost:$FLOWAUTH_PORT" From f1a83acaacda415ba97c738419ec460c971d7ba4 Mon Sep 17 00:00:00 2001 From: Jonathan Gray Date: Tue, 31 May 2022 12:13:57 +0100 Subject: [PATCH 02/11] Catch broader categories of exceptions in cache writes --- flowmachine/flowmachine/core/cache.py | 46 +++++++++++++++------------ 1 file changed, 25 insertions(+), 21 deletions(-) diff --git a/flowmachine/flowmachine/core/cache.py b/flowmachine/flowmachine/core/cache.py index fb895b70ea..e789b0bdcc 100644 --- a/flowmachine/flowmachine/core/cache.py +++ b/flowmachine/flowmachine/core/cache.py @@ -181,33 +181,37 @@ def write_query_to_cache( """ logger.debug(f"Trying to switch '{query.query_id}' to executing state.") q_state_machine = QueryStateMachine(redis, query.query_id, connection.conn_id) - current_state, this_thread_is_owner = q_state_machine.execute() - if this_thread_is_owner: - logger.debug(f"In charge of executing '{query.query_id}'.") - try: - query_ddl_ops = ddl_ops_func(name, schema) - except Exception as exc: - q_state_machine.raise_error() - logger.error(f"Error generating SQL. Error was {exc}") - raise exc - logger.debug("Made SQL.") - con = connection.engine - with con.begin(): + try: + current_state, this_thread_is_owner = q_state_machine.execute() + if this_thread_is_owner: + logger.debug(f"In charge of executing '{query.query_id}'.") try: - plan_time = run_ops_list_and_return_execution_time(query_ddl_ops, con) - logger.debug("Executed queries.") - except Exception as exc: + query_ddl_ops = ddl_ops_func(name, schema) + except BaseException as exc: q_state_machine.raise_error() - logger.error(f"Error executing SQL. Error was {exc}") + logger.error(f"Error generating SQL. Error was {exc}") raise exc - if schema == "cache": + logger.debug("Made SQL.") + con = connection.engine + with con.begin(): try: - write_cache_metadata(connection, query, compute_time=plan_time) - except Exception as exc: + plan_time = run_ops_list_and_return_execution_time(query_ddl_ops, con) + logger.debug("Executed queries.") + except BaseException as exc: q_state_machine.raise_error() - logger.error(f"Error writing cache metadata. Error was {exc}") + logger.error(f"Error executing SQL. Error was {exc}") raise exc - q_state_machine.finish() + if schema == "cache": + try: + write_cache_metadata(connection, query, compute_time=plan_time) + except BaseException as exc: + q_state_machine.raise_error() + logger.error(f"Error writing cache metadata. Error was {exc}") + raise exc + q_state_machine.finish() + except BaseException as exc: + q_state_machine.raise_error() + raise exc q_state_machine.wait_until_complete(sleep_duration=sleep_duration) if q_state_machine.is_completed: From a59c3b67dd151acbd83d2c754d4db1d6dd2ca7cd Mon Sep 17 00:00:00 2001 From: Jonathan Gray Date: Tue, 31 May 2022 12:25:20 +0100 Subject: [PATCH 03/11] Add alter table owner as a post action --- flowmachine/flowmachine/core/query.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/flowmachine/flowmachine/core/query.py b/flowmachine/flowmachine/core/query.py index 4e9d5d923e..af0c4f8339 100644 --- a/flowmachine/flowmachine/core/query.py +++ b/flowmachine/flowmachine/core/query.py @@ -529,12 +529,15 @@ def _make_sql(self, name: str, schema: Union[str, None] = None) -> List[str]: Q = f"""EXPLAIN (ANALYZE TRUE, TIMING FALSE, FORMAT JSON) CREATE TABLE {full_name} AS (SELECT {self.column_names_as_string_list} FROM ({self._make_query()}) _)""" queries.append(Q) + # Make flowmachine user the owner to allow server to cleanup cache tables + queries.append(f"ALTER TABLE {full_name} OWNER TO flowmachine;") for ix in self.index_cols: queries.append( "CREATE INDEX ON {tbl} ({ixen})".format( tbl=full_name, ixen=",".join(ix) if isinstance(ix, list) else ix ) ) + return queries def to_sql( From cb35b12e6265ea2f0e91a3c4a6315a5bc4b648eb Mon Sep 17 00:00:00 2001 From: Jonathan Gray Date: Fri, 10 Jun 2022 16:39:52 +0100 Subject: [PATCH 04/11] Don't mark someone else's query as errored --- flowmachine/flowmachine/core/cache.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/flowmachine/flowmachine/core/cache.py b/flowmachine/flowmachine/core/cache.py index e789b0bdcc..df9e4174a2 100644 --- a/flowmachine/flowmachine/core/cache.py +++ b/flowmachine/flowmachine/core/cache.py @@ -181,6 +181,7 @@ def write_query_to_cache( """ logger.debug(f"Trying to switch '{query.query_id}' to executing state.") q_state_machine = QueryStateMachine(redis, query.query_id, connection.conn_id) + this_thread_is_owner = False try: current_state, this_thread_is_owner = q_state_machine.execute() if this_thread_is_owner: @@ -195,7 +196,9 @@ def write_query_to_cache( con = connection.engine with con.begin(): try: - plan_time = run_ops_list_and_return_execution_time(query_ddl_ops, con) + plan_time = run_ops_list_and_return_execution_time( + query_ddl_ops, con + ) logger.debug("Executed queries.") except BaseException as exc: q_state_machine.raise_error() @@ -210,7 +213,8 @@ def write_query_to_cache( raise exc q_state_machine.finish() except BaseException as exc: - q_state_machine.raise_error() + if this_thread_is_owner: + q_state_machine.raise_error() raise exc q_state_machine.wait_until_complete(sleep_duration=sleep_duration) From dfc2f0df0295105c07d33b474b26360c2fffef98 Mon Sep 17 00:00:00 2001 From: Jonathan Gray Date: Fri, 10 Jun 2022 16:55:38 +0100 Subject: [PATCH 05/11] Remove instances uses of flowmachine user being configurable --- .circleci/config.yml | 1 - docker-compose.yml | 1 - docs/source/administrator/deployment.md | 3 +-- flowdb.Dockerfile | 2 -- flowdb/bin/build/9000_last_create_roles.sh | 6 ------ flowdb/tests/test_roles.py | 1 - flowetl/deployment_example/docker-stack-flowkit.yml | 1 - flowetl/tests/integration/conftest.py | 1 - secrets_quickstart/flowdb.yml | 3 --- secrets_quickstart/flowmachine.yml | 2 ++ 10 files changed, 3 insertions(+), 18 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 33deb1a5cf..c6971fdd2a 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -32,7 +32,6 @@ defaults: FLOWDB_INGESTION_DIR: /home/circleci/project/flowdb/tests/data POSTGRES_PASSWORD: flowflow POSTGRES_USER: flowdb - FLOWMACHINE_FLOWDB_USER: flowmachine FLOWAPI_FLOWDB_USER: flowapi FLOWMACHINE_FLOWDB_PASSWORD: foo FLOWAPI_FLOWDB_PASSWORD: foo diff --git a/docker-compose.yml b/docker-compose.yml index b8db09fc58..92b514ae06 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -57,7 +57,6 @@ services: environment: POSTGRES_USER: ${POSTGRES_USER:?Must set POSTGRES_USER env var} POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:?Must set POSTGRES_PASSWORD env var} - FLOWMACHINE_FLOWDB_USER: ${FLOWMACHINE_FLOWDB_USER:?Must set FLOWMACHINE_FLOWDB_USER env var} FLOWMACHINE_FLOWDB_PASSWORD: ${FLOWMACHINE_FLOWDB_PASSWORD:?Must set FLOWMACHINE_FLOWDB_PASSWORD env var} FLOWAPI_FLOWDB_USER: ${FLOWAPI_FLOWDB_USER:?Must set FLOWAPI_FLOWDB_USER env var} FLOWAPI_FLOWDB_PASSWORD: ${FLOWAPI_FLOWDB_PASSWORD:?Must set FLOWAPI_FLOWDB_PASSWORD env var} diff --git a/docs/source/administrator/deployment.md b/docs/source/administrator/deployment.md index cd409f8d05..2e8f4f96c5 100644 --- a/docs/source/administrator/deployment.md +++ b/docs/source/administrator/deployment.md @@ -21,7 +21,6 @@ FlowDB is distributed as a docker container. To run it, you will need to provide | ----------- | -------------- | ----- | | FLOWAPI_FLOWDB_USER | Database user used by FlowAPI | Role with _read_ access to tables under the cache and geography schemas | | FLOWAPI_FLOWDB_PASSWORD | Password for the FlowAPI database user | | -| FLOWMACHINE_FLOWDB_USER | Database user for FlowMachine | Role with _write_ access to tables under the cache schema, and _read_ access to events, infrastructure, cache and geography schemas | | FLOWMACHINE_FLOWDB_PASSWORD | Password for flowmachine user | | | FLOWDB_POSTGRES_PASSWORD | Postgres superuser password for flowdb | Username `flowdb`, user with super user access to flowdb database | @@ -232,7 +231,7 @@ Once you have FlowAuth, FlowDB, and FlowETL running, you are ready to add FlowMa ##### FlowMachine -The FlowMachine server requires one additional secret: `REDIS_PASSWORD`, the password for an accompanying redis database. This secret should also be provided to redis. FlowMachine also uses the `FLOWMACHINE_FLOWDB_USER` and `FLOWMACHINE_FLOWDB_PASSWORD` secrets defined for FlowDB. +The FlowMachine server requires one additional secret: `REDIS_PASSWORD`, the password for an accompanying redis database. This secret should also be provided to redis. FlowMachine also uses the `FLOWMACHINE_FLOWDB_PASSWORD` secrets defined for FlowDB. You may also set the following environment variables: diff --git a/flowdb.Dockerfile b/flowdb.Dockerfile index 6322cd5f24..34849ee89e 100644 --- a/flowdb.Dockerfile +++ b/flowdb.Dockerfile @@ -134,8 +134,6 @@ ARG FLOWDB_RELEASE_DATE='3000-12-12' ENV FLOWDB_RELEASE_DATE=$FLOWDB_RELEASE_DATE # Default users - -ENV FLOWMACHINE_FLOWDB_USER=flowmachine ENV FLOWAPI_FLOWDB_USER=flowapi # Default location table diff --git a/flowdb/bin/build/9000_last_create_roles.sh b/flowdb/bin/build/9000_last_create_roles.sh index 5b793fd3c9..3f16840ac1 100644 --- a/flowdb/bin/build/9000_last_create_roles.sh +++ b/flowdb/bin/build/9000_last_create_roles.sh @@ -38,12 +38,6 @@ then exit 1 fi -if [ -e /run/secrets/FLOWMACHINE_FLOWDB_USER ]; -then - echo "Using secrets for flowmachine user." - FLOWMACHINE_FLOWDB_USER=$(< /run/secrets/FLOWMACHINE_FLOWDB_USER) -fi - if [ -e /run/secrets/FLOWMACHINE_FLOWDB_PASSWORD ]; then echo "Using secrets for flowmachine user password." diff --git a/flowdb/tests/test_roles.py b/flowdb/tests/test_roles.py index e370799457..7447932fbb 100644 --- a/flowdb/tests/test_roles.py +++ b/flowdb/tests/test_roles.py @@ -40,7 +40,6 @@ def test_tables(env): "cache.blah": f""" CREATE TABLE IF NOT EXISTS cache.blah(); - ALTER TABLE cache.blah OWNER TO {env["FLOWMACHINE_FLOWDB_USER"]}; """, "geography.admin0": """ CREATE TABLE IF NOT EXISTS diff --git a/flowetl/deployment_example/docker-stack-flowkit.yml b/flowetl/deployment_example/docker-stack-flowkit.yml index 7c5d61b6fe..edf1f5cfd0 100644 --- a/flowetl/deployment_example/docker-stack-flowkit.yml +++ b/flowetl/deployment_example/docker-stack-flowkit.yml @@ -23,7 +23,6 @@ services: environment: &base-flowdb-environment POSTGRES_USER: ${FLOWDB_ADMIN_USER:?Must set FLOWDB_ADMIN_USER env var} POSTGRES_PASSWORD: ${FLOWDB_ADMIN_PASSWORD:?Must set FLOWDB_ADMIN_PASSWORD env var} - FLOWMACHINE_FLOWDB_USER: ${FLOWMACHINE_FLOWDB_USER:?Must set FLOWMACHINE_FLOWDB_USER env var} FLOWMACHINE_FLOWDB_PASSWORD: ${FLOWMACHINE_FLOWDB_PASSWORD:?Must set FLOWMACHINE_FLOWDB_PASSWORD env var} FLOWAPI_FLOWDB_USER: ${FLOWAPI_FLOWDB_USER:?Must set FLOWAPI_FLOWDB_USER env var} FLOWAPI_FLOWDB_PASSWORD: ${FLOWAPI_FLOWDB_PASSWORD:?Must set FLOWAPI_FLOWDB_PASSWORD env var} diff --git a/flowetl/tests/integration/conftest.py b/flowetl/tests/integration/conftest.py index 0dd9499696..2f849c9b56 100644 --- a/flowetl/tests/integration/conftest.py +++ b/flowetl/tests/integration/conftest.py @@ -106,7 +106,6 @@ def container_env(ensure_required_env_vars_are_set): "POSTGRES_USER": "flowdb", "POSTGRES_PASSWORD": "flowflow", "POSTGRES_DB": "flowdb", - "FLOWMACHINE_FLOWDB_USER": "flowmachine", "FLOWAPI_FLOWDB_USER": "flowapi", "FLOWMACHINE_FLOWDB_PASSWORD": "flowmachine", "FLOWAPI_FLOWDB_PASSWORD": "flowapi", diff --git a/secrets_quickstart/flowdb.yml b/secrets_quickstart/flowdb.yml index 942774c48e..d2dcaa0f33 100644 --- a/secrets_quickstart/flowdb.yml +++ b/secrets_quickstart/flowdb.yml @@ -8,8 +8,6 @@ secrets: external: true FLOWAPI_FLOWDB_PASSWORD: # Password for the FlowAPI database user external: true - FLOWMACHINE_FLOWDB_USER: # Database user for FlowMachine - external: true FLOWMACHINE_FLOWDB_PASSWORD: # Password for FlowDB external: true FLOWDB_POSTGRES_PASSWORD: # Postgres superuser password for flowdb @@ -35,7 +33,6 @@ services: - FLOWDB_POSTGRES_PASSWORD - FLOWAPI_FLOWDB_USER - FLOWAPI_FLOWDB_PASSWORD - - FLOWMACHINE_FLOWDB_USER - FLOWMACHINE_FLOWDB_PASSWORD tty: true volumes: diff --git a/secrets_quickstart/flowmachine.yml b/secrets_quickstart/flowmachine.yml index 74e98f75d1..75d6008e2b 100644 --- a/secrets_quickstart/flowmachine.yml +++ b/secrets_quickstart/flowmachine.yml @@ -6,6 +6,8 @@ version: '3.7' secrets: REDIS_PASSWORD: # Redis password external: true + FLOWMACHINE_FLOWDB_USER: # Database user for FlowMachine + external: true networks: flowdb: redis: From 44009e2b2e8408b8bfc97a56724d4d59b0d9a74b Mon Sep 17 00:00:00 2001 From: Jonathan Gray Date: Fri, 17 Jun 2022 12:56:55 +0100 Subject: [PATCH 06/11] Remove trigger Co-Authored-By: James Harrison --- flowdb/bin/build/9000_last_create_roles.sh | 23 ---------------------- 1 file changed, 23 deletions(-) diff --git a/flowdb/bin/build/9000_last_create_roles.sh b/flowdb/bin/build/9000_last_create_roles.sh index 3f16840ac1..22fb19a684 100644 --- a/flowdb/bin/build/9000_last_create_roles.sh +++ b/flowdb/bin/build/9000_last_create_roles.sh @@ -179,27 +179,4 @@ psql --dbname="$POSTGRES_DB" -c " IN SCHEMA geography GRANT SELECT ON TABLES TO $FLOWAPI_FLOWDB_USER; GRANT USAGE ON SCHEMA geography TO $FLOWAPI_FLOWDB_USER; - " - -# Create event trigger to change owner of tables under the cache schema -# Note that we hardcode the schema and username because event trigger functions cannot take params - -psql --dbname="$POSTGRES_DB" -c " - CREATE OR REPLACE FUNCTION trg_create_in_cache_set_owner_to_flowmachine() - RETURNS event_trigger - LANGUAGE plpgsql - AS \$\$ - DECLARE - obj record; - BEGIN - FOR obj IN SELECT * FROM pg_event_trigger_ddl_commands() WHERE command_tag IN ('CREATE TABLE', 'CREATE TABLE AS') AND schema_name='cache' LOOP - EXECUTE format('ALTER TABLE %s OWNER TO flowmachine', obj.object_identity); - END LOOP; - END; - \$\$; - - CREATE EVENT TRIGGER trg_create_in_cache_set_owner_to_flowmachine - ON ddl_command_end - WHEN tag IN ('CREATE TABLE', 'CREATE TABLE AS') - EXECUTE PROCEDURE trg_create_in_cache_set_owner_to_flowmachine(); " \ No newline at end of file From a797c4466c4d3419c31091ca2530926f9ce658a4 Mon Sep 17 00:00:00 2001 From: Jonathan Gray Date: Fri, 17 Jun 2022 13:06:22 +0100 Subject: [PATCH 07/11] Deemphasise configurable flowmachine username Co-Authored-By: James Harrison --- flowmachine.Dockerfile | 4 ++++ secrets_quickstart/flowmachine.yml | 1 - secrets_quickstart/secrets-quickstart.sh | 1 - 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/flowmachine.Dockerfile b/flowmachine.Dockerfile index 098eb4d9d7..1ef801acb5 100644 --- a/flowmachine.Dockerfile +++ b/flowmachine.Dockerfile @@ -23,3 +23,7 @@ RUN apt-get update && \ apt purge -y --auto-remove && \ rm -rf /var/lib/apt/lists/* CMD ["pipenv", "run", "flowmachine"] +# FlowDB has a default role named flowmachine for use with the flowmachine server +# when starting the container with a different user, that user must be in the flowmachine +# role +ENV FLOWMACHINE_FLOWDB_USER=flowmachine diff --git a/secrets_quickstart/flowmachine.yml b/secrets_quickstart/flowmachine.yml index 75d6008e2b..fa32b05229 100644 --- a/secrets_quickstart/flowmachine.yml +++ b/secrets_quickstart/flowmachine.yml @@ -31,7 +31,6 @@ services: DB_CONNECTION_POOL_SIZE: ${DB_CONNECTION_POOL_SIZE:-5} DB_CONNECTION_POOL_OVERFLOW: ${DB_CONNECTION_POOL_OVERFLOW:-1} secrets: - - FLOWMACHINE_FLOWDB_USER - FLOWMACHINE_FLOWDB_PASSWORD - REDIS_PASSWORD networks: diff --git a/secrets_quickstart/secrets-quickstart.sh b/secrets_quickstart/secrets-quickstart.sh index d9276d9f6a..332c9317ea 100755 --- a/secrets_quickstart/secrets-quickstart.sh +++ b/secrets_quickstart/secrets-quickstart.sh @@ -72,7 +72,6 @@ declare -A hard_secrets hard_secrets=( [POSTGRES_USER]="flowdb" [FLOWAUTH_ADMIN_USERNAME]="admin" - [FLOWMACHINE_FLOWDB_USER]="flowmachine" [FLOWAPI_FLOWDB_USER]="flowapi" [FLOWAPI_IDENTIFIER]="flowapi_server" [FLOWETL_POSTGRES_USER]="flowetl" From c88484ef983aff41717b031b8b5df6a2ff17fadf Mon Sep 17 00:00:00 2001 From: Jonathan Gray Date: Fri, 17 Jun 2022 13:41:34 +0100 Subject: [PATCH 08/11] Explicitly set owner of cache table for flowdb test --- flowdb/tests/test_roles.py | 1 + 1 file changed, 1 insertion(+) diff --git a/flowdb/tests/test_roles.py b/flowdb/tests/test_roles.py index 7447932fbb..283ddba65b 100644 --- a/flowdb/tests/test_roles.py +++ b/flowdb/tests/test_roles.py @@ -40,6 +40,7 @@ def test_tables(env): "cache.blah": f""" CREATE TABLE IF NOT EXISTS cache.blah(); + ALTER TABLE cache.blah OWNER TO flowmachine; """, "geography.admin0": """ CREATE TABLE IF NOT EXISTS From 80d38660d6036315deab89c16c3a2bfcff5defd1 Mon Sep 17 00:00:00 2001 From: Jonathan Gray Date: Mon, 20 Jun 2022 11:42:33 +0100 Subject: [PATCH 09/11] Cancel instead of error on interrupts Co-Authored-By: James Harrison --- flowmachine/flowmachine/core/cache.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/flowmachine/flowmachine/core/cache.py b/flowmachine/flowmachine/core/cache.py index df9e4174a2..f450570501 100644 --- a/flowmachine/flowmachine/core/cache.py +++ b/flowmachine/flowmachine/core/cache.py @@ -188,8 +188,7 @@ def write_query_to_cache( logger.debug(f"In charge of executing '{query.query_id}'.") try: query_ddl_ops = ddl_ops_func(name, schema) - except BaseException as exc: - q_state_machine.raise_error() + except Exception as exc: logger.error(f"Error generating SQL. Error was {exc}") raise exc logger.debug("Made SQL.") @@ -200,22 +199,23 @@ def write_query_to_cache( query_ddl_ops, con ) logger.debug("Executed queries.") - except BaseException as exc: - q_state_machine.raise_error() + except Exception as exc: logger.error(f"Error executing SQL. Error was {exc}") raise exc if schema == "cache": try: write_cache_metadata(connection, query, compute_time=plan_time) - except BaseException as exc: - q_state_machine.raise_error() + except Exception as exc: logger.error(f"Error writing cache metadata. Error was {exc}") raise exc q_state_machine.finish() - except BaseException as exc: + except Exception as exc: if this_thread_is_owner: q_state_machine.raise_error() raise exc + finally: + if this_thread_is_owner and not q_state_machine.is_finished_executing: + q_state_machine.cancel() q_state_machine.wait_until_complete(sleep_duration=sleep_duration) if q_state_machine.is_completed: From 1c98f5cbba123fcf8a514e746e8a2fad1aa59dfd Mon Sep 17 00:00:00 2001 From: Jonathan Gray Date: Mon, 20 Jun 2022 11:54:25 +0100 Subject: [PATCH 10/11] Add test for inteerruptions Co-Authored-By: James Harrison --- flowmachine/tests/test_cache.py | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/flowmachine/tests/test_cache.py b/flowmachine/tests/test_cache.py index e94e71d39e..fe788cee8a 100644 --- a/flowmachine/tests/test_cache.py +++ b/flowmachine/tests/test_cache.py @@ -5,6 +5,7 @@ """ Tests for query caching functions. """ +from typing import List import pytest @@ -14,7 +15,9 @@ get_obj_or_stub, ) from flowmachine.core.context import get_db +from flowmachine.core.errors.flowmachine_errors import QueryCancelledException from flowmachine.core.query import Query +from flowmachine.core.query_state import QueryState from flowmachine.features import daily_location, ModalLocation, Flows @@ -358,3 +361,20 @@ def test_retrieve_all(): assert dl1.query_id in from_cache assert hl1.query_id in from_cache assert flow.query_id in from_cache + + +def test_interrupt_cancels(): + class Dummy(Query): + @property + def column_names(self) -> List[str]: + return ["dummy"] + + def _make_query(self): + raise KeyboardInterrupt + + interrupted_dummy = Dummy() + try: + interrupted_dummy.store().result() + except KeyboardInterrupt: + pass + assert interrupted_dummy.query_state == QueryState.CANCELLED From 2d7d2c1a5117deb6644acf0e09c5b3823465b74a Mon Sep 17 00:00:00 2001 From: Jonathan Gray Date: Mon, 20 Jun 2022 12:27:22 +0100 Subject: [PATCH 11/11] Update CHANGELOG.md --- CHANGELOG.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7b7a7c8458..5d2bd46aa3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,9 +15,11 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). - `Query.explain` will now explain the query even where it is already stored. [#1285](https://github.com/Flowminder/FlowKit/issues/1285) - `unstored_dependencies_graph` no longer blocks until dependencies are in a determinate state. [#4949](https://github.com/Flowminder/FlowKit/issues/4949) - In and out flows no longer return location columns with to/from suffix. +- FlowDB now always creates a role named `flowmachine.` +- Flowmachine will set the state of a query being stored to cancelled if interrupted while the store is running. ### Fixed -- FlowDB trigger to alter ownership of cache tables is now triggered when a flowmachine query is `store`d. [#4714](https://github.com/Flowminder/FlowKit/issues/4714) +- Flowmachine now makes the built in `flowmachine` role owner of cache tables as a post-action when a query is `store`d. [#4714](https://github.com/Flowminder/FlowKit/issues/4714) - TopupBalance now returns the weighted mode when requested instead of weighted median [#1412](https://github.com/Flowminder/FlowKit/issues/1412) - Fixed in and out flow geojson for multicolumn location types [#5132](https://github.com/Flowminder/FlowKit/issues/5132)