Skip to content

Commit 3cbc925

Browse files
committed
ARTEMIS-5376 Include all messages in queue management operations
1 parent e8d38f4 commit 3cbc925

File tree

4 files changed

+254
-98
lines changed

4 files changed

+254
-98
lines changed

artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java

+88-93
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
4444
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
4545
import java.util.concurrent.locks.ReentrantLock;
46+
import java.util.function.Predicate;
4647

4748
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
4849
import org.apache.activemq.artemis.api.core.ActiveMQException;
@@ -2015,6 +2016,11 @@ private int iterQueue(final int flushLimit,
20152016
QueueIterateAction messageAction) throws Exception {
20162017
int count = 0;
20172018
int txCount = 0;
2019+
2020+
if (filter1 != null) {
2021+
messageAction.addFilter(filter1);
2022+
}
2023+
20182024
// This is to avoid scheduling depaging while iterQueue is happening
20192025
// this should minimize the use of the paged executor.
20202026
depagePending = true;
@@ -2033,10 +2039,12 @@ private int iterQueue(final int flushLimit,
20332039
while (iter.hasNext() && !messageAction.expectedHitsReached(count)) {
20342040
MessageReference ref = iter.next();
20352041

2036-
if (filter1 == null || filter1.match(ref.getMessage())) {
2042+
if (messageAction.match(ref)) {
20372043
if (messageAction.actMessage(tx, ref)) {
20382044
iter.remove();
2039-
refRemoved(ref);
2045+
if (!isLastValue()) {
2046+
refRemoved(ref);
2047+
}
20402048
}
20412049
txCount++;
20422050
count++;
@@ -2055,7 +2063,7 @@ private int iterQueue(final int flushLimit,
20552063
return count;
20562064
}
20572065

2058-
List<MessageReference> cancelled = scheduledDeliveryHandler.cancel(ref -> filter1 == null ? true : filter1.match(ref.getMessage()));
2066+
List<MessageReference> cancelled = scheduledDeliveryHandler.cancel(messageAction::match);
20592067
for (MessageReference messageReference : cancelled) {
20602068
messageAction.actMessage(tx, messageReference);
20612069
count++;
@@ -2078,12 +2086,12 @@ private int iterQueue(final int flushLimit,
20782086
PagedReference reference = pageIterator.next();
20792087
pageIterator.remove();
20802088

2081-
if (filter1 == null || filter1.match(reference.getMessage())) {
2082-
count++;
2083-
txCount++;
2089+
if (messageAction.match(reference)) {
20842090
if (!messageAction.actMessage(tx, reference)) {
20852091
addTail(reference, false);
20862092
}
2093+
txCount++;
2094+
count++;
20872095
} else {
20882096
addTail(reference, false);
20892097
}
@@ -2401,71 +2409,48 @@ public void run() {
24012409
}
24022410

24032411
@Override
2404-
public synchronized boolean sendMessageToDeadLetterAddress(final long messageID) throws Exception {
2405-
try (LinkedListIterator<MessageReference> iter = iterator()) {
2406-
while (iter.hasNext()) {
2407-
MessageReference ref = iter.next();
2408-
if (ref.getMessage().getMessageID() == messageID) {
2409-
incDelivering(ref);
2410-
sendToDeadLetterAddress(null, ref);
2411-
iter.remove();
2412-
refRemoved(ref);
2413-
return true;
2414-
}
2415-
}
2416-
if (pageIterator != null && !queueDestroyed) {
2417-
while (pageIterator.hasNext()) {
2418-
PagedReference ref = pageIterator.next();
2419-
if (ref.getMessage().getMessageID() == messageID) {
2420-
incDelivering(ref);
2421-
sendToDeadLetterAddress(null, ref);
2422-
pageIterator.remove();
2423-
refRemoved(ref);
2424-
return true;
2425-
}
2426-
}
2412+
public boolean sendMessageToDeadLetterAddress(final long messageID) throws Exception {
2413+
2414+
return iterQueue(DEFAULT_FLUSH_LIMIT, null, new QueueIterateAction(messageID) {
2415+
2416+
@Override
2417+
public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
2418+
incDelivering(ref);
2419+
sendToDeadLetterAddress(tx, ref);
2420+
return true;
24272421
}
2428-
return false;
2429-
}
2422+
}) == 1;
24302423
}
24312424

24322425
@Override
2433-
public synchronized int sendMessagesToDeadLetterAddress(Filter filter) throws Exception {
2434-
2426+
public int sendMessagesToDeadLetterAddress(Filter filter) throws Exception {
24352427
return iterQueue(DEFAULT_FLUSH_LIMIT, filter, new QueueIterateAction() {
24362428

24372429
@Override
24382430
public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
2439-
24402431
incDelivering(ref);
2441-
return sendToDeadLetterAddress(tx, ref);
2432+
sendToDeadLetterAddress(tx, ref);
2433+
return true;
24422434
}
24432435
});
24442436
}
24452437

24462438
@Override
2447-
public synchronized boolean moveReference(final long messageID,
2439+
public boolean moveReference(final long messageID,
24482440
final SimpleString toAddress,
24492441
final Binding binding,
24502442
final boolean rejectDuplicate) throws Exception {
2451-
try (LinkedListIterator<MessageReference> iter = iterator()) {
2452-
while (iter.hasNext()) {
2453-
MessageReference ref = iter.next();
2454-
if (ref.getMessage().getMessageID() == messageID) {
2455-
iter.remove();
2456-
refRemoved(ref);
2457-
incDelivering(ref);
2458-
try {
2459-
move(null, toAddress, binding, ref, rejectDuplicate, AckReason.NORMAL, null, null, true);
2460-
} catch (Exception e) {
2461-
decDelivering(ref);
2462-
throw e;
2463-
}
2464-
return true;
2465-
}
2443+
2444+
return iterQueue(DEFAULT_FLUSH_LIMIT, null, new QueueIterateAction(messageID) {
2445+
2446+
@Override
2447+
public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
2448+
incDelivering(ref);
2449+
move(tx, toAddress, binding, ref, rejectDuplicate, AckReason.NORMAL, null, null, true);
2450+
return true;
24662451
}
2467-
return false;
2468-
}
2452+
2453+
}) == 1;
24692454
}
24702455

24712456
@Override
@@ -2511,7 +2496,7 @@ public boolean actMessage(Transaction tx, MessageReference ref) throws Exception
25112496
}
25122497

25132498
if (!ignored) {
2514-
move(null, toAddress, binding, ref, rejectDuplicates, AckReason.NORMAL, null, null, true);
2499+
move(tx, toAddress, binding, ref, rejectDuplicates, AckReason.NORMAL, null, null, true);
25152500
}
25162501

25172502
return true;
@@ -2529,26 +2514,22 @@ public boolean actMessage(Transaction tx, MessageReference ref) throws Exception
25292514
}
25302515

25312516
@Override
2532-
public synchronized boolean copyReference(final long messageID,
2517+
public boolean copyReference(final long messageID,
25332518
final SimpleString toQueue,
25342519
final Binding binding) throws Exception {
2535-
try (LinkedListIterator<MessageReference> iter = iterator()) {
2536-
while (iter.hasNext()) {
2537-
MessageReference ref = iter.next();
2538-
if (ref.getMessage().getMessageID() == messageID) {
2539-
try {
2540-
copy(null, toQueue, binding, ref);
2541-
} catch (Exception e) {
2542-
throw e;
2543-
}
2544-
return true;
2545-
}
2520+
2521+
return iterQueue(DEFAULT_FLUSH_LIMIT, null, new QueueIterateAction(messageID) {
2522+
2523+
@Override
2524+
public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
2525+
copy(tx, toQueue, binding, ref);
2526+
addTail(ref, false);
2527+
return true;
25462528
}
2547-
return false;
2548-
}
2529+
}) == 1;
25492530
}
25502531

2551-
public synchronized int rerouteMessages(final SimpleString queueName, final Filter filter) throws Exception {
2532+
public int rerouteMessages(final SimpleString queueName, final Filter filter) throws Exception {
25522533
return iterQueue(DEFAULT_FLUSH_LIMIT, filter, new QueueIterateAction() {
25532534
@Override
25542535
public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
@@ -2617,40 +2598,35 @@ public boolean actMessage(Transaction tx, MessageReference ref) throws Exception
26172598
}
26182599

26192600
@Override
2620-
public synchronized boolean changeReferencePriority(final long messageID, final byte newPriority) throws Exception {
2621-
try (LinkedListIterator<MessageReference> iter = iterator()) {
2601+
public boolean changeReferencePriority(final long messageID, final byte newPriority) throws Exception {
2602+
return iterQueue(DEFAULT_FLUSH_LIMIT, null, new QueueIterateAction(messageID) {
26222603

2623-
while (iter.hasNext()) {
2624-
MessageReference ref = iter.next();
2625-
if (ref.getMessage().getMessageID() == messageID) {
2626-
iter.remove();
2604+
@Override
2605+
public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
2606+
ref.getMessage().setPriority(newPriority);
2607+
if (isLastValue()) {
26272608
refRemoved(ref);
2628-
ref.getMessage().setPriority(newPriority);
2629-
addTail(ref, false);
2630-
return true;
26312609
}
2610+
addTail(ref, false);
2611+
return true;
26322612
}
2633-
2634-
return false;
2635-
}
2613+
}) == 1;
26362614
}
26372615

26382616
@Override
2639-
public synchronized int changeReferencesPriority(final Filter filter, final byte newPriority) throws Exception {
2640-
try (LinkedListIterator<MessageReference> iter = iterator()) {
2641-
int count = 0;
2642-
while (iter.hasNext()) {
2643-
MessageReference ref = iter.next();
2644-
if (filter == null || filter.match(ref.getMessage())) {
2645-
count++;
2646-
iter.remove();
2617+
public int changeReferencesPriority(final Filter filter, final byte newPriority) throws Exception {
2618+
return iterQueue(DEFAULT_FLUSH_LIMIT, filter, new QueueIterateAction() {
2619+
2620+
@Override
2621+
public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
2622+
ref.getMessage().setPriority(newPriority);
2623+
if (isLastValue()) {
26472624
refRemoved(ref);
2648-
ref.getMessage().setPriority(newPriority);
2649-
addTail(ref, false);
26502625
}
2626+
addTail(ref, false);
2627+
return true;
26512628
}
2652-
return count;
2653-
}
2629+
});
26542630
}
26552631

26562632
@Override
@@ -4186,13 +4162,23 @@ public void run() {
41864162
abstract class QueueIterateAction {
41874163

41884164
protected Integer expectedHits;
4165+
protected Long messageID;
4166+
protected Filter filter1 = null;
4167+
protected Predicate<MessageReference> match;
41894168

41904169
QueueIterateAction(Integer expectedHits) {
41914170
this.expectedHits = expectedHits;
4171+
this.match = ref -> filter1 == null ? true : filter1.match(ref.getMessage());
4172+
}
4173+
4174+
QueueIterateAction(Long messageID) {
4175+
this.expectedHits = 1;
4176+
this.match = ref -> ref.getMessage().getMessageID() == messageID;
41924177
}
41934178

41944179
QueueIterateAction() {
41954180
this.expectedHits = null;
4181+
this.match = ref -> filter1 == null ? true : filter1.match(ref.getMessage());
41964182
}
41974183

41984184
/**
@@ -4207,6 +4193,15 @@ abstract class QueueIterateAction {
42074193
public boolean expectedHitsReached(int currentHits) {
42084194
return expectedHits != null && currentHits >= expectedHits.intValue();
42094195
}
4196+
4197+
public void addFilter(Filter filter1) {
4198+
this.filter1 = filter1;
4199+
}
4200+
4201+
public boolean match(MessageReference ref) {
4202+
return match.test(ref);
4203+
}
4204+
42104205
}
42114206

42124207
// For external use we need to use a synchronized version since the list is not thread safe

tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementWithPagingServerTest.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -230,15 +230,15 @@ public void testCopyMessageWhilstPaging() throws Exception {
230230

231231
long messageID = (Long) messages[99].get("messageID");
232232

233-
assertFalse(queueControl.copyMessage(messageID, otherQueue.toString()));
233+
assertTrue(queueControl.copyMessage(messageID, otherQueue.toString()));
234234

235235
messageID = (Long) messages[0].get("messageID");
236236

237237
assertTrue(queueControl.copyMessage(messageID, otherQueue.toString()));
238238

239239
Map<String, Object>[] copiedMessages = otherQueueControl.listMessages(null);
240240

241-
assertEquals(1, copiedMessages.length);
241+
assertEquals(2, copiedMessages.length);
242242
}
243243

244244
@Test
@@ -281,8 +281,8 @@ public void testCopyMessageWhilstPagingSameAddress() throws Exception {
281281

282282
messageID = (Long) otherMessages[100].get("messageID");
283283

284-
//this should fail as the message was paged successfully
285-
assertFalse(otherQueueControl.copyMessage(messageID, queue.toString()));
284+
//this verifies copying of a paged message
285+
assertTrue(otherQueueControl.copyMessage(messageID, queue.toString()));
286286
}
287287

288288
@Test

0 commit comments

Comments
 (0)