인프런 영문 브랜드 로고
인프런 영문 브랜드 로고

인프런 커뮤니티 질문&답변

leeceo97님의 프로필 이미지
leeceo97

작성한 질문수

실리콘밸리 엔지니어에게 배우는 파이썬 아파치 스파크

cdc 기반의 스트리밍 데이터 처리

해결된 질문

작성

·

39

1

안녕하세요. 강의 잘듣고 있는 도중에 질문이 있어 글을 작성하게 되었습니다.

 

현재 cdc 기반으로 iceberg 테이블에 operation(insert, update, delete) 정보들을 적용해서 데이터를 적재하고 있습니다. 이부분까지는 DMS -> kinesis -> lambda-> firehose -> iceberg 순서로 문제없이 적재를 진행하고 있지만 이런형식으로 운영중인 iceberg 테이블 10개를 join 시켜 datamart에 현재는 1주일에 한번씩 전체를 삭제 후 insert하는 비효율 적인 방식으로 마트를 운영중에 있습니다.

 

이것을 개선시키기 위해 적재중인 iceberg 테이블들에 변경된 데이터들을 5분 10분 단위로 종합해 datamart 테이블에 변경 분에 한해서만 insert, update, delete를 반영해주고자 하는데 강의를 듣다보니 spark streaming을 활용해서 뭔가 해결을 할 수 있을것 같은데 혹시 이부분에 대한 아이디어에 대해서 조언을 구할 수 있을까요?

 

10개의 테이블에 대해서 10분간 발생한 변경 점들을 종합해 datamart에 반영을 해주려고 하는데 아직 제대로 감이 잡히지 않아서 질문드렸습니다.

답변 2

1

미쿡엔지니어님의 프로필 이미지
미쿡엔지니어
지식공유자

안녕하세요 leeceo97님

일단 Spark Streaming은 지속적으로 데이터를 처리하고 변경 사항을 효율적으로 캡처하는 데 적합합니다. 이를 활용하여 말씀하신, 10개의 Iceberg 테이블 변경 사항을 종합해 Datamart에 반영할 수 있습니다.

구체적으로는 Iceberg의 스냅샷(snapshots) 기능을 활용하여 최근 스냅샷과 이전 스냅샷의 차이를 추출합니다. 그리고 Spark Streaming으로 변경 데이터 처리 Spark Streaming을 사용하여 변경 데이터를 실시간으로 읽고 변환합니다. 마지막으로 Iceberg의 MERGE INTO를 활용하여 Datamart에 반영하면 될 듯 합니다(https://iceberg.apache.org/docs/1.7.1/spark-writes/#merge-into)

요약하면 다음과 같습니다.

  • 변경 데이터 수집:

    • Iceberg의 Snapshots 또는 CDC 데이터 스트림.

  • 데이터 통합 및 조인:

    • Spark Structured Streaming을 통해 10개 테이블의 변경 데이터 집계 및 조인.

  • Datamart 반영:

    • Spark SQL + Iceberg의 MERGE INTO로 실시간 또는 배치 업데이트.

도움이 되었을까요?

leeceo97님의 프로필 이미지
leeceo97
질문자

아하 네넵 도움이 됐습니다.

감사합니다 🙂

0

안녕하세요, 인프런 AI 인턴입니다. leeceo97님.

CDC 기반으로 Iceberg 테이블의 데이터 변경점을 실시간으로 처리하고자 하는 문제에 대해 고민하고 계시군요. Spark Streaming을 활용하여 해결책을 찾을 수 있습니다. Spark Streaming은 실시간으로 들어오는 데이터를 처리하는 데 매우 효과적이며, 여기서 CDC(Change Data Capture) 이벤트를 효율적으로 다룰 수 있습니다.

Spark Streaming을 이용한 처리 방법

  1. 실시간 데이터 수집:

    • 이미 Kinesis와 같은 스트리밍 플랫폼을 사용하고 계시기에, Spark Streaming으로 Kinesis에서 직접 데이터를 수신할 수 있습니다.
    • Spark Structured Streaming을 활용해 Kinesis에서 스트림을 읽어오는 구조를 만드세요.
  2. 변경 데이터 처리:

    • 각 CDC 이벤트(insert, update, delete)를 적절히 분류해야 합니다. 이때 Kafka 등의 메시지 브로커를 통해 해당 이벤트를 트리거할 수도 있습니다.
    • Spark의 DataFrame API 및 SQL을 이용해 각각의 이벤트를 처리할 수 있습니다.
  3. 집계 및 반영:

    • 집계된 변경 데이터들을 주기적으로 DataFrame 또는 테이블 형식으로 변환합니다.
    • Iceberg를 사용하여 해당 변화를 DataFrame에 실시간으로 반영할 수 있습니다.
  4. 데이터 마트에 반영:

    • 적재된 데이터를 마트 테이블에 삽입하거나 업데이트할 때, insert와 update를 위한 트랜잭션을 설정하여 필요한 경우에만 반영되도록 합니다.

아래는 관련된 질문의 예시들이며, 참고하시기를 추천드립니다:

이 답변이 도움이 되었으면 좋겠습니다. 추가적으로 궁금한 점이 있으시면 이어서 질문해 주세요. 곧 지식공유자께서 답변해 주실 것입니다.

저는 질문자님의 이해를 돕고자 지식공유자님의 기존 답변을 참고하여 유사한 답변 링크를 추천해드리고 있습니다. 현재 베타 기능으로 답변이 만족스럽지 않을 수 있는 점 양해 부탁드립니다. 🙏

leeceo97님의 프로필 이미지
leeceo97
질문자

  1. 데이터 마트에 반영:

    • 적재된 데이터를 마트 테이블에 삽입하거나 업데이트할 때, insert와 update를 위한 트랜잭션을 설정하여 필요한 경우에만 반영되도록 합니다.

이부분에 대한게 사실 잘 해결안되고 있습니다.

이런것들을 datamart를 생성하는 쿼리를 통해 어느정도 구조화 되어 관리할 수 있는 방식은 현재로써는 어려운 부분일까요? 스트리밍 데이터에 대한 이해가 좀 부족하여 이부분 질문드렸습니다.

leeceo97님의 프로필 이미지
leeceo97

작성한 질문수

질문하기