Skip to content

Commit 921bdf8

Browse files
committed
ARTEMIS-5037: limit mirror propagation
Add a new option in the Mirror settings to prevent a broker from propagating messages. On a topology such as: ``` 1 ---> 2 ---> 3 ^______| ``` Where 1 is connected to 2 via a noForward link, the behavior is as follows: * Every command from 1 are reaching 2 and are stopping at 2 not reaching 3. * If a message is produced on 1 and consumed on 2, the message is acknowledged on 1 and 2. No ack is reaching 3. * Every message produced on 2 are mirrored on 1 and 3. * If a message is produced on 2 and consumed on 1, it is acked on 1 2 and 3 * If a message is produced on 3 and consumed on 3, it is acked only on 3.
1 parent a3028f1 commit 921bdf8

File tree

14 files changed

+927
-27
lines changed

14 files changed

+927
-27
lines changed

artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java

+15-9
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.apache.activemq.artemis.protocol.amqp.connect;
1818

1919
import java.nio.charset.StandardCharsets;
20+
import java.util.ArrayList;
2021
import java.util.Arrays;
2122
import java.util.Collections;
2223
import java.util.HashMap;
@@ -306,10 +307,10 @@ public void validateMatching(Queue queue, AMQPBrokerConnectionElement connection
306307
public void createLink(Queue queue, AMQPBrokerConnectionElement connectionElement) {
307308
if (connectionElement.getType() == AMQPBrokerConnectionAddressType.PEER) {
308309
Symbol[] dispatchCapability = new Symbol[]{AMQPMirrorControllerSource.QPID_DISPATCH_WAYPOINT_CAPABILITY};
309-
connectSender(queue, queue.getAddress().toString(), null, null, null, null, dispatchCapability, null);
310+
connectSender(queue, queue.getAddress().toString(), null, null,null, null, null, dispatchCapability, null);
310311
connectReceiver(protonRemotingConnection, session, sessionContext, queue, dispatchCapability);
311312
} else if (connectionElement.getType() == AMQPBrokerConnectionAddressType.SENDER) {
312-
connectSender(queue, queue.getAddress().toString(), null, null, null, null, null, null);
313+
connectSender(queue, queue.getAddress().toString(), null, null, null, null, null, null, null);
313314
} else if (connectionElement.getType() == AMQPBrokerConnectionAddressType.RECEIVER) {
314315
connectReceiver(protonRemotingConnection, session, sessionContext, queue);
315316
}
@@ -450,21 +451,25 @@ private void doConnect() {
450451
final Queue queue = server.locateQueue(getMirrorSNF(replica));
451452

452453
final boolean coreTunnelingEnabled = isCoreMessageTunnelingEnabled(replica);
453-
final Symbol[] desiredCapabilities;
454454

455+
ArrayList<Symbol> desiredCapabilitiesList = new ArrayList<>();
456+
desiredCapabilitiesList.add(AMQPMirrorControllerSource.MIRROR_CAPABILITY);
455457
if (coreTunnelingEnabled) {
456-
desiredCapabilities = new Symbol[] {AMQPMirrorControllerSource.MIRROR_CAPABILITY,
457-
AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT};
458-
} else {
459-
desiredCapabilities = new Symbol[] {AMQPMirrorControllerSource.MIRROR_CAPABILITY};
458+
desiredCapabilitiesList.add(AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT);
459+
}
460+
if (replica.getNoForward()) {
461+
desiredCapabilitiesList.add(AMQPMirrorControllerSource.NO_FORWARD);
460462
}
461463

462-
final Symbol[] requiredOfferedCapabilities = new Symbol[] {AMQPMirrorControllerSource.MIRROR_CAPABILITY};
464+
final Symbol[] desiredCapabilities = (Symbol[]) desiredCapabilitiesList.toArray(new Symbol[]{});
465+
466+
final Symbol[] requiredOfferedCapabilities = replica.getNoForward() ? new Symbol[] {AMQPMirrorControllerSource.MIRROR_CAPABILITY, AMQPMirrorControllerSource.NO_FORWARD} : new Symbol[] {AMQPMirrorControllerSource.MIRROR_CAPABILITY};
463467

464468
connectSender(queue,
465469
queue.getName().toString(),
466470
mirrorControllerSource::setLink,
467471
(r) -> AMQPMirrorControllerSource.validateProtocolData(protonProtocolManager.getReferenceIDSupplier(), r, getMirrorSNF(replica)),
472+
(r) -> mirrorControllerSource.filterMessage(r),
468473
server.getNodeID().toString(),
469474
desiredCapabilities,
470475
null,
@@ -771,6 +776,7 @@ private void connectSender(Queue queue,
771776
String targetName,
772777
java.util.function.Consumer<Sender> senderConsumer,
773778
java.util.function.Consumer<? super MessageReference> beforeDeliver,
779+
java.util.function.Predicate<? super MessageReference> beforeDeliverFiltering,
774780
String brokerID,
775781
Symbol[] desiredCapabilities,
776782
Symbol[] targetCapabilities,
@@ -831,7 +837,7 @@ private void connectSender(Queue queue,
831837

832838
// Using attachments to set up a Runnable that will be executed inside AMQPBrokerConnection::remoteLinkOpened
833839
sender.attachments().set(AMQP_LINK_INITIALIZER_KEY, Runnable.class, () -> {
834-
ProtonServerSenderContext senderContext = new ProtonServerSenderContext(protonRemotingConnection.getAmqpConnection(), sender, sessionContext, sessionContext.getSessionSPI(), outgoingInitializer).setBeforeDelivery(beforeDeliver);
840+
ProtonServerSenderContext senderContext = new ProtonServerSenderContext(protonRemotingConnection.getAmqpConnection(), sender, sessionContext, sessionContext.getSessionSPI(), outgoingInitializer).setBeforeDelivery(beforeDeliver).setBeforeDeliveryFiltering(beforeDeliverFiltering);
835841
try {
836842
if (!cancelled.get()) {
837843
if (futureTimeout != null) {

artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java

+73
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import org.apache.qpid.proton.amqp.Symbol;
5454
import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
5555
import org.apache.qpid.proton.amqp.messaging.Properties;
56+
import org.apache.qpid.proton.amqp.transport.Target;
5657
import org.apache.qpid.proton.engine.Sender;
5758
import org.slf4j.Logger;
5859
import org.slf4j.LoggerFactory;
@@ -89,9 +90,15 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
8990
// Capabilities
9091
public static final Symbol MIRROR_CAPABILITY = Symbol.getSymbol("amq.mirror");
9192
public static final Symbol QPID_DISPATCH_WAYPOINT_CAPABILITY = Symbol.valueOf("qd.waypoint");
93+
public static final Symbol NO_FORWARD = Symbol.getSymbol("amq.no.forward");
94+
public static final Symbol NO_FORWARD_SOURCE = Symbol.getSymbol("amq.no.forward.source");
95+
public static final Symbol RECEIVER_ID_FILTER = Symbol.getSymbol("amq.receiver.id.filter");
9296

9397
public static final SimpleString INTERNAL_ID_EXTRA_PROPERTY = SimpleString.of(INTERNAL_ID.toString());
9498
public static final SimpleString INTERNAL_BROKER_ID_EXTRA_PROPERTY = SimpleString.of(BROKER_ID.toString());
99+
public static final SimpleString INTERNAL_NO_FORWARD = SimpleString.of(NO_FORWARD.toString());
100+
public static final SimpleString INTERNAL_NO_FORWARD_SOURCE = SimpleString.of(NO_FORWARD_SOURCE.toString());
101+
public static final SimpleString INTERNAL_RECEIVER_ID_FILTER = SimpleString.of(RECEIVER_ID_FILTER.toString());
95102

96103
private static final ThreadLocal<RoutingContext> mirrorControlRouting = ThreadLocal.withInitial(() -> new RoutingContextImpl(null));
97104

@@ -230,12 +237,17 @@ public void addAddress(AddressInfo addressInfo) throws Exception {
230237
public void deleteAddress(AddressInfo addressInfo) throws Exception {
231238
logger.trace("{} deleteAddress {}", server, addressInfo);
232239

240+
if (isBlockedByNoForward()) {
241+
return;
242+
}
243+
233244
if (invalidTarget(getControllerInUse()) || addressInfo.isInternal()) {
234245
return;
235246
}
236247
if (ignoreAddress(addressInfo.getName())) {
237248
return;
238249
}
250+
239251
if (deleteQueues) {
240252
Message message = createMessage(addressInfo.getName(), null, DELETE_ADDRESS, null, addressInfo.toJSON());
241253
routeMirrorCommand(server, message);
@@ -246,6 +258,10 @@ public void deleteAddress(AddressInfo addressInfo) throws Exception {
246258
public void createQueue(QueueConfiguration queueConfiguration) throws Exception {
247259
logger.trace("{} createQueue {}", server, queueConfiguration);
248260

261+
if (isBlockedByNoForward()) {
262+
return;
263+
}
264+
249265
if (invalidTarget(getControllerInUse()) || queueConfiguration.isInternal()) {
250266
if (logger.isTraceEnabled()) {
251267
logger.trace("Rejecting ping pong on create {} as isInternal={} and mirror target = {}", queueConfiguration, queueConfiguration.isInternal(), getControllerInUse());
@@ -264,6 +280,7 @@ public void createQueue(QueueConfiguration queueConfiguration) throws Exception
264280
}
265281
return;
266282
}
283+
267284
if (addQueues) {
268285
Message message = createMessage(queueConfiguration.getAddress(), queueConfiguration.getName(), CREATE_QUEUE, null, queueConfiguration.toJSON());
269286
routeMirrorCommand(server, message);
@@ -276,6 +293,10 @@ public void deleteQueue(SimpleString address, SimpleString queue) throws Excepti
276293
logger.trace("{} deleteQueue {}/{}", server, address, queue);
277294
}
278295

296+
if (isBlockedByNoForward()) {
297+
return;
298+
}
299+
279300
if (invalidTarget(getControllerInUse())) {
280301
return;
281302
}
@@ -310,6 +331,14 @@ private boolean invalidTarget(MirrorController controller) {
310331
return controller != null && sameNode(getRemoteMirrorId(), controller.getRemoteMirrorId());
311332
}
312333

334+
private boolean isBlockedByNoForward() {
335+
return getControllerInUse() != null && getControllerInUse().isNoForward();
336+
}
337+
338+
private boolean isBlockedByNoForward(Message message) {
339+
return isBlockedByNoForward() || Boolean.TRUE.equals(message.getBrokerProperty(INTERNAL_NO_FORWARD));
340+
}
341+
313342
private boolean ignoreAddress(SimpleString address) {
314343
if (address.startsWith(server.getConfiguration().getManagementAddress())) {
315344
return true;
@@ -338,6 +367,12 @@ Message copyMessageForPaging(Message message) {
338367
public void sendMessage(Transaction tx, Message message, RoutingContext context) {
339368
SimpleString address = context.getAddress(message);
340369

370+
if (isBlockedByNoForward(message)) {
371+
String remoteID = getRemoteMirrorId();
372+
logger.trace("sendMessage::server {} is discarding the message because its source is setting a noForward policy", server);
373+
return;
374+
}
375+
341376
if (context.isInternal()) {
342377
logger.trace("sendMessage::server {} is discarding send to avoid sending to internal queue", server);
343378
return;
@@ -353,6 +388,8 @@ public void sendMessage(Transaction tx, Message message, RoutingContext context)
353388
return;
354389
}
355390

391+
logger.trace("sendMessage::{} send message {}", server, message);
392+
356393
try {
357394
context.setReusable(false);
358395

@@ -467,6 +504,28 @@ public static void validateProtocolData(ReferenceIDSupplier referenceIDSupplier,
467504
}
468505
}
469506

507+
/**
508+
* Checks if the message ref should be filtered or not.
509+
* @param ref the message to filter
510+
* @return true if the INTERNAL_RECEIVER_ID_FILTER annotation of the message is set to a different value
511+
* that the remoteMirrorID, false otherwise.
512+
*/
513+
public boolean filterMessage(MessageReference ref) {
514+
Object filterID = ref.getMessage().getAnnotation(INTERNAL_RECEIVER_ID_FILTER);
515+
if (filterID != null) {
516+
String remoteMirrorId = getRemoteMirrorId();
517+
if (remoteMirrorId != null) {
518+
if(remoteMirrorId.equals(filterID)){
519+
return false;
520+
} else {
521+
return true;
522+
}
523+
}
524+
return false;
525+
}
526+
return false;
527+
}
528+
470529
/** This method will return the brokerID used by the message */
471530
private static String setProtocolData(ReferenceIDSupplier referenceIDSupplier, MessageReference ref) {
472531
String brokerID = referenceIDSupplier.getServerID(ref);
@@ -543,6 +602,17 @@ public void preAcknowledge(final Transaction tx, final MessageReference ref, fin
543602
logger.trace("preAcknowledge::tx={}, ref={}, reason={}", tx, ref, reason);
544603
}
545604

605+
SimpleString noForwardSource = null;
606+
if (Boolean.TRUE.equals(ref.getMessage().getBooleanProperty(INTERNAL_NO_FORWARD))) {
607+
noForwardSource = (SimpleString) ref.getMessage().getBrokerProperty(INTERNAL_NO_FORWARD_SOURCE);
608+
String remoteMirrorId = getRemoteMirrorId();
609+
if (remoteMirrorId != null) {
610+
if (!SimpleString.of(remoteMirrorId).equals(noForwardSource)) {
611+
return;
612+
}
613+
}
614+
}
615+
546616
MirrorController controllerInUse = getControllerInUse();
547617

548618
// Retried ACKs are not forwarded.
@@ -578,6 +648,9 @@ public void preAcknowledge(final Transaction tx, final MessageReference ref, fin
578648
String nodeID = idSupplier.getServerID(ref); // notice the brokerID will be null for any message generated on this broker.
579649
long internalID = idSupplier.getID(ref);
580650
Message messageCommand = createMessage(ref.getQueue().getAddress(), ref.getQueue().getName(), POST_ACK, nodeID, internalID, reason);
651+
if (noForwardSource != null) {
652+
messageCommand.setBrokerProperty(INTERNAL_RECEIVER_ID_FILTER, noForwardSource);
653+
}
581654
if (sync) {
582655
OperationContext operationContext;
583656
operationContext = OperationContextImpl.getContext(server.getExecutorFactory());

artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java

+21-5
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939
import org.apache.activemq.artemis.core.server.impl.AckReason;
4040
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
4141
import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
42-
import org.apache.activemq.artemis.core.server.mirror.MirrorController;
42+
import org.apache.activemq.artemis.core.server.mirror.TargetMirrorController;
4343
import org.apache.activemq.artemis.core.transaction.Transaction;
4444
import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
4545
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
@@ -53,6 +53,7 @@
5353
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
5454
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledCoreLargeMessageReader;
5555
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledCoreMessageReader;
56+
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
5657
import org.apache.activemq.artemis.protocol.amqp.proton.MessageReader;
5758
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonAbstractReceiver;
5859
import org.apache.activemq.artemis.utils.ByteUtil;
@@ -77,29 +78,39 @@
7778
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.DELETE_QUEUE;
7879
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.EVENT_TYPE;
7980
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_BROKER_ID_EXTRA_PROPERTY;
81+
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.NO_FORWARD;
8082
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_DESTINATION;
8183
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_ID;
84+
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_NO_FORWARD;
85+
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_NO_FORWARD_SOURCE;
8286
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.POST_ACK;
8387
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.QUEUE;
8488
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_ID_EXTRA_PROPERTY;
8589
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.TARGET_QUEUES;
8690
import static org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledMessageConstants.AMQP_TUNNELED_CORE_LARGE_MESSAGE_FORMAT;
8791
import static org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledMessageConstants.AMQP_TUNNELED_CORE_MESSAGE_FORMAT;
8892

89-
public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implements MirrorController {
93+
public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implements TargetMirrorController {
9094

9195
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
9296

93-
private static final ThreadLocal<MirrorController> CONTROLLER_THREAD_LOCAL = new ThreadLocal<>();
97+
private static final ThreadLocal<TargetMirrorController> CONTROLLER_THREAD_LOCAL = new ThreadLocal<>();
9498

95-
public static void setControllerInUse(MirrorController controller) {
99+
public static void setControllerInUse(TargetMirrorController controller) {
96100
CONTROLLER_THREAD_LOCAL.set(controller);
97101
}
98102

99-
public static MirrorController getControllerInUse() {
103+
public static TargetMirrorController getControllerInUse() {
100104
return CONTROLLER_THREAD_LOCAL.get();
101105
}
102106

107+
private boolean noMessageForwarding = false;
108+
109+
@Override
110+
public boolean isNoForward() {
111+
return noMessageForwarding;
112+
}
113+
103114
/**
104115
* Objects of this class can be used by either transaction or by OperationContext.
105116
* It is important that when you're using the transactions you clear any references to
@@ -248,6 +259,7 @@ public AMQPMirrorControllerTarget(AMQPSessionCallback sessionSPI,
248259
this.configuration = server.getConfiguration();
249260
this.referenceNodeStore = sessionSPI.getProtocolManager().getReferenceIDSupplier();
250261
mirrorContext = protonSession.getSessionSPI().getSessionContext();
262+
this.noMessageForwarding = AmqpSupport.verifyDesiredCapability(receiver, NO_FORWARD);
251263
}
252264

253265
@Override
@@ -534,6 +546,10 @@ private boolean sendMessage(Message message, DeliveryAnnotations deliveryAnnotat
534546

535547
message.setBrokerProperty(INTERNAL_ID_EXTRA_PROPERTY, internalID);
536548
message.setBrokerProperty(INTERNAL_BROKER_ID_EXTRA_PROPERTY, internalMirrorID);
549+
if (noMessageForwarding) {
550+
message.setBrokerProperty(INTERNAL_NO_FORWARD, true);
551+
message.setBrokerProperty(INTERNAL_NO_FORWARD_SOURCE, getRemoteMirrorId());
552+
}
537553

538554
if (internalAddress != null) {
539555
message.setAddress(internalAddress);

artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java

+8-3
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@
5454
import org.apache.activemq.artemis.core.server.RoutingContext;
5555
import org.apache.activemq.artemis.core.server.impl.AckReason;
5656
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
57-
import org.apache.activemq.artemis.core.server.mirror.MirrorController;
57+
import org.apache.activemq.artemis.core.server.mirror.TargetMirrorController;
5858
import org.apache.activemq.artemis.core.transaction.Transaction;
5959
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
6060
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolLogger;
@@ -237,7 +237,7 @@ private boolean isEmpty(LongObjectHashMap<JournalHashMap<AckRetry, AckRetry, Que
237237

238238
// to be used with the same executor as the PagingStore executor
239239
public void retryAddress(SimpleString address, LongObjectHashMap<JournalHashMap<AckRetry, AckRetry, Queue>> acksToRetry) {
240-
MirrorController previousController = AMQPMirrorControllerTarget.getControllerInUse();
240+
TargetMirrorController previousController = AMQPMirrorControllerTarget.getControllerInUse();
241241
logger.trace("retrying address {} on server {}", address, server);
242242
try {
243243
AMQPMirrorControllerTarget.setControllerInUse(disabledAckMirrorController);
@@ -518,7 +518,7 @@ private void deliveryAsync(JournalHashMap<AckRetry, AckRetry, Queue> map) {
518518

519519

520520

521-
private static class DisabledAckMirrorController implements MirrorController {
521+
private static class DisabledAckMirrorController implements TargetMirrorController {
522522

523523
@Override
524524
public boolean isRetryACK() {
@@ -564,5 +564,10 @@ public void preAcknowledge(Transaction tx, MessageReference ref, AckReason reaso
564564
public String getRemoteMirrorId() {
565565
return null;
566566
}
567+
568+
@Override
569+
public boolean isNoForward() {
570+
return false;
571+
}
567572
}
568573
}

0 commit comments

Comments
 (0)