1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 part of dart.async;
6
7 // -------------------------------------------------------------------
8 // Controller for creating and adding events to a stream.
9 // -------------------------------------------------------------------
10
/**
 * Signature of a stream controller's `onListen`, `onPause` and `onResume`
 * callbacks.
 *
 * Such a callback takes no arguments and returns nothing.
 */
typedef void ControllerCallback();
15
/**
 * Signature of a stream controller's `onCancel` callback.
 *
 * The return type is deliberately left unspecified: the callback may return
 * either `void` or a future.
 */
typedef ControllerCancelCallback();
22
/**
 * A controller paired with the stream it controls.
 *
 * Data, error and done events sent through the controller are delivered
 * on its [stream].
 * Use this class to create a simple stream that others can listen on,
 * and to push events onto that stream.
 *
 * The controller can report whether the stream is paused and whether it
 * has subscribers, and it can invoke a callback when either of those
 * changes.
 */
abstract class StreamController<T> implements StreamSink<T> {
  /** The stream controlled by this controller. */
  Stream<T> get stream;

  /**
   * Creates a controller whose [stream] supports only a single subscriber.
   *
   * When [sync] is true the returned controller is a
   * [SynchronousStreamController]. Such a controller must be used with
   * enough care and attention to not break the [Stream] contract.
   * When in doubt, use the non-sync version.
   *
   * An asynchronous controller never gives the wrong behavior, whereas an
   * incorrectly used synchronous controller can cause otherwise correct
   * programs to break.
   *
   * A synchronous controller exists only to optimize event propagation
   * when one asynchronous event immediately triggers another.
   * Do not use it unless the calls to [add] or [addError] are guaranteed
   * to occur in places where they cannot break `Stream` invariants.
   *
   * Use synchronous controllers only to forward (potentially transformed)
   * events from another stream or a future.
   *
   * A stream should be inert until a subscriber starts listening on it
   * (use the [onListen] callback to start producing events). Streams should
   * not leak resources (like websockets) when nobody ever listens to them.
   *
   * The controller buffers all incoming events until a subscriber is
   * registered, but that feature should be relied on only in rare
   * circumstances.
   *
   * [onPause] is called when the stream becomes paused, and [onResume]
   * when the stream is resumed.
   *
   * [onListen] is called when the stream receives its listener, and
   * [onCancel] when the listener ends its subscription. If [onCancel] has
   * to perform an asynchronous operation, it should return a future that
   * completes when the cancel operation is done.
   *
   * If the stream is canceled before the controller needs new data, the
   * [onResume] call might not be executed.
   */
  factory StreamController(
      {void onListen(),
      void onPause(),
      void onResume(),
      onCancel(),
      bool sync: false}) {
    if (sync) {
      return new _SyncStreamController<T>(
          onListen, onPause, onResume, onCancel);
    }
    return new _AsyncStreamController<T>(onListen, onPause, onResume, onCancel);
  }

  /**
   * Creates a controller whose [stream] can be listened to more than once.
   *
   * The [Stream] returned by [stream] is a broadcast stream, so it accepts
   * any number of listeners.
   *
   * A stream should be inert until a subscriber starts listening on it
   * (use the [onListen] callback to start producing events). Streams should
   * not leak resources (like websockets) when nobody ever listens to them.
   *
   * Broadcast streams do not buffer events when there is no listener.
   *
   * Every event is distributed to the listeners that are subscribed at the
   * time [add], [addError] or [close] is called.
   * Calling `add`, `addError` or `close` before a previous call has
   * returned is not allowed. The controller keeps no internal queue of
   * events: an event added while there are no listeners is simply dropped
   * or, if it is an error, reported as uncaught.
   *
   * Each listener subscription is handled independently; when one pauses,
   * only that listener is affected. A paused listener buffers events
   * internally until it is unpaused or canceled.
   *
   * If [sync] is true, events may be fired directly by the stream's
   * subscriptions during an [add], [addError] or [close] call.
   * The returned controller is then a [SynchronousStreamController] and
   * must be used with the care and attention necessary to not break the
   * [Stream] contract.
   * See [Completer.sync] for some explanations of when synchronous
   * dispatching can be used.
   * If in doubt, keep the controller non-sync.
   *
   * If [sync] is false, the event is always fired at a later time, after
   * the code adding the event has completed.
   * In that case nothing is guaranteed about when multiple listeners get
   * the events, except that each listener gets all events in the correct
   * order. Each subscription handles the events individually.
   * If two events are sent on an async controller with two listeners, one
   * listener may receive both events before the other receives any.
   * To receive an event, a listener must be subscribed both when the event
   * is initiated (that is, when [add] is called) and when the event is
   * later delivered.
   *
   * [onListen] is called when the first listener is subscribed, and
   * [onCancel] when there are no longer any active listeners.
   * If a listener is added again after [onCancel] was called, [onListen]
   * is called again.
   */
  factory StreamController.broadcast(
      {void onListen(), void onCancel(), bool sync: false}) {
    if (sync) {
      return new _SyncBroadcastStreamController<T>(onListen, onCancel);
    }
    return new _AsyncBroadcastStreamController<T>(onListen, onCancel);
  }

  /**
   * The callback invoked when the stream is listened to.
   *
   * May be `null`, in which case no callback happens.
   */
  ControllerCallback get onListen;

  void set onListen(void onListenHandler());

  /**
   * The callback invoked when the stream is paused.
   *
   * May be `null`, in which case no callback happens.
   *
   * Broadcast stream controllers do not support pause related callbacks.
   */
  ControllerCallback get onPause;

  void set onPause(void onPauseHandler());

  /**
   * The callback invoked when the stream is resumed.
   *
   * May be `null`, in which case no callback happens.
   *
   * Broadcast stream controllers do not support pause related callbacks.
   */
  ControllerCallback get onResume;

  void set onResume(void onResumeHandler());

  /**
   * The callback invoked when the stream is canceled.
   *
   * May be `null`, in which case no callback happens.
   */
  ControllerCancelCallback get onCancel;

  void set onCancel(onCancelHandler());

  /**
   * A view of this object that exposes only the [StreamSink] interface.
   */
  StreamSink<T> get sink;

  /**
   * Whether the controller is closed for adding more events.
   *
   * Calling [close] closes the controller. Once closed, no new events can
   * be added with [add] or [addError].
   *
   * When the controller is closed the "done" event might not have been
   * delivered yet, but it has been scheduled and it is too late to add
   * more events.
   */
  bool get isClosed;

  /**
   * Whether the subscription would need to buffer events.
   *
   * That is the case when the controller's stream has a listener that is
   * paused, or when it has not received a listener yet. The controller is
   * then considered paused as well.
   *
   * A broadcast stream controller is never considered paused. It always
   * forwards its events to all uncanceled subscriptions, if any, and lets
   * the subscriptions handle their own pausing and buffering.
   */
  bool get isPaused;

  /** Whether the [Stream] currently has a subscriber. */
  bool get hasListener;

  /**
   * Sends a data [event].
   *
   * Listeners receive the event in a later microtask.
   *
   * A synchronous controller (created by passing true as the `sync`
   * parameter of the `StreamController` constructor) delivers events
   * immediately. Because that violates the contract stated here,
   * synchronous controllers should only be used as described in the
   * documentation, so that delivered events always *appear* as if they
   * were delivered in a separate microtask.
   */
  void add(T event);

  /**
   * Sends or enqueues an error event.
   *
   * A `null` [error] is replaced by a [NullThrownError].
   *
   * Listeners receive the event at a later microtask. `sync` controllers
   * override that behavior, but they have to satisfy the preconditions
   * mentioned in the documentation of the constructors.
   */
  void addError(Object error, [StackTrace stackTrace]);

  /**
   * Closes the stream.
   *
   * Listeners receive the done event at a later microtask. `sync`
   * controllers override that behavior, but they have to satisfy the
   * preconditions mentioned in the documentation of the constructors.
   */
  Future close();

  /**
   * Receives events from [source] and puts them into this controller's
   * stream.
   *
   * The returned future completes when the source stream is done.
   *
   * Until the returned future is complete, events must not be added
   * directly to this controller with [add], [addError], [close] or
   * [addStream].
   *
   * Data and error events are forwarded to this controller's stream. A
   * done event on the source ends the `addStream` operation and completes
   * the returned future.
   *
   * If [cancelOnError] is true, only the first error on [source] is
   * forwarded to the controller's stream, and the `addStream` ends after
   * it. If [cancelOnError] is false, all errors are forwarded and only a
   * done event ends the `addStream`.
   * When omitted, [cancelOnError] defaults to false.
   */
  Future addStream(Stream<T> source, {bool cancelOnError});
}
278
/**
 * A stream controller that delivers its events synchronously.
 *
 * A synchronous stream controller is intended for cases where
 * an already asynchronous event triggers an event on a stream.
 *
 * Instead of adding the event to the stream in a later microtask,
 * causing extra latency, the event is instead fired immediately by the
 * synchronous stream controller, as if the stream event was
 * the current event or microtask.
 *
 * The synchronous stream controller can be used to break the contract
 * on [Stream], and it must be used carefully to avoid doing so.
 *
 * The only advantage to using a [SynchronousStreamController] over a
 * normal [StreamController] is the improved latency.
 * Only use the synchronous version if the improvement is significant,
 * and if its use is safe. Otherwise just use a normal stream controller,
 * which will always have the correct behavior for a [Stream], and won't
 * accidentally break other code.
 *
 * Adding events to a synchronous controller should only happen as the
 * very last part of the handling of the original event.
 * At that point, adding an event to the stream is equivalent to
 * returning to the event loop and adding the event in the next microtask.
 *
 * Each listener callback will be run as if it was a top-level event
 * or microtask. This means that if it throws, the error will be reported as
 * uncaught as soon as possible.
 * This is one reason to add the event as the last thing in the original event
 * handler - any action done after adding the event will delay the report of
 * errors in the event listener callbacks.
 *
 * If an event is added in a setting that isn't known to be another event,
 * it may cause the stream's listener to get that event before the listener
 * is ready to handle it. We promise that after calling [Stream.listen],
 * you won't get any events until the code doing the listen has completed.
 * Calling [add] in response to a function call of unknown origin may break
 * that promise.
 *
 * An [onListen] callback from the controller is *not* an asynchronous event,
 * and adding events to the controller in the `onListen` callback is always
 * wrong. The events will be delivered before the listener has even received
 * the subscription.
 *
 * The synchronous broadcast stream controller also has a restriction that a
 * normal stream controller does not:
 * The [add], [addError], [close] and [addStream] methods *must not* be
 * called while an event is being delivered.
 * That is, if a callback on a subscription on the controller's stream causes
 * a call to any of the functions above, the call will fail.
 * A broadcast stream may have more than one listener, and if an
 * event is added synchronously while another is still in the process
 * of being added, the latter event might reach some listeners before
 * the former. To prevent that, an event cannot be added while a previous
 * event is being fired.
 * This guarantees that an event is fully delivered when the
 * first [add], [addError] or [close] returns,
 * and further events will be delivered in the correct order.
 *
 * This still only guarantees that the event is delivered to the subscription.
 * If the subscription is paused, the actual callback may still happen later,
 * and the event will instead be buffered by the subscription.
 * Barring pausing, and the following buffered events that haven't been
 * delivered yet, callbacks will be called synchronously when an event is added.
 *
 * Adding an event to a synchronous non-broadcast stream controller while
 * another event is in progress may cause the second event to be delayed
 * and not be delivered synchronously, and until that event is delivered,
 * the controller will not act synchronously.
 */
abstract class SynchronousStreamController<T> implements StreamController<T> {
  /**
   * Adds an event to the controller's stream.
   *
   * Behaves as [StreamController.add], but must not be called while an
   * event is being added by [add], [addError] or [close].
   */
  void add(T data);

  /**
   * Adds an error to the controller's stream.
   *
   * Behaves as [StreamController.addError], but must not be called while an
   * event is being added by [add], [addError] or [close].
   */
  void addError(Object error, [StackTrace stackTrace]);

  /**
   * Closes the controller's stream.
   *
   * Behaves as [StreamController.close], but must not be called while an
   * event is being added by [add], [addError] or [close].
   */
  Future close();
}
375
/**
 * Hooks through which a controller's stream and subscriptions report
 * subscription life-cycle events back to the controller.
 *
 * Only [_subscribe] is abstract; the pause, resume and cancel hooks default
 * to doing nothing.
 */
abstract class _StreamControllerLifecycle<T> {
  /** Creates the subscription for a new listener with the given handlers. */
  StreamSubscription<T> _subscribe(
      void onData(T data), Function onError, void onDone(), bool cancelOnError);

  /** Notification that [subscription] was paused. No-op by default. */
  void _recordPause(StreamSubscription<T> subscription) {}

  /** Notification that [subscription] was resumed. No-op by default. */
  void _recordResume(StreamSubscription<T> subscription) {}

  /**
   * Notification that [subscription] was canceled.
   *
   * The default implementation has nothing to wait for and returns `null`.
   */
  Future _recordCancel(StreamSubscription<T> subscription) {
    return null;
  }
}
383
// Base type for implementations of stream controllers.
//
// Declares no members of its own; it only combines the public
// [StreamController] interface with the internal life-cycle, event-sink and
// event-dispatch interfaces that a controller implementation must provide.
abstract class _StreamControllerBase<T>
    implements
        StreamController<T>,
        _StreamControllerLifecycle<T>,
        _EventSink<T>,
        _EventDispatch<T> {}
391
/**
 * Default implementation of [StreamController].
 *
 * The controlled stream accepts a single listener only: a second attempt to
 * listen makes [_subscribe] throw a [StateError].
 */
abstract class _StreamController<T> implements _StreamControllerBase<T> {
  // [_state] is a set of bit-flags; several can be set at once.
  //
  // The "subscription state" moves through
  //   initial -> subscribed -> canceled
  // and those three values are mutually exclusive.
  // The "closed" flag records that [close] has been called on the
  // controller, which may happen at any time. When it happens before
  // subscription, the done event is queued. When it happens after cancel,
  // the done event is ignored (just like any other event after a cancel).

  /** Initial state: no subscription has been made yet. */
  static const int _STATE_INITIAL = 0;
  /** There is a subscription, and it has not been closed or canceled. */
  static const int _STATE_SUBSCRIBED = 1;
  /** The subscription has been canceled. */
  static const int _STATE_CANCELED = 2;
  /** Mask selecting the subscription-state bits. */
  static const int _STATE_SUBSCRIPTION_MASK = 3;

  // The following states relate to the controller, not the subscription.
  // A closed controller does not allow adding more events.
  // While an [addStream] is executing, new events are not allowed either,
  // other than the ones added by the stream.

  /**
   * The controller has been closed by a call to [close].
   *
   * When the stream is closed, you can neither add new events nor add new
   * listeners.
   */
  static const int _STATE_CLOSED = 4;
  /**
   * An [addStream] operation is in progress.
   *
   * No events can be added directly on the controller while events are
   * being added from a stream.
   */
  static const int _STATE_ADDSTREAM = 8;

  /**
   * Multi-purpose slot whose content depends on the subscription state.
   *
   * In the [_STATE_INITIAL] state it may hold a [_PendingEvents] with the
   * events added to the controller before a subscription exists.
   *
   * In the [_STATE_SUBSCRIBED] state it holds the subscription.
   *
   * In the [_STATE_CANCELED] state it is currently unused.
   *
   * During an [addStream] it holds the add-stream state object instead,
   * and that object carries the value described above.
   */
  var _varData;

  /** The current state flags of the controller. */
  int _state = _STATE_INITIAL;

  /**
   * Future completed when the stream sends its last event.
   *
   * The same future is returned by [close].
   */
  // TODO(lrn): Could this be stored in the varData field too, if it's not
  // accessed until the call to "close"? Then we need to special case if it's
  // accessed earlier, or if close is called before subscribing.
  _Future _doneFuture;

  ControllerCallback onListen;
  ControllerCallback onPause;
  ControllerCallback onResume;
  ControllerCancelCallback onCancel;

  _StreamController(this.onListen, this.onPause, this.onResume, this.onCancel);

  // A fresh stream object is created on every access. Such streams compare
  // equal to each other but are not identical.
  Stream<T> get stream {
    return new _ControllerStream<T>(this);
  }

  /**
   * A view of this object that exposes only the [StreamSink] interface.
   */
  StreamSink<T> get sink {
    return new _StreamSinkWrapper<T>(this);
  }

  /**
   * Whether a listener has existed and has been canceled.
   *
   * From then on, added events are ignored.
   */
  bool get _isCanceled => 0 != (_state & _STATE_CANCELED);

  /** Whether a listener is currently active. */
  bool get hasListener => 0 != (_state & _STATE_SUBSCRIBED);

  /** Whether no listener has been attached yet. */
  bool get _isInitialState =>
      _STATE_INITIAL == (_state & _STATE_SUBSCRIPTION_MASK);

  bool get isClosed => 0 != (_state & _STATE_CLOSED);

  // With a listener, paused means that the subscription's input is paused.
  // Without one, the controller counts as paused unless it was canceled.
  bool get isPaused {
    if (hasListener) return _subscription._isInputPaused;
    return !_isCanceled;
  }

  bool get _isAddingStream => 0 != (_state & _STATE_ADDSTREAM);

  /** Events may not be added after close, nor during an addStream. */
  bool get _mayAddEvent => _state < _STATE_CLOSED;

  // The pending events, if any.
  // Pending events are the events added before a subscription exists; they
  // are handed to the subscription when it is created.
  // They live in the _varData field until the stream is listened to, except
  // while a stream is being added: then they are moved into the add-stream
  // state object so that the state object can occupy the _varData field.
  _PendingEvents<T> get _pendingEvents {
    assert(_isInitialState);
    if (_isAddingStream) {
      _StreamControllerAddStreamState<T> addState = _varData;
      return addState.varData;
    }
    return _varData;
  }

  // The pending events, created on demand when there are none yet.
  _StreamImplEvents<T> _ensurePendingEvents() {
    assert(_isInitialState);
    if (_isAddingStream) {
      _StreamControllerAddStreamState<T> addState = _varData;
      addState.varData ??= new _StreamImplEvents<T>();
      return addState.varData;
    }
    if (_varData == null) _varData = new _StreamImplEvents<T>();
    return _varData;
  }

  // The current subscription.
  // While a stream is being added, the subscription is kept in the
  // add-stream state object so that the state object can occupy the
  // _varData field.
  _ControllerSubscription<T> get _subscription {
    assert(hasListener);
    if (!_isAddingStream) return _varData;
    _StreamControllerAddStreamState<T> addState = _varData;
    return addState.varData;
  }

  /**
   * Creates the error explaining why an event cannot be added.
   *
   * The reason, and with it the error message, depends on the current
   * state: the controller is either closed or in the middle of an
   * addStream.
   */
  Error _badEventState() {
    if (!isClosed) {
      assert(_isAddingStream);
      return new StateError("Cannot add event while adding a stream");
    }
    return new StateError("Cannot add event after closing");
  }

  // StreamSink interface.

  /**
   * Starts forwarding the events of [source] into this controller.
   *
   * Throws if events may not currently be added. If the listener has
   * already been canceled, nothing is forwarded and an already completed
   * future is returned. An omitted [cancelOnError] counts as false.
   */
  Future addStream(Stream<T> source, {bool cancelOnError}) {
    if (!_mayAddEvent) throw _badEventState();
    if (_isCanceled) return new _Future.immediate(null);
    bool stopOnError = cancelOnError ?? false;
    // The state object takes over the current content of _varData and then
    // takes its place in the field.
    _StreamControllerAddStreamState<T> streamState =
        new _StreamControllerAddStreamState<T>(
            this, _varData, source, stopOnError);
    _varData = streamState;
    _state |= _STATE_ADDSTREAM;
    return streamState.addStreamFuture;
  }

  /**
   * A future that is completed when the stream is done processing events.
   *
   * That happens either when the done event has been sent, or when the
   * subscriber of a single-subscription stream is cancelled.
   */
  Future get done => _ensureDoneFuture();

  // Creates the done future on first use. After a cancel there is nothing
  // left to wait for, so the shared null future is used.
  Future _ensureDoneFuture() {
    if (_doneFuture == null) {
      if (_isCanceled) {
        _doneFuture = Future._nullFuture;
      } else {
        _doneFuture = new _Future();
      }
    }
    return _doneFuture;
  }

  /**
   * Sends or enqueues a data event.
   */
  void add(T value) {
    if (!_mayAddEvent) throw _badEventState();
    _add(value);
  }

  /**
   * Sends or enqueues an error event.
   */
  void addError(Object error, [StackTrace stackTrace]) {
    if (!_mayAddEvent) throw _badEventState();
    error = _nonNullError(error);
    // The current zone gets the chance to replace the error and its trace.
    AsyncError substitute = Zone.current.errorCallback(error, stackTrace);
    if (substitute != null) {
      error = _nonNullError(substitute.error);
      stackTrace = substitute.stackTrace;
    }
    _addError(error, stackTrace);
  }

  /**
   * Closes this controller and sends a done event on the stream.
   *
   * A "done" event is added to the stream the first time the controller
   * is closed.
   *
   * Closing the controller more than once is allowed, but only the first
   * call has any effect.
   *
   * Once closed, no further events may be added using [add], [addError]
   * or [addStream].
   *
   * The returned future is completed when the done event has been
   * delivered.
   */
  Future close() {
    if (!isClosed) {
      if (!_mayAddEvent) throw _badEventState();
      _closeUnchecked();
    }
    return _ensureDoneFuture();
  }

  // Marks the controller closed and sends, or queues, the done event.
  void _closeUnchecked() {
    _state |= _STATE_CLOSED;
    if (hasListener) {
      _sendDone();
      return;
    }
    if (_isInitialState) {
      _ensurePendingEvents().add(const _DelayedDone());
    }
  }

  // EventSink interface. Used by the [addStream] events.

  // Adds a data event. Shared by the [addStream] events and by [add].
  // The event is sent to the listener if there is one, queued if there has
  // not been one yet, and dropped otherwise.
  void _add(T value) {
    if (hasListener) {
      _sendData(value);
      return;
    }
    if (_isInitialState) {
      _ensurePendingEvents().add(new _DelayedData<T>(value));
    }
  }

  // Adds an error event, following the same send/queue/drop rules as [_add].
  void _addError(Object error, StackTrace stackTrace) {
    if (hasListener) {
      _sendError(error, stackTrace);
      return;
    }
    if (_isInitialState) {
      _ensurePendingEvents().add(new _DelayedError(error, stackTrace));
    }
  }

  void _close() {
    // The stream being added by addStream has ended.
    assert(_isAddingStream);
    _StreamControllerAddStreamState<T> finished = _varData;
    // Move the subscription or pending events back into their usual slot.
    _varData = finished.varData;
    _state &= ~_STATE_ADDSTREAM;
    finished.complete();
  }

  // _StreamControllerLifeCycle interface

  // Creates the single subscription of this controller, hands it the events
  // queued so far and then runs the onListen callback.
  StreamSubscription<T> _subscribe(void onData(T data), Function onError,
      void onDone(), bool cancelOnError) {
    if (!_isInitialState) {
      throw new StateError("Stream has already been listened to.");
    }
    _ControllerSubscription<T> listener = new _ControllerSubscription<T>(
        this, onData, onError, onDone, cancelOnError);

    // Read the queued events before the state changes: the getter asserts
    // that the controller is still in its initial state.
    _PendingEvents<T> queued = _pendingEvents;
    _state |= _STATE_SUBSCRIBED;
    if (_isAddingStream) {
      _StreamControllerAddStreamState<T> addState = _varData;
      addState.varData = listener;
      addState.resume();
    } else {
      _varData = listener;
    }
    listener._setPendingEvents(queued);
    listener._guardCallback(() => _runGuarded(onListen));

    return listener;
  }

  Future _recordCancel(StreamSubscription<T> subscription) {
    // Canceling happens in three steps: first any stream being added is
    // canceled, then `onCancel` is called, and finally the _doneFuture is
    // completed.
    // If the addStream cancel or `onCancel` returns a future, it is awaited
    // before continuing.
    // An error occurring during this process ends up in the returned future.
    // With more than one error we behave as if they happened inside nested
    // try/finallys or whenComplete calls: only the last error ends up in
    // the returned future.
    Future pending;
    if (_isAddingStream) {
      _StreamControllerAddStreamState<T> addState = _varData;
      pending = addState.cancel();
    }
    _varData = null;
    _state =
        (_state & ~(_STATE_SUBSCRIBED | _STATE_ADDSTREAM)) | _STATE_CANCELED;

    if (onCancel != null) {
      if (pending != null) {
        // Simple case: a future is going to be returned anyway.
        pending = pending.whenComplete(onCancel);
      } else {
        // Introduce a future only when one is needed.
        // When onCancel returns null, no future is needed.
        try {
          pending = onCancel();
        } catch (e, s) {
          // Report the error through the returned future.
          // It is completed asynchronously, which leaves time for a
          // listener to handle the error.
          pending = new _Future().._asyncCompleteError(e, s);
        }
      }
    }

    void completeDone() {
      if (_doneFuture != null && _doneFuture._mayComplete) {
        _doneFuture._asyncComplete(null);
      }
    }

    if (pending == null) {
      completeDone();
    } else {
      pending = pending.whenComplete(completeDone);
    }

    return pending;
  }

  // Pauses any stream being added, then runs the onPause callback.
  void _recordPause(StreamSubscription<T> subscription) {
    if (_isAddingStream) {
      _StreamControllerAddStreamState<T> streamState = _varData;
      streamState.pause();
    }
    _runGuarded(onPause);
  }

  // Resumes any stream being added, then runs the onResume callback.
  void _recordResume(StreamSubscription<T> subscription) {
    if (_isAddingStream) {
      _StreamControllerAddStreamState<T> streamState = _varData;
      streamState.resume();
    }
    _runGuarded(onResume);
  }
}
757
/**
 * Dispatch strategy that hands each event directly to the subscription's
 * `_add`, `_addError` and `_close` methods.
 */
abstract class _SyncStreamControllerDispatch<T>
    implements _StreamController<T>, SynchronousStreamController<T> {
  int get _state;
  void set _state(int state);

  /** Passes [data] straight on to the current subscription. */
  void _sendData(T data) => _subscription._add(data);

  /** Passes the error straight on to the current subscription. */
  void _sendError(Object error, StackTrace stackTrace) =>
      _subscription._addError(error, stackTrace);

  /** Closes the current subscription. */
  void _sendDone() => _subscription._close();
}
775
/**
 * Dispatch strategy that wraps each event in a delayed-event object and
 * adds it to the subscription's pending events.
 */
abstract class _AsyncStreamControllerDispatch<T>
    implements _StreamController<T> {
  /** Queues [data] on the current subscription as a delayed data event. */
  void _sendData(T data) {
    var event = new _DelayedData<T>(data);
    _subscription._addPending(event);
  }

  /** Queues the error on the current subscription as a delayed error event. */
  void _sendError(Object error, StackTrace stackTrace) {
    var event = new _DelayedError(error, stackTrace);
    _subscription._addPending(event);
  }

  /** Queues a delayed done event on the current subscription. */
  void _sendDone() {
    const doneEvent = const _DelayedDone();
    _subscription._addPending(doneEvent);
  }
}
790
791 // TODO(lrn): Use common superclass for callback-controllers when VM supports
792 // constructors in mixin superclasses.
793
/**
 * Single-subscription controller combining [_StreamController] with the
 * [_AsyncStreamControllerDispatch] event dispatch.
 */
class _AsyncStreamController<T> = _StreamController<T>
    with _AsyncStreamControllerDispatch<T>;
796
/**
 * Single-subscription controller combining [_StreamController] with the
 * [_SyncStreamControllerDispatch] event dispatch.
 */
class _SyncStreamController<T> = _StreamController<T>
    with _SyncStreamControllerDispatch<T>;
799
/** Signature of the argument-less callbacks run by [_runGuarded]. */
typedef _NotificationHandler();

/**
 * Invokes [notificationHandler], unless it is `null`.
 *
 * Anything the handler throws is caught and passed, together with its stack
 * trace, to the current zone's uncaught-error handler instead of
 * propagating to the caller.
 */
void _runGuarded(_NotificationHandler notificationHandler) {
  if (notificationHandler != null) {
    try {
      notificationHandler();
    } catch (error, stackTrace) {
      Zone.current.handleUncaughtError(error, stackTrace);
    }
  }
}
810
/**
 * The stream handed out by a controller.
 *
 * Creating a subscription is delegated to the controller's `_subscribe`.
 */
class _ControllerStream<T> extends _StreamImpl<T> {
  _StreamControllerLifecycle<T> _controller;

  _ControllerStream(this._controller);

  StreamSubscription<T> _createSubscription(void onData(T data),
      Function onError, void onDone(), bool cancelOnError) {
    return _controller._subscribe(onData, onError, onDone, cancelOnError);
  }

  // == and hashCode are overridden so that all streams returned by the same
  // controller are considered equal. This lets the controller create a new
  // stream on every query instead of caching one.

  int get hashCode {
    return _controller.hashCode ^ 0x35323532;
  }

  bool operator ==(Object other) {
    if (identical(this, other)) return true;
    if (other is _ControllerStream) {
      // Equal exactly when both streams wrap the very same controller.
      return identical(other._controller, this._controller);
    }
    return false;
  }
}
832
/**
 * A buffering subscription that reports its cancel, pause and resume
 * events to the controller it belongs to.
 */
class _ControllerSubscription<T> extends _BufferingStreamSubscription<T> {
  final _StreamControllerLifecycle<T> _controller;

  _ControllerSubscription(this._controller, void onData(T data),
      Function onError, void onDone(), bool cancelOnError)
      : super(onData, onError, onDone, cancelOnError);

  /** Forwards the cancel to the controller and returns its result. */
  Future _onCancel() => _controller._recordCancel(this);

  /** Forwards the pause to the controller. */
  void _onPause() => _controller._recordPause(this);

  /** Forwards the resume to the controller. */
  void _onResume() => _controller._recordResume(this);
}
852
/**
 * Wrapper exposing only the [StreamSink] interface of a controller.
 *
 * Every member simply forwards to the wrapped controller.
 */
class _StreamSinkWrapper<T> implements StreamSink<T> {
  final StreamController _target;

  _StreamSinkWrapper(this._target);

  void add(T data) => _target.add(data);

  void addError(Object error, [StackTrace stackTrace]) =>
      _target.addError(error, stackTrace);

  Future close() {
    return _target.close();
  }

  Future addStream(Stream<T> source) {
    return _target.addStream(source);
  }

  Future get done {
    return _target.done;
  }
}
871
/**
 * Holds the state needed to handle a [StreamController.addStream] call.
 *
 * Constructing the object starts listening on the source stream.
 */
class _AddStreamState<T> {
  // The [_Future] returned by the call to addStream.
  final _Future addStreamFuture;

  // The subscription on the stream that was passed to addStream.
  final StreamSubscription addSubscription;

  // Listens on [source], forwarding data, errors and done to [controller].
  // With [cancelOnError] set, errors go through [makeErrorHandler] so that
  // the first error also ends the operation.
  _AddStreamState(
      _EventSink<T> controller, Stream<T> source, bool cancelOnError)
      : addStreamFuture = new _Future(),
        addSubscription = source.listen(controller._add,
            onError: cancelOnError
                ? makeErrorHandler(controller)
                : controller._addError,
            onDone: controller._close,
            cancelOnError: cancelOnError);

  // Builds an error handler that forwards the error to [controller] and
  // then closes it.
  static makeErrorHandler(_EventSink controller) {
    return (e, StackTrace s) {
      controller._addError(e, s);
      controller._close();
    };
  }

  /** Pauses the subscription on the source stream. */
  void pause() => addSubscription.pause();

  /** Resumes the subscription on the source stream. */
  void resume() => addSubscription.resume();

  /**
   * Stops adding the stream.
   *
   * The future returned by `StreamController.addStream` is completed once
   * the cancel is complete.
   *
   * Returns a future if the cancel takes time, and `null` otherwise.
   */
  Future cancel() {
    var cancelFuture = addSubscription.cancel();
    if (cancelFuture != null) {
      return cancelFuture.whenComplete(() {
        addStreamFuture._asyncComplete(null);
      });
    }
    addStreamFuture._asyncComplete(null);
    return null;
  }

  /** Completes the future returned by addStream. */
  void complete() => addStreamFuture._asyncComplete(null);
}
928
/**
 * [_AddStreamState] variant used by [_StreamController].
 *
 * Also carries the controller's previous `_varData` value.
 */
class _StreamControllerAddStreamState<T> extends _AddStreamState<T> {
  // Holds the _StreamController's subscription or pending data while the
  // controller's own `_varData` field is reused to store this state object.
  var varData;

  _StreamControllerAddStreamState(_StreamController<T> controller, this.varData,
      Stream<T> source, bool cancelOnError)
      : super(controller, source, cancelOnError) {
    // A controller that is already paused must not receive source events yet.
    if (controller.isPaused) addSubscription.pause();
  }
}
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 part of dart.async;
6
7 // -------------------------------------------------------------------
8 // Core Stream types
9 // -------------------------------------------------------------------
10
// Signature of a callback that takes no arguments and returns nothing.
typedef _TimerCallback = void Function();
12
13 /**
14 * A source of asynchronous data events.
15 *
16 * A Stream provides a way to receive a sequence of events.
17 * Each event is either a data event, also called an *element* of the stream,
18 * or an error event, which is a notification that something has failed.
 * When a stream has emitted all its events,
20 * a single "done" event will notify the listener that the end has been reached.
21 *
22 * You [listen] on a stream to make it start generating events,
23 * and to set up listeners that receive the events.
24 * When you listen, you receive a [StreamSubscription] object
25 * which is the active object providing the events,
26 * and which can be used to stop listening again,
27 * or to temporarily pause events from the subscription.
28 *
29 * There are two kinds of streams: "Single-subscription" streams and
30 * "broadcast" streams.
31 *
32 * *A single-subscription stream* allows only a single listener during the whole
33 * lifetime of the stream.
34 * It doesn't start generating events until it has a listener,
35 * and it stops sending events when the listener is unsubscribed,
36 * even if the source of events could still provide more.
37 *
38 * Listening twice on a single-subscription stream is not allowed, even after
39 * the first subscription has been canceled.
40 *
41 * Single-subscription streams are generally used for streaming chunks of
42 * larger contiguous data like file I/O.
43 *
44 * *A broadcast stream* allows any number of listeners, and it fires
45 * its events when they are ready, whether there are listeners or not.
46 *
47 * Broadcast streams are used for independent events/observers.
48 *
49 * If several listeners want to listen to a single subscription stream,
50 * use [asBroadcastStream] to create a broadcast stream on top of the
51 * non-broadcast stream.
52 *
53 * On either kind of stream, stream transformations, such as [where] and
54 * [skip], return the same type of stream as the one the method was called on,
55 * unless otherwise noted.
56 *
57 * When an event is fired, the listener(s) at that time will receive the event.
58 * If a listener is added to a broadcast stream while an event is being fired,
59 * that listener will not receive the event currently being fired.
60 * If a listener is canceled, it immediately stops receiving events.
61 * Listening on a broadcast stream can be treated as listening on a new stream
62 * containing only the events that have not yet been emitted when the [listen]
63 * call occurs.
64 * For example, the [first] getter listens to the stream, then returns the first
65 * event that listener receives.
 * This is not necessarily the first event emitted by the stream, but the first
67 * of the *remaining* events of the broadcast stream.
68 *
69 * When the "done" event is fired, subscribers are unsubscribed before
70 * receiving the event. After the event has been sent, the stream has no
71 * subscribers. Adding new subscribers to a broadcast stream after this point
72 * is allowed, but they will just receive a new "done" event as soon
73 * as possible.
74 *
75 * Stream subscriptions always respect "pause" requests. If necessary they need
76 * to buffer their input, but often, and preferably, they can simply request
77 * their input to pause too.
78 *
79 * The default implementation of [isBroadcast] returns false.
80 * A broadcast stream inheriting from [Stream] must override [isBroadcast]
81 * to return `true`.
82 */
83 abstract class Stream<T> {
/**
 * Generative constructor taking no arguments.
 *
 * [Stream] is abstract, so this is reached through subclass constructors.
 */
Stream();
85
/**
 * Internal use only. We do not want to promise that Stream stays const.
 *
 * This is the `const` counterpart of the unnamed constructor; being
 * library-private, it can only be used from inside this library.
 *
 * If mixins become compatible with const constructors, we may use a
 * stream mixin instead of extending Stream from a const class.
 */
const Stream._internal();
93
/**
 * Creates an empty broadcast stream.
 *
 * This is a stream which does nothing except sending a done event
 * when it's listened to.
 *
 * This constructor redirects to [_EmptyStream].
 */
const factory Stream.empty() = _EmptyStream<T>;
101
102 /**
103 * Creates a new single-subscription stream from the future.
104 *
105 * When the future completes, the stream will fire one event, either
106 * data or error, and then close with a done-event.
107 */
factory Stream.fromFuture(Future<T> future) {
  // Rely on the controller's buffering so the result can be recorded even
  // before the stream has a listener. For a single value it is not worth
  // delaying the `then` call until somebody listens.
  _StreamController<T> sink = new StreamController<T>(sync: true);
  future.then((value) {
    sink
      .._add(value)
      .._closeUnchecked();
  }, onError: (error, stackTrace) {
    sink
      .._addError(error, stackTrace)
      .._closeUnchecked();
  });
  return sink.stream;
}
122
123 /**
124 * Create a stream from a group of futures.
125 *
126 * The stream reports the results of the futures on the stream in the order
127 * in which the futures complete.
128 * Each future provides either a data event or an error event,
129 * depending on how the future completes.
130 *
131 * If some futures have already completed when `Stream.fromFutures` is called,
132 * their results will be emitted in some unspecified order.
133 *
134 * When all futures have completed, the stream is closed.
135 *
136 * If [futures] is empty, the stream closes as soon as possible.
137 */
factory Stream.fromFutures(Iterable<Future<T>> futures) {
  _StreamController<T> sink = new StreamController<T>(sync: true);
  // Number of futures that have not reported a result yet.
  int pending = 0;
  // The handlers are closures stored in variables, not local function
  // declarations, so the same closure object is reused for every future
  // instead of a new one being created each time.
  var onValue = (T value) {
    if (!sink.isClosed) {
      sink._add(value);
      pending -= 1;
      if (pending == 0) sink._closeUnchecked();
    }
  };
  var onError = (error, StackTrace stack) {
    if (!sink.isClosed) {
      sink._addError(error, stack);
      pending -= 1;
      if (pending == 0) sink._closeUnchecked();
    }
  };
  // The futures are already running, so subscribe to them right away
  // rather than when the stream gets a listener; waiting could mean
  // missing errors from the futures.
  for (var future in futures) {
    pending += 1;
    future.then(onValue, onError: onError);
  }
  // No futures at all: close via a microtask, since the controller is sync.
  if (pending == 0) scheduleMicrotask(sink.close);
  return sink.stream;
}
167
168 /**
169 * Creates a single-subscription stream that gets its data from [elements].
170 *
171 * The iterable is iterated when the stream receives a listener, and stops
172 * iterating if the listener cancels the subscription, or if the
173 * [Iterator.moveNext] method returns `false` or throws.
174 * Iteration is suspended while the stream subscription is paused.
175 *
176 * If calling [Iterator.moveNext] on `elements.iterator` throws,
177 * the stream emits that error and then it closes.
178 * If reading [Iterator.current] on `elements.iterator` throws,
179 * the stream emits that error, but keeps iterating.
180 */
// The pending-events object is created lazily through the callback
// handed to the generated stream.
factory Stream.fromIterable(Iterable<T> elements) =>
    new _GeneratedStreamImpl<T>(
        () => new _IterablePendingEvents<T>(elements));
185
186 /**
187 * Creates a stream that repeatedly emits events at [period] intervals.
188 *
189 * The event values are computed by invoking [computation]. The argument to
190 * this callback is an integer that starts with 0 and is incremented for
191 * every event.
192 *
193 * If [computation] is omitted the event values will all be `null`.
194 */
factory Stream.periodic(Duration period,
    [T computation(int computationCount)]) {
  // The currently active timer, or `null` while paused or canceled.
  Timer timer;
  // Argument passed to the next [computation] call.
  int computationCount = 0;
  StreamController<T> controller;
  // Counts the time that the Stream was running (and not paused).
  Stopwatch watch = new Stopwatch();

  // Emits one event: the result of [computation], `null` when no
  // computation was given, or an error event if [computation] throws.
  void sendEvent() {
    // Restart the measurement so `watch.elapsed` is the running time
    // since this event.
    watch.reset();
    T data;
    if (computation != null) {
      try {
        data = computation(computationCount++);
      } catch (e, s) {
        // A throwing computation produces an error event and no data event.
        controller.addError(e, s);
        return;
      }
    }
    controller.add(data);
  }

  // Starts the repeating timer that fires [sendEvent] every [period].
  void startPeriodicTimer() {
    assert(timer == null);
    timer = new Timer.periodic(period, (Timer timer) {
      sendEvent();
    });
  }

  controller = new StreamController<T>(
      sync: true,
      onListen: () {
        watch.start();
        startPeriodicTimer();
      },
      onPause: () {
        // Stop both the timer and the stopwatch; the stopwatch keeps the
        // time already spent in the current period.
        timer.cancel();
        timer = null;
        watch.stop();
      },
      onResume: () {
        assert(timer == null);
        Duration elapsed = watch.elapsed;
        watch.start();
        // Wait out the remainder of the interrupted period with a one-shot
        // timer, then switch back to the periodic timer and emit the event.
        timer = new Timer(period - elapsed, () {
          timer = null;
          startPeriodicTimer();
          sendEvent();
        });
      },
      onCancel: () {
        // The timer may already be null here (see onPause).
        if (timer != null) timer.cancel();
        timer = null;
        return Future._nullFuture;
      });
  return controller.stream;
}
252
253 /**
254 * Creates a stream where all events of an existing stream are piped through
255 * a sink-transformation.
256 *
257 * The given [mapSink] closure is invoked when the returned stream is
258 * listened to. All events from the [source] are added into the event sink
259 * that is returned from the invocation. The transformation puts all
260 * transformed events into the sink the [mapSink] closure received during
261 * its invocation. Conceptually the [mapSink] creates a transformation pipe
262 * with the input sink being the returned [EventSink] and the output sink
263 * being the sink it received.
264 *
265 * This constructor is frequently used to build transformers.
266 *
267 * Example use for a duplicating transformer:
268 *
269 * class DuplicationSink implements EventSink<String> {
270 * final EventSink<String> _outputSink;
271 * DuplicationSink(this._outputSink);
272 *
273 * void add(String data) {
274 * _outputSink.add(data);
275 * _outputSink.add(data);
276 * }
277 *
278 * void addError(e, [st]) { _outputSink.addError(e, st); }
279 * void close() { _outputSink.close(); }
280 * }
281 *
282 * class DuplicationTransformer extends StreamTransformerBase<String, String> {
283 * // Some generic types omitted for brevity.
284 * Stream bind(Stream stream) => new Stream<String>.eventTransformed(
285 * stream,
286 * (EventSink sink) => new DuplicationSink(sink));
287 * }
288 *
289 * stringStream.transform(new DuplicationTransformer());
290 *
291 * The resulting stream is a broadcast stream if [source] is.
292 */
// All the work is done by the bound-sink stream built from [source] and
// [mapSink].
factory Stream.eventTransformed(
        Stream source, EventSink mapSink(EventSink<T> sink)) =>
    new _BoundSinkStream(source, mapSink);
297
298 /**
299 * Adapts [source] to be a `Stream<T>`.
300 *
301 * This allows [source] to be used at the new type, but at run-time it
302 * must satisfy the requirements of both the new type and its original type.
303 *
304 * Data events created by the source stream must also be instances of [T].
305 */
static Stream<T> castFrom<S, T>(Stream<S> source) {
  // Wrap the source in a CastStream from S to T.
  return new CastStream<S, T>(source);
}
308
309 /**
310 * Whether this stream is a broadcast stream.
311 */
bool get isBroadcast {
  // Default answer; broadcast stream subclasses override this.
  return false;
}
313
314 /**
315 * Returns a multi-subscription stream that produces the same events as this.
316 *
317 * The returned stream will subscribe to this stream when its first
318 * subscriber is added, and will stay subscribed until this stream ends,
319 * or a callback cancels the subscription.
320 *
321 * If [onListen] is provided, it is called with a subscription-like object
322 * that represents the underlying subscription to this stream. It is
323 * possible to pause, resume or cancel the subscription during the call
324 * to [onListen]. It is not possible to change the event handlers, including
325 * using [StreamSubscription.asFuture].
326 *
327 * If [onCancel] is provided, it is called in a similar way to [onListen]
 * when the returned stream stops having listeners. If it later gets
329 * a new listener, the [onListen] function is called again.
330 *
331 * Use the callbacks, for example, for pausing the underlying subscription
332 * while having no subscribers to prevent losing events, or canceling the
333 * subscription when there are no listeners.
334 */
// Both callbacks are optional and are passed through unchanged.
Stream<T> asBroadcastStream(
        {void onListen(StreamSubscription<T> subscription),
        void onCancel(StreamSubscription<T> subscription)}) =>
    new _AsBroadcastStream<T>(this, onListen, onCancel);
340
/**
 * Adds a subscription to this stream.
 *
 * Returns a [StreamSubscription] which handles events from this stream using
 * the provided [onData], [onError] and [onDone] handlers.
 * The handlers can be changed on the subscription, but they start out
 * as the provided functions.
 *
 * On each data event from this stream, the subscriber's [onData] handler
 * is called. If [onData] is `null`, nothing happens.
 *
 * On errors from this stream, the [onError] handler is called with the
 * error object and possibly a stack trace.
 *
 * The [onError] callback must be of type `void onError(error)` or
 * `void onError(error, StackTrace stackTrace)`. If [onError] accepts
 * two arguments it is called with the error object and the stack trace
 * (which could be `null` if this stream itself received an error without
 * stack trace).
 * Otherwise it is called with just the error object.
 * If [onError] is omitted, any errors on this stream are considered unhandled,
 * and will be passed to the current [Zone]'s error handler.
 * By default unhandled async errors are treated
 * as if they were uncaught top-level errors.
 *
 * If this stream closes and sends a done event, the [onDone] handler is
 * called. If [onDone] is `null`, nothing happens.
 *
 * If [cancelOnError] is true, the subscription is automatically canceled
 * when the first error event is delivered. The default is `false`.
 *
 * While a subscription is paused, or when it has been canceled,
 * the subscription doesn't receive events and none of the
 * event handler functions are called.
 *
 * This member is abstract; concrete stream classes supply the
 * implementation.
 */
StreamSubscription<T> listen(void onData(T event),
    {Function onError, void onDone(), bool cancelOnError});
378
379 /**
380 * Creates a new stream from this stream that discards some elements.
381 *
382 * The new stream sends the same error and done events as this stream,
383 * but it only sends the data events that satisfy the [test].
384 *
385 * If the [test] function throws, the data event is dropped and the
386 * error is emitted on the returned stream instead.
387 *
388 * The returned stream is a broadcast stream if this stream is.
389 * If a broadcast stream is listened to more than once, each subscription
390 * will individually perform the `test`.
391 */
// Filtering is performed by the wrapping stream, which holds [test].
Stream<T> where(bool test(T event)) => new _WhereStream<T>(this, test);
395
396 /**
397 * Transforms each element of this stream into a new stream event.
398 *
399 * Creates a new stream that converts each element of this stream
400 * to a new value using the [convert] function, and emits the result.
401 *
402 * For each data event, `o`, in this stream, the returned stream
403 * provides a data event with the value `convert(o)`.
404 * If [convert] throws, the returned stream reports it as an error
405 * event instead.
406 *
407 * Error and done events are passed through unchanged to the returned stream.
408 *
409 * The returned stream is a broadcast stream if this stream is.
410 * The [convert] function is called once per data event per listener.
411 * If a broadcast stream is listened to more than once, each subscription
412 * will individually call [convert] on each data event.
413 *
414 * Unlike [transform], this method does not treat the stream as
415 * chunks of a single value. Instead each event is converted independently
416 * of the previous and following events, which may not always be correct.
417 * For example, UTF-8 encoding, or decoding, will give wrong results
418 * if a surrogate pair, or a multibyte UTF-8 encoding, is split into
419 * separate events, and those events are attempted encoded or decoded
420 * independently.
421 */
// Conversion is performed by the wrapping stream, which holds [convert].
Stream<S> map<S>(S convert(T event)) =>
    new _MapStream<T, S>(this, convert);
425
426 /**
427 * Creates a new stream with each data event of this stream asynchronously
428 * mapped to a new event.
429 *
430 * This acts like [map], except that [convert] may return a [Future],
431 * and in that case, this stream waits for that future to complete before
432 * continuing with its result.
433 *
434 * The returned stream is a broadcast stream if this stream is.
435 */
Stream<E> asyncMap<E>(FutureOr<E> convert(T event)) {
  // Controller backing the returned stream; assigned below, depending on
  // whether this stream is a broadcast stream.
  _StreamControllerBase<E> controller;
  // Subscription on this stream; set when the returned stream is listened to.
  StreamSubscription<T> subscription;

  // Runs when the returned stream gets a listener: subscribes to this stream
  // and forwards converted events into [controller].
  void onListen() {
    final add = controller.add;
    assert(controller is _StreamController<E> ||
        controller is _BroadcastStreamController);
    // Note: source errors and failed futures go through the private
    // `_addError`, while a throwing [convert] uses the public `addError`.
    final addError = controller._addError;
    subscription = this.listen((T event) {
      FutureOr<E> newValue;
      try {
        newValue = convert(event);
      } catch (e, s) {
        controller.addError(e, s);
        return;
      }
      if (newValue is Future<E>) {
        // Hold back further source events until the future has completed,
        // then resume whether it completed with a value or an error.
        subscription.pause();
        newValue
            .then(add, onError: addError)
            .whenComplete(subscription.resume);
      } else {
        // Plain value: forward it immediately.
        controller.add(newValue);
      }
    }, onError: addError, onDone: controller.close);
  }

  if (this.isBroadcast) {
    // Only listen and cancel callbacks are passed to the broadcast
    // controller.
    controller = new StreamController<E>.broadcast(
        onListen: onListen,
        onCancel: () {
          subscription.cancel();
        },
        sync: true);
  } else {
    // Single-subscription: pause, resume and cancel are passed on to the
    // source subscription.
    controller = new StreamController<E>(
        onListen: onListen,
        onPause: () {
          subscription.pause();
        },
        onResume: () {
          subscription.resume();
        },
        onCancel: () => subscription.cancel(),
        sync: true);
  }
  return controller.stream;
}
485
486 /**
487 * Transforms each element into a sequence of asynchronous events.
488 *
489 * Returns a new stream and for each event of this stream, do the following:
490 *
491 * * If the event is an error event or a done event, it is emitted directly
492 * by the returned stream.
493 * * Otherwise it is an element. Then the [convert] function is called
494 * with the element as argument to produce a convert-stream for the element.
495 * * If that call throws, the error is emitted on the returned stream.
 * * If the call returns `null`, no further action is taken for that element.
497 * * Otherwise, this stream is paused and convert-stream is listened to.
498 * Every data and error event of the convert-stream is emitted on the returned
499 * stream in the order it is produced.
500 * When the convert-stream ends, this stream is resumed.
501 *
502 * The returned stream is a broadcast stream if this stream is.
503 */
Stream<E> asyncExpand<E>(Stream<E> convert(T event)) {
  // Controller backing the returned stream; assigned below, depending on
  // whether this stream is a broadcast stream.
  _StreamControllerBase<E> controller;
  // Subscription on this stream; set when the returned stream is listened to.
  StreamSubscription<T> subscription;
  // Runs when the returned stream gets a listener: subscribes to this stream
  // and pipes each converted sub-stream into [controller].
  void onListen() {
    assert(controller is _StreamController ||
        controller is _BroadcastStreamController);
    subscription = this.listen((T event) {
      Stream<E> newStream;
      try {
        newStream = convert(event);
      } catch (e, s) {
        controller.addError(e, s);
        return;
      }
      if (newStream != null) {
        // Pause the source while the sub-stream is being added, and resume
        // once `addStream` has completed.
        subscription.pause();
        controller.addStream(newStream).whenComplete(subscription.resume);
      }
    },
        onError: controller._addError, // Avoid Zone error replacement.
        onDone: controller.close);
  }

  if (this.isBroadcast) {
    // Only listen and cancel callbacks are passed to the broadcast
    // controller.
    controller = new StreamController<E>.broadcast(
        onListen: onListen,
        onCancel: () {
          subscription.cancel();
        },
        sync: true);
  } else {
    // Single-subscription: pause, resume and cancel are passed on to the
    // source subscription.
    controller = new StreamController<E>(
        onListen: onListen,
        onPause: () {
          subscription.pause();
        },
        onResume: () {
          subscription.resume();
        },
        onCancel: () => subscription.cancel(),
        sync: true);
  }
  return controller.stream;
}
548
549 /**
550 * Creates a wrapper Stream that intercepts some errors from this stream.
551 *
552 * If this stream sends an error that matches [test], then it is intercepted
553 * by the [onError] function.
554 *
555 * The [onError] callback must be of type `void onError(error)` or
556 * `void onError(error, StackTrace stackTrace)`.
557 * The function type determines whether [onError] is invoked with a stack
558 * trace argument.
559 * The stack trace argument may be `null` if this stream received an error
560 * without a stack trace.
561 *
562 * An asynchronous error `error` is matched by a test function if
 * `test(error)` returns true. If [test] is omitted, every error is considered
564 * matching.
565 *
566 * If the error is intercepted, the [onError] function can decide what to do
567 * with it. It can throw if it wants to raise a new (or the same) error,
568 * or simply return to make this stream forget the error.
569 * If the received `error` value is thrown again by the [onError] function,
570 * it acts like a `rethrow` and it is emitted along with its original
571 * stack trace, not the stack trace of the `throw` inside [onError].
572 *
573 * If you need to transform an error into a data event, use the more generic
574 * [Stream.transform] to handle the event by writing a data event to
575 * the output sink.
576 *
577 * The returned stream is a broadcast stream if this stream is.
578 * If a broadcast stream is listened to more than once, each subscription
579 * will individually perform the `test` and handle the error.
580 */
// Error interception is performed by the wrapping stream.
Stream<T> handleError(Function onError, {bool test(error)}) =>
    new _HandleErrorStream<T>(this, onError, test);
584
585 /**
586 * Transforms each element of this stream into a sequence of elements.
587 *
588 * Returns a new stream where each element of this stream is replaced
589 * by zero or more data events.
590 * The event values are provided as an [Iterable] by a call to [convert]
 * with the element as argument, and the elements of that iterable are
592 * emitted in iteration order.
593 * If calling [convert] throws, or if the iteration of the returned values
594 * throws, the error is emitted on the returned stream and iteration ends
595 * for that element of this stream.
596 *
597 * Error events and the done event of this stream are forwarded directly
598 * to the returned stream.
599 *
600 * The returned stream is a broadcast stream if this stream is.
601 * If a broadcast stream is listened to more than once, each subscription
602 * will individually call `convert` and expand the events.
603 */
// Expansion is performed by the wrapping stream, which holds [convert].
Stream<S> expand<S>(Iterable<S> convert(T element)) =>
    new _ExpandStream<T, S>(this, convert);
607
608 /**
609 * Pipes the events of this stream into [streamConsumer].
610 *
611 * All events of this stream are added to `streamConsumer` using
612 * [StreamConsumer.addStream].
613 * The `streamConsumer` is closed when this stream has been successfully added
614 * to it - when the future returned by `addStream` completes without an error.
615 *
616 * Returns a future which completes when this stream has been consumed
617 * and the consumer has been closed.
618 *
619 * The returned future completes with the same result as the future returned
620 * by [StreamConsumer.close].
621 * If the call to [StreamConsumer.addStream] fails in some way, this
622 * method fails in the same way.
623 */
Future pipe(StreamConsumer<T> streamConsumer) {
  // First feed every event of this stream to the consumer ...
  final Future added = streamConsumer.addStream(this);
  // ... and close the consumer only if that completed without an error.
  return added.then((_) => streamConsumer.close());
}
627
628 /**
629 * Applies [streamTransformer] to this stream.
630 *
631 * Returns the transformed stream,
632 * that is, the result of `streamTransformer.bind(this)`.
633 * This method simply allows writing the call to `streamTransformer.bind`
634 * in a chained fashion, like
635 * ```
636 * stream.map(mapping).transform(transformation).toList()
637 * ```
638 * which can be more convenient than calling `bind` directly.
639 *
640 * The [streamTransformer] can return any stream.
641 * Whether the returned stream is a broadcast stream or not,
642 * and which elements it will contain,
643 * is entirely up to the transformation.
644 *
645 * This method should always be used for transformations which treat
646 * the entire stream as representing a single value
647 * which has perhaps been split into several parts for transport,
648 * like a file being read from disk or being fetched over a network.
649 * The transformation will then produce a new stream which
650 * transforms the stream's value incrementally (perhaps using
651 * [Converter.startChunkedConversion]). The resulting stream
652 * may again be chunks of the result, but does not have to
653 * correspond to specific events from the source string.
654 */
// Simply hands this stream to the transformer's `bind`.
Stream<S> transform<S>(StreamTransformer<T, S> streamTransformer) =>
    streamTransformer.bind(this);
658
659 /**
660 * Combines a sequence of values by repeatedly applying [combine].
661 *
662 * Similar to [Iterable.reduce], this function maintains a value,
663 * starting with the first element of this stream
664 * and updated for each further element of this stream.
665 * For each element after the first,
666 * the value is updated to the result of calling [combine]
667 * with the previous value and the element.
668 *
669 * When this stream is done, the returned future is completed with
670 * the value at that time.
671 *
672 * If this stream is empty, the returned future is completed with
673 * an error.
674 * If this stream emits an error, or the call to [combine] throws,
675 * the returned future is completed with that error,
676 * and processing is stopped.
677 */
Future<T> reduce(T combine(T previous, T element)) {
  final _Future<T> outcome = new _Future<T>();
  // Whether [accumulator] holds a value yet.
  bool hasValue = false;
  T accumulator;
  StreamSubscription subscription;
  subscription = listen((T element) {
    if (!hasValue) {
      // The first element seeds the accumulator; [combine] is not called.
      accumulator = element;
      hasValue = true;
      return;
    }
    _runUserCode(() => combine(accumulator, element), (T combined) {
      accumulator = combined;
    }, _cancelAndErrorClosure(subscription, outcome));
  }, onError: outcome._completeError, onDone: () {
    if (hasValue) {
      outcome._complete(accumulator);
      return;
    }
    try {
      // Throw and catch again, rather than completing directly with a
      // freshly created error and `StackTrace.current`, so that the
      // stack trace is set on the error object itself.
      throw IterableElementError.noElement();
    } catch (e, s) {
      _completeWithErrorCallback(outcome, e, s);
    }
  }, cancelOnError: true);
  return outcome;
}
712
713 /**
714 * Combines a sequence of values by repeatedly applying [combine].
715 *
716 * Similar to [Iterable.fold], this function maintains a value,
717 * starting with [initialValue] and updated for each element of
718 * this stream.
719 * For each element, the value is updated to the result of calling
720 * [combine] with the previous value and the element.
721 *
722 * When this stream is done, the returned future is completed with
723 * the value at that time.
724 * For an empty stream, the future is completed with [initialValue].
725 *
726 * If this stream emits an error, or the call to [combine] throws,
727 * the returned future is completed with that error,
728 * and processing is stopped.
729 */
Future<S> fold<S>(S initialValue, S combine(S previous, T element)) {
  final _Future<S> outcome = new _Future<S>();
  // Running result; stays [initialValue] if no element ever arrives.
  S accumulator = initialValue;
  StreamSubscription subscription;
  subscription = listen((T element) {
    // The error callback is built per event, after `subscription` is set.
    _runUserCode(() => combine(accumulator, element), (S combined) {
      accumulator = combined;
    }, _cancelAndErrorClosure(subscription, outcome));
  }, onError: outcome._completeError, onDone: () {
    outcome._complete(accumulator);
  }, cancelOnError: true);
  return outcome;
}
747
748 /**
749 * Combines the string representation of elements into a single string.
750 *
751 * Each element is converted to a string using its [Object.toString] method.
752 * If [separator] is provided, it is inserted between element string
753 * representations.
754 *
755 * The returned future is completed with the combined string when this stream
756 * is done.
757 *
758 * If this stream emits an error, or the call to [Object.toString] throws,
759 * the returned future is completed with that error,
760 * and processing stops.
761 */
Future<String> join([String separator = ""]) {
  final _Future<String> outcome = new _Future<String>();
  final StringBuffer pieces = new StringBuffer();
  StreamSubscription subscription;
  // False until the first element has been seen, so that no separator is
  // written in front of it.
  bool needsSeparator = false;
  subscription = listen((T element) {
    if (needsSeparator) pieces.write(separator);
    needsSeparator = true;
    try {
      // Writing the element is guarded by the try/catch.
      pieces.write(element);
    } catch (e, s) {
      _cancelAndErrorWithReplacement(subscription, outcome, e, s);
    }
  }, onError: outcome._completeError, onDone: () {
    outcome._complete(pieces.toString());
  }, cancelOnError: true);
  return outcome;
}
786
787 /**
788 * Returns whether [needle] occurs in the elements provided by this stream.
789 *
790 * Compares each element of this stream to [needle] using [Object.==].
791 * If an equal element is found, the returned future is completed with `true`.
792 * If this stream ends without finding a match, the future is completed with
793 * `false`.
794 *
795 * If this stream emits an error, or the call to [Object.==] throws,
796 * the returned future is completed with that error,
797 * and processing stops.
798 */
Future<bool> contains(Object needle) {
  final _Future<bool> outcome = new _Future<bool>();
  StreamSubscription subscription;
  subscription = listen((T element) {
    // The comparison runs through `_runUserCode`, which also receives the
    // error callback.
    _runUserCode(() => element == needle, (bool found) {
      if (found) {
        // First match: stop listening and answer `true`.
        _cancelAndValue(subscription, outcome, true);
      }
    }, _cancelAndErrorClosure(subscription, outcome));
  }, onError: outcome._completeError, onDone: () {
    // Stream ended without any match.
    outcome._complete(false);
  }, cancelOnError: true);
  return outcome;
}
817
818 /**
819 * Executes [action] on each element of this stream.
820 *
821 * Completes the returned [Future] when all elements of this stream
822 * have been processed.
823 *
824 * If this stream emits an error, or if the call to [action] throws,
825 * the returned future completes with that error,
826 * and processing stops.
827 */
Future forEach(void action(T element)) {
  final completer = new _Future();
  StreamSubscription sub;

  // Runs [action] on one element; a throw cancels the subscription and
  // completes the result with the thrown error.
  void runAction(T element) {
    // TODO(floitsch): the type should be 'void' and inferred.
    _runUserCode<dynamic>(
        () => action(element), (_) {}, _cancelAndErrorClosure(sub, completer));
  }

  // All elements have been processed.
  void finish() {
    completer._complete(null);
  }

  sub = this.listen(runAction,
      onError: completer._completeError, onDone: finish, cancelOnError: true);
  return completer;
}
844
845 /**
846 * Checks whether [test] accepts all elements provided by this stream.
847 *
848 * Calls [test] on each element of this stream.
849 * If the call returns `false`, the returned future is completed with `false`
850 * and processing stops.
851 *
852 * If this stream ends without finding an element that [test] rejects,
853 * the returned future is completed with `true`.
854 *
855 * If this stream emits an error, or if the call to [test] throws,
856 * the returned future is completed with that error,
857 * and processing stops.
858 */
Future<bool> every(bool test(T element)) {
  final completer = new _Future<bool>();
  StreamSubscription sub;

  // Tests one element; the first rejection ends the check with `false`.
  void checkElement(T element) {
    _runUserCode(() => test(element), (bool accepted) {
      if (accepted) return;
      _cancelAndValue(sub, completer, false);
    }, _cancelAndErrorClosure(sub, completer));
  }

  // No element was rejected before the stream ended.
  void finish() {
    completer._complete(true);
  }

  sub = this.listen(checkElement,
      onError: completer._completeError, onDone: finish, cancelOnError: true);
  return completer;
}
877
878 /**
879 * Checks whether [test] accepts any element provided by this stream.
880 *
881 * Calls [test] on each element of this stream.
882 * If the call returns `true`, the returned future is completed with `true`
883 * and processing stops.
884 *
885 * If this stream ends without finding an element that [test] accepts,
886 * the returned future is completed with `false`.
887 *
888 * If this stream emits an error, or if the call to [test] throws,
889 * the returned future is completed with that error,
890 * and processing stops.
891 */
Future<bool> any(bool test(T element)) {
  final completer = new _Future<bool>();
  StreamSubscription sub;

  // Tests one element; the first acceptance ends the check with `true`.
  void checkElement(T element) {
    _runUserCode(() => test(element), (bool accepted) {
      if (!accepted) return;
      _cancelAndValue(sub, completer, true);
    }, _cancelAndErrorClosure(sub, completer));
  }

  // No element was accepted before the stream ended.
  void finish() {
    completer._complete(false);
  }

  sub = this.listen(checkElement,
      onError: completer._completeError, onDone: finish, cancelOnError: true);
  return completer;
}
910
911 /**
912 * The number of elements in this stream.
913 *
914 * Waits for all elements of this stream. When this stream ends,
915 * the returned future is completed with the number of elements.
916 *
917 * If this stream emits an error,
918 * the returned future is completed with that error,
919 * and processing stops.
920 *
921 * This operation listens to this stream, and a non-broadcast stream cannot
922 * be reused after finding its length.
923 */
Future<int> get length {
  final completer = new _Future<int>();
  var seen = 0;

  // Data values are ignored; only the number of events matters.
  void countOne(T _) {
    seen += 1;
  }

  // Reports the total once the stream is done.
  void finish() {
    completer._complete(seen);
  }

  this.listen(countOne,
      onError: completer._completeError, onDone: finish, cancelOnError: true);
  return completer;
}
938
939 /**
940 * Whether this stream contains any elements.
941 *
942 * Waits for the first element of this stream, then completes the returned
943 * future with `true`.
944 * If this stream ends without emitting any elements, the returned future is
945 * completed with `false`.
946 *
947 * If the first event is an error, the returned future is completed with that
948 * error.
949 *
950 * This operation listens to this stream, and a non-broadcast stream cannot
951 * be reused after checking whether it is empty.
952 */
Future<bool> get isEmpty {
  final completer = new _Future<bool>();
  StreamSubscription sub;

  // Any data event proves the stream is not empty; stop listening right away.
  void sawElement(T _) {
    _cancelAndValue(sub, completer, false);
  }

  // Done arrived before any data event.
  void finish() {
    completer._complete(true);
  }

  sub = this.listen(sawElement,
      onError: completer._completeError, onDone: finish, cancelOnError: true);
  return completer;
}
967
968 /**
969 * Adapt this stream to be a `Stream<R>`.
970 *
971 * This stream is wrapped as a `Stream<R>` which checks at run-time that
972 * each data event emitted by this stream is also an instance of [R].
973 */
Stream<R> cast<R>() {
  // Delegates to the static adapter, with this stream as the source.
  return Stream.castFrom<T, R>(this);
}

975 /**
976 * Collects all elements of this stream in a [List].
977 *
978 * Creates a `List<T>` and adds all elements of this stream to the list
979 * in the order they arrive.
980 * When this stream ends, the returned future is completed with that list.
981 *
982 * If this stream emits an error,
983 * the returned future is completed with that error,
984 * and processing stops.
985 */
Future<List<T>> toList() {
  final collected = <T>[];
  final completer = new _Future<List<T>>();

  // Appends each data event in arrival order.
  void collect(T data) {
    collected.add(data);
  }

  // Hands over the accumulated list once the stream is done.
  void finish() {
    completer._complete(collected);
  }

  this.listen(collect,
      onError: completer._completeError, onDone: finish, cancelOnError: true);
  return completer;
}
1000
1001 /**
1002 * Collects the data of this stream in a [Set].
1003 *
1004 * Creates a `Set<T>` and adds all elements of this stream to the set
1005 * in the order they arrive.
1006 * When this stream ends, the returned future is completed with that set.
1007 *
1008 * The returned set is the same type as returned by `new Set<T>()`.
1009 * If another type of set is needed, either use [forEach] to add each
1010 * element to the set, or use
1011 * `toList().then((list) => new SomeOtherSet.from(list))`
1012 * to create the set.
1013 *
1014 * If this stream emits an error,
1015 * the returned future is completed with that error,
1016 * and processing stops.
1017 */
Future<Set<T>> toSet() {
  final collected = new Set<T>();
  final completer = new _Future<Set<T>>();

  // Adds each data event to the set.
  void collect(T data) {
    collected.add(data);
  }

  // Hands over the accumulated set once the stream is done.
  void finish() {
    completer._complete(collected);
  }

  this.listen(collect,
      onError: completer._completeError, onDone: finish, cancelOnError: true);
  return completer;
}
1032
1033 /**
1034 * Discards all data on this stream, but signals when it is done or an error
1035 * occurred.
1036 *
1037 * When subscribing using [drain], cancelOnError will be true. This means
1038 * that the future will complete with the first error on this stream and then
1039 * cancel the subscription.
1040 * If this stream emits an error,
1041 * the returned future is completed with that error,
1042 * and processing is stopped.
1043 *
1044 * In case of a `done` event the future completes with the given
1045 * [futureValue].
1046 */
Future<E> drain<E>([E futureValue]) {
  // Listen without a data handler so every data event is dropped.
  final StreamSubscription<T> sub = listen(null, cancelOnError: true);
  // The subscription's future reports the done event or the first error.
  return sub.asFuture<E>(futureValue);
}
1049
1050 /**
1051 * Provides at most the first [count] data events of this stream.
1052 *
1053 * Returns a stream that emits the same events that this stream would
1054 * if listened to at the same time,
1055 * until either this stream ends or it has emitted [count] data events,
1056 * at which point the returned stream is done.
1057 *
1058 * If this stream produces fewer than [count] data events before it's done,
1059 * so will the returned stream.
1060 *
1061 * Starts listening to this stream when the returned stream is listened to
1062 * and stops listening when the first [count] data events have been received.
1063 *
1064 * This means that if this is a single-subscription (non-broadcast) stream,
1065 * it cannot be reused after the returned stream has been listened to.
1066 *
1067 * If this is a broadcast stream, the returned stream is a broadcast stream.
1068 * In that case, the events are only counted from the time
1069 * the returned stream is listened to.
1070 */
Stream<T> take(int count) =>
    // Wraps this stream; the wrapper receives the [count] limit.
    new _TakeStream<T>(this, count);
1074
1075 /**
1076 * Forwards data events while [test] is successful.
1077 *
1078 * Returns a stream that provides the same events as this stream
1079 * until [test] fails for a data event.
1080 * The returned stream is done when either this stream is done,
1081 * or when this stream first emits a data event that fails [test].
1082 *
1083 * The `test` call is considered failing if it returns a non-`true` value
1084 * or if it throws. If the `test` call throws, the error is emitted as the
1085 * last event on the returned stream.
1086 *
1087 * Stops listening to this stream after the accepted elements.
1088 *
1089 * Internally the method cancels its subscription after these elements. This
1090 * means that single-subscription (non-broadcast) streams are closed and
1091 * cannot be reused after a call to this method.
1092 *
1093 * The returned stream is a broadcast stream if this stream is.
1094 * For a broadcast stream, the events are only tested from the time
1095 * the returned stream is listened to.
1096 */
Stream<T> takeWhile(bool test(T element)) =>
    // Wraps this stream; the wrapper receives the [test] predicate.
    new _TakeWhileStream<T>(this, test);
1100
1101 /**
1102 * Skips the first [count] data events from this stream.
1103 *
1104 * Returns a stream that emits the same events as this stream would
1105 * if listened to at the same time, except that the first [count]
1106 * data events are not emitted.
1107 * The returned stream is done when this stream is.
1108 *
1109 * If this stream emits fewer than [count] data events
1110 * before being done, the returned stream emits no data events.
1111 *
1112 * The returned stream is a broadcast stream if this stream is.
1113 * For a broadcast stream, the events are only counted from the time
1114 * the returned stream is listened to.
1115 */
Stream<T> skip(int count) =>
    // Wraps this stream; the wrapper receives the [count] to skip.
    new _SkipStream<T>(this, count);
1119
1120 /**
1121 * Skip data events from this stream while they are matched by [test].
1122 *
1123 * Returns a stream that emits the same events as this stream,
1124 * except that data events are not emitted until a data event fails `test`.
1125 * The test fails when called with a data event
1126 * if it returns a non-`true` value or if the call to `test` throws.
1127 * If the call throws, the error is emitted as an error event
1128 * on the returned stream instead of the data event,
1129 * otherwise the event that made `test` return non-true is emitted as the
1130 * first data event.
1131 *
1132 * Error and done events are provided by the returned stream unmodified.
1133 *
1134 * The returned stream is a broadcast stream if this stream is.
1135 * For a broadcast stream, the events are only tested from the time
1136 * the returned stream is listened to.
1137 */
Stream<T> skipWhile(bool test(T element)) =>
    // Wraps this stream; the wrapper receives the [test] predicate.
    new _SkipWhileStream<T>(this, test);
1141
1142 /**
1143 * Skips data events if they are equal to the previous data event.
1144 *
1145 * The returned stream provides the same events as this stream, except
1146 * that it never provides two consecutive data events that are equal.
1147 * That is, errors are passed through to the returned stream, and
1148 * data events are passed through if they are distinct from the most
1149 * recently emitted data event.
1150 *
1151 * Equality is determined by the provided [equals] method. If that is
1152 * omitted, the '==' operator on the last provided data element is used.
1153 *
1154 * If [equals] throws, the data event is replaced by an error event
1155 * containing the thrown error. The behavior is equivalent to the
1156 * original stream emitting the error event, and it doesn't change
1157 * what the most recently emitted data event is.
1158 *
1159 * The returned stream is a broadcast stream if this stream is.
1160 * If a broadcast stream is listened to more than once, each subscription
1161 * will individually perform the `equals` test.
1162 */
Stream<T> distinct([bool equals(T previous, T next)]) =>
    // Wraps this stream; [equals] is passed through as given, even if omitted.
    new _DistinctStream<T>(this, equals);
1166
1167 /**
1168 * The first element of this stream.
1169 *
1170 * Stops listening to this stream after the first element has been received.
1171 *
1172 * Internally the method cancels its subscription after the first element.
1173 * This means that single-subscription (non-broadcast) streams are closed
1174 * and cannot be reused after a call to this getter.
1175 *
1176 * If an error event occurs before the first data event, the returned future
1177 * is completed with that error.
1178 *
1179 * If this stream is empty (a done event occurs before the first data event),
1180 * the returned future completes with an error.
1181 *
1182 * Except for the type of the error, this method is equivalent to
1183 * `this.elementAt(0)`.
1184 */
Future<T> get first {
  final completer = new _Future<T>();
  StreamSubscription sub;

  // The first data event is the answer; stop listening immediately.
  void takeValue(T value) {
    _cancelAndValue(sub, completer, value);
  }

  // Done arrived before any data event: report "no element".
  // The error is thrown and caught so that it carries a stack trace.
  void failEmpty() {
    try {
      throw IterableElementError.noElement();
    } catch (error, stackTrace) {
      _completeWithErrorCallback(completer, error, stackTrace);
    }
  }

  sub = this.listen(takeValue,
      onError: completer._completeError,
      onDone: failEmpty,
      cancelOnError: true);
  return completer;
}
1203
1204 /**
1205 * The last element of this stream.
1206 *
1207 * If this stream emits an error event,
1208 * the returned future is completed with that error
1209 * and processing stops.
1210 *
1211 * If this stream is empty (the done event is the first event),
1212 * the returned future completes with an error.
1213 */
Future<T> get last {
  final completer = new _Future<T>();
  T latest;
  var sawValue = false;

  // Remembers the most recent data event.
  void remember(T value) {
    sawValue = true;
    latest = value;
  }

  // On done, yields the remembered value or reports "no element".
  void finish() {
    if (!sawValue) {
      // Thrown and caught so that the error carries a stack trace.
      try {
        throw IterableElementError.noElement();
      } catch (error, stackTrace) {
        _completeWithErrorCallback(completer, error, stackTrace);
      }
      return;
    }
    completer._complete(latest);
  }

  listen(remember,
      onError: completer._completeError, onDone: finish, cancelOnError: true);
  return completer;
}
1238
1239 /**
1240 * The single element of this stream.
1241 *
1242 * If this stream emits an error event,
1243 * the returned future is completed with that error
1244 * and processing stops.
1245 *
1246 * If [this] is empty or has more than one element,
1247 * the returned future completes with an error.
1248 */
Future<T> get single {
  final completer = new _Future<T>();
  T onlyValue;
  var hasValue = false;
  StreamSubscription sub;

  // Stores the first data event; a second one is an error.
  void handleData(T value) {
    if (!hasValue) {
      hasValue = true;
      onlyValue = value;
      return;
    }
    // This is the second element we get.
    try {
      throw IterableElementError.tooMany();
    } catch (error, stackTrace) {
      _cancelAndErrorWithReplacement(sub, completer, error, stackTrace);
    }
  }

  // On done, yields the stored value or reports "no element".
  void handleDone() {
    if (!hasValue) {
      try {
        throw IterableElementError.noElement();
      } catch (error, stackTrace) {
        _completeWithErrorCallback(completer, error, stackTrace);
      }
      return;
    }
    completer._complete(onlyValue);
  }

  sub = this.listen(handleData,
      onError: completer._completeError,
      onDone: handleDone,
      cancelOnError: true);
  return completer;
}
1283
1284 /**
1285 * Finds the first element of this stream matching [test].
1286 *
1287 * Returns a future that is completed with the first element of this stream
1288 * that [test] returns `true` for.
1289 *
1290 * If no such element is found before this stream is done, and a
1291 * [orElse] function is provided, the result of calling [orElse]
1292 * becomes the value of the future. If [orElse] throws, the returned
1293 * future is completed with that error.
1294 *
1295 * If this stream emits an error before the first matching element,
1296 * the returned future is completed with that error, and processing stops.
1297 *
1298 * Stops listening to this stream after the first matching element or error
1299 * has been received.
1300 *
1301 * Internally the method cancels its subscription after the first element that
1302 * matches the predicate. This means that single-subscription (non-broadcast)
1303 * streams are closed and cannot be reused after a call to this method.
1304 *
1305 * If an error occurs, or if this stream ends without finding a match and
1306 * with no [orElse] function provided,
1307 * the returned future is completed with an error.
1308 */
Future<T> firstWhere(bool test(T element), {T orElse()}) {
  final _Future<T> completer = new _Future<T>();
  StreamSubscription sub;

  // Tests one element; the first match ends the search with that element.
  void handleData(T value) {
    _runUserCode(() => test(value), (bool isMatch) {
      if (!isMatch) return;
      _cancelAndValue(sub, completer, value);
    }, _cancelAndErrorClosure(sub, completer));
  }

  // No match was found: fall back to [orElse], or report "no element".
  void handleDone() {
    if (orElse == null) {
      // Thrown and caught so that the error carries a stack trace.
      try {
        throw IterableElementError.noElement();
      } catch (error, stackTrace) {
        _completeWithErrorCallback(completer, error, stackTrace);
      }
      return;
    }
    _runUserCode(orElse, completer._complete, completer._completeError);
  }

  sub = this.listen(handleData,
      onError: completer._completeError,
      onDone: handleDone,
      cancelOnError: true);
  return completer;
}
1335
1336 /**
1337 * Finds the last element in this stream matching [test].
1338 *
1339 * If this stream emits an error, the returned future is completed with that
1340 * error, and processing stops.
1341 *
1342 * Otherwise as [firstWhere], except that the last matching element is found
1343 * instead of the first.
1344 * That means that a non-error result cannot be provided before this stream
1345 * is done.
1346 */
Future<T> lastWhere(bool test(T element), {T orElse()}) {
  final _Future<T> completer = new _Future<T>();
  T latestMatch;
  var hasMatch = false;
  StreamSubscription sub;

  // Tests one element and remembers it if it matches.
  // Only a result that is exactly `true` counts as a match.
  void handleData(T value) {
    _runUserCode(() => true == test(value), (bool isMatch) {
      if (!isMatch) return;
      hasMatch = true;
      latestMatch = value;
    }, _cancelAndErrorClosure(sub, completer));
  }

  // On done: the remembered match, else [orElse], else "no element".
  void handleDone() {
    if (hasMatch) {
      completer._complete(latestMatch);
      return;
    }
    if (orElse == null) {
      try {
        throw IterableElementError.noElement();
      } catch (error, stackTrace) {
        _completeWithErrorCallback(completer, error, stackTrace);
      }
      return;
    }
    _runUserCode(orElse, completer._complete, completer._completeError);
  }

  sub = this.listen(handleData,
      onError: completer._completeError,
      onDone: handleDone,
      cancelOnError: true);
  return completer;
}
1380
1381 /**
1382 * Finds the single element in this stream matching [test].
1383 *
1384 * Like [lastWhere], except that it is an error if more than one
1385 * matching element occurs in this stream.
1386 */
Future<T> singleWhere(bool test(T element), {T orElse()}) {
  final completer = new _Future<T>();
  T match;
  var hasMatch = false;
  StreamSubscription sub;

  // Tests one element; the first match is stored, a second match is an error.
  // Only a result that is exactly `true` counts as a match.
  void handleData(T value) {
    _runUserCode(() => true == test(value), (bool isMatch) {
      if (!isMatch) return;
      if (!hasMatch) {
        hasMatch = true;
        match = value;
        return;
      }
      try {
        throw IterableElementError.tooMany();
      } catch (error, stackTrace) {
        _cancelAndErrorWithReplacement(sub, completer, error, stackTrace);
      }
    }, _cancelAndErrorClosure(sub, completer));
  }

  // On done: the stored match, else [orElse], else "no element".
  void handleDone() {
    if (hasMatch) {
      completer._complete(match);
      return;
    }
    try {
      if (orElse == null) {
        throw IterableElementError.noElement();
      }
      _runUserCode(orElse, completer._complete, completer._completeError);
    } catch (error, stackTrace) {
      _completeWithErrorCallback(completer, error, stackTrace);
    }
  }

  sub = this.listen(handleData,
      onError: completer._completeError,
      onDone: handleDone,
      cancelOnError: true);
  return completer;
}
1428
1429 /**
1430 * Returns the value of the [index]th data event of this stream.
1431 *
1432 * Stops listening to this stream after the [index]th data event has been
1433 * received.
1434 *
1435 * Internally the method cancels its subscription after these elements. This
1436 * means that single-subscription (non-broadcast) streams are closed and
1437 * cannot be reused after a call to this method.
1438 *
1439 * If an error event occurs before the value is found, the future completes
1440 * with this error.
1441 *
1442 * If a done event occurs before the value is found, the future completes
1443 * with a [RangeError].
1444 */
Future<T> elementAt(int index) {
  // Reject a null or negative index before subscribing.
  ArgumentError.checkNotNull(index, "index");
  RangeError.checkNotNegative(index, "index");
  final completer = new _Future<T>();
  StreamSubscription sub;
  var position = 0;

  // Counts data events until the requested position is reached.
  void handleData(T value) {
    if (position != index) {
      position += 1;
      return;
    }
    _cancelAndValue(sub, completer, value);
  }

  // The stream ended too early; `position` is the number of events seen.
  void handleDone() {
    completer._completeError(
        new RangeError.index(index, this, "index", null, position));
  }

  sub = this.listen(handleData,
      onError: completer._completeError,
      onDone: handleDone,
      cancelOnError: true);
  return completer;
}
1467
1468 /**
1469 * Creates a new stream with the same events as this stream.
1470 *
1471 * Whenever more than [timeLimit] passes between two events from this stream,
1472 * the [onTimeout] function is called, which can emit further events on
1473 * the returned stream.
1474 *
1475 * The countdown doesn't start until the returned stream is listened to.
1476 * The countdown is reset every time an event is forwarded from this stream,
1477 * or when this stream is paused and resumed.
1478 *
1479 * The [onTimeout] function is called with one argument: an
1480 * [EventSink] that allows putting events into the returned stream.
1481 * This `EventSink` is only valid during the call to [onTimeout].
1482 * Calling [EventSink.close] on the sink passed to [onTimeout] closes the
1483 * returned stream, and no further events are processed.
1484 *
1485 * If [onTimeout] is omitted, a timeout will just put a [TimeoutException]
1486 * into the error channel of the returned stream.
1487 * If the call to [onTimeout] throws, the error is emitted on the returned
1488 * stream.
1489 *
1490 * The returned stream is a broadcast stream if this stream is.
1491 * If a broadcast stream is listened to more than once, each subscription
1492 * will have its own individual timer that starts counting on listen,
1493 * and the subscriptions' timers can be paused individually.
1494 */
Stream<T> timeout(Duration timeLimit, {void onTimeout(EventSink<T> sink)}) {
  _StreamControllerBase<T> controller;
  // The following variables are set on listen.
  StreamSubscription<T> subscription;
  Timer timer;
  Zone zone;
  _TimerCallback timeout;

  // Forwards a data event and restarts the countdown.
  void onData(T event) {
    timer.cancel();
    // Restart the countdown *before* forwarding the event. The controller is
    // synchronous, so `add` can run listener code that cancels or pauses the
    // returned stream. Those callbacks cancel `timer`; a timer created after
    // the call would not be cancelled by them and could fire later.
    timer = zone.createTimer(timeLimit, timeout);
    controller.add(event);
  }

  // Forwards an error event and restarts the countdown.
  void onError(error, StackTrace stackTrace) {
    timer.cancel();
    assert(controller is _StreamController ||
        controller is _BroadcastStreamController);
    // Same ordering as in `onData`: create the timer before forwarding.
    timer = zone.createTimer(timeLimit, timeout);
    controller._addError(error, stackTrace); // Avoid Zone error replacement.
  }

  // The source is done: stop the countdown and close the returned stream.
  void onDone() {
    timer.cancel();
    controller.close();
  }

  void onListen() {
    // This is the onListen callback of the controller.
    // It runs in the same zone that the subscription was created in.
    // Use that zone for creating timers and running the onTimeout
    // callback.
    zone = Zone.current;
    if (onTimeout == null) {
      // Default behavior: emit a TimeoutException as an error event.
      timeout = () {
        controller.addError(
            new TimeoutException("No stream event", timeLimit), null);
      };
    } else {
      // TODO(floitsch): the return type should be 'void', and the type
      // should be inferred.
      var registeredOnTimeout =
          zone.registerUnaryCallback<dynamic, EventSink<T>>(onTimeout);
      var wrapper = new _ControllerEventSinkWrapper<T>(null);
      timeout = () {
        wrapper._sink = controller; // Only valid during call.
        zone.runUnaryGuarded(registeredOnTimeout, wrapper);
        wrapper._sink = null;
      };
    }

    subscription = this.listen(onData, onError: onError, onDone: onDone);
    timer = zone.createTimer(timeLimit, timeout);
  }

  // The listener cancelled: stop the countdown and cancel the source
  // subscription, returning its cancel future.
  Future onCancel() {
    timer.cancel();
    Future result = subscription.cancel();
    subscription = null;
    return result;
  }

  controller = isBroadcast
      ? new _SyncBroadcastStreamController<T>(onListen, onCancel)
      : new _SyncStreamController<T>(onListen, () {
          // Don't null the timer, onCancel may call cancel again.
          timer.cancel();
          subscription.pause();
        }, () {
          // Resuming restarts the countdown from zero.
          subscription.resume();
          timer = zone.createTimer(timeLimit, timeout);
        }, onCancel);
  return controller.stream;
}
1569 }
1570
1571 /**
1572 * A subscription on events from a [Stream].
1573 *
1574 * When you listen on a [Stream] using [Stream.listen],
1575 * a [StreamSubscription] object is returned.
1576 *
1577 * The subscription provides events to the listener,
1578 * and holds the callbacks used to handle the events.
1579 * The subscription can also be used to unsubscribe from the events,
1580 * or to temporarily pause the events from the stream.
1581 */
1582 abstract class StreamSubscription<T> {
1583 /**
1584 * Cancels this subscription.
1585 *
1586 * After this call, the subscription no longer receives events.
1587 *
1588 * The stream may need to shut down the source of events and clean up after
1589 * the subscription is canceled.
1590 *
1591 * Returns a future that is completed once the stream has finished
1592 * its cleanup.
1593 *
1594 * For historical reasons, may also return `null` if no cleanup was necessary.
1595 * Returning `null` is deprecated and should be avoided.
1596 *
1597 * Typically, futures are returned when the stream needs to release resources.
1598 * For example, a stream might need to close an open file (as an asynchronous
1599 * operation). If the listener wants to delete the file after having
1600 * canceled the subscription, it must wait for the cleanup future to complete.
1601 *
1602 * A returned future completes with a `null` value.
1603 * If the cleanup throws, which it really shouldn't, the returned future
1604 * completes with that error.
1605 */
1606 Future cancel();
1607
1608 /**
1609 * Replaces the data event handler of this subscription.
1610 *
1611 * The [handleData] function is called for each element of the stream
1612 * after this function is called.
1613 * If [handleData] is `null`, further elements are ignored.
1614 *
1615 * This method replaces the current handler set by the invocation of
1616 * [Stream.listen] or by a previous call to [onData].
1617 */
1618 void onData(void handleData(T data));
1619
1620 /**
1621 * Replaces the error event handler of this subscription.
1622 *
1623 * The [handleError] function must be able to be called with either
1624 * one positional argument, or with two positional arguments
1625 * where the second is always a [StackTrace].
1626 *
1627 * The [handleError] argument may be `null`, in which case further
1628 * error events are considered unhandled, and will be reported to
1629 * [Zone.handleUncaughtError].
1630 *
1631 * The provided function is called for all error events from the
1632 * stream subscription.
1633 *
1634 * This method replaces the current handler set by the invocation of
1635 * [Stream.listen], by calling [asFuture], or by a previous call to [onError].
1636 */
1637 void onError(Function handleError);
1638
1639 /**
1640 * Replaces the done event handler of this subscription.
1641 *
1642 * The [handleDone] function is called when the stream closes.
1643 * The value may be `null`, in which case no function is called.
1644 *
1645 * This method replaces the current handler set by the invocation of
1646 * [Stream.listen], by calling [asFuture], or by a previous call to [onDone].
1647 */
1648 void onDone(void handleDone());
1649
1650 /**
1651 * Request that the stream pauses events until further notice.
1652 *
1653 * While paused, the subscription will not fire any events.
1654 * If it receives events from its source, they will be buffered until
1655 * the subscription is resumed.
1656 * For non-broadcast streams, the underlying source is usually informed
1657 * about the pause,
1658 * so it can stop generating events until the subscription is resumed.
1659 *
1660 * To avoid buffering events on a broadcast stream, it is better to
1661 * cancel this subscription, and start to listen again when events
1662 * are needed, if the intermediate events are not important.
1663 *
1664 * If [resumeSignal] is provided, the stream subscription will undo the pause
1665 * when the future completes, as if by a call to [resume].
1666 * If the future completes with an error,
1667 * the stream will still resume, but the error will be considered unhandled
1668 * and is passed to [Zone.handleUncaughtError].
1669 *
1670 * A call to [resume] will also undo a pause.
1671 *
1672 * If the subscription is paused more than once, an equal number
1673 * of resumes must be performed to resume the stream.
1674 * Calls to [resume] and the completion of a [resumeSignal] are
1675 * interchangeable - the [pause] which was passed a [resumeSignal] may be
1676 * ended by a call to [resume], and completing the [resumeSignal] may end a
1677 * different [pause].
1678 *
1679 * It is safe to [resume] or complete a [resumeSignal] even when the
1680 * subscription is not paused, and the resume will have no effect.
1681 *
1682 * Currently DOM streams silently drop events when the stream is paused. This
1683 * is a bug and will be fixed.
1684 */
1685 void pause([Future resumeSignal]);
1686
1687 /**
1688 * Resume after a pause.
1689 *
1690 * This undoes one previous call to [pause].
1691 * When all previous calls to [pause] have been matched by calls to
1692 * [resume], possibly through a `resumeSignal` passed to [pause],
1693 * the stream subscription may emit events again.
1694 *
1695 * It is safe to [resume] even when the subscription is not paused, and the
1696 * resume will have no effect.
1697 */
1698 void resume();
1699
1700 /**
1701 * Whether the [StreamSubscription] is currently paused.
1702 *
1703 * If there have been more calls to [pause] than to [resume] on this
1704 * stream subscription, the subscription is paused, and this getter
1705 * returns `true`.
1706 *
1707 * Returns `false` if the stream can currently emit events, or if
1708 * the subscription has completed or been cancelled.
1709 */
1710 bool get isPaused;
1711
1712 /**
1713 * Returns a future that handles the [onDone] and [onError] callbacks.
1714 *
1715 * This method *overwrites* the existing [onDone] and [onError] callbacks
1716 * with new ones that complete the returned future.
1717 *
1718 * In case of an error the subscription will automatically cancel (even
1719 * when it was listening with `cancelOnError` set to `false`).
1720 *
1721 * In case of a `done` event the future completes with the given
1722 * [futureValue].
1723 */
1724 Future<E> asFuture<E>([E futureValue]);
1725 }
1726
1727 /**
1728 * A [Sink] that supports adding errors.
1729 *
1730 * This makes it suitable for capturing the results of asynchronous
1731 * computations, which can complete with a value or an error.
1732 *
1733 * The [EventSink] has been designed to handle asynchronous events from
1734 * [Stream]s. See, for example, [Stream.eventTransformed] which uses
1735 * `EventSink`s to transform events.
1736 */
1737 abstract class EventSink<T> implements Sink<T> {
1738 /**
1739 * Adds a data [event] to the sink.
1740 *
1741 * Must not be called on a closed sink.
1742 */
1743 void add(T event);
1744
1745 /**
1746 * Adds an [error], with an optional [stackTrace], to the sink.
1747 *
1748 * Must not be called on a closed sink.
1749 */
1750 void addError(Object error, [StackTrace stackTrace]);
1751
1752 /**
1753 * Closes the sink.
1754 *
1755 * Calling this method more than once is allowed, but does nothing.
1756 *
1757 * Neither [add] nor [addError] must be called after this method.
1758 */
1759 void close();
1760 }
1761
1762 /** A [Stream] wrapper that exposes nothing beyond the [Stream] interface. */
1763 class StreamView<T> extends Stream<T> {
1764 final Stream<T> _stream;
1765
1766 const StreamView(Stream<T> stream)
1767 : _stream = stream,
1768 super._internal();
1769
1770 bool get isBroadcast => _stream.isBroadcast;
1771
1772 Stream<T> asBroadcastStream(
1773 {void onListen(StreamSubscription<T> subscription),
1774 void onCancel(StreamSubscription<T> subscription)}) {
1775 return _stream.asBroadcastStream(onListen: onListen, onCancel: onCancel);
1776 }
1777
1778 StreamSubscription<T> listen(void onData(T value),
1779 {Function onError, void onDone(), bool cancelOnError}) =>
1780 _stream.listen(onData,
1781 onError: onError, onDone: onDone, cancelOnError: cancelOnError);
1782 }
1783
1784 /**
1785 * Abstract interface for a "sink" accepting multiple entire streams.
1786 *
1787 * A consumer can accept a number of consecutive streams using [addStream],
1788 * and when no further data need to be added, the [close] method tells the
1789 * consumer to complete its work and shut down.
1790 *
1791 * The [Stream.pipe] accepts a `StreamConsumer` and will pass the stream
1792 * to the consumer's [addStream] method. When that completes, it will
1793 * call [close] and then complete its own returned future.
1794 */
1795 abstract class StreamConsumer<S> {
1796 /**
1797 * Consumes the elements of [stream].
1798 *
1799 * Listens on [stream] and does something for each event.
1800 *
1801 * Returns a future which is completed when the stream is done being added,
1802 * and the consumer is ready to accept a new stream.
1803 * No further calls to [addStream] or [close] should happen before the
1804 * returned future has completed.
1805 *
1806 * The consumer may stop listening to the stream after an error,
1807 * it may consume all the errors and only stop at a done event,
1808 * or it may be canceled early if the receiver doesn't want any further events.
1809 *
1810 * If the consumer stops listening because of some error preventing it
1811 * from continuing, it may report this error in the returned future,
1812 * otherwise it will just complete the future with `null`.
1813 */
1814 Future addStream(Stream<S> stream);
1815
1816 /**
1817 * Tells the consumer that no further streams will be added.
1818 *
1819 * This allows the consumer to complete any remaining work and release
1820 * resources that are no longer needed.
1821 *
1822 * Returns a future which is completed when the consumer has shut down.
1823 * If cleaning up can fail, the error may be reported in the returned future,
1824 * otherwise it completes with `null`.
1825 */
1826 Future close();
1827 }
1828
1829 /**
1830 * An object that accepts stream events both synchronously and asynchronously.
1831 *
1832 * A [StreamSink] combines the methods from [StreamConsumer] and [EventSink].
1833 *
1834 * The [EventSink] methods can't be used while an [addStream] call is in progress.
1835 * As soon as the [addStream]'s [Future] completes with a value, the
1836 * [EventSink] methods can be used again.
1837 *
1838 * If [addStream] is called after any of the [EventSink] methods, it'll
1839 * be delayed until the underlying system has consumed the data added by the
1840 * [EventSink] methods.
1841 *
1842 * When [EventSink] methods are used, the [done] [Future] can be used to
1843 * catch any errors.
1844 *
1845 * When [close] is called, it will return the [done] [Future].
1846 */
1847 abstract class StreamSink<S> implements EventSink<S>, StreamConsumer<S> {
1848 /**
1849 * Tells the stream sink that no further streams will be added.
1850 *
1851 * This allows the stream sink to complete any remaining work and release
1852 * resources that are no longer needed.
1853 *
1854 * Returns a future which is completed when the stream sink has shut down.
1855 * If cleaning up can fail, the error may be reported in the returned future,
1856 * otherwise it completes with `null`.
1857 *
1858 * Returns the same future as [done].
1859 *
1860 * The stream sink may close before the [close] method is called, either due
1861 * to an error or because it is itself providing events to someone who has
1862 * stopped listening. In that case, the [done] future is completed first,
1863 * and the `close` method will return the `done` future when called.
1864 *
1865 * Unifies [StreamConsumer.close] and [EventSink.close] which both mark their
1866 * object as not expecting any further events.
1867 */
1868 Future close();
1869
1870 /**
1871 * Returns a future which is completed when the [StreamSink] is finished.
1872 *
1873 * If the `StreamSink` fails with an error,
1874 * perhaps in response to adding events using [add], [addError] or [close],
1875 * the [done] future will complete with that error.
1876 *
1877 * Otherwise, the returned future will complete when either:
1878 *
1879 * * all events have been processed and the sink has been closed, or
1880 * * the sink has otherwise been stopped from handling more events
1881 * (for example by canceling a stream subscription).
1882 */
1883 Future get done;
1884 }
1885
1886 /**
1887 * Transforms a Stream.
1888 *
1889 * When a stream's [Stream.transform] method is invoked with a
1890 * [StreamTransformer], the stream calls the [bind] method on the provided
1891 * transformer. The resulting stream is then returned from the
1892 * [Stream.transform] method.
1893 *
1894 * Conceptually, a transformer is simply a function from [Stream] to [Stream]
1895 * that is encapsulated into a class.
1896 *
1897 * It is good practice to write transformers that can be used multiple times.
1898 *
1899 * All other transforming methods on [Stream], such as [Stream.map],
1900 * [Stream.where] or [Stream.expand] can be implemented using
1901 * [Stream.transform]. A [StreamTransformer] is thus very powerful but often
1902 * also a bit more complicated to use.
1903 */
1904 abstract class StreamTransformer<S, T> {
1905 /**
1906 * Creates a [StreamTransformer] based on the given [onListen] callback.
1907 *
1908 * The returned stream transformer uses the provided [onListen] callback
1909 * when a transformed stream is listened to. At that time, the callback
1910 * receives the input stream (the one passed to [bind]) and a
1911 * boolean flag `cancelOnError` to create a [StreamSubscription].
1912 *
1913 * If the transformed stream is a broadcast stream, so is the stream
1914 * returned by the [StreamTransformer.bind] method by this transformer.
1915 *
1916 * If the transformed stream is listened to multiple times, the [onListen]
1917 * callback is called again for each new [Stream.listen] call.
1918 * This happens whether the stream is a broadcast stream or not,
1919 * but the call will usually fail for non-broadcast streams.
1920 *
1921 * The [onListen] callback does *not* receive the handlers that were passed
1922 * to [Stream.listen]. These are automatically set after the call to the
1923 * [onListen] callback (using [StreamSubscription.onData],
1924 * [StreamSubscription.onError] and [StreamSubscription.onDone]).
1925 *
1926 * Most commonly, an [onListen] callback will first call [Stream.listen] on
1927 * the provided stream (with the corresponding `cancelOnError` flag), and then
1928 * return a new [StreamSubscription].
1929 *
1930 * There are two common ways to create a StreamSubscription:
1931 *
1932 * 1. by allocating a [StreamController] and returning the result of
1933 * listening to its stream. It's important to forward pause, resume and
1934 * cancel events (unless the transformer intentionally wants to change
1935 * this behavior).
1936 * 2. by creating a new class that implements [StreamSubscription].
1937 * Note that the subscription should run callbacks in the [Zone] the
1938 * stream was listened to (see [Zone] and [Zone.bindCallback]).
1939 *
1940 * Example:
1941 *
1942 * ```
1943 * /// Starts listening to [input] and duplicates all non-error events.
1944 * StreamSubscription<int> _onListen(Stream<int> input, bool cancelOnError) {
1945 * StreamSubscription<int> subscription;
1946 * // Create controller that forwards pause, resume and cancel events.
1947 * var controller = new StreamController<int>(
1948 * onPause: () {
1949 * subscription.pause();
1950 * },
1951 * onResume: () {
1952 * subscription.resume();
1953 * },
1954 * onCancel: () => subscription.cancel(),
1955 * sync: true); // "sync" is correct here, since events are forwarded.
1956 *
1957 * // Listen to the provided stream using `cancelOnError`.
1958 * subscription = input.listen((data) {
1959 * // Duplicate the data.
1960 * controller.add(data);
1961 * controller.add(data);
1962 * },
1963 * onError: controller.addError,
1964 * onDone: controller.close,
1965 * cancelOnError: cancelOnError);
1966 *
1967 * // Return a new [StreamSubscription] by listening to the controller's
1968 * // stream.
1969 * return controller.stream.listen(null);
1970 * }
1971 *
1972 * // Instantiate a transformer:
1973 * var duplicator = const StreamTransformer<int, int>(_onListen);
1974 *
1975 * // Use as follows:
1976 * intStream.transform(duplicator);
1977 * ```
1978 */
1979 const factory StreamTransformer(
1980 StreamSubscription<T> onListen(
1981 Stream<S> stream, bool cancelOnError)) =
1982 _StreamSubscriptionTransformer<S, T>;
1983
1984 /**
1985 * Creates a [StreamTransformer] that delegates events to the given functions.
1986 *
1987 * Example use of a duplicating transformer:
1988 *
1989 * ```
1990 * stringStream.transform(new StreamTransformer<String, String>.fromHandlers(
1991 * handleData: (String value, EventSink<String> sink) {
1992 * sink.add(value);
1993 * sink.add(value); // Duplicate the incoming events.
1994 * }));
1995 * ```
1996 *
1997 * Transformers that are constructed this way cannot use captured state if
1998 * they are used in streams that can be listened to multiple times.
1999 * ```
2000 * StreamController<String> controller;
2001 * controller = new StreamController.broadcast(onListen: () {
2002 * scheduleMicrotask(() {
2003 * controller.addError("Bad");
2004 * controller.addError("Worse");
2005 * controller.addError("Worst");
2006 * });
2007 * });
2008 * var sharedState = 0;
2009 * var transformedStream = controller.stream.transform(
2010 * new StreamTransformer<String, String>.fromHandlers(
2011 * handleError: (error, stackTrace, sink) {
2012 * sharedState++; // Increment shared error-counter.
2013 * sink.add("Error $sharedState: $error");
2014 * }));
2015 *
2016 * transformedStream.listen(print);
2017 * transformedStream.listen(print); // Listen twice.
2018 * // Listening twice to the same stream makes the transformer share the same
2019 * // state. Instead of having "Error 1: Bad", "Error 2: Worse",
2020 * // "Error 3: Worst" as output (each twice for the separate subscriptions),
2021 * // this program emits:
2022 * // Error 1: Bad
2023 * // Error 2: Bad
2024 * // Error 3: Worse
2025 * // Error 4: Worse
2026 * // Error 5: Worst
2027 * // Error 6: Worst
2028 * ```
2029 */
2030 factory StreamTransformer.fromHandlers(
2031 {void handleData(S data, EventSink<T> sink),
2032 void handleError(Object error, StackTrace stackTrace, EventSink<T> sink),
2033 void handleDone(EventSink<T> sink)}) = _StreamHandlerTransformer<S, T>;
2034
2035 /**
2036 * Creates a [StreamTransformer] based on a [bind] callback.
2037 *
2038 * The returned stream transformer uses the [bind] argument to implement the
2039 * [StreamTransformer.bind] API and can be used when the transformation is
2040 * available as a stream-to-stream function.
2041 *
2042 * ```dart
2043 * final splitDecoded = StreamTransformer<List<int>, String>.fromBind(
2044 * (stream) => stream.transform(utf8.decoder).transform(LineSplitter()));
2045 * ```
2046 */
2047 @Since("2.1")
2048 factory StreamTransformer.fromBind(Stream<T> Function(Stream<S>) bind) =
2049 _StreamBindTransformer<S, T>;
2050
2051 /**
2052 * Adapts [source] to be a `StreamTransformer<TS, TT>`.
2053 *
2054 * This allows [source] to be used at the new type, but at run-time it
2055 * must satisfy the requirements of both the new type and its original type.
2056 *
2057 * Data events passed into the returned transformer must also be instances
2058 * of [SS], and data events produced by [source] for those events must
2059 * also be instances of [TT].
2060 * The returned transformer is a [CastStreamTransformer] wrapping [source].
2061 */
2062 static StreamTransformer<TS, TT> castFrom<SS, ST, TS, TT>(
2063 StreamTransformer<SS, ST> source) =>
2064 new CastStreamTransformer<SS, ST, TS, TT>(source);
2065
2066 /**
2067 * Transforms the provided [stream].
2068 *
2069 * Returns a new stream with events that are computed from events of the
2070 * provided [stream].
2071 *
2072 * The [StreamTransformer] interface is completely generic,
2073 * so it cannot say what subclasses do.
2074 * Each [StreamTransformer] should document clearly how it transforms the
2075 * stream (on the class or variable used to access the transformer),
2076 * as well as any differences from the following typical behavior:
2077 *
2078 * * When the returned stream is listened to, it starts listening to the
2079 * input [stream].
2080 * * Subscriptions of the returned stream forward (in a reasonable time)
2081 * a [StreamSubscription.pause] call to the subscription of the input
2082 * [stream].
2083 * * Similarly, canceling a subscription of the returned stream eventually
2084 * (in reasonable time) cancels the subscription of the input [stream].
2085 *
2086 * "Reasonable time" depends on the transformer and stream. Some transformers,
2087 * like a "timeout" transformer, might make these operations depend on a
2088 * duration. Others might not delay them at all, or just by a microtask.
2089 *
2090 * Transformers are free to handle errors in any way.
2091 * A transformer implementation may choose to propagate errors,
2092 * or convert them to other events, or ignore them completely,
2093 * but if errors are ignored, it should be documented explicitly.
2094 */
2095 Stream<T> bind(Stream<S> stream);
2096
2097 /**
2098 * Provides a `StreamTransformer<RS, RT>` view of this stream transformer.
2099 *
2100 * The resulting transformer will check at run-time that all data events
2101 * of the stream it transforms are actually instances of [S],
2102 * and it will check that all data events produced by this transformer
2103 * are actually instances of [RT].
2104 */
2105 StreamTransformer<RS, RT> cast<RS, RT>();
2106 }
2107
2108 /**
2109 * Convenience base class for [StreamTransformer] implementations.
2110 * Every member except [bind] has a default implementation here.
2111 */
2112 abstract class StreamTransformerBase<S, T> implements StreamTransformer<S, T> {
2113 const StreamTransformerBase();
2114
2115 StreamTransformer<RS, RT> cast<RS, RT>() {
2116 return StreamTransformer.castFrom<S, T, RS, RT>(this);
2117 }
2118 }
2119
2120 /**
2121 * An [Iterator]-like interface for the values of a [Stream].
2122 *
2123 * This wraps a [Stream] and a subscription on the stream. It listens
2124 * on the stream, and completes the future returned by [moveNext] when the
2125 * next value becomes available.
2126 *
2127 * The stream may be paused between calls to [moveNext].
2128 */
2129 abstract class StreamIterator<T> {
2130 /** Creates a [StreamIterator] on [stream]. */
2131 factory StreamIterator(Stream<T> stream) {
2132 // TODO(lrn): use redirecting factory constructor when type
2133 // arguments are supported.
2134 return new _StreamIterator<T>(stream);
2135 }
2136
2137 /**
2138 * Waits for the next stream value to be available.
2139 *
2140 * Returns a future which will complete with either `true` or `false`.
2141 * Completing with `true` means that another event has been received and
2142 * can be read as [current].
2143 * Completing with `false` means that the stream iteration is done and
2144 * no further events will ever be available.
2145 * The future may complete with an error, if the stream produces an error,
2146 * which also ends iteration.
2147 *
2148 * The function must not be called again until the future returned by a
2149 * previous call is completed.
2150 */
2151 Future<bool> moveNext();
2152
2153 /**
2154 * The current value of the stream.
2155 *
2156 * Is `null` before the first call to [moveNext] and after a call to
2157 * `moveNext` completes with a `false` result or an error.
2158 *
2159 * When a `moveNext` call completes with `true`, the `current` field holds
2160 * the most recent event of the stream, and it stays like that until the next
2161 * call to `moveNext`.
2162 * Between a call to `moveNext` and when its returned future completes,
2163 * the value is unspecified.
2164 */
2165 T get current;
2166
2167 /**
2168 * Cancels the stream iterator (and the underlying stream subscription) early.
2169 *
2170 * The stream iterator is automatically canceled if the [moveNext] future
2171 * completes with either `false` or an error.
2172 *
2173 * If you need to stop listening for values before the stream iterator is
2174 * automatically closed, you must call [cancel] to ensure that the stream
2175 * is properly closed.
2176 *
2177 * If [moveNext] has been called when the iterator is canceled,
2178 * its returned future will complete with `false` as value,
2179 * as will all further calls to [moveNext].
2180 *
2181 * Returns a future if the cancel-operation is not completed synchronously.
2182 * Otherwise returns `null`.
2183 */
2184 Future cancel();
2185 }
2186
2187 /**
2188 * Wraps an [_EventSink] so that only the [EventSink] interface is exposed.
2189 *
2190 * Every call is forwarded unchanged to the wrapped sink.
2191 */
2192 class _ControllerEventSinkWrapper<T> implements EventSink<T> {
2193 EventSink _sink;
2194 _ControllerEventSinkWrapper(this._sink);
2195
2196 /** Passes [data] on to the wrapped sink. */
2197 void add(T data) => _sink.add(data);
2198
2199 /** Passes [error] and [stackTrace] on to the wrapped sink. */
2200 void addError(error, [StackTrace stackTrace]) =>
2201 _sink.addError(error, stackTrace);
2202
2203 /** Closes the wrapped sink. */
2204 void close() => _sink.close();
2205 }
Flutter Stream相关代码
点赞
收藏