Skip to content

Commit 7522389

Browse files
committed
ARTEMIS-5376 Include all messages in queue management operations
1 parent ef999f7 commit 7522389

File tree

3 files changed

+238
-90
lines changed

3 files changed

+238
-90
lines changed

artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java

+76-89
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
4444
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
4545
import java.util.concurrent.locks.ReentrantLock;
46+
import java.util.function.Predicate;
4647

4748
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
4849
import org.apache.activemq.artemis.api.core.ActiveMQException;
@@ -2006,6 +2007,11 @@ private int iterQueue(final int flushLimit,
20062007
QueueIterateAction messageAction) throws Exception {
20072008
int count = 0;
20082009
int txCount = 0;
2010+
2011+
if (filter1 != null) {
2012+
messageAction.addFilter(filter1);
2013+
}
2014+
20092015
// This is to avoid scheduling depaging while iterQueue is happening
20102016
// this should minimize the use of the paged executor.
20112017
depagePending = true;
@@ -2024,7 +2030,7 @@ private int iterQueue(final int flushLimit,
20242030
while (iter.hasNext() && !messageAction.expectedHitsReached(count)) {
20252031
MessageReference ref = iter.next();
20262032

2027-
if (filter1 == null || filter1.match(ref.getMessage())) {
2033+
if (messageAction.match(ref)) {
20282034
if (messageAction.actMessage(tx, ref)) {
20292035
iter.remove();
20302036
refRemoved(ref);
@@ -2046,7 +2052,7 @@ private int iterQueue(final int flushLimit,
20462052
return count;
20472053
}
20482054

2049-
List<MessageReference> cancelled = scheduledDeliveryHandler.cancel(ref -> filter1 == null ? true : filter1.match(ref.getMessage()));
2055+
List<MessageReference> cancelled = scheduledDeliveryHandler.cancel(ref -> messageAction.match(ref));
20502056
for (MessageReference messageReference : cancelled) {
20512057
messageAction.actMessage(tx, messageReference);
20522058
count++;
@@ -2069,12 +2075,12 @@ private int iterQueue(final int flushLimit,
20692075
PagedReference reference = pageIterator.next();
20702076
pageIterator.remove();
20712077

2072-
if (filter1 == null || filter1.match(reference.getMessage())) {
2073-
count++;
2074-
txCount++;
2078+
if (messageAction.match(reference)) {
20752079
if (!messageAction.actMessage(tx, reference)) {
20762080
addTail(reference, false);
20772081
}
2082+
txCount++;
2083+
count++;
20782084
} else {
20792085
addTail(reference, false);
20802086
}
@@ -2393,43 +2399,27 @@ public void run() {
23932399

23942400
@Override
23952401
public synchronized boolean sendMessageToDeadLetterAddress(final long messageID) throws Exception {
2396-
try (LinkedListIterator<MessageReference> iter = iterator()) {
2397-
while (iter.hasNext()) {
2398-
MessageReference ref = iter.next();
2399-
if (ref.getMessage().getMessageID() == messageID) {
2400-
incDelivering(ref);
2401-
sendToDeadLetterAddress(null, ref);
2402-
iter.remove();
2403-
refRemoved(ref);
2404-
return true;
2405-
}
2406-
}
2407-
if (pageIterator != null && !queueDestroyed) {
2408-
while (pageIterator.hasNext()) {
2409-
PagedReference ref = pageIterator.next();
2410-
if (ref.getMessage().getMessageID() == messageID) {
2411-
incDelivering(ref);
2412-
sendToDeadLetterAddress(null, ref);
2413-
pageIterator.remove();
2414-
refRemoved(ref);
2415-
return true;
2416-
}
2417-
}
2402+
2403+
return iterQueue(DEFAULT_FLUSH_LIMIT, null, new QueueIterateAction(messageID) {
2404+
2405+
@Override
2406+
public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
2407+
incDelivering(ref);
2408+
sendToDeadLetterAddress(tx, ref);
2409+
return true;
24182410
}
2419-
return false;
2420-
}
2411+
}) == 1;
24212412
}
24222413

24232414
@Override
24242415
public synchronized int sendMessagesToDeadLetterAddress(Filter filter) throws Exception {
2425-
24262416
return iterQueue(DEFAULT_FLUSH_LIMIT, filter, new QueueIterateAction() {
24272417

24282418
@Override
24292419
public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
2430-
24312420
incDelivering(ref);
2432-
return sendToDeadLetterAddress(tx, ref);
2421+
sendToDeadLetterAddress(tx, ref);
2422+
return true;
24332423
}
24342424
});
24352425
}
@@ -2439,24 +2429,17 @@ public synchronized boolean moveReference(final long messageID,
24392429
final SimpleString toAddress,
24402430
final Binding binding,
24412431
final boolean rejectDuplicate) throws Exception {
2442-
try (LinkedListIterator<MessageReference> iter = iterator()) {
2443-
while (iter.hasNext()) {
2444-
MessageReference ref = iter.next();
2445-
if (ref.getMessage().getMessageID() == messageID) {
2446-
iter.remove();
2447-
refRemoved(ref);
2448-
incDelivering(ref);
2449-
try {
2450-
move(null, toAddress, binding, ref, rejectDuplicate, AckReason.NORMAL, null, null, true);
2451-
} catch (Exception e) {
2452-
decDelivering(ref);
2453-
throw e;
2454-
}
2455-
return true;
2456-
}
2432+
2433+
return iterQueue(DEFAULT_FLUSH_LIMIT, null, new QueueIterateAction(messageID) {
2434+
2435+
@Override
2436+
public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
2437+
incDelivering(ref);
2438+
move(tx, toAddress, binding, ref, rejectDuplicate, AckReason.NORMAL, null, null, true);
2439+
return true;
24572440
}
2458-
return false;
2459-
}
2441+
2442+
}) == 1;
24602443
}
24612444

24622445
@Override
@@ -2502,7 +2485,7 @@ public boolean actMessage(Transaction tx, MessageReference ref) throws Exception
25022485
}
25032486

25042487
if (!ignored) {
2505-
move(null, toAddress, binding, ref, rejectDuplicates, AckReason.NORMAL, null, null, true);
2488+
move(tx, toAddress, binding, ref, rejectDuplicates, AckReason.NORMAL, null, null, true);
25062489
}
25072490

25082491
return true;
@@ -2523,20 +2506,16 @@ public boolean actMessage(Transaction tx, MessageReference ref) throws Exception
25232506
public synchronized boolean copyReference(final long messageID,
25242507
final SimpleString toQueue,
25252508
final Binding binding) throws Exception {
2526-
try (LinkedListIterator<MessageReference> iter = iterator()) {
2527-
while (iter.hasNext()) {
2528-
MessageReference ref = iter.next();
2529-
if (ref.getMessage().getMessageID() == messageID) {
2530-
try {
2531-
copy(null, toQueue, binding, ref);
2532-
} catch (Exception e) {
2533-
throw e;
2534-
}
2535-
return true;
2536-
}
2509+
2510+
return iterQueue(DEFAULT_FLUSH_LIMIT, null, new QueueIterateAction(messageID) {
2511+
2512+
@Override
2513+
public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
2514+
copy(null, toQueue, binding, ref);
2515+
addTail(ref, false);
2516+
return true;
25372517
}
2538-
return false;
2539-
}
2518+
}) == 1;
25402519
}
25412520

25422521
public synchronized int rerouteMessages(final SimpleString queueName, final Filter filter) throws Exception {
@@ -2609,39 +2588,28 @@ public boolean actMessage(Transaction tx, MessageReference ref) throws Exception
26092588

26102589
@Override
26112590
public synchronized boolean changeReferencePriority(final long messageID, final byte newPriority) throws Exception {
2612-
try (LinkedListIterator<MessageReference> iter = iterator()) {
2591+
return iterQueue(DEFAULT_FLUSH_LIMIT, null, new QueueIterateAction(messageID) {
26132592

2614-
while (iter.hasNext()) {
2615-
MessageReference ref = iter.next();
2616-
if (ref.getMessage().getMessageID() == messageID) {
2617-
iter.remove();
2618-
refRemoved(ref);
2619-
ref.getMessage().setPriority(newPriority);
2620-
addTail(ref, false);
2621-
return true;
2622-
}
2593+
@Override
2594+
public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
2595+
ref.getMessage().setPriority(newPriority);
2596+
addTail(ref, false);
2597+
return true;
26232598
}
2624-
2625-
return false;
2626-
}
2599+
}) == 1;
26272600
}
26282601

26292602
@Override
26302603
public synchronized int changeReferencesPriority(final Filter filter, final byte newPriority) throws Exception {
2631-
try (LinkedListIterator<MessageReference> iter = iterator()) {
2632-
int count = 0;
2633-
while (iter.hasNext()) {
2634-
MessageReference ref = iter.next();
2635-
if (filter == null || filter.match(ref.getMessage())) {
2636-
count++;
2637-
iter.remove();
2638-
refRemoved(ref);
2639-
ref.getMessage().setPriority(newPriority);
2640-
addTail(ref, false);
2641-
}
2604+
return iterQueue(DEFAULT_FLUSH_LIMIT, filter, new QueueIterateAction() {
2605+
2606+
@Override
2607+
public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
2608+
ref.getMessage().setPriority(newPriority);
2609+
addTail(ref, false);
2610+
return true;
26422611
}
2643-
return count;
2644-
}
2612+
});
26452613
}
26462614

26472615
@Override
@@ -4177,13 +4145,23 @@ public void run() {
41774145
abstract class QueueIterateAction {
41784146

41794147
protected Integer expectedHits;
4148+
protected Long messageID;
4149+
protected Filter filter1 = null;
4150+
protected Predicate<MessageReference> match;
41804151

41814152
QueueIterateAction(Integer expectedHits) {
41824153
this.expectedHits = expectedHits;
4154+
this.match = ref -> filter1 == null ? true : filter1.match(ref.getMessage());
4155+
}
4156+
4157+
QueueIterateAction(Long messageID) {
4158+
this.expectedHits = 1;
4159+
this.match = ref -> ref.getMessage().getMessageID() == messageID;
41834160
}
41844161

41854162
QueueIterateAction() {
41864163
this.expectedHits = null;
4164+
this.match = ref -> filter1 == null ? true : filter1.match(ref.getMessage());
41874165
}
41884166

41894167
/**
@@ -4198,6 +4176,15 @@ abstract class QueueIterateAction {
41984176
public boolean expectedHitsReached(int currentHits) {
41994177
return expectedHits != null && currentHits >= expectedHits.intValue();
42004178
}
4179+
4180+
public void addFilter(Filter filter1) {
4181+
this.filter1 = filter1;
4182+
}
4183+
4184+
public boolean match(MessageReference ref) {
4185+
return match.test(ref);
4186+
}
4187+
42014188
}
42024189

42034190
// For external use we need to use a synchronized version since the list is not thread safe

0 commit comments

Comments
 (0)