DEV Community

Nattaphak
Nattaphak

Posted on

สอน AI เล่น the Snake Game ด้วย Reinforcement Learning โดยการใช้ Python

ในปัจจุบันได้มีการฝึก AI ให้สามารถเล่นเกมต่างๆ ไม่ว่าจะเป็น OpenAI ที่ได้มีการเทรน AI ในการเล่นเกม MOBA อย่าง Dota2 และยังประสบผลสำเร็จอย่างมาก และโดยส่วนใหญ่ก็ได้ใช้วิธีการอย่าง Reinforcement Learning ในการเทรน AI นั้นเอง

เพราะฉะนั้นในบทความนี้ผมจะทำการเทรน AI ในการเล่นเกมที่ง่ายๆอย่าง the Snake Game ด้วยวิธีการ Reinforcement Learning ใน Python ซึ่งผมจะใช้เวลาเทรนทั้งหมด 120 เกม


Reinforcement Learning คืออะไร ?

ก่อนที่จะไปวิธีการเขียนโค้ดเราก็ต้องมารู้จัก Reinforcement Learning กันก่อน ซึ่งจริงๆแล้วมันก็คือวิธีการหนึ่งของการทำ Machine learning นั่นเอง โดยวิธีการนี้จะเป็นการให้ Agent หรือก็คือ AI ที่ได้ลองผิดลองถูกด้วยตัวเองโดยใช้รางวัลและการลงโทษเพื่อสอนพฤติกรรมเชิงบวกและเชิงลบ เช่นเกมงูที่มีรางวัลคือผลไม้และการลงโทษคือการ GameOver

Image description

เมื่อได้รู้จัก Reinforcement Learning คร่าวๆแล้วต่อมาก็จะไปขั้นตอนการเขียนโค้ดกันเลย


ขั้นตอนที่ 1 Setup

ในบทความนี้เราจะทำเกมงูขึ้นด้วย Python ดังนั้นจึงต้องทำการติดตั้ง Packages 4 ตัวนี้

  1. NumPy: python library ที่ใช้สำหรับการทำงานกับอาร์เรย์
  2. Matplotlib: ช่วยพล็อตและสร้างการแสดงภาพข้อมูล
  3. Pytorch: เครื่องมือที่ถูกใช้อย่างแพร่หลายในการสร้างและฝึกอบรมโมเดลปัญญาประดิษฐ์ (AI) และเครือข่ายประสาทเทียม
  4. Pygame: โมดูล Python ที่ออกแบบมาสำหรับวิดีโอเกม

ขั้นตอนที่ 2 สร้าง the Snake Game

โค้ดทั้งหมดในการสร้าง the Snake Game ด้วย Python

import pygame
import random
from enum import Enum
from collections import namedtuple
import numpy as np

pygame.init()
font = pygame.font.Font('arial.ttf', 25)
#font = pygame.font.SysFont('arial', 25)

class Direction(Enum):
    RIGHT = 1
    LEFT = 2
    UP = 3
    DOWN = 4

Point = namedtuple('Point', 'x, y')

# rgb colors
WHITE = (255, 255, 255)
RED = (200,0,0)
BLUE1 = (0, 0, 255)
BLUE2 = (0, 100, 255)
BLACK = (0,0,0)

BLOCK_SIZE = 20
SPEED = 40

class SnakeGameAI:

    def __init__(self, w=640, h=480):
        self.w = w
        self.h = h
        # init display
        self.display = pygame.display.set_mode((self.w, self.h))
        pygame.display.set_caption('Snake')
        self.clock = pygame.time.Clock()
        self.reset()


    def reset(self):
        # init game state
        self.direction = Direction.RIGHT

        self.head = Point(self.w/2, self.h/2)
        self.snake = [self.head,
                      Point(self.head.x-BLOCK_SIZE, self.head.y),
                      Point(self.head.x-(2*BLOCK_SIZE), self.head.y)]

        self.score = 0
        self.food = None
        self._place_food()
        self.frame_iteration = 0


    def _place_food(self):
        x = random.randint(0, (self.w-BLOCK_SIZE )//BLOCK_SIZE )*BLOCK_SIZE
        y = random.randint(0, (self.h-BLOCK_SIZE )//BLOCK_SIZE )*BLOCK_SIZE
        self.food = Point(x, y)
        if self.food in self.snake:
            self._place_food()


    def play_step(self, action):
        self.frame_iteration += 1
        # 1. collect user input
        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                pygame.quit()
                quit()

        # 2. move
        self._move(action) # update the head
        self.snake.insert(0, self.head)

        # 3. check if game over
        reward = 0
        game_over = False
        if self.is_collision() or self.frame_iteration > 100*len(self.snake):
            game_over = True
            reward = -10
            return reward, game_over, self.score

        # 4. place new food or just move
        if self.head == self.food:
            self.score += 1
            reward = 10
            self._place_food()
        else:
            self.snake.pop()

        # 5. update ui and clock
        self._update_ui()
        self.clock.tick(SPEED)
        # 6. return game over and score
        return reward, game_over, self.score


    def is_collision(self, pt=None):
        if pt is None:
            pt = self.head
        # hits boundary
        if pt.x > self.w - BLOCK_SIZE or pt.x < 0 or pt.y > self.h - BLOCK_SIZE or pt.y < 0:
            return True
        # hits itself
        if pt in self.snake[1:]:
            return True

        return False


    def _update_ui(self):
        self.display.fill(BLACK)

        for pt in self.snake:
            pygame.draw.rect(self.display, BLUE1, pygame.Rect(pt.x, pt.y, BLOCK_SIZE, BLOCK_SIZE))
            pygame.draw.rect(self.display, BLUE2, pygame.Rect(pt.x+4, pt.y+4, 12, 12))

        pygame.draw.rect(self.display, RED, pygame.Rect(self.food.x, self.food.y, BLOCK_SIZE, BLOCK_SIZE))

        text = font.render("Score: " + str(self.score), True, WHITE)
        self.display.blit(text, [0, 0])
        pygame.display.flip()


    def _move(self, action):
        # [straight, right, left]

        clock_wise = [Direction.RIGHT, Direction.DOWN, Direction.LEFT, Direction.UP]
        idx = clock_wise.index(self.direction)

        if np.array_equal(action, [1, 0, 0]):
            new_dir = clock_wise[idx] # no change
        elif np.array_equal(action, [0, 1, 0]):
            next_idx = (idx + 1) % 4
            new_dir = clock_wise[next_idx] # right turn r -> d -> l -> u
        else: # [0, 0, 1]
            next_idx = (idx - 1) % 4
            new_dir = clock_wise[next_idx] # left turn r -> u -> l -> d

        self.direction = new_dir

        x = self.head.x
        y = self.head.y
        if self.direction == Direction.RIGHT:
            x += BLOCK_SIZE
        elif self.direction == Direction.LEFT:
            x -= BLOCK_SIZE
        elif self.direction == Direction.DOWN:
            y += BLOCK_SIZE
        elif self.direction == Direction.UP:
            y -= BLOCK_SIZE

        self.head = Point(x, y)
Enter fullscreen mode Exit fullscreen mode

ขั้นตอนที่ 3 ทำการสร้างและเทรน Neural Network

Import Pytorch

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import os
Enter fullscreen mode Exit fullscreen mode

ทำการสร้าง Neural Network

class Linear_QNet(nn.Module):
    def __init__(self, input_size, hidden_size, output_size): #building the input, hidden and output layer
        super().__init__()
        self.linear1 = nn.Linear(input_size, hidden_size)
        self.linear2 = nn.Linear(hidden_size, output_size)

    def forward(self, x): #this is a feed-forward neural net
        x = F.relu(self.linear1(x))
        x = self.linear2(x)
        return x

    def save(self, file_name='model.pth'): #saving the model
        model_folder_path = './model'
        if not os.path.exists(model_folder_path):
            os.makedirs(model_folder_path)

        file_name = os.path.join(model_folder_path, file_name)
        torch.save(self.state_dict(), file_name)
Enter fullscreen mode Exit fullscreen mode

เทรนและเพิ่มประสิทธิภาพให้กับตัว Network
โค้ดส่วนนี้จะนำสมการ Deep Q-learning อย่างง่ายมาใช้

class QTrainer:
    def __init__(self, model, lr, gamma):
        self.lr = lr
        self.gamma = gamma
        self.model = model
        self.optimizer = optim.Adam(model.parameters(), lr=self.lr)
        self.criterion = nn.MSELoss()

    def train_step(self, state, action, reward, next_state, done):
        state = torch.tensor(state, dtype=torch.float)
        next_state = torch.tensor(next_state, dtype=torch.float)
        action = torch.tensor(action, dtype=torch.long)
        reward = torch.tensor(reward, dtype=torch.float)
        # (n, x)

        if len(state.shape) == 1:
            # (1, x)
            state = torch.unsqueeze(state, 0)
            next_state = torch.unsqueeze(next_state, 0)
            action = torch.unsqueeze(action, 0)
            reward = torch.unsqueeze(reward, 0)
            done = (done, )

        # 1: predicted Q values with current state
        pred = self.model(state)

        target = pred.clone()
        for idx in range(len(done)):
            Q_new = reward[idx]
            if not done[idx]:
                Q_new = reward[idx] + self.gamma * torch.max(self.model(next_state[idx]))

            target[idx][torch.argmax(action[idx]).item()] = Q_new

        # 2: Q_new = r + y * max(next_predicted Q value) -> only do this if not done
        # pred.clone()
        # preds[argmax(action)] = Q_new
        self.optimizer.zero_grad()
        loss = self.criterion(target, pred)
        loss.backward()

        self.optimizer.step()
Enter fullscreen mode Exit fullscreen mode

สร้างโปรแกรมที่เก็บข้อมูลการเล่นของ Agent

import matplotlib.pyplot as plt
from IPython import display

plt.ion()

def plot(scores, mean_scores):
    display.clear_output(wait=True)
    display.display(plt.gcf())
    plt.clf()
    plt.title('Training...')
    plt.xlabel('Number of Games')
    plt.ylabel('Score')
    plt.plot(scores)
    plt.plot(mean_scores)
    plt.ylim(ymin=0)
    plt.text(len(scores)-1, scores[-1], str(scores[-1]))
    plt.text(len(mean_scores)-1, mean_scores[-1], str(mean_scores[-1]))
    plt.show(block=False)
    plt.pause(.1)

Enter fullscreen mode Exit fullscreen mode

ขั้นตอนที่ 4 ทำการสร้างตัว Agent ขึ้นมา

Import และทำการสร้างพารามิเตอร์

import torch
import random
import numpy as np
from collections import deque
from game import SnakeGameAI, Direction, Point
from model import Linear_QNet, QTrainer
from helper import plot

MAX_MEMORY = 100_000
BATCH_SIZE = 1000
LR = 0.001
Enter fullscreen mode Exit fullscreen mode

Initializing: การตั้งค่าที่จะมีความสำคัญในภายหลัง เช่น จำนวนเกม, discount rate, หน่วยความจำ และพารามิเตอร์ของ Neural Network

def __init__(self):
        self.n_games = 0
        self.epsilon = 0 # randomness
        self.gamma = 0.9 # discount rate
        self.memory = deque(maxlen=MAX_MEMORY) # popleft()
        self.model = Linear_QNet(11, 256, 3)
        self.trainer = QTrainer(self.model, lr=LR, gamma=self.gamma)
Enter fullscreen mode Exit fullscreen mode

การคำนวณสถานะ: มีจุดอยู่รอบหัวงูเนื่องจากเป็นตัวกำหนดสถานะของงู อาร์เรย์ "state" จะบอก Agent ถึงความน่าจะเป็นของอันตรายหรือรางวัลตามทิศทางที่กำลังมุ่งหน้าไป

def get_state(self, game):
        head = game.snake[0]
        point_l = Point(head.x - 20, head.y)
        point_r = Point(head.x + 20, head.y)
        point_u = Point(head.x, head.y - 20)
        point_d = Point(head.x, head.y + 20)

        dir_l = game.direction == Direction.LEFT
        dir_r = game.direction == Direction.RIGHT
        dir_u = game.direction == Direction.UP
        dir_d = game.direction == Direction.DOWN

        state = [
            # Danger straight
            (dir_r and game.is_collision(point_r)) or 
            (dir_l and game.is_collision(point_l)) or 
            (dir_u and game.is_collision(point_u)) or 
            (dir_d and game.is_collision(point_d)),

            # Danger right
            (dir_u and game.is_collision(point_r)) or 
            (dir_d and game.is_collision(point_l)) or 
            (dir_l and game.is_collision(point_u)) or 
            (dir_r and game.is_collision(point_d)),

            # Danger left
            (dir_d and game.is_collision(point_r)) or 
            (dir_u and game.is_collision(point_l)) or 
            (dir_r and game.is_collision(point_u)) or 
            (dir_l and game.is_collision(point_d)),

            # Move direction
            dir_l,
            dir_r,
            dir_u,
            dir_d,

            # Food location 
            game.food.x < game.head.x,  # food left
            game.food.x > game.head.x,  # food right
            game.food.y < game.head.y,  # food up
            game.food.y > game.head.y  # food down
            ]

        return np.array(state, dtype=int)
Enter fullscreen mode Exit fullscreen mode

การสร้างหน่วยความจำ: สิ่งนี้ทำให้แน่ใจได้ว่าตัว agent จะจดจำการเทรนในระยะยาว (ตลอดระยะเวลาที่โปรแกรมยังคงทำงานอยู่) และในระยะสั้น (ระยะเวลาที่ตัวแทนเล่นเกมเดียว)

def remember(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done)) # popleft if MAX_MEMORY is reached

    def train_long_memory(self):
        if len(self.memory) > BATCH_SIZE:
            mini_sample = random.sample(self.memory, BATCH_SIZE) # list of tuples
        else:
            mini_sample = self.memory

        states, actions, rewards, next_states, dones = zip(*mini_sample)
        self.trainer.train_step(states, actions, rewards, next_states, dones)
        #for state, action, reward, nexrt_state, done in mini_sample:
        #    self.trainer.train_step(state, action, reward, next_state, done)

    def train_short_memory(self, state, action, reward, next_state, done):
        self.trainer.train_step(state, action, reward, next_state, done)
Enter fullscreen mode Exit fullscreen mode

ทำให้ Agent สามารถเล่นเกมได้

def get_action(self, state):
        # random moves: tradeoff exploration / exploitation
        self.epsilon = 80 - self.n_games
        final_move = [0,0,0]
        if random.randint(0, 200) < self.epsilon:
            move = random.randint(0, 2)
            final_move[move] = 1
        else:
            state0 = torch.tensor(state, dtype=torch.float)
            prediction = self.model(state0)
            move = torch.argmax(prediction).item()
            final_move[move] = 1

        return final_move
Enter fullscreen mode Exit fullscreen mode

โค้ดในการเทรน Agent

def train():
    plot_scores = []
    plot_mean_scores = []
    total_score = 0
    record = 0
    agent = Agent()
    game = SnakeGameAI()
    while True:
        # get old state
        state_old = agent.get_state(game)

        # get move
        final_move = agent.get_action(state_old)

        # perform move and get new state
        reward, done, score = game.play_step(final_move)
        state_new = agent.get_state(game)

        # train short memory
        agent.train_short_memory(state_old, final_move, reward, state_new, done)

        # remember
        agent.remember(state_old, final_move, reward, state_new, done)

        if done:
            # train long memory, plot result
            game.reset()
            agent.n_games += 1
            agent.train_long_memory()

            if score > record:
                record = score
                agent.model.save()

            print('Game', agent.n_games, 'Score', score, 'Record:', record)

            plot_scores.append(score)
            total_score += score
            mean_score = total_score / agent.n_games
            plot_mean_scores.append(mean_score)
            plot(plot_scores, plot_mean_scores)


if __name__ == '__main__':
    train()
Enter fullscreen mode Exit fullscreen mode

และนี้คือทั้งหมดในส่วนของการเขียนโค้ดต่อไปก็คือผลลัพทธ์ในการ เทรนทั้งหมด 120 เกม

Image description

สรุปผล

จากการเล่นทั้งหมด 120 เกม Agent สามารถทำคะแนนได้สูงสุดถึง 54 คะแนน ซึ่งจากกราฟจะเห็นได้ว่าการใช้ Reinforcement Learning ยิ่งให้ระยะเวลาหรือจำนวนครั้งในการเล่นที่มากขึ้น คะแนนที่ Agent ทำได้ก็จะสูงขึ้นเรื่อยๆ

ดังนั้นหากเราปล่อยให้ Agent มีจำนวนเกมในการเล่นที่มากขึ้นก็จะสามารถทำคะแนนได้ดีขึ้นอีกนั้นเอง และหากอ่านจนจบแล้วยังไม่ค่อยเข้าใจก็สามารถศึกษาต่อได้จาก https://www.youtube.com/watch?v=L8ypSXwyBds ได้เลย ( ^◡^)

Top comments (0)