How to Wait for First Element of Stream in Flutter


When building Flutter applications with reactive state management, you'll often encounter scenarios where you need to initialize your app state from a stream-based data source. This is particularly common when using reactive databases like ObjectBox, Supabase, etc. that provide stream-based APIs for real-time data updates.
The Problem: Stream Initialization vs. Continuous Updates
Consider this common scenario: You have a user profile stored in a local database, and you want to:
Initialize your app by waiting for the user profile to load
Reactively update the UI whenever the profile changes
The challenge arises because streams are designed to emit multiple values over time, but during initialization, you specifically need to wait for just the first emission before proceeding.
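The mismatch can be seen in a small plain-Dart sketch. The `watchProfileName` function below is a hypothetical stand-in for a database watch stream, not an API from ObjectBox or Supabase. `Stream.first` gives you the initial value, but it cancels its subscription once that value arrives, so later emissions are never observed:

```dart
import 'dart:async';

/// Hypothetical stand-in for a database watch stream: emits an initial
/// value and then one later update.
Stream<String> watchProfileName() async* {
  yield 'initial';
  await Future<void>.delayed(const Duration(milliseconds: 10));
  yield 'updated';
}

/// Waits for the first emission only. `Stream.first` stops listening as
/// soon as one event arrives, so the 'updated' event is never seen here.
Future<String> loadInitialName() => watchProfileName().first;
```

This is enough for initialization on its own, but it leaves nothing listening for the changes that should keep the UI up to date.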
The Solution: Single Stream with Completer Pattern
Here's an efficient solution that uses a single stream with a Completer to handle both initialization and ongoing updates:
import 'dart:async';

import 'package:escape/models/user_profile_model.dart' as user_profile;
import 'package:riverpod_annotation/riverpod_annotation.dart';

import '../repositories/user_profile_repository.dart';

part 'user_profile_provider.g.dart';

/// A provider that handles user profile operations.
/// This provider has keepAlive: true since there's only one user profile.
@Riverpod(keepAlive: true)
class UserProfile extends _$UserProfile {
  @override
  Future<user_profile.UserProfile?> build() async {
    final repository = ref.read(userProfileRepositoryProvider.notifier);

    // Create stream only once
    final stream = repository.watchUserProfile();

    // Use a Completer to handle the first event (value or error)
    final completer = Completer<user_profile.UserProfile?>();

    final subscription = stream.listen(
      (profile) {
        if (!completer.isCompleted) {
          // Complete initialization with first value
          completer.complete(profile);
        } else {
          // Handle subsequent updates
          state = AsyncValue.data(profile);
        }
      },
      onError: (Object error, StackTrace stackTrace) {
        // Checking isCompleted instead of a separate flag ensures the
        // Completer is never completed twice when the first event is an error
        if (!completer.isCompleted) {
          completer.completeError(error, stackTrace);
        } else {
          state = AsyncValue.error(error, stackTrace);
        }
      },
    );

    ref.onDispose(() {
      subscription.cancel();
    });

    // Wait for first emission
    return completer.future;
  }

  /// Save the user profile
  Future<int> saveProfile(user_profile.UserProfile profile) async {
    return ref
        .read(userProfileRepositoryProvider.notifier)
        .saveUserProfile(profile);
  }

  /// Delete the user profile
  Future<bool> deleteProfile() async {
    return ref
        .read(userProfileRepositoryProvider.notifier)
        .deleteUserProfile();
  }
}
Why This Pattern Works
1. Single Stream Creation
Unlike the "first + listen" approach covered in the update later in this article, which called watchUserProfile() twice, this pattern creates the stream only once, avoiding duplicate database queries.
2. Completer for Initialization
The Completer<UserProfile?> allows us to:
Wait for the first stream emission during initialization
Complete the provider's build() method only after receiving initial data
Handle both success and error cases during initialization
3. State Management for Updates
After initialization, subsequent stream emissions directly update the provider's state using AsyncValue.data(), maintaining reactivity throughout the app's lifecycle.
4. Proper Error Handling
The pattern handles errors differently based on timing:
During initialization: Errors complete the completer with an error
After initialization: Errors update the provider state with AsyncValue.error()
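The same two-phase logic can be sketched without Riverpod, which makes it easier to try out in isolation. `FirstThenUpdates` below is a hypothetical helper written for this illustration; it is not part of Riverpod, ObjectBox, or the code above. It also assumes that a stream closing before its first event should count as an initialization error, which the provider above does not handle:

```dart
import 'dart:async';

/// Hypothetical helper: one subscription, first event completes [first],
/// every later event goes to the callbacks.
class FirstThenUpdates<T> {
  FirstThenUpdates(
    Stream<T> stream, {
    required void Function(T value) onUpdate,
    void Function(Object error, StackTrace stackTrace)? onLaterError,
  }) {
    _subscription = stream.listen(
      (value) {
        if (!_completer.isCompleted) {
          _completer.complete(value); // initialization phase
        } else {
          onUpdate(value); // update phase
        }
      },
      onError: (Object error, StackTrace stackTrace) {
        if (!_completer.isCompleted) {
          _completer.completeError(error, stackTrace);
        } else {
          onLaterError?.call(error, stackTrace);
        }
      },
      onDone: () {
        // Assumption: closing before any event is treated as a failure.
        if (!_completer.isCompleted) {
          _completer.completeError(
            StateError('Stream closed before its first event'),
          );
        }
      },
    );
  }

  final Completer<T> _completer = Completer<T>();
  late final StreamSubscription<T> _subscription;

  /// Completes with the first value or the first error.
  Future<T> get first => _completer.future;

  /// Stops listening; the equivalent of the ref.onDispose() callback.
  Future<void> cancel() => _subscription.cancel();
}
```

In a provider, `onUpdate` would assign `state = AsyncValue.data(value)` and `cancel()` would be called from `ref.onDispose()`.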
Repository Implementation
Here's how the repository layer supports this pattern:
@Riverpod(keepAlive: true)
class UserProfileRepository extends _$UserProfileRepository {
  late Box<UserProfile> _userProfileBox;

  @override
  FutureOr<void> build() async {
    _userProfileBox = ref
        .read(objectboxProvider)
        .requireValue
        .store
        .box<UserProfile>();
  }

  // Watch for changes to the user profile
  Stream<UserProfile?> watchUserProfile() {
    return _userProfileBox
        .query(UserProfile_.id.equals(1))
        .watch(triggerImmediately: true)
        .asyncMap((query) async {
          final result = await query.findFirstAsync();
          return result;
        });
  }

  // Save user profile
  Future<int> saveUserProfile(UserProfile profile) async {
    final profileWithId = profile.copyWith(id: 1);
    final id = _userProfileBox.put(profileWithId);
    return id;
  }

  // Delete user profile
  Future<bool> deleteUserProfile() async {
    try {
      // query() returns a QueryBuilder; build() creates the Query to run
      final query = _userProfileBox.query(UserProfile_.id.equals(1)).build();
      final count = query.remove();
      query.close();
      return count > 0;
    } catch (e) {
      return false;
    }
  }
}
App Startup Integration
This pattern works seamlessly in app startup scenarios:
@Riverpod(keepAlive: true)
class AppStartup extends _$AppStartup {
  @override
  Future<bool> build() async {
    return await _initializeApp();
  }

  Future<bool> _initializeApp() async {
    try {
      // Initialize ObjectBox
      await ref.read(objectboxProvider.future);

      // Initialize user profile (waits for first stream element)
      await ref.read(userProfileProvider.future);

      // Initialize other providers...
      await ref.read(themeModeNotifierProvider.future);

      return true;
    } catch (e, st) {
      debugPrint('App startup error: $e');
      debugPrintStack(stackTrace: st);
      rethrow;
    }
  }
}
Key Benefits
1. Efficiency
Single stream creation: No duplicate database queries
Minimal resource usage: One subscription handles both initialization and updates
Clean state management: Direct state updates after initialization
2. Deterministic Initialization
Your app won't proceed until the essential data is loaded, preventing race conditions and null state issues.
3. Automatic Reactivity
Once initialized, the UI automatically updates when the underlying data changes, without any additional code.
4. Robust Error Handling
Both initialization and runtime errors are handled appropriately, with different strategies for each phase.
5. Clean Resource Management
The ref.onDispose() callback ensures that stream subscriptions are properly cleaned up when the provider is disposed.
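As a rough illustration of what that cleanup achieves, the plain-Dart sketch below uses a `StreamController` as a hypothetical stand-in for the database stream. Its `onCancel` callback fires when the listener cancels, which is the point where a real data source would typically be able to release its resources:

```dart
import 'dart:async';

/// Returns true if cancelling the subscription notified the source.
Future<bool> cancelReleasesSource() async {
  var released = false;

  // Hypothetical source: onCancel runs when the last listener cancels.
  final controller = StreamController<int>(
    onCancel: () {
      released = true;
    },
  );

  final subscription = controller.stream.listen((_) {});

  // This is the call that the ref.onDispose() callback performs.
  await subscription.cancel();

  return released;
}
```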
Best Practices
Always Clean Up: Use ref.onDispose() to cancel stream subscriptions
Handle Errors Properly: Separate error handling for initialization vs. runtime updates
Keep Alive: Use keepAlive: true for providers that should persist throughout the app lifecycle
Use Completer Pattern: For scenarios where you need to wait for the first stream emission
Single Stream Creation: Avoid creating multiple streams for the same data source
Alternative Approaches
While the Completer pattern shown above is efficient, here are other valid approaches:
StreamProvider Approach
@Riverpod(keepAlive: true)
Stream<UserProfile?> userProfileStream(UserProfileStreamRef ref) {
  final repository = ref.read(userProfileRepositoryProvider.notifier);
  return repository.watchUserProfile();
}

@Riverpod(keepAlive: true)
Future<UserProfile?> userProfile(UserProfileRef ref) async {
  // Watching the stream provider itself yields an AsyncValue, not a Stream,
  // so wait on its .future, which completes once a value is available
  return ref.watch(userProfileStreamProvider.future);
}
This approach separates concerns clearly but may be overkill for simple use cases.
UPDATE: The Stream Pattern Mistake: Learning from Inefficient Code
The Inefficient Approach: Creating Multiple Streams
When I first tackled the problem of waiting for stream initialization while maintaining reactive updates, I attempted a "First + Listen" approach that seemed logical but had critical flaws:
// ❌ INEFFICIENT - Don't do this!
@Riverpod(keepAlive: true)
class UserProfile extends _$UserProfile {
  @override
  Future<user_profile.UserProfile?> build() async {
    final repository = ref.read(userProfileRepositoryProvider.notifier);

    // Step 1: Get the stream
    Stream<user_profile.UserProfile?> stream = repository.watchUserProfile();

    // Step 2: Wait for the first element (initialization)
    // stream.first cancels its subscription once the first event arrives
    user_profile.UserProfile? profile = await stream.first;

    // Step 3: Create a SECOND stream for ongoing updates
    stream = repository.watchUserProfile(); // ❌ This creates another query!

    // Step 4: Listen to future changes
    StreamSubscription listener = stream.listen((profile) {
      state = AsyncValue.data(profile);
    });

    // Step 5: Clean up on dispose
    ref.onDispose(() {
      listener.cancel();
    });

    return profile;
  }
}
Why This Approach is Problematic
1. Double Stream Creation
The most critical issue is creating the stream twice:
First call: repository.watchUserProfile() for initialization
Second call: repository.watchUserProfile() for ongoing updates
Each call typically results in a new database query or subscription, doubling the resource usage.
2. Resource Waste
Two separate database subscriptions to the same data
Unnecessary memory allocation for duplicate streams
Potential network requests if dealing with remote data sources
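The difference can be made concrete with a small plain-Dart experiment. Everything here is an assumption for illustration: `watchValue` is a hypothetical stand-in for `repository.watchUserProfile()`, and `subscriptionCount` simply counts how often the underlying source is listened to:

```dart
import 'dart:async';

/// Counts how many times the hypothetical data source is subscribed to.
int subscriptionCount = 0;

/// Hypothetical stand-in for repository.watchUserProfile(): every call
/// returns a fresh stream, and every listener starts a new "query".
Stream<int> watchValue() {
  late StreamController<int> controller;
  controller = StreamController<int>(
    onListen: () {
      subscriptionCount++;
      controller.add(42); // mimics triggerImmediately: true
    },
  );
  return controller.stream;
}

/// "First + listen": two calls to the source, two subscriptions.
Future<int> firstPlusListen() async {
  subscriptionCount = 0;
  await watchValue().first;
  final subscription = watchValue().listen((_) {});
  await Future<void>.delayed(Duration.zero);
  await subscription.cancel();
  return subscriptionCount;
}

/// Single subscription: the first event completes a Completer.
Future<int> singleSubscription() async {
  subscriptionCount = 0;
  final completer = Completer<int>();
  final subscription = watchValue().listen((value) {
    if (!completer.isCompleted) completer.complete(value);
  });
  await completer.future;
  await subscription.cancel();
  return subscriptionCount;
}
```

With this stand-in, `firstPlusListen()` returns 2 and `singleSubscription()` returns 1. How expensive the extra subscription is in practice depends on the real data source.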
The Better Solution
Instead of creating two streams, the efficient approach uses a single stream with a Completer:
// ✅ EFFICIENT - Single stream approach
@override
Future<user_profile.UserProfile?> build() async {
  final repository = ref.read(userProfileRepositoryProvider.notifier);

  // Create stream only once
  final stream = repository.watchUserProfile();

  // Use a Completer to handle the first event (value or error)
  final completer = Completer<user_profile.UserProfile?>();

  final subscription = stream.listen(
    (profile) {
      if (!completer.isCompleted) {
        completer.complete(profile);
      } else {
        state = AsyncValue.data(profile);
      }
    },
    onError: (Object error, StackTrace stackTrace) {
      // isCompleted guards against completing the Completer twice
      if (!completer.isCompleted) {
        completer.completeError(error, stackTrace);
      } else {
        state = AsyncValue.error(error, stackTrace);
      }
    },
  );

  ref.onDispose(() => subscription.cancel());

  return completer.future;
}
This approach eliminates the problems of the inefficient version while maintaining the same functionality.
Conclusion
The single stream with Completer pattern provides an efficient solution for scenarios where you need both initialization guarantees and reactive updates from stream-based data sources. It eliminates the inefficiency of creating multiple streams while maintaining all the benefits of reactive programming.
This pattern is particularly powerful in Flutter applications using reactive databases and state management solutions like Riverpod, ensuring your app initializes predictably while maintaining the real-time updates that make modern Flutter applications feel responsive and alive.
Whether you're working with ObjectBox, Supabase, or any other stream-based data source, this approach will help you build more robust, efficient, and maintainable Flutter applications.
Written by Abu Hurayrah
