Support priority queues #114

Open
Alxandir opened this issue Jan 22, 2025 · 1 comment

Comments

@Alxandir

We use this plugin on an instance where most queues are also priority queues.
It would be great if the plugin could optionally "increase" or "decrease" the priority of a queued message when a duplicate with a different priority is published to the same queue. Ideally, increase and decrease could be enabled separately, to cover the most use cases.

Example 1 - Increasing priority when "duplicate priority increase" enabled:

  1. A message with deduplication header "123" is present on Queue A, priority 7.
  2. A second message with deduplication header "123" is published to Queue A, with priority set to 9.
  3. The plugin determines that there is a cache hit, but the priority of the existing message is lower.
  4. Drop the existing priority 7 message.
  5. Publish the new message at priority 9.

Example 2 - Decreasing priority when "duplicate priority decrease" enabled:

  1. A message with deduplication header "123" is present on Queue A, priority 7.
  2. A second message with deduplication header "123" is published to Queue A, with priority set to 2.
  3. The plugin determines that there is a cache hit, but the priority of the existing message is higher.
  4. Drop the existing priority 7 message.
  5. Publish the new message at priority 2.

Example 3 - Ignoring priority increase when "duplicate priority increase" is disabled:

  1. A message with deduplication header "123" is present on Queue A, priority 7.
  2. A second message with deduplication header "123" is published to Queue A, with priority set to 9.
  3. The plugin determines that there is a cache hit, but the priority of the existing message is lower.
  4. The incoming message is dropped.

Example 4 - Ignoring priority decrease when "duplicate priority decrease" is disabled:

  1. A message with deduplication header "123" is present on Queue A, priority 7.
  2. A second message with deduplication header "123" is published to Queue A, with priority set to 2.
  3. The plugin determines that there is a cache hit, but the priority of the existing message is higher.
  4. The incoming message is dropped.
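
The four examples above boil down to one decision on a cache hit. Below is a minimal sketch of that decision, assuming two independent options; the names `resolve_duplicate`, `increase_enabled` and `decrease_enabled` are hypothetical and not part of the plugin.

```python
from enum import Enum


class Action(Enum):
    DROP_INCOMING = "drop_incoming"        # keep the message already queued
    REPLACE_EXISTING = "replace_existing"  # drop the queued one, publish the new one


def resolve_duplicate(existing_priority, incoming_priority,
                      increase_enabled=False, decrease_enabled=False):
    """Decide what to do when the deduplication header is already cached.

    Hypothetical helper: the option names are assumptions for illustration.
    """
    if incoming_priority > existing_priority and increase_enabled:
        return Action.REPLACE_EXISTING   # Example 1
    if incoming_priority < existing_priority and decrease_enabled:
        return Action.REPLACE_EXISTING   # Example 2
    # Same priority, or the relevant option is disabled (Examples 3 and 4):
    # fall back to the current behaviour and discard the incoming message.
    return Action.DROP_INCOMING
```

With both options left at their defaults, this reduces to the plugin's current behaviour of dropping the incoming duplicate.
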
@noxdafox
Owner

noxdafox commented Apr 13, 2025

Hello,

Apologies for the late reply.

The behaviour you are requesting cannot be easily implemented. RabbitMQ queues do not support the removal of messages which are not at the head or tail of the queues. This is the reason they are called queues and not, for example, lists or streams.

The queue deduplication logic works by hooking into the queue behaviour before the actual implementation is executed. In other words, when a message is about to be published into the queue, we check whether its deduplication header is already in the deduplication cache. If so, we discard the message; otherwise, we add its deduplication header to the cache. When a message is consumed and acknowledged, its deduplication header is removed from the cache.
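
As a rough illustration of that flow, here is a toy Python model. It is not the plugin's code (the class and method names are made up for the example); it only mirrors the three steps described above: check on publish, add on miss, remove on acknowledgement.

```python
from collections import deque


class DedupQueue:
    """Toy model of a queue fronted by a deduplication cache (hypothetical)."""

    def __init__(self):
        self._messages = deque()  # stand-in for the real backing queue
        self._cache = set()       # headers of messages not yet acknowledged

    def publish(self, dedup_header, body):
        # The check runs before the message reaches the underlying queue.
        if dedup_header in self._cache:
            return False          # duplicate: discard the incoming message
        self._cache.add(dedup_header)
        self._messages.append((dedup_header, body))
        return True

    def consume(self):
        # Deliver the head of the queue; the header stays cached until ack.
        return self._messages.popleft()

    def ack(self, dedup_header):
        # Only after acknowledgement can the same header be published again.
        self._cache.discard(dedup_header)
```

Note that the cache lookup never touches the queued messages themselves, which is why the queue never needs to be scanned.
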

This makes it possible to implement the deduplication logic on top of the queue, and it is quite efficient, as we do not need to scan the entire queue to determine whether the message is already present.

The drawback of this implementation is that we are bound to the rabbit_backing_queue behaviour, which is only used for classic and priority queues. This is why we cannot support streams or quorum queues (so far).

It would be very inefficient to do what you are suggesting, as the way the messages are stored internally (in memory or on disk) is opaque. This lets the broker implement the most performant storage mechanisms, but the consequence is that we cannot fetch a given message at will.
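
To make the cost concrete, here is a deliberately simplified sketch. It assumes a hypothetical queue that only exposes publish-at-tail and fetch-from-head, and it does not model real priority ordering or RabbitMQ's storage. Under that assumption, changing one message means draining and republishing every message.

```python
from collections import deque


class OpaqueQueue:
    """Hypothetical queue: publish at the tail, fetch from the head, nothing else."""

    def __init__(self):
        self._items = deque()

    def publish(self, header, priority):
        self._items.append((header, priority))

    def fetch(self):
        return self._items.popleft()

    def __len__(self):
        return len(self._items)


def replace_priority(queue, header, new_priority):
    """Change one message's priority by cycling the whole queue once.

    Returns how many messages had to be fetched and republished.
    """
    touched = 0
    for _ in range(len(queue)):
        h, p = queue.fetch()
        touched += 1
        # Every message is republished; only the matching one is altered.
        queue.publish(h, new_priority if h == header else p)
    return touched
```

In this model the work grows with the queue length for every duplicate that changes priority, whereas the cache-based check stays constant-time.
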
