Skip to content

Commit 85a32c2

Browse files
committed
Gracefully fail claimed executions even if the supervisor process was pruned
e.g. due to sleep/wake in a dev app
1 parent 0ee21e4 commit 85a32c2

File tree

5 files changed

+131
-5
lines changed

5 files changed

+131
-5
lines changed

lib/solid_queue/supervisor.rb

+4-1
Original file line numberDiff line numberDiff line change
@@ -172,8 +172,11 @@ def replace_fork(pid, status)
172172
end
173173
end
174174

175+
# When a supervised fork crashes or exits we need to mark all the
176+
# executions it had claimed as failed so that they can be retried
177+
# by some other worker.
175178
def handle_claimed_jobs_by(terminated_fork, status)
176-
if registered_process = process.supervisees.find_by(name: terminated_fork.name)
179+
if registered_process = SolidQueue::Process.find_by(name: terminated_fork.name)
177180
error = Processes::ProcessExitError.new(status)
178181
registered_process.fail_all_claimed_executions_with(error)
179182
end

test/test_helper.rb

+12
Original file line numberDiff line numberDiff line change
@@ -97,4 +97,16 @@ def silent_on_thread_error_for(exceptions, on_thread_error)
9797
end
9898
end
9999
end
100+
101+
# Waits until the given block returns truthy or the timeout is reached.
102+
# Similar to other helper methods in this file but waits *for* a condition
103+
# instead of *while* it is true.
104+
def wait_for(timeout: 1.second, interval: 0.05)
105+
Timeout.timeout(timeout) do
106+
loop do
107+
break if skip_active_record_query_cache { yield }
108+
sleep interval
109+
end
110+
end
111+
end
100112
end

test/test_helpers/processes_test_helper.rb

+5-4
Original file line numberDiff line numberDiff line change
@@ -62,14 +62,15 @@ def find_processes_registered_as(kind)
6262

6363
def terminate_process(pid, timeout: 10, signal: :TERM)
6464
signal_process(pid, signal)
65-
wait_for_process_termination_with_timeout(pid, timeout: timeout)
65+
wait_for_process_termination_with_timeout(pid, timeout: timeout, signaled: signal)
6666
end
6767

68-
def wait_for_process_termination_with_timeout(pid, timeout: 10, exitstatus: 0)
68+
def wait_for_process_termination_with_timeout(pid, timeout: 10, exitstatus: 0, signaled: nil)
6969
Timeout.timeout(timeout) do
7070
if process_exists?(pid)
71-
Process.waitpid(pid)
72-
assert exitstatus, $?.exitstatus
71+
status = Process.waitpid2(pid).last
72+
assert_equal exitstatus, status.exitstatus, "Expected pid #{pid} to exit with status #{exitstatus}" if status.exitstatus
73+
assert_equal signaled, Signal.list.key(status.termsig).to_sym, "Expected pid #{pid} to be terminated with signal #{signaled}" if status.termsig
7374
end
7475
end
7576
rescue Timeout::Error

test/unit/process_recovery_test.rb

+83
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
# frozen_string_literal: true
2+
require "test_helper"
3+
4+
class ProcessRecoveryTest < ActiveSupport::TestCase
5+
self.use_transactional_tests = false
6+
7+
setup do
8+
@pid = nil
9+
JobResult.delete_all
10+
end
11+
12+
teardown do
13+
terminate_process(@pid) if @pid
14+
JobResult.delete_all
15+
end
16+
17+
test "supervisor handles missing process record and fails claimed executions properly" do
18+
# Start a supervisor with one worker
19+
@pid = run_supervisor_as_fork(workers: [ { queues: "*", polling_interval: 0.1, processes: 1 } ])
20+
wait_for_registered_processes(2, timeout: 1.second) # Supervisor + 1 worker
21+
22+
supervisor_process = SolidQueue::Process.find_by(kind: "Supervisor", pid: @pid)
23+
assert supervisor_process
24+
25+
worker_process = SolidQueue::Process.find_by(kind: "Worker")
26+
assert worker_process
27+
28+
# Enqueue a job and manually claim it for the worker to avoid timing races
29+
job = enqueue_store_result_job(42)
30+
claimed_execution = SolidQueue::ReadyExecution.claim("*", 5, worker_process.id).first
31+
assert claimed_execution.present?
32+
assert_equal worker_process.id, claimed_execution.process_id
33+
34+
# Simulate supervisor process record disappearing
35+
supervisor_process.delete
36+
assert_nil SolidQueue::Process.find_by(id: supervisor_process.id)
37+
38+
# Terminate the worker process
39+
worker_pid = worker_process.pid
40+
terminate_process(worker_pid, signal: :KILL)
41+
42+
43+
# Wait for the supervisor to reap the worker and fail the job
44+
wait_for_failed_executions(1, timeout: 5.seconds)
45+
46+
# Assert the execution is failed
47+
failed_execution = SolidQueue::FailedExecution.last
48+
assert failed_execution.present?
49+
assert_equal "SolidQueue::Processes::ProcessExitError", failed_execution.exception_class
50+
51+
# Ensure supervisor replaces the worker (even though its own record was missing)
52+
wait_for_registered_processes(2, timeout: 5.seconds)
53+
assert_operator SolidQueue::Process.where(kind: "Worker").count, :>=, 1
54+
end
55+
56+
private
57+
def assert_registered_workers_for(*queues, supervisor_pid: nil)
58+
workers = find_processes_registered_as("Worker")
59+
registered_queues = workers.map { |process| process.metadata["queues"] }.compact
60+
assert_equal queues.map(&:to_s).sort, registered_queues.sort
61+
if supervisor_pid
62+
assert_equal [ supervisor_pid ], workers.map { |process| process.supervisor.pid }.uniq
63+
end
64+
end
65+
66+
def enqueue_store_result_job(value, queue_name = :default, **options)
67+
StoreResultJob.set(queue: queue_name).perform_later(value, **options)
68+
end
69+
70+
def assert_no_claimed_jobs
71+
skip_active_record_query_cache do
72+
assert_empty SolidQueue::ClaimedExecution.all
73+
end
74+
end
75+
76+
def wait_for_claimed_executions(count, timeout: 1.second)
77+
wait_for(timeout: timeout) { SolidQueue::ClaimedExecution.count == count }
78+
end
79+
80+
def wait_for_failed_executions(count, timeout: 1.second)
81+
wait_for(timeout: timeout) { SolidQueue::FailedExecution.count == count }
82+
end
83+
end

test/unit/supervisor_test.rb

+27
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,33 @@ class SupervisorTest < ActiveSupport::TestCase
185185
end
186186
end
187187

188+
# Regression test for supervisor failing to handle claimed jobs when its own
189+
# process record has been pruned (NoMethodError in #handle_claimed_jobs_by).
190+
test "handle_claimed_jobs_by fails claimed executions even if supervisor record is missing" do
191+
worker_name = "worker-test-#{SecureRandom.hex(4)}"
192+
193+
worker_process = SolidQueue::Process.register(kind: "Worker", pid: 999_999, name: worker_name)
194+
195+
job = StoreResultJob.perform_later(42)
196+
claimed_execution = SolidQueue::ReadyExecution.claim("*", 1, worker_process.id).first
197+
198+
terminated_fork = Struct.new(:name).new(worker_name)
199+
200+
DummyStatus = Struct.new(:pid, :exitstatus) do
201+
def signaled? = false
202+
def termsig = nil
203+
end
204+
status = DummyStatus.new(worker_process.pid, 1)
205+
206+
supervisor = SolidQueue::Supervisor.allocate
207+
208+
supervisor.send(:handle_claimed_jobs_by, terminated_fork, status)
209+
210+
failed = SolidQueue::FailedExecution.find_by(job_id: claimed_execution.job_id)
211+
assert failed.present?
212+
assert_equal "SolidQueue::Processes::ProcessExitError", failed.exception_class
213+
end
214+
188215
private
189216
def assert_registered_workers(supervisor_pid: nil, count: 1)
190217
assert_registered_processes(kind: "Worker", count: count, supervisor_pid: supervisor_pid)

0 commit comments

Comments
 (0)