-
Notifications
You must be signed in to change notification settings - Fork 5.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[fleet_executor] Add compute interceptor #37376
[fleet_executor] Add compute interceptor #37376
Conversation
Thanks for your contribution! |
6782f96
to
2e75832
Compare
acd19ea
to
3793ee5
Compare
3793ee5
to
d6a4958
Compare
void ComputeInterceptor::Compute(const InterceptorMessage& msg) { | ||
if (msg.message_type() == DATA_IS_READY) { | ||
auto src_id = msg.src_id(); | ||
upstream_deps_.erase(src_id); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这里感觉erase不太好,这个Compute会被调用很多次,它的上下游是固定的话,一次compute清空,下一次算还要加回来,感觉。
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
是的,我也觉得不太好。特别是有多个上游情况下,如果有个上游产出多次,这个还会出错。不过目前是demo
,所以没那么讲究。
可能还是新建一个空的,然后填充对比比较好一些,但这样还是有上游产出多次的问题需要考虑下怎么解决。
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
我觉得完全不考虑micro scope可能有问题🤨
比如C同时依赖A,B的情况,且A的运行速度比B快。需要跑两个micro steps。
time | event | C's upstream_deps() |
---|---|---|
0 | C初始化 | (A, B) |
1 | A的micro step 0完成,A->C: DATA_IS_READY | (B) |
2 | A的micro step 1完成,A->C: DATA_IS_READY | (B) |
3 | B的micro step 0完成,B->C: DATA_IS_READY | (),C开始执行micro step 0 |
4 | C重新构建upstream_deps | (A, B) |
5 | B的micro step 1完成,B->C: DATA_IS_READY | (A) |
这样时间2这一刻,A给C发送的micro step1的DATA_IS_READY怎么处理?且在最后,AB都结束了两个micro steps的运行,但是C永远会在等A的第二个micro step的DATA_IS_READY。
现阶段我们的上下游依赖很简单,应该都是单依赖的。
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
嗯,现在这个写的是个demo compute,后续需要有buffer作为流控,需要一个个buffer写,一个buffer写满了才能计算一个
@@ -76,7 +76,7 @@ bool Interceptor::Send(int64_t dst_id, InterceptorMessage& msg) { | |||
|
|||
void Interceptor::PoolTheMailbox() { | |||
// pool the local mailbox, parse the Message | |||
while (true) { | |||
for (;;) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
😶🌫️
void ComputeInterceptor::Compute(const InterceptorMessage& msg) { | ||
if (msg.message_type() == DATA_IS_READY) { | ||
auto src_id = msg.src_id(); | ||
upstream_deps_.erase(src_id); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
我觉得完全不考虑micro scope可能有问题🤨
比如C同时依赖A,B的情况,且A的运行速度比B快。需要跑两个micro steps。
time | event | C's upstream_deps() |
---|---|---|
0 | C初始化 | (A, B) |
1 | A的micro step 0完成,A->C: DATA_IS_READY | (B) |
2 | A的micro step 1完成,A->C: DATA_IS_READY | (B) |
3 | B的micro step 0完成,B->C: DATA_IS_READY | (),C开始执行micro step 0 |
4 | C重新构建upstream_deps | (A, B) |
5 | B的micro step 1完成,B->C: DATA_IS_READY | (A) |
这样时间2这一刻,A给C发送的micro step1的DATA_IS_READY怎么处理?且在最后,AB都结束了两个micro steps的运行,但是C永远会在等A的第二个micro step的DATA_IS_READY。
现阶段我们的上下游依赖很简单,应该都是单依赖的。
@@ -63,6 +69,8 @@ class TaskNode final { | |||
int64_t task_id_; | |||
int64_t max_run_times_; | |||
int64_t max_slot_nums_; | |||
|
|||
std::string type_; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这个是做什么的?区分interceptor种类用的?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
是的,carrier构建用的,这段逻辑还没有加
|
||
InterceptorMessage msg; | ||
msg.set_message_type(DATA_IS_READY); | ||
a->Send(1, msg); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
是不是可以给StopInterceptor加一个 finish的flag,这里wait那个flag。然后就可以delete那三个new出来的指针了?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这个后续讨论一下,我觉得应该是通过析构来
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
TODO: add buffer
PR types
Others
PR changes
Others
Describe
Add
demo
compute interceptor