Why do we need a hot (Connectable) Observable in the reactive paradigm (RxJava)?
Imagine a scenario where you're implementing a stock price ticker. You want to stream real-time stock prices to multiple users. Each user should see the same shared prices in real time from the moment they start observing, rather than a separate sequence of their own.
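// Imports assume RxJava 3; in RxJava 2 the same classes live under io.reactivex (for example io.reactivex.Observable).
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.observables.ConnectableObservable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.util.concurrent.TimeUnit;

public class StockPriceTicker {
    public static void main(String[] args) throws InterruptedException {
        // interval emits 0, 1, 2, ... once per second; the Long values stand in for prices
        ConnectableObservable<Long> stockPriceObservable = Observable.interval(1, TimeUnit.SECONDS)
                .subscribeOn(Schedulers.io())
                .take(5)
                .publish(); // Convert to ConnectableObservable (hot observable)

        // First subscriber
        stockPriceObservable.subscribe(price ->
                System.out.println("Subscriber 1: " + price));

        // Wait before adding the second subscriber; nothing is emitted yet because connect() has not been called
        Thread.sleep(2000);

        // Second subscriber
        stockPriceObservable.subscribe(price ->
                System.out.println("Subscriber 2: " + price));

        // Connect to start emitting
        stockPriceObservable.connect();

        // Keep the main thread alive long enough to observe the emissions
        Thread.sleep(50000);
    }
}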
Output:
Subscriber 1: 0
Subscriber 2: 0
Subscriber 1: 1
Subscriber 2: 1
Subscriber 1: 2
Subscriber 2: 2
Subscriber 1: 3
Subscriber 2: 3
Subscriber 1: 4
Subscriber 2: 4
Observable Creation: Create an Observable that emits a stock price every second using Observable.interval.
Conversion to ConnectableObservable: Use publish() to convert the Observable to a ConnectableObservable. This conversion allows us to control when the emissions start.
Subscribers: Add two subscribers to the ConnectableObservable.
Starting Emissions: Call connect() to start the emissions. Both subscribers will now receive the same sequence of stock prices from the moment connect() is called.
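To make the "register first, start later" idea concrete, here is a minimal plain-Java sketch of what publish() and connect() do conceptually. It is not RxJava's implementation: MiniConnectable, MiniConnectableDemo, and the synchronous three-value source are hypothetical names made up for illustration, and threading, completion, and disposal are ignored.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical teaching code in plain Java; none of these names come from RxJava.
class MiniConnectableDemo {

    // Mimics only the "subscribe now, start on connect()" behavior.
    static class MiniConnectable<T> {
        private final Consumer<Consumer<T>> source; // pushes values into a sink when run
        private final List<Consumer<T>> subscribers = new ArrayList<>();

        MiniConnectable(Consumer<Consumer<T>> source) {
            this.source = source;
        }

        // Registering a subscriber does not start the source.
        void subscribe(Consumer<T> subscriber) {
            subscribers.add(subscriber);
        }

        // Runs the source exactly once and fans each value out to every subscriber.
        void connect() {
            source.accept(value -> subscribers.forEach(s -> s.accept(value)));
        }
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        int[] sourceRuns = {0}; // counts how often the source is started

        MiniConnectable<Long> ticker = new MiniConnectable<>(sink -> {
            sourceRuns[0]++;
            for (long tick = 0; tick < 3; tick++) {
                sink.accept(tick);
            }
        });

        ticker.subscribe(price -> log.add("Subscriber 1: " + price));
        ticker.subscribe(price -> log.add("Subscriber 2: " + price));
        log.add("before connect: " + sourceRuns[0] + " source runs");

        ticker.connect();
        log.add("after connect: " + sourceRuns[0] + " source runs");
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In this sketch the source runs zero times before connect() and exactly once after it, and each value reaches both subscribers in turn, which mirrors the interleaved output shown above.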
What if we use a cold Observable in lieu of a ConnectableObservable?
By default, observables in RxJava are cold: each subscription starts its own new sequence of emissions. If two subscribers subscribe almost simultaneously (that is, without the Thread.sleep call that separates them in the code below), they may appear to receive the same sequence, but each subscriber actually has its own independent sequence.
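// Imports assume RxJava 3; in RxJava 2 the same classes live under io.reactivex (for example io.reactivex.Observable).
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.util.concurrent.TimeUnit;

public class StockPriceTicker {
    public static void main(String[] args) throws InterruptedException {
        // Cold observable: no publish(), so every subscribe() starts a fresh interval
        Observable<Long> stockPriceObservable = Observable.interval(1, TimeUnit.SECONDS)
                .subscribeOn(Schedulers.io())
                .take(5);

        // First subscriber
        stockPriceObservable.subscribe(price ->
                System.out.println("Subscriber 1: " + price));

        // Wait two seconds so the second subscription starts visibly later than the first
        Thread.sleep(2000);

        // Second subscriber (gets its own sequence, starting again from 0)
        stockPriceObservable.subscribe(price ->
                System.out.println("Subscriber 2: " + price));

        // Keep the main thread alive long enough to observe the emissions
        Thread.sleep(50000);
    }
}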
Output:
Subscriber 1: 0
Subscriber 1: 1
Subscriber 2: 0
Subscriber 1: 2
Subscriber 2: 1
Subscriber 1: 3
Subscriber 2: 2
Subscriber 1: 4
Subscriber 2: 3
Subscriber 2: 4
Key Differences between Cold Observable and ConnectableObservable
Cold Observable (Default): Each subscriber gets its own independent sequence.
Hot Observable (Using ConnectableObservable): All subscribers share the same sequence from the moment connect() is called.
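The two behaviors can also be contrasted without timers. The following is a small plain-Java sketch, not RxJava code: HotVsColdSketch, coldSubscribe, and HotTicker are hypothetical names, and emissions are triggered by hand so the order is deterministic. It also shows what happens to a subscriber that joins a hot source late.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical teaching code in plain Java; none of these names come from RxJava.
class HotVsColdSketch {

    // Cold: each call replays the full sequence 0, 1, 2 for that subscriber alone.
    static void coldSubscribe(Consumer<Long> subscriber) {
        for (long tick = 0; tick < 3; tick++) {
            subscriber.accept(tick);
        }
    }

    // Hot: one shared counter; a subscriber only sees values emitted after it joined.
    static class HotTicker {
        private final List<Consumer<Long>> subscribers = new ArrayList<>();
        private long next = 0;

        void subscribe(Consumer<Long> subscriber) {
            subscribers.add(subscriber);
        }

        void emit() {
            long value = next++;
            subscribers.forEach(s -> s.accept(value));
        }
    }

    static List<String> cold() {
        List<String> log = new ArrayList<>();
        coldSubscribe(price -> log.add("Subscriber 1: " + price));
        coldSubscribe(price -> log.add("Subscriber 2: " + price));
        return log;
    }

    static List<String> hot() {
        List<String> log = new ArrayList<>();
        HotTicker ticker = new HotTicker();
        ticker.subscribe(price -> log.add("Subscriber 1: " + price));
        ticker.emit(); // emits 0
        ticker.emit(); // emits 1
        ticker.subscribe(price -> log.add("Subscriber 2: " + price)); // joins late
        ticker.emit(); // emits 2, delivered to both subscribers
        return log;
    }

    public static void main(String[] args) {
        System.out.println("cold: " + cold());
        System.out.println("hot:  " + hot());
    }
}
```

In the cold case Subscriber 2 starts again from 0, while in the hot case it only receives the value emitted after it joined. A subscriber added to a published ConnectableObservable after connect() behaves like the late subscriber here: it misses the items emitted before it subscribed.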
Written by
Bikash Mainali