search-tta-demo / env.py
derektan
Ensure perfect alignment between heatmap and robot / sensor footprint
14e5686
#######################################################################
# Name: env.py
#
# - Reads and processes training and test maps (E.g. DungeonMaps)
# - Processes rewards, new frontiers given action
# - Updates a graph representation of environment for input into network
#######################################################################
import sys
import cv2
from matplotlib.colors import LogNorm, PowerNorm
if sys.modules['TRAINING']:
from parameter import *
else:
from test_parameter import *
import copy
import pandas as pd
import rasterio
from skimage import io
import matplotlib.pyplot as plt
import os
from skimage.measure import block_reduce
from sensor import *
from graph_generator import *
from node import *
from scipy.ndimage import label, find_objects
import matplotlib.image as mpimg
from matplotlib.colors import Normalize
# import matplotlib
# matplotlib.use("Agg") # <-- key line to avoid tkinter dependency
class Env():
def __init__(self, map_index, n_agent, k_size=20, plot=False, test=False, mask_index=None):
self.n_agent = n_agent
self.test = test
self.map_dir = GRIDMAP_SET_DIR
# Import environment gridmap
self.map_list = os.listdir(self.map_dir)
self.map_list.sort(reverse=True)
# NEW: Import segmentation utility map
self.seg_dir = MASK_SET_DIR
self.segmentation_mask, self.target_positions, self.target_found_idxs = None, [], []
self.segmentation_mask_list = os.listdir(self.seg_dir)
self.segmentation_mask_list.sort(reverse=True)
# Import target maps (if relevant)
if TARGETS_SET_DIR != "":
self.targets_map_list = os.listdir(TARGETS_SET_DIR)
self.targets_map_list.sort(reverse=True)
# # NEW: Find common files in both directories
# if TARGETS_SET_DIR != "":
# common_files = [file for file in self.map_list if file in self.segmentation_mask_list and file in self.targets_map_list]
# else:
# common_files = [file for file in self.map_list if file in self.segmentation_mask_list]
self.map_index = map_index % len(self.map_list)
if mask_index is not None:
self.mask_index = mask_index % len(self.segmentation_mask_list)
else:
self.mask_index = map_index % len(self.segmentation_mask_list)
# self.common_map_file = common_files[self.map_index]
# print("self.common_map_file: ", self.common_map_file)
# Import ground truth and segmentation mask
self.ground_truth, self.map_start_position = self.import_ground_truth(
os.path.join(self.map_dir, self.map_list[self.map_index]))# self.common_map_file))
self.ground_truth_size = np.shape(self.ground_truth) # (480, 640)
self.robot_belief = np.ones(self.ground_truth_size) * 127 # unexplored 127
self.downsampled_belief = None
self.old_robot_belief = copy.deepcopy(self.robot_belief)
self.coverage_belief = np.ones(self.ground_truth_size) * 127 # unexplored 127
# Import segmentation mask
mask_filename = self.segmentation_mask_list[self.mask_index]
self.segmentation_mask = self.import_segmentation_mask(
os.path.join(self.seg_dir, mask_filename))# self.common_map_file))
# print("mask_filename: ", mask_filename)
# Overwrite target positions if directory specified
self.gt_segmentation_mask = None
if self.test and TARGETS_SET_DIR != "":
self.gt_segmentation_mask = self.import_segmentation_mask(
os.path.join(TARGETS_SET_DIR, self.map_list[self.map_index])) # UNUSED - self.common_map_file))
# print("target_positions: ", self.target_positions)
# print("np.unique(self.segmentation_mask): ", np.unique(self.segmentation_mask))
self.segmentation_info_mask = None
self.gt_segmentation_info_mask = None
self.segmentation_info_mask_unnormalized = None
self.filtered_seg_info_mask = None
self.num_targets_found = 0
self.num_new_targets_found = 0
# # Link score masks to raw image files
# csv_file = pd.read_csv(RAW_IMG_PATH_DICT, header=None)
# img_score_paths = csv_file.iloc[:,2].tolist()
# raw_img_paths = csv_file.iloc[:,0].tolist()
# self.score_to_img_dict = {os.path.basename(img_score_path): raw_img_path for img_score_path, raw_img_path in zip(img_score_paths, raw_img_paths)}
self.resolution = 4
self.sensor_range = SENSOR_RANGE
self.explored_rate = 0
self.targets_found_rate = 0
self.info_gain = 0
self.total_info = 0
self.graph_generator = Graph_generator(map_size=self.ground_truth_size, sensor_range=self.sensor_range, k_size=k_size, plot=plot)
self.node_coords, self.graph, self.node_utility, self.guidepost = None, None, None, None
self.frontiers = None
self.start_positions = []
self.begin(self.map_start_position)
self.plot = plot
self.frame_files = []
def find_index_from_coords(self, position):
index = np.argmin(np.linalg.norm(self.node_coords - position, axis=1))
return index
def begin(self, start_position):
# self.robot_belief = self.update_robot_belief(robot_position, self.sensor_range, self.robot_belief,
# self.ground_truth)
self.robot_belief = self.ground_truth
self.downsampled_belief = block_reduce(self.robot_belief.copy(), block_size=(self.resolution, self.resolution),
func=np.min)
self.frontiers = self.find_frontier()
self.old_robot_belief = copy.deepcopy(self.robot_belief)
self.node_coords, self.graph, self.node_utility, self.guidepost = self.graph_generator.generate_graph(
self.robot_belief, self.frontiers)
# Find non-conflicting start positions
if FIX_START_POSITION:
coords_res_row = int(self.robot_belief.shape[0]/NUM_COORDS_HEIGHT)
coords_res_col = int(self.robot_belief.shape[1]/NUM_COORDS_WIDTH)
self.start_positions = [(int(self.robot_belief.shape[1]/2)-coords_res_col/2,int(self.robot_belief.shape[0]/2)-coords_res_row/2) for _ in range(self.n_agent)] # bottom-left corner
else:
nearby_coords = self.graph_generator.get_neighbors_grid_coords(start_position)
itr = 0
for i in range(self.n_agent):
if i == 0 or len(nearby_coords) == 0:
self.start_positions.append(start_position)
else:
idx = min(itr, len(nearby_coords)-1)
self.start_positions.append(nearby_coords[idx])
itr += 1
for i in range(len(self.start_positions)):
self.start_positions[i] = self.node_coords[self.find_index_from_coords(self.start_positions[i])]
self.coverage_belief = self.update_robot_belief(self.start_positions[i], self.sensor_range, self.coverage_belief,
self.ground_truth)
for start_position in self.start_positions:
self.graph_generator.route_node.append(start_position)
# Info map from ground truth
rng_x = 0.5 * (self.ground_truth.shape[1] / NUM_COORDS_WIDTH)
rng_y = 0.5 * (self.ground_truth.shape[0] / NUM_COORDS_HEIGHT)
self.segmentation_info_mask = np.zeros((len(self.node_coords), 1))
self.gt_segmentation_info_mask = np.zeros((len(self.node_coords), 1))
for i, node_coord in enumerate(self.node_coords):
max_x = min(node_coord[0] + int(math.ceil(rng_x)), self.ground_truth.shape[1])
min_x = max(node_coord[0] - int(math.ceil(rng_x)), 0)
max_y = min(node_coord[1] + int(math.ceil(rng_y)), self.ground_truth.shape[0])
min_y = max(node_coord[1] - int(math.ceil(rng_y)), 0)
# if np.any(self.segmentation_mask[min_y:max_y, min_x:max_x] == 255):
# self.segmentation_info_mask[i] = 1.0
# else:
# self.segmentation_info_mask[i] = 0.0
# self.segmentation_info_mask[i] = np.mean(self.segmentation_mask[min_y:max_y, min_x:max_x])
# self.segmentation_info_mask[i] = np.max(self.segmentation_mask[min_y:max_y, min_x:max_x])
if TARGETS_SET_DIR == "": # If targets combined with segmentation mask
exclude = {208} # Exclude target positions
else:
exclude = {}
self.segmentation_info_mask[i] = max(x for x in self.segmentation_mask[min_y:max_y, min_x:max_x].flatten() if x not in exclude) / 100.0
if self.gt_segmentation_mask is not None:
self.gt_segmentation_info_mask[i] = max(x for x in self.gt_segmentation_mask[min_y:max_y, min_x:max_x].flatten() if x not in exclude) / 100.0
# print("np.unique(self.segmentation_info_mask): ", np.unique(self.segmentation_info_mask))
self.filtered_seg_info_mask = copy.deepcopy(self.segmentation_info_mask)
# In case targets found at beginning...
done, num_targets_found = self.check_done()
self.num_targets_found = num_targets_found
def multi_robot_step(self, next_position_list, dist_list, travel_dist_list):
temp_frontiers = copy.deepcopy(self.frontiers)
reward_list = []
for dist, robot_position in zip(dist_list, next_position_list):
self.graph_generator.route_node.append(robot_position)
next_node_index = self.find_index_from_coords(robot_position)
self.graph_generator.nodes_list[next_node_index].set_visited()
# self.robot_belief = self.update_robot_belief(robot_position, self.sensor_range, self.robot_belief,
# self.ground_truth)
self.coverage_belief = self.update_robot_belief(robot_position, self.sensor_range, self.coverage_belief,
self.ground_truth)
self.robot_belief = self.ground_truth
self.downsampled_belief = block_reduce(self.robot_belief.copy(),
block_size=(self.resolution, self.resolution),
func=np.min)
frontiers = self.find_frontier()
#num_observed_frontiers = self.calculate_num_observed_frontiers(temp_frontiers, frontiers)
#temp_frontiers = frontiers
num_observed_frontiers = self.node_utility[next_node_index]
# individual_reward = num_observed_frontiers / 50 - dist / 64
individual_reward = -dist / 32 # 64
info_gain_reward = 0
robot_position_idx = self.find_index_from_coords(robot_position)
# if self.segmentation_info_mask[robot_position_idx] == 1.0 and self.guidepost[robot_position_idx] == 0.0:
# # print("High Info (Unvisited)")
# info_gain_reward = (HIGH_INFO_REWARD_RATIO * 1.5)
# elif self.segmentation_info_mask[robot_position_idx] == 0.0 and self.guidepost[robot_position_idx] == 0.0:
# # print("Low Info (Unvisited)")
# info_gain_reward = ((1-HIGH_INFO_REWARD_RATIO) * 1.5)
info_gain_reward = self.filtered_seg_info_mask[robot_position_idx][0] * 1.5
if self.guidepost[robot_position_idx] == 0.0:
info_gain_reward += 0.2
# print("info_gain_reward: ", info_gain_reward)
individual_reward += info_gain_reward
# print("dist / 64: ", dist / 64)
# print("info gain reward: ", info_gain_reward)
reward_list.append(individual_reward)
self.node_coords, self.graph, self.node_utility, self.guidepost = self.graph_generator.update_graph(self.robot_belief, self.old_robot_belief, frontiers, self.frontiers)
self.old_robot_belief = copy.deepcopy(self.robot_belief)
self.filtered_seg_info_mask = [info[0] if self.guidepost[i] == 0.0 else 0.0 for i, info in enumerate(self.segmentation_info_mask)]
self.filtered_seg_info_mask = np.expand_dims(np.array(self.filtered_seg_info_mask), axis=1)
num_observed_frontiers = self.calculate_num_observed_frontiers(self.frontiers, frontiers)
self.frontiers = frontiers
self.explored_rate = self.evaluate_exploration_rate()
done, num_targets_found = self.check_done()
self.num_new_targets_found = num_targets_found - self.num_targets_found
# #team_reward = sum(reward_list) / len(reward_list)
# # team_reward = num_observed_frontiers / 50
# team_reward = self.num_new_targets_found * 5.0
team_reward = 0.0
# # print("target found reward: ", self.num_new_targets_found * 5.0)
self.num_targets_found = num_targets_found
self.targets_found_rate = self.evaluate_targets_found_rate()
self.info_gain, self.total_info = self.evaluate_info_gain()
if done:
# team_reward += np.sum(self.robot_belief == 255) / sum(travel_dist_list)
team_reward += 40 # 20
for i in range(len(reward_list)):
reward_list[i] += team_reward
return reward_list, done
def import_ground_truth(self, map_index):
# occupied 1, free 255, unexplored 127
try:
# ground_truth = (io.imread(map_index, 1) * 255).astype(int)
ground_truth = (io.imread(map_index, 1)).astype(int)
if np.all(ground_truth == 0):
ground_truth = (io.imread(map_index, 1) * 255).astype(int)
except:
new_map_index = self.map_dir + '/' + self.map_list[0]
ground_truth = (io.imread(new_map_index, 1)).astype(int)
print('could not read the map_path ({}), hence skipping it and using ({}).'.format(map_index, new_map_index))
robot_location = np.nonzero(ground_truth == 208)
# print("robot_location: ", robot_location)
# print("np.array(robot_location)[1, 127]: ", np.array(robot_location)[1, 127])
robot_location = np.array([np.array(robot_location)[1, 127], np.array(robot_location)[0, 127]])
ground_truth = (ground_truth > 150)
ground_truth = ground_truth * 254 + 1
return ground_truth, robot_location
def import_segmentation_mask(self, map_index):
# occupied 1, free 255, unexplored 127
# mask = (io.imread(map_index, 1) * 255).astype(int) # NOTE: Cannot work well with seg mask self-generated
mask = cv2.imread(map_index).astype(int)
# print("np.unique(segmentation_mask): ", np.unique(mask))
# NOTE: Could contain mutiple start positions
# target_position = np.nonzero(mask == 208)
# target_positions = self.find_target_locations(mask)
# target_position = np.array([np.array(target_position)[1, 127], np.array(target_position)[0, 127]])
return mask #, target_positions
def find_target_locations(self, image_array, grey_value=208):
# Load the image
# image = Image.open(image_path)
# image_array = np.array(image)
# Identify pixels equal to the grey value
grey_pixels = np.where(image_array == grey_value)
# Create a binary array where grey pixels are marked as True
binary_array = np.zeros_like(image_array, dtype=bool)
binary_array[grey_pixels] = True
# Label connected components
labeled_array, num_features = label(binary_array)
# Find objects returns slices for each connected component
slices = find_objects(labeled_array)
# Calculate the center of each box
centers = []
for slice in slices:
row_center = (slice[0].start + slice[0].stop - 1) // 2
col_center = (slice[1].start + slice[1].stop - 1) // 2
centers.append((col_center, row_center)) # (y,x)
return centers
def free_cells(self):
index = np.where(self.ground_truth == 255)
free = np.asarray([index[1], index[0]]).T
return free
def update_robot_belief(self, robot_position, sensor_range, robot_belief, ground_truth):
robot_belief = sensor_work(robot_position, sensor_range, robot_belief, ground_truth)
return robot_belief
def check_done(self, robot_id=0):
""" All agnets to have explored most of the env map """
done = False
# for idx in range(self.n_agent):
# if np.sum(self.ground_truth == 255) - np.sum(self.all_robot_belief[idx][idx] == 255) > 40:
# done = False
# NEW: ADDITIONAL VLM SEARCH CRITERIA
num_targets_found = 0
self.target_found_idxs = []
for i, target in enumerate(self.target_positions):
if self.coverage_belief[target[1], target[0]] == 255: # 255:
num_targets_found += 1
self.target_found_idxs.append(i)
# free_cells_mask = self.all_robot_belief[robot_id][robot_id] == 255
# filtered_segmentation_mask = np.where(free_cells_mask, self.segmentation_mask, 0)
# targets = self.find_target_locations(filtered_segmentation_mask)
# print("num_targets_found: ", num_targets_found)
if TERMINATE_ON_TGTS_FOUND and num_targets_found >= len(self.target_positions):
done = True
if not TERMINATE_ON_TGTS_FOUND and np.sum(self.coverage_belief == 255) / np.sum(self.ground_truth == 255) >= 0.99:
done = True
return done, num_targets_found
def calculate_num_observed_frontiers(self, old_frontiers, frontiers):
frontiers_to_check = frontiers[:, 0] + frontiers[:, 1] * 1j
pre_frontiers_to_check = old_frontiers[:, 0] + old_frontiers[:, 1] * 1j
frontiers_num = np.intersect1d(frontiers_to_check, pre_frontiers_to_check).shape[0]
pre_frontiers_num = pre_frontiers_to_check.shape[0]
delta_num = pre_frontiers_num - frontiers_num
return delta_num
def calculate_reward(self, dist, frontiers):
reward = 0
reward -= dist / 64
frontiers_to_check = frontiers[:, 0] + frontiers[:, 1] * 1j
pre_frontiers_to_check = self.frontiers[:, 0] + self.frontiers[:, 1] * 1j
frontiers_num = np.intersect1d(frontiers_to_check, pre_frontiers_to_check).shape[0]
pre_frontiers_num = pre_frontiers_to_check.shape[0]
delta_num = pre_frontiers_num - frontiers_num
reward += delta_num / 50
return reward
def evaluate_exploration_rate(self):
# rate = np.sum(self.robot_belief == 255) / np.sum(self.ground_truth == 255)
rate = np.sum(self.coverage_belief == 255) / np.sum(self.ground_truth == 255)
return rate
def evaluate_targets_found_rate(self):
if len(self.target_positions) == 0:
return 0
else:
rate = self.num_targets_found / len(self.target_positions)
return rate
def evaluate_info_gain(self):
# print("self.segmentation_mask.shape: ", self.segmentation_mask.shape)
# coverage_belief = (self.coverage_belief == 255)
# print("coverage_belief.shape: ", coverage_belief.shape)
# print("np.unique(coverage_belief): ", np.unique(coverage_belief))
# print("np.count_nonzero(coverage_belief): ", np.count_nonzero(coverage_belief))
# print("np.count_zero(coverage_belief): ", coverage_belief.size - np.count_nonzero(coverage_belief))
# print("self.segmentation_mask[self.coverage_belief == 255].shape: ", self.segmentation_mask[self.coverage_belief == 255].shape)
if self.test and TARGETS_SET_DIR != "":
info_gained = np.sum(self.gt_segmentation_mask[self.coverage_belief == 255]) / 100.0
total_info = np.sum(self.gt_segmentation_mask) / 100.0
else:
info_gained = np.sum(self.segmentation_mask[self.coverage_belief == 255]) / 100.0
total_info = np.sum(self.segmentation_mask) / 100.0
return info_gained, total_info
def calculate_new_free_area(self):
old_free_area = self.old_robot_belief == 255
current_free_area = self.robot_belief == 255
new_free_area = (current_free_area.astype(np.int) - old_free_area.astype(np.int)) * 255
return new_free_area, np.sum(old_free_area)
def calculate_dist_path(self, path):
dist = 0
start = path[0]
end = path[-1]
for index in path:
if index == end:
break
dist += np.linalg.norm(self.node_coords[start] - self.node_coords[index])
start = index
return dist
def find_frontier(self):
y_len = self.downsampled_belief.shape[0]
x_len = self.downsampled_belief.shape[1]
mapping = self.downsampled_belief.copy()
belief = self.downsampled_belief.copy()
# 0-1 unknown area map
mapping = (mapping == 127) * 1
mapping = np.lib.pad(mapping, ((1, 1), (1, 1)), 'constant', constant_values=0)
fro_map = mapping[2:][:, 1:x_len + 1] + mapping[:y_len][:, 1:x_len + 1] + mapping[1:y_len + 1][:, 2:] + \
mapping[1:y_len + 1][:, :x_len] + mapping[:y_len][:, 2:] + mapping[2:][:, :x_len] + mapping[2:][:,
2:] + \
mapping[:y_len][:, :x_len]
ind_free = np.where(belief.ravel(order='F') == 255)[0]
ind_fron_1 = np.where(1 < fro_map.ravel(order='F'))[0]
ind_fron_2 = np.where(fro_map.ravel(order='F') < 8)[0]
ind_fron = np.intersect1d(ind_fron_1, ind_fron_2)
ind_to = np.intersect1d(ind_free, ind_fron)
map_x = x_len
map_y = y_len
x = np.linspace(0, map_x - 1, map_x)
y = np.linspace(0, map_y - 1, map_y)
t1, t2 = np.meshgrid(x, y)
points = np.vstack([t1.T.ravel(), t2.T.ravel()]).T
f = points[ind_to]
f = f.astype(int)
f = f * self.resolution
return f
def plot_env(self, n, path, step, travel_dist, robots_route, img_path_override=None, sat_path_override=None, msk_name_override=None, sound_id_override=None, colormap_mid_val=None):
# # TEMP
# if TAXABIND_TTA:
# # Save self.segmentation_info_mask as .npy file in gifs_path
# side_dim = int(np.sqrt(self.segmentation_info_mask.shape[0]))
# mask_viz = self.segmentation_info_mask.squeeze().reshape((side_dim, side_dim)).T
# np.save(os.path.join(path, f"seg_mask_step{step}.npy"), mask_viz)
plt.switch_backend('agg')
# plt.ion()
plt.cla()
color_list = ["r", "g", "c", "m", "y", "k"]
if TARGETS_SET_DIR == "" and not TAXABIND_TTA:
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(10, 5))
else:
fig, ((ax1, ax2, ax3), (ax4, ax5, ax6)) = plt.subplots(2, 3, figsize=(12, 8))
### Fig1: Environment ###
msk_name = ""
if TAXABIND_TTA:
image = mpimg.imread(sat_path_override)
msk_name = msk_name_override
# else:
# plt.imshow(self.robot_belief, cmap='gray')
# ax1.imshow(self.coverage_belief, cmap='gray')
# image = mpimg.imread("Maps/real_maps/real/4259_masked_img_0.jpg")
# msk_name = self.map_list[self.map_index]
# raw_img_path = self.score_to_img_dict[msk_name]
# if "flair" in raw_img_path:
# with rasterio.open(raw_img_path) as src_img:
# image = src_img.read([1,2,3])
# image = np.transpose(image, (1, 2, 0))
# else:
# image = mpimg.imread(raw_img_path)
### Fig1: Environment ###
ax = ax1 # if TAXABIND_TTA else ax1
ax.imshow(image)
ax.axis((0, self.ground_truth_size[1], self.ground_truth_size[0], 0))
ax.set_title("Image")
# if VIZ_GRAPH_EDGES:
# for i in range(len(self.graph_generator.x)):
# ax.plot(self.graph_generator.x[i], self.graph_generator.y[i], 'tan', zorder=1)
# # ax.scatter(self.node_coords[:, 0], self.node_coords[:, 1], c=self.node_utility, zorder=5)
# ax.scatter(self.node_coords[:, 0], self.node_coords[:, 1], c=self.segmentation_info_mask, zorder=5)
# ax.scatter(self.frontiers[:, 0], self.frontiers[:, 1], c='r', s=2, zorder=3)
for i, route in enumerate(robots_route):
robot_marker_color = color_list[i % len(color_list)]
xPoints = route[0]
yPoints = route[1]
ax.plot(xPoints, yPoints, c=robot_marker_color, linewidth=2)
# ax.plot(xPoints[-1], yPoints[-1], 'mo', markersize=8, zorder=10)
ax.plot(xPoints[-1], yPoints[-1], markersize=12, zorder=99, marker="^", ls="-", c=robot_marker_color, mec="black")
ax.plot(xPoints[0], yPoints[0], 'co', c=robot_marker_color, markersize=8, zorder=5)
# Sensor range
rng_x = 0.5 * (self.ground_truth.shape[1] / NUM_COORDS_WIDTH)
rng_y = 0.5 * (self.ground_truth.shape[0] / NUM_COORDS_HEIGHT)
max_x = min(xPoints[-1] + int(math.ceil(rng_x)), self.ground_truth.shape[1])
min_x = max(xPoints[-1] - int(math.ceil(rng_x)), 0)
max_y = min(yPoints[-1] + int(math.ceil(rng_y)), self.ground_truth.shape[0])
min_y = max(yPoints[-1] - int(math.ceil(rng_y)), 0)
ax.plot((min_x, min_x), (min_y, max_y), c=robot_marker_color, linewidth=1)
ax.plot((min_x, max_x), (max_y, max_y), c=robot_marker_color, linewidth=1)
ax.plot((max_x, max_x), (max_y, min_y), c=robot_marker_color, linewidth=1)
ax.plot((max_x, min_x), (min_y, min_y), c=robot_marker_color, linewidth=1)
### Fig2: Graph ###
ax = ax4 if TAXABIND_TTA else ax1
# ax.imshow(image)
ax.imshow(self.coverage_belief, cmap='gray')
ax.axis((0, self.ground_truth_size[1], self.ground_truth_size[0], 0))
ax.set_title("Information Graph")
if VIZ_GRAPH_EDGES:
for i in range(len(self.graph_generator.x)):
ax.plot(self.graph_generator.x[i], self.graph_generator.y[i], 'tan', zorder=1)
# ax.scatter(self.node_coords[:, 0], self.node_coords[:, 1], c=self.node_utility, zorder=5)
# ax.scatter(self.node_coords[:, 0], self.node_coords[:, 1], c=self.segmentation_info_mask, zorder=5)
# filtered_seg_info_mask = [info[0] if self.guidepost[i] == 0.0 else 0.0 for i, info in enumerate(self.segmentation_info_mask)]
ax.scatter(self.node_coords[:, 0], self.node_coords[:, 1], c=self.filtered_seg_info_mask, zorder=5, s=8)
# ax.scatter(self.frontiers[:, 0], self.frontiers[:, 1], c='r', s=2, zorder=3)
for i, route in enumerate(robots_route):
robot_marker_color = color_list[i % len(color_list)]
xPoints = route[0]
yPoints = route[1]
ax.plot(xPoints, yPoints, c=robot_marker_color, linewidth=2)
# ax.plot(xPoints[-1], yPoints[-1], 'mo', markersize=8, zorder=10)
ax.plot(xPoints[-1], yPoints[-1], markersize=12, zorder=99, marker="^", ls="-", c=robot_marker_color, mec="black")
ax.plot(xPoints[0], yPoints[0], 'co', c=robot_marker_color, markersize=8, zorder=5)
# Sensor range
rng_x = 0.5 * (self.ground_truth.shape[1] / NUM_COORDS_WIDTH)
rng_y = 0.5 * (self.ground_truth.shape[0] / NUM_COORDS_HEIGHT)
max_x = min(xPoints[-1] + int(math.ceil(rng_x)), self.ground_truth.shape[1])
min_x = max(xPoints[-1] - int(math.ceil(rng_x)), 0)
max_y = min(yPoints[-1] + int(math.ceil(rng_y)), self.ground_truth.shape[0])
min_y = max(yPoints[-1] - int(math.ceil(rng_y)), 0)
ax.plot((min_x, min_x), (min_y, max_y), c=robot_marker_color, linewidth=1)
ax.plot((min_x, max_x), (max_y, max_y), c=robot_marker_color, linewidth=1)
ax.plot((max_x, max_x), (max_y, min_y), c=robot_marker_color, linewidth=1)
ax.plot((max_x, min_x), (min_y, min_y), c=robot_marker_color, linewidth=1)
# Plot target positions
for target in self.target_positions:
if self.coverage_belief[target[1], target[0]] == 255:
# ax.plot(target[0], target[1], 'go', markersize=8, zorder=99)
ax.plot(target[0], target[1], color='g', marker='x', linestyle='-', markersize=12, markeredgewidth=4, zorder=99)
else:
# ax.plot(target[0], target[1], 'ro', markersize=8, zorder=99)
ax.plot(target[0], target[1], color='r', marker='x', linestyle='-', markersize=12, markeredgewidth=4, zorder=99)
# ax.pause(0.1)
### Fig3: Segmentation Mask ###
ax = ax5 if TAXABIND_TTA else ax2
if TAXABIND_TTA and USE_CLIP_PREDS:
side_dim = int(np.sqrt(self.segmentation_info_mask.shape[0]))
mask_viz = self.segmentation_info_mask.squeeze().reshape((side_dim, side_dim)).T
scale_y = math.ceil(self.ground_truth_size[1] / side_dim)
scale_x = math.ceil(self.ground_truth_size[0] / side_dim)
upscaled_mask_viz = np.kron(mask_viz, np.ones((scale_y, scale_x))) # Integer scaling only
upscaled_mask_viz = upscaled_mask_viz[:self.ground_truth_size[1], :self.ground_truth_size[0]]
im = ax.imshow(upscaled_mask_viz, cmap="viridis")
ax.axis("off")
else:
im = ax.imshow(self.segmentation_mask.mean(axis=-1), cmap='viridis', vmin=0, vmax=100) # cmap='gray'
ax.axis((0, self.ground_truth_size[1], self.ground_truth_size[0], 0))
ax.set_title(f"Predicted Mask (Normalized)")
for i, route in enumerate(robots_route):
robot_marker_color = color_list[i % len(color_list)]
xPoints = route[0]
yPoints = route[1]
ax.plot(xPoints, yPoints, c=robot_marker_color, linewidth=2)
# ax.plot(xPoints[-1], yPoints[-1], 'mo', markersize=8, zorder=10)
ax.plot(xPoints[-1], yPoints[-1], markersize=12, zorder=99, marker="^", ls="-", c=robot_marker_color, mec="black")
ax.plot(xPoints[0], yPoints[0], 'co', c=robot_marker_color, markersize=8, zorder=5)
# Sensor range
rng_x = 0.5 * (self.ground_truth.shape[1] / NUM_COORDS_WIDTH)
rng_y = 0.5 * (self.ground_truth.shape[0] / NUM_COORDS_HEIGHT)
max_x = min(xPoints[-1] + int(math.ceil(rng_x)), self.ground_truth.shape[1])
min_x = max(xPoints[-1] - int(math.ceil(rng_x)), 0)
max_y = min(yPoints[-1] + int(math.ceil(rng_y)), self.ground_truth.shape[0])
min_y = max(yPoints[-1] - int(math.ceil(rng_y)), 0)
ax.plot((min_x, min_x), (min_y, max_y), c=robot_marker_color, linewidth=1)
ax.plot((min_x, max_x), (max_y, max_y), c=robot_marker_color, linewidth=1)
ax.plot((max_x, max_x), (max_y, min_y), c=robot_marker_color, linewidth=1)
ax.plot((max_x, min_x), (min_y, min_y), c=robot_marker_color, linewidth=1)
# Add a colorbar
cbar = fig.colorbar(im, ax=ax, fraction=0.046, pad=0.04)
cbar.set_label("Normalized Probs")
# ax.pause(0.1)
### Fig4: Segmentation Mask ###
if TAXABIND_TTA and USE_CLIP_PREDS:
ax = ax6
side_dim = int(np.sqrt(self.segmentation_info_mask_unnormalized.shape[0]))
mask_viz = self.segmentation_info_mask_unnormalized.squeeze().reshape((side_dim, side_dim)).T
scale_y = math.ceil(self.ground_truth_size[1] / side_dim)
scale_x = math.ceil(self.ground_truth_size[0] / side_dim)
upscaled_mask_viz = np.kron(mask_viz, np.ones((scale_y, scale_x))) # Integer scaling only
upscaled_mask_viz = upscaled_mask_viz[:self.ground_truth_size[1], :self.ground_truth_size[0]]
max_val = 0.15 # TO CHANGE
mid_val = colormap_mid_val if colormap_mid_val is not None else 0.05
# mid_val = np.max(self.segmentation_info_mask_unnormalized)
norm = CustomNorm(vmin=0.0, vmax=max_val, mid=mid_val, lower_portion=0.8)
im = ax.imshow(upscaled_mask_viz, cmap="viridis", norm=norm) # norm=LogNorm(vmin=0.01, vmax=0.1))
# norm = PowerNorm(gamma=0.25, vmin=0.01, vmax=0.2)
# norm=LogNorm(vmin=0.01, vmax=0.2)
im = ax.imshow(upscaled_mask_viz, cmap="viridis", norm=norm) # norm=LogNorm(vmin=0.01, vmax=0.1))
ax.axis("off")
# else:
# im = ax.imshow(self.segmentation_mask.mean(axis=-1), cmap='viridis', vmin=0, vmax=100) # cmap='gray'
# ax.axis((0, self.ground_truth_size[1], self.ground_truth_size[0], 0))
ax.set_title(f"Predicted Mask (Unnormalized)")
for i, route in enumerate(robots_route):
robot_marker_color = color_list[i % len(color_list)]
xPoints = route[0]
yPoints = route[1]
ax.plot(xPoints, yPoints, c=robot_marker_color, linewidth=2)
# ax.plot(xPoints[-1], yPoints[-1], 'mo', markersize=8, zorder=10)
ax.plot(xPoints[-1], yPoints[-1], markersize=12, zorder=99, marker="^", ls="-", c=robot_marker_color, mec="black")
ax.plot(xPoints[0], yPoints[0], 'co', c=robot_marker_color, markersize=8, zorder=5)
# Sensor range
rng_x = 0.5 * (self.ground_truth.shape[1] / NUM_COORDS_WIDTH)
rng_y = 0.5 * (self.ground_truth.shape[0] / NUM_COORDS_HEIGHT)
max_x = min(xPoints[-1] + int(math.ceil(rng_x)), self.ground_truth.shape[1])
min_x = max(xPoints[-1] - int(math.ceil(rng_x)), 0)
max_y = min(yPoints[-1] + int(math.ceil(rng_y)), self.ground_truth.shape[0])
min_y = max(yPoints[-1] - int(math.ceil(rng_y)), 0)
ax.plot((min_x, min_x), (min_y, max_y), c=robot_marker_color, linewidth=1)
ax.plot((min_x, max_x), (max_y, max_y), c=robot_marker_color, linewidth=1)
ax.plot((max_x, max_x), (max_y, min_y), c=robot_marker_color, linewidth=1)
ax.plot((max_x, min_x), (min_y, min_y), c=robot_marker_color, linewidth=1)
# Add a colorbar
cbar = fig.colorbar(im, ax=ax, fraction=0.046, pad=0.04)
if TAXABIND_TTA and USE_CLIP_PREDS:
cbar.set_ticks([0.0, mid_val, max_val])
cbar.set_label("Probs (Scaled by expectation)")
# Fog5: GT Mask
if TARGETS_SET_DIR != "":
ax = ax2
im = ax.imshow(self.gt_segmentation_mask.mean(axis=-1), cmap='viridis', vmin=0, vmax=100) # cmap='gray'
ax.axis((0, self.ground_truth_size[1], self.ground_truth_size[0], 0))
ax.set_title(f"Ground Truth Mask")
for i, route in enumerate(robots_route):
robot_marker_color = color_list[i % len(color_list)]
xPoints = route[0]
yPoints = route[1]
ax.plot(xPoints, yPoints, c=robot_marker_color, linewidth=2)
# ax.plot(xPoints[-1], yPoints[-1], 'mo', markersize=8, zorder=10)
ax.plot(xPoints[-1], yPoints[-1], markersize=12, zorder=99, marker="^", ls="-", c=robot_marker_color, mec="black")
ax.plot(xPoints[0], yPoints[0], 'co', c=robot_marker_color, markersize=8, zorder=5)
# Sensor range
rng_x = 0.5 * (self.ground_truth.shape[1] / NUM_COORDS_WIDTH)
rng_y = 0.5 * (self.ground_truth.shape[0] / NUM_COORDS_HEIGHT)
max_x = min(xPoints[-1] + int(math.ceil(rng_x)), self.ground_truth.shape[1])
min_x = max(xPoints[-1] - int(math.ceil(rng_x)), 0)
max_y = min(yPoints[-1] + int(math.ceil(rng_y)), self.ground_truth.shape[0])
min_y = max(yPoints[-1] - int(math.ceil(rng_y)), 0)
ax.plot((min_x, min_x), (min_y, max_y), c=robot_marker_color, linewidth=1)
ax.plot((min_x, max_x), (max_y, max_y), c=robot_marker_color, linewidth=1)
ax.plot((max_x, max_x), (max_y, min_y), c=robot_marker_color, linewidth=1)
ax.plot((max_x, min_x), (min_y, min_y), c=robot_marker_color, linewidth=1)
# Add a colorbar
cbar = fig.colorbar(im, ax=ax, fraction=0.046, pad=0.04)
cbar.set_label("Normalized Mask Value")
# ax4.pause(0.1)
### Fig6: Segmentation Mask (GT) ###
if TAXABIND_TTA:
ax = ax3
image = mpimg.imread(img_path_override)
ax.imshow(image)
ax.set_title("Ground Image")
ax.axis("off")
sound_id = sound_id_override if sound_id_override is not None else "-1"
plt.suptitle('Targets Found: {}/{} Coverage ratio: {:.4g} Travel Dist: {:.4g} Info Gain: {:.4g}% \n ({}) \n (Sound ID: {})'.format(self.num_targets_found, \
len(self.target_positions), self.explored_rate, travel_dist, (100*self.info_gain/self.total_info), msk_name,
sound_id))
plt.tight_layout()
plt.savefig('{}/{}_{}_samples.png'.format(path, n, step, dpi=100))
# plt.show()
frame = '{}/{}_{}_samples.png'.format(path, n, step)
self.frame_files.append(frame)
plt.close()
####################
# ADDED: For app.py
####################
def plot_heatmap(self, save_dir, step, travel_dist, robots_route=None):
"""Plot only the segmentation heatmap and save it as ``{step}.png`` in
``save_dir``. This lightweight helper is meant for asynchronous
streaming in the Gradio demo when full `plot_env` is too heavy.
Parameters
----------
save_dir : str
Directory to save the generated PNG file.
step : int
Current timestep; becomes the filename ``{step}.png``.
robots_route : list | None
Optional list of routes (xPoints, yPoints) to overlay.
Returns
-------
str
Full path to the generated PNG file.
"""
import os
plt.switch_backend('agg')
# Do not clear the global figure state in case it interferes with
# the current figure. Each call creates its own Figure object that
# we close explicitly at the end, so a global clear is unnecessary
# and may break concurrent drawing.
# plt.cla()
color_list = ["r", "g", "c", "m", "y", "k"]
fig, ax = plt.subplots(1, 1, figsize=(6, 6))
# Select the mask to visualise
# if TAXABIND_TTA and USE_CLIP_PREDS:
side_dim = int(np.sqrt(self.segmentation_info_mask.shape[0]))
mask_viz = self.segmentation_info_mask.squeeze().reshape((side_dim, side_dim)).T
# Properly map image to pixel coordinates and keep limits fixed
H, W = self.ground_truth_size # rows (y), cols (x)
im = ax.imshow(
mask_viz,
cmap="viridis",
origin="upper",
extent=[0, W, H, 0], # x: 0..W, y: H..0 (origin at top-left)
interpolation="nearest", # keep cell edges sharp & aligned
zorder=0,
)
ax.set_xlim(0, W)
ax.set_ylim(H, 0)
ax.set_axis_off() # hide ticks but keep limits
# else:
# im = ax.imshow(self.segmentation_mask.mean(axis=-1), cmap='viridis', vmin=0, vmax=100)
# ax.axis((0, self.ground_truth_size[1], self.ground_truth_size[0], 0))
# Optionally overlay robot paths
if robots_route is not None:
for i, route in enumerate(robots_route):
robot_marker_color = color_list[i % len(color_list)]
xPoints, yPoints = route
ax.plot(xPoints, yPoints, c=robot_marker_color, linewidth=2)
ax.plot(xPoints[-1], yPoints[-1], markersize=12, zorder=99, marker="^", ls="-", c=robot_marker_color, mec="black")
ax.plot(xPoints[0], yPoints[0], 'co', c=robot_marker_color, markersize=8, zorder=5)
# Plot target positions
for target in self.target_positions:
if self.coverage_belief[target[1], target[0]] == 255:
# ax.plot(target[0], target[1], 'go', markersize=8, zorder=99)
ax.plot(target[0], target[1], color='g', marker='x', linestyle='-', markersize=12, markeredgewidth=4, zorder=99)
else:
# ax.plot(target[0], target[1], 'ro', markersize=8, zorder=99)
ax.plot(target[0], target[1], color='r', marker='x', linestyle='-', markersize=12, markeredgewidth=4, zorder=99)
# Sensor range
rng_x = 0.5 * (self.ground_truth.shape[1] / NUM_COORDS_WIDTH)
rng_y = 0.5 * (self.ground_truth.shape[0] / NUM_COORDS_HEIGHT)
max_x = min(xPoints[-1] + int(math.ceil(rng_x)), self.ground_truth.shape[1])
min_x = max(xPoints[-1] - int(math.ceil(rng_x)), 0)
max_y = min(yPoints[-1] + int(math.ceil(rng_y)), self.ground_truth.shape[0])
min_y = max(yPoints[-1] - int(math.ceil(rng_y)), 0)
ax.plot((min_x, min_x), (min_y, max_y), c=robot_marker_color, linewidth=1)
ax.plot((min_x, max_x), (max_y, max_y), c=robot_marker_color, linewidth=1)
ax.plot((max_x, max_x), (max_y, min_y), c=robot_marker_color, linewidth=1)
ax.plot((max_x, min_x), (min_y, min_y), c=robot_marker_color, linewidth=1)
# Color bar
cbar = fig.colorbar(im, ax=ax, fraction=0.046, pad=0.04)
cbar.set_label("Normalized Probs")
# Change coverage to 1dp
plt.suptitle('Targets Found: {}/{} Coverage: {:.1f}% Steps: {}/{}'.format(
self.num_targets_found, \
len(self.target_positions),
self.explored_rate*100,
step+1,
NUM_EPS_STEPS),
y=0.94, # Closer to plot
)
plt.tight_layout()
os.makedirs(save_dir, exist_ok=True)
out_path = os.path.join(save_dir, f"{step}.png")
# Save atomically: write to temp file then move into place so the poller never sees a partial file.
tmp_path = out_path + ".tmp"
fig.savefig(tmp_path, dpi=100, format='png')
os.replace(tmp_path, out_path) # atomic on same filesystem
plt.close(fig)
return out_path
####################
class CustomNorm(Normalize):
"""
A custom normalization that allocates a larger fraction of the colormap
to the lower data range [vmin, mid] than to [mid, vmax].
Parameters
----------
vmin : float
Minimum data value
vmax : float
Maximum data value
mid : float
Midpoint in data where we switch from 'lower' to 'upper' mapping
lower_portion : float
Fraction of the colormap to allocate for [vmin, mid].
For example, 0.8 => 80% of colors for [vmin, mid], 20% for [mid, vmax].
clip : bool
Whether to clip data outside [vmin, vmax].
"""
def __init__(self, vmin=None, vmax=None, mid=0.05, lower_portion=0.8, clip=False):
self.mid = mid
self.lower_portion = lower_portion
super().__init__(vmin, vmax, clip)
def __call__(self, value, clip=None):
"""Forward transform: data -> [0..1] color space."""
vmin, vmax, mid = self.vmin, self.vmax, self.mid
lp = self.lower_portion
value = np.asarray(value, dtype=np.float64)
# Piecewise linear mapping:
# [vmin..mid] => [0..lp]
# [mid..vmax] => [lp..1]
normed = np.where(
value <= mid,
lp * (value - vmin) / (mid - vmin),
lp + (value - mid) / (vmax - mid) * (1 - lp)
)
return np.clip(normed, 0, 1)
def inverse(self, value):
"""
Inverse transform: [0..1] color space -> data space.
Matplotlib's colorbar calls this to place ticks correctly.
"""
vmin, vmax, mid = self.vmin, self.vmax, self.mid
lp = self.lower_portion
value = np.asarray(value, dtype=np.float64)
# For color space [0..lp], invert to [vmin..mid]
# For color space [lp..1], invert to [mid..vmax]
below = (value <= lp)
above = ~below
# Allocate array for results
data = np.zeros_like(value, dtype=np.float64)
# Invert lower segment
data[below] = vmin + (value[below] / lp) * (mid - vmin)
# Invert upper segment
data[above] = mid + ((value[above] - lp) / (1 - lp)) * (vmax - mid)
return data