How to implement a queueing mechanism in Dart using the Completer pattern

We often need a queueing mechanism in an application, for example in analytics, image processing, payment processing, or streaming. Dart's Completer class, from dart:async, lets us implement one cleanly.
Consider this real-life scenario: customers are waiting outside a shop, eager to get in. A guard at the gate holds them back and lets them in only once the owner gives permission. This is a perfect example of queueing in real life.
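
To make the analogy concrete, here is a minimal sketch of the guard as a Completer. The ShopGate class and the customer names are hypothetical and only illustrate the idea: every customer awaits the same future, and the owner completes it once.

```dart
import 'dart:async';

/// Hypothetical gate modelled on the guard in the analogy above.
class ShopGate {
  final Completer<void> _open = Completer<void>();
  final List<String> entered = [];

  /// A customer waits at the gate until the owner opens it.
  Future<void> enter(String customer) async {
    await _open.future;
    entered.add(customer);
  }

  /// The owner opens the gate. A Completer may only be completed once,
  /// so repeated calls are ignored.
  void open() {
    if (!_open.isCompleted) _open.complete();
  }
}

Future<void> main() async {
  final gate = ShopGate();
  final waiting = [gate.enter('Asha'), gate.enter('Ben')];

  print(gate.entered.length); // 0: nobody is inside yet
  gate.open();
  await Future.wait(waiting);
  print(gate.entered.length); // 2: both customers are inside now
}
```

Any number of callers can await the same future, which is what lets a single Completer stand in for an explicit queue.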

Now let's come to the real-world engineering problem that I recently got a chance to work on. We have an analytics plugin that logs events from the mobile app to the server. The session start event is a special event that must be logged before any other event. So we want to queue every event that arrives before the session start event has been logged, and log those queued events once it has.
The typical way to solve this problem is to have a boolean flag that indicates whether the session has started or not. If the session has started, log the event, otherwise, add the event to a queue. Once the session starts, log all the queued events, empty the queue, and mark the boolean flag as true. Refer to the below diagram to understand this.
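
The flag-and-queue approach described above could be sketched roughly as follows. QueueingLogger, EventSender, and startSession are hypothetical names chosen for this illustration, not part of any real plugin. One assumption to note: the sketch sets the flag before flushing, so that events arriving during the flush are not stranded in the queue.

```dart
typedef EventSender = Future<void> Function(
    String name, Map<String, dynamic> parameters);

/// Hypothetical flag-and-queue logger; all names are illustrative.
class QueueingLogger {
  QueueingLogger(this._send);

  final EventSender _send;
  bool _sessionStarted = false;
  final List<MapEntry<String, Map<String, dynamic>>> _pending = [];

  Future<void> logEvent(String name, Map<String, dynamic> parameters) async {
    if (!_sessionStarted) {
      // Session not started yet: hold the event in the queue.
      _pending.add(MapEntry(name, parameters));
      return;
    }
    await _send(name, parameters);
  }

  Future<void> startSession() async {
    await _send('sessionStart', const <String, dynamic>{});
    _sessionStarted = true;
    // Copy and clear the queue before flushing it.
    final queued = List.of(_pending);
    _pending.clear();
    for (final event in queued) {
      await _send(event.key, event.value);
    }
  }
}

Future<void> main() async {
  final sent = <String>[];
  final logger = QueueingLogger((name, parameters) async {
    sent.add(name);
  });

  await logger.logEvent('pageView', {'pageTitle': 'login'});
  await logger.startSession();
  await logger.logEvent('tap', {'widgetName': 'click here to login'});
  print(sent); // [sessionStart, pageView, tap]
}
```

This works, but the flag, the queue, and the flush logic all have to be kept in sync by hand.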

But with the completer pattern, we can achieve this in a much cleaner way.

import 'dart:async';
import 'dart:math';

void main() async {
  final analyticsPlugin = AnalyticsPlugin(
    onEvent: (name, parameters) async {
      await fireAnalyticsApi({'event': name, 'parameters': parameters});
    },
  );

  analyticsPlugin.logEvent('pageView', {'pageTitle': 'login'});
  await Future.delayed(Duration(seconds: 1));

  analyticsPlugin.logEvent('view', {'widgetName': 'click here to login'});
  await Future.delayed(Duration(seconds: 1));

  analyticsPlugin.logEvent('tap', {'widgetName': 'click here to login'});
  await Future.delayed(Duration(seconds: 1));
}

Future<void> fireAnalyticsApi(Map<String, dynamic> payload) async {
  print('debug: firing api request for $payload');
  await Future.delayed(Duration(seconds: Random().nextInt(3) + 1));
  print('api request completed: $payload');
}

class AnalyticsPlugin {
  final Future<void> Function(String name, Map<String, dynamic>) _onEvent;

  AnalyticsPlugin({
    required Future<void> Function(String name, Map<String, dynamic> parameters)
        onEvent,
  }) : _onEvent = onEvent {
    _waitForStart();
  }

  Future<void> logEvent(String name, Map<String, dynamic> parameters) async {
    print('debug: got event: $name, $parameters');
    await _waitForStart();
    await _onEvent(name, parameters);
  }

  Completer<bool>? _startCompleter;

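  Future<void> _waitForStart() async {
    final pending = _startCompleter;
    if (pending != null) {
      // Session start is already in flight or finished: wait for its result.
      await pending.future;
      return;
    }
    // First caller: create the gate and fire the session start exactly once.
    final completer = Completer<bool>();
    _startCompleter = completer;
    try {
      await _fireSessionStart();
      completer.complete(true);
    } catch (_) {
      // Session start failed: release the queued events anyway.
      completer.complete(false);
    }
  }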

  Future<void> _fireSessionStart() async {
    await fireAnalyticsApi({'event': 'sessionStart'});
  }
}

In this code, the AnalyticsPlugin class logs events to the server. Its constructor takes an async onEvent callback, which sends each event to the server through the API, and immediately kicks off the session start by calling _waitForStart. The logEvent method first waits for the session to start by awaiting _waitForStart, and only then calls the onEvent callback.
The _waitForStart method checks whether a session start has already been triggered. If it has, the method simply awaits the completer's future, which returns immediately once the session has started. Otherwise, it creates the _startCompleter and calls _fireSessionStart, which logs the session start event to the server through the API. Once that event is logged, _startCompleter is completed with the value true. Completing it releases every event that was waiting on the future (that is, every queued event), and those events are then logged.
You might ask what happens if the session start event fails to log. In that case, the completer completes with false and _waitForStart stops waiting, so a few events are logged without a preceding session start event. This is a tradeoff: it avoids the overhead of holding events back indefinitely, and it is generally acceptable for reporting purposes. In critical use cases, we might instead keep retrying the session start until it succeeds, which we can do by calling _fireSessionStart again from the catch block.
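
As a rough sketch of the retry idea, the session start can be wrapped in a helper that retries on failure before the completer is completed. The startWithRetry helper, its maxAttempts and retryDelay parameters, and the flakyStart fake are all assumptions made for this example. Unlike "retry until successful", the sketch caps the number of attempts so that queued events are never blocked forever.

```dart
import 'dart:async';

/// Hypothetical helper: retries [startSession] up to [maxAttempts] times
/// and reports whether it eventually succeeded.
Future<bool> startWithRetry(
  Future<void> Function() startSession, {
  int maxAttempts = 3,
  Duration retryDelay = const Duration(milliseconds: 200),
}) async {
  for (var attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      await startSession();
      return true;
    } catch (_) {
      // Wait a little before the next attempt, unless this was the last one.
      if (attempt < maxAttempts) await Future<void>.delayed(retryDelay);
    }
  }
  return false;
}

Future<void> main() async {
  var calls = 0;
  // Fake session start that fails twice, then succeeds.
  Future<void> flakyStart() async {
    calls++;
    if (calls < 3) throw StateError('network down');
  }

  final gate = Completer<bool>();
  // A queued event waits on the gate, as logEvent does in the plugin.
  final queuedEvent = gate.future.then((started) => 'started=$started');

  gate.complete(await startWithRetry(flakyStart, retryDelay: Duration.zero));
  print(await queuedEvent); // started=true
  print(calls); // 3
}
```

The completer is still completed exactly once, after the final outcome is known, so waiting events see a single result.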

debug: firing api request for {event: sessionStart}
debug: got event: pageView, {pageTitle: login}
debug: got event: view, {widgetName: click here to login}
api request completed: {event: sessionStart}
debug: firing api request for {event: pageView, parameters: {pageTitle: login}}
debug: firing api request for {event: view, parameters: {widgetName: click here to login}}
debug: got event: tap, {widgetName: click here to login}
debug: firing api request for {event: tap, parameters: {widgetName: click here to login}}
api request completed: {event: tap, parameters: {widgetName: click here to login}}
api request completed: {event: pageView, parameters: {pageTitle: login}}
api request completed: {event: view, parameters: {widgetName: click here to login}}

In the above output, the pageView and view events were received while the sessionStart request was still in flight, but their API requests were not fired until the sessionStart request had completed. This is because the _waitForStart method made them wait for the session start to finish. Because fireAnalyticsApi waits a random 1 to 3 seconds, the exact interleaving varies between runs.

If we keep only the "api request completed" lines, we can see that every event was logged only after the session start event had been logged. The events themselves may complete in any order, since each request takes a random amount of time.

api request completed: {event: sessionStart}
api request completed: {event: tap, parameters: {widgetName: click here to login}}
api request completed: {event: pageView, parameters: {pageTitle: login}}
api request completed: {event: view, parameters: {widgetName: click here to login}}

Written by

NonStop io Technologies
