How to Combine Streams in Flutter with Maximum Control using Pure, Vanilla Dart


When building reactive Flutter applications, you often need to combine multiple streams to create a unified data flow. While packages like RxDart make this easier with combineLatest, sometimes you need maximum control or prefer to stick with vanilla Dart. I mean, why download another package for something so simple, right? This article shows you how to combine streams using pure Dart with proper resource management.
Understanding the Problem
Imagine you're building a prayer tracking app and need to combine three different streams:
Prayer count stream (emits integers)
Best streak stream (emits Streak objects)
Current streak stream (emits Streak objects)
You want to create a single stream that emits QuickStats objects whenever any of these source streams emit new values.
The Step-by-Step Approach
Step 1: Set Up Your Data Structure
First, define what you'll be emitting:
class QuickStats {
  final int totalPrayers;
  final int bestStreak;
  final int currentMood;
  final int progressToGoal;

  QuickStats({
    required this.totalPrayers,
    required this.bestStreak,
    required this.currentMood,
    required this.progressToGoal,
  });
}
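The code in this article reads count, goal, and moodIntensity from a Streak object, but the class itself is never shown. Here is a minimal sketch, assuming those three integer fields are all it needs. The field names come from how Streak is used later on; the types and the constructor are assumptions, and your real model may look different.

```dart
// Hypothetical model: field names are taken from the article's usage of
// Streak; the types and constructor shape are assumptions.
class Streak {
  final int count; // current length of the streak
  final int goal; // target length, 0 when no goal is set
  final int moodIntensity; // assumed to be a small integer score

  const Streak({
    required this.count,
    required this.goal,
    required this.moodIntensity,
  });
}
```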
Step 2: Create State Variables
You need to track the latest value from each source stream:
int? latestPrayerCount;
Streak? latestBestStreak;
Streak? latestCurrentStreak;
Step 3: Create a StreamController
This will manage your combined output stream:
final controller = StreamController<QuickStats>();
Step 4: Create the Emission Logic
Create a function that builds and emits your combined data:
void emitStats() {
  if (latestPrayerCount != null) {
    controller.add(QuickStats(
      totalPrayers: latestPrayerCount!,
      bestStreak: latestBestStreak?.count ?? 0,
      currentMood: latestCurrentStreak?.moodIntensity ?? 5,
      progressToGoal: latestCurrentStreak != null
          ? (latestCurrentStreak!.goal > 0
              ? (latestCurrentStreak!.count / latestCurrentStreak!.goal * 100).round()
              : 0)
          : 0,
    ));
  }
}
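The nested ternary that computes progressToGoal is easier to read and to unit test when it lives in a small pure function. Here is one way that could look. The helper name progressPercent is hypothetical, and it keeps the same rule as the code above: a goal of zero or less yields 0.

```dart
/// Returns progress toward [goal] as a whole-number percentage.
/// A goal of zero or less yields 0, so we never divide by zero.
int progressPercent(int count, int goal) {
  if (goal <= 0) return 0;
  return (count / goal * 100).round();
}
```

With a helper like this, the emission logic could pass progressPercent(streak.count, streak.goal) whenever a current streak is available, and 0 otherwise.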
Step 5: Subscribe to Source Streams
Listen to each source stream and update your state:
final subscriptions = <StreamSubscription>[];

subscriptions.add(prayerCountStream.listen((count) {
  latestPrayerCount = count;
  emitStats();
}));

subscriptions.add(bestStreakStream.listen((streak) {
  latestBestStreak = streak;
  emitStats();
}));

subscriptions.add(currentStreakStream.listen((streak) {
  latestCurrentStreak = streak;
  emitStats();
}));
Step 6: Handle Cleanup
Always clean up your resources:
// Later, when you're done:
controller.close();
for (final subscription in subscriptions) {
  subscription.cancel();
}
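Both cancel() and close() return futures, so if you need to know when cleanup has actually finished, you can wrap it in a helper. The sketch below uses a hypothetical helper name, disposeAll. It cancels the sources first so that nothing can call add() on a closed controller, and it deliberately does not await close(), because that future only completes once a listener has received the done event.

```dart
import 'dart:async';

/// Cancels every subscription, then closes the controller.
/// The name `disposeAll` is illustrative, not part of any library.
Future<void> disposeAll(
  List<StreamSubscription<dynamic>> subscriptions,
  StreamController<dynamic> controller,
) async {
  // Wait for all cancellations so no source can add to a closed controller.
  await Future.wait(subscriptions.map((s) => s.cancel()));
  // Not awaited: close() may never complete if nobody listens to the
  // controller's stream.
  unawaited(controller.close());
}
```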
Complete Examples
Example 1: Regular Function
import 'dart:async';

Stream<QuickStats> combineStreams(
  Stream<int> prayerCountStream,
  Stream<Streak?> bestStreakStream,
  Stream<Streak?> currentStreakStream,
) async* {
  // State variables to track latest values
  int? latestPrayerCount;
  Streak? latestBestStreak;
  Streak? latestCurrentStreak;

  // Controller for the combined output
  final controller = StreamController<QuickStats>();

  // Function to emit combined stats
  void emitStats() {
    if (latestPrayerCount != null) {
      controller.add(QuickStats(
        totalPrayers: latestPrayerCount!,
        bestStreak: latestBestStreak?.count ?? 0,
        currentMood: latestCurrentStreak?.moodIntensity ?? 5,
        progressToGoal: latestCurrentStreak != null
            ? (latestCurrentStreak!.goal > 0
                ? (latestCurrentStreak!.count / latestCurrentStreak!.goal * 100).round()
                : 0)
            : 0,
      ));
    }
  }

  // Subscribe to all source streams
  final subscriptions = <StreamSubscription>[];
  subscriptions.add(prayerCountStream.listen((count) {
    latestPrayerCount = count;
    emitStats();
  }));
  subscriptions.add(bestStreakStream.listen((streak) {
    latestBestStreak = streak;
    emitStats();
  }));
  subscriptions.add(currentStreakStream.listen((streak) {
    latestCurrentStreak = streak;
    emitStats();
  }));

  try {
    // Yield from the combined stream
    yield* controller.stream;
  } finally {
    // When the listener cancels, yield* behaves like a return and this
    // finally block runs, so the sources and the controller are released.
    for (final subscription in subscriptions) {
      subscription.cancel();
    }
    controller.close();
  }
}
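If you would rather not use an async* generator at all, StreamController accepts onListen and onCancel callbacks, which give you a natural place to subscribe and to clean up. The following is a sketch of a reusable three-stream combiner built that way. The name combineThree and its signature are illustrative only. It follows the same rule as the example above: nothing is emitted until the first stream has produced a value, and the other two are passed as null until they emit.

```dart
import 'dart:async';

/// Combines three streams, emitting whenever any source emits once [a] has
/// produced a value. [b] and [c] are passed as null until they emit.
/// `combineThree` is a hypothetical helper, not a library API.
Stream<R> combineThree<A, B, C, R>(
  Stream<A> a,
  Stream<B> b,
  Stream<C> c,
  R Function(A a, B? b, C? c) combine,
) {
  late final StreamController<R> controller;
  final subscriptions = <StreamSubscription<dynamic>>[];

  A? latestA;
  var hasA = false; // tracked separately so A itself may be nullable
  B? latestB;
  C? latestC;

  void emit() {
    if (hasA) controller.add(combine(latestA as A, latestB, latestC));
  }

  controller = StreamController<R>(
    // Subscribe lazily, only once someone listens to the combined stream.
    onListen: () {
      subscriptions.addAll([
        a.listen((value) {
          latestA = value;
          hasA = true;
          emit();
        }, onError: controller.addError),
        b.listen((value) {
          latestB = value;
          emit();
        }, onError: controller.addError),
        c.listen((value) {
          latestC = value;
          emit();
        }, onError: controller.addError),
      ]);
    },
    // Runs when the listener cancels, so the sources are always released.
    onCancel: () => Future.wait(subscriptions.map((s) => s.cancel())),
  );

  return controller.stream;
}
```

With the article's types, the call would pass prayerCountStream, bestStreakStream, and currentStreakStream, plus a callback that builds a QuickStats from the three latest values. Note that this sketch forwards source errors to the combined stream but does not handle pause/resume or close the output when the sources finish; add those if your app needs them.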
Example 2: With Riverpod Provider (Recommended)
import 'dart:async';
import 'package:flutter_riverpod/flutter_riverpod.dart';

// Your repository providers
final prayerRepositoryProvider = Provider<PrayerRepository>((ref) {
  return PrayerRepository();
});

final streakRepositoryProvider = Provider<StreakRepository>((ref) {
  return StreakRepository();
});

// The combined stream provider
final quickStatsProvider = StreamProvider<QuickStats>((ref) {
  final prayerRepository = ref.watch(prayerRepositoryProvider);
  final streakRepository = ref.watch(streakRepositoryProvider);
  return _watchQuickStats(ref, prayerRepository, streakRepository);
});

// The implementation function
Stream<QuickStats> _watchQuickStats(
  // Plain Ref is enough here; typed subclasses such as StreamProviderRef
  // are deprecated in newer Riverpod releases.
  Ref ref,
  PrayerRepository prayerRepository,
  StreakRepository streakRepository,
) async* {
  final prayerCountStream = prayerRepository.watchCompletedPrayerCount();
  final bestStreakStream = streakRepository.watchBestStreak();
  final currentStreakStream = streakRepository.watchCurrentStreak();

  // State variables to track latest values
  int? latestPrayerCount;
  Streak? latestBestStreak;
  Streak? latestCurrentStreak;

  // Controller for the combined output
  final controller = StreamController<QuickStats>();

  // Function to emit combined stats
  void emitStats() {
    if (latestPrayerCount != null) {
      controller.add(QuickStats(
        totalPrayers: latestPrayerCount!,
        bestStreak: latestBestStreak?.count ?? 0,
        currentMood: latestCurrentStreak?.moodIntensity ?? 5,
        progressToGoal: latestCurrentStreak != null
            ? (latestCurrentStreak!.goal > 0
                ? (latestCurrentStreak!.count / latestCurrentStreak!.goal * 100).round()
                : 0)
            : 0,
      ));
    }
  }

  // Subscribe to all source streams
  final subscriptions = <StreamSubscription>[];
  subscriptions.add(prayerCountStream.listen((count) {
    latestPrayerCount = count;
    emitStats();
  }));
  subscriptions.add(bestStreakStream.listen((streak) {
    latestBestStreak = streak;
    emitStats();
  }));
  subscriptions.add(currentStreakStream.listen((streak) {
    latestCurrentStreak = streak;
    emitStats();
  }));

  // Register cleanup BEFORE yielding. Code placed after `yield*` would not
  // run until the controller's stream completes, which never happens here.
  ref.onDispose(() {
    for (final subscription in subscriptions) {
      subscription.cancel();
    }
    controller.close();
  });

  // Yield from the combined stream
  yield* controller.stream;
}
Using the Riverpod Provider in Your Widget
import 'package:flutter/material.dart';
import 'package:flutter_riverpod/flutter_riverpod.dart';

class QuickStatsWidget extends ConsumerWidget {
  const QuickStatsWidget({super.key});

  @override
  Widget build(BuildContext context, WidgetRef ref) {
    final quickStatsAsync = ref.watch(quickStatsProvider);
    return quickStatsAsync.when(
      data: (stats) => Column(
        children: [
          Text('Total Prayers: ${stats.totalPrayers}'),
          Text('Best Streak: ${stats.bestStreak}'),
          Text('Current Mood: ${stats.currentMood}'),
          Text('Progress: ${stats.progressToGoal}%'),
        ],
      ),
      loading: () => const CircularProgressIndicator(),
      error: (error, stack) => Text('Error: $error'),
    );
  }
}
Key Concepts to Remember
Streams vs StreamControllers vs StreamSubscriptions
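Streams: Data sources that you listen to. A single-subscription stream (the default for StreamController) can be listened to only once, while a broadcast stream allows multiple listeners. You don't close a stream yourself; you close the controller behind it.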
StreamControllers: Manage streams and their resources. Must be closed to prevent memory leaks.
StreamSubscriptions: Active listeners to streams. Must be cancelled to stop listening and free resources.
Think of it like a TV analogy:
Stream = TV channel (always broadcasting)
StreamController = TV station (needs proper shutdown)
StreamSubscription = Your cable subscription (needs to be cancelled)
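Whether a stream can be listened to more than once depends on its kind: a single-subscription stream throws a StateError on the second listen, while a broadcast stream accepts it. The small check below demonstrates the difference. The function name secondListenThrows is made up for this illustration.

```dart
import 'dart:async';

/// Returns true if listening to [stream] a second time throws a StateError,
/// which is what single-subscription streams do.
bool secondListenThrows(Stream<int> stream) {
  final first = stream.listen((_) {});
  try {
    final second = stream.listen((_) {});
    second.cancel();
    return false;
  } on StateError {
    return true;
  } finally {
    first.cancel();
  }
}
```

Passing StreamController<int>().stream returns true, while StreamController<int>.broadcast().stream returns false. This matters for the combined stream in this article: it comes from a plain StreamController, so it supports a single listener unless you switch to the broadcast constructor.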
Why This Approach Works
Maximum Control: You decide exactly when and how to emit values
Resource Management: Proper cleanup prevents memory leaks
No Dependencies: The combining logic uses only dart:async (Riverpod is optional and only handles cleanup)
Flexible Logic: Easy to add conditions, transformations, or caching
Debuggable: Clear flow makes it easy to add logging or debugging
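As one example of the flexible logic mentioned above, you might not want to rebuild the UI when a source emits but the combined numbers have not changed. Dart's built-in Stream.distinct can drop such repeats. The sketch below uses a reduced, hypothetical Stats class in place of QuickStats to keep it self-contained, and the helper name withoutRepeats is also made up.

```dart
// Hypothetical stand-in for the article's QuickStats, reduced to two fields.
class Stats {
  final int totalPrayers;
  final int bestStreak;
  const Stats(this.totalPrayers, this.bestStreak);
}

/// Drops an event when it carries the same numbers as the previous one.
Stream<Stats> withoutRepeats(Stream<Stats> source) {
  return source.distinct(
    (previous, next) =>
        previous.totalPrayers == next.totalPrayers &&
        previous.bestStreak == next.bestStreak,
  );
}
```

The same idea applies to the real QuickStats: compare all four fields in the callback, or override == and hashCode on the class and call distinct() with no arguments.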
When to Use This vs RxDart
Use this vanilla approach when:
You want to minimize dependencies
You need complex emission logic
You want maximum control over resource management
You're already using Riverpod for state management
Use RxDart when:
You need simple stream combination
You're doing lots of stream operations
You want more functional reactive programming features
Conclusion
Combining streams with vanilla Dart gives you complete control over your reactive data flow. It requires more boilerplate than RxDart, but you decide exactly when a combined value is emitted, and you can see what's happening under the hood. Pairing it with Riverpod ties cleanup to the provider's lifecycle, so the subscriptions and the controller are released when the provider is disposed.
This pattern works great for complex applications where you need precise control over when and how your combined data is emitted. MashaAllah, you now have the tools to build robust reactive Flutter applications!
Written by Abu Hurayrah
