Stream<GroupedEvents<K, T>> groupBy<K>(K key(T event))

Groups events by a computed key.

A key is extracted from incoming events. The first time a key is seen, a stream is created for it, and emitted on the returned stream, along with the key, as a GroupedEvents object. Then the event is emitted on the stream (GroupedEvents.values) corresponding to the key.

An error on the source stream, or when calling the key functions, will emit the error on the returned stream.

Canceling the subscription on the returned stream will stop processing and close the streams for all groups.

Pausing the subscription on the returned stream will pause processing and no further events are added to streams for the individual groups.

Pausing or canceling an individual group stream has no effect other than on that stream. Events will be queued while the group stream is paused and until it is first listened to. If the GroupedEvents.values stream is never listened to, it will enqueue all the events unnecessarily.

Source

Stream<GroupedEvents<K, T>> groupBy<K>(K key(T event)) {
  var controller;
  controller = new StreamController<GroupedEvents<K, T>>(
      sync: true,
      onListen: () {
        var groupControllers = new HashMap<K, StreamController<T>>();

        void closeAll() {
          for (var groupController in groupControllers.values) {
            groupController.close();
          }
        }

        var subscription = this.listen(
            (data) {
              K theKey;
              try {
                theKey = key(data);
              } catch (error, stackTrace) {
                controller.addError(error, stackTrace);
                return;
              }
              var groupController = groupControllers[theKey];
              if (groupController == null) {
                groupController =
                    new StreamController<T>.broadcast(sync: true);
                groupControllers[theKey] = groupController;
                controller.add(
                    new GroupedEvents<K, T>(theKey, groupController.stream));
              }
              groupController.add(data);
            },
            onError: controller.addError,
            onDone: () {
              controller.close();
              closeAll();
            });
        controller.onPause = subscription.pause;
        controller.onResume = subscription.resume;
        controller.onCancel = () {
          subscription.cancel();
          // Don't fire sync events in response to a callback.
          scheduleMicrotask(closeAll);
        };
      });
  return controller.stream;
}