[DPE-5827] Set all nodes to synchronous replicas (#784)
* Update patroni configuration

* Update test assertion

* Copy update_synchronous_node_count from VM

* Add unit test

* Set sync node count during upgrade

* Fix tls test

* Switchover primary

* Add different helper to get leader

* Add config boilerplate

* Use config value when setting sync node count

* Escape tuple

* Add policy values

* Add integration test

* Fix casting

* Fix test

* Update to spec

* Bump retry timeout

* Switch to planned units

* Fix generator

* Update conf description

* Spread task

* Pass the charm
dragomirp authored Feb 18, 2025
1 parent 27892b0 commit 842aa47
Showing 16 changed files with 249 additions and 17 deletions.
6 changes: 6 additions & 0 deletions config.yaml
@@ -2,6 +2,12 @@
# See LICENSE file for licensing details.

options:
synchronous_node_count:
description: |
Sets the number of synchronous nodes to be maintained in the cluster. Should be
either "all", "majority" or a positive integer value.
type: string
default: "all"
durability_synchronous_commit:
description: |
Sets the current transactions synchronization level. This charm allows only the
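Note: on a deployed application the new option can be changed at runtime with juju config, for example juju config postgresql-k8s synchronous_node_count=majority (postgresql-k8s here is a placeholder for whatever name the charm was deployed under).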
19 changes: 18 additions & 1 deletion src/charm.py
@@ -470,13 +470,22 @@ def get_unit_ip(self, unit: Unit) -> str | None:
else:
return None

def updated_synchronous_node_count(self) -> bool:
"""Tries to update synchronous_node_count configuration and reports the result."""
try:
self._patroni.update_synchronous_node_count()
return True
except RetryError:
logger.debug("Unable to set synchronous_node_count")
return False

def _on_peer_relation_departed(self, event: RelationDepartedEvent) -> None:
"""The leader removes the departing units from the list of cluster members."""
# Allow leader to update endpoints if it isn't leaving.
if not self.unit.is_leader() or event.departing_unit == self.unit:
return

if not self.is_cluster_initialised:
if not self.is_cluster_initialised or not self.updated_synchronous_node_count():
logger.debug(
"Deferring on_peer_relation_departed: Cluster must be initialized before members can leave"
)
@@ -680,6 +689,10 @@ def _on_config_changed(self, event) -> None:
self.unit.status = BlockedStatus("Configuration Error. Please check the logs")
logger.error("Invalid configuration: %s", str(e))
return
if not self.updated_synchronous_node_count():
logger.debug("Defer on_config_changed: unable to set synchronous node count")
event.defer()
return

if self.is_blocked and "Configuration Error" in self.unit.status.message:
self._set_active_status()
@@ -693,6 +706,9 @@ def _on_config_changed(self, event) -> None:
# Enable and/or disable the extensions.
self.enable_disable_extensions()

self._unblock_extensions()

def _unblock_extensions(self) -> None:
# Unblock the charm after extensions are enabled (only if it's blocked due to application
# charms requesting extensions).
if self.unit.status.message != EXTENSIONS_BLOCKING_MESSAGE:
@@ -803,6 +819,7 @@ def _add_members(self, event) -> None:
for member in self._hosts - self._patroni.cluster_members:
logger.debug("Adding %s to cluster", member)
self.add_cluster_member(member)
self._patroni.update_synchronous_node_count()
except NotReadyError:
logger.info("Deferring reconfigure: another member doing sync right now")
event.defer()
4 changes: 3 additions & 1 deletion src/config.py
@@ -5,16 +5,18 @@
"""Structured configuration for the PostgreSQL charm."""

import logging
from typing import Literal

from charms.data_platform_libs.v0.data_models import BaseConfigModel
from pydantic import validator
from pydantic import PositiveInt, validator

logger = logging.getLogger(__name__)


class CharmConfig(BaseConfigModel):
"""Manager for the structured configuration."""

synchronous_node_count: Literal["all", "majority"] | PositiveInt
durability_synchronous_commit: str | None
instance_default_text_search_config: str | None
instance_max_locks_per_transaction: int | None
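A minimal standalone sketch of how the annotation above validates values (assumes a pydantic v1-style BaseModel, consistent with the validator import; the _Config model is illustrative and not part of the charm):

from typing import Literal, Union

from pydantic import BaseModel, PositiveInt, ValidationError

class _Config(BaseModel):
    # Accept one of the literal policies or any positive integer, mirroring CharmConfig.
    synchronous_node_count: Union[Literal["all", "majority"], PositiveInt]

_Config(synchronous_node_count="all")  # accepted
_Config(synchronous_node_count=2)  # accepted
try:
    _Config(synchronous_node_count="minority")  # rejected: neither a known literal nor a positive integer
except ValidationError as err:
    print(err)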
36 changes: 35 additions & 1 deletion src/patroni.py
@@ -53,6 +53,10 @@ class SwitchoverFailedError(Exception):
"""Raised when a switchover failed for some reason."""


class UpdateSyncNodeCountError(Exception):
"""Raised when updating synchronous_node_count failed for some reason."""


class Patroni:
"""This class handles the communication with Patroni API and configuration files."""

@@ -126,6 +130,36 @@ def _get_alternative_patroni_url(
url = self._patroni_url
return url

@property
def _synchronous_node_count(self) -> int:
planned_units = self._charm.app.planned_units()
if self._charm.config.synchronous_node_count == "all":
return planned_units - 1
elif self._charm.config.synchronous_node_count == "majority":
return planned_units // 2
return (
self._charm.config.synchronous_node_count
if self._charm.config.synchronous_node_count < self._members_count - 1
else planned_units - 1
)

def update_synchronous_node_count(self) -> None:
"""Update synchronous_node_count."""
# Try to update synchronous_node_count.
for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3)):
with attempt:
r = requests.patch(
f"{self._patroni_url}/config",
json={"synchronous_node_count": self._synchronous_node_count},
verify=self._verify,
auth=self._patroni_auth,
timeout=PATRONI_TIMEOUT,
)

# Check whether the update was unsuccessful.
if r.status_code != 200:
raise UpdateSyncNodeCountError(f"received {r.status_code}")

def get_primary(
self, unit_name_pattern=False, alternative_endpoints: list[str] | None = None
) -> str:
@@ -525,7 +559,7 @@ def render_patroni_yml_file(
restore_to_latest=restore_to_latest,
stanza=stanza,
restore_stanza=restore_stanza,
minority_count=self._members_count // 2,
synchronous_node_count=self._synchronous_node_count,
version=self.rock_postgresql_version.split(".")[0],
pg_parameters=parameters,
primary_cluster_endpoint=self._charm.async_replication.get_primary_cluster_endpoint(),
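A minimal standalone sketch of the policy-to-count mapping implemented by _synchronous_node_count above, assuming planned_units equals the current member count so the integer cap reduces to a simple min():

def resolve_sync_node_count(policy, planned_units):
    """Map the charm's synchronous_node_count policy onto Patroni's synchronous_node_count."""
    if policy == "all":
        return planned_units - 1  # every standby replicates synchronously
    if policy == "majority":
        return planned_units // 2  # half the cluster, rounded down
    return min(policy, planned_units - 1)  # fixed count, capped at the number of standbys

# With a three-unit cluster, matching the integration tests added below:
assert resolve_sync_node_count("all", 3) == 2
assert resolve_sync_node_count("majority", 3) == 1
assert resolve_sync_node_count(2, 3) == 2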
1 change: 1 addition & 0 deletions src/upgrade.py
@@ -152,6 +152,7 @@ def _on_upgrade_changed(self, event) -> None:
return

self.charm.update_config()
self.charm.updated_synchronous_node_count()

def _on_upgrade_charm_check_legacy(self, event: UpgradeCharmEvent) -> None:
if not self.peer_relation:
2 changes: 1 addition & 1 deletion templates/patroni.yml.j2
@@ -2,7 +2,7 @@ bootstrap:
dcs:
synchronous_mode: true
failsafe_mode: true
synchronous_node_count: {{ minority_count }}
synchronous_node_count: {{ synchronous_node_count }}
postgresql:
use_pg_rewind: true
remove_data_directory_on_rewind_failure: true
41 changes: 41 additions & 0 deletions tests/integration/ha_tests/helpers.py
@@ -501,6 +501,26 @@ async def get_postgresql_parameter(ops_test: OpsTest, parameter_name: str) -> in
return parameter_value


async def get_leader(model: Model, application_name: str) -> str:
"""Get the standby leader name.
Args:
model: the model instance.
application_name: the name of the application to get the value for.
Returns:
the name of the standby leader.
"""
status = await model.get_status()
first_unit_ip = next(
unit for unit in status["applications"][application_name]["units"].values()
)["address"]
cluster = get_patroni_cluster(first_unit_ip)
for member in cluster["members"]:
if member["role"] == "leader":
return member["name"]


async def get_standby_leader(model: Model, application_name: str) -> str:
"""Get the standby leader name.
@@ -1145,3 +1165,24 @@ async def remove_unit_force(ops_test: OpsTest, num_units: int):
timeout=1000,
wait_for_exact_units=scale,
)


async def get_cluster_roles(
ops_test: OpsTest, unit_name: str
) -> dict[str, str | list[str] | None]:
"""Returns whether the unit a replica in the cluster."""
unit_ip = await get_unit_address(ops_test, unit_name)
members = {"replicas": [], "primaries": [], "sync_standbys": []}
member_list = get_patroni_cluster(unit_ip)["members"]
logger.info(f"Cluster members are: {member_list}")
for member in member_list:
role = member["role"]
name = "/".join(member["name"].rsplit("-", 1))
if role == "leader":
members["primaries"].append(name)
elif role == "sync_standby":
members["sync_standbys"].append(name)
else:
members["replicas"].append(name)

return members
16 changes: 8 additions & 8 deletions tests/integration/ha_tests/test_async_replication.py
@@ -30,8 +30,8 @@
from .helpers import (
are_writes_increasing,
check_writes,
get_leader,
get_standby_leader,
get_sync_standby,
start_continuous_writes,
)

@@ -406,11 +406,11 @@ async def test_async_replication_failover_in_main_cluster(
logger.info("checking whether writes are increasing")
await are_writes_increasing(ops_test)

sync_standby = await get_sync_standby(first_model, DATABASE_APP_NAME)
logger.info(f"Sync-standby: {sync_standby}")
logger.info("deleting the sync-standby pod")
primary = await get_leader(first_model, DATABASE_APP_NAME)
logger.info(f"Primary: {primary}")
logger.info("deleting the primary pod")
client = Client(namespace=first_model.info.name)
client.delete(Pod, name=sync_standby.replace("/", "-"))
client.delete(Pod, name=primary.replace("/", "-"))

async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL):
await gather(
@@ -423,9 +423,9 @@
)

# Check that the sync-standby unit is not the same as before.
new_sync_standby = await get_sync_standby(first_model, DATABASE_APP_NAME)
logger.info(f"New sync-standby: {new_sync_standby}")
assert new_sync_standby != sync_standby, "Sync-standby is the same as before"
new_primary = await get_leader(first_model, DATABASE_APP_NAME)
logger.info(f"New sync-standby: {new_primary}")
assert new_primary != primary, "Sync-standby is the same as before"

logger.info("Ensure continuous_writes after the crashed unit")
await are_writes_increasing(ops_test)
Expand Down
79 changes: 79 additions & 0 deletions tests/integration/ha_tests/test_synchronous_policy.py
@@ -0,0 +1,79 @@
#!/usr/bin/env python3
# Copyright 2024 Canonical Ltd.
# See LICENSE file for licensing details.
import pytest
from pytest_operator.plugin import OpsTest
from tenacity import Retrying, stop_after_attempt, wait_fixed

from ..helpers import app_name, build_and_deploy
from .helpers import get_cluster_roles


@pytest.mark.abort_on_fail
async def test_build_and_deploy(ops_test: OpsTest, charm) -> None:
"""Build and deploy three unit of PostgreSQL."""
wait_for_apps = False
# It is possible for users to provide their own cluster for HA testing. Hence, check if there
# is a pre-existing cluster.
if not await app_name(ops_test):
wait_for_apps = True
await build_and_deploy(ops_test, charm, 3, wait_for_idle=False)

if wait_for_apps:
async with ops_test.fast_forward():
await ops_test.model.wait_for_idle(status="active", timeout=1000, raise_on_error=False)


async def test_default_all(ops_test: OpsTest) -> None:
app = await app_name(ops_test)

async with ops_test.fast_forward():
await ops_test.model.wait_for_idle(apps=[app], status="active", timeout=300)

for attempt in Retrying(stop=stop_after_attempt(3), wait=wait_fixed(5), reraise=True):
with attempt:
roles = await get_cluster_roles(
ops_test, ops_test.model.applications[app].units[0].name
)

assert len(roles["primaries"]) == 1
assert len(roles["sync_standbys"]) == 2
assert len(roles["replicas"]) == 0


async def test_majority(ops_test: OpsTest) -> None:
app = await app_name(ops_test)

await ops_test.model.applications[app].set_config({"synchronous_node_count": "majority"})

async with ops_test.fast_forward():
await ops_test.model.wait_for_idle(apps=[app], status="active")

for attempt in Retrying(stop=stop_after_attempt(3), wait=wait_fixed(5), reraise=True):
with attempt:
roles = await get_cluster_roles(
ops_test, ops_test.model.applications[app].units[0].name
)

assert len(roles["primaries"]) == 1
assert len(roles["sync_standbys"]) == 1
assert len(roles["replicas"]) == 1


async def test_constant(ops_test: OpsTest) -> None:
app = await app_name(ops_test)

await ops_test.model.applications[app].set_config({"synchronous_node_count": "2"})

async with ops_test.fast_forward():
await ops_test.model.wait_for_idle(apps=[app], status="active", timeout=300)

for attempt in Retrying(stop=stop_after_attempt(3), wait=wait_fixed(5), reraise=True):
with attempt:
roles = await get_cluster_roles(
ops_test, ops_test.model.applications[app].units[0].name
)

assert len(roles["primaries"]) == 1
assert len(roles["sync_standbys"]) == 2
assert len(roles["replicas"]) == 0
3 changes: 1 addition & 2 deletions tests/integration/helpers.py
@@ -754,15 +754,14 @@ async def switchover(
)
assert response.status_code == 200, f"Switchover status code is {response.status_code}"
app_name = current_primary.split("/")[0]
minority_count = len(ops_test.model.applications[app_name].units) // 2
for attempt in Retrying(stop=stop_after_attempt(30), wait=wait_fixed(2), reraise=True):
with attempt:
response = requests.get(f"http://{primary_ip}:8008/cluster")
assert response.status_code == 200
standbys = len([
member for member in response.json()["members"] if member["role"] == "sync_standby"
])
assert standbys >= minority_count
assert standbys == len(ops_test.model.applications[app_name].units) - 1


async def wait_for_idle_on_blocked(
4 changes: 4 additions & 0 deletions tests/integration/test_config.py
@@ -29,6 +29,10 @@ async def test_config_parameters(ops_test: OpsTest, charm) -> None:
test_string = "abcXYZ123"

configs = [
{"synchronous_node_count": ["0", "1"]}, # config option is greater than 0
{
"synchronous_node_count": [test_string, "all"]
}, # config option is one of `all`, `majority` or a positive integer
{
"durability_synchronous_commit": [test_string, "on"]
}, # config option is one of `on`, `remote_apply` or `remote_write`
2 changes: 1 addition & 1 deletion tests/integration/test_tls.py
@@ -97,7 +97,7 @@ async def test_tls(ops_test: OpsTest) -> None:
patroni_password = await get_password(ops_test, "patroni")
cluster_info = requests.get(f"https://{primary_address}:8008/cluster", verify=False)
for member in cluster_info.json()["members"]:
if member["role"] == "replica":
if member["role"] != "leader":
replica = "/".join(member["name"].rsplit("-", 1))

# Check if TLS enabled for replication
7 changes: 7 additions & 0 deletions tests/spread/test_synchronous_policy.py/task.yaml
@@ -0,0 +1,7 @@
summary: test_synchronous_policy.py
environment:
TEST_MODULE: ha_tests/test_synchronous_policy.py
execute: |
tox run -e integration -- "tests/integration/$TEST_MODULE" --model testing --alluredir="$SPREAD_TASK/allure-results"
artifacts:
- allure-results