FastAPI에서 OpenAI 응답에 SSE를 활용하는 방법

김한결김한결
2 min read

Overview

사용자가 작성한 메시지를 LLM 모델로 전송하는 API고, Content-Type: text/eventstream 을 사용한다. 이는 SSE(Server Sent Event)를 백엔드에서 사용하고 있다는 의미고, 서버에서 클라이언트로 실시간 업데이트를 전송할 때 사용하는데, 플레이그라운드 에서 이를 도입한 이유는 아래와 같을 것으로 예상된다.

  • LLM 모델이 전달하는 답변 내용이 스트림 형태로 페이지에 계속 업데이트 되야함

    • GPT, Claude 등은 응답을 토큰 단위로 잘라서 보내지기 때문
  • 따라서 HTTP 연결을 한 번 맺고 계속 유지하면서 데이터를 스트리밍할 필요가 있음

StreamingResponse

FastAPI에서는 StreamingResponse 객체로 SSE를 손쉽게 구현할 수 있다.

return StreamingResponse(
        stream_response(...),  # 제너레이터 함수
        media_type="text/event-stream"  # SSE 콘텐츠 타입 명시
    )

위와 같이 기본적으로 제너레이터 함수와 SSE임을 알리는 콘텐츠 타입을 명시할 필요가 있다.

위 코드에서 실제로 함수를 호출할 때 즉시 실행되는 것이 아니라 제너레이터 객체만 우선 반환한다. 그 객체는 StreamingResponse 에 전달된다.

제너레이터 함수

def stream_response(model, messages, temperature, db, suni_id, group_code, redis):
    # ... 코드 생략 ...
    for chunk in response:  # OpenAI API의 스트리밍 응답을 반복
        for ch in chunk.choices:
            if ch.delta is None or ch.delta.content is None:
                break
            full_response += ch.delta.content
            yield json.dumps({"t": "t", "d": ch.delta.content}) + "\\n"  # 여기서 실제 스트리밍 데이터 생성

제너레이터 함수는 위와 같은데, LLM 모델이 준 응답을 Chunk 단위로 잘라서 응답을 스트리밍한다. \\n 를 기준으로 이벤트를 구분하고 있다.

✅ 제너레이터 함수는 yield 를 사용하면 된다.

제너레이터 함수기 때문에 한 번에 모두 반환하는 것이 아니라 하나씩 필요할 때마다 생성해서 반환한다. 따라서 클라이언트는 응답이 모두 반환되기를 기다리지 않고, 청크 단위로 잘라진 응답을 반복해서 받을 수 있다. 이는 마치 사용자가 페이지에서 실제 LLM 모델과 대화하는 느낌을 주기 때문에 사용자 경험에 중요한 요소다.

로직은 첫번째 yield를 만날 때까지 코드를 실행하고, 만나면 값을 우선 반환하고 함수 실행 상태를 보존하고 일시 중지한다. 그리고 다음 값 요청이 있을 때 다시 실행하며 이 과정을 반복한다.

자세한 과정은 아래와 같다.

[OpenAI API에서 첫 토큰 수신]
↓
[JSON으로 포맷팅해서 yield]
↓
[함수 실행 일시 중지]
↓
[클라이언트가 토큰 받아 화면에 표시]
↓
[다음 요청 시 다음 토큰 처리]
↓
[이 과정을 모든 토큰이 처리될 때까지 반복]
0
Subscribe to my newsletter

Read articles from 김한결 directly inside your inbox. Subscribe to the newsletter, and don't miss out.

Written by

김한결
김한결