c3a0f53e272dbe29e3c3f695f9e6f29ed3e4ab7f7439e40e4dcc07bdbb04429be5e8e6360e4edf832418b49db1e399c1a3087f34f9fcae881eb868510a0a58 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397
  1. import { Subject, AnonymousSubject } from '../../Subject';
  2. import { Subscriber } from '../../Subscriber';
  3. import { Observable } from '../../Observable';
  4. import { Subscription } from '../../Subscription';
  5. import { Operator } from '../../Operator';
  6. import { ReplaySubject } from '../../ReplaySubject';
  7. import { Observer, NextObserver } from '../../types';
  8. /**
  9. * WebSocketSubjectConfig is a plain Object that allows us to make our
  10. * webSocket configurable.
  11. *
  12. * <span class="informal">Provides flexibility to {@link webSocket}</span>
  13. *
  14. * It defines a set of properties to provide custom behavior in specific
  15. * moments of the socket's lifecycle. When the connection opens we can
  16. * use `openObserver`, when the connection is closed `closeObserver`, if we
  17. * are interested in listening for data coming from server: `deserializer`,
  18. * which allows us to customize the deserialization strategy of data before passing it
  19. * to the socket client. By default, `deserializer` is going to apply `JSON.parse` to each message coming
  20. * from the Server.
  21. *
  22. * ## Examples
  23. *
  24. * **deserializer**, the default for this property is `JSON.parse` but since there are just two options
  25. * for incoming data, either be text or binary data. We can apply a custom deserialization strategy
  26. * or just simply skip the default behaviour.
  27. *
  28. * ```ts
  29. * import { webSocket } from 'rxjs/webSocket';
  30. *
  31. * const wsSubject = webSocket({
  32. * url: 'ws://localhost:8081',
  33. * //Apply any transformation of your choice.
  34. * deserializer: ({ data }) => data
  35. * });
  36. *
  37. * wsSubject.subscribe(console.log);
  38. *
  39. * // Let's suppose we have this on the Server: ws.send('This is a msg from the server')
  40. * //output
  41. * //
  42. * // This is a msg from the server
  43. * ```
  44. *
  45. * **serializer** allows us to apply custom serialization strategy but for the outgoing messages.
  46. *
  47. * ```ts
  48. * import { webSocket } from 'rxjs/webSocket';
  49. *
  50. * const wsSubject = webSocket({
  51. * url: 'ws://localhost:8081',
  52. * // Apply any transformation of your choice.
  53. * serializer: msg => JSON.stringify({ channel: 'webDevelopment', msg: msg })
  54. * });
  55. *
  56. * wsSubject.subscribe(() => subject.next('msg to the server'));
  57. *
  58. * // Let's suppose we have this on the Server:
  59. * // ws.on('message', msg => console.log);
  60. * // ws.send('This is a msg from the server');
  61. * // output at server side:
  62. * //
  63. * // {"channel":"webDevelopment","msg":"msg to the server"}
  64. * ```
  65. *
  66. * **closeObserver** allows us to set a custom error when an error raises up.
  67. *
  68. * ```ts
  69. * import { webSocket } from 'rxjs/webSocket';
  70. *
  71. * const wsSubject = webSocket({
  72. * url: 'ws://localhost:8081',
  73. * closeObserver: {
  74. * next() {
  75. * const customError = { code: 6666, reason: 'Custom evil reason' }
  76. * console.log(`code: ${ customError.code }, reason: ${ customError.reason }`);
  77. * }
  78. * }
  79. * });
  80. *
  81. * // output
  82. * // code: 6666, reason: Custom evil reason
  83. * ```
  84. *
  85. * **openObserver**, Let's say we need to make some kind of init task before sending/receiving msgs to the
  86. * webSocket or sending notification that the connection was successful, this is when
  87. * openObserver is useful for.
  88. *
  89. * ```ts
  90. * import { webSocket } from 'rxjs/webSocket';
  91. *
  92. * const wsSubject = webSocket({
  93. * url: 'ws://localhost:8081',
  94. * openObserver: {
  95. * next: () => {
  96. * console.log('Connection ok');
  97. * }
  98. * }
  99. * });
  100. *
  101. * // output
  102. * // Connection ok
  103. * ```
  104. */
  105. export interface WebSocketSubjectConfig<T> {
  106. /** The url of the socket server to connect to */
  107. url: string;
  108. /** The protocol to use to connect */
  109. protocol?: string | Array<string>;
  110. /** @deprecated Will be removed in v8. Use {@link deserializer} instead. */
  111. resultSelector?: (e: MessageEvent) => T;
  112. /**
  113. * A serializer used to create messages from passed values before the
  114. * messages are sent to the server. Defaults to JSON.stringify.
  115. */
  116. serializer?: (value: T) => WebSocketMessage;
  117. /**
  118. * A deserializer used for messages arriving on the socket from the
  119. * server. Defaults to JSON.parse.
  120. */
  121. deserializer?: (e: MessageEvent) => T;
  122. /**
  123. * An Observer that watches when open events occur on the underlying web socket.
  124. */
  125. openObserver?: NextObserver<Event>;
  126. /**
  127. * An Observer that watches when close events occur on the underlying web socket
  128. */
  129. closeObserver?: NextObserver<CloseEvent>;
  130. /**
  131. * An Observer that watches when a close is about to occur due to
  132. * unsubscription.
  133. */
  134. closingObserver?: NextObserver<void>;
  135. /**
  136. * A WebSocket constructor to use. This is useful for situations like using a
  137. * WebSocket impl in Node (WebSocket is a DOM API), or for mocking a WebSocket
  138. * for testing purposes
  139. */
  140. WebSocketCtor?: { new (url: string, protocols?: string | string[]): WebSocket };
  141. /** Sets the `binaryType` property of the underlying WebSocket. */
  142. binaryType?: 'blob' | 'arraybuffer';
  143. }
  144. const DEFAULT_WEBSOCKET_CONFIG: WebSocketSubjectConfig<any> = {
  145. url: '',
  146. deserializer: (e: MessageEvent) => JSON.parse(e.data),
  147. serializer: (value: any) => JSON.stringify(value),
  148. };
  149. const WEBSOCKETSUBJECT_INVALID_ERROR_OBJECT =
  150. 'WebSocketSubject.error must be called with an object with an error code, and an optional reason: { code: number, reason: string }';
  151. export type WebSocketMessage = string | ArrayBuffer | Blob | ArrayBufferView;
  152. export class WebSocketSubject<T> extends AnonymousSubject<T> {
  153. // @ts-ignore: Property has no initializer and is not definitely assigned
  154. private _config: WebSocketSubjectConfig<T>;
  155. /** @internal */
  156. // @ts-ignore: Property has no initializer and is not definitely assigned
  157. _output: Subject<T>;
  158. private _socket: WebSocket | null = null;
  159. constructor(urlConfigOrSource: string | WebSocketSubjectConfig<T> | Observable<T>, destination?: Observer<T>) {
  160. super();
  161. if (urlConfigOrSource instanceof Observable) {
  162. this.destination = destination;
  163. this.source = urlConfigOrSource as Observable<T>;
  164. } else {
  165. const config = (this._config = { ...DEFAULT_WEBSOCKET_CONFIG });
  166. this._output = new Subject<T>();
  167. if (typeof urlConfigOrSource === 'string') {
  168. config.url = urlConfigOrSource;
  169. } else {
  170. for (const key in urlConfigOrSource) {
  171. if (urlConfigOrSource.hasOwnProperty(key)) {
  172. (config as any)[key] = (urlConfigOrSource as any)[key];
  173. }
  174. }
  175. }
  176. if (!config.WebSocketCtor && WebSocket) {
  177. config.WebSocketCtor = WebSocket;
  178. } else if (!config.WebSocketCtor) {
  179. throw new Error('no WebSocket constructor can be found');
  180. }
  181. this.destination = new ReplaySubject();
  182. }
  183. }
  184. /** @deprecated Internal implementation detail, do not use directly. Will be made internal in v8. */
  185. lift<R>(operator: Operator<T, R>): WebSocketSubject<R> {
  186. const sock = new WebSocketSubject<R>(this._config as WebSocketSubjectConfig<any>, this.destination as any);
  187. sock.operator = operator;
  188. sock.source = this;
  189. return sock;
  190. }
  191. private _resetState() {
  192. this._socket = null;
  193. if (!this.source) {
  194. this.destination = new ReplaySubject();
  195. }
  196. this._output = new Subject<T>();
  197. }
  198. /**
  199. * Creates an {@link Observable}, that when subscribed to, sends a message,
  200. * defined by the `subMsg` function, to the server over the socket to begin a
  201. * subscription to data over that socket. Once data arrives, the
  202. * `messageFilter` argument will be used to select the appropriate data for
  203. * the resulting Observable. When finalization occurs, either due to
  204. * unsubscription, completion, or error, a message defined by the `unsubMsg`
  205. * argument will be sent to the server over the WebSocketSubject.
  206. *
  207. * @param subMsg A function to generate the subscription message to be sent to
  208. * the server. This will still be processed by the serializer in the
  209. * WebSocketSubject's config. (Which defaults to JSON serialization)
  210. * @param unsubMsg A function to generate the unsubscription message to be
  211. * sent to the server at finalization. This will still be processed by the
  212. * serializer in the WebSocketSubject's config.
  213. * @param messageFilter A predicate for selecting the appropriate messages
  214. * from the server for the output stream.
  215. */
  216. multiplex(subMsg: () => any, unsubMsg: () => any, messageFilter: (value: T) => boolean) {
  217. const self = this;
  218. return new Observable((observer: Observer<T>) => {
  219. try {
  220. self.next(subMsg());
  221. } catch (err) {
  222. observer.error(err);
  223. }
  224. const subscription = self.subscribe({
  225. next: (x) => {
  226. try {
  227. if (messageFilter(x)) {
  228. observer.next(x);
  229. }
  230. } catch (err) {
  231. observer.error(err);
  232. }
  233. },
  234. error: (err) => observer.error(err),
  235. complete: () => observer.complete(),
  236. });
  237. return () => {
  238. try {
  239. self.next(unsubMsg());
  240. } catch (err) {
  241. observer.error(err);
  242. }
  243. subscription.unsubscribe();
  244. };
  245. });
  246. }
  247. private _connectSocket() {
  248. const { WebSocketCtor, protocol, url, binaryType } = this._config;
  249. const observer = this._output;
  250. let socket: WebSocket | null = null;
  251. try {
  252. socket = protocol ? new WebSocketCtor!(url, protocol) : new WebSocketCtor!(url);
  253. this._socket = socket;
  254. if (binaryType) {
  255. this._socket.binaryType = binaryType;
  256. }
  257. } catch (e) {
  258. observer.error(e);
  259. return;
  260. }
  261. const subscription = new Subscription(() => {
  262. this._socket = null;
  263. if (socket && socket.readyState === 1) {
  264. socket.close();
  265. }
  266. });
  267. socket.onopen = (evt: Event) => {
  268. const { _socket } = this;
  269. if (!_socket) {
  270. socket!.close();
  271. this._resetState();
  272. return;
  273. }
  274. const { openObserver } = this._config;
  275. if (openObserver) {
  276. openObserver.next(evt);
  277. }
  278. const queue = this.destination;
  279. this.destination = Subscriber.create<T>(
  280. (x) => {
  281. if (socket!.readyState === 1) {
  282. try {
  283. const { serializer } = this._config;
  284. socket!.send(serializer!(x!));
  285. } catch (e) {
  286. this.destination!.error(e);
  287. }
  288. }
  289. },
  290. (err) => {
  291. const { closingObserver } = this._config;
  292. if (closingObserver) {
  293. closingObserver.next(undefined);
  294. }
  295. if (err && err.code) {
  296. socket!.close(err.code, err.reason);
  297. } else {
  298. observer.error(new TypeError(WEBSOCKETSUBJECT_INVALID_ERROR_OBJECT));
  299. }
  300. this._resetState();
  301. },
  302. () => {
  303. const { closingObserver } = this._config;
  304. if (closingObserver) {
  305. closingObserver.next(undefined);
  306. }
  307. socket!.close();
  308. this._resetState();
  309. }
  310. ) as Subscriber<any>;
  311. if (queue && queue instanceof ReplaySubject) {
  312. subscription.add((queue as ReplaySubject<T>).subscribe(this.destination));
  313. }
  314. };
  315. socket.onerror = (e: Event) => {
  316. this._resetState();
  317. observer.error(e);
  318. };
  319. socket.onclose = (e: CloseEvent) => {
  320. if (socket === this._socket) {
  321. this._resetState();
  322. }
  323. const { closeObserver } = this._config;
  324. if (closeObserver) {
  325. closeObserver.next(e);
  326. }
  327. if (e.wasClean) {
  328. observer.complete();
  329. } else {
  330. observer.error(e);
  331. }
  332. };
  333. socket.onmessage = (e: MessageEvent) => {
  334. try {
  335. const { deserializer } = this._config;
  336. observer.next(deserializer!(e));
  337. } catch (err) {
  338. observer.error(err);
  339. }
  340. };
  341. }
  342. /** @internal */
  343. protected _subscribe(subscriber: Subscriber<T>): Subscription {
  344. const { source } = this;
  345. if (source) {
  346. return source.subscribe(subscriber);
  347. }
  348. if (!this._socket) {
  349. this._connectSocket();
  350. }
  351. this._output.subscribe(subscriber);
  352. subscriber.add(() => {
  353. const { _socket } = this;
  354. if (this._output.observers.length === 0) {
  355. if (_socket && (_socket.readyState === 1 || _socket.readyState === 0)) {
  356. _socket.close();
  357. }
  358. this._resetState();
  359. }
  360. });
  361. return subscriber;
  362. }
  363. unsubscribe() {
  364. const { _socket } = this;
  365. if (_socket && (_socket.readyState === 1 || _socket.readyState === 0)) {
  366. _socket.close();
  367. }
  368. this._resetState();
  369. super.unsubscribe();
  370. }
  371. }