Flow Cancellation
In this series of articles , we are discussing about Kotlin flows and in this article we will discuss about Flow Cancellation.
There are 2 ways to cancel the flow
1.Cancel the coroutine job
// cancel the job
val job = launch {
sampleFlow()
.onCompletion { cause ->
if (cause is CancellationException) {
println("Flow cancelled Manually by job ")
}
}
.collect {
println("Type 1 Received $it")
}
}
delay(550)
job.cancel()
/**
* Output :
* Type 1 Received 1
* Type 1 Received 3
* Flow cancelled Manually by job
*/
2.Cancel in scope
// cancel the code in scope
launch {
sampleFlow()
.onCompletion { cause ->
if (cause is CancellationException) {
println("Flow cancelled Manually in scope")
}
}.collect {
println("Type 2 Received $it")
if (it == 3) {
cancel()
}
}
}.join()
/**
* Output :
* Type 2 Received 1
* Type 2 Received 3
* Flow cancelled Manually in scope
*/
Code gist for 2 types of cancelling can be found in https://gist.github.com/vprabhu/ebce4cf922f50325f3f82b400681c0ac
Cooperative Cancellation
All suspend function in coroutines are
cancellable
They check for coroutine cancellation internally and throw
CancellationException
when cancelledBut if a coroutine is not checking for cancellation when working on its job , then it is non-cancellable .
Behaviour of flowOf() when cancelling
flowOf()
doesn't check internally if the coroutine is active
launch { flowOf(1,3,5) .onCompletion { cause -> if (cause is CancellationException) { println("Flow cancelled $cause") } } .collect { println("Received $it") if (it == 3) { cancel() } } }.join() /** * output : * Received 1 * Received 3 * Received 5 */
In the above code , even when you are cancelling the flow if the value is 3 inside the
collect{}
block , its not stopping the flow.There are 2 ways to check that
1.ensureActive()
in onEach()
block
launch {
flowOf(1,3,5)
.onCompletion { cause ->
if (cause is CancellationException) {
println("Flow cancelled $cause")
}
}.onEach {
ensureActive()
}
.collect {
println("Received $it")
if (it == 3) {
cancel()
}
}
}.join()
/**
* output :
* Received 1
* Received 3
* Flow cancelled kotlinx.coroutines.JobCancellationException:
* StandaloneCoroutine was cancelled; job=StandaloneCoroutine{Cancelling}@635a732e
*/
2.cancellable()
launch {
flowOf(1,3,5)
.onCompletion { cause ->
if (cause is CancellationException) {
println("Flow cancelled $cause")
}
}
.cancellable()
.collect {
println("Received $it")
if (it == 3) {
cancel()
}
}
}.join()
/**
* output :
* Received 1
* Received 3
* Flow cancelled kotlinx.coroutines.JobCancellationException:
* StandaloneCoroutine was cancelled; job=StandaloneCoroutine{Cancelling}@635a732e
*/
Code gist for 2 types of cancelling in the flowOf()
can be found in https://gist.github.com/vprabhu/1a9defc5e8decffc619b5ed746b43d17
When the flow is working on something intensive , we can use
ensureActive()
inside the function to cancel it. Let me know if you need a sample code .
In the next article , we will discuss about Cold Flow and Hot Flow.
Please leave your comments to improve and discuss more
Happy and Enjoy coding
Subscribe to my newsletter
Read articles from Vignesh Prabhu directly inside your inbox. Subscribe to the newsletter, and don't miss out.
Written by
Vignesh Prabhu
Vignesh Prabhu
I am an Android application developer who is looking for new challenges to solve , love to learn and implement new things in coding