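"""Fill and pickle a replay buffer for CartPole-v0 using a pre-trained agent.

A trained model (DQN, DDQN, DQV, or A2C) is loaded from WEIGHTS_PATH, played
until 10000 transitions have been collected, and the resulting memory is
written to BUFFER_PATH via pickle.
"""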
import os
import pickle

from collections import deque

import gym
import numpy as np

from keras.layers import Dense
from keras.models import Sequential

WEIGHTS_PATH = './trained_models/CartPole-v0/1/'
BUFFER_PATH = './buffers/CartPole-v0/1/'


class Agent:
    """Holds a pre-trained network and a replay memory for one algorithm."""

    def __init__(self, algorithm, state_size, action_size):
        self.algorithm = algorithm
        self.render = False
        self.state_size = state_size
        self.action_size = action_size
        # The memory must be at least as large as the max_len target in
        # fill_buffer(); otherwise the buffer can never fill up and the
        # collection loop never exits.
        self.memory = deque(maxlen=10000)

        # Value-based algorithms (DQN, DDQN, DQV) use the Q-network;
        # anything else (A2C) uses the softmax actor.
        if self.algorithm in ['DQN', 'DDQN', 'DQV']:
            self.model = self.build_model()
        else:
            self.model = self.build_actor()
        self.model.load_weights(os.path.join(WEIGHTS_PATH, self.algorithm, 'trained_model.h5'))

    def build_actor(self):
        """Policy network used by A2C: a softmax distribution over actions."""
        actor = Sequential()
        actor.add(Dense(24, input_dim=self.state_size, activation='relu',
                        kernel_initializer='he_uniform'))
        actor.add(Dense(self.action_size, activation='softmax',
                        kernel_initializer='he_uniform'))
        return actor

    def build_model(self):
        """Q-network used by DQN/DDQN/DQV: one linear Q-value per action.

        The networks are only used for inference in this script, so they
        are never compiled with an optimizer.
        """
        model = Sequential()
        model.add(Dense(24, input_dim=self.state_size, activation='relu',
                        kernel_initializer='he_uniform'))
        model.add(Dense(24, activation='relu',
                        kernel_initializer='he_uniform'))
        model.add(Dense(self.action_size, activation='linear',
                        kernel_initializer='he_uniform'))
        return model

    def get_action(self, state):
        """Sample an action from the policy (A2C) or act greedily on Q-values."""
        if self.algorithm == 'A2C':
            policy = self.model.predict(state, batch_size=1).flatten()
            return np.random.choice(self.action_size, 1, p=policy)[0]
        else:
            q_value = self.model.predict(state)
            return np.argmax(q_value[0])

    def append_sample(self, state, action, reward, next_state, done):
        """Store one (s, a, r, s', done) transition in the replay memory."""
        self.memory.append((state, action, reward, next_state, done))

    def save_buffer(self):
        """Pickle the collected memory to BUFFER_PATH/<algorithm>/memory_buffer.p."""
        buffer_dir = os.path.join(BUFFER_PATH, self.algorithm)
        if not os.path.exists(buffer_dir):
            os.makedirs(buffer_dir)

        with open(os.path.join(buffer_dir, 'memory_buffer.p'), 'wb') as filehandler:
            pickle.dump(self.memory, filehandler)
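
# Example: a saved buffer can later be restored, e.g. for offline training.
# The 'DQN' subdirectory below is illustrative; any algorithm saved above
# works the same way.
#
#   with open(os.path.join(BUFFER_PATH, 'DQN', 'memory_buffer.p'), 'rb') as f:
#       memory = pickle.load(f)  # deque of (state, action, reward, next_state, done)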


def fill_buffer(algorithm):
    """Play CartPole-v0 with a pre-trained agent until max_len transitions are stored."""
    max_len = 10000
    game = 'CartPole-v0'

    env = gym.make(game)

    state_size = env.observation_space.shape[0]
    action_size = env.action_space.n

    agent = Agent(algorithm, state_size, action_size)

    while True:
        done = False
        score = 0
        state = env.reset()
        state = np.reshape(state, [1, state_size])

        while not done:
            action = agent.get_action(state)
            next_state, reward, done, info = env.step(action)
            next_state = np.reshape(next_state, [1, state_size])

            agent.append_sample(state, action, reward, next_state, done)

            score += reward
            state = next_state

        # Check after each finished episode; '>=' (not '>') is required because
        # the deque's maxlen equals max_len, so its length can never exceed it.
        if len(agent.memory) >= max_len:
            agent.save_buffer()
            break


if __name__ == '__main__':
    fill_buffer('DQN')
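    # Given matching weights under WEIGHTS_PATH, the other algorithms can be
    # collected the same way: fill_buffer('DDQN'), fill_buffer('DQV'),
    # fill_buffer('A2C').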