Mastering RxSwift: A Comprehensive Manual
Reactive Programming is programming with asynchronous data streams.
It enables building apps in a declarative way.
Everything in RxSwift is an observable sequence or something that subscribes to the events emitted by those observable sequences.
Observables
Observables are sequences of events. Observers subscribe to an observable and are notified each time it emits a new value.
Observables can have 0 or more elements.
Every subscription returns a Disposable. Disposing it releases the resources held by the subscription, which helps avoid memory leaks and retain cycles.
In practice, we add each subscription's disposable to a DisposeBag, which disposes everything it holds when the bag itself is deallocated.
Use Observables for data streams where the sequence is fixed, and you don't need to manage or store the state.
An Observable is a sequence of events that can emit:
Next: A new data item.
Error: An error that terminates the sequence.
Completed: A signal that the sequence has completed and no further data will be emitted.
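To make the three event types concrete, here is a minimal sketch that builds a sequence by hand with Observable.create and records every event an observer receives. The names DemoError and log are hypothetical and exist only for this illustration.

```swift
import RxSwift

// Hypothetical error type used only for this illustration.
enum DemoError: Error { case failed }

let disposeBag = DisposeBag()
var log: [String] = []

// Build a sequence by hand: two next events, then an error.
let sequence = Observable<Int>.create { observer in
    observer.onNext(1)
    observer.onNext(2)
    observer.onError(DemoError.failed)
    observer.onNext(3) // ignored: the error already terminated the sequence
    return Disposables.create()
}

sequence.subscribe(
    onNext: { log.append("next(\($0))") },
    onError: { log.append("error(\($0))") },
    onCompleted: { log.append("completed") }
).disposed(by: disposeBag)

print(log) // ["next(1)", "next(2)", "error(failed)"]
```

Because the error terminates the sequence, neither the later next event nor a completed event reaches the observer.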
Observables work in the following way:
Create: Observables are created or initialised.
Subscribe: Observers subscribe to these Observables to receive and react to the emitted items.
Emit: Observables emit items, which are received by the observers.
Dispose: When the observer no longer needs to receive items or the observable completes, the subscription is disposed of.
// creating an Observable
let number = Observable<Int>.of(1, 2, 3)
// subscribing to an Observable
let subscription = number.subscribe() // you can do a lot of stuff with the .subscribe method
// disposing a subscription
// Method 1: dispose of the subscription manually
subscription.dispose()
// Method 2: add the subscription to a DisposeBag, which disposes it when the bag is deallocated
let disposeBag = DisposeBag()
number.subscribe().disposed(by: disposeBag)
// Full Code
import RxSwift
let numbersSequence = Observable.just([1, 2, 3, 4, 5])
let disposeBag = DisposeBag()
numbersSequence.subscribe(onNext: { values in
    for value in values {
        print(value * 5)
    }
}).disposed(by: disposeBag)
// output
5
10
15
20
25
Subjects
- Subjects act as both observable and observer. They can both emit events and subscribe to them.
- Use Subjects when you need to maintain or inject state, broadcast events, or create a bridge between different parts of your codebase.
- They act as bridges or proxies that allow events to be injected into observable sequences.
PublishSubject:
Starts empty and only emits new elements to subscribers.
Use it when you only want subscribers to receive events emitted after they have subscribed.
import RxSwift

let publishSubject = PublishSubject<String>()
let disposeBag = DisposeBag()

public enum SomeError: Error {
    case genericError(String)
}

publishSubject.onNext("A")
let subscriptionOne = publishSubject.subscribe { event in
    print("Subscriber 1:", event.element ?? event)
}
publishSubject.onNext("B")
let subscriptionTwo = publishSubject.subscribe { event in
    print("Subscriber 2:", event.element ?? event)
}
publishSubject.onNext("C")
publishSubject.onNext("D")
let subscriptionThree = publishSubject.subscribe { event in
    print("Subscriber 3:", event.element ?? event)
}
publishSubject.onError(SomeError.genericError("An error occurred!"))

subscriptionOne.disposed(by: disposeBag)
subscriptionTwo.disposed(by: disposeBag)
subscriptionThree.disposed(by: disposeBag)
// output
Subscriber 1: B
Subscriber 1: C
Subscriber 2: C
Subscriber 1: D
Subscriber 2: D
Subscriber 1: error(genericError("An error occurred!"))
Subscriber 2: error(genericError("An error occurred!"))
Subscriber 3: error(genericError("An error occurred!"))
BehaviorSubject:
Starts with an initial value and replays it, or the latest element, to new subscribers.
Used when you want subscribers to receive the most recent value upon subscription.
A new subscriber receives the most recent value immediately, followed by everything emitted after the subscription.
import RxSwift

let behaviorSubject = BehaviorSubject<String>(value: "A")
let disposeBag = DisposeBag()

public enum SomeError: Error {
    case genericError(String)
}

let subscriptionOne = behaviorSubject.subscribe { event in
    print("Subscriber 1:", event.element ?? event)
}
behaviorSubject.onNext("B")
let subscriptionTwo = behaviorSubject.subscribe { event in
    print("Subscriber 2:", event.element ?? event)
}
behaviorSubject.onNext("C")
behaviorSubject.onNext("D")
behaviorSubject.onError(SomeError.genericError("An error occurred!"))
let subscriptionThree = behaviorSubject.subscribe { event in
    print("Subscriber 3:", event.element ?? event)
}
behaviorSubject.on(.next("E")) // never delivered: the subject already terminated with an error

subscriptionOne.disposed(by: disposeBag)
subscriptionTwo.disposed(by: disposeBag)
subscriptionThree.disposed(by: disposeBag)
// output
Subscriber 1: A
Subscriber 1: B
Subscriber 2: B
Subscriber 1: C
Subscriber 2: C
Subscriber 1: D
Subscriber 2: D
Subscriber 1: error(genericError("An error occurred!"))
Subscriber 2: error(genericError("An error occurred!"))
Subscriber 3: error(genericError("An error occurred!"))
ReplaySubject:
Similar to BehaviorSubject, but it has no initial value and replays a buffer of recent values instead of only the latest one.
The buffer size is the number of most recently emitted values that are replayed to future subscribers.
On subscription, each subscriber receives at most that many of the latest values, followed by everything emitted afterwards [ ex: if the buffer size is 3, a subscriber that joins after seven values have been emitted first receives only the 3 most recent ones. ]
Used when you want new subscribers to receive a specified number of the most recent values.
import RxSwift

let replaySubject = ReplaySubject<Int>.create(bufferSize: 3)
let disposeBag = DisposeBag()

replaySubject.onNext(1)
let subscriptionOne = replaySubject.subscribe { event in
    print("Subscriber 1:", event.element ?? event)
}
replaySubject.onNext(2)
replaySubject.onNext(3)
replaySubject.onNext(4)
replaySubject.onNext(5)
replaySubject.onNext(6)
replaySubject.onNext(7)
let subscriptionTwo = replaySubject.subscribe { event in
    print("Subscriber 2:", event.element ?? event)
}

subscriptionOne.disposed(by: disposeBag)
subscriptionTwo.disposed(by: disposeBag)
// output
Subscriber 1: 1
Subscriber 1: 2
Subscriber 1: 3
Subscriber 1: 4
Subscriber 1: 5
Subscriber 1: 6
Subscriber 1: 7
Subscriber 2: 5
Subscriber 2: 6
Subscriber 2: 7
AsyncSubject:
It emits only the last value it received, and only after the sequence completes.
Use when you only care about the final value of an observable once it completes.
import RxSwift

let asyncSubject = AsyncSubject<Int>()
let disposeBag = DisposeBag()

asyncSubject.onNext(1)
asyncSubject.onNext(2)
let subscriptionOne = asyncSubject.subscribe { event in
    print("Subscription 1:", event.element ?? event)
}
asyncSubject.onNext(3)
asyncSubject.onNext(4)
asyncSubject.onCompleted()

subscriptionOne.disposed(by: disposeBag)
// output
Subscription 1: 4
Subscription 1: completed
Summary of the Use Cases
PublishSubject: Use when you don't need to store previous events and only want to emit new events to current subscribers. [ ex: form submission ]
BehaviorSubject: Use when you need to store and emit the latest value to new subscribers. [ ex: form inputs ]
ReplaySubject: Use when you need to store and emit a buffer of the most recent values to new subscribers. [ ex: recent notifications ]
AsyncSubject: Use when you are only interested in the final value of a sequence once it completes. [ ex: file download ]
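The difference between the subjects is easiest to see from the point of view of a late subscriber. The following is a small sketch, assuming three values are emitted before anyone subscribes and one more afterwards; the variable names (fromPublish, fromBehavior, fromReplay) are hypothetical.

```swift
import RxSwift

let disposeBag = DisposeBag()

let publish = PublishSubject<Int>()
let behavior = BehaviorSubject<Int>(value: 0)
let replay = ReplaySubject<Int>.create(bufferSize: 2)

// Emit 1, 2, 3 before anyone subscribes.
for value in 1...3 {
    publish.onNext(value)
    behavior.onNext(value)
    replay.onNext(value)
}

var fromPublish: [Int] = []
var fromBehavior: [Int] = []
var fromReplay: [Int] = []

publish.subscribe(onNext: { fromPublish.append($0) }).disposed(by: disposeBag)
behavior.subscribe(onNext: { fromBehavior.append($0) }).disposed(by: disposeBag)
replay.subscribe(onNext: { fromReplay.append($0) }).disposed(by: disposeBag)

// Emit one more value after subscribing.
publish.onNext(4)
behavior.onNext(4)
replay.onNext(4)

print(fromPublish)  // [4]
print(fromBehavior) // [3, 4]
print(fromReplay)   // [2, 3, 4]
```

The PublishSubject subscriber misses everything emitted before it joined, the BehaviorSubject subscriber gets the single latest value first, and the ReplaySubject subscriber gets the last two values from its buffer first.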
Transformations
Transformations refer to operators that modify the data emitted by the observables.
These operators help to reshape, change, or convert the data stream for your needs.
.map:
Transforms each item emitted by an observable into another item using the provided closure.
Use when you need to apply a transformation or computation to each item in a sequence.
import RxSwift
import Foundation

let disposeBag = DisposeBag()
let formatter = NumberFormatter()
formatter.numberStyle = .spellOut

Observable<NSNumber>.of(100, 1000, 1, 21, 25)
    .map { formatter.string(from: $0) ?? "" }
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)
// output
one hundred
one thousand
one
twenty-one
twenty-five
.flatMap:
Transforms each element into an observable and flattens the resulting observables into a single observable.
Use when each item of an observable produces another observable; flatMap merges their emissions into a single observable.
import Foundation
import RxRelay
import RxSwift

let disposeBag = DisposeBag()

struct Game {
    var price: BehaviorRelay<Double>
}

let watchDogs2 = Game(price: BehaviorRelay(value: 299.50))
let watchDogsLegion = Game(price: BehaviorRelay(value: 299.99))
let gameSubject = PublishSubject<Game>()

gameSubject
    .flatMap { $0.price.asObservable() }
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

gameSubject.onNext(watchDogs2)
gameSubject.onNext(watchDogsLegion)
// output
299.5
299.99
.filter:
Allows only those items to pass through which satisfy the condition.
Use when you need to exclude items that do not meet certain criteria.
import Foundation
import RxSwift

let disposeBag = DisposeBag()
let names = Observable.of("Elon", "Steve", "Mark", "Jeff", "Ambani", "John")

// contains is case-sensitive, so "Elon" (capital E only) does not pass the filter
names
    .filter { $0.contains("e") }
    .subscribe { print($0) }
    .disposed(by: disposeBag)
// output
next(Steve)
next(Jeff)
completed
.zip:
Combines the emissions from two or more observables in a pairwise manner.
Use when you need to combine corresponding items from multiple observables into a tuple.
import Foundation
import RxSwift

let disposeBag = DisposeBag()
let carMakes = Observable.of("BMW", "Porsche", "Land Rover")
let carModels = Observable.of("X7", "911 GT3", "Defender")

Observable.zip(carMakes, carModels) { $0 + " " + $1 }
    .subscribe { print($0) }
    .disposed(by: disposeBag)
// output
next(BMW X7)
next(Porsche 911 GT3)
next(Land Rover Defender)
completed
.merge:
Combines multiple observable sequences into a single observable sequence.
Use merge when you want to combine emissions from multiple observables into a single observable sequence, such as aggregating events from different sources.
import Foundation
import RxSwift

let disposeBag = DisposeBag()
let observable1 = Observable.of(1, 2, 3)
let observable2 = Observable.of(4, 5, 6)

Observable.merge(observable1, observable2)
    .subscribe(onNext: { value in print(value) })
    .disposed(by: disposeBag)
// output
1
4
2
5
3
6
.combineLatest:
Combines the latest elements from several observables and emits a new value whenever any of them emits, once every observable has emitted at least one value.
Use combineLatest when you need to create a combined output based on the latest values from multiple observables, such as updating a UI element with the latest data from different sources.
import Foundation
import RxSwift

let disposeBag = DisposeBag()
let numbers = Observable.of(1, 2, 3, 4)
let letters = Observable.of("A", "B", "C", "D")
let names = Observable.of("cat", "dog", "cow", "fox")

let combined = Observable.combineLatest(numbers, letters, names) { (number, letter, name) in
    return "\(number)-\(letter)-\(name)"
}

combined
    .subscribe { value in print(value) }
    .disposed(by: disposeBag)
// output
next(1-A-cat)
next(2-A-cat)
next(2-B-cat)
next(2-B-dog)
next(3-B-dog)
next(3-C-dog)
next(3-C-cow)
next(4-C-cow)
next(4-D-cow)
next(4-D-fox)
completed
.distinctUntilChanged:
Only emits an item if it differs from the previously emitted item, which suppresses consecutive duplicates.
Use distinctUntilChanged when you need to prevent duplicate consecutive emissions, such as avoiding repeated updates or notifications.
import Foundation
import RxSwift

let disposeBag = DisposeBag()
let numbers = Observable.of(1, 1, 2, 2, 2, 3, 4, 5, 5, 5, 6)
let distinct = numbers.distinctUntilChanged()

distinct
    .subscribe { number in print(number) }
    .disposed(by: disposeBag)
// output
next(1)
next(2)
next(3)
next(4)
next(5)
next(6)
completed
.debounce:
Emits an item only after a specified time period has passed without the source emitting another item.
Use debounce to manage rapid successive emissions, such as handling user input in search bars to reduce the number of requests or updates.
import Foundation
import RxSwift

let disposeBag = DisposeBag()
let searchBar = Observable.of("a", "ab", "abc", "abcd", "abcde", "abcdef")
let debounced = searchBar.debounce(.milliseconds(300), scheduler: MainScheduler.instance)

debounced
    .subscribe { value in print(value) }
    .disposed(by: disposeBag)
// output
next(abcdef)
completed
Summary
map(_:): Transforms each item.
flatMap(_:): Projects items into observables and flattens them.
filter(_:): Allows items that meet a condition.
zip(_:_): Combines items pairwise.
merge(_:): Merges emissions from multiple observables.
combineLatest(_:_): Combines the latest values from observables.
distinctUntilChanged(): Prevents consecutive duplicates.
debounce(_:): Delays emissions until a period of inactivity.
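These operators are usually chained, with each one working on the output of the previous one. The following is a minimal sketch, assuming an arbitrary list of integers as input; the results array is a hypothetical helper used only to collect the output.

```swift
import RxSwift

let disposeBag = DisposeBag()
var results: [String] = []

Observable.of(1, 1, 2, 3, 3, 4, 5, 6)
    .distinctUntilChanged()      // drops consecutive duplicates: 1, 2, 3, 4, 5, 6
    .filter { $0 % 2 == 0 }      // keeps even numbers: 2, 4, 6
    .map { "even: \($0)" }       // turns each Int into a String
    .subscribe(onNext: { results.append($0) })
    .disposed(by: disposeBag)

print(results) // ["even: 2", "even: 4", "even: 6"]
```

The order of the operators matters: each one only sees what the operator before it lets through.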
Additional Types
Single:
It is an observable that emits either a single value or an error.
It can never emit multiple values or complete without emitting a value.
Use Single when you expect a single result from an operation, such as network requests or database queries.
import Foundation
import RxSwift

let disposeBag = DisposeBag()

let single: Single<String> = Single.create { single in
    let success = false // Or some condition
    if success {
        single(.success("Single value"))
    } else {
        single(.failure(NSError(domain: "", code: -1, userInfo: nil)))
    }
    return Disposables.create()
}

// use the shared disposeBag; a bag created inline would be deallocated immediately
single.subscribe(
    onSuccess: { value in print("Received value: \(value)") },
    onFailure: { error in print("Error: \(error)") }
).disposed(by: disposeBag)
// output
Error: Error Domain= Code=-1 "(null)"
Completable:
It represents an observable sequence that either completes successfully or terminates with an error, but never emits any items.
Use Completable for operations that perform a task (e.g., save data, make network requests) but do not produce a result that you need to handle.
import Foundation
import RxSwift

let disposeBag = DisposeBag()

let completable: Completable = Completable.create { completable in
    let success = true // Or some condition
    if success {
        completable(.completed)
    } else {
        completable(.error(NSError(domain: "", code: -1, userInfo: nil)))
    }
    return Disposables.create()
}

completable.subscribe(
    onCompleted: { print("Completed") },
    onError: { error in print("Error: \(error)") }
).disposed(by: disposeBag)
// output
Completed
Maybe:
It represents an observable sequence that can emit a single value, complete without emitting a value, or emit an error.
Use Maybe for scenarios where a result may or may not be available and both the presence and the absence of a value must be handled.
import Foundation
import RxSwift

let disposeBag = DisposeBag()

let maybe: Maybe<String> = Maybe.create { maybe in
    let success = true // Or some condition
    let value: String? = nil
    if success {
        if let value = value {
            maybe(.success(value))
        } else {
            maybe(.completed)
        }
    } else {
        maybe(.error(NSError(domain: "", code: -1, userInfo: nil)))
    }
    return Disposables.create()
}

maybe.subscribe(
    onSuccess: { value in print("Received value: \(value)") },
    onError: { error in print("Error: \(error)") },
    onCompleted: { print("Completed without value") }
).disposed(by: disposeBag)
// output
Completed without value
Driver:
It is a special type of observable that is always delivered on the main thread and never emits errors.
Use Driver for data that needs to be observed and bound to UI elements, ensuring that updates occur on the main thread and that errors are handled before the sequence reaches the UI.
import Foundation
import RxCocoa
import RxSwift

let disposeBag = DisposeBag()

Driver.of("Hello", "World")
    .drive(onNext: { value in
        print("Driver value: \(value)")
    })
    .disposed(by: disposeBag)
// output
Driver value: Hello
Driver value: World
Signal:
It is similar to Driver: it never emits errors and is always delivered on the main thread. Unlike Driver, it does not replay the latest element to new subscribers, which makes it a better fit for event-based sequences.
Use Signal for event-based sequences where you want to ensure main thread execution and error handling is not needed.
import Foundation
import RxCocoa
import RxSwift

let disposeBag = DisposeBag()
let signal = Signal.of("Event 1", "Event 2")

signal
    .emit(onNext: { event in
        print("Signal event: \(event)")
    })
    .disposed(by: disposeBag)
// output
Signal event: Event 1
Signal event: Event 2
ControlProperty:
It is a type of observable specifically designed for binding UI control properties in RxCocoa.
It provides a way to bind and observe UI control properties.
import UIKit
import RxSwift
import RxCocoa

let textField = UITextField()
let disposeBag = DisposeBag()

let textObservable: ControlProperty<String?> = textField.rx.text

textObservable
    .subscribe(onNext: { text in
        print("Text changed: \(text ?? "")")
    })
    .disposed(by: disposeBag)
ControlEvent:
It represents events (e.g., button taps, switch toggles) from UI controls in RxCocoa.
Use ControlEvent to handle user interactions and events from UI controls.
import UIKit
import RxSwift
import RxCocoa

let button = UIButton()
let disposeBag = DisposeBag()

button.rx.tap
    .subscribe(onNext: {
        print("Button tapped")
    })
    .disposed(by: disposeBag)
Summary
Single: Use for operations that return a single result or error.
Completable: Use for operations that complete with success or error but do not return a result.
Maybe: Use for operations where the result is optional.
Driver: Use for UI-related sequences that must operate on the main thread and never emit errors.
Signal: Use for event-based sequences on the main thread without error handling.
ControlProperty: Use for binding and observing UI control properties.
ControlEvent: Use for handling UI control events.