-
Notifications
You must be signed in to change notification settings - Fork 419
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
31 changed files
with
2,276 additions
and
227 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,14 +16,14 @@ threads = Array.new(3) { |i| Thread.new { ch.push message: i } } | |
sleep 0.01 # let the threads run | ||
threads | ||
# => [#<Thread:[email protected]:14 dead>, | ||
# #<Thread:[email protected]:14 sleep_forever>, | ||
# #<Thread:[email protected]:14 dead>] | ||
# #<Thread:[email protected]:14 dead>, | ||
# #<Thread:[email protected]:14 sleep_forever>] | ||
``` | ||
|
||
When message is popped the last thread continues and finishes as well. | ||
|
||
```ruby | ||
ch.pop # => {:message=>2} | ||
ch.pop # => {:message=>0} | ||
threads.map(&:join) | ||
# => [#<Thread:[email protected]:14 dead>, | ||
# #<Thread:[email protected]:14 dead>, | ||
|
@@ -38,11 +38,14 @@ one will be blocked until new messages is pushed. | |
```ruby | ||
threads = Array.new(3) { |i| Thread.new { ch.pop } } | ||
sleep 0.01 # let the threads run | ||
threads.map(&:status) # => [false, false, "sleep"] | ||
threads | ||
# => [#<Thread:[email protected]:32 dead>, | ||
# #<Thread:[email protected]:32 dead>, | ||
# #<Thread:[email protected]:32 sleep_forever>] | ||
ch.push message: 3 | ||
# => #<Concurrent::Promises::Channel:0x000002 capacity taken 0 of 2> | ||
threads.map(&:value) | ||
# => [{:message=>0}, {:message=>1}, {:message=>3}] | ||
# => [{:message=>1}, {:message=>2}, {:message=>3}] | ||
``` | ||
|
||
### Promises integration | ||
|
@@ -52,11 +55,11 @@ therefore all operations can be represented as futures. | |
|
||
```ruby | ||
ch = Concurrent::Promises::Channel.new 2 | ||
# => #<Concurrent::Promises::Channel:0x000006 capacity taken 0 of 2> | ||
# => #<Concurrent::Promises::Channel:0x000009 capacity taken 0 of 2> | ||
push_operations = Array.new(3) { |i| ch.push_op message: i } | ||
# => [#<Concurrent::Promises::Future:0x000007 fulfilled with #<Concurrent::Promises::Channel:0x000006 capacity taken 2 of 2>>, | ||
# #<Concurrent::Promises::Future:0x000008 fulfilled with #<Concurrent::Promises::Channel:0x000006 capacity taken 2 of 2>>, | ||
# #<Concurrent::Promises::ResolvableFuture:0x000009 pending>] | ||
# => [#<Concurrent::Promises::Future:0x00000a fulfilled with #<Concurrent::Promises::Channel:0x000009 capacity taken 2 of 2>>, | ||
# #<Concurrent::Promises::Future:0x00000b fulfilled with #<Concurrent::Promises::Channel:0x000009 capacity taken 2 of 2>>, | ||
# #<Concurrent::Promises::ResolvableFuture:0x00000c pending>] | ||
``` | ||
|
||
> We do not have to sleep here letting the futures execute as Threads. | ||
|
@@ -70,14 +73,14 @@ making a space for a new message. | |
```ruby | ||
ch.pop_op.value! # => {:message=>0} | ||
push_operations.map(&:value!) | ||
# => [#<Concurrent::Promises::Channel:0x000006 capacity taken 2 of 2>, | ||
# #<Concurrent::Promises::Channel:0x000006 capacity taken 2 of 2>, | ||
# #<Concurrent::Promises::Channel:0x000006 capacity taken 2 of 2>] | ||
# => [#<Concurrent::Promises::Channel:0x000009 capacity taken 2 of 2>, | ||
# #<Concurrent::Promises::Channel:0x000009 capacity taken 2 of 2>, | ||
# #<Concurrent::Promises::Channel:0x000009 capacity taken 2 of 2>] | ||
|
||
pop_operations = Array.new(3) { |i| ch.pop_op } | ||
# => [#<Concurrent::Promises::ResolvableFuture:0x00000a fulfilled with {:message=>1}>, | ||
# #<Concurrent::Promises::ResolvableFuture:0x00000b fulfilled with {:message=>2}>, | ||
# #<Concurrent::Promises::ResolvableFuture:0x00000c pending>] | ||
# => [#<Concurrent::Promises::ResolvableFuture:0x00000d fulfilled with {:message=>1}>, | ||
# #<Concurrent::Promises::ResolvableFuture:0x00000e fulfilled with {:message=>2}>, | ||
# #<Concurrent::Promises::ResolvableFuture:0x00000f pending>] | ||
ch.push message: 3 # (push|pop) can be freely mixed with (push_o|pop_op) | ||
pop_operations.map(&:value) | ||
# => [{:message=>1}, {:message=>2}, {:message=>3}] | ||
|
@@ -91,21 +94,21 @@ returns a pair to be able to find out which channel had the message available. | |
|
||
```ruby | ||
ch1 = Concurrent::Promises::Channel.new 2 | ||
# => #<Concurrent::Promises::Channel:0x00000d capacity taken 0 of 2> | ||
# => #<Concurrent::Promises::Channel:0x000010 capacity taken 0 of 2> | ||
ch2 = Concurrent::Promises::Channel.new 2 | ||
# => #<Concurrent::Promises::Channel:0x00000e capacity taken 0 of 2> | ||
# => #<Concurrent::Promises::Channel:0x000011 capacity taken 0 of 2> | ||
ch1.push 1 | ||
# => #<Concurrent::Promises::Channel:0x00000d capacity taken 1 of 2> | ||
# => #<Concurrent::Promises::Channel:0x000010 capacity taken 1 of 2> | ||
ch2.push 2 | ||
# => #<Concurrent::Promises::Channel:0x00000e capacity taken 1 of 2> | ||
# => #<Concurrent::Promises::Channel:0x000011 capacity taken 1 of 2> | ||
|
||
Concurrent::Promises::Channel.select([ch1, ch2]) | ||
# => [#<Concurrent::Promises::Channel:0x00000d capacity taken 0 of 2>, 1] | ||
# => [#<Concurrent::Promises::Channel:0x000010 capacity taken 0 of 2>, 1] | ||
ch1.select(ch2) | ||
# => [#<Concurrent::Promises::Channel:0x00000e capacity taken 0 of 2>, 2] | ||
# => [#<Concurrent::Promises::Channel:0x000011 capacity taken 0 of 2>, 2] | ||
|
||
Concurrent::Promises.future { 3 + 4 }.then_channel_push(ch1) | ||
# => #<Concurrent::Promises::Future:0x00000f pending> | ||
# => #<Concurrent::Promises::Future:0x000012 pending> | ||
Concurrent::Promises::Channel. | ||
# or `ch1.select_op(ch2)` would be equivalent | ||
select_op([ch1, ch2]). | ||
|
@@ -122,7 +125,7 @@ They always return immediately and indicate either success or failure. | |
|
||
```ruby | ||
ch | ||
# => #<Concurrent::Promises::Channel:0x000006 capacity taken 0 of 2> | ||
# => #<Concurrent::Promises::Channel:0x000009 capacity taken 0 of 2> | ||
ch.try_push 1 # => true | ||
ch.try_push 2 # => true | ||
ch.try_push 3 # => false | ||
|
@@ -139,7 +142,7 @@ when the timeout option is used. | |
|
||
```ruby | ||
ch | ||
# => #<Concurrent::Promises::Channel:0x000006 capacity taken 0 of 2> | ||
# => #<Concurrent::Promises::Channel:0x000009 capacity taken 0 of 2> | ||
ch.push 1, 0.01 # => true | ||
ch.push 2, 0.01 # => true | ||
ch.push 3, 0.01 # => false | ||
|
@@ -156,7 +159,7 @@ if the consumers are not keeping up. | |
|
||
```ruby | ||
channel = Concurrent::Promises::Channel.new 2 | ||
# => #<Concurrent::Promises::Channel:0x000010 capacity taken 0 of 2> | ||
# => #<Concurrent::Promises::Channel:0x000013 capacity taken 0 of 2> | ||
log = Concurrent::Array.new # => [] | ||
|
||
producers = Array.new 2 do |i| | ||
|
@@ -167,8 +170,8 @@ producers = Array.new 2 do |i| | |
end | ||
end | ||
end | ||
# => [#<Thread:0x000011@channel.in.md:133 run>, | ||
# #<Thread:0x000012@channel.in.md:133 run>] | ||
# => [#<Thread:0x000014@channel.in.md:133 run>, | ||
# #<Thread:0x000015@channel.in.md:133 run>] | ||
|
||
consumers = Array.new 4 do |i| | ||
Thread.new(i) do |consumer| | ||
|
@@ -180,38 +183,38 @@ consumers = Array.new 4 do |i| | |
end | ||
end | ||
end | ||
# => [#<Thread:0x000013@channel.in.md:142 run>, | ||
# #<Thread:0x000014@channel.in.md:142 run>, | ||
# #<Thread:0x000015@channel.in.md:142 run>, | ||
# #<Thread:0x000016@channel.in.md:142 run>] | ||
# => [#<Thread:0x000016@channel.in.md:142 run>, | ||
# #<Thread:0x000017@channel.in.md:142 run>, | ||
# #<Thread:0x000018@channel.in.md:142 run>, | ||
# #<Thread:0x000019@channel.in.md:142 run>] | ||
|
||
# wait for all to finish | ||
producers.map(&:join) | ||
# => [#<Thread:0x000011@channel.in.md:133 dead>, | ||
# #<Thread:0x000012@channel.in.md:133 dead>] | ||
# => [#<Thread:0x000014@channel.in.md:133 dead>, | ||
# #<Thread:0x000015@channel.in.md:133 dead>] | ||
consumers.map(&:join) | ||
# => [#<Thread:0x000013@channel.in.md:142 dead>, | ||
# #<Thread:0x000014@channel.in.md:142 dead>, | ||
# #<Thread:0x000015@channel.in.md:142 dead>, | ||
# #<Thread:0x000016@channel.in.md:142 dead>] | ||
# => [#<Thread:0x000016@channel.in.md:142 dead>, | ||
# #<Thread:0x000017@channel.in.md:142 dead>, | ||
# #<Thread:0x000018@channel.in.md:142 dead>, | ||
# #<Thread:0x000019@channel.in.md:142 dead>] | ||
# investigate log | ||
log | ||
log | ||
# => ["producer 0 pushing 0", | ||
# "producer 0 pushing 1", | ||
# "producer 0 pushing 2", | ||
# "producer 1 pushing 0", | ||
# "consumer 0 got 0. payload 0 from producer 0", | ||
# "producer 0 pushing 3", | ||
# "consumer 2 got 0. payload 1 from producer 0", | ||
# "consumer 1 got 0. payload 1 from producer 0", | ||
# "producer 1 pushing 1", | ||
# "consumer 1 got 0. payload 2 from producer 0", | ||
# "consumer 3 got 0. payload 0 from producer 1", | ||
# "consumer 3 got 0. payload 2 from producer 0", | ||
# "producer 1 pushing 2", | ||
# "consumer 2 got 1. payload 3 from producer 0", | ||
# "consumer 0 got 1. payload 1 from producer 1", | ||
# "consumer 3 got 1. payload 2 from producer 1", | ||
# "consumer 2 got 0. payload 0 from producer 1", | ||
# "producer 1 pushing 3", | ||
# "consumer 1 got 1. payload 3 from producer 1"] | ||
# "producer 0 pushing 3", | ||
# "consumer 0 got 1. payload 1 from producer 1", | ||
# "consumer 1 got 1. payload 2 from producer 1", | ||
# "consumer 3 got 1. payload 3 from producer 1", | ||
# "consumer 2 got 1. payload 3 from producer 0"] | ||
``` | ||
|
||
The producers are much faster than consumers | ||
|
@@ -226,7 +229,7 @@ that run a thread pool. | |
|
||
```ruby | ||
channel = Concurrent::Promises::Channel.new 2 | ||
# => #<Concurrent::Promises::Channel:0x000017 capacity taken 0 of 2> | ||
# => #<Concurrent::Promises::Channel:0x00001a capacity taken 0 of 2> | ||
log = Concurrent::Array.new # => [] | ||
|
||
def produce(channel, log, producer, i) | ||
|
@@ -248,38 +251,38 @@ end # => :consume | |
producers = Array.new 2 do |i| | ||
Concurrent::Promises.future(channel, log, i) { |*args| produce *args, 0 }.run | ||
end | ||
# => [#<Concurrent::Promises::Future:0x000018 pending>, | ||
# #<Concurrent::Promises::Future:0x000019 pending>] | ||
# => [#<Concurrent::Promises::Future:0x00001b pending>, | ||
# #<Concurrent::Promises::Future:0x00001c pending>] | ||
|
||
consumers = Array.new 4 do |i| | ||
Concurrent::Promises.future(channel, log, i) { |*args| consume *args, 0 }.run | ||
end | ||
# => [#<Concurrent::Promises::Future:0x00001a pending>, | ||
# #<Concurrent::Promises::Future:0x00001b pending>, | ||
# #<Concurrent::Promises::Future:0x00001c pending>, | ||
# #<Concurrent::Promises::Future:0x00001d pending>] | ||
# => [#<Concurrent::Promises::Future:0x00001d pending>, | ||
# #<Concurrent::Promises::Future:0x00001e pending>, | ||
# #<Concurrent::Promises::Future:0x00001f pending>, | ||
# #<Concurrent::Promises::Future:0x000020 pending>] | ||
|
||
# wait for all to finish | ||
producers.map(&:value!) # => [:done, :done] | ||
consumers.map(&:value!) # => [:done, :done, :done, :done] | ||
# investigate log | ||
log | ||
log | ||
# => ["producer 0 pushing 0", | ||
# "producer 1 pushing 0", | ||
# "consumer 1 got 0. payload 0 from producer 1", | ||
# "producer 0 pushing 1", | ||
# "producer 1 pushing 1", | ||
# "consumer 0 got 0. payload 0 from producer 0", | ||
# "consumer 3 got 0. payload 1 from producer 0", | ||
# "consumer 1 got 0. payload 0 from producer 1", | ||
# "consumer 2 got 0. payload 1 from producer 0", | ||
# "producer 0 pushing 2", | ||
# "consumer 3 got 0. payload 1 from producer 1", | ||
# "producer 1 pushing 2", | ||
# "producer 1 pushing 3", | ||
# "producer 0 pushing 3", | ||
# "consumer 2 got 0. payload 1 from producer 1", | ||
# "consumer 0 got 1. payload 2 from producer 0", | ||
# "consumer 2 got 1. payload 3 from producer 1", | ||
# "consumer 1 got 1. payload 3 from producer 0", | ||
# "consumer 3 got 1. payload 2 from producer 1"] | ||
# "producer 1 pushing 3", | ||
# "consumer 0 got 1. payload 2 from producer 1", | ||
# "consumer 2 got 1. payload 3 from producer 0", | ||
# "consumer 1 got 1. payload 2 from producer 0", | ||
# "consumer 3 got 1. payload 3 from producer 1"] | ||
``` | ||
|
||
### Synchronization of workers by passing a value | ||
|
@@ -292,19 +295,19 @@ The operations have to be paired to succeed. | |
|
||
```ruby | ||
channel = Concurrent::Promises::Channel.new 0 | ||
# => #<Concurrent::Promises::Channel:0x00001e capacity taken 0 of 0> | ||
# => #<Concurrent::Promises::Channel:0x000021 capacity taken 0 of 0> | ||
thread = Thread.new { channel.pop }; sleep 0.01 | ||
# allow the thread to go to sleep | ||
thread | ||
# => #<Thread:0x00001f@channel.in.md:246 sleep_forever> | ||
# => #<Thread:0x000022@channel.in.md:214 sleep_forever> | ||
# succeeds because there is matching pop operation waiting in the thread | ||
channel.try_push(:v1) # => true | ||
# remains pending, since there is no matching operation | ||
push = channel.push_op(:v2) | ||
# => #<Concurrent::Promises::ResolvableFuture:0x000020 pending> | ||
# => #<Concurrent::Promises::ResolvableFuture:0x000023 pending> | ||
thread.value # => :v1 | ||
# the push operation resolves as a pairing pop is called | ||
channel.pop # => :v2 | ||
push | ||
# => #<Concurrent::Promises::ResolvableFuture:0x000020 fulfilled with #<Concurrent::Promises::Channel:0x00001e capacity taken 0 of 0>> | ||
# => #<Concurrent::Promises::ResolvableFuture:0x000023 fulfilled with #<Concurrent::Promises::Channel:0x000021 capacity taken 0 of 0>> | ||
``` |
Oops, something went wrong.