작성
·
58
답변 2
1
안녕하세요. 판다스 스튜디오입니다.
두 가지 시나리오 모두 LangGraph에서 구현 가능합니다.
첫 번째 시나리오는 순차 처리 후 집계하는 방식입니다.
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, START, END
import operator
# 상태 정의
class State(TypedDict):
query: str
result_1: str
result_2: str
final_result: str
# 노드 정의
def process_1(state: State):
# 쿼리를 기반으로 첫 번째 작업 수행
result_1 = f"작업 1의 결과: {state['query']} 처리됨"
return {"result_1": result_1}
def process_2(state: State):
# 쿼리를 기반으로 두 번째 작업 수행
result_2 = f"작업 2의 결과: {state['query']} 처리됨"
return {"result_2": result_2}
def aggregate(state: State):
# 작업 1과 2의 결과를 집계
combined = f"작업 1 결과: {state['result_1']}, 작업 2 결과: {state['result_2']}"
return {"final_result": combined}
# 그래프 구성
workflow = StateGraph(State)
workflow.add_node("process_1", process_1)
workflow.add_node("process_2", process_2)
workflow.add_node("aggregate", aggregate)
# 엣지 정의 - 순차적 실행
workflow.add_edge(START, "process_1")
workflow.add_edge("process_1", "process_2")
workflow.add_edge("process_2", "aggregate")
workflow.add_edge("aggregate", END)
# 그래프 컴파일
graph = workflow.compile()
# 실행
result = graph.invoke({"query": "테스트 쿼리"})
print(result["final_result"])
두 번째 시나리오는 LangGraph의 Send
API를 사용하여 구현할 수 있습니다.
from typing import TypedDict, Annotated, List
from langgraph.graph import StateGraph, START, END
from langgraph.constants import Send
import operator
# 상태 정의
class State(TypedDict):
query: str
results: Annotated[List[str], operator.add] # 결과를 리스트에 추가하는 reducer
final_result: str
# 노드 정의
def process_1(state: State):
result = f"작업 1의 결과: {state['query']} 처리됨"
return {"results": [result]}
def process_2(state: State):
result = f"작업 2의 결과: {state['query']} 처리됨"
return {"results": [result]}
def process_3(state: State):
result = f"작업 3의 결과: {state['query']} 처리됨"
return {"results": [result]}
def aggregate(state: State):
# 모든 결과 취합
combined = "취합된 결과: " + ", ".join(state["results"])
return {"final_result": combined}
# 그래프 구성
workflow = StateGraph(State)
workflow.add_node("process_1", process_1)
workflow.add_node("process_2", process_2)
workflow.add_node("process_3", process_3)
workflow.add_node("aggregate", aggregate)
# 병렬 처리를 위한 조건부 엣지 함수
def branch_parallel(state: State):
# 병렬로 실행할 노드들 정의
return [
Send("process_1", state),
Send("process_2", state),
Send("process_3", state),
]
# 엣지 정의 - 병렬 실행
workflow.add_edge(START, branch_parallel)
workflow.add_edge("process_1", "aggregate")
workflow.add_edge("process_2", "aggregate")
workflow.add_edge("process_3", "aggregate")
workflow.add_edge("aggregate", END)
# 그래프 컴파일
graph = workflow.compile()
# 실행
result = graph.invoke({"query": "테스트 쿼리", "results": []})
print(result["final_result"])
0
안녕하세요, 인프런 AI 인턴입니다. 유영재님.
에이전틱 루프에서의 병렬 노드 구현에 대해 문의주셨군요. 이를 구현할 수 있는 방법을 설명드리겠습니다.
순차적 진행 및 Aggregate 전달 방식:
이러한 방식은 주로 각 작업의 결과가 다음 작업의 입력으로 사용될 때 유용합니다. 보통은 다음과 같은 방식으로 구현할 수 있습니다:
병렬 작업 수행 후 한 번에 취합 방식:
이 방식은 독립적인 작업들을 동시에 수행하여 성능을 향상시킬 때 활용됩니다.
LangGraph와 같은 프레임워크에서, 이러한 병렬 처리를 구현하기 위해서는 추가적인 비동기 프로그래밍 컨셉을 고려하여 코드를 작성해야 할 수 있습니다. 비동기 작업의 결과를 모아 올바르게 집계하고, 각 작업의 독립성을 고려하여 에러 발생 시 적절한 대처가 필요합니다.
비슷한 질문에 대한 답변이나 예제를 원하시면, 다음 링크를 참조해주세요:
- tool_search_graph 서브그래프 구현 관련 질문: 조건부 분기 처리 및 supervisor 연결 방법
저는 질문자님의 이해를 돕고자 지식공유자님의 기존 답변을 참고하여 유사한 답변 링크를 추천해드리고 있습니다.
현재 베타 기능으로 답변이 만족스럽지 않을 수 있는 점 양해 부탁드립니다. 🙏 추가적으로 궁금한 점이 있으시면, 이어서 질문해 주세요. 곧 지식공유자께서 답변해 주실 것입니다.
branch_parallel 노드가 정의가 안되어있어서 정의하고 그래프 그리니 저렇게 나왔어요!