Skip to content

Gracefully fail claimed executions even if the supervisor process was pruned #559

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion lib/solid_queue/supervisor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,11 @@ def replace_fork(pid, status)
end
end

# When a supervised fork crashes or exits we need to mark all the
# executions it had claimed as failed so that they can be retried
# by some other worker.
def handle_claimed_jobs_by(terminated_fork, status)
if registered_process = process.supervisees.find_by(name: terminated_fork.name)
if registered_process = SolidQueue::Process.find_by(name: terminated_fork.name)
error = Processes::ProcessExitError.new(status)
registered_process.fail_all_claimed_executions_with(error)
end
Expand Down
12 changes: 12 additions & 0 deletions test/test_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -97,4 +97,16 @@ def silent_on_thread_error_for(exceptions, on_thread_error)
end
end
end

# Polls the given block every +interval+ seconds until it returns a truthy
# value. Counterpart to the other wait helpers in this file: it blocks *until*
# a condition holds instead of *while* it holds. Each poll is evaluated inside
# skip_active_record_query_cache.
# Raises Timeout::Error if +timeout+ elapses before the block turns truthy.
def wait_for(timeout: 1.second, interval: 0.05)
  Timeout.timeout(timeout) do
    until skip_active_record_query_cache { yield }
      sleep interval
    end
  end
end
end
13 changes: 9 additions & 4 deletions test/test_helpers/processes_test_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,19 @@ def find_processes_registered_as(kind)

def terminate_process(pid, timeout: 10, signal: :TERM)
signal_process(pid, signal)
wait_for_process_termination_with_timeout(pid, timeout: timeout)
wait_for_process_termination_with_timeout(pid, timeout: timeout, signaled: signal)
end

def wait_for_process_termination_with_timeout(pid, timeout: 10, exitstatus: 0)
def wait_for_process_termination_with_timeout(pid, timeout: 10, exitstatus: 0, signaled: nil)
Timeout.timeout(timeout) do
if process_exists?(pid)
Process.waitpid(pid)
assert exitstatus, $?.exitstatus
begin
status = Process.waitpid2(pid).last
assert_equal exitstatus, status.exitstatus, "Expected pid #{pid} to exit with status #{exitstatus}" if status.exitstatus
assert_equal signaled, Signal.list.key(status.termsig).to_sym, "Expected pid #{pid} to be terminated with signal #{signaled}" if status.termsig
rescue Errno::ECHILD
# Child pid already reaped
end
end
end
rescue Timeout::Error
Expand Down
84 changes: 84 additions & 0 deletions test/unit/process_recovery_test.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
# frozen_string_literal: true

require "test_helper"

# End-to-end check that a running supervisor still fails the executions
# claimed by a killed worker when the supervisor's own process record has
# disappeared from the database.
class ProcessRecoveryTest < ActiveSupport::TestCase
  # The supervisor runs in a fork, so records must be committed to be visible to it.
  self.use_transactional_tests = false

  setup do
    @pid = nil
    JobResult.delete_all
  end

  teardown do
    terminate_process(@pid) if @pid
    JobResult.delete_all
  end

  test "supervisor handles missing process record and fails claimed executions properly" do
    # Start a supervisor with one worker. The worker only polls "background",
    # a queue nothing is enqueued to here, so it cannot claim (and run) the job
    # below before the test claims it on the worker's behalf.
    @pid = run_supervisor_as_fork(workers: [ { queues: "background", polling_interval: 0.1, processes: 1 } ])
    wait_for_registered_processes(2, timeout: 1.second) # Supervisor + 1 worker

    supervisor_process = SolidQueue::Process.find_by(kind: "Supervisor", pid: @pid)
    assert supervisor_process

    worker_process = SolidQueue::Process.find_by(kind: "Worker")
    assert worker_process

    # Enqueue a job on the default queue and claim it manually for the worker
    enqueue_store_result_job(42)
    claimed_execution = SolidQueue::ReadyExecution.claim("*", 5, worker_process.id).first
    assert claimed_execution.present?
    assert_equal worker_process.id, claimed_execution.process_id

    # Simulate supervisor process record disappearing
    supervisor_process.delete
    assert_nil SolidQueue::Process.find_by(id: supervisor_process.id)

    # Kill the worker so it has no chance to release its claimed executions
    terminate_process(worker_process.pid, signal: :KILL)

    # Wait for the supervisor to reap the worker and fail the job
    wait_for_failed_executions(1, timeout: 5.seconds)

    # Assert the execution is failed
    failed_execution = SolidQueue::FailedExecution.last
    assert failed_execution.present?
    assert_equal "SolidQueue::Processes::ProcessExitError", failed_execution.exception_class

    # Ensure supervisor replaces the worker (even though its own record was missing)
    wait_for_registered_processes(2, timeout: 5.seconds)
    assert_operator SolidQueue::Process.where(kind: "Worker").count, :>=, 1
  end

  private
    # Enqueues a StoreResultJob carrying +value+ on +queue_name+.
    def enqueue_store_result_job(value, queue_name = :default, **options)
      StoreResultJob.set(queue: queue_name).perform_later(value, **options)
    end

    # Blocks until exactly +count+ failed executions exist; wait_for raises
    # Timeout::Error if that does not happen within +timeout+.
    def wait_for_failed_executions(count, timeout: 1.second)
      wait_for(timeout: timeout) { SolidQueue::FailedExecution.count == count }
    end
end
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test isn't strictly necessary. I was using it to guide debugging and understanding.

27 changes: 27 additions & 0 deletions test/unit/supervisor_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,33 @@ class SupervisorTest < ActiveSupport::TestCase
end
end

# Regression test for supervisor failing to handle claimed jobs when its own
# process record has been pruned (NoMethodError in #handle_claimed_jobs_by).
test "handle_claimed_jobs_by fails claimed executions even if supervisor record is missing" do
worker_name = "worker-test-#{SecureRandom.hex(4)}"

worker_process = SolidQueue::Process.register(kind: "Worker", pid: 999_999, name: worker_name)

job = StoreResultJob.perform_later(42)
claimed_execution = SolidQueue::ReadyExecution.claim("*", 1, worker_process.id).first

terminated_fork = Struct.new(:name).new(worker_name)

# Minimal stand-in for a process exit status: carries a pid and an exit
# status, and always reports that the process was not killed by a signal.
DummyStatus = Struct.new(:pid, :exitstatus) do
  def signaled?
    false
  end

  def termsig
    nil
  end
end
status = DummyStatus.new(worker_process.pid, 1)

supervisor = SolidQueue::Supervisor.allocate

supervisor.send(:handle_claimed_jobs_by, terminated_fork, status)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test is terribly synthetic, I'm afraid. It explicitly constructs the situation that's encountered in the wild (a nil Supervisor#process) rather than reproducing it realistically (by actually pruning the process).


failed = SolidQueue::FailedExecution.find_by(job_id: claimed_execution.job_id)
assert failed.present?
assert_equal "SolidQueue::Processes::ProcessExitError", failed.exception_class
end

private
def assert_registered_workers(supervisor_pid: nil, count: 1)
assert_registered_processes(kind: "Worker", count: count, supervisor_pid: supervisor_pid)
Expand Down
Loading