Skip to content

Add filtering to failed jobs #255

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ def set_filters
@job_filters = {
job_class_name: params.dig(:filter, :job_class_name).presence,
queue_name: params.dig(:filter, :queue_name).presence,
error: params.dig(:filter, :error).presence,
finished_at: finished_at_range_params
}.compact
end
Expand Down
4 changes: 4 additions & 0 deletions app/views/mission_control/jobs/jobs/_filters.html.erb
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
<%= form.text_field :queue_name, value: @job_filters[:queue_name], class: "input", list: "queue-names", placeholder: "Filter by queue name...", autocomplete: "off" %>
</div>

<div class="text is-rounded">
<%= form.text_field :error, value: @job_filters[:error], class: "input", placeholder: "Search by error...", autocomplete: "off" %>
</div>

<% if jobs_status == "finished" %>
<div class="select is-rounded">
<%= form.datetime_field :finished_at_start, value: @job_filters[:finished_at]&.begin, class: "input", placeholder: "Finished from" %>
Expand Down
4 changes: 4 additions & 0 deletions lib/active_job/job_proxy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ def serialize
end
end

# Text of the last execution error recorded on this proxy.
# Always returns a String; it is empty when no error has been recorded.
def error
  recorded_failure = @last_execution_error
  recorded_failure.to_s
end

# Always raises UnsupportedError: a proxy can only be enqueued, never run inline.
def perform_now
  message = "A JobProxy doesn't support immediate execution, only enqueuing."
  raise UnsupportedError, message
end
Expand Down
20 changes: 14 additions & 6 deletions lib/active_job/jobs_relation.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ class ActiveJob::JobsRelation
include Enumerable

STATUSES = %i[ pending failed in_progress blocked scheduled finished ]
# Each constant is assigned exactly once; +error+ is the free-text error filter.
FILTERS = %i[ queue_name job_class_name error ]

PROPERTIES = %i[ queue_name status offset_value limit_value job_class_name worker_id recurring_task_id finished_at error ]
attr_reader *PROPERTIES, :default_page_size

delegate :last, :[], :reverse, to: :to_a
Expand All @@ -52,16 +52,18 @@ def initialize(queue_adapter: ActiveJob::Base.queue_adapter, default_page_size:
# * <tt>:worker_id</tt> - To only include the jobs processed by the provided worker.
# * <tt>:recurring_task_id</tt> - To only include the jobs corresponding to runs of a recurring task.
# * <tt>:finished_at</tt> - (Range) To only include the jobs finished between the provided range
# * <tt>:error</tt> - To only include the jobs whose error contains the provided text.
def where(job_class_name: nil, queue_name: nil, worker_id: nil, recurring_task_id: nil, finished_at: nil, error: nil)
  # Remove nil arguments to avoid overriding parameters when concatenating +where+ clauses
  arguments = {
    job_class_name: job_class_name,
    queue_name: queue_name&.to_s,
    worker_id: worker_id,
    recurring_task_id: recurring_task_id,
    finished_at: finished_at,
    # Normalized to a String so both the in-memory and the SQL matching receive text.
    error: error&.to_s
  }.compact

  clone_with(**arguments)
end

def with_status(status)
Expand Down Expand Up @@ -272,7 +274,13 @@ def filter(jobs)
end

# Whether +job+ matches every property listed by +filters+.
#
# The +error+ filter is a case-insensitive substring match, in line with the
# case-insensitive SQL search used for Solid Queue. Every other property must be equal.
# A job with a nil error is treated as having an empty error text.
def satisfy_filter?(job)
  filters.all? do |property|
    expected = public_send(property)

    if property == :error
      job.error.to_s.downcase.include?(expected.to_s.downcase)
    else
      expected == job.public_send(property)
    end
  end
end

def filters
Expand Down
35 changes: 33 additions & 2 deletions lib/active_job/queue_adapters/solid_queue_ext.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def supported_job_statuses
end

# Filters this adapter reports as supported; any arguments are accepted and ignored.
def supported_job_filters(*)
  [ :queue_name, :job_class_name, :finished_at, :error ]
end

def jobs_count(jobs_relation)
Expand Down Expand Up @@ -173,15 +173,17 @@ def retry_all
attr_reader :jobs_relation

# Query parameters, including the +error+ text filter, are read from the wrapped relation.
delegate :queue_name, :limit_value, :limit_value_provided?, :offset_value, :job_class_name,
  :default_page_size, :worker_id, :recurring_task_id, :finished_at, :error, to: :jobs_relation

# Builds the executions query: starts from +execution_class_by_status+ and feeds the
# result through each step below, in order. Limit and offset are applied last.
def executions
  steps = %i[
    include_execution_association
    filter_executions_by_queue
    filter_executions_by_class
    filter_executions_by_process_id
    filter_executions_by_task_key
    filter_executions_by_error
    limit
    offset
  ]

  steps.reduce(execution_class_by_status) { |scope, step| send(step, scope) }
end
Expand Down Expand Up @@ -276,6 +278,35 @@ def filter_jobs_by_finished_at(jobs)
finished_at.present? ? jobs.where(finished_at: finished_at) : jobs
end

# Narrows +executions+ to those whose failed-execution error text contains the +error+
# filter, ignoring case. Returns +executions+ untouched when no error filter is set.
#
# PostgreSQL uses ILIKE; every other adapter compares LOWER(column) against a downcased
# pattern.
#
# NOTE(review): the adapter is read from ActiveRecord::Base. If Solid Queue lives in a
# separate database with a different adapter, this should use that connection — confirm.
def filter_executions_by_error(executions)
  return executions unless error.present?

  # Escape LIKE wildcards typed by the user so "%" and "_" match literally. "!" is the
  # escape character (declared through ESCAPE) because SQLite has no default LIKE escape
  # and backslash quoting differs between MySQL and PostgreSQL string literals.
  needle = ActiveRecord::Base.sanitize_sql_like(error.to_s, "!")

  condition, pattern =
    if ActiveRecord::Base.connection.adapter_name.downcase == "postgresql"
      [ "solid_queue_failed_executions.error ILIKE ? ESCAPE '!'", "%#{needle}%" ]
    else
      [ "LOWER(solid_queue_failed_executions.error) LIKE ? ESCAPE '!'", "%#{needle.downcase}%" ]
    end

  # Only join the failed executions table when the query does not reference it yet;
  # joining it twice is what triggers duplicate-alias errors.
  scope =
    if executions.to_sql.include?("solid_queue_failed_executions")
      executions
    else
      executions.joins("INNER JOIN solid_queue_failed_executions ON solid_queue_failed_executions.job_id = solid_queue_jobs.id")
    end

  scope.where(condition, pattern)
end

# Applies +limit_value+ to the given relation when one is present; otherwise returns
# the relation unchanged.
def limit(executions_or_jobs)
  return executions_or_jobs unless limit_value.present?

  executions_or_jobs.limit(limit_value)
end
Expand Down
20 changes: 20 additions & 0 deletions test/active_job/jobs_relation_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,26 @@ class ActiveJob::JobsRelationTest < ActiveSupport::TestCase
assert_equal "MyJob", jobs.job_class_name
end

test "set error" do
  # The error filter passed to +where+ is readable back from the resulting relation.
  filtered = @jobs.where(error: "Some error")

  assert_equal "Some error", filtered.error
end

test "handle nil errors in satisfy_filter?" do
  # Job double whose error is nil
  nil_error_job = mock
  nil_error_job.stubs(:job_class_name).returns("NilErrorJob")
  nil_error_job.stubs(:error).returns(nil)

  # Make the relation apply the error filter in memory
  @jobs.stubs(:error).returns("something")
  @jobs.stubs(:filters).returns([:error])

  # Matching against a nil error must return false rather than raise NoMethodError
  assert_equal false, @jobs.send(:satisfy_filter?, nil_error_job)
end

test "set finished_at range" do
jobs = @jobs.where(finished_at: (1.day.ago..))
assert 1.hour.ago.in? jobs.finished_at
Expand Down
16 changes: 16 additions & 0 deletions test/active_job/queue_adapters/adapter_testing/query_jobs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,22 @@ module ActiveJob::QueueAdapters::AdapterTesting::QueryJobs
assert_equal 3, ActiveJob.jobs.pending.where(queue_name: "queue_1").to_a.length
assert_equal 5, ActiveJob.jobs.pending.where(queue_name: "queue_2").to_a.length
end

test "filter failed jobs by error message" do
  [ "First specific error", "Second specific error", "Different error message" ].each do |message|
    FailingJob.perform_later(message)
  end
  perform_enqueued_jobs

  # Only the two jobs whose error contains "specific" should come back
  filtered_jobs = ActiveJob.jobs.failed.where(error: "specific").to_a
  assert_equal 2, filtered_jobs.length

  # Both matching errors are present and the unrelated one is not
  assert filtered_jobs.any? { |job| job.error.include?("First specific error") }
  assert filtered_jobs.any? { |job| job.error.include?("Second specific error") }
  assert filtered_jobs.none? { |job| job.error.include?("Different error message") }
end

test "fetch job classes in the first jobs" do
3.times { DummyJob.perform_later }
Expand Down
13 changes: 13 additions & 0 deletions test/active_job/queue_adapters/solid_queue_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,19 @@ class ActiveJob::QueueAdapters::SolidQueueTest < ActiveSupport::TestCase
setup do
SolidQueue.logger = ActiveSupport::Logger.new(nil)
end

test "filter by error when using multiple filters" do
  # Regression test: combining the error filter with another filter used to raise
  # PG::DuplicateAlias because the failed executions table was joined twice.
  FailingJob.perform_later("Duplicate error test")
  perform_enqueued_jobs

  matching_jobs = ActiveJob.jobs.failed.where(job_class_name: "FailingJob", error: "Duplicate").to_a

  # The query must run cleanly and return exactly the job enqueued above
  assert_equal 1, matching_jobs.length
  assert_includes matching_jobs.first.error, "Duplicate error test"
end

private
def queue_adapter
Expand Down
4 changes: 2 additions & 2 deletions test/dummy/app/jobs/failing_job.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Dummy job that always fails, used to produce failed executions in tests.
class FailingJob < ApplicationJob
  # Raises +error+ with +value+ as its message. Called without arguments it raises
  # RuntimeError with "This always fails!".
  def perform(value = "This always fails!", error = RuntimeError)
    raise error, value
  end
end
29 changes: 29 additions & 0 deletions test/system/list_failed_jobs_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,33 @@ class ListFailedJobsTest < ApplicationSystemTestCase
end
end
end

test "filter by error message" do
  2.times { FailingJob.perform_later("Ratelimit Error") }
  perform_enqueued_jobs

  visit jobs_path(:failed, filter: { error: "Ratelimit" })

  assert_equal 2, job_row_elements.length
  job_row_elements.each do |job_element|
    within job_element do
      assert_text "FailingJob"
      assert_text "Ratelimit Error" # the matched error text is rendered in the row
    end
  end
end

test "filter by error message should handle jobs with nil errors" do
# Checks that the failed jobs page renders its empty state when the error filter
# matches nothing.
# NOTE(review): despite the test name, no job with a nil error is created here; both
# jobs below fail with a regular error message.
2.times { FailingJob.perform_later("Regular Error") }
perform_enqueued_jobs

# Filter by a text that none of the recorded errors contain
visit jobs_path(:failed, filter: { error: "NonexistentErrorText" })

# The page should load and show the empty-state message
assert_text "No jobs found"
end
end