Spaces:
Running
on
Zero
Running
on
Zero
####################################################################### | |
# Name: env.py | |
# | |
# - Reads and processes training and test maps (E.g. DungeonMaps) | |
# - Processes rewards, new frontiers given action | |
# - Updates a graph representation of environment for input into network | |
####################################################################### | |
import sys | |
import cv2 | |
from matplotlib.colors import LogNorm, PowerNorm | |
if sys.modules['TRAINING']: | |
from parameter import * | |
else: | |
from test_parameter import * | |
import copy | |
import pandas as pd | |
import rasterio | |
from skimage import io | |
import matplotlib.pyplot as plt | |
import os | |
from skimage.measure import block_reduce | |
from sensor import * | |
from graph_generator import * | |
from node import * | |
from scipy.ndimage import label, find_objects | |
import matplotlib.image as mpimg | |
from matplotlib.colors import Normalize | |
# import matplotlib | |
# matplotlib.use("Agg") # <-- key line to avoid tkinter dependency | |
class Env(): | |
def __init__(self, map_index, n_agent, k_size=20, plot=False, test=False, mask_index=None): | |
self.n_agent = n_agent | |
self.test = test | |
self.map_dir = GRIDMAP_SET_DIR | |
# Import environment gridmap | |
self.map_list = os.listdir(self.map_dir) | |
self.map_list.sort(reverse=True) | |
# NEW: Import segmentation utility map | |
self.seg_dir = MASK_SET_DIR | |
self.segmentation_mask, self.target_positions, self.target_found_idxs = None, [], [] | |
self.segmentation_mask_list = os.listdir(self.seg_dir) | |
self.segmentation_mask_list.sort(reverse=True) | |
# Import target maps (if relevant) | |
if TARGETS_SET_DIR != "": | |
self.targets_map_list = os.listdir(TARGETS_SET_DIR) | |
self.targets_map_list.sort(reverse=True) | |
# # NEW: Find common files in both directories | |
# if TARGETS_SET_DIR != "": | |
# common_files = [file for file in self.map_list if file in self.segmentation_mask_list and file in self.targets_map_list] | |
# else: | |
# common_files = [file for file in self.map_list if file in self.segmentation_mask_list] | |
self.map_index = map_index % len(self.map_list) | |
if mask_index is not None: | |
self.mask_index = mask_index % len(self.segmentation_mask_list) | |
else: | |
self.mask_index = map_index % len(self.segmentation_mask_list) | |
# self.common_map_file = common_files[self.map_index] | |
# print("self.common_map_file: ", self.common_map_file) | |
# Import ground truth and segmentation mask | |
self.ground_truth, self.map_start_position = self.import_ground_truth( | |
os.path.join(self.map_dir, self.map_list[self.map_index]))# self.common_map_file)) | |
self.ground_truth_size = np.shape(self.ground_truth) # (480, 640) | |
self.robot_belief = np.ones(self.ground_truth_size) * 127 # unexplored 127 | |
self.downsampled_belief = None | |
self.old_robot_belief = copy.deepcopy(self.robot_belief) | |
self.coverage_belief = np.ones(self.ground_truth_size) * 127 # unexplored 127 | |
# Import segmentation mask | |
mask_filename = self.segmentation_mask_list[self.mask_index] | |
self.segmentation_mask = self.import_segmentation_mask( | |
os.path.join(self.seg_dir, mask_filename))# self.common_map_file)) | |
# print("mask_filename: ", mask_filename) | |
# Overwrite target positions if directory specified | |
self.gt_segmentation_mask = None | |
if self.test and TARGETS_SET_DIR != "": | |
self.gt_segmentation_mask = self.import_segmentation_mask( | |
os.path.join(TARGETS_SET_DIR, self.map_list[self.map_index])) # UNUSED - self.common_map_file)) | |
# print("target_positions: ", self.target_positions) | |
# print("np.unique(self.segmentation_mask): ", np.unique(self.segmentation_mask)) | |
self.segmentation_info_mask = None | |
self.gt_segmentation_info_mask = None | |
self.segmentation_info_mask_unnormalized = None | |
self.filtered_seg_info_mask = None | |
self.num_targets_found = 0 | |
self.num_new_targets_found = 0 | |
# # Link score masks to raw image files | |
# csv_file = pd.read_csv(RAW_IMG_PATH_DICT, header=None) | |
# img_score_paths = csv_file.iloc[:,2].tolist() | |
# raw_img_paths = csv_file.iloc[:,0].tolist() | |
# self.score_to_img_dict = {os.path.basename(img_score_path): raw_img_path for img_score_path, raw_img_path in zip(img_score_paths, raw_img_paths)} | |
self.resolution = 4 | |
self.sensor_range = SENSOR_RANGE | |
self.explored_rate = 0 | |
self.targets_found_rate = 0 | |
self.info_gain = 0 | |
self.total_info = 0 | |
self.graph_generator = Graph_generator(map_size=self.ground_truth_size, sensor_range=self.sensor_range, k_size=k_size, plot=plot) | |
self.node_coords, self.graph, self.node_utility, self.guidepost = None, None, None, None | |
self.frontiers = None | |
self.start_positions = [] | |
self.begin(self.map_start_position) | |
self.plot = plot | |
self.frame_files = [] | |
def find_index_from_coords(self, position): | |
index = np.argmin(np.linalg.norm(self.node_coords - position, axis=1)) | |
return index | |
def begin(self, start_position): | |
# self.robot_belief = self.update_robot_belief(robot_position, self.sensor_range, self.robot_belief, | |
# self.ground_truth) | |
self.robot_belief = self.ground_truth | |
self.downsampled_belief = block_reduce(self.robot_belief.copy(), block_size=(self.resolution, self.resolution), | |
func=np.min) | |
self.frontiers = self.find_frontier() | |
self.old_robot_belief = copy.deepcopy(self.robot_belief) | |
self.node_coords, self.graph, self.node_utility, self.guidepost = self.graph_generator.generate_graph( | |
self.robot_belief, self.frontiers) | |
# Find non-conflicting start positions | |
if FIX_START_POSITION: | |
coords_res_row = int(self.robot_belief.shape[0]/NUM_COORDS_HEIGHT) | |
coords_res_col = int(self.robot_belief.shape[1]/NUM_COORDS_WIDTH) | |
self.start_positions = [(int(self.robot_belief.shape[1]/2)-coords_res_col/2,int(self.robot_belief.shape[0]/2)-coords_res_row/2) for _ in range(self.n_agent)] # bottom-left corner | |
else: | |
nearby_coords = self.graph_generator.get_neighbors_grid_coords(start_position) | |
itr = 0 | |
for i in range(self.n_agent): | |
if i == 0 or len(nearby_coords) == 0: | |
self.start_positions.append(start_position) | |
else: | |
idx = min(itr, len(nearby_coords)-1) | |
self.start_positions.append(nearby_coords[idx]) | |
itr += 1 | |
for i in range(len(self.start_positions)): | |
self.start_positions[i] = self.node_coords[self.find_index_from_coords(self.start_positions[i])] | |
self.coverage_belief = self.update_robot_belief(self.start_positions[i], self.sensor_range, self.coverage_belief, | |
self.ground_truth) | |
for start_position in self.start_positions: | |
self.graph_generator.route_node.append(start_position) | |
# Info map from ground truth | |
rng_x = 0.5 * (self.ground_truth.shape[1] / NUM_COORDS_WIDTH) | |
rng_y = 0.5 * (self.ground_truth.shape[0] / NUM_COORDS_HEIGHT) | |
self.segmentation_info_mask = np.zeros((len(self.node_coords), 1)) | |
self.gt_segmentation_info_mask = np.zeros((len(self.node_coords), 1)) | |
for i, node_coord in enumerate(self.node_coords): | |
max_x = min(node_coord[0] + int(math.ceil(rng_x)), self.ground_truth.shape[1]) | |
min_x = max(node_coord[0] - int(math.ceil(rng_x)), 0) | |
max_y = min(node_coord[1] + int(math.ceil(rng_y)), self.ground_truth.shape[0]) | |
min_y = max(node_coord[1] - int(math.ceil(rng_y)), 0) | |
# if np.any(self.segmentation_mask[min_y:max_y, min_x:max_x] == 255): | |
# self.segmentation_info_mask[i] = 1.0 | |
# else: | |
# self.segmentation_info_mask[i] = 0.0 | |
# self.segmentation_info_mask[i] = np.mean(self.segmentation_mask[min_y:max_y, min_x:max_x]) | |
# self.segmentation_info_mask[i] = np.max(self.segmentation_mask[min_y:max_y, min_x:max_x]) | |
if TARGETS_SET_DIR == "": # If targets combined with segmentation mask | |
exclude = {208} # Exclude target positions | |
else: | |
exclude = {} | |
self.segmentation_info_mask[i] = max(x for x in self.segmentation_mask[min_y:max_y, min_x:max_x].flatten() if x not in exclude) / 100.0 | |
if self.gt_segmentation_mask is not None: | |
self.gt_segmentation_info_mask[i] = max(x for x in self.gt_segmentation_mask[min_y:max_y, min_x:max_x].flatten() if x not in exclude) / 100.0 | |
# print("np.unique(self.segmentation_info_mask): ", np.unique(self.segmentation_info_mask)) | |
self.filtered_seg_info_mask = copy.deepcopy(self.segmentation_info_mask) | |
# In case targets found at beginning... | |
done, num_targets_found = self.check_done() | |
self.num_targets_found = num_targets_found | |
def multi_robot_step(self, next_position_list, dist_list, travel_dist_list): | |
temp_frontiers = copy.deepcopy(self.frontiers) | |
reward_list = [] | |
for dist, robot_position in zip(dist_list, next_position_list): | |
self.graph_generator.route_node.append(robot_position) | |
next_node_index = self.find_index_from_coords(robot_position) | |
self.graph_generator.nodes_list[next_node_index].set_visited() | |
# self.robot_belief = self.update_robot_belief(robot_position, self.sensor_range, self.robot_belief, | |
# self.ground_truth) | |
self.coverage_belief = self.update_robot_belief(robot_position, self.sensor_range, self.coverage_belief, | |
self.ground_truth) | |
self.robot_belief = self.ground_truth | |
self.downsampled_belief = block_reduce(self.robot_belief.copy(), | |
block_size=(self.resolution, self.resolution), | |
func=np.min) | |
frontiers = self.find_frontier() | |
#num_observed_frontiers = self.calculate_num_observed_frontiers(temp_frontiers, frontiers) | |
#temp_frontiers = frontiers | |
num_observed_frontiers = self.node_utility[next_node_index] | |
# individual_reward = num_observed_frontiers / 50 - dist / 64 | |
individual_reward = -dist / 32 # 64 | |
info_gain_reward = 0 | |
robot_position_idx = self.find_index_from_coords(robot_position) | |
# if self.segmentation_info_mask[robot_position_idx] == 1.0 and self.guidepost[robot_position_idx] == 0.0: | |
# # print("High Info (Unvisited)") | |
# info_gain_reward = (HIGH_INFO_REWARD_RATIO * 1.5) | |
# elif self.segmentation_info_mask[robot_position_idx] == 0.0 and self.guidepost[robot_position_idx] == 0.0: | |
# # print("Low Info (Unvisited)") | |
# info_gain_reward = ((1-HIGH_INFO_REWARD_RATIO) * 1.5) | |
info_gain_reward = self.filtered_seg_info_mask[robot_position_idx][0] * 1.5 | |
if self.guidepost[robot_position_idx] == 0.0: | |
info_gain_reward += 0.2 | |
# print("info_gain_reward: ", info_gain_reward) | |
individual_reward += info_gain_reward | |
# print("dist / 64: ", dist / 64) | |
# print("info gain reward: ", info_gain_reward) | |
reward_list.append(individual_reward) | |
self.node_coords, self.graph, self.node_utility, self.guidepost = self.graph_generator.update_graph(self.robot_belief, self.old_robot_belief, frontiers, self.frontiers) | |
self.old_robot_belief = copy.deepcopy(self.robot_belief) | |
self.filtered_seg_info_mask = [info[0] if self.guidepost[i] == 0.0 else 0.0 for i, info in enumerate(self.segmentation_info_mask)] | |
self.filtered_seg_info_mask = np.expand_dims(np.array(self.filtered_seg_info_mask), axis=1) | |
num_observed_frontiers = self.calculate_num_observed_frontiers(self.frontiers, frontiers) | |
self.frontiers = frontiers | |
self.explored_rate = self.evaluate_exploration_rate() | |
done, num_targets_found = self.check_done() | |
self.num_new_targets_found = num_targets_found - self.num_targets_found | |
# #team_reward = sum(reward_list) / len(reward_list) | |
# # team_reward = num_observed_frontiers / 50 | |
# team_reward = self.num_new_targets_found * 5.0 | |
team_reward = 0.0 | |
# # print("target found reward: ", self.num_new_targets_found * 5.0) | |
self.num_targets_found = num_targets_found | |
self.targets_found_rate = self.evaluate_targets_found_rate() | |
self.info_gain, self.total_info = self.evaluate_info_gain() | |
if done: | |
# team_reward += np.sum(self.robot_belief == 255) / sum(travel_dist_list) | |
team_reward += 40 # 20 | |
for i in range(len(reward_list)): | |
reward_list[i] += team_reward | |
return reward_list, done | |
def import_ground_truth(self, map_index): | |
# occupied 1, free 255, unexplored 127 | |
try: | |
# ground_truth = (io.imread(map_index, 1) * 255).astype(int) | |
ground_truth = (io.imread(map_index, 1)).astype(int) | |
if np.all(ground_truth == 0): | |
ground_truth = (io.imread(map_index, 1) * 255).astype(int) | |
except: | |
new_map_index = self.map_dir + '/' + self.map_list[0] | |
ground_truth = (io.imread(new_map_index, 1)).astype(int) | |
print('could not read the map_path ({}), hence skipping it and using ({}).'.format(map_index, new_map_index)) | |
robot_location = np.nonzero(ground_truth == 208) | |
# print("robot_location: ", robot_location) | |
# print("np.array(robot_location)[1, 127]: ", np.array(robot_location)[1, 127]) | |
robot_location = np.array([np.array(robot_location)[1, 127], np.array(robot_location)[0, 127]]) | |
ground_truth = (ground_truth > 150) | |
ground_truth = ground_truth * 254 + 1 | |
return ground_truth, robot_location | |
def import_segmentation_mask(self, map_index): | |
# occupied 1, free 255, unexplored 127 | |
# mask = (io.imread(map_index, 1) * 255).astype(int) # NOTE: Cannot work well with seg mask self-generated | |
mask = cv2.imread(map_index).astype(int) | |
# print("np.unique(segmentation_mask): ", np.unique(mask)) | |
# NOTE: Could contain mutiple start positions | |
# target_position = np.nonzero(mask == 208) | |
# target_positions = self.find_target_locations(mask) | |
# target_position = np.array([np.array(target_position)[1, 127], np.array(target_position)[0, 127]]) | |
return mask #, target_positions | |
def find_target_locations(self, image_array, grey_value=208): | |
# Load the image | |
# image = Image.open(image_path) | |
# image_array = np.array(image) | |
# Identify pixels equal to the grey value | |
grey_pixels = np.where(image_array == grey_value) | |
# Create a binary array where grey pixels are marked as True | |
binary_array = np.zeros_like(image_array, dtype=bool) | |
binary_array[grey_pixels] = True | |
# Label connected components | |
labeled_array, num_features = label(binary_array) | |
# Find objects returns slices for each connected component | |
slices = find_objects(labeled_array) | |
# Calculate the center of each box | |
centers = [] | |
for slice in slices: | |
row_center = (slice[0].start + slice[0].stop - 1) // 2 | |
col_center = (slice[1].start + slice[1].stop - 1) // 2 | |
centers.append((col_center, row_center)) # (y,x) | |
return centers | |
def free_cells(self): | |
index = np.where(self.ground_truth == 255) | |
free = np.asarray([index[1], index[0]]).T | |
return free | |
def update_robot_belief(self, robot_position, sensor_range, robot_belief, ground_truth): | |
robot_belief = sensor_work(robot_position, sensor_range, robot_belief, ground_truth) | |
return robot_belief | |
def check_done(self, robot_id=0): | |
""" All agnets to have explored most of the env map """ | |
done = False | |
# for idx in range(self.n_agent): | |
# if np.sum(self.ground_truth == 255) - np.sum(self.all_robot_belief[idx][idx] == 255) > 40: | |
# done = False | |
# NEW: ADDITIONAL VLM SEARCH CRITERIA | |
num_targets_found = 0 | |
self.target_found_idxs = [] | |
for i, target in enumerate(self.target_positions): | |
if self.coverage_belief[target[1], target[0]] == 255: # 255: | |
num_targets_found += 1 | |
self.target_found_idxs.append(i) | |
# free_cells_mask = self.all_robot_belief[robot_id][robot_id] == 255 | |
# filtered_segmentation_mask = np.where(free_cells_mask, self.segmentation_mask, 0) | |
# targets = self.find_target_locations(filtered_segmentation_mask) | |
# print("num_targets_found: ", num_targets_found) | |
if TERMINATE_ON_TGTS_FOUND and num_targets_found >= len(self.target_positions): | |
done = True | |
if not TERMINATE_ON_TGTS_FOUND and np.sum(self.coverage_belief == 255) / np.sum(self.ground_truth == 255) >= 0.99: | |
done = True | |
return done, num_targets_found | |
def calculate_num_observed_frontiers(self, old_frontiers, frontiers): | |
frontiers_to_check = frontiers[:, 0] + frontiers[:, 1] * 1j | |
pre_frontiers_to_check = old_frontiers[:, 0] + old_frontiers[:, 1] * 1j | |
frontiers_num = np.intersect1d(frontiers_to_check, pre_frontiers_to_check).shape[0] | |
pre_frontiers_num = pre_frontiers_to_check.shape[0] | |
delta_num = pre_frontiers_num - frontiers_num | |
return delta_num | |
def calculate_reward(self, dist, frontiers): | |
reward = 0 | |
reward -= dist / 64 | |
frontiers_to_check = frontiers[:, 0] + frontiers[:, 1] * 1j | |
pre_frontiers_to_check = self.frontiers[:, 0] + self.frontiers[:, 1] * 1j | |
frontiers_num = np.intersect1d(frontiers_to_check, pre_frontiers_to_check).shape[0] | |
pre_frontiers_num = pre_frontiers_to_check.shape[0] | |
delta_num = pre_frontiers_num - frontiers_num | |
reward += delta_num / 50 | |
return reward | |
def evaluate_exploration_rate(self): | |
# rate = np.sum(self.robot_belief == 255) / np.sum(self.ground_truth == 255) | |
rate = np.sum(self.coverage_belief == 255) / np.sum(self.ground_truth == 255) | |
return rate | |
def evaluate_targets_found_rate(self): | |
if len(self.target_positions) == 0: | |
return 0 | |
else: | |
rate = self.num_targets_found / len(self.target_positions) | |
return rate | |
def evaluate_info_gain(self): | |
# print("self.segmentation_mask.shape: ", self.segmentation_mask.shape) | |
# coverage_belief = (self.coverage_belief == 255) | |
# print("coverage_belief.shape: ", coverage_belief.shape) | |
# print("np.unique(coverage_belief): ", np.unique(coverage_belief)) | |
# print("np.count_nonzero(coverage_belief): ", np.count_nonzero(coverage_belief)) | |
# print("np.count_zero(coverage_belief): ", coverage_belief.size - np.count_nonzero(coverage_belief)) | |
# print("self.segmentation_mask[self.coverage_belief == 255].shape: ", self.segmentation_mask[self.coverage_belief == 255].shape) | |
if self.test and TARGETS_SET_DIR != "": | |
info_gained = np.sum(self.gt_segmentation_mask[self.coverage_belief == 255]) / 100.0 | |
total_info = np.sum(self.gt_segmentation_mask) / 100.0 | |
else: | |
info_gained = np.sum(self.segmentation_mask[self.coverage_belief == 255]) / 100.0 | |
total_info = np.sum(self.segmentation_mask) / 100.0 | |
return info_gained, total_info | |
def calculate_new_free_area(self): | |
old_free_area = self.old_robot_belief == 255 | |
current_free_area = self.robot_belief == 255 | |
new_free_area = (current_free_area.astype(np.int) - old_free_area.astype(np.int)) * 255 | |
return new_free_area, np.sum(old_free_area) | |
def calculate_dist_path(self, path): | |
dist = 0 | |
start = path[0] | |
end = path[-1] | |
for index in path: | |
if index == end: | |
break | |
dist += np.linalg.norm(self.node_coords[start] - self.node_coords[index]) | |
start = index | |
return dist | |
def find_frontier(self): | |
y_len = self.downsampled_belief.shape[0] | |
x_len = self.downsampled_belief.shape[1] | |
mapping = self.downsampled_belief.copy() | |
belief = self.downsampled_belief.copy() | |
# 0-1 unknown area map | |
mapping = (mapping == 127) * 1 | |
mapping = np.lib.pad(mapping, ((1, 1), (1, 1)), 'constant', constant_values=0) | |
fro_map = mapping[2:][:, 1:x_len + 1] + mapping[:y_len][:, 1:x_len + 1] + mapping[1:y_len + 1][:, 2:] + \ | |
mapping[1:y_len + 1][:, :x_len] + mapping[:y_len][:, 2:] + mapping[2:][:, :x_len] + mapping[2:][:, | |
2:] + \ | |
mapping[:y_len][:, :x_len] | |
ind_free = np.where(belief.ravel(order='F') == 255)[0] | |
ind_fron_1 = np.where(1 < fro_map.ravel(order='F'))[0] | |
ind_fron_2 = np.where(fro_map.ravel(order='F') < 8)[0] | |
ind_fron = np.intersect1d(ind_fron_1, ind_fron_2) | |
ind_to = np.intersect1d(ind_free, ind_fron) | |
map_x = x_len | |
map_y = y_len | |
x = np.linspace(0, map_x - 1, map_x) | |
y = np.linspace(0, map_y - 1, map_y) | |
t1, t2 = np.meshgrid(x, y) | |
points = np.vstack([t1.T.ravel(), t2.T.ravel()]).T | |
f = points[ind_to] | |
f = f.astype(int) | |
f = f * self.resolution | |
return f | |
def plot_env(self, n, path, step, travel_dist, robots_route, img_path_override=None, sat_path_override=None, msk_name_override=None, sound_id_override=None, colormap_mid_val=None): | |
# # TEMP | |
# if TAXABIND_TTA: | |
# # Save self.segmentation_info_mask as .npy file in gifs_path | |
# side_dim = int(np.sqrt(self.segmentation_info_mask.shape[0])) | |
# mask_viz = self.segmentation_info_mask.squeeze().reshape((side_dim, side_dim)).T | |
# np.save(os.path.join(path, f"seg_mask_step{step}.npy"), mask_viz) | |
plt.switch_backend('agg') | |
# plt.ion() | |
plt.cla() | |
color_list = ["r", "g", "c", "m", "y", "k"] | |
if TARGETS_SET_DIR == "" and not TAXABIND_TTA: | |
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(10, 5)) | |
else: | |
fig, ((ax1, ax2, ax3), (ax4, ax5, ax6)) = plt.subplots(2, 3, figsize=(12, 8)) | |
### Fig1: Environment ### | |
msk_name = "" | |
if TAXABIND_TTA: | |
image = mpimg.imread(sat_path_override) | |
msk_name = msk_name_override | |
# else: | |
# plt.imshow(self.robot_belief, cmap='gray') | |
# ax1.imshow(self.coverage_belief, cmap='gray') | |
# image = mpimg.imread("Maps/real_maps/real/4259_masked_img_0.jpg") | |
# msk_name = self.map_list[self.map_index] | |
# raw_img_path = self.score_to_img_dict[msk_name] | |
# if "flair" in raw_img_path: | |
# with rasterio.open(raw_img_path) as src_img: | |
# image = src_img.read([1,2,3]) | |
# image = np.transpose(image, (1, 2, 0)) | |
# else: | |
# image = mpimg.imread(raw_img_path) | |
### Fig1: Environment ### | |
ax = ax1 # if TAXABIND_TTA else ax1 | |
ax.imshow(image) | |
ax.axis((0, self.ground_truth_size[1], self.ground_truth_size[0], 0)) | |
ax.set_title("Image") | |
# if VIZ_GRAPH_EDGES: | |
# for i in range(len(self.graph_generator.x)): | |
# ax.plot(self.graph_generator.x[i], self.graph_generator.y[i], 'tan', zorder=1) | |
# # ax.scatter(self.node_coords[:, 0], self.node_coords[:, 1], c=self.node_utility, zorder=5) | |
# ax.scatter(self.node_coords[:, 0], self.node_coords[:, 1], c=self.segmentation_info_mask, zorder=5) | |
# ax.scatter(self.frontiers[:, 0], self.frontiers[:, 1], c='r', s=2, zorder=3) | |
for i, route in enumerate(robots_route): | |
robot_marker_color = color_list[i % len(color_list)] | |
xPoints = route[0] | |
yPoints = route[1] | |
ax.plot(xPoints, yPoints, c=robot_marker_color, linewidth=2) | |
# ax.plot(xPoints[-1], yPoints[-1], 'mo', markersize=8, zorder=10) | |
ax.plot(xPoints[-1], yPoints[-1], markersize=12, zorder=99, marker="^", ls="-", c=robot_marker_color, mec="black") | |
ax.plot(xPoints[0], yPoints[0], 'co', c=robot_marker_color, markersize=8, zorder=5) | |
# Sensor range | |
rng_x = 0.5 * (self.ground_truth.shape[1] / NUM_COORDS_WIDTH) | |
rng_y = 0.5 * (self.ground_truth.shape[0] / NUM_COORDS_HEIGHT) | |
max_x = min(xPoints[-1] + int(math.ceil(rng_x)), self.ground_truth.shape[1]) | |
min_x = max(xPoints[-1] - int(math.ceil(rng_x)), 0) | |
max_y = min(yPoints[-1] + int(math.ceil(rng_y)), self.ground_truth.shape[0]) | |
min_y = max(yPoints[-1] - int(math.ceil(rng_y)), 0) | |
ax.plot((min_x, min_x), (min_y, max_y), c=robot_marker_color, linewidth=1) | |
ax.plot((min_x, max_x), (max_y, max_y), c=robot_marker_color, linewidth=1) | |
ax.plot((max_x, max_x), (max_y, min_y), c=robot_marker_color, linewidth=1) | |
ax.plot((max_x, min_x), (min_y, min_y), c=robot_marker_color, linewidth=1) | |
### Fig2: Graph ### | |
ax = ax4 if TAXABIND_TTA else ax1 | |
# ax.imshow(image) | |
ax.imshow(self.coverage_belief, cmap='gray') | |
ax.axis((0, self.ground_truth_size[1], self.ground_truth_size[0], 0)) | |
ax.set_title("Information Graph") | |
if VIZ_GRAPH_EDGES: | |
for i in range(len(self.graph_generator.x)): | |
ax.plot(self.graph_generator.x[i], self.graph_generator.y[i], 'tan', zorder=1) | |
# ax.scatter(self.node_coords[:, 0], self.node_coords[:, 1], c=self.node_utility, zorder=5) | |
# ax.scatter(self.node_coords[:, 0], self.node_coords[:, 1], c=self.segmentation_info_mask, zorder=5) | |
# filtered_seg_info_mask = [info[0] if self.guidepost[i] == 0.0 else 0.0 for i, info in enumerate(self.segmentation_info_mask)] | |
ax.scatter(self.node_coords[:, 0], self.node_coords[:, 1], c=self.filtered_seg_info_mask, zorder=5, s=8) | |
# ax.scatter(self.frontiers[:, 0], self.frontiers[:, 1], c='r', s=2, zorder=3) | |
for i, route in enumerate(robots_route): | |
robot_marker_color = color_list[i % len(color_list)] | |
xPoints = route[0] | |
yPoints = route[1] | |
ax.plot(xPoints, yPoints, c=robot_marker_color, linewidth=2) | |
# ax.plot(xPoints[-1], yPoints[-1], 'mo', markersize=8, zorder=10) | |
ax.plot(xPoints[-1], yPoints[-1], markersize=12, zorder=99, marker="^", ls="-", c=robot_marker_color, mec="black") | |
ax.plot(xPoints[0], yPoints[0], 'co', c=robot_marker_color, markersize=8, zorder=5) | |
# Sensor range | |
rng_x = 0.5 * (self.ground_truth.shape[1] / NUM_COORDS_WIDTH) | |
rng_y = 0.5 * (self.ground_truth.shape[0] / NUM_COORDS_HEIGHT) | |
max_x = min(xPoints[-1] + int(math.ceil(rng_x)), self.ground_truth.shape[1]) | |
min_x = max(xPoints[-1] - int(math.ceil(rng_x)), 0) | |
max_y = min(yPoints[-1] + int(math.ceil(rng_y)), self.ground_truth.shape[0]) | |
min_y = max(yPoints[-1] - int(math.ceil(rng_y)), 0) | |
ax.plot((min_x, min_x), (min_y, max_y), c=robot_marker_color, linewidth=1) | |
ax.plot((min_x, max_x), (max_y, max_y), c=robot_marker_color, linewidth=1) | |
ax.plot((max_x, max_x), (max_y, min_y), c=robot_marker_color, linewidth=1) | |
ax.plot((max_x, min_x), (min_y, min_y), c=robot_marker_color, linewidth=1) | |
# Plot target positions | |
for target in self.target_positions: | |
if self.coverage_belief[target[1], target[0]] == 255: | |
# ax.plot(target[0], target[1], 'go', markersize=8, zorder=99) | |
ax.plot(target[0], target[1], color='g', marker='x', linestyle='-', markersize=12, markeredgewidth=4, zorder=99) | |
else: | |
# ax.plot(target[0], target[1], 'ro', markersize=8, zorder=99) | |
ax.plot(target[0], target[1], color='r', marker='x', linestyle='-', markersize=12, markeredgewidth=4, zorder=99) | |
# ax.pause(0.1) | |
### Fig3: Segmentation Mask ### | |
ax = ax5 if TAXABIND_TTA else ax2 | |
if TAXABIND_TTA and USE_CLIP_PREDS: | |
side_dim = int(np.sqrt(self.segmentation_info_mask.shape[0])) | |
mask_viz = self.segmentation_info_mask.squeeze().reshape((side_dim, side_dim)).T | |
scale_y = math.ceil(self.ground_truth_size[1] / side_dim) | |
scale_x = math.ceil(self.ground_truth_size[0] / side_dim) | |
upscaled_mask_viz = np.kron(mask_viz, np.ones((scale_y, scale_x))) # Integer scaling only | |
upscaled_mask_viz = upscaled_mask_viz[:self.ground_truth_size[1], :self.ground_truth_size[0]] | |
im = ax.imshow(upscaled_mask_viz, cmap="viridis") | |
ax.axis("off") | |
else: | |
im = ax.imshow(self.segmentation_mask.mean(axis=-1), cmap='viridis', vmin=0, vmax=100) # cmap='gray' | |
ax.axis((0, self.ground_truth_size[1], self.ground_truth_size[0], 0)) | |
ax.set_title(f"Predicted Mask (Normalized)") | |
for i, route in enumerate(robots_route): | |
robot_marker_color = color_list[i % len(color_list)] | |
xPoints = route[0] | |
yPoints = route[1] | |
ax.plot(xPoints, yPoints, c=robot_marker_color, linewidth=2) | |
# ax.plot(xPoints[-1], yPoints[-1], 'mo', markersize=8, zorder=10) | |
ax.plot(xPoints[-1], yPoints[-1], markersize=12, zorder=99, marker="^", ls="-", c=robot_marker_color, mec="black") | |
ax.plot(xPoints[0], yPoints[0], 'co', c=robot_marker_color, markersize=8, zorder=5) | |
# Sensor range | |
rng_x = 0.5 * (self.ground_truth.shape[1] / NUM_COORDS_WIDTH) | |
rng_y = 0.5 * (self.ground_truth.shape[0] / NUM_COORDS_HEIGHT) | |
max_x = min(xPoints[-1] + int(math.ceil(rng_x)), self.ground_truth.shape[1]) | |
min_x = max(xPoints[-1] - int(math.ceil(rng_x)), 0) | |
max_y = min(yPoints[-1] + int(math.ceil(rng_y)), self.ground_truth.shape[0]) | |
min_y = max(yPoints[-1] - int(math.ceil(rng_y)), 0) | |
ax.plot((min_x, min_x), (min_y, max_y), c=robot_marker_color, linewidth=1) | |
ax.plot((min_x, max_x), (max_y, max_y), c=robot_marker_color, linewidth=1) | |
ax.plot((max_x, max_x), (max_y, min_y), c=robot_marker_color, linewidth=1) | |
ax.plot((max_x, min_x), (min_y, min_y), c=robot_marker_color, linewidth=1) | |
# Add a colorbar | |
cbar = fig.colorbar(im, ax=ax, fraction=0.046, pad=0.04) | |
cbar.set_label("Normalized Probs") | |
# ax.pause(0.1) | |
### Fig4: Segmentation Mask ### | |
if TAXABIND_TTA and USE_CLIP_PREDS: | |
ax = ax6 | |
side_dim = int(np.sqrt(self.segmentation_info_mask_unnormalized.shape[0])) | |
mask_viz = self.segmentation_info_mask_unnormalized.squeeze().reshape((side_dim, side_dim)).T | |
scale_y = math.ceil(self.ground_truth_size[1] / side_dim) | |
scale_x = math.ceil(self.ground_truth_size[0] / side_dim) | |
upscaled_mask_viz = np.kron(mask_viz, np.ones((scale_y, scale_x))) # Integer scaling only | |
upscaled_mask_viz = upscaled_mask_viz[:self.ground_truth_size[1], :self.ground_truth_size[0]] | |
max_val = 0.15 # TO CHANGE | |
mid_val = colormap_mid_val if colormap_mid_val is not None else 0.05 | |
# mid_val = np.max(self.segmentation_info_mask_unnormalized) | |
norm = CustomNorm(vmin=0.0, vmax=max_val, mid=mid_val, lower_portion=0.8) | |
im = ax.imshow(upscaled_mask_viz, cmap="viridis", norm=norm) # norm=LogNorm(vmin=0.01, vmax=0.1)) | |
# norm = PowerNorm(gamma=0.25, vmin=0.01, vmax=0.2) | |
# norm=LogNorm(vmin=0.01, vmax=0.2) | |
im = ax.imshow(upscaled_mask_viz, cmap="viridis", norm=norm) # norm=LogNorm(vmin=0.01, vmax=0.1)) | |
ax.axis("off") | |
# else: | |
# im = ax.imshow(self.segmentation_mask.mean(axis=-1), cmap='viridis', vmin=0, vmax=100) # cmap='gray' | |
# ax.axis((0, self.ground_truth_size[1], self.ground_truth_size[0], 0)) | |
ax.set_title(f"Predicted Mask (Unnormalized)") | |
for i, route in enumerate(robots_route): | |
robot_marker_color = color_list[i % len(color_list)] | |
xPoints = route[0] | |
yPoints = route[1] | |
ax.plot(xPoints, yPoints, c=robot_marker_color, linewidth=2) | |
# ax.plot(xPoints[-1], yPoints[-1], 'mo', markersize=8, zorder=10) | |
ax.plot(xPoints[-1], yPoints[-1], markersize=12, zorder=99, marker="^", ls="-", c=robot_marker_color, mec="black") | |
ax.plot(xPoints[0], yPoints[0], 'co', c=robot_marker_color, markersize=8, zorder=5) | |
# Sensor range | |
rng_x = 0.5 * (self.ground_truth.shape[1] / NUM_COORDS_WIDTH) | |
rng_y = 0.5 * (self.ground_truth.shape[0] / NUM_COORDS_HEIGHT) | |
max_x = min(xPoints[-1] + int(math.ceil(rng_x)), self.ground_truth.shape[1]) | |
min_x = max(xPoints[-1] - int(math.ceil(rng_x)), 0) | |
max_y = min(yPoints[-1] + int(math.ceil(rng_y)), self.ground_truth.shape[0]) | |
min_y = max(yPoints[-1] - int(math.ceil(rng_y)), 0) | |
ax.plot((min_x, min_x), (min_y, max_y), c=robot_marker_color, linewidth=1) | |
ax.plot((min_x, max_x), (max_y, max_y), c=robot_marker_color, linewidth=1) | |
ax.plot((max_x, max_x), (max_y, min_y), c=robot_marker_color, linewidth=1) | |
ax.plot((max_x, min_x), (min_y, min_y), c=robot_marker_color, linewidth=1) | |
# Add a colorbar | |
cbar = fig.colorbar(im, ax=ax, fraction=0.046, pad=0.04) | |
if TAXABIND_TTA and USE_CLIP_PREDS: | |
cbar.set_ticks([0.0, mid_val, max_val]) | |
cbar.set_label("Probs (Scaled by expectation)") | |
# Fog5: GT Mask | |
if TARGETS_SET_DIR != "": | |
ax = ax2 | |
im = ax.imshow(self.gt_segmentation_mask.mean(axis=-1), cmap='viridis', vmin=0, vmax=100) # cmap='gray' | |
ax.axis((0, self.ground_truth_size[1], self.ground_truth_size[0], 0)) | |
ax.set_title(f"Ground Truth Mask") | |
for i, route in enumerate(robots_route): | |
robot_marker_color = color_list[i % len(color_list)] | |
xPoints = route[0] | |
yPoints = route[1] | |
ax.plot(xPoints, yPoints, c=robot_marker_color, linewidth=2) | |
# ax.plot(xPoints[-1], yPoints[-1], 'mo', markersize=8, zorder=10) | |
ax.plot(xPoints[-1], yPoints[-1], markersize=12, zorder=99, marker="^", ls="-", c=robot_marker_color, mec="black") | |
ax.plot(xPoints[0], yPoints[0], 'co', c=robot_marker_color, markersize=8, zorder=5) | |
# Sensor range | |
rng_x = 0.5 * (self.ground_truth.shape[1] / NUM_COORDS_WIDTH) | |
rng_y = 0.5 * (self.ground_truth.shape[0] / NUM_COORDS_HEIGHT) | |
max_x = min(xPoints[-1] + int(math.ceil(rng_x)), self.ground_truth.shape[1]) | |
min_x = max(xPoints[-1] - int(math.ceil(rng_x)), 0) | |
max_y = min(yPoints[-1] + int(math.ceil(rng_y)), self.ground_truth.shape[0]) | |
min_y = max(yPoints[-1] - int(math.ceil(rng_y)), 0) | |
ax.plot((min_x, min_x), (min_y, max_y), c=robot_marker_color, linewidth=1) | |
ax.plot((min_x, max_x), (max_y, max_y), c=robot_marker_color, linewidth=1) | |
ax.plot((max_x, max_x), (max_y, min_y), c=robot_marker_color, linewidth=1) | |
ax.plot((max_x, min_x), (min_y, min_y), c=robot_marker_color, linewidth=1) | |
# Add a colorbar | |
cbar = fig.colorbar(im, ax=ax, fraction=0.046, pad=0.04) | |
cbar.set_label("Normalized Mask Value") | |
# ax4.pause(0.1) | |
### Fig6: Segmentation Mask (GT) ### | |
if TAXABIND_TTA: | |
ax = ax3 | |
image = mpimg.imread(img_path_override) | |
ax.imshow(image) | |
ax.set_title("Ground Image") | |
ax.axis("off") | |
sound_id = sound_id_override if sound_id_override is not None else "-1" | |
plt.suptitle('Targets Found: {}/{} Coverage ratio: {:.4g} Travel Dist: {:.4g} Info Gain: {:.4g}% \n ({}) \n (Sound ID: {})'.format(self.num_targets_found, \ | |
len(self.target_positions), self.explored_rate, travel_dist, (100*self.info_gain/self.total_info), msk_name, | |
sound_id)) | |
plt.tight_layout() | |
plt.savefig('{}/{}_{}_samples.png'.format(path, n, step, dpi=100)) | |
# plt.show() | |
frame = '{}/{}_{}_samples.png'.format(path, n, step) | |
self.frame_files.append(frame) | |
plt.close() | |
#################### | |
# ADDED: For app.py | |
#################### | |
def plot_heatmap(self, save_dir, step, travel_dist, robots_route=None): | |
"""Plot only the segmentation heatmap and save it as ``{step}.png`` in | |
``save_dir``. This lightweight helper is meant for asynchronous | |
streaming in the Gradio demo when full `plot_env` is too heavy. | |
Parameters | |
---------- | |
save_dir : str | |
Directory to save the generated PNG file. | |
step : int | |
Current timestep; becomes the filename ``{step}.png``. | |
robots_route : list | None | |
Optional list of routes (xPoints, yPoints) to overlay. | |
Returns | |
------- | |
str | |
Full path to the generated PNG file. | |
""" | |
import os | |
plt.switch_backend('agg') | |
# Do not clear the global figure state in case it interferes with | |
# the current figure. Each call creates its own Figure object that | |
# we close explicitly at the end, so a global clear is unnecessary | |
# and may break concurrent drawing. | |
# plt.cla() | |
color_list = ["r", "g", "c", "m", "y", "k"] | |
fig, ax = plt.subplots(1, 1, figsize=(6, 6)) | |
# Select the mask to visualise | |
# if TAXABIND_TTA and USE_CLIP_PREDS: | |
side_dim = int(np.sqrt(self.segmentation_info_mask.shape[0])) | |
mask_viz = self.segmentation_info_mask.squeeze().reshape((side_dim, side_dim)).T | |
# Properly map image to pixel coordinates and keep limits fixed | |
H, W = self.ground_truth_size # rows (y), cols (x) | |
im = ax.imshow( | |
mask_viz, | |
cmap="viridis", | |
origin="upper", | |
extent=[0, W, H, 0], # x: 0..W, y: H..0 (origin at top-left) | |
interpolation="nearest", # keep cell edges sharp & aligned | |
zorder=0, | |
) | |
ax.set_xlim(0, W) | |
ax.set_ylim(H, 0) | |
ax.set_axis_off() # hide ticks but keep limits | |
# else: | |
# im = ax.imshow(self.segmentation_mask.mean(axis=-1), cmap='viridis', vmin=0, vmax=100) | |
# ax.axis((0, self.ground_truth_size[1], self.ground_truth_size[0], 0)) | |
# Optionally overlay robot paths | |
if robots_route is not None: | |
for i, route in enumerate(robots_route): | |
robot_marker_color = color_list[i % len(color_list)] | |
xPoints, yPoints = route | |
ax.plot(xPoints, yPoints, c=robot_marker_color, linewidth=2) | |
ax.plot(xPoints[-1], yPoints[-1], markersize=12, zorder=99, marker="^", ls="-", c=robot_marker_color, mec="black") | |
ax.plot(xPoints[0], yPoints[0], 'co', c=robot_marker_color, markersize=8, zorder=5) | |
# Plot target positions | |
for target in self.target_positions: | |
if self.coverage_belief[target[1], target[0]] == 255: | |
# ax.plot(target[0], target[1], 'go', markersize=8, zorder=99) | |
ax.plot(target[0], target[1], color='g', marker='x', linestyle='-', markersize=12, markeredgewidth=4, zorder=99) | |
else: | |
# ax.plot(target[0], target[1], 'ro', markersize=8, zorder=99) | |
ax.plot(target[0], target[1], color='r', marker='x', linestyle='-', markersize=12, markeredgewidth=4, zorder=99) | |
# Sensor range | |
rng_x = 0.5 * (self.ground_truth.shape[1] / NUM_COORDS_WIDTH) | |
rng_y = 0.5 * (self.ground_truth.shape[0] / NUM_COORDS_HEIGHT) | |
max_x = min(xPoints[-1] + int(math.ceil(rng_x)), self.ground_truth.shape[1]) | |
min_x = max(xPoints[-1] - int(math.ceil(rng_x)), 0) | |
max_y = min(yPoints[-1] + int(math.ceil(rng_y)), self.ground_truth.shape[0]) | |
min_y = max(yPoints[-1] - int(math.ceil(rng_y)), 0) | |
ax.plot((min_x, min_x), (min_y, max_y), c=robot_marker_color, linewidth=1) | |
ax.plot((min_x, max_x), (max_y, max_y), c=robot_marker_color, linewidth=1) | |
ax.plot((max_x, max_x), (max_y, min_y), c=robot_marker_color, linewidth=1) | |
ax.plot((max_x, min_x), (min_y, min_y), c=robot_marker_color, linewidth=1) | |
# Color bar | |
cbar = fig.colorbar(im, ax=ax, fraction=0.046, pad=0.04) | |
cbar.set_label("Normalized Probs") | |
# Change coverage to 1dp | |
plt.suptitle('Targets Found: {}/{} Coverage: {:.1f}% Steps: {}/{}'.format( | |
self.num_targets_found, \ | |
len(self.target_positions), | |
self.explored_rate*100, | |
step+1, | |
NUM_EPS_STEPS), | |
y=0.94, # Closer to plot | |
) | |
plt.tight_layout() | |
os.makedirs(save_dir, exist_ok=True) | |
out_path = os.path.join(save_dir, f"{step}.png") | |
# Save atomically: write to temp file then move into place so the poller never sees a partial file. | |
tmp_path = out_path + ".tmp" | |
fig.savefig(tmp_path, dpi=100, format='png') | |
os.replace(tmp_path, out_path) # atomic on same filesystem | |
plt.close(fig) | |
return out_path | |
#################### | |
class CustomNorm(Normalize): | |
""" | |
A custom normalization that allocates a larger fraction of the colormap | |
to the lower data range [vmin, mid] than to [mid, vmax]. | |
Parameters | |
---------- | |
vmin : float | |
Minimum data value | |
vmax : float | |
Maximum data value | |
mid : float | |
Midpoint in data where we switch from 'lower' to 'upper' mapping | |
lower_portion : float | |
Fraction of the colormap to allocate for [vmin, mid]. | |
For example, 0.8 => 80% of colors for [vmin, mid], 20% for [mid, vmax]. | |
clip : bool | |
Whether to clip data outside [vmin, vmax]. | |
""" | |
def __init__(self, vmin=None, vmax=None, mid=0.05, lower_portion=0.8, clip=False): | |
self.mid = mid | |
self.lower_portion = lower_portion | |
super().__init__(vmin, vmax, clip) | |
def __call__(self, value, clip=None): | |
"""Forward transform: data -> [0..1] color space.""" | |
vmin, vmax, mid = self.vmin, self.vmax, self.mid | |
lp = self.lower_portion | |
value = np.asarray(value, dtype=np.float64) | |
# Piecewise linear mapping: | |
# [vmin..mid] => [0..lp] | |
# [mid..vmax] => [lp..1] | |
normed = np.where( | |
value <= mid, | |
lp * (value - vmin) / (mid - vmin), | |
lp + (value - mid) / (vmax - mid) * (1 - lp) | |
) | |
return np.clip(normed, 0, 1) | |
def inverse(self, value): | |
""" | |
Inverse transform: [0..1] color space -> data space. | |
Matplotlib's colorbar calls this to place ticks correctly. | |
""" | |
vmin, vmax, mid = self.vmin, self.vmax, self.mid | |
lp = self.lower_portion | |
value = np.asarray(value, dtype=np.float64) | |
# For color space [0..lp], invert to [vmin..mid] | |
# For color space [lp..1], invert to [mid..vmax] | |
below = (value <= lp) | |
above = ~below | |
# Allocate array for results | |
data = np.zeros_like(value, dtype=np.float64) | |
# Invert lower segment | |
data[below] = vmin + (value[below] / lp) * (mid - vmin) | |
# Invert upper segment | |
data[above] = mid + ((value[above] - lp) / (1 - lp)) * (vmax - mid) | |
return data | |