(WIP) [DPE-5303] Update endpoints faster when snap is stopped #610

Draft · wants to merge 6 commits into main
2 changes: 2 additions & 0 deletions src/charm.py
@@ -825,6 +825,7 @@ def _update_members_ips(self, ip_to_add: str = None, ip_to_remove: str = None) -
elif ip_to_remove:
ips.remove(ip_to_remove)
self._peers.data[self.app]["members_ips"] = json.dumps(ips)
self._observer.restart_observer()

@retry(
stop=stop_after_delay(60),
@@ -859,6 +860,7 @@ def _unit_ip(self) -> str:
def _on_cluster_topology_change(self, _):
"""Updates endpoints and (optionally) certificates when the cluster topology changes."""
logger.info("Cluster topology changed")
self._observer.restart_observer()
if self.primary_endpoint:
self._update_relation_endpoints()
self.unit.status = ActiveStatus()
19 changes: 10 additions & 9 deletions src/cluster.py
@@ -16,7 +16,6 @@
from charms.operator_libs_linux.v2 import snap
from jinja2 import Template
from tenacity import (
AttemptManager,
RetryError,
Retrying,
retry,
@@ -210,7 +209,7 @@ def get_member_ip(self, member_name: str) -> str:
# Request info from cluster endpoint (which returns all members of the cluster).
for attempt in Retrying(stop=stop_after_attempt(2 * len(self.peers_ips) + 1)):
with attempt:
url = self._get_alternative_patroni_url(attempt)
url = self._get_alternative_patroni_url(attempt.retry_state.attempt_number)
cluster_status = requests.get(
f"{url}/{PATRONI_CLUSTER_STATUS_ENDPOINT}",
verify=self.verify,
@@ -234,7 +233,7 @@ def get_member_status(self, member_name: str) -> str:
# Request info from cluster endpoint (which returns all members of the cluster).
for attempt in Retrying(stop=stop_after_attempt(2 * len(self.peers_ips) + 1)):
with attempt:
url = self._get_alternative_patroni_url(attempt)
url = self._get_alternative_patroni_url(attempt.retry_state.attempt_number)
cluster_status = requests.get(
f"{url}/{PATRONI_CLUSTER_STATUS_ENDPOINT}",
verify=self.verify,
@@ -259,7 +258,9 @@ def get_primary(self, unit_name_pattern=False, alternative_endpoints: List[str]
# Request info from cluster endpoint (which returns all members of the cluster).
for attempt in Retrying(stop=stop_after_attempt(2 * len(self.peers_ips) + 1)):
with attempt:
url = self._get_alternative_patroni_url(attempt, alternative_endpoints)
url = self._get_alternative_patroni_url(
attempt.retry_state.attempt_number, alternative_endpoints
)
cluster_status = requests.get(
f"{url}/{PATRONI_CLUSTER_STATUS_ENDPOINT}",
verify=self.verify,
@@ -289,7 +290,7 @@ def get_standby_leader(
# Request info from cluster endpoint (which returns all members of the cluster).
for attempt in Retrying(stop=stop_after_attempt(2 * len(self.peers_ips) + 1)):
with attempt:
url = self._get_alternative_patroni_url(attempt)
url = self._get_alternative_patroni_url(attempt.retry_state.attempt_number)
cluster_status = requests.get(
f"{url}/{PATRONI_CLUSTER_STATUS_ENDPOINT}",
verify=self.verify,
@@ -313,15 +314,15 @@ def get_sync_standby_names(self) -> List[str]:
# Request info from cluster endpoint (which returns all members of the cluster).
for attempt in Retrying(stop=stop_after_attempt(2 * len(self.peers_ips) + 1)):
with attempt:
url = self._get_alternative_patroni_url(attempt)
url = self._get_alternative_patroni_url(attempt.retry_state.attempt_number)
r = requests.get(f"{url}/cluster", verify=self.verify, auth=self._patroni_auth)
for member in r.json()["members"]:
if member["role"] == "sync_standby":
sync_standbys.append("/".join(member["name"].rsplit("-", 1)))
return sync_standbys

def _get_alternative_patroni_url(
self, attempt: AttemptManager, alternative_endpoints: List[str] = None
self, attempt_number: int, alternative_endpoints: List[str] = None
) -> str:
"""Get an alternative REST API URL from another member each time.

@@ -330,9 +331,9 @@ def _get_alternative_patroni_url(
"""
if alternative_endpoints is not None:
return self._patroni_url.replace(
self.unit_ip, alternative_endpoints[attempt.retry_state.attempt_number - 1]
self.unit_ip, alternative_endpoints[attempt_number - 1]
)
attempt_number = attempt.retry_state.attempt_number
attempt_number = attempt_number
if attempt_number > 1:
url = self._patroni_url
# Build the URL using http and later using https for each peer.
72 changes: 49 additions & 23 deletions src/cluster_topology_observer.py
@@ -54,9 +54,16 @@ def __init__(self, charm: CharmBase, run_cmd: str):
self._charm = charm
self._run_cmd = run_cmd

def start_observer(self):
def restart_observer(self):
"""Restart the cluster topology observer process."""
self.stop_observer()
self.start_observer(skip_status_check=True)

def start_observer(self, skip_status_check: bool = False):
"""Start the cluster topology observer running in a new process."""
if not isinstance(self._charm.unit.status, ActiveStatus) or self._charm._peers is None:
if not skip_status_check and (
not isinstance(self._charm.unit.status, ActiveStatus) or self._charm._peers is None
):
return
if "observer-pid" in self._charm._peers.data[self._charm.unit]:
# Double check that the PID exists
@@ -80,6 +87,10 @@ def start_observer(self):
"/usr/bin/python3",
"src/cluster_topology_observer.py",
self._charm._patroni._patroni_url,
",".join([
self._charm._patroni._get_alternative_patroni_url(number)
for number in range(2 * len(self._charm._peer_members_ips) + 1)[1:]
]),
f"{self._charm._patroni.verify}",
self._run_cmd,
self._charm.unit.name,
@@ -129,30 +140,45 @@ def main():

Watch the Patroni API cluster info. When changes are detected, dispatch the change event.
"""
patroni_url, verify, run_cmd, unit, charm_dir = sys.argv[1:]
patroni_url, alternative_patroni_urls, verify, run_cmd, unit, charm_dir = sys.argv[1:]

previous_cluster_topology = {}
urls = [patroni_url] + list(filter(None, alternative_patroni_urls.split(",")))
while True:
cluster_status = requests.get(
f"{patroni_url}/{PATRONI_CLUSTER_STATUS_ENDPOINT}",
verify=verify,
timeout=API_REQUEST_TIMEOUT,
)
current_cluster_topology = {
member["name"]: member["role"] for member in cluster_status.json()["members"]
}

# If it's the first time the cluster topology was retrieved, then store it and use
# it for subsequent checks.
if not previous_cluster_topology:
previous_cluster_topology = current_cluster_topology
# If the cluster topology changed, dispatch a charm event to handle this change.
elif current_cluster_topology != previous_cluster_topology:
previous_cluster_topology = current_cluster_topology
dispatch(run_cmd, unit, charm_dir)

# Wait some time before checking again for a cluster topology change.
sleep(30)
for url in urls:
try:
cluster_status = requests.get(
f"{url}/{PATRONI_CLUSTER_STATUS_ENDPOINT}",
verify=verify,
timeout=API_REQUEST_TIMEOUT,
)
except Exception as e:
with open(LOG_FILE_PATH, "a") as log_file:
log_file.write(
f"Failed to get cluster status when using {url}: {e} - {type(e)}\n"
)
if url == urls[-1]:
with open(LOG_FILE_PATH, "a") as log_file:
log_file.write("No more peers to try to get the cluster status from.\n")
break
else:
continue
else:
current_cluster_topology = {
member["name"]: member["role"] for member in cluster_status.json()["members"]
}

# If it's the first time the cluster topology was retrieved, then store it and use
# it for subsequent checks.
if not previous_cluster_topology:
previous_cluster_topology = current_cluster_topology
# If the cluster topology changed, dispatch a charm event to handle this change.
elif current_cluster_topology != previous_cluster_topology:
previous_cluster_topology = current_cluster_topology
dispatch(run_cmd, unit, charm_dir)

# Wait some time before checking again for a cluster topology change.
sleep(30)


if __name__ == "__main__":
12 changes: 10 additions & 2 deletions tests/unit/test_charm.py
@@ -201,7 +201,7 @@ def test_primary_endpoint_no_peers(harness):


def test_on_leader_elected(harness):
with patch(
with patch("charm.ClusterTopologyObserver.start_observer") as _start_observer, patch(
"charm.PostgresqlOperatorCharm._update_relation_endpoints", new_callable=PropertyMock
) as _update_relation_endpoints, patch(
"charm.PostgresqlOperatorCharm.primary_endpoint",
@@ -565,6 +565,7 @@ def test_enable_disable_extensions(harness, caplog):

def test_on_start(harness):
with (
patch("charm.ClusterTopologyObserver.start_observer") as _start_observer,
patch(
"charm.PostgresqlOperatorCharm._restart_services_after_reboot"
) as _restart_services_after_reboot,
@@ -727,6 +728,7 @@ def test_on_start_replica(harness):

def test_on_start_no_patroni_member(harness):
with (
patch("charm.ClusterTopologyObserver.start_observer") as _start_observer,
patch("subprocess.check_output", return_value=b"C"),
patch("charm.snap.SnapCache") as _snap_cache,
patch("charm.PostgresqlOperatorCharm.postgresql") as _postgresql,
@@ -776,7 +778,10 @@ def test_on_start_after_blocked_state(harness):


def test_on_get_password(harness):
with patch("charm.PostgresqlOperatorCharm.update_config"):
with (
patch("charm.ClusterTopologyObserver.start_observer") as _start_observer,
patch("charm.PostgresqlOperatorCharm.update_config"),
):
rel_id = harness.model.get_relation(PEER).id
# Create a mock event and set passwords in peer relation data.
harness.set_leader(True)
@@ -1327,6 +1332,7 @@ def test_on_cluster_topology_change(harness):
patch(
"charm.PostgresqlOperatorCharm.primary_endpoint", new_callable=PropertyMock
) as _primary_endpoint,
patch("charm.ClusterTopologyObserver.restart_observer") as _restart_observer,
):
# Mock the property value.
_primary_endpoint.side_effect = [None, "1.1.1.1"]
@@ -1350,6 +1356,7 @@ def test_on_cluster_topology_change_keep_blocked(harness):
patch(
"charm.PostgresqlOperatorCharm._update_relation_endpoints"
) as _update_relation_endpoints,
patch("charm.ClusterTopologyObserver.restart_observer") as _restart_observer,
):
harness.model.unit.status = WaitingStatus(PRIMARY_NOT_REACHABLE_MESSAGE)

@@ -1371,6 +1378,7 @@ def test_on_cluster_topology_change_clear_blocked(harness):
patch(
"charm.PostgresqlOperatorCharm._update_relation_endpoints"
) as _update_relation_endpoints,
patch("charm.ClusterTopologyObserver.restart_observer") as _restart_observer,
):
harness.model.unit.status = WaitingStatus(PRIMARY_NOT_REACHABLE_MESSAGE)

13 changes: 4 additions & 9 deletions tests/unit/test_cluster.py
@@ -85,19 +85,14 @@ def patroni(harness, peers_ips):


def test_get_alternative_patroni_url(peers_ips, patroni):
# Mock tenacity attempt.
retry = tenacity.Retrying()
retry_state = tenacity.RetryCallState(retry, None, None, None)
attempt = tenacity.AttemptManager(retry_state)

# Test the first URL that is returned (it should have the current unit IP).
url = patroni._get_alternative_patroni_url(attempt)
attempt_number = 1
url = patroni._get_alternative_patroni_url(attempt_number)
tc.assertEqual(url, f"http://{patroni.unit_ip}:8008")

# Test returning the other servers URLs.
for attempt_number in range(attempt.retry_state.attempt_number + 1, len(peers_ips) + 2):
attempt.retry_state.attempt_number = attempt_number
url = patroni._get_alternative_patroni_url(attempt)
for attempt_number in range(attempt_number + 1, len(peers_ips) + 2):
url = patroni._get_alternative_patroni_url(attempt_number)
assert url.split("http://")[1].split(":8008")[0] in peers_ips


6 changes: 5 additions & 1 deletion tests/unit/test_cluster_topology_observer.py
@@ -1,7 +1,7 @@
# Copyright 2023 Canonical Ltd.
# See LICENSE file for licensing details.
import signal
from typing import Optional
from typing import List, Optional
from unittest.mock import Mock, PropertyMock, patch

import pytest
@@ -51,6 +51,10 @@ def _on_cluster_topology_change(self, _) -> None:
def _patroni(self) -> Patroni:
return Mock(_patroni_url="http://1.1.1.1:8008/", verify=True)

@property
def _peer_members_ips(self) -> List[str]:
return []

@property
def _peers(self) -> Optional[Relation]:
return None