해결된 질문
작성
·
223
답변 1
1
안녕하세요 JP님,
일단 executemany
같은 경우 PythonOperator를 사용하시고 psycopg2
를 이용하시면 될 듯 힙니다. 대충 코드를 만들어 보자면 ...
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
import psycopg2
def batch_insert_to_postgres():
conn = psycopg2.connect("dbname=your_db user=your_user password=your_password")
cur = conn.cursor()
query = "INSERT INTO your_table (column1, column2) VALUES (%s, %s)"
data = [(1, 'abc'), (2, 'def'), (3, 'ghi')]
cur.executemany(query, data)
conn.commit()
cur.close()
conn.close()
default_args = {
'owner': 'airflow',
'start_date': datetime(2023, 12, 12),
# other default args
}
dag = DAG('postgres_batch_insert', default_args=default_args, schedule_interval='@daily')
insert_task = PythonOperator(
task_id='insert_to_postgres',
python_callable=batch_insert_to_postgres,
dag=dag,
)
insert_task
하지만 데이터가 엄청나게 많은 데이터 같은 경우(Billion level), 저 같은 경우는 대부분 Spark를 사용해서 로딩을 합니다. 개인용 프로젝트로는 위의 코드처럼 하시면 충분 하실 듯 합니다.
답변감사합니다!! ㅠㅠ
추후 큰 데이터로 spark 로딩도 해봐야겠네요!!
빠른 답변 감사합니다!!