sse(Server-Sent-Event)란?

seoyoon jungseoyoon jung
3 min read

팀을 옮기고 나서 처음 맡게된 프로젝트에 sse를 사용하게 되어서 공부하게 되었다.
요즘 웹소켓보다 sse가 구현하기에 더 간단해서 많이 쓰는 추세라고 하더라!
재미있는 개발 경험 쌓는중 ㅎㅎ

Server-Sent Events(SSE) 개요

Server-Sent Events(SSE)는 서버→클라이언트 단방향 푸시 스트림을 제공하는 HTML5 표준.
클라이언트는 EventSource로 연결을 열고, 서버는 text/event-stream 형식으로 데이터를 흘려보낸다.

장점

  • WebSocket보다 구현이 단순

  • HTTP/1.1 기반 → 방화벽·프록시 통과가 쉽고, SSL도 그대로 사용

  • 자동 재연결·Last-Event-ID 관리 내장

단점

  • 브라우저→서버 전송은 불가 (POST 등 별도 구현 필요)

  • 구형 브라우저에는 EventSource(SSE 전용 JS API)가 기본 내장되어 있지 않기 때문에, 별도의 스크립트(polyfill) 를 사용 필요

웹 소켓과 SSE 차이점

구분SSEWebSocket
통신 방향서버 → 클라이언트 단방향서버 ↔ 클라이언트 양방향
프로토콜/연결HTTP (text/event-stream), 자동 재연결·Last-Event-ID 내장ws:///wss:// 핸드셰이크 후 소켓 유지, 재연결 로직 직접 구현
메시지 형식텍스트 전용텍스트, 바이너리 모두 지원
구현 난이도간단 (SseEmitter + EventSource)복잡 (핸드셰이크, 에러 처리 등)
주요 용도실시간 알림, 로그, 시세, 상태 모니터링채팅, 실시간 게임, 화상회의, 양방향 상호작용

동작 확인하기

클라이언트 핵심 구현 일부(GPT 체고~)

import requests, json, time, threading
import sseclient

class SSEEventClient:
    def __init__(self, url, storeId=None, pcId=None, headers=None, heartbeat_url=None):
        self.url = url
        self.params = {"storeId": storeId, "pcId": pcId}
        self.headers = headers or {"Accept": "text/event-stream"}
        self.heartbeat_url = heartbeat_url
        self.running = False
        self.event_handlers = {}

    def register_handler(self, event_type, handler):
        self.event_handlers[event_type] = handler

    def _heartbeat_worker(self):
        while self.running:
            if self.heartbeat_url:
                requests.get(self.heartbeat_url, headers=self.headers, timeout=10)
            time.sleep(240)

    def _process_event(self, event):
        handler = self.event_handlers.get(event.event or "message")
        if handler:
            try:
                data = json.loads(event.data)
            except json.JSONDecodeError:
                data = event.data
            handler(data)

    def start(self):
        self.running = True
        threading.Thread(target=self._heartbeat_worker, daemon=True).start()
        while self.running:
            with requests.get(self.url, headers=self.headers, params=self.params, stream=True) as response:
                client = sseclient.SSEClient(response)
                for event in client.events():
                    if not self.running:
                        break
                    self._process_event(event)

    def stop(self):
        self.running = False

클라이언트 사용 예시

client = SSEEventClient(
    url="http://localhost:8082/v1/stores/event-sources",
    storeId="store123",
    pcId="pc1",
    headers={"Authorization": "Bearer token"},
    heartbeat_url="http://localhost:8082/v1/health"
)

client.register_handler("sse", lambda data: print("SSE:", data))
client.register_handler("heartbeat", lambda data: print("HB:", data))
client.start()

Controller – 클라이언트가 연결하는 SSE 엔드포인트

클라이언트가 /event-sources로 GET 요청을 보내면 storeId, pcId, lastEventId 등의 파라미터를 받아 SseEmitter를 생성하고 반환 (응답 스트림 시작)

@GetMapping("event-sources", produces = ["text/event-stream"])
fun createEventSource(
    @RequestParam("storeId") storeId: String,
    @RequestParam("pcId") pcId: String,
    @RequestParam("lastEventId", required = false) lastEventId: String?,
    response: HttpServletResponse
): SseEmitter {
    response.addHeader("X-Accel-Buffering", "no")
    return storeNotificationService.createEventSource("$storeId#$pcId", lastEventId)!!
}

Service: SseEmitter 생성

fun createEventSource(username: String, lastEventId: String?): SseEmitter {
    val emitter = SseEmitter(299_000)
    val eventSourceId = "$username_${Instant.now()}"

    // 종료 이벤트 등록
    emitter.onCompletion { cleanup(eventSourceId) }

    // 초기 연결 이벤트 전송
    //클라이언트 연결 시 최초 이벤트("sse")를 전송 → 연결 확인용
    emitter.send(SseEmitter.event().id("conn").name("sse").data("CONNECTED"))

    // lastEventId가 있다면 재전송 처리
    if (!lastEventId.isNullOrBlank()) {
        hazelcastInstance.getMap<String, Event>("map-$username")
            .filterKeys { it > lastEventId }
            .forEach { (id, event) ->
                emitter.send(SseEmitter.event().id(id).name("sse").data(objectMapper.writeValueAsString(event)))
            }
    }

    return emitter
}

서버는 코틀린,클라이언트는 파이썬으로 구현되어있고 서로 연결이 맺어지면 아래 사진과 같이 이벤트 수신 성공된 것을 확인해볼 수 있다.

연결이 되었으니.. 이제 서버에서 클라이언트쪽으로 전송했을 때 sendEvent()메소드가 실행이 되도록 해놨다.

   fun sendEvent(storeId: String, pcId: String, objectId: String, event: StoreEventType) {

        val tsid = TSID.fast().toString()

        notificationService.publishEventAsync(
            notificationEventDTO(
                eventType = StoreNotificationEventType.COMMAND,
                username = "$storeId#$pcId",
                referenceType = notificationEventType.ON,
                referenceId = tsid,
                data = StoreEventData.toJson(StoreEventData(storeId = storeId, objectId = objectId, event = event)),
                isBroadcast = false
            )
        )
    }

해당 메소드를 실행하고 클라이언트 화면을 확인해보면 다음과 같은 로그가 찍히는 것을 확인해볼 수 있다.
클라이언트의 요청이 없어도 서버에서 게속 데이터를 보낼수있는 상태!!

0
Subscribe to my newsletter

Read articles from seoyoon jung directly inside your inbox. Subscribe to the newsletter, and don't miss out.

Written by

seoyoon jung
seoyoon jung