d1d47455ddc02181adef937e5dd3c4f53ed89f91832f8cc1aa9dcc2969f2fe589643eb36db4b9bf27f4b8853cc8d35a1937d75bf78ed553b73ff406b6f2774 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263
  1. import { Observable } from '../Observable';
  2. import { Unsubscribable, ObservableInput } from '../types';
  3. import { from } from './from'; // from from from! LAWL
  4. import { EMPTY } from './empty';
  5. /**
  6. * Creates an Observable that uses a resource which will be disposed at the same time as the Observable.
  7. *
  8. * <span class="informal">Use it when you catch yourself cleaning up after an Observable.</span>
  9. *
  10. * `using` is a factory operator, which accepts two functions. First function returns a disposable resource.
  11. * It can be an arbitrary object that implements `unsubscribe` method. Second function will be injected with
  12. * that object and should return an Observable. That Observable can use resource object during its execution.
  13. * Both functions passed to `using` will be called every time someone subscribes - neither an Observable nor
  14. * resource object will be shared in any way between subscriptions.
  15. *
  16. * When Observable returned by `using` is subscribed, Observable returned from the second function will be subscribed
  17. * as well. All its notifications (nexted values, completion and error events) will be emitted unchanged by the output
  18. * Observable. If however someone unsubscribes from the Observable or source Observable completes or errors by itself,
  19. * the `unsubscribe` method on resource object will be called. This can be used to do any necessary clean up, which
  20. * otherwise would have to be handled by hand. Note that complete or error notifications are not emitted when someone
  21. * cancels subscription to an Observable via `unsubscribe`, so `using` can be used as a hook, allowing you to make
  22. * sure that all resources which need to exist during an Observable execution will be disposed at appropriate time.
  23. *
  24. * @see {@link defer}
  25. *
  26. * @param {function(): ISubscription} resourceFactory A function which creates any resource object
  27. * that implements `unsubscribe` method.
  28. * @param {function(resource: ISubscription): Observable<T>} observableFactory A function which
  29. * creates an Observable, that can use injected resource object.
  30. * @return {Observable<T>} An Observable that behaves the same as Observable returned by `observableFactory`, but
  31. * which - when completed, errored or unsubscribed - will also call `unsubscribe` on created resource object.
  32. */
  33. export function using<T>(resourceFactory: () => Unsubscribable | void,
  34. observableFactory: (resource: Unsubscribable | void) => ObservableInput<T> | void): Observable<T> {
  35. return new Observable<T>(subscriber => {
  36. let resource: Unsubscribable | void;
  37. try {
  38. resource = resourceFactory();
  39. } catch (err) {
  40. subscriber.error(err);
  41. return undefined;
  42. }
  43. let result: ObservableInput<T> | void;
  44. try {
  45. result = observableFactory(resource);
  46. } catch (err) {
  47. subscriber.error(err);
  48. return undefined;
  49. }
  50. const source = result ? from(result) : EMPTY;
  51. const subscription = source.subscribe(subscriber);
  52. return () => {
  53. subscription.unsubscribe();
  54. if (resource) {
  55. resource.unsubscribe();
  56. }
  57. };
  58. });
  59. }