Skip to content

Commit 210f382

Browse files
committed
ARTEMIS-5037: limit mirror propagation
Add a new option in the Mirror settings to prevent a broker from propagating messages. On a topology such as: ``` 1 ---> 2 ---> 3 ^______| ``` Where 1 is connected to 2 via a noForward link, the behavior is as follows: * Every command from 1 are reaching 2 and are stopping at 2 not reaching 3. * If a message is produced on 1 and consumed on 2, the message is acknowledged on 1 and 2. No ack is reaching 3. * Every message produced on 2 are mirrored on 1 and 3. * If a message is produced on 2 and consumed on 1, it is acked on 1 2 and 3 * If a message is produced on 3 and consumed on 3, it is acked only on 3.
1 parent a3028f1 commit 210f382

File tree

14 files changed

+929
-27
lines changed

14 files changed

+929
-27
lines changed

artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java

+15-9
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.apache.activemq.artemis.protocol.amqp.connect;
1818

1919
import java.nio.charset.StandardCharsets;
20+
import java.util.ArrayList;
2021
import java.util.Arrays;
2122
import java.util.Collections;
2223
import java.util.HashMap;
@@ -306,10 +307,10 @@ public void validateMatching(Queue queue, AMQPBrokerConnectionElement connection
306307
public void createLink(Queue queue, AMQPBrokerConnectionElement connectionElement) {
307308
if (connectionElement.getType() == AMQPBrokerConnectionAddressType.PEER) {
308309
Symbol[] dispatchCapability = new Symbol[]{AMQPMirrorControllerSource.QPID_DISPATCH_WAYPOINT_CAPABILITY};
309-
connectSender(queue, queue.getAddress().toString(), null, null, null, null, dispatchCapability, null);
310+
connectSender(queue, queue.getAddress().toString(), null, null, null, null, null, dispatchCapability, null);
310311
connectReceiver(protonRemotingConnection, session, sessionContext, queue, dispatchCapability);
311312
} else if (connectionElement.getType() == AMQPBrokerConnectionAddressType.SENDER) {
312-
connectSender(queue, queue.getAddress().toString(), null, null, null, null, null, null);
313+
connectSender(queue, queue.getAddress().toString(), null, null, null, null, null, null, null);
313314
} else if (connectionElement.getType() == AMQPBrokerConnectionAddressType.RECEIVER) {
314315
connectReceiver(protonRemotingConnection, session, sessionContext, queue);
315316
}
@@ -450,21 +451,25 @@ private void doConnect() {
450451
final Queue queue = server.locateQueue(getMirrorSNF(replica));
451452

452453
final boolean coreTunnelingEnabled = isCoreMessageTunnelingEnabled(replica);
453-
final Symbol[] desiredCapabilities;
454454

455+
ArrayList<Symbol> desiredCapabilitiesList = new ArrayList<>();
456+
desiredCapabilitiesList.add(AMQPMirrorControllerSource.MIRROR_CAPABILITY);
455457
if (coreTunnelingEnabled) {
456-
desiredCapabilities = new Symbol[] {AMQPMirrorControllerSource.MIRROR_CAPABILITY,
457-
AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT};
458-
} else {
459-
desiredCapabilities = new Symbol[] {AMQPMirrorControllerSource.MIRROR_CAPABILITY};
458+
desiredCapabilitiesList.add(AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT);
459+
}
460+
if (replica.getNoForward()) {
461+
desiredCapabilitiesList.add(AMQPMirrorControllerSource.NO_FORWARD);
460462
}
461463

462-
final Symbol[] requiredOfferedCapabilities = new Symbol[] {AMQPMirrorControllerSource.MIRROR_CAPABILITY};
464+
final Symbol[] desiredCapabilities = (Symbol[]) desiredCapabilitiesList.toArray(new Symbol[]{});
465+
466+
final Symbol[] requiredOfferedCapabilities = replica.getNoForward() ? new Symbol[] {AMQPMirrorControllerSource.MIRROR_CAPABILITY, AMQPMirrorControllerSource.NO_FORWARD} : new Symbol[] {AMQPMirrorControllerSource.MIRROR_CAPABILITY};
463467

464468
connectSender(queue,
465469
queue.getName().toString(),
466470
mirrorControllerSource::setLink,
467471
(r) -> AMQPMirrorControllerSource.validateProtocolData(protonProtocolManager.getReferenceIDSupplier(), r, getMirrorSNF(replica)),
472+
(r) -> mirrorControllerSource.filterMessage(r),
468473
server.getNodeID().toString(),
469474
desiredCapabilities,
470475
null,
@@ -771,6 +776,7 @@ private void connectSender(Queue queue,
771776
String targetName,
772777
java.util.function.Consumer<Sender> senderConsumer,
773778
java.util.function.Consumer<? super MessageReference> beforeDeliver,
779+
java.util.function.Predicate<? super MessageReference> beforeDeliverFiltering,
774780
String brokerID,
775781
Symbol[] desiredCapabilities,
776782
Symbol[] targetCapabilities,
@@ -831,7 +837,7 @@ private void connectSender(Queue queue,
831837

832838
// Using attachments to set up a Runnable that will be executed inside AMQPBrokerConnection::remoteLinkOpened
833839
sender.attachments().set(AMQP_LINK_INITIALIZER_KEY, Runnable.class, () -> {
834-
ProtonServerSenderContext senderContext = new ProtonServerSenderContext(protonRemotingConnection.getAmqpConnection(), sender, sessionContext, sessionContext.getSessionSPI(), outgoingInitializer).setBeforeDelivery(beforeDeliver);
840+
ProtonServerSenderContext senderContext = new ProtonServerSenderContext(protonRemotingConnection.getAmqpConnection(), sender, sessionContext, sessionContext.getSessionSPI(), outgoingInitializer).setBeforeDelivery(beforeDeliver).setBeforeDeliveryFiltering(beforeDeliverFiltering);
835841
try {
836842
if (!cancelled.get()) {
837843
if (futureTimeout != null) {

artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java

+69
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,8 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
7070
public static final Symbol QUEUE = Symbol.getSymbol("x-opt-amq-mr-qu");
7171
public static final Symbol BROKER_ID = Symbol.getSymbol("x-opt-amq-bkr-id");
7272
public static final SimpleString BROKER_ID_SIMPLE_STRING = SimpleString.of(BROKER_ID.toString());
73+
public static final SimpleString NO_FORWARD_SOURCE = SimpleString.of("x-opt-amq-no-forward-source");
74+
public static final SimpleString RECEIVER_ID_FILTER = SimpleString.of("x-opt-amq-receiver-id-filter");
7375

7476
// Events:
7577
public static final Symbol ADD_ADDRESS = Symbol.getSymbol("addAddress");
@@ -89,9 +91,11 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
8991
// Capabilities
9092
public static final Symbol MIRROR_CAPABILITY = Symbol.getSymbol("amq.mirror");
9193
public static final Symbol QPID_DISPATCH_WAYPOINT_CAPABILITY = Symbol.valueOf("qd.waypoint");
94+
public static final Symbol NO_FORWARD = Symbol.getSymbol("amq.no.forward");
9295

9396
public static final SimpleString INTERNAL_ID_EXTRA_PROPERTY = SimpleString.of(INTERNAL_ID.toString());
9497
public static final SimpleString INTERNAL_BROKER_ID_EXTRA_PROPERTY = SimpleString.of(BROKER_ID.toString());
98+
public static final SimpleString INTERNAL_NO_FORWARD = SimpleString.of(NO_FORWARD.toString());
9599

96100
private static final ThreadLocal<RoutingContext> mirrorControlRouting = ThreadLocal.withInitial(() -> new RoutingContextImpl(null));
97101

@@ -230,12 +234,17 @@ public void addAddress(AddressInfo addressInfo) throws Exception {
230234
public void deleteAddress(AddressInfo addressInfo) throws Exception {
231235
logger.trace("{} deleteAddress {}", server, addressInfo);
232236

237+
if (isBlockedByNoForward()) {
238+
return;
239+
}
240+
233241
if (invalidTarget(getControllerInUse()) || addressInfo.isInternal()) {
234242
return;
235243
}
236244
if (ignoreAddress(addressInfo.getName())) {
237245
return;
238246
}
247+
239248
if (deleteQueues) {
240249
Message message = createMessage(addressInfo.getName(), null, DELETE_ADDRESS, null, addressInfo.toJSON());
241250
routeMirrorCommand(server, message);
@@ -246,6 +255,10 @@ public void deleteAddress(AddressInfo addressInfo) throws Exception {
246255
public void createQueue(QueueConfiguration queueConfiguration) throws Exception {
247256
logger.trace("{} createQueue {}", server, queueConfiguration);
248257

258+
if (isBlockedByNoForward()) {
259+
return;
260+
}
261+
249262
if (invalidTarget(getControllerInUse()) || queueConfiguration.isInternal()) {
250263
if (logger.isTraceEnabled()) {
251264
logger.trace("Rejecting ping pong on create {} as isInternal={} and mirror target = {}", queueConfiguration, queueConfiguration.isInternal(), getControllerInUse());
@@ -264,6 +277,7 @@ public void createQueue(QueueConfiguration queueConfiguration) throws Exception
264277
}
265278
return;
266279
}
280+
267281
if (addQueues) {
268282
Message message = createMessage(queueConfiguration.getAddress(), queueConfiguration.getName(), CREATE_QUEUE, null, queueConfiguration.toJSON());
269283
routeMirrorCommand(server, message);
@@ -276,6 +290,10 @@ public void deleteQueue(SimpleString address, SimpleString queue) throws Excepti
276290
logger.trace("{} deleteQueue {}/{}", server, address, queue);
277291
}
278292

293+
if (isBlockedByNoForward()) {
294+
return;
295+
}
296+
279297
if (invalidTarget(getControllerInUse())) {
280298
return;
281299
}
@@ -310,6 +328,14 @@ private boolean invalidTarget(MirrorController controller) {
310328
return controller != null && sameNode(getRemoteMirrorId(), controller.getRemoteMirrorId());
311329
}
312330

331+
private boolean isBlockedByNoForward() {
332+
return getControllerInUse() != null && getControllerInUse().isNoForward();
333+
}
334+
335+
private boolean isBlockedByNoForward(Message message) {
336+
return isBlockedByNoForward() || Boolean.TRUE.equals(message.getBrokerProperty(INTERNAL_NO_FORWARD));
337+
}
338+
313339
private boolean ignoreAddress(SimpleString address) {
314340
if (address.startsWith(server.getConfiguration().getManagementAddress())) {
315341
return true;
@@ -338,6 +364,11 @@ Message copyMessageForPaging(Message message) {
338364
public void sendMessage(Transaction tx, Message message, RoutingContext context) {
339365
SimpleString address = context.getAddress(message);
340366

367+
if (isBlockedByNoForward(message)) {
368+
logger.trace("sendMessage::server {} is discarding the message because its source is setting a noForward policy", server);
369+
return;
370+
}
371+
341372
if (context.isInternal()) {
342373
logger.trace("sendMessage::server {} is discarding send to avoid sending to internal queue", server);
343374
return;
@@ -353,6 +384,8 @@ public void sendMessage(Transaction tx, Message message, RoutingContext context)
353384
return;
354385
}
355386

387+
logger.trace("sendMessage::{} send message {}", server, message);
388+
356389
try {
357390
context.setReusable(false);
358391

@@ -467,6 +500,28 @@ public static void validateProtocolData(ReferenceIDSupplier referenceIDSupplier,
467500
}
468501
}
469502

503+
/**
504+
* Checks if the message ref should be filtered or not.
505+
* @param ref the message to check
506+
* @return true if the INTERNAL_RECEIVER_ID_FILTER annotation of the message is set to a different value
507+
* than the remoteMirrorID, false otherwise.
508+
*/
509+
public boolean filterMessage(MessageReference ref) {
510+
Object filterID = ref.getMessage().getAnnotation(RECEIVER_ID_FILTER);
511+
if (filterID != null) {
512+
String remoteMirrorId = getRemoteMirrorId();
513+
if (remoteMirrorId != null) {
514+
if (remoteMirrorId.equals(filterID)) {
515+
return false;
516+
} else {
517+
return true;
518+
}
519+
}
520+
return false;
521+
}
522+
return false;
523+
}
524+
470525
/** This method will return the brokerID used by the message */
471526
private static String setProtocolData(ReferenceIDSupplier referenceIDSupplier, MessageReference ref) {
472527
String brokerID = referenceIDSupplier.getServerID(ref);
@@ -543,6 +598,17 @@ public void preAcknowledge(final Transaction tx, final MessageReference ref, fin
543598
logger.trace("preAcknowledge::tx={}, ref={}, reason={}", tx, ref, reason);
544599
}
545600

601+
SimpleString noForwardSource = null;
602+
if (Boolean.TRUE.equals(ref.getMessage().getBooleanProperty(INTERNAL_NO_FORWARD))) {
603+
noForwardSource = (SimpleString) ref.getMessage().getBrokerProperty(NO_FORWARD_SOURCE);
604+
String remoteMirrorId = getRemoteMirrorId();
605+
if (remoteMirrorId != null) {
606+
if (!SimpleString.of(remoteMirrorId).equals(noForwardSource)) {
607+
return;
608+
}
609+
}
610+
}
611+
546612
MirrorController controllerInUse = getControllerInUse();
547613

548614
// Retried ACKs are not forwarded.
@@ -578,6 +644,9 @@ public void preAcknowledge(final Transaction tx, final MessageReference ref, fin
578644
String nodeID = idSupplier.getServerID(ref); // notice the brokerID will be null for any message generated on this broker.
579645
long internalID = idSupplier.getID(ref);
580646
Message messageCommand = createMessage(ref.getQueue().getAddress(), ref.getQueue().getName(), POST_ACK, nodeID, internalID, reason);
647+
if (noForwardSource != null) {
648+
messageCommand.setBrokerProperty(RECEIVER_ID_FILTER, noForwardSource);
649+
}
581650
if (sync) {
582651
OperationContext operationContext;
583652
operationContext = OperationContextImpl.getContext(server.getExecutorFactory());

artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java

+21-5
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939
import org.apache.activemq.artemis.core.server.impl.AckReason;
4040
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
4141
import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
42-
import org.apache.activemq.artemis.core.server.mirror.MirrorController;
42+
import org.apache.activemq.artemis.core.server.mirror.TargetMirrorController;
4343
import org.apache.activemq.artemis.core.transaction.Transaction;
4444
import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
4545
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
@@ -53,6 +53,7 @@
5353
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
5454
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledCoreLargeMessageReader;
5555
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledCoreMessageReader;
56+
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
5657
import org.apache.activemq.artemis.protocol.amqp.proton.MessageReader;
5758
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonAbstractReceiver;
5859
import org.apache.activemq.artemis.utils.ByteUtil;
@@ -77,29 +78,39 @@
7778
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.DELETE_QUEUE;
7879
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.EVENT_TYPE;
7980
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_BROKER_ID_EXTRA_PROPERTY;
81+
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.NO_FORWARD;
8082
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_DESTINATION;
8183
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_ID;
84+
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_NO_FORWARD;
85+
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.NO_FORWARD_SOURCE;
8286
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.POST_ACK;
8387
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.QUEUE;
8488
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_ID_EXTRA_PROPERTY;
8589
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.TARGET_QUEUES;
8690
import static org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledMessageConstants.AMQP_TUNNELED_CORE_LARGE_MESSAGE_FORMAT;
8791
import static org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledMessageConstants.AMQP_TUNNELED_CORE_MESSAGE_FORMAT;
8892

89-
public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implements MirrorController {
93+
public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implements TargetMirrorController {
9094

9195
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
9296

93-
private static final ThreadLocal<MirrorController> CONTROLLER_THREAD_LOCAL = new ThreadLocal<>();
97+
private static final ThreadLocal<TargetMirrorController> CONTROLLER_THREAD_LOCAL = new ThreadLocal<>();
9498

95-
public static void setControllerInUse(MirrorController controller) {
99+
public static void setControllerInUse(TargetMirrorController controller) {
96100
CONTROLLER_THREAD_LOCAL.set(controller);
97101
}
98102

99-
public static MirrorController getControllerInUse() {
103+
public static TargetMirrorController getControllerInUse() {
100104
return CONTROLLER_THREAD_LOCAL.get();
101105
}
102106

107+
private boolean noMessageForwarding = false;
108+
109+
@Override
110+
public boolean isNoForward() {
111+
return noMessageForwarding;
112+
}
113+
103114
/**
104115
* Objects of this class can be used by either transaction or by OperationContext.
105116
* It is important that when you're using the transactions you clear any references to
@@ -248,6 +259,7 @@ public AMQPMirrorControllerTarget(AMQPSessionCallback sessionSPI,
248259
this.configuration = server.getConfiguration();
249260
this.referenceNodeStore = sessionSPI.getProtocolManager().getReferenceIDSupplier();
250261
mirrorContext = protonSession.getSessionSPI().getSessionContext();
262+
this.noMessageForwarding = AmqpSupport.verifyDesiredCapability(receiver, NO_FORWARD);
251263
}
252264

253265
@Override
@@ -534,6 +546,10 @@ private boolean sendMessage(Message message, DeliveryAnnotations deliveryAnnotat
534546

535547
message.setBrokerProperty(INTERNAL_ID_EXTRA_PROPERTY, internalID);
536548
message.setBrokerProperty(INTERNAL_BROKER_ID_EXTRA_PROPERTY, internalMirrorID);
549+
if (noMessageForwarding) {
550+
message.setBrokerProperty(INTERNAL_NO_FORWARD, true);
551+
message.setBrokerProperty(NO_FORWARD_SOURCE, getRemoteMirrorId());
552+
}
537553

538554
if (internalAddress != null) {
539555
message.setAddress(internalAddress);

artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java

+8-3
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@
5454
import org.apache.activemq.artemis.core.server.RoutingContext;
5555
import org.apache.activemq.artemis.core.server.impl.AckReason;
5656
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
57-
import org.apache.activemq.artemis.core.server.mirror.MirrorController;
57+
import org.apache.activemq.artemis.core.server.mirror.TargetMirrorController;
5858
import org.apache.activemq.artemis.core.transaction.Transaction;
5959
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
6060
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolLogger;
@@ -237,7 +237,7 @@ private boolean isEmpty(LongObjectHashMap<JournalHashMap<AckRetry, AckRetry, Que
237237

238238
// to be used with the same executor as the PagingStore executor
239239
public void retryAddress(SimpleString address, LongObjectHashMap<JournalHashMap<AckRetry, AckRetry, Queue>> acksToRetry) {
240-
MirrorController previousController = AMQPMirrorControllerTarget.getControllerInUse();
240+
TargetMirrorController previousController = AMQPMirrorControllerTarget.getControllerInUse();
241241
logger.trace("retrying address {} on server {}", address, server);
242242
try {
243243
AMQPMirrorControllerTarget.setControllerInUse(disabledAckMirrorController);
@@ -518,7 +518,7 @@ private void deliveryAsync(JournalHashMap<AckRetry, AckRetry, Queue> map) {
518518

519519

520520

521-
private static class DisabledAckMirrorController implements MirrorController {
521+
private static class DisabledAckMirrorController implements TargetMirrorController {
522522

523523
@Override
524524
public boolean isRetryACK() {
@@ -564,5 +564,10 @@ public void preAcknowledge(Transaction tx, MessageReference ref, AckReason reaso
564564
public String getRemoteMirrorId() {
565565
return null;
566566
}
567+
568+
@Override
569+
public boolean isNoForward() {
570+
return false;
571+
}
567572
}
568573
}

0 commit comments

Comments
 (0)