Mastering RxDart: Advanced Techniques for Flutter Developers

Gideon Salami

Welcome, Flutter developers! If you've been using RxDart for a while and are ready to take your skills to the next level, you're in the right place. This intermediate guide will explore more advanced concepts and techniques in RxDart, helping you write cleaner, more efficient, and more powerful Flutter applications. We'll dive deep into complex stream operations, error handling, custom operators, and real-world patterns that will elevate your reactive programming skills.

  1. Advanced Observable Patterns

1.1 Combining Streams: Often, you'll need to work with multiple streams simultaneously. RxDart provides several powerful operators for combining streams.

a) Zip: The zip operator pairs the emissions of multiple streams by position (first with first, second with second, and so on) and combines each group with a function you supply.

final Stream<int> stream1 = Stream.periodic(Duration(seconds: 1), (i) => i).take(3);
final Stream<String> stream2 = Stream.fromIterable(['A', 'B', 'C']);

Rx.zip2(stream1, stream2, (int a, String b) => '$a$b')
    .listen(print);

// Output: 0A, 1B, 2C
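
One property of zip worth seeing in isolation is that the result stops as soon as the shortest source runs out of values. The sketch below assumes a current RxDart release in which the factory is exposed as Rx.zip2; the helper name zipUnequal and the sample values are made up for illustration.

```dart
import 'package:rxdart/rxdart.dart';

/// Zips three numbers with two letters and collects everything emitted.
/// (zipUnequal is a hypothetical helper name used only for this sketch.)
Future<List<String>> zipUnequal() {
  final numbers = Stream.fromIterable([1, 2, 3]);
  final letters = Stream.fromIterable(['a', 'b']);

  // zip2 pairs values by position; 3 never finds a partner because
  // `letters` completes after two values.
  return Rx.zip2(numbers, letters, (int n, String l) => '$n$l').toList();
}

Future<void> main() async {
  print(await zipUnequal()); // [1a, 2b]
}
```

If every value from every source matters, zip is probably the wrong tool; merge or combineLatest may fit better.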

b) Merge: The merge operator combines multiple streams into a single stream that emits every value from its sources as it arrives.

final stream1 = Stream.periodic(Duration(seconds: 1), (i) => 'Stream 1: $i').take(3);
final stream2 = Stream.periodic(Duration(milliseconds: 1500), (i) => 'Stream 2: $i').take(3);

Rx.merge([stream1, stream2])
    .listen(print);

// Output (approximate timing):
// Stream 1: 0   (~1.0 s)
// Stream 2: 0   (~1.5 s)
// Stream 1: 1   (~2.0 s)
// Stream 1: 2   (~3.0 s)
// Stream 2: 1   (~3.0 s, may print before or after the line above)
// Stream 2: 2   (~4.5 s)

c) Combine Latest: Once every source stream has emitted at least one value, this operator combines the latest value from each source whenever any of them emits.

final subject1 = BehaviorSubject<int>.seeded(1);
final subject2 = BehaviorSubject<String>.seeded('A');

Rx.combineLatest2(
  subject1.stream, 
  subject2.stream, 
  (int a, String b) => '$a$b'
).listen(print);

subject1.add(2);
subject2.add('B');
subject1.add(3);

// Output: 1A, 2A, 2B, 3B
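
With unseeded sources, the "waits for every source" behavior becomes visible. The following is a minimal sketch rather than a recommended pattern: it assumes Rx.combineLatest2 from a current RxDart release, and combineDemo, settle, and the sample values are hypothetical names chosen for this illustration.

```dart
import 'dart:async';

import 'package:rxdart/rxdart.dart';

/// Feeds two controllers step by step and records what combineLatest2 emits.
/// (combineDemo is a hypothetical helper name used only for this sketch.)
Future<List<String>> combineDemo() async {
  final numbers = StreamController<int>();
  final letters = StreamController<String>();
  final seen = <String>[];

  final subscription = Rx.combineLatest2(
    numbers.stream,
    letters.stream,
    (int n, String l) => '$n$l',
  ).listen(seen.add);

  // Lets each event be delivered before the next one is sent.
  Future<void> settle() => Future<void>.delayed(Duration.zero);

  numbers.add(1); // nothing emitted yet: `letters` has no value
  await settle();
  numbers.add(2); // still nothing, 1 is simply replaced as the latest number
  await settle();
  letters.add('x'); // both sources have a value now -> "2x"
  await settle();
  numbers.add(3); // any later event re-combines -> "3x"
  await settle();

  await subscription.cancel();
  return seen;
}

Future<void> main() async {
  print(await combineDemo()); // [2x, 3x]
}
```

Note that the value 1 never appears in the output: it was overwritten before the second source produced anything.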

1.2 Transforming Streams: RxDart provides powerful operators for transforming stream data.

a) SwitchMap: SwitchMap is perfect for scenarios where you want to cancel previous operations when a new one starts, like in search functionality.

final searchTerms = PublishSubject<String>();

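// fetchSearchResults is assumed to return a Future with the results for a query.
searchTerms
    .debounceTime(Duration(milliseconds: 300))
    .switchMap((query) =>
        Stream.fromFuture(fetchSearchResults(query)))
    .listen(print);

searchTerms.add('Flutter');
searchTerms.add('Dart');
searchTerms.add('RxDart');

// Only the results for 'RxDart' are printed: debounceTime drops 'Flutter' and
// 'Dart' because a newer term arrives within 300 ms, and switchMap would cancel
// any request still in flight when a later term gets through.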

b) FlatMap: FlatMap creates a new stream for each emitted item and flattens the emissions from all of those streams into a single stream. Unlike switchMap, it never cancels the earlier inner streams.

final usersSubject = PublishSubject<String>();

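// fetchUserDetails is assumed to return a Future with the details for a user.
usersSubject
    .flatMap((user) =>
        Stream.fromFuture(fetchUserDetails(user)))
    .listen(print);

usersSubject.add('Alice');
usersSubject.add('Bob');

// This prints details for both Alice and Bob, in whichever order the two
// requests complete.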
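
The difference between the two operators is easiest to see side by side. The sketch below replaces the network calls with a made-up helper, fakeLookup, that answers after a configurable delay; the delays and names are assumptions chosen so that the first request is slower than the second.

```dart
import 'package:rxdart/rxdart.dart';

/// Simulated lookup: the answer for `id` arrives after `delayMs` milliseconds.
/// (fakeLookup is a stand-in for a real network call.)
Stream<String> fakeLookup(int id, int delayMs) => Stream.fromFuture(
      Future.delayed(Duration(milliseconds: delayMs), () => 'result $id'),
    );

/// flatMap keeps every inner stream alive, so results arrive in completion order.
Future<List<String>> withFlatMap() => Stream.fromIterable([1, 2])
    // id 1 is slow (60 ms), id 2 is fast (20 ms)
    .flatMap((id) => fakeLookup(id, id == 1 ? 60 : 20))
    .toList();

/// switchMap drops the previous inner stream when a new source event arrives.
Future<List<String>> withSwitchMap() => Stream.fromIterable([1, 2])
    .switchMap((id) => fakeLookup(id, id == 1 ? 60 : 20))
    .toList();

Future<void> main() async {
  print(await withFlatMap()); // [result 2, result 1]
  print(await withSwitchMap()); // [result 2]
}
```

As a rule of thumb, switchMap suits cases where only the newest request matters (search boxes, page loads), while flatMap suits cases where every request must be answered.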

  2. Error Handling and Recovery

Proper error handling is crucial in reactive programming. RxDart provides tools to gracefully handle and recover from errors.

2.1 Catch Errors: The onErrorReturnWith operator lets you handle an error and recover by emitting a fallback value, after which the stream completes normally. (Plain Dart streams offer handleError, which can intercept an error but cannot substitute a value for it.)

Rx.fromCallable<String>(() => throw Exception('Oops!'))
    .onErrorReturnWith((error, stackTrace) {
      print('Caught error: $error');
      return 'Recovered value';
    })
    .listen(
      print,
      onError: (e) => print('This won\'t be called'),
      onDone: () => print('Done'),
    );

// Output:
// Caught error: Exception: Oops!
// Recovered value
// Done
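
Recovering with a single value is not always enough; sometimes the natural fallback is another stream, such as cached data. The sketch below uses RxDart's onErrorResumeNext for that. The names liveReadings and readingsWithFallback and the numbers are invented for the example.

```dart
import 'package:rxdart/rxdart.dart';

/// Emits one live reading, then fails.
/// (liveReadings is a hypothetical stand-in for a real data source.)
Stream<int> liveReadings() async* {
  yield 42;
  throw StateError('connection lost');
}

/// Switches to cached readings once the live stream fails.
Future<List<int>> readingsWithFallback() {
  final cached = Stream.fromIterable([7, 8]);
  // Values emitted before the error are kept; the error itself is swallowed.
  return liveReadings().onErrorResumeNext(cached).toList();
}

Future<void> main() async {
  print(await readingsWithFallback()); // [42, 7, 8]
}
```

Because the error is swallowed silently here, it is usually worth logging it (for example with doOnError) before the fallback takes over.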

2.2 Retry: Rx.retry re-creates a failed stream from a factory function and subscribes to it again, up to a specified number of retries.

int attemptCount = 0;

Rx.retry(() => Rx.fromCallable(() {
  attemptCount++;
  if (attemptCount < 3) throw Exception('Not yet!');
  return 'Success!';
}), 3)
.listen(
  print,
  onError: (e) => print('Still failing after all retries'),
);

// Output: Success!

  3. Custom Operators

Creating custom operators allows you to encapsulate complex stream transformations and reuse them across your application.

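extension IndexedOperators<T> on Stream<T> {
  // Passes each event to convert together with its zero-based position.
  Stream<R> mapIndexed<R>(R Function(T event, int index) convert) {
    var index = 0; // the counter lives in the closure below
    return map((event) => convert(event, index++));
  }
}

extension NullFilterOperators<T extends Object> on Stream<T?> {
  // Drops null events and narrows the element type from T? to T.
  // Named dropNulls because recent RxDart versions ship their own whereNotNull.
  Stream<T> dropNulls() => where((event) => event != null).cast<T>();
}

// Usage
final numbers = BehaviorSubject<int?>();
numbers.stream
    .dropNulls()
    .mapIndexed((value, index) => 'Item $index: $value')
    .listen(print);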

numbers.add(null);
numbers.add(1);
numbers.add(2);
numbers.add(null);
numbers.add(3);

// Output:
// Item 0: 1
// Item 1: 2
// Item 2: 3
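
When a custom operator has to carry state between events, an async* generator is one convenient way to write it, because each listener gets its own copy of that state. The example below is only an illustration of the pattern: runningTotal is a made-up operator, and RxDart's built-in scan could express the same thing.

```dart
extension RunningTotal on Stream<int> {
  /// Emits the sum of all values seen so far.
  Stream<int> runningTotal() async* {
    var total = 0; // created anew for each listener
    await for (final value in this) {
      total += value;
      yield total;
    }
  }
}

Future<void> main() async {
  final totals =
      await Stream.fromIterable([1, 2, 3, 4]).runningTotal().toList();
  print(totals); // [1, 3, 6, 10]
}
```

The generator form also forwards errors and completion from the source without extra code, which keeps small operators like this one short.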

  4. Advanced BLoC Pattern

Let's create a more complex BLoC that manages a paginated list with pull-to-refresh and infinite scrolling.

class PaginatedListBloc {
  final _page = BehaviorSubject<int>.seeded(1);
  final _items = BehaviorSubject<List<String>>.seeded([]);
  final _isLoading = BehaviorSubject<bool>.seeded(false);
  final _hasError = BehaviorSubject<bool>.seeded(false);

  Stream<List<String>> get items => _items.stream;
  Stream<bool> get isLoading => _isLoading.stream;
  Stream<bool> get hasError => _hasError.stream;

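  PaginatedListBloc() {
    // No distinct() here: refresh() must be able to reload page 1 even
    // when page 1 is already the current page.
    _page
        .switchMap(_fetchItems)
        .listen(_updateItems, onError: _handleError);
  }

  void loadNextPage() {
    // Ignore requests while a fetch is in flight; otherwise switchMap would
    // cancel the pending page and that page would be skipped.
    if (_isLoading.value) return;
    _page.add(_page.value + 1);
  }

  void refresh() => _page.add(1);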

  Stream<List<String>> _fetchItems(int page) async* {
    _isLoading.add(true);
    _hasError.add(false);
    try {
      await Future.delayed(Duration(seconds: 1)); // Simulate API call
      yield List.generate(20, (i) => 'Item ${(page - 1) * 20 + i + 1}');
    } catch (e) {
      _hasError.add(true);
      yield [];
    } finally {
      _isLoading.add(false);
    }
  }

  void _updateItems(List<String> newItems) {
    final currentItems = _items.value;
    _items.add(_page.value == 1 ? newItems : [...currentItems, ...newItems]);
  }

  void _handleError(error) {
    print('Error fetching items: $error');
    _hasError.add(true);
  }

  void dispose() {
    _page.close();
    _items.close();
    _isLoading.close();
    _hasError.close();
  }
}

Using this BLoC in a Flutter widget:

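import 'package:flutter/material.dart';

class PaginatedListWidget extends StatefulWidget {
  const PaginatedListWidget({super.key});

  @override
  State<PaginatedListWidget> createState() => _PaginatedListWidgetState();
}

class _PaginatedListWidgetState extends State<PaginatedListWidget> {
  // The State owns the BLoC so it survives rebuilds and can be disposed.
  final bloc = PaginatedListBloc();

  @override
  void dispose() {
    bloc.dispose();
    super.dispose();
  }

  @override
  Widget build(BuildContext context) {
    return StreamBuilder<List<String>>(
      stream: bloc.items,
      builder: (context, snapshot) {
        if (!snapshot.hasData) {
          return const Center(child: CircularProgressIndicator());
        }
        return RefreshIndicator(
          onRefresh: () async => bloc.refresh(),
          child: ListView.builder(
            itemCount: snapshot.data!.length + 1,
            itemBuilder: (context, index) {
              if (index == snapshot.data!.length) {
                // Building the extra last row means the user reached the end.
                bloc.loadNextPage();
                return StreamBuilder<bool>(
                  stream: bloc.isLoading,
                  builder: (context, loadingSnapshot) {
                    return loadingSnapshot.data == true
                        ? const Center(child: CircularProgressIndicator())
                        : const SizedBox();
                  },
                );
              }
              return ListTile(title: Text(snapshot.data![index]));
            },
          ),
        );
      },
    );
  }
}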

  5. Testing RxDart Code

Testing is crucial for maintaining reliable reactive code. Here's an example of how to test our PaginatedListBloc:

import 'package:test/test.dart';
import 'package:rxdart/rxdart.dart';

void main() {
  group('PaginatedListBloc', () {
    late PaginatedListBloc bloc;

    setUp(() {
      bloc = PaginatedListBloc();
    });

    tearDown(() {
      bloc.dispose();
    });

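    test('initial state', () async {
      await expectLater(bloc.items, emits([]));
      await expectLater(bloc.hasError, emits(false));
      // The constructor starts loading page 1 right away, so isLoading may
      // already be true; wait for it to settle back to false.
      await expectLater(bloc.isLoading, emitsThrough(false));
    });

    test('load first page', () async {
      bloc.refresh();
      await expectLater(bloc.items, emitsThrough(hasLength(20)));
      await expectLater(bloc.isLoading, emitsThrough(false));
    });

    test('load next page', () async {
      // Let the first page finish before asking for the second; switchMap
      // cancels a fetch that is still in flight when a new page is requested.
      await expectLater(bloc.items, emitsThrough(hasLength(20)));
      await expectLater(bloc.isLoading, emitsThrough(false));
      bloc.loadNextPage();
      await expectLater(bloc.items, emitsThrough(hasLength(40)));
    });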

    // Add more tests for error scenarios, multiple page loads, etc.
  });
}

You've now explored some of the more advanced features of RxDart, including complex stream operations, error handling, custom operators, and a real-world example of managing paginated data. These techniques will allow you to handle more complex scenarios in your Flutter apps with ease and elegance.

Remember, the key to mastering RxDart is practice and experimentation. Don't be afraid to push the boundaries and create your own operators and patterns to suit your specific needs. As you continue to work with RxDart, you'll discover even more powerful ways to manage your app's data flow and create responsive, efficient Flutter applications.

In the next level, we'll explore even more advanced topics such as schedulers, hot and cold observables, and advanced testing techniques. Until then, keep coding reactively, and watch your Flutter apps become more responsive and maintainable than ever before!

@gideonsalamii - X (formerly twitter)
