
Use consistent hash to manage the topic #681

Conversation

ThisIsClark
Collaborator

What this PR does / why we need it:
Use consistent hashing to manage the topic used to distribute jobs

Which issue this PR fixes (optional, in fixes #<issue number>(, fixes #<issue_number>, ...) format, will close that issue when PR gets merged): fixes #

Special notes for your reviewer:

Release note:

Use case:

  1. When the metric task service launches, add it to the partitioner group
  2. The leader node watches the group for changes periodically
    • If a new node is added to the group, re-scan all existing jobs to check which executor each of them should have
      • If the new executor of a job is the same as the old executor, do nothing
      • If the new executor is different from the old executor, remove the job from the old executor and add it to the new one
    • If an existing node is removed from the group, get all the jobs whose executor is the node that went down, and re-distribute them
  3. When a distributor distributes a job, get the executor from the partitioner and use it as the topic (see the sketch after this list)
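
A minimal sketch of steps 1 and 3, built on tooz's partitioned groups (which the ConsistentHashing helper in this PR appears to wrap); the backend URL, member id, group name and task id below are illustrative placeholders, not delfin's actual configuration:

from tooz import coordination

# Start a coordinator for this node (placeholder backend and member id).
coordinator = coordination.get_coordinator(
    'memcached://127.0.0.1:11211', b'node-1')
coordinator.start(start_heart=True)

# Step 1: join the partitioned group; tooz keeps a consistent-hash ring
# over the current group members.
partitioner = coordinator.join_partitioned_group(b'partitioner_group')

# Step 3: map a task id onto a member of the ring. The returned set holds
# a single member id (as bytes), which is then used as the topic.
members = partitioner.members_for_object('task-42')
topic = next(iter(members)).decode('utf-8')
print('task-42 should be executed by %s' % topic)

coordinator.stop()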

Proposal:

  1. When the MetricsTaskManager initializes, add it to the group
  2. When a node becomes the leader, in the on_leading_callback schedule_boot_jobs, start an apscheduler job that watches for group changes periodically (see the sketch after this list)
    • On on_node_join, re-scan all existing jobs and handle each one according to whether its new executor is the same as the old executor
    • On on_node_leave, re-distribute all the jobs whose executor is the node that went down
  3. In TaskDistributor.distribute_new_job, get the executor from the group and use it as the RabbitMQ topic, so the job is distributed to the specific node
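
A minimal sketch of step 2, assuming the tooz coordinator is already started on the node that just became leader; the group name, poll interval and callback bodies are illustrative:

from apscheduler.schedulers.background import BackgroundScheduler

GROUP = b'partitioner_group'  # illustrative group name


def on_node_join(event):
    # Re-scan existing jobs and move the ones whose new executor differs
    # from the recorded one (details elided in this sketch).
    pass


def on_node_leave(event):
    # Re-distribute the jobs whose executor is the departed member.
    pass


def watch_group_change(coordinator):
    # Register the membership callbacks on the leader node.
    coordinator.watch_join_group(GROUP, on_node_join)
    coordinator.watch_leave_group(GROUP, on_node_leave)

    # Poll periodically; run_watchers() fires the callbacks registered
    # above whenever a member joins or leaves the group.
    scheduler = BackgroundScheduler()
    scheduler.add_job(coordinator.run_watchers, 'interval', seconds=10)
    scheduler.start()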

Test case:

  1. Every time a node comes up, a member is added to the group
  2. Every time a node comes up, the watcher on the leader node gets the event
  3. When a node-join event is observed, re-scan all the jobs and re-distribute the affected jobs to the new node
  4. Every time a node goes down, its member is removed from the group
  5. Every time a node goes down, the watcher on the leader node gets the event
  6. When a node-leave event is observed, re-distribute all the jobs whose executor is the node that went down
  7. When distributing a new job, the topic should differ when the task metadata differs (see the sketch after this list)
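
Test case 7 can be exercised against tooz's hash ring directly, without a live coordinator; the member names below are illustrative. The mapping for a given task id is deterministic, while two different ids usually, though not necessarily, land on different members:

from tooz import hashring

ring = hashring.HashRing([b'node-1', b'node-2'])

# Deterministic: the same task id always resolves to the same member,
# so the derived topic is stable across calls.
assert ring.get_nodes(b'task-a') == ring.get_nodes(b'task-a')

# Different task metadata generally spreads over the members, although
# two ids can still hash onto the same member.
print(ring.get_nodes(b'task-a'), ring.get_nodes(b'task-b'))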

        except tooz.coordination.MemberAlreadyExist:
            LOG.info('Member %s already in partitioner_group' % CONF.host)

    def belong_to_host(self, task_id):
Collaborator

Suggest changing the name, maybe to get_executor

Collaborator Author

Modified

        members = part.members_for_object(task_id)
        for member in members:
            LOG.info('For task id %s, host should be %s' % (task_id, member))
            return member
Collaborator

In our case, do we get more than one member in any scenario?

Collaborator Author
@ThisIsClark Sep 2, 2021

No, but it returned a set with one item

Collaborator

ok

@@ -320,3 +322,35 @@ def _get_redis_backend_url():
        .format(backend_type=CONF.coordination.backend_type,
                server=CONF.coordination.backend_server)
    return backend_url


def on_node_join(event):
Collaborator

Where do we need to register these callbacks?

Collaborator Author

Code updated; we will register them on the leader node

        executor = CONF.host
        partitioner = ConsistentHashing()
        partitioner.start()
        executor = partitioner.belong_to_host(task_id)
Member

partitioner.belong_to_host returns a bytes object; we need to convert it to a string, maybe with
executor = partitioner.belong_to_host(task_id).decode('utf-8')

Collaborator Author

ok, thanks

@ThisIsClark ThisIsClark changed the title [WIP] Use consistent hash to manage the topic Use consistent hash to manage the topic Sep 2, 2021
@codecov

codecov bot commented Sep 2, 2021

Codecov Report

Merging #681 (314a7be) into perf_coll_fw_enhance (a88486e) will increase coverage by 0.03%.
The diff coverage is 77.27%.

@@                   Coverage Diff                    @@
##           perf_coll_fw_enhance     #681      +/-   ##
========================================================
+ Coverage                 70.86%   70.90%   +0.03%     
========================================================
  Files                       161      161              
  Lines                     15231    15292      +61     
  Branches                   1867     1872       +5     
========================================================
+ Hits                      10794    10843      +49     
- Misses                     3819     3830      +11     
- Partials                    618      619       +1     
Impacted Files Coverage Δ
delfin/task_manager/metrics_manager.py 0.00% <0.00%> (ø)
...ager/scheduler/schedulers/telemetry/job_handler.py 76.55% <ø> (-0.32%) ⬇️
delfin/coordination.py 64.58% <68.18%> (+0.64%) ⬆️
delfin/task_manager/scheduler/schedule_manager.py 71.76% <88.57%> (+17.91%) ⬆️
...in/leader_election/distributor/task_distributor.py 67.56% <100.00%> (+3.93%) ⬆️
delfin/drivers/fake_storage/__init__.py 94.85% <0.00%> (-0.29%) ⬇️

@ThisIsClark force-pushed the consistent_hashing branch 5 times, most recently from 8b615c7 to 258cf05 on September 2, 2021 14:02
            if new_executor != origin_executor:
                LOG.info('Re-distribute job %s from %s to %s' %
                         (task['id'], origin_executor, new_executor))
                self.task_rpcapi.remove_job(self.ctx, task['id'],
Member

Removal of the job would remove the task entry from the DB! Maybe we need to split remove_task and remove_job into separate handlers!

Collaborator Author

OK, we need a remove_task which just removes the task from the scheduler and does not remove it from the DB.
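
A minimal sketch of that split, assuming an APScheduler scheduler holds the running jobs; db.task_delete stands in for whatever helper actually deletes the DB entry and is named here only for illustration:

from apscheduler.schedulers.background import BackgroundScheduler

scheduler = BackgroundScheduler()
scheduler.start()


def remove_task(task_id):
    # Stop executing the task locally; the DB entry is left untouched,
    # so the job can be re-scheduled on another executor.
    if scheduler.get_job(task_id):
        scheduler.remove_job(task_id)


def remove_job(ctx, task_id, db):
    # Full removal: stop the local execution and delete the DB entry.
    remove_task(task_id)
    db.task_delete(ctx, task_id)  # illustrative DB helper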

@@ -90,6 +131,13 @@ def schedule_boot_jobs(self):
            'PerfJobManager',
            coordination=True)
        service.serve(job_generator)
        partitioner = ConsistentHashing()
Member

Generic comment: the SCHEDULER_BOOT_JOBS loop can be removed now.

Collaborator Author

Yes, I'll find a place to handle the job removal, and the loop will be removed.

@@ -47,7 +48,10 @@ def __call__(self):
                      six.text_type(e))

    def distribute_new_job(self, task_id):
Member

We need to remove def __call__, and we should have one method to handle distribute_delete().

Collaborator Author

Yes

                         (task['id'], origin_executor, new_executor))
                self.task_rpcapi.remove_job(self.ctx, task['id'],
                                            task['executor'])
            distributor.distribute_new_job(task['id'])
Member

This needs to be inside the if block, as you want to distribute the job only if the executor is different.

Collaborator Author

The scenario I considered is that our DB records the job as running on executor A, but it is not actually running on executor A.
So, IMO, this should stay outside the if block: even when the executor is the same as before, we need to confirm that the executor is really running the task, so we can send the job again. If the executor already has the task, the duplicate is ignored and has no side effect.
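
A sketch of the resulting loop, under the assumption above that distribute_new_job is idempotent on the receiving executor; the names follow the snippets in this PR, but the surrounding function is illustrative:

def redistribute_jobs(ctx, tasks, partitioner, task_rpcapi, distributor):
    for task in tasks:
        origin_executor = task['executor']
        new_executor = partitioner.belong_to_host(task['id']).decode('utf-8')
        if new_executor != origin_executor:
            # The executor changed: take the job away from the old node.
            task_rpcapi.remove_job(ctx, task['id'], origin_executor)
        # Always (re)send the job: if the executor already runs it, the
        # duplicate is ignored and there is no side effect.
        distributor.distribute_new_job(task['id'])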

Member
@NajmudheenCT left a comment

LGTM

@sushanthakumar
Collaborator

LGTM

@NajmudheenCT NajmudheenCT merged commit be004ce into sodafoundation:perf_coll_fw_enhance Sep 4, 2021
kumarashit added a commit that referenced this pull request Sep 14, 2021
* Make job scheduler local to task process (#674)

* Make job scheduler local to task process

* Notify distributor when a new task added (#678)

* Remove db-scan for new task creation (#680)

* Use consistent hash to manage the topic (#681)

* Remove the periodically call from task distributor (#686)

* Start one historic collection immediate when a job is rescheduled (#685)

* Start one historic collection immediate when a job is rescheduled

* Remove failed task distributor (#687)

* Improving Failed job handling and telemetry job removal (#689)

Co-authored-by: ThisIsClark <[email protected]>
Co-authored-by: Ashit Kumar <[email protected]>