Why do we need hot/Connectable observable in reactive paradigm (RxJava)?

Bikash MainaliBikash Mainali
2 min read

Imagine a scenario where you're implementing a stock price ticker. You want to stream real-time stock prices to multiple users. Each user should see the same prices in real-time from the moment they start observing, not a separate sequence.

public class StockPriceTicker {
    public static void main(String[] args) throws InterruptedException {
        ConnectableObservable<Long> stockPriceObservable = Observable.interval(1, TimeUnit.SECONDS)
                .subscribeOn(Schedulers.io())
                .take(5)
                .publish(); // Convert to ConnectableObservable (hot observable)

        // First subscriber
        stockPriceObservable.subscribe(price ->
                System.out.println("Subscriber 1: " + price));

        // Sleep for some time To observe the emission between subscriber-1 and subscriber-2   
        Thread.sleep(2000);

        // Second subscriber
        stockPriceObservable.subscribe(price ->
                System.out.println("Subscriber 2: " + price));

        // Connect to start emitting
        stockPriceObservable.connect();

        // Sleep for some time to observe emissions
        Thread.sleep(50000);
    }
}

Output:
Subscriber 1: 0
Subscriber 2: 0

Subscriber 1: 1
Subscriber 2: 1

Subscriber 1: 2
Subscriber 2: 2

Subscriber 1: 3
Subscriber 2: 3

Subscriber 1: 4
Subscriber 2: 4
  • Observable Creation: Create an Observable that emits a stock price every second using Observable.interval.

  • Conversion to ConnectableObservable: Use publish() to convert the Observable to a ConnectableObservable. This conversion allows us to control when the emissions start.

  • Subscribers: Add two subscribers to the ConnectableObservable.

  • Starting Emissions: Call connect() to start the emissions. Both subscribers will now receive the same sequence of stock prices from the moment connect() is called.

What if we use cold observable in lieu of Connectable observable ?

By default, observables in RxJava are cold. This means each subscription starts a new sequence of emissions. If subscribers subscribe very close to each other (without Thread in between two subscribers as shown below in the code) in time, it may appear that they are receiving the same sequence of emissions, but each subscriber actually has its own independent sequence.

public class StockPriceTicker {
    public static void main(String[] args) throws InterruptedException {
        Observable<Long> stockPriceObservable = Observable.interval(1, TimeUnit.SECONDS)
                .subscribeOn(Schedulers.io())
                .take(5);

        // First subscriber
        stockPriceObservable.subscribe(price ->
                System.out.println("Subscriber 1: " + price));

        // Sleep for some time To observe the emission between subscriber-1 and subscriber-2
        Thread.sleep(2000);

        // Second subscriber
        stockPriceObservable.subscribe(price ->
                System.out.println("Subscriber 2: " + price));

        // Sleep for some time to observe emissions
        Thread.sleep(50000);
    }
}

Output:
Subscriber 1: 0
Subscriber 1: 1
Subscriber 2: 0
Subscriber 1: 2
Subscriber 2: 1
Subscriber 1: 3
Subscriber 2: 2
Subscriber 1: 4
Subscriber 2: 3
Subscriber 2: 4

Key Differences between Cold and ConnectableObservable

  • Cold Observable (Default): Each subscriber gets its own independent sequence.

  • Hot Observable (Using ConnectableObservable): All subscribers share the same sequence from the moment connect() is called.

0
Subscribe to my newsletter

Read articles from Bikash Mainali directly inside your inbox. Subscribe to the newsletter, and don't miss out.

Written by

Bikash Mainali
Bikash Mainali

Experienced Java/JavaScript full-stack developer over 6 years of extensive expertise serving key role on elite technical teams developing enterprise software for healthcare, apple ad-platform, banking, and e-commerce. Adaptable problem-solver with high levels of skill in Groovy, Java, Spring, Spring Boot, Hibernate, JavaScript, TypeScript, Angular, Node, Express, React, MongoDB, IBM DB2, Oracle, PL/SQL, Docker, Kubernetes, CI/CD pipelines, AWS, Micro-service and Agile/Scrum. Strong technical skills paired with business-savvy UI design expertise. Personable team player with experience collaborating with diverse cross-functional teams.