Skip to content

Commit 3d02fcd

Browse files
committed
ARTEMIS-4455 - Improve message redistribution balance for OFF_WITH_REDISTRIBUTION
1 parent 42be518 commit 3d02fcd

File tree

2 files changed

+39
-0
lines changed

2 files changed

+39
-0
lines changed

artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java

+5
Original file line numberDiff line numberDiff line change
@@ -470,6 +470,11 @@ private Binding getNextBinding(final Message message,
470470
// if no bindings were found, we will apply a secondary level on the routing logic
471471
if (lastLowPriorityBinding != -1) {
472472
nextBinding = bindings[lastLowPriorityBinding];
473+
if (nextBinding != null && loadBalancingType.equals(MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION)) {
474+
//return before changing bindingIndex, otherwise every incoming message sets the index to the same position.
475+
//bindingIndex is shared with the redistributor
476+
return nextBinding;
477+
}
473478
nextPosition = moveNextPosition(lastLowPriorityBinding, bindingsCount);
474479
}
475480
}

tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java

+34
Original file line numberDiff line numberDiff line change
@@ -1011,6 +1011,40 @@ public void testRedistributionToRemoteConsumerFromNewQueueLbOffWithRedistributio
10111011

10121012
}
10131013

1014+
@Test
1015+
public void testEvenRedistributionLbOffWithRedistribution() throws Exception {
1016+
final int messageCount = 1000;
1017+
final String queue = "queues.test";
1018+
1019+
setupCluster(MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION);
1020+
startServers(0, 1, 2);
1021+
1022+
setupSessionFactory(0, isNetty());
1023+
setupSessionFactory(1, isNetty());
1024+
setupSessionFactory(2, isNetty());
1025+
1026+
createQueue(0, queue, queue, null, false, RoutingType.ANYCAST);
1027+
createQueue(1, queue, queue, null, false, RoutingType.ANYCAST);
1028+
createQueue(2, queue, queue, null, false, RoutingType.ANYCAST);
1029+
1030+
addConsumer(0, 1, queue, null);
1031+
addConsumer(1, 2, queue, null);
1032+
1033+
waitForBindings(0, queue, 1, 0, true);
1034+
waitForBindings(1, queue, 1, 1, true);
1035+
waitForBindings(2, queue, 1, 1, true);
1036+
1037+
waitForBindings(0, queue, 2, 2, false);
1038+
waitForBindings(1, queue, 2, 1, false);
1039+
waitForBindings(2, queue, 2, 1, false);
1040+
1041+
send(0, queue, messageCount * 2, false, null);
1042+
1043+
Wait.assertEquals(0L, () -> servers[0].getTotalMessageCount(), 5000, 100);
1044+
Assert.assertEquals(messageCount, servers[1].getTotalMessageCount());
1045+
Assert.assertEquals(messageCount, servers[2].getTotalMessageCount());
1046+
}
1047+
10141048
@Test
10151049
public void testRedistributionToRemoteMulticastConsumerLbOffWithRedistribution() throws Exception {
10161050

0 commit comments

Comments
 (0)