91cf6689827ce672c7d5a753f9bbcf45321b33d94650222b3d39927fe1324e29a5b8440e4dc4be0fdef42a1de65c72ebe064abaabbf49b78cd3d1e54220612 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403
  1. import { asyncScheduler } from '../scheduler/async';
  2. import { MonoTypeOperatorFunction, SchedulerLike, OperatorFunction, ObservableInput, ObservedValueOf } from '../types';
  3. import { isValidDate } from '../util/isDate';
  4. import { Subscription } from '../Subscription';
  5. import { operate } from '../util/lift';
  6. import { Observable } from '../Observable';
  7. import { innerFrom } from '../observable/innerFrom';
  8. import { createErrorClass } from '../util/createErrorClass';
  9. import { createOperatorSubscriber } from './OperatorSubscriber';
  10. import { executeSchedule } from '../util/executeSchedule';
  11. export interface TimeoutConfig<T, O extends ObservableInput<unknown> = ObservableInput<T>, M = unknown> {
  12. /**
  13. * The time allowed between values from the source before timeout is triggered.
  14. */
  15. each?: number;
  16. /**
  17. * The relative time as a `number` in milliseconds, or a specific time as a `Date` object,
  18. * by which the first value must arrive from the source before timeout is triggered.
  19. */
  20. first?: number | Date;
  21. /**
  22. * The scheduler to use with time-related operations within this operator. Defaults to {@link asyncScheduler}
  23. */
  24. scheduler?: SchedulerLike;
  25. /**
  26. * A factory used to create observable to switch to when timeout occurs. Provides
  27. * a {@link TimeoutInfo} about the source observable's emissions and what delay or
  28. * exact time triggered the timeout.
  29. */
  30. with?: (info: TimeoutInfo<T, M>) => O;
  31. /**
  32. * Optional additional metadata you can provide to code that handles
  33. * the timeout, will be provided through the {@link TimeoutError}.
  34. * This can be used to help identify the source of a timeout or pass along
  35. * other information related to the timeout.
  36. */
  37. meta?: M;
  38. }
  39. export interface TimeoutInfo<T, M = unknown> {
  40. /** Optional metadata that was provided to the timeout configuration. */
  41. readonly meta: M;
  42. /** The number of messages seen before the timeout */
  43. readonly seen: number;
  44. /** The last message seen */
  45. readonly lastValue: T | null;
  46. }
  47. /**
  48. * An error emitted when a timeout occurs.
  49. */
  50. export interface TimeoutError<T = unknown, M = unknown> extends Error {
  51. /**
  52. * The information provided to the error by the timeout
  53. * operation that created the error. Will be `null` if
  54. * used directly in non-RxJS code with an empty constructor.
  55. * (Note that using this constructor directly is not recommended,
  56. * you should create your own errors)
  57. */
  58. info: TimeoutInfo<T, M> | null;
  59. }
  60. export interface TimeoutErrorCtor {
  61. /**
  62. * @deprecated Internal implementation detail. Do not construct error instances.
  63. * Cannot be tagged as internal: https://github.com/ReactiveX/rxjs/issues/6269
  64. */
  65. new <T = unknown, M = unknown>(info?: TimeoutInfo<T, M>): TimeoutError<T, M>;
  66. }
  67. /**
  68. * An error thrown by the {@link timeout} operator.
  69. *
  70. * Provided so users can use as a type and do quality comparisons.
  71. * We recommend you do not subclass this or create instances of this class directly.
  72. * If you have need of a error representing a timeout, you should
  73. * create your own error class and use that.
  74. *
  75. * @see {@link timeout}
  76. */
  77. export const TimeoutError: TimeoutErrorCtor = createErrorClass(
  78. (_super) =>
  79. function TimeoutErrorImpl(this: any, info: TimeoutInfo<any> | null = null) {
  80. _super(this);
  81. this.message = 'Timeout has occurred';
  82. this.name = 'TimeoutError';
  83. this.info = info;
  84. }
  85. );
  86. /**
  87. * If `with` is provided, this will return an observable that will switch to a different observable if the source
  88. * does not push values within the specified time parameters.
  89. *
  90. * <span class="informal">The most flexible option for creating a timeout behavior.</span>
  91. *
  92. * The first thing to know about the configuration is if you do not provide a `with` property to the configuration,
  93. * when timeout conditions are met, this operator will emit a {@link TimeoutError}. Otherwise, it will use the factory
  94. * function provided by `with`, and switch your subscription to the result of that. Timeout conditions are provided by
  95. * the settings in `first` and `each`.
  96. *
  97. * The `first` property can be either a `Date` for a specific time, a `number` for a time period relative to the
  98. * point of subscription, or it can be skipped. This property is to check timeout conditions for the arrival of
  99. * the first value from the source _only_. The timings of all subsequent values from the source will be checked
  100. * against the time period provided by `each`, if it was provided.
  101. *
  102. * The `each` property can be either a `number` or skipped. If a value for `each` is provided, it represents the amount of
  103. * time the resulting observable will wait between the arrival of values from the source before timing out. Note that if
  104. * `first` is _not_ provided, the value from `each` will be used to check timeout conditions for the arrival of the first
  105. * value and all subsequent values. If `first` _is_ provided, `each` will only be use to check all values after the first.
  106. *
  107. * ## Examples
  108. *
  109. * Emit a custom error if there is too much time between values
  110. *
  111. * ```ts
  112. * import { interval, timeout, throwError } from 'rxjs';
  113. *
  114. * class CustomTimeoutError extends Error {
  115. * constructor() {
  116. * super('It was too slow');
  117. * this.name = 'CustomTimeoutError';
  118. * }
  119. * }
  120. *
  121. * const slow$ = interval(900);
  122. *
  123. * slow$.pipe(
  124. * timeout({
  125. * each: 1000,
  126. * with: () => throwError(() => new CustomTimeoutError())
  127. * })
  128. * )
  129. * .subscribe({
  130. * error: console.error
  131. * });
  132. * ```
  133. *
  134. * Switch to a faster observable if your source is slow.
  135. *
  136. * ```ts
  137. * import { interval, timeout } from 'rxjs';
  138. *
  139. * const slow$ = interval(900);
  140. * const fast$ = interval(500);
  141. *
  142. * slow$.pipe(
  143. * timeout({
  144. * each: 1000,
  145. * with: () => fast$,
  146. * })
  147. * )
  148. * .subscribe(console.log);
  149. * ```
  150. * @param config The configuration for the timeout.
  151. */
  152. export function timeout<T, O extends ObservableInput<unknown>, M = unknown>(
  153. config: TimeoutConfig<T, O, M> & { with: (info: TimeoutInfo<T, M>) => O }
  154. ): OperatorFunction<T, T | ObservedValueOf<O>>;
  155. /**
  156. * Returns an observable that will error or switch to a different observable if the source does not push values
  157. * within the specified time parameters.
  158. *
  159. * <span class="informal">The most flexible option for creating a timeout behavior.</span>
  160. *
  161. * The first thing to know about the configuration is if you do not provide a `with` property to the configuration,
  162. * when timeout conditions are met, this operator will emit a {@link TimeoutError}. Otherwise, it will use the factory
  163. * function provided by `with`, and switch your subscription to the result of that. Timeout conditions are provided by
  164. * the settings in `first` and `each`.
  165. *
  166. * The `first` property can be either a `Date` for a specific time, a `number` for a time period relative to the
  167. * point of subscription, or it can be skipped. This property is to check timeout conditions for the arrival of
  168. * the first value from the source _only_. The timings of all subsequent values from the source will be checked
  169. * against the time period provided by `each`, if it was provided.
  170. *
  171. * The `each` property can be either a `number` or skipped. If a value for `each` is provided, it represents the amount of
  172. * time the resulting observable will wait between the arrival of values from the source before timing out. Note that if
  173. * `first` is _not_ provided, the value from `each` will be used to check timeout conditions for the arrival of the first
  174. * value and all subsequent values. If `first` _is_ provided, `each` will only be use to check all values after the first.
  175. *
  176. * ### Handling TimeoutErrors
  177. *
  178. * If no `with` property was provided, subscriptions to the resulting observable may emit an error of {@link TimeoutError}.
  179. * The timeout error provides useful information you can examine when you're handling the error. The most common way to handle
  180. * the error would be with {@link catchError}, although you could use {@link tap} or just the error handler in your `subscribe` call
  181. * directly, if your error handling is only a side effect (such as notifying the user, or logging).
  182. *
  183. * In this case, you would check the error for `instanceof TimeoutError` to validate that the error was indeed from `timeout`, and
  184. * not from some other source. If it's not from `timeout`, you should probably rethrow it if you're in a `catchError`.
  185. *
  186. * ## Examples
  187. *
  188. * Emit a {@link TimeoutError} if the first value, and _only_ the first value, does not arrive within 5 seconds
  189. *
  190. * ```ts
  191. * import { interval, timeout } from 'rxjs';
  192. *
  193. * // A random interval that lasts between 0 and 10 seconds per tick
  194. * const source$ = interval(Math.round(Math.random() * 10_000));
  195. *
  196. * source$.pipe(
  197. * timeout({ first: 5_000 })
  198. * )
  199. * .subscribe({
  200. * next: console.log,
  201. * error: console.error
  202. * });
  203. * ```
  204. *
  205. * Emit a {@link TimeoutError} if the source waits longer than 5 seconds between any two values or the first value
  206. * and subscription.
  207. *
  208. * ```ts
  209. * import { timer, timeout, expand } from 'rxjs';
  210. *
  211. * const getRandomTime = () => Math.round(Math.random() * 10_000);
  212. *
  213. * // An observable that waits a random amount of time between each delivered value
  214. * const source$ = timer(getRandomTime())
  215. * .pipe(expand(() => timer(getRandomTime())));
  216. *
  217. * source$
  218. * .pipe(timeout({ each: 5_000 }))
  219. * .subscribe({
  220. * next: console.log,
  221. * error: console.error
  222. * });
  223. * ```
  224. *
  225. * Emit a {@link TimeoutError} if the source does not emit before 7 seconds, _or_ if the source waits longer than
  226. * 5 seconds between any two values after the first.
  227. *
  228. * ```ts
  229. * import { timer, timeout, expand } from 'rxjs';
  230. *
  231. * const getRandomTime = () => Math.round(Math.random() * 10_000);
  232. *
  233. * // An observable that waits a random amount of time between each delivered value
  234. * const source$ = timer(getRandomTime())
  235. * .pipe(expand(() => timer(getRandomTime())));
  236. *
  237. * source$
  238. * .pipe(timeout({ first: 7_000, each: 5_000 }))
  239. * .subscribe({
  240. * next: console.log,
  241. * error: console.error
  242. * });
  243. * ```
  244. */
  245. export function timeout<T, M = unknown>(config: Omit<TimeoutConfig<T, any, M>, 'with'>): OperatorFunction<T, T>;
  246. /**
  247. * Returns an observable that will error if the source does not push its first value before the specified time passed as a `Date`.
  248. * This is functionally the same as `timeout({ first: someDate })`.
  249. *
  250. * <span class="informal">Errors if the first value doesn't show up before the given date and time</span>
  251. *
  252. * ![](timeout.png)
  253. *
  254. * @param first The date to at which the resulting observable will timeout if the source observable
  255. * does not emit at least one value.
  256. * @param scheduler The scheduler to use. Defaults to {@link asyncScheduler}.
  257. */
  258. export function timeout<T>(first: Date, scheduler?: SchedulerLike): MonoTypeOperatorFunction<T>;
  259. /**
  260. * Returns an observable that will error if the source does not push a value within the specified time in milliseconds.
  261. * This is functionally the same as `timeout({ each: milliseconds })`.
  262. *
  263. * <span class="informal">Errors if it waits too long between any value</span>
  264. *
  265. * ![](timeout.png)
  266. *
  267. * @param each The time allowed between each pushed value from the source before the resulting observable
  268. * will timeout.
  269. * @param scheduler The scheduler to use. Defaults to {@link asyncScheduler}.
  270. */
  271. export function timeout<T>(each: number, scheduler?: SchedulerLike): MonoTypeOperatorFunction<T>;
  272. /**
  273. *
  274. * Errors if Observable does not emit a value in given time span.
  275. *
  276. * <span class="informal">Timeouts on Observable that doesn't emit values fast enough.</span>
  277. *
  278. * ![](timeout.png)
  279. *
  280. * @see {@link timeoutWith}
  281. *
  282. * @return A function that returns an Observable that mirrors behaviour of the
  283. * source Observable, unless timeout happens when it throws an error.
  284. */
  285. export function timeout<T, O extends ObservableInput<any>, M>(
  286. config: number | Date | TimeoutConfig<T, O, M>,
  287. schedulerArg?: SchedulerLike
  288. ): OperatorFunction<T, T | ObservedValueOf<O>> {
  289. // Intentionally terse code.
  290. // If the first argument is a valid `Date`, then we use it as the `first` config.
  291. // Otherwise, if the first argument is a `number`, then we use it as the `each` config.
  292. // Otherwise, it can be assumed the first argument is the configuration object itself, and
  293. // we destructure that into what we're going to use, setting important defaults as we do.
  294. // NOTE: The default for `scheduler` will be the `scheduler` argument if it exists, or
  295. // it will default to the `asyncScheduler`.
  296. const {
  297. first,
  298. each,
  299. with: _with = timeoutErrorFactory,
  300. scheduler = schedulerArg ?? asyncScheduler,
  301. meta = null!,
  302. } = (isValidDate(config) ? { first: config } : typeof config === 'number' ? { each: config } : config) as TimeoutConfig<T, O, M>;
  303. if (first == null && each == null) {
  304. // Ensure timeout was provided at runtime.
  305. throw new TypeError('No timeout provided.');
  306. }
  307. return operate((source, subscriber) => {
  308. // This subscription encapsulates our subscription to the
  309. // source for this operator. We're capturing it separately,
  310. // because if there is a `with` observable to fail over to,
  311. // we want to unsubscribe from our original subscription, and
  312. // hand of the subscription to that one.
  313. let originalSourceSubscription: Subscription;
  314. // The subscription for our timeout timer. This changes
  315. // every time we get a new value.
  316. let timerSubscription: Subscription;
  317. // A bit of state we pass to our with and error factories to
  318. // tell what the last value we saw was.
  319. let lastValue: T | null = null;
  320. // A bit of state we pass to the with and error factories to
  321. // tell how many values we have seen so far.
  322. let seen = 0;
  323. const startTimer = (delay: number) => {
  324. timerSubscription = executeSchedule(
  325. subscriber,
  326. scheduler,
  327. () => {
  328. try {
  329. originalSourceSubscription.unsubscribe();
  330. innerFrom(
  331. _with!({
  332. meta,
  333. lastValue,
  334. seen,
  335. })
  336. ).subscribe(subscriber);
  337. } catch (err) {
  338. subscriber.error(err);
  339. }
  340. },
  341. delay
  342. );
  343. };
  344. originalSourceSubscription = source.subscribe(
  345. createOperatorSubscriber(
  346. subscriber,
  347. (value: T) => {
  348. // clear the timer so we can emit and start another one.
  349. timerSubscription?.unsubscribe();
  350. seen++;
  351. // Emit
  352. subscriber.next((lastValue = value));
  353. // null | undefined are both < 0. Thanks, JavaScript.
  354. each! > 0 && startTimer(each!);
  355. },
  356. undefined,
  357. undefined,
  358. () => {
  359. if (!timerSubscription?.closed) {
  360. timerSubscription?.unsubscribe();
  361. }
  362. // Be sure not to hold the last value in memory after unsubscription
  363. // it could be quite large.
  364. lastValue = null;
  365. }
  366. )
  367. );
  368. // Intentionally terse code.
  369. // If we've `seen` a value, that means the "first" clause was met already, if it existed.
  370. // it also means that a timer was already started for "each" (in the next handler above).
  371. // If `first` was provided, and it's a number, then use it.
  372. // If `first` was provided and it's not a number, it's a Date, and we get the difference between it and "now".
  373. // If `first` was not provided at all, then our first timer will be the value from `each`.
  374. !seen && startTimer(first != null ? (typeof first === 'number' ? first : +first - scheduler!.now()) : each!);
  375. });
  376. }
  377. /**
  378. * The default function to use to emit an error when timeout occurs and a `with` function
  379. * is not specified.
  380. * @param info The information about the timeout to pass along to the error
  381. */
  382. function timeoutErrorFactory(info: TimeoutInfo<any>): Observable<never> {
  383. throw new TimeoutError(info);
  384. }