From 2c66767734345ab6c354edad7d1562a463e45f66 Mon Sep 17 00:00:00 2001 From: Ben Pennell Date: Mon, 18 Nov 2024 11:40:00 -0500 Subject: [PATCH] Use pgroup, capture more info and reproduce more of the behavior from original timeout method --- .../processors/document_override.rb | 69 ++++++++++++++----- .../derivatives/processors/document_spec.rb | 42 ++++++----- 2 files changed, 76 insertions(+), 35 deletions(-) diff --git a/app/overrides/lib/hydra/derivatives/processors/document_override.rb b/app/overrides/lib/hydra/derivatives/processors/document_override.rb index 872ed9e04..d8f300762 100644 --- a/app/overrides/lib/hydra/derivatives/processors/document_override.rb +++ b/app/overrides/lib/hydra/derivatives/processors/document_override.rb @@ -2,35 +2,64 @@ # [hyc-override] https://github.com/samvera/hydra-derivatives/blob/v3.8.0/lib/hydra/derivatives/processors/document.rb require 'redlock' -class SofficeTimeoutError < StandardError; end +class SofficeTimeoutError < Hydra::Derivatives::TimeoutError; end Hydra::Derivatives::Processors::Document.class_eval do # [hyc-override] Use Redlock to manage soffice process lock LOCK_KEY = 'soffice:document_conversion' LOCK_TIMEOUT = 6 * 60 * 1000 - JOB_TIMEOUT_SECONDS = 300 + JOB_TIMEOUT_SECONDS = 30 LOCK_MANAGER = Redlock::Client.new([Redis.current]) + # [hyc-override] Adding in a graceful termination before hard kill, use spawn for process group + def self.execute_with_timeout(timeout, command, context) + stdout, stderr = "", "" + pid = nil + status = nil + + Timeout.timeout(timeout) do + # Create pipes for stdout and stderr + stdout_r, stdout_w = IO.pipe + stderr_r, stderr_w = IO.pipe + + # Use Process.spawn to start the command with process group + pid = Process.spawn(command, :pgroup => true, :out => stdout_w, :err => stderr_w) + + # Close unused ends in parent process + stdout_w.close + stderr_w.close + + # Read the output in separate threads to avoid deadlocks + stdout_thread = Thread.new { stdout = stdout_r.read } + stderr_thread = Thread.new { stderr = stderr_r.read } + + # Wait for the process to complete + _, status = Process.wait2(pid) + + # Ensure threads finish reading + stdout_thread.join + stderr_thread.join + end + raise "Unable to execute command \"#{command}\". Exit code: #{status}\nError message: #{stderr}" unless status == 0 + rescue Timeout::Error + # If it times out, terminate the process + if pid + Process.kill('TERM', pid) # Attempt a graceful termination + sleep 5 # Give it a few seconds to exit + Process.kill('KILL', pid) if system("ps -p #{pid}") # Force kill if still running + end + # Raise a custom error to prevent Sidekiq from retrying + raise SofficeTimeoutError, "soffice process timed out after #{timeout} seconds" + rescue EOFError + Rails.logger.debug "Caught an eof error in ShellBasedProcessor" + end + # [hyc-override] Trigger kill if soffice process takes too long, and throw a non-retry error if that happens def self.encode(path, format, outdir, timeout = JOB_TIMEOUT_SECONDS) + Rails.logger.error("Converting document to #{format} from source path: #{path} to destination file: #{outdir}") + Rails.logger.error("Encode backtrace #{Thread.current.backtrace.join("\n")}") command = "#{Hydra::Derivatives.libreoffice_path} --invisible --headless --convert-to #{format} --outdir #{outdir} #{Shellwords.escape(path)}" - pid = nil - begin - Timeout.timeout(timeout) do - # Use Process.spawn to track the process and capture the pid - pid = Process.spawn(command) - Process.wait(pid) # Wait for the process to complete - end - rescue Timeout::Error - # If it times out, terminate the process - if pid - Process.kill('TERM', pid) # Attempt a graceful termination - sleep 5 # Give it a few seconds to exit - Process.kill('KILL', pid) if system("ps -p #{pid}") # Force kill if still running - end - # Raise a custom error to prevent Sidekiq from retrying - raise SofficeTimeoutError, "soffice process timed out after #{timeout} seconds" - end + execute_with_timeout(timeout, command, {}) end # Converts the document to the format specified in the directives hash. @@ -40,11 +69,13 @@ def encode_file(_file_suffix, _options = {}) # [hyc-override] Use Redlock to manage soffice process lock, since only one soffice process can run at a time LOCK_MANAGER.lock(LOCK_KEY, LOCK_TIMEOUT) do |locked| if locked + Rails.logger.error("Acquired lock for document conversion of #{source_path}") convert_to_format else raise "Could not acquire lock for document conversion of #{source_path}" end end + Rails.logger.error("Released lock for #{source_path}") ensure FileUtils.rm_f(converted_file) # [hyc-override] clean up the parent temp dir diff --git a/spec/lib/hydra/derivatives/processors/document_spec.rb b/spec/lib/hydra/derivatives/processors/document_spec.rb index dbfab9c66..8f0aa97a2 100644 --- a/spec/lib/hydra/derivatives/processors/document_spec.rb +++ b/spec/lib/hydra/derivatives/processors/document_spec.rb @@ -4,6 +4,7 @@ RSpec.describe Hydra::Derivatives::Processors::Document do subject { described_class.new(source_path, directives) } + PID = 991234 let(:source_path) { File.join(fixture_path, 'test.doc') } let(:output_service) { Hyrax::PersistDerivatives } @@ -44,32 +45,26 @@ context 'when the process completes successfully' do it 'runs the command and completes without timeout' do # Mock Process.spawn and Process.wait to simulate successful execution - pid = 991234 - allow(Process).to receive(:spawn).and_return(pid) - allow(Process).to receive(:wait).with(pid) + allow(Process).to receive(:spawn).and_return(PID) + allow(Process).to receive(:wait2).with(PID).and_return([PID, 0]) - expect do - described_class.encode(path, format, outdir, timeout) - end.not_to raise_error + described_class.encode(path, format, outdir, timeout) # Verify that the process was spawned expect(Process).to have_received(:spawn) - expect(Process).to have_received(:wait).with(pid) + expect(Process).to have_received(:wait2).with(PID) end end context 'when the process times out' do it 'kills the process after a timeout' do - pid = 991234 - - # Mock Process.spawn to return a fake PID - allow(Process).to receive(:spawn).and_return(pid) - # Simulate timeout by making Process.wait sleep beyond the timeout - allow(Process).to receive(:wait).with(pid) { sleep 5 } + allow(Process).to receive(:spawn).and_return(PID) + # Simulate timeout + allow(Process).to receive(:wait2).with(PID).and_raise(Timeout::Error) # Mock Process.kill to simulate killing the process allow(Process).to receive(:kill) - allow(Hydra::Derivatives::Processors::Document).to receive(:system).with("ps -p #{pid}").and_return(true) + allow(Hydra::Derivatives::Processors::Document).to receive(:system).with("ps -p #{PID}").and_return(true) expect do described_class.encode(path, format, outdir, timeout) @@ -77,8 +72,23 @@ # Verify that the process was spawned expect(Process).to have_received(:spawn) - expect(Process).to have_received(:kill).with('TERM', pid) # Attempted graceful termination - expect(Process).to have_received(:kill).with('KILL', pid) # Force kill if necessary + expect(Process).to have_received(:kill).with('TERM', PID) # Attempted graceful termination + expect(Process).to have_received(:kill).with('KILL', PID) # Force kill if necessary + end + end + + context 'when the process returns error status' do + it 'runs the command and throws an error' do + allow(Process).to receive(:spawn).and_return(PID) + allow(Process).to receive(:wait2).with(PID).and_return([PID, 1]) + + expect do + described_class.encode(path, format, outdir, timeout) + end.to raise_error(/Unable to execute command.*Exit code: 1.*/) + + # Verify that the process was spawned + expect(Process).to have_received(:spawn) + expect(Process).to have_received(:wait2).with(PID) end end end