Skip to content

Commit ad1161a

Browse files
committed
ARTEMIS-5037: option to limit mirror propagation
Add a new option in the Mirror settings to prevent a broker from propagating messages. When working with a topology where 4 nodes are forming a square and where each node in that square mirrors its two neighbors: a message leaving a corner can reach the opposite corner of the square by two different routes. This is causing the message ordering to get broken. example: 1 <-> 2 ^ ^ | | v v 4 <-> 3 A message from 1 will reach 3 by 2 and 4. Message duplication checks will prevent the message from being duplicated but won't help regarding the order of the messages. This is because a either the route by 2 or 4 can be faster than the other, so whomever wins the race sets the message first. Fixing the example: Using the new option to not forward messages coming from a link, we break the possibilities to have two routes to reach the opposite corner. The above example is updated as followed: * 2 never forwards messages coming from 1 * 1 never forwards messages coming from 2 * 3 never forwards messages coming from 4 * 4 never forwards messages coming from 3 Now, when a messages leaves 1: * it reaches 2 and stops there * it reaches 4 * it reaches 3 through 4 and stops there Now, when a messages leaves 2: * it reaches 1 and stops there * it reaches 3 * it reaches 4 through 3 and stops there Now, when a messages leaves 3: * it reaches 4 and stops there * it reaches 2 * it reaches 1 through 2 and stops there Now, when a messages leaves 4: * it reaches 3 and stops there * it reaches 1 * it reaches 2 through 1 and stops there The new test AMQPSquareMirroringTest.java is testing this exact setup.
1 parent daba842 commit ad1161a

File tree

7 files changed

+336
-8
lines changed

7 files changed

+336
-8
lines changed

artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java

+10-6
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.apache.activemq.artemis.protocol.amqp.connect;
1818

1919
import java.nio.charset.StandardCharsets;
20+
import java.util.ArrayList;
2021
import java.util.Arrays;
2122
import java.util.Collections;
2223
import java.util.HashMap;
@@ -430,15 +431,18 @@ private void doConnect() {
430431
final Queue queue = server.locateQueue(getMirrorSNF(replica));
431432

432433
final boolean coreTunnelingEnabled = isCoreMessageTunnelingEnabled(replica);
433-
final Symbol[] desiredCapabilities;
434434

435+
ArrayList<Symbol> desiredCapabilitiesList = new ArrayList<>();
436+
desiredCapabilitiesList.add(AMQPMirrorControllerSource.MIRROR_CAPABILITY);
435437
if (coreTunnelingEnabled) {
436-
desiredCapabilities = new Symbol[] {AMQPMirrorControllerSource.MIRROR_CAPABILITY,
437-
AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT};
438-
} else {
439-
desiredCapabilities = new Symbol[] {AMQPMirrorControllerSource.MIRROR_CAPABILITY};
438+
desiredCapabilitiesList.add(AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT);
439+
}
440+
if (!replica.getCanForwardMessages()) {
441+
desiredCapabilitiesList.add(AMQPMirrorControllerSource.NO_FORWARD);
440442
}
441443

444+
final Symbol[] desiredCapabilities = (Symbol[]) desiredCapabilitiesList.toArray(new Symbol[]{});
445+
442446
final Symbol[] requiredOfferedCapabilities = new Symbol[] {AMQPMirrorControllerSource.MIRROR_CAPABILITY};
443447

444448
connectSender(queue,
@@ -447,7 +451,7 @@ private void doConnect() {
447451
(r) -> AMQPMirrorControllerSource.validateProtocolData(protonProtocolManager.getReferenceIDSupplier(), r, getMirrorSNF(replica)),
448452
server.getNodeID().toString(),
449453
desiredCapabilities,
450-
null,
454+
replica.getCanForwardMessages() ? null : new Symbol[] {AMQPMirrorControllerSource.NO_FORWARD},
451455
requiredOfferedCapabilities);
452456
} else if (connectionElement.getType() == AMQPBrokerConnectionAddressType.FEDERATION) {
453457
// Starting the Federation triggers rebuild of federation links

artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java

+11
Original file line numberDiff line numberDiff line change
@@ -89,9 +89,11 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
8989
// Capabilities
9090
public static final Symbol MIRROR_CAPABILITY = Symbol.getSymbol("amq.mirror");
9191
public static final Symbol QPID_DISPATCH_WAYPOINT_CAPABILITY = Symbol.valueOf("qd.waypoint");
92+
public static final Symbol NO_FORWARD = Symbol.getSymbol("amq.no.forward");
9293

9394
public static final SimpleString INTERNAL_ID_EXTRA_PROPERTY = SimpleString.of(INTERNAL_ID.toString());
9495
public static final SimpleString INTERNAL_BROKER_ID_EXTRA_PROPERTY = SimpleString.of(BROKER_ID.toString());
96+
public static final SimpleString INTERNAL_NO_FORWARD = SimpleString.of(NO_FORWARD.toString());
9597

9698
private static final ThreadLocal<RoutingContext> mirrorControlRouting = ThreadLocal.withInitial(() -> new RoutingContextImpl(null));
9799

@@ -301,6 +303,10 @@ private boolean invalidTarget(MirrorController controller) {
301303
return controller != null && sameNode(getRemoteMirrorId(), controller.getRemoteMirrorId());
302304
}
303305

306+
private boolean isBlockedByNoForward(Message message) {
307+
return Boolean.TRUE.equals(message.getBrokerProperty(INTERNAL_NO_FORWARD));
308+
}
309+
304310
private boolean ignoreAddress(SimpleString address) {
305311
if (address.startsWith(server.getConfiguration().getManagementAddress())) {
306312
return true;
@@ -344,6 +350,11 @@ public void sendMessage(Transaction tx, Message message, RoutingContext context)
344350
return;
345351
}
346352

353+
if (isBlockedByNoForward(message)) {
354+
logger.trace("sendMessage::server {} is discarding the message because its source is setting a noForward policy", server);
355+
return;
356+
}
357+
347358
logger.trace("sendMessage::{} send message {}", server, message);
348359

349360
try {

artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java

+16-1
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonAbstractReceiver;
5858
import org.apache.activemq.artemis.utils.ByteUtil;
5959
import org.apache.activemq.artemis.utils.pools.MpscPool;
60+
import org.apache.qpid.proton.amqp.Symbol;
6061
import org.apache.qpid.proton.amqp.messaging.Accepted;
6162
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
6263
import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
@@ -77,8 +78,10 @@
7778
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.DELETE_QUEUE;
7879
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.EVENT_TYPE;
7980
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_BROKER_ID_EXTRA_PROPERTY;
81+
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.NO_FORWARD;
8082
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_DESTINATION;
8183
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_ID;
84+
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_NO_FORWARD;
8285
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.POST_ACK;
8386
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.QUEUE;
8487
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_ID_EXTRA_PROPERTY;
@@ -92,6 +95,8 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
9295

9396
private static final ThreadLocal<MirrorController> CONTROLLER_THREAD_LOCAL = new ThreadLocal<>();
9497

98+
private boolean noMessageForwarding = false;
99+
95100
public static void setControllerInUse(MirrorController controller) {
96101
CONTROLLER_THREAD_LOCAL.set(controller);
97102
}
@@ -248,6 +253,13 @@ public AMQPMirrorControllerTarget(AMQPSessionCallback sessionSPI,
248253
this.configuration = server.getConfiguration();
249254
this.referenceNodeStore = sessionSPI.getProtocolManager().getReferenceIDSupplier();
250255
mirrorContext = protonSession.getSessionSPI().getSessionContext();
256+
if (receiver.getRemoteDesiredCapabilities() != null) {
257+
for (Symbol capability : receiver.getRemoteDesiredCapabilities()) {
258+
if (capability == NO_FORWARD) {
259+
this.noMessageForwarding = true;
260+
}
261+
}
262+
}
251263
}
252264

253265
@Override
@@ -534,6 +546,9 @@ private boolean sendMessage(Message message, DeliveryAnnotations deliveryAnnotat
534546

535547
message.setBrokerProperty(INTERNAL_ID_EXTRA_PROPERTY, internalID);
536548
message.setBrokerProperty(INTERNAL_BROKER_ID_EXTRA_PROPERTY, internalMirrorID);
549+
if (this.noMessageForwarding) {
550+
message.setBrokerProperty(INTERNAL_NO_FORWARD, true);
551+
}
537552

538553
if (internalAddress != null) {
539554
message.setAddress(internalAddress);
@@ -590,4 +605,4 @@ public void sendMessage(Transaction tx, Message message, RoutingContext context)
590605
// Do nothing
591606
}
592607

593-
}
608+
}

artemis-server/src/main/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPMirrorBrokerConnectionElement.java

+11
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ public class AMQPMirrorBrokerConnectionElement extends AMQPBrokerConnectionEleme
3030

3131
boolean queueCreation = true;
3232

33+
boolean canForwardMessages = true;
34+
3335
boolean queueRemoval = true;
3436

3537
boolean messageAcknowledgements = true;
@@ -75,6 +77,15 @@ public AMQPMirrorBrokerConnectionElement setQueueCreation(boolean queueCreation)
7577
return this;
7678
}
7779

80+
public boolean getCanForwardMessages() {
81+
return canForwardMessages;
82+
}
83+
84+
public AMQPMirrorBrokerConnectionElement setCanForwardMessages(boolean canForwardMessages) {
85+
this.canForwardMessages = canForwardMessages;
86+
return this;
87+
}
88+
7889
public boolean isQueueRemoval() {
7990
return queueRemoval;
8091
}

artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -2191,10 +2191,11 @@ private void parseAMQPBrokerConnections(final Element e,
21912191
boolean durable = getBooleanAttribute(e2, "durable", true);
21922192
boolean queueRemoval = getBooleanAttribute(e2, "queue-removal", true);
21932193
boolean sync = getBooleanAttribute(e2, "sync", false);
2194+
boolean canForwardMessages = !getBooleanAttribute(e2, "no-messages-forwarding", false);
21942195
String addressFilter = getAttributeValue(e2, "address-filter");
21952196

21962197
AMQPMirrorBrokerConnectionElement amqpMirrorConnectionElement = new AMQPMirrorBrokerConnectionElement();
2197-
amqpMirrorConnectionElement.setMessageAcknowledgements(messageAcks).setQueueCreation(queueCreation).setQueueRemoval(queueRemoval).setDurable(durable).setAddressFilter(addressFilter).setSync(sync);
2198+
amqpMirrorConnectionElement.setMessageAcknowledgements(messageAcks).setQueueCreation(queueCreation).setQueueRemoval(queueRemoval).setDurable(durable).setAddressFilter(addressFilter).setSync(sync).setCanForwardMessages(canForwardMessages);
21982199
connectionElement = amqpMirrorConnectionElement;
21992200
connectionElement.setType(AMQPBrokerConnectionAddressType.MIRROR);
22002201

artemis-server/src/main/resources/schema/artemis-configuration.xsd

+8
Original file line numberDiff line numberDiff line change
@@ -2290,6 +2290,14 @@
22902290
</xsd:documentation>
22912291
</xsd:annotation>
22922292
</xsd:attribute>
2293+
<xsd:attribute name="no-message-forwarding" type="xsd:boolean" use="optional" default="false">
2294+
<xsd:annotation>
2295+
<xsd:documentation>
2296+
If this is true, the mirror at the opposite end of the link will not forward messages coming from that link to any other mirrors down the line.
2297+
This is false by default.
2298+
</xsd:documentation>
2299+
</xsd:annotation>
2300+
</xsd:attribute>
22932301
<xsd:attribute name="address-filter" type="xsd:string" use="optional">
22942302
<xsd:annotation>
22952303
<xsd:documentation>

0 commit comments

Comments
 (0)