Building a Generic Data Integration Pipeline in Scala: The Data Plumber - Part 1


Introduction

This blog is a part of the Data Plumber Series.

In today's enterprise applications, data integration is not just a feature—it's a fundamental requirement. Applications need to communicate with various systems using different protocols and formats:

  • File-based integrations (CSV, JSON, XML)

  • Database systems (PostgreSQL, MongoDB, Cassandra)

  • Message queues (Kafka, RabbitMQ)

  • REST APIs and WebSockets

  • IoT protocols (MQTT)

  • Hardware devices with proprietary protocols

Each integration point brings its own complexities, patterns, and challenges. When your application needs to handle multiple such integrations, having a standardized approach becomes crucial. Instead of creating custom implementations for each integration, a generic framework can provide:

  • Consistent patterns across different integrations

  • Reusable components

  • Type-safe data handling

  • Standardized error handling

  • Common monitoring and logging interfaces

Disclaimer

This article presents a proof-of-concept implementation designed to showcase key concepts. In real-world production environments, additional factors—such as proper rollback handling, comprehensive error recovery, distributed transactions, performance tuning, and monitoring—should be carefully considered. Moreover, for single-source/sink integrations, a specialized solution might be more appropriate than a generic framework.

The Data Plumber Framework

Data Plumber is a type-safe data pipeline framework in Scala that demonstrates how to build a generic integration framework. Here's its high-level architecture:

Core Components

  • Source: Reads data from the origin system, handles data extraction and manages source-specific configurations

  • Transformer: Converts data between source and sink formats

  • Sink: Writes data to the destination system, handles destination-specific configurations

  • DataPlumber: Combines Source, Transformer, and Sink into a complete pipeline

Framework Variants

The Data Plumber framework comes in three variants, each serving different use cases:

  1. Simple Implementation

    • Basic source-to-sink pipeline

    • Synchronous processing

    • Ideal for quick integrations and prototypes

  2. Offsetable Implementation

    • Support for batch processing

    • Progress tracking with Redis

    • Resumable operations

    • Will be covered in Part 2

  3. Streaming Implementation

    • Built with fs2 streams

    • Real-time data processing

    • Back-pressure handling

    • Will be covered in Part 3

Simple DataPlumber

Let’s look at the simple data plumber implementation. The core part consists of the following three traits:

// S is the source record type, D is the destination record type
trait DataPlumber[S, D] {
    def source: DataSource[S]
    def sink: DataSink[D]
    // Read everything from the source, transform it, and write it to the sink
    final def run = sink.write(transform(source.read))
    def transform(rows: List[S]): List[D]
}

trait DataSource[S] {
    def read: List[S]
}

trait DataSink[D] {
    def write(rows: List[D]): Unit
}

These basic traits can be implemented for various data sources and sinks like CSV files, databases (MongoDB, PostgreSQL), REST APIs, message queues, or any custom protocol. Each implementation only needs to handle its specific read or write logic while following the simple interface defined by DataSource[S] or DataSink[D]. This approach makes the framework easily extensible while maintaining a consistent pattern for building data pipelines.
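For example, a sink that simply prints records to the console needs only a few lines. The ConsoleSink below is a hypothetical sketch (it is not part of the framework shown here), intended only to illustrate how small a new DataSink implementation can be:

// A hypothetical sink that writes each record to standard output.
// Handy for local debugging or smoke-testing a pipeline before wiring up a real destination.
trait ConsoleSink[D] extends DataSink[D] {
    def write(rows: List[D]): Unit =
        rows.foreach(row => println(s"[console-sink] $row"))
}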

Sample Implementations

Now, let's look at a sample concrete implementation with different sources and sinks. We'll build a data pipeline that reads Star Trek logs from a CSV file and writes them to MongoDB, demonstrating how to implement each component of our framework.

Domain Models

import java.time.{LocalDate, LocalDateTime}

enum LogType {
    case CaptainsLog, FirstOfficerLog, ChiefMedicalLog, ChiefEngineerLog, PersonalLog
}
// Source model (from CSV)
case class StarLogEntry(
    starDate: Double,
    logType: LogType,
    crewId: Int,
    entry: String,
    planetaryDate: LocalDate,
    starfleetTime: LocalDateTime
)
// Destination model (for MongoDB)
case class MongoStarLogEntry(
    starDate: Double,
    logType: String,  // Note: Enum converted to String for MongoDB
    crewId: Int,
    entry: String,
    planetaryDate: LocalDate,
    starfleetTime: LocalDateTime
)

CSV Source Implementation

Now, let's add an implementation for reading from a CSV file. Note that this is a deliberately simple implementation that doesn't handle every CSV variation (quoted fields, embedded commas, and so on), but that isn't needed for this sample:

import scala.io.Source

trait CsvSource[T] extends DataSource[T] {
    def location: String
    def read: List[T] = {
        println(s"reading from csv file: $location")
        val source = Source.fromFile(location)
        try {
            val lines = source.getLines().toList
            // Skip the header line and parse each remaining row
            val dataRows = lines.tail
            dataRows.map(fromCSVRow)
        } finally {
            source.close()
        }
    }
    // Each concrete source only needs to define how a single CSV row maps to T
    protected def fromCSVRow(row: String): T
}

We implement this trait for our Star Trek logs:

class StarLogSource extends CsvSource[StarLogEntry] {
    override def location: String = "/path/to/starlog.csv"

    override protected def fromCSVRow(row: String): StarLogEntry = {
        row.split(",").map(_.trim) match {
            case Array(starDate, logType, crewId, entry, planetaryDate, starfleetTime) =>
                StarLogEntry(
                    starDate.toDouble,
                    LogType.valueOf(logType),
                    crewId.toInt,
                    entry,
                    LocalDate.parse(planetaryDate),
                    LocalDateTime.parse(starfleetTime)
                )
            case _ => 
                throw new IllegalArgumentException(s"Invalid CSV row format: $row")
        }
    }
}

Sink Implementation

Similarly, we can first implement a MongoSink trait for MongoDB operations:

import com.mongodb.{ConnectionString, MongoClientSettings}
import com.mongodb.client.MongoClients
import org.bson.Document

import java.util.concurrent.TimeUnit
import scala.concurrent.duration.*
import scala.jdk.CollectionConverters.*

trait MongoSink[T] extends DataSink[T] {
    def collectionName: String
    def mongoUri: String

    private val connectionString = new ConnectionString(mongoUri)
    private val settings = MongoClientSettings
        .builder()
        .applyConnectionString(connectionString)
        .applyToSocketSettings(builder =>
            builder.connectTimeout(5.seconds.toMillis.toInt, TimeUnit.MILLISECONDS)
        )
        .build()

    private val mongoClient = MongoClients.create(settings)
    private val database = mongoClient.getDatabase(connectionString.getDatabase)
    private val collection = database.getCollection(collectionName)

    def write(rows: List[T]): Unit = {
        val documents = rows.map(toDocument)
        collection.insertMany(documents.asJava)
    }
    // Each concrete sink defines how a value is converted into a BSON Document
    protected def toDocument(value: T): Document
}

Now, we can use this trait to implement the concrete sink for Mongo:

class StarLogSink extends MongoSink[MongoStarLogEntry] {
    override def collectionName: String = "starlog"
    override def mongoUri: String = "mongodb://localhost:27017/starlog"

    override def toDocument(value: MongoStarLogEntry): Document = {
        val doc = new Document()
        doc.put("starDate", value.starDate)
        doc.put("logType", value.logType)
        doc.put("crewId", value.crewId)
        doc.put("entry", value.entry)
        doc.put("planetaryDate", value.planetaryDate)
        doc.put("starfleetTime", value.starfleetTime)
        doc
    }
}

The Complete Pipeline

Finally, we implement the DataPlumber trait to tie everything together:

class StarLogIntegrator extends DataPlumber[StarLogEntry, MongoStarLogEntry] {
    override def source: CsvSource[StarLogEntry] = new StarLogSource()
    override def sink: MongoSink[MongoStarLogEntry] = new StarLogSink()

    override def transform(list: List[StarLogEntry]): List[MongoStarLogEntry] = {
        list.map(value => MongoStarLogEntry(
            value.starDate,
            value.logType.toString,
            value.crewId,
            value.entry,
            value.planetaryDate,
            value.starfleetTime
        ))
    }
}

Running the Pipeline

To execute our pipeline:

@main
def start = {
    val integrator = new StarLogIntegrator()
    integrator.run
}

This example demonstrates how the framework's traits provide the basic infrastructure while allowing us to focus on the specific implementation details for our use case. The framework handles the common patterns (file reading, MongoDB connection management, etc.) while we implement the data-specific logic (CSV parsing, document mapping, etc.).
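For instance, swapping the destination is just a matter of re-wiring the pipeline. The sketch below is hypothetical and builds on the ConsoleSink sketch from earlier; it reuses the same StarLogSource and only changes the sink and the transform, while none of the CSV parsing logic has to be touched:

// Hypothetical wiring: same CSV source, but records are printed instead of stored in MongoDB.
class StarLogConsolePrinter extends DataPlumber[StarLogEntry, String] {
    override def source: CsvSource[StarLogEntry] = new StarLogSource()
    override def sink: DataSink[String] = new ConsoleSink[String] {}
    // Render each log entry as a single line of text
    override def transform(rows: List[StarLogEntry]): List[String] =
        rows.map(log => s"${log.starDate} [${log.logType}] ${log.entry}")
}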

The databases required for this application are available as a docker-compose file. Make sure to start it before running the application.

Conclusion

In this first part, we explored the basic implementation of the Data Plumber framework, demonstrating how to build type-safe data pipelines in Scala. The simple implementation showcases the core principles of the framework: separation of concerns through distinct Source and Sink components, type safety through generics, and easy extensibility through traits. While this implementation is straightforward, using minimal dependencies and synchronous processing, it serves as a solid foundation for understanding data pipeline patterns.

The entire DataPlumber implementation is available on GitHub.

However, this simple implementation has its limitations - it loads all data into memory, lacks error recovery mechanisms, and doesn't support progress tracking. In the next parts of this series, we'll address these limitations by:

  • Part 2: Adding offset-based processing and batch handling

  • Part 3: Implementing streaming capabilities using fs2 for real-time data processing

These enhancements will make the framework more suitable for production use cases while maintaining the same clean and type-safe design principles we've established here.
