인프런 커뮤니티 질문&답변

JP님의 프로필 이미지
JP

작성한 질문수

실리콘밸리 엔지니어와 함께하는 Apache Airflow

PostgresOperator로 대량의 데이터 업로드 방법 질문드립니다.

해결된 질문

작성

·

223

1

안녕하세요 선생님 🙂

PostgresOperator 질문 드립니다.

DB table에 데이터를 갱신하는 task를 혼자 만들어보고 있는데요.

PostgresOperator는 executemany와 같은 기능을 지원하지 않는 것으로 확인했습니다.

airflow에서 대량의 데이터를 insert / update 하는 방법이 있을까요..?

답변 1

1

미쿡엔지니어님의 프로필 이미지
미쿡엔지니어
지식공유자

안녕하세요 JP님,

일단 executemany같은 경우 PythonOperator를 사용하시고 psycopg2를 이용하시면 될 듯 힙니다. 대충 코드를 만들어 보자면 ...

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
import psycopg2

def batch_insert_to_postgres():
    conn = psycopg2.connect("dbname=your_db user=your_user password=your_password")
    cur = conn.cursor()

    query = "INSERT INTO your_table (column1, column2) VALUES (%s, %s)"
    data = [(1, 'abc'), (2, 'def'), (3, 'ghi')]

    cur.executemany(query, data)

    conn.commit()
    cur.close()
    conn.close()

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2023, 12, 12),
    # other default args
}

dag = DAG('postgres_batch_insert', default_args=default_args, schedule_interval='@daily')

insert_task = PythonOperator(
    task_id='insert_to_postgres',
    python_callable=batch_insert_to_postgres,
    dag=dag,
)

insert_task

하지만 데이터가 엄청나게 많은 데이터 같은 경우(Billion level), 저 같은 경우는 대부분 Spark를 사용해서 로딩을 합니다. 개인용 프로젝트로는 위의 코드처럼 하시면 충분 하실 듯 합니다.

JP님의 프로필 이미지
JP
질문자

답변감사합니다!! ㅠㅠ

추후 큰 데이터로 spark 로딩도 해봐야겠네요!!

빠른 답변 감사합니다!!

JP님의 프로필 이미지
JP
질문자

조금 우여곡절이 있었지만 조언해주신 덕분에 DAG 구동 성공했습니다!
감사합니다 ㅠㅠ

image

JP님의 프로필 이미지
JP

작성한 질문수

질문하기