Skip to content

Commit ebd9629

Browse files
committed
Feature: Dynamic scheduled tasks
1 parent 0ee21e4 commit ebd9629

File tree

7 files changed

+152
-19
lines changed

7 files changed

+152
-19
lines changed

README.md

+2
Original file line numberDiff line numberDiff line change
@@ -607,6 +607,8 @@ Rails.application.config.after_initialize do # or to_prepare
607607
end
608608
```
609609

610+
You can also dynamically add or remove recurring tasks by creating or deleting SolidQueue::RecurringTask records. It works the same way as with static tasks, except you must set the static field to false. Changes won’t be picked up immediately — they take effect after about a one-minute delay.
611+
610612
It's possible to run multiple schedulers with the same `recurring_tasks` configuration, for example, if you have multiple servers for redundancy, and you run the `scheduler` in more than one of them. To avoid enqueuing duplicate tasks at the same time, an entry in a new `solid_queue_recurring_executions` table is created in the same transaction as the job is enqueued. This table has a unique index on `task_key` and `run_at`, ensuring only one entry per task per time will be created. This only works if you have `preserve_finished_jobs` set to `true` (the default), and the guarantee applies as long as you keep the jobs around.
611613

612614
**Note**: a single recurring schedule is supported, so you can have multiple schedulers using the same schedule, but not multiple schedulers using different configurations.

app/models/solid_queue/recurring_task.rb

+1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ class RecurringTask < Record
1111
validate :existing_job_class
1212

1313
scope :static, -> { where(static: true) }
14+
scope :dynamic, -> { where(static: false) }
1415

1516
has_many :recurring_executions, foreign_key: :task_key, primary_key: :key
1617

lib/solid_queue/configuration.rb

+5-5
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ def default_options
9393
end
9494

9595
def invalid_tasks
96-
recurring_tasks.select(&:invalid?)
96+
static_recurring_tasks.select(&:invalid?)
9797
end
9898

9999
def only_work?
@@ -122,8 +122,8 @@ def dispatchers
122122
end
123123

124124
def schedulers
125-
if !skip_recurring_tasks? && recurring_tasks.any?
126-
[ Process.new(:scheduler, recurring_tasks: recurring_tasks) ]
125+
if !skip_recurring_tasks?
126+
[ Process.new(:scheduler, recurring_tasks: static_recurring_tasks) ]
127127
else
128128
[]
129129
end
@@ -139,8 +139,8 @@ def dispatchers_options
139139
.map { |options| options.dup.symbolize_keys }
140140
end
141141

142-
def recurring_tasks
143-
@recurring_tasks ||= recurring_tasks_config.map do |id, options|
142+
def static_recurring_tasks
143+
@static_recurring_tasks ||= recurring_tasks_config.map do |id, options|
144144
RecurringTask.from_configuration(id, **options) if options&.has_key?(:schedule)
145145
end.compact
146146
end

lib/solid_queue/scheduler.rb

+6
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,12 @@ def run
3030
loop do
3131
break if shutting_down?
3232

33+
recurring_schedule.update_scheduled_tasks.tap do |updated_tasks|
34+
if updated_tasks.any?
35+
process.update_columns(metadata: metadata.compact)
36+
end
37+
end
38+
3339
interruptible_sleep(SLEEP_INTERVAL)
3440
end
3541
ensure

lib/solid_queue/scheduler/recurring_schedule.rb

+36-10
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,11 @@ module SolidQueue
44
class Scheduler::RecurringSchedule
55
include AppExecutor
66

7-
attr_reader :configured_tasks, :scheduled_tasks
7+
attr_reader :static_tasks, :configured_tasks, :scheduled_tasks
88

99
def initialize(tasks)
10-
@configured_tasks = Array(tasks).map { |task| SolidQueue::RecurringTask.wrap(task) }.select(&:valid?)
10+
@static_tasks = Array(tasks).map { |task| SolidQueue::RecurringTask.wrap(task) }.select(&:valid?)
11+
@configured_tasks = @static_tasks + dynamic_tasks
1112
@scheduled_tasks = Concurrent::Hash.new
1213
end
1314

@@ -17,15 +18,36 @@ def empty?
1718

1819
def schedule_tasks
1920
wrap_in_app_executor do
20-
persist_tasks
21-
reload_tasks
21+
persist_static_tasks
22+
reload_static_tasks
2223
end
2324

2425
configured_tasks.each do |task|
2526
schedule_task(task)
2627
end
2728
end
2829

30+
def dynamic_tasks
31+
SolidQueue::RecurringTask.dynamic
32+
end
33+
34+
def schedule_new_dynamic_tasks
35+
dynamic_tasks.where.not(key: scheduled_tasks.keys).each do |task|
36+
schedule_task(task)
37+
end
38+
end
39+
40+
def unschedule_old_dynamic_tasks
41+
(scheduled_tasks.keys - SolidQueue::RecurringTask.pluck(:key)).each do |key|
42+
scheduled_tasks[key].cancel
43+
scheduled_tasks.delete(key)
44+
end
45+
end
46+
47+
def update_scheduled_tasks
48+
schedule_new_dynamic_tasks + unschedule_old_dynamic_tasks
49+
end
50+
2951
def schedule_task(task)
3052
scheduled_tasks[task.key] = schedule(task)
3153
end
@@ -35,18 +57,22 @@ def unschedule_tasks
3557
scheduled_tasks.clear
3658
end
3759

60+
def static_task_keys
61+
static_tasks.map(&:key)
62+
end
63+
3864
def task_keys
39-
configured_tasks.map(&:key)
65+
static_task_keys + dynamic_tasks.map(&:key)
4066
end
4167

4268
private
43-
def persist_tasks
44-
SolidQueue::RecurringTask.static.where.not(key: task_keys).delete_all
45-
SolidQueue::RecurringTask.create_or_update_all configured_tasks
69+
def persist_static_tasks
70+
SolidQueue::RecurringTask.static.where.not(key: static_task_keys).delete_all
71+
SolidQueue::RecurringTask.create_or_update_all static_tasks
4672
end
4773

48-
def reload_tasks
49-
@configured_tasks = SolidQueue::RecurringTask.where(key: task_keys)
74+
def reload_static_tasks
75+
@static_tasks = SolidQueue::RecurringTask.static.where(key: static_task_keys)
5076
end
5177

5278
def schedule(task)

test/unit/configuration_test.rb

+3-3
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ class ConfigurationTest < ActiveSupport::TestCase
2121
test "default configuration when config given is empty" do
2222
configuration = SolidQueue::Configuration.new(config_file: config_file_path(:empty_configuration), recurring_schedule_file: config_file_path(:empty_configuration))
2323

24-
assert_equal 2, configuration.configured_processes.count
24+
assert_equal 3, configuration.configured_processes.count # includes scheduler for dynamic tasks
2525
assert_processes configuration, :worker, 1, queues: "*"
2626
assert_processes configuration, :dispatcher, 1, batch_size: SolidQueue::Configuration::DISPATCHER_DEFAULTS[:batch_size]
2727
end
@@ -101,11 +101,11 @@ class ConfigurationTest < ActiveSupport::TestCase
101101

102102
configuration = SolidQueue::Configuration.new(recurring_schedule_file: config_file_path(:recurring_with_production_only))
103103
assert configuration.valid?
104-
assert_processes configuration, :scheduler, 0
104+
assert_processes configuration, :scheduler, 1 # Starts in case of dynamic tasks
105105

106106
configuration = SolidQueue::Configuration.new(recurring_schedule_file: config_file_path(:recurring_with_empty))
107107
assert configuration.valid?
108-
assert_processes configuration, :scheduler, 0
108+
assert_processes configuration, :scheduler, 1 # Starts in case of dynamic tasks
109109

110110
# No processes
111111
configuration = SolidQueue::Configuration.new(skip_recurring: true, dispatchers: [], workers: [])

test/unit/scheduler_test.rb

+99-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
class SchedulerTest < ActiveSupport::TestCase
44
self.use_transactional_tests = false
55

6-
test "recurring schedule" do
6+
test "recurring schedule (only static)" do
77
recurring_tasks = { example_task: { class: "AddToBufferJob", schedule: "every hour", args: 42 } }
88
scheduler = SolidQueue::Scheduler.new(recurring_tasks: recurring_tasks).tap(&:start)
99

@@ -17,6 +17,41 @@ class SchedulerTest < ActiveSupport::TestCase
1717
scheduler.stop
1818
end
1919

20+
test "recurring schedule (only dynamic)" do
21+
SolidQueue::RecurringTask.create(
22+
key: "dynamic_task", static: false, class_name: "AddToBufferJob", schedule: "every second", arguments: [ 42 ]
23+
)
24+
scheduler = SolidQueue::Scheduler.new(recurring_tasks: {}).tap(&:start)
25+
26+
wait_for_registered_processes(1, timeout: 1.second)
27+
28+
process = SolidQueue::Process.first
29+
assert_equal "Scheduler", process.kind
30+
31+
assert_metadata process, recurring_schedule: [ "dynamic_task" ]
32+
ensure
33+
scheduler.stop
34+
end
35+
36+
test "recurring schedule (static + dynamic)" do
37+
SolidQueue::RecurringTask.create(
38+
key: "dynamic_task", static: false, class_name: "AddToBufferJob", schedule: "every second", arguments: [ 42 ]
39+
)
40+
41+
recurring_tasks = { static_task: { class: "AddToBufferJob", schedule: "every hour", args: 42 } }
42+
43+
scheduler = SolidQueue::Scheduler.new(recurring_tasks: recurring_tasks).tap(&:start)
44+
45+
wait_for_registered_processes(1, timeout: 1.second)
46+
47+
process = SolidQueue::Process.first
48+
assert_equal "Scheduler", process.kind
49+
50+
assert_metadata process, recurring_schedule: [ "static_task", "dynamic_task" ]
51+
ensure
52+
scheduler.stop
53+
end
54+
2055
test "run more than one instance of the scheduler with recurring tasks" do
2156
recurring_tasks = { example_task: { class: "AddToBufferJob", schedule: "every second", args: 42 } }
2257
schedulers = 2.times.collect do
@@ -33,4 +68,67 @@ class SchedulerTest < ActiveSupport::TestCase
3368
assert_equal 1, run_at_times[i + 1] - run_at_times[i]
3469
end
3570
end
71+
72+
test "updates metadata after adding dynamic task post-start" do
73+
scheduler = SolidQueue::Scheduler.new(recurring_tasks: {}).tap do |s|
74+
s.define_singleton_method(:interruptible_sleep) { |interval| sleep 0.1 }
75+
s.start
76+
end
77+
78+
wait_for_registered_processes(1, timeout: 1.second)
79+
80+
process = SolidQueue::Process.first
81+
# initially there are no recurring_schedule keys
82+
assert process.metadata, {}
83+
84+
# now create a dynamic task after the scheduler has booted
85+
SolidQueue::RecurringTask.create(
86+
key: "new_dynamic_task",
87+
static: false,
88+
class_name: "AddToBufferJob",
89+
schedule: "every second",
90+
arguments: [ 42 ]
91+
)
92+
93+
sleep 1
94+
95+
process.reload
96+
97+
# metadata should now include the new key
98+
assert_metadata process, recurring_schedule: [ "new_dynamic_task" ]
99+
ensure
100+
scheduler&.stop
101+
end
102+
103+
test "updates metadata after removing dynamic task post-start" do
104+
old_dynamic_task = SolidQueue::RecurringTask.create(
105+
key: "old_dynamic_task",
106+
static: false,
107+
class_name: "AddToBufferJob",
108+
schedule: "every second",
109+
arguments: [ 42 ]
110+
)
111+
112+
scheduler = SolidQueue::Scheduler.new(recurring_tasks: {}).tap do |s|
113+
s.define_singleton_method(:interruptible_sleep) { |interval| sleep 0.1 }
114+
s.start
115+
end
116+
117+
wait_for_registered_processes(1, timeout: 1.second)
118+
119+
process = SolidQueue::Process.first
120+
# initially there is one recurring_schedule key
121+
assert_metadata process, recurring_schedule: [ "old_dynamic_task" ]
122+
123+
old_dynamic_task.destroy
124+
125+
sleep 1
126+
127+
process.reload
128+
129+
# The task is unschedule after it's being removed, and it's reflected in the metadata
130+
assert process.metadata, {}
131+
ensure
132+
scheduler&.stop
133+
end
36134
end

0 commit comments

Comments
 (0)