rx-stomp with Angular

8 minute read

This step-by-step guide will create a new Angular application and demonstrate usage of rx-stomp.

While preparing this guide, Angular 13.2.0 and @stomp/rx-stomp 1.1.4 are used.

For the impatient, the final code from this tutorial is at: https://github.com/stomp-js/rx-stomp-angular

Prerequisites

You need to have Node.js and npm installed.

You must have basic familiarity with Angular and Typescript. If you are unsure, please go through the famous Tour of Heroes.

Instructions

Create a new Angular Application

Install the latest Angular CLI as below:

$ npm i @angular/cli -g

Change to the intended folder and create your new application:

$ ng new rx-stomp-angular --skip-install --defaults

Change to newly created folder and install all dependencies:

$ cd rx-stomp-angular/
$ npm i

To run the application locally, execute the following and point your browser to http://localhost:4200/.

$ ng serve

You can keep it running in a terminal and keep the browser tab open. It will detect changes, recompile and reload the browser tab.

At this stage, you can use your favorite IDE and be ready to edit the code.

Add @stomp/rx-stomp, Inject RxStompService

$ npm i @stomp/rx-stomp

We will need to define our configuration. This configuration will get injected by the Angular Dependency Injection mechanism while creating an instance of RxStompService.

Configuration

Create my-rx-stomp.config.ts file inside src/app/ with the following content:

import { RxStompConfig } from '@stomp/rx-stomp';

export const myRxStompConfig: RxStompConfig = {
  // Which server?
  brokerURL: 'ws://127.0.0.1:15674/ws',

  // Headers
  // Typical keys: login, passcode, host
  connectHeaders: {
    login: 'guest',
    passcode: 'guest',
  },

  // How often to heartbeat?
  // Interval in milliseconds, set to 0 to disable
  heartbeatIncoming: 0, // Typical value 0 - disabled
  heartbeatOutgoing: 20000, // Typical value 20000 - every 20 seconds

  // Wait in milliseconds before attempting auto reconnect
  // Set to 0 to disable
  // Typical value 500 (500 milli seconds)
  reconnectDelay: 200,

  // Will log diagnostics on console
  // It can be quite verbose, not recommended in production
  // Skip this key to stop logging to console
  debug: (msg: string): void => {
    console.log(new Date(), msg);
  },
};

The above should work for an out-of-the-box installation of RabbitMQ broker. Please change as per your broker configuration.

The RxStompService

Create rx-stomp.service.ts file inside src/app/ with the following content:

import { Injectable } from '@angular/core';
import { RxStomp } from '@stomp/rx-stomp';

@Injectable({
  providedIn: 'root',
})
export class RxStompService extends RxStomp {
  constructor() {
    super();
  }
}

Factory to create and initialize RxStompService

Create rx-stomp-service-factory.ts file inside src/app/ with the following content:

import { RxStompService } from './rx-stomp.service';
import { myRxStompConfig } from './my-rx-stomp.config';

export function rxStompServiceFactory() {
  const rxStomp = new RxStompService();
  rxStomp.configure(myRxStompConfig);
  rxStomp.activate();
  return rxStomp;
}

Angular DI setup

Next, we need the service to get injected. Open src/app/app.module.ts Add the following to the providers array of your @NgModule:

providers: [
  {
    provide: RxStompService,
    useFactory: rxStompServiceFactory,
  },
];

Also, add appropriate import lines towards the top of this file (after the existing import statements):

import { RxStompService } from './rx-stomp.service';
import { rxStompServiceFactory } from './rx-stomp-service-factory';

Messages

We will create a component that will do the following:

  • It will have a clickable button to send a message.
  • It will subscribe and keep listing all received messages.

Skeleton

Create the Messages component:

$ ng generate component messages

Inspect the files generated in src/app/messages/.

Now we will create the basic HTML in src/app/messages/messages.component.html - put the following content:

<div id="messages">
  <button class="btn btn-primary">Send Test Message</button>
  <h2>Received messages</h2>
  <ol>
    <!-- we will use Angular binding to populate list of messages -->
    <li class="message">message</li>
  </ol>
</div>

We will add this component to the main UI by editing src/app/app.component.html. We will remove most of the default HTML generated by Angular CLI in this process. Edit src/app/app.component.html to look like the following:

<div style="text-align:center">
  <h1>Welcome to !</h1>
</div>
<app-messages></app-messages>

Now is a great time to check the browser tab to see that display has changed and it should show the HTML that we have added to src/app/messages/messages.component.html.

If you find an empty screen, please check the terminal where you executed ng serve. If there are compilation errors, you will see them here. Also, check the browser Javascript console. Sometimes, you may notice errors here.

Sending messages

We will now inject RxStompService as a dependency in MessageComponent. To achieve that, we will add it to the constructor in src/app/messages/messages.component.ts, which should look like the following:

  constructor(private rxStompService: RxStompService) { }

We will now add code to send a message:

  onSendMessage() {
  const message = `Message generated at ${new Date()}`;
  this.rxStompService.publish({ destination: '/topic/demo', body: message });
}

Please see RxStomp#publish. Keep this page open as we will be using more methods from this class soon.

The entire content of src/app/messages/messages.component.ts should look like the following:

import { Component, OnInit } from '@angular/core';
import { RxStompService } from '../rx-stomp.service';

@Component({
  selector: 'app-messages',
  templateUrl: './messages.component.html',
  styleUrls: ['./messages.component.css'],
})
export class MessagesComponent implements OnInit {
  constructor(private rxStompService: RxStompService) {}

  ngOnInit(): void {}

  onSendMessage() {
    const message = `Message generated at ${new Date()}`;
    this.rxStompService.publish({ destination: '/topic/demo', body: message });
  }
}

We will attach onSendMessage to the button in the HTML. Edit src/app/messages/messages.component.html and add (click)="onSendMessage()" to the button. The file should look like the following now:

<div id="messages">
  <button class="btn btn-primary" (click)="onSendMessage()">
    Send Test Message
  </button>
  <h2>Received messages</h2>
  <ol>
    <!-- we will use Angular binding to populate list of messages -->
    <li class="message">message</li>
  </ol>
</div>

You should go back to the browser tab and open the web console at this stage. When you click on the Send Message button, you can see that the message is being sent to the broker in the console.

Receiving messages

The RxStomp#watch method initiates a subscription with the broker. this.rxStompService.watch('/topic/demo') will initiate a subscription with the broker for topic /topic/demo and returns an RxJS Observable. Typically we will subscribe this Observable to receive actual messages.

  ngOnInit() {
    this.rxStompService.watch('/topic/demo').subscribe((message: Message) => {
      this.receivedMessages.push(message.body);
    });
  }

We will need to add a declaration for receivedMessages and import Message from @stomp/stompjs. A few other modules also expose Message classes, so you need to be careful.

If you are coming from @stomp/stompjs, please notice that you do not need to subscribe within the callback of stomp getting connected. This library internally ensures that actual subscription happens when the broker is actually connected. It also keeps tracking of broker re-connections and automatically resubscribes.

Now is the time to link the HTML template to the received messages. We will use ngFor to bind the list of messages to <li>. Edit src/app/messages/messages.component.html:

<li class="message" *ngFor="let message of receivedMessages"></li>

Now your src/app/messages/messages.component.ts and src/app/messages/messages.component.html should look like:

import { Component, OnInit } from '@angular/core';
import { RxStompService } from '../rx-stomp.service';
import { Message } from '@stomp/stompjs';

@Component({
  selector: 'app-messages',
  templateUrl: './messages.component.html',
  styleUrls: ['./messages.component.css'],
})
export class MessagesComponent implements OnInit {
  receivedMessages: string[] = [];

  constructor(private rxStompService: RxStompService) {}

  ngOnInit(): void {
    this.rxStompService.watch('/topic/demo').subscribe((message: Message) => {
      this.receivedMessages.push(message.body);
    });
  }

  onSendMessage() {
    const message = `Message generated at ${new Date()}`;
    this.rxStompService.publish({ destination: '/topic/demo', body: message });
  }
}
<div id="messages">
  <button class="btn btn-primary" (click)="onSendMessage()">
    Send Test Message
  </button>
  <h2>Received messages</h2>
  <ol>
    <!-- we will use Angular binding to populate list of messages -->
    <li class="message" *ngFor="let message of receivedMessages">
      
    </li>
  </ol>
</div>

Check your application in the browser now — try sending a few messages. Then open another browser window/tab and see that both receive messages.

Stopping the watch

We are almost done. We need to stop the watch when the component is destroyed. For this, we need to call unsubscribe on the RxJS subscription. We will need to do the following:

  • Add OnDestroy to the implements list (by default, OnInit will be there).
  • Implement ngOnDestroy method.
  • Add OnDestroy to the imports.

After this, we will modify the code in ngOnInit to store the RxJS subscription in a member variable and call unsubscribe in ngOnDestroy.

ngOnInit() {
  this.topicSubscription = this.rxStompService
          .watch('/topic/demo')
          .subscribe((message: Message) => {
            this.receivedMessages.push(message.body);
          });
}

ngOnDestroy() {
  this.topicSubscription.unsubscribe();
}

Type of topicSubscription will be Subscription from rxjs. Your src/app/messages/messages.component.ts should look like the following:

import { Component, OnDestroy, OnInit } from '@angular/core';
import { RxStompService } from '../rx-stomp.service';
import { Message } from '@stomp/stompjs';
import { Subscription } from 'rxjs';

@Component({
  selector: 'app-messages',
  templateUrl: './messages.component.html',
  styleUrls: ['./messages.component.css'],
})
export class MessagesComponent implements OnInit, OnDestroy {
  receivedMessages: string[] = [];
  // @ts-ignore, to suppress warning related to being undefined
  private topicSubscription: Subscription;

  constructor(private rxStompService: RxStompService) {}

  ngOnInit() {
    this.topicSubscription = this.rxStompService
      .watch('/topic/demo')
      .subscribe((message: Message) => {
        this.receivedMessages.push(message.body);
      });
  }

  ngOnDestroy() {
    this.topicSubscription.unsubscribe();
  }

  onSendMessage() {
    const message = `Message generated at ${new Date()}`;
    this.rxStompService.publish({ destination: '/topic/demo', body: message });
  }
}

Where next

Updated: