Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
ceache committed Feb 12, 2023
1 parent 5225b3e commit 33573da
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 1 deletion.
23 changes: 23 additions & 0 deletions kazoo/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ def __init__(
ca=None,
use_ssl=False,
verify_certs=True,
concurrent_request_limit=0,
**kwargs,
):
"""Create a :class:`KazooClient` instance. All time arguments
Expand Down Expand Up @@ -241,6 +242,18 @@ def __init__(
self.keyfile = keyfile
self.keyfile_password = keyfile_password
self.ca = ca
if concurrent_request_limit > 0:
self.logger.info(
"Zookeeper client rate-limited to %d concurrent requests",
concurrent_request_limit,
)
self.rate_limiting_sem = self.handler.semaphore_impl(
concurrent_request_limit
)

else:
self.rate_limiting_sem = None

# Curator like simplified state tracking, and listeners for
# state transitions
self._state = KeeperState.CLOSED
Expand Down Expand Up @@ -635,6 +648,16 @@ def _call(self, request, async_object):
async_object.set_exception(SessionExpiredError())
return False

if self.rate_limiting_sem:
if not self.rate_limiting_sem.acquire(blocking=False):
self.logger.info(
"Limiting concurrent requests. Waiting for completion.",
concurrent_request_limit,
)
# Actually block on the sempahore here
self.rate_limiting_sem.acquire(blocking=True)
async_object.rawlink(lambda _res: self.rate_limiting_sem.release())

self._queue.append((request, async_object))

# wake the connection, guarding against a race with close()
Expand Down
2 changes: 2 additions & 0 deletions kazoo/handlers/eventlet.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from eventlet.green import threading as green_threading
from eventlet.green import selectors as green_selectors
from eventlet import queue as green_queue
from eventlet import semaphore as green_semaphore

from kazoo.handlers import utils
from kazoo.handlers.utils import selector_select
Expand Down Expand Up @@ -80,6 +81,7 @@ class SequentialEventletHandler(object):
name = "sequential_eventlet_handler"
queue_impl = green_queue.LightQueue
queue_empty = green_queue.Empty
semaphore_impl = green_semaphore.BoundedSemaphore

def __init__(self):
"""Create a :class:`SequentialEventletHandler` instance"""
Expand Down
7 changes: 6 additions & 1 deletion kazoo/handlers/gevent.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""A gevent based handler."""

from __future__ import absolute_import

import atexit
Expand All @@ -14,7 +15,10 @@

from kazoo.handlers.utils import selector_select

from gevent.lock import Semaphore, RLock
from gevent.lock import (
BoundedSemaphore as Semaphore,
RLock as RLock,
)

from kazoo.handlers import utils

Expand Down Expand Up @@ -52,6 +56,7 @@ class SequentialGeventHandler(object):
queue_impl = gevent.queue.Queue
queue_empty = gevent.queue.Empty
sleep_func = staticmethod(gevent.sleep)
semaphore_impl = Semaphore

def __init__(self):
"""Create a :class:`SequentialGeventHandler` instance"""
Expand Down
2 changes: 2 additions & 0 deletions kazoo/handlers/threading.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
:class:`~kazoo.handlers.gevent.SequentialGeventHandler` instead.
"""

from __future__ import absolute_import

import atexit
Expand Down Expand Up @@ -95,6 +96,7 @@ class SequentialThreadingHandler(object):
sleep_func = staticmethod(time.sleep)
queue_impl = queue.Queue
queue_empty = queue.Empty
semaphore_impl = threading.BoundedSemaphore

def __init__(self):
"""Create a :class:`SequentialThreadingHandler` instance"""
Expand Down

0 comments on commit 33573da

Please sign in to comment.