asyncExpand<E> method

Stream<E> asyncExpand <E>(Stream<E> convert(T event))

Transforms each element into a sequence of asynchronous events.

Returns a new stream and for each event of this stream, do the following:

  • If the event is an error event or a done event, it is emitted directly by the returned stream.
  • Otherwise it is an element. Then the convert function is called with the element as argument to produce a convert-stream for the element.
  • If that call throws, the error is emitted on the returned stream.
  • If the call returns null, no further action is taken for the elements.
  • Otherwise, this stream is paused and convert-stream is listened to. Every data and error event of the convert-stream is emitted on the returned stream in the order it is produced. When the convert-stream ends, this stream is resumed.

The returned stream is a broadcast stream if this stream is.

Implementation

Stream<E> asyncExpand<E>(Stream<E> convert(T event)) {
  StreamController<E> controller;
  StreamSubscription<T> subscription;
  void onListen() {
    assert(controller is _StreamController ||
        controller is _BroadcastStreamController);
    final _EventSink<E> eventSink = controller as Object;
    subscription = this.listen((T event) {
      Stream<E> newStream;
      try {
        newStream = convert(event);
      } catch (e, s) {
        controller.addError(e, s);
        return;
      }
      if (newStream != null) {
        subscription.pause();
        controller.addStream(newStream).whenComplete(subscription.resume);
      }
    },
        onError: eventSink._addError, // Avoid Zone error replacement.
        onDone: controller.close);
  }

  if (this.isBroadcast) {
    controller = new StreamController<E>.broadcast(
        onListen: onListen,
        onCancel: () {
          subscription.cancel();
        },
        sync: true);
  } else {
    controller = new StreamController<E>(
        onListen: onListen,
        onPause: () {
          subscription.pause();
        },
        onResume: () {
          subscription.resume();
        },
        onCancel: () => subscription.cancel(),
        sync: true);
  }
  return controller.stream;
}