import gym
import random
import utils
import numpy as np
from collections import deque
from keras.layers import Dense
from keras.optimizers import Adam
from keras.models import Sequential
from matplotlib import pyplot as plt


class DoubleDQNAgent:
    """Double DQN agent: an online network selects actions, a periodically
    synced target network evaluates them."""

    def __init__(self, state_size, action_size):
        self.render = False
        self.load_model = False
        self.state_size = state_size
        self.action_size = action_size

        # Hyperparameters
        self.discount_factor = 0.99
        self.learning_rate = 0.001
        self.epsilon = 1.0
        self.epsilon_decay = 0.999
        self.epsilon_min = 0.01
        self.batch_size = 64
        self.train_start = 1000

        # Replay memory
        self.memory = deque(maxlen=2000)

        # Online and target networks
        self.model = self.build_model()
        self.target_model = self.build_model()
        self.update_target_model()

    def build_model(self):
        # Simple two-hidden-layer MLP that maps a state to Q-values per action.
        model = Sequential()
        model.add(Dense(24, input_dim=self.state_size, activation='relu',
                        kernel_initializer='he_uniform'))
        model.add(Dense(24, activation='relu',
                        kernel_initializer='he_uniform'))
        model.add(Dense(self.action_size, activation='linear',
                        kernel_initializer='he_uniform'))
        model.compile(loss='mse', optimizer=Adam(lr=self.learning_rate))
        return model

    def update_target_model(self):
        # Copy the online network's weights into the target network.
        self.target_model.set_weights(self.model.get_weights())

    def get_action(self, state):
        # Epsilon-greedy action selection.
        if np.random.rand() <= self.epsilon:
            return random.randrange(self.action_size)
        q_value = self.model.predict(state)
        return np.argmax(q_value[0])

    def append_sample(self, state, action, reward, next_state, done):
        # Store the transition and decay epsilon.
        self.memory.append((state, action, reward, next_state, done))
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

    def train_model(self):
        if len(self.memory) < self.train_start:
            return
        batch_size = min(self.batch_size, len(self.memory))
        mini_batch = random.sample(self.memory, batch_size)

        update_input = np.zeros((batch_size, self.state_size))
        update_target = np.zeros((batch_size, self.state_size))
        action, reward, done = [], [], []

        for i in range(batch_size):
            update_input[i] = mini_batch[i][0]
            action.append(mini_batch[i][1])
            reward.append(mini_batch[i][2])
            update_target[i] = mini_batch[i][3]
            done.append(mini_batch[i][4])

        target = self.model.predict(update_input)
        target_next = self.model.predict(update_target)
        target_val = self.target_model.predict(update_target)

        for i in range(batch_size):
            if done[i]:
                target[i][action[i]] = reward[i]
            else:
                # Double DQN update: the online network picks the best next
                # action, the target network supplies its value.
                a = np.argmax(target_next[i])
                target[i][action[i]] = reward[i] + self.discount_factor * (
                    target_val[i][a])

        self.model.fit(update_input, target, batch_size=batch_size,
                       epochs=1, verbose=0)


def run_DDQN():
    episodes = 500
    seed = 1
    results = []
    game = 'CartPole-v0'
    env = gym.make(game)
    state_size = env.observation_space.shape[0]
    action_size = env.action_space.n
    agent = DoubleDQNAgent(state_size, action_size)

    for e in range(episodes):
        done = False
        score = 0
        state = env.reset()
        state = np.reshape(state, [1, state_size])

        while not done:
            action = agent.get_action(state)
            next_state, reward, done, info = env.step(action)
            next_state = np.reshape(next_state, [1, state_size])

            agent.append_sample(state, action, reward, next_state, done)
            agent.train_model()
            score += reward
            state = next_state

            if done:
                # Sync the target network at the end of each episode.
                agent.update_target_model()
                results.append(score)

    utils.save_trained_model(game, seed, 'DDQN', agent.model)
    plt.plot(results)
    plt.show()


if __name__ == '__main__':
    run_DDQN()