-
Notifications
You must be signed in to change notification settings - Fork 168
Implements TimerTasks using Promises. #425
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
# frozen_string_literal: true | ||
|
||
module SolidQueue | ||
class TimerTask | ||
include AppExecutor | ||
|
||
def initialize(execution_interval:, run_now: false, &block) | ||
raise ArgumentError, "A block is required" unless block_given? | ||
@shutdown = Concurrent::AtomicBoolean.new | ||
|
||
run(run_now, execution_interval, &block) | ||
end | ||
|
||
def shutdown | ||
@shutdown.make_true | ||
end | ||
|
||
private | ||
|
||
def run(run_now, execution_interval, &block) | ||
execute_task(&block) if run_now | ||
|
||
Concurrent::Promises.future(execution_interval) do |interval| | ||
repeating_task(interval, &block) | ||
end.run | ||
end | ||
|
||
def execute_task(&block) | ||
block.call unless @shutdown.true? | ||
rescue Exception => e | ||
handle_thread_error(e) | ||
end | ||
|
||
def repeating_task(interval, &block) | ||
Concurrent::Promises.schedule(interval) do | ||
execute_task(&block) | ||
end.then do | ||
repeating_task(interval, &block) unless @shutdown.true? | ||
end | ||
end | ||
end | ||
end |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,9 +22,7 @@ class SolidQueue::ClaimedExecutionTest < ActiveSupport::TestCase | |
job = claimed_execution.job | ||
|
||
assert_difference -> { SolidQueue::ClaimedExecution.count } => -1, -> { SolidQueue::FailedExecution.count } => 1 do | ||
assert_raises RuntimeError do | ||
claimed_execution.perform | ||
end | ||
claimed_execution.perform | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This was added in #373, and it's not clear to me that the new behaviour is correct 🤔 If the error reporting library doesn't rely on Rails error reporting, I think this might not work correctly. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure I fully understand your issue / question or possibly the impact of #373. Is your issue with the actual error handling and possibly missed / lost errors, this test is not valid with my changes and hence I'm no longer testing an important behavior, or what happens if a SQ user is depending on something other than Rails.error for their system wide Error manage mechanism? hms There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've done some digging into ActiveSupport::ExecutorWrapper and SolidQueue initialization and I think I understand your issue/concern. When an exception is raised and not rescued within a ExecutionWrapper.wrap block, it's caught and reported to the ActiveSupport.error_reporter. Under normal circumstances, ActiveSupport.error_reporter and Rails.error_reporter are one and the same. So, the code in the PR is fine.... under normal circumstances. However, if directly or indirectly, the app_executor has been overridden and the overridden app_executor handles exception reporting without calling Rails.error_reporter, then the current implementation of the PR is effectively swallowing the exception. If this summary of your concern is correct, I'll see what I have to do to address it. |
||
end | ||
|
||
assert_not job.reload.finished? | ||
|
@@ -39,12 +37,10 @@ class SolidQueue::ClaimedExecutionTest < ActiveSupport::TestCase | |
test "job failures are reported via Rails error subscriber" do | ||
subscriber = ErrorBuffer.new | ||
|
||
assert_raises RuntimeError do | ||
with_error_subscriber(subscriber) do | ||
claimed_execution = prepare_and_claim_job RaisingJob.perform_later(RuntimeError, "B") | ||
with_error_subscriber(subscriber) do | ||
claimed_execution = prepare_and_claim_job RaisingJob.perform_later(RuntimeError, "B") | ||
|
||
claimed_execution.perform | ||
end | ||
claimed_execution.perform | ||
end | ||
|
||
assert_equal 1, subscriber.errors.count | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,79 @@ | ||
# frozen_string_literal: true | ||
|
||
require "test_helper" | ||
require "mocha/minitest" | ||
|
||
class TimerTaskTest < ActiveSupport::TestCase | ||
test "initialization requires a block" do | ||
assert_raises(ArgumentError) do | ||
SolidQueue::TimerTask.new(execution_interval: 1) | ||
end | ||
end | ||
|
||
test "task runs immediate when run now true" do | ||
executed = false | ||
|
||
task = SolidQueue::TimerTask.new(run_now: true, execution_interval: 1) do | ||
executed = true | ||
end | ||
|
||
sleep 0.1 | ||
|
||
assert executed, "Task should have executed immediately" | ||
task.shutdown | ||
end | ||
|
||
test "task does not run immediately when run with run_now false" do | ||
executed = false | ||
|
||
task = SolidQueue::TimerTask.new(run_now: false, execution_interval: 1) do | ||
executed = true | ||
end | ||
|
||
sleep 0.1 | ||
|
||
assert_not executed, "Task should have executed immediately" | ||
task.shutdown | ||
end | ||
|
||
test "task repeats" do | ||
executions = 0 | ||
|
||
task = SolidQueue::TimerTask.new(execution_interval: 0.1, run_now: false) do | ||
executions += 1 | ||
end | ||
|
||
sleep(0.5) # Wait to accumulate some executions | ||
|
||
assert executions > 3, "The block should be executed repeatedly" | ||
|
||
task.shutdown | ||
end | ||
|
||
test "task stops on shutdown" do | ||
executions = 0 | ||
|
||
task = SolidQueue::TimerTask.new(execution_interval: 0.1, run_now: false) { executions += 1 } | ||
|
||
sleep(0.3) # Let the task run a few times | ||
|
||
task.shutdown | ||
|
||
current_executions = executions | ||
|
||
sleep(0.5) # Ensure no more executions after shutdown | ||
|
||
assert_equal current_executions, executions, "The task should stop executing after shutdown" | ||
end | ||
|
||
test "calls handle_thread_error if task raises" do | ||
task = SolidQueue::TimerTask.new(execution_interval: 0.1) do | ||
raise ExpectedTestError.new | ||
end | ||
task.expects(:handle_thread_error).with(instance_of(ExpectedTestError)) | ||
|
||
sleep(0.2) # Give some time for the task to run and handle the error | ||
|
||
task.shutdown | ||
end | ||
end |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's the advantage of using
Concurrent::Maybe
here vs. a regular struct?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Warning: Rationalization alert.
I dislike Struct. I would have reached for Data, but when I started this PR, SQ was on Ruby 3.3.1.
The Result struct was nothing more than a simple Maybe. It was also only used in 1 place, for 1
if
and made me question if it was really needed at all (returning [success, exception] is pretty much the same thing minus the success method). But with the change from Futures to Promises::Futures, I also pushed the error reporting out of the execute method back to the Future. With some of the logic moving back to the Future, why not keep the logic all within the Concurrent library?If all of this is bothersome, you're not buying the rationalization for the change, of you simply feel that this doesn't make SQ better I'm happy to refactor the Maybe out and return the Result object (but I would recommend using Data over Struct).
One question on Result: Given it's only used in one place and in a sequence of a return value, immediately in an IF, and then GC'ed, is there a reason for the (very little) extra code Result Vs. an alternative implementation?