Skip to content

Commit

Permalink
Merge pull request #16 from Flipboard/ibishop/support-multi
Browse files Browse the repository at this point in the history
Add support for `get_many`
  • Loading branch information
ianbishop authored Feb 20, 2024
2 parents 58cf1e1 + f747c2e commit 393cb97
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 29 deletions.
71 changes: 65 additions & 6 deletions pybase/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@
limitations under the License.
"""
from __future__ import absolute_import, print_function, unicode_literals
from collections import defaultdict

import logging
from builtins import str
from concurrent.futures import as_completed
from itertools import chain
from threading import Condition, Lock

Expand All @@ -41,7 +43,7 @@

class MainClient(object):

def __init__(self, zkquorum, pool_size, secondary):
def __init__(self, zkquorum, pool_size, secondary, call_timeout=60):
# Location of the ZooKeeper quorum (csv)
self.zkquorum = zkquorum
# Connection pool size per region server (and master!)
Expand All @@ -62,6 +64,8 @@ def __init__(self, zkquorum, pool_size, secondary):
# Capture if this client is being used for secondary operations
# We don't really care if it fails, best effort only.
self.secondary = secondary
# How long to wait before a call times out
self.call_timeout = call_timeout

self.zk_client = zk.connect(zkquorum)

Expand Down Expand Up @@ -181,6 +185,59 @@ def get(self, table, key, families={}, filters=None):
e._handle_exception(self, dest_region=dest_region)
# Everything should be dandy now. Repeat the request!
return self.get(table, key, families=families, filters=filters)

def get_many(self, table, keys, families=None):
"""
get row or specified cell with optional filter for all provided keys
:param table: hbase table
:param key: list of row key
:param families: (optional) specifies columns to get,
e.g., {"columnFamily1":["col1","col2"], "colFamily2": "col3"}
:return: tuple of (list of responses with cells, list of exceptions that occurred)
"""
if len(keys) == 0:
return []

grouped_by_server = defaultdict(lambda: defaultdict(list))
for key in keys:
dest_region = self._find_hosting_region(table, key)
# we must call each region server, which can server many key ranges
grouped_by_server[dest_region.region_client.host][dest_region].append(key)

results = []
errors = []
tasks = []
for grouped_by_region in grouped_by_server.values():
try:
dest_region = next(iter(grouped_by_region.keys()))
client = dest_region.region_client
rq = request.multi_get(grouped_by_region, families)
tasks.append(client._send_request(rq, _async=True))
except PyBaseException as e:
e._handle_exception(self, dest_region=dest_region)
errors.append(e)

try:
for f in as_completed(tasks, timeout=self.call_timeout * len(grouped_by_server)):
try:
response = f.result()
for ra_result in response.regionActionResult:
if ra_result.exception.name != "":
errors.append(client._parse_exception(ra_result.exception.name,
ra_result.exception.value))
else:
for res_or_err in ra_result.resultOrException:
if res_or_err.exception.name != "":
errors.append(client._parse_exception(res_or_err.exception.name,
res_or_err.exception.value))
else:
results.append(Result(res_or_err))
except PyBaseException as e:
e._handle_exception(self, dest_region=dest_region)
errors.append(e)
except TimeoutError:
errors.append(e)
return results, errors

def put(self, table, key, values):
return self._mutate(table, key, values, request.put_request)
Expand Down Expand Up @@ -395,7 +452,8 @@ def _create_new_region(self, response, table):
new_region.region_client = self.reverse_client_cache[server_loc]
else:
# Otherwise we need to create a new region client instance.
new_client = region.NewClient(host, port, self.pool_size, secondary=self.secondary)
new_client = region.NewClient(host, port, self.pool_size,
secondary=self.secondary, call_timeout=self.call_timeout)
if new_client is None:
# Welp. We can't connect to the server that the Master
# supplied. Raise an exception.
Expand All @@ -417,8 +475,9 @@ def update_master_client(self, ip, port):

try:
# Try creating a new client instance and setting it as the new master_client.
self.master_client = region.NewClient(
ip, port, self.pool_size, secondary=self.secondary)
self.master_client = region.NewClient(ip, port, self.pool_size,
secondary=self.secondary,
call_timeout=self.call_timeout)
logger.info("Updated master client to %s:%s", ip, port)
except RegionServerException:
raise MasterServerException(ip, port, secondary=self.secondary)
Expand Down Expand Up @@ -523,5 +582,5 @@ def _append_response(self, rsp):
# location of ZooKeeper this function will ask ZK for the location of the
# meta table and create the region client responsible for future meta
# lookups (masterclient). Returns an instance of MainClient
def NewClient(zkquorum, socket_pool_size=1, secondary=False):
return MainClient(zkquorum, socket_pool_size, secondary)
def NewClient(zkquorum, socket_pool_size=1, secondary=False, call_timeout=60):
return MainClient(zkquorum, socket_pool_size, secondary, call_timeout=call_timeout)
48 changes: 27 additions & 21 deletions pybase/region/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from ..exceptions import (NoSuchColumnFamilyException, NotServingRegionException, PyBaseException,
RegionMovedException, RegionOpeningException, RegionServerException)
from ..helpers import varint
from ..pb.Client_pb2 import GetResponse, MutateResponse, ScanResponse
from ..pb.Client_pb2 import GetResponse, MutateResponse, ScanResponse, MultiResponse
from ..pb.RPC_pb2 import ConnectionHeader, RequestHeader, ResponseHeader

logger = logging.getLogger(__name__)
Expand All @@ -40,7 +40,8 @@
response_types = {
b"Get": GetResponse,
b"Mutate": MutateResponse,
b"Scan": ScanResponse
b"Scan": ScanResponse,
b"Multi": MultiResponse
}


Expand Down Expand Up @@ -101,7 +102,7 @@ def __init__(self, host, port, secondary, call_timeout=60):
# 4. A varint representing the length of the serialized RPC.
# 5. The serialized RPC.
#
def _send_request(self, rq, lock_timeout=10):
def _send_request(self, rq, lock_timeout=10, _async=False):
with acquire_timeout(self.call_lock, lock_timeout) as acquired:
if acquired:
my_id = self.call_id
Expand All @@ -128,6 +129,8 @@ def _send_request(self, rq, lock_timeout=10):

# send and receive the request
future = self.thread_pool.submit(self.send_and_receive_rpc, my_id, rq, to_send)
if _async:
return future
return future.result(timeout=self.thread_pool_timeout)

# Sending an RPC, listens for the response and builds the correct pbResponse object.
Expand All @@ -150,6 +153,23 @@ def send_and_receive_rpc(self, call_id, rq, to_send):
raise RegionServerException(region_client=self)

return self.receive_rpc(pool_id=pool_id, call_id=call_id, rq=rq)

def _parse_exception(self, exception_class, stack_trace):
if exception_class in ('org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException',
"java.io.IOException"):
return NoSuchColumnFamilyException()
elif exception_class == 'org.apache.hadoop.hbase.exceptions.RegionMovedException':
return RegionMovedException(region_client=self)
elif exception_class == 'org.apache.hadoop.hbase.NotServingRegionException':
return NotServingRegionException(region_client=self)
elif exception_class == \
'org.apache.hadoop.hbase.regionserver.RegionServerStoppedException':
return RegionServerException(region_client=self)
elif exception_class == 'org.apache.hadoop.hbase.exceptions.RegionOpeningException':
return RegionOpeningException(region_client=self)
else:
return PyBaseException(
exception_class + ". Remote traceback:\n%s" % stack_trace)

def receive_rpc(self, pool_id, call_id, rq):
# If the field data is populated that means we should process from that
Expand All @@ -166,7 +186,7 @@ def receive_rpc(self, pool_id, call_id, rq):
# bytes specified. We don't want to overread or underread as that'll
# cause havoc.
full_data = Client._recv_n(self.sock_pool[pool_id], msg_length)
except socket.error:
except socket.error as e:
raise RegionServerException(region_client=self)
# Pass in the full data as well as your current position to the
# decoder. It'll then return two variables:
Expand All @@ -183,22 +203,8 @@ def receive_rpc(self, pool_id, call_id, rq):
elif header.exception.exception_class_name != '':
# If we're in here it means a remote exception has happened.
exception_class = header.exception.exception_class_name
if exception_class in \
{'org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException',
"java.io.IOException"}:
raise NoSuchColumnFamilyException()
elif exception_class == 'org.apache.hadoop.hbase.exceptions.RegionMovedException':
raise RegionMovedException(region_client=self)
elif exception_class == 'org.apache.hadoop.hbase.NotServingRegionException':
raise NotServingRegionException(region_client=self)
elif exception_class == \
'org.apache.hadoop.hbase.regionserver.RegionServerStoppedException':
raise RegionServerException(region_client=self)
elif exception_class == 'org.apache.hadoop.hbase.exceptions.RegionOpeningException':
raise RegionOpeningException(region_client=self)
else:
raise PyBaseException(
exception_class + ". Remote traceback:\n%s" % header.exception.stack_trace)
if err := self._parse_exception(exception_class, header.exception.stack_trace):
raise err
next_pos, pos = decoder(full_data, pos)
rpc = response_types[rq.type]()
rpc.ParseFromString(full_data[pos: pos + next_pos])
Expand Down Expand Up @@ -240,7 +246,7 @@ def NewClient(host, port, pool_size, secondary=False, call_timeout=60):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((c.host, int(port)))
_send_hello(s)
s.settimeout(2)
s.settimeout(call_timeout)
c.sock_pool.append(s)
except (socket.error, socket.timeout):
return None
Expand Down
19 changes: 18 additions & 1 deletion pybase/request/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from ..exceptions import MalformedFamilies, MalformedValues
from ..filters import _to_filter
from ..pb.Client_pb2 import Column, GetRequest, MutateRequest, MutationProto, ScanRequest
from ..pb.Client_pb2 import Action, Column, GetRequest, MultiRequest, MutateRequest, MutationProto, RegionAction, ScanRequest

# Table + Family used when requesting meta information from the
# MetaRegionServer
Expand Down Expand Up @@ -42,6 +42,23 @@ def get_request(region, key, families, filters):
rq.get.filter.CopyFrom(pbFilter)
return Request(b"Get", rq)

def region_action(region, keys, families):
ra = RegionAction()
ra.region.type = 1
ra.region.value = region.region_name
ra.atomic = False
for key in keys:
action = Action()
action.get.row = key
action.get.column.extend(families_to_columns(families))
ra.action.append(action)
return ra

def multi_get(regions_and_keys, families):
rq = MultiRequest()
rq.regionAction.extend([region_action(region, keys, families)
for region, keys in regions_and_keys.items()])
return Request(b"Multi", rq)

def put_request(region, key, values):
rq = MutateRequest()
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from setuptools import find_packages, setup

setup(name='pybase',
version='4.0.0',
version='4.1.0',
description='Native python client to hbase 1.0+',
url='https://github.com/CurleySamuel/PyBase',
author='Sam Curley',
Expand Down

0 comments on commit 393cb97

Please sign in to comment.