Skip to content

Commit 4f89ebb

Browse files
committed
Move some more manual logging lines to Active Support notifications
And set log subscriber's logger to be Solid Queue's logger. Also: refactor a bit how (OS) process attributes are exposed so they can be logged more easily.
1 parent e0e0574 commit 4f89ebb

File tree

11 files changed

+106
-53
lines changed

11 files changed

+106
-53
lines changed

lib/solid_queue/dispatcher.rb

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,10 @@ def initialize(**options)
1919
@recurring_schedule = RecurringSchedule.new(options[:recurring_tasks])
2020
end
2121

22+
def metadata
23+
super.merge(batch_size: batch_size, concurrency_maintenance_interval: concurrency_maintenance&.interval, recurring_schedule: recurring_schedule.tasks.presence)
24+
end
25+
2226
private
2327
def poll
2428
batch = dispatch_next_batch
@@ -50,9 +54,5 @@ def unload_recurring_schedule
5054
def set_procline
5155
procline "waiting"
5256
end
53-
54-
def metadata
55-
super.merge(batch_size: batch_size, concurrency_maintenance_interval: concurrency_maintenance&.interval, recurring_schedule: recurring_schedule.tasks.presence)
56-
end
5757
end
5858
end

lib/solid_queue/engine.rb

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,7 @@ class Engine < ::Rails::Engine
2626

2727
initializer "solid_queue.logger" do |app|
2828
ActiveSupport.on_load(:solid_queue) do
29-
self.logger = app.logger
30-
SolidQueue::LogSubscriber.logger = self.logger
29+
self.logger ||= app.logger
3130
end
3231

3332
SolidQueue::LogSubscriber.attach_to :solid_queue

lib/solid_queue/log_subscriber.rb

Lines changed: 40 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,13 +50,25 @@ def enqueue_recurring_task(event)
5050
end
5151
end
5252

53+
def start_process(event)
54+
process = event.payload[:process]
55+
56+
attributes = {
57+
pid: process.pid,
58+
hostname: process.hostname
59+
}.merge(process.metadata)
60+
61+
info formatted_event(event, action: "Started #{process.kind}", **attributes)
62+
end
63+
5364
def register_process(event)
54-
attributes = event.payload.slice(:kind, :pid, :hostname)
65+
process_kind = event.payload[:kind]
66+
attributes = event.payload.slice(:pid, :hostname)
5567

5668
if error = event.payload[:error]
57-
warn formatted_event(event, action: "Error registering process", **attributes.merge(error: formatted_error(error)))
69+
warn formatted_event(event, action: "Error registering #{process_kind}", **attributes.merge(error: formatted_error(error)))
5870
else
59-
info formatted_event(event, action: "Register process", **attributes)
71+
info formatted_event(event, action: "Register #{process_kind}", **attributes)
6072
end
6173
end
6274

@@ -66,24 +78,41 @@ def deregister_process(event)
6678
attributes = {
6779
process_id: process.id,
6880
pid: process.pid,
69-
kind: process.kind,
7081
hostname: process.hostname,
7182
last_heartbeat_at: process.last_heartbeat_at,
7283
claimed_size: process.claimed_executions.size,
7384
pruned: event.payload
7485
}
7586

7687
if error = event.payload[:error]
77-
warn formatted_event(event, action: "Error deregistering process", **attributes.merge(formatted_error(error)))
88+
warn formatted_event(event, action: "Error deregistering #{process.kind}", **attributes.merge(error: formatted_error(error)))
7889
else
79-
info formatted_event(event, action: "Deregister process", **attributes)
90+
info formatted_event(event, action: "Deregister #{process.kind}", **attributes)
8091
end
8192
end
8293

8394
def prune_processes(event)
8495
debug formatted_event(event, action: "Prune dead processes", **event.payload.slice(:size))
8596
end
8697

98+
def thread_error(event)
99+
error formatted_event(event, action: "Error in thread", error: formatted_error(event.payload[:error]))
100+
end
101+
102+
def graceful_termination(event)
103+
attributes = event.payload.slice(:supervisor_pid, :supervised_pids)
104+
105+
if event.payload[:shutdown_timeout_exceeded]
106+
warn formatted_event(event, action: "Supervisor wasn't terminated gracefully - shutdown timeout exceeded", **attributes)
107+
else
108+
formatted_event(event, action: "Supervisor terminated gracefully", **attributes)
109+
end
110+
end
111+
112+
def immediate_termination(event)
113+
info formatted_event(event, action: "Supervisor terminated immediately", **event.payload.slice(:supervisor_pid, :supervised_pids))
114+
end
115+
87116
private
88117
def formatted_event(event, action:, **attributes)
89118
"SolidQueue-#{SolidQueue::VERSION} #{action} (#{event.duration.round(1)}ms) #{formatted_attributes(**attributes)}"
@@ -96,4 +125,9 @@ def formatted_attributes(**attributes)
96125
def formatted_error(error)
97126
[ error.class, error.message ].compact.join(" ")
98127
end
128+
129+
# Use the logger configured for SolidQueue
130+
def logger
131+
SolidQueue.logger
132+
end
99133
end

lib/solid_queue/processes/base.rb

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,22 @@ module Processes
55
class Base
66
include Callbacks # Defines callbacks needed by other concerns
77
include AppExecutor, Registrable, Interruptible, Procline
8+
9+
def kind
10+
self.class.name.demodulize
11+
end
12+
13+
def hostname
14+
@hostname ||= Socket.gethostname.force_encoding(Encoding::UTF_8)
15+
end
16+
17+
def pid
18+
@pid ||= ::Process.pid
19+
end
20+
21+
def metadata
22+
{}
23+
end
824
end
925
end
1026
end

lib/solid_queue/processes/poller.rb

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,10 @@ module Poller
1010
attr_accessor :polling_interval
1111
end
1212

13+
def metadata
14+
super.merge(polling_interval: polling_interval)
15+
end
16+
1317
private
1418
def run
1519
if mode.async?
@@ -44,9 +48,5 @@ def with_polling_volume
4448
yield
4549
end
4650
end
47-
48-
def metadata
49-
super.merge(polling_interval: polling_interval)
50-
end
5151
end
5252
end

lib/solid_queue/processes/registrable.rb

Lines changed: 1 addition & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -11,18 +11,13 @@ module Registrable
1111
after_shutdown :deregister
1212
end
1313

14-
def inspect
15-
"#{kind}(pid=#{process_pid}, hostname=#{hostname}, metadata=#{metadata})"
16-
end
17-
alias to_s inspect
18-
1914
private
2015
attr_accessor :process
2116

2217
def register
2318
@process = SolidQueue::Process.register \
2419
kind: self.class.name.demodulize,
25-
pid: process_pid,
20+
pid: pid,
2621
hostname: hostname,
2722
supervisor: try(:supervisor),
2823
metadata: metadata.compact
@@ -48,21 +43,5 @@ def stop_heartbeat
4843
def heartbeat
4944
process.heartbeat
5045
end
51-
52-
def kind
53-
self.class.name.demodulize
54-
end
55-
56-
def hostname
57-
@hostname ||= Socket.gethostname.force_encoding(Encoding::UTF_8)
58-
end
59-
60-
def process_pid
61-
@pid ||= ::Process.pid
62-
end
63-
64-
def metadata
65-
{}
66-
end
6746
end
6847
end

lib/solid_queue/processes/runnable.rb

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,10 @@ module Runnable
88

99
def start
1010
@stopping = false
11-
run_callbacks(:boot) { boot }
11+
12+
SolidQueue.instrument(:start_process, process: self) do
13+
run_callbacks(:boot) { boot }
14+
end
1215

1316
run
1417
end
@@ -30,8 +33,6 @@ def boot
3033
register_signal_handlers
3134
set_procline
3235
end
33-
34-
SolidQueue.logger.info("[SolidQueue] Starting #{self}")
3536
end
3637

3738
def shutting_down?

lib/solid_queue/supervisor.rb

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -79,19 +79,24 @@ def shutdown
7979
end
8080

8181
def graceful_termination
82-
SolidQueue.logger.info("[SolidQueue] Terminating gracefully...")
83-
term_forks
82+
SolidQueue.instrument(:graceful_termination, supervisor_pid: ::Process.pid, supervised_pids: forks.keys) do |payload|
83+
term_forks
8484

85-
wait_until(SolidQueue.shutdown_timeout, -> { all_forks_terminated? }) do
86-
reap_terminated_forks
87-
end
85+
wait_until(SolidQueue.shutdown_timeout, -> { all_forks_terminated? }) do
86+
reap_terminated_forks
87+
end
8888

89-
immediate_termination unless all_forks_terminated?
89+
unless all_forks_terminated?
90+
payload[:shutdown_timeout_exceeded] = true
91+
immediate_termination
92+
end
93+
end
9094
end
9195

9296
def immediate_termination
93-
SolidQueue.logger.info("[SolidQueue] Terminating immediately...")
94-
quit_forks
97+
SolidQueue.instrument(:immediate_termination, supervisor_pid: ::Process.pid, supervised_pids: forks.keys) do
98+
quit_forks
99+
end
95100
end
96101

97102
def term_forks

lib/solid_queue/worker.rb

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,10 @@ def initialize(**options)
1414
@pool = Pool.new(options[:threads], on_idle: -> { wake_up })
1515
end
1616

17+
def metadata
18+
super.merge(queues: queues.join(","), thread_pool_size: pool.size)
19+
end
20+
1721
private
1822
def poll
1923
claim_executions.then do |executions|
@@ -45,9 +49,5 @@ def all_work_completed?
4549
def set_procline
4650
procline "waiting for jobs in #{queues.join(",")}"
4751
end
48-
49-
def metadata
50-
super.merge(queues: queues.join(","), thread_pool_size: pool.size)
51-
end
5252
end
5353
end

test/integration/instrumentation_test.rb

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,21 @@ class InstrumentationTest < ActiveSupport::TestCase
3838
assert_event release_many_event, "release_many_claimed", size: 1
3939
end
4040

41+
test "starting a runnable process emits a start_process event" do
42+
worker = SolidQueue::Worker.new
43+
44+
events = subscribed("start_process.solid_queue") do
45+
worker.start
46+
wait_for_registered_processes(1, timeout: 1.second)
47+
48+
worker.stop
49+
wait_for_registered_processes(0, timeout: 1.second)
50+
end
51+
52+
assert_equal 1, events.size
53+
assert_event events.first, "start_process", process: worker
54+
end
55+
4156
test "starting and stopping a worker emits register_process and deregister_process events" do
4257
StoreResultJob.perform_later(42, pause: SolidQueue.shutdown_timeout + 10.second)
4358
process = nil

test/unit/log_subscriber_test.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@ class LogSubscriberTest < ActiveSupport::TestCase
88

99
teardown { ActiveSupport::LogSubscriber.log_subscribers.clear }
1010

11+
def set_logger(logger)
12+
SolidQueue.logger = logger
13+
end
14+
1115
test "unblock one job" do
1216
attach_log_subscriber
1317
instrument "release_blocked.solid_queue", job_id: 42, concurrency_key: "foo/1", released: true

0 commit comments

Comments
 (0)