consolidated/rx-stomp/src/rx-stomp-rpc.ts
Remote Procedure Call (RPC) helper over STOMP.
RxStompRPC implements a simple request/reply pattern using STOMP frames:
reply-to
header.correlation-id
header.Usage summary
rpc(...)
when you expect exactly one reply; it completes after the first matching message.stream(...)
when the server may send multiple messages (e.g., progress updates).correlation-id
in params.headers
, it will be used; otherwise a UUID is generated.reply-to
header is set automatically to replyQueueName
(default /temp-queue/rpc-replies
).Reply queue strategy
rxStomp.unhandledMessage$
, which is suitable when the broker
routes temporary-queue replies without an explicit subscription.{ setupReplyQueue }
via RxStompRPCConfig. RxStompRPC will keep your observable “hot” internally.See the guide for end-to-end examples: /guide/rx-stomp/ng2-stompjs/remote-procedure-call.html
Part of @stomp/rx-stomp
Methods |
constructor(rxStomp: RxStomp, stompRPCConfig?: RxStompRPCConfig)
|
||||||||||||
Defined in consolidated/rx-stomp/src/rx-stomp-rpc.ts:67
|
||||||||||||
Construct a new RxStompRPC. Notes
See the guide for broker-specific considerations.
Parameters :
|
Public rpc | ||||||
rpc(params: IPublishParams)
|
||||||
Perform a unary RPC request that resolves with the first matching reply. Behavior
Use stream if you expect multiple replies for a single request.
Parameters :
Returns :
Observable<IMessage>
|
Public stream | ||||||
stream(params: IPublishParams)
|
||||||
Perform an RPC request and receive a stream of matching replies. How it matches replies
Headers set by RxStompRPC
Observability
When to use
Parameters :
Returns :
Observable<IMessage>
|
import { filter, first, Observable, Observer, Subscription } from 'rxjs';
import { v4 as uuid } from 'uuid';
import { IMessage, IPublishParams, StompHeaders } from '@stomp/stompjs';
import { RxStomp } from './rx-stomp.js';
import {
RxStompRPCConfig,
setupReplyQueueFnType,
} from './rx-stomp-rpc-config.js';
/**
* Remote Procedure Call (RPC) helper over STOMP.
*
* RxStompRPC implements a simple request/reply pattern using STOMP frames:
* - Requests are published to a destination you control.
* - A reply destination is advertised via the `reply-to` header.
* - Responses are matched back to the request using a `correlation-id` header.
*
* Usage summary
* - Use `rpc(...)` when you expect exactly one reply; it completes after the first matching message.
* - Use `stream(...)` when the server may send multiple messages (e.g., progress updates).
* - If you provide a `correlation-id` in `params.headers`, it will be used; otherwise a UUID is generated.
* - The `reply-to` header is set automatically to `replyQueueName` (default `/temp-queue/rpc-replies`).
*
* Reply queue strategy
* - By default, replies are read from `rxStomp.unhandledMessage$`, which is suitable when the broker
* routes temporary-queue replies without an explicit subscription.
* - To subscribe to a dedicated queue or customize the reply stream, provide `{ setupReplyQueue }`
* via {@link RxStompRPCConfig}. RxStompRPC will keep your observable “hot” internally.
*
* See the guide for end-to-end examples:
* /guide/rx-stomp/ng2-stompjs/remote-procedure-call.html
*
* Part of `@stomp/rx-stomp`
*/
export class RxStompRPC {
/**
* Destination used in the `reply-to` header for all RPC calls.
* You can override it through {@link RxStompRPCConfig.replyQueueName}.
*/
private _replyQueueName = '/temp-queue/rpc-replies';
/**
* Factory that returns a hot Observable delivering all reply messages.
* Defaults to a function that uses `rxStomp.unhandledMessage$`.
* Override via {@link RxStompRPCConfig.setupReplyQueue}.
*/
private _setupReplyQueue: setupReplyQueueFnType = () => {
return this.rxStomp.unhandledMessage$;
};
/**
* Shared stream of all messages arriving on the reply destination.
* Lazily initialized on first call to {@link stream}.
*/
private _repliesObservable: Observable<IMessage>;
/**
* True when a custom reply queue setup function is supplied.
* In that case, this class keeps a dummy subscription to prevent accidental teardown.
*/
private _customReplyQueue: boolean = false;
// This is used to ensure that underlying subscription remains subscribed
private _dummySubscription: Subscription;
/**
* Construct a new RxStompRPC.
*
* @param rxStomp The active {@link RxStomp} instance to use for publishing and receiving.
* @param stompRPCConfig Optional hooks to customize reply queue name and setup.
*
* Notes
* - If `replyQueueName` is provided, it is used in the `reply-to` header for all requests.
* - If `setupReplyQueue` is provided, it must return a hot Observable of all reply messages.
* RxStompRPC will subscribe internally to keep it alive across consumers.
*
* See the guide for broker-specific considerations.
*/
constructor(
private rxStomp: RxStomp,
private stompRPCConfig?: RxStompRPCConfig,
) {
if (stompRPCConfig) {
if (stompRPCConfig.replyQueueName) {
this._replyQueueName = stompRPCConfig.replyQueueName;
}
if (stompRPCConfig.setupReplyQueue) {
this._customReplyQueue = true;
this._setupReplyQueue = stompRPCConfig.setupReplyQueue;
}
}
}
/**
* Perform a unary RPC request that resolves with the first matching reply.
*
* Behavior
* - Sends a single request using {@link stream} and returns an Observable that emits the first
* reply whose `correlation-id` matches the request.
* - The returned Observable completes after emitting the first message.
*
* Use {@link stream} if you expect multiple replies for a single request.
*/
public rpc(params: IPublishParams): Observable<IMessage> {
// We know there will be only one message in reply
return this.stream(params).pipe(first());
}
/**
* Perform an RPC request and receive a stream of matching replies.
*
* How it matches replies
* - A `correlation-id` is attached to the request and used to filter messages
* from the reply stream. If you pass `headers['correlation-id']`, it is preserved;
* otherwise, a UUID is generated.
*
* Headers set by RxStompRPC
* - `reply-to`: set to {@link _replyQueueName}.
* - `correlation-id`: set or preserved as described above.
*
* Observability
* - The returned Observable is cold with respect to the request; the request is sent
* upon subscription, and the filtered replies are forwarded to the subscriber.
* - Unsubscribe to stop receiving further replies for the request; the underlying
* reply-queue subscription remains active and shared.
*
* When to use
* - Use this when the server responds with multiple messages (progress events, partials).
* - Prefer {@link rpc} if exactly one reply is expected.
*/
public stream(params: IPublishParams): Observable<IMessage> {
// defensively copy
const headers: StompHeaders = { ...(params.headers || {}) };
if (!this._repliesObservable) {
const repliesObservable = this._setupReplyQueue(
this._replyQueueName,
this.rxStomp,
);
// In case of custom queue, ensure it remains subscribed
if (this._customReplyQueue) {
this._dummySubscription = repliesObservable.subscribe(() => {});
}
this._repliesObservable = repliesObservable;
}
return Observable.create((rpcObserver: Observer<IMessage>) => {
let defaultMessagesSubscription: Subscription;
const correlationId = headers['correlation-id'] || uuid();
defaultMessagesSubscription = this._repliesObservable
.pipe(
filter((message: IMessage) => {
return message.headers['correlation-id'] === correlationId;
}),
)
.subscribe((message: IMessage) => {
rpcObserver.next(message);
});
// send an RPC request
headers['reply-to'] = this._replyQueueName;
headers['correlation-id'] = correlationId;
this.rxStomp.publish({ ...params, headers });
return () => {
// Cleanup: stop forwarding matching replies to this observer
defaultMessagesSubscription.unsubscribe();
};
});
}
}