728x90
지금까지의 데이터 파이프라인 경험은 배치(Batch) 기반의 ETL이었다. Airflow, Hive, Spark 등을 활용해 정해진 주기로 데이터를 처리하는 방식에 익숙했다.상대적으로 실시간 데이터 처리는 깊게 다뤄볼 기회가 없었다. 하지만 데이터 엔지니어의 역량은 배치 처리뿐 아니라 스트리밍 처리도 필수적이고, 요즘은 실시간 처리가 중요하다고 느껴서 관련 스택을 제대로 공부해야겠다는 생각이 들었다.Kafka, 스트리밍 엔진, 스토리지 및 OLAP 엔진 등에 대해서도 공부한 내용들도 포스팅해야하지만, 로컬에서 간단하게 올려봐야겠다 생각되어 Claude & Gemini CLI 두 시니어 엔지니어분들와 함께 간단한 유즈케이스로 로컬에서 (다행히) 돌아가는 스트리밍 파이프라인을 만들어 금번 포스팅에는 해당 내..
대용량 데이터 처리 과정에서 Spark BufferHolder 메모리 초과 이슈를 잡고 있었는데, 몇시간 전에 다행히 해결할 수 있었다. 금번 포스팅에선 특정 Time Slot에서 반복적으로 발생했던 Spark BufferHolder 메모리 초과 문제 상황과 해결 과정을 공유하고자 한다. 일반적인 Spark 튜닝으로는 해결할 수 없었고, udf & explode() 기반 처리를 RDD 스트리밍 처리 구조로 변경하여 문제를 해결할 수 있었다. 해당 변경을 통해 수백 ~ 수천만건 데이터도 처리할 수 있는 구조로 개선하여 Data Engineering의 성취를 느낄 수 있었다. 향후 전체 프로세스를 오픈하여 실제 검증을 진행할 예정이다. 환경spark 3.4.1pysparkhive(json) -> iceber..
최근 백만건 단위의 데이터를 처리하는 상황을 맞이하면서, 단순한 조건이 포함된 조회부터 성능이 안나오며, 분석 쿼리 조회 시에는 몇분 이상 기다려야하는 상황이 잦았다.단순히 데이터의 양이 많아져서 발생하는 문제일 수 있지만, 빅 데이터 처리 솔루션들이 천만건 데이터 수준에 지연이 발생하는 건 기술의 문제가 아니라 유즈 케이스가 잘못된 경우 것이라 생각된다. 해당 포스팅에선 기존 환경의 문제점을 확인하며 개선할 수 있는지에 대해 확인해보려 한다.기존 환경AWS EMR 환경에서 Hive Metastore를 중심으로 S3에 저장된 데이터를 Hive, Trino, Spark에서 조회/처리Raw Layer 데이터는 모두 `json.gz` 형식으로 저장되어 있으며, JSON Serde를 활용한 External Ta..
해당 포스팅에선, 데이터 수집 단일 프로세스의 한계점을 확인하고 큐와 상태를 도입해 병렬 처리 및 안정성을 확보하는 과정을 작성하고자 한다.단일 수집 프로세스 예시 및 한계데이터 수집 과정에서 여러 단계를 거쳐야 할 때, 각 단계에서 다음 단계 정보를 얻어가는 방식은 가장 직관적인 방법이다.예를 들어, 블로그의 모든 포스팅의 내용을 수집하는 상황이라면 수집 플로우를 다음 처럼 표현할 수 있다.메인 페이지목록 페이지포스팅이 과정을 단일 스크립트로 표현하게 된다면, 플로우의 Input은 블로그의 메인 페이지가 되고, Output은 모든 포스팅이 된다.가장 직관적이고 간단한 플로우이지만, 이를 정기적으로 운영하는 관점에선 크게 3가지의 문제점이 존재한다.1. 종료 시점을 예측할 수 없다. - 블로그마다 페..
최근 Python 연습할 겸 이것저것 고민하면서, 레포지토리 하나를 셋업했다.이번 연습에서는 Python 프로젝트 관리 도구들을 적용하는 것 또한 목표여서 하나씩 찾아보고 있었는데, 그 중 가상 환경(패키징) 관리 도구로써 Pixi를 접할 수 있었고, 사용 경험이 괜찮은 것 같아서, 이에 대한 소개와 사용하는 방법을 공유하고자 한다. 사용 환경은 다음과 같다.OS : Windows 11CPU : AMD 5600XShell : Bash (Git)추가적으로 Code Quality Watcher로 ruff를 셋업해두어서 pyproject.toml은 사전에 구성되어 있는 환경이다.(ruff는 공부 더 하고 포스팅하는걸로..)Pixi란? Pixi by prefix.devNonepixi.sh Pixi는 자신을 모..
원래 Macbook Air M1으로 공부하거나 간단한 작업들을 진행해왔는데, 램도 8GB고 디스크도 최소 사이즈여서 최근들어 cursor를 돌리거나 컨테이너를 올리면 힘들어 하는게 눈으로 보여서, HW 스펙이 조금 넉넉한 윈도우 환경에 익숙해지려고 노력하고 있다. CMD 명령어들을 따로 공부하거나 하는 정도는 아닌.. 과정상에 Python으로 간단하게 돌려보고 싶은게 있어서, 가상환경 세팅하는 중에 git bash에서는 리눅스 명령어(bash shell)가 먹지만, conda는 CMD나 전용 프롬프트로 진행하고 있어서, 나름대로 불편하다 생각했다. 사소한 부분이지만 원래 자주 사용하는 명령어들을 alias 걸어두고 사용하는 편인데, 윈도우에서는 .bat 파일 만들고 PATH 경로에도 추가하고 하는 부분이..
비즈니스 로직을 작성하면서 정상 로직을 생각하는 것도 중요하지만, 예외 상황과 엣지 케이스에 대한 방어로직을 작성하는 데 신경을 더 쓰는 것은 당연할 것이다.try-catch문으로 예외를 잡아서 처리하는 것 뿐만 아니라, 커넥션, 메모리 등의 자원을 반환하거나 초기화 등을 수행하기 위해 finally 블럭을 사용하게 된다.김영한님의 중급 자바 강의를 들으면서 추가적으로 Java7에 도입된 try-with-resources 기능이 있음을 알게되었고, 추후에 업무에 사용할 수 있도록 차이점과 장점을 기록하는 것이 이번 포스팅의 목적이다. # 예시 시나리오차이점과 장점을 알아보기 위해 간단한 시나리오를 작성한다. Reader- 파일을 쓰기 위한 기본 기능 제공하는 메소드- 파일 열기, 쓰기, 닫기 메소드 존재..
이번 포스팅에서는 MongoDB 다중 업데이트 로직을 구현하면서 마주한 성능 개선의 필요성과 이를 풀어나간 방법들을 공유하고자 한다. 사용 환경은 다음과 같다.Spring 2.3.2MongoDB 5.0JDK 1.8.0Kotlin# 최초 기능 개발 - saveAll()해당 업무에서는 상태를 기반으로 Entity를 관리하며 비즈니스 로직을 태우는 프로세스를 구축하는 것이 필요했다.신규로 생성된 데이터들을 PENDING으로, 처리를 진행할 때는 PROGRESS, 처리 다 되면 SUCCESS & FAILED 등으로 관리한다. 그중 처리를 진행할 때 대상들을 일괄적으로 PENDING -> PROGRESS로 변경하는 작업이 필요했는데, 해당 과정에서 여러개의 데이터에 대한 업데이트를 진행했다. 최초 로직은 가장 ..
# 서론최근 개발한 데이터 파이프라인을 운영 환경에 적용하기 전 요구사항을 반영하는 과정에서, Airflow 스케줄링을 걸 때 catchup 값을 잘못 설정해서, 과거부터 현재 시점 사이에서 실행되지 않았던 DAG가 모두 실행되는 상황이 발생했다.catchup 설정 자체는 알고 있었지만, 버전업이 되면서 바뀐 설정 방법을 사용하지 않아 DAG Backfill이 발생해 특정 파티션 데이터를 Hive 테이블에 중복으로 인입시켰다.해당 Hive 테이블을 복원하면서 사용한 방법과 Airflow의 Backfiil 발생 원인을 정리하여 앞으로 이런 일을 반복하지 않도록 반성의 시간을 갖고 혹시 비슷한 이슈를 겪으신 분들에게 도움이 되고자 이를 공유한다.# 상황최근 구축했던 Data Pipeline를 최종 운영하기 ..
# 목적Python을 사용한 업무를 진행할 때, 대부분의 메소드에 @staticmethod와 @classmethod를 사용하고 있는데, 두 어노테이션이 갖는 의미와 차이점, 사용 방법 등을 정확하게 정리하는 것이 이번 포스팅의 목적이다. 추가로, 업무하면서 두 어노테이션을 쓰다 인스턴스 변수와 클래스 변수를 생각 안하고 짯던 경험이 있어서, 이슈 상황과 차이점 또한 간단히 정리하고자 한다.@staticmethodhttps://docs.python.org/3/library/functions.html#staticmethod Built-in FunctionsThe Python interpreter has a number of functions and types built into it that are alwa..