Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 8dba605

Browse files
committedAug 7, 2023
ARTEMIS-4325 - Ability to failback after failover
1 parent 444d5da commit 8dba605

File tree

7 files changed

+369
-0
lines changed

7 files changed

+369
-0
lines changed
 

‎artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ServerLocatorConfig.java

+2
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ public class ServerLocatorConfig {
4545
public int reconnectAttempts = ActiveMQClient.DEFAULT_RECONNECT_ATTEMPTS;
4646
public int initialConnectAttempts = ActiveMQClient.INITIAL_CONNECT_ATTEMPTS;
4747
public int failoverAttempts = ActiveMQClient.DEFAULT_FAILOVER_ATTEMPTS;
48+
public int failbackAttempts = ActiveMQClient.DEFAULT_FAILBACK_ATTEMPTS;
4849
public int initialMessagePacketSize = ActiveMQClient.DEFAULT_INITIAL_MESSAGE_PACKET_SIZE;
4950
public boolean cacheLargeMessagesClient = ActiveMQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;
5051
public int compressionLevel = ActiveMQClient.DEFAULT_COMPRESSION_LEVEL;
@@ -83,6 +84,7 @@ public ServerLocatorConfig(final ServerLocatorConfig locator) {
8384
reconnectAttempts = locator.reconnectAttempts;
8485
initialConnectAttempts = locator.initialConnectAttempts;
8586
failoverAttempts = locator.failoverAttempts;
87+
failbackAttempts = locator.failbackAttempts;
8688
initialMessagePacketSize = locator.initialMessagePacketSize;
8789
useTopologyForLoadBalancing = locator.useTopologyForLoadBalancing;
8890
compressionLevel = locator.compressionLevel;

‎artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ActiveMQClient.java

+2
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,8 @@ public final class ActiveMQClient {
117117

118118
public static final int DEFAULT_FAILOVER_ATTEMPTS = 0;
119119

120+
public static final int DEFAULT_FAILBACK_ATTEMPTS = 0;
121+
120122
@Deprecated
121123
public static final boolean DEFAULT_FAILOVER_ON_INITIAL_CONNECTION = false;
122124

‎artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ServerLocator.java

+15
Original file line numberDiff line numberDiff line change
@@ -668,6 +668,21 @@ ClientSessionFactory createSessionFactory(TransportConfiguration transportConfig
668668
*/
669669
int getFailoverAttempts();
670670

671+
/**
672+
* Sets the maximum number of failback attempts to establish a new connection to the original broker after a failover.
673+
* <p>
674+
* Value must be -1 (to try infinitely), 0 (to never attempt failback) or greater than 0.
675+
*
676+
* @param attempts maximum number of failback attempts after a successful failover
677+
* @return this ServerLocator
678+
*/
679+
ServerLocator setFailbackAttempts(int attempts);
680+
681+
/**
682+
* @return the number of failback attempts after a successful failover
683+
*/
684+
int getFailbackAttempts();
685+
671686
/**
672687
* Returns true if the client will automatically attempt to connect to the backup server if the initial
673688
* connection to the live server fails

‎artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java

+216
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,8 @@
7777
import java.lang.invoke.MethodHandles;
7878
import java.util.function.BiPredicate;
7979

80+
import static org.apache.activemq.artemis.api.core.ActiveMQExceptionType.DISCONNECTED;
81+
8082
public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, ClientConnectionLifeCycleListener {
8183

8284
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -93,6 +95,8 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
9395

9496
private volatile TransportConfiguration backupConnectorConfig;
9597

98+
private TransportConfiguration failbackConnectorConfig;
99+
96100
private ConnectorFactory connectorFactory;
97101

98102
private final long callTimeout;
@@ -135,6 +139,8 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
135139

136140
private int failoverAttempts;
137141

142+
private int failbackAttempts;
143+
138144
private final Set<SessionFailureListener> listeners = new ConcurrentHashSet<>();
139145

140146
private final Set<FailoverEventListener> failoverListeners = new ConcurrentHashSet<>();
@@ -144,6 +150,8 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
144150
private Future<?> pingerFuture;
145151
private PingRunnable pingRunnable;
146152

153+
private FailbackRunnable failbackRunnable;
154+
147155
private final List<Interceptor> incomingInterceptors;
148156

149157
private final List<Interceptor> outgoingInterceptors;
@@ -244,6 +252,8 @@ public ClientSessionFactoryImpl(final ServerLocatorInternal serverLocator,
244252

245253
this.failoverAttempts = locatorConfig.failoverAttempts;
246254

255+
this.failbackAttempts = locatorConfig.failbackAttempts;
256+
247257
this.scheduledThreadPool = scheduledThreadPool;
248258

249259
this.threadPool = threadPool;
@@ -722,6 +732,12 @@ private void failoverOrReconnect(final Object connectionID,
722732
int connectorsCount = 0;
723733
int failoverRetries = 0;
724734
long failoverRetryInterval = retryInterval;
735+
736+
//Save current connector config for failback purposes
737+
if (failbackAttempts != 0 && failbackConnectorConfig == null) {
738+
failbackConnectorConfig = connectorConfig;
739+
}
740+
725741
Pair<TransportConfiguration, TransportConfiguration> connectorPair;
726742
BiPredicate<Boolean, Integer> failoverRetryPredicate =
727743
(reconnected, retries) -> clientProtocolManager.isAlive() &&
@@ -815,6 +831,128 @@ private void failoverOrReconnect(final Object connectionID,
815831
}
816832
}
817833

834+
private void failback(final ActiveMQException me,
835+
final TransportConfiguration previousConnectorConfig) {
836+
837+
logger.debug("Original node has come back online, performing failback now");
838+
839+
for (ClientSessionInternal session : sessions) {
840+
SessionContext context = session.getSessionContext();
841+
if (context instanceof ActiveMQSessionContext) {
842+
ActiveMQSessionContext sessionContext = (ActiveMQSessionContext) context;
843+
if (sessionContext.isKilled()) {
844+
setReconnectAttempts(0);
845+
}
846+
}
847+
}
848+
849+
Set<ClientSessionInternal> sessionsToClose = null;
850+
if (!clientProtocolManager.isAlive()) {
851+
return;
852+
}
853+
854+
Lock localFailoverLock = lockFailover();
855+
856+
try {
857+
858+
callFailoverListeners(FailoverEventType.FAILURE_DETECTED);
859+
callSessionFailureListeners(me, false, false, null);
860+
861+
if (clientProtocolManager.cleanupBeforeFailover(me)) {
862+
863+
RemotingConnection oldConnection = connection;
864+
865+
connection = null;
866+
867+
Connector localConnector = connector;
868+
if (localConnector != null) {
869+
try {
870+
localConnector.close();
871+
} catch (Exception ignore) {
872+
// no-op
873+
}
874+
}
875+
876+
cancelScheduledTasks();
877+
878+
connector = null;
879+
880+
HashSet<ClientSessionInternal> sessionsToFailover;
881+
synchronized (sessions) {
882+
sessionsToFailover = new HashSet<>(sessions);
883+
}
884+
885+
// Notify sessions before failover.
886+
for (ClientSessionInternal session : sessionsToFailover) {
887+
session.preHandleFailover(connection);
888+
}
889+
890+
boolean sessionsReconnected = false;
891+
892+
connectorConfig = previousConnectorConfig;
893+
currentConnectorConfig = previousConnectorConfig;
894+
895+
getConnection();
896+
897+
if (connection != null) {
898+
sessionsReconnected = reconnectSessions(sessionsToFailover, oldConnection, me);
899+
900+
if (!sessionsReconnected) {
901+
if (oldConnection != null) {
902+
oldConnection.destroy();
903+
}
904+
905+
oldConnection = connection;
906+
connection = null;
907+
}
908+
}
909+
910+
// Notify sessions after failover.
911+
for (ClientSessionInternal session : sessionsToFailover) {
912+
session.postHandleFailover(connection, sessionsReconnected);
913+
}
914+
915+
if (oldConnection != null) {
916+
oldConnection.destroy();
917+
}
918+
919+
if (connection != null) {
920+
callFailoverListeners(FailoverEventType.FAILOVER_COMPLETED);
921+
922+
}
923+
}
924+
925+
if (connection == null) {
926+
synchronized (sessions) {
927+
sessionsToClose = new HashSet<>(sessions);
928+
}
929+
callFailoverListeners(FailoverEventType.FAILOVER_FAILED);
930+
callSessionFailureListeners(me, true, false, null);
931+
}
932+
} finally {
933+
localFailoverLock.unlock();
934+
}
935+
936+
// This needs to be outside the failover lock to prevent deadlock
937+
if (connection != null) {
938+
callSessionFailureListeners(me, true, true);
939+
}
940+
941+
if (sessionsToClose != null) {
942+
// If connection is null it means we didn't succeed in failing over or reconnecting
943+
// so we close all the sessions, so they will throw exceptions when attempted to be used
944+
945+
for (ClientSessionInternal session : sessionsToClose) {
946+
try {
947+
session.cleanUp(true);
948+
} catch (Exception cause) {
949+
ActiveMQClientLogger.LOGGER.failedToCleanupSession(cause);
950+
}
951+
}
952+
}
953+
954+
}
955+
818956
private ClientSession createSessionInternal(final String rawUsername,
819957
final String rawPassword,
820958
final boolean xa,
@@ -1018,6 +1156,10 @@ public boolean waitForRetry(long interval) {
10181156
return false;
10191157
}
10201158

1159+
private long getRetryInterval() {
1160+
return retryInterval;
1161+
}
1162+
10211163
private void cancelScheduledTasks() {
10221164
Future<?> pingerFutureLocal = pingerFuture;
10231165
if (pingerFutureLocal != null) {
@@ -1027,8 +1169,13 @@ private void cancelScheduledTasks() {
10271169
if (pingRunnableLocal != null) {
10281170
pingRunnableLocal.cancel();
10291171
}
1172+
FailbackRunnable failbackRunnableLocal = failbackRunnable;
1173+
if (failbackRunnableLocal != null) {
1174+
failbackRunnableLocal.cancel();
1175+
}
10301176
pingerFuture = null;
10311177
pingRunnable = null;
1178+
failbackRunnable = null;
10321179
}
10331180

10341181
private void checkCloseConnection() {
@@ -1492,6 +1639,68 @@ public synchronized void cancel() {
14921639
}
14931640
}
14941641

1642+
private void attemptFailback() {
1643+
if (failbackRunnable == null) {
1644+
failbackRunnable = new FailbackRunnable();
1645+
}
1646+
threadPool.execute(failbackRunnable);
1647+
}
1648+
1649+
private class FailbackRunnable implements Runnable {
1650+
private boolean first = true;
1651+
private boolean cancelled;
1652+
1653+
@Override
1654+
public synchronized void run() {
1655+
1656+
if (!first) {
1657+
return;
1658+
}
1659+
1660+
first = false;
1661+
1662+
logger.debug("Attempting failback. Trying to reach {} for failback", failbackConnectorConfig.toString());
1663+
1664+
int attempts = 0;
1665+
long failbackRetryInterval = getRetryInterval();
1666+
1667+
ConnectorFactory transportConnectorFactory;
1668+
Connector transportConnector;
1669+
Connection transportConnection;
1670+
1671+
while (!cancelled && (failbackAttempts == -1 || attempts++ < failbackAttempts)) {
1672+
1673+
waitForRetry(failbackRetryInterval);
1674+
failbackRetryInterval = getNextRetryInterval(failbackRetryInterval);
1675+
1676+
transportConnectorFactory = instantiateConnectorFactory(failbackConnectorConfig.getFactoryClassName());
1677+
transportConnector = createConnector(transportConnectorFactory, failbackConnectorConfig);
1678+
transportConnection = openTransportConnection(transportConnector);
1679+
1680+
if (transportConnection != null) {
1681+
transportConnector.close();
1682+
transportConnection.close();
1683+
ActiveMQException exception = new ActiveMQException("Failing back to original broker: " + failbackConnectorConfig.toString(), DISCONNECTED);
1684+
failback(exception, failbackConnectorConfig);
1685+
break;
1686+
}
1687+
1688+
}
1689+
1690+
if (failbackConnectorConfig.equals(currentConnectorConfig)) {
1691+
failbackConnectorConfig = null;
1692+
}
1693+
1694+
first = true;
1695+
1696+
}
1697+
1698+
public synchronized void cancel() {
1699+
cancelled = true;
1700+
}
1701+
1702+
}
1703+
14951704
protected RemotingConnection establishNewConnection() {
14961705
Connection transportConnection = createTransportConnection();
14971706

@@ -1572,6 +1781,13 @@ public void notifyNodeUp(long uniqueEventID,
15721781
boolean isLast) {
15731782

15741783
try {
1784+
1785+
if (failbackConnectorConfig != null && connectorPair.getA() != null && TransportConfigurationUtil.isSameHost(connectorPair.getA(), failbackConnectorConfig)) {
1786+
if (!currentConnectorConfig.equals(failbackConnectorConfig) && failbackRunnable == null) {
1787+
attemptFailback();
1788+
}
1789+
}
1790+
15751791
// if it is our connector then set the live id used for failover
15761792
if (connectorPair.getA() != null && TransportConfigurationUtil.isSameHost(connectorPair.getA(), currentConnectorConfig)) {
15771793
liveNodeID = nodeID;

‎artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java

+12
Original file line numberDiff line numberDiff line change
@@ -1199,6 +1199,18 @@ public int getFailoverAttempts() {
11991199
return config.failoverAttempts;
12001200
}
12011201

1202+
@Override
1203+
public ServerLocatorImpl setFailbackAttempts(int attempts) {
1204+
checkWrite();
1205+
this.config.failbackAttempts = attempts;
1206+
return this;
1207+
}
1208+
1209+
@Override
1210+
public int getFailbackAttempts() {
1211+
return config.failbackAttempts;
1212+
}
1213+
12021214
@Deprecated
12031215
@Override
12041216
public boolean isFailoverOnInitialConnection() {

‎docs/user-manual/client-failover.adoc

+10
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,16 @@ Set `failoverAttempts` to any non-zero value to reconnect to other live servers,
4848

4949
If `reconnectAttempts` value is not zero then the client will try to reconnect to other live servers only after all attempts to <<reconnect-to-the-same-server,reconnect to the same server>> or <<reconnect-to-the-backup-server,reconnect to the backup server>> fail.
5050

51+
== Failing back after a successful failover
52+
53+
It is also possible to have the client keep trying to reconnect to the original live server after a successful failover. In this case the failover connection will be disconnected and moved back to the original server if it comes back online at some later time.
54+
55+
This is controlled by the property `failbackAttempts`.
56+
57+
If set to a non-zero value then following a failover the client will try to reach the original server continuously according to the configured `retryInterval` until it reaches the number of configured `failbackAttempts`.
58+
59+
Setting this parameter to `-1` means try forever. The default value is `0`, which means failback is never attempted.
60+
5161
== Session reconnection
5262

5363
When clients <<reconnect-to-the-same-server,reconnect to the same server>> after a restart, <<reconnect-to-the-backup-server,reconnect to the backup server>> or <<reconnect-to-other-live-servers,reconnect to other live servers>> any sessions will no longer exist on the server and it won't be possible to 100% transparently re-attach to them.

‎tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ClientConnectorFailoverTest.java

+112
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.apache.activemq.artemis.api.core.management.ResourceNames;
3838
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl;
3939
import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl;
40+
import org.apache.activemq.artemis.core.server.ActiveMQServer;
4041
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
4142
import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
4243
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
@@ -212,6 +213,117 @@ public void testConsumerAfterFailoverWithRedistribution() throws Exception {
212213
}
213214
}
214215

216+
@Test
217+
public void testConsumerAfterFailoverAndFailbackWithRedistribution() throws Exception {
218+
setupCluster();
219+
220+
AddressSettings testAddressSettings = new AddressSettings().setRedistributionDelay(0);
221+
for (int i : getServerIDs()) {
222+
getServer(i).getAddressSettingsRepository().addMatch(QUEUES_TESTADDRESS, testAddressSettings);
223+
}
224+
225+
startServers(getLiveServerIDs());
226+
startServers(getBackupServerIDs());
227+
228+
for (int i : getLiveServerIDs()) {
229+
waitForTopology(servers[i], 3, 3);
230+
}
231+
232+
for (int i : getBackupServerIDs()) {
233+
waitForFailoverTopology(i, 0, 1, 2);
234+
}
235+
236+
for (int i : getLiveServerIDs()) {
237+
setupSessionFactory(i, i + 3, isNetty(), false);
238+
createQueue(i, QUEUES_TESTADDRESS, QUEUE_NAME, null, true);
239+
}
240+
241+
List<TransportConfiguration> transportConfigList = new ArrayList<>();
242+
for (int i : getLiveServerIDs()) {
243+
Map<String, Object> params = generateParams(i, isNetty());
244+
TransportConfiguration serverToTC = createTransportConfiguration("node" + i, isNetty(), false, params);
245+
serverToTC.getExtraParams().put(TEST_PARAM, TEST_PARAM);
246+
transportConfigList.add(serverToTC);
247+
}
248+
TransportConfiguration[] transportConfigs = transportConfigList.toArray(new TransportConfiguration[transportConfigList.size()]);
249+
250+
try (ServerLocator serverLocator = new ServerLocatorImpl(false, transportConfigs)) {
251+
serverLocator.setFailoverAttempts(3);
252+
serverLocator.setFailbackAttempts(10);
253+
serverLocator.setReconnectAttempts(0);
254+
serverLocator.setUseTopologyForLoadBalancing(false);
255+
256+
try (ClientSessionFactory sessionFactory = serverLocator.createSessionFactory()) {
257+
try (ClientSession clientSession = sessionFactory.createSession()) {
258+
clientSession.start();
259+
260+
int serverIdBeforeCrash = Integer.parseInt(sessionFactory.
261+
getConnectorConfiguration().getName().substring(4));
262+
263+
QueueControl testQueueControlBeforeCrash = (QueueControl)getServer(serverIdBeforeCrash).
264+
getManagementService().getResource(ResourceNames.QUEUE + QUEUE_NAME);
265+
266+
Assert.assertEquals(0, testQueueControlBeforeCrash.getMessageCount());
267+
268+
try (ClientProducer clientProducer = clientSession.createProducer(QUEUES_TESTADDRESS)) {
269+
clientProducer.send(clientSession.createMessage(true));
270+
clientProducer.send(clientSession.createMessage(true));
271+
clientProducer.send(clientSession.createMessage(true));
272+
}
273+
274+
Assert.assertEquals(3, testQueueControlBeforeCrash.getMessageCount());
275+
276+
try (ClientConsumer clientConsumer = clientSession.createConsumer(QUEUE_NAME)) {
277+
ClientMessage messageBeforeCrash = clientConsumer.receive(3000);
278+
Assert.assertNotNull(messageBeforeCrash);
279+
messageBeforeCrash.acknowledge();
280+
clientSession.commit();
281+
282+
Assert.assertEquals(2, testQueueControlBeforeCrash.getMessageCount());
283+
284+
ActiveMQServer serv = getServer(serverIdBeforeCrash);
285+
serv.stop();
286+
waitForServerToStop(serv);
287+
288+
Assert.assertEquals(TEST_PARAM, sessionFactory.getConnectorConfiguration().getExtraParams().get(TEST_PARAM));
289+
290+
int serverIdAfterCrash = Integer.parseInt(sessionFactory.
291+
getConnectorConfiguration().getName().substring(4));
292+
Assert.assertNotEquals(serverIdBeforeCrash, serverIdAfterCrash);
293+
294+
Assert.assertTrue(isLiveServerID(serverIdAfterCrash));
295+
296+
QueueControl testQueueControlAfterCrash = (QueueControl)getServer(serverIdAfterCrash).
297+
getManagementService().getResource(ResourceNames.QUEUE + QUEUE_NAME);
298+
299+
Wait.waitFor(() -> testQueueControlAfterCrash.getMessageCount() == 2, 3000);
300+
301+
ClientMessage messageAfterCrash = clientConsumer.receive(3000);
302+
Assert.assertNotNull(messageAfterCrash);
303+
messageAfterCrash.acknowledge();
304+
clientSession.commit();
305+
306+
serv.start();
307+
waitForServerToStart(serv);
308+
309+
QueueControl testQueueControlAfterFailback = (QueueControl)getServer(serverIdBeforeCrash).
310+
getManagementService().getResource(ResourceNames.QUEUE + QUEUE_NAME);
311+
Wait.waitFor(() -> testQueueControlAfterFailback.getMessageCount() == 1, 3000);
312+
313+
int serverIdAfterFailback = Integer.parseInt(sessionFactory.
314+
getConnectorConfiguration().getName().substring(4));
315+
316+
Assert.assertEquals(serverIdBeforeCrash, serverIdAfterFailback);
317+
Assert.assertTrue(isLiveServerID(serverIdAfterFailback));
318+
Assert.assertNotNull(clientConsumer.receive());
319+
320+
}
321+
clientSession.stop();
322+
}
323+
}
324+
}
325+
}
326+
215327
@Test
216328
public void testAutoCreatedQueueAfterFailoverWithoutHA() throws Exception {
217329
setupCluster();

0 commit comments

Comments
 (0)
Please sign in to comment.