Connecting to multiple Message Broker Addresses over STOMP (Spring Boot) - Part 1

My Spring Boot application had to connect to an external message broker, Amazon MQ for example, instead of a self-hosted message broker such as RabbitMQ.

Fortunately, the Spring team has already implemented support for this, but a few additional tweaks are needed to make it work with STOMP. For reference, this support is only available in Spring Messaging 4.3.x; refer to this Jira Ticket for the release, which adds a Reactor2TcpClient constructor that accepts a Supplier<InetSocketAddress>. We will use that supplier to provide our message broker addresses.

Taking a deep dive into the Spring Documentation for multiple broker address support, let's take a look at this code snippet here.

private Reactor2TcpClient<byte[]> createTcpClient() {

    Supplier<InetSocketAddress> addressSupplier = new Supplier<InetSocketAddress>() {
        @Override
        public InetSocketAddress get() {
            // Select address to connect to ...
        }
    };

    StompDecoder decoder = new StompDecoder();
    Reactor2StompCodec codec = new Reactor2StompCodec(new StompEncoder(), decoder);
    return new Reactor2TcpClient<>(addressSupplier, codec);
}

The createTcpClient() method returns a Reactor2TcpClient, which implements TcpOperations<P>. This is extremely important, because we will use the client returned by this method as the TcpClient of the StompBrokerRelayMessageHandler.

The crucial part is the overridden get() method above. Because our STOMP TCP client is explicitly built around this supplier, get() is called for each connection attempt. When a TCP connection to one of the message broker addresses cannot be established, the next attempt calls get() again, and the supplier can return a different address. That is what makes failover possible.
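One way to fill in the get() body is a simple round-robin rotation over a fixed list of broker addresses. The class below is only a sketch under a few assumptions: RoundRobinAddressSupplier is a hypothetical name of mine, not something Spring provides, and it implements java.util.function.Supplier so that it compiles without extra dependencies. As far as I can tell, Reactor2TcpClient expects Reactor 2's own Supplier interface, so in the real configuration the same get() logic would have to implement that type instead.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical helper (not part of Spring): hands out broker addresses in rotation.
final class RoundRobinAddressSupplier implements Supplier<InetSocketAddress> {

    private final List<InetSocketAddress> addresses;
    private final AtomicInteger next = new AtomicInteger();

    RoundRobinAddressSupplier(List<InetSocketAddress> addresses) {
        if (addresses.isEmpty()) {
            throw new IllegalArgumentException("At least one broker address is required");
        }
        // Defensive copy so later changes to the caller's list do not affect rotation.
        this.addresses = new ArrayList<>(addresses);
    }

    @Override
    public InetSocketAddress get() {
        // floorMod keeps the index non-negative even after the counter overflows.
        int index = Math.floorMod(next.getAndIncrement(), addresses.size());
        return addresses.get(index);
    }
}
```

With two addresses in the list, each call to get() alternates between them, so a failed attempt against the first broker is followed by an attempt against the second. Whether rotation is the right policy (as opposed to, say, always preferring a primary broker) depends on how your broker pair is set up.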

A simple STOMP broker relay configuration looks like this:

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.enableStompBrokerRelay("/queue", "/topic")
                .setRelayHost("<hostname>")
                .setRelayPort(<port>) // setRelayPort takes an int, not a String
                .setSystemLogin("<broker login>")
                .setSystemPasscode("<broker password>")
                .setTcpClient(createTcpClient());
        registry.setApplicationDestinationPrefixes("/app");
    }

    private Reactor2TcpClient<byte[]> createTcpClient() {

        Supplier<InetSocketAddress> addressSupplier = new Supplier<InetSocketAddress>() {
            @Override
            public InetSocketAddress get() {
                // Select address to connect to ...
            }
        };

        StompDecoder decoder = new StompDecoder();
        Reactor2StompCodec codec = new Reactor2StompCodec(new StompEncoder(), decoder);
        return new Reactor2TcpClient<>(addressSupplier, codec);
    }

}

Excellent! Are we done yet? Unfortunately, no. If you have set up your own local message broker, you are pretty much done here. External message brokers, however, require an SSL handshake, which this Reactor2TcpClient constructor does not set up.

Nonetheless, this is not difficult. The Spring team has already done a great job implementing the client, and all we need to do is copy some of their methods into a custom STOMP factory class that supports SSL.

Stay tuned for Part 2, where I will dive into the Reactor2TcpClient class and show how we can borrow ideas from it to implement our own custom STOMP configuration!