Thread pool exception handling #634

Open
andrew-aladev opened this issue Mar 2, 2017 · 16 comments
Assignees
Labels
bug A bug in the library or documentation. medium-priority Should be done soon.

Comments

@andrew-aladev

I've read some of the code in {lib,spec}/concurrent/executor. It looks like people love the Java way: the abstraction layer is ideal, the documentation is great, the tests are good, and the implementation is poor.

Here is the current implementation:

def run_task(pool, task, args)
  task.call(*args)
  pool.worker_task_completed
rescue => ex
  # let it fail
  log DEBUG, ex
rescue Exception => ex
  log ERROR, ex
  pool.worker_died(self)
  throw :stop
end

We shouldn't rescue Exception; we should handle only StandardError. System-level errors should be able to kill this worker and the whole pool infrastructure.

For example, the pool shouldn't recreate workers if malloc failed in any worker (NoMemoryError), and the user should be able to interrupt a Ruby program with SIGINT (Interrupt) without the pool stopping them.
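To make the distinction concrete, here is a small plain-Ruby sketch (no concurrent-ruby needed). The `classify` helper is hypothetical and simply mirrors the two rescue clauses in `run_task`: a bare `rescue => e` only catches StandardError, while Interrupt (raised on SIGINT) and NoMemoryError sit outside that hierarchy and are only caught by `rescue Exception`.

```ruby
# Hypothetical helper: reports which of run_task's two rescue clauses
# would handle +error+.
def classify(error)
  raise error
rescue => e # same as `rescue StandardError => e`
  "standard: #{e.class}"
rescue Exception => e
  "system: #{e.class}"
end

puts classify(ArgumentError.new("bad arg")) # standard: ArgumentError
puts classify(Interrupt.new)                # system: Interrupt
puts classify(NoMemoryError.new)            # system: NoMemoryError

# Interrupt inherits from SignalException < Exception, not StandardError.
p Interrupt.ancestors.include?(StandardError) # => false
```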


We can ignore a user's StandardError raised from task.call(*args), but we can't ignore errors from pool.worker_task_completed, so pool.worker_task_completed should be moved out of this rescue block.
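A minimal sketch of that restructuring, assuming only a duck-typed pool that responds to `worker_task_completed`. `FakePool` and the `$stderr` logging are stand-ins, not concurrent-ruby internals. Only `task.call(*args)` sits inside `rescue StandardError`, so a failure in the bookkeeping call, or any non-StandardError, propagates to the worker.

```ruby
# Stand-in for the executor: implements only what run_task needs.
class FakePool
  attr_reader :completed

  def initialize
    @completed = 0
  end

  def worker_task_completed
    @completed += 1
  end
end

# Hypothetical restructured run_task: user errors are logged and swallowed,
# but bookkeeping errors and system errors (Interrupt, NoMemoryError, ...) escape.
def run_task(pool, task, args)
  begin
    task.call(*args)
  rescue StandardError => ex
    $stderr.puts "task failed: #{ex.class}: #{ex.message}"
  end
  pool.worker_task_completed # deliberately outside the rescue
end

pool = FakePool.new
run_task(pool, ->(x) { x * 2 }, [21])
run_task(pool, -> { raise ArgumentError, "boom" }, [])
pool.completed # => 2
```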


I saw issue #616:

You'll notice that our API is exactly the same (in fact, on JRuby our TPE is just a thin wrapper around Java's). The reason that neither Java's thread pools nor ours have exception handling within the thread pool is because that's not the appropriate place to put it.

@jdantonio, this planet does not revolve around Java. The old Java spec is not a holy thing that can't be changed.

If you absolutely insist on posting jobs directly to a thread pool rather than using our high-level abstractions (which I strongly discourage) then just create a job wrapper. You can find examples of job wrappers in all our high-level abstractions, Rails ActiveJob, Sucker Punch, and other libraries which use our thread pools.

All of the high-level abstractions in this library (Promise, Actor, etc.) all post jobs to the global thread pool and all provide exception handling. Simply pick the abstraction that best fits your use case and use it.
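For reference, the kind of job wrapper suggested in the quote can be very small. This is a sketch in which `wrap_job` and the error handler are hypothetical names. The wrapper rescues StandardError from the job and hands it to a callback, so nothing is silently lost even though the pool itself swallows errors. With concurrent-ruby it would be used as `pool.post(&job)`. The runnable demo uses a plain Thread to stay dependency-free.

```ruby
# Hypothetical job wrapper: runs the job and forwards any StandardError
# to +on_error+ instead of letting the executor swallow it.
def wrap_job(on_error, &job)
  proc do |*args|
    begin
      job.call(*args)
    rescue StandardError => e
      on_error.call(e)
      nil
    end
  end
end

errors = Queue.new # thread-safe collection of failures
handler = ->(e) { errors << e }

job = wrap_job(handler) { raise KeyError, "missing config" }
Thread.new(&job).join # with concurrent-ruby: pool.post(&job)

err = errors.pop
puts "#{err.class}: #{err.message}" # KeyError: missing config
```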

People want to change the default behaviour of the thread pool; they don't want to use your special high-level job wrappers.

Ruby's Thread had the same issue, and people simply fixed it 12-15 years ago without any panic.

Look at Thread.abort_on_exception. It is false by default: an unhandled exception in a child thread only terminates that thread, and the parent doesn't see it unless it calls join. If it is true, the child's unhandled exception is re-raised in the main thread.
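A small plain-Ruby illustration of the two Thread behaviours described above. `report_on_exception` (Ruby 2.4+) is switched off only to keep the demo output quiet; `TaskError` is a made-up error class. By default the error stays inside the child thread until someone calls `join`; with `abort_on_exception` set on the thread, it is raised in the main thread, interrupting whatever the main thread is doing.

```ruby
class TaskError < StandardError; end

# Default (abort_on_exception = false): the child thread dies on its own;
# the error only reaches the parent when it calls join.
quiet = Thread.new do
  Thread.current.report_on_exception = false
  raise TaskError, "seen on join"
end

begin
  quiet.join
rescue TaskError => e
  puts "join re-raised: #{e.message}"
end

# abort_on_exception = true: the unhandled error is raised in the main thread,
# interrupting whatever it is doing (here, sleeping).
begin
  Thread.new do
    Thread.current.report_on_exception = false
    Thread.current.abort_on_exception = true
    raise TaskError, "raised in main"
  end
  sleep 5
rescue TaskError => e
  puts "main thread got: #{e.message}"
end
```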

Users would still be able to create their own job wrappers.

Some people (me included) love this option because it makes the API cleaner and the code more readable.

We could add an :ABORT_ON_EXCEPTION => true/false option to the thread pool, and this issue would be fixed properly.
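A minimal sketch of what such an option could look like, written against plain Ruby threads rather than concurrent-ruby's executor. `MiniPool` and its `abort_on_exception:` keyword are hypothetical, not an existing API. With the option off, task errors are swallowed as today. With it on, the first error is remembered and re-raised from `wait_for_termination` in the caller's thread. This sketch chooses that synchronous re-raise instead of an asynchronous one like `Thread.abort_on_exception`, because it is deterministic.

```ruby
# Hypothetical pool, for illustration only (not concurrent-ruby's API).
class MiniPool
  def initialize(size, abort_on_exception: false)
    @abort_on_exception = abort_on_exception
    @jobs = Queue.new
    @error_lock = Mutex.new
    @first_error = nil
    @threads = Array.new(size) { Thread.new { work } }
  end

  def post(&task)
    @jobs << task
  end

  # Closes the queue, waits for the workers, then re-raises the first task
  # error if abort_on_exception was requested.
  def wait_for_termination
    @jobs.close
    @threads.each(&:join)
    raise @first_error if @abort_on_exception && @first_error
  end

  private

  def work
    while (task = @jobs.pop) # pop returns nil once the queue is closed and empty
      begin
        task.call
      rescue StandardError => e # system errors are deliberately not rescued
        @error_lock.synchronize { @first_error ||= e }
      end
    end
  end
end

pool = MiniPool.new(2, abort_on_exception: true)
pool.post { sleep 0.1 }
pool.post { raise ArgumentError, "bad task" }
begin
  pool.wait_for_termination
rescue ArgumentError => e
  puts "caller received: #{e.message}"
end
```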

@jdantonio
Member

@andrew-aladev Thank you for your thoughtful feedback. I've made my opinion clear and my opinion has not changed. However, the decision is no longer mine to make. @pitr-ch is now the maintainer. It is up to him. I will support whatever decision he makes and I will respectfully decline to comment further.

@andrew-aladev
Author

andrew-aladev commented Mar 3, 2017

We can make an abortable worker without any problems.

require "concurrent"

Concurrent::RubyThreadPoolExecutor.module_eval do
  def ns_abort_execution aborted_worker
    @pool.delete aborted_worker
    ns_kill_execution
  end

  def abort_worker worker
    synchronize { ns_abort_execution worker }
  end

  class AbortableWorker < self.const_get :Worker
    def initialize pool
      super
      @thread.abort_on_exception = true
    end

    def run_task pool, task, args
      begin
        task.call *args
      rescue StandardError => error
        pool.abort_worker self
        raise error
      end

      pool.worker_task_completed
      nil
    end
  end

  self.send :remove_const, :Worker
  self.const_set :Worker, AbortableWorker
end

class MyError < StandardError; end

pool = Concurrent::FixedThreadPool.new 5
begin
  pool.post do
    sleep 1
    puts "we shouldn't read this message"
  end

  pool.post do
    puts "raising my error"
    raise MyError
  end

  pool.shutdown
  pool.wait_for_termination

rescue MyError
  puts "received my error"
end

sleep 2

@andrew-aladev
Author

andrew-aladev commented Mar 3, 2017

require "concurrent"

Concurrent::RubyThreadPoolExecutor.class_eval do
  # Inspired by "ns_kill_execution".
  def ns_abort_execution aborted_worker
    @pool.each do |worker|
      next if worker == aborted_worker
      worker.kill
    end

    @pool = [aborted_worker]
    @ready.clear

    stopped_event.set
    nil
  end

  def abort_worker worker
    synchronize do
      ns_abort_execution worker
    end
    nil
  end

  def join
    shutdown

    # We must wait for the stopped event;
    # we can't use a timeout here.
    stopped_event.wait nil

    @pool.each do |aborted_worker|
      # Rubinius only receives the error from the aborted thread's "join".
      # MRI Ruby doesn't need "join":
      # it receives the error anyway.

      # We can "raise" the error in the aborted thread and then "join" it from this thread,
      # or "join" the aborted thread from this thread and then "raise" the error in it.
      # The order of "raise" and "join" doesn't matter; we receive the target error either way.

      aborted_worker.join
    end

    @pool.clear
    nil
  end

  class AbortableWorker < self.const_get :Worker
    def initialize pool
      super
      @thread.abort_on_exception = true
    end

    def run_task pool, task, args
      begin
        task.call *args
      rescue StandardError => error
        pool.abort_worker self
        raise error
      end

      pool.worker_task_completed
      nil
    end

    def join
      @thread.join
      nil
    end
  end

  self.send :remove_const, :Worker
  self.const_set :Worker, AbortableWorker
end

class MyError < StandardError; end

pool = Concurrent::FixedThreadPool.new 5

begin
  pool.post do
    sleep 1
    puts "we shouldn't receive this message"
  end

  pool.post do
    puts "raising my error"
    raise MyError
  end

  pool.join

rescue MyError => error
  puts "received my error, trace: \n#{error.backtrace.join("\n")}"
end

sleep 2

This patch works for any version of MRI Ruby and Rubinius. I am not interested in JRuby support; it should be easy to fix the JRuby executor in the same way, so please do so if you want.

@pitr-ch
Member

pitr-ch commented Mar 13, 2017

The current implementation indeed swallows the exception, which is not good. Unfortunately I cannot accept your contribution as is. Could you open a pull request? It will make the change in the code perfectly clear. Please include a simple reproducer for the problem you are describing, or, even better, tests. If you are not interested in JRuby that's fine; I can evolve your pull request and make the necessary changes there. I cannot accept anything that changes only the MRI implementation.

@pitr-ch pitr-ch added this to the 1.1.0 milestone Mar 13, 2017
@pitr-ch pitr-ch added bug A bug in the library or documentation. question An user question, does not change the library. labels Mar 13, 2017
@andrew-aladev
Author

Sure, I will provide a pull request. I am very busy this week, so I will provide it later.

@pitr-ch
Member

pitr-ch commented Mar 13, 2017

Thanks, that's fine.

@pitr-ch pitr-ch modified the milestones: 1.2.0, 1.1.0 Apr 17, 2017
@pitr-ch
Member

pitr-ch commented Apr 17, 2017

Postponing to 1.2, since this isn't easily fixable: the JRuby thread pool doesn't use Ruby threads, so the tasks probably need to be wrapped and the error re-raised on the main thread, or something similar.
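One possible reading of "wrap the tasks and raise on the main thread", sketched in plain Ruby. `forward_errors_to` is a hypothetical helper. The wrapper captures the thread that created it and, on failure, delivers the error there with `Thread#raise`, which does not depend on the worker threads being Ruby threads. Whether it behaves identically with JRuby's Java-backed pool is an assumption. Asynchronous `Thread#raise` can interrupt the target at any point, which is part of why this is hard to get right.

```ruby
class TaskError < StandardError; end

# Hypothetical wrapper: errors from +task+ are re-raised asynchronously in
# +target+ (by default the thread that created the wrapper).
def forward_errors_to(target = Thread.current, &task)
  proc do |*args|
    begin
      task.call(*args)
    rescue StandardError => e
      target.raise(e)
    end
  end
end

begin
  job = forward_errors_to { raise TaskError, "failed in worker" }
  Thread.new(&job) # stands in for pool.post(&job)
  sleep 5          # the caller is interrupted as soon as the job fails
rescue TaskError => e
  puts "caller got: #{e.message}"
end
```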

@merqlove

merqlove commented Jun 13, 2017

Another way is:

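require "concurrent"

errors = Concurrent::Array.new
pool = Concurrent::FixedThreadPool.new 5
# configs and run_process stand for your own inputs and work.
futures = configs.map do |c|
  # future_on(pool) runs the block on the pool; plain future would use the global :io executor.
  Concurrent::Promises.future_on(pool) { run_process(c) }.rescue { |e| errors << e }
end
Concurrent::Promises.zip_futures_on(pool, *futures).value
errors.each { |e| STDERR.puts e.message }
pool.shutdown
pool.wait_for_termination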

@pitr-ch
Member

pitr-ch commented Jul 23, 2017

@merqlove yeah, that works, thanks for the comment. The array should be a Concurrent::Array instance though; I've updated your comment.

@pitr-ch pitr-ch added looking-for-contributor We are looking for a contributor to help with this issue. medium-priority Should be done soon. and removed question An user question, does not change the library. labels Jul 6, 2018
@pitr-ch pitr-ch removed this from the 1.2.0 milestone Jul 6, 2018
@route

route commented Sep 19, 2018

@pitr-ch, did you end up with anything? With the latest version the thread pool still swallows exceptions.

@pitr-ch pitr-ch removed the looking-for-contributor We are looking for a contributor to help with this issue. label Oct 10, 2018
@pitr-ch
Member

pitr-ch commented Oct 10, 2018

@route I've put this on my list to revisit. Thanks for your interest.

@pitr-ch pitr-ch self-assigned this Oct 10, 2018
@zw963

zw963 commented Dec 8, 2018

Yes, when I use concurrent-ruby I found that it doesn't work, even after adding the following config:

Thread.abort_on_exception = true

Because no exception is actually raised.
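This follows from the run_task code quoted at the top of the issue: abort_on_exception only reacts to exceptions that escape a thread, and the worker rescues them first. A plain-Ruby sketch of the effect, where the `worker` lambda is a hypothetical stand-in for the pool's worker loop:

```ruby
# Stand-in for the pool's worker: it rescues task errors, so the thread
# itself never terminates with an exception.
worker = lambda do |task|
  Thread.new do
    Thread.current.abort_on_exception = true # has nothing to react to
    begin
      task.call
    rescue StandardError
      :swallowed # the real pool logs at DEBUG level and carries on
    end
  end
end

t = worker.call(-> { raise RuntimeError, "lost" })
p t.value # => :swallowed, and nothing is raised in the main thread
```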

@route

route commented Dec 8, 2018

@pitr-ch, I can work on this if you are short of hands and agree to merge it in one shape or another.

@pitr-ch
Member

pitr-ch commented Dec 13, 2018

@route thanks a lot! Please have a look and open a PR with a prototype where we can discuss the desired change. Things to keep in mind:

  • The final change (not the prototype) should be backward compatible. We'll probably be forced to keep the current behavior by default and introduce the fixed one behind an option. However, we can print a warning when the default is used, so users know to switch manually to the fixed, non-swallowing behavior of the pool.
  • The behavior should be the same on MRI and JRuby.
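A sketch of the compatibility approach in the first bullet, assuming a hypothetical `on_task_error:` keyword (not an existing concurrent-ruby option). Leaving it unset keeps the swallowing behaviour but emits a warning. Passing `:raise` opts into the fixed behaviour. Validating the value early keeps typos from silently falling back to the old default.

```ruby
# Hypothetical option handling for a pool constructor.
class PoolOptions
  VALID = %i[swallow raise].freeze

  attr_reader :on_task_error

  def initialize(on_task_error: nil)
    if on_task_error.nil?
      warn "[DEPRECATION] task errors are swallowed by default; " \
           "pass on_task_error: :raise to propagate them"
      on_task_error = :swallow # current, backward-compatible behaviour
    end
    unless VALID.include?(on_task_error)
      raise ArgumentError, "on_task_error must be one of #{VALID.inspect}"
    end
    @on_task_error = on_task_error
  end
end

PoolOptions.new.on_task_error                        # => :swallow (with a warning)
PoolOptions.new(on_task_error: :raise).on_task_error # => :raise
```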

I've sent you an invitation to become a collaborator; if you accept, I can assign the issue to you.

@Nowaker

Nowaker commented Jul 2, 2020

Swallowing exceptions is really problematic. Are there any plans to tackle this? There even seems to be a ready-to-use solution, at least for the 2017 code...

@KostyaSha

KostyaSha commented May 12, 2023

Quoting @merqlove's Concurrent::Promises approach from above (futures with .rescue collecting errors, then zip_futures_on):

This forces you to create all tasks up front.

It would be good to have a strategy for handling exceptions, for example a :fail_fast strategy for cases where you need to shut down execution on the first error that occurs.
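A minimal sketch of such a :fail_fast strategy built from plain Ruby threads. `run_fail_fast` is a hypothetical helper, not a concurrent-ruby API. A producer thread feeds tasks through a bounded queue, so they are pulled lazily instead of being created up front. After the first error, the queue is closed, workers stop taking new tasks, and the error is re-raised in the caller.

```ruby
# Hypothetical fail-fast runner: +tasks+ is any Enumerable of callables.
def run_fail_fast(tasks, size: 4)
  queue = SizedQueue.new(size) # bounded: tasks are pulled from +tasks+ lazily
  lock = Mutex.new
  first_error = nil

  producer = Thread.new do
    begin
      tasks.each { |task| queue << task }
      queue.close # no more work: lets idle workers finish
    rescue ClosedQueueError
      # a worker failed and closed the queue, so stop producing
    end
  end

  workers = Array.new(size) do
    Thread.new do
      while (task = queue.pop) # nil once the queue is closed and drained
        break if lock.synchronize { first_error } # another task already failed
        begin
          task.call
        rescue StandardError => e
          lock.synchronize { first_error ||= e }
          queue.close # wakes blocked workers and the producer
        end
      end
    end
  end

  [*workers, producer].each(&:join)
  raise first_error if first_error
end

tasks = (1..1_000).lazy.map do |i|
  -> { raise ArgumentError, "task #{i} failed" if i == 3 }
end

begin
  run_fail_fast(tasks, size: 2)
rescue ArgumentError => e
  puts e.message # task 3 failed
end
```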

Development

No branches or pull requests

8 participants