{
"cells": [
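{
"cell_type": "markdown",
"metadata": {},
"source": [
"> **Preface:** These are my condensed notes on the control part of Chapter 6, Temporal-Difference Learning, of Sutton's *Reinforcement Learning* (2nd edition).\n",
"\n",
"I read the Chinese edition; for the formulas mentioned here, I will give their page numbers in the English edition, which is available from Sutton's homepage:\n",
"[http://incompleteideas.net/book/the-book.html](http://incompleteideas.net/book/the-book.html)\n",
"\n",
"Contents of this note:\n",
"\n",
"- 6.4 Sarsa: On-policy TD Control\n",
"- 6.5 Q-learning: Off-policy TD Control\n",
"- 6.6 Expected Sarsa\n",
"- 6.7 Maximization Bias and Double Learning\n",
"- 6.8 Games, Afterstates, and Other Special Cases\n",
"- 6.9 Summary\n",
"\n",
"In the previous note we discussed [the similarities and differences between Dynamic Programming (DP), Monte Carlo methods (MC), and Temporal-Difference learning (TD)](https://blog.csdn.net/weixin_42815609/article/details/104034967), as well as the prediction algorithms of TD learning. This note covers the control algorithms, summarized as follows.\n",
"\n",
"- Sarsa is on-policy TD control;\n",
"- Q-learning is off-policy TD control;\n",
"- Expected Sarsa performs better than both of the above (van Hasselt, 2011) and has been called “General Q-learning”;\n",
"- However, a plain maximization operation introduces “maximization bias”, so “double learning” is introduced to remove it;\n",
"- In addition, the book introduces concepts such as “afterstates”, which are not discussed in detail here.\n",
"\n",
"The book presents four demonstrations, each of which Zhang has implemented in code. They cover the following points:\n",
"- Windy Gridworld (Example 6.5) shows the performance of Sarsa;\n",
"- Cliff Walking (Example 6.6) compares the control behavior of Sarsa and Q-learning under $\\epsilon$-greedy action selection;\n",
"- Cliff Walking is then reused to show the performance of Expected Sarsa;\n",
"- The Maximization Bias Example (Example 6.7) shows that double Q-learning outperforms Q-learning.\n",
"\n",
"I annotated this code; see [https://github.com/PiperLiu/Reinforcement-Learning-practice-zh/blob/master/practice/05-02-Temporal-Difference-Control.ipynb](https://github.com/PiperLiu/Reinforcement-Learning-practice-zh/blob/master/practice/05-02-Temporal-Difference-Control.ipynb). **Based on the code and the experimental results, I also restate my understanding of the algorithm comparisons made in the book.**\n",
"\n",
"### Sarsa\n",
"\n",
"Sarsa is on-policy. Its update rule is:\n",
"\n",
"$$Q(S_t,A_t) \\leftarrow Q(S_t,A_t) + \\alpha [ R_{t+1} + \\gamma Q( S_{t+1}, A_{t+1} ) - Q(S_t, A_t) ]$$\n",
"\n",
"It closely resembles the value update from TD prediction, with action values in place of state values.\n",
"\n",
"If $S_{t+1}$ is a terminal state, then $Q(S_{t+1}, A_{t+1})$ is defined as 0. The rule uses the quintuple $(S_t,A_t,R_{t+1},S_{t+1},A_{t+1})$, which gives the algorithm its name, Sarsa.\n",
"\n",
"Sarsa converges with probability 1 to an optimal policy and action-value function if two conditions hold:\n",
"1. every state-action pair is visited infinitely often;\n",
"2. the policy converges in the limit to the greedy policy (for $\\epsilon$-greedy policies this can be arranged by setting $\\epsilon = 1/t$).\n",
"\n",
"In the algorithm, Q is updated at every step of every episode. The full pseudocode is not reproduced here; see Chapter 6 of the book.\n",
"\n",
"### Q-learning\n",
"\n",
"The update rule is:\n",
"\n",
"$$Q(S_t,A_t) \\leftarrow Q(S_t, A_t) + \\alpha [R_{t+1} + \\gamma \\max_a Q(S_{t+1}, a) - Q(S_t, A_t)]$$\n",
"\n",
"Only the update rule changes; even the overall algorithm structure stays the same. **So why is Q-learning off-policy?**\n",
"- The book's explanation: In this case, the learned action-value function, Q, directly approximates q*, the optimal action-value function, independent of the policy being followed.\n",
"- My understanding: the action used in the update target is $\\operatorname*{arg\\,max}_a Q(S' , a)$, but the action actually taken at the next step is not necessarily $\\operatorname*{arg\\,max}_a Q(S' , a)$, because the $\\epsilon$-greedy behavior policy may explore. The method is therefore off-policy.\n",
"\n",
"**This way of understanding it is correct, and it also helps with the later part on maximization bias.**\n",
"\n",
"### Expected Sarsa\n",
"\n",
"$$\\begin{aligned}Q(S_t, A_t) & \\leftarrow Q(S_t, A_t) + \\alpha [R_{t+1} + \\gamma \\mathbb{E}[ Q(S_{t+1}, A_{t+1}) | S_{t+1}] - Q(S_t, A_t)]\\\\\n",
"& \\leftarrow Q(S_t, A_t) + \\alpha [R_{t+1} + \\gamma \\sum_a \\pi(a | S_{t+1}) Q(S_{t+1},a) - Q(S_t, A_t)]\\\\\\end{aligned}$$\n",
"\n",
"Although it is more expensive to compute, it eliminates the variance that Sarsa incurs from the random selection of $A_{t+1}$. Moreover, in the cliff-walking case, Expected Sarsa retains Sarsa's advantage over Q-learning of learning the roundabout path.\n",
"\n",
"### Maximization Bias and Double Learning\n",
"\n",
"#### Maximization Bias\n",
"\n",
"The algorithms above typically derive their policies with $\\epsilon$-greedy selection, which involves a maximization operation.\n",
"\n",
"**However, taking a maximum over estimated values implicitly uses it as an estimate of the maximum value, and this can produce a significant positive bias. An example follows.**\n",
"\n",
"In the MDP of Example 6.7, episodes start in A. The action right terminates immediately with reward 0. The action left leads to B, also with reward 0, and every action from B terminates with a reward drawn from the normal distribution N(-0.1, 1).\n",
"\n",
"The optimal policy is therefore to choose right 100% of the time, since the expected return after left is -0.1.\n",
"\n",
"But under maximization, the estimated values of the actions in B are uncertain and some of them may be greater than 0, so the maximum of the estimates is positive and left looks better than it really is. **This positive bias is the maximization bias.**\n",
"\n",
"#### Double Learning\n",
"\n",
"Double learning removes maximization bias. It uses twice the memory, but it does not double the amount of computation.\n",
"\n",
"**Taking double Q-learning as an example:**\n",
"\n",
"Use $Q_1$ to determine the maximizing action $A^* = \\operatorname*{arg\\,max}_a Q_1(a)$, and use $Q_2$ to estimate its value, $Q_2(A^*) = Q_2(\\operatorname*{arg\\,max}_a Q_1 (a))$. Since $\\mathbb{E} [Q_2 (A^*)] = q(A^*)$, this estimate is unbiased.\n",
"\n",
"The update rule becomes:\n",
"\n",
"$$\\begin{aligned}\n",
"& \\text{With probability } 0.5: \\\\\n",
"& \\quad Q_1(S_t,A_t) \\leftarrow Q_1(S_t, A_t) + \\alpha [R_{t+1} + \\gamma Q_2(S_{t+1}, \\operatorname*{arg\\,max}_a Q_1(S_{t+1}, a)) - Q_1(S_t, A_t)] \\\\\n",
"& \\text{else:} \\\\\n",
"& \\quad Q_2(S_t,A_t) \\leftarrow Q_2(S_t, A_t) + \\alpha [R_{t+1} + \\gamma Q_1(S_{t+1}, \\operatorname*{arg\\,max}_a Q_2(S_{t+1}, a)) - Q_2(S_t, A_t)] \\\\\n",
"\\end{aligned}$$\n",
"\n",
"### Afterstates\n",
"\n",
"I read the afterstates section twice and roughly understand it: in games such as board games, different states followed by different actions can lead to the same resulting position (the same board configuration), and that resulting position is called an afterstate. In such cases it is clearly more useful to evaluate afterstates, since all state-action pairs that lead to the same afterstate share one value. This is interesting, and I should look for concrete examples to understand it further.\n",
"\n",
"Afterstates have been studied by Van Roy, Bertsekas, Lee, and Tsitsiklis (1997) and by Powell (2011)."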
]
},
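{
"cell_type": "markdown",
"metadata": {},
"source": [
"To make the difference between the three update targets above concrete, here is a small worked example. The numbers are hypothetical and chosen only for illustration; they do not come from the book. Assume four actions, $\\gamma = 1$, $R_{t+1} = -1$, an $\\epsilon$-greedy policy with $\\epsilon = 0.1$, current estimates $Q(S_{t+1}, \\cdot) = (-3, -1, -2, -4)$, and suppose the sampled next action $A_{t+1}$ is the first one (an exploratory choice).\n",
"\n",
"$$\\begin{aligned}\n",
"\\text{Sarsa:} \\quad & R_{t+1} + \\gamma Q(S_{t+1}, A_{t+1}) = -1 + (-3) = -4 \\\\\n",
"\\text{Q-learning:} \\quad & R_{t+1} + \\gamma \\max_a Q(S_{t+1}, a) = -1 + (-1) = -2 \\\\\n",
"\\text{Expected Sarsa:} \\quad & R_{t+1} + \\gamma \\sum_a \\pi(a | S_{t+1}) Q(S_{t+1}, a) = -1 + [0.925 \\cdot (-1) + 0.025 \\cdot (-3 - 2 - 4)] = -2.15\n",
"\\end{aligned}$$\n",
"\n",
"Here the greedy action has probability $1 - \\epsilon + \\epsilon / 4 = 0.925$ and each other action has probability $\\epsilon / 4 = 0.025$. The Sarsa target depends on which $A_{t+1}$ happened to be sampled, while the Expected Sarsa target averages over that choice, which is the variance reduction mentioned in the notes."
]
},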
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Example 6.5: Windy Gridworld\n",
"\n",
"Shown inset below is a standard gridworld, with start and goal states, but with one difference: there is a crosswind running upward through the middle of the grid. The actions are the standard four (up, down, right, and left), but in the middle region the resultant next states are shifted upward by a “wind,” the strength of which varies from column to column. The strength of the wind is given below each column, in number of cells shifted upward. For example, if you are one cell to the right of the goal, then the action left takes you to the cell just above the goal. This is an undiscounted episodic task, with constant rewards of −1 until the goal state is reached."
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
{
"data": {
"image/png": "\n",
"image/svg+xml": [
"\r\n",
"\r\n",
"\r\n",
"\r\n"
],
"text/plain": [
""
]
},
"metadata": {
"needs_background": "light"
},
"output_type": "display_data"
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"Optimal policy is:\n",
"['R', 'R', 'R', 'R', 'R', 'R', 'R', 'R', 'R', 'R']\n",
"['R', 'R', 'R', 'U', 'R', 'R', 'R', 'R', 'R', 'D']\n",
"['U', 'R', 'R', 'R', 'R', 'R', 'R', 'L', 'L', 'D']\n",
"['R', 'U', 'R', 'R', 'R', 'R', 'R', 'G', 'R', 'D']\n",
"['R', 'R', 'R', 'R', 'R', 'R', 'U', 'D', 'L', 'L']\n",
"['R', 'R', 'R', 'R', 'R', 'U', 'U', 'R', 'R', 'D']\n",
"['R', 'R', 'R', 'R', 'U', 'U', 'U', 'U', 'U', 'U']\n",
"Wind strength for each column:\n",
"['0', '0', '0', '1', '1', '1', '2', '2', '1', '0']\n"
]
}
],
"source": [
"#######################################################################\n",
"# Copyright (C) #\n",
"# 2016-2018 Shangtong Zhang(zhangshangtong.cpp@gmail.com) #\n",
"# 2016 Kenta Shimada(hyperkentakun@gmail.com) #\n",
"# Permission given to modify the code as long as you keep this #\n",
"# declaration at the top #\n",
"#######################################################################\n",
"\n",
"import numpy as np\n",
"import matplotlib\n",
"%matplotlib inline\n",
"import matplotlib.pyplot as plt\n",
"\n",
"# world height\n",
"WORLD_HEIGHT = 7\n",
"\n",
"# world width\n",
"WORLD_WIDTH = 10\n",
"\n",
"# wind strength for each column\n",
"WIND = [0, 0, 0, 1, 1, 1, 2, 2, 1, 0]\n",
"\n",
"# possible actions\n",
"ACTION_UP = 0\n",
"ACTION_DOWN = 1\n",
"ACTION_LEFT = 2\n",
"ACTION_RIGHT = 3\n",
"\n",
"# probability for exploration\n",
"EPSILON = 0.1\n",
"\n",
"# Sarsa step size\n",
"ALPHA = 0.5\n",
"\n",
"# reward for each step\n",
"REWARD = -1.0\n",
"\n",
"START = [3, 0]\n",
"GOAL = [3, 7]\n",
"ACTIONS = [ACTION_UP, ACTION_DOWN, ACTION_LEFT, ACTION_RIGHT]\n",
"\n",
"# apply @action in @state; the wind of the current column shifts the agent upward\n",
"def step(state, action):\n",
"    i, j = state\n",
"    if action == ACTION_UP:\n",
"        return [max(i - 1 - WIND[j], 0), j]\n",
"    elif action == ACTION_DOWN:\n",
"        return [max(min(i + 1 - WIND[j], WORLD_HEIGHT - 1), 0), j]\n",
"    elif action == ACTION_LEFT:\n",
"        return [max(i - WIND[j], 0), max(j - 1, 0)]\n",
"    elif action == ACTION_RIGHT:\n",
"        return [max(i - WIND[j], 0), min(j + 1, WORLD_WIDTH - 1)]\n",
"    else:\n",
"        assert False\n",
"\n",
"# play for an episode\n",
"def episode(q_value):\n",
"    # track the total time steps in this episode\n",
"    time = 0\n",
"\n",
"    # initialize state\n",
"    state = START\n",
"\n",
"    # choose an action based on epsilon-greedy algorithm\n",
"    # Note: np.random.binomial(1, EPSILON) draws a Bernoulli sample to decide\n",
"    # whether to explore; I find this clearer than comparing random() with a threshold\n",
"    if np.random.binomial(1, EPSILON) == 1:\n",
"        action = np.random.choice(ACTIONS)\n",
"    else:\n",
"        values_ = q_value[state[0], state[1], :]\n",
"        # Note: the list passed to choice() is built with a comprehension and an\n",
"        # if filter, so ties between maximal values are broken at random\n",
"        action = np.random.choice([action_ for action_, value_ in enumerate(values_) if value_ == np.max(values_)])\n",
"\n",
"    # keep going until get to the goal state\n",
"    while state != GOAL:\n",
"        next_state = step(state, action)\n",
"        if np.random.binomial(1, EPSILON) == 1:\n",
"            next_action = np.random.choice(ACTIONS)\n",
"        else:\n",
"            values_ = q_value[next_state[0], next_state[1], :]\n",
"            next_action = np.random.choice([action_ for action_, value_ in enumerate(values_) if value_ == np.max(values_)])\n",
"\n",
"        # Sarsa update (undiscounted task, so there is no gamma factor)\n",
"        q_value[state[0], state[1], action] += \\\n",
"            ALPHA * (REWARD + q_value[next_state[0], next_state[1], next_action] -\n",
"                     q_value[state[0], state[1], action])\n",
"        state = next_state\n",
"        action = next_action\n",
"        time += 1\n",
"    return time\n",
"\n",
"def figure_6_3():\n",
"    q_value = np.zeros((WORLD_HEIGHT, WORLD_WIDTH, 4))\n",
"    episode_limit = 500\n",
"\n",
"    steps = []\n",
"    ep = 0\n",
"    while ep < episode_limit:\n",
"        steps.append(episode(q_value))\n",
"        # time = episode(q_value)\n",
"        # episodes.extend([ep] * time)\n",
"        ep += 1\n",
"\n",
"    # cumulative number of time steps after each episode\n",
"    steps = np.add.accumulate(steps)\n",
"\n",
"    plt.plot(steps, np.arange(1, len(steps) + 1))\n",
"    plt.xlabel('Time steps')\n",
"    plt.ylabel('Episodes')\n",
"\n",
"    plt.show()\n",
"\n",
"    # display the optimal policy (greedy with respect to the learned q_value)\n",
"    optimal_policy = []\n",
"    for i in range(0, WORLD_HEIGHT):\n",
"        optimal_policy.append([])\n",
"        for j in range(0, WORLD_WIDTH):\n",
"            if [i, j] == GOAL:\n",
"                optimal_policy[-1].append('G')\n",
"                continue\n",
"            bestAction = np.argmax(q_value[i, j, :])\n",
"            if bestAction == ACTION_UP:\n",
"                optimal_policy[-1].append('U')\n",
"            elif bestAction == ACTION_DOWN:\n",
"                optimal_policy[-1].append('D')\n",
"            elif bestAction == ACTION_LEFT:\n",
"                optimal_policy[-1].append('L')\n",
"            elif bestAction == ACTION_RIGHT:\n",
"                optimal_policy[-1].append('R')\n",
"    print('Optimal policy is:')\n",
"    for row in optimal_policy:\n",
"        print(row)\n",
"    print('Wind strength for each column:\\n{}'.format([str(w) for w in WIND]))\n",
"\n",
"\n",
"figure_6_3()\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Example 6.6: Cliff Walking\n",
"\n",
"This gridworld example compares Sarsa and Q-learning, highlighting the difference between on-policy (Sarsa) and off-policy (Q-learning) methods.\n",
"\n",
"Consider the gridworld shown to the right. This is a standard undiscounted, episodic task, with start and goal states, and the usual actions causing movement up, down, right, and left. Reward is −1 on all transitions except those into the region marked “The Cliff.” Stepping into this region incurs a reward of −100 and sends the agent instantly back to the start."
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"#######################################################################\n",
"# Copyright (C) #\n",
"# 2016-2018 Shangtong Zhang(zhangshangtong.cpp@gmail.com) #\n",
"# 2016 Kenta Shimada(hyperkentakun@gmail.com) #\n",
"# Permission given to modify the code as long as you keep this #\n",
"# declaration at the top #\n",
"#######################################################################\n",
"\n",
"import numpy as np\n",
"import matplotlib\n",
"%matplotlib inline\n",
"import matplotlib.pyplot as plt\n",
"from tqdm import tqdm\n",
"\n",
"# world height\n",
"WORLD_HEIGHT = 4\n",
"\n",
"# world width\n",
"WORLD_WIDTH = 12\n",
"\n",
"# probability for exploration\n",
"EPSILON = 0.1\n",
"\n",
"# step size\n",
"ALPHA = 0.5\n",
"\n",
"# gamma for Q-Learning and Expected Sarsa\n",
"GAMMA = 1\n",
"\n",
"# all possible actions\n",
"ACTION_UP = 0\n",
"ACTION_DOWN = 1\n",
"ACTION_LEFT = 2\n",
"ACTION_RIGHT = 3\n",
"ACTIONS = [ACTION_UP, ACTION_DOWN, ACTION_LEFT, ACTION_RIGHT]\n",
"\n",
"# start and goal states\n",
"START = [3, 0]\n",
"GOAL = [3, 11]\n",
"\n",
"# take @action in @state, return the next state and the reward\n",
"def step(state, action):\n",
"    i, j = state\n",
"    if action == ACTION_UP:\n",
"        next_state = [max(i - 1, 0), j]\n",
"    elif action == ACTION_LEFT:\n",
"        next_state = [i, max(j - 1, 0)]\n",
"    elif action == ACTION_RIGHT:\n",
"        next_state = [i, min(j + 1, WORLD_WIDTH - 1)]\n",
"    elif action == ACTION_DOWN:\n",
"        next_state = [min(i + 1, WORLD_HEIGHT - 1), j]\n",
"    else:\n",
"        assert False\n",
"\n",
"    reward = -1\n",
"    # stepping into the cliff costs -100 and sends the agent back to the start\n",
"    if (action == ACTION_DOWN and i == 2 and 1 <= j <= 10) or (\n",
"            action == ACTION_RIGHT and state == START):\n",
"        reward = -100\n",
"        next_state = START\n",
"\n",
"    return next_state, reward\n",
"\n",
"# reward for each action in each state\n",
"# actionRewards = np.zeros((WORLD_HEIGHT, WORLD_WIDTH, 4))\n",
"# actionRewards[:, :, :] = -1.0\n",
"# actionRewards[2, 1:11, ACTION_DOWN] = -100.0\n",
"# actionRewards[3, 0, ACTION_RIGHT] = -100.0\n",
"\n",
"# set up destinations for each action in each state\n",
"# actionDestination = []\n",
"# for i in range(0, WORLD_HEIGHT):\n",
"#     actionDestination.append([])\n",
"#     for j in range(0, WORLD_WIDTH):\n",
"#         destinaion = dict()\n",
"#         destinaion[ACTION_UP] = [max(i - 1, 0), j]\n",
"#         destinaion[ACTION_LEFT] = [i, max(j - 1, 0)]\n",
"#         destinaion[ACTION_RIGHT] = [i, min(j + 1, WORLD_WIDTH - 1)]\n",
"#         if i == 2 and 1 <= j <= 10:\n",
"#             destinaion[ACTION_DOWN] = START\n",
"#         else:\n",
"#             destinaion[ACTION_DOWN] = [min(i + 1, WORLD_HEIGHT - 1), j]\n",
"#         actionDestination[-1].append(destinaion)\n",
"# actionDestination[3][0][ACTION_RIGHT] = START\n",
"\n",
"# choose an action based on epsilon greedy algorithm\n",
"def choose_action(state, q_value):\n",
"    if np.random.binomial(1, EPSILON) == 1:\n",
"        return np.random.choice(ACTIONS)\n",
"    else:\n",
"        values_ = q_value[state[0], state[1], :]\n",
"        return np.random.choice([action_ for action_, value_ in enumerate(values_) if value_ == np.max(values_)])\n",
"\n",
"# an episode with Sarsa\n",
"# @q_value: values for state action pair, will be updated\n",
"# @expected: if True, will use expected Sarsa algorithm\n",
"# @step_size: step size for updating\n",
"# @return: total rewards within this episode\n",
"def sarsa(q_value, expected=False, step_size=ALPHA):\n",
"    state = START\n",
"    action = choose_action(state, q_value)\n",
"    rewards = 0.0\n",
"    while state != GOAL:\n",
"        next_state, reward = step(state, action)\n",
"        next_action = choose_action(next_state, q_value)\n",
"        rewards += reward\n",
"        if not expected:\n",
"            target = q_value[next_state[0], next_state[1], next_action]\n",
"        else:\n",
"            # calculate the expected value of new state under the epsilon-greedy policy\n",
"            target = 0.0\n",
"            q_next = q_value[next_state[0], next_state[1], :]\n",
"            best_actions = np.argwhere(q_next == np.max(q_next))\n",
"            for action_ in ACTIONS:\n",
"                if action_ in best_actions:\n",
"                    target += ((1.0 - EPSILON) / len(best_actions) + EPSILON / len(ACTIONS)) * q_value[next_state[0], next_state[1], action_]\n",
"                else:\n",
"                    target += EPSILON / len(ACTIONS) * q_value[next_state[0], next_state[1], action_]\n",
"        target *= GAMMA\n",
"        q_value[state[0], state[1], action] += step_size * (\n",
"            reward + target - q_value[state[0], state[1], action])\n",
"        state = next_state\n",
"        action = next_action\n",
"    return rewards\n",
"\n",
"# an episode with Q-Learning\n",
"# @q_value: values for state action pair, will be updated\n",
"# @step_size: step size for updating\n",
"# @return: total rewards within this episode\n",
"def q_learning(q_value, step_size=ALPHA):\n",
"    state = START\n",
"    rewards = 0.0\n",
"    while state != GOAL:\n",
"        action = choose_action(state, q_value)\n",
"        next_state, reward = step(state, action)\n",
"        rewards += reward\n",
"        # Q-Learning update\n",
"        q_value[state[0], state[1], action] += step_size * (\n",
"            reward + GAMMA * np.max(q_value[next_state[0], next_state[1], :]) -\n",
"            q_value[state[0], state[1], action])\n",
"        state = next_state\n",
"    return rewards\n",
"\n",
"# print optimal policy\n",
"def print_optimal_policy(q_value):\n",
"    optimal_policy = []\n",
"    for i in range(0, WORLD_HEIGHT):\n",
"        optimal_policy.append([])\n",
"        for j in range(0, WORLD_WIDTH):\n",
"            if [i, j] == GOAL:\n",
"                optimal_policy[-1].append('G')\n",
"                continue\n",
"            bestAction = np.argmax(q_value[i, j, :])\n",
"            if bestAction == ACTION_UP:\n",
"                optimal_policy[-1].append('U')\n",
"            elif bestAction == ACTION_DOWN:\n",
"                optimal_policy[-1].append('D')\n",
"            elif bestAction == ACTION_LEFT:\n",
"                optimal_policy[-1].append('L')\n",
"            elif bestAction == ACTION_RIGHT:\n",
"                optimal_policy[-1].append('R')\n",
"    for row in optimal_policy:\n",
"        print(row)\n",
"\n",
"# Use multiple runs instead of a single run and a sliding window\n",
"# With a single run I failed to present a smooth curve\n",
"# However the optimal policy converges well with a single run\n",
"# Sarsa converges to the safe path, while Q-Learning converges to the optimal path\n",
"def figure_6_4():\n",
"    # episodes of each run\n",
"    episodes = 500\n",
"\n",
"    # perform 50 independent runs\n",
"    # Note: results are averaged over the runs to reduce the noise of a single run\n",
"    runs = 50\n",
"\n",
"    rewards_sarsa = np.zeros(episodes)\n",
"    rewards_q_learning = np.zeros(episodes)\n",
"    for r in tqdm(range(runs)):\n",
"        q_sarsa = np.zeros((WORLD_HEIGHT, WORLD_WIDTH, 4))\n",
"        q_q_learning = np.copy(q_sarsa)\n",
"        for i in range(0, episodes):\n",
"            # cut off the value by -100 to draw the figure more elegantly\n",
"            # rewards_sarsa[i] += max(sarsa(q_sarsa), -100)\n",
"            # rewards_q_learning[i] += max(q_learning(q_q_learning), -100)\n",
"            rewards_sarsa[i] += sarsa(q_sarsa)\n",
"            rewards_q_learning[i] += q_learning(q_q_learning)\n",
"\n",
"    # averaging over independent runs\n",
"    rewards_sarsa /= runs\n",
"    rewards_q_learning /= runs\n",
"\n",
"    # draw reward curves\n",
"    plt.plot(rewards_sarsa, label='Sarsa')\n",
"    plt.plot(rewards_q_learning, label='Q-Learning')\n",
"    plt.xlabel('Episodes')\n",
"    plt.ylabel('Sum of rewards during episode')\n",
"    plt.ylim([-100, 0])\n",
"    plt.legend()\n",
"\n",
"    plt.show()\n",
"\n",
"    # display optimal policy (from the last run)\n",
"    print('Sarsa Optimal Policy:')\n",
"    print_optimal_policy(q_sarsa)\n",
"    print('Q-Learning Optimal Policy:')\n",
"    print_optimal_policy(q_q_learning)\n",
"\n",
"# Due to the limited computing capacity of my machine, I can't complete this experiment\n",
"# with 100,000 episodes and 50,000 runs to get the fully averaged performance.\n",
"# However, even with only 1,000 episodes and 10 runs, the curves still look good.\n",
"def figure_6_6():\n",
"    step_sizes = np.arange(0.1, 1.1, 0.1)\n",
"    episodes = 1000\n",
"    runs = 10\n",
"\n",
"    ASY_SARSA = 0\n",
"    ASY_EXPECTED_SARSA = 1\n",
"    ASY_QLEARNING = 2\n",
"    INT_SARSA = 3\n",
"    INT_EXPECTED_SARSA = 4\n",
"    INT_QLEARNING = 5\n",
"    methods = range(0, 6)\n",
"\n",
"    performace = np.zeros((6, len(step_sizes)))\n",
"    for run in range(runs):\n",
"        for ind, step_size in tqdm(list(zip(range(0, len(step_sizes)), step_sizes))):\n",
"            # Note the author's naming convention: every action-value table is\n",
"            # named q_ + method name, which is why q_q_learning appears\n",
"            q_sarsa = np.zeros((WORLD_HEIGHT, WORLD_WIDTH, 4))\n",
"            q_expected_sarsa = np.copy(q_sarsa)\n",
"            q_q_learning = np.copy(q_sarsa)\n",
"            for ep in range(episodes):\n",
"                sarsa_reward = sarsa(q_sarsa, expected=False, step_size=step_size)\n",
"                expected_sarsa_reward = sarsa(q_expected_sarsa, expected=True, step_size=step_size)\n",
"                q_learning_reward = q_learning(q_q_learning, step_size=step_size)\n",
"                performace[ASY_SARSA, ind] += sarsa_reward\n",
"                performace[ASY_EXPECTED_SARSA, ind] += expected_sarsa_reward\n",
"                performace[ASY_QLEARNING, ind] += q_learning_reward\n",
"\n",
"                # Asymptotic (long-term) and interim (short-term) performance differ\n",
"                # only in how many episodes are averaged; interim performance is\n",
"                # limited to the first 100 episodes\n",
"                if ep < 100:\n",
"                    performace[INT_SARSA, ind] += sarsa_reward\n",
"                    performace[INT_EXPECTED_SARSA, ind] += expected_sarsa_reward\n",
"                    performace[INT_QLEARNING, ind] += q_learning_reward\n",
"\n",
"    performace[:3, :] /= episodes * runs\n",
"    performace[3:, :] /= 100 * runs\n",
"    labels = ['Asymptotic Sarsa', 'Asymptotic Expected Sarsa', 'Asymptotic Q-Learning',\n",
"              'Interim Sarsa', 'Interim Expected Sarsa', 'Interim Q-Learning']\n",
"\n",
"    for method, label in zip(methods, labels):\n",
"        plt.plot(step_sizes, performace[method, :], label=label)\n",
"    plt.xlabel('alpha')\n",
"    plt.ylabel('reward per episode')\n",
"    plt.legend()\n",
"\n",
"    plt.show()\n"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"100%|██████████| 50/50 [00:30<00:00, 1.64it/s]\n"
]
},
{
"data": {
"image/png": "\n",
"image/svg+xml": [
"\r\n",
"\r\n",
"\r\n",
"\r\n"
],
"text/plain": [
""
]
},
"metadata": {
"needs_background": "light"
},
"output_type": "display_data"
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"Sarsa Optimal Policy:\n",
"['D', 'L', 'R', 'R', 'R', 'R', 'R', 'D', 'R', 'R', 'R', 'D']\n",
"['R', 'R', 'U', 'U', 'U', 'R', 'R', 'R', 'U', 'U', 'R', 'D']\n",
"['U', 'U', 'U', 'U', 'U', 'L', 'U', 'U', 'U', 'U', 'R', 'D']\n",
"['U', 'U', 'U', 'U', 'U', 'U', 'U', 'U', 'U', 'U', 'U', 'G']\n",
"Q-Learning Optimal Policy:\n",
"['L', 'R', 'R', 'R', 'D', 'D', 'R', 'R', 'R', 'R', 'D', 'D']\n",
"['D', 'D', 'D', 'D', 'R', 'R', 'R', 'R', 'D', 'D', 'R', 'D']\n",
"['R', 'R', 'R', 'R', 'R', 'R', 'R', 'R', 'R', 'R', 'R', 'D']\n",
"['U', 'U', 'U', 'U', 'U', 'U', 'U', 'U', 'U', 'U', 'U', 'G']\n"
]
}
],
"source": [
"figure_6_4()"
]
},
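{
"cell_type": "markdown",
"metadata": {},
"source": [
"As the figure above shows, Q-learning learns the optimal policy, but the agent occasionally falls off the cliff (because actions are selected $\\epsilon$-greedily).\n",
"\n",
"Although Q-learning learns the values of the optimal policy, its online performance is worse than that of Sarsa, which learns the roundabout path. If $\\epsilon$ is gradually reduced, however, both methods converge asymptotically to the optimal policy."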
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"100%|██████████| 10/10 [00:30<00:00, 3.28s/it]\n",
"100%|██████████| 10/10 [00:25<00:00, 2.80s/it]\n",
"100%|██████████| 10/10 [00:25<00:00, 2.86s/it]\n",
"100%|██████████| 10/10 [00:24<00:00, 2.73s/it]\n",
"100%|██████████| 10/10 [00:25<00:00, 2.93s/it]\n",
"100%|██████████| 10/10 [00:24<00:00, 2.71s/it]\n",
"100%|██████████| 10/10 [00:25<00:00, 2.80s/it]\n",
"100%|██████████| 10/10 [00:26<00:00, 3.05s/it]\n",
"100%|██████████| 10/10 [00:24<00:00, 2.72s/it]\n",
"100%|██████████| 10/10 [00:25<00:00, 2.90s/it]\n"
]
},
{
"data": {
"image/png": "\n",
"image/svg+xml": [
"\r\n",
"\r\n",
"\r\n",
"\r\n"
],
"text/plain": [
""
]
},
"metadata": {
"needs_background": "light"
},
"output_type": "display_data"
}
],
"source": [
"figure_6_6()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Example 6.7: Maximization Bias Example\n",
"\n",
"The small MDP shown inset in the figure provides a simple example of how maximization bias can harm the performance of TD control algorithms. The MDP has two non-terminal states A and B. Episodes always start in A with a choice between two actions, left and right. The right action transitions immediately to the terminal state with a reward and return of zero. The left action transitions to B, also with a reward of zero, from which there are many possible actions all of which cause immediate termination with a reward drawn from a normal distribution with mean −0.1 and variance 1.0."
]
},
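{
"cell_type": "markdown",
"metadata": {},
"source": [
"A short calculation suggests why the maximum over noisy estimates is biased upward. As a simplifying assumption (not the setting of the code below, which uses 10 actions in B and incremental updates), suppose B has only two actions and each estimate is a single independent reward sample, $X_1, X_2 \\sim N(\\mu, \\sigma^2)$ with $\\mu = -0.1$ and $\\sigma = 1$. Write $X_i = \\mu + \\sigma Z_i$ with $Z_i$ standard normal, and use $\\max(a, b) = \\frac{a + b}{2} + \\frac{|a - b|}{2}$ together with $Z_1 - Z_2 \\sim N(0, 2)$ and $\\mathbb{E}|N(0, s^2)| = s \\sqrt{2 / \\pi}$:\n",
"\n",
"$$\\mathbb{E}[\\max(X_1, X_2)] = \\mu + \\frac{\\sigma}{2} \\mathbb{E}|Z_1 - Z_2| = \\mu + \\frac{\\sigma}{2} \\cdot \\sqrt{2} \\cdot \\sqrt{\\frac{2}{\\pi}} = \\mu + \\frac{\\sigma}{\\sqrt{\\pi}} \\approx -0.1 + 0.564 = 0.464$$\n",
"\n",
"So although the true value of every action in B is $-0.1$, the expected maximum of the two estimates is positive, which makes left look better than right (value 0) early in learning. With two independent sets of samples, as in double learning, the value read from the second set for the action selected by the first has expectation $\\mu = -0.1$, because the selection is independent of the second set."
]
},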
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [],
"source": [
"#######################################################################\n",
"# Copyright (C) #\n",
"# 2016-2018 Shangtong Zhang(zhangshangtong.cpp@gmail.com) #\n",
"# 2016 Kenta Shimada(hyperkentakun@gmail.com) #\n",
"# Permission given to modify the code as long as you keep this #\n",
"# declaration at the top #\n",
"#######################################################################\n",
"\n",
"import numpy as np\n",
"import matplotlib\n",
"%matplotlib inline\n",
"import matplotlib.pyplot as plt\n",
"from tqdm import tqdm\n",
"import copy\n",
"\n",
"# state A\n",
"STATE_A = 0\n",
"\n",
"# state B\n",
"STATE_B = 1\n",
"\n",
"# use one terminal state\n",
"STATE_TERMINAL = 2\n",
"\n",
"# starts from state A\n",
"STATE_START = STATE_A\n",
"\n",
"# possible actions in A\n",
"ACTION_A_RIGHT = 0\n",
"ACTION_A_LEFT = 1\n",
"\n",
"# probability for exploration\n",
"EPSILON = 0.1\n",
"\n",
"# step size\n",
"ALPHA = 0.1\n",
"\n",
"# discount for max value\n",
"GAMMA = 1.0\n",
"\n",
"# possible actions in B, maybe 10 actions\n",
"ACTIONS_B = range(0, 10)\n",
"\n",
"# all possible actions\n",
"STATE_ACTIONS = [[ACTION_A_RIGHT, ACTION_A_LEFT], ACTIONS_B]\n",
"\n",
"# state action pair values, if a state is a terminal state, then the value is always 0\n",
"INITIAL_Q = [np.zeros(2), np.zeros(len(ACTIONS_B)), np.zeros(1)]\n",
"\n",
"# set up destination for each state and each action\n",
"TRANSITION = [[STATE_TERMINAL, STATE_B], [STATE_TERMINAL] * len(ACTIONS_B)]\n",
"\n",
"# choose an action based on epsilon greedy algorithm\n",
"def choose_action(state, q_value):\n",
"    if np.random.binomial(1, EPSILON) == 1:\n",
"        return np.random.choice(STATE_ACTIONS[state])\n",
"    else:\n",
"        values_ = q_value[state]\n",
"        return np.random.choice([action_ for action_, value_ in enumerate(values_) if value_ == np.max(values_)])\n",
"\n",
"# take @action in @state, return the reward\n",
"def take_action(state, action):\n",
"    if state == STATE_A:\n",
"        return 0\n",
"    return np.random.normal(-0.1, 1)\n",
"\n",
"# if there are two state action pair value array, use double Q-Learning\n",
"# otherwise use normal Q-Learning\n",
"def q_learning(q1, q2=None):\n",
"    state = STATE_START\n",
"    # track the # of action left in state A\n",
"    left_count = 0\n",
"    while state != STATE_TERMINAL:\n",
"        if q2 is None:\n",
"            action = choose_action(state, q1)\n",
"        else:\n",
"            # derive an action from Q1 and Q2\n",
"            # Note: zip() inside a list comprehension sums the two tables\n",
"            # element by element and yields a new list\n",
"            action = choose_action(state, [item1 + item2 for item1, item2 in zip(q1, q2)])\n",
"        if state == STATE_A and action == ACTION_A_LEFT:\n",
"            left_count += 1\n",
"        reward = take_action(state, action)\n",
"        next_state = TRANSITION[state][action]\n",
"        if q2 is None:\n",
"            active_q = q1\n",
"            target = np.max(active_q[next_state])\n",
"        else:\n",
"            # with probability 0.5 update Q1 using Q2 as the evaluator, else the reverse\n",
"            if np.random.binomial(1, 0.5) == 1:\n",
"                active_q = q1\n",
"                target_q = q2\n",
"            else:\n",
"                active_q = q2\n",
"                target_q = q1\n",
"            best_action = np.random.choice([action_ for action_, value_ in enumerate(active_q[next_state]) if value_ == np.max(active_q[next_state])])\n",
"            target = target_q[next_state][best_action]\n",
"\n",
"        # Q-Learning update\n",
"        active_q[state][action] += ALPHA * (\n",
"            reward + GAMMA * target - active_q[state][action])\n",
"        state = next_state\n",
"    return left_count\n",
"\n",
"# Figure 6.7, 1,000 runs may be enough, # of actions in state B will also affect the curves\n",
"def figure_6_7():\n",
"    # each independent run has 300 episodes\n",
"    episodes = 300\n",
"    runs = 1000\n",
"    left_counts_q = np.zeros((runs, episodes))\n",
"    left_counts_double_q = np.zeros((runs, episodes))\n",
"    for run in tqdm(range(runs)):\n",
"        q = copy.deepcopy(INITIAL_Q)\n",
"        q1 = copy.deepcopy(INITIAL_Q)\n",
"        q2 = copy.deepcopy(INITIAL_Q)\n",
"        for ep in range(0, episodes):\n",
"            left_counts_q[run, ep] = q_learning(q)\n",
"            left_counts_double_q[run, ep] = q_learning(q1, q2)\n",
"    left_counts_q = left_counts_q.mean(axis=0)\n",
"    left_counts_double_q = left_counts_double_q.mean(axis=0)\n",
"\n",
"    plt.plot(left_counts_q, label='Q-Learning')\n",
"    plt.plot(left_counts_double_q, label='Double Q-Learning')\n",
"    # an epsilon-greedy agent that always prefers right still picks left epsilon / 2 = 5% of the time\n",
"    plt.plot(np.ones(episodes) * 0.05, label='Optimal')\n",
"    plt.xlabel('episodes')\n",
"    plt.ylabel('% left actions from A')\n",
"    plt.legend()\n",
"\n",
"    plt.show()"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"100%|██████████| 1000/1000 [00:24<00:00, 40.57it/s]\n"
]
},
{
"data": {
"image/png": "\n",
"image/svg+xml": [
"\r\n",
"\r\n",
"\r\n",
"\r\n"
],
"text/plain": [
""
]
},
"metadata": {
"needs_background": "light"
},
"output_type": "display_data"
}
],
"source": [
"figure_6_7()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The experimental results show that, compared with Q-learning, double Q-learning is effective against maximization bias."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"file_extension": ".py",
"kernelspec": {
"display_name": "Python 3.7.0 64-bit ('base': conda)",
"language": "python",
"name": "python37064bitbasecondaf1f4ce8bd9ee468caf98567667ef0765"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.0"
},
"mimetype": "text/x-python",
"name": "python",
"npconvert_exporter": "python",
"pygments_lexer": "ipython3",
"toc": {
"base_numbering": 1,
"nav_menu": {},
"number_sections": true,
"sideBar": true,
"skip_h1_title": false,
"title_cell": "Table of Contents",
"title_sidebar": "Contents",
"toc_cell": false,
"toc_position": {},
"toc_section_display": true,
"toc_window_display": false
},
"varInspector": {
"cols": {
"lenName": 16,
"lenType": 16,
"lenVar": 40
},
"kernels_config": {
"python": {
"delete_cmd_postfix": "",
"delete_cmd_prefix": "del ",
"library": "var_list.py",
"varRefreshCmd": "print(var_dic_list())"
},
"r": {
"delete_cmd_postfix": ") ",
"delete_cmd_prefix": "rm(",
"library": "var_list.r",
"varRefreshCmd": "cat(var_dic_list()) "
}
},
"types_to_exclude": [
"module",
"function",
"builtin_function_or_method",
"instance",
"_Feature"
],
"window_display": false
},
"version": 3
},
"nbformat": 4,
"nbformat_minor": 2
}