Parallel processing with Worker Pools in Kotlin

Thomas SchühlyThomas Schühly
2 min read

Worker pools

Recently while doing a project with Go I came across Worker Pools on GoByExample to do parallel processing. I didn't find many resources for implementing Worker Pools in Kotlin, but it seemed a good idea for my current Spring Boot + Kotlin application.

Kotlin

Kotlin uses coroutines for concurrency which are fairly similar to goroutines.

Coroutines use structured concurrency to delimit the lifetime of each coroutine to a certain scope.

To be able to create a worker group we need to create a coroutine scope that is persistent over the lifetime of our application. We achieve this behavior with the SupervisorJob() context.

private val coroutineScope = CoroutineScope(SupervisorJob() + Dispatchers.Default)

We then create a buffered channel as a queue for our image data and the URL where we want to upload it.

val channel = Channel<Pair<String, ByteArray>>(10000)

I'm using the Spring @PostConstruct annotation to create the worker group and listen to the channel for new data. Each time an item is in the queue we launch the upload function, if no item is in the queue the function is suspended.

@PostConstruct
    fun createWorkerGroup() {
        coroutineScope.launch {
            for (x in 1..5) {
                launch {
                    println("Create Worker $x")
                    while (true) {
                        uploadImage(channel.receive())
                    }
                }
            }
        }
    }

Finally, we can send our data to our channel inside a runBlocking coroutine scope:

runBlocking {
  uploadService.channel.send(Pair(url, image.bytes))
}

WebDav

In my web application users upload images from their mobile phone to my webserver, afterwards I want to upload these pictures to a Hetzner Storage Box over webdav as a cheap alternative to an S3 object storage.

I use the sardine java webdav client library for its simplicity.

The usage is very straightforward, you configure the client with:

val sardine = SardineFactory.begin("webDavUsername", "webDavPassword")

The uploadImage Function is called every time a new image is sent over the channel we created earlier. In this function, we call sarding.put() to save the image file.

sardine.put("https://username.your-storagebox.de/foldername/imagename.jpg", ImageByteArray)

That is all we need to have a highly parallel File upload.

If you want to learn more about HTMX + Spring Boot check out my series Web development without the JavaScript headache with Spring + HTMX.

My side business PhotoQuest is also built with HTMX + JTE

0
Subscribe to my newsletter

Read articles from Thomas Schühly directly inside your inbox. Subscribe to the newsletter, and don't miss out.

Written by

Thomas Schühly
Thomas Schühly

Thomas Schühly’s server-side rendering journey started as a developer trying to make life easier while developing his first bootstrapped product in his free time. Creating Spring ViewComponent enabled him to be the youngest Speaker at the largest European Spring conference and build awesome software full-time with his open-source library at alanda.io. He regularly talks at Java User Groups about htmx and server-side rendering with Spring while contributing to the open-source community. PhotoQuest