|
| 1 | +# frozen_string_literal: true |
| 2 | + |
| 3 | +require "test_helper" |
| 4 | + |
| 5 | +class ProcessRecoveryTest < ActiveSupport::TestCase |
| 6 | + self.use_transactional_tests = false |
| 7 | + |
| 8 | + setup do |
| 9 | + @pid = nil |
| 10 | + JobResult.delete_all |
| 11 | + end |
| 12 | + |
| 13 | + teardown do |
| 14 | + terminate_process(@pid) if @pid |
| 15 | + JobResult.delete_all |
| 16 | + end |
| 17 | + |
| 18 | + test "supervisor handles missing process record and fails claimed executions properly" do |
| 19 | + # Start a supervisor with one worker |
| 20 | + @pid = run_supervisor_as_fork(workers: [ { queues: "*", polling_interval: 0.1, processes: 1 } ]) |
| 21 | + wait_for_registered_processes(2, timeout: 1.second) # Supervisor + 1 worker |
| 22 | + |
| 23 | + supervisor_process = SolidQueue::Process.find_by(kind: "Supervisor", pid: @pid) |
| 24 | + assert supervisor_process |
| 25 | + |
| 26 | + worker_process = SolidQueue::Process.find_by(kind: "Worker") |
| 27 | + assert worker_process |
| 28 | + |
| 29 | + # Enqueue a job and manually claim it for the worker to avoid timing races |
| 30 | + job = enqueue_store_result_job(42) |
| 31 | + claimed_execution = SolidQueue::ReadyExecution.claim("*", 5, worker_process.id).first |
| 32 | + assert claimed_execution.present? |
| 33 | + assert_equal worker_process.id, claimed_execution.process_id |
| 34 | + |
| 35 | + # Simulate supervisor process record disappearing |
| 36 | + supervisor_process.delete |
| 37 | + assert_nil SolidQueue::Process.find_by(id: supervisor_process.id) |
| 38 | + |
| 39 | + # Terminate the worker process |
| 40 | + worker_pid = worker_process.pid |
| 41 | + terminate_process(worker_pid, signal: :KILL) |
| 42 | + |
| 43 | + |
| 44 | + # Wait for the supervisor to reap the worker and fail the job |
| 45 | + wait_for_failed_executions(1, timeout: 5.seconds) |
| 46 | + |
| 47 | + # Assert the execution is failed |
| 48 | + failed_execution = SolidQueue::FailedExecution.last |
| 49 | + assert failed_execution.present? |
| 50 | + assert_equal "SolidQueue::Processes::ProcessExitError", failed_execution.exception_class |
| 51 | + |
| 52 | + # Ensure supervisor replaces the worker (even though its own record was missing) |
| 53 | + wait_for_registered_processes(2, timeout: 5.seconds) |
| 54 | + assert_operator SolidQueue::Process.where(kind: "Worker").count, :>=, 1 |
| 55 | + end |
| 56 | + |
| 57 | + private |
| 58 | + def assert_registered_workers_for(*queues, supervisor_pid: nil) |
| 59 | + workers = find_processes_registered_as("Worker") |
| 60 | + registered_queues = workers.map { |process| process.metadata["queues"] }.compact |
| 61 | + assert_equal queues.map(&:to_s).sort, registered_queues.sort |
| 62 | + if supervisor_pid |
| 63 | + assert_equal [ supervisor_pid ], workers.map { |process| process.supervisor.pid }.uniq |
| 64 | + end |
| 65 | + end |
| 66 | + |
| 67 | + def enqueue_store_result_job(value, queue_name = :default, **options) |
| 68 | + StoreResultJob.set(queue: queue_name).perform_later(value, **options) |
| 69 | + end |
| 70 | + |
| 71 | + def assert_no_claimed_jobs |
| 72 | + skip_active_record_query_cache do |
| 73 | + assert_empty SolidQueue::ClaimedExecution.all |
| 74 | + end |
| 75 | + end |
| 76 | + |
| 77 | + def wait_for_claimed_executions(count, timeout: 1.second) |
| 78 | + wait_for(timeout: timeout) { SolidQueue::ClaimedExecution.count == count } |
| 79 | + end |
| 80 | + |
| 81 | + def wait_for_failed_executions(count, timeout: 1.second) |
| 82 | + wait_for(timeout: timeout) { SolidQueue::FailedExecution.count == count } |
| 83 | + end |
| 84 | +end |
0 commit comments