Spark BufferHolder 메모리 한계 해결하기 - explode -> RDD Flatmap, 제너레이터

대용량 데이터 처리 과정에서 Spark BufferHolder 메모리 초과 이슈를 잡고 있었는데, 몇시간 전에 다행히 해결할 수 있었다. 
금번 포스팅에선 특정 Time Slot에서 반복적으로 발생했던 Spark BufferHolder 메모리 초과 문제 상황과 해결 과정을 공유하고자 한다. 

일반적인 Spark 튜닝으로는 해결할 수 없었고, udf & explode() 기반 처리를 RDD 스트리밍 처리 구조로 변경하여 문제를 해결할 수 있었다.

 

해당 변경을 통해 수백 ~ 수천만건 데이터도 처리할 수 있는 구조로 개선하여 Data Engineering의 성취를 느낄 수 있었다. 향후 전체 프로세스를 오픈하여 실제 검증을 진행할 예정이다.

 

환경

  • spark 3.4.1
  • pyspark
  • hive(json) -> iceberg(parquet)

문제 상황

특정 타임슬롯을 처리하는 Spark 배치 작업에서 지속적인 실패가 발생했다. saveAsTable 연산에서 약 30만개의 Task 중에 재시도 포함 3 ~ 6의 Task가 다음 에러가 발생했고, 전체 프로세스의 실패로 이어졌다.

Cause by: java.lang.IllegalArgumentException: Cannot grow BufferHolder by size 50000(예시) because the size after growing exceeds size limitation 2147483632

 

문제 식별의 어려움

초기에는 stack trace 상 `df.count()` 연산에서 오류가 발생했으나, 해당 코드를 제거한 후에는 `saveAsTable` 실행 시점에서 동일한 에러가 발생했다. 따라서, 에러 로그의 stack trace만으론 명확한 문제 포인트를 잡기 어려웠다.

시니어 엔지니어 분의 도움을 받아, 해당 이슈는 Spark의 Lazy Evaluation 특성 때문으로, `saveAsTable` 자체가 문제가 아닌 것을 확인할 수 있었다.
Spark는 `count`, `saveAsTable`과 같은 액션이 실행될 때 모든 transformation이 함께 수행되기 때문에, 연산 중간에서 이슈가 발생하더라도 Stack Trace에는 액션이 실행된 위치에서 에러가 발생한 것처럼 표기된다.

즉, saveAsTable이 문제 시작점이긴 하지만, 앞단 프로세스들을 거치는 과정에서 BufferHolder 에러가 발생하는 상황이었다.

데이터의 특성

문제가 되는 Time Slot의 데이터를 살펴봤을 때, 일부 건들에서 다음의 특징이 존재했다.

  • Expansion : 1개의 입력 레코드가 프로세싱 후 수십만 개의 레코드로 확장된다.
  • Locality : 해당 케이스들이 특정 타임슬롯에 집중되어 있어 대부분의 처리에는 영향이 없음
  • 일시적으로 우회는 가능하지만, 근본적 해결이 필요한 상황

이를 통해 해당 데이터들이 메모리 아웃을 발생시킬 만한 문제점을 갖는다는 것을 확인할 수 있었다.

BufferHolder란?

BufferHolder는 UnsafeRow 등 내부 Spark SQL 자료구조에 데이터를 효율적으로 담기 위해 메모리 버퍼를 관리하는 객체이다.

주요 역할은 다음과 같다.

  • 직렬화 : UnsafeRow 등 내부 데이터 구조의 직렬화 과정에서 임시 데이터 저장
  • 데이터 변환 : Row 변환 작업 중 중간 결과 보관
  • 메모리 최적화 : 효율적인 메모리 사용을 위한 버퍼 관리

따라서, 다음과 같이 데이터가 임시로 저장되어야하는 케이스에서 BufferHolder가 사용될 수 있다.

  • Spark DataFrame에서 Parquet로의 저장 시 압축을 위한 직렬화 시점
  • 복잡한 Nested 데이터를 평면화(explode)할 때
  • 대량의 Row 또는 큰 사이즈의 컬럼을 메모리상에서 일시적으로 유지해야 할 때

BufferHolder 제약은 최대 2GB(2,147,483,632 bytes)까지만 할당 가능하다는 점이다. 
단일 버퍼가 이 크기를 초과하면 위와 같은 `Cannot grow BufferHolder` 에러가 발생한다.


해결 과정

처음에는 일반적인 Spark 메모리 이슈라 생각하여, 다음과 같은 기본적인 조치들을 진행했었다.

  • Executor 수 증가 & Driver & Executor 코어/메모리 증가
  • 파티션 수 늘리기
  • Time Slot 분단위로 쪼개기
  • Raw 데이터 Chunk 단위로 나누기
  • KryoSerializer 적용하기
  • 큰 데이터만 따로 빼서 처리하기 등등

여러 시행착오를 통해 현재의 문제는 이런 설정 조정으로는 해결할 수 없다고 판단하여, 로직에서 문제점을 찾기 시작했다.

해당 SparkJob을 다른 팀원께서 깔아주셨어서, 최대한 로직 수정없이 해결하려했던 것이 설정부터 잡은 이유 중 하나였다.

기존 로직 확인

데이터 처리 Layer를 확인했을 때, 대략적인 구조는 다음과 같았다.

raw_df = spark.sql(query)  # RAW 데이터 조회

udf_schema = ArrayType(MapType(StringType(), StringType()))  # Key-Value 리스트 구조
process_udf = udf(process_func, returnType=udf_schema)  # UDF 처리 함수

processed_df = raw_df.select(
    process_udf(struct(*raw_df.columns)).alias("processed_row")
)  # 전체 컬럼을 struct로 묶어서 UDF 처리

flatten_df = processed_df.select(
    explode(col("processed_row")).alias("row")
)  # explode로 배열을 행으로 전개

final_df = flatten_df.select(
    # 컬럼 & 타입 명시적 선택
    col("row.field1").cast(StringType()),
    # ... 기타 필드들
)

final_df.write \
    .format("iceberg") \
    .option("write.format.default", "parquet") \
    .option("write.parquet.compression-codec", "snappy") \
    .mode("append") \
    .saveAsTable(result_table)

 

즉, udf를 사용해서 데이터를 처리하고, 처리된 데이터들은 여러개가 담길 수 있으니 ArrayType에 저장된다.
이걸 묶어서 explode를 통해 리스트에 담긴 데이터들을 Row로 변환하게된다.
이후 saveAsTable을 통해 parquet 포맷으로 저장한다.

이 흐름을 봤을 때, 앞서 말한 BufferHolder가 사용되는 거의 모든 케이스를 만족한다.

 

기존 로직의 문제점은 다음과 같았다.

  • UDF 처리: 모든 입력을 한 번에 메모리에 적재하여 처리
  • explode() 연산: 수십만 건으로 확장되는 배열 데이터를 한 번에 메모리 전부 적재하여 처리
  • Parquet 압축: 최종 저장 시 전체 데이터를 BufferHolder에 로드하여 압축

특히 `explode()`를 사용하게 되면 `processed_df`의 모든 데이터를 한 번에 메모리에 올려서 처리할 수밖에 없기 때문에, 1:수십만의 일부 데이터가 들어올 때 필연적으로 2GB 제한을 초과하는 상황이 발생했다.

RDD 스트리밍 처리로의 전환 with 제너레이터

근본적인 문제를 해결하기 위해 한 번에 메모리에 적재하는 explode 방식을 제거하고, 단건 기반으로 필요할 때마다 처리하는 스트리밍 구조로 전환했다.

RDD.flatMap과 Python 제너레이터를 다음과 같이 사용해 스트리밍 처리를 구현할 수 있었다.

def process_with_streaming(row):
    """단건 처리 후 Generator로 결과 반환"""
    try:
        # 기존 process 로직 적용
        processed_rows = process_func(row)
        
        for processed_row in processed_rows:
            yield processed_row
    except Exception as e:
        print(f"Processing error: {e}")
        return iter([])

# RDD 기반 스트리밍 처리
raw_rdd = raw_df.rdd
processed_rdd = raw_rdd.flatMap(process_with_streaming)
final_rdd = processed_rdd.filter(lambda x: x is not None)

# 최종 DataFrame 변환
final_df = spark.createDataFrame(final_rdd, schema=target_schema)

# 저장 로직 동일

raw_df을 rdd로 변환한 이후 flatMap을 통해 row를 단건 단위로 udf 프로세싱을 수행한다.
이후 스키마를 잡아주고, 저장 전 DataFrame로 변환하는 과정을 거쳤다.

이를 통해 메모리 사용 패턴을 explode() 기반 전체 적재 -> 단건 처리 로 개선하여 메모리 사용량을 최소화하였다.

Python 제너레이터를 사용하면 yield 문을 통해 처리 결과를 즉시 반환하게 된다. 
해당 데이터는 참조값이 사라져 즉시 GC 대상이 되어 메모리에서 해제되게 된다. 
따라서 데이터 규모에 관계없이 레코드별로 일정한 메모리를 사용하는 구조로 개선되었다.


결과

UDF & explode 기반 처리 흐름을 RDD 스트리밍 처리로 전환하여 BufferHolder 메모리 초과 문제를 근본적으로 해결할 수 있었다.

BufferHolder의 2GB 제한에 대한 강한 의존성을 제거할 수 있었고, 1:수십만 확장 데이터도 안정적으로 처리 가능할 수 있는 구조로 개선되었다.
궁극적으로 메모리 사용량이 데이터 크기에 비례하지 않는 구조로 개선되어 메모리의 안정성을 확보할 수 있게 되었다.

처리 방식은 메모리 폭발이 일어날 수 있는 전체 데이터 일괄 메모리 적재에서 일정한 메모리를 사용할 수 있는 단건 스트리밍 처리로 개선되었다.

결과적으로 기존 수십만 건 기반의 처리에 한계가 있었지만, 수백만 & 수천만건 처리를 진행해도 불안해하지 않을 수 있게 되었다.


Reference

 

추가 업데이트

rdd.flatMap + 제너레이터로 접근한 방식이 메모리 사용량을 일정하게 사용하는 방식으로 explode() 사용 시 발생했던 BufferHolder 이슈를 해결했다 판단했지만, 사실, BufferHolder를 사용하지 않는 방식으로 전환하게 되어 동일한 이슈가 발생하지 않는 방향으로 개선되었다.

  • 구조적 API Dataframe과 함수들을 사용하면 Catalyst, Tungsten 엔진의 최적화를 받게된다.
  • 성능 개선을 위한 최적화의 주요한 부분 중 하나는 트랜스포메이션 과정에서 Python 또는 Java 객체를 사용하는 것이 아니라, UnsafeRow를 사용해 바이트 형태로 관리하게 된다.
  • BufferHolder의 역할 중 하나가 UnsafeRow를 저장하고 관리하는 것이다.
  • 따라서, Dataframe과 explode를 사용하는 기존 방식에서 폭발하는 데이터들을 UnsafeRow로 메모리에 올리게 되었고, 이들은 모두 BufferHolder의 버퍼의 2GB라는 제한된 인덱스를 사용해야했다.
  • 일부 엣지 데이터에서 이 인덱스를 벗어나는 상황이 발생했고, 이를 해결하기 위해 저수준 API인 RDD 연산으로 변경하여 UnsafeRow로의 최적화를 벗어났다.
  • 추가적으로 확인한 사항은, Tungsten 엔진의 최적화 과정에서 런타임에 Java 바이트코드를 동적 생성하는 과정이 있는데, 이 과정에서 UnsafeRow를 활용하게 된다.
  • 이 물리적 생성을 비활성화하는 옵션이 존재한다(spark.sql.codegen.wholeStage). 다만, 스파크 잡 전체의 최적화를 받지 않게되므로 문제가 발생하는 특정 포인트만 저수준 API로 변경하고, 전체적인 최적화는 유지할 수 있는 구조를 선택한 것이 결과적으로 좋은 선택이 되었던 것 같다.
  • 성능적인 측면이 가장 큰 Trade-Off인데, 실제 사용했을 때는 변경 전이나 크게 달라진 부분은 없었다. (나중에 추가로 확인해볼 예정이다.)
728x90
반응형