Implements TimerTasks using Promises. #425

Closed
wants to merge 3 commits into from
19 changes: 5 additions & 14 deletions app/models/solid_queue/claimed_execution.rb
@@ -5,12 +5,6 @@ class SolidQueue::ClaimedExecution < SolidQueue::Execution

scope :orphaned, -> { where.missing(:process) }

- class Result < Struct.new(:success, :error)
-   def success?
-     success
-   end
- end

class << self
def claiming(job_ids, process_id, &block)
job_data = Array(job_ids).collect { |job_id| { job_id: job_id, process_id: process_id } }
@@ -60,12 +54,9 @@ def discard_all_from_jobs(*)
def perform
result = execute

- if result.success?
-   finished
- else
-   failed_with(result.error)
-   raise result.error
- end
+ result.just? ? finished : failed_with(result.reason)
+
+ result

Member

What's the advantage of using Concurrent::Maybe here vs. a regular struct?

Contributor Author

@hms hms Dec 4, 2024

Warning: Rationalization alert.

I dislike Struct. I would have reached for Data, but when I started this PR, SQ was on Ruby 3.3.1.

The Result struct was nothing more than a simple Maybe. It was also used in only one place, for a single if, which made me question whether it was needed at all (returning [success, exception] is pretty much the same thing, minus the success? method). But with the change from Futures to Promises::Futures, I also pushed the error reporting out of the execute method and back to the Future. With some of the logic moving back to the Future, why not keep all of the logic within the Concurrent library?

If all of this is bothersome, if you're not buying the rationalization for the change, or if you simply feel that this doesn't make SQ better, I'm happy to refactor the Maybe out and return the Result object (but I would recommend using Data over Struct).

One question on Result: given that it's used in only one place, as a return value that is immediately tested in an if and then GC'ed, is there a reason for the (very little) extra code of Result vs. an alternative implementation?
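
To make the Data-versus-Struct suggestion concrete, here is a minimal sketch of what a Data-based Result could look like. It assumes Ruby 3.2 or later, where Data is available. The constructor helpers `ok` and `failed` and the block-taking `execute` are hypothetical names used for illustration, not code from this PR, and the sketch rescues StandardError where the PR rescues Exception.

```ruby
# Hypothetical Data-based replacement for the removed Result struct.
# Requires Ruby 3.2+, where Data.define is available.
Result = Data.define(:success, :error) do
  # Convenience constructors (illustrative names, not from the PR).
  def self.ok
    new(success: true, error: nil)
  end

  def self.failed(error)
    new(success: false, error: error)
  end

  def success?
    success
  end
end

# Stand-in for ClaimedExecution#execute: run the block and wrap the outcome
# in a Result instead of letting the exception propagate.
def execute
  yield
  Result.ok
rescue StandardError => e
  Result.failed(e)
end

result = execute { raise ArgumentError, "boom" }
puts result.success?       # false
puts result.error.message  # boom
```

Unlike a Struct, a Data instance exposes no setters, which suits a value that is created, inspected once, and discarded.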

ensure
job.unblock_next_blocked_job
end
@@ -93,9 +84,9 @@ def failed_with(error)
private
def execute
ActiveJob::Base.execute(job.arguments)
- Result.new(true, nil)
+ Concurrent::Maybe.just(true)
rescue Exception => e
- Result.new(false, e)
+ Concurrent::Maybe.nothing(e)
end

def finished
1 change: 1 addition & 0 deletions lib/solid_queue.rb
@@ -40,6 +40,7 @@ module SolidQueue
mattr_accessor :preserve_finished_jobs, default: true
mattr_accessor :clear_finished_jobs_after, default: 1.day
mattr_accessor :default_concurrency_control_period, default: 3.minutes
+ mattr_accessor :reporting_label, default: "SolidQueue-#{SolidQueue::VERSION}"

delegate :on_start, :on_stop, to: Supervisor

35 changes: 31 additions & 4 deletions lib/solid_queue/app_executor.rb
@@ -11,11 +11,38 @@ def wrap_in_app_executor(&block)
end

def handle_thread_error(error)
- SolidQueue.instrument(:thread_error, error: error)
+ CallErrorReporters.new(error).call
end

private

# Handles error reporting and guarantees that Rails.error will be called if configured.
#
# This method performs the following actions:
# 1. Invokes `SolidQueue.instrument` for `:thread_error`.
# 2. Invokes `SolidQueue.on_thread_error` if it is configured.
# 3. Invokes `Rails.error.report` if it wasn't invoked by one of the above calls.
class CallErrorReporters
# @param [Exception] error The error to be reported.
def initialize(error)
@error = error
@reported = false
end

def call
SolidQueue.instrument(:thread_error, error: @error)
Rails.error.subscribe(self) if Rails.error&.respond_to?(:subscribe)

- if SolidQueue.on_thread_error
-   SolidQueue.on_thread_error.call(error)
+ SolidQueue.on_thread_error&.call(@error)

Rails.error.report(@error, handled: false, source: SolidQueue.reporting_label) unless @reported
ensure
Rails.error.unsubscribe(self) if Rails.error&.respond_to?(:unsubscribe)
end

def report(*, **)
@reported = true
end
end
end
end
end
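
The "report once" idea behind CallErrorReporters can be sketched without Rails: subscribe temporarily to the reporter, run the user callback, and fall back to a direct report only if the callback did not already trigger one. In the sketch below, `TinyReporter` is a hypothetical stand-in for `Rails.error`, and `ReportOnce` is an illustrative name. Neither is part of SolidQueue or Rails.

```ruby
# Hypothetical stand-in for Rails.error: fans each report out to subscribers.
class TinyReporter
  def initialize
    @subscribers = []
  end

  def subscribe(subscriber)
    @subscribers << subscriber
  end

  def unsubscribe(subscriber)
    @subscribers.delete(subscriber)
  end

  def report(error, **options)
    @subscribers.each { |s| s.report(error, **options) }
  end
end

# Reports an error through the reporter exactly once, even when the user
# callback has already reported it.
class ReportOnce
  def initialize(reporter, error, callback: nil)
    @reporter = reporter
    @error = error
    @callback = callback
    @reported = false
  end

  def call
    @reporter.subscribe(self) # watch for reports made by the callback
    @callback&.call(@error)
    @reporter.report(@error, handled: false) unless @reported
  ensure
    @reporter.unsubscribe(self)
  end

  # Subscriber interface: the reporter invokes this on every report.
  def report(*, **)
    @reported = true
  end
end
```

The temporary subscription is what lets the wrapper detect a report made by the callback without the callback having to cooperate.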
8 changes: 1 addition & 7 deletions lib/solid_queue/dispatcher/concurrency_maintenance.rb
@@ -12,16 +12,10 @@ def initialize(interval, batch_size)
end

def start
- @concurrency_maintenance_task = Concurrent::TimerTask.new(run_now: true, execution_interval: interval) do
+ @concurrency_maintenance_task = SolidQueue::TimerTask.new(run_now: true, execution_interval: interval) do
expire_semaphores
unblock_blocked_executions
end

- @concurrency_maintenance_task.add_observer do |_, _, error|
-   handle_thread_error(error) if error
- end
-
- @concurrency_maintenance_task.execute
end

def stop
2 changes: 1 addition & 1 deletion lib/solid_queue/engine.rb
@@ -18,7 +18,7 @@ class Engine < ::Rails::Engine

initializer "solid_queue.app_executor", before: :run_prepare_callbacks do |app|
config.solid_queue.app_executor ||= app.executor
- config.solid_queue.on_thread_error ||= ->(exception) { Rails.error.report(exception, handled: false) }
+ config.solid_queue.on_thread_error ||= ->(exception) { Rails.error.report(exception, handled: false, source: SolidQueue.reporting_label) }

SolidQueue.app_executor = config.solid_queue.app_executor
SolidQueue.on_thread_error = config.solid_queue.on_thread_error
2 changes: 1 addition & 1 deletion lib/solid_queue/log_subscriber.rb
@@ -162,7 +162,7 @@ def replace_fork(event)

private
def formatted_event(event, action:, **attributes)
- "SolidQueue-#{SolidQueue::VERSION} #{action} (#{event.duration.round(1)}ms) #{formatted_attributes(**attributes)}"
+ "#{SolidQueue.reporting_label} #{action} (#{event.duration.round(1)}ms) #{formatted_attributes(**attributes)}"
end

def formatted_attributes(**attributes)
12 changes: 4 additions & 8 deletions lib/solid_queue/pool.rb
@@ -18,20 +18,16 @@ def initialize(size, on_idle: nil)
def post(execution)
available_threads.decrement

- future = Concurrent::Future.new(args: [ execution ], executor: executor) do |thread_execution|
+ Concurrent::Promises.future_on(executor, execution) do |thread_execution|
wrap_in_app_executor do
- thread_execution.perform
+ result = thread_execution.perform
+
+ handle_thread_error(result.reason) if result.rejected?
ensure
available_threads.increment
mutex.synchronize { on_idle.try(:call) if idle? }
end
end

- future.add_observer do |_, _, error|
-   handle_thread_error(error) if error
- end
-
- future.execute
end

def idle_threads
8 changes: 1 addition & 7 deletions lib/solid_queue/processes/registrable.rb
@@ -37,15 +37,9 @@ def registered?
end

def launch_heartbeat
- @heartbeat_task = Concurrent::TimerTask.new(execution_interval: SolidQueue.process_heartbeat_interval) do
+ @heartbeat_task = SolidQueue::TimerTask.new(execution_interval: SolidQueue.process_heartbeat_interval) do
wrap_in_app_executor { heartbeat }
end

- @heartbeat_task.add_observer do |_, _, error|
-   handle_thread_error(error) if error
- end
-
- @heartbeat_task.execute
end

def stop_heartbeat
9 changes: 2 additions & 7 deletions lib/solid_queue/supervisor/maintenance.rb
@@ -7,16 +7,11 @@ module Supervisor::Maintenance
end

private

def launch_maintenance_task
- @maintenance_task = Concurrent::TimerTask.new(run_now: true, execution_interval: SolidQueue.process_alive_threshold) do
+ @maintenance_task = SolidQueue::TimerTask.new(run_now: true, execution_interval: SolidQueue.process_alive_threshold) do
prune_dead_processes
end

- @maintenance_task.add_observer do |_, _, error|
-   handle_thread_error(error) if error
- end
-
- @maintenance_task.execute
end

def stop_maintenance_task
42 changes: 42 additions & 0 deletions lib/solid_queue/timer_task.rb
@@ -0,0 +1,42 @@
# frozen_string_literal: true

module SolidQueue
class TimerTask
include AppExecutor

def initialize(execution_interval:, run_now: false, &block)
raise ArgumentError, "A block is required" unless block_given?
@shutdown = Concurrent::AtomicBoolean.new

run(run_now, execution_interval, &block)
end

def shutdown
@shutdown.make_true
end

private

def run(run_now, execution_interval, &block)
execute_task(&block) if run_now

Concurrent::Promises.future(execution_interval) do |interval|
repeating_task(interval, &block)
end.run
end

def execute_task(&block)
block.call unless @shutdown.true?
rescue Exception => e
handle_thread_error(e)
end

def repeating_task(interval, &block)
Concurrent::Promises.schedule(interval) do
execute_task(&block)
end.then do
repeating_task(interval, &block) unless @shutdown.true?
end
end
end
end
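
For comparison, the same contract (optional immediate run, repeat at a fixed interval, cooperative shutdown, errors routed to a handler rather than ending the loop) can be sketched with a plain thread instead of chained promises. This is a swapped-in technique shown under stated assumptions, not the PR's implementation: `SimpleTimer` and its `on_error` parameter are hypothetical names, and unlike the class above, the immediate run happens on the worker thread rather than in the caller.

```ruby
# Thread-based sketch of a repeating task with cooperative shutdown.
# SimpleTimer and on_error are hypothetical names, not part of SolidQueue.
class SimpleTimer
  def initialize(interval:, run_now: false, on_error: ->(e) { warn e.message }, &block)
    raise ArgumentError, "A block is required" unless block

    @stopped = false
    @lock = Mutex.new
    @thread = Thread.new do
      run_once(block, on_error) if run_now
      until stopped?
        sleep interval
        run_once(block, on_error)
      end
    end
  end

  # Signals the loop to stop and waits for the worker thread to finish.
  def shutdown
    @lock.synchronize { @stopped = true }
    @thread.join
  end

  private

  def stopped?
    @lock.synchronize { @stopped }
  end

  # Runs the block unless shut down; errors go to on_error instead of
  # terminating the loop.
  def run_once(block, on_error)
    block.call unless stopped?
  rescue StandardError => e
    on_error.call(e)
  end
end
```

Calling `shutdown` from inside the task block is not supported in this sketch, because the worker thread would be joining itself.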
5 changes: 3 additions & 2 deletions test/integration/instrumentation_test.rb
@@ -391,9 +391,10 @@ class InstrumentationTest < ActiveSupport::TestCase

test "thread errors emit thread_error events" do
previous_thread_report_on_exception, Thread.report_on_exception = Thread.report_on_exception, false

error = ExpectedTestError.new("everything is broken")
- SolidQueue::ClaimedExecution::Result.expects(:new).raises(error).at_least_once

+ # Allows the job to process normally, but triggers the error path in ClaimedExecution.execute
+ Concurrent::Maybe.expects(:just).returns(Concurrent::Maybe.nothing(error))

AddToBufferJob.perform_later "hey!"

12 changes: 4 additions & 8 deletions test/models/solid_queue/claimed_execution_test.rb
@@ -22,9 +22,7 @@ class SolidQueue::ClaimedExecutionTest < ActiveSupport::TestCase
job = claimed_execution.job

assert_difference -> { SolidQueue::ClaimedExecution.count } => -1, -> { SolidQueue::FailedExecution.count } => 1 do
- assert_raises RuntimeError do
-   claimed_execution.perform
- end
+ claimed_execution.perform

Member

This was added in #373, and it's not clear to me that the new behaviour is correct 🤔 If the error reporting library doesn't rely on Rails error reporting, I think this might not work correctly.

Contributor Author

@rosa

I'm not sure I fully understand your issue / question or possibly the impact of #373.

Is your concern with the actual error handling and possibly missed or lost errors; with this test no longer being valid after my changes, so that I'm no longer testing an important behavior; or with what happens if an SQ user depends on something other than Rails.error as their system-wide error management mechanism?

hms

Contributor Author

@rosa

I've done some digging into ActiveSupport::ExecutorWrapper and SolidQueue initialization and I think I understand your issue/concern.

When an exception is raised and not rescued within an ExecutionWrapper.wrap block, it's caught and reported to the ActiveSupport.error_reporter. Under normal circumstances, ActiveSupport.error_reporter and Rails.error_reporter are one and the same. So, the code in the PR is fine... under normal circumstances.

However, if, directly or indirectly, the app_executor has been overridden and the overridden app_executor handles exception reporting without calling Rails.error_reporter, then the current implementation of the PR is effectively swallowing the exception.

If this summary of your concern is correct, I'll see what I have to do to address it.
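
The concern summarized above can be illustrated with a small, self-contained sketch. `ReportingExecutor` is a hypothetical stand-in for an overridden app_executor that does its own reporting for exceptions escaping the wrapped block. It is not Rails or SolidQueue code, and `perform_raising` and `perform_returning` are illustrative names for the old and new styles of perform.

```ruby
# Hypothetical executor that reports any exception escaping the wrapped
# block, then re-raises it. Stands in for a custom app_executor.
class ReportingExecutor
  attr_reader :reported

  def initialize
    @reported = []
  end

  def wrap
    yield
  rescue StandardError => e
    @reported << e
    raise
  end
end

# Old style: the failure propagates as an exception.
def perform_raising
  raise "job failed"
end

# New style: the failure is rescued and returned as a value.
def perform_returning
  raise "job failed"
rescue StandardError => e
  e
end

executor = ReportingExecutor.new

begin
  executor.wrap { perform_raising }
rescue RuntimeError
  # The executor has already recorded the error before re-raising.
end
reported_when_raising = executor.reported.size

executor.wrap { perform_returning }
reported_when_returning = executor.reported.size - reported_when_raising

puts reported_when_raising    # 1
puts reported_when_returning  # 0
```

In the second case the executor never sees the failure, so reporting depends entirely on whatever the caller does with the returned value.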

end

assert_not job.reload.finished?
@@ -39,12 +37,10 @@ class SolidQueue::ClaimedExecutionTest < ActiveSupport::TestCase
test "job failures are reported via Rails error subscriber" do
subscriber = ErrorBuffer.new

- assert_raises RuntimeError do
-   with_error_subscriber(subscriber) do
-     claimed_execution = prepare_and_claim_job RaisingJob.perform_later(RuntimeError, "B")
+ with_error_subscriber(subscriber) do
+   claimed_execution = prepare_and_claim_job RaisingJob.perform_later(RuntimeError, "B")

-     claimed_execution.perform
-   end
+   claimed_execution.perform
end

assert_equal 1, subscriber.errors.count
79 changes: 79 additions & 0 deletions test/unit/timer_task_test.rb
@@ -0,0 +1,79 @@
# frozen_string_literal: true

require "test_helper"
require "mocha/minitest"

class TimerTaskTest < ActiveSupport::TestCase
test "initialization requires a block" do
assert_raises(ArgumentError) do
SolidQueue::TimerTask.new(execution_interval: 1)
end
end

test "task runs immediately when run_now is true" do
executed = false

task = SolidQueue::TimerTask.new(run_now: true, execution_interval: 1) do
executed = true
end

sleep 0.1

assert executed, "Task should have executed immediately"
task.shutdown
end

test "task does not run immediately when run with run_now false" do
executed = false

task = SolidQueue::TimerTask.new(run_now: false, execution_interval: 1) do
executed = true
end

sleep 0.1

assert_not executed, "Task should not have executed immediately"
task.shutdown
end

test "task repeats" do
executions = 0

task = SolidQueue::TimerTask.new(execution_interval: 0.1, run_now: false) do
executions += 1
end

sleep(0.5) # Wait to accumulate some executions

assert executions > 3, "The block should be executed repeatedly"

task.shutdown
end

test "task stops on shutdown" do
executions = 0

task = SolidQueue::TimerTask.new(execution_interval: 0.1, run_now: false) { executions += 1 }

sleep(0.3) # Let the task run a few times

task.shutdown

current_executions = executions

sleep(0.5) # Ensure no more executions after shutdown

assert_equal current_executions, executions, "The task should stop executing after shutdown"
end

test "calls handle_thread_error if task raises" do
task = SolidQueue::TimerTask.new(execution_interval: 0.1) do
raise ExpectedTestError.new
end
task.expects(:handle_thread_error).with(instance_of(ExpectedTestError))

sleep(0.2) # Give some time for the task to run and handle the error

task.shutdown
end
end
2 changes: 1 addition & 1 deletion test/unit/worker_test.rb
@@ -51,7 +51,7 @@ class WorkerTest < ActiveSupport::TestCase
subscriber = ErrorBuffer.new
Rails.error.subscribe(subscriber)

- SolidQueue::ClaimedExecution::Result.expects(:new).raises(ExpectedTestError.new("everything is broken")).at_least_once
+ Concurrent::Maybe.expects(:just).returns(Concurrent::Maybe.nothing(ExpectedTestError.new("everything is broken")))

AddToBufferJob.perform_later "hey!"
