Skip to content

Commit 4e126f6

Browse files
committed
WIP: testing another placement for filtering
1 parent 2bfb318 commit 4e126f6

File tree

6 files changed

+32
-14
lines changed

6 files changed

+32
-14
lines changed

artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java

+6
Original file line numberDiff line numberDiff line change
@@ -758,6 +758,12 @@ public int sendMessage(MessageReference ref, ServerConsumer consumer, int delive
758758

759759
}
760760

761+
@Override
762+
public boolean filterRef(MessageReference ref, ServerConsumer consumer) {
763+
ProtonServerSenderContext plugSender = (ProtonServerSenderContext) consumer.getProtocolContext();
764+
return plugSender.filterRef(ref);
765+
}
766+
761767
@Override
762768
public int sendLargeMessage(MessageReference ref,
763769
ServerConsumer consumer,

artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -469,7 +469,7 @@ private void doConnect() {
469469
queue.getName().toString(),
470470
mirrorControllerSource::setLink,
471471
(r) -> AMQPMirrorControllerSource.validateProtocolData(protonProtocolManager.getReferenceIDSupplier(), r, getMirrorSNF(replica)),
472-
(r) -> mirrorControllerSource.filterMessage(r),
472+
(r) -> mirrorControllerSource.shouldFilterRef(r),
473473
server.getNodeID().toString(),
474474
desiredCapabilities,
475475
null,
@@ -776,7 +776,7 @@ private void connectSender(Queue queue,
776776
String targetName,
777777
java.util.function.Consumer<Sender> senderConsumer,
778778
java.util.function.Consumer<? super MessageReference> beforeDeliver,
779-
java.util.function.Predicate<? super MessageReference> beforeDeliverFiltering,
779+
java.util.function.Predicate<? super MessageReference> shouldFilterRef,
780780
String brokerID,
781781
Symbol[] desiredCapabilities,
782782
Symbol[] targetCapabilities,
@@ -837,7 +837,7 @@ private void connectSender(Queue queue,
837837

838838
// Using attachments to set up a Runnable that will be executed inside AMQPBrokerConnection::remoteLinkOpened
839839
sender.attachments().set(AMQP_LINK_INITIALIZER_KEY, Runnable.class, () -> {
840-
ProtonServerSenderContext senderContext = new ProtonServerSenderContext(protonRemotingConnection.getAmqpConnection(), sender, sessionContext, sessionContext.getSessionSPI(), outgoingInitializer).setBeforeDelivery(beforeDeliver).setBeforeDeliveryFiltering(beforeDeliverFiltering);
840+
ProtonServerSenderContext senderContext = new ProtonServerSenderContext(protonRemotingConnection.getAmqpConnection(), sender, sessionContext, sessionContext.getSessionSPI(), outgoingInitializer).setBeforeDelivery(beforeDeliver).setShouldFilterRef(shouldFilterRef);
841841
try {
842842
if (!cancelled.get()) {
843843
if (futureTimeout != null) {

artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -506,14 +506,15 @@ public static void validateProtocolData(ReferenceIDSupplier referenceIDSupplier,
506506
* @return true if the INTERNAL_RECEIVER_ID_FILTER annotation of the message is set to a different value
507507
* than the remoteMirrorID, false otherwise.
508508
*/
509-
public boolean filterMessage(MessageReference ref) {
509+
public boolean shouldFilterRef(MessageReference ref) {
510510
Object filterID = ref.getMessage().getAnnotation(RECEIVER_ID_FILTER);
511511
if (filterID != null) {
512512
String remoteMirrorId = getRemoteMirrorId();
513513
if (remoteMirrorId != null) {
514514
if (remoteMirrorId.equals(filterID)) {
515515
return false;
516516
} else {
517+
logger.trace("filtering message {} as remote mirror ID {} diverges from the wanted receiver {}", ref, remoteMirrorId, filterID);
517518
return true;
518519
}
519520
}
@@ -607,7 +608,7 @@ public void preAcknowledge(final Transaction tx, final MessageReference ref, fin
607608
noForwardSource = String.valueOf(ref.getMessage().getBrokerProperty(NO_FORWARD_SOURCE));
608609
if (remoteMirrorId != null && !remoteMirrorId.equals(noForwardSource)) {
609610
if (logger.isInfoEnabled()) {
610-
logger.info("Due to the noForward policy in place, no Ack for the ref={} should reach the remote mirror ID", ref, remoteMirrorId);
611+
logger.trace("Due to the noForward policy in place, no Ack for the ref={} should reach the remote mirror ID", ref, remoteMirrorId);
611612
}
612613
return;
613614
}

artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java

+10-9
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
8989
private int credits = 0;
9090
private AtomicInteger pending = new AtomicInteger(0);
9191
private java.util.function.Consumer<? super MessageReference> beforeDelivery;
92-
private java.util.function.Predicate<? super MessageReference> beforeDeliveryFiltering;
92+
private java.util.function.Predicate<? super MessageReference> shouldFilterRef;
9393

9494
protected volatile Runnable afterDelivery;
9595
protected volatile MessageWriter messageWriter = SenderController.REJECTING_MESSAGE_WRITER;
@@ -121,8 +121,8 @@ public ProtonServerSenderContext setBeforeDelivery(java.util.function.Consumer<?
121121
return this;
122122
}
123123

124-
public ProtonServerSenderContext setBeforeDeliveryFiltering(java.util.function.Predicate<? super MessageReference> beforeDeliveryFiltering) {
125-
this.beforeDeliveryFiltering = beforeDeliveryFiltering;
124+
public ProtonServerSenderContext setShouldFilterRef(java.util.function.Predicate<? super MessageReference> shouldFilterRef) {
125+
this.shouldFilterRef = shouldFilterRef;
126126
return this;
127127
}
128128

@@ -447,6 +447,13 @@ private boolean handleExtendedDeliveryOutcomes(Message message, Delivery deliver
447447
return handled;
448448
}
449449

450+
public boolean filterRef(MessageReference ref) {
451+
if (shouldFilterRef != null) {
452+
return shouldFilterRef.test(ref);
453+
}
454+
return false;
455+
}
456+
450457
private final class ConnectionFlushIOCallback implements IOCallback {
451458

452459
@Override
@@ -481,12 +488,6 @@ public int deliverMessage(final MessageReference messageReference, final ServerC
481488
beforeDelivery.accept(messageReference);
482489
}
483490

484-
if (beforeDeliveryFiltering != null) {
485-
if (beforeDeliveryFiltering.test(messageReference)) {
486-
return 0;
487-
}
488-
}
489-
490491
synchronized (creditsLock) {
491492
if (sender.getLocalState() == EndpointState.CLOSED) {
492493
return 0;

artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java

+6
Original file line numberDiff line numberDiff line change
@@ -428,6 +428,12 @@ public HandleStatus handle(final MessageReference ref) throws Exception {
428428

429429
return HandleStatus.NO_MATCH;
430430
}
431+
if (callback != null && callback.filterRef(ref, ServerConsumerImpl.this)) {
432+
if (logger.isDebugEnabled()) {
433+
// TODO
434+
}
435+
return HandleStatus.NO_MATCH;
436+
}
431437

432438
synchronized (lock) {
433439
// If the consumer is stopped then we don't accept the message, it

artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java

+4
Original file line numberDiff line numberDiff line change
@@ -98,4 +98,8 @@ default void close(boolean failed) {
9898
default Transaction getCurrentTransaction() {
9999
return null;
100100
}
101+
102+
default boolean filterRef(MessageReference ref, ServerConsumer consumer) {
103+
return false;
104+
}
101105
}

0 commit comments

Comments
 (0)