Stream创建
转换现有的Stream
/// Splits a stream of consecutive strings into lines.
///
/// The input string is provided in smaller chunks through
/// the `source` stream.
Stream<String> lines(Stream<String> source) async* {
// Stores any partial line from the previous chunk.
var partial = '';
// Wait until a new chunk is available, then process it.
await for (final chunk in source) {
var lines = chunk.split('\n');
lines[0] = partial + lines[0]; // Prepend partial line.
partial = lines.removeLast(); // Remove new partial line.
for (final line in lines) {
yield line; // Add lines to output stream.
}
}
// Add final partial line to output stream, if any.
if (partial.isNotEmpty) yield partial;
}
从零开始创建Stream
Stream<int> timedCounter(Duration interval, [int? maxCount]) async* {
int i = 0;
while (true) {
await Future.delayed(interval);
yield i++;
if (i == maxCount) break;
}
}
// NOTE: This implementation is FLAWED!
// It starts before it has subscribers, and it doesn't implement pause.
Stream<int> timedCounterController(Duration interval, [int? maxCount]) {
int counter = 0;
Timer? timer;
late StreamController<int> controller;
void tick(Timer timer) {
counter++;
controller.add(counter);
print('yield $counter'); // Ask stream to send counter values as event.
if (maxCount != null && counter >= maxCount) {
timer.cancel();
controller.close(); // Ask stream to shut down and tell listeners.
}
}
void startTimer() {
timer = Timer.periodic(
interval, tick); // BAD: Starts before it has subscribers.
}
void stopTimer() {
timer?.cancel();
timer = null;
}
controller = StreamController<int>(
onListen: startTimer,
onPause: stopTimer,
onResume: startTimer,
onCancel: stopTimer);
return controller.stream;
}
Stream使用
void useTimedCounterController() async {
final st = timedCounterController(const Duration(seconds: 1), 100);
await Future.delayed(const Duration(seconds: 10));
print('start listen');
late StreamSubscription<int> subscription;
subscription = st.listen((event) {
print('listen $event');
if (event == 5) {
subscription.pause(Future.delayed(const Duration(seconds: 5)));
}
});
}
Future<int> useTimedCounterBasic() async {
final st = timedCounter(const Duration(seconds: 1), 10);
var sum = 0;
try {
await for (final value in st) {
sum += value;
}
} catch (e) {
print('catch $e');
} finally {
print('sum is $sum');
}
return sum;
}
浏览量: 469