Skip to content

Commit 68f3cfe

Browse files
committed
ARTEMIS-5037: option to limit mirror propagation
Add a new option in the Mirror settings to prevent a broker from propagating messages. When working with a topology where 4 nodes are forming a square and where each node in that square mirrors its two neighbors: a message leaving a corner can reach the opposite corner of the square by two different routes. This is causing the message ordering to get broken. example: 1 <-> 2 ^ ^ | | v v 4 <-> 3 A message from 1 will reach 3 by 2 and 4. Message duplication checks will prevent the message from being duplicated but won't help regarding the order of the messages. This is because a either the route by 2 or 4 can be faster than the other, so whomever wins the race sets the message first. Fixing the example: Using the new option to not forward messages coming from a link, we break the possibilities to have two routes to reach the opposite corner. The above example is updated as followed: * 2 never forwards messages coming from 1 * 1 never forwards messages coming from 2 * 3 never forwards messages coming from 4 * 4 never forwards messages coming from 3 Now, when a messages leaves 1: * it reaches 2 and stops there * it reaches 4 * it reaches 3 through 4 and stops there Now, when a messages leaves 2: * it reaches 1 and stops there * it reaches 3 * it reaches 4 through 3 and stops there Now, when a messages leaves 3: * it reaches 4 and stops there * it reaches 2 * it reaches 1 through 2 and stops there Now, when a messages leaves 4: * it reaches 3 and stops there * it reaches 1 * it reaches 2 through 1 and stops there The new test AMQPSquareMirroringTest.java is testing this exact setup.
1 parent daba842 commit 68f3cfe

File tree

6 files changed

+334
-11
lines changed

6 files changed

+334
-11
lines changed

artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java

+13-8
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.apache.activemq.artemis.protocol.amqp.connect;
1818

1919
import java.nio.charset.StandardCharsets;
20+
import java.util.ArrayList;
2021
import java.util.Arrays;
2122
import java.util.Collections;
2223
import java.util.HashMap;
@@ -74,6 +75,7 @@
7475
import org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationSource;
7576
import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerAggregation;
7677
import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource;
78+
import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerTarget;
7779
import org.apache.activemq.artemis.protocol.amqp.connect.mirror.ReferenceIDSupplier;
7880
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
7981
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
@@ -430,24 +432,27 @@ private void doConnect() {
430432
final Queue queue = server.locateQueue(getMirrorSNF(replica));
431433

432434
final boolean coreTunnelingEnabled = isCoreMessageTunnelingEnabled(replica);
433-
final Symbol[] desiredCapabilities;
434435

436+
ArrayList<Symbol> capabilities = new ArrayList<Symbol>();
437+
capabilities.add(AMQPMirrorControllerSource.MIRROR_CAPABILITY);
435438
if (coreTunnelingEnabled) {
436-
desiredCapabilities = new Symbol[] {AMQPMirrorControllerSource.MIRROR_CAPABILITY,
437-
AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT};
438-
} else {
439-
desiredCapabilities = new Symbol[] {AMQPMirrorControllerSource.MIRROR_CAPABILITY};
439+
capabilities.add(AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT);
440+
}
441+
if (!replica.getCanForwardMessages()) {
442+
capabilities.add(AMQPMirrorControllerSource.NO_FORWARD);
440443
}
441444

445+
final Symbol[] desiredCapabilities = (Symbol[]) capabilities.toArray(new Symbol[]{});
446+
442447
final Symbol[] requiredOfferedCapabilities = new Symbol[] {AMQPMirrorControllerSource.MIRROR_CAPABILITY};
443448

444449
connectSender(queue,
445450
queue.getName().toString(),
446451
mirrorControllerSource::setLink,
447452
(r) -> AMQPMirrorControllerSource.validateProtocolData(protonProtocolManager.getReferenceIDSupplier(), r, getMirrorSNF(replica)),
448453
server.getNodeID().toString(),
449-
desiredCapabilities,
450-
null,
454+
desiredCapabilities,
455+
replica.getCanForwardMessages() ? null : new Symbol[] {AMQPMirrorControllerSource.NO_FORWARD},
451456
requiredOfferedCapabilities);
452457
} else if (connectionElement.getType() == AMQPBrokerConnectionAddressType.FEDERATION) {
453458
// Starting the Federation triggers rebuild of federation links
@@ -1204,4 +1209,4 @@ private void doCloseConnector() {
12041209
});
12051210
}
12061211
}
1207-
}
1212+
}

artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java

+12-1
Original file line numberDiff line numberDiff line change
@@ -89,9 +89,11 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
8989
// Capabilities
9090
public static final Symbol MIRROR_CAPABILITY = Symbol.getSymbol("amq.mirror");
9191
public static final Symbol QPID_DISPATCH_WAYPOINT_CAPABILITY = Symbol.valueOf("qd.waypoint");
92+
public static final Symbol NO_FORWARD = Symbol.getSymbol("amq.no.forward");
9293

9394
public static final SimpleString INTERNAL_ID_EXTRA_PROPERTY = SimpleString.of(INTERNAL_ID.toString());
9495
public static final SimpleString INTERNAL_BROKER_ID_EXTRA_PROPERTY = SimpleString.of(BROKER_ID.toString());
96+
public static final SimpleString INTERNAL_NO_FORWARD = SimpleString.of(NO_FORWARD.toString());
9597

9698
private static final ThreadLocal<RoutingContext> mirrorControlRouting = ThreadLocal.withInitial(() -> new RoutingContextImpl(null));
9799

@@ -301,6 +303,10 @@ private boolean invalidTarget(MirrorController controller) {
301303
return controller != null && sameNode(getRemoteMirrorId(), controller.getRemoteMirrorId());
302304
}
303305

306+
private boolean isBlockedByNoForward(Message message) {
307+
return Boolean.valueOf(true).equals(message.getBrokerProperty(INTERNAL_NO_FORWARD));
308+
}
309+
304310
private boolean ignoreAddress(SimpleString address) {
305311
if (address.startsWith(server.getConfiguration().getManagementAddress())) {
306312
return true;
@@ -344,6 +350,11 @@ public void sendMessage(Transaction tx, Message message, RoutingContext context)
344350
return;
345351
}
346352

353+
if (isBlockedByNoForward(message)) {
354+
logger.trace("sendMessage::server {} is discarding the message because its source is setting a noForward policy", server);
355+
return;
356+
}
357+
347358
logger.trace("sendMessage::{} send message {}", server, message);
348359

349360
try {
@@ -779,4 +790,4 @@ public boolean isAlreadyAcked(Queue queue) {
779790

780791

781792

782-
}
793+
}

artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java

+18
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonAbstractReceiver;
5858
import org.apache.activemq.artemis.utils.ByteUtil;
5959
import org.apache.activemq.artemis.utils.pools.MpscPool;
60+
import org.apache.qpid.proton.amqp.Symbol;
6061
import org.apache.qpid.proton.amqp.messaging.Accepted;
6162
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
6263
import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
@@ -77,8 +78,10 @@
7778
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.DELETE_QUEUE;
7879
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.EVENT_TYPE;
7980
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_BROKER_ID_EXTRA_PROPERTY;
81+
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.NO_FORWARD;
8082
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_DESTINATION;
8183
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_ID;
84+
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_NO_FORWARD;
8285
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.POST_ACK;
8386
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.QUEUE;
8487
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_ID_EXTRA_PROPERTY;
@@ -92,6 +95,8 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
9295

9396
private static final ThreadLocal<MirrorController> CONTROLLER_THREAD_LOCAL = new ThreadLocal<>();
9497

98+
private boolean canForwardMessages = true;
99+
95100
public static void setControllerInUse(MirrorController controller) {
96101
CONTROLLER_THREAD_LOCAL.set(controller);
97102
}
@@ -248,6 +253,13 @@ public AMQPMirrorControllerTarget(AMQPSessionCallback sessionSPI,
248253
this.configuration = server.getConfiguration();
249254
this.referenceNodeStore = sessionSPI.getProtocolManager().getReferenceIDSupplier();
250255
mirrorContext = protonSession.getSessionSPI().getSessionContext();
256+
if (receiver.getRemoteDesiredCapabilities() != null) {
257+
for (Symbol capability : receiver.getRemoteDesiredCapabilities()) {
258+
if (capability == NO_FORWARD) {
259+
this.canForwardMessages = false;
260+
}
261+
}
262+
}
251263
}
252264

253265
@Override
@@ -265,6 +277,7 @@ protected void actualDelivery(Message message, Delivery delivery, DeliveryAnnota
265277
OperationContext oldContext = recoverContext();
266278
incrementSettle();
267279

280+
268281
logger.trace("{}::actualDelivery call for {}", server, message);
269282
setControllerInUse(this);
270283

@@ -487,6 +500,7 @@ private void performAck(String nodeID,
487500
* The caller of this method should give up any reference to messageCompletionAck when this method returns true.
488501
* */
489502
private boolean sendMessage(Message message, DeliveryAnnotations deliveryAnnotations, ACKMessageOperation messageCompletionAck) throws Exception {
503+
490504
if (message.getMessageID() <= 0) {
491505
message.setMessageID(server.getStorageManager().generateID());
492506
}
@@ -512,6 +526,7 @@ private boolean sendMessage(Message message, DeliveryAnnotations deliveryAnnotat
512526

513527
routingContext.setDuplicateDetection(false); // we do our own duplicate detection here
514528

529+
515530
DuplicateIDCache duplicateIDCache;
516531
if (lruDuplicateIDKey != null && lruDuplicateIDKey.equals(internalMirrorID)) {
517532
duplicateIDCache = lruduplicateIDCache;
@@ -534,6 +549,9 @@ private boolean sendMessage(Message message, DeliveryAnnotations deliveryAnnotat
534549

535550
message.setBrokerProperty(INTERNAL_ID_EXTRA_PROPERTY, internalID);
536551
message.setBrokerProperty(INTERNAL_BROKER_ID_EXTRA_PROPERTY, internalMirrorID);
552+
if (!this.canForwardMessages) {
553+
message.setBrokerProperty(INTERNAL_NO_FORWARD, true);
554+
}
537555

538556
if (internalAddress != null) {
539557
message.setAddress(internalAddress);

artemis-server/src/main/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPMirrorBrokerConnectionElement.java

+12-1
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ public class AMQPMirrorBrokerConnectionElement extends AMQPBrokerConnectionEleme
3030

3131
boolean queueCreation = true;
3232

33+
boolean canForwardMessages = true;
34+
3335
boolean queueRemoval = true;
3436

3537
boolean messageAcknowledgements = true;
@@ -75,6 +77,15 @@ public AMQPMirrorBrokerConnectionElement setQueueCreation(boolean queueCreation)
7577
return this;
7678
}
7779

80+
public boolean getCanForwardMessages() {
81+
return canForwardMessages;
82+
}
83+
84+
public AMQPMirrorBrokerConnectionElement setCanForwardMessages(boolean canForwardMessages) {
85+
this.canForwardMessages = canForwardMessages;
86+
return this;
87+
}
88+
7889
public boolean isQueueRemoval() {
7990
return queueRemoval;
8091
}
@@ -188,4 +199,4 @@ public boolean equals(Object obj) {
188199
queueRemoval == other.queueRemoval &&
189200
sync == other.sync;
190201
}
191-
}
202+
}

tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPChainedReplicaTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -196,4 +196,4 @@ public void testChained() throws Exception {
196196
}
197197

198198
}
199-
}
199+
}

0 commit comments

Comments
 (0)