Skip to content

Rely on ActiveSupport::Notifications and ActiveSupport::LogSubscriber for logging and instrumentation #208

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
May 8, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Move some more manual logging lines to Active Support notifications
And set log subscriber's logger to be Solid Queue's logger.

Also: refactor a bit how (OS) process attributes are exposed so they can
be logged more easily.
  • Loading branch information
rosa committed Apr 30, 2024
commit 7095a8b3483149ccee7c1c20260362c84dfc9391
8 changes: 4 additions & 4 deletions lib/solid_queue/dispatcher.rb
Original file line number Diff line number Diff line change
@@ -19,6 +19,10 @@ def initialize(**options)
@recurring_schedule = RecurringSchedule.new(options[:recurring_tasks])
end

def metadata
super.merge(batch_size: batch_size, concurrency_maintenance_interval: concurrency_maintenance&.interval, recurring_schedule: recurring_schedule.tasks.presence)
end

private
def poll
batch = dispatch_next_batch
@@ -50,9 +54,5 @@ def unload_recurring_schedule
def set_procline
procline "waiting"
end

def metadata
super.merge(batch_size: batch_size, concurrency_maintenance_interval: concurrency_maintenance&.interval, recurring_schedule: recurring_schedule.tasks.presence)
end
end
end
3 changes: 1 addition & 2 deletions lib/solid_queue/engine.rb
Original file line number Diff line number Diff line change
@@ -26,8 +26,7 @@ class Engine < ::Rails::Engine

initializer "solid_queue.logger" do |app|
ActiveSupport.on_load(:solid_queue) do
self.logger = app.logger
SolidQueue::LogSubscriber.logger = self.logger
self.logger ||= app.logger
end

SolidQueue::LogSubscriber.attach_to :solid_queue
46 changes: 40 additions & 6 deletions lib/solid_queue/log_subscriber.rb
Original file line number Diff line number Diff line change
@@ -50,13 +50,25 @@ def enqueue_recurring_task(event)
end
end

def start_process(event)
process = event.payload[:process]

attributes = {
pid: process.pid,
hostname: process.hostname
}.merge(process.metadata)

info formatted_event(event, action: "Started #{process.kind}", **attributes)
end

def register_process(event)
attributes = event.payload.slice(:kind, :pid, :hostname)
process_kind = event.payload[:kind]
attributes = event.payload.slice(:pid, :hostname)

if error = event.payload[:error]
warn formatted_event(event, action: "Error registering process", **attributes.merge(error: formatted_error(error)))
warn formatted_event(event, action: "Error registering #{process_kind}", **attributes.merge(error: formatted_error(error)))
else
info formatted_event(event, action: "Register process", **attributes)
info formatted_event(event, action: "Register #{process_kind}", **attributes)
end
end

@@ -66,24 +78,41 @@ def deregister_process(event)
attributes = {
process_id: process.id,
pid: process.pid,
kind: process.kind,
hostname: process.hostname,
last_heartbeat_at: process.last_heartbeat_at,
claimed_size: process.claimed_executions.size,
pruned: event.payload
}

if error = event.payload[:error]
warn formatted_event(event, action: "Error deregistering process", **attributes.merge(formatted_error(error)))
warn formatted_event(event, action: "Error deregistering #{process.kind}", **attributes.merge(error: formatted_error(error)))
else
info formatted_event(event, action: "Deregister process", **attributes)
info formatted_event(event, action: "Deregister #{process.kind}", **attributes)
end
end

def prune_processes(event)
debug formatted_event(event, action: "Prune dead processes", **event.payload.slice(:size))
end

def thread_error(event)
error formatted_event(event, action: "Error in thread", error: formatted_error(event.payload[:error]))
end

def graceful_termination(event)
attributes = event.payload.slice(:supervisor_pid, :supervised_pids)

if event.payload[:shutdown_timeout_exceeded]
warn formatted_event(event, action: "Supervisor wasn't terminated gracefully - shutdown timeout exceeded", **attributes)
else
formatted_event(event, action: "Supervisor terminated gracefully", **attributes)
end
end

def immediate_termination(event)
info formatted_event(event, action: "Supervisor terminated immediately", **event.payload.slice(:supervisor_pid, :supervised_pids))
end

private
def formatted_event(event, action:, **attributes)
"SolidQueue-#{SolidQueue::VERSION} #{action} (#{event.duration.round(1)}ms) #{formatted_attributes(**attributes)}"
@@ -96,4 +125,9 @@ def formatted_attributes(**attributes)
def formatted_error(error)
[ error.class, error.message ].compact.join(" ")
end

# Use the logger configured for SolidQueue
def logger
SolidQueue.logger
end
end
16 changes: 16 additions & 0 deletions lib/solid_queue/processes/base.rb
Original file line number Diff line number Diff line change
@@ -5,6 +5,22 @@ module Processes
class Base
include Callbacks # Defines callbacks needed by other concerns
include AppExecutor, Registrable, Interruptible, Procline

def kind
self.class.name.demodulize
end

def hostname
@hostname ||= Socket.gethostname.force_encoding(Encoding::UTF_8)
end

def pid
@pid ||= ::Process.pid
end

def metadata
{}
end
end
end
end
8 changes: 4 additions & 4 deletions lib/solid_queue/processes/poller.rb
Original file line number Diff line number Diff line change
@@ -10,6 +10,10 @@ module Poller
attr_accessor :polling_interval
end

def metadata
super.merge(polling_interval: polling_interval)
end

private
def run
if mode.async?
@@ -44,9 +48,5 @@ def with_polling_volume
yield
end
end

def metadata
super.merge(polling_interval: polling_interval)
end
end
end
23 changes: 1 addition & 22 deletions lib/solid_queue/processes/registrable.rb
Original file line number Diff line number Diff line change
@@ -11,18 +11,13 @@ module Registrable
after_shutdown :deregister
end

def inspect
"#{kind}(pid=#{process_pid}, hostname=#{hostname}, metadata=#{metadata})"
end
alias to_s inspect

private
attr_accessor :process

def register
@process = SolidQueue::Process.register \
kind: self.class.name.demodulize,
pid: process_pid,
pid: pid,
hostname: hostname,
supervisor: try(:supervisor),
metadata: metadata.compact
@@ -48,21 +43,5 @@ def stop_heartbeat
def heartbeat
process.heartbeat
end

def kind
self.class.name.demodulize
end

def hostname
@hostname ||= Socket.gethostname.force_encoding(Encoding::UTF_8)
end

def process_pid
@pid ||= ::Process.pid
end

def metadata
{}
end
end
end
7 changes: 4 additions & 3 deletions lib/solid_queue/processes/runnable.rb
Original file line number Diff line number Diff line change
@@ -8,7 +8,10 @@ module Runnable

def start
@stopping = false
run_callbacks(:boot) { boot }

SolidQueue.instrument(:start_process, process: self) do
run_callbacks(:boot) { boot }
end

run
end
@@ -30,8 +33,6 @@ def boot
register_signal_handlers
set_procline
end

SolidQueue.logger.info("[SolidQueue] Starting #{self}")
end

def shutting_down?
21 changes: 13 additions & 8 deletions lib/solid_queue/supervisor.rb
Original file line number Diff line number Diff line change
@@ -79,19 +79,24 @@ def shutdown
end

def graceful_termination
SolidQueue.logger.info("[SolidQueue] Terminating gracefully...")
term_forks
SolidQueue.instrument(:graceful_termination, supervisor_pid: ::Process.pid, supervised_pids: forks.keys) do |payload|
term_forks

wait_until(SolidQueue.shutdown_timeout, -> { all_forks_terminated? }) do
reap_terminated_forks
end
wait_until(SolidQueue.shutdown_timeout, -> { all_forks_terminated? }) do
reap_terminated_forks
end

immediate_termination unless all_forks_terminated?
unless all_forks_terminated?
payload[:shutdown_timeout_exceeded] = true
immediate_termination
end
end
end

def immediate_termination
SolidQueue.logger.info("[SolidQueue] Terminating immediately...")
quit_forks
SolidQueue.instrument(:immediate_termination, supervisor_pid: ::Process.pid, supervised_pids: forks.keys) do
quit_forks
end
end

def term_forks
8 changes: 4 additions & 4 deletions lib/solid_queue/worker.rb
Original file line number Diff line number Diff line change
@@ -14,6 +14,10 @@ def initialize(**options)
@pool = Pool.new(options[:threads], on_idle: -> { wake_up })
end

def metadata
super.merge(queues: queues.join(","), thread_pool_size: pool.size)
end

private
def poll
claim_executions.then do |executions|
@@ -45,9 +49,5 @@ def all_work_completed?
def set_procline
procline "waiting for jobs in #{queues.join(",")}"
end

def metadata
super.merge(queues: queues.join(","), thread_pool_size: pool.size)
end
end
end
15 changes: 15 additions & 0 deletions test/integration/instrumentation_test.rb
Original file line number Diff line number Diff line change
@@ -38,6 +38,21 @@ class InstrumentationTest < ActiveSupport::TestCase
assert_event release_many_event, "release_many_claimed", size: 1
end

test "starting a runnable process emits a start_process event" do
worker = SolidQueue::Worker.new

events = subscribed("start_process.solid_queue") do
worker.start
wait_for_registered_processes(1, timeout: 1.second)

worker.stop
wait_for_registered_processes(0, timeout: 1.second)
end

assert_equal 1, events.size
assert_event events.first, "start_process", process: worker
end

test "starting and stopping a worker emits register_process and deregister_process events" do
StoreResultJob.perform_later(42, pause: SolidQueue.shutdown_timeout + 10.second)
process = nil
4 changes: 4 additions & 0 deletions test/unit/log_subscriber_test.rb
Original file line number Diff line number Diff line change
@@ -8,6 +8,10 @@ class LogSubscriberTest < ActiveSupport::TestCase

teardown { ActiveSupport::LogSubscriber.log_subscribers.clear }

def set_logger(logger)
SolidQueue.logger = logger
end

test "unblock one job" do
attach_log_subscriber
instrument "release_blocked.solid_queue", job_id: 42, concurrency_key: "foo/1", released: true