{ "cells": [ { "cell_type": "markdown", "metadata": { "id": "FYAZ2nct-C2l" }, "source": [ "# 머신 러닝 교과서 3판" ] }, { "cell_type": "markdown", "metadata": { "id": "g-4KQdk2-C2n" }, "source": [ "# 18장 - 강화 학습으로 복잡한 환경에서 의사 결정하기" ] }, { "cell_type": "markdown", "metadata": { "id": "iKN7i0HT-C2n" }, "source": [ "**아래 링크를 통해 이 노트북을 주피터 노트북 뷰어(nbviewer.jupyter.org)로 보거나 구글 코랩(colab.research.google.com)에서 실행할 수 있습니다.**\n", "\n", "\n", " \n", " \n", "
\n", " 주피터 노트북 뷰어로 보기\n", " \n", " 구글 코랩(Colab)에서 실행하기\n", "
" ] }, { "cell_type": "markdown", "metadata": { "id": "EOCPtjVK-C2n" }, "source": [ "### 목차" ] }, { "cell_type": "markdown", "metadata": { "id": "oVuwjmWN-C2o" }, "source": [ "- 경험에서 배웁니다\n", " - 강화 학습 이해하기\n", " - 강화 학습 시스템의 에이전트-환경 인터페이스 정의하기\n", "- 강화 학습의 기초 이론\n", " - 마르코프 결정 과정\n", " - 마르코프 결정 과정의 수학 공식\n", " - 마르코프 과정 시각화\n", " - 에피소드 작업 대 연속적인 작업\n", " - 강화 학습 용어: 대가, 정책, 가치 함수\n", " - 대가\n", " - 정책\n", " - 가치 함수\n", " - 벨먼 방정식을 사용한 동적 계획법\n", "- 강화 학습 알고리즘\n", " - 동적 계획법\n", " - 정책 평가 - 동적 계획법으로 가치 함수 예측하기\n", " - 추정된 가치 함수로 정책 향상시키기\n", " - 정책 반복\n", " - 가치 반복\n", " - 몬테 카를로를 사용한 강화 학습\n", " - MC를 사용한 상태-가치 함수 추정\n", " - MC를 사용한 행동-가치 함수 추정\n", " - MC 제어를 사용해 최적의 정책 찾기\n", " - 정책 향상 - 행동-가치 함수로부터 그리디 정책 계산하기\n", " - 시간차 학습\n", " - TD 예측\n", " - 온-폴리시 TD 제어 (SARSA)\n", " - 오프-폴리시 TD 제어 (Q-러닝)\n", "- 첫 번째 강화 학습 알고리즘 구현하기\n", " - OpenAI 짐 툴킷 소개\n", " - OpenAI 짐에 포함된 환경 사용하기\n", " - 그리드 월드\n", " - OpenAI 짐에서 그리드 월드 환경 구현하기\n", " - Q-러닝으로 그리드 월드 문제 풀기\n", " - Q-러닝 알고리즘 구현하기\n", " - 심층 Q-러닝\n", " - Q-러닝 알고리즘에 따라 DQN 모델 훈련하기\n", " - 재생 메모리\n", " - 손실 계산을 위해 타깃 가치 결정하기\n", " - 심층 Q-러닝 알고리즘 구현\n", "- 전체 요약" ] }, { "cell_type": "code", "execution_count": 1, "metadata": { "id": "aW0h86w9-C2o" }, "outputs": [], "source": [ "from IPython.display import Image" ] }, { "cell_type": "markdown", "metadata": { "id": "BjkzSp0e-C2o" }, "source": [ "# 경험에서 배웁니다\n", "\n", "## 강화 학습 이해하기\n", "\n", "## 강화 학습 시스템의 에이전트-환경 인터페이스 정의하기" ] }, { "cell_type": "code", "execution_count": 2, "metadata": { "colab": { "base_uri": "https://localhost:8080/", "height": 447 }, "id": "he9yqXQY-C2p", "outputId": "68ae6b1f-5bbd-4ace-f5f4-f4f7708d22b2" }, "outputs": [ { "output_type": "execute_result", "data": { "text/html": [ "" ], "text/plain": [ "" ] }, "metadata": {}, "execution_count": 2 } ], "source": [ "Image(url='https://git.io/JtTQo', width=700)" ] }, { "cell_type": "markdown", "metadata": { "id": "ULH0eSrQ-C2p" }, "source": [ "# 강화 학습의 기초 이론\n", "\n", "## 마르코프 결정 과정\n", "\n", "## 마르코프 결정 과정의 수학 공식" ] }, { "cell_type": "code", "execution_count": 3, "metadata": { "colab": { "base_uri": "https://localhost:8080/", "height": 473 }, "id": "QXU39HEE-C2q", "outputId": "8f5431bd-ccbc-4943-83b0-da50367e168a" }, "outputs": [ { "output_type": "execute_result", "data": { "text/html": [ "" ], "text/plain": [ "" ] }, "metadata": {}, "execution_count": 3 } ], "source": [ "Image(url='https://git.io/JtTQi', width=700)" ] }, { "cell_type": "markdown", "metadata": { "id": "wVy6MQKE-C2q" }, "source": [ "### 마르코프 과정 시각화" ] }, { "cell_type": "code", "execution_count": 4, "metadata": { "colab": { "base_uri": "https://localhost:8080/", "height": 276 }, "id": "R4u_4d6w-C2q", "outputId": "ebf678eb-d464-426b-e3e6-e5467b4c6359" }, "outputs": [ { "output_type": "execute_result", "data": { "text/html": [ "" ], "text/plain": [ "" ] }, "metadata": {}, "execution_count": 4 } ], "source": [ "Image(url='https://git.io/JtTQP', width=700)" ] }, { "cell_type": "markdown", "metadata": { "id": "xN6MVEs3-C2q" }, "source": [ "### 에피소드 작업 대 연속적인 작업" ] }, { "cell_type": "markdown", "metadata": { "id": "FdueKfC5-C2r" }, "source": [ "## 강화 학습 용어: 대가, 정책, 가치 함수\n", "\n", "### 대가" ] }, { "cell_type": "code", "execution_count": 5, "metadata": { "colab": { "base_uri": "https://localhost:8080/", "height": 422 }, "id": "k0i6lJUZ-C2r", "outputId": "2c454b5b-b5e3-4ab8-acdb-3ddbd30c84d7" }, "outputs": [ { "output_type": "execute_result", "data": { "text/html": [ "" ], "text/plain": [ "" ] }, "metadata": {}, "execution_count": 5 } ], "source": [ "# 할인 계수에 대하여\n", "\n", "Image(url='https://git.io/Jtkcl', width=700)" ] }, { "cell_type": "markdown", "metadata": { "id": "BS8vmyz7-C2r" }, "source": [ "### 정책\n", "\n", "### 가치 함수" ] }, { "cell_type": "markdown", "metadata": { "id": "jzQGkggx-C2r" }, "source": [ "## 벨먼 방정식을 사용한 동적 계획법" ] }, { "cell_type": "markdown", "metadata": { "id": "mOwyDK-f-C2r" }, "source": [ "# 강화 학습 알고리즘" ] }, { "cell_type": "code", "execution_count": 6, "metadata": { "colab": { "base_uri": "https://localhost:8080/", "height": 237 }, "id": "fXKrwBqo-C2r", "outputId": "39f89120-8b17-47ef-bf2e-1d4a43a2703b" }, "outputs": [ { "output_type": "execute_result", "data": { "text/html": [ "" ], "text/plain": [ "" ] }, "metadata": {}, "execution_count": 6 } ], "source": [ "Image(url='https://git.io/Jtkc4', width=700)" ] }, { "cell_type": "markdown", "metadata": { "id": "diXYfr-1-C2s" }, "source": [ "## 동적 계획법\n", "\n", "### 정책 평가 - 동적 계획법으로 가치 함수 예측하기\n", "\n", "### 추정된 가치 함수로 정책 향상시키기\n", "\n", "### 정책 반복\n", "\n", "### 가치 반복" ] }, { "cell_type": "markdown", "metadata": { "id": "f8zZTfDr-C2s" }, "source": [ "## 몬테 카를로를 사용한 강화 학습\n", "\n", "### MC를 사용한 상태-가치 함수 추정\n", "\n", "### MC를 사용한 행동-가치 함수 추정\n", "\n", "### MC 제어를 사용해 최적의 정책 찾기\n", "\n", "### 정책 향상 - 행동-가치 함수로부터 그리디 정책 계산하기" ] }, { "cell_type": "markdown", "metadata": { "id": "MYHcYa2n-C2s" }, "source": [ "## 시간차 학습\n", "\n", "### TD 예측\n", "\n", "### 온-폴리시 TD 제어 (SARSA)\n", "\n", "### 오프-폴리시 TD 제어 (Q-러닝)" ] }, { "cell_type": "markdown", "metadata": { "id": "6r2LUtd1-C2s" }, "source": [ "# 첫 번째 강화 학습 알고리즘 구현하기\n", "\n", "## OpenAI 짐 툴킷 소개\n", "\n", "### OpenAI 짐에 포함된 환경 사용하기" ] }, { "cell_type": "code", "execution_count": 7, "metadata": { "colab": { "base_uri": "https://localhost:8080/", "height": 203 }, "id": "_hD-JWTu-C2s", "outputId": "df237f28-e3b7-476b-e9e7-098e39ccf788" }, "outputs": [ { "output_type": "execute_result", "data": { "text/html": [ "" ], "text/plain": [ "" ] }, "metadata": {}, "execution_count": 7 } ], "source": [ "Image(url='https://git.io/JtkcB', width=800)" ] }, { "cell_type": "markdown", "metadata": { "id": "LeF3J9WX-C2s" }, "source": [ "### 그리드 월드" ] }, { "cell_type": "code", "execution_count": 8, "metadata": { "colab": { "base_uri": "https://localhost:8080/", "height": 303 }, "id": "YxGS_WO4-C2s", "outputId": "e30ded01-61b6-4672-def2-b6a347189db4" }, "outputs": [ { "output_type": "execute_result", "data": { "text/html": [ "" ], "text/plain": [ "" ] }, "metadata": {}, "execution_count": 8 } ], "source": [ "Image(url='https://git.io/Jtkc0', width=800)" ] }, { "cell_type": "markdown", "metadata": { "id": "SIehs9EQ-C2t" }, "source": [ "### OpenAI 짐에서 그리드 월드 환경 구현하기" ] }, { "cell_type": "markdown", "metadata": { "id": "0YCt4M_g-C2t" }, "source": [ "```python\n", "# 스크립트: gridworld_env.py\n", "\n", "import numpy as np\n", "from gym.envs.toy_text import discrete\n", "from collections import defaultdict\n", "import time\n", "import pickle\n", "import os\n", "\n", "from gym.envs.classic_control import rendering\n", "\n", "CELL_SIZE = 100\n", "MARGIN = 10\n", "\n", "\n", "def get_coords(row, col, loc='center'):\n", " xc = (col + 1.5) * CELL_SIZE\n", " yc = (row + 1.5) * CELL_SIZE\n", " if loc == 'center':\n", " return xc, yc\n", " elif loc == 'interior_corners':\n", " half_size = CELL_SIZE//2 - MARGIN\n", " xl, xr = xc - half_size, xc + half_size\n", " yt, yb = xc - half_size, xc + half_size\n", " return [(xl, yt), (xr, yt), (xr, yb), (xl, yb)]\n", " elif loc == 'interior_triangle':\n", " x1, y1 = xc, yc + CELL_SIZE//3\n", " x2, y2 = xc + CELL_SIZE//3, yc - CELL_SIZE//3\n", " x3, y3 = xc - CELL_SIZE//3, yc - CELL_SIZE//3\n", " return [(x1, y1), (x2, y2), (x3, y3)]\n", "\n", "\n", "def draw_object(coords_list):\n", " if len(coords_list) == 1: # -> 원\n", " obj = rendering.make_circle(int(0.45*CELL_SIZE))\n", " obj_transform = rendering.Transform()\n", " obj.add_attr(obj_transform)\n", " obj_transform.set_translation(*coords_list[0])\n", " obj.set_color(0.2, 0.2, 0.2) # -> 검정\n", " elif len(coords_list) == 3: # -> 삼각형\n", " obj = rendering.FilledPolygon(coords_list)\n", " obj.set_color(0.9, 0.6, 0.2) # -> 노랑\n", " elif len(coords_list) > 3: # -> 다각형\n", " obj = rendering.FilledPolygon(coords_list)\n", " obj.set_color(0.4, 0.4, 0.8) # -> 파랑\n", " return obj\n", "\n", "\n", "class GridWorldEnv(discrete.DiscreteEnv):\n", " def __init__(self, num_rows=4, num_cols=6, delay=0.05):\n", " self.num_rows = num_rows\n", " self.num_cols = num_cols\n", "\n", " self.delay = delay\n", "\n", " move_up = lambda row, col: (max(row - 1, 0), col)\n", " move_down = lambda row, col: (min(row + 1, num_rows - 1), col)\n", " move_left = lambda row, col: (row, max(col - 1, 0))\n", " move_right = lambda row, col: (row, min(col + 1, num_cols - 1))\n", "\n", " self.action_defs = {0: move_up, 1: move_right,\n", " 2: move_down, 3: move_left}\n", "\n", " # 상태와 행동 개수\n", " nS = num_cols * num_rows\n", " nA = len(self.action_defs)\n", " self.grid2state_dict = {(s // num_cols, s % num_cols): s\n", " for s in range(nS)}\n", " self.state2grid_dict = {s: (s // num_cols, s % num_cols)\n", " for s in range(nS)}\n", "\n", " # 골드 상태\n", " gold_cell = (num_rows // 2, num_cols - 2)\n", "\n", " # 함정 상태\n", " trap_cells = [((gold_cell[0] + 1), gold_cell[1]),\n", " (gold_cell[0], gold_cell[1] - 1),\n", " ((gold_cell[0] - 1), gold_cell[1])]\n", "\n", " gold_state = self.grid2state_dict[gold_cell]\n", " trap_states = [self.grid2state_dict[(r, c)]\n", " for (r, c) in trap_cells]\n", " self.terminal_states = [gold_state] + trap_states\n", " print(self.terminal_states)\n", "\n", " # 전이 확률 만들기\n", " P = defaultdict(dict)\n", " for s in range(nS):\n", " row, col = self.state2grid_dict[s]\n", " P[s] = defaultdict(list)\n", " for a in range(nA):\n", " action = self.action_defs[a]\n", " next_s = self.grid2state_dict[action(row, col)]\n", "\n", " # 종료 상태\n", " if self.is_terminal(next_s):\n", " r = (1.0 if next_s == self.terminal_states[0]\n", " else -1.0)\n", " else:\n", " r = 0.0\n", " if self.is_terminal(s):\n", " done = True\n", " next_s = s\n", " else:\n", " done = False\n", " P[s][a] = [(1.0, next_s, r, done)]\n", "\n", " # 초기 상태 배치\n", " isd = np.zeros(nS)\n", " isd[0] = 1.0\n", "\n", " super(GridWorldEnv, self).__init__(nS, nA, P, isd)\n", "\n", " self.viewer = None\n", " self._build_display(gold_cell, trap_cells)\n", "\n", " def is_terminal(self, state):\n", " return state in self.terminal_states\n", "\n", " def _build_display(self, gold_cell, trap_cells):\n", "\n", " screen_width = (self.num_cols + 2) * CELL_SIZE\n", " screen_height = (self.num_rows + 2) * CELL_SIZE\n", " self.viewer = rendering.Viewer(screen_width,\n", " screen_height)\n", "\n", " all_objects = []\n", "\n", " # 경계 위치 좌표\n", " bp_list = [\n", " (CELL_SIZE - MARGIN, CELL_SIZE - MARGIN),\n", " (screen_width - CELL_SIZE + MARGIN, CELL_SIZE - MARGIN),\n", " (screen_width - CELL_SIZE + MARGIN,\n", " screen_height - CELL_SIZE + MARGIN),\n", " (CELL_SIZE - MARGIN, screen_height - CELL_SIZE + MARGIN)\n", " ]\n", " border = rendering.PolyLine(bp_list, True)\n", " border.set_linewidth(5)\n", " all_objects.append(border)\n", "\n", " # 수직선\n", " for col in range(self.num_cols + 1):\n", " x1, y1 = (col + 1) * CELL_SIZE, CELL_SIZE\n", " x2, y2 = (col + 1) * CELL_SIZE, \\\n", " (self.num_rows + 1) * CELL_SIZE\n", " line = rendering.PolyLine([(x1, y1), (x2, y2)], False)\n", " all_objects.append(line)\n", "\n", " # 수평선\n", " for row in range(self.num_rows + 1):\n", " x1, y1 = CELL_SIZE, (row + 1) * CELL_SIZE\n", " x2, y2 = (self.num_cols + 1) * CELL_SIZE, \\\n", " (row + 1) * CELL_SIZE\n", " line = rendering.PolyLine([(x1, y1), (x2, y2)], False)\n", " all_objects.append(line)\n", "\n", " # 함정: --> 원\n", " for cell in trap_cells:\n", " trap_coords = get_coords(*cell, loc='center')\n", " all_objects.append(draw_object([trap_coords]))\n", "\n", " # 골드: --> 삼각형\n", " gold_coords = get_coords(*gold_cell,\n", " loc='interior_triangle')\n", " all_objects.append(draw_object(gold_coords))\n", "\n", " # 에이전트 --> 사각형 또는 로봇\n", " if (os.path.exists('robot-coordinates.pkl') and CELL_SIZE == 100):\n", " agent_coords = pickle.load(\n", " open('robot-coordinates.pkl', 'rb'))\n", " starting_coords = get_coords(0, 0, loc='center')\n", " agent_coords += np.array(starting_coords)\n", " else:\n", " agent_coords = get_coords(0, 0, loc='interior_corners')\n", " agent = draw_object(agent_coords)\n", " self.agent_trans = rendering.Transform()\n", " agent.add_attr(self.agent_trans)\n", " all_objects.append(agent)\n", "\n", " for obj in all_objects:\n", " self.viewer.add_geom(obj)\n", "\n", " def render(self, mode='human', done=False):\n", " if done:\n", " sleep_time = 1\n", " else:\n", " sleep_time = self.delay\n", " x_coord = self.s % self.num_cols\n", " y_coord = self.s // self.num_cols\n", " x_coord = (x_coord + 0) * CELL_SIZE\n", " y_coord = (y_coord + 0) * CELL_SIZE\n", " self.agent_trans.set_translation(x_coord, y_coord)\n", " rend = self.viewer.render(\n", " return_rgb_array=(mode == 'rgb_array'))\n", " time.sleep(sleep_time)\n", " return rend\n", "\n", " def close(self):\n", " if self.viewer:\n", " self.viewer.close()\n", " self.viewer = None\n", "\n", "\n", "if __name__ == '__main__':\n", " env = GridWorldEnv(5, 6)\n", " for i in range(1):\n", " s = env.reset()\n", " env.render(mode='human', done=False)\n", "\n", " while True:\n", " action = np.random.choice(env.nA)\n", " res = env.step(action)\n", " print('Action ', env.s, action, ' -> ', res)\n", " env.render(mode='human', done=res[2])\n", " if res[2]:\n", " break\n", "\n", " env.close()\n", "```" ] }, { "cell_type": "code", "execution_count": 9, "metadata": { "colab": { "base_uri": "https://localhost:8080/", "height": 563 }, "id": "A7V3AiWN-C2u", "outputId": "429a48e1-f3a5-4599-f93c-b48054a6f5ec" }, "outputs": [ { "output_type": "execute_result", "data": { "text/html": [ "" ], "text/plain": [ "" ] }, "metadata": {}, "execution_count": 9 } ], "source": [ "Image(url='https://bit.ly/34MpM2p', width=600)" ] }, { "cell_type": "markdown", "metadata": { "id": "q8uvcoXy-C2v" }, "source": [ "## Q-러닝으로 그리드 월드 문제 풀기\n", "\n", "### Q-러닝 알고리즘 구현하기" ] }, { "cell_type": "markdown", "metadata": { "id": "qGrhQ644-C2v" }, "source": [ "```python\n", "# 스크립트: agent.py\n", "\n", "from collections import defaultdict\n", "import numpy as np\n", "\n", "\n", "class Agent(object):\n", " def __init__(\n", " self, env,\n", " learning_rate=0.01,\n", " discount_factor=0.9,\n", " epsilon_greedy=0.9,\n", " epsilon_min=0.1,\n", " epsilon_decay=0.95):\n", " self.env = env\n", " self.lr = learning_rate\n", " self.gamma = discount_factor\n", " self.epsilon = epsilon_greedy\n", " self.epsilon_min = epsilon_min\n", " self.epsilon_decay = epsilon_decay\n", "\n", " # q_table 정의\n", " self.q_table = defaultdict(lambda: np.zeros(self.env.nA))\n", "\n", " def choose_action(self, state):\n", " if np.random.uniform() < self.epsilon:\n", " action = np.random.choice(self.env.nA)\n", " else:\n", " q_vals = self.q_table[state]\n", " perm_actions = np.random.permutation(self.env.nA)\n", " q_vals = [q_vals[a] for a in perm_actions]\n", " perm_q_argmax = np.argmax(q_vals)\n", " action = perm_actions[perm_q_argmax]\n", " return action\n", "\n", " def _learn(self, transition):\n", " s, a, r, next_s, done = transition\n", " q_val = self.q_table[s][a]\n", " if done:\n", " q_target = r\n", " else:\n", " q_target = r + self.gamma*np.max(self.q_table[next_s])\n", "\n", " # q_table 업데이트\n", " self.q_table[s][a] += self.lr * (q_target - q_val)\n", "\n", " # epislon 조정\n", " self._adjust_epsilon()\n", "\n", " def _adjust_epsilon(self):\n", " if self.epsilon > self.epsilon_min:\n", " self.epsilon *= self.epsilon_decay\n", "\n", "```" ] }, { "cell_type": "markdown", "metadata": { "id": "jQkxBw-Q-C2v" }, "source": [ "```python\n", "# 스크립트: qlearning.py\n", "\n", "from gridworld_env import GridWorldEnv\n", "from agent import Agent\n", "from collections import namedtuple\n", "import matplotlib.pyplot as plt\n", "import numpy as np\n", "\n", "np.random.seed(1)\n", "\n", "Transition = namedtuple(\n", " 'Transition', ('state', 'action', 'reward', 'next_state', 'done'))\n", "\n", "\n", "def run_qlearning(agent, env, num_episodes=50):\n", " history = []\n", " for episode in range(num_episodes):\n", " state = env.reset()\n", " env.render(mode='human')\n", " final_reward, n_moves = 0.0, 0\n", " while True:\n", " action = agent.choose_action(state)\n", " next_s, reward, done, _ = env.step(action)\n", " agent._learn(Transition(state, action, reward,\n", " next_s, done))\n", " env.render(mode='human', done=done)\n", " state = next_s\n", " n_moves += 1\n", " if done:\n", " break\n", " final_reward = reward\n", " history.append((n_moves, final_reward))\n", " print('에피소드 %d: 보상 %.1f #이동 %d'\n", " % (episode, final_reward, n_moves))\n", "\n", " return history\n", "\n", "\n", "def plot_learning_history(history):\n", " fig = plt.figure(1, figsize=(14, 10))\n", " ax = fig.add_subplot(2, 1, 1)\n", " episodes = np.arange(len(history))\n", " moves = np.array([h[0] for h in history])\n", " plt.plot(episodes, moves, lw=4,\n", " marker=\"o\", markersize=10)\n", " ax.tick_params(axis='both', which='major', labelsize=15)\n", " plt.xlabel('Episodes', size=20)\n", " plt.ylabel('# moves', size=20)\n", "\n", " ax = fig.add_subplot(2, 1, 2)\n", " rewards = np.array([h[1] for h in history])\n", " plt.step(episodes, rewards, lw=4)\n", " ax.tick_params(axis='both', which='major', labelsize=15)\n", " plt.xlabel('Episodes', size=20)\n", " plt.ylabel('Final rewards', size=20)\n", " plt.savefig('q-learning-history.png', dpi=300)\n", " plt.show()\n", "\n", "\n", "if __name__ == '__main__':\n", " env = GridWorldEnv(num_rows=5, num_cols=6)\n", " agent = Agent(env)\n", " history = run_qlearning(agent, env)\n", " env.close()\n", "\n", " plot_learning_history(history)\n", "```" ] }, { "cell_type": "code", "execution_count": 10, "metadata": { "colab": { "base_uri": "https://localhost:8080/", "height": 593 }, "id": "n9Xtr4Kj-C2v", "outputId": "925c6f47-b3b2-4af5-c430-30008237bc56" }, "outputs": [ { "output_type": "execute_result", "data": { "text/html": [ "" ], "text/plain": [ "" ] }, "metadata": {}, "execution_count": 10 } ], "source": [ "Image(url='https://bit.ly/2TBkxR3', width=800)" ] }, { "cell_type": "markdown", "metadata": { "id": "f9v2Yd6B-C2w" }, "source": [ "## 심층 Q-러닝" ] }, { "cell_type": "code", "execution_count": 11, "metadata": { "colab": { "base_uri": "https://localhost:8080/", "height": 340 }, "id": "kpGLb-g3-C2w", "outputId": "80c0f418-0164-49f9-af65-7eac59e60d8e" }, "outputs": [ { "output_type": "execute_result", "data": { "text/html": [ "" ], "text/plain": [ "" ] }, "metadata": {}, "execution_count": 11 } ], "source": [ "Image(url='https://bit.ly/3yUaFlv', width=800)" ] }, { "cell_type": "markdown", "metadata": { "id": "YnUKSG3z-C2w" }, "source": [ "### Q-러닝 알고리즘에 따라 DQN 모델 훈련하기\n", "\n", "#### 재생 메모리" ] }, { "cell_type": "code", "execution_count": 12, "metadata": { "colab": { "base_uri": "https://localhost:8080/", "height": 289 }, "id": "FLp8-kCD-C2w", "outputId": "d06e246f-da4b-4835-e3e6-d2f66dbfba11" }, "outputs": [ { "output_type": "execute_result", "data": { "text/html": [ "" ], "text/plain": [ "" ] }, "metadata": {}, "execution_count": 12 } ], "source": [ "Image(url='https://bit.ly/34CELfz', width=800)" ] }, { "cell_type": "markdown", "metadata": { "id": "_Z8jBIMt-C2w" }, "source": [ "#### 손실 계산을 위해 타깃 가치 결정하기" ] }, { "cell_type": "code", "execution_count": 13, "metadata": { "colab": { "base_uri": "https://localhost:8080/", "height": 538 }, "id": "YsFDayA6-C2w", "outputId": "46afacfd-f9bb-457e-cb31-835b554098e3" }, "outputs": [ { "output_type": "execute_result", "data": { "text/html": [ "" ], "text/plain": [ "" ] }, "metadata": {}, "execution_count": 13 } ], "source": [ "Image(url='https://bit.ly/34Fkwhb', width=800)" ] }, { "cell_type": "markdown", "metadata": { "id": "KUTwBa5Q-C2x" }, "source": [ "## 심층 Q-러닝 알고리즘 구현" ] }, { "cell_type": "markdown", "metadata": { "id": "quG_mPlr-C2x" }, "source": [ "```python\n", "# 스크립트: carpole/main.py\n", "\n", "import gym\n", "import numpy as np\n", "import tensorflow as tf\n", "import random\n", "import matplotlib.pyplot as plt\n", "from collections import namedtuple\n", "from collections import deque\n", "\n", "np.random.seed(1)\n", "tf.random.set_seed(1)\n", "\n", "Transition = namedtuple(\n", " 'Transition', ('state', 'action', 'reward',\n", " 'next_state', 'done'))\n", "\n", "\n", "class DQNAgent:\n", " def __init__(\n", " self, env, discount_factor=0.95,\n", " epsilon_greedy=1.0, epsilon_min=0.01,\n", " epsilon_decay=0.995, learning_rate=1e-3,\n", " max_memory_size=2000):\n", " self.enf = env\n", " self.state_size = env.observation_space.shape[0]\n", " self.action_size = env.action_space.n\n", "\n", " self.memory = deque(maxlen=max_memory_size)\n", "\n", " self.gamma = discount_factor\n", " self.epsilon = epsilon_greedy\n", " self.epsilon_min = epsilon_min\n", " self.epsilon_decay = epsilon_decay\n", " self.lr = learning_rate\n", " self._build_nn_model()\n", "\n", " def _build_nn_model(self, n_layers=3):\n", " self.model = tf.keras.Sequential()\n", "\n", " # 은닉층\n", " for n in range(n_layers - 1):\n", " self.model.add(tf.keras.layers.Dense(\n", " units=32, activation='relu'))\n", " self.model.add(tf.keras.layers.Dense(\n", " units=32, activation='relu'))\n", "\n", " # 마지막 층\n", " self.model.add(tf.keras.layers.Dense(\n", " units=self.action_size))\n", "\n", " # 모델 빌드 & 컴파일\n", " self.model.build(input_shape=(None, self.state_size))\n", " self.model.compile(\n", " loss='mse',\n", " optimizer=tf.keras.optimizers.Adam(lr=self.lr))\n", "\n", " def remember(self, transition):\n", " self.memory.append(transition)\n", "\n", " def choose_action(self, state):\n", " if np.random.rand() <= self.epsilon:\n", " return random.randrange(self.action_size)\n", " q_values = self.model.predict(state)[0]\n", " return np.argmax(q_values) # 행동 반환\n", "\n", " def _learn(self, batch_samples):\n", " batch_states, batch_targets = [], []\n", " for transition in batch_samples:\n", " s, a, r, next_s, done = transition\n", " if done:\n", " target = r\n", " else:\n", " target = (r +\n", " self.gamma * np.amax(\n", " self.model.predict(next_s)[0]\n", " )\n", " )\n", " target_all = self.model.predict(s)[0]\n", " target_all[a] = target\n", " batch_states.append(s.flatten())\n", " batch_targets.append(target_all)\n", " self._adjust_epsilon()\n", " return self.model.fit(x=np.array(batch_states),\n", " y=np.array(batch_targets),\n", " epochs=1,\n", " verbose=0)\n", "\n", " def _adjust_epsilon(self):\n", " if self.epsilon > self.epsilon_min:\n", " self.epsilon *= self.epsilon_decay\n", "\n", " def replay(self, batch_size):\n", " samples = random.sample(self.memory, batch_size)\n", " history = self._learn(samples)\n", " return history.history['loss'][0]\n", "\n", "\n", "def plot_learning_history(history):\n", " fig = plt.figure(1, figsize=(14, 5))\n", " ax = fig.add_subplot(1, 1, 1)\n", " episodes = np.arange(len(history[0])) + 1\n", " plt.plot(episodes, history[0], lw=4,\n", " marker='o', markersize=10)\n", " ax.tick_params(axis='both', which='major', labelsize=15)\n", " plt.xlabel('Episodes', size=20)\n", " plt.ylabel('# Total Rewards', size=20)\n", " plt.show()\n", "\n", "\n", "# 일반 설정\n", "EPISODES = 200\n", "batch_size = 32\n", "init_replay_memory_size = 500\n", "\n", "if __name__ == '__main__':\n", " env = gym.make('CartPole-v1')\n", " agent = DQNAgent(env)\n", " state = env.reset()\n", " state = np.reshape(state, [1, agent.state_size])\n", "\n", " # 재생 메모리 채우기\n", " for i in range(init_replay_memory_size):\n", " action = agent.choose_action(state)\n", " next_state, reward, done, _ = env.step(action)\n", " next_state = np.reshape(next_state, [1, agent.state_size])\n", " agent.remember(Transition(state, action, reward,\n", " next_state, done))\n", " if done:\n", " state = env.reset()\n", " state = np.reshape(state, [1, agent.state_size])\n", " else:\n", " state = next_state\n", "\n", " total_rewards, losses = [], []\n", " for e in range(EPISODES):\n", " state = env.reset()\n", " if e % 10 == 0:\n", " env.render()\n", " state = np.reshape(state, [1, agent.state_size])\n", " for i in range(500):\n", " action = agent.choose_action(state)\n", " next_state, reward, done, _ = env.step(action)\n", " next_state = np.reshape(next_state,\n", " [1, agent.state_size])\n", " agent.remember(Transition(state, action, reward,\n", " next_state, done))\n", " state = next_state\n", " if e % 10 == 0:\n", " env.render()\n", " if done:\n", " total_rewards.append(i)\n", " print('에피소드: %d/%d, 총 보상: %d'\n", " % (e, EPISODES, i))\n", " break\n", " loss = agent.replay(batch_size)\n", " losses.append(loss)\n", " plot_learning_history(total_rewards)\n", "```" ] }, { "cell_type": "code", "execution_count": 14, "metadata": { "colab": { "base_uri": "https://localhost:8080/", "height": 436 }, "id": "-rTBIyHU-C2x", "outputId": "21d68d6d-5436-483b-d935-1c988db111c7" }, "outputs": [ { "output_type": "execute_result", "data": { "text/html": [ "" ], "text/plain": [ "" ] }, "metadata": {}, "execution_count": 14 } ], "source": [ "Image(url='https://bit.ly/2TDralR', width=800)" ] } ], "metadata": { "accelerator": "GPU", "colab": { "name": "ch18.ipynb", "provenance": [] }, "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.3" } }, "nbformat": 4, "nbformat_minor": 0 }