Dart Stream用法

Stream创建

转换现有的Stream

/// Splits a stream of consecutive strings into lines.
///
/// The input string is provided in smaller chunks through
/// the `source` stream.
Stream<String> lines(Stream<String> source) async* {
  // Stores any partial line from the previous chunk.
  var partial = '';
  // Wait until a new chunk is available, then process it.
  await for (final chunk in source) {
    var lines = chunk.split('\n');
    lines[0] = partial + lines[0]; // Prepend partial line.
    partial = lines.removeLast(); // Remove new partial line.
    for (final line in lines) {
      yield line; // Add lines to output stream.
    }
  }
  // Add final partial line to output stream, if any.
  if (partial.isNotEmpty) yield partial;
}

从零开始创建Stream

Stream<int> timedCounter(Duration interval, [int? maxCount]) async* {
  int i = 0;
  while (true) {
    await Future.delayed(interval);
    yield i++;
    if (i == maxCount) break;
  }
}
// NOTE: This implementation is FLAWED!
// It starts before it has subscribers, and it doesn't implement pause.
Stream<int> timedCounterController(Duration interval, [int? maxCount]) {
  int counter = 0;
  Timer? timer;
  late StreamController<int> controller;

  void tick(Timer timer) {
    counter++;
    controller.add(counter);
    print('yield $counter'); // Ask stream to send counter values as event.
    if (maxCount != null && counter >= maxCount) {
      timer.cancel();
      controller.close(); // Ask stream to shut down and tell listeners.
    }
  }

  void startTimer() {
    timer = Timer.periodic(
        interval, tick); // BAD: Starts before it has subscribers.
  }

  void stopTimer() {
    timer?.cancel();
    timer = null;
  }

  controller = StreamController<int>(
      onListen: startTimer,
      onPause: stopTimer,
      onResume: startTimer,
      onCancel: stopTimer);

  return controller.stream;
}

Stream使用

void useTimedCounterController() async {
  final st = timedCounterController(const Duration(seconds: 1), 100);
  await Future.delayed(const Duration(seconds: 10));
  print('start listen');
  late StreamSubscription<int> subscription;
  subscription = st.listen((event) {
    print('listen $event');
    if (event == 5) {
      subscription.pause(Future.delayed(const Duration(seconds: 5)));
    }
  });
}
Future<int> useTimedCounterBasic() async {
  final st = timedCounter(const Duration(seconds: 1), 10);
  var sum = 0;
  try {
    await for (final value in st) {
      sum += value;
    }
  } catch (e) {
    print('catch $e');
  } finally {
    print('sum is $sum');
  }
  return sum;
}

留下评论

您的电子邮箱地址不会被公开。 必填项已用 * 标注

Index