Implementing Pub/Sub with Flutter and Apache Pulsar Using EventBus
When developing applications, real-time communication between components is critical. A common pattern for this is Publish/Subscribe (Pub/Sub). This article demonstrates how to implement a Pub/Sub system in Flutter using Apache Pulsar and EventBus for seamless communication between widgets and a backend WebSocket server.
We’ll build a simple app with:
A publisher widget to send messages to Pulsar.
A subscriber widget to listen for messages from Pulsar.
A
PulsarSidecar
to handle WebSocket communication and routing.
1. Project Setup
Our project has the following structure:
pub_widget.dart
: Handles message publishing.sub_widget.dart
: Displays received messages.pulsar_sidecar.dart
: Manages the WebSocket connection with Pulsar.message_event.dart
: Defines events for communication between components.main.dart
: Initializes the app and sets up communication.
2. Publishing Messages
The PubWidget
allows users to send messages. Here's how it works:
class PubWidget extends StatelessWidget {
final TextEditingController _controller = TextEditingController();
final String topic = 'my-topic';
@override
Widget build(BuildContext context) {
return Scaffold(
appBar: AppBar(title: Text("Publish Widget")),
body: Column(
children: [
TextField(
controller: _controller,
decoration: InputDecoration(hintText: 'Enter message'),
),
ElevatedButton(
onPressed: () {
final message = _controller.text;
final timestamp = DateTime.now().toIso8601String();
// Create a formatted message
final messageData = {
'asset': 'spine/topicname/$topic',
'action': 'publish',
'data': message,
'metadata': {'timestamp': timestamp},
};
print('PubWidget: Sending message: $messageData');
eventBus.fire(MessageEvent.fromJson(messageData));
_controller.clear();
},
child: Text('Send Message'),
),
],
),
);
}
}
Key Points:
The user inputs a message.
The message is wrapped in a structured JSON format containing:
Asset: Defines the topic.
Action: Indicates whether it's a "publish" or "subscribe" action.
Data: Contains the message.
Metadata: Includes additional info like a timestamp.
The
eventBus.fire
method sends the message to thePulsarSidecar
.
3. Subscribing to Messages
The SubWidget
listens for messages from a specific topic and displays them in a list.
class SubWidget extends StatefulWidget {
final String topic;
SubWidget({required this.topic});
@override
_SubWidgetState createState() => _SubWidgetState();
}
class _SubWidgetState extends State<SubWidget> {
List<String> messages = [];
@override
void initState() {
super.initState();
print('SubWidget: Sending subscribe request to sidecar for topic: ${widget.topic}');
eventBus.fire(SubscribeEvent(widget.topic));
eventBus.on<MessageEvent>().listen((event) {
if (event.topic == widget.topic) {
setState(() {
messages.add(event.message);
});
print('SubWidget: Received message: ${event.message}');
}
});
}
@override
Widget build(BuildContext context) {
return Scaffold(
appBar: AppBar(title: Text("Subscribe Widget")),
body: ListView.builder(
itemCount: messages.length,
itemBuilder: (context, index) => ListTile(title: Text(messages[index])),
),
);
}
}
Key Points:
The widget sends a
SubscribeEvent
to thePulsarSidecar
to subscribe to a topic.When a
MessageEvent
is received, the widget updates its state to display the message.
4. WebSocket Integration with Pulsar
The PulsarSidecar
establishes a WebSocket connection with Pulsar and routes messages between widgets and the server.
For pulsarUrl, you can replace <PULSAR_BROKER_URL> with either localhost
or 127.0.0.1
class PulsarSidecar {
final String pulsarUrl = 'ws://<PULSAR_BROKER_URL>:8080/ws/v2/producer/persistent/public/default/my-topic';
late WebSocketChannel _channel;
Map<String, List<Function>> topicSubscriptions = {}; // Topic -> Subscribers
void startPulsarConnection() {
print('PulsarSidecar: Connecting to Pulsar at $pulsarUrl');
_channel = WebSocketChannel.connect(Uri.parse(pulsarUrl));
_channel.stream.listen((message) {
print('PulsarSidecar: Received message from Pulsar: $message');
_handleMessage(message);
});
}
void _handleMessage(String message) {
final decodedMessage = jsonDecode(message);
final topic = decodedMessage['topic'];
final payload = decodedMessage['payload'];
if (topicSubscriptions.containsKey(topic)) {
for (var callback in topicSubscriptions[topic]!) {
print('PulsarSidecar: Sending message to subscribers of topic: $topic');
callback(MessageEvent(topic, payload));
}
}
}
void handleSubscribe(SubscribeEvent event) {
if (!topicSubscriptions.containsKey(event.topic)) {
topicSubscriptions[event.topic] = [];
}
topicSubscriptions[event.topic]!.add((message) {
eventBus.fire(message);
});
print('PulsarSidecar: Subscribed to topic: ${event.topic}');
}
void sendMessage(String topic, String message) {
final encodedMessage = jsonEncode({
'asset': 'spine/topicname/$topic',
'action': 'publish',
'data': message,
'metadata': {'timestamp': DateTime.now().toIso8601String()},
});
_channel.sink.add(encodedMessage);
print('PulsarSidecar: Sent message to Pulsar: $encodedMessage');
}
}
Key Points:
Establishes a WebSocket connection with the Pulsar server.
Routes messages to the appropriate subscribers.
Handles subscriptions and message publishing.
5. Main App
The main.dart
file initializes the app and integrates the widgets.
void main() {
runApp(MyApp());
}
class MyApp extends StatelessWidget {
@override
Widget build(BuildContext context) {
return MaterialApp(
home: Scaffold(
appBar: AppBar(title: Text("Flutter Pub/Sub with Pulsar")),
body: Column(
children: [
Expanded(child: PubWidget()),
Expanded(child: SubWidget(topic: 'my-topic')),
PulsarSidecarWidget(),
],
),
),
);
}
}
6. Events
MessageEvent
and SubscribeEvent
class MessageEvent {
final String topic;
final String message;
MessageEvent(this.topic, this.message);
factory MessageEvent.fromJson(Map<String, dynamic> json) {
return MessageEvent(
json['asset'].split('/').last,
json['data'],
);
}
}
class SubscribeEvent {
final String topic;
SubscribeEvent(this.topic);
}
Debugging Tips
PubWidget: Logs messages being sent.
SubWidget: Logs messages received.
PulsarSidecar: Logs WebSocket messages and subscription activity.
Conclusion
This app demonstrates how to implement a robust Pub/Sub architecture in Flutter using Apache Pulsar and EventBus. The modular design ensures scalability, and the debugging logs provide insights into the message flow, making it easier to troubleshoot. Replace <PULSAR_BROKER_URL>
with your actual Pulsar broker URL and explore the full power of real-time messaging!
Subscribe to my newsletter
Read articles from Shreyas N bhat directly inside your inbox. Subscribe to the newsletter, and don't miss out.
Written by