Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add zk watch for master connection #10

Merged
merged 4 commits into from
Oct 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 21 additions & 14 deletions pybase/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@
import logging
from builtins import str
from itertools import chain
from threading import Lock
from threading import Condition, Lock

import pybase.region.client as region
import pybase.zk.client as zk
from intervaltree import IntervalTree

from .exceptions import (MasterServerException, NoSuchTableException,
PyBaseException, RegionException, RegionServerException)
PyBaseException, RegionException, RegionServerException, ZookeeperException)
from .filters import _to_filter
from .region.region import region_from_cell
from .request import request
Expand Down Expand Up @@ -57,6 +57,19 @@ def __init__(self, zkquorum, pool_size, secondary):
# We don't really care if it fails, best effort only.
self.secondary = secondary

self.zk_client = zk.connect(zkquorum)

# register a callback handler when master znode data changes
@self.zk_client.DataWatch(zk.master_znode)
def _update_master_info(data, stat):
if data:
with self._master_lookup_lock:
self.update_master_client(*zk.parse_master_info(data))

wait_for_master = Condition()
wait_for_master.acquire()
if wait_for_master.wait_for(lambda: self.master_client is not None, 10.0):
raise ZookeeperException("Timed out waiting for master server watch to fire")

"""
HERE LAY CACHE OPERATIONS
Expand Down Expand Up @@ -380,19 +393,15 @@ def _create_new_region(self, response, table):
logger.info("Successfully discovered new region %s", new_region)
return new_region

def _recreate_master_client(self):
if self.master_client is not None:
# yep, still no idea why self.master_client can be set to None.
def update_master_client(self, ip, port):
if self.master_client:
self.master_client.close()
# Ask ZooKeeper for the location of the Master.
ip, port = zk.LocateMaster(self.zkquorum)

try:
# Try creating a new client instance and setting it as the new
# master_client.
# Try creating a new client instance and setting it as the new master_client.
self.master_client = region.NewClient(ip, port, self.pool_size, secondary=self.secondary)
logger.info("Updated master client to %s:%s", ip, port)
except RegionServerException:
# We can't connect to the address that ZK supplied. Raise an
# exception.
raise MasterServerException(ip, port, secondary=self.secondary)

"""
Expand Down Expand Up @@ -498,6 +507,4 @@ def _append_response(self, rsp):
def NewClient(zkquorum, socket_pool_size=1, secondary=False):
# Create the main client.
a = MainClient(zkquorum, socket_pool_size, secondary)
# Create the master client.
a._recreate_master_client()
return a
return a
71 changes: 31 additions & 40 deletions pybase/zk/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,65 +30,56 @@
logger = logging.getLogger(__name__)

znode = "/hbase"
master_znode = znode + "/meta-region-server"


# LocateMeta takes a string representing the location of the ZooKeeper
# quorum. It then asks ZK for the location of the MetaRegionServer,
# returning a tuple containing (host_name, port).
def LocateMaster(zkquorum, establish_connection_timeout=5, missing_znode_retries=5, zk=None):
def connect(zkquorum, establish_connection_timeout=5):
zk = KazooClient(hosts=zkquorum)

if zk is None:
# Using Kazoo for interfacing with ZK
zk = KazooClient(hosts=zkquorum)
try:
zk.start(timeout=establish_connection_timeout)
except KazooTimeoutError:
raise ZookeeperConnectionException(
"Cannot connect to ZooKeeper at {}".format(zkquorum))
# MetaRegionServer information is located at /hbase/meta-region-server
try:
rsp, znodestat = zk.get(znode + "/meta-region-server")
except NoNodeError:
if missing_znode_retries == 0:
raise ZookeeperZNodeException(
"ZooKeeper does not contain meta-region-server node.")
logger.warn("ZooKeeper does not contain meta-region-server node. Retrying in 2 seconds. "
"(%s retries remaining)", missing_znode_retries)
sleep(2.0)
return LocateMaster(zkquorum, establish_connection_timeout=establish_connection_timeout,
missing_znode_retries=missing_znode_retries - 1, zk=zk)
# We don't need to maintain a connection to ZK. If we need it again we'll
# recreate the connection. A possible future implementation can subscribe
# to ZK and listen for when RegionServers go down, then pre-emptively
# reestablish those regions instead of waiting for a failed rpc to come
# back. Only issue is that if too many clients subscribe ZK may become
# overloaded.
zk.stop()
if len(rsp) == 0:
raise ZookeeperConnectionException("Cannot connect to ZooKeeper at {}".format(zkquorum))

return zk


def parse_master_info(resp):
if len(resp) == 0:
# Empty response is bad.
raise ZookeeperResponseException(
"ZooKeeper returned an empty response")
raise ZookeeperResponseException("ZooKeeper returned an empty response")
# The first byte must be \xff and the next four bytes are a big-endian
# uint32 (struct format ">cI") containing the length of the meta.
first_byte, meta_length = unpack(">cI", rsp[:5])
first_byte, meta_length = unpack(">cI", resp[:5])
if first_byte != b'\xff':
# Malformed response
raise ZookeeperResponseException(
"ZooKeeper returned an invalid response")
raise ZookeeperResponseException("ZooKeeper returned an invalid response")
if meta_length < 1 or meta_length > 65000:
# Is this really an error?
raise ZookeeperResponseException(
"ZooKeeper returned too much meta information")
raise ZookeeperResponseException("ZooKeeper returned too much meta information")
# ZNode data in HBase are serialized protobufs with a four byte magic
# 'PBUF' prefix.
magic = unpack(">I", rsp[meta_length + 5:meta_length + 9])[0]
magic = unpack(">I", resp[meta_length + 5:meta_length + 9])[0]
if magic != 1346524486:
# 4 bytes: PBUF
raise ZookeeperResponseException("ZooKeeper returned an invalid response (are you running "
"a version of HBase supporting Protobufs?)")
rsp = rsp[meta_length + 9:]
rsp = resp[meta_length + 9:]
meta = MetaRegionServer()
meta.ParseFromString(rsp)
logger.info('Discovered Master at %s:%s',
meta.server.host_name, meta.server.port)
logger.info('Discovered Master at %s:%s', meta.server.host_name, meta.server.port)
return meta.server.host_name, meta.server.port


def get_master_info(zk, missing_znode_retries=5):
    """Read the master znode and return its parsed contents.

    Fetches ``master_znode`` through ``zk.get`` and hands the payload to
    ``parse_master_info``, whose result is returned unchanged. While the
    znode is missing, the lookup is repeated after a two second pause.

    Args:
        zk: Client object exposing ``get(path)`` that returns a
            ``(data, stat)`` pair.
        missing_znode_retries: How many more attempts to make after a
            ``NoNodeError`` before giving up.

    Raises:
        ZookeeperZNodeException: The znode is still absent once the retry
            budget has reached zero.
    """
    remaining = missing_znode_retries
    while True:
        try:
            payload, _stat = zk.get(master_znode)
            return parse_master_info(payload)
        except NoNodeError:
            # Budget exhausted: surface the missing znode to the caller.
            if remaining == 0:
                raise ZookeeperZNodeException(
                    "ZooKeeper does not contain meta-region-server node.")
            logger.warning("ZooKeeper does not contain meta-region-server node. Retrying in 2 seconds. "
                           "(%s retries remaining)", remaining)
            sleep(2.0)
            remaining -= 1