
This guide shows how to observe and display the STOMP connection status in an Angular app using the RxStomp class from @stomp/rx-stomp.

See rx-stomp with Angular for a complete Angular setup.

The API

RxStomp exposes connectionState$ as an RxJS BehaviorSubject. Each new subscriber receives the current state immediately and then every subsequent change. States are defined by the RxStompState enum and correspond to WebSocket readyState semantics:

  • CONNECTING
  • OPEN
  • CLOSING
  • CLOSED

Example: log state transitions

import { RxStomp, RxStompState } from '@stomp/rx-stomp';

const rxStomp = new RxStomp();
// ...configure and activate...
rxStomp.connectionState$.subscribe(state => {
  console.log(RxStompState[state]); // e.g., 'OPEN'
});
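
The RxStompState[state] lookup relies on TypeScript's reverse mapping for numeric enums. The following is a minimal sketch of that mechanism. It uses a local stand-in enum so it runs without the library; DemoState and stateName are hypothetical names, not part of @stomp/rx-stomp.

```typescript
// Local stand-in that mirrors the four state names listed above.
// DemoState is hypothetical; in an app you would use RxStompState itself.
enum DemoState {
  CONNECTING,
  OPEN,
  CLOSING,
  CLOSED,
}

// Numeric enums compile to an object that maps names to numbers
// and numbers back to names, so indexing by value yields the name.
function stateName(state: DemoState): string {
  return DemoState[state];
}

console.log(stateName(DemoState.OPEN)); // prints "OPEN"
```

Because the lookup returns the member name as a string, the result can be used directly as a label or as a CSS class name.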

In Angular, when using an injected RxStompService (a simple subclass of RxStomp wired via DI), you can map the enum to strings for binding:

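import { Observable } from 'rxjs';
import { map } from 'rxjs/operators';
import { RxStompState } from '@stomp/rx-stomp';
// RxStompService is your own injectable subclass of RxStomp;
// the path below is an assumption, adjust it to your project.
import { RxStompService } from './rx-stomp.service';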

export class StatusComponent {
  connectionStatus$: Observable<string>;

  constructor(rxStompService: RxStompService) {
    this.connectionStatus$ = rxStompService.connectionState$.pipe(
      map(state => RxStompState[state])
    );
  }
}

Template (AsyncPipe binds to the latest value):

<div id="status">
  <div id="indicator" class="{{ connectionStatus$ | async }}"></div>
  <span id="status-label">{{ connectionStatus$ | async }}</span>
</div>

Minimal CSS for a traffic-light indicator:

div#indicator {
  width: 12px;
  height: 12px;
  border-radius: 6px;
  margin: 0 10px;
  display: inline-block;
  background: #ffbf00; /* CONNECTING, CLOSING */
}

div.CLOSED#indicator { background: red; }
div.OPEN#indicator   { background: green; }

Responding to successful connection

RxStomp also provides connected$ (Observable). It emits immediately if already connected, and on every subsequent connect. Use it to trigger logic that must run after the session is established (and on reconnects).

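import { take } from 'rxjs/operators';
// RxStompService is your own injectable subclass of RxStomp;
// the path below is an assumption, adjust it to your project.
import { RxStompService } from './rx-stomp.service';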

// On every (re)connection
rxStompService.connected$.subscribe(() => {
  console.log('Connected (or reconnected).');
});

// Only once (immediately if already connected)
rxStompService.connected$.pipe(take(1)).subscribe(() => {
  console.log('Connected once.');
});

Internally, connected$ is a filtered view of connectionState$ for RxStompState.OPEN:

this.connected$ = this.connectionState$.pipe(
  filter(state => state === RxStompState.OPEN)
);

You can similarly watch for CLOSED, CONNECTING, or CLOSING if you need custom handling.
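
One way to structure such custom handling is a small predicate that can be handed to a filter step. The sketch below is illustrative only. DemoState stands in for RxStompState so the code runs on its own, and isNotOpen is a hypothetical helper; neither is part of the library.

```typescript
// Stand-in for RxStompState so the sketch runs on its own (hypothetical).
enum DemoState {
  CONNECTING,
  OPEN,
  CLOSING,
  CLOSED,
}

// True for every state in which the connection is not usable,
// i.e. anything other than OPEN.
function isNotOpen(state: DemoState): boolean {
  return state !== DemoState.OPEN;
}

// Plain-array demonstration of which states the predicate keeps.
const seen: DemoState[] = [DemoState.CONNECTING, DemoState.OPEN, DemoState.CLOSED];
const notOpenNames: string[] = seen.filter(isNotOpen).map(s => DemoState[s]);

console.log(notOpenNames.join(', ')); // prints "CONNECTING, CLOSED"
```

With the real enum, a predicate of the same shape could be passed to the RxJS filter operator on connectionState$, in the same way the library's own connected$ filters for RxStompState.OPEN.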

Notes

  • You do not need to delay subscribe/watch calls until after the connection is up; RxStomp queues them and (re)subscribes automatically whenever the broker connection is established again.
