Skip to content

Rely on ActiveSupport::Notifications and ActiveSupport::LogSubscriber for logging and instrumentation #208

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
May 8, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Implement basic structure to use Active Support notifications and log…
… subscribers

And instrument a bunch of actions.
  • Loading branch information
rosa committed Apr 30, 2024
commit 9e894a58c072c514900a8325acee2a40bdf49699
22 changes: 13 additions & 9 deletions app/models/solid_queue/blocked_execution.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,18 @@ class BlockedExecution < Execution
scope :expired, -> { where(expires_at: ...Time.current) }

class << self
def unblock(count)
expired.distinct.limit(count).pluck(:concurrency_key).then do |concurrency_keys|
release_many releasable(concurrency_keys)
def unblock(limit)
SolidQueue.instrument(:release_many_blocked, limit: limit) do |payload|
expired.distinct.limit(limit).pluck(:concurrency_key).then do |concurrency_keys|
payload[:size] = release_many releasable(concurrency_keys)
end
end
end

def release_many(concurrency_keys)
# We want to release exactly one blocked execution for each concurrency key, and we need to do it
# one by one, locking each record and acquiring the semaphore individually for each of them:
Array(concurrency_keys).each { |concurrency_key| release_one(concurrency_key) }
Array(concurrency_keys).count { |concurrency_key| release_one(concurrency_key) }
end

def release_one(concurrency_key)
Expand All @@ -38,12 +40,14 @@ def releasable(concurrency_keys)
end

def release
transaction do
if acquire_concurrency_lock
promote_to_ready
destroy!
SolidQueue.instrument(:release_blocked, job_id: job.id, concurrency_key: concurrency_key, released: false) do |payload|
transaction do
if acquire_concurrency_lock
promote_to_ready
destroy!

SolidQueue.logger.debug("[SolidQueue] Unblocked job #{job.id} under #{concurrency_key}")
payload[:released] = true
end
end
end
end
Expand Down
15 changes: 11 additions & 4 deletions app/models/solid_queue/claimed_execution.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,12 @@ def claiming(job_ids, process_id, &block)
end

def release_all
includes(:job).each(&:release)
SolidQueue.instrument(:release_many_claimed) do |payload|
includes(:job).tap do |executions|
payload[:size] = executions.size
executions.each(&:release)
end
end
end

def discard_all_in_batches(*)
Expand All @@ -45,9 +50,11 @@ def perform
end

def release
transaction do
job.dispatch_bypassing_concurrency_limits
destroy!
SolidQueue.instrument(:release_claimed, job_id: job.id, process_id: process_id) do
transaction do
job.dispatch_bypassing_concurrency_limits
destroy!
end
end
end

Expand Down
1 change: 0 additions & 1 deletion app/models/solid_queue/execution/dispatching.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ def dispatch_jobs(job_ids)

Job.dispatch_all(jobs).map(&:id).tap do |dispatched_job_ids|
where(job_id: dispatched_job_ids).order(:job_id).delete_all
SolidQueue.logger.info("[SolidQueue] Dispatched #{dispatched_job_ids.size} jobs")
end
end
end
Expand Down
14 changes: 9 additions & 5 deletions app/models/solid_queue/failed_execution.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,19 @@ class FailedExecution < Execution
attr_accessor :exception

def self.retry_all(jobs)
transaction do
dispatch_jobs lock_all_from_jobs(jobs)
SolidQueue.instrument(:retry_all, jobs_size: jobs.size) do |payload|
transaction do
payload[:size] = dispatch_jobs lock_all_from_jobs(jobs)
end
end
end

def retry
with_lock do
job.prepare_for_execution
destroy!
SolidQueue.instrument(:retry, job_id: job.id) do
with_lock do
job.prepare_for_execution
destroy!
end
end
end

Expand Down
17 changes: 12 additions & 5 deletions app/models/solid_queue/process.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,24 @@ class SolidQueue::Process < SolidQueue::Record
after_destroy -> { claimed_executions.release_all }

def self.register(**attributes)
create!(attributes.merge(last_heartbeat_at: Time.current))
SolidQueue.instrument :register_process, **attributes do
create!(attributes.merge(last_heartbeat_at: Time.current))
end
rescue Exception => error
SolidQueue.instrument :register_process, **attributes.merge(error: error)
raise
end

def heartbeat
touch(:last_heartbeat_at)
end

def deregister
destroy!
rescue Exception
SolidQueue.logger.error("[SolidQueue] Error deregistering process #{id} - #{metadata}")
def deregister(pruned: false)
SolidQueue.instrument :deregister_process, process: self, pruned: pruned do
destroy!
end
rescue Exception => error
SolidQueue.instrument :deregister_process, process: self, error: error
raise
end
end
9 changes: 5 additions & 4 deletions app/models/solid_queue/process/prunable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@ module SolidQueue::Process::Prunable

class_methods do
def prune
prunable.non_blocking_lock.find_in_batches(batch_size: 50) do |batch|
batch.each do |process|
SolidQueue.logger.info("[SolidQueue] Pruning dead process #{process.id} - #{process.metadata}")
process.deregister
SolidQueue.instrument :prune_processes, size: 0 do |payload|
prunable.non_blocking_lock.find_in_batches(batch_size: 50) do |batch|
payload[:size] += batch.size

batch.each { |process| process.deregister(pruned: true) }
end
end
end
Expand Down
12 changes: 7 additions & 5 deletions app/models/solid_queue/scheduled_execution.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@ class ScheduledExecution < Execution

class << self
def dispatch_next_batch(batch_size)
transaction do
job_ids = next_batch(batch_size).non_blocking_lock.pluck(:job_id)
if job_ids.empty? then []
else
dispatch_jobs(job_ids)
SolidQueue.instrument(:dispatch_scheduled, batch_size: batch_size, count: 0) do |payload|
transaction do
job_ids = next_batch(batch_size).non_blocking_lock.pluck(:job_id)
if job_ids.empty? then []
else
payload[:count] = dispatch_jobs(job_ids)
end
end
end
end
Expand Down
24 changes: 14 additions & 10 deletions lib/solid_queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
loader.setup

module SolidQueue
extend self

mattr_accessor :logger, default: ActiveSupport::Logger.new($stdout)
mattr_accessor :app_executor, :on_thread_error, :connects_to

Expand All @@ -39,17 +41,19 @@ module SolidQueue
mattr_accessor :clear_finished_jobs_after, default: 1.day
mattr_accessor :default_concurrency_control_period, default: 3.minutes

class << self
def supervisor?
supervisor
end
def supervisor?
supervisor
end

def silence_polling?
silence_polling
end
def silence_polling?
silence_polling
end

def preserve_finished_jobs?
preserve_finished_jobs
end

def preserve_finished_jobs?
preserve_finished_jobs
end
def instrument(channel, **options, &block)
ActiveSupport::Notifications.instrument("#{channel}.solid_queue", **options, &block)
end
end
2 changes: 2 additions & 0 deletions lib/solid_queue/engine.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ class Engine < ::Rails::Engine
ActiveSupport.on_load(:solid_queue) do
self.logger = app.logger
end

SolidQueue::LogSubscriber.attach_to :solid_queue
end

initializer "solid_queue.active_job.extensions" do
Expand Down
80 changes: 80 additions & 0 deletions lib/solid_queue/log_subscriber.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
# frozen_string_literal: true

require "active_support/log_subscriber"

class SolidQueue::LogSubscriber < ActiveSupport::LogSubscriber
def release_many_blocked(event)
debug formatted_event(event, action: "Unblock jobs", **event.payload.slice(:limit, :size))
end

def release_blocked(event)
debug formatted_event(event, action: "Release blocked job", **event.payload.slice(:job_id, :concurrency_key, :released))
end

def release_many_claimed(event)
debug formatted_event(event, action: "Release claimed jobs", **event.payload.slice(:size))
end

def release_claimed(event)
debug formatted_event(event, action: "Release claimed job", **event.payload.slice(:job_id, :process_id))
end

def dispatch_scheduled(event)
debug formatted_event(event, action: "Dispatch scheduled jobs", **event.payload.slice(:batch_size, :size))
end

def retry_all(event)
debug formatted_event(event, action: "Retry failed jobs", **event.payload.slice(:jobs_size, :size))
end

def retry(event)
debug formatted_event(event, action: "Retry failed job", **event.payload.slice(:job_id))
end

def register_process(event)
attributes = event.payload.slice(:kind, :pid, :hostname)

if error = event.payload[:error]
warn formatted_event(event, action: "Error registering process", **attributes.merge(error: formatted_error(error)))
else
info formatted_event(event, action: "Register process", **attributes)
end
end

def deregister_process(event)
process = event.payload[:process]

attributes = {
process_id: process.id,
pid: process.pid,
kind: process.kind,
hostname: process.hostname,
last_heartbeat_at: process.last_heartbeat_at,
claimed_size: process.claimed_executions.size,
pruned: event.payload
}

if error = event.payload[:error]
warn formatted_event(event, action: "Error deregistering process", **attributes.merge(formatted_error(error)))
else
info formatted_event(event, action: "Deregister process", **attributes)
end
end

def prune_processes(event)
debug formatted_event(event, action: "Prune dead processes", **event.payload.slice(:size))
end

private
def formatted_event(event, action:, **attributes)
"SolidQueue-#{SolidQueue::VERSION} #{action} (#{event.duration.round(1)}ms) #{formatted_attributes(**attributes)}"
end

def formatted_attributes(**attributes)
attributes.map { |attr, value| "#{attr}: #{value.inspect}" }.join(", ")
end

def formatted_error(error)
[ error.class, error.message ].compact.join(" ")
end
end
39 changes: 39 additions & 0 deletions test/unit/log_subscriber_test.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# frozen_string_literal: true

require "test_helper"
require "active_support/log_subscriber/test_helper"

class LogSubscriberTest < ActiveSupport::TestCase
include ActiveSupport::LogSubscriber::TestHelper

teardown { ActiveSupport::LogSubscriber.log_subscribers.clear }

test "unblock one job" do
attach_log_subscriber
instrument "release_blocked.solid_queue", job_id: 42, concurrency_key: "foo/1", released: true

assert_match_logged :debug, "Release blocked job", "job_id: 42, concurrency_key: \"foo/1\", released: true"
end

test "unblock many jobs" do
attach_log_subscriber
instrument "unblock_batch.solid_queue", batch_size: 42

assert_match_logged :debug, "Unblock jobs", "batch_size: 42"
end

private
def attach_log_subscriber
ActiveSupport::LogSubscriber.attach_to :solid_queue, SolidQueue::LogSubscriber.new
end

def instrument(...)
ActiveSupport::Notifications.instrument(...)
wait
end

def assert_match_logged(level, action, attributes)
assert_equal 1, @logger.logged(level).size
assert_match /SolidQueue-[\d.]+ #{action} \(\d+\.\d+ms\) #{Regexp.escape(attributes)}/, @logger.logged(level).last
end
end