Skip to content

Commit a37f602

Browse files
committed
ARTEMIS-5037: option to limit mirror propagation
Add a new option in the Mirror setting to prevent a broker from propagating messages. When working with a topology where 4 nodes form a square and each node mirrors to its two neighbors, a message leaving a corner can reach the opposite corner of the square by two different routes. This is causing the ordering of message to get broken. example: 1 <-> 2 ^ ^ | | v v 4 <-> 3 A message from a will reach 3 by 2 and 4. Message duplication checks will prevent the message from being duplicated but won't help regarding the order of the messages. Using the new option to not forward messages coming from a link, we break the possibilities to have two routes to reach the opposite corner. On the example above, we ask 4 to not forward any messages coming from 1 if 1 is the primary sender of the messages. This means that now, a message sent to 1 will reach 3 only by 2 and not by 4.
1 parent daba842 commit a37f602

File tree

5 files changed

+298
-9
lines changed

5 files changed

+298
-9
lines changed

artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java

+24-6
Original file line numberDiff line numberDiff line change
@@ -432,11 +432,29 @@ private void doConnect() {
432432
final boolean coreTunnelingEnabled = isCoreMessageTunnelingEnabled(replica);
433433
final Symbol[] desiredCapabilities;
434434

435-
if (coreTunnelingEnabled) {
436-
desiredCapabilities = new Symbol[] {AMQPMirrorControllerSource.MIRROR_CAPABILITY,
437-
AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT};
438-
} else {
439-
desiredCapabilities = new Symbol[] {AMQPMirrorControllerSource.MIRROR_CAPABILITY};
435+
if (!replica.isForward()){
436+
if (coreTunnelingEnabled) {
437+
desiredCapabilities = new Symbol[] {
438+
AMQPMirrorControllerSource.MIRROR_CAPABILITY,
439+
AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT,
440+
AMQPMirrorControllerSource.NO_FORWARD
441+
};
442+
} else {
443+
desiredCapabilities = new Symbol[] {
444+
AMQPMirrorControllerSource.MIRROR_CAPABILITY,
445+
AMQPMirrorControllerSource.NO_FORWARD
446+
};
447+
}
448+
}else {
449+
if (coreTunnelingEnabled) {
450+
desiredCapabilities = new Symbol[] {
451+
AMQPMirrorControllerSource.MIRROR_CAPABILITY,
452+
AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT};
453+
} else {
454+
desiredCapabilities = new Symbol[] {
455+
AMQPMirrorControllerSource.MIRROR_CAPABILITY
456+
};
457+
}
440458
}
441459

442460
final Symbol[] requiredOfferedCapabilities = new Symbol[] {AMQPMirrorControllerSource.MIRROR_CAPABILITY};
@@ -447,7 +465,7 @@ private void doConnect() {
447465
(r) -> AMQPMirrorControllerSource.validateProtocolData(protonProtocolManager.getReferenceIDSupplier(), r, getMirrorSNF(replica)),
448466
server.getNodeID().toString(),
449467
desiredCapabilities,
450-
null,
468+
replica.isForward() ? null : new Symbol[] {AMQPMirrorControllerSource.NO_FORWARD},
451469
requiredOfferedCapabilities);
452470
} else if (connectionElement.getType() == AMQPBrokerConnectionAddressType.FEDERATION) {
453471
// Starting the Federation triggers rebuild of federation links

artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java

+1
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
9292

9393
public static final SimpleString INTERNAL_ID_EXTRA_PROPERTY = SimpleString.of(INTERNAL_ID.toString());
9494
public static final SimpleString INTERNAL_BROKER_ID_EXTRA_PROPERTY = SimpleString.of(BROKER_ID.toString());
95+
public static final Symbol NO_FORWARD = Symbol.getSymbol("amq.no.forward");
9596

9697
private static final ThreadLocal<RoutingContext> mirrorControlRouting = ThreadLocal.withInitial(() -> new RoutingContextImpl(null));
9798

artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java

+18-3
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonAbstractReceiver;
5858
import org.apache.activemq.artemis.utils.ByteUtil;
5959
import org.apache.activemq.artemis.utils.pools.MpscPool;
60+
import org.apache.qpid.proton.amqp.Symbol;
6061
import org.apache.qpid.proton.amqp.messaging.Accepted;
6162
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
6263
import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
@@ -264,6 +265,14 @@ public void flow() {
264265
protected void actualDelivery(Message message, Delivery delivery, DeliveryAnnotations deliveryAnnotations, Receiver receiver, Transaction tx) {
265266
OperationContext oldContext = recoverContext();
266267
incrementSettle();
268+
boolean noForward = false;
269+
if (receiver.getRemoteDesiredCapabilities() != null) {
270+
for (Symbol capability : receiver.getRemoteDesiredCapabilities()) {
271+
if (capability == AMQPMirrorControllerSource.NO_FORWARD) {
272+
noForward = true;
273+
}
274+
}
275+
}
267276

268277
logger.trace("{}::actualDelivery call for {}", server, message);
269278
setControllerInUse(this);
@@ -320,14 +329,14 @@ protected void actualDelivery(Message message, Delivery delivery, DeliveryAnnota
320329
}
321330
}
322331
} else {
323-
if (sendMessage(amqpMessage, deliveryAnnotations, messageAckOperation)) {
332+
if (sendMessage(amqpMessage, deliveryAnnotations, messageAckOperation, noForward)) {
324333
// since the send was successful, we give up the reference here,
325334
// so there won't be any call on afterCompleteOperations
326335
messageAckOperation = null;
327336
}
328337
}
329338
} else {
330-
if (sendMessage(message, deliveryAnnotations, messageAckOperation)) {
339+
if (sendMessage(message, deliveryAnnotations, messageAckOperation, noForward)) {
331340
// since the send was successful, we give up the reference here,
332341
// so there won't be any call on afterCompleteOperations
333342
messageAckOperation = null;
@@ -486,7 +495,7 @@ private void performAck(String nodeID,
486495
* as the sendMessage was successful the OperationContext of the transaction will take care of the completion.
487496
* The caller of this method should give up any reference to messageCompletionAck when this method returns true.
488497
* */
489-
private boolean sendMessage(Message message, DeliveryAnnotations deliveryAnnotations, ACKMessageOperation messageCompletionAck) throws Exception {
498+
private boolean sendMessage(Message message, DeliveryAnnotations deliveryAnnotations, ACKMessageOperation messageCompletionAck, boolean noForward) throws Exception {
490499
if (message.getMessageID() <= 0) {
491500
message.setMessageID(server.getStorageManager().generateID());
492501
}
@@ -512,6 +521,12 @@ private boolean sendMessage(Message message, DeliveryAnnotations deliveryAnnotat
512521

513522
routingContext.setDuplicateDetection(false); // we do our own duplicate detection here
514523

524+
if (noForward) {
525+
message.usageDown(); // large messages would be removed here
526+
flow();
527+
return false;
528+
}
529+
515530
DuplicateIDCache duplicateIDCache;
516531
if (lruDuplicateIDKey != null && lruDuplicateIDKey.equals(internalMirrorID)) {
517532
duplicateIDCache = lruduplicateIDCache;

artemis-server/src/main/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPMirrorBrokerConnectionElement.java

+8
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ public class AMQPMirrorBrokerConnectionElement extends AMQPBrokerConnectionEleme
3030

3131
boolean queueCreation = true;
3232

33+
boolean forward = true;
34+
3335
boolean queueRemoval = true;
3436

3537
boolean messageAcknowledgements = true;
@@ -75,6 +77,12 @@ public AMQPMirrorBrokerConnectionElement setQueueCreation(boolean queueCreation)
7577
return this;
7678
}
7779

80+
public boolean isForward () { return forward; }
81+
public AMQPMirrorBrokerConnectionElement setForward(boolean forward) {
82+
this.forward = forward;
83+
return this;
84+
}
85+
7886
public boolean isQueueRemoval() {
7987
return queueRemoval;
8088
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,247 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.activemq.artemis.tests.integration.amqp.connect;
18+
19+
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
20+
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
21+
import org.apache.activemq.artemis.core.server.ActiveMQServer;
22+
import org.apache.activemq.artemis.core.server.Queue;
23+
import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
24+
import org.apache.activemq.artemis.tests.util.CFUtil;
25+
import org.apache.activemq.artemis.utils.Wait;
26+
import org.junit.jupiter.api.Test;
27+
28+
import javax.jms.*;
29+
30+
import static org.junit.jupiter.api.Assertions.*;
31+
32+
public class AMQSquareMirroringTest extends AmqpClientTestSupport {
33+
34+
protected static final int AMQP_PORT_2 = 5673;
35+
protected static final int AMQP_PORT_3 = 5674;
36+
protected static final int AMQP_PORT_4 = 5675;
37+
38+
ActiveMQServer server_2;
39+
ActiveMQServer server_3;
40+
ActiveMQServer server_4;
41+
42+
@Override
43+
protected ActiveMQServer createServer() throws Exception {
44+
return createServer(AMQP_PORT, false);
45+
}
46+
47+
@Test
48+
public void testSquare() throws Exception {
49+
server_2 = createServer(AMQP_PORT_2, false);
50+
server_3 = createServer(AMQP_PORT_3, false);
51+
server_4 = createServer(AMQP_PORT_4, false);
52+
53+
// name the servers, for convenience during debugging
54+
server.getConfiguration().setName("1");
55+
server_2.getConfiguration().setName("2");
56+
server_3.getConfiguration().setName("3");
57+
server_4.getConfiguration().setName("4");
58+
59+
/**
60+
* 1 <----> 2
61+
* ^ ^
62+
* | |
63+
* v v
64+
* 4 <----> 3
65+
*/
66+
67+
{
68+
AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("to2", "tcp://localhost:" + AMQP_PORT_2).setRetryInterval(100).setReconnectAttempts(-1);
69+
amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement());
70+
server.getConfiguration().addAMQPConnection(amqpConnection);
71+
amqpConnection = new AMQPBrokerConnectConfiguration("to4", "tcp://localhost:" + AMQP_PORT_4).setRetryInterval(100).setReconnectAttempts(-1);
72+
amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setForward(false));
73+
server.getConfiguration().addAMQPConnection(amqpConnection);
74+
}
75+
76+
{
77+
AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("to1", "tcp://localhost:" + AMQP_PORT).setRetryInterval(100).setReconnectAttempts(-1);
78+
amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement());
79+
server_2.getConfiguration().addAMQPConnection(amqpConnection);
80+
amqpConnection = new AMQPBrokerConnectConfiguration("to3", "tcp://localhost:" + AMQP_PORT_3).setRetryInterval(100).setReconnectAttempts(-1);
81+
amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement());
82+
server_2.getConfiguration().addAMQPConnection(amqpConnection);
83+
}
84+
85+
{
86+
AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("to2", "tcp://localhost:" + AMQP_PORT_2).setRetryInterval(100).setReconnectAttempts(-1);
87+
amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement());
88+
server_3.getConfiguration().addAMQPConnection(amqpConnection);
89+
amqpConnection = new AMQPBrokerConnectConfiguration("to4", "tcp://localhost:" + AMQP_PORT_4).setRetryInterval(100).setReconnectAttempts(-1);
90+
amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement());
91+
server_3.getConfiguration().addAMQPConnection(amqpConnection);
92+
}
93+
94+
{
95+
AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("to1", "tcp://localhost:" + AMQP_PORT).setRetryInterval(100).setReconnectAttempts(-1);
96+
amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement());
97+
server_4.getConfiguration().addAMQPConnection(amqpConnection);
98+
amqpConnection = new AMQPBrokerConnectConfiguration("to3", "tcp://localhost:" + AMQP_PORT_3).setRetryInterval(100).setReconnectAttempts(-1);
99+
amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement());
100+
server_4.getConfiguration().addAMQPConnection(amqpConnection);
101+
}
102+
103+
server.start();
104+
server_2.start();
105+
server_3.start();
106+
server_4.start();
107+
108+
createAddressAndQueues(server);
109+
Wait.assertTrue(() -> server.locateQueue(getQueueName()) != null);
110+
Wait.assertTrue(() -> server_2.locateQueue(getQueueName()) != null);
111+
Wait.assertTrue(() -> server_3.locateQueue(getQueueName()) != null);
112+
Wait.assertTrue(() -> server_4.locateQueue(getQueueName()) != null);
113+
114+
Queue q1 = server.locateQueue(getQueueName());
115+
assertNotNull(q1);
116+
117+
Queue q2 = server.locateQueue(getQueueName());
118+
assertNotNull(q2);
119+
120+
Queue q3 = server.locateQueue(getQueueName());
121+
assertNotNull(q3);
122+
123+
Queue q4 = server.locateQueue(getQueueName());
124+
assertNotNull(q4);
125+
126+
ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT+"?amqp.idleTimeout=-1");
127+
ConnectionFactory factory2 = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT_2+"?amqp.idleTimeout=-1");
128+
ConnectionFactory factory3 = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT_3+"?amqp.idleTimeout=-1");
129+
ConnectionFactory factory4 = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT_4+"?amqp.idleTimeout=-1");
130+
131+
try (Connection conn = factory.createConnection()) {
132+
Session session = conn.createSession();
133+
MessageProducer producer = session.createProducer(session.createQueue(getQueueName()));
134+
for (int i = 0; i < 40; i++) {
135+
producer.send(session.createTextMessage("message " + i));
136+
}
137+
}
138+
139+
Thread.sleep(1_000); // some time to allow eventual loops
140+
141+
Wait.assertEquals(40L, q1::getMessageCount, 1000, 100);
142+
Wait.assertEquals(40L, q2::getMessageCount, 1000, 100);
143+
Wait.assertEquals(40L, q3::getMessageCount, 1000, 100);
144+
Wait.assertEquals(40L, q4::getMessageCount, 1000, 100);
145+
146+
try (Connection conn = factory.createConnection()) {
147+
Session session = conn.createSession();
148+
conn.start();
149+
MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName()));
150+
for (int i = 0; i < 10; i++) {
151+
TextMessage message = (TextMessage) consumer.receive(1000);
152+
assertNotNull(message);
153+
assertEquals("message " + i, message.getText());
154+
}
155+
consumer.close();
156+
}
157+
158+
Wait.assertEquals(30L, q1::getMessageCount, 1000, 100);
159+
Wait.assertEquals(30L, q2::getMessageCount, 1000, 100);
160+
Wait.assertEquals(30L, q3::getMessageCount, 1000, 100);
161+
Wait.assertEquals(30L, q4::getMessageCount, 1000, 100);
162+
163+
try (Connection conn = factory2.createConnection()) {
164+
Session session = conn.createSession();
165+
conn.start();
166+
MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName()));
167+
for (int i = 10; i < 20; i++) {
168+
TextMessage message = (TextMessage) consumer.receive(1000);
169+
assertNotNull(message);
170+
assertEquals("message " + i, message.getText());
171+
}
172+
consumer.close();
173+
}
174+
175+
Wait.assertEquals(20L, q1::getMessageCount, 1000, 100);
176+
Wait.assertEquals(20L, q2::getMessageCount, 1000, 100);
177+
Wait.assertEquals(20L, q3::getMessageCount, 1000, 100);
178+
Wait.assertEquals(20L, q4::getMessageCount, 1000, 100);
179+
180+
try (Connection conn = factory3.createConnection()) {
181+
Session session = conn.createSession();
182+
conn.start();
183+
MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName()));
184+
for (int i = 20; i < 30; i++) {
185+
TextMessage message = (TextMessage) consumer.receive(1000);
186+
assertNotNull(message);
187+
assertEquals("message " + i, message.getText());
188+
}
189+
consumer.close();
190+
}
191+
192+
Wait.assertEquals(10L, q1::getMessageCount, 1000, 100);
193+
Wait.assertEquals(10L, q2::getMessageCount, 1000, 100);
194+
Wait.assertEquals(10L, q3::getMessageCount, 1000, 100);
195+
Wait.assertEquals(10L, q4::getMessageCount, 1000, 100);
196+
197+
try (Connection conn = factory4.createConnection()) {
198+
Session session = conn.createSession();
199+
conn.start();
200+
MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName()));
201+
for (int i = 30; i < 40; i++) {
202+
TextMessage message = (TextMessage) consumer.receive(1000);
203+
assertNotNull(message);
204+
assertEquals("message " + i, message.getText());
205+
}
206+
consumer.close();
207+
}
208+
209+
Wait.assertEquals(0L, q1::getMessageCount, 1000, 100);
210+
Wait.assertEquals(0L, q2::getMessageCount, 1000, 100);
211+
Wait.assertEquals(0L, q3::getMessageCount, 1000, 100);
212+
Wait.assertEquals(0L, q4::getMessageCount, 1000, 100);
213+
214+
try (Connection conn = factory.createConnection()) {
215+
Session session = conn.createSession();
216+
conn.start();
217+
MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName()));
218+
assertNull(consumer.receiveNoWait());
219+
consumer.close();
220+
}
221+
222+
try (Connection conn = factory2.createConnection()) {
223+
Session session = conn.createSession();
224+
conn.start();
225+
MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName()));
226+
assertNull(consumer.receiveNoWait());
227+
consumer.close();
228+
}
229+
230+
try (Connection conn = factory3.createConnection()) {
231+
Session session = conn.createSession();
232+
conn.start();
233+
MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName()));
234+
assertNull(consumer.receiveNoWait());
235+
consumer.close();
236+
}
237+
238+
try (Connection conn = factory4.createConnection()) {
239+
Session session = conn.createSession();
240+
conn.start();
241+
MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName()));
242+
assertNull(consumer.receiveNoWait());
243+
consumer.close();
244+
}
245+
246+
}
247+
}

0 commit comments

Comments
 (0)