123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397 |
- import { Subject, AnonymousSubject } from '../../Subject';
- import { Subscriber } from '../../Subscriber';
- import { Observable } from '../../Observable';
- import { Subscription } from '../../Subscription';
- import { Operator } from '../../Operator';
- import { ReplaySubject } from '../../ReplaySubject';
- import { Observer, NextObserver } from '../../types';
/**
 * WebSocketSubjectConfig is a plain object used to configure the subject
 * created by {@link webSocket}.
 *
 * <span class="informal">Provides flexibility to {@link webSocket}</span>
 *
 * Its properties customize behavior at specific moments of the socket's
 * lifecycle: `openObserver` is notified when the connection opens and
 * `closeObserver` when it closes. For data coming from the server,
 * `deserializer` lets us choose how each incoming message is converted before
 * it is handed to the socket client. By default, `deserializer` applies
 * `JSON.parse` to the `data` of each message coming from the server.
 *
 * ## Examples
 *
 * **deserializer** defaults to `JSON.parse`, but incoming data can be either
 * text or binary. We can apply a custom deserialization strategy or simply
 * skip the default behavior.
 *
 * ```ts
 * import { webSocket } from 'rxjs/webSocket';
 *
 * const wsSubject = webSocket({
 *   url: 'ws://localhost:8081',
 *   // Apply any transformation of your choice.
 *   deserializer: ({ data }) => data
 * });
 *
 * wsSubject.subscribe(console.log);
 *
 * // Let's suppose we have this on the Server: ws.send('This is a msg from the server')
 * // output
 * //
 * // This is a msg from the server
 * ```
 *
 * **serializer** applies a custom serialization strategy to outgoing messages.
 *
 * ```ts
 * import { webSocket } from 'rxjs/webSocket';
 *
 * const wsSubject = webSocket({
 *   url: 'ws://localhost:8081',
 *   // Apply any transformation of your choice.
 *   serializer: msg => JSON.stringify({ channel: 'webDevelopment', msg: msg })
 * });
 *
 * wsSubject.subscribe(() => wsSubject.next('msg to the server'));
 *
 * // Let's suppose we have this on the Server:
 * // ws.on('message', msg => console.log(msg));
 * // ws.send('This is a msg from the server');
 * // output at server side:
 * //
 * // {"channel":"webDevelopment","msg":"msg to the server"}
 * ```
 *
 * **closeObserver** lets us react when the underlying socket closes, for
 * example to report a custom error.
 *
 * ```ts
 * import { webSocket } from 'rxjs/webSocket';
 *
 * const wsSubject = webSocket({
 *   url: 'ws://localhost:8081',
 *   closeObserver: {
 *     next() {
 *       const customError = { code: 6666, reason: 'Custom evil reason' }
 *       console.log(`code: ${ customError.code }, reason: ${ customError.reason }`);
 *     }
 *   }
 * });
 *
 * // output
 * // code: 6666, reason: Custom evil reason
 * ```
 *
 * **openObserver** is useful when we need to run an initialization task
 * before sending/receiving messages on the webSocket, or to send a
 * notification that the connection was successful.
 *
 * ```ts
 * import { webSocket } from 'rxjs/webSocket';
 *
 * const wsSubject = webSocket({
 *   url: 'ws://localhost:8081',
 *   openObserver: {
 *     next: () => {
 *       console.log('Connection ok');
 *     }
 *   }
 * });
 *
 * // output
 * // Connection ok
 * ```
 */
export interface WebSocketSubjectConfig<T> {
  /** The url of the socket server to connect to. */
  url: string;

  /** The protocol (or list of protocols) to use to connect. */
  protocol?: string | string[];

  /** @deprecated Will be removed in v8. Use {@link deserializer} instead. */
  resultSelector?: (e: MessageEvent) => T;

  /**
   * Turns a value passed to the subject into the message that is sent to the
   * server. Defaults to JSON.stringify.
   */
  serializer?: (value: T) => WebSocketMessage;

  /**
   * Turns a message event arriving on the socket from the server into the
   * value emitted to subscribers. Defaults to JSON.parse.
   */
  deserializer?: (e: MessageEvent) => T;

  /**
   * An Observer that is notified when open events occur on the underlying
   * web socket.
   */
  openObserver?: NextObserver<Event>;

  /**
   * An Observer that is notified when close events occur on the underlying
   * web socket.
   */
  closeObserver?: NextObserver<CloseEvent>;

  /**
   * An Observer that is notified when a close is about to occur due to
   * unsubscription.
   */
  closingObserver?: NextObserver<void>;

  /**
   * A WebSocket constructor to use. This is useful for situations like using
   * a WebSocket implementation in Node (WebSocket is a DOM API), or for
   * mocking a WebSocket for testing purposes.
   */
  WebSocketCtor?: new (url: string, protocols?: string | string[]) => WebSocket;

  /** Sets the `binaryType` property of the underlying WebSocket. */
  binaryType?: 'blob' | 'arraybuffer';
}
/**
 * Baseline configuration that user-supplied settings are layered on top of:
 * an empty url plus JSON-based (de)serialization.
 */
const DEFAULT_WEBSOCKET_CONFIG: WebSocketSubjectConfig<any> = {
  url: '',
  // Incoming: parse the `data` payload of the message event as JSON.
  deserializer: ({ data }: MessageEvent) => JSON.parse(data),
  // Outgoing: stringify the value as JSON.
  serializer: (payload: any) => JSON.stringify(payload),
};

/**
 * Message of the TypeError emitted when the subject is errored with a value
 * that does not carry a truthy `code`.
 */
const WEBSOCKETSUBJECT_INVALID_ERROR_OBJECT =
  'WebSocketSubject.error must be called with an object with an error code, and an optional reason: { code: number, reason: string }';

/** The payload types a serializer may produce for sending over the socket. */
export type WebSocketMessage = string | ArrayBuffer | Blob | ArrayBufferView;
/**
 * A Subject backed by a WebSocket.
 *
 * Messages received on the socket are passed through the configured
 * `deserializer` and emitted to subscribers via the internal `_output`
 * subject. Until the socket has opened, `destination` is a `ReplaySubject`;
 * once it opens, `destination` is swapped for a subscriber that serializes
 * each value and sends it on the socket, and the `ReplaySubject` is
 * subscribed to that new destination.
 *
 * The socket is created lazily on the first subscription and is closed when
 * the last subscriber of `_output` unsubscribes.
 */
export class WebSocketSubject<T> extends AnonymousSubject<T> {
  // @ts-ignore: Property has no initializer and is not definitely assigned
  private _config: WebSocketSubjectConfig<T>;

  /** @internal */
  // @ts-ignore: Property has no initializer and is not definitely assigned
  _output: Subject<T>;

  /** The currently active socket, or `null` when not connected. */
  private _socket: WebSocket | null = null;

  /**
   * @param urlConfigOrSource Either the url to connect to, a full
   * configuration object, or (for lifted instances) a source Observable.
   * @param destination The observer to forward to; only used when
   * `urlConfigOrSource` is an Observable.
   * @throws Error when no `WebSocketCtor` is configured and no global
   * `WebSocket` is available.
   */
  constructor(urlConfigOrSource: string | WebSocketSubjectConfig<T> | Observable<T>, destination?: Observer<T>) {
    super();
    if (urlConfigOrSource instanceof Observable) {
      this.destination = destination;
      this.source = urlConfigOrSource as Observable<T>;
    } else {
      const config = (this._config = { ...DEFAULT_WEBSOCKET_CONFIG });
      this._output = new Subject<T>();
      if (typeof urlConfigOrSource === 'string') {
        config.url = urlConfigOrSource;
      } else {
        for (const key in urlConfigOrSource) {
          // Use the prototype's hasOwnProperty so that config objects without
          // a prototype (or with a shadowed `hasOwnProperty`) still work.
          if (Object.prototype.hasOwnProperty.call(urlConfigOrSource, key)) {
            (config as any)[key] = (urlConfigOrSource as any)[key];
          }
        }
      }

      if (!config.WebSocketCtor) {
        // `typeof` is safe on undeclared identifiers; reading a bare
        // `WebSocket` where no such global exists would throw a
        // ReferenceError instead of the intended error below.
        if (typeof WebSocket !== 'undefined' && WebSocket) {
          config.WebSocketCtor = WebSocket;
        } else {
          throw new Error('no WebSocket constructor can be found');
        }
      }
      // Holds outgoing values until the socket has opened.
      this.destination = new ReplaySubject();
    }
  }

  /** @deprecated Internal implementation detail, do not use directly. Will be made internal in v8. */
  lift<R>(operator: Operator<T, R>): WebSocketSubject<R> {
    const sock = new WebSocketSubject<R>(this._config as WebSocketSubjectConfig<any>, this.destination as any);
    sock.operator = operator;
    sock.source = this;
    return sock;
  }

  /**
   * Forgets the current socket and installs a fresh `_output` subject. When
   * this instance has no `source`, `destination` is also replaced with a new
   * `ReplaySubject`.
   */
  private _resetState() {
    this._socket = null;
    if (!this.source) {
      this.destination = new ReplaySubject();
    }
    this._output = new Subject<T>();
  }

  /**
   * Creates an {@link Observable}, that when subscribed to, sends a message,
   * defined by the `subMsg` function, to the server over the socket to begin a
   * subscription to data over that socket. Once data arrives, the
   * `messageFilter` argument will be used to select the appropriate data for
   * the resulting Observable. When finalization occurs, either due to
   * unsubscription, completion, or error, a message defined by the `unsubMsg`
   * argument will be sent to the server over the WebSocketSubject.
   *
   * @param subMsg A function to generate the subscription message to be sent to
   * the server. This will still be processed by the serializer in the
   * WebSocketSubject's config. (Which defaults to JSON serialization)
   * @param unsubMsg A function to generate the unsubscription message to be
   * sent to the server at finalization. This will still be processed by the
   * serializer in the WebSocketSubject's config.
   * @param messageFilter A predicate for selecting the appropriate messages
   * from the server for the output stream.
   */
  multiplex(subMsg: () => any, unsubMsg: () => any, messageFilter: (value: T) => boolean) {
    const self = this;
    return new Observable((observer: Observer<T>) => {
      // A throwing `subMsg` is reported to the observer as an error.
      try {
        self.next(subMsg());
      } catch (err) {
        observer.error(err);
      }

      const subscription = self.subscribe({
        next: (x) => {
          // A throwing `messageFilter` is reported to the observer as an error.
          try {
            if (messageFilter(x)) {
              observer.next(x);
            }
          } catch (err) {
            observer.error(err);
          }
        },
        error: (err) => observer.error(err),
        complete: () => observer.complete(),
      });

      // Teardown: send the unsubscription message, then drop the subscription.
      return () => {
        try {
          self.next(unsubMsg());
        } catch (err) {
          observer.error(err);
        }
        subscription.unsubscribe();
      };
    });
  }

  /**
   * Creates the socket from the current config and wires its `onopen`,
   * `onerror`, `onclose` and `onmessage` handlers to `_output` and
   * `destination`. If constructing the socket throws, the error is sent to
   * `_output` and nothing else is set up.
   */
  private _connectSocket() {
    const { WebSocketCtor, protocol, url, binaryType } = this._config;
    // Captured once: the handlers below keep notifying this subject even
    // after `_resetState()` has installed a new `_output`.
    const observer = this._output;

    let socket: WebSocket | null = null;
    try {
      socket = protocol ? new WebSocketCtor!(url, protocol) : new WebSocketCtor!(url);
      this._socket = socket;
      if (binaryType) {
        this._socket.binaryType = binaryType;
      }
    } catch (e) {
      observer.error(e);
      return;
    }

    // NOTE(review): within this class the subscription is only used to hold
    // the queue subscription added in `onopen`; it is never unsubscribed here.
    const subscription = new Subscription(() => {
      this._socket = null;
      // readyState 1 is OPEN in the WebSocket API.
      if (socket && socket.readyState === 1) {
        socket.close();
      }
    });

    socket.onopen = (evt: Event) => {
      const { _socket } = this;
      if (!_socket) {
        // The state was reset before the socket finished opening; this
        // connection is no longer wanted.
        socket!.close();
        this._resetState();
        return;
      }
      const { openObserver } = this._config;
      if (openObserver) {
        openObserver.next(evt);
      }

      // Whatever was set as destination before the socket opened.
      const queue = this.destination;

      this.destination = Subscriber.create<T>(
        (x) => {
          // Values are only sent while the socket is OPEN (readyState 1).
          if (socket!.readyState === 1) {
            try {
              const { serializer } = this._config;
              socket!.send(serializer!(x!));
            } catch (e) {
              this.destination!.error(e);
            }
          }
        },
        (err) => {
          const { closingObserver } = this._config;
          if (closingObserver) {
            closingObserver.next(undefined);
          }
          if (err && err.code) {
            // Close the socket with the caller-provided code and reason.
            socket!.close(err.code, err.reason);
          } else {
            observer.error(new TypeError(WEBSOCKETSUBJECT_INVALID_ERROR_OBJECT));
          }
          this._resetState();
        },
        () => {
          const { closingObserver } = this._config;
          if (closingObserver) {
            closingObserver.next(undefined);
          }
          socket!.close();
          this._resetState();
        }
      ) as Subscriber<any>;

      // Feed the values held by the ReplaySubject into the new destination.
      if (queue && queue instanceof ReplaySubject) {
        subscription.add((queue as ReplaySubject<T>).subscribe(this.destination));
      }
    };

    socket.onerror = (e: Event) => {
      this._resetState();
      observer.error(e);
    };

    socket.onclose = (e: CloseEvent) => {
      // Only reset if this socket is still the current one.
      if (socket === this._socket) {
        this._resetState();
      }
      const { closeObserver } = this._config;
      if (closeObserver) {
        closeObserver.next(e);
      }
      // A clean close completes the output; anything else is an error.
      if (e.wasClean) {
        observer.complete();
      } else {
        observer.error(e);
      }
    };

    socket.onmessage = (e: MessageEvent) => {
      try {
        const { deserializer } = this._config;
        observer.next(deserializer!(e));
      } catch (err) {
        observer.error(err);
      }
    };
  }

  /** @internal */
  protected _subscribe(subscriber: Subscriber<T>): Subscription {
    const { source } = this;
    if (source) {
      // Lifted instance: delegate to the source.
      return source.subscribe(subscriber);
    }
    // Connect lazily on the first subscription.
    if (!this._socket) {
      this._connectSocket();
    }
    this._output.subscribe(subscriber);
    subscriber.add(() => {
      const { _socket } = this;
      // When the last subscriber has gone, close the socket if it is OPEN (1)
      // or CONNECTING (0), then reset.
      if (this._output.observers.length === 0) {
        if (_socket && (_socket.readyState === 1 || _socket.readyState === 0)) {
          _socket.close();
        }
        this._resetState();
      }
    });
    return subscriber;
  }

  /**
   * Closes the socket if it is OPEN (1) or CONNECTING (0), resets the internal
   * state and then unsubscribes the subject itself.
   */
  unsubscribe() {
    const { _socket } = this;
    if (_socket && (_socket.readyState === 1 || _socket.readyState === 0)) {
      _socket.close();
    }
    this._resetState();
    super.unsubscribe();
  }
}
|