Skip to content

Commit

Permalink
Use pgroup, capture more info and reproduce more of the behavior from…
Browse files Browse the repository at this point in the history
… original timeout method
  • Loading branch information
bbpennel committed Nov 18, 2024
1 parent 201635a commit 2c66767
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 35 deletions.
69 changes: 50 additions & 19 deletions app/overrides/lib/hydra/derivatives/processors/document_override.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,35 +2,64 @@
# [hyc-override] https://github.com/samvera/hydra-derivatives/blob/v3.8.0/lib/hydra/derivatives/processors/document.rb
require 'redlock'

# Raised by execute_with_timeout when the soffice command does not finish
# within its timeout. It is a single declaration: re-opening the class with a
# different superclass would raise "superclass mismatch" at load time.
class SofficeTimeoutError < Hydra::Derivatives::TimeoutError; end

Hydra::Derivatives::Processors::Document.class_eval do
# [hyc-override] Use Redlock to manage soffice process lock
# Name of the lock that serializes document conversions.
LOCK_KEY = 'soffice:document_conversion'
# Lock time-to-live handed to LOCK_MANAGER.lock: 6 * 60 * 1000 = 360_000.
# NOTE(review): Redlock documents its ttl in milliseconds, i.e. 6 minutes - confirm.
LOCK_TIMEOUT = 6 * 60 * 1000
# Default value of the `timeout` argument of `encode`, in seconds.
# Assigned exactly once; a second assignment only produced an
# "already initialized constant" warning and left this value in effect.
JOB_TIMEOUT_SECONDS = 30
# Shared Redlock client backed by the current Redis connection.
LOCK_MANAGER = Redlock::Client.new([Redis.current])

# [hyc-override] Adding in a graceful termination before hard kill, use spawn for process group
#
# Runs +command+ as a child process in its own process group, capturing its
# stdout and stderr, and enforces +timeout+ on the whole run.
#
# @param timeout [Numeric] seconds allowed before the child is terminated
# @param command [String] command line handed to Process.spawn
# @param context [Object] not used by this implementation
# @return [nil] when the command exits with status 0
# @raise [RuntimeError] when the command exits with a non-zero status; the
#   message carries the status and the captured stderr
# @raise [SofficeTimeoutError] when the command exceeds +timeout+
def self.execute_with_timeout(timeout, command, context)
  stdout, stderr = "", ""
  pid = nil
  status = nil
  pipes = []    # every pipe end we open, so `ensure` can close them all
  readers = []  # threads draining the child's output

  Timeout.timeout(timeout) do
    # Create pipes for stdout and stderr
    stdout_r, stdout_w = IO.pipe
    stderr_r, stderr_w = IO.pipe
    pipes.push(stdout_r, stdout_w, stderr_r, stderr_w)

    # Use Process.spawn to start the command with process group
    pid = Process.spawn(command, pgroup: true, out: stdout_w, err: stderr_w)

    # Close unused ends in parent process, otherwise the readers never see EOF
    stdout_w.close
    stderr_w.close

    # Read the output in separate threads to avoid deadlocks when a pipe fills up
    readers << Thread.new { stdout = stdout_r.read }
    readers << Thread.new { stderr = stderr_r.read }

    # Wait for the process to complete
    _, status = Process.wait2(pid)

    # Ensure threads finish reading
    readers.each(&:join)
  end
  raise "Unable to execute command \"#{command}\". Exit code: #{status}\nError message: #{stderr}" unless status == 0
rescue Timeout::Error
  # If it times out, terminate the process
  if pid
    begin
      # NOTE(review): only the group leader is signalled here; signalling -pid
      # would reach the whole process group created by pgroup: true - confirm
      # whether soffice children need that.
      Process.kill('TERM', pid) # Attempt a graceful termination
      Process.detach(pid) # Reap the child in the background so it does not linger as a zombie
      sleep 5 # Give it a few seconds to exit
      Process.kill('KILL', pid) if system("ps -p #{pid}") # Force kill if still running
    rescue Errno::ESRCH
      # The process is already gone; nothing left to kill. Swallowing this keeps
      # the timeout error below from being replaced by a retryable ESRCH.
    end
  end
  # Raise a custom error to prevent Sidekiq from retrying
  raise SofficeTimeoutError, "soffice process timed out after #{timeout} seconds"
rescue EOFError
  Rails.logger.debug "Caught an eof error in ShellBasedProcessor"
ensure
  # Stop readers that are still blocked (e.g. after a timeout) and release every
  # pipe end, so repeated conversions do not leak threads or file descriptors.
  readers.each(&:kill)
  pipes.each { |io| io.close unless io.closed? }
end

# [hyc-override] Trigger kill if soffice process takes too long, and throw a non-retry error if that happens
#
# Builds the LibreOffice command line that converts +path+ to +format+ inside
# +outdir+ and runs it under a timeout.
#
# @param path [String] source document; shell-escaped before interpolation
# @param format [String] target format; interpolated into the command as-is
# @param outdir [String] output directory; interpolated into the command as-is
# @param timeout [Numeric] seconds allowed for the conversion (defaults to JOB_TIMEOUT_SECONDS)
# @raise [SofficeTimeoutError] when the conversion exceeds +timeout+
def self.encode(path, format, outdir, timeout = JOB_TIMEOUT_SECONDS)
# NOTE(review): both messages below are diagnostics emitted at error level,
# including a full backtrace on every call - confirm this is intended.
Rails.logger.error("Converting document to #{format} from source path: #{path} to destination file: #{outdir}")
Rails.logger.error("Encode backtrace #{Thread.current.backtrace.join("\n")}")
command = "#{Hydra::Derivatives.libreoffice_path} --invisible --headless --convert-to #{format} --outdir #{outdir} #{Shellwords.escape(path)}"
# NOTE(review): as written, the command is started once by the inline
# spawn/wait below and then again by execute_with_timeout at the end of the
# method. This looks like removed and added diff lines shown together -
# confirm against the real file before relying on it.
pid = nil
begin
Timeout.timeout(timeout) do
# Use Process.spawn to track the process and capture the pid
pid = Process.spawn(command)
Process.wait(pid) # Wait for the process to complete
end
rescue Timeout::Error
# If it times out, terminate the process
if pid
Process.kill('TERM', pid) # Attempt a graceful termination
sleep 5 # Give it a few seconds to exit
Process.kill('KILL', pid) if system("ps -p #{pid}") # Force kill if still running
end
# Raise a custom error to prevent Sidekiq from retrying
raise SofficeTimeoutError, "soffice process timed out after #{timeout} seconds"
end
# Delegates to the shared helper; the empty hash fills its `context` argument.
execute_with_timeout(timeout, command, {})
end

# Converts the document to the format specified in the directives hash.
Expand All @@ -40,11 +69,13 @@ def encode_file(_file_suffix, _options = {})
# [hyc-override] Use Redlock to manage soffice process lock, since only one soffice process can run at a time
LOCK_MANAGER.lock(LOCK_KEY, LOCK_TIMEOUT) do |locked|
if locked
Rails.logger.error("Acquired lock for document conversion of #{source_path}")
convert_to_format
else
raise "Could not acquire lock for document conversion of #{source_path}"
end
end
Rails.logger.error("Released lock for #{source_path}")
ensure
FileUtils.rm_f(converted_file)
# [hyc-override] clean up the parent temp dir
Expand Down
42 changes: 26 additions & 16 deletions spec/lib/hydra/derivatives/processors/document_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

RSpec.describe Hydra::Derivatives::Processors::Document do
# Processor under test, built from the fixture path and the directives
# (directives are declared outside the visible part of this spec).
subject { described_class.new(source_path, directives) }
# Fake process id returned by the stubbed Process.spawn in the examples below.
# NOTE(review): a constant assigned inside a describe block is not scoped to
# the example group and stays defined for the rest of the run - confirm that
# is acceptable.
PID = 991234

# Word document fixture used as conversion input.
let(:source_path) { File.join(fixture_path, 'test.doc') }
# Output service class referenced by the examples.
let(:output_service) { Hyrax::PersistDerivatives }
Expand Down Expand Up @@ -44,41 +45,50 @@
# Happy path: the spawned command finishes and reports exit status 0.
context 'when the process completes successfully' do
it 'runs the command and completes without timeout' do
# Mock Process.spawn and Process.wait to simulate successful execution
# NOTE(review): both the local `pid` and the PID constant are stubbed, and
# both Process.wait and Process.wait2 are expected. This looks like old and
# new diff lines shown together - confirm against the real spec.
pid = 991234
allow(Process).to receive(:spawn).and_return(pid)
allow(Process).to receive(:wait).with(pid)
allow(Process).to receive(:spawn).and_return(PID)
# wait2 hands back [pid, status]; a status of 0 is treated as success
allow(Process).to receive(:wait2).with(PID).and_return([PID, 0])

# encode is invoked twice here: once inside the block matcher, once directly
expect do
described_class.encode(path, format, outdir, timeout)
end.not_to raise_error
described_class.encode(path, format, outdir, timeout)

# Verify that the process was spawned
expect(Process).to have_received(:spawn)
expect(Process).to have_received(:wait).with(pid)
expect(Process).to have_received(:wait2).with(PID)
end
end

# Timeout path: waiting on the child raises or outlasts the timeout, so the
# processor must signal the child and raise SofficeTimeoutError.
context 'when the process times out' do
it 'kills the process after a timeout' do
pid = 991234

# Mock Process.spawn to return a fake PID
allow(Process).to receive(:spawn).and_return(pid)
# Simulate timeout by making Process.wait sleep beyond the timeout
allow(Process).to receive(:wait).with(pid) { sleep 5 }
allow(Process).to receive(:spawn).and_return(PID)
# Simulate timeout
allow(Process).to receive(:wait2).with(PID).and_raise(Timeout::Error)

# Mock Process.kill to simulate killing the process
allow(Process).to receive(:kill)
# Report the fake process as still alive so the KILL branch is exercised.
# The two stubs cover the same command string, since pid and PID hold the same value.
allow(Hydra::Derivatives::Processors::Document).to receive(:system).with("ps -p #{pid}").and_return(true)
allow(Hydra::Derivatives::Processors::Document).to receive(:system).with("ps -p #{PID}").and_return(true)

expect do
described_class.encode(path, format, outdir, timeout)
end.to raise_error(SofficeTimeoutError, "soffice process timed out after #{timeout} seconds")

# Verify that the process was spawned
expect(Process).to have_received(:spawn)
expect(Process).to have_received(:kill).with('TERM', pid) # Attempted graceful termination
expect(Process).to have_received(:kill).with('KILL', pid) # Force kill if necessary
expect(Process).to have_received(:kill).with('TERM', PID) # Attempted graceful termination
expect(Process).to have_received(:kill).with('KILL', PID) # Force kill if necessary
end
end

# Failure path: the command runs to completion but reports a non-zero status.
context 'when the process returns error status' do
  it 'runs the command and throws an error' do
    # Fake the child process: spawn hands back PID and wait2 reports status 1
    allow(Process).to receive(:spawn).and_return(PID)
    allow(Process).to receive(:wait2).with(PID).and_return([PID, 1])

    failure_pattern = /Unable to execute command.*Exit code: 1.*/
    expect { described_class.encode(path, format, outdir, timeout) }.to raise_error(failure_pattern)

    # The command must still have been launched and waited on
    expect(Process).to have_received(:spawn)
    expect(Process).to have_received(:wait2).with(PID)
  end
end
end
Expand Down

0 comments on commit 2c66767

Please sign in to comment.