Kotlin : How to Flow?

Romman SabbirRomman Sabbir
5 min read

Kotlin Flow is a powerful and easy-to-use tool for handling asynchronous data streams in Kotlin, especially useful for modern Android development. At first glance, Flow might seem a bit complex, but don’t worry! In this article, we’ll break it down into its core parts with simple examples. By the end of this guide, we'll understand how Flow works under the hood and how to replicate its basic functionality step by step.

What We’ll Cover:

  1. Creating a Stream (Emission)

  2. Introducing Suspension (Pausing Between Emissions)

  3. Applying Operators (Transformation)

  4. Collecting Values (Handling the Stream)

  5. Handling Concurrency (Running Emission on a Different Thread)

Let’s get started!


1. Creating a Simple Flow-like Class (Emission)

In Kotlin Flow, values are emitted and processed over time. To replicate this behavior, we’ll create a simple MyFlow class that emits values when a consumer collects them.

Breaking Down the Basics: Creating Our Own Flow:

Let’s start by building a simple version of a flow using Kotlin. This will help us understand how Kotlin Flow works behind the scenes.

We’ll create two classes:

  1. MyFlow (to emit values) and

  2. MyFlowCollector (to handle the emitted values).

class MyFlow<T>(private val block: suspend MyFlowCollector<T>.() -> Unit) {

    suspend fun collect(collector: MyFlowCollector<T>) {
        collector.block() // Executes the block, emitting values
    }
}

class MyFlowCollector<T> {
    suspend fun emit(value: T) {
        println("Emitting value: $value") // Simulates emitting values
    }
}

What's Going On Here?

  • MyFlow: This class represents a stream of data, just like a real flow in Kotlin. It’s designed to emit values when someone collects them. Think of it as a machine that’s ready to send data but won’t start until you ask for it.

  • MyFlowCollector: This class is the one that "catches" or handles the values emitted by MyFlow. In this case, we simply print out each value that gets emitted.

  • emit(): This function is responsible for sending out values. Here, it prints each value to simulate data being "emitted" in a flow.

Putting It to Work

Now, we’ll use this class to actually emit some values, just like a flow in Kotlin would.

In short: MyFlow is the data stream, and MyFlowCollector is how we handle the data when it’s ready.

Ready to see it in action? Let’s go!

Example:

val myFlow = MyFlow<Int> {
    emit(1)
    emit(2)
    emit(3)
}

This creates a MyFlow that will emit numbers 1, 2, and 3. However, nothing happens until we collect these values.


2. Collecting the Flow

In Kotlin Flow, values are not emitted until they are collected. So, let’s implement a method to collect the values:

Example:

suspend fun main() {
    val myCollector = MyFlowCollector<Int>()

    myFlow.collect(myCollector) // Collect the flow
}

Output:

Emitting value: 1
Emitting value: 2
Emitting value: 3

Explanation:

  • We create an instance of MyFlowCollector and call collect() on the MyFlow to start emitting the values. Each value is printed as it's emitted.

3. Adding Suspension (Pausing Between Emissions)

Now, let’s make things a bit more realistic by adding suspension between emissions. This simulates how real-world asynchronous operations work, such as fetching data from a network.

Example with Delay:

val myFlowWithDelay = MyFlow<Int> {
    emit(1)
    delay(1000) // Simulate suspension between emissions
    emit(2)
    delay(1000)
    emit(3)
}

When we collect this flow, it will emit each value with a 1-second pause.

Example:

suspend fun main() {
    val myCollector = MyFlowCollector<Int>()

    myFlowWithDelay.collect(myCollector)
}

Output:

Emitting value: 1
(1-second delay)
Emitting value: 2
(1-second delay)
Emitting value: 3

Explanation:

  • We added delay(1000) to introduce a pause between each emission. This simulates real-world delays, such as waiting for data from a server, without blocking the main thread.

4. Applying Transformations (Operators)

One of the coolest features of Kotlin Flow is the ability to transform data using operators like map. Let’s replicate this functionality by creating a simple map operator that modifies each emitted value.

Example of a map Operator:

fun <T, R> MyFlow<T>.map(transform: suspend (T) -> R): MyFlow<R> {
    return MyFlow {
        collect(object : MyFlowCollector<T>() {
            override suspend fun emit(value: T) {
                val newValue = transform(value) // Apply the transformation
                this@MyFlow.emit(newValue as R)
            }
        })
    }
}

Explanation:

  • map: This function takes a transformation function that modifies each emitted value (just like Kotlin Flow’s map operator).

Let’s apply this map operator to our flow:

Example:

val transformedFlow = myFlowWithDelay.map { it * 2 }

suspend fun main() {
    val myCollector = MyFlowCollector<Int>()

    transformedFlow.collect(myCollector)
}

Output:

Emitting value: 2
(1-second delay)
Emitting value: 4
(1-second delay)
Emitting value: 6

Explanation:

  • The map operator transforms each emitted value by multiplying it by 2. As a result, the collected values are 2, 4, and 6.

5. Handling Concurrency with flowOn

In real-world scenarios, you often want to emit values on a background thread (e.g., doing network requests) and collect them on the main thread (e.g., updating the UI). Kotlin Flow allows you to do this using flowOn. Let’s replicate this functionality using coroutines.

Example with flowOn:

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext

fun <T> MyFlow<T>.flowOn(dispatcher: CoroutineDispatcher): MyFlow<T> {
    return MyFlow {
        withContext(dispatcher) {
            collect(this@MyFlow)
        }
    }
}

Explanation:

  • flowOn: This function changes the context of the emission to a specified dispatcher (e.g., Dispatchers.IO for background tasks).

Now, let’s use flowOn to run the emission on the IO dispatcher (a background thread):

Example:

val flowOnBackground = myFlowWithDelay.flowOn(Dispatchers.IO)

suspend fun main() {
    val myCollector = MyFlowCollector<Int>()

    flowOnBackground.collect(myCollector)
}

Explanation:

  • The emission now happens on the IO dispatcher, meaning that all emissions run in the background, while the collection can happen on the main thread (if needed).

Conclusion

By breaking down the core functionalities of Kotlin Flow, we've built a simple version that mimics how Flow works. We covered:

  1. Emission: Emitting values using a MyFlow class.

  2. Suspension: Adding delays between emissions to simulate real-world async tasks.

  3. Transformation: Using operators like map to modify emitted values.

  4. Concurrency: Controlling where the emission happens using flowOn for better thread management.

Kotlin Flow, while built on top of coroutines, offers powerful tools to handle asynchronous data streams. By understanding how it works internally, you’ll have a clearer idea of why it’s so useful and how to apply it effectively in your own projects.


That’s it for today. Happy Coding…

0
Subscribe to my newsletter

Read articles from Romman Sabbir directly inside your inbox. Subscribe to the newsletter, and don't miss out.

Written by

Romman Sabbir
Romman Sabbir

Senior Android Engineer from Bangladesh. Love to contribute in Open-Source. Indie Music Producer.