Pythonで書かれたAlphaZeroを動かして三目並べを学習させてみる

 

PythonとKerasで書かれたAlphaZeroのコードを見つけたので、それを使って三目並べを学習させてみました。さらに、以前tkinterで作った三目並べに学習させたAIを実装し、対戦してみます。

 

環境

関連リンク

How to build your own AlphaZero AI using Python and Keras

GitHub - AppliedDataSciencePartners/DeepReinforcementLearning: A replica of the AlphaZero methodology for deep reinforcement learning in Python

PythonとKerasを使ってAlphaZero AIを自作する | POSTD

三目並べを作る - どん底から這い上がるまでの記録

Graphviz - Graph Visualization Software

はじめに

pythonで書かれたAlphaZeroを動かして三目並べを学習させてみます。AlphaZeroと聞くと難しいことをしたり、専門的な知識が必要なのではと思うかもしれませんが、今回私が見つけたAlphaZeroのコードはpythonが少し理解できていれば簡単に動かせることができます。

動かす手順は以下のような感じです。

  1. ここのRepositoryをcloneもしくはDownloadする。
  2. cloneもしくはDownloadしたディレクトリにあるいくつかのプログラムを三目並べ用に変更する。
  3. cloneもしくはDownloadしたディレクトリにあるrun.ipynbというJupyter Notebookの最初の2つのセルを実行する。

動かす手順はざっとこんな感じです。私が実際にやったことをこれから書いていきます。

1 - RepositoryをDownloadする。

ここのRepositoryをDownloadし解凍します。解凍するとDeepReinforcementLearning-masterという名前になっているので、これをAlphaZeroという名前に変更しました。

コマンドプロンプトを開いて、AlphaZeroに行くと、ディレクトリの中身は以下のようになっています。この中にあるgame.pyのコードを三目並べ用に書き換えます。

LICENSE    agent.py   game.py        loggers.py  memory.py         run          utils.py
MCTS.py    config.py  games          loss.py     model.py          run.ipynb
README.md  funcs.py   initialise.py  main.py     requirements.txt  settings.py

2 - game.pyを書き換える。

game.pyにはゲームのルールや状態について書かれています。はじめにgame.pyを見てみると、コネクトフォー用のコードが書かれていることがわかります。このコードを書き換えることにより、他のゲームを学習できることができます。例えば、gamesディレクトリにはMetaSquares用のコードが置いてあります。

三目並べを学習するときもgame.pyのみを変更するだけです。

これから三目並べ用に変更した部分を書いていきます。ソースコード全体は下にあります。

最初にgame.pyのGame classの中身を見てみます。

class Game

コネクトフォーのコードを見てみるとGame classには__init__,reset,step,identitiesの4つのメソッドがあります。三目並べでもこれら4つのメソッドは必要になります。三目並べ用に書き換えると以下のようになります。

__init__

    def __init__(self):
        self.currentPlayer = 1
        self.gameState = GameState(np.array([0]*9, dtype=np.int), 1)
        self.actionSpace = np.array([0]*9, dtype=np.int)
        self.pieces = {'-1': 'X', '0': '-', '1': 'O'}
        self.grid_shape = (3, 3)
        self.input_shape = (2, 3, 3)
        self.name = "三目並べ"
        self.state_size = len(self.gameState.binary)
        self.action_size = len(self.actionSpace)

self.currentPlayerはターンのことで、1は先攻、-1は後攻を表します。

self.gameStateの一つ目の引数にはゲームの状態が入ります。三目並べは3x3の9マスなので配列のサイズは9です。0は空白、1は〇, -1は✖を表します。

Figure 1は配列のインデックスを表しています。例えば、ゲームの状態がFigure 2だったとすると、配列の中身は[-1, 0, 0, 0, 1, 0, 0, 0, 0]になります。

f:id:pytry3g:20180724190147p:plain

gird_shapeは盤面の形ー>3x3、input_shapeは(打てる手の数ー>〇か✖の2つ、盤面の形ー>3x3)

reset

    def reset(self):
        self.gameState = GameState(np.array([0]*9, dtype=np.int), 1)
        self.currentPlayer = 1
        return self.gameState

ゲームの状態をリセットするので盤面を空白に、プレイヤーのターンも1にしてゲーム開始時の状態に戻しています。

 

identities

    def identities(self, state, actionValues):
        identities = [(state, actionValues)]
        currentBoard = state.board
        currentAV = actionValues

        currentBoard = np.array([
            currentBoard[2], currentBoard[1], currentBoard[0],
            currentBoard[5], currentBoard[4], currentBoard[3],
            currentBoard[8], currentBoard[7], currentBoard[6]
        ])

        currentAV =  np.array([
            currentAV[2], currentAV[1], currentAV[0],
            currentAV[5], currentAV[4], currentAV[3],
            currentAV[8], currentAV[7], currentAV[6]
        ])

        identities.append((GameState(currentBoard, state.playerTurn), currentAV))
        return identities

このidentitiesについてですが、何のためにあるのかわかりませんでした。コネクトフォーやMetaSquaresのコードを参考にして書いてみました。なので、これが正しいのかが正直わかりません。

class GameState

GameStateクラスもGameクラスと同じく中身を書き換えていきます。ゲームが終了したか判定するときに、新しいメソッドを追加しています。

__init__

    def __init__(self, board, playerTurn):
        self.board = board
        self.pieces = {'-1': 'X', '0': '-', '1': 'O'}
        self.winners = [[0,1,2], [3,4,5], [6,7,8], [0,3,6], [1,4,7], [2,5,8], [0,4,8], [2,4,6]]
        self.playerTurn = playerTurn
        self.binary = self._binary()
        self.id = self._convertStateToId()
        self.allowedActions = self._allowedActions()
        self.isEndGame = self._checkForEndGame()
        self.value = self._getValue()
        self.score = self._getScore()

大きく変わったところは、self.winnersくらいです。これはゲームが終了したのかどうかを見るときに使います。

_allowedActions

    def _allowedActions(self):
        return np.where(self.board == 0)[0]

空白のところのインデックスを返します。

_checkForEndGame

    def _checkForEndGame(self):
        return self._check()

ゲームが終了したかを見る。self._check()は新しく追加したメソッド。

_check

    def _check(self):
        for i in range(len(self.winners)):
            if self.board[self.winners[i][0]]==self.board[self.winners[i][1]]==self.board[self.winners[i][2]]==1:
                return 1
            if self.board[self.winners[i][0]]==self.board[self.winners[i][1]]==self.board[self.winners[i][2]]==-1:
                return 2
        return 3 if np.count_nonzero(self.board) == 9 else 0

引き分けなら3、先攻が勝てば1、後攻が勝てば2、続行なら0を返す。

_getValue

    def _getValue(self):
        result = self._check()
        if result == 1:
            return (-1, -1, 1)
        elif result == 2:
            return (-1, 1, -1)
        elif result == 3:
            return (0, 1, 1)
        return (0, 0, 0)

render

    def render(self, logger):
        for r in range(3):
            logger.info([self.pieces[str(x)] for x in self.board[3*r : (3*r+3)]])
        logger.info('--------------')

学習する

学習する前に

学習が始まると探索や学習の過程がloggerによってファイルに記録されていきます。しかし、学習が進むにつれてファイルのサイズがかなり大きくなってしまうので気になる方はloggers.pyの内容を下のようにしておくといいです。

"""
LOGGER_DISABLED = {
'main':False
, 'memory':False
, 'tourney':False
, 'mcts':False
, 'model': False}
"""
LOGGER_DISABLED = {
'main':True
, 'memory':True
, 'tourney':True
, 'mcts':True
, 'model': True}

学習開始

game.pyを三目並べ用に書き換えたらrun.ipynbを開いて最初のセル2つを実行するだけです。

  1. First load the core libraries
  2. Now run this block to start the learning process

※注意※

学習時にgraphvizを使っているので、インストールしていない人はこちらからインストールしましょう。私はStable 2.38 Windows install packagesをダウンロードしました。

ダウンロードしたらパスを設定します。

C:\任意の場所\graphviz\bin

なので、例えば、C:\Program Files\graphviz\binでOKです。

パスを設定した後、コマンドプロンプトを開いて、

dot -Vgraphvizのバージョンが表示されるはずです。

加えてpydotをインストールしていない方はそれもインストールしましょう。

pip install pydot

モデル

学習が進んでいくと、学習モデルができます。(※学習モデルができるには数時間はかかります。)できたモデルはAlphaZero\run\modelsにあります。

@ ls run/models/
model.png       version0004.h5  version0008.h5  version0012.h5  version0016.h5  version0020.h5
version0001.h5  version0005.h5  version0009.h5  version0013.h5  version0017.h5  version0021.h5
version0002.h5  version0006.h5  version0010.h5  version0014.h5  version0018.h5  version0022.h5
version0003.h5  version0007.h5  version0011.h5  version0015.h5  version0019.h5

対戦する

学習したモデルを使ってAIと対戦してみます。

が、その前にagent.pymodel.pyを変更します。

agent.pyの末尾に以下のコードを追加します。

class AgentX(Agent):
	def __init__(self, name, state_size, action_size, mcts_simulations, cpuct, model, graph):
		self.name = name
		self.state_size = state_size
		self.action_size = action_size
		self.cpuct = cpuct
		self.MCTSsimulations = mcts_simulations
		self.model = model
		self.graph = graph

	def get_preds(self, state):
		#predict the leaf
		inputToModel = np.array([self.model.convertToModelInput(state)])

		preds = self.model._predict(inputToModel, self.graph)
		value_array = preds[0]
		logits_array = preds[1]
		value = value_array[0]

		logits = logits_array[0]

		allowedActions = state.allowedActions

		mask = np.ones(logits.shape,dtype=bool)
		mask[allowedActions] = False
		logits[mask] = -100

		#SOFTMAX
		odds = np.exp(logits)
		probs = odds / np.sum(odds) ###put this just before the for?

		return ((value, probs, allowedActions))

 

次にmodel.pyのGen_Modelクラスの中身を変更します。

1. _predict()を追加。

	def _predict(self, x, graph):
		with graph.as_default():
			return self.model.predict(x)

 

2. load()を追加。

学習済みのモデルを指定して読み込みます。モデル名は自身の環境に合わせて変更してください。

	def load(self):
		return load_model('run/models/version0022.h5',
			custom_objects={'softmax_cross_entropy_with_logits': softmax_cross_entropy_with_logits})

後は下のソースコードapp.pyを動かすだけです。置く場所はAlphaZero直下に置きます。

以前書いたコードを今回のために書き換えました。記事が長くなるので説明は割愛します。

ソースコード

app.py

import threading
import time
import random
import tkinter as tk
import config
import numpy as np
import tensorflow as tf
from agent import AgentX, User
from game import Game, GameState
from keras.models import load_model
from model import Residual_CNN


class Thread(threading.Thread):
    def __init__(self):
        super(Thread, self).__init__()
        self.is_running = True
        self.thlock = 1
        self.keylock = 0
        self.parent = None
        self.start()

    def __call__(self):
        self.keylock = 1
        self.unlock()

    def set_parent(self, parent):
        self.parent = parent

    def lock(self):
        self.thlock = 1

    def unlock(self):
        self.thlock = 0

    def is_lock(self):
        return self.thlock

    def random_choice(self):
        i = random.choice([i for i, v in enumerate(self.parent.board2info) if v == 0])
        tag = self.parent.alpstr[i]
        flag = self.parent.update_board(tag)
        if flag:
            self.lock()
            return
        self.parent.playerTurn = 1
        self.lock()
        self.keylock = 0

    def alpha_zero(self):
        action, _, _, _ = self.parent.cpu.act(self.parent.state, 0)
        tag = self.parent.alpstr[action]
        # Do the action
        done = self.parent.update_board(action, tag)
        self.lock()
        if done:
            return
        self.keylock = 0

    def run(self):
        while self.is_running:
            if self.is_lock():
                continue
            #self.random_choice()
            self.alpha_zero()


table = """
Turn {}
        |{:^3s}|{:^3s}|{:^3s}|
        -------------
        |{:^3s}|{:^3s}|{:^3s}|
        -------------
        |{:^3s}|{:^3s}|{:^3s}|
"""

table_r = """
        結果: {}
        |{:^3s}|{:^3s}|{:^3s}|
        -------------
        |{:^3s}|{:^3s}|{:^3s}|
        -------------
        |{:^3s}|{:^3s}|{:^3s}|
"""

def check(board):
    wins = [[0,1,2], [3,4,5], [6,7,8], [0,3,6], [1,4,7], [2,5,8], [0,4,8], [2,4,6]]
    for i in range(len(wins)):
        if board[wins[i][0]]==board[wins[i][1]]==board[wins[i][2]]==1:
            return 1
        elif board[wins[i][0]]==board[wins[i][1]]==board[wins[i][2]]==-1:
            return 1
    return [2, 0][0 in board]

class App(tk.Tk):
    def __init__(self):
        super(App, self).__init__()
        # Window
        self.title("三目並べ")
        self.geometry("{}x{}+{}+{}".format(360, 400, 450, 100))
        # Set up some variables
        self.set_variables()
        # Set up game board
        self.set_board()
        # Set up some buttons
        self.set_button()
        # Set up env
        self.settings()
        # Set threading
        self.thread = Thread()
        self.thread.set_parent(self)

    def set_variables(self):
        self.board2info = [0] * 9
        self.symbol = " ox"
        self.alpstr = "abcdefghi"
        self.winner = ["", "あなた", "引き分け", "CPU"]

    def set_board(self):
        self.board = tk.Canvas(self, bg="white", width=340, height=340)
        self.tag2pos = {}
        position = [(20, 20, 120, 120), (120, 20, 220, 120), (220, 20, 320, 120),
                    (20, 120, 120, 220), (120, 120, 220, 220), (220, 120, 320, 220),
                    (20, 220, 120, 320), (120, 220, 220, 320), (220, 220, 320, 320)]

        for tag, pos in zip(self.alpstr, position):
            self.tag2pos[tag] = pos[:2]
            self.board.create_rectangle(*pos, fill='green yellow', outline='green yellow', tags=tag)
            self.board.tag_bind(tag, "<ButtonPress-1>", self.pressed)
        # Vertical line
        for x in range(120, 320, 100):
            self.board.create_line(x, 20, x, 320)
        # Horizontal line
        for y in range(120, 320, 100):
            self.board.create_line(20, y, 320, y)
        self.board.place(x=10, y=0)

    def set_button(self):
        self.reset = tk.Button(self, text="reset", relief="groove", command=self.clear)
        self.reset.place(x=170, y=360)
        self.quit_program = tk.Button(self, text="quit", relief="groove", command=self.close)
        self.quit_program.place(x=320, y=360)

    def settings(self):
        graph = tf.get_default_graph()
        env = Game()
        state = env.reset()

        player_NN = Residual_CNN(
            config.REG_CONST,
            config.LEARNING_RATE,
            env.input_shape,
            env.action_size,
            config.HIDDEN_CNN_LAYERS
        )

        net = player_NN.load()
        player_NN.model.set_weights(net.get_weights())
        cpu = AgentX(
            "Agent",
            env.state_size,
            env.action_size,
            config.MCTS_SIMS,
            config.CPUCT,
            player_NN,
            graph
        )
        cpu.mcts = None
        self.cpu = cpu
        self.env = env
        self.state = state


    def clear(self):
        self.board.delete("all")
        self.set_variables()
        self.set_board()
        self.thread.keylock = 0

        self.state = self.env.reset()

    def draw_symbol(self, tag):
        symbol = self.symbol[self.env.currentPlayer]
        x, y = self.tag2pos[tag]
        self.board.create_text(x+50, y+50,
                               font=("Helvetica", 60),
                               text=symbol)

    def pressed(self, event):
        if self.thread.keylock:
            return
        item_id = self.board.find_closest(event.x, event.y)
        tag = self.board.gettags(item_id[0])[0]
        action = self.alpstr.index(tag)
        state = self.board2info[action]
        if state in [-1, 1]:
            return
        if self.update_board(action, tag):
            self.thread.keylock = 1
            return
        self.thread()

    def update_board(self, action, tag):
        self.board2info[action] = self.env.currentPlayer
        self.draw_symbol(tag)
        self.check_result()
        self.state, _, done, _ = self.env.step(action)
        return done


    def check_result(self):
        result = check(self.board2info)
        winner = "Turn {}".format("?")
        if result:
            winner = self.winner[result] if result == 2 else self.winner[self.env.currentPlayer]
            print(table_r.format(winner, *[[" ", "o", "x"][i] for i in self.board2info]))
        else:
            print(table.format("?", *[[" ", "o", "x"][i] for i in self.board2info]))

    def close(self):
        self.thread.is_running = 0
        self.quit()

    def run(self):
        self.mainloop()


if __name__ == "__main__":
    app = App()
    app.run()

game.py

import numpy as np
import logging


class Game:
    def __init__(self):
        self.currentPlayer = 1
        self.gameState = GameState(np.array([0]*9, dtype=np.int), 1)
        self.actionSpace = np.array([0]*9, dtype=np.int)
        self.pieces = {'-1': 'X', '0': '-', '1': 'O'}
        self.grid_shape = (3, 3)
        self.input_shape = (2, 3, 3)
        self.name = "三目並べ"
        self.state_size = len(self.gameState.binary)
        self.action_size = len(self.actionSpace)

    def reset(self):
        self.gameState = GameState(np.array([0]*9, dtype=np.int), 1)
        self.currentPlayer = 1
        return self.gameState

    def step(self, action):
        next_state, value, done = self.gameState.takeAction(action)
        self.gameState = next_state
        self.currentPlayer = -self.currentPlayer
        info = None
        return ((next_state, value, done, info))

    def identities(self, state, actionValues):
        identities = [(state, actionValues)]
        currentBoard = state.board
        currentAV = actionValues

        currentBoard = np.array([
            currentBoard[2], currentBoard[1], currentBoard[0],
            currentBoard[5], currentBoard[4], currentBoard[3],
            currentBoard[8], currentBoard[7], currentBoard[6]
        ])

        currentAV =  np.array([
            currentAV[2], currentAV[1], currentAV[0],
            currentAV[5], currentAV[4], currentAV[3],
            currentAV[8], currentAV[7], currentAV[6]
        ])

        identities.append((GameState(currentBoard, state.playerTurn), currentAV))
        return identities


class GameState:
    def __init__(self, board, playerTurn):
        self.board = board
        self.pieces = {'-1': 'X', '0': '-', '1': 'O'}
        self.winners = [[0,1,2], [3,4,5], [6,7,8], [0,3,6], [1,4,7], [2,5,8], [0,4,8], [2,4,6]]
        self.playerTurn = playerTurn
        self.binary = self._binary()
        self.id = self._convertStateToId()
        self.allowedActions = self._allowedActions()
        self.isEndGame = self._checkForEndGame()
        self.value = self._getValue()
        self.score = self._getScore()

    def _allowedActions(self):
        return np.where(self.board == 0)[0]

    def _binary(self):
        currentplayer_position = np.zeros(len(self.board), dtype=np.int)
        currentplayer_position[self.board == self.playerTurn] = 1

        other_position = np.zeros(len(self.board), dtype=np.int)
        other_position[self.board == -self.playerTurn] = 1

        position = np.append(currentplayer_position, other_position)
        return (position)

    def _convertStateToId(self):
        player1_position = np.zeros(len(self.board), dtype=np.int)
        player1_position[self.board == 1] = 1

        other_position = np.zeros(len(self.board), dtype=np.int)
        other_position[self.board == -1] = 1

        position = np.append(player1_position, other_position)

        return "".join(map(str, position))

    def _checkForEndGame(self):
        return self._check()

    def _check(self):
        for i in range(len(self.winners)):
            if self.board[self.winners[i][0]]==self.board[self.winners[i][1]]==self.board[self.winners[i][2]]==1:
                return 1
            if self.board[self.winners[i][0]]==self.board[self.winners[i][1]]==self.board[self.winners[i][2]]==-1:
                return 2
        return 3 if np.count_nonzero(self.board) == 9 else 0


    def _getValue(self):
        result = self._check()
        if result == 1:
            return (-1, -1, 1)
        elif result == 2:
            return (-1, 1, -1)
        elif result == 3:
            return (0, 1, 1)
        return (0, 0, 0)

    def _getScore(self):
        tmp = self.value
        return (tmp[1], tmp[2])

    def takeAction(self, action):
        newBoard = np.array(self.board)
        newBoard[action] = self.playerTurn
        newState = GameState(newBoard, -self.playerTurn)

        value = 0
        done = 0

        if newState.isEndGame:
            value = newState.value[0]
            done = 1

        return (newState, value, done)

    def render(self, logger):
        for r in range(3):
            logger.info([self.pieces[str(x)] for x in self.board[3*r : (3*r+3)]])
        logger.info('--------------')