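"""
Upside-Down Reinforcement Learning (UDRL) agent for discrete-action Gymnasium
environments. The behaviour function maps (observation, command) pairs to
actions, where the command is a scaled (desired return, desired horizon)
vector; it can be a Keras neural network or a scikit-learn classifier
(random forest, extra trees, k-NN, AdaBoost).
"""
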
import os
import math
import time
import gymnasium as gym
import random
import utils
import keras
import numpy as np

from collections import deque
from matplotlib import pyplot as plt
from sklearn.preprocessing import OneHotEncoder
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import (
    RandomForestClassifier,
    ExtraTreesClassifier,
    AdaBoostClassifier,
)
from sklearn.neighbors import KNeighborsClassifier
from sklearn.svm import SVC

from sklearn.exceptions import NotFittedError
from sklearn.ensemble import GradientBoostingClassifier
from tqdm import trange


class ReplayBuffer:
    """
    Episode-level replay buffer, kept sorted by total episode reward.

    Thank you: https://github.com/BY571/
    """

    def __init__(self, max_size):
        self.max_size = max_size
        self.buffer = []

    def add_sample(self, states, actions, rewards):
        episode = {
            "states": states,
            "actions": actions,
            "rewards": rewards,
            "summed_rewards": sum(rewards),
        }
        self.buffer.append(episode)

    def sort(self):
        # Sort episodes by total reward and keep only the best max_size ones.
        self.buffer = sorted(
            self.buffer, key=lambda i: i["summed_rewards"], reverse=True
        )
        self.buffer = self.buffer[: self.max_size]

    def get_random_samples(self, batch_size):
        self.sort()

        idxs = np.random.randint(0, len(self.buffer), batch_size)
        batch = [self.buffer[idx] for idx in idxs]

        return batch

    def get_n_best(self, n):
        self.sort()
        return self.buffer[:n]

    def __len__(self):
        return len(self.buffer)


class UpsideDownAgent:
    def __init__(self, environment, approximator):
        print(environment)
        self.environment = gym.make(environment)
        self.approximator = approximator
        self.state_size = self.environment.observation_space.shape[0]
        self.action_size = self.environment.action_space.n
        self.warm_up_episodes = 50
        self.render = False
        self.memory = ReplayBuffer(700)
        self.last_few = 75
        self.batch_size = 32
        self.command_size = 2  # (desired return, desired horizon)
        self.desired_return = 1
        self.desired_horizon = 1
        self.horizon_scale = 0.02
        self.return_scale = 0.02
        self.testing_state = 0

        if approximator == "neural_network":
            self.behaviour_function = utils.get_functional_behaviour_function(
                self.state_size, self.command_size, self.action_size
            )
        elif approximator == "forest":
            self.behaviour_function = RandomForestClassifier(n_estimators=200)
        elif approximator == "extra-trees":
            self.behaviour_function = ExtraTreesClassifier()
        elif approximator == "knn":
            self.behaviour_function = KNeighborsClassifier()
        elif approximator == "adaboost":
            self.behaviour_function = AdaBoostClassifier()

        self.testing_rewards = []
        self.warm_up_buffer()

    def warm_up_buffer(self):
        # Fill the replay buffer with episodes collected before any training.
        for i in range(self.warm_up_episodes):
            state, _ = self.environment.reset()
            states = []
            rewards = []
            actions = []
            done = False
            desired_return = 1
            desired_horizon = 1

            while not done:
                state = np.reshape(state, [1, self.state_size])
                states.append(state)

                observation = state

                command = np.asarray(
                    [
                        desired_return * self.return_scale,
                        desired_horizon * self.horizon_scale,
                    ]
                )
                command = np.reshape(command, [1, len(command)])

                action = self.get_action(observation, command)
                actions.append(action)

                next_state, reward, terminated, truncated, info = self.environment.step(
                    action
                )
                done = terminated or truncated
                next_state = np.reshape(next_state, [1, self.state_size])

                rewards.append(reward)

                state = next_state

                # Update the command: subtract the obtained reward from the
                # desired return and decrement the remaining horizon.
                desired_return -= reward
                desired_horizon -= 1
                desired_horizon = np.maximum(desired_horizon, 1)

            self.memory.add_sample(states, actions, rewards)

    def get_action(self, observation, command):
        """
        Sample an action from the action distribution modelled by the
        behaviour function, conditioned on the observation and the command.
        """
        if self.approximator == "neural_network":
            action_probs = self.behaviour_function.predict([observation, command])
            action = np.random.choice(np.arange(0, self.action_size), p=action_probs[0])

            return action

        elif self.approximator in ["forest", "extra-trees", "knn", "svm", "adaboost"]:
            try:
                input_state = np.concatenate((observation, command), axis=1)
                action = self.behaviour_function.predict(input_state)

                # Exploration: with probability 0.2 flip the predicted
                # (binary) action.
                if np.random.rand() > 0.8:
                    return int(not np.argmax(action))

                return np.argmax(action)

            except NotFittedError:
                # The classifier has not been fitted yet (warm-up phase):
                # fall back to a random action.
                return random.randint(0, 1)

    def get_greedy_action(self, observation, command):
        if self.approximator == "neural_network":
            action_probs = self.behaviour_function.predict([observation, command])
            action = np.argmax(action_probs)

            return action

        else:
            input_state = np.concatenate((observation, command), axis=1)
            action = self.behaviour_function.predict(input_state)

            self.testing_state += 1

            # Per-state feature importances, measured as the impurity decrease
            # along each tree's decision path (assumes a tree ensemble
            # exposing `estimators_`, e.g. a random forest or extra trees).
            feature_importances = {}

            for t in self.behaviour_function.estimators_:
                branch = t.decision_path(input_state).todense()
                branch = np.array(branch, dtype=bool)
                imp = t.tree_.impurity[branch[0]]
                for f, i in zip(t.tree_.feature[branch[0]][:-1], imp[:-1] - imp[1:]):
                    feature_importances.setdefault(f, []).append(i)

            summed_importances = [
                sum(feature_importances.get(k, [0.0]))
                for k in range(self.state_size + self.command_size)
            ]

            x = np.arange(len(summed_importances))

            plt.figure()
            plt.title("CartPole-v0")
            plt.bar(x, summed_importances)
            plt.xticks(
                x,
                [
                    "feature-1",
                    "feature-2",
                    "feature-3",
                    "feature-4",
                    r"$d_t^{r}$",
                    r"$d_t^{h}$",
                ],
            )
            plt.savefig("importances_state_" + str(self.testing_state) + ".jpg")
            plt.close()

            return np.argmax(action)

    def train_behaviour_function(self):
        random_episodes = self.memory.get_random_samples(self.batch_size)

        training_observations = np.zeros((self.batch_size, self.state_size))
        training_commands = np.zeros((self.batch_size, 2))

        y = []

        for idx, episode in enumerate(random_episodes):
            # Pick a start step t1 and a later step t2; the training command is
            # the return actually achieved in [t1, t2) and the horizon t2 - t1,
            # and the target is the action taken at t1.
            T = len(episode["states"])
            t1 = np.random.randint(0, T - 1)
            t2 = np.random.randint(t1 + 1, T)

            state = episode["states"][t1]
            desired_return = sum(episode["rewards"][t1:t2])
            desired_horizon = t2 - t1

            target = episode["actions"][t1]

            training_observations[idx] = state[0]
            training_commands[idx] = np.asarray(
                [
                    desired_return * self.return_scale,
                    desired_horizon * self.horizon_scale,
                ]
            )
            y.append(target)

        _y = keras.utils.to_categorical(y, num_classes=self.action_size)

        if self.approximator == "neural_network":
            self.behaviour_function.fit(
                [training_observations, training_commands], _y, verbose=0
            )

        elif self.approximator in ["forest", "extra-trees", "adaboost"]:
            input_classifier = np.concatenate(
                (training_observations, training_commands), axis=1
            )
            self.behaviour_function.fit(input_classifier, _y)

    def sample_exploratory_commands(self):
        # The exploratory horizon is the mean length of the best stored
        # episodes; the exploratory return is drawn uniformly between their
        # mean return and one standard deviation above it.
        best_episodes = self.memory.get_n_best(self.last_few)
        exploratory_desired_horizon = np.mean([len(i["states"]) for i in best_episodes])

        returns = [i["summed_rewards"] for i in best_episodes]
        exploratory_desired_returns = np.random.uniform(
            np.mean(returns), np.mean(returns) + np.std(returns)
        )

        return [exploratory_desired_returns, exploratory_desired_horizon]

    def generate_episode(
        self, environment, e, desired_return, desired_horizon, testing
    ):
        env = gym.make(environment)
        done = False
        score = 0

        state, _ = env.reset()

        states = []
        actions = []
        rewards = []

        while not done:
            state = np.reshape(state, [1, self.state_size])
            states.append(state)

            observation = state

            command = np.asarray(
                [
                    desired_return * self.return_scale,
                    desired_horizon * self.horizon_scale,
                ]
            )
            command = np.reshape(command, [1, len(command)])

            if not testing:
                action = self.get_action(observation, command)
                actions.append(action)
            else:
                action = self.get_greedy_action(observation, command)

            next_state, reward, terminated, truncated, info = env.step(action)
            done = terminated or truncated
            next_state = np.reshape(next_state, [1, self.state_size])

            rewards.append(reward)
            score += reward

            state = next_state

            desired_return -= reward
            desired_horizon -= 1
            desired_horizon = np.maximum(desired_horizon, 1)

        self.memory.add_sample(states, actions, rewards)

        self.testing_rewards.append(score)

        if testing:
            print("Querying the model ...")
            print("Testing score: {}".format(score))

        return score


def run_experiment():
    import argparse

    parser = argparse.ArgumentParser()

    parser.add_argument("--approximator", type=str, default="forest")
    parser.add_argument("--environment", type=str, default="CartPole-v0")
    parser.add_argument("--seed", type=int, default=42)

    args = parser.parse_args()

    approximator = args.approximator
    environment = args.environment
    seed = args.seed
    print(args)

    episodes = 500
    returns = []

    agent = UpsideDownAgent(environment, approximator)
    epi_bar = trange(episodes)
    for e in epi_bar:
        # Alternate between fitting the behaviour function on the replay
        # buffer and collecting fresh episodes with exploratory commands.
        for i in range(100):
            agent.train_behaviour_function()

        tmp_r = []
        for i in range(15):
            exploratory_commands = agent.sample_exploratory_commands()
            desired_return = exploratory_commands[0]
            desired_horizon = exploratory_commands[1]
            r = agent.generate_episode(
                environment, e, desired_return, desired_horizon, False
            )
            tmp_r.append(r)

        epi_bar.set_postfix(
            {
                "mean": np.mean(tmp_r),
                "std": np.std(tmp_r),
            }
        )

        returns.append(np.mean(tmp_r))

    exploratory_commands = agent.sample_exploratory_commands()

    # Final greedy evaluation with a fixed command of (200, 200).
    agent.generate_episode(environment, 1, 200, 200, True)

    utils.save_results(environment, approximator, seed, returns)

    if approximator == "neural_network":
        utils.save_trained_model(environment, seed, agent.behaviour_function)


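# Example invocation (the script filename is illustrative):
#   python main.py --approximator forest --environment CartPole-v0 --seed 42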
if __name__ == "__main__":
    import warnings

    warnings.simplefilter("ignore", DeprecationWarning)
    run_experiment()