Skip to content

Commit 067eaa0

Browse files
committed
ARTEMIS-4455 - Improve message redistribution balance for OFF_WITH_REDISTRIBUTION
1 parent 42be518 commit 067eaa0

File tree

3 files changed

+62
-2
lines changed

3 files changed

+62
-2
lines changed

artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -257,7 +257,7 @@ public Message redistribute(final Message message,
257257

258258
final int bindingsCount = bindings.length;
259259

260-
int nextPosition = bindingIndex.getIndex();
260+
int nextPosition = bindingIndex.getRedistributorIndex();
261261

262262
if (nextPosition >= bindingsCount) {
263263
nextPosition = 0;
@@ -294,7 +294,7 @@ public Message redistribute(final Message message,
294294
context.setTransaction(new TransactionImpl(storageManager));
295295
}
296296

297-
bindingIndex.setIndex(nextPosition);
297+
bindingIndex.setRedistributorIndex(nextPosition);
298298
nextBinding.route(copyRedistribute, context);
299299
logger.debug("Redistribution successful on message={}, towards bindings={}", message, bindings);
300300
return copyRedistribute;

artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/CopyOnWriteBindings.java

+26
Original file line numberDiff line numberDiff line change
@@ -42,35 +42,61 @@ public interface BindingIndex {
4242
*/
4343
int getIndex();
4444

45+
/**
46+
* Cannot return a negative value and returns {@code 0} if uninitialized.
47+
*/
48+
int getRedistributorIndex();
49+
4550
/**
4651
* Cannot set a negative value.
4752
*/
4853
void setIndex(int v);
54+
55+
/**
56+
* Cannot set a negative value.
57+
*/
58+
void setRedistributorIndex(int v);
4959
}
5060

5161
private static final class BindingsAndPosition extends AtomicReference<Binding[]> implements BindingIndex {
5262

5363
private static final AtomicIntegerFieldUpdater<BindingsAndPosition> NEXT_POSITION_UPDATER = AtomicIntegerFieldUpdater.newUpdater(BindingsAndPosition.class, "nextPosition");
64+
private static final AtomicIntegerFieldUpdater<BindingsAndPosition> NEXT_REDISTRIBUTION_POSITION_UPDATER = AtomicIntegerFieldUpdater.newUpdater(BindingsAndPosition.class, "nextRedistributionPosition");
5465

5566
public volatile int nextPosition;
67+
public volatile int nextRedistributionPosition;
5668

5769
BindingsAndPosition(Binding[] bindings) {
5870
super(bindings);
5971
NEXT_POSITION_UPDATER.lazySet(this, 0);
72+
NEXT_REDISTRIBUTION_POSITION_UPDATER.lazySet(this, 0);
6073
}
6174

6275
@Override
6376
public int getIndex() {
6477
return nextPosition;
6578
}
6679

80+
@Override
81+
public int getRedistributorIndex() {
82+
return nextRedistributionPosition;
83+
}
84+
6785
@Override
6886
public void setIndex(int v) {
6987
if (v < 0) {
7088
throw new IllegalArgumentException("cannot set a negative position");
7189
}
7290
NEXT_POSITION_UPDATER.lazySet(this, v);
7391
}
92+
93+
@Override
94+
public void setRedistributorIndex(int v) {
95+
if (v < 0) {
96+
throw new IllegalArgumentException("cannot set a negative position");
97+
}
98+
NEXT_REDISTRIBUTION_POSITION_UPDATER.lazySet(this, v);
99+
}
74100
}
75101

76102
private final ConcurrentHashMap<SimpleString, BindingsAndPosition> map;

tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java

+34
Original file line numberDiff line numberDiff line change
@@ -1011,6 +1011,40 @@ public void testRedistributionToRemoteConsumerFromNewQueueLbOffWithRedistributio
10111011

10121012
}
10131013

1014+
@Test
1015+
public void testEvenRedistributionLbOffWithRedistribution() throws Exception {
1016+
final int messageCount = 1000;
1017+
final String queue = "queues.test";
1018+
1019+
setupCluster(MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION);
1020+
startServers(0, 1, 2);
1021+
1022+
setupSessionFactory(0, isNetty());
1023+
setupSessionFactory(1, isNetty());
1024+
setupSessionFactory(2, isNetty());
1025+
1026+
createQueue(0, queue, queue, null, false, RoutingType.ANYCAST);
1027+
createQueue(1, queue, queue, null, false, RoutingType.ANYCAST);
1028+
createQueue(2, queue, queue, null, false, RoutingType.ANYCAST);
1029+
1030+
addConsumer(0, 1, queue, null);
1031+
addConsumer(1, 2, queue, null);
1032+
1033+
waitForBindings(0, queue, 1, 0, true);
1034+
waitForBindings(1, queue, 1, 1, true);
1035+
waitForBindings(2, queue, 1, 1, true);
1036+
1037+
waitForBindings(0, queue, 2, 2, false);
1038+
waitForBindings(1, queue, 2, 1, false);
1039+
waitForBindings(2, queue, 2, 1, false);
1040+
1041+
send(0, queue, messageCount * 2, false, null);
1042+
1043+
Wait.assertEquals(0L, () -> servers[0].getTotalMessageCount(), 5000, 100);
1044+
Assert.assertEquals(messageCount, servers[1].getTotalMessageCount());
1045+
Assert.assertEquals(messageCount, servers[2].getTotalMessageCount());
1046+
}
1047+
10141048
@Test
10151049
public void testRedistributionToRemoteMulticastConsumerLbOffWithRedistribution() throws Exception {
10161050

0 commit comments

Comments
 (0)