ActiveMQ failover transport timeout на подключение

Если ActiveMQ недоступна — код библиотеки будет пытатся подключится к очреди  бесконечно, из за этого  возникала необходимость установить timeout на подключение к брокеру сообщений.

Это будет особенно полезно если Вы пишите HealthCheck для ActiveMq и хотите получить результат немедленно.

У меня проект написан на .Net Framework, соответственно используется библиотеку Apache.NMS.ActiveMQ.

Для работы с брокером сообщений ActiveMQ используется следующий код:

using Apache.NMS.ActiveMQ;
using Apache.NMS.ActiveMQ.Transport;
...
var connectionFactory = new ConnectionFactory("failover:tcp://localhost:61616")
    {

        UserName = "admin",
        Password = "admin",
        RequestTimeout = BrokerTimeoutMs
    };
using (var connection = connectionFactory.CreateConnection()){
    connection.Start(); // <--- hangs here
...
}

Code language: C# (cs)

Если брокер сообщений не будет доступен — мы попадаем в ситуация бесконечного переподключения.

Данная проблема решается несколькими способами:

  • добавить в url параметр transport.maxReconnectAttempts и transport.initialReconnectDelay. В таком случае Url будет выглядеть следующим образом
failover:tcp://localhost:61616?transport.initialReconnectDelay=2000&transport.maxReconnectAttempts=2Code language: JavaScript (javascript)

или так, если очередей несколько

failover:(tcp://localhost:61616,tcp://localhost:717171)?transport.initialReconnectDelay=2000&transport.maxReconnectAttempts=2Code language: JavaScript (javascript)

С остальными параметрами можно ознакомится на страницах

  • Другой способ — это переопределить мeтод ConnectionFactory.CreateActiveMQConnection и задать время transport.Timeout или transport.AsyncTimeout.
    internal sealed class ActiveMqConnectionFactory : ConnectionFactory
    {
        private const int InfiniteConnectionTimeout = -1;

        public int ConnectionTimeout { get; set; } = InfiniteConnectionTimeout;

        public InternalConnectionFactory(string brokerUri) : base(brokerUri)
        {
        }

        protected override Connection CreateActiveMQConnection(ITransport transport)
        {
            if (transport != null && ConnectionTimeout != InfiniteConnectionTimeout)
            {
                transport.Timeout = ConnectionTimeout;
                transport.AsyncTimeout = ConnectionTimeout;
            }

            return base.CreateActiveMQConnection(transport);
        }
    }
Code language: C# (cs)

PS: Данный код проверялся только с failover транспортом и может не работать с другими транспортами.

Добавить комментарий