Using Keras and Deep Q-Network to Play FlappyBird
—— github源碼
該項(xiàng)目通過卷積神經(jīng)網(wǎng)絡(luò)加Q-learning算法,利用Keras框架共210行代碼實(shí)現(xiàn)了讓程序自己學(xué)習(xí)如何玩耍FlappyBird。
這篇文章面向?qū)矸e神經(jīng)網(wǎng)絡(luò)、增強(qiáng)學(xué)習(xí)感興趣的新手。
文末有代碼的注釋。可以先大致瀏覽下代碼注釋再看文章。
pip install keraspip install pygamepip install scikit-imagepip install h5py
作者使用的是theano
訓(xùn)練的,訓(xùn)練好的模型文件要使用theano
作為Keras
的后端才能調(diào)用,在配置文件~/.keras/keras.json
中(沒有可創(chuàng)建)確認(rèn)/修改backend
為theano
(如果沒有安裝tensorflow
[Keras
的另一可選后端]好像就不用管了),配置文件樣式下文中卷積神經(jīng)網(wǎng)絡(luò)小節(jié)的補(bǔ)充里有。theano
,我們還需要安裝OpenBLAS。直接下載源碼并解壓,cd
進(jìn)目錄,然后sudo apt-get install gfortranmake FC=gfortran sudo make PREFIX=/usr/local install
git clone https://github.com/yanpanlau/Keras-FlappyBird.gitcd Keras-FlappyBirdpython qlearn.py -m "Run"
我下載時目錄中g(shù)ame/wrapped_flappy_bird.py文件的第144行,FPSCLOCK.tick(FPS)
語句處縮進(jìn)有點(diǎn)問題,刪去現(xiàn)有縮進(jìn),打8個空格就好了,沒問題就不用管了。model.h5
文件然后運(yùn)行命令qlearn.py -m "Train"
。游戲輸入及返回圖像
import wrapped_flappy_bird as gamex_t1_colored, r_t, terminal = game_state.frame_step(a_t)
直接使用flappybird python版本的接口。
輸入為a_t((1, 0)
代表不跳,(0,1)
代表跳)。
返回值為下一幀圖像x_t1_colored和獎勵reward(+0.1
表示存活,+1
表示通過管道,-1
表示死亡),獎勵被控制在[-1,+1]
來提高穩(wěn)定性。terminal 是一個布爾值表示游戲是否結(jié)束。
獎勵函數(shù)在game/wrapped_flappy_bird.py
中的def frame_step(self, input_actions)
方法中修改。
為什么直接將游戲圖像輸入處理呢?我一開始沒轉(zhuǎn)過彎,其實(shí)圖像中包含了全部的信息(聲音信息在多數(shù)游戲里只是輔助,不影響游戲),而人在玩游戲時也是接受輸入的圖像信息,然后決策輸出相應(yīng)的操作指令。這里其實(shí)就是在模擬人的反饋過程,將這一過程描述為一個非線性函數(shù),而該非線性函數(shù)我們將使用卷積神經(jīng)網(wǎng)絡(luò)來表達(dá),大體上卷積實(shí)現(xiàn)了對圖像特征的提取,神經(jīng)網(wǎng)絡(luò)實(shí)現(xiàn)了從特征到操作指令的轉(zhuǎn)換。
圖像預(yù)處理
要素:
x_t1 = skimage.color.rgb2gray(x_t1_colored)x_t1 = skimage.transform.resize(x_t1,(80,80))x_t1 = skimage.exposure.rescale_intensity(x_t1, out_range=(0, 255)) # 調(diào)整亮度#x_t1 = x_t1.reshape(1, 1, x_t1.shape[0], x_t1.shape[1])s_t1 = np.append(x_t1, s_t[:, :3, :, :], axis=1)# axis=1 意味著在第二維上添加
x_t1是一個(1x1x80x80) 的單幀,s_t1是4幀的疊加,形狀為(1x4x80x80)。輸入設(shè)計(jì)為 (1x4x80x80)而不是(4x80x80)是為了Keras考慮。補(bǔ)充
rescale_intensity
卷積神經(jīng)網(wǎng)絡(luò)
現(xiàn)在,將預(yù)處理后的圖像輸入神經(jīng)網(wǎng)絡(luò)。
def buildmodel(): print("開始建模") model = Sequential() model.add(Convolution2D(32, 8, 8, subsample=(4,4),init=lambda shape, name: normal(shape, scale=0.01, name=name), border_mode='same', dim_ordering='th', input_shape=(img_channels,img_rows,img_cols))) model.add(Activation('relu')) model.add(Convolution2D(64, 4, 4, subsample=(2,2),init=lambda shape, name: normal(shape, scale=0.01, name=name), border_mode='same', dim_ordering='th')) model.add(Activation('relu')) model.add(Convolution2D(64, 3, 3, subsample=(1,1),init=lambda shape, name: normal(shape, scale=0.01, name=name), border_mode='same', dim_ordering='th')) model.add(Activation('relu')) model.add(Flatten()) model.add(Dense(512, init=lambda shape, name: normal(shape, scale=0.01, name=name))) model.add(Activation('relu')) model.add(Dense(2,init=lambda shape, name: normal(shape, scale=0.01, name=name))) adam = Adam(lr=1e-6) model.compile(loss='mse',optimizer=adam) print("建模完成") return model
Convolution2D( nb_filter, # 過濾器個數(shù) nb_row, # 過濾器的行數(shù) nb_col, # 過濾器的列數(shù) init='glorot_uniform', # 層權(quán)重weights的初始化函數(shù) activation='linear', # 默認(rèn)激活函數(shù)為線性,即a(x) = x weights=None, border_mode='valid', # 默認(rèn)'valid'(不補(bǔ)零,一般情況) # 或者'same'(自動補(bǔ)零,使得輸出尺寸在過濾窗口步幅為1的情況下與輸入尺寸相同, # 即輸出尺寸=輸入尺寸/步幅) subsample=(1, 1), # 代表向左和向下的過濾窗口移動步幅 dim_ordering='default', # 'default' 或'tf' 或'th' W_regularizer=None, b_regularizer=None, activity_regularizer=None, W_constraint=None, b_constraint=None, bias=True # 未注釋的一般用默認(rèn)值 )
該函數(shù)是二維輸入的濾波窗口的卷積函數(shù)。
當(dāng)使用它作為模型的第一層時,需要提供input_shape
關(guān)鍵字,如輸入為128x128 RGB 3通道圖像,則input_shape=(3, 128, 128)
。dim_ordering
的默認(rèn)值在~/.keras/keras.json
文件中,若沒有可以創(chuàng)建(一般運(yùn)行過一次keras就有),格式為{ "image_dim_ordering": "tf", "epsilon": 1e-07, "floatx": "float32", "backend": "tensorflow"}
根據(jù)
image_dim_ordering
和backend
選擇使用theano
即th
或者tensorflow
即tf
。
確切結(jié)構(gòu)如下:
輸入為4x80x80的圖像矩陣。
第一層卷積層,有32個卷積核(過濾器),每個卷積核的尺寸是8x8,x軸和y軸的步幅都是4,補(bǔ)零,并使用了一個ReLU激活函數(shù)。
第二層卷積層,有64個卷積核(過濾器),每個卷積核的尺寸是4x4,x軸和y軸的步幅都是2,補(bǔ)零,并使用了一個ReLU激活函數(shù)。
第三層卷積層,有64個卷積核(過濾器),每個卷積核的尺寸是3x3,x軸和y軸的步幅都是1,補(bǔ)零,并使用了一個ReLU激活函數(shù)。
然后將它們展平為一維輸入隱藏層。該隱藏層有512個神經(jīng)單元,全連接到第三層卷積層的輸出,并使用ReLU激活函數(shù)。
最后的輸出層是一個全連接線性層,輸出動作對應(yīng)的Q值列表。一般來說,索引0代表什么也不做;在這個游戲里索引1代表跳一下。比較兩者的Q值大小,選擇大的作為下一步操作。
相關(guān)知識點(diǎn)請看:
卷積神經(jīng)網(wǎng)絡(luò)CNN基本概念筆記
CS231n Convolutional Neural Networks
每層輸出計(jì)算公式:(W-F+2P)/S+1。
W:輸入尺寸大小; S:步幅; F:卷積核尺寸; P:補(bǔ)零數(shù)。
在這個應(yīng)用的設(shè)置中,輸出計(jì)算可以簡化為W/S。
補(bǔ)充: 參看畫出卷積神經(jīng)網(wǎng)絡(luò)結(jié)構(gòu)圖
小貼士
keras使得構(gòu)建卷積神經(jīng)網(wǎng)絡(luò)異常簡單。但是有一些需要注意的地方。
A. 選擇一個良好的初始化方法是重要的,這里選擇了σ=0.01的正態(tài)分布。(lambad x,y : f(x,y)
是一個匿名函數(shù))init=lambda shape, name: normal(shape, scale=0.01, name=name)
B. 維度的順序很重要。使用Theano 的話就是4x80x80,Tensorflow的話輸入就是80x80x4。通過Convolution2D函數(shù)的dim_ordering參數(shù)設(shè)置,這里使用的是theano。
C. 此次使用了一個叫亞當(dāng)?shù)淖赃m應(yīng)優(yōu)化算法 。學(xué)習(xí)速率為1-e6。
D. 關(guān)于梯度下降優(yōu)化算法An overview of gradient descent optimization algorithms。
E. Convolution2D
函數(shù)中的參數(shù)border_mode
的模式也需要注意,這里選擇了補(bǔ)零操作,使得圖像邊緣的像素點(diǎn)也受到過濾操作,轉(zhuǎn)化了所有的圖像信息。
DQN
下面就是運(yùn)用Q-learning算法來訓(xùn)練神經(jīng)網(wǎng)絡(luò)了。
在Q-learning中最重要的就是Q函數(shù)了: Q(s, a)代表了當(dāng)我們在狀態(tài)s執(zhí)行a動作的最大貼現(xiàn)獎勵。 Q(s, a)給你一個關(guān)于在s狀態(tài)選擇a動作是有多好的考量。
Q函數(shù)就像玩游戲的秘籍,在你需要決定在狀態(tài)s下該選擇動作a還是b時,只需要挑高Q值的動作就行了。
最大貼現(xiàn)獎勵既反映了在s狀態(tài)下做出動作a得到狀態(tài)s'的即時反饋獎勵(存活+0.1,通過管道+1),也反映了在狀態(tài)s'下繼續(xù)游戲可能得到的最佳獎勵(不論什么輸入)——其實(shí)就是s'下所有動作的最大貼現(xiàn)獎勵中的最大的一個(即max[ Q(s', a) | 所有可能的動作a]
),但第二個獎勵要乘以一個折扣系數(shù),因?yàn)樗俏磥淼莫剟?,要貼現(xiàn),得打點(diǎn)折扣。
實(shí)際上Q函數(shù)是一個理論假設(shè)存在的函數(shù),從上面的表述中我們可以看出Q函數(shù)可以表達(dá)為一種遞歸的形式,我們可以通過迭代來獲取它,這與神經(jīng)網(wǎng)絡(luò)的訓(xùn)練過程不謀而合。
而我們就是要用卷積神經(jīng)網(wǎng)絡(luò)來實(shí)現(xiàn)Q函數(shù),實(shí)際上是(Q,a) = f(s)函數(shù),由一個狀態(tài)返回在該狀態(tài)下的所有可能輸入與相應(yīng)Q值構(gòu)成的二值對列表,只是輸入動作以不同的索引表示。這樣,我們省去了對巨多的狀態(tài)s的分析判斷和復(fù)雜程序編寫,只要以某種方式初始化Q值表,然后根據(jù)Q值表訓(xùn)練調(diào)整神經(jīng)網(wǎng)絡(luò)的權(quán)重,再根據(jù)訓(xùn)練后的神經(jīng)網(wǎng)絡(luò)預(yù)測來更新Q值表,如此反復(fù)迭代來逼近Q函數(shù)。
# 抽取小批量樣本進(jìn)行訓(xùn)練 minibatch = random.sample(D, BATCH) # inputs和targets一起構(gòu)成了Q值表 inputs = np.zeros((BATCH, s_t.shape[1], s_t.shape[2], s_t.shape[3])) #32, 80, 80, 4 targets = np.zeros((inputs.shape[0], ACTIONS)) #32, 2 # 開始經(jīng)驗(yàn)回放 for i in range(0, len(minibatch)): # 以下序號對應(yīng)D的存儲順序?qū)⑿畔⑷咳〕觯? # D.append((s_t, action_index, r_t, s_t1, terminal)) state_t = minibatch[i][0] # 當(dāng)前狀態(tài) action_t = minibatch[i][1] # 輸入動作 reward_t = minibatch[i][2] # 返回獎勵 state_t1 = minibatch[i][3] # 返回的下一狀態(tài) terminal = minibatch[i][4] # 返回的是否終止的標(biāo)志 inputs[i:i + 1] = state_t # 保存當(dāng)前狀態(tài),即Q(s,a)中的s # 得到預(yù)測的以輸入動作x為索引的Q值列表 targets[i] = model.predict(state_t) # 得到下一狀態(tài)下預(yù)測的以輸入動作x為索引的Q值列表 Q_sa = model.predict(state_t1) if terminal: # 如果動作執(zhí)行后游戲終止了,該狀態(tài)下(s)該動作(a)的Q值就相當(dāng)于獎勵 targets[i, action_t] = reward_t else: # 否則,該狀態(tài)(s)下該動作(a)的Q值就相當(dāng)于動作執(zhí)行后的即時獎勵和下一狀態(tài)下的最佳預(yù)期獎勵乘以一個折扣率 targets[i, action_t] = reward_t + GAMMA * np.max(Q_sa) # 用生成的Q值表訓(xùn)練神經(jīng)網(wǎng)絡(luò),同時返回當(dāng)前的誤差 loss += model.train_on_batch(inputs, targets)
if random.random() <= epsilon: print("----------Random Action----------") action_index = random.randrange(ACTIONS) a_t[action_index] = 1 else: q = model.predict(s_t) #輸入四幅圖像的組合,預(yù)測結(jié)果 max_Q = np.argmax(q) action_index = max_Q a_t[max_Q] = 1
#!/usr/bin/env pythonfrom __future__ import print_functionimport argparseimport skimage as skimagefrom skimage import transform, color, exposurefrom skimage.transform import rotatefrom skimage.viewer import ImageViewerimport syssys.path.append("game/")import wrapped_flappy_bird as gameimport randomimport numpy as npfrom collections import dequeimport jsonfrom keras import initializationsfrom keras.initializations import normal, identityfrom keras.models import model_from_jsonfrom keras.models import Sequentialfrom keras.layers.core import Dense, Dropout, Activation, Flattenfrom keras.layers.convolutional import Convolution2D, MaxPooling2Dfrom keras.optimizers import SGD , AdamGAME = 'bird' # 游戲名CONFIG = 'nothreshold'ACTIONS = 2 # 有效動作數(shù):不動+跳=2個GAMMA = 0.99 # 折扣系數(shù),未來的獎勵轉(zhuǎn)化為現(xiàn)在的要乘的一個系數(shù)OBSERVATION = 3200. # 訓(xùn)練之前觀察多少步EXPLORE = 3000000. # epsilon衰減的總步數(shù)FINAL_EPSILON = 0.0001 # epsilon的最小值INITIAL_EPSILON = 0.1 # epsilon的初始值,epsilon逐漸減小REPLAY_MEMORY = 50000 # 記住的情景(狀態(tài)s到狀態(tài)s'的所有信息)數(shù)BATCH = 32 # 選取的小批量訓(xùn)練樣本數(shù)# 一幀一個輸入動作FRAME_PER_ACTION = 1# 預(yù)處理后的圖片尺寸img_rows , img_cols = 80, 80# 每次堆疊4幀灰階圖像,相當(dāng)于4通道img_channels = 4 # 構(gòu)建神經(jīng)網(wǎng)絡(luò)模型def buildmodel(): print("Now we build the model") # 以下注釋見文中 model = Sequential() model.add(Convolution2D(32, 8, 8, subsample=(4,4),init=lambda shape, name: normal(shape, scale=0.01, name=name), border_mode='same',input_shape=(img_channels,img_rows,img_cols))) model.add(Activation('relu')) model.add(Convolution2D(64, 4, 4, subsample=(2,2),init=lambda shape, name: normal(shape, scale=0.01, name=name), border_mode='same')) model.add(Activation('relu')) model.add(Convolution2D(64, 3, 3, subsample=(1,1),init=lambda shape, name: normal(shape, scale=0.01, name=name), border_mode='same')) model.add(Activation('relu')) model.add(Flatten()) model.add(Dense(512, init=lambda shape, name: normal(shape, scale=0.01, name=name))) model.add(Activation('relu')) model.add(Dense(2,init=lambda shape, name: normal(shape, scale=0.01, name=name))) adam = Adam(lr=1e-6) model.compile(loss='mse',optimizer=adam) # 使用損失函數(shù)為均方誤差,優(yōu)化器為Adam。 print("We finish building the model") return modeldef trainNetwork(model,args): # 得到一個游戲模擬器 game_state = game.GameState() # 保存之前的觀察到回放存儲器D D = deque() # 什么也不做來得到第一個狀態(tài)然后預(yù)處理圖片為80x80x4格式 do_nothing = np.zeros(ACTIONS) do_nothing[0] = 1 # do_nothing 為 array([1,0]) x_t, r_0, terminal = game_state.frame_step(do_nothing) x_t = skimage.color.rgb2gray(x_t) x_t = skimage.transform.resize(x_t,(80,80)) x_t = skimage.exposure.rescale_intensity(x_t,out_range=(0,255)) # 初始化時,堆疊4張圖都為初始的同1張 s_t = np.stack((x_t, x_t, x_t, x_t), axis=0) # s_t為四張圖的堆疊 # 為了在Keras中使用,我們需要調(diào)整數(shù)組形狀,在頭部增加一個維度 s_t = s_t.reshape(1, s_t.shape[0], s_t.shape[1], s_t.shape[2]) if args['mode'] == 'Run': OBSERVE = 999999999 # 我們一直觀察,而不訓(xùn)練 epsilon = FINAL_EPSILON print ("Now we load weight") model.load_weights("model.h5") adam = Adam(lr=1e-6) model.compile(loss='mse',optimizer=adam) print ("Weight load successfully") else: # 否則我們在觀察一段時間之后開始訓(xùn)練 OBSERVE = OBSERVATION epsilon = INITIAL_EPSILON t = 0 # t為總幀數(shù) while (True): # 每次循環(huán)重新初始化的值 loss = 0 Q_sa = 0 action_index = 0 r_t = 0 a_t = np.zeros([ACTIONS]) # 通過epsilon貪心算法選擇行為 if t % FRAME_PER_ACTION == 0: if random.random() <= epsilon: print("----------Random Action----------") action_index = random.randrange(ACTIONS) # 隨機(jī)選取一個動作 a_t[action_index] = 1 # 生成相應(yīng)的規(guī)范化動作輸入?yún)?shù) else: q = model.predict(s_t) # 輸入當(dāng)前狀態(tài)得到預(yù)測的Q值 max_Q = np.argmax(q) # 返回?cái)?shù)組中最大值的索引 # numpy.argmax(a, axis=None, out=None) # Returns the indices of the maximum values along an axis. action_index = max_Q # 索引0代表啥也不做,索引1代表跳一下 a_t[max_Q] = 1 # 生成相應(yīng)的規(guī)范化動作輸入?yún)?shù) # 在開始訓(xùn)練之后并且epsilon小于一定值之前,我們逐步減小epsilon if epsilon > FINAL_EPSILON and t > OBSERVE: epsilon -= (INITIAL_EPSILON - FINAL_EPSILON) / EXPLORE # 執(zhí)行選定的動作,并觀察返回的下一狀態(tài)和獎勵 x_t1_colored, r_t, terminal = game_state.frame_step(a_t) # 將圖像處理為灰階,調(diào)整尺寸、亮度 x_t1 = skimage.color.rgb2gray(x_t1_colored) x_t1 = skimage.transform.resize(x_t1,(80,80)) x_t1 = skimage.exposure.rescale_intensity(x_t1, out_range=(0, 255)) # 調(diào)整圖像數(shù)組形狀,增加頭兩維到4維 x_t1 = x_t1.reshape(1, 1, x_t1.shape[0], x_t1.shape[1]) # 將s_t的前三幀添加在新幀的后面,新幀的索引為0,形成最后的4幀圖像 s_t1 = np.append(x_t1, s_t[:, :3, :, :], axis=1) # 存儲狀態(tài)轉(zhuǎn)移到回放存儲器 D.append((s_t, action_index, r_t, s_t1, terminal)) if len(D) > REPLAY_MEMORY: D.popleft() # 如果觀察完成,則 if t > OBSERVE: # 抽取小批量樣本進(jìn)行訓(xùn)練 minibatch = random.sample(D, BATCH) # inputs和targets一起構(gòu)成了Q值表 inputs = np.zeros((BATCH, s_t.shape[1], s_t.shape[2], s_t.shape[3])) #32, 80, 80, 4 targets = np.zeros((inputs.shape[0], ACTIONS)) #32, 2 # 開始經(jīng)驗(yàn)回放 for i in range(0, len(minibatch)): # 以下序號對應(yīng)D的存儲順序?qū)⑿畔⑷咳〕觯? # D.append((s_t, action_index, r_t, s_t1, terminal)) state_t = minibatch[i][0] # 當(dāng)前狀態(tài) action_t = minibatch[i][1] # 輸入動作 reward_t = minibatch[i][2] # 返回獎勵 state_t1 = minibatch[i][3] # 返回的下一狀態(tài) terminal = minibatch[i][4] # 返回的是否終止的標(biāo)志 inputs[i:i + 1] = state_t # 保存當(dāng)前狀態(tài),即Q(s,a)中的s # 得到預(yù)測的以輸入動作x為索引的Q值列表 targets[i] = model.predict(state_t) # 得到下一狀態(tài)下預(yù)測的以輸入動作x為索引的Q值列表 Q_sa = model.predict(state_t1) if terminal: # 如果動作執(zhí)行后游戲終止了,該狀態(tài)下(s)該動作(a)的Q值就相當(dāng)于獎勵 targets[i, action_t] = reward_t else: # 否則,該狀態(tài)(s)下該動作(a)的Q值就相當(dāng)于動作執(zhí)行后的即時獎勵和下一狀態(tài)下的最佳預(yù)期獎勵乘以一個折扣率 targets[i, action_t] = reward_t + GAMMA * np.max(Q_sa) # 用生成的Q值表訓(xùn)練神經(jīng)網(wǎng)絡(luò),同時返回當(dāng)前的誤差 loss += model.train_on_batch(inputs, targets) s_t = s_t1 # 下一狀態(tài)變?yōu)楫?dāng)前狀態(tài) t = t + 1 # 總幀數(shù)+1 # 每100次迭代存儲下當(dāng)前的訓(xùn)練模型 if t % 100 == 0: print("Now we save model") model.save_weights("model.h5", overwrite=True) with open("model.json", "w") as outfile: json.dump(model.to_json(), outfile) # 輸出信息 state = "" if t <= OBSERVE: state = "observe" elif t > OBSERVE and t <= OBSERVE + EXPLORE: state = "explore" else: state = "train" print("TIMESTEP", t, "/ STATE", state, "/ EPSILON", epsilon, "/ ACTION", action_index, "/ REWARD", r_t, "/ Q_MAX " , np.max(Q_sa), "/ Loss ", loss) print("Episode finished!") print("************************")def playGame(args): model = buildmodel() # 先建立模型 trainNetwork(model,args) # 開始訓(xùn)練def main(): parser = argparse.ArgumentParser(description='Description of your program') parser.add_argument('-m','--mode', help='Train / Run', required=True) #接受參數(shù) mode args = vars(parser.parse_args()) # args是字典,'mode'是鍵 playGame(args) # 開始游戲if __name__ == "__main__": main() #執(zhí)行本腳本時以main函數(shù)開始
聯(lián)系客服