Mastering RxSwift: A Comprehensive Manual

varun pullur
  • Reactive Programming is programming with asynchronous data streams.

  • It enables building apps in a declarative way.

  • Everything in RxSwift is an observable sequence or something that subscribes to the events emitted by those observable sequences.

Observables

  • Observables are sequences of events; observers subscribe to them and are notified whenever a new event is emitted.

  • Observables can have 0 or more elements.

  • We use disposables to cancel subscriptions and release the resources they hold, so that subscriptions do not leak memory or keep objects alive in retain cycles.

  • After creating a subscription, add its disposable to a DisposeBag; the bag disposes of everything it holds when it is deallocated.

Use Observables for data streams where the sequence is fixed, and you don't need to manage or store the state.

  • An Observable is a sequence of events that can emit:

    • Next: A new data item.

    • Error: An error that terminates the sequence.

    • Completed: A signal that the sequence has completed and no further data will be emitted.

  • An Observable is used in the following way:

    • Create: Observables are created or initialised.

    • Subscribe: Observers subscribe to these Observables to receive and react to the emitted items.

    • Emit: Observables emit items, which are received by their observers.

    • Dispose: When the observer no longer needs to receive items or the observable completes, the subscription is disposed of.

    // creating an Observable

    let number = Observable<Int>.of(1, 2, 3)

    // subscribing to an Observable

    let subscription = number.subscribe(onNext: { print($0) }) // you can do a lot with the subscribe method

    // disposing of a subscription

    // Method 1: dispose of it manually
    subscription.dispose()

    // Method 2: let a DisposeBag dispose of it when the bag is deallocated
    let disposeBag = DisposeBag()

    number.subscribe(onNext: { print($0) }).disposed(by: disposeBag)

    // Full Code

    import RxSwift

    let numbersSequence = Observable.just([1, 2, 3, 4, 5])

    let disposeBag = DisposeBag()

    numbersSequence.subscribe(onNext: { values in

      for value in values {

        print(value*5)

      }

    }).disposed(by: disposeBag)

    // output

    5
    10
    15
    20
    25
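
The three event types listed earlier (next, error, completed) can also be emitted by hand. The following is a minimal sketch using Observable.create; the names greetings and received are made up for this illustration. It also shows that anything sent after the sequence has completed is ignored.

```swift
import RxSwift

let disposeBag = DisposeBag()
var received: [String] = []

// Observable.create lets us decide exactly which events are emitted.
let greetings = Observable<String>.create { observer in
    observer.onNext("Hello")
    observer.onNext("World")
    observer.onCompleted()
    // Ignored: the sequence has already terminated.
    observer.onNext("Too late")
    return Disposables.create()
}

greetings.subscribe(
    onNext: { received.append("next(\($0))") },
    onError: { received.append("error(\($0))") },
    onCompleted: { received.append("completed") }
).disposed(by: disposeBag)

print(received) // ["next(Hello)", "next(World)", "completed"]
```

Collecting the events in an array instead of printing them one by one makes it easy to see the whole sequence at once.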

Subjects

  • Subjects act as both observable and observer: they can receive events and emit them to their own subscribers.

Use Subjects when you need to maintain or inject state, broadcast events, or create a bridge between different parts of your codebase.

  • They act as bridges or proxies that allow events to be injected into observable sequences.
  1. PublishSubject:

    • Starts empty and only emits new elements to subscribers.

    • Used when you only want subscribers to receive events emitted after they have subscribed.

        import RxSwift
      
        let publishSubject = PublishSubject<String>()
      
        let disposeBag = DisposeBag()
      
        public enum SomeError: Error {
           case genericError(String)
        }
      
        publishSubject.onNext("A")
      
        let subscriptionOne = publishSubject.subscribe { event in
      
          print("Subscriber 1:", event.element ?? event)
      
        }
      
        publishSubject.onNext("B")
      
        let subscriptionTwo = publishSubject.subscribe { event in
      
          print("Subscriber 2:", event.element ?? event)
      
        }
      
        publishSubject.onNext("C")
        publishSubject.onNext("D")
      
        let subscriptionThree = publishSubject.subscribe { event in
      
          print("Subscriber 3:", event.element ?? event)
      
        }
      
        publishSubject.onError(SomeError.genericError("An error occurred!"))
      
        subscriptionOne.disposed(by: disposeBag)
        subscriptionTwo.disposed(by: disposeBag)
        subscriptionThree.disposed(by: disposeBag)
      
        // output
      
        Subscriber 1: B
        Subscriber 1: C
        Subscriber 2: C
        Subscriber 1: D
        Subscriber 2: D
        Subscriber 1: error(genericError("An error occurred!"))
        Subscriber 2: error(genericError("An error occurred!"))
        Subscriber 3: error(genericError("An error occurred!"))
      
  2. BehaviorSubject:

    • Starts with an initial value and replays the latest element (or the initial value, if nothing has been emitted yet) to new subscribers.

    • Used when you want subscribers to receive the most recent value upon subscription.

    • A subscriber receives the most recent value plus everything emitted after it subscribes.

        import RxSwift
      
        let behaviorSubject = BehaviorSubject<String>(value: "A")
      
        let disposeBag = DisposeBag()
      
        public enum SomeError: Error {
                case genericError(String)
        }
      
        let subscriptionOne = behaviorSubject.subscribe { event in
      
          print("Subscriber 1:", event.element ?? event)
      
        }
      
        behaviorSubject.onNext("B")
      
        let subscriptionTwo = behaviorSubject.subscribe { event in
      
          print("Subscriber 2:", event.element ?? event)
      
        }
      
        behaviorSubject.onNext("C")
        behaviorSubject.onNext("D")
        behaviorSubject.onError(SomeError.genericError("An error occurred!"))
      
        let subscriptionThree = behaviorSubject.subscribe { event in
      
          print("Subscriber 3:", event.element ?? event)
      
        }
      
        behaviorSubject.on(.next("E")) // ignored: the subject has already terminated with an error
      
        subscriptionOne.disposed(by: disposeBag)
        subscriptionTwo.disposed(by: disposeBag)
        subscriptionThree.disposed(by: disposeBag)
      
        // output
      
        Subscriber 1: A
        Subscriber 1: B
        Subscriber 2: B
        Subscriber 1: C
        Subscriber 2: C
        Subscriber 1: D
        Subscriber 2: D
        Subscriber 1: error(genericError("An error occurred!"))
        Subscriber 2: error(genericError("An error occurred!"))
        Subscriber 3: error(genericError("An error occurred!"))
      
  3. ReplaySubject:

    • Similar to BehaviorSubject, but it has no initial value and replays up to a configurable number of elements (the buffer size).

    • The buffer size is the number of most recently emitted values that are replayed to future subscribers.

    • A new subscriber first receives the buffered values and then everything emitted after it subscribes. [ ex: if the buffer size is 3, a late subscriber receives only the 3 most recent values before any new ones. ]

    • Used when you want new subscribers to receive a specified number of the most recent values.

        import RxSwift
      
        let replaySubject = ReplaySubject<Int>.create(bufferSize: 3)
      
        let disposeBag = DisposeBag()
      
        replaySubject.onNext(1)
      
        let subscriptionOne = replaySubject.subscribe { event in

          print("Subscriber 1: ", event.element ?? event)
      
        }
      
        replaySubject.onNext(2)
        replaySubject.onNext(3)
        replaySubject.onNext(4)
        replaySubject.onNext(5)
        replaySubject.onNext(6)
        replaySubject.onNext(7)
      
        let subscriptionTwo = replaySubject.subscribe { event in

          print("Subscriber 2: ", event.element ?? event)
      
        }
      
        subscriptionOne.disposed(by: disposeBag)
        subscriptionTwo.disposed(by: disposeBag)
      
        // output
      
        Subscriber 1:  1
        Subscriber 1:  2
        Subscriber 1:  3
        Subscriber 1:  4
        Subscriber 1:  5
        Subscriber 1:  6
        Subscriber 1:  7
        Subscriber 2:  5
        Subscriber 2:  6
        Subscriber 2:  7
      
  4. AsyncSubject:

    • It emits only the last value it received, and only once the sequence completes. If the sequence terminates with an error, subscribers receive only the error.

    • Use when you only care about the final value of an observable once it completes.

        import RxSwift
      
        let asyncSubject = AsyncSubject<Int>()
      
        let disposeBag = DisposeBag()
      
        asyncSubject.onNext(1)
        asyncSubject.onNext(2)
      
        let subscriptionOne = asyncSubject.subscribe { event in

          print("Subscription 1: ", event.element ?? event)
      
        }
      
        asyncSubject.onNext(3)
        asyncSubject.onNext(4)
        asyncSubject.onCompleted()
      
        subscriptionOne.disposed(by: disposeBag)
      
        // output
      
        Subscription 1:  4
        Subscription 1:  completed
      

Summary of the Use Cases

  • PublishSubject: Use when you don't need to store previous events and only want to emit new events to current subscribers. [ ex: form submission ]

  • BehaviorSubject: Use when you need to store and emit the latest value to new subscribers. [ ex: form inputs ]

  • ReplaySubject: Use when you need to store and emit a buffer of the most recent values to new subscribers. [ ex: recent notifications ]

  • AsyncSubject: Use when you are only interested in the final value of a sequence once it completes. [ ex: file download ]
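
To make the first two use cases concrete, here is a small sketch that contrasts a PublishSubject with a BehaviorSubject. The names submissions and username, and the values pushed through them, are hypothetical stand-ins for a form's submit events and a text field's contents. Both subjects receive a value before anyone subscribes.

```swift
import RxSwift

let disposeBag = DisposeBag()
var submissionLog: [String] = []
var usernameLog: [String] = []

// Hypothetical form: submit events and the current text of a field.
let submissions = PublishSubject<String>()
let username = BehaviorSubject<String>(value: "")

// Events sent before anyone has subscribed.
submissions.onNext("submit #1")
username.onNext("varun")

submissions.subscribe(onNext: { submissionLog.append($0) }).disposed(by: disposeBag)
username.subscribe(onNext: { usernameLog.append($0) }).disposed(by: disposeBag)

// Events sent after subscription.
submissions.onNext("submit #2")
username.onNext("varun p")

print(submissionLog) // ["submit #2"]
print(usernameLog)   // ["varun", "varun p"]
```

The PublishSubject subscriber misses the earlier submission, while the BehaviorSubject subscriber starts from the latest text, which is usually what a form field needs.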

Transformations

  • Transformations refer to operators that modify the data emitted by the observables.

  • These operators help to reshape, change, or convert the data stream for your needs.

  1. .map:

    • Transforms each item emitted by an observable into another item using the provided closure.

    • Use when you need to apply a transformation or computation to each item in a sequence.

        import RxSwift
        import Foundation
      
        let disposeBag = DisposeBag()
      
        let formatter = NumberFormatter()
        formatter.numberStyle = .spellOut
      
        Observable<NSNumber>.of(100, 1000, 1, 21, 25)
          .map {
      
            formatter.string(from: $0) ?? ""
      
          }
          .subscribe(onNext: {
      
            print($0)
      
          })
          .disposed(by: disposeBag)
      
          // output
      
        one hundred
        one thousand
        one
        twenty-one
        twenty-five
      
  2. .flatMap:

    • Transforms each element into an observable and flattens the emissions of all those inner observables into a single observable.

    • Use when each item in an observable produces another observable; flatMap combines them into a single observable.

        import Foundation
        import RxRelay
        import RxSwift
      
        let disposeBag = DisposeBag()
      
        struct Game {
      
          let price: BehaviorRelay<Double>
      
        }
      
        let watchDogs2 = Game(price: BehaviorRelay(value: 299.50))
        let watchDogsLegion = Game(price: BehaviorRelay(value: 299.99))
      
        let gameSubject = PublishSubject<Game>()
      
        gameSubject.flatMap {
      
          $0.price.asObservable()
      
        }.subscribe(onNext: {
      
          print($0)
      
        }).disposed(by: disposeBag)
      
        gameSubject.onNext(watchDogs2)
        gameSubject.onNext(watchDogsLegion)
      
        // output
      
        299.5
        299.99
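
One detail the example above does not show is that flatMap keeps listening to every inner observable it has created. The sketch below reuses the same Game setup and then changes both prices afterwards; the new price values and the prices array are made up for this illustration.

```swift
import RxRelay
import RxSwift

let disposeBag = DisposeBag()
var prices: [Double] = []

struct Game {
    let price: BehaviorRelay<Double>
}

let watchDogs2 = Game(price: BehaviorRelay(value: 299.50))
let watchDogsLegion = Game(price: BehaviorRelay(value: 299.99))

let gameSubject = PublishSubject<Game>()

gameSubject
    .flatMap { $0.price.asObservable() }
    .subscribe(onNext: { prices.append($0) })
    .disposed(by: disposeBag)

gameSubject.onNext(watchDogs2)       // emits the current price, 299.5
gameSubject.onNext(watchDogsLegion)  // emits the current price, 299.99

// Later price changes on either game still reach the subscriber.
watchDogs2.price.accept(199.0)
watchDogsLegion.price.accept(249.0)

print(prices) // [299.5, 299.99, 199.0, 249.0]
```

If only the most recently received game should be tracked, flatMapLatest is the usual alternative.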
      
  3. .filter:

    • Allows only those items to pass through which satisfy the condition.

    • Use when you need to exclude items that do not meet certain criteria.

        import Foundation
        import RxSwift
      
        let disposeBag = DisposeBag()
      
        let names = Observable.of("Elon", "Steve", "Mark", "Jeff", "Ambani", "Jhon")
      
        names.filter {
      
          $0.contains("e")
      
        }.subscribe {
      
          print($0)
      
        }.disposed(by: disposeBag)
      
        // output
      
        next(Steve)
        next(Jeff)
        completed
      
  4. .zip:

    • Combines the emissions from two or more observables in a pairwise manner: the first elements together, then the second elements, and so on.

    • Use when you need to combine corresponding items from multiple observables into a tuple.

        import Foundation
        import RxSwift
      
        let disposeBag = DisposeBag()
      
        let carMakes = Observable.of("BMW", "Porsche", "Land Rover")
        let carModels = Observable.of("X7", "911 GT3", "Defender")
      
        Observable.zip(carMakes, carModels) {
      
          $0 + " " + $1
      
        }.subscribe {
      
          print($0)
      
        }.disposed(by: disposeBag)
      
        // output
      
        next(BMW X7)
        next(Porsche 911 GT3)
        next(Land Rover Defender)
        completed
      
  5. .merge:

    • Combines multiple observable sequences into a single observable sequence, forwarding elements as they arrive.

    • Use merge when you want to combine emissions from multiple observables into a single observable sequence, such as aggregating events from different sources.

        import Foundation
        import RxSwift
      
        let disposeBag = DisposeBag()
      
        let observable1 = Observable.of(1, 2, 3)
        let observable2 = Observable.of(4, 5, 6)
      
        Observable.merge(observable1, observable2).subscribe(onNext: { value in
      
          print(value)
      
        }).disposed(by:disposeBag)
      
        // output
      
        1
        4
        2
        5
        3
        6
      
  6. .combineLatest:

    • Combines the latest elements from several observables and emits a new value whenever any of them emits, once every observable has emitted at least one element.

    • Use combineLatest when you need to create a combined output based on the latest values from multiple observables, such as updating a UI element with the latest data from different sources.

        import Foundation
        import RxSwift
      
        let disposeBag = DisposeBag()
      
        let numbers = Observable.of(1,2,3,4)
      
        let letters = Observable.of("A", "B", "C", "D")
      
        let names = Observable.of("cat", "dog", "cow", "fox")
      
        let combined = Observable.combineLatest(numbers, letters, names) { (number, letter, name) in
      
          return "\(number)-\(letter)-\(name)"
      
        }
      
        combined.subscribe { value in
      
          print(value)
      
        }.disposed(by:disposeBag)
      
        // output
      
        next(1-A-cat)
        next(2-A-cat)
        next(2-B-cat)
        next(2-B-dog)
        next(3-B-dog)
        next(3-C-dog)
        next(3-C-cow)
        next(4-C-cow)
        next(4-D-cow)
        next(4-D-fox)
        completed
      
  7. .distinctUntilChanged:

    • Emits an item only if it differs from the item emitted immediately before it, which removes consecutive duplicates.

    • Use distinctUntilChanged when you need to prevent duplicate consecutive emissions, such as avoiding repeated updates or notifications.

        import Foundation
        import RxSwift
      
        let disposeBag = DisposeBag()
      
        let numbers = Observable.of(1,1,2,2,2,3,4,5,5,5,6)
      
        let distinct = numbers.distinctUntilChanged()
      
        distinct.subscribe { number in
      
          print(number)
      
        }.disposed(by: disposeBag)
      
        // output
      
        next(1)
        next(2)
        next(3)
        next(4)
        next(5)
        next(6)
        completed
      
  8. .debounce:

    • Emits an item only after a specified time period has passed without the source emitting another item.

    • Use debounce to manage rapid successive emissions, such as handling user input in search bars to reduce the number of requests or updates.

        import Foundation
        import RxSwift
      
        let disposeBag = DisposeBag()
      
        let searchBar = Observable.of("a", "ab", "abc", "abcd", "abcde", "abcdef")
        let debounced = searchBar.debounce(.milliseconds(300), scheduler: MainScheduler.instance)
      
        debounced.subscribe { value in
      
          print(value)
      
        }.disposed(by: disposeBag)
      
        // output
      
        next(abcdef)
        completed
      

Summary

  • map(_:): Transforms each item.

  • flatMap(_:): Projects items into observables and flattens them.

  • filter(_:): Allows items that meet a condition.

  • zip(_:_:): Combines items pairwise.

  • merge(_:): Merges emissions from multiple observables.

  • combineLatest(_:_:): Combines the latest values from observables.

  • distinctUntilChanged(): Prevents consecutive duplicates.

  • debounce(_:scheduler:): Delays emissions until a period of inactivity.
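
These operators are usually chained rather than used in isolation. As a rough sketch, the pipeline below removes consecutive duplicates, keeps the even numbers, and formats what is left; the input values and the results array are invented for this illustration.

```swift
import RxSwift

let disposeBag = DisposeBag()
var results: [String] = []

Observable.of(1, 1, 2, 3, 3, 4, 5, 6, 6)
    .distinctUntilChanged()       // 1, 2, 3, 4, 5, 6
    .filter { $0 % 2 == 0 }       // 2, 4, 6
    .map { "even: \($0)" }        // convert each Int into a String
    .subscribe(onNext: { results.append($0) })
    .disposed(by: disposeBag)

print(results) // ["even: 2", "even: 4", "even: 6"]
```

Each operator returns a new observable, so the order of the chain matters: filtering before mapping means the closure passed to map only runs for the items that survive the filter.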

Additional Types

  1. Single:

    • It is an observable that emits either exactly one value or an error.

    • It can never emit multiple values, and it cannot complete without emitting a value.

    • Use Single when you expect a single result from an operation, such as network requests or database queries.

        import Foundation
        import RxSwift
      
        let disposeBag = DisposeBag()
      
        let single: Single<String> = Single.create { single in
            let success = false // Or some condition
      
            if success {
                single(.success("Single value"))
            } else {
                single(.failure(NSError(domain: "", code: -1, userInfo: nil)))
            }
            return Disposables.create()
        }
      
        single.subscribe(
            onSuccess: { value in print("Received value: \(value)") },
            onFailure: { error in print("Error: \(error)") }
        ).disposed(by: disposeBag)
      
        // output
      
        Error: Error Domain= Code=-1 "(null)"
      
  2. Completable:

    • it represents an observable sequence that either completes successfully or with an error, but never emits any items.

    • Use Completable for operations that perform a task (e.g., save data, make network requests) but do not produce a result that you need to handle.

        import Foundation
        import RxSwift
      
        let disposeBag = DisposeBag()
      
        let completable: Completable = Completable.create { completable in
            let success = true // Or some condition
      
            if success {
                completable(.completed)
            } else {
                completable(.error(NSError(domain: "", code: -1, userInfo: nil)))
            }
            return Disposables.create()
        }
      
        completable.subscribe(
            onCompleted: { print("Completed") },
            onError: { error in print("Error: \(error)") }
        ).disposed(by: disposeBag)
      
        // output
      
        Completed
      
  3. Maybe:

    • it represents an observable sequence that can either emit a single value, complete without emitting a value, or emit an error.

    • Use Maybe for scenarios where a result may or may not be available and both the presence and the absence of a value must be handled.

        import Foundation
        import RxSwift
      
        let disposeBag = DisposeBag()
      
        let maybe: Maybe<String> = Maybe.create { maybe in
            let success = true // Or some condition
            let value: String? = nil
      
            if success {
                if let value = value {
                    maybe(.success(value))
                } else {
                    maybe(.completed)
                }
            } else {
                maybe(.error(NSError(domain: "", code: -1, userInfo: nil)))
            }
            return Disposables.create()
        }
      
        maybe.subscribe(
            onSuccess: { value in print("Received value: \(value)") },
            onError: { error in print("Error: \(error)") },
            onCompleted: { print("Completed without value") }
        ).disposed(by: disposeBag)
      
        // output
      
        Completed without value
      
  4. Driver:

    • It is a trait from RxCocoa that always delivers events on the main thread, never emits errors, and replays its latest element to new subscribers.

    • Use Driver for state that is bound to UI elements: updates occur on the main thread, and errors have to be handled before the sequence becomes a Driver.

        import Foundation
        import RxCocoa
        import RxSwift
      
        let disposeBag = DisposeBag()
      
        Driver.of("Hello", "World").drive(
          onNext: { value in
            print("Driver value: \(value)")
        }).disposed(by: disposeBag)
      
        //output
      
        Driver value: Hello
        Driver value: World
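
In practice a Driver is often derived from an ordinary Observable that can fail. The sketch below assumes a hypothetical LoadError type and a titles observable that errors after its first element; asDriver(onErrorJustReturn:) replaces the error with a fallback value so the resulting Driver never fails. It is meant to run on the main thread, as the earlier Driver example does.

```swift
import RxCocoa
import RxSwift

// Hypothetical error type, used only for this illustration.
enum LoadError: Error {
    case offline
}

let disposeBag = DisposeBag()
var shown: [String] = []

// An ordinary Observable that fails after its first element.
let titles = Observable<String>.create { observer in
    observer.onNext("Loading")
    observer.onError(LoadError.offline)
    return Disposables.create()
}

titles
    .asDriver(onErrorJustReturn: "Unavailable") // the error becomes a fallback value
    .drive(onNext: { shown.append($0) })
    .disposed(by: disposeBag)

print(shown) // ["Loading", "Unavailable"]
```

The fallback value is a design decision: asDriver(onErrorDriveWith:) is an alternative when a whole replacement sequence is a better fit than a single value.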
      
  5. Signal:

    • It is similar to Driver in that it never emits errors and always delivers on the main thread, but it does not replay the latest element to new subscribers, which suits event-based sequences.

    • Use Signal for event-based sequences where you want to ensure main thread execution and error handling is not needed.

        import Foundation
        import RxCocoa
        import RxSwift
      
        let disposeBag = DisposeBag()
      
        let signal = Signal.of("Event 1", "Event 2")
      
        signal.emit(onNext: { event in
            print("Signal event: \(event)")
        }).disposed(by: disposeBag)
      
        // output
      
        Signal event: Event 1
        Signal event: Event 2
      
  6. ControlProperty:

    • it is a type of observable specifically designed for binding UI control properties in RxCocoa.

    • It provides a way to bind and observe UI control properties.

        import RxSwift
        import RxCocoa
        import UIKit
      
        let textField = UITextField()
        let disposeBag = DisposeBag()
      
        let textObservable: ControlProperty<String?> = textField.rx.text
      
        textObservable
          .subscribe(onNext: { text in
            print("Text changed: \(text ?? "")")
          })
          .disposed(by: disposeBag)
      
  7. ControlEvent:

    • it represents events (e.g., button taps, switch toggles) from UI controls in RxCocoa.

    • Use ControlEvent to handle user interactions and events from UI controls.

        import RxSwift
        import RxCocoa
        import UIKit
      
        let button = UIButton()
        let disposeBag = DisposeBag()
      
        button.rx.tap
          .subscribe(onNext: {
            print("Button tapped")
          })
          .disposed(by: disposeBag)
      

Summary

  • Single: Use for operations that return a single result or error.

  • Completable: Use for operations that complete with success or error but do not return a result.

  • Maybe: Use for operations where the result is optional.

  • Driver: Use for UI-related sequences that must operate on the main thread and never emit errors.

  • Signal: Use for event-based sequences on the main thread without error handling.

  • ControlProperty: Use for binding and observing UI control properties.

  • ControlEvent: Use for handling UI control events.
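
The difference between Single and Maybe is easiest to see by converting ordinary observables into them. This is a small sketch under the assumption that the source observables are the simple ones shown here; the log array and its labels are invented for the illustration.

```swift
import RxSwift

let disposeBag = DisposeBag()
var log: [String] = []

// A one-element Observable converts cleanly to a Single.
Observable.just(42)
    .asSingle()
    .subscribe(
        onSuccess: { log.append("single success(\($0))") },
        onFailure: { _ in log.append("single failure") }
    )
    .disposed(by: disposeBag)

// An empty Observable breaks the Single contract, so asSingle() reports an error.
Observable<Int>.empty()
    .asSingle()
    .subscribe(
        onSuccess: { log.append("single success(\($0))") },
        onFailure: { _ in log.append("single failure") }
    )
    .disposed(by: disposeBag)

// Maybe accepts the empty case and simply completes.
Observable<Int>.empty()
    .asMaybe()
    .subscribe(
        onSuccess: { log.append("maybe success(\($0))") },
        onError: { _ in log.append("maybe error") },
        onCompleted: { log.append("maybe completed") }
    )
    .disposed(by: disposeBag)

print(log) // ["single success(42)", "single failure", "maybe completed"]
```

Choosing the narrowest trait that fits documents the contract of an operation in its type, so callers know which events they have to handle.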
