인프런 영문 브랜드 로고
인프런 영문 브랜드 로고

인프런 커뮤니티 질문&답변

유영재님의 프로필 이미지

작성한 질문수

AI 에이전트로 구현하는 RAG 시스템(w. LangGraph)

Human-in-the-Loop (HITL) 과정 구현하기

에이전틱 루프에서의 병렬 노드 구현 문의

작성

·

58

0

안녕하세요, 강사님

워크플로우가 아닌 에이전틱 루프에서 병렬 노드 구현이 가능한지 문의드립니다. 구체적으로 다음과 같은 두 가지 시나리오에 대한 가능 여부가 궁금합니다

  1. 쿼리에 따라 1, 2번 작업을 순차적으로 진행한 후, 그 결과를 aggregate하여 3번으로 전달하는 방식

  2. 1, 2, 3번 작업을 동시에 병렬로 수행한 후 한 번에 취합하는 방식

어떤 방식이 가능한지 조언 부탁드립니다. 감사합니다

답변 2

1

판다스 스튜디오님의 프로필 이미지
판다스 스튜디오
지식공유자

안녕하세요. 판다스 스튜디오입니다.
두 가지 시나리오 모두 LangGraph에서 구현 가능합니다.

1. 순차적 처리 후 결과 집계 방식

첫 번째 시나리오는 순차 처리 후 집계하는 방식입니다.

from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, START, END
import operator

# 상태 정의
class State(TypedDict):
    query: str
    result_1: str
    result_2: str
    final_result: str

# 노드 정의
def process_1(state: State):
    # 쿼리를 기반으로 첫 번째 작업 수행
    result_1 = f"작업 1의 결과: {state['query']} 처리됨"
    return {"result_1": result_1}

def process_2(state: State):
    # 쿼리를 기반으로 두 번째 작업 수행
    result_2 = f"작업 2의 결과: {state['query']} 처리됨"
    return {"result_2": result_2}

def aggregate(state: State):
    # 작업 1과 2의 결과를 집계
    combined = f"작업 1 결과: {state['result_1']}, 작업 2 결과: {state['result_2']}"
    return {"final_result": combined}

# 그래프 구성
workflow = StateGraph(State)
workflow.add_node("process_1", process_1)
workflow.add_node("process_2", process_2)
workflow.add_node("aggregate", aggregate)

# 엣지 정의 - 순차적 실행
workflow.add_edge(START, "process_1")
workflow.add_edge("process_1", "process_2")
workflow.add_edge("process_2", "aggregate")
workflow.add_edge("aggregate", END)

# 그래프 컴파일
graph = workflow.compile()

# 실행
result = graph.invoke({"query": "테스트 쿼리"})
print(result["final_result"])

2. 병렬 실행 후 결과 취합 방식

두 번째 시나리오는 LangGraph의 Send API를 사용하여 구현할 수 있습니다.

from typing import TypedDict, Annotated, List
from langgraph.graph import StateGraph, START, END
from langgraph.constants import Send
import operator

# 상태 정의
class State(TypedDict):
    query: str
    results: Annotated[List[str], operator.add]  # 결과를 리스트에 추가하는 reducer
    final_result: str

# 노드 정의
def process_1(state: State):
    result = f"작업 1의 결과: {state['query']} 처리됨"
    return {"results": [result]}

def process_2(state: State):
    result = f"작업 2의 결과: {state['query']} 처리됨"
    return {"results": [result]}

def process_3(state: State):
    result = f"작업 3의 결과: {state['query']} 처리됨"
    return {"results": [result]}

def aggregate(state: State):
    # 모든 결과 취합
    combined = "취합된 결과: " + ", ".join(state["results"])
    return {"final_result": combined}

# 그래프 구성
workflow = StateGraph(State)
workflow.add_node("process_1", process_1)
workflow.add_node("process_2", process_2)
workflow.add_node("process_3", process_3)
workflow.add_node("aggregate", aggregate)

# 병렬 처리를 위한 조건부 엣지 함수
def branch_parallel(state: State):
    # 병렬로 실행할 노드들 정의
    return [
        Send("process_1", state),
        Send("process_2", state),
        Send("process_3", state),
    ]

# 엣지 정의 - 병렬 실행
workflow.add_edge(START, branch_parallel)
workflow.add_edge("process_1", "aggregate")
workflow.add_edge("process_2", "aggregate")
workflow.add_edge("process_3", "aggregate")
workflow.add_edge("aggregate", END)

# 그래프 컴파일
graph = workflow.compile()

# 실행
result = graph.invoke({"query": "테스트 쿼리", "results": []})
print(result["final_result"])

유영재님의 프로필 이미지
유영재
질문자

image.png

branch_parallel 노드가 정의가 안되어있어서 정의하고 그래프 그리니 저렇게 나왔어요!

유영재님의 프로필 이미지
유영재
질문자

에러가 뜨는데, 동작 결과랑 langgraph 버전이 문제라면 알려주시면 감사하겠습니다!

0

인프런 AI 인턴님의 프로필 이미지

안녕하세요, 인프런 AI 인턴입니다. 유영재님.

에이전틱 루프에서의 병렬 노드 구현에 대해 문의주셨군요. 이를 구현할 수 있는 방법을 설명드리겠습니다.

  1. 순차적 진행 및 Aggregate 전달 방식:
    이러한 방식은 주로 각 작업의 결과가 다음 작업의 입력으로 사용될 때 유용합니다. 보통은 다음과 같은 방식으로 구현할 수 있습니다:

    • 각 작업(1번, 2번)을 순차적으로 실행합니다.
    • 각 작업의 결과를 적절히 결합하여 다음 노드(3번)로 전달합니다.
      이 방식은 상태 관리가 중요하며, 결과를 집계하고 조건부 분기를 설정하는 데 유용합니다.
  2. 병렬 작업 수행 후 한 번에 취합 방식:
    이 방식은 독립적인 작업들을 동시에 수행하여 성능을 향상시킬 때 활용됩니다.

    • 각각의 작업(1번, 2번, 3번)을 동시에 실행합니다.
    • 모든 작업이 완료되면, 결과를 통합하여 다음 처리 과정을 진행합니다.
      이 경우에는 각 작업이 독립적으로 처리되므로, 비동기 처리를 지원하는 시스템을 사용하는 것이 좋습니다.

LangGraph와 같은 프레임워크에서, 이러한 병렬 처리를 구현하기 위해서는 추가적인 비동기 프로그래밍 컨셉을 고려하여 코드를 작성해야 할 수 있습니다. 비동기 작업의 결과를 모아 올바르게 집계하고, 각 작업의 독립성을 고려하여 에러 발생 시 적절한 대처가 필요합니다.

비슷한 질문에 대한 답변이나 예제를 원하시면, 다음 링크를 참조해주세요:
- tool_search_graph 서브그래프 구현 관련 질문: 조건부 분기 처리 및 supervisor 연결 방법

저는 질문자님의 이해를 돕고자 지식공유자님의 기존 답변을 참고하여 유사한 답변 링크를 추천해드리고 있습니다.
현재 베타 기능으로 답변이 만족스럽지 않을 수 있는 점 양해 부탁드립니다. 🙏 추가적으로 궁금한 점이 있으시면, 이어서 질문해 주세요. 곧 지식공유자께서 답변해 주실 것입니다.