인프런 커뮤니티 질문&답변

윤원준님의 프로필 이미지
윤원준

작성한 질문수

유니티 머신러닝 에이전트 완전정복 (기초편)

A2C 알고리즘을 드론 환경에 적용하려면

해결된 질문

작성

·

714

1

안녕하세요, 드론 환경에서 A2C 알고리즘을 적용해서 실험해보려고 합니다.

환경에 맞게 적용을 하기 위해 state_size, action_size를 각각 9와 3으로 설정하고, 이 외에도 여러 부분을 수정하여 돌려 보려고 했는데 잘 안 되네요.

get_action() 함수를 손보거나, preprocess를 하는 부분을 수정해주어야 할 것 같은데, 어떻게 해결해야 할지 잘 모르겠습니다.

현재 상태에서는 다음과 같은 오류가 발생하고 있습니다.Capture.jpgA2C.py의 코드는 아래와 같습니다.

import numpy as np
import datetime
import platform
import torch
import torch.nn.functional as F
from torch.utils.tensorboard import SummaryWriter
from collections import deque
from mlagents_envs.environment import UnityEnvironment, ActionTuple
from mlagents_envs.side_channel.engine_configuration_channel\
                             import EngineConfigurationChannel
# A2C를 위한 파라미터 값 세팅
state_size = 9
action_size = 3

load_model = False
train_mode = True

discount_factor = 0.9
learning_rate = 0.00025

run_step = 100000 if train_mode else 0
test_step = 10000

print_interval = 10
save_interval = 100

# VISUAL_OBS = 0
# GOAL_OBS = 1
# VECTOR_OBS = 2
# OBS = VECTOR_OBS


# 유니티 환경 경로
game = "Drone"
os_name = platform.system()
if os_name == 'Windows':
    env_name = f"C:/Users/user/anaconda3/envs/mlagents/Drone_0427/Drone_1002_3.exe"
elif os_name == 'Darwin':
    env_name = f"../envs/{game}_{os_name}"

# 모델 저장 및 불러오기 경로
date_time = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
save_path = f"./saved_models/{game}/A2C/{date_time}"
load_path = f"./saved_models/{game}/A2C/20210709235643"

# 연산 장치
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("CUDA is available" if torch.cuda.is_available() else "CPU is available")
# A2C 클래스 -> Actor Network / Critic Network 정의
class A2C(torch.nn.Module):
    def __init__(self, **kwargs):
        super(A2C, self).__init__(**kwargs)
        self.d1 = torch.nn.Linear(state_size, 128)
        self.d2 = torch.nn.Linear(128, 128)
        self.pi = torch.nn.Linear(128, action_size)
        self.v = torch.nn.Linear(128, 1)
        
    def forward(self, x):
        x = F.relu(self.d1(x))
        x = F.relu(self.d2(x))
        return F.softmax(self.pi(x), dim=1), self.v(x)

# A2CAgent 클래스 -> A2C 알고리즘을 위한 다양한 함수 정의
class A2CAgent():
    def __init__(self):
        self.a2c = A2C().to(device)
        self.optimizer = torch.optim.Adam(self.a2c.parameters(), lr=learning_rate)
        self.writer = SummaryWriter(save_path)

        if load_model == True:
            print(f"... Load Model from {load_path}/ckpt ...")
            checkpoint = torch.load(load_path+'/ckpt', map_location=device)
            self.a2c.load_state_dict(checkpoint["network"])
            self.optimizer.load_state_dict(checkpoint["optimizer"])

    # 정책을 통해 행동 결정
    def get_action(self, state, training=True):
        #  네트워크 모드 설정
        self.a2c.train(training)
        
        #네트워크 연산에 따라 행동 결정
        pi, _  = self.a2c(torch.FloatTensor(state).to(device))
        action = torch.multinomial(pi, num_samples=1).cpu().numpy() 
        return action

    def train_model(self, state, action, reward, next_state, done):
        state, action, reward, next_state, done = map(lambda x: torch.FloatTensor(x).to(device),
                                                        [state, action, reward, next_state, done])
        pi, value = self.a2c(state)

        # Value network
        with torch.no_grad():
            _, next_value = self.a2c(next_state)
            target_value = reward + (1-done) * discount_factor * next_value
        critic_loss = F.mse_loss(target_value, value)
        
        #Policy network 
        eye = torch.eye(action_size).to(device)
        one_hot_action = eye[action.view(-1).long()]
        advantage = (target_value - value).detach()
        actor_loss = -(torch.log((one_hot_action * pi).sum(1))*advantage).mean()
        total_loss = critic_loss + actor_loss
    
        self.optimizer.zero_grad()
        total_loss.backward()
        self.optimizer.step()
        
        return actor_loss.item(), critic_loss.item()

    # 네트워크 모델 저장
    def save_model(self):
        print(f"... Save Model to {save_path}/ckpt ...")
        torch.save({
            "network" : self.a2c.state_dict(),
            "optimizer" : self.optimizer.state_dict(),
        }, save_path+'/ckpt')

    # 학습 기록
    def write_summray(self, score, actor_loss, critic_loss, step):
        self.writer.add_scalar("run/score", score, step)
        self.writer.add_scalar("model/actor_loss", actor_loss, step)
        self.writer.add_scalar("model/critic_loss", critic_loss, step)


# Main 함수 -> 전체적으로 A2C 알고리즘을 진행
if __name__ == '__main__':
    # 유니티 환경 경로 설정 (file_name)
    engine_configuration_channel = EngineConfigurationChannel()
    env = UnityEnvironment(file_name=env_name,
                           side_channels=[engine_configuration_channel])
    env.reset()

    # 유니티 브레인 설정
    behavior_name = list(env.behavior_specs.keys())[0]
    spec = env.behavior_specs[behavior_name]
    engine_configuration_channel.set_configuration_parameters(time_scale=12.0)
    dec, term = env.get_steps(behavior_name)

    # A2C 클래스를 agent로 정의
    agent = A2CAgent()

    actor_losses, critic_losses, scores, episode, score = [], [], [], 0, 0
    for step in range(run_step + test_step):
        if step == run_step:
            if train_mode:
                agent.save_model()
            print("TEST START")
            train_mode = False
            engine_configuration_channel.set_configuration_parameters(time_scale=1.0)
            
        #preprocess = lambda obs, goal: np.concatenate((obs*goal[0][0], obs*goal[0][1]), axis=-1)  
        #state = preprocess(dec.obs[OBS])
        #state = preprocess(dec.obs[OBS],dec.obs[GOAL_OBS])
        state = dec.obs[0]
        action = agent.get_action(state, train_mode)
        action_tuple = ActionTuple()
        action_tuple.add_continuous(action)
        env.set_actions(behavior_name, action_tuple)
        env.step()

        dec, term = env.get_steps(behavior_name)
        done = len(term.agent_id) > 0
        reward = term.reward if done else dec.reward
        next_state = term.obs[0] if done else dec.obs[0]
        score += reward[0]

        if train_mode:
            agent.append_sample(state[0], action[0], reward, next_state[0], [done])
            # 학습 수행
            actor_loss, critic_loss = agent.train_model()
            actor_losses.append(actor_loss)
            critic_losses.append(critic_loss)

        if done:
            episode += 1
            scores.append(score)
            score = 0

            # 게임 진행 상황 출력 및 텐서 보드에 보상과 손실함수 값 기록
            if episode % print_interval == 0:
                mean_score = np.mean(scores)
                mean_actor_loss = np.mean(actor_losses)
                mean_critic_loss = np.mean(critic_losses)
                agent.write_summray(mean_score, mean_actor_loss, mean_critic_loss, step)
                actor_losses, critic_losses, scores = [], [], []

                print(f"{episode} Episode / Step: {step} / Score: {mean_score:.2f} / " +\
                      f"Actor loss: {mean_actor_loss:.2f} / Critic loss: {mean_critic_loss:.4f}")

            # 네트워크 모델 저장
            if train_mode and episode % save_interval == 0:
                agent.save_model()

    env.close()

답변 주시면 감사하겠습니다!

답변 1

0

이현호님의 프로필 이미지
이현호
지식공유자

원준님 안녕하세요!

 

그리드 월드에서 사용한 A2C를 드론 환경에 적용해보시려고 하는 것 같은데요.

현재 그리드 월드는 discrete action 환경이고 드론 환경은 continuous action 환경입니다.

따라서 그리드 월드 환경에서 사용한 A2C는 discrete action 기반으로 구현돼있어, 바로 continuous action 환경인 드론에 바로 적용하기는 어려운 상황인데요.

 

드론에 A2C 알고리즘을 적용하기 위해서 두가지 방법이 있을 것 같습니다.

첫번째 방법은 정석대로 기존에 discrete action 기반의 A2C를 continuous action 기반으로 수정하는 것이 있습니다. 이 방법을 적용하기 위해서는 Actor 네트워크에서 확률을 뽑는 것이 아니라, 평균과 표준편차를 구해 가우시안 샘플링을 해야 합니다. 이와 관련해서 학습 코드도 수정해야 하구요. (참고)

두번째 방법은 코드 수정을 최소화하기 위해 드론 환경을 (-1, 0, 0), (0, -1, 0), (0, 0, -1), (1, 0, 0), (0, 1, 0), (0, 0, 1) 이렇게 6개의 discrete action 으로 구성돼있다고 간주하는 것입니다. 이 방법을 적용하기 위해서 action space를 6으로 정의하고 action 을 후처리하여 환경에 넣어주는 코드만 작성하면 될 것 같습니다.

두번째 방법을 적용하기 위해 올려주신 코드에서 아래와 같이 수정해주면 될 것 같습니다. (수정이 필요한 부분만 작성하였습니다.)

...
...
# 파라미터 값 세팅 부분
action_size = 6
...
...
# Main 함수 부분 
        state = dec.obs[0]
        action = agent.get_action(state, train_mode)
        action_support = np.concatenate([-np.eye(action_size//2) , np.eye(action_size//2)], axis=0)
        real_action = action_support[action[0]]
        action_tuple = ActionTuple()
        action_tuple.add_continuous(real_action)
        env.set_actions(behavior_name, action_tuple)
        env.step()

        dec, term = env.get_steps(behavior_name)
        done = len(term.agent_id) > 0
        reward = term.reward if done else dec.reward
        next_state = term.obs[0] if done else dec.obs[0]
        score += reward[0]

        if train_mode:
            # 학습 수행
            actor_loss, critic_loss = agent.train_model(state, action[0], [reward], next_state, [done])
            actor_losses.append(actor_loss)
            critic_losses.append(critic_loss)
...
...

 

추가로 궁금하신게 있다면 편하게 물어봐주세요.

감사합니다 :)

윤원준님의 프로필 이미지
윤원준
질문자

답변 감사드립니다! action을 discrete 6개인 것으로 간주하려면 유니티 환경 내부의 Behaviour Parameters도 수정을 함께 해야 할 것 같은데요, Discrete Branches를 3로, Branch Size는 각각 3으로 설정하면 맞을까요?

이현호님의 프로필 이미지
이현호
지식공유자

환경은 그대로 이용하셔도 됩니다!

메인함수 코드에서 후처리해서 continuous action 형식에 맞춰주기 때문에 환경은 수정하지 않으셔도 됩니다 :)

윤원준님의 프로필 이미지
윤원준
질문자

네 학습 잘 됩니다!! 답변 감사합니다!!

윤원준님의 프로필 이미지
윤원준

작성한 질문수

질문하기