5 minute read

This is an advanced topic. You may choose to understand the philosophy by reading the initial sections, or, you may just skip to the usage sections which are not complicated.

Messaging usually works one way. There is, however, a convention for two-way communication (i.e. request/response). This involves reply-to queues which routes the response back to correct client program and correlation-id to uniquely match a response to the correct request.

See: [https://www.rabbitmq.com/tutorials/tutorial-six-python.html] (https://www.rabbitmq.com/tutorials/tutorial-six-python.html) for a sample using a very similar approach.

See sample RPC servers and clients at: https://github.com/stomp-js/samples/

Why RPC using messaging

REST calls using JSON encoded payloads over http(s) are quite popular. These have become almost ubiquitous and supported by a variety of frameworks and have become the most common technique to access third party APIs.

There are, however, some limitations with REST/JSON/http(s) approach:

  • http(s) can only wait for finite time before timing out. Typically, for long-running operations as an alternative a request will be submitted and the client will keep polling to check if results are ready.
  • There are some other interesting scenarios where the request is submitted to process A. However, the final response is better handled by process B. REST/JSON/http(s) will mandate that B communicates back to A which then intimates the client. This scenario is quite common in credit card processing.
  • As an extension to the long-running tasks, there is no easy way to intimate the client about the progress of the task.
  • We have gotten accustomed to the overheads of the REST /JSON/http(s) approach. However, for every request, there is connection setup (including TLS/SSL) before a request is made.

The approach suggested in this guide solves all of the above issues/patterns. On top of that, it offers some interesting bonus as well:

  • It offers natural load balancing. Just boot up a new servers.
  • Even more — this form of load balancing automatically takes care of fast and slow instances.
  • It allows fault tolerance — if a process dies before completing, it would be submitted to another node.
  • If there are no servers available when a request is made, these can be queued till a server becomes available.
  • It offers a mechanism where a server may indicate a temporary failure while processing a request. This will allow the request to be resubmitted for processing again.

The samples at https://github.com/stomp-js/samples/ demonstrate most of the above.


This feature will work with @stomp/rx-stomp.

Implementing the RPC server endpoint

All code snippets are from https://github.com/stomp-js/samples/.

This can be implemented in any language. In most cases, it will be executed in a backend server.

    // Destination is RabbitMQ specific, change as per your environment
    const rpcEndPoint = '/amq/queue/integer-addition';

    // We will implement an endpoint
    // This endpoint will wait for random period before responding to simulate real RPC servers
    rxStomp.watch(rpcEndPoint).subscribe(function (request) {
      console.log("RPC Server: Request: " + request.body);
      // The response needs to be sent back here, it can safely be inlined
      const replyTo = request.headers['reply-to'];

      // Same correlation id needs to be sent back as message header, it can safely be inlined
      const correlationId = request.headers['correlation-id'];

      // simulate a random delay while computing
      setTimeout(function () {
        // Process the request, compute the response
        const operands = JSON.parse(request.body);
        const result = {result: Number.parseInt(operands.x) + Number.parseInt(operands.y)};
        // Completed processing

        const responseBody = JSON.stringify(result);

        console.log("RPC Server: Response: " + responseBody + " for " + request.body);
        // Send the response back to destination `replyTo` with `correlation-id` header
          destination: replyTo,
          body: responseBody,
          headers: {'correlation-id': correlationId}
      }, randomInt(10000));

A very similar server in Ruby (it is a single-threaded blocking server):

amqp_conn = Bunny.new

channel = amqp_conn.create_channel

# Notice that RabbitMQ STOMP adaptor maps queue/exchange names
queue = channel.queue("integer-addition")

queue.subscribe(block: true) do |delivery_info, metadata, payload|
  puts "Received: #{payload}"

  # Process the request, compute the response
  operands = JSON.parse(payload)
  result = {result: operands['x'].to_i + operands['y'].to_i}
  response_body = result.to_json
  # Completed processing

  puts "RPC Server: Response: #{response_body} for #{payload}"

  default_exchange = channel.default_exchange

  default_exchange.publish response_body, routing_key: metadata[:reply_to], correlation_id: metadata[:correlation_id]

Using it from the client


RabbitMQ has special support for temp-queues in reply-to messages which make things to work magically. Please check details at https://www.rabbitmq.com/stomp.html#d.tqd

The client code looks simple and similar to JSON and HTTP-based backend services.

Create an instance of RxStompRPC, you will need an instance of RxStomp in the constructor:

const rxStompRPC = new RxStompRPC(rxStomp);

Making the RPC call is simple, takes same parameters as RxStomp#publish and returns an Observable which will trigger once.

const myServiceEndPoint = '/topic/echo';

const request = 'Hello';
// It accepts a optional third argument a Hash of headers to be sent as part of the request
  .rpc({ destination: myServiceEndPoint, body: request })
  .subscribe((message: Message) => {
    // Consume the response

You can notice similarity with Angular HTTP calls.

See https://github.com/stomp-js/samples/ for a sample client.

Other Brokers

There are few requirements:

  • the reply queue name must be unique across the broker.
  • ideally, for security reasons, only the client creating the queue should have access to it.

Many brokers have temp-queue concept which should simplify your work.

The following gives an outline:

const stompRPCConfigForActiveMQ = {
  // A unique name, BTW angular2-uuid module is already added as dependency
  replyQueueName: `/topic/replies.${UUID.UUID()}`,

  // Simply subscribe, you would need to secure by adding broker specific options
  setupReplyQueue: (replyQueueName: string, stompService: StompRService) => {
    return stompService.subscribe(replyQueueName);

Apart from this additional setup step usage remains the same as RabbitMQ as documented above.

What Next

In Why RPC using messaging, few additional benefits like reporting progress, fault tolerance etc. are discussed.

There are samples which cover some of these patterns:

  • Multithreaded Ruby server.
  • Manual acknowledgement — it will retry in case of failure.
  • Factoring out boilerplate server code.
  • Implementing server/client using RxStompRPC#stream to report progress.

If you have questions, or you would like to see more guides, please raise an issue at https://github.com/stomp-js/rx-stomp/issues.