From 8f39ac025f0efa466820ac296f9774ecd64b9afd Mon Sep 17 00:00:00 2001 From: Zachary Pitts Date: Fri, 19 May 2023 16:23:33 -0700 Subject: [PATCH 1/8] using a ThreadPoolExecutor with matching number of works to socket pool size --- pybase/region/client.py | 100 ++++++++++++++-------------------------- 1 file changed, 34 insertions(+), 66 deletions(-) diff --git a/pybase/region/client.py b/pybase/region/client.py index bfad8e4..cb1651c 100644 --- a/pybase/region/client.py +++ b/pybase/region/client.py @@ -17,10 +17,11 @@ import logging import socket +from concurrent.futures import ThreadPoolExecutor from contextlib import contextmanager from io import BytesIO from struct import pack, unpack -from threading import Condition, Lock +from threading import current_thread, Condition, Lock from ..exceptions import (NoSuchColumnFamilyException, NotServingRegionException, PyBaseException, RegionMovedException, RegionOpeningException, RegionServerException) @@ -67,11 +68,10 @@ def __init__(self, host, port, secondary): self.host = host.decode('utf8') if isinstance(host, bytes) else host self.port = port.decode('utf8') if isinstance(port, bytes) else port self.pool_size = 0 - # We support connection pools so have lists of sockets and read/write - # mutexes on them. + + self.thread_pool = None self.sock_pool = [] - self.write_lock_pool = [] - self.read_lock_pool = [] + # Why yes, we do have a mutex protecting a single variable. self.call_lock = Lock() self.call_id = 0 @@ -142,7 +142,6 @@ def _send_request(self, rq, lock_timeout=10): to_send = pack(">IB", total_length - 4, len(serialized_header)) to_send += serialized_header + rpc_length_bytes + serialized_rpc - pool_id = my_id % self.pool_size try: # todo: quick hack to patch a deadlock happening here. Needs revisiting. with acquire_timeout(self.write_lock_pool[pool_id], lock_timeout) as acquired: @@ -158,10 +157,10 @@ def _send_request(self, rq, lock_timeout=10): # RegionServer dead? raise RegionServerException(region_client=self) # Message is sent! Now go listen for the results. - return self._receive_rpc(my_id, rq) + future = self.thread_pool.submit(Client.send_and_receive_rpc, [self, my_id, rq, to_send]) + return future.result() - # Called after sending an RPC, listens for the response and builds the - # correct pbResponse object. + # Sending an RPC, listens for the response and builds the correct pbResponse object. # # The raw bytes we receive are composed (in order) - # @@ -171,32 +170,30 @@ def _send_request(self, rq, lock_timeout=10): # 4. A varint representing the length of the serialized ResponseMessage. # 5. The ResponseMessage. # - def _receive_rpc(self, call_id, rq, data=None, lock_timeout=10): + @staticmethod + def send_and_receive_rpc(client, call_id, rq, to_send): + thread_name = current_thread().name + sp = thread_name.split("_") # i.e. splitting "ThreadPoolExecutor-1_0" + pool_id = int(sp[1]) # thread number is now responsible for only using its matching socket + + client.sock_pool[pool_id].send(to_send) + # If the field data is populated that means we should process from that # instead of the socket. - full_data = data - if data is None: - pool_id = call_id % self.pool_size - # Total message length is going to be the first four bytes - # (little-endian uint32) - with acquire_timeout(self.read_lock_pool[pool_id], lock_timeout) as acquired: - if acquired: - try: - msg_length = self._recv_n(self.sock_pool[pool_id], 4) - if msg_length is None: - raise - msg_length = unpack(">I", msg_length)[0] - # The message is then going to be however many bytes the first four - # bytes specified. We don't want to overread or underread as that'll - # cause havoc. - full_data = self._recv_n( - self.sock_pool[pool_id], msg_length) - except socket.error: - raise RegionServerException(region_client=self) - else: - logger.warning('Lock timeout receive %s RPC to %s:%s on pool port %s', - rq.type, self.host, self.port, pool_id) - raise RegionServerException(region_client=self) + full_data = None + # Total message length is going to be the first four bytes + # (little-endian uint32) + try: + msg_length = Client._recv_n(self.sock_pool[pool_id], 4) + if msg_length is None: + raise + msg_length = unpack(">I", msg_length)[0] + # The message is then going to be however many bytes the first four + # bytes specified. We don't want to overread or underread as that'll + # cause havoc. + full_data = Client._recv_n(self.sock_pool[pool_id], msg_length) + except socket.error: + raise RegionServerException(region_client=self) # Pass in the full data as well as your current position to the # decoder. It'll then return two variables: # - next_pos: The number of bytes of data specified by the varint @@ -205,11 +202,7 @@ def _receive_rpc(self, call_id, rq, data=None, lock_timeout=10): header = ResponseHeader() header.ParseFromString(full_data[pos: pos + next_pos]) pos += next_pos - if header.call_id != call_id: - # call_ids don't match? Looks like a different thread nabbed our - # response. - return self._bad_call_id(call_id, rq, header.call_id, full_data) - elif header.exception.exception_class_name != '': + if header.exception.exception_class_name != '': # If we're in here it means a remote exception has happened. exception_class = header.exception.exception_class_name if exception_class in \ @@ -234,35 +227,11 @@ def _receive_rpc(self, call_id, rq, data=None, lock_timeout=10): # The rpc is fully built! return rpc - # Receive an RPC with incorrect call_id? - # 1. Acquire lock - # 2. Place raw data into missed_rpcs with key call_id - # 3. Notify all other threads to wake up (nothing will happen until you release the lock) - # 4. WHILE: Your call_id is not in the dictionary - # 4.5 Call wait() on the conditional and get comfy. - # 5. Pop your data out - # 6. Release the lock - def _bad_call_id(self, my_id, my_request, msg_id, data, lock_timeout=10): - with acquire_timeout(self.missed_rpcs_lock, lock_timeout) as acquired: - if acquired: - logger.debug("Received invalid RPC ID. Got: %s, Expected: %s.", msg_id, my_id) - self.missed_rpcs[msg_id] = data - self.missed_rpcs_condition.notifyAll() - while my_id not in self.missed_rpcs: - if self.shutting_down: - raise RegionServerException(region_client=self) - self.missed_rpcs_condition.wait(lock_timeout) - new_data = self.missed_rpcs.pop(my_id) - logger.debug("Another thread found my RPC! RPC ID: %s", my_id) - else: - logger.warning('Lock timeout bad_call to %s:%s', self.host, self.port) - raise RegionServerException(region_client=self) - return self._receive_rpc(my_id, my_request, data=new_data) - # Receives exactly n bytes from the socket. Will block until n bytes are # received. If a socket is closed (RegionServer died) then raise an # exception that goes all the way back to the main client - def _recv_n(self, sock, n): + @staticmethod + def _recv_n(sock, n): partial_str = BytesIO() partial_len = 0 while partial_len < n: @@ -291,14 +260,13 @@ def NewClient(host, port, pool_size, secondary=False): c = Client(host, port, secondary) try: c.pool_size = pool_size + c.thread_pool = ThreadPoolExecutor(pool_size) for x in range(pool_size): s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.connect((c.host, int(port))) _send_hello(s) s.settimeout(2) c.sock_pool.append(s) - c.read_lock_pool.append(Lock()) - c.write_lock_pool.append(Lock()) except (socket.error, socket.timeout): return None return c From 9ea538fb9e0d4cb72f7b7bc60338e3b201b915f0 Mon Sep 17 00:00:00 2001 From: Zachary Pitts Date: Fri, 19 May 2023 18:09:41 -0700 Subject: [PATCH 2/8] removing more unneeded code --- pybase/region/client.py | 16 +--------------- 1 file changed, 1 insertion(+), 15 deletions(-) diff --git a/pybase/region/client.py b/pybase/region/client.py index cb1651c..b19090a 100644 --- a/pybase/region/client.py +++ b/pybase/region/client.py @@ -142,21 +142,7 @@ def _send_request(self, rq, lock_timeout=10): to_send = pack(">IB", total_length - 4, len(serialized_header)) to_send += serialized_header + rpc_length_bytes + serialized_rpc - try: - # todo: quick hack to patch a deadlock happening here. Needs revisiting. - with acquire_timeout(self.write_lock_pool[pool_id], lock_timeout) as acquired: - if acquired: - logger.debug('Sending %s RPC to %s:%s on pool port %s', - rq.type, self.host, self.port, pool_id) - self.sock_pool[pool_id].send(to_send) - else: - logger.warning('Lock timeout sending %s RPC to %s:%s on pool port %s', - rq.type, self.host, self.port, pool_id) - raise RegionServerException(region_client=self) - except socket.error: - # RegionServer dead? - raise RegionServerException(region_client=self) - # Message is sent! Now go listen for the results. + # send and receive the request future = self.thread_pool.submit(Client.send_and_receive_rpc, [self, my_id, rq, to_send]) return future.result() From 2776bd2a071cb26570bfb5910f4c89aa8443b2c1 Mon Sep 17 00:00:00 2001 From: Zachary Pitts Date: Sat, 20 May 2023 06:13:17 -0700 Subject: [PATCH 3/8] send in the try block... where it belongs --- .gitignore | 4 +++- pybase/region/client.py | 4 ++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/.gitignore b/.gitignore index 6c73837..4890598 100644 --- a/.gitignore +++ b/.gitignore @@ -108,4 +108,6 @@ venv.bak/ # mypy .mypy_cache/ .dmypy.json -dmypy.json \ No newline at end of file +dmypy.json + +.idea \ No newline at end of file diff --git a/pybase/region/client.py b/pybase/region/client.py index b19090a..c4435fe 100644 --- a/pybase/region/client.py +++ b/pybase/region/client.py @@ -162,14 +162,14 @@ def send_and_receive_rpc(client, call_id, rq, to_send): sp = thread_name.split("_") # i.e. splitting "ThreadPoolExecutor-1_0" pool_id = int(sp[1]) # thread number is now responsible for only using its matching socket - client.sock_pool[pool_id].send(to_send) - # If the field data is populated that means we should process from that # instead of the socket. full_data = None # Total message length is going to be the first four bytes # (little-endian uint32) try: + client.sock_pool[pool_id].send(to_send) + msg_length = Client._recv_n(self.sock_pool[pool_id], 4) if msg_length is None: raise From 56bf755f0c320acd0994e1af85b2ee1159420896 Mon Sep 17 00:00:00 2001 From: Zachary Pitts Date: Wed, 24 May 2023 09:45:31 -0700 Subject: [PATCH 4/8] switching send_and_receive_rpc to not be static --- pybase/region/client.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pybase/region/client.py b/pybase/region/client.py index c4435fe..c484e40 100644 --- a/pybase/region/client.py +++ b/pybase/region/client.py @@ -143,7 +143,7 @@ def _send_request(self, rq, lock_timeout=10): to_send += serialized_header + rpc_length_bytes + serialized_rpc # send and receive the request - future = self.thread_pool.submit(Client.send_and_receive_rpc, [self, my_id, rq, to_send]) + future = self.thread_pool.submit(self.send_and_receive_rpc, rq, to_send) return future.result() # Sending an RPC, listens for the response and builds the correct pbResponse object. @@ -156,8 +156,8 @@ def _send_request(self, rq, lock_timeout=10): # 4. A varint representing the length of the serialized ResponseMessage. # 5. The ResponseMessage. # - @staticmethod - def send_and_receive_rpc(client, call_id, rq, to_send): + # @staticmethod + def send_and_receive_rpc(self, rq, to_send): thread_name = current_thread().name sp = thread_name.split("_") # i.e. splitting "ThreadPoolExecutor-1_0" pool_id = int(sp[1]) # thread number is now responsible for only using its matching socket @@ -168,7 +168,7 @@ def send_and_receive_rpc(client, call_id, rq, to_send): # Total message length is going to be the first four bytes # (little-endian uint32) try: - client.sock_pool[pool_id].send(to_send) + self.sock_pool[pool_id].send(to_send) msg_length = Client._recv_n(self.sock_pool[pool_id], 4) if msg_length is None: From 99af621c732ad7a6537ff4a13b3352fafbf33883 Mon Sep 17 00:00:00 2001 From: Zachary Pitts Date: Wed, 24 May 2023 12:20:22 -0700 Subject: [PATCH 5/8] receive_rpc will call itself if the call_id was not the same as the send --- pybase/region/client.py | 27 +++++++++++++++++++-------- 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/pybase/region/client.py b/pybase/region/client.py index c484e40..fd34242 100644 --- a/pybase/region/client.py +++ b/pybase/region/client.py @@ -143,8 +143,8 @@ def _send_request(self, rq, lock_timeout=10): to_send += serialized_header + rpc_length_bytes + serialized_rpc # send and receive the request - future = self.thread_pool.submit(self.send_and_receive_rpc, rq, to_send) - return future.result() + future = self.thread_pool.submit(self.send_and_receive_rpc, my_id, rq, to_send) + return future.result(timeout=60) # Sending an RPC, listens for the response and builds the correct pbResponse object. # @@ -156,20 +156,26 @@ def _send_request(self, rq, lock_timeout=10): # 4. A varint representing the length of the serialized ResponseMessage. # 5. The ResponseMessage. # - # @staticmethod - def send_and_receive_rpc(self, rq, to_send): + def send_and_receive_rpc(self, call_id, rq, to_send): thread_name = current_thread().name sp = thread_name.split("_") # i.e. splitting "ThreadPoolExecutor-1_0" pool_id = int(sp[1]) # thread number is now responsible for only using its matching socket + try: + self.sock_pool[pool_id].send(to_send) + except socket.error: + raise RegionServerException(region_client=self) + + return self.receive_rpc(pool_id=pool_id, call_id=call_id, rq=rq) + + + def receive_rpc(self, pool_id, call_id, rq, data=None): # If the field data is populated that means we should process from that # instead of the socket. - full_data = None + full_data = data # Total message length is going to be the first four bytes # (little-endian uint32) try: - self.sock_pool[pool_id].send(to_send) - msg_length = Client._recv_n(self.sock_pool[pool_id], 4) if msg_length is None: raise @@ -188,7 +194,11 @@ def send_and_receive_rpc(self, rq, to_send): header = ResponseHeader() header.ParseFromString(full_data[pos: pos + next_pos]) pos += next_pos - if header.exception.exception_class_name != '': + if header.call_id != call_id: + # Receive an RPC with incorrect call_id, so call receive again to receive the next + # data on the socket. Most likely, this means that + return self.receive_rpc(pool_id, call_id, rq) + elif header.exception.exception_class_name != '': # If we're in here it means a remote exception has happened. exception_class = header.exception.exception_class_name if exception_class in \ @@ -213,6 +223,7 @@ def send_and_receive_rpc(self, rq, to_send): # The rpc is fully built! return rpc + # Receives exactly n bytes from the socket. Will block until n bytes are # received. If a socket is closed (RegionServer died) then raise an # exception that goes all the way back to the main client From 76538728fc3113f74338962e379589bacdd3b6f7 Mon Sep 17 00:00:00 2001 From: Zachary Pitts Date: Wed, 24 May 2023 12:30:23 -0700 Subject: [PATCH 6/8] comment cleanup --- pybase/region/client.py | 31 ++++--------------------------- 1 file changed, 4 insertions(+), 27 deletions(-) diff --git a/pybase/region/client.py b/pybase/region/client.py index fd34242..190d2b6 100644 --- a/pybase/region/client.py +++ b/pybase/region/client.py @@ -75,24 +75,7 @@ def __init__(self, host, port, secondary): # Why yes, we do have a mutex protecting a single variable. self.call_lock = Lock() self.call_id = 0 - # This dictionary and associated sync primitives are for when _receive_rpc - # receives an RPC that isn't theirs. If a thread gets one that isn't - # theirs it means there's another thread who also just sent an RPC. The - # other thread will also get the wrong call_id. So how do we make them - # switch RPCs? - # - # Receive an RPC with incorrect call_id? - # 1. Acquire lock - # 2. Place raw data into missed_rpcs with key call_id - # 3. Notify all other threads to wake up (nothing will happen until you release the - # lock) - # 4. WHILE: Your call_id is not in the dictionary - # 4.5 Call wait() on the conditional and get comfy. - # 5. Pop your data out - # 6. Release the lock - self.missed_rpcs = {} - self.missed_rpcs_lock = Lock() - self.missed_rpcs_condition = Condition(self.missed_rpcs_lock) + # Set to true when .close is called - this allows threads/greenlets # stuck in _bad_call_id to escape into the error handling code. self.shutting_down = False @@ -167,12 +150,10 @@ def send_and_receive_rpc(self, call_id, rq, to_send): return self.receive_rpc(pool_id=pool_id, call_id=call_id, rq=rq) - - def receive_rpc(self, pool_id, call_id, rq, data=None): - + def receive_rpc(self, pool_id, call_id, rq): # If the field data is populated that means we should process from that # instead of the socket. - full_data = data + full_data = None # Total message length is going to be the first four bytes # (little-endian uint32) try: @@ -196,7 +177,7 @@ def receive_rpc(self, pool_id, call_id, rq, data=None): pos += next_pos if header.call_id != call_id: # Receive an RPC with incorrect call_id, so call receive again to receive the next - # data on the socket. Most likely, this means that + # data on the socket. Likely, this means that that some caller abandoned their request return self.receive_rpc(pool_id, call_id, rq) elif header.exception.exception_class_name != '': # If we're in here it means a remote exception has happened. @@ -223,7 +204,6 @@ def receive_rpc(self, pool_id, call_id, rq, data=None): # The rpc is fully built! return rpc - # Receives exactly n bytes from the socket. Will block until n bytes are # received. If a socket is closed (RegionServer died) then raise an # exception that goes all the way back to the main client @@ -246,9 +226,6 @@ def close(self): sock.close() # We could still have greenlets waiting in the bad_call_id pools! Wake # them up so they can fail to error handling as well. - self.missed_rpcs_condition.acquire() - self.missed_rpcs_condition.notifyAll() - self.missed_rpcs_condition.release() # Creates a new RegionServer client. Creates the socket, initializes the From 2497c65cd5ea5bd7d40a906dc13eccd2b3645300 Mon Sep 17 00:00:00 2001 From: Zachary Pitts Date: Wed, 24 May 2023 13:47:49 -0700 Subject: [PATCH 7/8] adding call_timeout parameter to Client constructor --- pybase/region/client.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/pybase/region/client.py b/pybase/region/client.py index 190d2b6..c9c19d8 100644 --- a/pybase/region/client.py +++ b/pybase/region/client.py @@ -64,12 +64,13 @@ class Client(object): # - call_id: A monotonically increasing int used as a sequence number for rpcs. This way # we can match incoming responses with the rpc that made the request. - def __init__(self, host, port, secondary): + def __init__(self, host, port, secondary, call_timeout=60): self.host = host.decode('utf8') if isinstance(host, bytes) else host self.port = port.decode('utf8') if isinstance(port, bytes) else port self.pool_size = 0 self.thread_pool = None + self.thread_pool_timeout = call_timeout self.sock_pool = [] # Why yes, we do have a mutex protecting a single variable. @@ -127,7 +128,7 @@ def _send_request(self, rq, lock_timeout=10): # send and receive the request future = self.thread_pool.submit(self.send_and_receive_rpc, my_id, rq, to_send) - return future.result(timeout=60) + return future.result(timeout=self.thread_pool_timeout) # Sending an RPC, listens for the response and builds the correct pbResponse object. # @@ -230,8 +231,8 @@ def close(self): # Creates a new RegionServer client. Creates the socket, initializes the # connection and returns an instance of Client. -def NewClient(host, port, pool_size, secondary=False): - c = Client(host, port, secondary) +def NewClient(host, port, pool_size, secondary=False, call_timeout=60): + c = Client(host, port, secondary, call_timeout) try: c.pool_size = pool_size c.thread_pool = ThreadPoolExecutor(pool_size) From 6886130bd28422aaff1b76b24ed1324cd2c95e7d Mon Sep 17 00:00:00 2001 From: Zachary Pitts Date: Wed, 24 May 2023 13:50:38 -0700 Subject: [PATCH 8/8] updating version to 4.0.0 --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 4235244..01bf638 100644 --- a/setup.py +++ b/setup.py @@ -3,7 +3,7 @@ from setuptools import find_packages, setup setup(name='pybase', - version='0.3.3', + version='4.0.0', description='Native python client to hbase 1.0+', url='https://github.com/CurleySamuel/PyBase', author='Sam Curley',