In this post I present a simple version of the Policy Gradient method for solving the CartPole game. I followed this YouTube video for the fundamentals of the Policy Gradient method.

### Introduction

We want to obtain a policy function (as shown below) that outputs a probability distribution over actions for a given observed state. To train the network to perform better and better, we run many rounds of the game (episodes).

In each round, at every moment `t`, we do the following steps and record the data (state, action, reward) of each step.

When a round (an episode) ends (`done == True`), we use all the recorded data of this round to calculate the gradient of the network:

With the network gradients calculated from multiple episodes, we update the trainable variables of the Policy Gradient network:

### Neural network model

Function `create_model()` implements a simple neural network that takes states as input and outputs the probabilities of the actions. Code sample:

``````
    def create_model(self):
        # input_shape excludes the batch dimension: one state vector of length s_dim
        self.model = Sequential([Dense(self.h_dim, activation="relu", input_shape=(self.s_dim,)),
                                 Dense(self.a_dim, activation="softmax")])

        self.model.summary()
``````
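To make the role of the `softmax` output layer concrete, here is a small framework-free sketch of the same kind of forward pass (one ReLU hidden layer followed by softmax). The helper names `relu`, `softmax`, `dense` and `forward`, and all the weights, are made up for illustration and are not part of the post's code; the real model learns its weights through Keras.

```python
import math

def relu(values):
    """Element-wise max(0, v)."""
    return [max(0.0, v) for v in values]

def softmax(logits):
    """Turn raw scores into probabilities that sum to 1."""
    m = max(logits)  # subtract the max for numerical stability
    exps = [math.exp(v - m) for v in logits]
    total = sum(exps)
    return [e / total for e in exps]

def dense(inputs, weights, biases):
    """One fully connected layer; weights[j] holds the weights of unit j."""
    return [sum(w * x for w, x in zip(row, inputs)) + b
            for row, b in zip(weights, biases)]

def forward(state, w1, b1, w2, b2):
    hidden = relu(dense(state, w1, b1))
    return softmax(dense(hidden, w2, b2))

# Hypothetical weights: 4 state inputs -> 3 hidden units -> 2 actions
W1 = [[0.1, -0.2, 0.3, 0.0], [0.0, 0.5, -0.1, 0.2], [-0.3, 0.1, 0.2, 0.4]]
B1 = [0.0, 0.1, -0.1]
W2 = [[0.2, -0.4, 0.1], [-0.1, 0.3, 0.5]]
B2 = [0.0, 0.0]

action_pdf = forward([0.02, -0.01, 0.03, 0.04], W1, B1, W2, B2)
print(action_pdf)  # two positive numbers that sum to 1
```

Whatever the weights are, the softmax output can be read directly as "probability of pushing left" and "probability of pushing right".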

### Step 1 and 2

Below is the code for observing a state and calculating an action.

``````
# given the current state state_current, compute the action probabilities action_pdf
action_pdf = agent.model(state_current.reshape(1, 4)).numpy().flatten().astype(np.float64)
action_pdf /= action_pdf.sum()  # renormalize so np.random.choice accepts it

# sample an action index according to action_pdf
a = np.random.choice(len(action_pdf), p=action_pdf)

# use env.step(a) to get the next observed state
state_obs, reward, done, info = env.step(a)
``````
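As a minimal, standard-library sketch of the sampling step: the action *index* is drawn according to the probabilities. `sample_action` is a hypothetical helper, not part of the post's code. Drawing an index rather than a probability value avoids ambiguity when two actions happen to have the same probability, since looking a value up would then always resolve to the first action.

```python
import random

def sample_action(action_pdf, rng=random):
    """Return an action index drawn with the given probabilities."""
    return rng.choices(range(len(action_pdf)), weights=action_pdf, k=1)[0]

rng = random.Random(1024)  # fixed seed so the run is reproducible
counts = [0, 0]
for _ in range(10000):
    counts[sample_action([0.8, 0.2], rng)] += 1
print(counts)  # roughly 80% of the draws are action 0
```

Because the policy is sampled instead of always taking the most likely action, the agent keeps exploring while it learns.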

### Step 3, 4 and 5

Function `calc_grad()` takes the recorded states, actions and rewards as input, defines a loss function to minimize, and obtains the gradients of this loss function.
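Written as a formula, the loss that `calc_grad()` builds appears to be the following. The symbols $N$, $\pi_\theta$, $s_t$, $a_t$ and $G_t$ are notation introduced here, not taken from the post:

$$
L(\theta) = -\frac{1}{N} \sum_{t=0}^{N-1} \log \pi_\theta(a_t \mid s_t) \, G_t
$$

Here $N$ is the number of recorded steps, $\pi_\theta(a_t \mid s_t)$ is the network's probability for the action actually taken at step $t$ (`self.outputs`), and $G_t$ is the entry of `reward_holder` for that step. Minimizing $L$ raises the probability of actions that were followed by high rewards.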

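``````
    def calc_grad(self, state_input, action_holder, reward_holder):
        # record the forward pass so the loss can be differentiated
        with tf.GradientTape() as tape:
            self.output = self.model(state_input)

            # flat index of the chosen action in each row: row * n_actions + action
            indexes = tf.range(0, tf.shape(self.output)[0]) * tf.shape(self.output)[1] + action_holder
            self.outputs = tf.gather(tf.reshape(self.output, [-1]), indexes)
            self.loss = -tf.reduce_mean(K.log(self.outputs) * reward_holder)

        # assumption: the gradient step is not shown in this snippet; tf.GradientTape is the usual TF 2 way
        return tape.gradient(self.loss, self.model.trainable_variables)
``````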
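The index arithmetic in `calc_grad()` can be hard to read, so here is a plain-Python sketch of what it does: flatten the probability matrix, then pick entry `row * n_actions + action` for each row. The function names `gather_taken_probs` and `policy_loss` and the numbers are invented for illustration.

```python
import math

def gather_taken_probs(output, actions):
    """Pick output[i][actions[i]] for every row i via flat indexing."""
    n_rows, n_actions = len(output), len(output[0])
    flat = [p for row in output for p in row]       # like tf.reshape(output, [-1])
    indexes = [i * n_actions + a for i, a in zip(range(n_rows), actions)]
    return [flat[k] for k in indexes]               # like tf.gather

def policy_loss(output, actions, rewards):
    """Negative mean of log(probability of taken action) * reward."""
    probs = gather_taken_probs(output, actions)
    return -sum(math.log(p) * r for p, r in zip(probs, rewards)) / len(probs)

output = [[0.7, 0.3], [0.4, 0.6], [0.9, 0.1]]   # made-up action probabilities
actions = [0, 1, 1]                             # actions taken at each step
rewards = [1.0, 0.5, 2.0]                       # made-up (discounted) rewards

print(gather_taken_probs(output, actions))  # [0.7, 0.6, 0.1]
print(policy_loss(output, actions, rewards))
```

The third step dominates the loss: an unlikely action (probability 0.1) received the largest reward, so the gradient pushes hardest to make that action more likely.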

### Step 6

Function `update_gradient()` takes the gradients calculated over multiple rounds of the game as input, then updates the trainable variables of `self.model`.

``````
    def update_gradient(self, gradient_holders):
``````
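As a rough, framework-free sketch of the idea behind this step: gradients from several episodes are summed into a buffer, and the buffer is then applied to the variables in one update. Everything here is an assumption for illustration: the helper names `accumulate` and `apply_gradients`, the numbers, and the use of plain gradient descent. In TensorFlow 2 the update itself would typically be a call such as `optimizer.apply_gradients(zip(gradients, model.trainable_variables))`.

```python
def accumulate(buffer, grads):
    """Add one episode's gradients into the running buffer, element-wise."""
    if buffer is None:
        return [list(g) for g in grads]
    return [[b + g for b, g in zip(b_row, g_row)]
            for b_row, g_row in zip(buffer, grads)]

def apply_gradients(variables, buffer, learn_rate):
    """Plain gradient-descent step: v <- v - learn_rate * grad."""
    return [[v - learn_rate * g for v, g in zip(v_row, g_row)]
            for v_row, g_row in zip(variables, buffer)]

variables = [[1.0, 2.0], [3.0]]          # two made-up "trainable variables"
episode_grads = [
    [[0.1, 0.2], [0.3]],                 # gradients from episode 1
    [[0.3, -0.2], [0.1]],                # gradients from episode 2
]

buffer = None
for grads in episode_grads:
    buffer = accumulate(buffer, grads)

variables = apply_gradients(variables, buffer, learn_rate=0.5)
print(variables)  # approximately [[0.8, 2.0], [2.8]]
```

Updating only every few episodes averages out some of the noise in single-episode gradients, which is what the `update_step` setting in the full code controls.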

### Entire Code

``````
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = "2"

import tensorflow as tf

if tf.__version__.startswith("1."):
    raise RuntimeError("Error!! You are using tensorflow-v1")
import numpy as np
import gym

import tensorflow.keras as keras
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Activation
import tensorflow.keras.backend as K

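# a class of Policy Gradient Neural Network
class PolicyGradientNet:

    ## learn_rate: learning rate
    ## state_dim: dimension of state space
    ## action_dim: dimension of action space
    ## hidden_dim: number of perceptrons in the hidden layer
    ## discount_ratio: discount factor applied to future rewards
    def __init__(self, learn_rate, state_dim, action_dim, hidden_dim, discount_ratio=0.99):

        self.lr = learn_rate
        self.model = None
        self.optimizer = None
        self.gamma = discount_ratio  # discount ratio for future rewards
        self.h_dim = hidden_dim
        self.a_dim = action_dim
        self.s_dim = state_dim

    def create_model(self):
        # input_shape excludes the batch dimension: one state vector of length s_dim
        self.model = Sequential([Dense(self.h_dim, activation="relu", input_shape=(self.s_dim,)),
                                 Dense(self.a_dim, activation="softmax")])

        self.model.summary()

    def calc_grad(self, state_input, action_holder, reward_holder):
        # record the forward pass so the loss can be differentiated
        with tf.GradientTape() as tape:
            self.output = self.model(state_input)

            # flat index of the chosen action in each row: row * n_actions + action
            indexes = tf.range(0, tf.shape(self.output)[0]) * tf.shape(self.output)[1] + action_holder
            self.outputs = tf.gather(tf.reshape(self.output, [-1]), indexes)
            self.loss = -tf.reduce_mean(K.log(self.outputs) * reward_holder)

        # assumption: the gradient step is not shown in the listing; tf.GradientTape is the usual TF 2 way
        return tape.gradient(self.loss, self.model.trainable_variables)

    def update_gradient(self, gradient_holders):
        # assumption: the body is not shown in the post; Adam is one plausible optimizer choice
        if self.optimizer is None:
            self.optimizer = keras.optimizers.Adam(learning_rate=self.lr)
        self.optimizer.apply_gradients(zip(gradient_holders, self.model.trainable_variables))

    def get_variable(self):
        return self.model.variables

    def calc_rewards(self, rewards):
        # discounted return: G_t = r_t + gamma * G_{t+1}, computed backwards in time
        discounted_r = np.zeros_like(rewards)
        running_add = 0
        for t in reversed(range(0, rewards.size)):
            running_add = running_add * self.gamma + rewards[t]
            discounted_r[t] = running_add
        return discounted_r

    def save_model(self):
        self.model.save_weights('cartpole_chkpt/weights.chkpt')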

ENV_SEED = 1024  ## Reproducibility of the game
NP_SEED = 1024  ## Reproducibility of numpy random

env = gym.make('CartPole-v0')
env = env.unwrapped    # use unwrapped version, otherwise episodes will terminate after 200 steps
env.seed(ENV_SEED)
np.random.seed(NP_SEED)

### The Discrete space allows a fixed range of non-negative numbers, so in this case valid actions are either 0 or 1.
print(env.action_space)
### The Box space represents an n-dimensional box, so valid observations will be an array of 4 numbers.
print(env.observation_space)
### We can also check the Box’s bounds:
print(env.observation_space.high)
print(env.observation_space.low)

update_step = 5   # number of episodes for updating the network's gradient
limit_train = 1000  # training episode limit for stopping

# action_dim = 2: left or right
# state_dim = 4: x-position, x-velocity, angle, angular-velocity
agent = PolicyGradientNet(learn_rate=0.01,  action_dim=2,   state_dim=4,  hidden_dim=8)
agent.create_model()
# total reward
total_reward = []

# assumption: theta_limit is not defined in the post; CartPole's own angle threshold is used here
theta_limit = env.theta_threshold_radians
grad_buffer = None  # gradients accumulated since the last update

i = 0  # episode counter
max_step = 0
while i < limit_train:
    step = 0
    state_current = env.reset()

    episode_reward = 0
    history_data = []
    while True:

        env.render()  ## refresh the visual result
        step += 1
        #a = np.random.choice([0, 1], p=[0.5, 0.5])

        action_pdf = agent.model(state_current.reshape(1, 4))

        # flatten to 1-D float64 and renormalize so np.random.choice accepts it
        action_pdf = np.array(action_pdf, dtype=np.float64).flatten()
        action_pdf /= action_pdf.sum()

        # sample an action index according to action_pdf
        a = np.random.choice(len(action_pdf), p=action_pdf)

        ## env.step() shall return: observation(object), reward(float), done(boolean), info(dict)
        state_obs, reward, done, info = env.step(a)

        x, x_prime, theta, theta_prime = state_obs

        # my heuristic for reward
        rwd = 0.2*np.exp(-1 * abs(x_prime))  # for limiting x-velocity
        rwd += 0.5*(1.0 - abs(theta)/theta_limit)  # for limiting angle about vertical axis

        history_data.append([state_current, a, rwd, state_obs])
        episode_reward += rwd
        state_current = state_obs

        if done:  # done being True indicates the episode has terminated.

            # rows mix arrays and scalars, so an object array is required
            history_data = np.array(history_data, dtype=object)
            history_data[:, 2] = agent.calc_rewards(history_data[:, 2])

            feed_data = {
                "state_input": np.vstack(history_data[:, 0]).astype(np.float32),
                "action_holder": history_data[:, 1].astype(np.int32),
                "reward_holder": history_data[:, 2].astype(np.float32)
            }

            # assumption: this step is not shown in the post; accumulate this episode's gradients
            grads = agent.calc_grad(**feed_data)
            grad_buffer = grads if grad_buffer is None else [b + g for b, g in zip(grad_buffer, grads)]

            if i % update_step == 0 and i != 0:
                # Apply the calculated gradients for updating model
                agent.update_gradient(grad_buffer)
                grad_buffer = None

            total_reward.append(episode_reward)

            if max_step < step:
                max_step = step
            #print("Step: ", step)
            break

    if i % 50 == 0:
        print("Max step is {} until episode {}  ".format(max_step, i))
        print("Average reward for episodes {} - {} : {}".format(max(0, i - 49), i, np.mean(total_reward[-50:])))
    i += 1
``````
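The `calc_rewards()` method turns the per-step rewards of an episode into discounted returns, so that each step is credited with the rewards that came after it. The usual recurrence is `G_t = r_t + gamma * G_{t+1}`, computed backwards from the last step. A stand-alone sketch, where the function name `discounted_returns` and the numbers are made up for illustration:

```python
def discounted_returns(rewards, gamma):
    """Return G_t = r_t + gamma * G_{t+1} for every step t."""
    discounted = [0.0] * len(rewards)
    running = 0.0
    for t in reversed(range(len(rewards))):
        running = running * gamma + rewards[t]
        discounted[t] = running
    return discounted

print(discounted_returns([1.0, 1.0, 1.0], 0.5))  # [1.75, 1.5, 1.0]
```

Early steps get the largest values because more future reward follows them; with the default `discount_ratio=0.99` the effect is the same but much flatter.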

Below is a short training video showing an intermediate result (the final `max_step` exceeds 10K):