4294376dceb234b9f72e5cd0d9d307fa52818e31459ec5c7708abc04e72dc846e8e2a7870eed5276b2fcf95216f423a13687aa45baa0fad078c969b2dc9a3e 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387
  1. import { Subject, AnonymousSubject } from '../../Subject';
  2. import { Subscriber } from '../../Subscriber';
  3. import { Observable } from '../../Observable';
  4. import { Subscription } from '../../Subscription';
  5. import { Operator } from '../../Operator';
  6. import { ReplaySubject } from '../../ReplaySubject';
  7. import { Observer, NextObserver } from '../../types';
  8. /**
  9. * WebSocketSubjectConfig is a plain Object that allows us to make our
  10. * webSocket configurable.
  11. *
  12. * <span class="informal">Provides flexibility to {@link webSocket}</span>
  13. *
  14. * It defines a set of properties to provide custom behavior in specific
  15. * moments of the socket's lifecycle. When the connection opens we can
  16. * use `openObserver`, when the connection is closed `closeObserver`, if we
  17. * are interested in listening for data comming from server: `deserializer`,
  18. * which allows us to customize the deserialization strategy of data before passing it
  19. * to the socket client. By default `deserializer` is going to apply `JSON.parse` to each message comming
  20. * from the Server.
  21. *
  22. * ## Example
  23. * **deserializer**, the default for this property is `JSON.parse` but since there are just two options
  24. * for incomming data, either be text or binarydata. We can apply a custom deserialization strategy
  25. * or just simply skip the default behaviour.
  26. * ```ts
  27. * import { webSocket } from 'rxjs/webSocket';
  28. *
  29. * const wsSubject = webSocket({
  30. * url: 'ws://localhost:8081',
  31. * //Apply any transformation of your choice.
  32. * deserializer: ({data}) => data
  33. * });
  34. *
  35. * wsSubject.subscribe(console.log);
  36. *
  37. * // Let's suppose we have this on the Server: ws.send("This is a msg from the server")
  38. * //output
  39. * //
  40. * // This is a msg from the server
  41. * ```
  42. *
  43. * **serializer** allows us tom apply custom serialization strategy but for the outgoing messages
  44. * ```ts
  45. * import { webSocket } from 'rxjs/webSocket';
  46. *
  47. * const wsSubject = webSocket({
  48. * url: 'ws://localhost:8081',
  49. * //Apply any transformation of your choice.
  50. * serializer: msg => JSON.stringify({channel: "webDevelopment", msg: msg})
  51. * });
  52. *
  53. * wsSubject.subscribe(() => subject.next("msg to the server"));
  54. *
  55. * // Let's suppose we have this on the Server: ws.send("This is a msg from the server")
  56. * //output
  57. * //
  58. * // {"channel":"webDevelopment","msg":"msg to the server"}
  59. * ```
  60. *
  61. * **closeObserver** allows us to set a custom error when an error raise up.
  62. * ```ts
  63. * import { webSocket } from 'rxjs/webSocket';
  64. *
  65. * const wsSubject = webSocket({
  66. * url: 'ws://localhost:8081',
  67. * closeObserver: {
  68. next(closeEvent) {
  69. const customError = { code: 6666, reason: "Custom evil reason" }
  70. console.log(`code: ${customError.code}, reason: ${customError.reason}`);
  71. }
  72. }
  73. * });
  74. *
  75. * //output
  76. * // code: 6666, reason: Custom evil reason
  77. * ```
  78. *
  79. * **openObserver**, Let's say we need to make some kind of init task before sending/receiving msgs to the
  80. * webSocket or sending notification that the connection was successful, this is when
  81. * openObserver is usefull for.
  82. * ```ts
  83. * import { webSocket } from 'rxjs/webSocket';
  84. *
  85. * const wsSubject = webSocket({
  86. * url: 'ws://localhost:8081',
  87. * openObserver: {
  88. * next: () => {
  89. * console.log('connetion ok');
  90. * }
  91. * },
  92. * });
  93. *
  94. * //output
  95. * // connetion ok`
  96. * ```
  97. * */
  98. export interface WebSocketSubjectConfig<T> {
  99. /** The url of the socket server to connect to */
  100. url: string;
  101. /** The protocol to use to connect */
  102. protocol?: string | Array<string>;
  103. /** @deprecated use {@link deserializer} */
  104. resultSelector?: (e: MessageEvent) => T;
  105. /**
  106. * A serializer used to create messages from passed values before the
  107. * messages are sent to the server. Defaults to JSON.stringify.
  108. */
  109. serializer?: (value: T) => WebSocketMessage;
  110. /**
  111. * A deserializer used for messages arriving on the socket from the
  112. * server. Defaults to JSON.parse.
  113. */
  114. deserializer?: (e: MessageEvent) => T;
  115. /**
  116. * An Observer that watches when open events occur on the underlying web socket.
  117. */
  118. openObserver?: NextObserver<Event>;
  119. /**
  120. * An Observer than watches when close events occur on the underlying webSocket
  121. */
  122. closeObserver?: NextObserver<CloseEvent>;
  123. /**
  124. * An Observer that watches when a close is about to occur due to
  125. * unsubscription.
  126. */
  127. closingObserver?: NextObserver<void>;
  128. /**
  129. * A WebSocket constructor to use. This is useful for situations like using a
  130. * WebSocket impl in Node (WebSocket is a DOM API), or for mocking a WebSocket
  131. * for testing purposes
  132. */
  133. WebSocketCtor?: { new(url: string, protocols?: string|string[]): WebSocket };
  134. /** Sets the `binaryType` property of the underlying WebSocket. */
  135. binaryType?: 'blob' | 'arraybuffer';
  136. }
  137. const DEFAULT_WEBSOCKET_CONFIG: WebSocketSubjectConfig<any> = {
  138. url: '',
  139. deserializer: (e: MessageEvent) => JSON.parse(e.data),
  140. serializer: (value: any) => JSON.stringify(value),
  141. };
  142. const WEBSOCKETSUBJECT_INVALID_ERROR_OBJECT =
  143. 'WebSocketSubject.error must be called with an object with an error code, and an optional reason: { code: number, reason: string }';
  144. export type WebSocketMessage = string | ArrayBuffer | Blob | ArrayBufferView;
  145. export class WebSocketSubject<T> extends AnonymousSubject<T> {
  146. private _config: WebSocketSubjectConfig<T>;
  147. /** @deprecated This is an internal implementation detail, do not use. */
  148. _output: Subject<T>;
  149. private _socket: WebSocket;
  150. constructor(urlConfigOrSource: string | WebSocketSubjectConfig<T> | Observable<T>, destination?: Observer<T>) {
  151. super();
  152. if (urlConfigOrSource instanceof Observable) {
  153. this.destination = destination;
  154. this.source = urlConfigOrSource as Observable<T>;
  155. } else {
  156. const config = this._config = { ...DEFAULT_WEBSOCKET_CONFIG };
  157. this._output = new Subject<T>();
  158. if (typeof urlConfigOrSource === 'string') {
  159. config.url = urlConfigOrSource;
  160. } else {
  161. for (let key in urlConfigOrSource) {
  162. if (urlConfigOrSource.hasOwnProperty(key)) {
  163. config[key] = urlConfigOrSource[key];
  164. }
  165. }
  166. }
  167. if (!config.WebSocketCtor && WebSocket) {
  168. config.WebSocketCtor = WebSocket;
  169. } else if (!config.WebSocketCtor) {
  170. throw new Error('no WebSocket constructor can be found');
  171. }
  172. this.destination = new ReplaySubject();
  173. }
  174. }
  175. lift<R>(operator: Operator<T, R>): WebSocketSubject<R> {
  176. const sock = new WebSocketSubject<R>(this._config as WebSocketSubjectConfig<any>, <any> this.destination);
  177. sock.operator = operator;
  178. sock.source = this;
  179. return sock;
  180. }
  181. private _resetState() {
  182. this._socket = null;
  183. if (!this.source) {
  184. this.destination = new ReplaySubject();
  185. }
  186. this._output = new Subject<T>();
  187. }
  188. /**
  189. * Creates an {@link Observable}, that when subscribed to, sends a message,
  190. * defined by the `subMsg` function, to the server over the socket to begin a
  191. * subscription to data over that socket. Once data arrives, the
  192. * `messageFilter` argument will be used to select the appropriate data for
  193. * the resulting Observable. When teardown occurs, either due to
  194. * unsubscription, completion or error, a message defined by the `unsubMsg`
  195. * argument will be send to the server over the WebSocketSubject.
  196. *
  197. * @param subMsg A function to generate the subscription message to be sent to
  198. * the server. This will still be processed by the serializer in the
  199. * WebSocketSubject's config. (Which defaults to JSON serialization)
  200. * @param unsubMsg A function to generate the unsubscription message to be
  201. * sent to the server at teardown. This will still be processed by the
  202. * serializer in the WebSocketSubject's config.
  203. * @param messageFilter A predicate for selecting the appropriate messages
  204. * from the server for the output stream.
  205. */
  206. multiplex(subMsg: () => any, unsubMsg: () => any, messageFilter: (value: T) => boolean) {
  207. const self = this;
  208. return new Observable((observer: Observer<any>) => {
  209. try {
  210. self.next(subMsg());
  211. } catch (err) {
  212. observer.error(err);
  213. }
  214. const subscription = self.subscribe(x => {
  215. try {
  216. if (messageFilter(x)) {
  217. observer.next(x);
  218. }
  219. } catch (err) {
  220. observer.error(err);
  221. }
  222. },
  223. err => observer.error(err),
  224. () => observer.complete());
  225. return () => {
  226. try {
  227. self.next(unsubMsg());
  228. } catch (err) {
  229. observer.error(err);
  230. }
  231. subscription.unsubscribe();
  232. };
  233. });
  234. }
  235. private _connectSocket() {
  236. const { WebSocketCtor, protocol, url, binaryType } = this._config;
  237. const observer = this._output;
  238. let socket: WebSocket = null;
  239. try {
  240. socket = protocol ?
  241. new WebSocketCtor(url, protocol) :
  242. new WebSocketCtor(url);
  243. this._socket = socket;
  244. if (binaryType) {
  245. this._socket.binaryType = binaryType;
  246. }
  247. } catch (e) {
  248. observer.error(e);
  249. return;
  250. }
  251. const subscription = new Subscription(() => {
  252. this._socket = null;
  253. if (socket && socket.readyState === 1) {
  254. socket.close();
  255. }
  256. });
  257. socket.onopen = (e: Event) => {
  258. const { _socket } = this;
  259. if (!_socket) {
  260. socket.close();
  261. this._resetState();
  262. return;
  263. }
  264. const { openObserver } = this._config;
  265. if (openObserver) {
  266. openObserver.next(e);
  267. }
  268. const queue = this.destination;
  269. this.destination = Subscriber.create<T>(
  270. (x) => {
  271. if (socket.readyState === 1) {
  272. try {
  273. const { serializer } = this._config;
  274. socket.send(serializer(x));
  275. } catch (e) {
  276. this.destination.error(e);
  277. }
  278. }
  279. },
  280. (e) => {
  281. const { closingObserver } = this._config;
  282. if (closingObserver) {
  283. closingObserver.next(undefined);
  284. }
  285. if (e && e.code) {
  286. socket.close(e.code, e.reason);
  287. } else {
  288. observer.error(new TypeError(WEBSOCKETSUBJECT_INVALID_ERROR_OBJECT));
  289. }
  290. this._resetState();
  291. },
  292. () => {
  293. const { closingObserver } = this._config;
  294. if (closingObserver) {
  295. closingObserver.next(undefined);
  296. }
  297. socket.close();
  298. this._resetState();
  299. }
  300. ) as Subscriber<any>;
  301. if (queue && queue instanceof ReplaySubject) {
  302. subscription.add((<ReplaySubject<T>>queue).subscribe(this.destination));
  303. }
  304. };
  305. socket.onerror = (e: Event) => {
  306. this._resetState();
  307. observer.error(e);
  308. };
  309. socket.onclose = (e: CloseEvent) => {
  310. this._resetState();
  311. const { closeObserver } = this._config;
  312. if (closeObserver) {
  313. closeObserver.next(e);
  314. }
  315. if (e.wasClean) {
  316. observer.complete();
  317. } else {
  318. observer.error(e);
  319. }
  320. };
  321. socket.onmessage = (e: MessageEvent) => {
  322. try {
  323. const { deserializer } = this._config;
  324. observer.next(deserializer(e));
  325. } catch (err) {
  326. observer.error(err);
  327. }
  328. };
  329. }
  330. /** @deprecated This is an internal implementation detail, do not use. */
  331. _subscribe(subscriber: Subscriber<T>): Subscription {
  332. const { source } = this;
  333. if (source) {
  334. return source.subscribe(subscriber);
  335. }
  336. if (!this._socket) {
  337. this._connectSocket();
  338. }
  339. this._output.subscribe(subscriber);
  340. subscriber.add(() => {
  341. const { _socket } = this;
  342. if (this._output.observers.length === 0) {
  343. if (_socket && _socket.readyState === 1) {
  344. _socket.close();
  345. }
  346. this._resetState();
  347. }
  348. });
  349. return subscriber;
  350. }
  351. unsubscribe() {
  352. const { _socket } = this;
  353. if (_socket && _socket.readyState === 1) {
  354. _socket.close();
  355. }
  356. this._resetState();
  357. super.unsubscribe();
  358. }
  359. }