Skip to content

Commit

Permalink
Merge pull request #791 from ruby-concurrency/ruby-association
Browse files Browse the repository at this point in the history
Ruby association project
  • Loading branch information
pitr-ch authored Mar 11, 2019
2 parents 2307958 + 3ff1cb0 commit f72141e
Show file tree
Hide file tree
Showing 151 changed files with 37,307 additions and 12,809 deletions.
1 change: 1 addition & 0 deletions .rspec
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
-I lib-edge
--require spec_helper
--color
--warnings
Expand Down
13 changes: 13 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,18 @@
## Current

## Release v1.1.5, edge v0.5.0 (10 mar 2019)

concurrent-ruby:

* fix potential leak of context on JRuby and Java 7

concurrent-ruby-edge:

* Add finalized Concurrent::Cancellation
* Add finalized Concurrent::Throttle
* Add finalized Concurrent::Promises::Channel
* Add new Concurrent::ErlangActor

## Release v1.1.4 (14 Dec 2018)

* (#780) Remove java_alias of 'submit' method of Runnable to let executor service work on java 11
Expand Down
14 changes: 8 additions & 6 deletions Gemfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
source 'https://rubygems.org'

require File.join(File.dirname(__FILE__ ), 'lib/concurrent/version')
require File.join(File.dirname(__FILE__), 'lib/concurrent/version')
require File.join(File.dirname(__FILE__ ), 'lib-edge/concurrent/edge/version')

no_path = ENV['NO_PATH']
options = no_path ? {} : { path: '.' }
Expand All @@ -11,26 +12,27 @@ gem 'concurrent-ruby-ext', Concurrent::VERSION, options.merge(platform: :mri)

group :development do
gem 'rake', '~> 12.0'
gem 'rake-compiler', '~> 1.0'
gem 'rake-compiler', '~> 1.0', '>= 1.0.7'
gem 'rake-compiler-dock', '~> 0.7.0'
gem 'pry', '~> 0.11', platforms: :mri
end

group :documentation, optional: true do
gem 'yard', '~> 0.9.0', :require => false
gem 'yard', '~> 0.9.0', require: false
gem 'redcarpet', '~> 3.0', platforms: :mri # understands github markdown
gem 'md-ruby-eval', '~> 0.3'
gem 'md-ruby-eval', '~> 0.6'
end

group :testing do
gem 'rspec', '~> 3.7'
gem 'timecop', '~> 0.7.4'
gem 'sigdump', require: false
end

# made opt-in since it will not install on jruby 1.7
group :coverage, optional: !ENV['COVERAGE'] do
gem 'simplecov', '~> 0.10.0', :require => false
gem 'coveralls', '~> 0.8.2', :require => false
gem 'simplecov', '~> 0.16.0', require: false
gem 'coveralls', '~> 0.8.2', require: false
end

group :benchmarks, optional: true do
Expand Down
31 changes: 31 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,34 @@ be obeyed though. Features developed in `concurrent-ruby-edge` are expected to m
*Status: will be moved to core soon.*
* [LockFreeStack](http://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/LockFreeStack.html)
*Status: missing documentation and tests.*
* [Promises::Channel](http://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/Promises/Channel.html)
A first in first out channel that accepts messages with push family of methods and returns
messages with pop family of methods.
Pop and push operations can be represented as futures, see `#pop_op` and `#push_op`.
The capacity of the channel can be limited to support back pressure, use capacity option in `#initialize`.
`#pop` method blocks ans `#pop_op` returns pending future if there is no message in the channel.
If the capacity is limited the `#push` method blocks and `#push_op` returns pending future.
* [Cancellation](http://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/Cancellation.html)
The Cancellation abstraction provides cooperative cancellation.

The standard methods `Thread#raise` of `Thread#kill` available in Ruby
are very dangerous (see linked the blog posts bellow).
Therefore concurrent-ruby provides an alternative.

* <https://jvns.ca/blog/2015/11/27/why-rubys-timeout-is-dangerous-and-thread-dot-raise-is-terrifying/>
* <http://www.mikeperham.com/2015/05/08/timeout-rubys-most-dangerous-api/>
* <http://blog.headius.com/2008/02/rubys-threadraise-threadkill-timeoutrb.html>

It provides an object which represents a task which can be executed,
the task has to get the reference to the object and periodically cooperatively check that it is not cancelled.
Good practices to make tasks cancellable:
* check cancellation every cycle of a loop which does significant work,
* do all blocking actions in a loop with a timeout then on timeout check cancellation
and if ok block again with the timeout
* [Throttle](http://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/Throttle.html)
A tool managing concurrency level of tasks.
* [ErlangActor](http://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/ErlangActor.html)
Actor implementation which matches Erlang actor behaviour.

## Supported Ruby versions

Expand Down Expand Up @@ -339,6 +367,9 @@ and to the past maintainers
* [Paweł Obrok](https://github.com/obrok)
* [Lucas Allan](https://github.com/lucasallan)

and to [Ruby Association](https://www.ruby.or.jp/en/) for sponsoring a project
["Enhancing Ruby’s concurrency tooling"](https://www.ruby.or.jp/en/news/20181106) in 2018.

## License and Copyright

*Concurrent Ruby* is free software released under the
Expand Down
66 changes: 20 additions & 46 deletions Rakefile
Original file line number Diff line number Diff line change
Expand Up @@ -16,43 +16,7 @@ edge_gemspec = Gem::Specification.load File.join(__dir__, 'concurrent-ruby-edge.

require 'rake/javaextensiontask'

class ConcurrentRubyJavaExtensionTask < Rake::JavaExtensionTask
def java_classpath_arg(*args)
jruby_cpath = nil

if RUBY_PLATFORM =~ /java/
begin
cpath = Java::java.lang.System.getProperty('java.class.path').split(File::PATH_SEPARATOR)
cpath += Java::java.lang.System.getProperty('sun.boot.class.path').split(File::PATH_SEPARATOR)
jruby_cpath = cpath.compact.join(File::PATH_SEPARATOR)
rescue => e
end

unless jruby_cpath
libdir = RbConfig::CONFIG['libdir']
if libdir.start_with? "classpath:"
raise 'Cannot build with jruby-complete'
end
jruby_cpath = File.join(libdir, "jruby.jar")
end
end

unless jruby_cpath
jruby_home = ENV['JRUBY_HOME']
if jruby_home
candidate = File.join(jruby_home, 'lib', 'jruby.jar')
jruby_cpath = candidate if File.exist? candidate
end
end

raise "jruby.jar path not found" unless jruby_cpath

jruby_cpath += File::PATH_SEPARATOR + args.join(File::PATH_SEPARATOR) unless args.empty?
jruby_cpath ? "-cp \"#{jruby_cpath}\"" : ""
end
end

ConcurrentRubyJavaExtensionTask.new('concurrent_ruby', core_gemspec) do |ext|
Rake::JavaExtensionTask.new('concurrent_ruby', core_gemspec) do |ext|
ext.ext_dir = 'ext/concurrent-ruby'
ext.lib_dir = 'lib/concurrent'
end
Expand All @@ -79,7 +43,8 @@ namespace :repackage do
sh 'bundle package'

# build only the jar file not the whole gem for java platform, the jar is part the concurrent-ruby-x.y.z.gem
RakeCompilerDock.sh 'bundle install --local && bundle exec rake lib/concurrent/concurrent_ruby.jar --trace', rubyvm: :jruby
Rake::Task['lib/concurrent/concurrent_ruby.jar'].invoke

# build all gem files
RakeCompilerDock.sh 'bundle install --local && bundle exec rake cross native package --trace'
end
Expand All @@ -101,15 +66,14 @@ begin

RSpec::Core::RakeTask.new(:spec)

options = %w[ --color
--backtrace
--seed 1
--format documentation
--tag ~notravis ]

namespace :spec do
desc '* Configured for ci'
RSpec::Core::RakeTask.new(:ci) do |t|
options = %w[ --color
--backtrace
--order defined
--format documentation
--tag ~notravis ]
t.rspec_opts = [*options].join(' ')
end

Expand Down Expand Up @@ -184,10 +148,19 @@ begin
end

define_yard_task = -> name do
output_dir = "docs/#{name}"

removal_name = "remove.#{name}"
task removal_name do
Dir.chdir __dir__ do
FileUtils.rm_rf output_dir
end
end

desc "* of #{name} into subdir #{name}"
YARD::Rake::YardocTask.new(name) do |yard|
yard.options.push(
'--output-dir', "docs/#{name}",
'--output-dir', output_dir,
'--main', 'tmp/README.md',
*common_yard_options)
yard.files = ['./lib/**/*.rb',
Expand All @@ -196,10 +169,11 @@ begin
'-',
'docs-source/thread_pools.md',
'docs-source/promises.out.md',
'docs-source/medium-example.out.rb',
'LICENSE.md',
'CHANGELOG.md']
end
Rake::Task[name].prerequisites.push 'yard:eval_md', 'yard:update_readme'
Rake::Task[name].prerequisites.push removal_name, 'yard:eval_md', 'yard:update_readme'
end

define_yard_task.call current_yard_version_name
Expand Down
2 changes: 1 addition & 1 deletion concurrent-ruby-edge.gemspec
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
require File.join(File.dirname(__FILE__ ), 'lib/concurrent/version')
require File.join(File.dirname(__FILE__ ), 'lib-edge/concurrent/edge/version')

Gem::Specification.new do |s|
git_files = `git ls-files`.split("\n")
Expand Down
158 changes: 158 additions & 0 deletions docs-source/cancellation.in.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@

## Examples

**Run async task until cancelled**

Create cancellation and then run work in a background thread until it is cancelled.

```ruby
cancellation, origin = Concurrent::Cancellation.new
# - origin is used for cancelling, resolve it to cancel
# - cancellation is passed down to tasks for cooperative cancellation
async_task = Concurrent::Promises.future(cancellation) do |cancellation|
# Do work repeatedly until it is cancelled
do_stuff until cancellation.canceled?
:stopped_gracefully
end

sleep 0.01
# Wait a bit then stop the thread by resolving the origin of the cancellation
origin.resolve
async_task.value!
```

Or let it raise an error.

```ruby
cancellation, origin = Concurrent::Cancellation.new
async_task = Concurrent::Promises.future(cancellation) do |cancellation|
# Do work repeatedly until it is cancelled
while true
cancellation.check!
do_stuff
end
end

sleep 0.01
# Wait a bit then stop the thread by resolving the origin of the cancellation
origin.resolve
async_task.result
```

**Run additional tasks on Cancellation**

Cancellation can also be used to log or plan re-execution.

```ruby
cancellation.origin.chain do
# This block is executed after the Cancellation is cancelled
# It can then log cancellation or e.g. plan new re-execution
end
```

**Run only for limited time – Timeout replacement**

Execute task for a given time then finish.
Instead of letting Cancellation crate its own origin, it can be passed in as argument.
The passed in origin is scheduled to be resolved in given time which then cancels the Cancellation.

```ruby
timeout = Concurrent::Cancellation.new Concurrent::Promises.schedule(0.02)
# or using shortcut helper method
timeout = Concurrent::Cancellation.timeout 0.02
count = Concurrent::AtomicFixnum.new
Concurrent.global_io_executor.post(timeout) do |timeout|
# do stuff until cancelled
count.increment until timeout.canceled?
end #

timeout.origin.wait
count.value # => 177576
```

**Parallel background processing with single cancellation**

Each task tries to count to 1000 but there is a randomly failing test. The
tasks share single cancellation, when one of them fails it cancels the others.
The failing tasks ends with an error, the other tasks are gracefully cancelled.

```ruby
cancellation, origin = Concurrent::Cancellation.new
tasks = 4.times.map do |i|
Concurrent::Promises.future(cancellation, origin, i) do |cancellation, origin, i|
count = 0
100.times do
break count = :cancelled if cancellation.canceled?
count += 1
sleep 0.001
if rand > 0.95
origin.resolve # cancel
raise 'random error'
end
count
end
end
end
Concurrent::Promises.zip(*tasks).result #
# => [false,
# [:cancelled, nil, :cancelled, :cancelled],
# [nil, #<RuntimeError: random error>, nil, nil]]
```

Without the randomly failing part it produces following.

```ruby
cancellation, origin = Concurrent::Cancellation.new
tasks = 4.times.map do |i|
Concurrent::Promises.future(cancellation, origin, i) do |cancellation, origin, i|
count = 0
100.times do
break count = :cancelled if cancellation.canceled?
count += 1
sleep 0.001
# if rand > 0.95
# origin.resolve
# raise 'random error'
# end
count
end
end
end
Concurrent::Promises.zip(*tasks).result
```

**Combine cancellations**

The combination created by joining two cancellations cancels when the first or the other does.

```ruby
cancellation_a, origin_a = Concurrent::Cancellation.new
cancellation_b, origin_b = Concurrent::Cancellation.new
combined_cancellation = cancellation_a.join(cancellation_b)

origin_a.resolve

cancellation_a.canceled?
cancellation_b.canceled?
combined_cancellation.canceled?
```

If a different rule for joining is needed, the source can be combined manually.
The manually created cancellation cancels when both the first and the other cancels.

```ruby
cancellation_a, origin_a = Concurrent::Cancellation.new
cancellation_b, origin_b = Concurrent::Cancellation.new
# cancels only when both a and b is cancelled
combined_cancellation = Concurrent::Cancellation.new origin_a & origin_b

origin_a.resolve

cancellation_a.canceled? #=> true
cancellation_b.canceled? #=> false
combined_cancellation.canceled? #=> false

origin_b.resolve
combined_cancellation.canceled? #=> true
```

6 changes: 6 additions & 0 deletions docs-source/cancellation.init.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
require 'concurrent-edge'

def do_stuff(*args)
sleep 0.01
:stuff
end
Loading

0 comments on commit f72141e

Please sign in to comment.