How to Wait for First Element of Stream in Flutter

Abu Hurayrah

When building Flutter applications with reactive state management, you'll often need to initialize app state from a stream-based data source. This is particularly common with reactive data layers such as ObjectBox or Supabase, which expose stream-based APIs for real-time data updates.

The Problem: Stream Initialization vs. Continuous Updates

Consider this common scenario: You have a user profile stored in a local database, and you want to:

  1. Initialize your app by waiting for the user profile to load

  2. Reactively update the UI whenever the profile changes

The challenge arises because streams are designed to emit multiple values over time, but during initialization, you specifically need to wait for just the first emission before proceeding.
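To make the tension concrete, here is a minimal sketch in plain Dart (no Riverpod or database involved). The function name stillListeningAfterFirst is hypothetical and only for illustration. It shows that Stream.first delivers one value and then cancels its own subscription, so anything emitted afterwards is never observed by that call.

```dart
import 'dart:async';

/// Awaits the first value of the controller's stream, then reports whether
/// the stream still has a listener. Hypothetical helper for illustration.
Future<bool> stillListeningAfterFirst(StreamController<int> controller) async {
  // `first` subscribes immediately and cancels itself after one event.
  final firstFuture = controller.stream.first;
  controller.add(1);
  await firstFuture;
  return controller.hasListener;
}

Future<void> main() async {
  final controller = StreamController<int>();
  final listening = await stillListeningAfterFirst(controller);
  print('still listening after first: $listening');

  // Nothing is subscribed any more, so this later event is simply dropped.
  controller.add(2);
}
```

This is why awaiting the first element alone is not enough: something still has to stay subscribed for the later updates.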

The Solution: Single Stream with Completer Pattern

Here's an efficient solution that uses a single stream with a Completer to handle both initialization and ongoing updates:

import 'dart:async';
import 'package:escape/models/user_profile_model.dart' as user_profile;
import 'package:riverpod_annotation/riverpod_annotation.dart';
import '../repositories/user_profile_repository.dart';

part 'user_profile_provider.g.dart';

/// A provider that handles user profile operations
/// This provider has keepAlive: true since there's only one user profile
@Riverpod(keepAlive: true)
class UserProfile extends _$UserProfile {
  @override
  Future<user_profile.UserProfile?> build() async {
    final repository = ref.read(userProfileRepositoryProvider.notifier);

    // Create stream only once
    final stream = repository.watchUserProfile();

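    // Use a Completer to handle the first value
    final completer = Completer<user_profile.UserProfile?>();

    // Checking completer.isCompleted (rather than a separate flag) ensures
    // that a value arriving after an initial error does not try to complete
    // the Completer a second time, which would throw a StateError.
    final subscription = stream.listen(
      (profile) {
        if (!completer.isCompleted) {
          // Complete initialization with the first value
          completer.complete(profile);
        } else {
          // Handle subsequent updates
          state = AsyncValue.data(profile);
        }
      },
      onError: (Object error, StackTrace stackTrace) {
        if (!completer.isCompleted) {
          // An error before any value fails initialization
          completer.completeError(error, stackTrace);
        } else {
          state = AsyncValue.error(error, stackTrace);
        }
      },
    );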

    ref.onDispose(() {
      subscription.cancel();
    });

    // Wait for first emission
    return await completer.future;
  }

  /// Save the user profile
  Future<int> saveProfile(user_profile.UserProfile profile) async {
    return await ref
        .read(userProfileRepositoryProvider.notifier)
        .saveUserProfile(profile);
  }

  /// Delete the user profile
  Future<bool> deleteProfile() async {
    return await ref
        .read(userProfileRepositoryProvider.notifier)
        .deleteUserProfile();
  }
}

Why This Pattern Works

1. Single Stream Creation

Unlike the naive approach that calls repository.watchUserProfile() twice (covered in the UPDATE section below), this pattern creates the stream only once, avoiding a duplicate database query and subscription.

2. Completer for Initialization

The Completer<UserProfile?> allows us to:

  • Wait for the first stream emission during initialization

  • Complete the provider's build() method only after receiving initial data

  • Handle both success and error cases during initialization

3. State Management for Updates

After initialization, subsequent stream emissions directly update the provider's state using AsyncValue.data(), maintaining reactivity throughout the app's lifecycle.

4. Proper Error Handling

The pattern handles errors differently based on timing:

  • During initialization: Errors complete the completer with an error

  • After initialization: Errors update the provider state with AsyncValue.error()
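The two phases described above can be sketched without Riverpod, which makes them easy to run in isolation. This is only an illustration under stated assumptions: listenWithFirst, FirstAndRest, onUpdate and onUpdateError are hypothetical names, and onUpdate stands in for the state = AsyncValue.data(...) assignment in the provider. The sketch also adds an onDone handler, which the provider code in this article does not have; without it, a stream that closes before emitting anything would leave the awaiting code waiting forever.

```dart
import 'dart:async';

/// Holds the future for the first value and the still-active subscription.
class FirstAndRest<T> {
  FirstAndRest(this.first, this.subscription);
  final Future<T> first;
  final StreamSubscription<T> subscription;
}

/// Subscribes once. The first event completes [FirstAndRest.first];
/// every later event is forwarded to [onUpdate] or [onUpdateError].
FirstAndRest<T> listenWithFirst<T>(
  Stream<T> source, {
  required void Function(T value) onUpdate,
  required void Function(Object error, StackTrace stackTrace) onUpdateError,
}) {
  final completer = Completer<T>();
  final subscription = source.listen(
    (value) {
      if (!completer.isCompleted) {
        completer.complete(value); // initialization phase
      } else {
        onUpdate(value); // update phase
      }
    },
    onError: (Object error, StackTrace stackTrace) {
      if (!completer.isCompleted) {
        completer.completeError(error, stackTrace);
      } else {
        onUpdateError(error, stackTrace);
      }
    },
    onDone: () {
      // Assumption: treat "closed without any value" as a failed init.
      if (!completer.isCompleted) {
        completer.completeError(
          StateError('Stream closed before emitting a value'),
          StackTrace.current,
        );
      }
    },
  );
  return FirstAndRest<T>(completer.future, subscription);
}

Future<void> main() async {
  final controller = StreamController<String>();
  final updates = <String>[];
  final result = listenWithFirst<String>(
    controller.stream,
    onUpdate: updates.add,
    onUpdateError: (error, stackTrace) => print('update error: $error'),
  );

  controller
    ..add('initial')
    ..add('edited');

  final first = await result.first;
  // Let the event loop deliver the second event before inspecting it.
  await Future<void>.delayed(Duration.zero);
  print('first=$first updates=$updates');

  await result.subscription.cancel();
  await controller.close();
}
```

Note that the caller should start awaiting the first future right after subscribing, as the provider's build() does. A future that completes with an error while nothing is listening to it is reported as an unhandled error.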

Repository Implementation

Here's how the repository layer supports this pattern:

@Riverpod(keepAlive: true)
class UserProfileRepository extends _$UserProfileRepository {
  late Box<UserProfile> _userProfileBox;

  @override
  FutureOr<void> build() async {
    _userProfileBox = ref
        .read(objectboxProvider)
        .requireValue
        .store
        .box<UserProfile>();
  }

  // Watch for changes to the user profile
  Stream<UserProfile?> watchUserProfile() {
    return _userProfileBox
        .query(UserProfile_.id.equals(1))
        .watch(triggerImmediately: true)
        .asyncMap((query) async {
          final result = await query.findFirstAsync();
          return result;
        });
  }

  // Save user profile
  Future<int> saveUserProfile(UserProfile profile) async {
    final profileWithId = profile.copyWith(id: 1);
    final id = _userProfileBox.put(profileWithId);
    return id;
  }

  // Delete user profile
  Future<bool> deleteUserProfile() async {
    try {
      final query = _userProfileBox.query(UserProfile_.id.equals(1));
      final count = query.remove();
      return count > 0;
    } catch (e) {
      return false;
    }
  }
}

App Startup Integration

This pattern works seamlessly in app startup scenarios:

// Assumes: import 'package:flutter/foundation.dart'; (for debugPrint and debugPrintStack)
@Riverpod(keepAlive: true)
class AppStartup extends _$AppStartup {
  @override
  Future<bool> build() async {
    return await _initializeApp();
  }

  Future<bool> _initializeApp() async {
    try {
      // Initialize ObjectBox
      await ref.read(objectboxProvider.future);

      // Initialize user profile (waits for first stream element)
      await ref.read(userProfileProvider.future);

      // Initialize other providers...
      await ref.read(themeModeNotifierProvider.future);

      return true;
    } catch (e, st) {
      debugPrint('App startup error: $e');
      debugPrintStack(stackTrace: st);
      rethrow;
    }
  }
}

Key Benefits

1. Efficiency

  • Single stream creation: No duplicate database queries

  • Minimal resource usage: One subscription handles both initialization and updates

  • Clean state management: Direct state updates after initialization

2. Deterministic Initialization

Your app won't proceed until the essential data is loaded, preventing race conditions and null state issues.

3. Automatic Reactivity

Once initialized, the UI automatically updates when the underlying data changes, without any additional code.

4. Robust Error Handling

Both initialization and runtime errors are handled appropriately, with different strategies for each phase.

5. Clean Resource Management

The ref.onDispose() callback ensures that stream subscriptions are properly cleaned up when the provider is disposed.

Best Practices

  1. Always Clean Up: Use ref.onDispose() to cancel stream subscriptions

  2. Handle Errors Properly: Separate error handling for initialization vs. runtime updates

  3. Keep Alive: Use keepAlive: true for providers that should persist throughout the app lifecycle

  4. Use Completer Pattern: For scenarios where you need to wait for the first stream emission

  5. Single Stream Creation: Avoid creating multiple streams for the same data source

Alternative Approaches

While the Completer pattern shown above is efficient, here is another valid approach:

StreamProvider Approach

@Riverpod(keepAlive: true)
Stream<UserProfile?> userProfileStream(UserProfileStreamRef ref) {
  final repository = ref.read(userProfileRepositoryProvider.notifier);
  return repository.watchUserProfile();
}

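@Riverpod(keepAlive: true)
Future<UserProfile?> userProfile(UserProfileRef ref) {
  // ref.watch(userProfileStreamProvider) returns an AsyncValue, not a Stream,
  // so watch the provider's `.future` to wait for the emitted value instead.
  return ref.watch(userProfileStreamProvider.future);
}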

This approach separates concerns clearly but may be overkill for simple use cases.

UPDATE: The Stream Pattern Mistake: Learning from Inefficient Code

The Inefficient Approach: Creating Multiple Streams

When I first tackled the problem of waiting for stream initialization while maintaining reactive updates, I attempted a "First + Listen" approach that seemed logical but had critical flaws:

// ❌ INEFFICIENT - Don't do this!
@Riverpod(keepAlive: true)
class UserProfile extends _$UserProfile {
  @override
  Future<UserProfile?> build() async {
    final repository = ref.read(userProfileRepositoryProvider.notifier);

    // Step 1: Get the stream
    Stream<UserProfile?> stream = repository.watchUserProfile();

    // Step 2: Wait for the first element (initialization)
    UserProfile? profile = await stream.first;

    // Step 3: Create a SECOND stream for ongoing updates
    stream = repository.watchUserProfile(); // ❌ This creates another query!

    // Step 4: Listen to future changes
    StreamSubscription listener = stream.listen((profile) {
      state = AsyncValue.data(profile);
    });

    // Step 5: Clean up on dispose
    ref.onDispose(() {
      listener.cancel();
    });

    return profile;
  }
}

Why This Approach is Problematic

1. Double Stream Creation

The most critical issue is creating the stream twice:

  • First call: repository.watchUserProfile() for initialization

  • Second call: repository.watchUserProfile() for ongoing updates

Each call typically results in a new database query or subscription, doubling the resource usage.

2. Resource Waste

  • Two separate database subscriptions to the same data

  • Unnecessary memory allocation for duplicate streams

  • Potential network requests if dealing with remote data sources
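The cost of the second call can be observed with a small stand-in for the repository. In this sketch, FakeSource is a hypothetical class (not part of ObjectBox or the article's code) whose watch() method starts a new "query" on every subscription and immediately emits the current value, similar in spirit to watch(triggerImmediately: true). Counting the queries shows the difference between the two approaches.

```dart
import 'dart:async';

/// Hypothetical stand-in for a watchUserProfile()-style source: each
/// subscription starts a new "query" and immediately emits the current value.
class FakeSource {
  int queriesStarted = 0;
  String current = 'profile-v1';

  Stream<String> watch() {
    late final StreamController<String> controller;
    controller = StreamController<String>(
      onListen: () {
        queriesStarted++;
        controller.add(current);
      },
    );
    return controller.stream;
  }
}

/// The "First + Listen" approach: one stream for init, another for updates.
Future<int> firstThenListen(FakeSource source) async {
  await source.watch().first;
  final subscription = source.watch().listen((_) {});
  await Future<void>.delayed(Duration.zero);
  await subscription.cancel();
  return source.queriesStarted;
}

/// The single-subscription approach: one stream serves both phases.
Future<int> singleSubscription(FakeSource source) async {
  final completer = Completer<String>();
  final subscription = source.watch().listen((value) {
    if (!completer.isCompleted) completer.complete(value);
  });
  await completer.future;
  await subscription.cancel();
  return source.queriesStarted;
}

Future<void> main() async {
  print('first + listen: ${await firstThenListen(FakeSource())} queries');
  print('single stream: ${await singleSubscription(FakeSource())} queries');
}
```

With a source that emits immediately on subscription, the second stream also re-delivers the value that initialization already received, so the provider state is set once more than necessary.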

The Better Solution

Instead of creating two streams, the efficient approach uses a single stream with a Completer:

// ✅ EFFICIENT - Single stream approach
@override
Future<UserProfile?> build() async {
  final repository = ref.read(userProfileRepositoryProvider.notifier);

  // Create stream only once
  final stream = repository.watchUserProfile();

  // Use a Completer to handle the first value
  final completer = Completer<UserProfile?>();

  // completer.isCompleted tells us which phase we are in and guards against
  // completing twice when a value follows an initial error.
  final subscription = stream.listen(
    (profile) {
      if (!completer.isCompleted) {
        completer.complete(profile);
      } else {
        state = AsyncValue.data(profile);
      }
    },
    onError: (Object error, StackTrace stackTrace) {
      if (!completer.isCompleted) {
        completer.completeError(error, stackTrace);
      } else {
        state = AsyncValue.error(error, stackTrace);
      }
    },
  );

  ref.onDispose(() => subscription.cancel());
  return await completer.future;
}

This approach eliminates the problems of the inefficient version while maintaining the same functionality.

Conclusion

The single stream with Completer pattern provides an efficient solution for scenarios where you need both initialization guarantees and reactive updates from stream-based data sources. It eliminates the inefficiency of creating multiple streams while maintaining all the benefits of reactive programming.

This pattern is particularly powerful in Flutter applications using reactive databases and state management solutions like Riverpod, ensuring your app initializes predictably while maintaining the real-time updates that make modern Flutter applications feel responsive and alive.

Whether you're working with ObjectBox, Supabase, or any other stream-based data source, this approach will help you build more robust, efficient, and maintainable Flutter applications.
