Reinforcement Learning on OpenAI Gym

Reinforcement Learning (RL) differs from ordinary supervised learning, which relies on end-to-end training over labeled data.

Introduction to Reinforcement Learning

An agent takes an action in some state and moves to the next state; the environment then gives the agent a reward, and the agent adjusts its behavior based on that reward.

Sounds a bit abstract? As an analogy:

You are the agent, and the environment is your home (including your dad, your mom, and so on); an action is something you do, the state is where you are, and the reward is what your mom gives you in return (it can be negative, i.e., negative feedback).

  • If you (the agent) are in the living room (state) and break a cup (action), and your mom scolds you (reward), you learn from this accident and try not to break cups the next time you are in the living room.

  • Conversely, if you (the agent) are in the kitchen (state) and wash the dishes (action), and your mom says you did great (reward), you learn that washing dishes helps share the housework and earns positive feedback, so the next time you are in the kitchen and see dirty dishes, you are more likely to wash them.


As the above shows, RL is a member of the ML family, and personally I feel it is the one that feels most like "AI": it is a goal-oriented learning method in which the agent, by interacting with the environment and collecting various rewards and penalties, learns how to make the "best" decisions (its policy).

The whole decision-making process is made up of the following five elements (see the sketch after this list for how they map onto the Gym API):

  1. Agent: interacts with the environment (by taking actions).
  2. Action: what the agent does, chosen according to its own policy.
  3. Environment: the world the agent acts in; it gives different rewards depending on the agent's actions.
  4. State: the situation the agent is in at a particular point in time.
  5. Reward: the reward or penalty the environment gives in response to the agent's action.
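
To make this concrete, here is a minimal sketch (assuming the classic Gym API, where env.step returns a 4-tuple, just like the code later in this post) of how the five elements map onto OpenAI Gym:

import gym

env = gym.make('CartPole-v0')                 # Environment
obs = env.reset()                             # initial State (observation)

action = env.action_space.sample()            # the Agent chooses an Action (random policy here)
obs, reward, done, info = env.step(action)    # the Environment returns the next State and a Reward
env.close()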

Algorithm Implementation

We use CartPole-v0 from OpenAI Gym to implement the RL algorithms; OpenAI Gym provides a variety of environments for RL training.

To avoid cross-platform installation issues, I use Google Colab to run the demos.
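
The code cells below assume a setup cell roughly like the following has already been run (a sketch of my assumed setup; the exact Colab rendering setup is not shown here):

import math

import gym
import numpy as np
from IPython import display as ipythondisplay   # used later for clearing cell output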

CartPole source code

Random Action

Let's start with the simplest example to get familiar with the relevant variables: regardless of the environment, randomly pick a feasible action, i.e., randomly decide to move left or right.

"""
Random Action
"""

env = gym.make('CartPole-v0')
env.reset()

# try 30 episodes
for i in range(30):
obs = env.reset()
rewards = 0

# try 50 actions for each episode
for t in range(50):
action = env.action_space.sample() # randomly choose 0 (left) or 1 (right)
obs, reward, done, info = env.step(action)
rewards += reward

if done:
print('episode {}: Episode finished after {} timesteps, total rewards {}'.format(i, t + 1, rewards))
break

ipythondisplay.clear_output(wait=True)
env.close()

Because the agent does not learn anything while choosing actions randomly, the rewards are generally low.

Hand-Made Policy

Next, let's introduce a simple policy:

  • If the pole leans left (angle $< 0$), move the cart left to keep the balance.
  • If the pole leans right (angle $\ge 0$), move the cart right to keep the balance.

"""
Hand-Made Policy
"""

# Define policy
def chooseAction(obs):
pos, v, ang, rot = obs
return 0 if ang < 0 else 1 # 0: left, 1: right

env = gym.make('CartPole-v0')
env.reset()

for i in range(50):
obs = env.reset()
rewards = 0

for t in range(250):
action = chooseAction(obs)
obs, reward, done, info = env.step(action)
rewards += reward

if done:
print('episode {}: Episode finished after {} timesteps, total rewards {}'.format(i, t + 1, rewards))
break

env.close()

Q-Learning with Q Table

Q-learning is built around the optimal action-value function $Q^*$, which satisfies the Bellman optimality equation:

$$Q^*(s, a) = \sum_{s'} T(s, a, s') \left( R(s, a, s') + \gamma \max_{a'} Q^*(s', a') \right)$$

where:

  • $T$: the transition function; $0 \le T \le 1$ is the probability that the transition occurs
  • $R$: the reward function
  • $\gamma$: the discount factor, usually a value $< 1$ such as $0.9$ or $0.8$; it expresses how much we care about future rewards (see the note after this list)
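
As a quick illustration of what $\gamma$ does (this is the standard discounted-return identity, not something specific to CartPole), the return the agent tries to maximize is

$$G_t = r_{t+1} + \gamma r_{t+2} + \gamma^2 r_{t+3} + \cdots = \sum_{k=0}^{\infty} \gamma^k r_{t+k+1},$$

so with $\gamma = 0.9$, a reward received 10 steps in the future only counts for $0.9^{10} \approx 0.35$ of its value.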

So our goal is to find:

$$\arg\max_a Q^*(s, a).$$

The agent learns the Q function from the rewards it collects by interacting with the environment (taking actions) over and over:

$$
Q(s_t, a_t) \leftarrow (1 - \alpha) \cdot Q(s_t, a_t) + \alpha \cdot \left( r_t + \gamma \max_{a} Q(s_{t + 1}, a) \right)
$$

where

  • $t$: the time step
  • $\alpha$: the learning rate
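
To see the update in action, here is a minimal sketch of a single Q-table update using hypothetical toy values (2 states × 2 actions, one observed transition):

import numpy as np

alpha, gamma = 0.5, 0.9
Q = np.zeros((2, 2))             # Q table: 2 states x 2 actions
s, a, r, s_next = 0, 1, 1.0, 1   # one observed transition (s, a, r, s')

# Q(s, a) <- (1 - alpha) * Q(s, a) + alpha * (r + gamma * max_a' Q(s', a'))
Q[s, a] = (1 - alpha) * Q[s, a] + alpha * (r + gamma * Q[s_next].max())
print(Q[s, a])   # 0.5, since Q(s', .) is still all zeros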

The pseudocode is as follows:

Initialize Q(s, a) randomly
for each episode:
    Initialize s

    for each step of episode:
        Choose a from s using policy derived from Q (e.g., ε-greedy)
        Take action a, observe r, s'
        Q(s, a) ← Q(s, a) + α[r + γ max_a' Q(s', a') - Q(s, a)]
        s ← s'
    until s is terminal

$\epsilon$-greedy is a way to strike a balance between exploration and exploitation.

  • exploration: try different actions
  • exploitation: follow the current policy

The idea is simple:

  • a fraction $\epsilon$ of the time, the agent tries a random action
  • the remaining $(1 - \epsilon)$ of the time, the agent follows its current policy

"""
Q-Learning
"""

import math

def chooseAction(state, QTable, action_space, epsilon):
if np.random.random_sample() < epsilon: # P(randomly choose action) = ε
return action_space.sample()
return np.argmax(QTable[state]) # choose the action that maximize QTable[state]

def getState(obs, n_buckets, bds):
state = [0] * len(obs) # state = [0, 0, 0, 0]

for i, s in enumerate(obs): # each feature has different distribution
l, u = bds[i][0], bds[i][1] # each feature's lowerbound & upperbound
if s <= bds[i][0]: # lower than lowerbound
state[i] = 0
elif s >= bds[i][1]: # higher than upperbound
state[i] = n_buckets[i] - 1
else:
state[i] = int(((s - l) / (u - l)) * n_buckets[i])

return tuple(state)

env = gym.make('CartPole-v0')

# Prepare Q table
# Each feature's n_bucket
# '1' represents that any value belongs to the same state, i.e., the feature is not important
n_buckets = (1, 1, 6, 3)
n_actions = env.action_space.n

# bounds of state
bds = list(zip(env.observation_space.low, env.observation_space.high))
bds[1] = [-0.5, 0.5]
bds[3] = [-math.radians(50), math.radians(50)]

# Q Table, each (s, a) pair stores one value
QTable = np.zeros(n_buckets + (n_actions,))

epsilons = lambda i: max(0.01, min(1, 1.0 - math.log10((i + 1) / 25))) # epsilon-greedy, decrease by time
lrs = lambda i: max(0.01, min(0.5, 1.0 - math.log10((i + 1) / 25))) # learning rate, decrease by time
gamma = 0.99 # reward discount factor

# Q-Learning
for i in range(200):
obs = env.reset()
rewards = 0
state = getState(obs, n_buckets, bds) # convert continuous value -> discrete value

epsilon = epsilons(i)
lr = lrs(i)

for t in range(250):
action = chooseAction(state, QTable, env.action_space, epsilon)
obs, reward, done, info = env.step(action)
rewards += reward

next_state = getState(obs, n_buckets, bds)

# update Q Table
q_next_max = np.amax(QTable[next_state])
QTable[state + (action,)] += lr * (reward + gamma * q_next_max - QTable[state + (action,)]) # Formula
print(QTable)

# step to next state
state = next_state

if done:
print('Episode finished after {} timesteps, total rewards {}'.format(t+1, rewards))
break

env.close()
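
To get a feel for the discretization with n_buckets = (1, 1, 6, 3) (cart position and velocity are collapsed into a single bucket, pole angle into 6 buckets, pole angular velocity into 3), here is a made-up observation pushed through getState:

sample_obs = [0.1, 0.2, 0.05, 0.3]            # hypothetical (position, velocity, angle, angular velocity)
print(getState(sample_obs, n_buckets, bds))   # e.g. (0, 0, 3, 2) -- only the last two features matter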

Deep Q-Learning

In the CartPole task, the state has only 4 features and the action takes only two values, 0 and 1.

But if the state comes from the whole environment (e.g., the entire screen, or the AlphaGo board), there are far too many states, and a table is no longer a suitable representation.

Deep Q-Learning (DQN for short): we can use a deep neural network to extract features for us and approximate the Q function.
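
In other words, instead of keeping one table entry per $(s, a)$ pair, we learn network parameters $\theta$ such that

$$Q(s, a; \theta) \approx Q^*(s, a).$$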

A neural network, trained end-to-end on a large number of input-output pairs, learns to find:

$$f(input) = output.$$

Recast as a policy, this becomes:

$$\pi(state) = action.$$

Deep Q-Learning Implementation

Human-level control through deep reinforcement learning offers three tips for training a DQN:

  1. Use experience replay: store experiences in a memory buffer and sample from it randomly at training time. The advantage is that this breaks the spurious temporal correlations between consecutive experiences.
  2. Freeze target Q-network: build two Q-networks,

    • an evaluation network that is actually trained
    • a target network that provides the training targets

      If we train only one network, then every update changes not only the $Q(s, a)$ being trained but also the target $Q(s', a')$, and training cannot converge!

  3. Clip rewards: restrict the range of the reward values so that gradients stay stable during backpropagation (a one-line sketch follows this list).
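
For reference, tip 3 can be as simple as one line (a sketch; note that the CartPole code below does not actually clip its rewards):

import numpy as np

reward = 3.7                                          # hypothetical raw reward
clipped_reward = float(np.clip(reward, -1.0, 1.0))    # clip to [-1, 1], as in the DQN paper
print(clipped_reward)   # 1.0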

Step 1: Build the Network

First build a network with a single hidden layer: feed in the state, and the network outputs a score for each action; the higher an action's score, the more likely it is to be chosen.

Goal: actions that are more beneficial for the future should receive higher scores.

import torch
import torch.nn as nn
import torch.nn.functional as F

class Net(nn.Module):
    def __init__(self, n_states, n_actions, n_hidden):
        super(Net, self).__init__()

        self.fc1 = nn.Linear(n_states, n_hidden)
        self.out = nn.Linear(n_hidden, n_actions)

    def forward(self, x):
        x = self.fc1(x)
        x = F.relu(x)
        return self.out(x)
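
A quick sanity check of the network's input/output shapes, using made-up state values:

net = Net(n_states=4, n_actions=2, n_hidden=50)
state = torch.FloatTensor([[0.02, 0.0, 0.05, -0.01]])   # a batch with one CartPole state
scores = net(state)
print(scores.shape)   # torch.Size([1, 2]) -- one score per action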

Step 2: Build the Deep Q-Network

As the tips above mention, we need two networks in total:

  • an evaluation network (eval_net), which is the one actually trained
  • a target network (target_net), which provides the training targets

  • plus a memory buffer (memory) that stores experiences

# Deep Q-Network, composed of one eval network, one target network
class DQN(object):
    def __init__(self, n_states, n_actions, n_hidden, batch_size, lr, epsilon, gamma, target_replace_iter, memory_capacity):
        self.eval_net, self.target_net = Net(n_states, n_actions, n_hidden), Net(n_states, n_actions, n_hidden)

        self.memory = np.zeros((memory_capacity, n_states * 2 + 2))  # initialize memory, each memory slot is of size (state + next state + reward + action)
        self.optimizer = torch.optim.Adam(self.eval_net.parameters(), lr=lr)
        self.loss_func = nn.MSELoss()
        self.memory_counter = 0
        self.learn_step_counter = 0  # for target network update

        self.n_states = n_states
        self.n_actions = n_actions
        self.n_hidden = n_hidden
        self.batch_size = batch_size
        self.lr = lr
        self.epsilon = epsilon
        self.gamma = gamma
        self.target_replace_iter = target_replace_iter
        self.memory_capacity = memory_capacity

    def choose_action(self, state):
        x = torch.unsqueeze(torch.FloatTensor(state), 0)

        # epsilon-greedy
        if np.random.uniform() < self.epsilon:  # random
            action = np.random.randint(0, self.n_actions)
        else:  # greedy
            actions_value = self.eval_net(x)  # feed into eval net, get scores for each action
            action = torch.max(actions_value, 1)[1].data.numpy()[0]  # choose the one with the largest score

        return action

    def store_transition(self, state, action, reward, next_state):
        # Pack the experience
        transition = np.hstack((state, [action, reward], next_state))

        # Replace the old memory with new memory
        index = self.memory_counter % self.memory_capacity
        self.memory[index, :] = transition
        self.memory_counter += 1

    def learn(self):
        # Randomly select a batch of memory to learn from
        sample_index = np.random.choice(self.memory_capacity, self.batch_size)
        b_memory = self.memory[sample_index, :]
        b_state = torch.FloatTensor(b_memory[:, :self.n_states])
        b_action = torch.LongTensor(b_memory[:, self.n_states:self.n_states+1].astype(int))
        b_reward = torch.FloatTensor(b_memory[:, self.n_states+1:self.n_states+2])
        b_next_state = torch.FloatTensor(b_memory[:, -self.n_states:])

        # Compute loss between Q values of eval net & target net
        q_eval = self.eval_net(b_state).gather(1, b_action)  # evaluate the Q values of the experiences, given the states & actions taken at that time
        q_next = self.target_net(b_next_state).detach()  # detach from graph, don't backpropagate
        q_target = b_reward + self.gamma * q_next.max(1)[0].view(self.batch_size, 1)  # compute the target Q values
        loss = self.loss_func(q_eval, q_target)

        # Backpropagation
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        # Update target network every few iterations (target_replace_iter), i.e. replace target net with eval net
        self.learn_step_counter += 1
        if self.learn_step_counter % self.target_replace_iter == 0:
            self.target_net.load_state_dict(self.eval_net.state_dict())

Step 3: Training

The steps for each timestep are:

  1. Choose an action
  2. Store the experience
  3. Learn (train)

if __name__ == '__main__':
    env = gym.make('CartPole-v0')
    env = env.unwrapped  # For cheating mode to access values hidden in the environment

    CHEAT = True  # whether to use the shaped ("cheating") reward below; set to False to train on the raw reward

    # Environment parameters
    n_actions = env.action_space.n
    n_states = env.observation_space.shape[0]

    # Hyper parameters
    n_hidden = 50
    batch_size = 32
    lr = 0.01                  # learning rate
    epsilon = 0.1              # epsilon-greedy, factor to explore randomly
    gamma = 0.9                # reward discount factor
    target_replace_iter = 100  # target network update frequency
    memory_capacity = 2000
    n_episodes = 400 if CHEAT else 4000

    # Create DQN
    dqn = DQN(n_states, n_actions, n_hidden, batch_size, lr, epsilon, gamma, target_replace_iter, memory_capacity)

    # Collect experience
    for i_episode in range(n_episodes):
        t = 0  # timestep
        rewards = 0  # accumulate rewards for each episode
        state = env.reset()  # reset environment to initial state for each episode
        while True:
            # Agent takes action
            action = dqn.choose_action(state)  # choose an action based on DQN
            next_state, reward, done, info = env.step(action)  # do the action, get the reward

            # Cheating part: modify the reward to speed up training process
            if CHEAT:
                x, v, theta, omega = next_state
                r1 = (env.x_threshold - abs(x)) / env.x_threshold - 0.8  # reward 1: the closer the cart is to the center, the better
                r2 = (env.theta_threshold_radians - abs(theta)) / env.theta_threshold_radians - 0.5  # reward 2: the closer the pole is to upright, the better
                reward = r1 + r2

            # Keep the experience in memory
            dqn.store_transition(state, action, reward, next_state)

            # Accumulate reward
            rewards += reward

            # If enough memory stored, agent learns from them via Q-learning
            if dqn.memory_counter > memory_capacity:
                dqn.learn()

            # Transition to next state
            state = next_state

            if done:
                print('Episode finished after {} timesteps, total rewards {}'.format(t+1, rewards))
                break

            t += 1

    env.close()