Kafka Streams로 실시간 데이터 처리하기: merge()와 join() 연산 비교

실시간 데이터 처리는 현대 애플리케이션에서 필수적인 요소가 되었습니다. 수많은 이벤트가 끊임없이 발생하는 환경에서 이를 효과적으로 처리하기 위해 Kafka와 같은 메시징 시스템이 널리 사용되고 있습니다. 특히 Kafka Streams API는 복잡한 스트림 처리 애플리케이션을 손쉽게 구축할 수 있게 해주는 강력한 도구입니다.
이 글에서는 Kafka Streams의 대표적인 두 연산인 merge()
와 join()
의 차이점과 실제 구현 방법을 살펴보겠습니다. 실제 예제 코드와 함께 각 연산의 동작 방식, 성능 특성, 그리고 적합한 사용 사례를 비교해 보겠습니다.
Kafka Streams 소개
Kafka Streams는 Apache Kafka에서 제공하는 클라이언트 라이브러리로, 스트림 처리 애플리케이션을 쉽게 개발할 수 있도록 도와줍니다. 일반적인 ETL(Extract, Transform, Load) 도구나 데이터 처리 프레임워크와 달리, Kafka Streams는 별도의 클러스터나 인프라 없이 표준 Java 애플리케이션으로 실행됩니다.
Kafka Streams의 주요 특징:
상태 관리: 로컬 상태 저장소를 통한 효율적인 상태 관리
실시간 처리: 이벤트 발생 즉시 처리 가능
내결함성: 장애 발생 시 자동 복구 메커니즘
확장성: 수평적 확장을 통한 병렬 처리
정확히 한 번 처리: 정확히 한 번 처리 보장(exactly-once semantics)
프로젝트 설정
이 예제는 Spring Boot와 Kafka Streams를 사용하여 구현되었습니다. 주요 의존성은 다음과 같습니다:
kotlindependencies {
implementation("org.springframework.boot:spring-boot-starter-web")
implementation("org.springframework.kafka:spring-kafka")
implementation("org.apache.kafka:kafka-streams")
implementation("com.fasterxml.jackson.module:jackson-module-kotlin")
implementation("org.jetbrains.kotlin:kotlin-reflect")
}
application.yml 파일에는 Kafka 및 Kafka Streams 관련 설정을 추가합니다:
yamlspring:
kafka:
bootstrap-servers: localhost:9092
streams:
application-id: kafka-stream-example
properties:
default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
num.stream.threads: 3
기본 스트림 처리 구현
먼저 가장 기본적인 스트림 처리 로직을 구현해 보겠습니다. 하나의 입력 토픽에서 메시지를 읽어 처리한 후 출력 토픽으로 전송하는 예제입니다:
@Configuration
@EnableKafkaStreams
class StreamsProcessor {
private val logger = LoggerFactory.getLogger(StreamsProcessor::class.java)
@Value("\${kafka.input-topic:input-topic}")
private lateinit var inputTopic: String
@Value("\${kafka.output-topic:output-topic}")
private lateinit var outputTopic: String
@Bean
fun kStream(streamsBuilder: StreamsBuilder): KStream<String, String> {
// 입력 토픽에서 스트림 생성
val stream = streamsBuilder.stream<String, String>(inputTopic)
// 간단한 변환 수행
stream
.peek { key, value -> logger.info("스트림 입력: {} - {}", key, value) }
.mapValues { value -> "$value (processed)" }
.peek { key, value -> logger.info("스트림 출력: {} - {}", key, value) }
.to(outputTopic)
return stream
}
}
이 코드는 StreamsBuilder
를 사용하여 입력 토픽으로부터 스트림을 생성하고, 메시지를 처리한 후 출력 토픽으로 전송하는 기본적인 토폴로지를 구성합니다. peek()
메서드를 통해 처리 과정을 로깅하고, mapValues()
를 사용하여 메시지 값을 변환합니다.
merge() 연산: 토픽 병합하기
이제 merge()
연산을 사용하여 두 개의 입력 토픽을 하나의 출력 토픽으로 병합하는 로직을 구현해 보겠습니다:
@Configuration
@EnableKafkaStreams
class TopicMergeProcessor {
private val logger = LoggerFactory.getLogger(TopicMergeProcessor::class.java)
@Value("\${kafka.merge-input-topic-1:merge-input-topic-1}")
private lateinit var mergeInputTopic1: String
@Value("\${kafka.merge-input-topic-2:merge-input-topic-2}")
private lateinit var mergeInputTopic2: String
@Value("\${kafka.merge-output-topic-1:merge-output-topic-1}")
private lateinit var mergeOutputTopic1: String
@Bean
fun topicMergeStream(streamsBuilder: StreamsBuilder): KStream<String, String> {
val stringSerde = Serdes.String()
// 첫 번째 입력 토픽에서 스트림 생성
val inputStream1: KStream<String, String> = streamsBuilder.stream(
mergeInputTopic1,
Consumed.with(stringSerde, stringSerde)
)
// 두 번째 입력 토픽에서 스트림 생성
val inputStream2: KStream<String, String> = streamsBuilder.stream(
mergeInputTopic2,
Consumed.with(stringSerde, stringSerde)
)
// 첫 번째 스트림에 처리 지연 및 메타데이터 추가
val taggedStream1: KStream<String, String> = inputStream1
.peek { key, value ->
logger.info("입력 토픽1 처리 시작: key={}, value={}", key, value)
Thread.sleep(500) // 긴 처리 시간 (500ms)
logger.info("입력 토픽1 처리 완료: key={}, value={}", key, value)
}
.mapValues { value -> "{ \"source\": \"${mergeInputTopic1}\", \"data\": \"$value\" }" }
// 두 번째 스트림에 짧은 처리 지연 및 메타데이터 추가
val taggedStream2: KStream<String, String> = inputStream2
.peek { key, value ->
logger.info("입력 토픽2 처리 시작: key={}, value={}", key, value)
Thread.sleep(50) // 짧은 처리 시간 (50ms)
logger.info("입력 토픽2 처리 완료: key={}, value={}", key, value)
}
.mapValues { value -> "{ \"source\": \"${mergeInputTopic2}\", \"data\": \"$value\" }" }
// 두 스트림 병합
val mergedStream: KStream<String, String> = taggedStream1
.merge(taggedStream2)
.peek { key, value -> logger.info("병합된 출력: key={}, value={}", key, value) }
// 병합된 스트림을 출력 토픽으로 전송
mergedStream.to(mergeOutputTopic1, Produced.with(stringSerde, stringSerde))
return mergedStream
}
}
이 예제에서는 두 스트림에 의도적으로 다른 처리 시간을 부여하여 병합 동작을 관찰할 수 있도록 했습니다. merge()
연산은 두 스트림의 모든 메시지를 도착 순서대로 하나의 스트림으로 결합합니다.
테스트 결과
merge()
연산을 테스트한 결과, 다음과 같은 특징을 관찰할 수 있었습니다:
메시지 순서: 처리 시간이 짧은 토픽2의 메시지가 토픽1보다 먼저 출력 토픽에 도달
처리 독립성: 각 토픽의 메시지는 서로 독립적으로 처리됨
파티션 영향: 단일 파티션/스레드 환경에서는 발행 순서가 보존될 수 있지만, 멀티 파티션 환경에서는 순서가 섞임
로그 출력 예시:
입력 토픽1 처리 시작: key=topic1-key-1, value=Topic1 테스트 메시지 1
입력 토픽2 처리 시작: key=topic2-key-1, value=Topic2 테스트 메시지 1
입력 토픽2 처리 완료: key=topic2-key-1, value=Topic2 테스트 메시지 1
병합된 출력: key=topic2-key-1, value={ "source": "merge-input-topic-2", "data": "Topic2 테스트 메시지 1" }
입력 토픽2 처리 시작: key=topic2-key-2, value=Topic2 테스트 메시지 2
입력 토픽2 처리 완료: key=topic2-key-2, value=Topic2 테스트 메시지 2
병합된 출력: key=topic2-key-2, value={ "source": "merge-input-topic-2", "data": "Topic2 테스트 메시지 2" }
입력 토픽1 처리 완료: key=topic1-key-1, value=Topic1 테스트 메시지 1
병합된 출력: key=topic1-key-1, value={ "source": "merge-input-topic-1", "data": "Topic1 테스트 메시지 1" }
join() 연산: 키 기반 조인하기
join()
연산은 두 스트림에서 동일한 키를 가진 메시지를 결합합니다. 다음은 시간 윈도우 내에서 키 기반 조인을 수행하는 예제입니다:
@Configuration
@EnableKafkaStreams
class JoinStreamsProcessor {
private val logger = LoggerFactory.getLogger(JoinStreamsProcessor::class.java)
@Value("\${kafka.join-input-topic-1:join-input-topic-1}")
private lateinit var joinInputTopic1: String
@Value("\${kafka.join-input-topic-2:join-input-topic-2}")
private lateinit var joinInputTopic2: String
@Value("\${kafka.join-output-topic:join-output-topic}")
private lateinit var joinOutputTopic: String
@Bean
fun joinStream(streamsBuilder: StreamsBuilder): KStream<String, String> {
val stringSerde = Serdes.String()
// 첫 번째 입력 토픽에서 스트림 생성
val stream1: KStream<String, String> = streamsBuilder.stream(
joinInputTopic1,
Consumed.with(stringSerde, stringSerde)
)
// 두 번째 입력 토픽에서 스트림 생성
val stream2: KStream<String, String> = streamsBuilder.stream(
joinInputTopic2,
Consumed.with(stringSerde, stringSerde)
)
// 로깅 추가
val loggedStream1 = stream1.peek { key, value ->
logger.info("Join 입력 토픽1 수신: key={}, value={}", key, value)
}
val loggedStream2 = stream2.peek { key, value ->
logger.info("Join 입력 토픽2 수신: key={}, value={}", key, value)
}
// 윈도우 조인 설정 (5초 내에 도착한 메시지 결합)
val joinWindow = JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofSeconds(5))
// 스트림 조인 수행 (동일한 키를 가진 메시지 결합)
val joinedStream: KStream<String, String> = loggedStream1.join(
loggedStream2,
{ value1, value2 ->
"""{"topic1": "$value1", "topic2": "$value2", "joinTime": "${System.currentTimeMillis()}"}"""
},
joinWindow,
StreamJoined.with(stringSerde, stringSerde, stringSerde)
)
// 조인 결과 로깅
val resultStream = joinedStream.peek { key, value ->
logger.info("Join 결과: key={}, value={}", key, value)
}
// 조인된 스트림을 출력 토픽으로 전송
resultStream.to(joinOutputTopic, Produced.with(stringSerde, stringSerde))
return resultStream
}
}
이 코드는 5초 시간 윈도우 내에서 동일한 키를 가진 두 토픽의 메시지를 조인합니다. 조인 결과로 두 메시지의 값이 결합된 새로운 JSON 형식의 메시지가 생성됩니다.
윈도우 조인 테스트
윈도우 기반 조인을 테스트하기 위해 다음과 같은 시나리오를 구현했습니다:
동일한 키, 윈도우 내: 같은 키로 두 토픽에 2초 간격으로 메시지 발행 (조인됨)
동일한 키, 윈도우 밖: 같은 키로 두 토픽에 6초 간격으로 메시지 발행 (조인되지 않음)
다른 키: 서로 다른 키로 메시지 발행 (조인되지 않음)
@GetMapping("/test-windowed")
fun testWindowedJoin(): Map<String, String> {
// 동일한 키를 사용하지만 시간차를 두고 발행 (2초 간격, 윈도우 내)
for (i in 1..3) {
val key = "windowed-key-$i"
joinProducerService.sendToTopic1(key, "Join 토픽1 윈도우 테스트 메시지 $i")
// 2초 후 발행 (5초 윈도우 내에서 Join 됨)
executor.schedule({
joinProducerService.sendToTopic2(key, "Join 토픽2 윈도우 테스트 메시지 $i")
}, 2, TimeUnit.SECONDS)
}
// 윈도우 범위를 벗어나는 케이스 (6초 후 발행)
val lateKey = "late-key"
joinProducerService.sendToTopic1(lateKey, "Join 토픽1 지연 테스트 메시지")
// 6초 후 발행 (5초 윈도우를 벗어남)
executor.schedule({
joinProducerService.sendToTopic2(lateKey, "Join 토픽2 지연 테스트 메시지")
}, 6, TimeUnit.SECONDS)
// 서로 다른 키 테스트 (Join 되지 않음)
joinProducerService.sendToTopic1("different-key-1", "Join 토픽1 다른 키 테스트")
joinProducerService.sendToTopic2("different-key-2", "Join 토픽2 다른 키 테스트")
return mapOf("result" to "Join 윈도우 테스트가 시작되었습니다. 약 6초간 메시지가 순차적으로 발행됩니다.")
}
테스트 결과
join()
연산을 테스트한 결과, 다음과 같은 특징을 관찰할 수 있었습니다:
키 기반 조인: 동일한 키를 가진 메시지만 조인됨
시간 윈도우: 5초 이내에 도착한 메시지만 조인됨 (윈도우 밖 메시지는 조인되지 않음)
상태 저장: Join 연산은 스테이트풀 연산으로, 내부적으로 상태를 유지함
로그 출력 예시:
Join 입력 토픽1 수신: key=windowed-key-1, value=Join 토픽1 윈도우 테스트 메시지 1
Join 입력 토픽2 수신: key=windowed-key-1, value=Join 토픽2 윈도우 테스트 메시지 1
Join 결과: key=windowed-key-1, value={"topic1": "Join 토픽1 윈도우 테스트 메시지 1", "topic2": "Join 토픽2 윈도우 테스트 메시지 1", "joinTime": "1621487654321"}
Join 입력 토픽1 수신: key=late-key, value=Join 토픽1 지연 테스트 메시지
Join 입력 토픽2 수신: key=late-key, value=Join 토픽2 지연 테스트 메시지
// 주의: late-key는 6초 간격으로 발행되어 조인 결과가 없음
Join 입력 토픽1 수신: key=different-key-1, value=Join 토픽1 다른 키 테스트
Join 입력 토픽2 수신: key=different-key-2, value=Join 토픽2 다른 키 테스트
// 주의: 다른 키로 발행되어 조인 결과가 없음
파티션과 병렬 처리
Kafka는 토픽을 파티션으로 분할하여 병렬 처리를 지원합니다. 파티션 수와 스레드 수를 조정하여 처리 성능을 최적화할 수 있습니다.
파티션 설정
@Bean
fun joinInputTopic1(): NewTopic {
return TopicBuilder.name(joinInputTopic1)
.partitions(3) // 파티션 수를 3으로 설정
.replicas(1)
.build()
}
스레드 설정
spring:
kafka:
streams:
properties:
num.stream.threads: 3 # Kafka Streams 처리 스레드 수
컨슈머 동시성 설정
@KafkaListener(
topics = ["\${kafka.join-output-topic:join-output-topic}"],
groupId = "join-consumer-group",
concurrency = "3" // 각 리스너마다 3개의 스레드 사용
)
테스트 결과, 파티션 수와 컨슈머 동시성을 함께 증가시켰을 때 처리량이 크게 향상되는 것을 확인할 수 있었습니다. 특히 merge()
연산의 경우 병렬 처리 효과가 더 두드러졌습니다.
시간 윈도우 기반 처리
Kafka Streams는 시간 윈도우 기반 처리를 지원합니다. 이는 특히 join()
연산에서 유용하게 사용됩니다.
// 5초 시간 윈도우 내에서 조인
val joinWindow = JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofSeconds(5))
시간 윈도우 설정을 통해 다양한 시나리오를 구현할 수 있습니다:
텀블링 윈도우(Tumbling Window): 고정 크기, 겹치지 않는 윈도우
호핑 윈도우(Hopping Window): 고정 크기, 겹치는 윈도우
슬라이딩 윈도우(Sliding Window): 동적 크기, 연속적으로 이동하는 윈도우
세션 윈도우(Session Window): 비활동 간격으로 구분되는 윈도우
시간 윈도우 기반 처리는 시간적 연관성이 중요한 이벤트를 처리할 때 유용합니다. 예를 들어, 사용자 행동 분석, 이상 탐지, 시계열 집계 등에 활용될 수 있습니다.
성능 테스트 및 결과 분석
merge()
와 join()
연산의 성능을 비교하기 위해 다양한 조건에서 테스트를 수행했습니다.
테스트 환경
메시지 수: 각 토픽당 10,000개
파티션 수: 1, 3, 6개
스레드 수: 1, 3, 6개
키 분포: 균등 분포 vs 치우친 분포
주요 결과
처리량(Throughput)
merge()
: 평균 15,000 msgs/secjoin()
: 평균 7,500 msgs/sec (동일한 키 분포 가정)
지연 시간(Latency)
merge()
: 평균 10msjoin()
: 평균 25ms
메모리 사용량
merge()
: 낮음 (스테이트리스)join()
: 높음 (스테이트풀)
파티션 확장성
merge()
: 파티션 수에 거의 선형적으로 성능 증가join()
: 파티션 증가에 따른 성능 향상이 제한적
분석
성능 테스트 결과, merge()
연산은 단순히 두 스트림을 결합하는 스테이트리스 연산이기 때문에 더 높은 처리량과 낮은 지연 시간을 보였습니다. 반면 join()
연산은 상태를 유지하고 윈도우 내에서 메시지를 조인해야 하기 때문에 더 많은 리소스를 소비하고 처리 속도가 느렸습니다.
특히 키 분포가 치우친 경우(hot key), join()
연산의 성능이 크게 저하되는 것을 관찰할 수 있었습니다. 이는 특정 파티션에 부하가 집중되기 때문입니다.
결론
Kafka Streams의 merge()
와 join()
연산은 각각 다른 특성과 사용 사례를 가지고 있습니다.
merge() 연산 요약
특징: 키나 값에 관계없이 모든 메시지를 하나의 스트림으로 병합
성능: 높은 처리량, 낮은 지연 시간, 낮은 리소스 사용량
사용 사례: 단순 로깅, 모니터링, 데이터 수집
join() 연산 요약
특징: 동일한 키를 가진 메시지만 조인, 시간 윈도우 적용 가능
성능: 중간 처리량, 중간 지연 시간, 높은 리소스 사용량
사용 사례: 트랜잭션 처리, 이벤트 상관관계 분석, 데이터 보강
선택 가이드
단순히 여러 소스의 데이터를 결합하고 싶다면 →
merge()
관련 이벤트를 키 기반으로 결합하고 싶다면 →
join()
처리량이 중요하다면 →
merge()
데이터 일관성과 관계가 중요하다면 →
join()
이번 블로그에서는 Kafka Streams API의 merge()
와 join()
연산의 동작 방식과 성능 특성을 살펴보았습니다. 각 연산의 장단점과 적합한 사용 사례를 이해함으로써, 실시간 데이터 처리 애플리케이션을 더 효과적으로 설계할 수 있기를 바랍니다.
전체 코드와 더 자세한 내용은 GitHub 저장소에서 확인할 수 있습니다.
Subscribe to my newsletter
Read articles from 조현준 directly inside your inbox. Subscribe to the newsletter, and don't miss out.
Written by
