From 9d7bfed48288486872c6f26b7e6892a5e9f2e605 Mon Sep 17 00:00:00 2001 From: DmitriyBalaba Date: Tue, 6 Feb 2024 18:48:02 +0200 Subject: [PATCH 01/20] deployment test "zero-units" --- .../integration/ha_tests/test_self_healing.py | 78 ++++++++++++++++++- 1 file changed, 77 insertions(+), 1 deletion(-) diff --git a/tests/integration/ha_tests/test_self_healing.py b/tests/integration/ha_tests/test_self_healing.py index 83d2166914..7e27dac84f 100644 --- a/tests/integration/ha_tests/test_self_healing.py +++ b/tests/integration/ha_tests/test_self_healing.py @@ -5,6 +5,7 @@ import logging import pytest +from pip._vendor import requests from pytest_operator.plugin import OpsTest from tenacity import Retrying, stop_after_delay, wait_fixed @@ -14,7 +15,7 @@ get_machine_from_unit, get_password, get_unit_address, - run_command_on_unit, + run_command_on_unit, scale_application, ) from .conftest import APPLICATION_NAME from .helpers import ( @@ -540,3 +541,78 @@ async def test_network_cut_without_ip_change( ), "Connection is not possible after network restore" await is_cluster_updated(ops_test, primary_name) + +@pytest.mark.group(1) +async def test_deploy_zero_units(ops_test: OpsTest): + """Scale the database to zero units and scale up again.""" + wait_for_apps = False + if not await app_name(ops_test): + wait_for_apps = True + async with ops_test.fast_forward(): + await ops_test.model.deploy( + APP_NAME, + application_name=APP_NAME, + num_units=3, + storage={"pgdata": {"pool": "lxd-btrfs", "size": 2048}}, + series=CHARM_SERIES, + channel="edge", + ) + + # Deploy the continuous writes application charm if it wasn't already deployed. 
+ if not await app_name(ops_test, APPLICATION_NAME): + wait_for_apps = True + async with ops_test.fast_forward(): + await ops_test.model.deploy( + APPLICATION_NAME, + application_name=APPLICATION_NAME, + series=CHARM_SERIES, + channel="edge", + ) + + if wait_for_apps: + await ops_test.model.wait_for_idle(status="active", timeout=3000) + + # Start an application that continuously writes data to the database. + await start_continuous_writes(ops_test, APP_NAME) + + logger.info("checking whether writes are increasing") + await are_writes_increasing(ops_test) + + unit_ip_addresses = [] + storage_id_list = [] + primary_name = await get_primary(ops_test, APP_NAME) + primary_storage = "" + for unit in ops_test.model.applications[APP_NAME].units: + # Save IP addresses of units + unit_ip_addresses.append(await get_unit_ip(ops_test, unit.name)) + + # Save detached storage ID + if primary_name != unit.name: + storage_id_list.append(storage_id(ops_test, unit.name)) + else: + primary_storage = storage_id(ops_test, unit.name) + + # Scale the database to zero units. + logger.info("scaling database to zero units") + await scale_application(ops_test, APP_NAME, 0) + + # Checking shutdown units + for unit_ip in unit_ip_addresses: + try: + resp = requests.get(f"http://{unit_ip}:8008") + assert resp.status_code != 200, f"status code = {resp.status_code}, message = {resp.text}" + except requests.exceptions.ConnectionError as e: + assert True, f"unit host = http://{unit_ip}:8008, all units shutdown" + except Exception as e: + assert False, f"{e} unit host = http://{unit_ip}:8008, something went wrong" + + # Scale the database to one unit. + logger.info("scaling database to one unit") + await add_unit_with_storage(ops_test, storage=primary_storage, app=APP_NAME) + logger.info("checking whether writes are increasing") + await are_writes_increasing(ops_test) + + # Scale the database to three units. 
+ for store_id in storage_id_list: + await add_unit_with_storage(ops_test, storage=store_id, app=APP_NAME) + await check_writes(ops_test) \ No newline at end of file From 938035fa66056d35100c94d5f56c628de719b172 Mon Sep 17 00:00:00 2001 From: DmitriyBalaba Date: Mon, 4 Mar 2024 14:52:53 +0200 Subject: [PATCH 02/20] deployment test "zero-units" --- .../integration/ha_tests/test_self_healing.py | 83 ++++++++++++------- tests/integration/helpers.py | 1 + 2 files changed, 53 insertions(+), 31 deletions(-) diff --git a/tests/integration/ha_tests/test_self_healing.py b/tests/integration/ha_tests/test_self_healing.py index 7e27dac84f..27401b300b 100644 --- a/tests/integration/ha_tests/test_self_healing.py +++ b/tests/integration/ha_tests/test_self_healing.py @@ -4,6 +4,7 @@ import asyncio import logging +import psycopg2 import pytest from pip._vendor import requests from pytest_operator.plugin import OpsTest @@ -15,7 +16,7 @@ get_machine_from_unit, get_password, get_unit_address, - run_command_on_unit, scale_application, + run_command_on_unit, scale_application, build_connection_string, FIRST_DATABASE_RELATION_NAME, ) from .conftest import APPLICATION_NAME from .helpers import ( @@ -545,39 +546,33 @@ async def test_network_cut_without_ip_change( @pytest.mark.group(1) async def test_deploy_zero_units(ops_test: OpsTest): """Scale the database to zero units and scale up again.""" - wait_for_apps = False - if not await app_name(ops_test): - wait_for_apps = True - async with ops_test.fast_forward(): - await ops_test.model.deploy( - APP_NAME, - application_name=APP_NAME, - num_units=3, - storage={"pgdata": {"pool": "lxd-btrfs", "size": 2048}}, - series=CHARM_SERIES, - channel="edge", - ) - - # Deploy the continuous writes application charm if it wasn't already deployed. 
- if not await app_name(ops_test, APPLICATION_NAME): - wait_for_apps = True - async with ops_test.fast_forward(): - await ops_test.model.deploy( - APPLICATION_NAME, - application_name=APPLICATION_NAME, - series=CHARM_SERIES, - channel="edge", - ) - - if wait_for_apps: - await ops_test.model.wait_for_idle(status="active", timeout=3000) + app = await app_name(ops_test, APP_NAME) # Start an application that continuously writes data to the database. - await start_continuous_writes(ops_test, APP_NAME) + await start_continuous_writes(ops_test, app) logger.info("checking whether writes are increasing") await are_writes_increasing(ops_test) + connection_string = await build_connection_string( + ops_test, APPLICATION_NAME, FIRST_DATABASE_RELATION_NAME + ) + + # Connect to the database. + # Create test data + with psycopg2.connect(connection_string) as connection: + connection.autocommit = True + with connection.cursor() as cursor: + # Check that it's possible to write and read data from the database that + # was created for the application. + cursor.execute("DROP TABLE IF EXISTS test;") + cursor.execute("CREATE TABLE test(data TEXT);") + cursor.execute("INSERT INTO test(data) VALUES('some data');") + cursor.execute("SELECT data FROM test;") + data = cursor.fetchone() + assert data[0] == "some data" + connection.close() + unit_ip_addresses = [] storage_id_list = [] primary_name = await get_primary(ops_test, APP_NAME) @@ -608,11 +603,37 @@ async def test_deploy_zero_units(ops_test: OpsTest): # Scale the database to one unit. logger.info("scaling database to one unit") - await add_unit_with_storage(ops_test, storage=primary_storage, app=APP_NAME) + await ops_test.model.applications[app].add_unit(attach_storage=[primary_storage]) logger.info("checking whether writes are increasing") await are_writes_increasing(ops_test) # Scale the database to three units. 
- for store_id in storage_id_list: - await add_unit_with_storage(ops_test, storage=store_id, app=APP_NAME) + await ops_test.model.applications[app].add_unit() + + # Connect to the database. + # Create test data + with psycopg2.connect(connection_string) as connection: + connection.autocommit = True + with connection.cursor() as cursor: + # Check that it's possible to write and read data from the database that + # was created for the application. + cursor.execute("SELECT data FROM test;") + data = cursor.fetchone() + assert data[0] == "some data" + connection.close() + + # Scale the database to three units. + await ops_test.model.applications[app].add_unit() + # Connect to the database. + # Create test data + with psycopg2.connect(connection_string) as connection: + connection.autocommit = True + with connection.cursor() as cursor: + # Check that it's possible to write and read data from the database that + # was created for the application. + cursor.execute("SELECT data FROM test;") + data = cursor.fetchone() + assert data[0] == "some data" + connection.close() + await check_writes(ops_test) \ No newline at end of file diff --git a/tests/integration/helpers.py b/tests/integration/helpers.py index 0bbecae6c0..fabf28b06a 100644 --- a/tests/integration/helpers.py +++ b/tests/integration/helpers.py @@ -35,6 +35,7 @@ DATABASE_APP_NAME = METADATA["name"] STORAGE_PATH = METADATA["storage"]["pgdata"]["location"] APPLICATION_NAME = "postgresql-test-app" +FIRST_DATABASE_RELATION_NAME = "first-database" async def build_connection_string( From 7dc328b4597503d7e8e39d7e1d14bdc213f0b752 Mon Sep 17 00:00:00 2001 From: DmitriyBalaba Date: Tue, 12 Mar 2024 22:00:57 +0200 Subject: [PATCH 03/20] Zero-units: continuous writes ON, deploy 3 units, check, scale to 0 units, check no errors, scale to 3, check --- tests/integration/ha_tests/helpers.py | 39 +++++++++ .../integration/ha_tests/test_self_healing.py | 81 +++++++------------ 2 files changed, 70 insertions(+), 50 deletions(-) diff 
--git a/tests/integration/ha_tests/helpers.py b/tests/integration/ha_tests/helpers.py index c9844d41fd..d46a2e3ceb 100644 --- a/tests/integration/ha_tests/helpers.py +++ b/tests/integration/ha_tests/helpers.py @@ -794,3 +794,42 @@ async def reused_full_cluster_recovery_storage(ops_test: OpsTest, unit_name) -> "/var/snap/charmed-postgresql/common/var/log/patroni/patroni.log*", ) return True + + +async def get_db_connection(ops_test, dbname, is_primary=True, replica_unit_name=""): + unit_name = await get_primary(ops_test, APP_NAME) + password = await get_password(ops_test, APP_NAME) + address = get_unit_address(ops_test, unit_name) + if not is_primary and unit_name != "": + unit_name = replica_unit_name + address = ops_test.model.applications[APP_NAME].units[unit_name].public_address + connection_string = ( + f"dbname='{dbname}' user='operator'" + f" host='{address}' password='{password}' connect_timeout=10" + ) + return connection_string, unit_name + +async def validate_test_data(connection_string): + with psycopg2.connect(connection_string) as connection: + connection.autocommit = True + with connection.cursor() as cursor: + cursor.execute("SELECT data FROM test;") + data = cursor.fetchone() + assert data[0] == "some data" + connection.close() + + +async def create_test_data(connection_string): + with psycopg2.connect(connection_string) as connection: + connection.autocommit = True + with connection.cursor() as cursor: + # Check that it's possible to write and read data from the database that + # was created for the application. 
+ cursor.execute("DROP TABLE IF EXISTS test;") + cursor.execute("CREATE TABLE test(data TEXT);") + cursor.execute("INSERT INTO test(data) VALUES('some data');") + cursor.execute("SELECT data FROM test;") + data = cursor.fetchone() + logger.info("check test data") + assert data[0] == "some data" + connection.close() \ No newline at end of file diff --git a/tests/integration/ha_tests/test_self_healing.py b/tests/integration/ha_tests/test_self_healing.py index 27401b300b..76423c3e02 100644 --- a/tests/integration/ha_tests/test_self_healing.py +++ b/tests/integration/ha_tests/test_self_healing.py @@ -51,7 +51,7 @@ storage_id, storage_type, update_restart_condition, - wait_network_restore, + wait_network_restore, get_db_connection, validate_test_data, ) logger = logging.getLogger(__name__) @@ -543,10 +543,14 @@ async def test_network_cut_without_ip_change( await is_cluster_updated(ops_test, primary_name) + @pytest.mark.group(1) async def test_deploy_zero_units(ops_test: OpsTest): """Scale the database to zero units and scale up again.""" - app = await app_name(ops_test, APP_NAME) + app = await app_name(ops_test) + + dbname = f"{APPLICATION_NAME.replace('-', '_')}_first_database" + connection_string, primary_name = await get_db_connection(ops_test, dbname=dbname) # Start an application that continuously writes data to the database. await start_continuous_writes(ops_test, app) @@ -554,30 +558,15 @@ async def test_deploy_zero_units(ops_test: OpsTest): logger.info("checking whether writes are increasing") await are_writes_increasing(ops_test) - connection_string = await build_connection_string( - ops_test, APPLICATION_NAME, FIRST_DATABASE_RELATION_NAME - ) - # Connect to the database. - # Create test data - with psycopg2.connect(connection_string) as connection: - connection.autocommit = True - with connection.cursor() as cursor: - # Check that it's possible to write and read data from the database that - # was created for the application. 
- cursor.execute("DROP TABLE IF EXISTS test;") - cursor.execute("CREATE TABLE test(data TEXT);") - cursor.execute("INSERT INTO test(data) VALUES('some data');") - cursor.execute("SELECT data FROM test;") - data = cursor.fetchone() - assert data[0] == "some data" - connection.close() + # Create test data. + logger.info("connect to DB and create test table") + await create_test_data(connection_string) unit_ip_addresses = [] storage_id_list = [] - primary_name = await get_primary(ops_test, APP_NAME) primary_storage = "" - for unit in ops_test.model.applications[APP_NAME].units: + for unit in ops_test.model.applications[app].units: # Save IP addresses of units unit_ip_addresses.append(await get_unit_ip(ops_test, unit.name)) @@ -589,9 +578,9 @@ async def test_deploy_zero_units(ops_test: OpsTest): # Scale the database to zero units. logger.info("scaling database to zero units") - await scale_application(ops_test, APP_NAME, 0) + await scale_application(ops_test, app, 0) - # Checking shutdown units + # Checking shutdown units. for unit_ip in unit_ip_addresses: try: resp = requests.get(f"http://{unit_ip}:8008") @@ -603,37 +592,29 @@ async def test_deploy_zero_units(ops_test: OpsTest): # Scale the database to one unit. logger.info("scaling database to one unit") - await ops_test.model.applications[app].add_unit(attach_storage=[primary_storage]) + await add_unit_with_storage(ops_test, app=app, storage=primary_storage) + await ops_test.model.wait_for_idle(status="active", timeout=3000) + + connection_string, primary_name = await get_db_connection(ops_test, dbname=dbname) logger.info("checking whether writes are increasing") await are_writes_increasing(ops_test) - # Scale the database to three units. - await ops_test.model.applications[app].add_unit() - - # Connect to the database. 
- # Create test data - with psycopg2.connect(connection_string) as connection: - connection.autocommit = True - with connection.cursor() as cursor: - # Check that it's possible to write and read data from the database that - # was created for the application. - cursor.execute("SELECT data FROM test;") - data = cursor.fetchone() - assert data[0] == "some data" - connection.close() + logger.info("check test database data") + await validate_test_data(connection_string) # Scale the database to three units. - await ops_test.model.applications[app].add_unit() - # Connect to the database. - # Create test data - with psycopg2.connect(connection_string) as connection: - connection.autocommit = True - with connection.cursor() as cursor: - # Check that it's possible to write and read data from the database that - # was created for the application. - cursor.execute("SELECT data FROM test;") - data = cursor.fetchone() - assert data[0] == "some data" - connection.close() + logger.info("scaling database to two unit") + await scale_application(ops_test, application_name=app, count=2) + await ops_test.model.wait_for_idle(status="active", timeout=3000) + for unit in ops_test.model.applications[app].units: + if not await unit.is_leader_from_status(): + assert await reused_replica_storage(ops_test, unit_name=unit.name + ), "attached storage not properly re-used by Postgresql." 
+ logger.info(f"check test database data of unit name {unit.name}") + connection_string, _ = await get_db_connection(ops_test, + dbname=dbname, + is_primary=False, + replica_unit_name=unit.name) + await validate_test_data(connection_string) await check_writes(ops_test) \ No newline at end of file From b762ec87d6982eac1a89925401a23402f6bb88f2 Mon Sep 17 00:00:00 2001 From: DmitriyBalaba Date: Tue, 12 Mar 2024 22:03:03 +0200 Subject: [PATCH 04/20] Zero-units: continuous writes ON, deploy 3 units, check, scale to 0 units, check no errors, scale to 3, check --- tests/integration/ha_tests/test_self_healing.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/ha_tests/test_self_healing.py b/tests/integration/ha_tests/test_self_healing.py index 76423c3e02..7c19cedaf6 100644 --- a/tests/integration/ha_tests/test_self_healing.py +++ b/tests/integration/ha_tests/test_self_healing.py @@ -51,7 +51,7 @@ storage_id, storage_type, update_restart_condition, - wait_network_restore, get_db_connection, validate_test_data, + wait_network_restore, get_db_connection, validate_test_data, create_test_data, ) logger = logging.getLogger(__name__) From 0ca974071aba5a3cf5726694c8bb576549176414 Mon Sep 17 00:00:00 2001 From: DmitriyBalaba Date: Tue, 12 Mar 2024 22:04:20 +0200 Subject: [PATCH 05/20] Zero-units: continuous writes ON, deploy 3 units, check, scale to 0 units, check no errors, scale to 3, check --- tests/integration/ha_tests/helpers.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/integration/ha_tests/helpers.py b/tests/integration/ha_tests/helpers.py index d46a2e3ceb..9f72ea92fb 100644 --- a/tests/integration/ha_tests/helpers.py +++ b/tests/integration/ha_tests/helpers.py @@ -830,6 +830,5 @@ async def create_test_data(connection_string): cursor.execute("INSERT INTO test(data) VALUES('some data');") cursor.execute("SELECT data FROM test;") data = cursor.fetchone() - logger.info("check test data") assert data[0] == "some data" 
connection.close() \ No newline at end of file From 04bc51c17ee80f8f4fa9c2715387e4604fcfee68 Mon Sep 17 00:00:00 2001 From: BalabaDmintri Date: Tue, 12 Mar 2024 22:05:44 +0200 Subject: [PATCH 06/20] run format & lint --- poetry.lock | 1 + tests/integration/ha_tests/helpers.py | 3 +- .../integration/ha_tests/test_self_healing.py | 29 +++++++++++-------- 3 files changed, 20 insertions(+), 13 deletions(-) diff --git a/poetry.lock b/poetry.lock index 21809206d2..c727818fb1 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1633,6 +1633,7 @@ files = [ {file = "PyYAML-6.0.1-cp311-cp311-win_amd64.whl", hash = "sha256:bf07ee2fef7014951eeb99f56f39c9bb4af143d8aa3c21b1677805985307da34"}, {file = "PyYAML-6.0.1-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:855fb52b0dc35af121542a76b9a84f8d1cd886ea97c84703eaa6d88e37a2ad28"}, {file = "PyYAML-6.0.1-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:40df9b996c2b73138957fe23a16a4f0ba614f4c0efce1e9406a184b6d07fa3a9"}, + {file = "PyYAML-6.0.1-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a08c6f0fe150303c1c6b71ebcd7213c2858041a7e01975da3a99aed1e7a378ef"}, {file = "PyYAML-6.0.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6c22bec3fbe2524cde73d7ada88f6566758a8f7227bfbf93a408a9d86bcc12a0"}, {file = "PyYAML-6.0.1-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:8d4e9c88387b0f5c7d5f281e55304de64cf7f9c0021a3525bd3b1c542da3b0e4"}, {file = "PyYAML-6.0.1-cp312-cp312-win32.whl", hash = "sha256:d483d2cdf104e7c9fa60c544d92981f12ad66a457afae824d146093b8c294c54"}, diff --git a/tests/integration/ha_tests/helpers.py b/tests/integration/ha_tests/helpers.py index 9f72ea92fb..fd72b3b910 100644 --- a/tests/integration/ha_tests/helpers.py +++ b/tests/integration/ha_tests/helpers.py @@ -809,6 +809,7 @@ async def get_db_connection(ops_test, dbname, is_primary=True, replica_unit_name ) return connection_string, unit_name + async def validate_test_data(connection_string): with 
psycopg2.connect(connection_string) as connection: connection.autocommit = True @@ -831,4 +832,4 @@ async def create_test_data(connection_string): cursor.execute("SELECT data FROM test;") data = cursor.fetchone() assert data[0] == "some data" - connection.close() \ No newline at end of file + connection.close() diff --git a/tests/integration/ha_tests/test_self_healing.py b/tests/integration/ha_tests/test_self_healing.py index 7c19cedaf6..e732c0eebe 100644 --- a/tests/integration/ha_tests/test_self_healing.py +++ b/tests/integration/ha_tests/test_self_healing.py @@ -4,7 +4,6 @@ import asyncio import logging -import psycopg2 import pytest from pip._vendor import requests from pytest_operator.plugin import OpsTest @@ -16,7 +15,8 @@ get_machine_from_unit, get_password, get_unit_address, - run_command_on_unit, scale_application, build_connection_string, FIRST_DATABASE_RELATION_NAME, + run_command_on_unit, + scale_application, ) from .conftest import APPLICATION_NAME from .helpers import ( @@ -29,10 +29,12 @@ change_patroni_setting, change_wal_settings, check_writes, + create_test_data, cut_network_from_unit, cut_network_from_unit_without_ip_change, fetch_cluster_members, get_controller_machine, + get_db_connection, get_patroni_setting, get_primary, get_unit_ip, @@ -51,7 +53,8 @@ storage_id, storage_type, update_restart_condition, - wait_network_restore, get_db_connection, validate_test_data, create_test_data, + validate_test_data, + wait_network_restore, ) logger = logging.getLogger(__name__) @@ -584,8 +587,10 @@ async def test_deploy_zero_units(ops_test: OpsTest): for unit_ip in unit_ip_addresses: try: resp = requests.get(f"http://{unit_ip}:8008") - assert resp.status_code != 200, f"status code = {resp.status_code}, message = {resp.text}" - except requests.exceptions.ConnectionError as e: + assert ( + resp.status_code != 200 + ), f"status code = {resp.status_code}, message = {resp.text}" + except requests.exceptions.ConnectionError: assert True, f"unit host = 
http://{unit_ip}:8008, all units shutdown" except Exception as e: assert False, f"{e} unit host = http://{unit_ip}:8008, something went wrong" @@ -608,13 +613,13 @@ async def test_deploy_zero_units(ops_test: OpsTest): await ops_test.model.wait_for_idle(status="active", timeout=3000) for unit in ops_test.model.applications[app].units: if not await unit.is_leader_from_status(): - assert await reused_replica_storage(ops_test, unit_name=unit.name - ), "attached storage not properly re-used by Postgresql." + assert await reused_replica_storage( + ops_test, unit_name=unit.name + ), "attached storage not properly re-used by Postgresql." logger.info(f"check test database data of unit name {unit.name}") - connection_string, _ = await get_db_connection(ops_test, - dbname=dbname, - is_primary=False, - replica_unit_name=unit.name) + connection_string, _ = await get_db_connection( + ops_test, dbname=dbname, is_primary=False, replica_unit_name=unit.name + ) await validate_test_data(connection_string) - await check_writes(ops_test) \ No newline at end of file + await check_writes(ops_test) From 171b53ff25c437d32ac0ef2fd71c5297877ac1cc Mon Sep 17 00:00:00 2001 From: DmitriyBalaba Date: Tue, 12 Mar 2024 22:10:42 +0200 Subject: [PATCH 07/20] reduce time out --- tests/integration/ha_tests/test_self_healing.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/integration/ha_tests/test_self_healing.py b/tests/integration/ha_tests/test_self_healing.py index e732c0eebe..c71e97a1ca 100644 --- a/tests/integration/ha_tests/test_self_healing.py +++ b/tests/integration/ha_tests/test_self_healing.py @@ -598,7 +598,7 @@ async def test_deploy_zero_units(ops_test: OpsTest): # Scale the database to one unit. 
logger.info("scaling database to one unit") await add_unit_with_storage(ops_test, app=app, storage=primary_storage) - await ops_test.model.wait_for_idle(status="active", timeout=3000) + await ops_test.model.wait_for_idle(status="active", timeout=1500) connection_string, primary_name = await get_db_connection(ops_test, dbname=dbname) logger.info("checking whether writes are increasing") @@ -610,7 +610,6 @@ async def test_deploy_zero_units(ops_test: OpsTest): # Scale the database to three units. logger.info("scaling database to two unit") await scale_application(ops_test, application_name=app, count=2) - await ops_test.model.wait_for_idle(status="active", timeout=3000) for unit in ops_test.model.applications[app].units: if not await unit.is_leader_from_status(): assert await reused_replica_storage( From 8382d0d32c11b7d9c95b7d36f82cf2bda898f2a3 Mon Sep 17 00:00:00 2001 From: DmitriyBalaba Date: Tue, 12 Mar 2024 22:26:39 +0200 Subject: [PATCH 08/20] remove replication storage list --- tests/integration/ha_tests/test_self_healing.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/tests/integration/ha_tests/test_self_healing.py b/tests/integration/ha_tests/test_self_healing.py index 72a8c42eb5..cf7d38cfe8 100644 --- a/tests/integration/ha_tests/test_self_healing.py +++ b/tests/integration/ha_tests/test_self_healing.py @@ -570,16 +570,13 @@ async def test_deploy_zero_units(ops_test: OpsTest): await create_test_data(connection_string) unit_ip_addresses = [] - storage_id_list = [] primary_storage = "" for unit in ops_test.model.applications[app].units: # Save IP addresses of units unit_ip_addresses.append(await get_unit_ip(ops_test, unit.name)) # Save detached storage ID - if primary_name != unit.name: - storage_id_list.append(storage_id(ops_test, unit.name)) - else: + if await unit.is_leader_from_status: primary_storage = storage_id(ops_test, unit.name) # Scale the database to zero units. 
From d467d8c513f8dc10def9a01b6c3c1f0335656065 Mon Sep 17 00:00:00 2001 From: DmitriyBalaba Date: Wed, 13 Mar 2024 00:33:38 +0200 Subject: [PATCH 09/20] checking after scale to 2 and checking after scale up to 3 --- tests/integration/ha_tests/helpers.py | 7 ++++ .../integration/ha_tests/test_self_healing.py | 42 +++++++++++++------ 2 files changed, 36 insertions(+), 13 deletions(-) diff --git a/tests/integration/ha_tests/helpers.py b/tests/integration/ha_tests/helpers.py index 48bf51d04c..4bce239ee7 100644 --- a/tests/integration/ha_tests/helpers.py +++ b/tests/integration/ha_tests/helpers.py @@ -901,3 +901,10 @@ async def create_test_data(connection_string): data = cursor.fetchone() assert data[0] == "some data" connection.close() + +async def get_last_added_unit(ops_test, app, prev_units): + curr_units = [unit.name for unit in ops_test.model.applications[app].units] + new_unit = list(set(curr_units) - set(prev_units))[0] + for unit in ops_test.model.applications[app].units: + if new_unit == unit.name: + return unit diff --git a/tests/integration/ha_tests/test_self_healing.py b/tests/integration/ha_tests/test_self_healing.py index cf7d38cfe8..5ab11ff2e0 100644 --- a/tests/integration/ha_tests/test_self_healing.py +++ b/tests/integration/ha_tests/test_self_healing.py @@ -54,7 +54,7 @@ storage_type, update_restart_condition, validate_test_data, - wait_network_restore, + wait_network_restore, get_last_added_unit, ) logger = logging.getLogger(__name__) @@ -595,7 +595,7 @@ async def test_deploy_zero_units(ops_test: OpsTest): except Exception as e: assert False, f"{e} unit host = http://{unit_ip}:8008, something went wrong" - # Scale the database to one unit. + # Scale up to one unit. 
logger.info("scaling database to one unit") await add_unit_with_storage(ops_test, app=app, storage=primary_storage) await ops_test.model.wait_for_idle(status="active", timeout=1500) @@ -607,18 +607,34 @@ async def test_deploy_zero_units(ops_test: OpsTest): logger.info("check test database data") await validate_test_data(connection_string) - # Scale the database to three units. + # Scale up to two units. logger.info("scaling database to two unit") + prev_units = [unit.name for unit in ops_test.model.applications[app].units] await scale_application(ops_test, application_name=app, count=2) - for unit in ops_test.model.applications[app].units: - if not await unit.is_leader_from_status(): - assert await reused_replica_storage( - ops_test, unit_name=unit.name - ), "attached storage not properly re-used by Postgresql." - logger.info(f"check test database data of unit name {unit.name}") - connection_string, _ = await get_db_connection( - ops_test, dbname=dbname, is_primary=False, replica_unit_name=unit.name - ) - await validate_test_data(connection_string) + unit = await get_last_added_unit(ops_test, app, prev_units) + + logger.info(f"check test database data of unit name {unit.name}") + connection_string, _ = await get_db_connection( + ops_test, dbname=dbname, is_primary=False, replica_unit_name=unit.name + ) + await validate_test_data(connection_string) + assert await reused_replica_storage( + ops_test, unit_name=unit.name + ), "attached storage not properly re-used by Postgresql." + + # Scale up to three units. 
+ logger.info("scaling database to three unit") + prev_units = [unit.name for unit in ops_test.model.applications[app].units] + await scale_application(ops_test, application_name=app, count=3) + unit = await get_last_added_unit(ops_test, app, prev_units) + + logger.info(f"check test database data of unit name {unit.name}") + connection_string, _ = await get_db_connection( + ops_test, dbname=dbname, is_primary=False, replica_unit_name=unit.name + ) + await validate_test_data(connection_string) + assert await reused_replica_storage( + ops_test, unit_name=unit.name + ), "attached storage not properly re-used by Postgresql." await check_writes(ops_test) From 526357b9bc45ee36f1615859c9104e7c4a9bc1b1 Mon Sep 17 00:00:00 2001 From: DmitriyBalaba Date: Wed, 13 Mar 2024 00:34:56 +0200 Subject: [PATCH 10/20] checking after scale to 2 and checking after scale up to 3 --- tests/integration/ha_tests/test_self_healing.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/ha_tests/test_self_healing.py b/tests/integration/ha_tests/test_self_healing.py index 5ab11ff2e0..83c4050306 100644 --- a/tests/integration/ha_tests/test_self_healing.py +++ b/tests/integration/ha_tests/test_self_healing.py @@ -556,7 +556,7 @@ async def test_deploy_zero_units(ops_test: OpsTest): app = await app_name(ops_test) dbname = f"{APPLICATION_NAME.replace('-', '_')}_first_database" - connection_string, primary_name = await get_db_connection(ops_test, dbname=dbname) + connection_string, _ = await get_db_connection(ops_test, dbname=dbname) # Start an application that continuously writes data to the database. 
await start_continuous_writes(ops_test, app) From 4b64ce9d073170a35b5cfa4f227e4e8826a4c367 Mon Sep 17 00:00:00 2001 From: DmitriyBalaba Date: Wed, 13 Mar 2024 00:36:09 +0200 Subject: [PATCH 11/20] checking after scale to 2 and checking after scale up to 3 --- tests/integration/ha_tests/test_self_healing.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/ha_tests/test_self_healing.py b/tests/integration/ha_tests/test_self_healing.py index 83c4050306..cd64d1ccc8 100644 --- a/tests/integration/ha_tests/test_self_healing.py +++ b/tests/integration/ha_tests/test_self_healing.py @@ -600,7 +600,7 @@ async def test_deploy_zero_units(ops_test: OpsTest): await add_unit_with_storage(ops_test, app=app, storage=primary_storage) await ops_test.model.wait_for_idle(status="active", timeout=1500) - connection_string, primary_name = await get_db_connection(ops_test, dbname=dbname) + connection_string, _ = await get_db_connection(ops_test, dbname=dbname) logger.info("checking whether writes are increasing") await are_writes_increasing(ops_test) From 927ad24539d7a02cc569dafd045db6f309f24ea9 Mon Sep 17 00:00:00 2001 From: BalabaDmintri Date: Wed, 13 Mar 2024 00:38:56 +0200 Subject: [PATCH 12/20] run format & lint --- tests/integration/ha_tests/helpers.py | 1 + tests/integration/ha_tests/test_self_healing.py | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/integration/ha_tests/helpers.py b/tests/integration/ha_tests/helpers.py index 4bce239ee7..d71d5df22b 100644 --- a/tests/integration/ha_tests/helpers.py +++ b/tests/integration/ha_tests/helpers.py @@ -902,6 +902,7 @@ async def create_test_data(connection_string): assert data[0] == "some data" connection.close() + async def get_last_added_unit(ops_test, app, prev_units): curr_units = [unit.name for unit in ops_test.model.applications[app].units] new_unit = list(set(curr_units) - set(prev_units))[0] diff --git a/tests/integration/ha_tests/test_self_healing.py 
b/tests/integration/ha_tests/test_self_healing.py index cd64d1ccc8..3f7f31a7f4 100644 --- a/tests/integration/ha_tests/test_self_healing.py +++ b/tests/integration/ha_tests/test_self_healing.py @@ -35,6 +35,7 @@ fetch_cluster_members, get_controller_machine, get_db_connection, + get_last_added_unit, get_patroni_setting, get_primary, get_unit_ip, @@ -54,7 +55,7 @@ storage_type, update_restart_condition, validate_test_data, - wait_network_restore, get_last_added_unit, + wait_network_restore, ) logger = logging.getLogger(__name__) From a18b1d3f951c02b6a41b1ab041b3856421476e74 Mon Sep 17 00:00:00 2001 From: DmitriyBalaba Date: Wed, 3 Apr 2024 14:23:43 +0300 Subject: [PATCH 13/20] handle error: storage belongs to different cluster --- src/charm.py | 5 +++ src/cluster.py | 37 +++++++++++++++++ tests/integration/ha_tests/helpers.py | 11 ++++- .../ha_tests/test_restore_cluster.py | 34 +++++++-------- .../integration/ha_tests/test_self_healing.py | 41 ++++++++++++++++--- 5 files changed, 104 insertions(+), 24 deletions(-) diff --git a/src/charm.py b/src/charm.py index c1694849ea..cc2a486df8 100755 --- a/src/charm.py +++ b/src/charm.py @@ -478,6 +478,11 @@ def _on_peer_relation_changed(self, event: HookEvent): try: # Update the members of the cluster in the Patroni configuration on this unit. self.update_config() + if self._patroni.cluster_system_id_mismatch(unit_name=self.unit.name): + self.unit.status = BlockedStatus( + "Failed to start postgresql. 
The storage belongs to a third-party cluster" + ) + return except RetryError: self.unit.status = BlockedStatus("failed to update cluster members on member") return diff --git a/src/cluster.py b/src/cluster.py index d4ae2ac873..38696d7036 100644 --- a/src/cluster.py +++ b/src/cluster.py @@ -4,6 +4,7 @@ """Helper class used to manage cluster lifecycle.""" +import glob import logging import os import pwd @@ -647,3 +648,39 @@ def update_synchronous_node_count(self, units: int = None) -> None: # Check whether the update was unsuccessful. if r.status_code != 200: raise UpdateSyncNodeCountError(f"received {r.status_code}") + + def cluster_system_id_mismatch(self, unit_name: str) -> bool: + """Check if the Patroni service is down. + + Checks whether its logs contain the error that the storage belongs to a third-party cluster. + + Returns: + True if the error occurred, i.e. the storage belongs to another cluster. + """ + last_log_file = self._last_patroni_log_file() + unit_name = unit_name.replace("/", "-") + if ( + f" CRITICAL: system ID mismatch, node {unit_name} belongs to a different cluster:" + in last_log_file + ): + return True + return False + + def _last_patroni_log_file(self) -> str: + """Get last log file content of Patroni service. + + If there are no available log files, an empty string is returned. + + Returns: + Content of last log file of Patroni service. 
+ """ + log_files = glob.glob(f"{PATRONI_LOGS_PATH}/*.log") + if len(log_files) == 0: + return "" + latest_file = max(log_files, key=os.path.getmtime) + try: + with open(latest_file) as last_log_file: + return last_log_file.read() + except OSError as e: + logger.exception("Failed to read last patroni log file", exc_info=e) + return "" diff --git a/tests/integration/ha_tests/helpers.py b/tests/integration/ha_tests/helpers.py index d71d5df22b..dc97f399a9 100644 --- a/tests/integration/ha_tests/helpers.py +++ b/tests/integration/ha_tests/helpers.py @@ -807,7 +807,7 @@ def storage_id(ops_test, unit_name): return line.split()[1] -async def add_unit_with_storage(ops_test, app, storage): +async def add_unit_with_storage(ops_test, app, storage, is_blocked: bool = False): """Adds unit with storage. Note: this function exists as a temporary solution until this issue is resolved: @@ -820,7 +820,14 @@ async def add_unit_with_storage(ops_test, app, storage): return_code, _, _ = await ops_test.juju(*add_unit_cmd) assert return_code == 0, "Failed to add unit with storage" async with ops_test.fast_forward(): - await ops_test.model.wait_for_idle(apps=[app], status="active", timeout=2000) + if is_blocked: + application = ops_test.model.applications[app] + await ops_test.model.block_until( + lambda: "blocked" in {unit.workload_status for unit in application.units}, + timeout=1500, + ) + else: + await ops_test.model.wait_for_idle(apps=[app], status="active", timeout=1500) assert ( len(ops_test.model.applications[app].units) == expected_units ), "New unit not added to model" diff --git a/tests/integration/ha_tests/test_restore_cluster.py b/tests/integration/ha_tests/test_restore_cluster.py index d6af07e251..19445a084a 100644 --- a/tests/integration/ha_tests/test_restore_cluster.py +++ b/tests/integration/ha_tests/test_restore_cluster.py @@ -1,6 +1,7 @@ #!/usr/bin/env python3 # Copyright 2023 Canonical Ltd. # See LICENSE file for licensing details. 
+import asyncio import logging import pytest @@ -37,24 +38,23 @@ async def test_build_and_deploy(ops_test: OpsTest) -> None: charm = await ops_test.build_charm(".") async with ops_test.fast_forward(): # Deploy the first cluster with reusable storage - await ops_test.model.deploy( - charm, - application_name=FIRST_APPLICATION, - num_units=3, - series=CHARM_SERIES, - storage={"pgdata": {"pool": "lxd-btrfs", "size": 2048}}, - config={"profile": "testing"}, + await asyncio.gather( + ops_test.model.deploy( + charm, + application_name=FIRST_APPLICATION, + num_units=3, + series=CHARM_SERIES, + storage={"pgdata": {"pool": "lxd-btrfs", "size": 2048}}, + config={"profile": "testing"}, + ), + ops_test.model.deploy( + charm, + application_name=SECOND_APPLICATION, + num_units=1, + series=CHARM_SERIES, + config={"profile": "testing"}, + ), ) - - # Deploy the second cluster - await ops_test.model.deploy( - charm, - application_name=SECOND_APPLICATION, - num_units=1, - series=CHARM_SERIES, - config={"profile": "testing"}, - ) - await ops_test.model.wait_for_idle(status="active", timeout=1500) # TODO have a better way to bootstrap clusters with existing storage diff --git a/tests/integration/ha_tests/test_self_healing.py b/tests/integration/ha_tests/test_self_healing.py index 3f7f31a7f4..40c0fc1c86 100644 --- a/tests/integration/ha_tests/test_self_healing.py +++ b/tests/integration/ha_tests/test_self_healing.py @@ -57,6 +57,7 @@ validate_test_data, wait_network_restore, ) +from .test_restore_cluster import SECOND_APPLICATION logger = logging.getLogger(__name__) @@ -79,7 +80,15 @@ async def test_build_and_deploy(ops_test: OpsTest) -> None: async with ops_test.fast_forward(): await ops_test.model.deploy( charm, - num_units=3, + num_units=1, + series=CHARM_SERIES, + storage={"pgdata": {"pool": "lxd-btrfs", "size": 2048}}, + config={"profile": "testing"}, + ) + await ops_test.model.deploy( + charm, + num_units=1, + application_name=SECOND_APPLICATION, series=CHARM_SERIES, 
storage={"pgdata": {"pool": "lxd-btrfs", "size": 2048}}, config={"profile": "testing"}, @@ -552,10 +561,9 @@ async def test_network_cut_without_ip_change( @pytest.mark.group(1) -async def test_deploy_zero_units(ops_test: OpsTest): +async def test_deploy_zero_units(ops_test: OpsTest, charm): """Scale the database to zero units and scale up again.""" app = await app_name(ops_test) - dbname = f"{APPLICATION_NAME.replace('-', '_')}_first_database" connection_string, _ = await get_db_connection(ops_test, dbname=dbname) @@ -577,12 +585,20 @@ async def test_deploy_zero_units(ops_test: OpsTest): unit_ip_addresses.append(await get_unit_ip(ops_test, unit.name)) # Save detached storage ID - if await unit.is_leader_from_status: + if await unit.is_leader_from_status(): primary_storage = storage_id(ops_test, unit.name) + logger.info(f"get storage id app: {SECOND_APPLICATION}") + second_storage = "" + for unit in ops_test.model.applications[SECOND_APPLICATION].units: + if await unit.is_leader_from_status(): + second_storage = storage_id(ops_test, unit.name) + break + # Scale the database to zero units. logger.info("scaling database to zero units") await scale_application(ops_test, app, 0) + await scale_application(ops_test, SECOND_APPLICATION, 0) # Checking shutdown units. for unit_ip in unit_ip_addresses: @@ -599,7 +615,9 @@ async def test_deploy_zero_units(ops_test: OpsTest): # Scale up to one unit. 
logger.info("scaling database to one unit") await add_unit_with_storage(ops_test, app=app, storage=primary_storage) - await ops_test.model.wait_for_idle(status="active", timeout=1500) + await ops_test.model.wait_for_idle( + apps=[APP_NAME, APPLICATION_NAME], status="active", timeout=1500 + ) connection_string, _ = await get_db_connection(ops_test, dbname=dbname) logger.info("checking whether writes are increasing") @@ -608,6 +626,19 @@ async def test_deploy_zero_units(ops_test: OpsTest): logger.info("check test database data") await validate_test_data(connection_string) + logger.info("database scaling up to two units using third-party cluster storage") + new_unit = await add_unit_with_storage( + ops_test, app=app, storage=second_storage, is_blocked=True + ) + + logger.info(f"remove unit {new_unit.name} with storage from application {SECOND_APPLICATION}") + await ops_test.model.destroy_units(new_unit.name) + + await are_writes_increasing(ops_test) + + logger.info("check test database data") + await validate_test_data(connection_string) + # Scale up to two units. 
logger.info("scaling database to two unit") prev_units = [unit.name for unit in ops_test.model.applications[app].units] From 18211ed09799061ba1ca90d1aeb8606a5abc701e Mon Sep 17 00:00:00 2001 From: DmitriyBalaba Date: Thu, 4 Apr 2024 15:57:29 +0300 Subject: [PATCH 14/20] handle error: storage belongs to different cluster --- tests/integration/ha_tests/test_self_healing.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/ha_tests/test_self_healing.py b/tests/integration/ha_tests/test_self_healing.py index 40c0fc1c86..d862071ae3 100644 --- a/tests/integration/ha_tests/test_self_healing.py +++ b/tests/integration/ha_tests/test_self_healing.py @@ -80,7 +80,7 @@ async def test_build_and_deploy(ops_test: OpsTest) -> None: async with ops_test.fast_forward(): await ops_test.model.deploy( charm, - num_units=1, + num_units=3, series=CHARM_SERIES, storage={"pgdata": {"pool": "lxd-btrfs", "size": 2048}}, config={"profile": "testing"}, From d917d88ad513b4c1d1aa05bdaee8ccda5fcc5293 Mon Sep 17 00:00:00 2001 From: DmitriyBalaba Date: Fri, 12 Apr 2024 11:45:18 +0300 Subject: [PATCH 15/20] handling different versions of Postgres of unit --- src/charm.py | 15 +++++++++++++++ tests/integration/ha_tests/test_self_healing.py | 4 ++++ 2 files changed, 19 insertions(+) diff --git a/src/charm.py b/src/charm.py index cc2a486df8..2ada3f0730 100755 --- a/src/charm.py +++ b/src/charm.py @@ -92,6 +92,7 @@ PRIMARY_NOT_REACHABLE_MESSAGE = "waiting for primary to be reachable from this unit" EXTENSIONS_DEPENDENCY_MESSAGE = "Unsatisfied plugin dependencies. Please check the logs" +DIFFERENT_VERSIONS_PSQL_BLOCKING_MESSAGE = "Please select the correct version of postgresql to use. No need to use different versions of postgresql." 
Scopes = Literal[APP_SCOPE, UNIT_SCOPE] @@ -519,6 +520,8 @@ def _on_peer_relation_changed(self, event: HookEvent): self._update_new_unit_status() + self._validate_database_version() + # Split off into separate function, because of complexity _on_peer_relation_changed def _start_stop_pgbackrest_service(self, event: HookEvent) -> None: # Start or stop the pgBackRest TLS server service when TLS certificate change. @@ -1577,6 +1580,18 @@ def client_relations(self) -> List[Relation]: relations.append(relation) return relations + def _validate_database_version(self): + """Checking that only one version of Postgres is used.""" + peer_db_version = self.app_peer_data.get("database-version") + + if self.unit.is_leader() and peer_db_version is None: + self.app_peer_data.update({"database-version": self._patroni.get_postgresql_version()}) + return + + if peer_db_version != self._patroni.get_postgresql_version(): + self.unit.status = BlockedStatus(DIFFERENT_VERSIONS_PSQL_BLOCKING_MESSAGE) + return + if __name__ == "__main__": main(PostgresqlOperatorCharm) diff --git a/tests/integration/ha_tests/test_self_healing.py b/tests/integration/ha_tests/test_self_healing.py index d862071ae3..299e5b04c5 100644 --- a/tests/integration/ha_tests/test_self_healing.py +++ b/tests/integration/ha_tests/test_self_healing.py @@ -578,6 +578,10 @@ async def test_deploy_zero_units(ops_test: OpsTest, charm): logger.info("connect to DB and create test table") await create_test_data(connection_string) + # Test to check the use of different versions postgresql. + # Release of a new version of charm with another version of postgresql, + # it is necessary to implement a test that will check the use of different versions of postgresql. 
+ unit_ip_addresses = [] primary_storage = "" for unit in ops_test.model.applications[app].units: From 0a0486f5327477667176c6c3c526ff680581aaf0 Mon Sep 17 00:00:00 2001 From: DmitriyBalaba Date: Thu, 18 Apr 2024 00:04:59 +0300 Subject: [PATCH 16/20] fix unit fixed setting postgresql version into app_peer_data --- src/charm.py | 4 +++- tests/unit/test_charm.py | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/src/charm.py b/src/charm.py index 2ada3f0730..a990f18cb9 100755 --- a/src/charm.py +++ b/src/charm.py @@ -1585,7 +1585,9 @@ def _validate_database_version(self): peer_db_version = self.app_peer_data.get("database-version") if self.unit.is_leader() and peer_db_version is None: - self.app_peer_data.update({"database-version": self._patroni.get_postgresql_version()}) + _psql_version = self._patroni.get_postgresql_version() + if _psql_version is not None: + self.app_peer_data.update({"database-version": _psql_version}) return if peer_db_version != self._patroni.get_postgresql_version(): diff --git a/tests/unit/test_charm.py b/tests/unit/test_charm.py index 3666878d5a..e9eac822f9 100644 --- a/tests/unit/test_charm.py +++ b/tests/unit/test_charm.py @@ -1372,7 +1372,7 @@ def test_on_peer_relation_changed( self.harness.update_relation_data( self.rel_id, self.charm.app.name, - {"cluster_initialised": "True", "members_ips": '["1.1.1.1"]'}, + {"cluster_initialised": "True", "members_ips": '["1.1.1.1"]', "database_version": "14"}, ) self.harness.set_leader() _reconfigure_cluster.return_value = False From 263a1efb1f10f33a0e9d97e820a0ea6daed65a62 Mon Sep 17 00:00:00 2001 From: root Date: Thu, 18 Apr 2024 16:03:00 +0300 Subject: [PATCH 17/20] format --- tests/unit/test_charm.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/tests/unit/test_charm.py b/tests/unit/test_charm.py index ea51f46320..a3458c188f 100644 --- a/tests/unit/test_charm.py +++ b/tests/unit/test_charm.py @@ -1403,7 +1403,11 @@ def 
test_on_peer_relation_changed(harness): harness.update_relation_data( rel_id, harness.charm.app.name, - {"cluster_initialised": "True", "members_ips": '["1.1.1.1"]', "database_version": "14"}, + { + "cluster_initialised": "True", + "members_ips": '["1.1.1.1"]', + "database_version": "14", + }, ) harness.set_leader() _reconfigure_cluster.return_value = False From a1b24ddab52a06e8112c6aedcb932fdd34693492 Mon Sep 17 00:00:00 2001 From: DmitriyBalaba Date: Sat, 27 Apr 2024 14:51:59 +0300 Subject: [PATCH 18/20] fix record of postgres version in databags --- src/charm.py | 25 ++++++++++++-- src/constants.py | 1 + tests/integration/ha_tests/helpers.py | 34 ++++++++++++++++++- .../ha_tests/test_restore_cluster.py | 2 +- .../integration/ha_tests/test_self_healing.py | 2 +- tests/integration/helpers.py | 1 - 6 files changed, 59 insertions(+), 6 deletions(-) diff --git a/src/charm.py b/src/charm.py index 9a28ffd6d4..ef399b1b3f 100755 --- a/src/charm.py +++ b/src/charm.py @@ -71,6 +71,7 @@ MONITORING_USER, PATRONI_CONF_PATH, PEER, + UPGRADE_RELATION, POSTGRESQL_SNAP_NAME, REPLICATION_PASSWORD_KEY, REWIND_PASSWORD_KEY, @@ -95,7 +96,9 @@ PRIMARY_NOT_REACHABLE_MESSAGE = "waiting for primary to be reachable from this unit" EXTENSIONS_DEPENDENCY_MESSAGE = "Unsatisfied plugin dependencies. Please check the logs" -DIFFERENT_VERSIONS_PSQL_BLOCKING_MESSAGE = "Please select the correct version of postgresql to use. No need to use different versions of postgresql." +DIFFERENT_VERSIONS_PSQL_BLOCKING_MESSAGE = ( + "Please select the correct version of postgresql to use. You cannot use different versions of postgresql!" 
+) Scopes = Literal[APP_SCOPE, UNIT_SCOPE] @@ -157,6 +160,7 @@ def __init__(self, *args): self.framework.observe(self.on.config_changed, self._on_config_changed) self.framework.observe(self.on.get_primary_action, self._on_get_primary) self.framework.observe(self.on[PEER].relation_changed, self._on_peer_relation_changed) + self.framework.observe(self.on[UPGRADE_RELATION].relation_changed, self._on_upgrade_relation_changed) self.framework.observe(self.on.secret_changed, self._on_peer_relation_changed) self.framework.observe(self.on.secret_remove, self._on_peer_relation_changed) self.framework.observe(self.on[PEER].relation_departed, self._on_peer_relation_departed) @@ -535,6 +539,17 @@ def _on_peer_relation_changed(self, event: HookEvent): self._validate_database_version() + def _on_upgrade_relation_changed(self, event: HookEvent): + if not self.unit.is_leader(): + return + + if self.upgrade.idle: + logger.debug("Defer _on_upgrade_relation_changed: upgrade in progress") + event.defer() + return + + self._set_workload_version(self._patroni.get_postgresql_version()) + # Split off into separate function, because of complexity _on_peer_relation_changed def _start_stop_pgbackrest_service(self, event: HookEvent) -> None: # Start or stop the pgBackRest TLS server service when TLS certificate change. @@ -1020,7 +1035,7 @@ def _on_start(self, event: StartEvent) -> None: self.unit_peer_data.update({"ip": self.get_hostname_by_unit(None)}) - self.unit.set_workload_version(self._patroni.get_postgresql_version()) + self._set_workload_version(self._patroni.get_postgresql_version()) # Open port try: @@ -1596,6 +1611,12 @@ def client_relations(self) -> List[Relation]: relations.append(relation) return relations + def _set_workload_version(self, psql_version): + """Record the version of the software running as the workload. 
Also writes the version into the databags""" + self.unit.set_workload_version(psql_version) + if self.unit.is_leader(): + self.app_peer_data.update({"database-version": psql_version}) + def _validate_database_version(self): """Checking that only one version of Postgres is used.""" peer_db_version = self.app_peer_data.get("database-version") diff --git a/src/constants.py b/src/constants.py index 49929ff037..980204715f 100644 --- a/src/constants.py +++ b/src/constants.py @@ -10,6 +10,7 @@ LEGACY_DB = "db" LEGACY_DB_ADMIN = "db-admin" PEER = "database-peers" +UPGRADE_RELATION = "upgrade" ALL_CLIENT_RELATIONS = [DATABASE, LEGACY_DB, LEGACY_DB_ADMIN] API_REQUEST_TIMEOUT = 5 PATRONI_CLUSTER_STATUS_ENDPOINT = "cluster" diff --git a/tests/integration/ha_tests/helpers.py b/tests/integration/ha_tests/helpers.py index 4acf082cf8..999549566a 100644 --- a/tests/integration/ha_tests/helpers.py +++ b/tests/integration/ha_tests/helpers.py @@ -38,6 +38,7 @@ PATRONI_SERVICE_DEFAULT_PATH = f"/etc/systemd/system/{SERVICE_NAME}" RESTART_CONDITION = "no" ORIGINAL_RESTART_CONDITION = "always" +SECOND_APPLICATION = "second-cluster" class MemberNotListedOnClusterError(Exception): @@ -872,10 +873,21 @@ async def reused_full_cluster_recovery_storage(ops_test: OpsTest, unit_name) -> async def get_db_connection(ops_test, dbname, is_primary=True, replica_unit_name=""): + """Returns a PostgreSQL connection string. 
 + + Args: + ops_test: The ops test framework instance + dbname: The name of the database + is_primary: Whether to use a primary unit (default is True, so it uses the primary unit) + replica_unit_name: The name of the replica unit + + Returns: + a PostgreSQL connection string + """ unit_name = await get_primary(ops_test, APP_NAME) password = await get_password(ops_test, APP_NAME) address = get_unit_address(ops_test, unit_name) - if not is_primary and unit_name != "": + if not is_primary and replica_unit_name != "": unit_name = replica_unit_name address = ops_test.model.applications[APP_NAME].units[unit_name].public_address connection_string = ( @@ -886,6 +898,11 @@ async def get_db_connection(ops_test, dbname, is_primary=True, replica_unit_name async def validate_test_data(connection_string): + """Checking test data. + + Args: + connection_string: Database connection string + """ with psycopg2.connect(connection_string) as connection: connection.autocommit = True with connection.cursor() as cursor: @@ -896,6 +913,11 @@ async def create_test_data(connection_string): + """Creating test data in the database. + + Args: + connection_string: Database connection string + """ with psycopg2.connect(connection_string) as connection: connection.autocommit = True with connection.cursor() as cursor: @@ -911,6 +933,16 @@ async def get_last_added_unit(ops_test, app, prev_units): + """Returns a unit. 
+ + Args: + ops_test: The ops test framework instance + app: The name of the application + prev_units: List of unit names before adding the last unit + + Returns: + last added unit + """ curr_units = [unit.name for unit in ops_test.model.applications[app].units] new_unit = list(set(curr_units) - set(prev_units))[0] for unit in ops_test.model.applications[app].units: diff --git a/tests/integration/ha_tests/test_restore_cluster.py b/tests/integration/ha_tests/test_restore_cluster.py index 19445a084a..b3350476fb 100644 --- a/tests/integration/ha_tests/test_restore_cluster.py +++ b/tests/integration/ha_tests/test_restore_cluster.py @@ -20,10 +20,10 @@ add_unit_with_storage, reused_full_cluster_recovery_storage, storage_id, + SECOND_APPLICATION, ) FIRST_APPLICATION = "first-cluster" -SECOND_APPLICATION = "second-cluster" logger = logging.getLogger(__name__) diff --git a/tests/integration/ha_tests/test_self_healing.py b/tests/integration/ha_tests/test_self_healing.py index 299e5b04c5..7994891a1a 100644 --- a/tests/integration/ha_tests/test_self_healing.py +++ b/tests/integration/ha_tests/test_self_healing.py @@ -56,8 +56,8 @@ update_restart_condition, validate_test_data, wait_network_restore, + SECOND_APPLICATION, ) -from .test_restore_cluster import SECOND_APPLICATION logger = logging.getLogger(__name__) diff --git a/tests/integration/helpers.py b/tests/integration/helpers.py index 4a30b672f4..786cb67685 100644 --- a/tests/integration/helpers.py +++ b/tests/integration/helpers.py @@ -35,7 +35,6 @@ DATABASE_APP_NAME = METADATA["name"] STORAGE_PATH = METADATA["storage"]["pgdata"]["location"] APPLICATION_NAME = "postgresql-test-app" -FIRST_DATABASE_RELATION_NAME = "first-database" async def build_connection_string( From 6873326b3c199fa7b41f747d36b1c5467552568c Mon Sep 17 00:00:00 2001 From: DmitriyBalaba Date: Mon, 29 Apr 2024 23:29:39 +0300 Subject: [PATCH 19/20] format & lint --- src/charm.py | 12 ++--- tests/integration/ha_tests/helpers.py | 44 +++++++++---------- 
.../ha_tests/test_restore_cluster.py | 2 +- .../integration/ha_tests/test_self_healing.py | 2 +- 4 files changed, 30 insertions(+), 30 deletions(-) diff --git a/src/charm.py b/src/charm.py index aac044fb10..47aa65fa8f 100755 --- a/src/charm.py +++ b/src/charm.py @@ -71,7 +71,6 @@ MONITORING_USER, PATRONI_CONF_PATH, PEER, - UPGRADE_RELATION, POSTGRESQL_SNAP_NAME, REPLICATION_PASSWORD_KEY, REWIND_PASSWORD_KEY, @@ -84,6 +83,7 @@ TLS_CERT_FILE, TLS_KEY_FILE, UNIT_SCOPE, + UPGRADE_RELATION, USER, USER_PASSWORD_KEY, ) @@ -96,9 +96,7 @@ PRIMARY_NOT_REACHABLE_MESSAGE = "waiting for primary to be reachable from this unit" EXTENSIONS_DEPENDENCY_MESSAGE = "Unsatisfied plugin dependencies. Please check the logs" -DIFFERENT_VERSIONS_PSQL_BLOCKING_MESSAGE = ( - "Please select the correct version of postgresql to use. You cannot use different versions of postgresql!" -) +DIFFERENT_VERSIONS_PSQL_BLOCKING_MESSAGE = "Please select the correct version of postgresql to use. You cannot use different versions of postgresql!" Scopes = Literal[APP_SCOPE, UNIT_SCOPE] @@ -160,7 +158,9 @@ def __init__(self, *args): self.framework.observe(self.on.config_changed, self._on_config_changed) self.framework.observe(self.on.get_primary_action, self._on_get_primary) self.framework.observe(self.on[PEER].relation_changed, self._on_peer_relation_changed) - self.framework.observe(self.on[UPGRADE_RELATION].relation_changed, self._on_upgrade_relation_changed) + self.framework.observe( + self.on[UPGRADE_RELATION].relation_changed, self._on_upgrade_relation_changed + ) self.framework.observe(self.on.secret_changed, self._on_peer_relation_changed) self.framework.observe(self.on.secret_remove, self._on_peer_relation_changed) self.framework.observe(self.on[PEER].relation_departed, self._on_peer_relation_departed) @@ -1620,7 +1620,7 @@ def client_relations(self) -> List[Relation]: return relations def _set_workload_version(self, psql_version): - """Record the version of the software running as the workload. 
Also writes the version into the databags""" + """Record the version of the software running as the workload. Also writes the version into the databags.""" self.unit.set_workload_version(psql_version) if self.unit.is_leader(): self.app_peer_data.update({"database-version": psql_version}) diff --git a/tests/integration/ha_tests/helpers.py b/tests/integration/ha_tests/helpers.py index 999549566a..7aee055d69 100644 --- a/tests/integration/ha_tests/helpers.py +++ b/tests/integration/ha_tests/helpers.py @@ -875,15 +875,15 @@ async def reused_full_cluster_recovery_storage(ops_test: OpsTest, unit_name) -> async def get_db_connection(ops_test, dbname, is_primary=True, replica_unit_name=""): """Returns a PostgreSQL connection string. - Args: - ops_test: The ops test framework instance - dbname: The name of the database - is_primary: Whether to use a primary unit (default is True, so it uses the primary - replica_unit_name: The name of the replica unit - - Returns: - a PostgreSQL connection string - """ + Args: + ops_test: The ops test framework instance + dbname: The name of the database + is_primary: Whether to use a primary unit (default is True, so it uses the primary + replica_unit_name: The name of the replica unit + + Returns: + a PostgreSQL connection string + """ unit_name = await get_primary(ops_test, APP_NAME) password = await get_password(ops_test, APP_NAME) address = get_unit_address(ops_test, unit_name) @@ -900,9 +900,9 @@ async def get_db_connection(ops_test, dbname, is_primary=True, replica_unit_name async def validate_test_data(connection_string): """Checking test data. 
- Args: - connection_string: Database connection string - """ + Args: + connection_string: Database connection string + """ with psycopg2.connect(connection_string) as connection: connection.autocommit = True with connection.cursor() as cursor: @@ -915,9 +915,9 @@ async def validate_test_data(connection_string): async def create_test_data(connection_string): """Creating test data in the database. - Args: - connection_string: Database connection string - """ + Args: + connection_string: Database connection string + """ with psycopg2.connect(connection_string) as connection: connection.autocommit = True with connection.cursor() as cursor: @@ -935,14 +935,14 @@ async def create_test_data(connection_string): async def get_last_added_unit(ops_test, app, prev_units): """Returns a unit. - Args: - ops_test: The ops test framework instance - app: The name of the application - prev_units: List of unit names before adding the last unit + Args: + ops_test: The ops test framework instance + app: The name of the application + prev_units: List of unit names before adding the last unit - Returns: - last added unit - """ + Returns: + last added unit + """ curr_units = [unit.name for unit in ops_test.model.applications[app].units] new_unit = list(set(curr_units) - set(prev_units))[0] for unit in ops_test.model.applications[app].units: diff --git a/tests/integration/ha_tests/test_restore_cluster.py b/tests/integration/ha_tests/test_restore_cluster.py index b3350476fb..ed071c79f6 100644 --- a/tests/integration/ha_tests/test_restore_cluster.py +++ b/tests/integration/ha_tests/test_restore_cluster.py @@ -17,10 +17,10 @@ set_password, ) from .helpers import ( + SECOND_APPLICATION, add_unit_with_storage, reused_full_cluster_recovery_storage, storage_id, - SECOND_APPLICATION, ) FIRST_APPLICATION = "first-cluster" diff --git a/tests/integration/ha_tests/test_self_healing.py b/tests/integration/ha_tests/test_self_healing.py index 7994891a1a..8fd667b4db 100644 --- 
a/tests/integration/ha_tests/test_self_healing.py +++ b/tests/integration/ha_tests/test_self_healing.py @@ -22,6 +22,7 @@ from .helpers import ( METADATA, ORIGINAL_RESTART_CONDITION, + SECOND_APPLICATION, add_unit_with_storage, app_name, are_all_db_processes_down, @@ -56,7 +57,6 @@ update_restart_condition, validate_test_data, wait_network_restore, - SECOND_APPLICATION, ) logger = logging.getLogger(__name__) From 2b7db1434368a9a217de8dfdc8c6c7dd40978691 Mon Sep 17 00:00:00 2001 From: DmitriyBalaba Date: Tue, 11 Jun 2024 14:48:28 +0300 Subject: [PATCH 20/20] checking blocked status based using blocking message --- tests/integration/ha_tests/helpers.py | 21 +++++++++++++++++-- .../integration/ha_tests/test_self_healing.py | 9 +++++++- 2 files changed, 27 insertions(+), 3 deletions(-) diff --git a/tests/integration/ha_tests/helpers.py b/tests/integration/ha_tests/helpers.py index e49ccc10e2..650015cafd 100644 --- a/tests/integration/ha_tests/helpers.py +++ b/tests/integration/ha_tests/helpers.py @@ -898,11 +898,20 @@ def storage_id(ops_test, unit_name): return line.split()[1] -async def add_unit_with_storage(ops_test, app, storage, is_blocked: bool = False): +async def add_unit_with_storage( + ops_test, app, storage, is_blocked: bool = False, blocked_message: str = "" +): """Adds unit with storage. 
 Note: this function exists as a temporary solution until this issue is resolved: https://github.com/juju/python-libjuju/issues/695 + + Args: + ops_test: The ops test framework instance + app: The name of the application + storage: Unique storage identifier + is_blocked: Whether the new unit is expected to reach blocked status + blocked_message: The expected workload status message while blocked """ expected_units = len(ops_test.model.applications[app].units) + 1 prev_units = [unit.name for unit in ops_test.model.applications[app].units] @@ -912,9 +921,17 @@ async def add_unit_with_storage(ops_test, app, storage, is_blocked: bool = False assert return_code == 0, "Failed to add unit with storage" async with ops_test.fast_forward(): if is_blocked: + assert ( + is_blocked and blocked_message != "" + ), "The blocked status check should be checked along with the message" application = ops_test.model.applications[app] await ops_test.model.block_until( + lambda: any( + unit.workload_status == "blocked" + and unit.workload_status_message == blocked_message + for unit in application.units + ), + # "blocked" in {unit.workload_status for unit in application.units}, timeout=1500, ) else: diff --git a/tests/integration/ha_tests/test_self_healing.py b/tests/integration/ha_tests/test_self_healing.py index 8fd667b4db..fbb46aa445 100644 --- a/tests/integration/ha_tests/test_self_healing.py +++ b/tests/integration/ha_tests/test_self_healing.py @@ -65,6 +65,9 @@ PATRONI_PROCESS = "/snap/charmed-postgresql/[0-9]*/usr/bin/patroni" POSTGRESQL_PROCESS = "postgres" DB_PROCESSES = [POSTGRESQL_PROCESS, PATRONI_PROCESS] +ENDPOINT_SIMULTANEOUSLY_BLOCKING_MESSAGE = ( + "Please choose one endpoint to use. No need to relate all of them simultaneously!" 
+) @pytest.mark.group(1) @@ -632,7 +635,11 @@ async def test_deploy_zero_units(ops_test: OpsTest, charm): logger.info("database scaling up to two units using third-party cluster storage") new_unit = await add_unit_with_storage( - ops_test, app=app, storage=second_storage, is_blocked=True + ops_test, + app=app, + storage=second_storage, + is_blocked=True, + blocked_message=ENDPOINT_SIMULTANEOUSLY_BLOCKING_MESSAGE, ) logger.info(f"remove unit {new_unit.name} with storage from application {SECOND_APPLICATION}")