From 2a1f3f9f0833440126cfc90345ea0ce1bad0831c Mon Sep 17 00:00:00 2001 From: Ian Bishop Date: Sat, 16 Oct 2021 11:11:09 -0300 Subject: [PATCH 1/4] Add zk watch for master connection --- pybase/client.py | 19 +++++------- pybase/zk/client.py | 76 +++++++++++++++++++++------------------------ 2 files changed, 43 insertions(+), 52 deletions(-) diff --git a/pybase/client.py b/pybase/client.py index 8a07147..f95cdbf 100644 --- a/pybase/client.py +++ b/pybase/client.py @@ -57,6 +57,9 @@ def __init__(self, zkquorum, pool_size, secondary): # We don't really care if it fails, best effort only. self.secondary = secondary + self.zk_client = zk.connect(zkquorum) + ip, port = zk.get_master_info(self.zk_client, self.update_master_client) + self.update_master_client(ip, port) """ HERE LAY CACHE OPERATIONS @@ -380,19 +383,15 @@ def _create_new_region(self, response, table): logger.info("Successfully discovered new region %s", new_region) return new_region - def _recreate_master_client(self): - if self.master_client is not None: - # yep, still no idea why self.master_client can be set to None. + def update_master_client(self, ip, port): + if self.master_client: self.master_client.close() - # Ask ZooKeeper for the location of the Master. - ip, port = zk.LocateMaster(self.zkquorum) + try: - # Try creating a new client instance and setting it as the new - # master_client. + # Try creating a new client instance and setting it as the new master_client. self.master_client = region.NewClient(ip, port, self.pool_size, secondary=self.secondary) + logger.info("Updated master client to %s:%s", ip, port) except RegionServerException: - # We can't connect to the address that ZK supplied. Raise an - # exception. raise MasterServerException(ip, port, secondary=self.secondary) """ @@ -498,6 +497,4 @@ def _append_response(self, rsp): def NewClient(zkquorum, socket_pool_size=1, secondary=False): # Create the main client. a = MainClient(zkquorum, socket_pool_size, secondary) - # Create the master client. - a._recreate_master_client() return a diff --git a/pybase/zk/client.py b/pybase/zk/client.py index af6742d..2478635 100644 --- a/pybase/zk/client.py +++ b/pybase/zk/client.py @@ -32,63 +32,57 @@ znode = "/hbase" -# LocateMeta takes a string representing the location of the ZooKeeper -# quorum. It then asks ZK for the location of the MetaRegionServer, -# returning a tuple containing (host_name, port). -def LocateMaster(zkquorum, establish_connection_timeout=5, missing_znode_retries=5, zk=None): - - if zk is None: - # Using Kazoo for interfacing with ZK - zk = KazooClient(hosts=zkquorum) +def connect(zkquorum, establish_connection_timeout=5): + zk = KazooClient(hosts=zkquorum) + try: zk.start(timeout=establish_connection_timeout) except KazooTimeoutError: - raise ZookeeperConnectionException( - "Cannot connect to ZooKeeper at {}".format(zkquorum)) - # MetaRegionServer information is located at /hbase/meta-region-server - try: - rsp, znodestat = zk.get(znode + "/meta-region-server") - except NoNodeError: - if missing_znode_retries == 0: - raise ZookeeperZNodeException( - "ZooKeeper does not contain meta-region-server node.") - logger.warn("ZooKeeper does not contain meta-region-server node. Retrying in 2 seconds. " - "(%s retries remaining)", missing_znode_retries) - sleep(2.0) - return LocateMaster(zkquorum, establish_connection_timeout=establish_connection_timeout, - missing_znode_retries=missing_znode_retries - 1, zk=zk) - # We don't need to maintain a connection to ZK. If we need it again we'll - # recreate the connection. A possible future implementation can subscribe - # to ZK and listen for when RegionServers go down, then pre-emptively - # reestablish those regions instead of waiting for a failed rpc to come - # back. Only issue is that if too many clients subscribe ZK may become - # overloaded. - zk.stop() - if len(rsp) == 0: + raise ZookeeperConnectionException("Cannot connect to ZooKeeper at {}".format(zkquorum)) + + return zk + + +def _parse_master_info(resp): + if len(resp) == 0: # Empty response is bad. - raise ZookeeperResponseException( - "ZooKeeper returned an empty response") + raise ZookeeperResponseException("ZooKeeper returned an empty response") # The first byte must be \xff and the next four bytes are a little-endian # uint32 containing the length of the meta. - first_byte, meta_length = unpack(">cI", rsp[:5]) + first_byte, meta_length = unpack(">cI", resp[:5]) if first_byte != b'\xff': # Malformed response - raise ZookeeperResponseException( - "ZooKeeper returned an invalid response") + raise ZookeeperResponseException("ZooKeeper returned an invalid response") if meta_length < 1 or meta_length > 65000: # Is this really an error? - raise ZookeeperResponseException( - "ZooKeeper returned too much meta information") + raise ZookeeperResponseException("ZooKeeper returned too much meta information") # ZNode data in HBase are serialized protobufs with a four byte magic # 'PBUF' prefix. - magic = unpack(">I", rsp[meta_length + 5:meta_length + 9])[0] + magic = unpack(">I", resp[meta_length + 5:meta_length + 9])[0] if magic != 1346524486: # 4 bytes: PBUF raise ZookeeperResponseException("ZooKeeper returned an invalid response (are you running " "a version of HBase supporting Protobufs?)") - rsp = rsp[meta_length + 9:] + rsp = resp[meta_length + 9:] meta = MetaRegionServer() meta.ParseFromString(rsp) - logger.info('Discovered Master at %s:%s', - meta.server.host_name, meta.server.port) + logger.info('Discovered Master at %s:%s', meta.server.host_name, meta.server.port) return meta.server.host_name, meta.server.port + + +def get_master_info(zk, watch_fn, missing_znode_retries=5): + def _wrapped_watch(resp, stat): + watch_fn(*_parse_master_info(resp)) + + try: + resp, _ = zk.get(znode + "/meta-region-server", watch=_wrapped_watch) + + return _parse_master_info(resp) + except NoNodeError: + if missing_znode_retries == 0: + raise ZookeeperZNodeException( + "ZooKeeper does not contain meta-region-server node.") + logger.warn("ZooKeeper does not contain meta-region-server node. Retrying in 2 seconds. " + "(%s retries remaining)", missing_znode_retries) + sleep(2.0) + return get_master_info(zk, missing_znode_retries - 1) From 36a6d63ac7f0bd5262fe5daa4dbfd5a0cef13d88 Mon Sep 17 00:00:00 2001 From: Ian Bishop Date: Sat, 16 Oct 2021 11:30:37 -0300 Subject: [PATCH 2/4] Switch to using datawatch recipe for watch re-registry --- pybase/client.py | 8 ++++++-- pybase/zk/client.py | 13 +++++-------- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/pybase/client.py b/pybase/client.py index f95cdbf..7b0ccbe 100644 --- a/pybase/client.py +++ b/pybase/client.py @@ -58,8 +58,12 @@ def __init__(self, zkquorum, pool_size, secondary): self.secondary = secondary self.zk_client = zk.connect(zkquorum) - ip, port = zk.get_master_info(self.zk_client, self.update_master_client) - self.update_master_client(ip, port) + self.update_master_client(*zk.get_master_info(self.zk_client)) + + # register a callback handler when master znode data changes + @self.zk_client.DataWatch(zk.master_znode) + def _update_master_info(data, stat): + self.update_master_client(*zk.parse_master_info(data)) """ HERE LAY CACHE OPERATIONS diff --git a/pybase/zk/client.py b/pybase/zk/client.py index 2478635..ccc7ba0 100644 --- a/pybase/zk/client.py +++ b/pybase/zk/client.py @@ -30,6 +30,7 @@ logger = logging.getLogger(__name__) znode = "/hbase" +master_znode = znode + "/meta-region-server" def connect(zkquorum, establish_connection_timeout=5): @@ -43,7 +44,7 @@ def connect(zkquorum, establish_connection_timeout=5): return zk -def _parse_master_info(resp): +def parse_master_info(resp): if len(resp) == 0: # Empty response is bad. raise ZookeeperResponseException("ZooKeeper returned an empty response") @@ -70,14 +71,10 @@ def _parse_master_info(resp): return meta.server.host_name, meta.server.port -def get_master_info(zk, watch_fn, missing_znode_retries=5): - def _wrapped_watch(resp, stat): - watch_fn(*_parse_master_info(resp)) - +def get_master_info(zk, missing_znode_retries=5): try: - resp, _ = zk.get(znode + "/meta-region-server", watch=_wrapped_watch) - - return _parse_master_info(resp) + resp, _ = zk.get(master_znode) + return parse_master_info(resp) except NoNodeError: if missing_znode_retries == 0: raise ZookeeperZNodeException( From b22d0344487227b302378c9b4dc45ee063de966b Mon Sep 17 00:00:00 2001 From: Ian Bishop Date: Tue, 19 Oct 2021 08:45:42 -0300 Subject: [PATCH 3/4] Handle delete event & add conditional wait --- pybase/client.py | 15 ++++++++++----- pybase/zk/client.py | 3 ++- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/pybase/client.py b/pybase/client.py index 7b0ccbe..f874ce9 100644 --- a/pybase/client.py +++ b/pybase/client.py @@ -18,14 +18,14 @@ import logging from builtins import str from itertools import chain -from threading import Lock +from threading import Condition, Lock import pybase.region.client as region import pybase.zk.client as zk from intervaltree import IntervalTree from .exceptions import (MasterServerException, NoSuchTableException, - PyBaseException, RegionException, RegionServerException) + PyBaseException, RegionException, RegionServerException, ZookeeperException) from .filters import _to_filter from .region.region import region_from_cell from .request import request @@ -58,12 +58,17 @@ def __init__(self, zkquorum, pool_size, secondary): self.secondary = secondary self.zk_client = zk.connect(zkquorum) - self.update_master_client(*zk.get_master_info(self.zk_client)) # register a callback handler when master znode data changes @self.zk_client.DataWatch(zk.master_znode) def _update_master_info(data, stat): - self.update_master_client(*zk.parse_master_info(data)) + if data: + self.update_master_client(*zk.parse_master_info(data)) + + wait_for_master = Condition() + wait_for_master.acquire() + if wait_for_master.wait_for(lambda: self.master_client is not None, 10.0): + raise ZookeeperException("Timed out waiting for master server watch to fire") """ HERE LAY CACHE OPERATIONS @@ -501,4 +506,4 @@ def _append_response(self, rsp): def NewClient(zkquorum, socket_pool_size=1, secondary=False): # Create the main client. a = MainClient(zkquorum, socket_pool_size, secondary) - return a + return a \ No newline at end of file diff --git a/pybase/zk/client.py b/pybase/zk/client.py index ccc7ba0..dbb2f6e 100644 --- a/pybase/zk/client.py +++ b/pybase/zk/client.py @@ -67,7 +67,8 @@ def parse_master_info(resp): rsp = resp[meta_length + 9:] meta = MetaRegionServer() meta.ParseFromString(rsp) - logger.info('Discovered Master at %s:%s', meta.server.host_name, meta.server.port) + print('Discovered Master at %s:%s' % (meta.server.host_name, meta.server.port)) + #logger.info('Discovered Master at %s:%s', meta.server.host_name, meta.server.port) return meta.server.host_name, meta.server.port From 9df2d0238a1e59810886c606ff0f1528492fb789 Mon Sep 17 00:00:00 2001 From: Ian Bishop Date: Tue, 19 Oct 2021 08:56:23 -0300 Subject: [PATCH 4/4] PR feedback --- pybase/client.py | 3 ++- pybase/zk/client.py | 7 +++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/pybase/client.py b/pybase/client.py index f874ce9..11e3052 100644 --- a/pybase/client.py +++ b/pybase/client.py @@ -63,7 +63,8 @@ def __init__(self, zkquorum, pool_size, secondary): @self.zk_client.DataWatch(zk.master_znode) def _update_master_info(data, stat): if data: - self.update_master_client(*zk.parse_master_info(data)) + with self._master_lookup_lock: + self.update_master_client(*zk.parse_master_info(data)) wait_for_master = Condition() wait_for_master.acquire() diff --git a/pybase/zk/client.py b/pybase/zk/client.py index dbb2f6e..31a0296 100644 --- a/pybase/zk/client.py +++ b/pybase/zk/client.py @@ -67,8 +67,7 @@ def parse_master_info(resp): rsp = resp[meta_length + 9:] meta = MetaRegionServer() meta.ParseFromString(rsp) - print('Discovered Master at %s:%s' % (meta.server.host_name, meta.server.port)) - #logger.info('Discovered Master at %s:%s', meta.server.host_name, meta.server.port) + logger.info('Discovered Master at %s:%s', meta.server.host_name, meta.server.port) return meta.server.host_name, meta.server.port @@ -80,7 +79,7 @@ def get_master_info(zk, missing_znode_retries=5): if missing_znode_retries == 0: raise ZookeeperZNodeException( "ZooKeeper does not contain meta-region-server node.") - logger.warn("ZooKeeper does not contain meta-region-server node. Retrying in 2 seconds. " - "(%s retries remaining)", missing_znode_retries) + logger.warning("ZooKeeper does not contain meta-region-server node. Retrying in 2 seconds. " + "(%s retries remaining)", missing_znode_retries) sleep(2.0) return get_master_info(zk, missing_znode_retries - 1)