Use consistent hash to manage the topic #681
Conversation
delfin/coordination.py
Outdated
except tooz.coordination.MemberAlreadyExist:
    LOG.info('Member %s already in partitioner_group' % CONF.host)

def belong_to_host(self, task_id):
Suggest changing the name, maybe to get_executor.
Modified
delfin/coordination.py
Outdated
members = part.members_for_object(task_id)
for member in members:
    LOG.info('For task id %s, host should be %s' % (task_id, member))
    return member
In our case, do we get more than one member in any scenario?
No, but it returned a set with one item
ok
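For reference, a minimal sketch of the pattern under discussion, assuming self.partitioner is the tooz Partitioner returned by join_partitioned_group(); with a single replica, members_for_object() returns a one-element set, so the loop above is equivalent to taking its only element:

```python
# Sketch only, not the exact delfin code; self.partitioner is assumed to be
# a tooz Partitioner obtained from coordinator.join_partitioned_group(), and
# LOG an oslo_log logger as in the surrounding module.
def belong_to_host(self, task_id):
    members = self.partitioner.members_for_object(task_id)
    # members is a set; with replicas=1 it holds exactly one member.
    member = next(iter(members))
    LOG.info('For task id %s, host should be %s', task_id, member)
    return member
```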
delfin/coordination.py
Outdated
@@ -320,3 +322,35 @@ def _get_redis_backend_url():
        .format(backend_type=CONF.coordination.backend_type,
                server=CONF.coordination.backend_server)
    return backend_url


def on_node_join(event):
Where do we need to register these callbacks?
Code updated; we will register it on the leader node.
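As a rough illustration of where such callbacks could be hooked in with tooz (the group name, backend URL, and member id below are placeholders, not delfin's actual configuration):

```python
# Hedged sketch: register group-membership callbacks on the leader node.
import tooz.coordination

GROUP = b'partitioner_group'          # placeholder group id

def on_node_join(event):
    # event is a tooz MemberJoinedGroup carrying group_id and member_id
    print('member %s joined %s' % (event.member_id, event.group_id))

def on_node_leave(event):
    print('member %s left %s' % (event.member_id, event.group_id))

coordinator = tooz.coordination.get_coordinator(
    'redis://127.0.0.1:6379', b'leader-host')   # placeholder backend/member
coordinator.start(start_heart=True)
coordinator.watch_join_group(GROUP, on_node_join)
coordinator.watch_leave_group(GROUP, on_node_leave)
coordinator.run_watchers()   # callbacks only fire when run_watchers() runs
```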
executor = CONF.host
partitioner = ConsistentHashing()
partitioner.start()
executor = partitioner.belong_to_host(task_id)
partitioner.belong_to_host returns a byte literal; we need to convert it to a string, maybe with:
executor = partitioner.belong_to_host(task_id).decode('utf-8')
ok, thanks
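A tiny illustration of the bytes-versus-str point (the member value is made up):

```python
member = b'node-1'                  # what the partitioner hands back
executor = member.decode('utf-8')   # 'node-1', usable as an RPC topic string
```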
Force-pushed from 857f0b1 to adffe4c
Codecov Report
@@              Coverage Diff              @@
##    perf_coll_fw_enhance     #681      +/-   ##
=================================================
+ Coverage          70.86%   70.90%    +0.03%
=================================================
  Files                161      161
  Lines              15231    15292       +61
  Branches            1867     1872        +5
=================================================
+ Hits               10794    10843       +49
- Misses              3819     3830       +11
- Partials             618      619        +1
Force-pushed from 8b615c7 to 258cf05
if new_executor != origin_executor:
    LOG.info('Re-distribute job %s from %s to %s' %
             (task['id'], origin_executor, new_executor))
    self.task_rpcapi.remove_job(self.ctx, task['id'],
Removal of the job would remove the task entry from the DB! Maybe we need to split remove_task and remove_job into separate handlers.
OK, we need a remove_task which just removes it from the scheduler and does not remove it from the DB.
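A possible shape for that split, sketched against an APScheduler-based job manager; the class, method, and DB-API names here are illustrative, not delfin's actual handlers:

```python
# Illustrative sketch only.
class PerfJobManager(object):
    def __init__(self, scheduler, db_api, ctx):
        self.scheduler = scheduler   # apscheduler BackgroundScheduler
        self.db_api = db_api         # hypothetical DB facade
        self.ctx = ctx

    def remove_task(self, task_id):
        # Scheduler-only removal: stop running the job on this node but keep
        # the task row so another executor can pick it up.
        if self.scheduler.get_job(str(task_id)):
            self.scheduler.remove_job(str(task_id))

    def remove_job(self, task_id):
        # Full removal: stop the local job and delete the task from the DB.
        self.remove_task(task_id)
        self.db_api.task_delete(self.ctx, task_id)
```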
@@ -90,6 +131,13 @@ def schedule_boot_jobs(self):
                      'PerfJobManager',
                      coordination=True)
    service.serve(job_generator)
    partitioner = ConsistentHashing()
Generic comment: the SCHEDULER_BOOT_JOBS loop can be removed now.
Yes, I'll find a place to handle the job removal, and then the loop can be removed.
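Following the proposal's "start an apscheduler to watch group change periodically", a rough sketch of how the leader could poll for membership events after creating the partitioner in schedule_boot_jobs; the import path, the partitioner.coordinator attribute, and the 10-second interval are assumptions:

```python
from apscheduler.schedulers.background import BackgroundScheduler

from delfin.coordination import ConsistentHashing   # assumed import path

partitioner = ConsistentHashing()
partitioner.start()

def watch_group_change():
    # Assumed: the ConsistentHashing wrapper exposes its tooz coordinator;
    # run_watchers() delivers pending join/leave events to the registered
    # on_node_join / on_node_leave callbacks.
    partitioner.coordinator.run_watchers()

scheduler = BackgroundScheduler()
scheduler.add_job(watch_group_change, 'interval', seconds=10)
scheduler.start()
```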
@@ -47,7 +48,10 @@ def __call__(self):
                  six.text_type(e))

def distribute_new_job(self, task_id):
We need to remove def __call__, and we should have one method to handle distribute_delete().
Yes
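A hedged sketch of what distribute_new_job could look like once __call__ is gone, combining the snippets above; assign_new_job is an illustrative name, not a confirmed delfin RPC method:

```python
def distribute_new_job(self, task_id):
    # Ask the consistent-hash group which node owns this task.
    partitioner = ConsistentHashing()
    partitioner.start()
    executor = partitioner.belong_to_host(task_id).decode('utf-8')

    LOG.info('Distribute job %s to executor %s', task_id, executor)
    # Use the executor name as (part of) the RabbitMQ topic so the cast
    # reaches exactly that node. Method name is illustrative.
    self.task_rpcapi.assign_new_job(self.ctx, task_id, executor)
```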
             (task['id'], origin_executor, new_executor))
self.task_rpcapi.remove_job(self.ctx, task['id'],
                            task['executor'])
distributor.distribute_new_job(task['id'])
This needs to be inside the if block, as you want to distribute the job only if the executor is different.
The scenario I considered is that our DB recorded the task on executor A, but it was not actually running on executor A.
So, IMO, this should stay outside the if block: even if the executor is the same as before, we need to confirm that the executor is actually running the task, so we can send it again. If the executor already has the task, the call is ignored and has no side effect.
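Putting this thread together, a sketch of the re-scan done in on_node_join under the author's reasoning; db.task_get_all and self.distributor are illustrative names, not confirmed delfin APIs:

```python
def on_node_join(self, event):
    partitioner = ConsistentHashing()
    partitioner.start()
    for task in db.task_get_all(self.ctx):      # illustrative DB call
        origin_executor = task['executor']
        new_executor = partitioner.belong_to_host(task['id']).decode('utf-8')
        if new_executor != origin_executor:
            LOG.info('Re-distribute job %s from %s to %s',
                     task['id'], origin_executor, new_executor)
            # Stop the job on the executor recorded in the DB.
            self.task_rpcapi.remove_job(self.ctx, task['id'],
                                        task['executor'])
        # Deliberately outside the if: re-send even when the executor is
        # unchanged; an executor that already runs the task ignores it.
        self.distributor.distribute_new_job(task['id'])
```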
Force-pushed from 258cf05 to 723a44c
Force-pushed from 723a44c to e91f403
…ion/SIM into consistent_hashing
Force-pushed from 89e9359 to 314a7be
LGTM
LGTM
* Make job scheduler local to task process (#674)
* Make job scheduler local to task process
* Notify distributor when a new task added (#678)
* Remove db-scan for new task creation (#680)
* Use consistent hash to manage the topic (#681)
* Remove the periodically call from task distributor (#686)
* Start one historic collection immediate when a job is rescheduled (#685)
* Start one historic collection immediate when a job is rescheduled
* Remove failed task distributor (#687)
* Improving Failed job handling and telemetry job removal (#689)

Co-authored-by: ThisIsClark <[email protected]>
Co-authored-by: Ashit Kumar <[email protected]>
What this PR does / why we need it:
Use consistent hash to manage the topic for distributing jobs.
Which issue this PR fixes (optional, in fixes #<issue number>(, fixes #<issue_number>, ...) format, will close that issue when PR gets merged): fixes #
Special notes for your reviewer:
Release note:
Use case:
Proposal:
- In MetricsTaskManager initialize, add it to the group
- In the on_leading_callback of schedule_boot_jobs, start an apscheduler job to watch group changes periodically
- In on_node_join, re-scan all the existing jobs, and act according to whether the new executor is the same as the old executor
- In on_node_leave, re-distribute all the jobs whose executor is the node which is down
- In TaskDistributor.distribute_new_job, get the executor from the group as the topic of RabbitMQ, in order to distribute the job to the specific node

A self-contained sketch of the hashing behaviour this relies on follows the test-case heading below.

Test case:
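For illustration only, here is a minimal, self-contained sketch (not delfin code, no tooz dependency; all names are made up) of the property the proposal relies on: when a node joins the hash ring, only a small fraction of task-to-executor assignments change, so on_node_join only has to re-distribute a minority of jobs.

```python
import bisect
import hashlib


def _hash(key):
    # Map a string onto the ring using a stable hash.
    return int(hashlib.md5(key.encode('utf-8')).hexdigest(), 16)


class SimpleHashRing(object):
    """Toy consistent-hash ring with virtual nodes (illustrative only)."""

    def __init__(self, nodes, replicas=100):
        self._replicas = replicas
        self._ring = {}      # ring point -> node
        self._sorted = []    # sorted ring points
        for node in nodes:
            self.add_node(node)

    def add_node(self, node):
        for i in range(self._replicas):
            point = _hash('%s-%d' % (node, i))
            self._ring[point] = node
            bisect.insort(self._sorted, point)

    def get_node(self, key):
        point = _hash(key)
        idx = bisect.bisect(self._sorted, point) % len(self._sorted)
        return self._ring[self._sorted[idx]]


if __name__ == '__main__':
    tasks = ['task-%d' % i for i in range(1000)]
    before = SimpleHashRing(['node1', 'node2', 'node3'])
    after = SimpleHashRing(['node1', 'node2', 'node3', 'node4'])
    moved = sum(1 for t in tasks
                if before.get_node(t) != after.get_node(t))
    # Roughly a quarter of the tasks should move to the new node; the rest
    # keep their original executor, which is what on_node_join depends on.
    print('%d of %d tasks re-assigned' % (moved, len(tasks)))
```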