broadcast_message example deadlocks #269

Closed
marshauf opened this issue Aug 30, 2020 · 9 comments · Fixed by #270

@marshauf

Hello,

the broadcast_message example deadlocks, with warnings such as "bastion::dispatcher: The message can't be delivered to the group with the 'Processing' name." The workers are only started after these warnings.

When I change the supervisor creation order to response_supervisor, map_supervisor, input_supervisor, the results vary, but sometimes some messages get processed by a response_group child. Sometimes a bastion-async-thread thread panics with:

thread 'bastion-async-thread' panicked at 'called `Result::unwrap()` on an `Err` value: SignedMessage { msg: Msg(Broadcast(Any)), sign: RefAddr { path: /supervisor#da488c05-d4af-4618-a49b-71f12933e0f4/children#00f54fd8-e2b0-41e7-ba19-6c4fe469b502/child#f5252966-e198-4e59-ab49-3aec8d692eea, sender: UnboundedSender(Some(UnboundedSenderInner { inner: UnboundedInner { state: 9223372036854775809, message_queue: Queue { head: 0x558b97aef3d0, tail: UnsafeCell }, num_senders: 21, recv_task: AtomicWaker } })) } }', src/main.rs:149:72
stack backtrace:
   0: backtrace::backtrace::libunwind::trace
             at /cargo/registry/src/jackfan.us.kg-1ecc6299db9ec823/backtrace-0.3.46/src/backtrace/libunwind.rs:86
   1: backtrace::backtrace::trace_unsynchronized
             at /cargo/registry/src/jackfan.us.kg-1ecc6299db9ec823/backtrace-0.3.46/src/backtrace/mod.rs:66
   2: std::sys_common::backtrace::_print_fmt
             at src/libstd/sys_common/backtrace.rs:78
   3: <std::sys_common::backtrace::_print::DisplayBacktrace as core::fmt::Display>::fmt
             at src/libstd/sys_common/backtrace.rs:59
   4: core::fmt::write
             at src/libcore/fmt/mod.rs:1076
   5: std::io::Write::write_fmt
             at src/libstd/io/mod.rs:1537
   6: std::sys_common::backtrace::_print
             at src/libstd/sys_common/backtrace.rs:62
   7: std::sys_common::backtrace::print
             at src/libstd/sys_common/backtrace.rs:49
   8: std::panicking::default_hook::{{closure}}
             at src/libstd/panicking.rs:198
   9: std::panicking::default_hook
             at src/libstd/panicking.rs:217
  10: std::panicking::rust_panic_with_hook
             at src/libstd/panicking.rs:526
  11: rust_begin_unwind
             at src/libstd/panicking.rs:437
  12: core::panicking::panic_fmt
             at src/libcore/panicking.rs:85
  13: core::option::expect_none_failed
             at src/libcore/option.rs:1269
  14: core::result::Result<T,E>::unwrap
             at /home/marcel/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/src/libcore/result.rs:1005
  15: broadcast_message::response_group::{{closure}}::{{closure}}
             at src/main.rs:149
  16: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
             at /home/marcel/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/src/libcore/future/mod.rs:78
  17: <core::pin::Pin<P> as core::future::future::Future>::poll
             at /home/marcel/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/src/libcore/future/future.rs:119
  18: <bastion::child::Exec as core::future::future::Future>::poll
             at /home/marcel/.cargo/registry/src/jackfan.us.kg-1ecc6299db9ec823/bastion-0.4.2/src/child.rs:429
  19: <&mut F as core::future::future::Future>::poll
             at /home/marcel/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/src/libcore/future/future.rs:107
  20: futures_util::future::future::FutureExt::poll_unpin
             at /home/marcel/.cargo/registry/src/jackfan.us.kg-1ecc6299db9ec823/futures-util-0.3.5/src/future/future/mod.rs:554
  21: <futures_util::async_await::poll::PollOnce<F> as core::future::future::Future>::poll
             at /home/marcel/.cargo/registry/src/jackfan.us.kg-1ecc6299db9ec823/futures-util-0.3.5/src/async_await/poll.rs:33
  22: bastion::child::Child::run::{{closure}}
             at /home/marcel/.cargo/registry/src/jackfan.us.kg-1ecc6299db9ec823/bastion-0.4.2/src/child.rs:368
  23: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
             at /home/marcel/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/src/libcore/future/mod.rs:78
  24: <std::panic::AssertUnwindSafe<F> as core::future::future::Future>::poll
             at /home/marcel/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/src/libstd/panic.rs:335
  25: <lightproc::catch_unwind::CatchUnwind<F> as core::future::future::Future>::poll::{{closure}}
             at /home/marcel/.cargo/registry/src/jackfan.us.kg-1ecc6299db9ec823/lightproc-0.3.5/src/catch_unwind.rs:34
  26: <std::panic::AssertUnwindSafe<F> as core::ops::function::FnOnce<()>>::call_once
             at /home/marcel/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/src/libstd/panic.rs:318
  27: std::panicking::try::do_call
             at /home/marcel/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/src/libstd/panicking.rs:348
  28: __rust_try
  29: std::panicking::try
             at /home/marcel/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/src/libstd/panicking.rs:325
  30: std::panic::catch_unwind
             at /home/marcel/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/src/libstd/panic.rs:394
  31: <lightproc::catch_unwind::CatchUnwind<F> as core::future::future::Future>::poll
             at /home/marcel/.cargo/registry/src/jackfan.us.kg-1ecc6299db9ec823/lightproc-0.3.5/src/catch_unwind.rs:34
  32: lightproc::raw_proc::RawProc<F,R,S>::run
             at /home/marcel/.cargo/registry/src/jackfan.us.kg-1ecc6299db9ec823/lightproc-0.3.5/src/raw_proc.rs:403
  33: lightproc::lightproc::LightProc::run
             at /home/marcel/.cargo/registry/src/jackfan.us.kg-1ecc6299db9ec823/lightproc-0.3.5/src/lightproc.rs:169
  34: bastion_executor::worker::main_loop::{{closure}}
             at /home/marcel/.cargo/registry/src/jackfan.us.kg-1ecc6299db9ec823/bastion-executor-0.3.6/src/worker.rs:147
  35: bastion_executor::worker::set_stack::{{closure}}
             at /home/marcel/.cargo/registry/src/jackfan.us.kg-1ecc6299db9ec823/bastion-executor-0.3.6/src/worker.rs:42
  36: std::thread::local::LocalKey<T>::try_with
             at /home/marcel/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/src/libstd/thread/local.rs:263
  37: std::thread::local::LocalKey<T>::with
             at /home/marcel/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/src/libstd/thread/local.rs:239
  38: bastion_executor::worker::set_stack
             at /home/marcel/.cargo/registry/src/jackfan.us.kg-1ecc6299db9ec823/bastion-executor-0.3.6/src/worker.rs:38
  39: bastion_executor::worker::main_loop
             at /home/marcel/.cargo/registry/src/jackfan.us.kg-1ecc6299db9ec823/bastion-executor-0.3.6/src/worker.rs:147
  40: bastion_executor::distributor::Distributor::assign::{{closure}}
             at /home/marcel/.cargo/registry/src/jackfan.us.kg-1ecc6299db9ec823/bastion-executor-0.3.6/src/distributor.rs:39
note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.

And the program deadlocks and has to be interrupted.

A similar problem occurs in my own code: some messages get processed, but not all. Depending on the number of children that process messages and the number of messages inserted, all, some, or none get completely processed. It seems that broadcasting a message sometimes occupies two children.
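
A possible reading of the panic above, offered as an assumption rather than something confirmed in this thread: the `Err` payload prints as a `SignedMessage`, which is what one would see if the `unwrap()` at src/main.rs:149 were applied to the result of `Arc::try_unwrap` while another handle to the same broadcast message was still alive. The sketch below uses only the standard library; `Payload`, `take_exclusive`, and `read_shared` are hypothetical names and not part of Bastion.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for a broadcast payload; not Bastion's `SignedMessage`.
#[derive(Debug, PartialEq)]
struct Payload(String);

/// Tries to take ownership of the payload; fails while other handles exist.
fn take_exclusive(msg: Arc<Payload>) -> Result<Payload, Arc<Payload>> {
    Arc::try_unwrap(msg)
}

/// Reads the payload without needing exclusive ownership.
fn read_shared(msg: &Arc<Payload>) -> &str {
    &msg.0
}

fn main() {
    let original = Arc::new(Payload("A B C".to_string()));
    let for_second_receiver = Arc::clone(&original);

    // Two handles are alive, so exclusive ownership is refused.
    // Calling `.unwrap()` here instead would panic.
    let refused = take_exclusive(original);
    assert!(refused.is_err());

    // Borrowing works regardless of how many handles exist.
    assert_eq!(read_shared(&for_second_receiver), "A B C");

    // Once the other handle is dropped, `try_unwrap` succeeds.
    drop(refused);
    let owned = take_exclusive(for_second_receiver).unwrap();
    println!("{:?}", owned);
}
```

If this is indeed what happens, reading the message through a shared reference instead of unwrapping the `Arc` would avoid depending on how many children currently hold the message.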

Versions:
name = "bastion" version = "0.4.2"
name = "bastion-executor" version = "0.3.6"
name = "lightproc" version = "0.3.5"

Platform:
Linux linux-i9e5 5.8.2-1-default #1 SMP Wed Aug 19 09:43:15 UTC 2020 (71b519a) x86_64 x86_64 x86_64 GNU/Linux

Code:
https://github.com/bastion-rs/bastion/blob/master/src/bastion/examples/broadcast_message.rs

@o0Ignition0o
Contributor

Hi, thanks for such a thorough description!

I think I've stumbled on a similar issue in this commit

Could you please target the master branch and tell me how it goes?

Thanks again!

@marshauf
Author

Thank you. It seems to be improved, but the master branch doesn't fix the issue.

The unmodified example still does not process any messages most of the time. On one run a thread panicked.

thread 'bastion-async-thread' panicked at 'called `Result::unwrap()` on an `Err` value: SignedMessage { msg: Msg(Broadcast(Any)), sign: RefAddr { path: /supervisor#d8c21683-e504-4e8d-92a6-231963f5af65/children#bbf3cb01-09dd-4c07-b63a-328ed9992d05/child#b1135e59-b2ec-428b-aad4-a716e892cc37, sender: UnboundedSender(Some(UnboundedSenderInner { inner: UnboundedInner { state: 9223372036854775808, message_queue: Queue { head: 0x7f43c8000e40, tail: UnsafeCell }, num_senders: 17, recv_task: AtomicWaker } })) } }', src/main.rs:149:72
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

On most runs with the modified example, I received completely processed messages. A few runs didn't complete. A few runs had panicked threads.
On a successful run a few warnings were logged: WARN bastion::dispatcher: sending message to child 1/1 - /d00ef5cd-cf35-4a7f-92d7-6c69a56c89ee/69e87b3c-68b1-46c3-a45c-9a53b93094e3/ad7b50a5-c106-46d0-904d-4136024a5339
The panicked threads looked like this:

thread 'bastion-async-thread' panicked at 'called `Result::unwrap()` on an `Err` value: SignedMessage { msg: Msg(Broadcast(Any)), sign: RefAddr { path: /supervisor#96126a55-a2d8-42c1-842c-b2755165a28b/children#304183ac-490a-4ce6-890c-b1747f4fbb97/child#6a3620df-0b68-41b8-beed-4a23a94c2496, sender: UnboundedSender(Some(UnboundedSenderInner { inner: UnboundedInner { state: 9223372036854775809, message_queue: Queue { head: 0x55f9de0a2830, tail: UnsafeCell }, num_senders: 13, recv_task: AtomicWaker } })) } }', src/main.rs:96:64
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
thread 'bastion-async-thread' panicked at 'called `Result::unwrap()` on an `Err` value: SignedMessage { msg: Msg(Broadcast(Any)), sign: RefAddr { path: /supervisor#b3b68710-56a4-402c-a1bf-795387b7a116/children#1347ae6b-c90d-4087-bae1-f7abdc1150c0/child#15086617-421c-460a-83ee-e6c67233c8fc, sender: UnboundedSender(Some(UnboundedSenderInner { inner: UnboundedInner { state: 9223372036854775809, message_queue: Queue { head: 0x55f9de09e8a0, tail: UnsafeCell }, num_senders: 15, recv_task: AtomicWaker } })) } }', src/main.rs:149:72

@o0Ignition0o
Contributor

Super interesting, I think I might be able to reproduce it with the info you gave me, on my way! :)

@o0Ignition0o
Contributor

Soooo I think the order is kind of wrong? I mean, we're sending things before anyone is available to receive them, which will... go bad!
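
To illustrate the ordering problem described here, a minimal standard-library sketch (not Bastion's API; `Registry`, `join`, and `broadcast` are hypothetical names): a message sent to a group before any receiver has registered under that name cannot be delivered, while the same send succeeds once the receivers are registered first.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical registry mapping a group name to its members' senders.
#[derive(Default)]
struct Registry {
    groups: HashMap<String, Vec<Sender<String>>>,
}

impl Registry {
    /// Registers a new member in `group` and returns its receiving end.
    fn join(&mut self, group: &str) -> Receiver<String> {
        let (tx, rx) = channel();
        self.groups.entry(group.to_string()).or_default().push(tx);
        rx
    }

    /// Sends `msg` to every member of `group`. Returns how many members
    /// received it, or an error if nobody has registered under that name.
    fn broadcast(&self, group: &str, msg: &str) -> Result<usize, String> {
        let members = self
            .groups
            .get(group)
            .ok_or_else(|| format!("no group named '{}'", group))?;
        let mut delivered = 0;
        for tx in members {
            if tx.send(msg.to_string()).is_ok() {
                delivered += 1;
            }
        }
        Ok(delivered)
    }
}

fn main() {
    let mut registry = Registry::default();

    // Sending first: the group does not exist yet, so the message is lost.
    assert!(registry.broadcast("Processing", "A B C").is_err());

    // Registering the receivers first, then sending, delivers the message.
    let worker_a = registry.join("Processing");
    let worker_b = registry.join("Processing");
    assert_eq!(registry.broadcast("Processing", "A B C"), Ok(2));
    assert_eq!(worker_a.recv().unwrap(), "A B C");
    assert_eq!(worker_b.recv().unwrap(), "A B C");
}
```

In this toy model the fix is simply to create the receiving side before the sending side, which mirrors the reordering of supervisors discussed in this thread.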

@o0Ignition0o
Contributor

o0Ignition0o commented Aug 30, 2020

@marshauf can you please try to target igni/broadcast_message_example_order and let me know how it goes? :)

@marshauf
Author

I made the same modification to the example, too. As I wrote, it improved things a lot. Targeting the master branch improved it further. But a few runs still don't end with [Response] Aggregated data: `{"C": 5, "A": 2, "B": 2}`. They look like this:

Aug 30 19:37:20.903  INFO bastion::system: System: Initializing.
Aug 30 19:37:20.904  INFO bastion::system: System: Launched.
Aug 30 19:37:20.905  INFO broadcast_message: [Response] Worker started!
Aug 30 19:37:20.906  INFO broadcast_message: [Processing] Worker started!
Aug 30 19:37:20.906  INFO broadcast_message: [Processing] Worker started!
Aug 30 19:37:20.906  INFO broadcast_message: [Processing] Worker started!
Aug 30 19:37:20.906  INFO broadcast_message: [Input] Worker started!
Aug 30 19:37:20.907  INFO bastion::system: System: Starting.
Aug 30 19:37:20.907  INFO bastion::system: System: Launching Supervisor(00000000-0000-0000-0000-000000000000).
Aug 30 19:37:20.907  WARN bastion::dispatcher: sending message to child 1/3 - /36a9c73a-169d-4a4c-a355-38e73e0a79de/465c387f-fb6c-4937-88fe-4ae21a4410cf/8d50486f-dcf6-472d-b095-c58e3ab8157a
[Processing] Worker #BastionId(8d50486f-dcf6-472d-b095-c58e3ab8157a) received `A B C`
[Processing] Worker process #BastionId(8d50486f-dcf6-472d-b095-c58e3ab8157a) processed data. Result: `{"B": 1, "A": 1, "C": 1}`
Aug 30 19:37:20.913  INFO bastion::system: System: Launching Supervisor(cc1c6744-e8ee-48f3-b7ac-ef18c688c07e).
Aug 30 19:37:20.913  INFO bastion::system: System: Launching Supervisor(36a9c73a-169d-4a4c-a355-38e73e0a79de).
Aug 30 19:37:20.913  WARN bastion::dispatcher: sending message to child 2/3 - /36a9c73a-169d-4a4c-a355-38e73e0a79de/465c387f-fb6c-4937-88fe-4ae21a4410cf/6593cc53-f6af-4314-a428-434aed05c5c2
Aug 30 19:37:20.913  WARN bastion::dispatcher: sending message to child 1/1 - /cc1c6744-e8ee-48f3-b7ac-ef18c688c07e/f46a9904-bdcb-4e52-b15a-45c6cabb2983/390a9e42-7096-4fbf-825a-bb913b546155
thread 'bastion-async-thread' panicked at 'called `Result::unwrap()` on an `Err` value: SignedMessage { msg: Msg(Broadcast(Any)), sign: RefAddr { path: /supervisor#b97b1675-10b0-4289-a2dc-dfa29f6be134/children#6733e2c0-df0e-4081-9c3e-210bcf84f76c/child#92825160-b515-41b0-910a-8ba81233af2a, sender: UnboundedSender(Some(UnboundedSenderInner { inner: UnboundedInner { state: 9223372036854775808, message_queue: Queue { head: 0x562d27b98e60, tail: UnsafeCell }, num_senders: 13, recv_task: AtomicWaker } })) } }', src/main.rs:96:64
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
Aug 30 19:37:20.914  INFO bastion::system: System: Launching Supervisor(b97b1675-10b0-4289-a2dc-dfa29f6be134).
thread 'bastion-async-thread' panicked at 'called `Result::unwrap()` on an `Err` value: SignedMessage { msg: Msg(Broadcast(Any)), sign: RefAddr { path: /supervisor#36a9c73a-169d-4a4c-a355-38e73e0a79de/children#465c387f-fb6c-4937-88fe-4ae21a4410cf/child#8d50486f-dcf6-472d-b095-c58e3ab8157a, sender: UnboundedSender(Some(UnboundedSenderInner { inner: UnboundedInner { state: 9223372036854775808, message_queue: Queue { head: 0x562d27b951c0, tail: UnsafeCell }, num_senders: 20, recv_task: AtomicWaker } })) } }', src/main.rs:149:72
Aug 30 19:37:20.914  WARN bastion::child: Child(6593cc53-f6af-4314-a428-434aed05c5c2): Panicked.
Aug 30 19:37:20.914  WARN bastion::child: Child(390a9e42-7096-4fbf-825a-bb913b546155): Panicked.
Aug 30 19:37:20.915  WARN bastion::dispatcher: sending message to child 3/3 - /36a9c73a-169d-4a4c-a355-38e73e0a79de/465c387f-fb6c-4937-88fe-4ae21a4410cf/1860640f-1693-4868-be84-6c7c3c7b27d2
[Processing] Worker #BastionId(1860640f-1693-4868-be84-6c7c3c7b27d2) received `B C C`
[Processing] Worker process #BastionId(1860640f-1693-4868-be84-6c7c3c7b27d2) processed data. Result: `{"C": 2, "B": 1}`
Aug 30 19:37:20.915  INFO broadcast_message: [Response] Worker started!
Aug 30 19:37:20.916  INFO broadcast_message: [Processing] Worker started!
Aug 30 19:37:20.916  WARN bastion::dispatcher: sending message to child 1/1 - /cc1c6744-e8ee-48f3-b7ac-ef18c688c07e/f46a9904-bdcb-4e52-b15a-45c6cabb2983/390a9e42-7096-4fbf-825a-bb913b546155
^C

or this:

Aug 30 19:38:45.240  INFO bastion::system: System: Initializing.
Aug 30 19:38:45.241  INFO bastion::system: System: Launched.
Aug 30 19:38:45.242  INFO broadcast_message: [Response] Worker started!
Aug 30 19:38:45.243  INFO broadcast_message: [Processing] Worker started!
Aug 30 19:38:45.243  INFO broadcast_message: [Processing] Worker started!
Aug 30 19:38:45.243  INFO broadcast_message: [Input] Worker started!
Aug 30 19:38:45.243  INFO broadcast_message: [Processing] Worker started!
Aug 30 19:38:45.243  INFO bastion::system: System: Starting.
Aug 30 19:38:45.243  WARN bastion::dispatcher: sending message to child 1/3 - /7d2e9f58-d9b3-4230-bd67-113d03125c7b/529db7a5-b381-4f44-b7c6-b41fa49788d4/07238cf0-43e8-49c3-84bc-686dba366bb0
Aug 30 19:38:45.243  INFO bastion::system: System: Launching Supervisor(00000000-0000-0000-0000-000000000000).
[Processing] Worker #BastionId(07238cf0-43e8-49c3-84bc-686dba366bb0) received `A B C`
[Processing] Worker process #BastionId(07238cf0-43e8-49c3-84bc-686dba366bb0) processed data. Result: `{"B": 1, "A": 1, "C": 1}`
Aug 30 19:38:45.243  INFO bastion::system: System: Launching Supervisor(599b31c6-a4a1-4503-b62c-930c0b2f0699).
Aug 30 19:38:45.244  INFO bastion::system: System: Launching Supervisor(7d2e9f58-d9b3-4230-bd67-113d03125c7b).
Aug 30 19:38:45.244  INFO bastion::system: System: Launching Supervisor(14ced271-b562-4d2b-9ff9-5a7a7889599f).
Aug 30 19:38:45.244  WARN bastion::dispatcher: sending message to child 2/3 - /7d2e9f58-d9b3-4230-bd67-113d03125c7b/529db7a5-b381-4f44-b7c6-b41fa49788d4/a2bbd0b9-62c4-48b0-8020-ff1b7b484bc4
Aug 30 19:38:45.244  WARN bastion::dispatcher: sending message to child 1/1 - /599b31c6-a4a1-4503-b62c-930c0b2f0699/d4fa4651-745b-45d5-9ff5-4f9cbc868351/52ea34e4-9eab-4fb4-bb37-42fb8e99e4f8
thread 'bastion-async-thread' panicked at 'called `Result::unwrap()` on an `Err` value: SignedMessage { msg: Msg(Broadcast(Any)), sign: RefAddr { path: /supervisor#14ced271-b562-4d2b-9ff9-5a7a7889599f/children#3fecc1d4-64e9-4c3d-99d4-d2f845db626b/child#9c3cc4e6-c19e-42b8-8d2d-0434f3659302, sender: UnboundedSender(Some(UnboundedSenderInner { inner: UnboundedInner { state: 9223372036854775808, message_queue: Queue { head: 0x56467b25ee80, tail: UnsafeCell }, num_senders: 13, recv_task: AtomicWaker } })) } }', src/main.rs:96:64
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
thread 'bastion-async-thread' panicked at 'called `Result::unwrap()` on an `Err` value: SignedMessage { msg: Msg(Broadcast(Any)), sign: RefAddr { path: /supervisor#7d2e9f58-d9b3-4230-bd67-113d03125c7b/children#529db7a5-b381-4f44-b7c6-b41fa49788d4/child#07238cf0-43e8-49c3-84bc-686dba366bb0, sender: UnboundedSender(Some(UnboundedSenderInner { inner: UnboundedInner { state: 9223372036854775808, message_queue: Queue { head: 0x7f1ac8001460, tail: UnsafeCell }, num_senders: 18, recv_task: AtomicWaker } })) } }', src/main.rs:149:72
Aug 30 19:38:45.246  WARN bastion::child: Child(a2bbd0b9-62c4-48b0-8020-ff1b7b484bc4): Panicked.
Aug 30 19:38:45.246  WARN bastion::child: Child(52ea34e4-9eab-4fb4-bb37-42fb8e99e4f8): Panicked.
Aug 30 19:38:45.246  WARN bastion::dispatcher: sending message to child 3/3 - /7d2e9f58-d9b3-4230-bd67-113d03125c7b/529db7a5-b381-4f44-b7c6-b41fa49788d4/474cd68a-f440-4386-9f4b-4ae65037561c
[Processing] Worker #BastionId(474cd68a-f440-4386-9f4b-4ae65037561c) received `B C C`
[Processing] Worker process #BastionId(474cd68a-f440-4386-9f4b-4ae65037561c) processed data. Result: `{"C": 2, "B": 1}`
Aug 30 19:38:45.247  INFO broadcast_message: [Response] Worker started!
Aug 30 19:38:45.249  INFO broadcast_message: [Processing] Worker started!

@o0Ignition0o
Contributor

Thanks for your reply, I'm glad to see it improved.

I hope the last issues are resolved by #261, but merging it will take a bit of time. Mind if I ping you once it's done? :)

@marshauf
Author

No, please go ahead. Thank you :)

@o0Ignition0o
Contributor

OK, uhm, I totally forgot to ping you here, I'm sorry. #261 was merged a while ago and I think / hope we're good ^^'

I'm going to merge the pull request and close this issue; feel free to reopen it if it is still relevant ^^
