Implementing Pub/Sub with Flutter and Apache Pulsar Using EventBus

Shreyas N bhat

When developing applications, real-time communication between components is critical. A common pattern for this is Publish/Subscribe (Pub/Sub). This article demonstrates how to implement a Pub/Sub system in Flutter using Apache Pulsar and EventBus for seamless communication between widgets and a backend WebSocket server.

We’ll build a simple app with:

  1. A publisher widget to send messages to Pulsar.

  2. A subscriber widget to listen for messages from Pulsar.

  3. A PulsarSidecar to handle WebSocket communication and routing.


1. Project Setup

Our project has the following structure:

  • pub_widget.dart: Handles message publishing.

  • sub_widget.dart: Displays received messages.

  • pulsar_sidecar.dart: Manages the WebSocket connection with Pulsar.

  • message_event.dart: Defines events for communication between components.

  • main.dart: Initializes the app and sets up communication.


2. Publishing Messages

The PubWidget allows users to send messages. Here's how it works:

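import 'package:flutter/material.dart';

import 'message_event.dart';

// `eventBus` is the app-wide event bus instance (presumably from
// package:event_bus); its declaration is not shown in this article.

// A StatefulWidget, so the TextEditingController survives rebuilds and is
// disposed together with the widget.
class PubWidget extends StatefulWidget {
  @override
  State<PubWidget> createState() => _PubWidgetState();
}

class _PubWidgetState extends State<PubWidget> {
  final TextEditingController _controller = TextEditingController();
  final String topic = 'my-topic';

  @override
  void dispose() {
    _controller.dispose();
    super.dispose();
  }

  @override
  Widget build(BuildContext context) {
    return Scaffold(
      appBar: AppBar(title: Text("Publish Widget")),
      body: Column(
        children: [
          TextField(
            controller: _controller,
            decoration: InputDecoration(hintText: 'Enter message'),
          ),
          ElevatedButton(
            onPressed: () {
              final message = _controller.text;
              final timestamp = DateTime.now().toIso8601String();

              // Create a formatted message
              final messageData = {
                'asset': 'spine/topicname/$topic',
                'action': 'publish',
                'data': message,
                'metadata': {'timestamp': timestamp},
              };

              print('PubWidget: Sending message: $messageData');
              // Put the message on the shared bus as a MessageEvent.
              eventBus.fire(MessageEvent.fromJson(messageData));
              _controller.clear();
            },
            child: Text('Send Message'),
          ),
        ],
      ),
    );
  }
}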

Key Points:

  • The user inputs a message.

  • The message is wrapped in a structured JSON format containing:

    • Asset: Defines the topic.

    • Action: Indicates whether it's a "publish" or "subscribe" action.

    • Data: Contains the message.

    • Metadata: Includes additional info like a timestamp.

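  • The eventBus.fire method puts the message on the shared event bus as a MessageEvent, where the PulsarSidecar is expected to pick it up. Every other MessageEvent listener on the same bus, including SubWidget, receives it as well.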
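
The envelope described above can be sketched in isolation. This is a minimal illustration rather than the article's own code: buildEnvelope and topicFromAsset are hypothetical helper names, and the clock is passed in as a parameter only to keep the example deterministic.

```dart
/// Builds the message envelope with the four fields listed above.
/// `buildEnvelope` is a hypothetical helper; PubWidget builds the same map
/// inline.
Map<String, dynamic> buildEnvelope(String topic, String message, DateTime now) {
  return {
    'asset': 'spine/topicname/$topic', // the topic is the last path segment
    'action': 'publish',
    'data': message,
    'metadata': {'timestamp': now.toIso8601String()},
  };
}

/// Recovers the topic from an asset path, mirroring what
/// MessageEvent.fromJson does with the 'asset' field.
String topicFromAsset(String asset) => asset.split('/').last;
```

For example, buildEnvelope('my-topic', 'hello', DateTime.utc(2024, 1, 1)) produces an 'asset' of spine/topicname/my-topic, and topicFromAsset turns that value back into my-topic.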


3. Subscribing to Messages

The SubWidget listens for messages from a specific topic and displays them in a list.

import 'dart:async';

import 'package:flutter/material.dart';

import 'message_event.dart';

class SubWidget extends StatefulWidget {
  final String topic;
  SubWidget({required this.topic});

  @override
  State<SubWidget> createState() => _SubWidgetState();
}

class _SubWidgetState extends State<SubWidget> {
  final List<String> messages = [];
  late final StreamSubscription<MessageEvent> _subscription;

  @override
  void initState() {
    super.initState();
    print('SubWidget: Sending subscribe request to sidecar for topic: ${widget.topic}');
    eventBus.fire(SubscribeEvent(widget.topic));

    // Keep the subscription so it can be cancelled in dispose().
    _subscription = eventBus.on<MessageEvent>().listen((event) {
      if (event.topic == widget.topic) {
        setState(() {
          messages.add(event.message);
        });
        print('SubWidget: Received message: ${event.message}');
      }
    });
  }

  @override
  void dispose() {
    // Stop listening so setState() is never called on a disposed widget.
    _subscription.cancel();
    super.dispose();
  }

  @override
  Widget build(BuildContext context) {
    return Scaffold(
      appBar: AppBar(title: Text("Subscribe Widget")),
      body: ListView.builder(
        itemCount: messages.length,
        itemBuilder: (context, index) => ListTile(title: Text(messages[index])),
      ),
    );
  }
}

Key Points:

  • The widget sends a SubscribeEvent to the PulsarSidecar to subscribe to a topic.

  • When a MessageEvent is received, the widget updates its state to display the message.
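
The listen-and-filter step can also be tried outside Flutter. The sketch below assumes a plain Dart stream in place of the event bus; TopicInbox is a hypothetical name, and MessageEvent is redeclared here only so the example is self-contained.

```dart
import 'dart:async';

class MessageEvent {
  final String topic;
  final String message;
  MessageEvent(this.topic, this.message);
}

/// Hypothetical helper: collects the messages of one topic from a stream of
/// events and can be shut down cleanly, much like a widget's dispose().
class TopicInbox {
  final String topic;
  final List<String> messages = [];
  late final StreamSubscription<MessageEvent> _subscription;

  TopicInbox(this.topic, Stream<MessageEvent> events) {
    _subscription = events
        .where((event) => event.topic == topic) // ignore other topics
        .listen((event) => messages.add(event.message));
  }

  /// Stops listening; events added to the stream afterwards are ignored.
  Future<void> dispose() => _subscription.cancel();
}
```

To try it, create a StreamController<MessageEvent>.broadcast(), pass its stream to TopicInbox('my-topic', ...), and add events for several topics: only those for my-topic end up in messages. Holding on to the StreamSubscription is the design point, since it is the only handle through which the listener can later be cancelled.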


4. WebSocket Integration with Pulsar

The PulsarSidecar establishes a WebSocket connection with Pulsar and routes messages between widgets and the server.

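For pulsarUrl, replace <PULSAR_BROKER_URL> with the host name of your Pulsar broker. If Pulsar runs on your development machine, use localhost or 127.0.0.1 (from an Android emulator, the host machine is reachable at 10.0.2.2 instead).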
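
To make the parts of that URL explicit, here is a small sketch that assembles Pulsar WebSocket endpoint URIs from their components. The function names are hypothetical; the path layout follows Pulsar's WebSocket API, where a consumer endpoint takes one extra segment for the subscription name. The defaults (port 8080, tenant public, namespace default) are taken from the URL used in this article.

```dart
/// Builds the WebSocket producer URI for a persistent topic.
Uri pulsarProducerUri(
  String host,
  String topic, {
  int port = 8080,
  String tenant = 'public',
  String namespace = 'default',
}) {
  return Uri(
    scheme: 'ws',
    host: host,
    port: port,
    pathSegments: ['ws', 'v2', 'producer', 'persistent', tenant, namespace, topic],
  );
}

/// Builds the matching consumer URI; [subscription] names the Pulsar
/// subscription the consumer attaches to.
Uri pulsarConsumerUri(
  String host,
  String topic,
  String subscription, {
  int port = 8080,
  String tenant = 'public',
  String namespace = 'default',
}) {
  return Uri(
    scheme: 'ws',
    host: host,
    port: port,
    pathSegments: [
      'ws', 'v2', 'consumer', 'persistent', tenant, namespace, topic, subscription,
    ],
  );
}
```

With these helpers, pulsarProducerUri('localhost', 'my-topic') corresponds to the pulsarUrl value used by the sidecar with the placeholder filled in.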

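import 'dart:convert';

import 'package:web_socket_channel/web_socket_channel.dart';

import 'message_event.dart';

class PulsarSidecar {
  // Pulsar's WebSocket producer endpoint for my-topic. A producer socket only
  // returns publish acknowledgements; receiving topic messages requires a
  // consumer endpoint (/ws/v2/consumer/.../<topic>/<subscription>).
  final String pulsarUrl = 'ws://<PULSAR_BROKER_URL>:8080/ws/v2/producer/persistent/public/default/my-topic';
  late WebSocketChannel _channel;
  final Map<String, List<void Function(MessageEvent)>> topicSubscriptions = {};  // Topic -> Subscribers

  void startPulsarConnection() {
    print('PulsarSidecar: Connecting to Pulsar at $pulsarUrl');
    _channel = WebSocketChannel.connect(Uri.parse(pulsarUrl));

    _channel.stream.listen(
      (message) {
        print('PulsarSidecar: Received message from Pulsar: $message');
        if (message is String) _handleMessage(message);
      },
      onError: (Object error) => print('PulsarSidecar: WebSocket error: $error'),
    );
  }

  void _handleMessage(String message) {
    final decodedMessage = jsonDecode(message);
    if (decodedMessage is! Map) return;
    final topic = decodedMessage['topic'];
    final payload = decodedMessage['payload'];
    // Frames without both fields (such as publish acknowledgements) are skipped.
    if (topic is! String || payload is! String) return;

    final callbacks = topicSubscriptions[topic];
    if (callbacks == null) return;
    for (final callback in callbacks) {
      print('PulsarSidecar: Sending message to subscribers of topic: $topic');
      callback(MessageEvent(topic, payload));
    }
  }

  void handleSubscribe(SubscribeEvent event) {
    // Register a callback that re-fires incoming messages on the event bus.
    topicSubscriptions
        .putIfAbsent(event.topic, () => [])
        .add((message) => eventBus.fire(message));
    print('PulsarSidecar: Subscribed to topic: ${event.topic}');
  }

  void sendMessage(String topic, String message) {
    final encodedMessage = jsonEncode({
      'asset': 'spine/topicname/$topic',
      'action': 'publish',
      'data': message,
      'metadata': {'timestamp': DateTime.now().toIso8601String()},
    });

    // Pulsar's WebSocket producer expects the message bytes in a
    // base64-encoded `payload` field, so the envelope is wrapped before sending.
    final frame = jsonEncode({'payload': base64Encode(utf8.encode(encodedMessage))});
    _channel.sink.add(frame);
    print('PulsarSidecar: Sent message to Pulsar: $encodedMessage');
  }
}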

Key Points:

  • Establishes a WebSocket connection with the Pulsar server.

  • Routes messages to the appropriate subscribers.

  • Handles subscriptions and message publishing.
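
Pulsar's WebSocket API wraps every message in a small JSON frame whose payload field is base64-encoded. The sketch below shows that framing on its own, using only dart:convert. The function names are hypothetical, and the frame fields shown (payload, properties, messageId) are the ones documented for Pulsar's WebSocket producer and consumer endpoints.

```dart
import 'dart:convert';

/// Wraps [body] in the JSON frame a Pulsar WebSocket producer endpoint
/// accepts: the message bytes travel base64-encoded in `payload`.
String encodeProducerFrame(
  String body, {
  Map<String, String> properties = const {},
}) {
  return jsonEncode({
    'payload': base64Encode(utf8.encode(body)),
    if (properties.isNotEmpty) 'properties': properties,
  });
}

/// Decodes the base64 `payload` of a frame received from a consumer endpoint.
String decodeConsumerPayload(String frame) {
  final decoded = jsonDecode(frame) as Map<String, dynamic>;
  return utf8.decode(base64Decode(decoded['payload'] as String));
}

/// Builds the acknowledgement a consumer sends back for a received frame.
String buildAck(String frame) {
  final decoded = jsonDecode(frame) as Map<String, dynamic>;
  return jsonEncode({'messageId': decoded['messageId']});
}
```

For instance, encodeProducerFrame('Hello World') returns {"payload":"SGVsbG8gV29ybGQ="}, and decodeConsumerPayload applied to a consumer frame carrying that payload returns Hello World again. A consumer that never sends the acknowledgement from buildAck will eventually stop receiving new messages, because the broker limits how many unacknowledged messages it pushes.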


5. Main App

The main.dart file initializes the app and integrates the widgets.

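import 'package:flutter/material.dart';

import 'pub_widget.dart';
import 'pulsar_sidecar.dart';
import 'sub_widget.dart';

void main() {
  runApp(MyApp());
}

class MyApp extends StatelessWidget {
  @override
  Widget build(BuildContext context) {
    return MaterialApp(
      home: Scaffold(
        appBar: AppBar(title: Text("Flutter Pub/Sub with Pulsar")),
        body: Column(
          children: [
            Expanded(child: PubWidget()),
            Expanded(child: SubWidget(topic: 'my-topic')),
            // PulsarSidecarWidget is not listed in this article; it is assumed
            // to create the PulsarSidecar and call startPulsarConnection().
            PulsarSidecarWidget(),
          ],
        ),
      ),
    );
  }
}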
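
The article does not show how the sidecar is attached to the event bus, so the following is only one possible sketch of that wiring. wireSidecar is a hypothetical helper, the event classes are redeclared to keep the example self-contained, and a plain broadcast stream stands in for the bus (with package:event_bus, the stream returned by eventBus.on() could play that role).

```dart
import 'dart:async';

class MessageEvent {
  final String topic;
  final String message;
  MessageEvent(this.topic, this.message);
}

class SubscribeEvent {
  final String topic;
  SubscribeEvent(this.topic);
}

/// Hypothetical wiring helper: forwards bus events to sidecar-style
/// callbacks. [busEvents] must be a broadcast stream because it is listened
/// to twice. Cancel the returned subscriptions when the sidecar shuts down.
List<StreamSubscription<Object?>> wireSidecar(
  Stream<Object?> busEvents, {
  required void Function(SubscribeEvent event) onSubscribe,
  required void Function(String topic, String message) onPublish,
}) {
  return [
    busEvents
        .where((event) => event is SubscribeEvent)
        .cast<SubscribeEvent>()
        .listen(onSubscribe),
    busEvents
        .where((event) => event is MessageEvent)
        .cast<MessageEvent>()
        .listen((event) => onPublish(event.topic, event.message)),
  ];
}
```

In the app, onSubscribe would map to PulsarSidecar.handleSubscribe and onPublish to PulsarSidecar.sendMessage. One caveat with this design: the sidecar also fires MessageEvent for messages arriving from Pulsar, so a real implementation would need to tell outgoing and incoming messages apart (for example with separate event types) to avoid publishing a received message again.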

6. Events

message_event.dart defines the two event types that travel over the event bus, MessageEvent and SubscribeEvent:

class MessageEvent {
  final String topic;
  final String message;

  MessageEvent(this.topic, this.message);

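  // Builds an event from the JSON envelope created in PubWidget.
  factory MessageEvent.fromJson(Map<String, dynamic> json) {
    return MessageEvent(
      (json['asset'] as String).split('/').last, // topic is the last path segment
      json['data'] as String,
    );
  }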
}

class SubscribeEvent {
  final String topic;
  SubscribeEvent(this.topic);
}

Debugging Tips

  • PubWidget: Logs messages being sent.

  • SubWidget: Logs messages received.

  • PulsarSidecar: Logs WebSocket messages and subscription activity.


Conclusion

This app demonstrates a Pub/Sub architecture in Flutter using Apache Pulsar and EventBus. Publishing, subscribing, and WebSocket handling live in separate components that communicate only through events, so each piece can be changed or extended on its own, and the debug logs trace a message's path through the app, which makes troubleshooting easier. Replace <PULSAR_BROKER_URL> with the host of your actual Pulsar broker and explore real-time messaging from there.
