How to Combine Streams in Flutter with Maximum Control using Pure, Vanilla Dart

Abu HurayrahAbu Hurayrah
6 min read

When building reactive Flutter applications, you often need to combine multiple streams to create a unified data flow. While packages like RxDart make this easier with combineLatest, sometimes you need maximum control or prefer to stick with vanilla Dart. I mean, why download another package for something so simple, right? This article shows you how to combine streams using pure Dart with proper resource management.

Bonus Productivity Tip for Developers

Just a quick heads-up, if you're a developer looking to work faster and to boost your productivity, check out our tool VoiceHype—a powerful SaaS product built specifically for developers who want to speak instead of type. With VoiceHype, you can not only generate accurate transcriptions by voice, but also optimize with advanced LLMs like Claude. It supports multiple smart modes tailored to different tasks and use-cases. Alhamdulillah, it's available as a VS Code extension—just search for "VoiceHype" on the marketplace and give it a try. It's made with developers in mind, and we hope you'll find it truly useful, InshaAllah.

Checkout https://voicehype.ai.

Understanding the Problem

Imagine you're building a prayer tracking app and need to combine three different streams:

  • Prayer count stream (emits integers)

  • Best streak stream (emits Streak objects)

  • Current streak stream (emits Streak objects)

You want to create a single stream that emits QuickStats objects whenever any of these source streams emit new values.

The Step-by-Step Approach

Step 1: Set Up Your Data Structure

First, define what you'll be emitting:

class QuickStats {
  final int totalPrayers;
  final int bestStreak;
  final int currentMood;
  final int progressToGoal;

  QuickStats({
    required this.totalPrayers,
    required this.bestStreak,
    required this.currentMood,
    required this.progressToGoal,
  });
}

Step 2: Create State Variables

You need to track the latest value from each source stream:

int? latestPrayerCount;
Streak? latestBestStreak;
Streak? latestCurrentStreak;

Step 3: Create a StreamController

This will manage your combined output stream:

final controller = StreamController<QuickStats>();

Step 4: Create the Emission Logic

Create a function that builds and emits your combined data:

void emitStats() {
  if (latestPrayerCount != null) {
    controller.add(QuickStats(
      totalPrayers: latestPrayerCount!,
      bestStreak: latestBestStreak?.count ?? 0,
      currentMood: latestCurrentStreak?.moodIntensity ?? 5,
      progressToGoal: latestCurrentStreak != null
          ? (latestCurrentStreak!.goal > 0
              ? (latestCurrentStreak!.count / latestCurrentStreak!.goal * 100).round()
              : 0)
          : 0,
    ));
  }
}

Step 5: Subscribe to Source Streams

Listen to each source stream and update your state:

final subscriptions = <StreamSubscription>[];

subscriptions.add(prayerCountStream.listen((count) {
  latestPrayerCount = count;
  emitStats();
}));

subscriptions.add(bestStreakStream.listen((streak) {
  latestBestStreak = streak;
  emitStats();
}));

subscriptions.add(currentStreakStream.listen((streak) {
  latestCurrentStreak = streak;
  emitStats();
}));

Step 6: Handle Cleanup

Always clean up your resources:

// Later, when you're done:
controller.close();
for (final subscription in subscriptions) {
  subscription.cancel();
}

Complete Examples

Example 1: Regular Function

import 'dart:async';

Stream<QuickStats> combineStreams(
  Stream<int> prayerCountStream,
  Stream<Streak?> bestStreakStream,
  Stream<Streak?> currentStreakStream,
) async* {
  // State variables to track latest values
  int? latestPrayerCount;
  Streak? latestBestStreak;
  Streak? latestCurrentStreak;

  // Controller for the combined output
  final controller = StreamController<QuickStats>();

  // Function to emit combined stats
  void emitStats() {
    if (latestPrayerCount != null) {
      controller.add(QuickStats(
        totalPrayers: latestPrayerCount!,
        bestStreak: latestBestStreak?.count ?? 0,
        currentMood: latestCurrentStreak?.moodIntensity ?? 5,
        progressToGoal: latestCurrentStreak != null
            ? (latestCurrentStreak!.goal > 0
                ? (latestCurrentStreak!.count / latestCurrentStreak!.goal * 100).round()
                : 0)
            : 0,
      ));
    }
  }

  // Subscribe to all source streams
  final subscriptions = <StreamSubscription>[];

  subscriptions.add(prayerCountStream.listen((count) {
    latestPrayerCount = count;
    emitStats();
  }));

  subscriptions.add(bestStreakStream.listen((streak) {
    latestBestStreak = streak;
    emitStats();
  }));

  subscriptions.add(currentStreakStream.listen((streak) {
    latestCurrentStreak = streak;
    emitStats();
  }));

  // Yield from the combined stream
  yield* controller.stream;

  // Note: In a real app, you'd need proper cleanup mechanism
  // This is where the Riverpod example below shines
}
import 'package:flutter_riverpod/flutter_riverpod.dart';
import 'dart:async';

// Your repository providers
final prayerRepositoryProvider = Provider<PrayerRepository>((ref) {
  return PrayerRepository();
});

final streakRepositoryProvider = Provider<StreakRepository>((ref) {
  return StreakRepository();
});

// The combined stream provider
final quickStatsProvider = StreamProvider<QuickStats>((ref) {
  final prayerRepository = ref.watch(prayerRepositoryProvider);
  final streakRepository = ref.watch(streakRepositoryProvider);

  return _watchQuickStats(ref, prayerRepository, streakRepository);
});

// The implementation function
Stream<QuickStats> _watchQuickStats(
  StreamProviderRef<QuickStats> ref,
  PrayerRepository prayerRepository,
  StreakRepository streakRepository,
) async* {
  final prayerCountStream = prayerRepository.watchCompletedPrayerCount();
  final bestStreakStream = streakRepository.watchBestStreak();
  final currentStreakStream = streakRepository.watchCurrentStreak();

  // State variables to track latest values
  int? latestPrayerCount;
  Streak? latestBestStreak;
  Streak? latestCurrentStreak;

  // Controller for the combined output
  final controller = StreamController<QuickStats>();

  // Function to emit combined stats
  void emitStats() {
    if (latestPrayerCount != null) {
      controller.add(QuickStats(
        totalPrayers: latestPrayerCount!,
        bestStreak: latestBestStreak?.count ?? 0,
        currentMood: latestCurrentStreak?.moodIntensity ?? 5,
        progressToGoal: latestCurrentStreak != null
            ? (latestCurrentStreak!.goal > 0
                ? (latestCurrentStreak!.count / latestCurrentStreak!.goal * 100).round()
                : 0)
            : 0,
      ));
    }
  }

  // Subscribe to all source streams
  final subscriptions = <StreamSubscription>[];

  subscriptions.add(prayerCountStream.listen((count) {
    latestPrayerCount = count;
    emitStats();
  }));

  subscriptions.add(bestStreakStream.listen((streak) {
    latestBestStreak = streak;
    emitStats();
  }));

  subscriptions.add(currentStreakStream.listen((streak) {
    latestCurrentStreak = streak;
    emitStats();
  }));

  // Yield from the combined stream
  yield* controller.stream;

  // Cleanup when the provider is disposed
  ref.onDispose(() {
    controller.close();
    for (final subscription in subscriptions) {
      subscription.cancel();
    }
  });
}

Using the Riverpod Provider in Your Widget

class QuickStatsWidget extends ConsumerWidget {
  @override
  Widget build(BuildContext context, WidgetRef ref) {
    final quickStatsAsync = ref.watch(quickStatsProvider);

    return quickStatsAsync.when(
      data: (stats) => Column(
        children: [
          Text('Total Prayers: ${stats.totalPrayers}'),
          Text('Best Streak: ${stats.bestStreak}'),
          Text('Current Mood: ${stats.currentMood}'),
          Text('Progress: ${stats.progressToGoal}%'),
        ],
      ),
      loading: () => CircularProgressIndicator(),
      error: (error, stack) => Text('Error: $error'),
    );
  }
}

Key Concepts to Remember

Streams vs StreamControllers vs StreamSubscriptions

  • Streams: Data sources that can be listened to multiple times. Don't need to be closed.

  • StreamControllers: Manage streams and their resources. Must be closed to prevent memory leaks.

  • StreamSubscriptions: Active listeners to streams. Must be cancelled to stop listening and free resources.

Think of it like a TV analogy:

  • Stream = TV channel (always broadcasting)

  • StreamController = TV station (needs proper shutdown)

  • StreamSubscription = Your cable subscription (needs to be cancelled)

Why This Approach Works

  1. Maximum Control: You decide exactly when and how to emit values

  2. Resource Management: Proper cleanup prevents memory leaks

  3. No Dependencies: Uses only built-in Dart/Flutter APIs

  4. Flexible Logic: Easy to add conditions, transformations, or caching

  5. Debuggable: Clear flow makes it easy to add logging or debugging

When to Use This vs RxDart

Use this vanilla approach when:

  • You want to minimize dependencies

  • You need complex emission logic

  • You want maximum control over resource management

  • You're already using Riverpod for state management

Use RxDart when:

  • You need simple stream combination

  • You're doing lots of stream operations

  • You want more functional reactive programming features

Conclusion

Combining streams with vanilla Dart gives you complete control over your reactive data flow. While it requires more boilerplate than RxDart, it's incredibly powerful and helps you understand exactly what's happening under the hood. The Riverpod integration makes resource management automatic and clean.

This pattern works great for complex applications where you need precise control over when and how your combined data is emitted. MashaAllah, you now have the tools to build robust reactive Flutter applications!

0
Subscribe to my newsletter

Read articles from Abu Hurayrah directly inside your inbox. Subscribe to the newsletter, and don't miss out.

Written by

Abu Hurayrah
Abu Hurayrah