FastAPI 기반 Server Sent Event(SSE) 구현 과정 및 시행착오 - UTF-8 Decoding 및 ANSI Escape Sequence 처리

# 목적

프로젝트를 진행하면서 웹서버에서 실시간으로 특정 파드에서 발생하는 로그를 실시간으로 보여지는 기능을 구현해야했다.

목표로 한 기능은, kubectl logs 명령어를 통해서 파드의 로그를 확인하는 것 처럼, API 서버가 이를 수행하게 만드는 것이였다.

이를 구현하는 방법 중 하나로 SSE(Server Sent Event)를 채택하게 되었고, FastAPI를 사용하여 이를 구현한 방법을 공유하고자 한다.

FastAPI에서 쿠버네티스의 파드에 접근하는 방법에 대해서는 다른 글에서 설명하고, FastAPI 프레임워크를 통해 SSE 구현 방법을 중점으로 다룬다.


# Server Sent Event을 사용한 이유

우선 SSE 방식에 대해서 간단하게 짚어보자. 개념에 대해선 훨씬 잘 쓴 다른 블로거분들을 참고하자...

Server Sent Event(SSE)란, 서버에서 이벤트를 보낸다는 의미이다. 이는 클라이언트에게 API 서버가 실시간으로 데이터 혹은 이벤트를 보내는 방법 중 하나이다.

즉, 웹 브라우저에서 별도의 새로고침이나 조작을 하지 않아도 실시간으로 데이터가 추가된다던지, 알림창을 제공받는 기능을 구현할 때 사용되는 기술이다.

일반적으로, API 서버가 클라이언트에게 데이터를 실시간으로 전달할 수 있는 방법으론 WebSocket, SSE, Polling 등의 방식이 존재한다.

웹에서는 기본적으로 HTTP 프로토콜을 사용하여 통신을 수행하는데, HTTP 프로토콜의 특징은 연결을 유지하지 않는다는 점이다.

하나의 HTTP Request를 보낸다면, 그에 대응되는 하나의 HTTP Response를 받게되는 방식이다.

하지만, 이러한 실시간 통신의 궁극적인 목적은 요청이 없어더라도, 응답이 실시간으로 혹은 특정 이벤트에 따라서 바로바로 클라이언트가 확인할 수 있어야한다.

만약, 일정 시간마다 클라이언트가 데이터를 받아오기 위해 지속적으로 HTTP Request를 받는다면(이것이 Polling 방식), 간단한 서버에서는 문제가 없지만 많은 트래픽이 오고가는 환경에서는 서버에게 막대한 부하가 발생할 것이다. 

이러한 문제를 해결하기 위한 방법으로는 간단하게 WebSocket과 SSE 방식을 사용할 수 있다.

WebSocket은 이러한 HTTP을 사용하지 않고 양방향 통신을 위해 도입된 WS 프로토콜을 사용하여 실시간 통신을 구현한다.

즉, 클라이언트와 서버 사이에 별도의 TCP 전이중 통신 연결 세션을 구성하여 별도의 요청 없이 실시간으로 통신을 가능하게한다.

이에 반해, SSE는 기존의 HTTP 프로토콜을 사용하여, 스트림 방식으로  HTTP Response를 서버에서 클라이언트로 데이터를 비동기적으로 푸쉬할 수 있다.

앞서 프로젝트에서 구현해야할 기능을 참고하면, 클라이언트는 서버에서 전송하는 실시간 로그를 받아서 화면에 렌더링을 수행하면 충분했기 때문에, SSE 방식이 가장 적합한 방법이라고 판단하여 SSE를 통해 해당 기능을 구현했다.

 

SSE 예시


# Step 1. FastAPI 공식문서 확인

이에 앞서, 본인이 사용한 환경은 아래와 같다.

 

  • MacOS M1 
  • Python: 3.10.13
  • fastapi: 0.103.2

FastAPI 공식 문서의 Custom Response 항목에서 StreamingResponse를 보내는 예제가 존재한다.

이는 아래 사이트를 참고한다.

 

Custom Response - HTML, Stream, File, others - FastAPI

FastAPI framework, high performance, easy to learn, fast to code, ready for production

fastapi.tiangolo.com

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import time

app = FastAPI()

async def fake_video_streamer():
    for i in range(10):
        yield b"some fake video bytes"
        time.slepp(1)
        

@app.get("/")
async def main():
    return StreamingResponse(fake_video_streamer())

이는 간단하게 "/" URL로 HTTP GET 요청을 보내면 1초동안 총 10번의 데이터가 반환되는 엔드포인트를 작성한 것이다.

 

사실 StreamingResponse를 구현한다고 쳐도, FastAPI에서 정상적으로 동작하는지 확인하기 위해선 실제로 이를 받아서 처리하는 클라이언트가 필요했다.

 

Swagger를 사용하거나 URL을 통해서 직접 접근했을 때, 데이터가 하나씩 보여지지 않고, 화면이 멈췄다가 데이터가 다 전송되면 그제서야 보였기 때문이다.

Swagger를 통해선 제대로 확인 불가능

본인은 API 뿐아니라, 웹서버까지 같이 개발해야하는 상황이였어서(;), 엔드포인트를 개발한 후 React를 통해서 Stream 데이터를 동적으로 처리했다.

 

React로 이런 스트림을 받아서 처리하는 내용까지 포함되면 너무 길어지기 때문에, 다음 기회에 작성하도록 하고, 해당 컴포넌트만 가져와서 사용할 예정이다.

그 전에, 지금 저 데이터로는 구현한 듯한 느낌이 안나니깐, 적어도 현재 시간을 계속 찍어주는 엔드포인트로 변경한 후 이를 웹 페이지에서 처리해본다.

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
import time

app = FastAPI()

app.add_middleware(
	CORSMiddleware,
	allow_origins=["*"],
	allow_credentials=True,
	allow_methods=["*"],
	allow_headers=["*"],
)

def current_datetime_streamer():
	while True:
		current_time = time.strftime("%Y-%m-%d %H:%M:%S").encode('utf-8')
		yield f"data: {current_time}\n\n"
		time.sleep(1)

@app.get("/")
async def main():
	return StreamingResponse(current_datetime_streamer(), media_type="text/event-stream")

일단, 공식 문서에서 제공하는 방법을 그대로 적용하면 정상적으로 렌더링이 되지 않았다.

이는 React에서 사용하여 클라이언트를 작성했기 때문일 수도 있지만, 본인은 이러한 방식을 사용했을 때, 정상적으로 SSE가 구현이 되었으므로, 이를 참고하면 좋을 것 같다.  

아무튼, 클라이언트에서는 스트림 데이터를 EventSource 훅을 통해서 처리하게 만들었는데, 리액트에서 제대로 처리할 수 있도록 스트림 데이터를 전송하기 위해선 다음과 같은 작업이 필요했다.

 

  1. "data:"를 붙여서 yield을 통해 스트림을 생성한다.
  2. 해당 데이터의 끝은 "\n\n"으로 지정한다.
  3. StreamingResponse의 미디어 타입을 text/event-stream으로 지정한다.

이를 지정하고 나서, 간단하게 만든 클라이언트를 확인하면 아래와 같다.

이런 방식으로 서버에서 클라이언트로 실시간 데이터를 전송할 수 있다.


이를 콘솔 출력을 통해서도 확인해보면, 아래와 같다.


# Step 2. 한걸음 더 - Decoding 추가

위의 전달받은 데이터는 일반적인 평문이 아닌 것을 확인할 수 있다.

 

기본적으로 SSE를 사용하면 데이터를 텍스트 형식으로 전달되게 되는데, 이를 문자열로 표현하기 위해서 별도의 과정을 거쳐야한다.

파이썬에서 문자열은 기본적으로 유니코드 문자열로 저장되며, UTF-8 같은 인코딩 형식을 사용하여 바이트로 변환되서 변수로 관리된다.

Mac에서는 기본적으로 UTF-8 인코딩 방식으로 바이트를 저장하므로, 이를 변수를 사용하여 그대로 클라이언트에게 전달하면, 이는 평문이 아닌 인코딩된 바이트 데이터 형식을 갖는다.

즉, 현재 시간을 변수로 담아서 스트림 데이터로 전달하기 때문에, 해당 변수 부분은 UTF-8로 인코딩된 데이터라는 의미이다. 변수를 따로 디코딩하지 않고 전달하면 바이트 형태로 전달된다.

이를 클라이언트가 받아서 디코딩 과정을 가져도 무방하지만, 애초에 보낼 때 디코딩을 거치는 것이 올바른 구조라고 생각했다.

이를 위해서 바이트 스트림으로 저장된 시간 문자열을 UTF-8 방식으로 디코딩하여 전달하게 된다면, 클라이언트에서는 바로 해당 유니코드 문자열을 렌더링할 수 있게 된다.

해당 디코딩을 적용하면 아래와 같다.

yield f"data: {current_time.decode('utf-8')}\n\n"

 

이를 적용하면 클라이언트에서는 아래와 같아진다.

이제는 완벽한 텍스트 데이터를 서버에서 전달하게 된다.

사실 단순하게 텍스트만 바로 전달하게 된다면, 별도의 디코딩 과정은 필요없지만, 애초에 "data: ~~~ \n\n" 을 통해서 스트림을 생성하므로, 해당 부분을 변수로 지정하게 될 수 밖에 없다.

따라서, 변수를 사용하게 되면, 해당 문자열이 바이트로 저장되서 사용되므로, 이를 디코딩하는 과정이 별도로 필요해진다.


참고 1. UTF-8 디코딩 및 ANSI Escape Sequence 처리

실제 프로젝트를 진행할 때도 위와 같이 "UTF-8" 디코딩을 수행했고, 이를 수행하고 나니 추가적으로 ANSI 이스케이프 처리를 수행했어야 했다.

위의 예시에는 해당 작업을 별도로 진행하지 않아도 되지만, 만약 SSE 로 제공하는 스트림이 아래와 같은 느낌을 가진다면 정규표현식을 통한 별도의 전처리 과정이 필요해진다.

 

이런 느낌으로 문자열이 감싸지는 상황이라고 보면 된다.

 

def remove_ansi_escape_sequences(input_string):
	ansi_escape = re.compile(r'\x1B\[[0-?]*[ -/]*[@-~]')
	output_string = ansi_escape.sub('', input_string)
	return output_string

이를 통해 ANSI 이스케이프 시퀸스를 제거할 수 있다.


참고 2. React 코드

아래는 이번 포스팅에서 사용한 컴포넌트이다.

function ServiceLog() {
	const [logs, setLogs] = useState([]);
	const eventSourceRef = useRef(null);

	useEffect(() => {
		eventSourceRef.current = new EventSource(`http://localhost:8000`);

  

		eventSourceRef.current.onopen = () => {};
		eventSourceRef.current.onmessage = (event) => {
		const newLog = event.data;
		console.log(newLog);
		setLogs((prevLogs) => [...prevLogs, newLog]);
		};
		return;
		}, []);

	return (
		<>
		<Container className="log-container" style={{ width: "50%" }}>
		<CodeBlock
			text={logs.join("\n")} language="shell" customStyle={{ textAlign: "left", backgroundColor: "#f4f4f4", }}
			codeContainerStyle={{
				color: "#2a2934",
			}}
		/>
		</Container>
		</>
	);
}
728x90
반응형