From d03a963a7ca7e899a2178de861543caa20832513 Mon Sep 17 00:00:00 2001 From: Jan Berdajs Date: Tue, 17 Nov 2015 20:57:40 +0100 Subject: [PATCH] JRuby support: UNIXSocket#recv_io/send_io workaround, pooled application manager (instead of fork) - only used if fork if not supported --- lib/spring/application.rb | 67 ++---- lib/spring/application/boot.rb | 5 +- lib/spring/binstub.rb | 2 +- lib/spring/boot.rb | 1 + lib/spring/client/binstub.rb | 2 +- lib/spring/client/run.rb | 48 +--- lib/spring/configuration.rb | 8 + lib/spring/env.rb | 2 +- lib/spring/impl/application.rb | 7 + lib/spring/impl/application_manager.rb | 7 + lib/spring/impl/fork/application.rb | 69 ++++++ .../{ => impl/fork}/application_manager.rb | 0 lib/spring/impl/fork/run.rb | 47 ++++ lib/spring/impl/pool/application.rb | 47 ++++ lib/spring/impl/pool/application_manager.rb | 226 ++++++++++++++++++ lib/spring/impl/pool/run.rb | 27 +++ lib/spring/impl/run.rb | 7 + lib/spring/io_helpers.rb | 92 +++++++ lib/spring/platform.rb | 23 ++ lib/spring/server.rb | 4 +- 20 files changed, 593 insertions(+), 98 deletions(-) create mode 100644 lib/spring/impl/application.rb create mode 100644 lib/spring/impl/application_manager.rb create mode 100644 lib/spring/impl/fork/application.rb rename lib/spring/{ => impl/fork}/application_manager.rb (100%) create mode 100644 lib/spring/impl/fork/run.rb create mode 100644 lib/spring/impl/pool/application.rb create mode 100644 lib/spring/impl/pool/application_manager.rb create mode 100644 lib/spring/impl/pool/run.rb create mode 100644 lib/spring/impl/run.rb create mode 100644 lib/spring/io_helpers.rb create mode 100644 lib/spring/platform.rb diff --git a/lib/spring/application.rb b/lib/spring/application.rb index 992c5125..d3df3e88 100644 --- a/lib/spring/application.rb +++ b/lib/spring/application.rb @@ -1,9 +1,11 @@ require "spring/boot" require "set" require "pty" +require "spring/impl/application" module Spring class Application + include ApplicationImpl attr_reader :manager, :watcher, :spring_env, :original_env def initialize(manager, original_env) @@ -114,13 +116,9 @@ def preload end end - def eager_preload - with_pty { preload } - end - def run state :running - manager.puts + notify_manager_ready loop do IO.select [manager, @interrupt.first] @@ -128,17 +126,18 @@ def run if terminating? || watcher_stale? || preload_failed? exit else - serve manager.recv_io(UNIXSocket) + serve IOWrapper.recv_io(manager, UNIXSocket).to_io end end end def serve(client) + child_started = [false] log "got client" manager.puts - stdout, stderr, stdin = streams = 3.times.map { client.recv_io } - [STDOUT, STDERR, STDIN].zip(streams).each { |a, b| a.reopen(b) } + stdout, stderr, stdin = streams = receive_streams(client) + reopen_streams(streams) preload unless preloaded? @@ -153,7 +152,7 @@ def serve(client) ActionDispatch::Reloader.prepare! end - pid = fork { + fork_child(client, streams, child_started) { IGNORE_SIGNALS.each { |sig| trap(sig, "DEFAULT") } trap("TERM", "DEFAULT") @@ -180,26 +179,21 @@ def serve(client) invoke_after_fork_callbacks shush_backtraces + before_command command.call } - - disconnect_database - reset_streams - - log "forked #{pid}" - manager.puts pid - - wait pid, streams, client rescue Exception => e + Kernel.exit if exiting? && e.is_a?(SystemExit) + log "exception: #{e}" - manager.puts unless pid + manager.puts unless child_started[0] if streams && !e.is_a?(SystemExit) - print_exception(stderr, e) + print_exception(stderr || STDERR, e) streams.each(&:close) end - client.puts(1) if pid + client.puts(1) if child_started[0] client.close end @@ -280,39 +274,6 @@ def print_exception(stream, error) rest.each { |line| stream.puts("\tfrom #{line}") } end - def with_pty - PTY.open do |master, slave| - [STDOUT, STDERR, STDIN].each { |s| s.reopen slave } - Thread.new { master.read } - yield - reset_streams - end - end - - def reset_streams - [STDOUT, STDERR].each { |stream| stream.reopen(spring_env.log_file) } - STDIN.reopen("/dev/null") - end - - def wait(pid, streams, client) - @mutex.synchronize { @waiting << pid } - - # Wait in a separate thread so we can run multiple commands at once - Thread.new { - begin - _, status = Process.wait2 pid - log "#{pid} exited with #{status.exitstatus}" - - streams.each(&:close) - client.puts(status.exitstatus) - client.close - ensure - @mutex.synchronize { @waiting.delete pid } - exit_if_finished - end - } - end - private def active_record_configured? diff --git a/lib/spring/application/boot.rb b/lib/spring/application/boot.rb index 6804b646..7bfaeae6 100644 --- a/lib/spring/application/boot.rb +++ b/lib/spring/application/boot.rb @@ -1,10 +1,11 @@ +require "spring/platform" # This is necessary for the terminal to work correctly when we reopen stdin. -Process.setsid +Process.setsid if Spring.fork? require "spring/application" app = Spring::Application.new( - UNIXSocket.for_fd(3), + Spring::WorkerChannel.remote_endpoint, Spring::JSON.load(ENV.delete("SPRING_ORIGINAL_ENV").dup) ) diff --git a/lib/spring/binstub.rb b/lib/spring/binstub.rb index 75f92fb4..ec40aa25 100644 --- a/lib/spring/binstub.rb +++ b/lib/spring/binstub.rb @@ -6,7 +6,7 @@ else disable = ENV["DISABLE_SPRING"] - if Process.respond_to?(:fork) && (disable.nil? || disable.empty? || disable == "0") + if disable.nil? || disable.empty? || disable == "0" ARGV.unshift(command) load bin_path end diff --git a/lib/spring/boot.rb b/lib/spring/boot.rb index e3b2421d..c12b8742 100644 --- a/lib/spring/boot.rb +++ b/lib/spring/boot.rb @@ -6,3 +6,4 @@ require "spring/process_title_updater" require "spring/json" require "spring/watcher" +require "spring/io_helpers" diff --git a/lib/spring/client/binstub.rb b/lib/spring/client/binstub.rb index edc13aa1..fae63ebe 100644 --- a/lib/spring/client/binstub.rb +++ b/lib/spring/client/binstub.rb @@ -44,7 +44,7 @@ class Binstub < Command end CODE - OLD_BINSTUB = %{if !Process.respond_to?(:fork) || Gem::Specification.find_all_by_name("spring").empty?} + OLD_BINSTUB = %{if Gem::Specification.find_all_by_name("spring").empty?} class Item attr_reader :command, :existing diff --git a/lib/spring/client/run.rb b/lib/spring/client/run.rb index 4a7c213f..d1285aec 100644 --- a/lib/spring/client/run.rb +++ b/lib/spring/client/run.rb @@ -1,12 +1,14 @@ require "rbconfig" require "socket" require "bundler" +require "spring/io_helpers" +require "spring/impl/run" module Spring module Client class Run < Command - FORWARDED_SIGNALS = %w(INT QUIT USR1 USR2 INFO) & Signal.list.keys - TIMEOUT = 1 + include RunImpl + TIMEOUT = RunImpl::TIMEOUT def initialize(args) super @@ -55,11 +57,11 @@ def cold_run def run verify_server_version - application, client = UNIXSocket.pair + application, client = WorkerChannel.pair queue_signals connect_to_application(client) - run_command(client, application) + run_command(client, application.to_io) end def boot_server @@ -108,7 +110,7 @@ def verify_server_version end def connect_to_application(client) - server.send_io client + client.forward_to(server) send_json server, "args" => args, "default_rails_env" => default_rails_env if IO.select([server], [], [], TIMEOUT) @@ -121,12 +123,11 @@ def connect_to_application(client) def run_command(client, application) log "sending command" - application.send_io STDOUT - application.send_io STDERR - application.send_io STDIN + send_std_io_to(application) send_json application, "args" => args, "env" => ENV.to_hash + IO.select([server]) pid = server.gets pid = pid.chomp if pid @@ -138,12 +139,7 @@ def run_command(client, application) if pid && !pid.empty? log "got pid: #{pid}" - forward_signals(pid.to_i) - status = application.read.to_i - - log "got exit status #{status}" - - exit status + run_on(application, pid) else log "got no pid" exit 1 @@ -152,30 +148,6 @@ def run_command(client, application) application.close end - def queue_signals - FORWARDED_SIGNALS.each do |sig| - trap(sig) { @signal_queue << sig } - end - end - - def forward_signals(pid) - @signal_queue.each { |sig| kill sig, pid } - - FORWARDED_SIGNALS.each do |sig| - trap(sig) { forward_signal sig, pid } - end - rescue Errno::ESRCH - end - - def forward_signal(sig, pid) - kill(sig, pid) - rescue Errno::ESRCH - # If the application process is gone, then don't block the - # signal on this process. - trap(sig, 'DEFAULT') - Process.kill(sig, Process.pid) - end - def kill(sig, pid) Process.kill(sig, -Process.getpgid(pid)) end diff --git a/lib/spring/configuration.rb b/lib/spring/configuration.rb index e25e079e..c22df16d 100644 --- a/lib/spring/configuration.rb +++ b/lib/spring/configuration.rb @@ -37,6 +37,14 @@ def project_root_path @project_root_path ||= find_project_root(Pathname.new(File.expand_path(Dir.pwd))) end + def pool_min_free_workers + 2 + end + + def pool_spawn_parallel + true + end + private def find_project_root(current_dir) diff --git a/lib/spring/env.rb b/lib/spring/env.rb index e1e9656f..87e8f10a 100644 --- a/lib/spring/env.rb +++ b/lib/spring/env.rb @@ -6,9 +6,9 @@ require "spring/version" require "spring/sid" require "spring/configuration" +require "spring/platform" module Spring - IGNORE_SIGNALS = %w(INT QUIT) STOP_TIMEOUT = 2 # seconds class Env diff --git a/lib/spring/impl/application.rb b/lib/spring/impl/application.rb new file mode 100644 index 00000000..892e40dc --- /dev/null +++ b/lib/spring/impl/application.rb @@ -0,0 +1,7 @@ +require "spring/platform" + +if Spring.fork? + require "spring/impl/fork/application" +else + require "spring/impl/pool/application" +end diff --git a/lib/spring/impl/application_manager.rb b/lib/spring/impl/application_manager.rb new file mode 100644 index 00000000..d2408bbd --- /dev/null +++ b/lib/spring/impl/application_manager.rb @@ -0,0 +1,7 @@ +require "spring/platform" + +if Spring.fork? + require "spring/impl/fork/application_manager" +else + require "spring/impl/pool/application_manager" +end diff --git a/lib/spring/impl/fork/application.rb b/lib/spring/impl/fork/application.rb new file mode 100644 index 00000000..21bae951 --- /dev/null +++ b/lib/spring/impl/fork/application.rb @@ -0,0 +1,69 @@ +module Spring + module ApplicationImpl + def notify_manager_ready + manager.puts + end + + def receive_streams(client) + 3.times.map { IOWrapper.recv_io(client).to_io } + end + + def reopen_streams(streams) + [STDOUT, STDERR, STDIN].zip(streams).each { |a, b| a.reopen(b) } + end + + def eager_preload + with_pty { preload } + end + + def with_pty + PTY.open do |master, slave| + [STDOUT, STDERR, STDIN].each { |s| s.reopen slave } + Thread.new { master.read } + yield + reset_streams + end + end + + def reset_streams + [STDOUT, STDERR].each { |stream| stream.reopen(spring_env.log_file) } + STDIN.reopen("/dev/null") + end + + def wait(pid, streams, client) + @mutex.synchronize { @waiting << pid } + + # Wait in a separate thread so we can run multiple commands at once + Thread.new { + begin + _, status = Process.wait2 pid + log "#{pid} exited with #{status.exitstatus}" + + streams.each(&:close) + client.puts(status.exitstatus) + client.close + ensure + @mutex.synchronize { @waiting.delete pid } + exit_if_finished + end + } + end + + def fork_child(client, streams, child_started) + pid = fork { yield } + child_started[0] = true + + disconnect_database + reset_streams + + log "forked #{pid}" + manager.puts pid + + wait pid, streams, client + end + + def before_command + # NOP + end + end +end diff --git a/lib/spring/application_manager.rb b/lib/spring/impl/fork/application_manager.rb similarity index 100% rename from lib/spring/application_manager.rb rename to lib/spring/impl/fork/application_manager.rb diff --git a/lib/spring/impl/fork/run.rb b/lib/spring/impl/fork/run.rb new file mode 100644 index 00000000..6d6c9ff8 --- /dev/null +++ b/lib/spring/impl/fork/run.rb @@ -0,0 +1,47 @@ +module Spring + module Client + module RunImpl + TIMEOUT = 1 + FORWARDED_SIGNALS = %w(INT QUIT USR1 USR2 INFO) & Signal.list.keys + + def queue_signals + RunImpl::FORWARDED_SIGNALS.each do |sig| + trap(sig) { @signal_queue << sig } + end + end + + def send_std_io_to(application) + application.send_io STDOUT + application.send_io STDERR + application.send_io STDIN + end + + def run_on(application, pid) + forward_signals(pid.to_i) + status = application.read.to_i + + log "got exit status #{status}" + + exit status + end + + def forward_signals(pid) + @signal_queue.each { |sig| kill sig, pid } + + RunImpl::FORWARDED_SIGNALS.each do |sig| + trap(sig) { forward_signal sig, pid } + end + rescue Errno::ESRCH + end + + def forward_signal(sig, pid) + kill(sig, pid) + rescue Errno::ESRCH + # If the application process is gone, then don't block the + # signal on this process. + trap(sig, 'DEFAULT') + Process.kill(sig, Process.pid) + end + end + end +end diff --git a/lib/spring/impl/pool/application.rb b/lib/spring/impl/pool/application.rb new file mode 100644 index 00000000..7f454fbc --- /dev/null +++ b/lib/spring/impl/pool/application.rb @@ -0,0 +1,47 @@ +module Spring + module ApplicationImpl + def notify_manager_ready + manager.puts Process.pid + end + + def receive_streams(client) + [] + end + + def reopen_streams(streams) + # NOP + end + + def eager_preload + preload + end + + def screen_attached? + !system(%{screen -ls | grep "#{ENV['SPRING_SCREEN_NAME']}" | grep Detached > /dev/null}) + end + + def screen_move_to_bottom + puts "\033[22B" + end + + def fork_child(client, streams, child_started) + manager.puts ENV["SPRING_SCREEN_NAME"] + child_started[0] = true + exitstatus = 0 + begin + log "started #{Process.pid}" + yield + rescue SystemExit => ex + exitstatus = ex.status + end + + log "#{Process.pid} exited with #{exitstatus}" + exit + end + + def before_command + screen_move_to_bottom + sleep 0.1 until screen_attached? + end + end +end diff --git a/lib/spring/impl/pool/application_manager.rb b/lib/spring/impl/pool/application_manager.rb new file mode 100644 index 00000000..ac7ffe0e --- /dev/null +++ b/lib/spring/impl/pool/application_manager.rb @@ -0,0 +1,226 @@ +module Spring + class ApplicationManager + class Worker + attr_reader :screen_pid, :pid, :uuid, :socket, :screen_name + attr_accessor :on_done + + def initialize(env, args) + @spring_env = Env.new + channel, @remote_socket = WorkerChannel.pair + @uuid = File.basename(@remote_socket.path).gsub('.sock', '') + + Bundler.with_clean_env do + spawn_screen( + env.merge("SPRING_SOCKET" => @remote_socket.path), + args + ) + end + + @socket = channel.to_io + end + + def spawn_screen(env, args) + @screen_name = "spring_#{@uuid}" + + @screen_pid = + Process.spawn( + env.merge("SPRING_SCREEN_NAME" => screen_name), + "screen", "-d", "-m", "-S", screen_name, + *args + ) + + log "(spawn #{@screen_pid})" + end + + def await_boot + Process.detach(screen_pid) + @pid = socket.gets.to_i + start_wait_thread(pid, socket) unless pid.zero? + @remote_socket.close + end + + def start_wait_thread(pid, child) + Thread.new { + begin + Process.kill(0, pid) while sleep(1) + rescue Errno::ESRCH + end + + log "child #{pid} shutdown" + + on_done.call(self) if on_done + } + end + + def log(message) + @spring_env.log "[worker:#{uuid}] #{message}" + end + end + + class WorkerPool + def initialize(app_env, *app_args) + @app_env = app_env + @app_args = app_args + @spring_env = Env.new + + @workers = [] + @workers_in_use = [] + @spawning_workers = [] + + @check_mutex = Mutex.new + @workers_mutex = Mutex.new + + run + end + + def add_worker + worker = Worker.new(@app_env, @app_args) + worker.on_done = method(:worker_done) + @workers_mutex.synchronize { @spawning_workers << worker } + Thread.new do + worker.await_boot + log "+ worker #{worker.pid} (#{worker.uuid})" + @workers_mutex.synchronize do + @spawning_workers.delete(worker) + @workers << worker + end + end + end + + def worker_done(worker) + log "- worker #{worker.pid} (#{worker.uuid})" + @workers_mutex.synchronize do + @workers_in_use.delete(worker) + end + end + + def get_worker(spawn_new = true) + add_worker if spawn_new && all_size == 0 + + worker = nil + while worker.nil? && all_size > 0 + @workers_mutex.synchronize do + worker = @workers.shift + @workers_in_use << worker if worker + end + break if worker + sleep 1 + end + + Thread.new { check_min_free_workers } if spawn_new + + worker + end + + def check_min_free_workers + if @check_mutex.try_lock + # TODO: mutex, and dont do it if already in progress + # do this in thread + while all_size < Spring.pool_min_free_workers + unless Spring.pool_spawn_parallel + sleep 0.1 until @workers_mutex.synchronize { @spawning_workers.empty? } + end + add_worker + end + @check_mutex.unlock + end + end + + def all_size + @workers_mutex.synchronize { @workers.size + @spawning_workers.size } + end + + def stop! + if spawning_worker_pids.include?(nil) + log "Waiting for workers to quit..." + sleep 0.1 while spawning_worker_pids.include?(nil) + end + + @workers_mutex.synchronize do + (@spawning_workers + @workers_in_use + @workers).each do |worker| + kill_worker(worker) + end + end + end + private + def kill_worker(worker) + log "- worker #{worker.pid} (#{worker.uuid})." + system("kill -9 #{worker.pid} > /dev/null 2>&1") + system("screen -S #{worker.screen_name} -X quit > /dev/null 2>&1") + rescue + end + + def spawning_worker_pids + @spawning_workers.map { |worker| worker.pid } + end + + def run + system("screen -wipe > /dev/null 2>&1") + + check_min_free_workers + end + + def log(message) + @spring_env.log "[worker:pool] #{message}" + end + end + + def initialize(app_env) + @app_env = app_env + @spring_env = Env.new + @pool = + WorkerPool.new( + { + "RAILS_ENV" => app_env, + "RACK_ENV" => app_env, + "SPRING_ORIGINAL_ENV" => JSON.dump(Spring::ORIGINAL_ENV), + "SPRING_PRELOAD" => "1", + }, + Spring.ruby_bin, + "-I", File.expand_path("../..", __FILE__), + "-e", "require 'spring/application/boot'" + ) + end + + # Returns the name of the screen running the command, or nil if the application process died. + def run(client) + name = nil + with_child do |child| + client.forward_to(child.socket) + child.socket.gets or raise Errno::EPIPE + + name = child.socket.gets + end + + unless name.nil? + log "got worker name #{name}" + name + end + rescue Errno::ECONNRESET, Errno::EPIPE => e + log "#{e} while reading from child; returning no name" + nil + ensure + client.close + end + + def stop + log "stopping" + + @pool.stop! + rescue Errno::ESRCH, Errno::ECHILD + # Don't care + end + + protected + + attr_reader :app_env, :spring_env + + def log(message) + spring_env.log "[application_manager:#{app_env}] #{message}" + end + + def with_child + yield(@pool.get_worker) + end + end +end diff --git a/lib/spring/impl/pool/run.rb b/lib/spring/impl/pool/run.rb new file mode 100644 index 00000000..4ea61ba9 --- /dev/null +++ b/lib/spring/impl/pool/run.rb @@ -0,0 +1,27 @@ +module Spring + module Client + module RunImpl + TIMEOUT = 60 + + def queue_signals + # NOP + end + + def send_std_io_to(application) + # NOP + end + + def run_on(application, screen_name) + application.close + server.close + + # Using vt100 because it does not have smcup/rmcup support, + # which means the output of the screen will stay shown after + # screen closes. + set_vt_100 = "export TERM=vt100; tset" + erase_screen_message = "echo '\\033[2A\\033[K'" + Kernel.exec("#{set_vt_100}; screen -r #{screen_name}; #{erase_screen_message}") + end + end + end +end diff --git a/lib/spring/impl/run.rb b/lib/spring/impl/run.rb new file mode 100644 index 00000000..502d7a9f --- /dev/null +++ b/lib/spring/impl/run.rb @@ -0,0 +1,7 @@ +require "spring/platform" + +if Spring.fork? + require "spring/impl/fork/run" +else + require "spring/impl/pool/run" +end diff --git a/lib/spring/io_helpers.rb b/lib/spring/io_helpers.rb new file mode 100644 index 00000000..224496ce --- /dev/null +++ b/lib/spring/io_helpers.rb @@ -0,0 +1,92 @@ +require "spring/platform" +require 'socket' +require 'securerandom' + +module Spring + if Spring.fork? + class IOWrapper + def self.recv_io(socket, *args) + new(socket.recv_io(*args)) + end + + def initialize(socket) + @socket = socket + end + + def forward_to(socket) + socket.send_io(@socket) + end + + def to_io + @socket + end + + def close + @socket.close + end + end + + class WorkerChannel + def self.pair + a, b = UNIXSocket.pair + [new(a), IOWrapper.new(b)] + end + + def self.remote_endpoint + UNIXSocket.for_fd(3) + end + + attr_reader :to_io + + def initialize(socket) + @to_io = socket + end + end + else + class IOWrapper + def self.recv_io(socket, *args) + new(socket.gets.chomp) + end + + def initialize(path) + @path = path + end + + def forward_to(socket) + socket.puts(@path) + end + + def to_io + UNIXSocket.open(@path) + end + + def path + @path + end + + def close + # nop + end + end + + class WorkerChannel + def self.pair + path = Env.new.tmp_path.join("#{SecureRandom.uuid}.sock").to_s + [new(path), IOWrapper.new(path)] + end + + def self.remote_endpoint + path = ENV.delete("SPRING_SOCKET") + UNIXSocket.open(path) + end + + def initialize(path) + @server = UNIXServer.open(path) + end + + def to_io + @socket ||= @server.accept + end + end + end +end diff --git a/lib/spring/platform.rb b/lib/spring/platform.rb new file mode 100644 index 00000000..a211f778 --- /dev/null +++ b/lib/spring/platform.rb @@ -0,0 +1,23 @@ +module Spring + def self.fork? + Process.respond_to?(:fork) + end + + def self.jruby? + RUBY_PLATFORM == "java" + end + + def self.ruby_bin + if RUBY_PLATFORM == "java" + "jruby" + else + "ruby" + end + end + + if jruby? + IGNORE_SIGNALS = %w(INT) + else + IGNORE_SIGNALS = %w(INT QUIT) + end +end diff --git a/lib/spring/server.rb b/lib/spring/server.rb index b922199f..503b7f5f 100644 --- a/lib/spring/server.rb +++ b/lib/spring/server.rb @@ -3,7 +3,7 @@ module Spring end require "spring/boot" -require "spring/application_manager" +require "spring/impl/application_manager" # Must be last, as it requires bundler/setup, which alters the load path require "spring/commands" @@ -48,7 +48,7 @@ def serve(client) log "accepted client" client.puts env.version - app_client = client.recv_io + app_client = IOWrapper.recv_io(client) command = JSON.load(client.read(client.gets.to_i)) args, default_rails_env = command.values_at('args', 'default_rails_env')