Add capability to discard duplicate jobs with concurrency configuration #523
```diff
@@ -10,6 +10,10 @@ def wait(job)
        Proxy.new(job).wait
      end

+      def at_limit?(job)
+        Proxy.new(job).at_limit?
+      end
+
      def signal(job)
        Proxy.new(job).signal
      end
```
```diff
@@ -39,6 +43,14 @@ def initialize(job)
        @job = job
      end

+      def at_limit?
+        if semaphore = Semaphore.find_by(key: key)
+          semaphore.value.zero?
+        else
+          false
+        end
+      end
+
      def wait
        if semaphore = Semaphore.find_by(key: key)
          semaphore.value > 0 && attempt_decrement
```

Review thread on `at_limit?`:

Reviewer: This is vulnerable to race conditions, and is the reason we don't check it this way when blocking jobs. If two concurrent jobs are claiming the semaphore, both of them will see it open, and neither will be discarded. Then both will run together, because we won't block them either.

Author: You raise an excellent point. Do you have a suggestion for how we might avoid this race condition? Are you thinking pessimistic locking might help?

Reviewer: Yes, I was thinking that perhaps we could rely on the same check we're already doing to block the job, but instead of blocking the job, we'd roll the transaction back 🤔 What I'm not sure about is whether this should actually raise some exception to indicate the job hasn't been enqueued. Maybe that's not necessary, but at least it should set the
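The race the reviewer describes can be reproduced without a database. The following is a minimal illustrative sketch (not code from the PR), using plain threads and a local integer standing in for `Semaphore#value`, with two queues forcing both "jobs" to perform the check before either decrements:

```ruby
# Reproduces the check-then-act race: both jobs read value > 0 before
# either decrements, so neither sees the semaphore as full and both
# claim the single available slot.
value = 1             # stands in for Semaphore#value (concurrency limit 1)
lock = Mutex.new
checked = Queue.new   # each thread signals "I have done my check"
go = Queue.new        # holds both threads until both checks are done

jobs = 2.times.map do
  Thread.new do
    open = value > 0                        # the racy check: both see 1
    checked << true
    go.pop                                  # wait for the other job's check
    lock.synchronize { value -= 1 } if open # both decrement
  end
end

2.times { checked.pop }  # both checks complete before any decrement
2.times { go << :go }
jobs.each(&:join)

puts value  # => -1 : the semaphore was over-claimed by both jobs
```

This is exactly why checking `semaphore.value.zero?` in isolation can report "not at limit" to two concurrent enqueues at once.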
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this is the way 🤔 We're in the middle of a transaction here, and the job hasn't even been committed to the DB. It makes no sense to delete a record in the same transaction you're creating it. It'd make sense to roll that transaction back instead.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps I don't fully understand how we're getting into this code path, but from my investigation, it looks as though there isn't an open transaction at this time.
I tried the following and running the test suite and didn't hit any open transactions.
I still believe we'd want to discard here. Let me know your thoughts.
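The reviewer's suggestion of reusing the blocking check amounts to making the check and the claim a single atomic step, which is what `attempt_decrement` in the diff above appears to do (presumably via a conditional `UPDATE ... WHERE value > 0` at the database level). A plain-Ruby sketch of that idea, with a mutex standing in for the database's atomicity (illustrative only, not code from the PR):

```ruby
# Race-free alternative: check and decrement happen as one atomic step,
# so at most one concurrent job can claim the last slot. The Mutex is a
# stand-in for a conditional UPDATE that only succeeds while value > 0.
value = 1
lock = Mutex.new

# Returns true if this caller claimed a slot, false if at the limit.
attempt_decrement = lambda do
  lock.synchronize do
    if value > 0
      value -= 1
      true
    else
      false
    end
  end
end

results = 2.times.map { Thread.new { attempt_decrement.call } }.map(&:value)

puts results.count(true)  # => 1 : exactly one job claims the slot
puts value                # => 0 : the semaphore is never over-claimed
```

With this shape, "discard" vs. "block" becomes a decision taken on the `false` branch, after the atomic claim has already failed, rather than a separate read that can race with other enqueues.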