Scheduler in Combine: Managing Asynchronous Tasks Efficiently
What is a scheduler
According to the scheduler documentation, a scheduler is “a protocol that defines when and where to execute a closure.” You can use a scheduler to execute code as soon as possible, or after a future date
Combine does not work directly with threads. Instead, it allows Publishers to operate on specific Schedulers.
The where means current run loop, dispatch queue or operation queue.
The when means virtual time, according to the scheduler’s clock. The work performed by a scheduler will adhere to the scheduler’s clock only, which might not correspond to the real-time of the system.
Types of Schedulers in Combine
Combine provides several types of schedulers, all of which conform to the Scheduler
protocol:
DispatchQueue
a DispatchQueue
is a first-in-first-out queue that can accept tasks in the form of block objects and execute them serially or concurrently. You’ll commonly use serial or global queues for the background work, and the main queue for the UI-related work
The system manages work submitted to a DispatchQueue
on a pool of threads. The DispatchQueue
doesn’t guarantee which thread it will use for executing tasks unless the DispatchQueue
represents an app’s main thread
DispatchQueue
is one of the safest ways to schedule commands
OperationQueue
Performs the work on a specific operation queue. Similarly to the dispatch queues, use OperationQueue.main
for UI work, and other queues for the background work.
ImmediateScheduler
An IntermediateScheduler
is used to perform asynchronous operations immediately. This means that commands will be immediately executed on the application’s current threads
import Combine
import Foundation
let immediateScheduler = ImmediateScheduler.shared
let aNum = [1, 2, 3].publisher
.receive(on: immediateScheduler)
.sink(receiveValue: {
print("Received \($0) on thread \(Thread.current)")
}
)
The code snippet above is running on the main thread, so the result will be also received in the main thread
Received 1 on thread <_NSMainThread: 0x600001704040>{number = 1, name = main}
Received 2 on thread <_NSMainThread: 0x600001704040>{number = 1, name = main}
Received 3 on thread <_NSMainThread: 0x600001704040>{number = 1, name = main}
Default Scheduler
💡 Even if you don’t specify any scheduler, Combine provides you with the default one. The scheduler uses the same thread, where the task is performed. For example, if you perform a background task, Combine provides a scheduler that receives the task on the same background thread
let subject = PassthroughSubject<Int, Never>()
// 1
let token = subject.sink(receiveValue: { value in
print(Thread.isMainThread)
})
// 2
subject.send(1)
// 3
DispatchQueue.global().async {
subject.send(2)
}
Print true if the value is received on the main thread, and false otherwise.
Send
1
from the main thread.Send
2
from the background thread.
It will print:
true
false
As expected, the values are received on different threads.
Switching Schedulers
In order not to freeze or crash when interacting with the user interface, the resource-consuming tasks should be done in the background and then handle their result on the main thread. The Combine’s way of doing this is by switching schedulers with two methods: subscribe(on:)
and receive(on:)
receive(on:)
The receive(on:)
method changes a scheduler for all publisher that comes after it is declared
Just(1)
.map { _ in print(Thread.isMainThread) }
.receive(on: DispatchQueue.global())
.map { print(Thread.isMainThread) }
.sink { print(Thread.isMainThread) }
it will print:
true
false
false
The process is visualized as follows:
All operators to the right of receive(on:)
deliver elements on DispatchQueue.global()
scheduler.
subscribe(on:)
subscribe(on:)
sets where subscription work happens. It stays there unless receive(on:)
changes it. Think of it as assigning work locations.
Just(1)
.subscribe(on: DispatchQueue.global())
.map { _ in print(Thread.isMainThread) }
.sink { print(Thread.isMainThread) }
It will print:
false
false
Let’s visualize the process:
All the operations happen on the DispatchQueue.global()
scheduler
The position of subscribe(on:)
doesn’t matter, as it affects the time of subscription. This snippet is equivalent to the previous one:
Just(1)
.map { _ in print(Thread.isMainThread) }
.subscribe(on: DispatchQueue.global()) // Position of subscribe(on:) has changed
.sink { print(Thread.isMainThread) }
💡 You must notice that the definition of subscribe(on:)
says nothing about the scheduler on which we receive values. In case a publisher emits values on a different thread, it will be received on that thread. A typical example is a data task publisher:
URLSession.shared.dataTaskPublisher(for: URL(string: "<https://www.vadimbulavin.com>")!)
.subscribe(on: DispatchQueue.main) // Subscribe on the main thread
.sink(receiveCompletion: { _ in },
receiveValue: { _ in
print(Thread.isMainThread) // Are we on the main thread?
})
The code will print false because the publisher emits values on a background thread. In such cases, we must use receive(on:)
to specify a scheduler
Performing asynchronous tasks with schedulers
Let’s see how we can switch schedulers by combining subscribe(on:)
and receive(on:)
Assume that we have a publisher with a long-running task:
struct BusyPublisher: Publisher {
typealias Output = Int
typealias Failure = Never
func receive<S>(subscriber: S) where S : Subscriber, Failure == S.Failure, Output == S.Input {
sleep(5)
subscriber.receive(subscription: Subscriptions.empty)
_ = subscriber.receive(1)
subscriber.receive(completion: .finished)
}
}
When call from the UI thread, it freezes the app for 10 seconds. Remember, Combine defaults to the same scheduler from where the element is fired:
BusyPublisher()
.sink { _ in
print("Received value")
}
print("Hello")
As expected, Hello is printed after the value is received:
Received value
Hello
💡 So the solution for doing asynchronous work with Combine is subscribing on the background scheduler and then receiving the events on the UI Scheduler:
BusyPublisher()
.subscribe(on: DispatchQueue.global())
.receive(on: DispatchQueue.main)
.sink { _ in print("Received value") }
print("Hello")
As expected, Hello is printed before the value is received, this means that the application is not frozen by the publisher blocking the main thread.
Hello
Received value
Summary
Let’s recap the main takeaways.
subscribe(on:)
andreceive(on:)
are primary multithreading methods of the Combine Swift framework.The default scheduler uses the same thread from where the element was generated.
receive(on:)
sets a scheduler for all operators coming afterward.subscribe(on:)
sets a scheduler for the whole stream, starting at the time the Publisher is subscribed to. The stream stays on the same scheduler untilreceive(on:)
specifies another scheduler.The position of
subscribe(on:)
does not matter.Asynchronous work is typically performed by subscribing on the background scheduler and receiving values on the UI scheduler.
Thanks for Reading! ✌️
If you have any questions or corrections, please leave a comment below or contact me via my LinkedIn account Pham Trung Huy.
Happy coding 🍻
Subscribe to my newsletter
Read articles from Phạm Trung Huy directly inside your inbox. Subscribe to the newsletter, and don't miss out.
Written by
Phạm Trung Huy
Phạm Trung Huy
👋 I am a Mobile Developer based in Vietnam. My passion lies in continuously pushing the boundaries of my skills in this dynamic field.