-
Notifications
You must be signed in to change notification settings - Fork 25
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix functionality bugs #38
base: dev
Are you sure you want to change the base?
Conversation
7121aeb
to
c3225b0
Compare
@ChunxuTang Can you help check whether my implementation using async to refresh worker list is correct? |
@LuQQiu Did some basic tests for ETCD? It worked. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I still not quite sure whether using asyncio inside sync program is a good idea or not.
For this PR, mainly need to consider how to deal with the asyncio event loop, should we get it or create it and should we destroy it.
) | ||
self.http_port = http_port | ||
|
||
def __del__(self): | ||
self.hash_provider.__del__() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this may not be needed? since hash_provider instance will be garbage collected once no reference to the object?
but the del in hash_provider may be needed to clean up the asyncio resources?
alluxio/worker_ring.py
Outdated
self._logger.error(f"Error updating hash ring: {e}") | ||
await asyncio.sleep(interval) | ||
|
||
self._task = asyncio.create_task(update_loop()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Async io create_task looks like will not create the event loop automatically, not quite sure whether it will introduce some problems.
and cancel task will also not destroy the event loop.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The asyncio. run() function creates an event loop, runs the coroutine passed to it, and then closes the event loop when the coroutine completes.
alluxio/worker_ring.py
Outdated
|
||
self._lock = threading.Lock() | ||
self._is_ring_initialized = False | ||
self._update_hash_ring_internal(self._worker_addresses) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ssyssy please follow LuQQiu@33326c2 to modify here
because here won't do anything because
def _update_hash_ring_internal(
self, worker_addresses: Set[WorkerNetAddress]
):
with self._lock:
if worker_addresses == self._worker_addresses:
return
@jja725 @ChunxuTang please help review the asyncio code |
while True: | ||
try: | ||
self._update_hash_ring_if_needed() | ||
except Exception as e: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is recommended that coroutines use try/finally blocks to robustly perform clean-up logic. In case asyncio.CancelledError is explicitly caught, it should generally be propagated when clean-up is complete. asyncio.CancelledError directly subclasses BaseException so most code will not need to be aware of it.
No description provided.