Stream<T> abstract class

A source of asynchronous data events.

A Stream provides a sequence of events. Each event is either a data event or an error event, representing the result of a single computation. When the Stream is exhausted, it may send a single "done" event.

You can listen on a stream to receive the events it sends. When you listen, you receive a StreamSubscription object that can be used to stop listening, or to temporarily pause events from the stream.
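As a minimal sketch of listening, the example below records every event a stream sends. It is written in current Dart syntax (`async`/`await`, no `new`), which is an assumption since this page describes an older SDK snapshot; `describeEvents` is a hypothetical helper name, not part of the library.

```dart
import 'dart:async';

/// Hypothetical helper: listens on [source] and records each event as text.
Future<List<String>> describeEvents(Stream<int> source) {
  final log = <String>[];
  final finished = Completer<List<String>>();
  // listen() returns the StreamSubscription that controls this listener.
  final StreamSubscription<int> subscription = source.listen(
    (value) => log.add('data: $value'),
    onError: (Object error) => log.add('error'),
    onDone: () {
      log.add('done');
      finished.complete(log);
    },
  );
  // From here on, subscription.pause(), resume() and cancel() are available.
  assert(!subscription.isPaused);
  return finished.future;
}

Future<void> main() async {
  // Prints: [data: 1, data: 2, data: 3, done]
  print(await describeEvents(Stream.fromIterable([1, 2, 3])));
}
```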

When an event is fired, the listeners at that time are informed. If a listener is added while an event is being fired, the change will only take effect after the event is completely fired. If a listener is canceled, it immediately stops receiving events.

When the "done" event is fired, subscribers are unsubscribed before receiving the event. After the event has been sent, the stream has no subscribers. Adding new subscribers after this point is allowed, but they will just receive a new "done" event as soon as possible.

Streams always respect "pause" requests. If necessary, a stream buffers its input while it is paused, but often, and preferably, it can simply request that its own input pause too.
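The pause behavior can be illustrated with a small sketch. It assumes a current Dart SDK and uses a `StreamController` as the event source; `pauseDemo` is a hypothetical name. Events added while the subscription is paused are buffered and delivered after `resume()`.

```dart
import 'dart:async';

/// Hypothetical helper: returns how many events arrived while the
/// subscription was paused, followed by how many arrived in total.
Future<List<int>> pauseDemo() async {
  final controller = StreamController<int>();
  final received = <int>[];
  final subscription = controller.stream.listen(received.add);

  subscription.pause();
  controller.add(1);
  controller.add(2);
  // Let pending work run; a paused subscription receives nothing.
  await Future<void>.delayed(Duration.zero);
  final whilePaused = received.length;

  subscription.resume();
  // close() completes once the done event has been delivered.
  await controller.close();
  return [whilePaused, received.length];
}

Future<void> main() async {
  print(await pauseDemo()); // [0, 2]
}
```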

There are two kinds of streams: the normal "single-subscription" streams and "broadcast" streams.

A single-subscription stream allows only a single listener at a time. It holds back events until it gets a listener, and it may exhaust itself when the listener is unsubscribed, even if the stream wasn't done.

Single-subscription streams are generally used for streaming parts of contiguous data like file I/O.
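A short sketch of both single-subscription properties, assuming a current Dart SDK. The helper names are hypothetical. That a second `listen` call throws a `StateError` is the behavior of current SDKs and is not stated on this page.

```dart
import 'dart:async';

/// Events added before anyone listens are held back, then delivered.
Future<List<int>> bufferedBeforeListen() {
  final controller = StreamController<int>();
  // No listener yet: the single-subscription stream buffers these.
  controller.add(1);
  controller.add(2);
  controller.close();
  // toList() is the first and only listener; it receives everything.
  return controller.stream.toList();
}

/// Only one listener is allowed at a time.
bool secondListenThrows() {
  final stream = StreamController<int>().stream;
  stream.listen((_) {});
  try {
    stream.listen((_) {});
    return false;
  } on StateError {
    return true;
  }
}

Future<void> main() async {
  print(await bufferedBeforeListen()); // [1, 2]
  print(secondListenThrows()); // true
}
```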

A broadcast stream allows any number of listeners, and it fires its events when they are ready, whether there are listeners or not.

Broadcast streams are used for independent events/observers.
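By contrast, a broadcast stream can be sketched as follows, assuming a current Dart SDK and the `StreamController.broadcast` constructor (not shown on this page); `broadcastDemo` is a hypothetical name. The event fired before any listener exists is lost, and every later event reaches both listeners.

```dart
import 'dart:async';

/// Hypothetical helper: returns what each of two listeners received.
Future<List<List<int>>> broadcastDemo() async {
  final controller = StreamController<int>.broadcast();
  // Fired with no listeners attached: nobody receives this event.
  controller.add(0);

  final a = <int>[];
  final b = <int>[];
  controller.stream.listen(a.add);
  controller.stream.listen(b.add);

  controller.add(1);
  controller.add(2);
  // Completes after the done event has reached the listeners.
  await controller.close();
  return [a, b];
}

Future<void> main() async {
  print(await broadcastDemo()); // [[1, 2], [1, 2]]
}
```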

The default implementation of isBroadcast returns false. A broadcast stream inheriting from Stream must override isBroadcast to return true.

abstract class Stream<T> {
 Stream();

 /**
  * Creates a new single-subscription stream from the future.
  *
  * When the future completes, the stream will fire one event, either
  * data or error, and then close with a done-event.
  */
 factory Stream.fromFuture(Future<T> future) {
   _StreamImpl<T> stream = new _SingleStreamImpl<T>();
   future.then((value) {
       stream._add(value);
       stream._close();
     },
     onError: (error) {
       stream._addError(error);
       stream._close();
     });
   return stream;
 }

 /**
  * Creates a single-subscription stream that gets its data from [data].
  */
 factory Stream.fromIterable(Iterable<T> data) {
   _PendingEvents iterableEvents = new _IterablePendingEvents<T>(data);
   return new _GeneratedSingleStreamImpl<T>(iterableEvents);
 }

 /**
  * Creates a stream that repeatedly emits events at [period] intervals.
  *
  * The event values are computed by invoking [computation]. The argument to
  * this callback is an integer that starts with 0 and is incremented for
  * every event.
  *
  * If [computation] is omitted the event values will all be `null`.
  */
 factory Stream.periodic(Duration period,
                         [T computation(int computationCount)]) {
   if (computation == null) computation = ((i) => null);

   Timer timer;
   int computationCount = 0;
   StreamController<T> controller;
   // Counts the time that the Stream was running (and not paused).
   Stopwatch watch = new Stopwatch();

   void sendEvent() {
     watch.reset();
     T data = computation(computationCount++);
     controller.add(data);
   }

   void startPeriodicTimer() {
     assert(timer == null);
     timer = new Timer.periodic(period, (Timer timer) {
       sendEvent();
     });
   }

   controller = new StreamController<T>(
       onListen: () {
         watch.start();
         startPeriodicTimer();
       },
       onPause: () {
         timer.cancel();
         timer = null;
         watch.stop();
       },
       onResume: () {
         assert(timer == null);
         Duration elapsed = watch.elapsed;
         watch.start();
         timer = new Timer(period - elapsed, () {
           timer = null;
           startPeriodicTimer();
           sendEvent();
         });
       },
       onCancel: () {
         if (timer != null) timer.cancel();
         timer = null;
       });
   return controller.stream;
 }

 /**
  * Reports whether this stream is a broadcast stream.
  */
 bool get isBroadcast => false;

 /**
  * Returns a multi-subscription stream that produces the same events as this.
  *
  * If this stream is single-subscription, returns a new stream that allows
  * multiple subscribers. It will subscribe to this stream when its first
  * subscriber is added, and unsubscribe again when the last subscription is
  * cancelled.
  *
  * If this stream is already a broadcast stream, it is returned unmodified.
  */
 Stream<T> asBroadcastStream() {
   if (isBroadcast) return this;
   return new _SingleStreamMultiplexer<T>(this);
 }

 /**
  * Adds a subscription to this stream.
  *
  * On each data event from this stream, the subscriber's [onData] handler
  * is called. If [onData] is null, nothing happens.
  *
  * On errors from this stream, the [onError] handler is given an
  * object describing the error.
  *
  * If this stream closes, the [onDone] handler is called.
  *
  * If [cancelOnError] is true, the subscription is ended when
  * the first error is reported. The default is false.
  */
 StreamSubscription<T> listen(void onData(T event),
                              { void onError(error),
                                void onDone(),
                                bool cancelOnError});

 /**
  * Creates a new stream from this stream that discards some data events.
  *
  * The new stream sends the same error and done events as this stream,
  * but it only sends the data events that satisfy the [test].
  */
 Stream<T> where(bool test(T event)) {
   return new _WhereStream<T>(this, test);
 }

 /**
  * Creates a new stream that converts each element of this stream
  * to a new value using the [convert] function.
  */
 Stream map(convert(T event)) {
   return new _MapStream<T, dynamic>(this, convert);
 }

 /**
  * Creates a wrapper Stream that intercepts some errors from this stream.
  *
  * If this stream sends an error that matches [test], then it is intercepted
  * by the [handle] function.
  *
  * An [AsyncError] [:e:] is matched by a test function if [:test(e):] returns
  * true. If [test] is omitted, every error is considered matching.
  *
  * If the error is intercepted, the [handle] function can decide what to do
  * with it. It can throw if it wants to raise a new (or the same) error,
  * or simply return to make the stream forget the error.
  *
  * If you need to transform an error into a data event, use the more generic
  * [Stream.transformEvent] to handle the event by writing a data event to
  * the output sink.
  */
 Stream<T> handleError(void handle( error), { bool test(error) }) {
   return new _HandleErrorStream<T>(this, handle, test);
 }

 /**
  * Creates a new stream from this stream that converts each element
  * into zero or more events.
  *
  * Each incoming event is converted to an [Iterable] of new events,
  * and each of these new events is then sent by the returned stream
  * in order.
  */
 Stream expand(Iterable convert(T value)) {
   return new _ExpandStream<T, dynamic>(this, convert);
 }

 /**
  * Binds this stream as the input of the provided [StreamConsumer].
  */
 Future pipe(StreamConsumer<T> streamConsumer) {
   return streamConsumer.addStream(this).then((_) => streamConsumer.close());
 }

 /**
  * Chains this stream as the input of the provided [StreamTransformer].
  *
  * Returns the result of [:streamTransformer.bind:] itself.
  */
 Stream transform(StreamTransformer<T, dynamic> streamTransformer) {
   return streamTransformer.bind(this);
 }

 /**
  * Reduces a sequence of values by repeatedly applying [combine].
  */
 Future<T> reduce(T combine(T previous, T element)) {
   _FutureImpl<T> result = new _FutureImpl<T>();
   bool seenFirst = false;
   T value;
   StreamSubscription subscription;
   subscription = this.listen(
     // TODO(ahe): Restore type when feature is implemented in dart2js
     // checked mode. http://dartbug.com/7733
     (/* T */ element) {
       if (seenFirst) {
         _runUserCode(() => combine(value, element),
                      (T newValue) { value = newValue; },
                      _cancelAndError(subscription, result));
       } else {
         value = element;
         seenFirst = true;
       }
     },
     onError: result._setError,
     onDone: () {
       if (!seenFirst) {
         result._setError(new StateError("No elements"));
       } else {
         result._setValue(value);
       }
     },
     cancelOnError: true
   );
   return result;
 }

 /**
  * Reduces a sequence of values by repeatedly applying [combine],
  * starting from [initialValue].
  */
 Future fold(var initialValue, combine(var previous, T element)) {
   _FutureImpl result = new _FutureImpl();
   var value = initialValue;
   StreamSubscription subscription;
   subscription = this.listen(
     // TODO(ahe): Restore type when feature is implemented in dart2js
     // checked mode. http://dartbug.com/7733
     (/*T*/ element) {
       _runUserCode(
         () => combine(value, element),
         (newValue) { value = newValue; },
         _cancelAndError(subscription, result)
       );
     },
     onError: (e) {
       result._setError(e);
     },
     onDone: () {
       result._setValue(value);
     },
     cancelOnError: true);
   return result;
 }

 /**
  * Checks whether [match] occurs in the elements provided by this stream.
  *
  * Completes the [Future] when the answer is known.
  * If this stream reports an error, the [Future] will report that error.
  */
 Future<bool> contains(T match) {
   _FutureImpl<bool> future = new _FutureImpl<bool>();
   StreamSubscription subscription;
   subscription = this.listen(
       // TODO(ahe): Restore type when feature is implemented in dart2js
       // checked mode. http://dartbug.com/7733
       (/*T*/ element) {
         _runUserCode(
           () => (element == match),
           (bool isMatch) {
             if (isMatch) {
               subscription.cancel();
               future._setValue(true);
             }
           },
           _cancelAndError(subscription, future)
         );
       },
       onError: future._setError,
       onDone: () {
         future._setValue(false);
       },
       cancelOnError: true);
   return future;
 }

 /**
  * Executes [action] on each data event of the stream.
  *
  * Completes the returned [Future] when all events of the stream
  * have been processed. Completes the future with an error if the
  * stream has an error event, or if [action] throws.
  */
 Future forEach(void action(T element)) {
   _FutureImpl future = new _FutureImpl();
   StreamSubscription subscription;
   subscription = this.listen(
       // TODO(ahe): Restore type when feature is implemented in dart2js
       // checked mode. http://dartbug.com/7733
       (/*T*/ element) {
         _runUserCode(
           () => action(element),
           (_) {},
           _cancelAndError(subscription, future)
         );
       },
       onError: future._setError,
       onDone: () {
         future._setValue(null);
       },
       cancelOnError: true);
   return future;
 }

 /**
  * Checks whether [test] accepts all elements provided by this stream.
  *
  * Completes the [Future] when the answer is known.
  * If this stream reports an error, the [Future] will report that error.
  */
 Future<bool> every(bool test(T element)) {
   _FutureImpl<bool> future = new _FutureImpl<bool>();
   StreamSubscription subscription;
   subscription = this.listen(
       // TODO(ahe): Restore type when feature is implemented in dart2js
       // checked mode. http://dartbug.com/7733
       (/*T*/ element) {
         _runUserCode(
           () => test(element),
           (bool isMatch) {
             if (!isMatch) {
               subscription.cancel();
               future._setValue(false);
             }
           },
           _cancelAndError(subscription, future)
         );
       },
       onError: future._setError,
       onDone: () {
         future._setValue(true);
       },
       cancelOnError: true);
   return future;
 }

 /**
  * Checks whether [test] accepts any element provided by this stream.
  *
  * Completes the [Future] when the answer is known.
  * If this stream reports an error, the [Future] will report that error.
  */
 Future<bool> any(bool test(T element)) {
   _FutureImpl<bool> future = new _FutureImpl<bool>();
   StreamSubscription subscription;
   subscription = this.listen(
       // TODO(ahe): Restore type when feature is implemented in dart2js
       // checked mode. http://dartbug.com/7733
       (/*T*/ element) {
         _runUserCode(
           () => test(element),
           (bool isMatch) {
             if (isMatch) {
               subscription.cancel();
               future._setValue(true);
             }
           },
           _cancelAndError(subscription, future)
         );
       },
       onError: future._setError,
       onDone: () {
         future._setValue(false);
       },
       cancelOnError: true);
   return future;
 }


 /** Counts the elements in the stream. */
 Future<int> get length {
   _FutureImpl<int> future = new _FutureImpl<int>();
   int count = 0;
   this.listen(
     (_) { count++; },
     onError: future._setError,
     onDone: () {
       future._setValue(count);
     },
     cancelOnError: true);
   return future;
 }

 /** Reports whether this stream contains no elements. */
 Future<bool> get isEmpty {
   _FutureImpl<bool> future = new _FutureImpl<bool>();
   StreamSubscription subscription;
   subscription = this.listen(
     (_) {
       subscription.cancel();
       future._setValue(false);
     },
     onError: future._setError,
     onDone: () {
       future._setValue(true);
     },
     cancelOnError: true);
   return future;
 }

 /** Collects the data of this stream in a [List]. */
 Future<List<T>> toList() {
   List<T> result = <T>[];
   _FutureImpl<List<T>> future = new _FutureImpl<List<T>>();
   this.listen(
     // TODO(ahe): Restore type when feature is implemented in dart2js
     // checked mode. http://dartbug.com/7733
     (/*T*/ data) {
       result.add(data);
     },
     onError: future._setError,
     onDone: () {
       future._setValue(result);
     },
     cancelOnError: true);
   return future;
 }

 /** Collects the data of this stream in a [Set]. */
 Future<Set<T>> toSet() {
   Set<T> result = new Set<T>();
   _FutureImpl<Set<T>> future = new _FutureImpl<Set<T>>();
   this.listen(
     // TODO(ahe): Restore type when feature is implemented in dart2js
     // checked mode. http://dartbug.com/7733
     (/*T*/ data) {
       result.add(data);
     },
     onError: future._setError,
     onDone: () {
       future._setValue(result);
     },
     cancelOnError: true);
   return future;
 }

 /**
  * Provides at most the first [count] values of this stream.
  *
  * Forwards the first [count] data events of this stream, and all error
  * events, to the returned stream, and ends with a done event.
  *
  * If this stream produces fewer than [count] values before it's done,
  * so will the returned stream.
  */
 Stream<T> take(int count) {
   return new _TakeStream(this, count);
 }

 /**
  * Forwards data events while [test] is successful.
  *
  * The returned stream provides the same events as this stream as long
  * as [test] returns [:true:] for the event data. The stream is done
  * when either this stream is done, or when this stream first provides
  * a value that [test] doesn't accept.
  */
 Stream<T> takeWhile(bool test(T value)) {
   return new _TakeWhileStream(this, test);
 }

 /**
  * Skips the first [count] data events from this stream.
  */
 Stream<T> skip(int count) {
   return new _SkipStream(this, count);
 }

 /**
  * Skips data events from this stream while they are matched by [test].
  *
  * Error and done events are provided by the returned stream unmodified.
  *
  * Starting with the first data event where [test] returns true for the
  * event data, the returned stream will have the same events as this stream.
  */
 Stream<T> skipWhile(bool test(T value)) {
   return new _SkipWhileStream(this, test);
 }

 /**
  * Skips data events if they are equal to the previous data event.
  *
  * The returned stream provides the same events as this stream, except
  * that it never provides two consecutive data events that are equal.
  *
  * Equality is determined by the provided [equals] method. If that is
  * omitted, the '==' operator on the last provided data element is used.
  */
 Stream<T> distinct([bool equals(T previous, T next)]) {
   return new _DistinctStream(this, equals);
 }

 /**
  * Returns the first element.
  *
  * If this stream is empty, the returned future completes with a
  * [StateError]. Otherwise this is equivalent to [:this.elementAt(0):].
  */
 Future<T> get first {
   _FutureImpl<T> future = new _FutureImpl<T>();
   StreamSubscription subscription;
   subscription = this.listen(
     // TODO(ahe): Restore type when feature is implemented in dart2js
     // checked mode. http://dartbug.com/7733
     (/*T*/ value) {
       subscription.cancel();
       future._setValue(value);
       return;
     },
     onError: future._setError,
     onDone: () {
       future._setError(new StateError("No elements"));
     },
     cancelOnError: true);
   return future;
 }

 /**
  * Returns the last element.
  *
  * If this stream is empty, the returned future completes with a [StateError].
  */
 Future<T> get last {
   _FutureImpl<T> future = new _FutureImpl<T>();
   T result = null;
   bool foundResult = false;
   StreamSubscription subscription;
   subscription = this.listen(
     // TODO(ahe): Restore type when feature is implemented in dart2js
     // checked mode. http://dartbug.com/7733
     (/*T*/ value) {
       foundResult = true;
       result = value;
     },
     onError: future._setError,
     onDone: () {
       if (foundResult) {
         future._setValue(result);
         return;
       }
       future._setError(new StateError("No elements"));
     },
     cancelOnError: true);
   return future;
 }

 /**
  * Returns the single element.
  *
  * If this stream is empty or has more than one element, the returned
  * future completes with a [StateError].
  */
 Future<T> get single {
   _FutureImpl<T> future = new _FutureImpl<T>();
   T result = null;
   bool foundResult = false;
   StreamSubscription subscription;
   subscription = this.listen(
     // TODO(ahe): Restore type when feature is implemented in dart2js
     // checked mode. http://dartbug.com/7733
     (/*T*/ value) {
       if (foundResult) {
         subscription.cancel();
         // This is the second element we get.
         Error error = new StateError("More than one element");
         future._setError(error);
         return;
       }
       foundResult = true;
       result = value;
     },
     onError: future._setError,
     onDone: () {
       if (foundResult) {
         future._setValue(result);
         return;
       }
       future._setError(new StateError("No elements"));
     },
     cancelOnError: true);
   return future;
 }

 /**
  * Finds the first element of this stream matching [test].
  *
  * Returns a future that is filled with the first element of this stream
  * that [test] returns true for.
  *
  * If no such element is found before this stream is done, and a
  * [defaultValue] function is provided, the result of calling [defaultValue]
  * becomes the value of the future.
  *
  * If an error occurs, or if this stream ends without finding a match and
  * with no [defaultValue] function provided, the future will receive an
  * error.
  */
 Future<T> firstWhere(bool test(T value), {T defaultValue()}) {
   _FutureImpl<T> future = new _FutureImpl<T>();
   StreamSubscription subscription;
   subscription = this.listen(
     // TODO(ahe): Restore type when feature is implemented in dart2js
     // checked mode. http://dartbug.com/7733
     (/*T*/ value) {
       _runUserCode(
         () => test(value),
         (bool isMatch) {
           if (isMatch) {
             subscription.cancel();
             future._setValue(value);
           }
         },
         _cancelAndError(subscription, future)
       );
     },
     onError: future._setError,
     onDone: () {
       if (defaultValue != null) {
         _runUserCode(defaultValue, future._setValue, future._setError);
         return;
       }
       future._setError(new StateError("firstMatch ended without match"));
     },
     cancelOnError: true);
   return future;
 }

 /**
  * Finds the last element in this stream matching [test].
  *
  * As [firstWhere], except that the last matching element is found.
  * That means that the result cannot be provided before this stream
  * is done.
  */
 Future<T> lastWhere(bool test(T value), {T defaultValue()}) {
   _FutureImpl<T> future = new _FutureImpl<T>();
   T result = null;
   bool foundResult = false;
   StreamSubscription subscription;
   subscription = this.listen(
     // TODO(ahe): Restore type when feature is implemented in dart2js
     // checked mode. http://dartbug.com/7733
     (/*T*/ value) {
       _runUserCode(
         () => true == test(value),
         (bool isMatch) {
           if (isMatch) {
             foundResult = true;
             result = value;
           }
         },
         _cancelAndError(subscription, future)
       );
     },
     onError: future._setError,
     onDone: () {
       if (foundResult) {
         future._setValue(result);
         return;
       }
       if (defaultValue != null) {
         _runUserCode(defaultValue, future._setValue, future._setError);
         return;
       }
       future._setError(new StateError("lastMatch ended without match"));
     },
     cancelOnError: true);
   return future;
 }

 /**
  * Finds the single element in this stream matching [test].
  *
  * Like [lastWhere], except that it is an error if more than one
  * matching element occurs in the stream.
  */
 Future<T> singleWhere(bool test(T value)) {
   _FutureImpl<T> future = new _FutureImpl<T>();
   T result = null;
   bool foundResult = false;
   StreamSubscription subscription;
   subscription = this.listen(
     // TODO(ahe): Restore type when feature is implemented in dart2js
     // checked mode. http://dartbug.com/7733
     (/*T*/ value) {
       _runUserCode(
         () => true == test(value),
         (bool isMatch) {
           if (isMatch) {
             if (foundResult) {
               subscription.cancel();
               future._setError(
                   new StateError('Multiple matches for "single"'));
               return;
             }
             foundResult = true;
             result = value;
           }
         },
         _cancelAndError(subscription, future)
       );
     },
     onError: future._setError,
     onDone: () {
       if (foundResult) {
         future._setValue(result);
         return;
       }
       future._setError(new StateError("single ended without match"));
     },
     cancelOnError: true);
   return future;
 }

 /**
  * Returns the value of the [index]th data event of this stream.
  *
  * If an error event occurs, the future will end with this error.
  *
  * If this stream closes after providing [index] or fewer data events,
  * an error is reported.
  */
 Future<T> elementAt(int index) {
   if (index is! int || index < 0) throw new ArgumentError(index);
   _FutureImpl<T> future = new _FutureImpl<T>();
   StreamSubscription subscription;
   subscription = this.listen(
     // TODO(ahe): Restore type when feature is implemented in dart2js
     // checked mode. http://dartbug.com/7733
     (/*T*/ value) {
       if (index == 0) {
         subscription.cancel();
         future._setValue(value);
         return;
       }
       index -= 1;
     },
     onError: future._setError,
     onDone: () {
       future._setError(new StateError("Not enough elements for elementAt"));
     },
     cancelOnError: true);
   return future;
 }
}

Subclasses

EventTransformStream<S, T>, HttpClientResponse, HttpMultipartFormData, HttpRequest, HttpServer, IsolateStream, MimeMultipart, RawSecureServerSocket, RawServerSocket, RawSocket, SecureServerSocket, ServerSocket, Socket, StreamView<T>, WebSocket

Constructors

new Stream()

Stream();

factory Stream.fromFuture(Future<T> future)

Creates a new single-subscription stream from the future.

When the future completes, the stream will fire one event, either data or error, and then close with a done-event.

factory Stream.fromFuture(Future<T> future) {
 _StreamImpl<T> stream = new _SingleStreamImpl<T>();
 future.then((value) {
     stream._add(value);
     stream._close();
   },
   onError: (error) {
     stream._addError(error);
     stream._close();
   });
 return stream;
}
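A sketch of the two outcomes described above, assuming a current Dart SDK; `eventsOf` is a hypothetical helper. A future that completes with a value yields one data event and then done, and a future that completes with an error yields one error event and then done.

```dart
import 'dart:async';

/// Hypothetical helper: records the events of Stream.fromFuture(future).
Future<List<String>> eventsOf(Future<int> future) {
  final log = <String>[];
  final finished = Completer<List<String>>();
  Stream.fromFuture(future).listen(
    (value) => log.add('data: $value'),
    onError: (Object error) => log.add('error: $error'),
    onDone: () {
      log.add('done');
      finished.complete(log);
    },
  );
  return finished.future;
}

Future<void> main() async {
  print(await eventsOf(Future.value(42))); // [data: 42, done]
  print(await eventsOf(Future<int>.error('boom'))); // [error: boom, done]
}
```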

factory Stream.fromIterable(Iterable<T> data)

Creates a single-subscription stream that gets its data from data.

factory Stream.fromIterable(Iterable<T> data) {
 _PendingEvents iterableEvents = new _IterablePendingEvents<T>(data);
 return new _GeneratedSingleStreamImpl<T>(iterableEvents);
}
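For illustration, a stream built from an iterable can be filtered, mapped, and collected again. This is a sketch in current Dart syntax; `doubledEvens` is a hypothetical helper name.

```dart
import 'dart:async';

/// Hypothetical helper: keeps the even values and doubles them.
Future<List<int>> doubledEvens(Iterable<int> values) {
  return Stream.fromIterable(values)
      .where((n) => n.isEven) // drop odd data events
      .map((n) => n * 2) // convert each remaining event
      .toList(); // collect until the done event
}

Future<void> main() async {
  print(await doubledEvens([1, 2, 3, 4])); // [4, 8]
}
```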

factory Stream.periodic(Duration period, [T computation(int computationCount)])

Creates a stream that repeatedly emits events at period intervals.

The event values are computed by invoking computation. The argument to this callback is an integer that starts with 0 and is incremented for every event.

If computation is omitted the event values will all be null.

factory Stream.periodic(Duration period,
                       [T computation(int computationCount)]) {
 if (computation == null) computation = ((i) => null);

 Timer timer;
 int computationCount = 0;
 StreamController<T> controller;
 // Counts the time that the Stream was running (and not paused).
 Stopwatch watch = new Stopwatch();

 void sendEvent() {
   watch.reset();
   T data = computation(computationCount++);
   controller.add(data);
 }

 void startPeriodicTimer() {
   assert(timer == null);
   timer = new Timer.periodic(period, (Timer timer) {
     sendEvent();
   });
 }

 controller = new StreamController<T>(
     onListen: () {
       watch.start();
       startPeriodicTimer();
     },
     onPause: () {
       timer.cancel();
       timer = null;
       watch.stop();
     },
     onResume: () {
       assert(timer == null);
       Duration elapsed = watch.elapsed;
       watch.start();
       timer = new Timer(period - elapsed, () {
         timer = null;
         startPeriodicTimer();
         sendEvent();
       });
     },
     onCancel: () {
       if (timer != null) timer.cancel();
       timer = null;
     });
 return controller.stream;
}
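A usage sketch, assuming a current Dart SDK: the computation receives 0, 1, 2, ... in order, and `take` ends the otherwise endless stream, which cancels the underlying timer. The name `firstSquares` and the 10 ms period are arbitrary choices for the example.

```dart
import 'dart:async';

/// Hypothetical helper: emits i * i every 10 ms and keeps the first [n].
Future<List<int>> firstSquares(int n) {
  return Stream<int>.periodic(
    const Duration(milliseconds: 10),
    (i) => i * i, // i is the event counter, starting at 0
  ).take(n).toList();
}

Future<void> main() async {
  print(await firstSquares(4)); // [0, 1, 4, 9]
}
```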

Properties

final Future<T> first

Returns the first element.

If this stream is empty, the returned future completes with a StateError. Otherwise this is equivalent to this.elementAt(0).

Future<T> get first {
 _FutureImpl<T> future = new _FutureImpl<T>();
 StreamSubscription subscription;
 subscription = this.listen(
   // TODO(ahe): Restore type when feature is implemented in dart2js
   // checked mode. http://dartbug.com/7733
   (/*T*/ value) {
     subscription.cancel();
     future._setValue(value);
     return;
   },
   onError: future._setError,
   onDone: () {
     future._setError(new StateError("No elements"));
   },
   cancelOnError: true);
 return future;
}
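A small sketch of both cases, assuming a current Dart SDK; `describeFirst` is a hypothetical helper. The error arrives through the returned future, so it is caught around the `await`.

```dart
import 'dart:async';

/// Hypothetical helper: reports the first element or the empty case.
Future<String> describeFirst(List<int> values) async {
  try {
    final value = await Stream.fromIterable(values).first;
    return 'first: $value';
  } on StateError {
    // The future completed with a StateError: there were no elements.
    return 'empty stream';
  }
}

Future<void> main() async {
  print(await describeFirst([7, 8, 9])); // first: 7
  print(await describeFirst([])); // empty stream
}
```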

final bool isBroadcast

Reports whether this stream is a broadcast stream.

bool get isBroadcast => false;

final Future<bool> isEmpty

Reports whether this stream contains no elements.

Future<bool> get isEmpty {
 _FutureImpl<bool> future = new _FutureImpl<bool>();
 StreamSubscription subscription;
 subscription = this.listen(
   (_) {
     subscription.cancel();
     future._setValue(false);
   },
   onError: future._setError,
   onDone: () {
     future._setValue(true);
   },
   cancelOnError: true);
 return future;
}

final Future<T> last

Returns the last element.

If this stream is empty, the returned future completes with a StateError.

Future<T> get last {
 _FutureImpl<T> future = new _FutureImpl<T>();
 T result = null;
 bool foundResult = false;
 StreamSubscription subscription;
 subscription = this.listen(
   // TODO(ahe): Restore type when feature is implemented in dart2js
   // checked mode. http://dartbug.com/7733
   (/*T*/ value) {
     foundResult = true;
     result = value;
   },
   onError: future._setError,
   onDone: () {
     if (foundResult) {
       future._setValue(result);
       return;
     }
     future._setError(new StateError("No elements"));
   },
   cancelOnError: true);
 return future;
}

final Future<int> length

Counts the elements in the stream.

Future<int> get length {
 _FutureImpl<int> future = new _FutureImpl<int>();
 int count = 0;
 this.listen(
   (_) { count++; },
   onError: future._setError,
   onDone: () {
     future._setValue(count);
   },
   cancelOnError: true);
 return future;
}

final Future<T> single

Returns the single element.

If this stream is empty or has more than one element, the returned future completes with a StateError.

Future<T> get single {
 _FutureImpl<T> future = new _FutureImpl<T>();
 T result = null;
 bool foundResult = false;
 StreamSubscription subscription;
 subscription = this.listen(
   // TODO(ahe): Restore type when feature is implemented in dart2js
   // checked mode. http://dartbug.com/7733
   (/*T*/ value) {
     if (foundResult) {
       subscription.cancel();
       // This is the second element we get.
       Error error = new StateError("More than one element");
       future._setError(error);
       return;
     }
     foundResult = true;
     result = value;
   },
   onError: future._setError,
   onDone: () {
     if (foundResult) {
       future._setValue(result);
       return;
     }
     future._setError(new StateError("No elements"));
   },
   cancelOnError: true);
 return future;
}
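The three possible outcomes can be sketched as follows, assuming a current Dart SDK; `describeSingle` is a hypothetical helper name.

```dart
import 'dart:async';

/// Hypothetical helper: reports the single element or the error case.
Future<String> describeSingle(List<int> values) async {
  try {
    final value = await Stream.fromIterable(values).single;
    return 'single: $value';
  } on StateError {
    // Raised both for an empty stream and for a second element.
    return 'state error';
  }
}

Future<void> main() async {
  print(await describeSingle([5])); // single: 5
  print(await describeSingle([])); // state error
  print(await describeSingle([5, 6])); // state error
}
```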

Methods

Future<bool> any(bool test(T element))

Checks whether test accepts any element provided by this stream.

Completes the Future when the answer is known. If this stream reports an error, the Future will report that error.

Future<bool> any(bool test(T element)) {
 _FutureImpl<bool> future = new _FutureImpl<bool>();
 StreamSubscription subscription;
 subscription = this.listen(
     // TODO(ahe): Restore type when feature is implemented in dart2js
     // checked mode. http://dartbug.com/7733
     (/*T*/ element) {
       _runUserCode(
         () => test(element),
         (bool isMatch) {
           if (isMatch) {
             subscription.cancel();
             future._setValue(true);
           }
         },
         _cancelAndError(subscription, future)
       );
     },
     onError: future._setError,
     onDone: () {
       future._setValue(false);
     },
     cancelOnError: true);
 return future;
}
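A usage sketch, assuming a current Dart SDK; the helper names are hypothetical. Because the subscription is cancelled on the first match, `any` can complete even on a stream that never ends, as the periodic example shows.

```dart
import 'dart:async';

/// Hypothetical helper: true if any data event is negative.
Future<bool> hasNegative(Stream<int> numbers) => numbers.any((n) => n < 0);

/// The answer is known at the first match, so an endless stream is fine.
Future<bool> endlessStreamReachesThree() {
  return Stream.periodic(const Duration(milliseconds: 1), (i) => i)
      .any((n) => n == 3);
}

Future<void> main() async {
  print(await hasNegative(Stream.fromIterable([3, -1, 4]))); // true
  print(await hasNegative(Stream.fromIterable([3, 1, 4]))); // false
  print(await endlessStreamReachesThree()); // true
}
```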

Stream<T> asBroadcastStream()

Returns a multi-subscription stream that produces the same events as this.

If this stream is single-subscription, returns a new stream that allows multiple subscribers. It will subscribe to this stream when its first subscriber is added, and unsubscribe again when the last subscription is cancelled.

If this stream is already a broadcast stream, it is returned unmodified.

Stream<T> asBroadcastStream() {
 if (isBroadcast) return this;
 return new _SingleStreamMultiplexer<T>(this);
}
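As a sketch under the assumption of a current Dart SDK, two listeners can share one single-subscription source; `shareStream` is a hypothetical name. Both listeners subscribe before the source delivers its first event, so each sees every value.

```dart
import 'dart:async';

/// Hypothetical helper: collects the same source through two listeners.
Future<List<List<int>>> shareStream() {
  final shared = Stream.fromIterable([1, 2, 3]).asBroadcastStream();
  // Listening twice is allowed on the broadcast wrapper.
  final first = shared.toList();
  final second = shared.toList();
  return Future.wait([first, second]);
}

Future<void> main() async {
  print(Stream.fromIterable([1]).asBroadcastStream().isBroadcast); // true
  print(await shareStream()); // [[1, 2, 3], [1, 2, 3]]
}
```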

Future<bool> contains(T match)

Checks whether match occurs in the elements provided by this stream.

Completes the Future when the answer is known. If this stream reports an error, the Future will report that error.

Future<bool> contains(T match) {
 _FutureImpl<bool> future = new _FutureImpl<bool>();
 StreamSubscription subscription;
 subscription = this.listen(
     // TODO(ahe): Restore type when feature is implemented in dart2js
     // checked mode. http://dartbug.com/7733
     (/*T*/ element) {
       _runUserCode(
         () => (element == match),
         (bool isMatch) {
           if (isMatch) {
             subscription.cancel();
             future._setValue(true);
           }
         },
         _cancelAndError(subscription, future)
       );
     },
     onError: future._setError,
     onDone: () {
       future._setValue(false);
     },
     cancelOnError: true);
 return future;
}

Stream<T> distinct([bool equals(T previous, T next)]) #

Skips data events if they are equal to the previous data event.

The returned stream provides the same events as this stream, except that it never provides two consecutive data events that are equal.

Equality is determined by the provided equals method. If that is omitted, the '==' operator on the last provided data element is used.

Stream<T> distinct([bool equals(T previous, T next)]) {
 return new _DistinctStream(this, equals);
}
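A minimal sketch of both forms, assuming a current Dart SDK: the default '==' comparison and a custom equals function. The helper names dropRepeats and dropCaseRepeats are hypothetical. Only consecutive duplicates are removed, so a value may reappear later in the output.

```dart
import 'dart:async';

/// Hypothetical helper: removes consecutive duplicates using '=='.
Future<List<int>> dropRepeats(List<int> values) =>
    Stream.fromIterable(values).distinct().toList();

/// Hypothetical helper: removes consecutive duplicates, ignoring case.
Future<List<String>> dropCaseRepeats(List<String> words) =>
    Stream.fromIterable(words)
        .distinct((String previous, String next) =>
            previous.toLowerCase() == next.toLowerCase())
        .toList();

Future<void> main() async {
  print(await dropRepeats([1, 1, 2, 2, 1, 3, 3])); // prints [1, 2, 1, 3]
  print(await dropCaseRepeats(['a', 'A', 'b', 'B', 'a'])); // prints [a, b, a]
}
```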

Future<T> elementAt(int index) #

Returns the value of the data event at position index in this stream. Positions are zero-based, so index 0 selects the first data event.

If an error event occurs, the future will end with this error.

If this stream closes before providing index + 1 data events, an error is reported.

Future<T> elementAt(int index) {
 if (index is! int || index < 0) throw new ArgumentError(index);
 _FutureImpl<T> future = new _FutureImpl<T>();
 StreamSubscription subscription;
 subscription = this.listen(
   // TODO(ahe): Restore type when feature is implemented in dart2js
   // checked mode. http://dartbug.com/7733
   (/*T*/ value) {
     if (index == 0) {
       subscription.cancel();
       future._setValue(value);
       return;
     }
     index -= 1;
   },
   onError: future._setError,
   onDone: () {
     future._setError(new StateError("Not enough elements for elementAt"));
   },
   cancelOnError: true);
 return future;
}
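The sketch below illustrates the zero-based indexing and the error case, assuming a current Dart SDK. The helper describeElementAt is hypothetical. It catches any error rather than a specific type, because the error class reported for a too-short stream may differ between SDK versions.

```dart
import 'dart:async';

/// Hypothetical helper: looks up one element and describes the outcome.
Future<String> describeElementAt(List<String> values, int index) async {
  try {
    final String value = await Stream.fromIterable(values).elementAt(index);
    return 'found $value';
  } catch (_) {
    // Reached when the stream closes before position [index] is available.
    return 'no element at index $index';
  }
}

Future<void> main() async {
  print(await describeElementAt(['a', 'b', 'c'], 0)); // prints found a
  print(await describeElementAt(['a', 'b', 'c'], 3));
}
```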

Future<bool> every(bool test(T element)) #

Checks whether test accepts all elements provided by this stream.

Completes the Future when the answer is known. If this stream reports an error, the Future will report that error.

Future<bool> every(bool test(T element)) {
 _FutureImpl<bool> future = new _FutureImpl<bool>();
 StreamSubscription subscription;
 subscription = this.listen(
     // TODO(ahe): Restore type when feature is implemented in dart2js
     // checked mode. http://dartbug.com/7733
     (/*T*/ element) {
       _runUserCode(
         () => test(element),
         (bool isMatch) {
           if (!isMatch) {
             subscription.cancel();
             future._setValue(false);
           }
         },
         _cancelAndError(subscription, future)
       );
     },
     onError: future._setError,
     onDone: () {
       future._setValue(true);
     },
     cancelOnError: true);
 return future;
}

Stream expand(Iterable convert(T value)) #

Creates a new stream from this stream that converts each element into zero or more events.

Each incoming event is converted to an Iterable of new events, and each of these new events is then sent by the returned stream in order.

Stream expand(Iterable convert(T value)) {
 return new _ExpandStream<T, dynamic>(this, convert);
}
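As an illustration of "zero or more events" per input, the sketch below splits lines into words, so an empty line contributes nothing. It assumes a current Dart SDK; splitWords is a hypothetical helper.

```dart
import 'dart:async';

/// Hypothetical helper: turns a stream of lines into a stream of words.
Future<List<String>> splitWords(List<String> lines) =>
    Stream.fromIterable(lines)
        // Each line expands to the (possibly empty) list of its words.
        .expand((String line) =>
            line.split(' ').where((String w) => w.isNotEmpty))
        .toList();

Future<void> main() async {
  print(await splitWords(['a b', '', 'c'])); // prints [a, b, c]
}
```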

Future<T> firstWhere(bool test(T value), {T defaultValue()}) #

Finds the first element of this stream matching test.

Returns a future that completes with the first element of this stream for which test returns true.

If no such element is found before this stream is done, and a defaultValue function is provided, the result of calling defaultValue becomes the value of the future.

If an error occurs, or if this stream ends without finding a match and with no defaultValue function provided, the future will receive an error.

Future<T> firstWhere(bool test(T value), {T defaultValue()}) {
 _FutureImpl<T> future = new _FutureImpl<T>();
 StreamSubscription subscription;
 subscription = this.listen(
   // TODO(ahe): Restore type when feature is implemented in dart2js
   // checked mode. http://dartbug.com/7733
   (/*T*/ value) {
     _runUserCode(
       () => test(value),
       (bool isMatch) {
         if (isMatch) {
           subscription.cancel();
           future._setValue(value);
         }
       },
       _cancelAndError(subscription, future)
     );
   },
   onError: future._setError,
   onDone: () {
     if (defaultValue != null) {
       _runUserCode(defaultValue, future._setValue, future._setError);
       return;
     }
     future._setError(new StateError("firstMatch ended without match"));
   },
   cancelOnError: true);
 return future;
}
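A minimal sketch of the match and no-match cases, assuming a current Dart SDK. The helper firstLongWord is hypothetical. The fallback parameter is deliberately not used here, since its name may differ between SDK versions; the no-match case is handled by catching the StateError instead.

```dart
import 'dart:async';

/// Hypothetical helper: finds the first word of at least [minLength] letters.
Future<String> firstLongWord(List<String> words, int minLength) async {
  try {
    return await Stream.fromIterable(words)
        .firstWhere((String w) => w.length >= minLength);
  } on StateError {
    // The stream ended without any element passing the test.
    return '<none>';
  }
}

Future<void> main() async {
  print(await firstLongWord(['to', 'stream', 'future'], 5)); // prints stream
  print(await firstLongWord(['to', 'be'], 5)); // prints <none>
}
```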

Future fold(initialValue, combine(previous, T element)) #

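Reduces a sequence of values by repeatedly applying combine, starting from initialValue. When this stream is done, the returned future completes with the final accumulated value.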

Future fold(var initialValue, combine(var previous, T element)) {
 _FutureImpl result = new _FutureImpl();
 var value = initialValue;
 StreamSubscription subscription;
 subscription = this.listen(
   // TODO(ahe): Restore type when feature is implemented in dart2js
   // checked mode. http://dartbug.com/7733
   (/*T*/ element) {
     _runUserCode(
       () => combine(value, element),
       (newValue) { value = newValue; },
       _cancelAndError(subscription, result)
     );
   },
   onError: (e) {
     result._setError(e);
   },
   onDone: () {
     result._setValue(value);
   },
   cancelOnError: true);
 return result;
}
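The sketch below accumulates a result whose type differs from the element type, which is the main reason to choose fold over reduce. It assumes a current Dart SDK, where fold takes a type argument for the accumulator; totalLength is a hypothetical helper.

```dart
import 'dart:async';

/// Hypothetical helper: sums the lengths of all words in the stream.
Future<int> totalLength(List<String> words) =>
    Stream.fromIterable(words)
        .fold<int>(0, (int sum, String word) => sum + word.length);

Future<void> main() async {
  print(await totalLength(['ab', 'cde', ''])); // prints 5
  print(await totalLength([])); // prints 0, the initial value
}
```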

Future forEach(void action(T element)) #

Executes action on each data event of the stream.

Completes the returned Future when all events of the stream have been processed. Completes the future with an error if the stream has an error event, or if action throws.

Future forEach(void action(T element)) {
 _FutureImpl future = new _FutureImpl();
 StreamSubscription subscription;
 subscription = this.listen(
     // TODO(ahe): Restore type when feature is implemented in dart2js
     // checked mode. http://dartbug.com/7733
     (/*T*/ element) {
       _runUserCode(
         () => action(element),
         (_) {},
         _cancelAndError(subscription, future)
       );
     },
     onError: future._setError,
     onDone: () {
       future._setValue(null);
     },
     cancelOnError: true);
 return future;
}
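As a sketch of both outcomes described above, the hypothetical helper processAll below completes normally when every element is handled and reports where processing stopped when action throws. It assumes a current Dart SDK.

```dart
import 'dart:async';

/// Hypothetical helper: records non-negative values, failing on a negative one.
Future<String> processAll(List<int> values) async {
  final List<int> seen = <int>[];
  try {
    await Stream.fromIterable(values).forEach((int v) {
      if (v < 0) throw ArgumentError('negative value $v');
      seen.add(v);
    });
    return 'processed $seen';
  } on ArgumentError {
    // The error thrown by the action completes the returned future.
    return 'stopped after $seen';
  }
}

Future<void> main() async {
  print(await processAll([1, 2])); // prints processed [1, 2]
  print(await processAll([1, 2, -3, 4])); // prints stopped after [1, 2]
}
```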

Stream<T> handleError(void handle(error), {bool test(error)}) #

Creates a wrapper Stream that intercepts some errors from this stream.

If this stream sends an error that matches test, then it is intercepted by the handle function.

An AsyncError e is matched by a test function if test(e) returns true. If test is omitted, every error is considered matching.

If the error is intercepted, the handle function can decide what to do with it. It can throw if it wants to raise a new (or the same) error, or simply return to make the stream forget the error.

If you need to transform an error into a data event, use the more generic Stream.transformEvent to handle the event by writing a data event to the output sink.

Stream<T> handleError(void handle( error), { bool test(error) }) {
 return new _HandleErrorStream<T>(this, handle, test);
}
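The sketch below intercepts only FormatException errors and lets the stream carry on. It assumes a current Dart SDK and relies on map turning an exception thrown by its callback into an error event; parseAll is a hypothetical helper.

```dart
import 'dart:async';

/// Hypothetical helper: parses each input as an int, dropping bad inputs
/// and counting how many were dropped.
Future<String> parseAll(List<String> inputs) async {
  int skipped = 0;
  final List<int> parsed = await Stream.fromIterable(inputs)
      // int.parse throws FormatException on input that is not a number.
      .map((String s) => int.parse(s))
      .handleError(
        (error) {
          // Returning normally makes the stream forget the error.
          skipped += 1;
        },
        test: (error) => error is FormatException,
      )
      .toList();
  return 'parsed $parsed, skipped $skipped';
}

Future<void> main() async {
  print(await parseAll(['1', 'x', '3'])); // prints parsed [1, 3], skipped 1
}
```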

Future<T> lastWhere(bool test(T value), {T defaultValue()}) #

Finds the last element in this stream matching test.

Like firstWhere, except that the last matching element is found. That means that the result cannot be provided before this stream is done.

Future<T> lastWhere(bool test(T value), {T defaultValue()}) {
 _FutureImpl<T> future = new _FutureImpl<T>();
 T result = null;
 bool foundResult = false;
 StreamSubscription subscription;
 subscription = this.listen(
   // TODO(ahe): Restore type when feature is implemented in dart2js
   // checked mode. http://dartbug.com/7733
   (/*T*/ value) {
     _runUserCode(
       () => true == test(value),
       (bool isMatch) {
         if (isMatch) {
           foundResult = true;
           result = value;
         }
       },
       _cancelAndError(subscription, future)
     );
   },
   onError: future._setError,
   onDone: () {
     if (foundResult) {
       future._setValue(result);
       return;
     }
     if (defaultValue != null) {
       _runUserCode(defaultValue, future._setValue, future._setError);
       return;
     }
     future._setError(new StateError("lastMatch ended without match"));
   },
   cancelOnError: true);
 return future;
}

abstract StreamSubscription<T> listen(void onData(T event), {void onError(error), void onDone(), bool cancelOnError}) #

Adds a subscription to this stream.

On each data event from this stream, the subscriber's onData handler is called. If onData is null, nothing happens.

On errors from this stream, the onError handler is given an object describing the error.

If this stream closes, the onDone handler is called.

If cancelOnError is true, the subscription is ended when the first error is reported. The default is false.
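To make the effect of cancelOnError concrete, the sketch below records what a listener sees from a stream that sends a data event, an error, another data event, and then closes. It assumes a current Dart SDK; sample and record are hypothetical helpers.

```dart
import 'dart:async';

/// Hypothetical source: data 1, an error, data 2, then done.
Stream<int> sample() {
  final StreamController<int> controller = StreamController<int>();
  // A single-subscription controller holds these events until listened to.
  controller
    ..add(1)
    ..addError('boom')
    ..add(2)
    ..close();
  return controller.stream;
}

/// Hypothetical helper: logs every callback invoked on the subscription.
Future<List<String>> record(Stream<int> source, {bool cancelOnError = false}) {
  final List<String> log = <String>[];
  final Completer<List<String>> finished = Completer<List<String>>();
  source.listen(
    (int value) => log.add('data $value'),
    onError: (Object error) {
      log.add('error $error');
      // With cancelOnError the subscription ends here and onDone never runs.
      if (cancelOnError) finished.complete(log);
    },
    onDone: () => finished.complete(log),
    cancelOnError: cancelOnError,
  );
  return finished.future;
}

Future<void> main() async {
  print(await record(sample())); // prints [data 1, error boom, data 2]
  print(await record(sample(), cancelOnError: true)); // prints [data 1, error boom]
}
```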

Stream map(convert(T event)) #

Creates a new stream that converts each element of this stream to a new value using the convert function.

Stream map(convert(T event)) {
 return new _MapStream<T, dynamic>(this, convert);
}

Future pipe(StreamConsumer<T> streamConsumer) #

Binds this stream as the input of the provided StreamConsumer. Once all events of this stream have been added, the consumer is closed.

Future pipe(StreamConsumer<T> streamConsumer) {
 return streamConsumer.addStream(this).then((_) => streamConsumer.close());
}
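A minimal sketch, assuming a current Dart SDK, that uses a StreamController as the consumer (it implements StreamConsumer through StreamSink). The helper pipeThrough is hypothetical. The listener is attached to the controller's stream before piping, so the piped events have somewhere to go.

```dart
import 'dart:async';

/// Hypothetical helper: pipes the values into a controller and collects
/// whatever comes out of the controller's stream.
Future<List<int>> pipeThrough(List<int> values) async {
  final StreamController<int> consumer = StreamController<int>();
  final Future<List<int>> collected = consumer.stream.toList();
  // pipe adds the whole stream to the consumer and then closes it.
  await Stream.fromIterable(values).pipe(consumer);
  return collected;
}

Future<void> main() async {
  print(await pipeThrough([1, 2, 3])); // prints [1, 2, 3]
}
```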

Future<T> reduce(T combine(T previous, T element)) #

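Reduces a sequence of values by repeatedly applying combine. The first element is used as the initial value. If this stream provides no elements, the returned future completes with an error.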

Future<T> reduce(T combine(T previous, T element)) {
 _FutureImpl<T> result = new _FutureImpl<T>();
 bool seenFirst = false;
 T value;
 StreamSubscription subscription;
 subscription = this.listen(
   // TODO(ahe): Restore type when feature is implemented in dart2js
   // checked mode. http://dartbug.com/7733
   (/* T */ element) {
     if (seenFirst) {
       _runUserCode(() => combine(value, element),
                    (T newValue) { value = newValue; },
                    _cancelAndError(subscription, result));
     } else {
       value = element;
       seenFirst = true;
     }
   },
   onError: result._setError,
   onDone: () {
     if (!seenFirst) {
       result._setError(new StateError("No elements"));
     } else {
       result._setValue(value);
     }
   },
   cancelOnError: true
 );
 return result;
}
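The sketch below finds the largest value in a stream and treats the empty-stream error as "no result". It assumes a current Dart SDK with null safety; largest is a hypothetical helper.

```dart
import 'dart:async';

/// Hypothetical helper: returns the largest value, or null for no values.
Future<int?> largest(List<int> values) async {
  try {
    return await Stream.fromIterable(values)
        .reduce((int a, int b) => a > b ? a : b);
  } on StateError {
    // An empty stream has no first element to start from.
    return null;
  }
}

Future<void> main() async {
  print(await largest([3, 9, 4])); // prints 9
  print(await largest([])); // prints null
}
```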

Future<T> singleWhere(bool test(T value)) #

Finds the single element in this stream matching test.

Like lastWhere, except that it is an error if more than one matching element occurs in the stream.

Future<T> singleWhere(bool test(T value)) {
 _FutureImpl<T> future = new _FutureImpl<T>();
 T result = null;
 bool foundResult = false;
 StreamSubscription subscription;
 subscription = this.listen(
   // TODO(ahe): Restore type when feature is implemented in dart2js
   // checked mode. http://dartbug.com/7733
   (/*T*/ value) {
     _runUserCode(
       () => true == test(value),
       (bool isMatch) {
         if (isMatch) {
           if (foundResult) {
             subscription.cancel();
             future._setError(
                 new StateError('Multiple matches for "single"'));
             return;
           }
           foundResult = true;
           result = value;
         }
       },
       _cancelAndError(subscription, future)
     );
   },
   onError: future._setError,
   onDone: () {
     if (foundResult) {
       future._setValue(result);
       return;
     }
     future._setError(new StateError("single ended without match"));
   },
   cancelOnError: true);
 return future;
}
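As an illustration of the three possible outcomes (exactly one match, none, several), here is a sketch assuming a current Dart SDK; onlyEven is a hypothetical helper.

```dart
import 'dart:async';

/// Hypothetical helper: expects exactly one even number in [values].
Future<String> onlyEven(List<int> values) async {
  try {
    final int value =
        await Stream.fromIterable(values).singleWhere((int v) => v.isEven);
    return 'exactly one: $value';
  } on StateError {
    // Reported both for no match and for more than one match.
    return 'zero or several matches';
  }
}

Future<void> main() async {
  print(await onlyEven([1, 4, 7])); // prints exactly one: 4
  print(await onlyEven([1, 3])); // prints zero or several matches
  print(await onlyEven([2, 4])); // prints zero or several matches
}
```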

Stream<T> skip(int count) #

Skips the first count data events from this stream.

Stream<T> skip(int count) {
 return new _SkipStream(this, count);
}

Stream<T> skipWhile(bool test(T value)) #

Skips data events from this stream while they are matched by test.

Error and done events are provided by the returned stream unmodified.

Starting with the first data event where test returns false for the event data, the returned stream will have the same events as this stream.

Stream<T> skipWhile(bool test(T value)) {
 return new _SkipWhileStream(this, test);
}
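The sketch below contrasts skip and skipWhile on the same input, assuming a current Dart SDK. The helper names are hypothetical. Note that once skipWhile has let one event through, later events are forwarded even if test would accept them again.

```dart
import 'dart:async';

/// Hypothetical helper: drops the first [count] values.
Future<List<int>> dropFirst(List<int> values, int count) =>
    Stream.fromIterable(values).skip(count).toList();

/// Hypothetical helper: drops leading values that are below [limit].
Future<List<int>> dropLeadingSmall(List<int> values, int limit) =>
    Stream.fromIterable(values).skipWhile((int v) => v < limit).toList();

Future<void> main() async {
  print(await dropFirst([1, 2, 5, 1, 7], 2)); // prints [5, 1, 7]
  // The second 1 is kept: skipping stopped at 5.
  print(await dropLeadingSmall([1, 2, 5, 1, 7], 3)); // prints [5, 1, 7]
}
```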

Stream<T> take(int count) #

Provides at most the first count values of this stream.

Forwards the first count data events of this stream, and all error events, to the returned stream, and ends with a done event.

If this stream produces fewer than count values before it's done, so will the returned stream.

Stream<T> take(int count) {
 return new _TakeStream(this, count);
}

Stream<T> takeWhile(bool test(T value)) #

Forwards data events while test is successful.

The returned stream provides the same events as this stream as long as test returns true for the event data. The stream is done when either this stream is done, or when this stream first provides a value that test doesn't accept.

Stream<T> takeWhile(bool test(T value)) {
 return new _TakeWhileStream(this, test);
}
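A short sketch of take and takeWhile, assuming a current Dart SDK; the helper names are hypothetical. The last call shows that asking for more values than the stream has simply yields all of them.

```dart
import 'dart:async';

/// Hypothetical helper: keeps at most the first [count] values.
Future<List<int>> keepFirst(List<int> values, int count) =>
    Stream.fromIterable(values).take(count).toList();

/// Hypothetical helper: keeps leading values that are below [limit].
Future<List<int>> keepLeadingSmall(List<int> values, int limit) =>
    Stream.fromIterable(values).takeWhile((int v) => v < limit).toList();

Future<void> main() async {
  print(await keepFirst([1, 2, 5, 1, 7], 2)); // prints [1, 2]
  // Stops at 5; the later 1 is never forwarded.
  print(await keepLeadingSmall([1, 2, 5, 1, 7], 3)); // prints [1, 2]
  print(await keepFirst([1, 2], 10)); // prints [1, 2]
}
```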

Future<List<T>> toList() #

Collects the data of this stream in a List.

Future<List<T>> toList() {
 List<T> result = <T>[];
 _FutureImpl<List<T>> future = new _FutureImpl<List<T>>();
 this.listen(
   // TODO(ahe): Restore type when feature is implemented in dart2js
   // checked mode. http://dartbug.com/7733
   (/*T*/ data) {
     result.add(data);
   },
   onError: future._setError,
   onDone: () {
     future._setValue(result);
   },
   cancelOnError: true);
 return future;
}

Future<Set<T>> toSet() #

Collects the data of this stream in a Set.

Future<Set<T>> toSet() {
 Set<T> result = new Set<T>();
 _FutureImpl<Set<T>> future = new _FutureImpl<Set<T>>();
 this.listen(
   // TODO(ahe): Restore type when feature is implemented in dart2js
   // checked mode. http://dartbug.com/7733
   (/*T*/ data) {
     result.add(data);
   },
   onError: future._setError,
   onDone: () {
     future._setValue(result);
   },
   cancelOnError: true);
 return future;
}
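The difference between toList and toSet shows up when the stream repeats values, as in this sketch. It assumes a current Dart SDK; summarize is a hypothetical helper. Two separate streams are created because a single-subscription stream can only be listened to once.

```dart
import 'dart:async';

/// Hypothetical helper: counts all events and distinct events.
Future<String> summarize(List<int> values) async {
  final List<int> all = await Stream.fromIterable(values).toList();
  final Set<int> unique = await Stream.fromIterable(values).toSet();
  return '${all.length} events, ${unique.length} distinct';
}

Future<void> main() async {
  print(await summarize([3, 1, 3, 2, 1])); // prints 5 events, 3 distinct
}
```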

Stream transform(StreamTransformer<T, dynamic> streamTransformer) #

Chains this stream as the input of the provided StreamTransformer.

Returns the result of calling streamTransformer.bind with this stream as the argument.

Stream transform(StreamTransformer<T, dynamic> streamTransformer) {
 return streamTransformer.bind(this);
}
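One way to obtain a transformer is StreamTransformer.fromHandlers, as in the sketch below, which filters and converts in a single step. It assumes a current Dart SDK, where that constructor is available; labelEvens is a hypothetical helper.

```dart
import 'dart:async';

/// Hypothetical helper: keeps even numbers and converts them to labels.
Future<List<String>> labelEvens(List<int> values) {
  final StreamTransformer<int, String> transformer =
      StreamTransformer<int, String>.fromHandlers(
    handleData: (int value, EventSink<String> sink) {
      // Odd values are dropped by simply not writing to the sink.
      if (value.isEven) sink.add('even $value');
    },
  );
  return Stream.fromIterable(values).transform(transformer).toList();
}

Future<void> main() async {
  print(await labelEvens([1, 2, 3, 4])); // prints [even 2, even 4]
}
```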

Stream<T> where(bool test(T event)) #

Creates a new stream from this stream that discards some data events.

The new stream sends the same error and done events as this stream, but it only sends the data events that satisfy the test.

Stream<T> where(bool test(T event)) {
 return new _WhereStream<T>(this, test);
}
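A minimal sketch of filtering with where, assuming a current Dart SDK; evensOnly is a hypothetical helper.

```dart
import 'dart:async';

/// Hypothetical helper: keeps only the even values, in their original order.
Future<List<int>> evensOnly(List<int> values) =>
    Stream.fromIterable(values).where((int v) => v.isEven).toList();

Future<void> main() async {
  print(await evensOnly([1, 2, 3, 4, 6])); // prints [2, 4, 6]
}
```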