Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 44bc79d

Browse files
committedJul 26, 2024
ARTEMIS-4924 Do not allow sending messages directly to store-and-forward queues
- To send any invalid messages in snf queues to DLA - Add documentation for store and forward queue proper usage
1 parent bc1bb99 commit 44bc79d

File tree

9 files changed

+166
-9
lines changed

9 files changed

+166
-9
lines changed
 

‎artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java

+6
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,12 @@ public ActiveMQException createException(String msg) {
279279
public ActiveMQException createException(String msg) {
280280
return new ActiveMQTimeoutException(msg);
281281
}
282+
},
283+
INVALID_MESSAGE_EXCEPTION(224) {
284+
@Override
285+
public ActiveMQException createException(String msg) {
286+
return new ActiveMQInvalidMessageException(msg);
287+
}
282288
};
283289
private static final Map<Integer, ActiveMQExceptionType> TYPE_MAP;
284290

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
* <p>
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
* <p>
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.activemq.artemis.api.core;
19+
20+
public class ActiveMQInvalidMessageException extends ActiveMQException {
21+
22+
public ActiveMQInvalidMessageException(String message) {
23+
super(ActiveMQExceptionType.INVALID_MESSAGE_EXCEPTION, message);
24+
}
25+
}

‎artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java

+5
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,11 @@ public interface Message {
173173
*/
174174
SimpleString HDR_INGRESS_TIMESTAMP = SimpleString.of("_AMQ_INGRESS_TIMESTAMP");
175175

176+
/**
177+
* This gives extra information as to why the messages is sent to DLQ
178+
*/
179+
SimpleString HDR_ROUTE_DLQ_DETAIL = SimpleString.of("_AMQ_DLQ_DETAIL");
180+
176181
/**
177182
* The prefix used (if any) when sending this message. For protocols (e.g. STOMP) that need to track this and restore
178183
* the prefix when the message is consumed.

‎artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java

+2
Original file line numberDiff line numberDiff line change
@@ -559,4 +559,6 @@ IllegalStateException invalidRoutingTypeUpdate(String queueName,
559559
@Message(id = 229255, value = "Bridge {} cannot be {}. Current state: {}")
560560
ActiveMQIllegalStateException bridgeOperationCannotBeExecuted(String bridgeName, String failedOp, BridgeImpl.State currentState);
561561

562+
@Message(id = 229256, value = "Missing header {}")
563+
String messageMissingHeader(SimpleString idsHeaderName);
562564
}

‎artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -641,9 +641,8 @@ void slowConsumerDetected(String sessionID,
641641
@LogMessage(id = 222109, value = "Timed out waiting for write lock on consumer {} from {}. Check the Thread dump", level = LogMessage.Level.WARN)
642642
void timeoutLockingConsumer(String consumer, String remoteAddress);
643643

644-
@LogMessage(id = 222110, value = "no queue IDs defined!, originalMessage = {}, copiedMessage = {}, props={}", level = LogMessage.Level.WARN)
644+
@LogMessage(id = 222110, value = "no queue IDs defined!, originalMessage = {}, props={}", level = LogMessage.Level.WARN)
645645
void noQueueIdDefined(org.apache.activemq.artemis.api.core.Message message,
646-
org.apache.activemq.artemis.api.core.Message messageCopy,
647646
SimpleString idsHeaderName);
648647

649648
@LogMessage(id = 222111, value = "exception while invoking {} on {}", level = LogMessage.Level.TRACE)

‎artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java

+14-2
Original file line numberDiff line numberDiff line change
@@ -506,7 +506,7 @@ public void failed(Throwable t) {
506506
}
507507

508508
/* Hook for processing message before forwarding */
509-
protected Message beforeForward(Message message, final SimpleString forwardingAddress) {
509+
protected Message beforeForward(Message message, final SimpleString forwardingAddress) throws ActiveMQException {
510510
message = message.copy();
511511
((RefCountMessage)message).setParentRef((RefCountMessage)message);
512512

@@ -605,7 +605,19 @@ public HandleStatus handle(final MessageReference ref) throws Exception {
605605
dest = ref.getMessage().getAddressSimpleString();
606606
}
607607

608-
final Message message = beforeForward(ref.getMessage(), dest);
608+
final Message message;
609+
try {
610+
message = beforeForward(ref.getMessage(), dest);
611+
} catch (ActiveMQException ex) {
612+
if (ex.getType() == ActiveMQExceptionType.INVALID_MESSAGE_EXCEPTION) {
613+
ref.getMessage().putStringProperty(Message.HDR_ROUTE_DLQ_DETAIL, SimpleString.of(ex.getMessage()));
614+
ref.getQueue().sendToDeadLetterAddress(null, ref);
615+
refs.remove(ref.getMessageID());
616+
return HandleStatus.HANDLED;
617+
} else {
618+
throw ex;
619+
}
620+
}
609621

610622
pendingAcks.countUp();
611623

‎artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java

+4-5
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import org.apache.activemq.artemis.core.filter.Filter;
4848
import org.apache.activemq.artemis.core.persistence.StorageManager;
4949
import org.apache.activemq.artemis.core.postoffice.BindingType;
50+
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
5051
import org.apache.activemq.artemis.core.server.ActiveMQServer;
5152
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
5253
import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
@@ -185,7 +186,7 @@ protected ClientSessionFactoryInternal createSessionFactory() throws Exception {
185186
}
186187

187188
@Override
188-
protected Message beforeForward(final Message message, final SimpleString forwardingAddress) {
189+
protected Message beforeForward(final Message message, final SimpleString forwardingAddress) throws ActiveMQException {
189190
// We make a copy of the message, then we strip out the unwanted routing id headers and leave
190191
// only
191192
// the one pertinent for the address node - this is important since different queues on different
@@ -200,11 +201,9 @@ protected Message beforeForward(final Message message, final SimpleString forwar
200201
Set<SimpleString> propNames = new HashSet<>(messageCopy.getPropertyNames());
201202

202203
byte[] queueIds = message.getExtraBytesProperty(idsHeaderName);
203-
204204
if (queueIds == null) {
205-
// Sanity check only
206-
ActiveMQServerLogger.LOGGER.noQueueIdDefined(message, messageCopy, idsHeaderName);
207-
throw new IllegalStateException("no queueIDs defined");
205+
ActiveMQServerLogger.LOGGER.noQueueIdDefined(message, idsHeaderName);
206+
throw ActiveMQExceptionType.createException(ActiveMQExceptionType.INVALID_MESSAGE_EXCEPTION.getCode(), ActiveMQMessageBundle.BUNDLE.messageMissingHeader(idsHeaderName));
208207
}
209208

210209
for (SimpleString propName : propNames) {

‎docs/user-manual/clusters.adoc

+5
Original file line numberDiff line numberDiff line change
@@ -715,6 +715,11 @@ The default value is `-1`.
715715

716716
It often makes sense to introduce a delay before redistributing as it's a common case that a consumer closes but another one quickly is created on the same queue, in such a case you probably don't want to redistribute immediately since the new consumer will arrive shortly.
717717

718+
[WARNING]
719+
====
720+
The broker uses internal store and forward queues to handle message redistribution. Be aware that any clients should not directly send messages to the sore and forward queues. If a client sends messages to a store and forward queue, the messages will be sent to dead letter address. If security is enabled, make sure the clients do not have `send` permission on any store and forward queues. (The name pattern for a store and forward queue is <internal-naming-prefix>.sf.<cluster-name>.<nodeID> where the default internal-naming-prefix is `$.activemq.internal`, the cluster-name is the name of the cluster-connection, and the nodeID is the target node's ID)
721+
====
722+
718723
== Cluster topologies
719724

720725
Apache ActiveMQ Artemis clusters can be connected together in many different topologies, let's consider the two most common ones here

‎tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/ClusteredBridgeReconnectTest.java

+104
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import static org.junit.jupiter.api.Assertions.assertEquals;
2020
import static org.junit.jupiter.api.Assertions.assertNotNull;
21+
import static org.junit.jupiter.api.Assertions.assertNull;
2122
import static org.junit.jupiter.api.Assertions.assertTrue;
2223

2324
import java.util.ArrayList;
@@ -28,17 +29,21 @@
2829
import java.util.concurrent.atomic.AtomicInteger;
2930

3031
import org.apache.activemq.artemis.api.core.ActiveMQException;
32+
import org.apache.activemq.artemis.api.core.Message;
3133
import org.apache.activemq.artemis.api.core.SimpleString;
34+
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
3235
import org.apache.activemq.artemis.api.core.client.ClientMessage;
3336
import org.apache.activemq.artemis.api.core.client.ClientProducer;
3437
import org.apache.activemq.artemis.api.core.client.ClientSession;
3538
import org.apache.activemq.artemis.api.core.client.TopologyMember;
3639
import org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl;
40+
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
3741
import org.apache.activemq.artemis.core.server.cluster.MessageFlowRecord;
3842
import org.apache.activemq.artemis.core.server.cluster.impl.BridgeTestAccessor;
3943
import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionBridge;
4044
import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionImpl;
4145
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
46+
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
4247
import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
4348
import org.apache.activemq.artemis.tests.util.Wait;
4449
import org.junit.jupiter.api.AfterEach;
@@ -305,6 +310,105 @@ public void testClusterBridgeAddRemoteBinding() throws Exception {
305310
stopServers(0, 1);
306311
}
307312

313+
@Test
314+
public void testBadClientSendMessagesToSnFQueue() throws Exception {
315+
setupServer(0, isFileStorage(), isNetty());
316+
setupServer(1, isFileStorage(), isNetty());
317+
318+
setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1);
319+
320+
setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0);
321+
322+
String dla = "DLA";
323+
AddressSettings addressSettings = new AddressSettings();
324+
addressSettings.setDeadLetterAddress(SimpleString.of(dla));
325+
326+
servers[0].getAddressSettingsRepository().addMatch("#", addressSettings);
327+
servers[1].getAddressSettingsRepository().addMatch("#", addressSettings);
328+
329+
startServers(0, 1);
330+
331+
setupSessionFactory(0, isNetty());
332+
setupSessionFactory(1, isNetty());
333+
334+
createQueue(0, dla, dla, null, true);
335+
createQueue(1, dla, dla, null, true);
336+
337+
waitForBindings(0, dla, 1, 0, true);
338+
waitForBindings(1, dla, 1, 0, true);
339+
340+
ClientSession session0 = sfs[0].createSession();
341+
ClientSession session1 = sfs[1].createSession();
342+
343+
session0.start();
344+
session1.start();
345+
346+
final int num = 10;
347+
348+
SimpleString nodeId1 = servers[1].getNodeID();
349+
ClusterConnectionImpl cc0 = (ClusterConnectionImpl) servers[0].getClusterManager().getClusterConnection("cluster0");
350+
SimpleString snfQueue0 = cc0.getSfQueueName(nodeId1.toString());
351+
352+
ClientProducer badProducer0 = session0.createProducer(snfQueue0);
353+
for (int i = 0; i < num; i++) {
354+
Message msg = session0.createMessage(true);
355+
msg.putStringProperty("origin", "from producer 0");
356+
badProducer0.send(msg);
357+
}
358+
359+
//add a remote queue and consumer to enable message to flow from node 0 to node 1
360+
createQueue(1, "queues.testaddress", "queue0", null, true);
361+
ClientConsumer consumer1 = session1.createConsumer("queue0");
362+
363+
waitForBindings(0, "queues.testaddress", 0, 0, true);
364+
waitForBindings(1, "queues.testaddress", 1, 1, true);
365+
366+
waitForBindings(0, "queues.testaddress", 1, 1, false);
367+
waitForBindings(1, "queues.testaddress", 0, 0, false);
368+
369+
ClientConsumer dlqConsumer = session0.createConsumer(dla);
370+
371+
for (int i = 0; i < num; i++) {
372+
Message msg = session0.createMessage(true);
373+
msg.putStringProperty("origin", "from producer 0");
374+
badProducer0.send(msg);
375+
}
376+
377+
//messages will never reache the consumer
378+
assertNull(consumer1.receiveImmediate());
379+
380+
SimpleString idHeadersName = Message.HDR_ROUTE_TO_IDS.concat(snfQueue0);
381+
for (int i = 0; i < num * 2; i++) {
382+
ClientMessage m = dlqConsumer.receive(5000);
383+
assertNotNull(m);
384+
String propValue = m.getStringProperty("origin");
385+
assertEquals("from producer 0", propValue);
386+
propValue = m.getStringProperty(Message.HDR_ROUTE_DLQ_DETAIL);
387+
assertEquals(ActiveMQMessageBundle.BUNDLE.messageMissingHeader(idHeadersName), propValue);
388+
m.acknowledge();
389+
}
390+
assertNull(dlqConsumer.receiveImmediate());
391+
392+
//normal message flow should work
393+
ClientProducer goodProducer0 = session0.createProducer("queues.testaddress");
394+
for (int i = 0; i < num; i++) {
395+
Message msg = session0.createMessage(true);
396+
msg.putStringProperty("origin", "from producer 0");
397+
goodProducer0.send(msg);
398+
}
399+
400+
//consumer1 can receive from node0
401+
for (int i = 0; i < num; i++) {
402+
ClientMessage m = consumer1.receive(5000);
403+
assertNotNull(m);
404+
String propValue = m.getStringProperty("origin");
405+
assertEquals("from producer 0", propValue);
406+
m.acknowledge();
407+
}
408+
assertNull(consumer1.receiveImmediate());
409+
410+
stopServers(0, 1);
411+
}
308412

309413
@Override
310414
@AfterEach

0 commit comments

Comments
 (0)
Please sign in to comment.