####################################################################### # Name: env.py # # - Reads and processes training and test maps (E.g. DungeonMaps) # - Processes rewards, new frontiers given action # - Updates a graph representation of environment for input into network ####################################################################### import sys import cv2 from matplotlib.colors import LogNorm, PowerNorm if sys.modules['TRAINING']: from parameter import * else: from test_parameter import * import copy import pandas as pd import rasterio from skimage import io import matplotlib.pyplot as plt import os from skimage.measure import block_reduce from sensor import * from graph_generator import * from node import * from scipy.ndimage import label, find_objects import matplotlib.image as mpimg from matplotlib.colors import Normalize # import matplotlib # matplotlib.use("Agg") # <-- key line to avoid tkinter dependency class Env(): def __init__(self, map_index, n_agent, k_size=20, plot=False, test=False, mask_index=None): self.n_agent = n_agent self.test = test self.map_dir = GRIDMAP_SET_DIR # Import environment gridmap self.map_list = os.listdir(self.map_dir) self.map_list.sort(reverse=True) # NEW: Import segmentation utility map self.seg_dir = MASK_SET_DIR self.segmentation_mask, self.target_positions, self.target_found_idxs = None, [], [] self.segmentation_mask_list = os.listdir(self.seg_dir) self.segmentation_mask_list.sort(reverse=True) # Import target maps (if relevant) if TARGETS_SET_DIR != "": self.targets_map_list = os.listdir(TARGETS_SET_DIR) self.targets_map_list.sort(reverse=True) # # NEW: Find common files in both directories # if TARGETS_SET_DIR != "": # common_files = [file for file in self.map_list if file in self.segmentation_mask_list and file in self.targets_map_list] # else: # common_files = [file for file in self.map_list if file in self.segmentation_mask_list] self.map_index = map_index % len(self.map_list) if mask_index is not None: self.mask_index = mask_index % len(self.segmentation_mask_list) else: self.mask_index = map_index % len(self.segmentation_mask_list) # self.common_map_file = common_files[self.map_index] # print("self.common_map_file: ", self.common_map_file) # Import ground truth and segmentation mask self.ground_truth, self.map_start_position = self.import_ground_truth( os.path.join(self.map_dir, self.map_list[self.map_index]))# self.common_map_file)) self.ground_truth_size = np.shape(self.ground_truth) # (480, 640) self.robot_belief = np.ones(self.ground_truth_size) * 127 # unexplored 127 self.downsampled_belief = None self.old_robot_belief = copy.deepcopy(self.robot_belief) self.coverage_belief = np.ones(self.ground_truth_size) * 127 # unexplored 127 # Import segmentation mask mask_filename = self.segmentation_mask_list[self.mask_index] self.segmentation_mask = self.import_segmentation_mask( os.path.join(self.seg_dir, mask_filename))# self.common_map_file)) # print("mask_filename: ", mask_filename) # Overwrite target positions if directory specified self.gt_segmentation_mask = None if self.test and TARGETS_SET_DIR != "": self.gt_segmentation_mask = self.import_segmentation_mask( os.path.join(TARGETS_SET_DIR, self.map_list[self.map_index])) # UNUSED - self.common_map_file)) # print("target_positions: ", self.target_positions) # print("np.unique(self.segmentation_mask): ", np.unique(self.segmentation_mask)) self.segmentation_info_mask = None self.gt_segmentation_info_mask = None self.segmentation_info_mask_unnormalized = None self.filtered_seg_info_mask = None self.num_targets_found = 0 self.num_new_targets_found = 0 # # Link score masks to raw image files # csv_file = pd.read_csv(RAW_IMG_PATH_DICT, header=None) # img_score_paths = csv_file.iloc[:,2].tolist() # raw_img_paths = csv_file.iloc[:,0].tolist() # self.score_to_img_dict = {os.path.basename(img_score_path): raw_img_path for img_score_path, raw_img_path in zip(img_score_paths, raw_img_paths)} self.resolution = 4 self.sensor_range = SENSOR_RANGE self.explored_rate = 0 self.targets_found_rate = 0 self.info_gain = 0 self.total_info = 0 self.graph_generator = Graph_generator(map_size=self.ground_truth_size, sensor_range=self.sensor_range, k_size=k_size, plot=plot) self.node_coords, self.graph, self.node_utility, self.guidepost = None, None, None, None self.frontiers = None self.start_positions = [] self.begin(self.map_start_position) self.plot = plot self.frame_files = [] def find_index_from_coords(self, position): index = np.argmin(np.linalg.norm(self.node_coords - position, axis=1)) return index def begin(self, start_position): # self.robot_belief = self.update_robot_belief(robot_position, self.sensor_range, self.robot_belief, # self.ground_truth) self.robot_belief = self.ground_truth self.downsampled_belief = block_reduce(self.robot_belief.copy(), block_size=(self.resolution, self.resolution), func=np.min) self.frontiers = self.find_frontier() self.old_robot_belief = copy.deepcopy(self.robot_belief) self.node_coords, self.graph, self.node_utility, self.guidepost = self.graph_generator.generate_graph( self.robot_belief, self.frontiers) # Find non-conflicting start positions if FIX_START_POSITION: coords_res_row = int(self.robot_belief.shape[0]/NUM_COORDS_HEIGHT) coords_res_col = int(self.robot_belief.shape[1]/NUM_COORDS_WIDTH) self.start_positions = [(int(self.robot_belief.shape[1]/2)-coords_res_col/2,int(self.robot_belief.shape[0]/2)-coords_res_row/2) for _ in range(self.n_agent)] # bottom-left corner else: nearby_coords = self.graph_generator.get_neighbors_grid_coords(start_position) itr = 0 for i in range(self.n_agent): if i == 0 or len(nearby_coords) == 0: self.start_positions.append(start_position) else: idx = min(itr, len(nearby_coords)-1) self.start_positions.append(nearby_coords[idx]) itr += 1 for i in range(len(self.start_positions)): self.start_positions[i] = self.node_coords[self.find_index_from_coords(self.start_positions[i])] self.coverage_belief = self.update_robot_belief(self.start_positions[i], self.sensor_range, self.coverage_belief, self.ground_truth) for start_position in self.start_positions: self.graph_generator.route_node.append(start_position) # Info map from ground truth rng_x = 0.5 * (self.ground_truth.shape[1] / NUM_COORDS_WIDTH) rng_y = 0.5 * (self.ground_truth.shape[0] / NUM_COORDS_HEIGHT) self.segmentation_info_mask = np.zeros((len(self.node_coords), 1)) self.gt_segmentation_info_mask = np.zeros((len(self.node_coords), 1)) for i, node_coord in enumerate(self.node_coords): max_x = min(node_coord[0] + int(math.ceil(rng_x)), self.ground_truth.shape[1]) min_x = max(node_coord[0] - int(math.ceil(rng_x)), 0) max_y = min(node_coord[1] + int(math.ceil(rng_y)), self.ground_truth.shape[0]) min_y = max(node_coord[1] - int(math.ceil(rng_y)), 0) # if np.any(self.segmentation_mask[min_y:max_y, min_x:max_x] == 255): # self.segmentation_info_mask[i] = 1.0 # else: # self.segmentation_info_mask[i] = 0.0 # self.segmentation_info_mask[i] = np.mean(self.segmentation_mask[min_y:max_y, min_x:max_x]) # self.segmentation_info_mask[i] = np.max(self.segmentation_mask[min_y:max_y, min_x:max_x]) if TARGETS_SET_DIR == "": # If targets combined with segmentation mask exclude = {208} # Exclude target positions else: exclude = {} self.segmentation_info_mask[i] = max(x for x in self.segmentation_mask[min_y:max_y, min_x:max_x].flatten() if x not in exclude) / 100.0 if self.gt_segmentation_mask is not None: self.gt_segmentation_info_mask[i] = max(x for x in self.gt_segmentation_mask[min_y:max_y, min_x:max_x].flatten() if x not in exclude) / 100.0 # print("np.unique(self.segmentation_info_mask): ", np.unique(self.segmentation_info_mask)) self.filtered_seg_info_mask = copy.deepcopy(self.segmentation_info_mask) # In case targets found at beginning... done, num_targets_found = self.check_done() self.num_targets_found = num_targets_found def multi_robot_step(self, next_position_list, dist_list, travel_dist_list): temp_frontiers = copy.deepcopy(self.frontiers) reward_list = [] for dist, robot_position in zip(dist_list, next_position_list): self.graph_generator.route_node.append(robot_position) next_node_index = self.find_index_from_coords(robot_position) self.graph_generator.nodes_list[next_node_index].set_visited() # self.robot_belief = self.update_robot_belief(robot_position, self.sensor_range, self.robot_belief, # self.ground_truth) self.coverage_belief = self.update_robot_belief(robot_position, self.sensor_range, self.coverage_belief, self.ground_truth) self.robot_belief = self.ground_truth self.downsampled_belief = block_reduce(self.robot_belief.copy(), block_size=(self.resolution, self.resolution), func=np.min) frontiers = self.find_frontier() #num_observed_frontiers = self.calculate_num_observed_frontiers(temp_frontiers, frontiers) #temp_frontiers = frontiers num_observed_frontiers = self.node_utility[next_node_index] # individual_reward = num_observed_frontiers / 50 - dist / 64 individual_reward = -dist / 32 # 64 info_gain_reward = 0 robot_position_idx = self.find_index_from_coords(robot_position) # if self.segmentation_info_mask[robot_position_idx] == 1.0 and self.guidepost[robot_position_idx] == 0.0: # # print("High Info (Unvisited)") # info_gain_reward = (HIGH_INFO_REWARD_RATIO * 1.5) # elif self.segmentation_info_mask[robot_position_idx] == 0.0 and self.guidepost[robot_position_idx] == 0.0: # # print("Low Info (Unvisited)") # info_gain_reward = ((1-HIGH_INFO_REWARD_RATIO) * 1.5) info_gain_reward = self.filtered_seg_info_mask[robot_position_idx][0] * 1.5 if self.guidepost[robot_position_idx] == 0.0: info_gain_reward += 0.2 # print("info_gain_reward: ", info_gain_reward) individual_reward += info_gain_reward # print("dist / 64: ", dist / 64) # print("info gain reward: ", info_gain_reward) reward_list.append(individual_reward) self.node_coords, self.graph, self.node_utility, self.guidepost = self.graph_generator.update_graph(self.robot_belief, self.old_robot_belief, frontiers, self.frontiers) self.old_robot_belief = copy.deepcopy(self.robot_belief) self.filtered_seg_info_mask = [info[0] if self.guidepost[i] == 0.0 else 0.0 for i, info in enumerate(self.segmentation_info_mask)] self.filtered_seg_info_mask = np.expand_dims(np.array(self.filtered_seg_info_mask), axis=1) num_observed_frontiers = self.calculate_num_observed_frontiers(self.frontiers, frontiers) self.frontiers = frontiers self.explored_rate = self.evaluate_exploration_rate() done, num_targets_found = self.check_done() self.num_new_targets_found = num_targets_found - self.num_targets_found # #team_reward = sum(reward_list) / len(reward_list) # # team_reward = num_observed_frontiers / 50 # team_reward = self.num_new_targets_found * 5.0 team_reward = 0.0 # # print("target found reward: ", self.num_new_targets_found * 5.0) self.num_targets_found = num_targets_found self.targets_found_rate = self.evaluate_targets_found_rate() self.info_gain, self.total_info = self.evaluate_info_gain() if done: # team_reward += np.sum(self.robot_belief == 255) / sum(travel_dist_list) team_reward += 40 # 20 for i in range(len(reward_list)): reward_list[i] += team_reward return reward_list, done def import_ground_truth(self, map_index): # occupied 1, free 255, unexplored 127 try: # ground_truth = (io.imread(map_index, 1) * 255).astype(int) ground_truth = (io.imread(map_index, 1)).astype(int) if np.all(ground_truth == 0): ground_truth = (io.imread(map_index, 1) * 255).astype(int) except: new_map_index = self.map_dir + '/' + self.map_list[0] ground_truth = (io.imread(new_map_index, 1)).astype(int) print('could not read the map_path ({}), hence skipping it and using ({}).'.format(map_index, new_map_index)) robot_location = np.nonzero(ground_truth == 208) # print("robot_location: ", robot_location) # print("np.array(robot_location)[1, 127]: ", np.array(robot_location)[1, 127]) robot_location = np.array([np.array(robot_location)[1, 127], np.array(robot_location)[0, 127]]) ground_truth = (ground_truth > 150) ground_truth = ground_truth * 254 + 1 return ground_truth, robot_location def import_segmentation_mask(self, map_index): # occupied 1, free 255, unexplored 127 # mask = (io.imread(map_index, 1) * 255).astype(int) # NOTE: Cannot work well with seg mask self-generated mask = cv2.imread(map_index).astype(int) # print("np.unique(segmentation_mask): ", np.unique(mask)) # NOTE: Could contain mutiple start positions # target_position = np.nonzero(mask == 208) # target_positions = self.find_target_locations(mask) # target_position = np.array([np.array(target_position)[1, 127], np.array(target_position)[0, 127]]) return mask #, target_positions def find_target_locations(self, image_array, grey_value=208): # Load the image # image = Image.open(image_path) # image_array = np.array(image) # Identify pixels equal to the grey value grey_pixels = np.where(image_array == grey_value) # Create a binary array where grey pixels are marked as True binary_array = np.zeros_like(image_array, dtype=bool) binary_array[grey_pixels] = True # Label connected components labeled_array, num_features = label(binary_array) # Find objects returns slices for each connected component slices = find_objects(labeled_array) # Calculate the center of each box centers = [] for slice in slices: row_center = (slice[0].start + slice[0].stop - 1) // 2 col_center = (slice[1].start + slice[1].stop - 1) // 2 centers.append((col_center, row_center)) # (y,x) return centers def free_cells(self): index = np.where(self.ground_truth == 255) free = np.asarray([index[1], index[0]]).T return free def update_robot_belief(self, robot_position, sensor_range, robot_belief, ground_truth): robot_belief = sensor_work(robot_position, sensor_range, robot_belief, ground_truth) return robot_belief def check_done(self, robot_id=0): """ All agnets to have explored most of the env map """ done = False # for idx in range(self.n_agent): # if np.sum(self.ground_truth == 255) - np.sum(self.all_robot_belief[idx][idx] == 255) > 40: # done = False # NEW: ADDITIONAL VLM SEARCH CRITERIA num_targets_found = 0 self.target_found_idxs = [] for i, target in enumerate(self.target_positions): if self.coverage_belief[target[1], target[0]] == 255: # 255: num_targets_found += 1 self.target_found_idxs.append(i) # free_cells_mask = self.all_robot_belief[robot_id][robot_id] == 255 # filtered_segmentation_mask = np.where(free_cells_mask, self.segmentation_mask, 0) # targets = self.find_target_locations(filtered_segmentation_mask) # print("num_targets_found: ", num_targets_found) if TERMINATE_ON_TGTS_FOUND and num_targets_found >= len(self.target_positions): done = True if not TERMINATE_ON_TGTS_FOUND and np.sum(self.coverage_belief == 255) / np.sum(self.ground_truth == 255) >= 0.99: done = True return done, num_targets_found def calculate_num_observed_frontiers(self, old_frontiers, frontiers): frontiers_to_check = frontiers[:, 0] + frontiers[:, 1] * 1j pre_frontiers_to_check = old_frontiers[:, 0] + old_frontiers[:, 1] * 1j frontiers_num = np.intersect1d(frontiers_to_check, pre_frontiers_to_check).shape[0] pre_frontiers_num = pre_frontiers_to_check.shape[0] delta_num = pre_frontiers_num - frontiers_num return delta_num def calculate_reward(self, dist, frontiers): reward = 0 reward -= dist / 64 frontiers_to_check = frontiers[:, 0] + frontiers[:, 1] * 1j pre_frontiers_to_check = self.frontiers[:, 0] + self.frontiers[:, 1] * 1j frontiers_num = np.intersect1d(frontiers_to_check, pre_frontiers_to_check).shape[0] pre_frontiers_num = pre_frontiers_to_check.shape[0] delta_num = pre_frontiers_num - frontiers_num reward += delta_num / 50 return reward def evaluate_exploration_rate(self): # rate = np.sum(self.robot_belief == 255) / np.sum(self.ground_truth == 255) rate = np.sum(self.coverage_belief == 255) / np.sum(self.ground_truth == 255) return rate def evaluate_targets_found_rate(self): if len(self.target_positions) == 0: return 0 else: rate = self.num_targets_found / len(self.target_positions) return rate def evaluate_info_gain(self): # print("self.segmentation_mask.shape: ", self.segmentation_mask.shape) # coverage_belief = (self.coverage_belief == 255) # print("coverage_belief.shape: ", coverage_belief.shape) # print("np.unique(coverage_belief): ", np.unique(coverage_belief)) # print("np.count_nonzero(coverage_belief): ", np.count_nonzero(coverage_belief)) # print("np.count_zero(coverage_belief): ", coverage_belief.size - np.count_nonzero(coverage_belief)) # print("self.segmentation_mask[self.coverage_belief == 255].shape: ", self.segmentation_mask[self.coverage_belief == 255].shape) if self.test and TARGETS_SET_DIR != "": info_gained = np.sum(self.gt_segmentation_mask[self.coverage_belief == 255]) / 100.0 total_info = np.sum(self.gt_segmentation_mask) / 100.0 else: info_gained = np.sum(self.segmentation_mask[self.coverage_belief == 255]) / 100.0 total_info = np.sum(self.segmentation_mask) / 100.0 return info_gained, total_info def calculate_new_free_area(self): old_free_area = self.old_robot_belief == 255 current_free_area = self.robot_belief == 255 new_free_area = (current_free_area.astype(np.int) - old_free_area.astype(np.int)) * 255 return new_free_area, np.sum(old_free_area) def calculate_dist_path(self, path): dist = 0 start = path[0] end = path[-1] for index in path: if index == end: break dist += np.linalg.norm(self.node_coords[start] - self.node_coords[index]) start = index return dist def find_frontier(self): y_len = self.downsampled_belief.shape[0] x_len = self.downsampled_belief.shape[1] mapping = self.downsampled_belief.copy() belief = self.downsampled_belief.copy() # 0-1 unknown area map mapping = (mapping == 127) * 1 mapping = np.lib.pad(mapping, ((1, 1), (1, 1)), 'constant', constant_values=0) fro_map = mapping[2:][:, 1:x_len + 1] + mapping[:y_len][:, 1:x_len + 1] + mapping[1:y_len + 1][:, 2:] + \ mapping[1:y_len + 1][:, :x_len] + mapping[:y_len][:, 2:] + mapping[2:][:, :x_len] + mapping[2:][:, 2:] + \ mapping[:y_len][:, :x_len] ind_free = np.where(belief.ravel(order='F') == 255)[0] ind_fron_1 = np.where(1 < fro_map.ravel(order='F'))[0] ind_fron_2 = np.where(fro_map.ravel(order='F') < 8)[0] ind_fron = np.intersect1d(ind_fron_1, ind_fron_2) ind_to = np.intersect1d(ind_free, ind_fron) map_x = x_len map_y = y_len x = np.linspace(0, map_x - 1, map_x) y = np.linspace(0, map_y - 1, map_y) t1, t2 = np.meshgrid(x, y) points = np.vstack([t1.T.ravel(), t2.T.ravel()]).T f = points[ind_to] f = f.astype(int) f = f * self.resolution return f def plot_env(self, n, path, step, travel_dist, robots_route, img_path_override=None, sat_path_override=None, msk_name_override=None, sound_id_override=None, colormap_mid_val=None): # # TEMP # if TAXABIND_TTA: # # Save self.segmentation_info_mask as .npy file in gifs_path # side_dim = int(np.sqrt(self.segmentation_info_mask.shape[0])) # mask_viz = self.segmentation_info_mask.squeeze().reshape((side_dim, side_dim)).T # np.save(os.path.join(path, f"seg_mask_step{step}.npy"), mask_viz) plt.switch_backend('agg') # plt.ion() plt.cla() color_list = ["r", "g", "c", "m", "y", "k"] if TARGETS_SET_DIR == "" and not TAXABIND_TTA: fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(10, 5)) else: fig, ((ax1, ax2, ax3), (ax4, ax5, ax6)) = plt.subplots(2, 3, figsize=(12, 8)) ### Fig1: Environment ### msk_name = "" if TAXABIND_TTA: image = mpimg.imread(sat_path_override) msk_name = msk_name_override # else: # plt.imshow(self.robot_belief, cmap='gray') # ax1.imshow(self.coverage_belief, cmap='gray') # image = mpimg.imread("Maps/real_maps/real/4259_masked_img_0.jpg") # msk_name = self.map_list[self.map_index] # raw_img_path = self.score_to_img_dict[msk_name] # if "flair" in raw_img_path: # with rasterio.open(raw_img_path) as src_img: # image = src_img.read([1,2,3]) # image = np.transpose(image, (1, 2, 0)) # else: # image = mpimg.imread(raw_img_path) ### Fig1: Environment ### ax = ax1 # if TAXABIND_TTA else ax1 ax.imshow(image) ax.axis((0, self.ground_truth_size[1], self.ground_truth_size[0], 0)) ax.set_title("Image") # if VIZ_GRAPH_EDGES: # for i in range(len(self.graph_generator.x)): # ax.plot(self.graph_generator.x[i], self.graph_generator.y[i], 'tan', zorder=1) # # ax.scatter(self.node_coords[:, 0], self.node_coords[:, 1], c=self.node_utility, zorder=5) # ax.scatter(self.node_coords[:, 0], self.node_coords[:, 1], c=self.segmentation_info_mask, zorder=5) # ax.scatter(self.frontiers[:, 0], self.frontiers[:, 1], c='r', s=2, zorder=3) for i, route in enumerate(robots_route): robot_marker_color = color_list[i % len(color_list)] xPoints = route[0] yPoints = route[1] ax.plot(xPoints, yPoints, c=robot_marker_color, linewidth=2) # ax.plot(xPoints[-1], yPoints[-1], 'mo', markersize=8, zorder=10) ax.plot(xPoints[-1], yPoints[-1], markersize=12, zorder=99, marker="^", ls="-", c=robot_marker_color, mec="black") ax.plot(xPoints[0], yPoints[0], 'co', c=robot_marker_color, markersize=8, zorder=5) # Sensor range rng_x = 0.5 * (self.ground_truth.shape[1] / NUM_COORDS_WIDTH) rng_y = 0.5 * (self.ground_truth.shape[0] / NUM_COORDS_HEIGHT) max_x = min(xPoints[-1] + int(math.ceil(rng_x)), self.ground_truth.shape[1]) min_x = max(xPoints[-1] - int(math.ceil(rng_x)), 0) max_y = min(yPoints[-1] + int(math.ceil(rng_y)), self.ground_truth.shape[0]) min_y = max(yPoints[-1] - int(math.ceil(rng_y)), 0) ax.plot((min_x, min_x), (min_y, max_y), c=robot_marker_color, linewidth=1) ax.plot((min_x, max_x), (max_y, max_y), c=robot_marker_color, linewidth=1) ax.plot((max_x, max_x), (max_y, min_y), c=robot_marker_color, linewidth=1) ax.plot((max_x, min_x), (min_y, min_y), c=robot_marker_color, linewidth=1) ### Fig2: Graph ### ax = ax4 if TAXABIND_TTA else ax1 # ax.imshow(image) ax.imshow(self.coverage_belief, cmap='gray') ax.axis((0, self.ground_truth_size[1], self.ground_truth_size[0], 0)) ax.set_title("Information Graph") if VIZ_GRAPH_EDGES: for i in range(len(self.graph_generator.x)): ax.plot(self.graph_generator.x[i], self.graph_generator.y[i], 'tan', zorder=1) # ax.scatter(self.node_coords[:, 0], self.node_coords[:, 1], c=self.node_utility, zorder=5) # ax.scatter(self.node_coords[:, 0], self.node_coords[:, 1], c=self.segmentation_info_mask, zorder=5) # filtered_seg_info_mask = [info[0] if self.guidepost[i] == 0.0 else 0.0 for i, info in enumerate(self.segmentation_info_mask)] ax.scatter(self.node_coords[:, 0], self.node_coords[:, 1], c=self.filtered_seg_info_mask, zorder=5, s=8) # ax.scatter(self.frontiers[:, 0], self.frontiers[:, 1], c='r', s=2, zorder=3) for i, route in enumerate(robots_route): robot_marker_color = color_list[i % len(color_list)] xPoints = route[0] yPoints = route[1] ax.plot(xPoints, yPoints, c=robot_marker_color, linewidth=2) # ax.plot(xPoints[-1], yPoints[-1], 'mo', markersize=8, zorder=10) ax.plot(xPoints[-1], yPoints[-1], markersize=12, zorder=99, marker="^", ls="-", c=robot_marker_color, mec="black") ax.plot(xPoints[0], yPoints[0], 'co', c=robot_marker_color, markersize=8, zorder=5) # Sensor range rng_x = 0.5 * (self.ground_truth.shape[1] / NUM_COORDS_WIDTH) rng_y = 0.5 * (self.ground_truth.shape[0] / NUM_COORDS_HEIGHT) max_x = min(xPoints[-1] + int(math.ceil(rng_x)), self.ground_truth.shape[1]) min_x = max(xPoints[-1] - int(math.ceil(rng_x)), 0) max_y = min(yPoints[-1] + int(math.ceil(rng_y)), self.ground_truth.shape[0]) min_y = max(yPoints[-1] - int(math.ceil(rng_y)), 0) ax.plot((min_x, min_x), (min_y, max_y), c=robot_marker_color, linewidth=1) ax.plot((min_x, max_x), (max_y, max_y), c=robot_marker_color, linewidth=1) ax.plot((max_x, max_x), (max_y, min_y), c=robot_marker_color, linewidth=1) ax.plot((max_x, min_x), (min_y, min_y), c=robot_marker_color, linewidth=1) # Plot target positions for target in self.target_positions: if self.coverage_belief[target[1], target[0]] == 255: # ax.plot(target[0], target[1], 'go', markersize=8, zorder=99) ax.plot(target[0], target[1], color='g', marker='x', linestyle='-', markersize=12, markeredgewidth=4, zorder=99) else: # ax.plot(target[0], target[1], 'ro', markersize=8, zorder=99) ax.plot(target[0], target[1], color='r', marker='x', linestyle='-', markersize=12, markeredgewidth=4, zorder=99) # ax.pause(0.1) ### Fig3: Segmentation Mask ### ax = ax5 if TAXABIND_TTA else ax2 if TAXABIND_TTA and USE_CLIP_PREDS: side_dim = int(np.sqrt(self.segmentation_info_mask.shape[0])) mask_viz = self.segmentation_info_mask.squeeze().reshape((side_dim, side_dim)).T scale_y = math.ceil(self.ground_truth_size[1] / side_dim) scale_x = math.ceil(self.ground_truth_size[0] / side_dim) upscaled_mask_viz = np.kron(mask_viz, np.ones((scale_y, scale_x))) # Integer scaling only upscaled_mask_viz = upscaled_mask_viz[:self.ground_truth_size[1], :self.ground_truth_size[0]] im = ax.imshow(upscaled_mask_viz, cmap="viridis") ax.axis("off") else: im = ax.imshow(self.segmentation_mask.mean(axis=-1), cmap='viridis', vmin=0, vmax=100) # cmap='gray' ax.axis((0, self.ground_truth_size[1], self.ground_truth_size[0], 0)) ax.set_title(f"Predicted Mask (Normalized)") for i, route in enumerate(robots_route): robot_marker_color = color_list[i % len(color_list)] xPoints = route[0] yPoints = route[1] ax.plot(xPoints, yPoints, c=robot_marker_color, linewidth=2) # ax.plot(xPoints[-1], yPoints[-1], 'mo', markersize=8, zorder=10) ax.plot(xPoints[-1], yPoints[-1], markersize=12, zorder=99, marker="^", ls="-", c=robot_marker_color, mec="black") ax.plot(xPoints[0], yPoints[0], 'co', c=robot_marker_color, markersize=8, zorder=5) # Sensor range rng_x = 0.5 * (self.ground_truth.shape[1] / NUM_COORDS_WIDTH) rng_y = 0.5 * (self.ground_truth.shape[0] / NUM_COORDS_HEIGHT) max_x = min(xPoints[-1] + int(math.ceil(rng_x)), self.ground_truth.shape[1]) min_x = max(xPoints[-1] - int(math.ceil(rng_x)), 0) max_y = min(yPoints[-1] + int(math.ceil(rng_y)), self.ground_truth.shape[0]) min_y = max(yPoints[-1] - int(math.ceil(rng_y)), 0) ax.plot((min_x, min_x), (min_y, max_y), c=robot_marker_color, linewidth=1) ax.plot((min_x, max_x), (max_y, max_y), c=robot_marker_color, linewidth=1) ax.plot((max_x, max_x), (max_y, min_y), c=robot_marker_color, linewidth=1) ax.plot((max_x, min_x), (min_y, min_y), c=robot_marker_color, linewidth=1) # Add a colorbar cbar = fig.colorbar(im, ax=ax, fraction=0.046, pad=0.04) cbar.set_label("Normalized Probs") # ax.pause(0.1) ### Fig4: Segmentation Mask ### if TAXABIND_TTA and USE_CLIP_PREDS: ax = ax6 side_dim = int(np.sqrt(self.segmentation_info_mask_unnormalized.shape[0])) mask_viz = self.segmentation_info_mask_unnormalized.squeeze().reshape((side_dim, side_dim)).T scale_y = math.ceil(self.ground_truth_size[1] / side_dim) scale_x = math.ceil(self.ground_truth_size[0] / side_dim) upscaled_mask_viz = np.kron(mask_viz, np.ones((scale_y, scale_x))) # Integer scaling only upscaled_mask_viz = upscaled_mask_viz[:self.ground_truth_size[1], :self.ground_truth_size[0]] max_val = 0.15 # TO CHANGE mid_val = colormap_mid_val if colormap_mid_val is not None else 0.05 # mid_val = np.max(self.segmentation_info_mask_unnormalized) norm = CustomNorm(vmin=0.0, vmax=max_val, mid=mid_val, lower_portion=0.8) im = ax.imshow(upscaled_mask_viz, cmap="viridis", norm=norm) # norm=LogNorm(vmin=0.01, vmax=0.1)) # norm = PowerNorm(gamma=0.25, vmin=0.01, vmax=0.2) # norm=LogNorm(vmin=0.01, vmax=0.2) im = ax.imshow(upscaled_mask_viz, cmap="viridis", norm=norm) # norm=LogNorm(vmin=0.01, vmax=0.1)) ax.axis("off") # else: # im = ax.imshow(self.segmentation_mask.mean(axis=-1), cmap='viridis', vmin=0, vmax=100) # cmap='gray' # ax.axis((0, self.ground_truth_size[1], self.ground_truth_size[0], 0)) ax.set_title(f"Predicted Mask (Unnormalized)") for i, route in enumerate(robots_route): robot_marker_color = color_list[i % len(color_list)] xPoints = route[0] yPoints = route[1] ax.plot(xPoints, yPoints, c=robot_marker_color, linewidth=2) # ax.plot(xPoints[-1], yPoints[-1], 'mo', markersize=8, zorder=10) ax.plot(xPoints[-1], yPoints[-1], markersize=12, zorder=99, marker="^", ls="-", c=robot_marker_color, mec="black") ax.plot(xPoints[0], yPoints[0], 'co', c=robot_marker_color, markersize=8, zorder=5) # Sensor range rng_x = 0.5 * (self.ground_truth.shape[1] / NUM_COORDS_WIDTH) rng_y = 0.5 * (self.ground_truth.shape[0] / NUM_COORDS_HEIGHT) max_x = min(xPoints[-1] + int(math.ceil(rng_x)), self.ground_truth.shape[1]) min_x = max(xPoints[-1] - int(math.ceil(rng_x)), 0) max_y = min(yPoints[-1] + int(math.ceil(rng_y)), self.ground_truth.shape[0]) min_y = max(yPoints[-1] - int(math.ceil(rng_y)), 0) ax.plot((min_x, min_x), (min_y, max_y), c=robot_marker_color, linewidth=1) ax.plot((min_x, max_x), (max_y, max_y), c=robot_marker_color, linewidth=1) ax.plot((max_x, max_x), (max_y, min_y), c=robot_marker_color, linewidth=1) ax.plot((max_x, min_x), (min_y, min_y), c=robot_marker_color, linewidth=1) # Add a colorbar cbar = fig.colorbar(im, ax=ax, fraction=0.046, pad=0.04) if TAXABIND_TTA and USE_CLIP_PREDS: cbar.set_ticks([0.0, mid_val, max_val]) cbar.set_label("Probs (Scaled by expectation)") # Fog5: GT Mask if TARGETS_SET_DIR != "": ax = ax2 im = ax.imshow(self.gt_segmentation_mask.mean(axis=-1), cmap='viridis', vmin=0, vmax=100) # cmap='gray' ax.axis((0, self.ground_truth_size[1], self.ground_truth_size[0], 0)) ax.set_title(f"Ground Truth Mask") for i, route in enumerate(robots_route): robot_marker_color = color_list[i % len(color_list)] xPoints = route[0] yPoints = route[1] ax.plot(xPoints, yPoints, c=robot_marker_color, linewidth=2) # ax.plot(xPoints[-1], yPoints[-1], 'mo', markersize=8, zorder=10) ax.plot(xPoints[-1], yPoints[-1], markersize=12, zorder=99, marker="^", ls="-", c=robot_marker_color, mec="black") ax.plot(xPoints[0], yPoints[0], 'co', c=robot_marker_color, markersize=8, zorder=5) # Sensor range rng_x = 0.5 * (self.ground_truth.shape[1] / NUM_COORDS_WIDTH) rng_y = 0.5 * (self.ground_truth.shape[0] / NUM_COORDS_HEIGHT) max_x = min(xPoints[-1] + int(math.ceil(rng_x)), self.ground_truth.shape[1]) min_x = max(xPoints[-1] - int(math.ceil(rng_x)), 0) max_y = min(yPoints[-1] + int(math.ceil(rng_y)), self.ground_truth.shape[0]) min_y = max(yPoints[-1] - int(math.ceil(rng_y)), 0) ax.plot((min_x, min_x), (min_y, max_y), c=robot_marker_color, linewidth=1) ax.plot((min_x, max_x), (max_y, max_y), c=robot_marker_color, linewidth=1) ax.plot((max_x, max_x), (max_y, min_y), c=robot_marker_color, linewidth=1) ax.plot((max_x, min_x), (min_y, min_y), c=robot_marker_color, linewidth=1) # Add a colorbar cbar = fig.colorbar(im, ax=ax, fraction=0.046, pad=0.04) cbar.set_label("Normalized Mask Value") # ax4.pause(0.1) ### Fig6: Segmentation Mask (GT) ### if TAXABIND_TTA: ax = ax3 image = mpimg.imread(img_path_override) ax.imshow(image) ax.set_title("Ground Image") ax.axis("off") sound_id = sound_id_override if sound_id_override is not None else "-1" plt.suptitle('Targets Found: {}/{} Coverage ratio: {:.4g} Travel Dist: {:.4g} Info Gain: {:.4g}% \n ({}) \n (Sound ID: {})'.format(self.num_targets_found, \ len(self.target_positions), self.explored_rate, travel_dist, (100*self.info_gain/self.total_info), msk_name, sound_id)) plt.tight_layout() plt.savefig('{}/{}_{}_samples.png'.format(path, n, step, dpi=100)) # plt.show() frame = '{}/{}_{}_samples.png'.format(path, n, step) self.frame_files.append(frame) plt.close() #################### # ADDED: For app.py #################### def plot_heatmap(self, save_dir, step, travel_dist, robots_route=None): """Plot only the segmentation heatmap and save it as ``{step}.png`` in ``save_dir``. This lightweight helper is meant for asynchronous streaming in the Gradio demo when full `plot_env` is too heavy. Parameters ---------- save_dir : str Directory to save the generated PNG file. step : int Current timestep; becomes the filename ``{step}.png``. robots_route : list | None Optional list of routes (xPoints, yPoints) to overlay. Returns ------- str Full path to the generated PNG file. """ import os plt.switch_backend('agg') # Do not clear the global figure state in case it interferes with # the current figure. Each call creates its own Figure object that # we close explicitly at the end, so a global clear is unnecessary # and may break concurrent drawing. # plt.cla() color_list = ["r", "g", "c", "m", "y", "k"] fig, ax = plt.subplots(1, 1, figsize=(6, 6)) # Select the mask to visualise # if TAXABIND_TTA and USE_CLIP_PREDS: side_dim = int(np.sqrt(self.segmentation_info_mask.shape[0])) mask_viz = self.segmentation_info_mask.squeeze().reshape((side_dim, side_dim)).T # Properly map image to pixel coordinates and keep limits fixed H, W = self.ground_truth_size # rows (y), cols (x) im = ax.imshow( mask_viz, cmap="viridis", origin="upper", extent=[0, W, H, 0], # x: 0..W, y: H..0 (origin at top-left) interpolation="nearest", # keep cell edges sharp & aligned zorder=0, ) ax.set_xlim(0, W) ax.set_ylim(H, 0) ax.set_axis_off() # hide ticks but keep limits # else: # im = ax.imshow(self.segmentation_mask.mean(axis=-1), cmap='viridis', vmin=0, vmax=100) # ax.axis((0, self.ground_truth_size[1], self.ground_truth_size[0], 0)) # Optionally overlay robot paths if robots_route is not None: for i, route in enumerate(robots_route): robot_marker_color = color_list[i % len(color_list)] xPoints, yPoints = route ax.plot(xPoints, yPoints, c=robot_marker_color, linewidth=2) ax.plot(xPoints[-1], yPoints[-1], markersize=12, zorder=99, marker="^", ls="-", c=robot_marker_color, mec="black") ax.plot(xPoints[0], yPoints[0], 'co', c=robot_marker_color, markersize=8, zorder=5) # Plot target positions for target in self.target_positions: if self.coverage_belief[target[1], target[0]] == 255: # ax.plot(target[0], target[1], 'go', markersize=8, zorder=99) ax.plot(target[0], target[1], color='g', marker='x', linestyle='-', markersize=12, markeredgewidth=4, zorder=99) else: # ax.plot(target[0], target[1], 'ro', markersize=8, zorder=99) ax.plot(target[0], target[1], color='r', marker='x', linestyle='-', markersize=12, markeredgewidth=4, zorder=99) # Sensor range rng_x = 0.5 * (self.ground_truth.shape[1] / NUM_COORDS_WIDTH) rng_y = 0.5 * (self.ground_truth.shape[0] / NUM_COORDS_HEIGHT) max_x = min(xPoints[-1] + int(math.ceil(rng_x)), self.ground_truth.shape[1]) min_x = max(xPoints[-1] - int(math.ceil(rng_x)), 0) max_y = min(yPoints[-1] + int(math.ceil(rng_y)), self.ground_truth.shape[0]) min_y = max(yPoints[-1] - int(math.ceil(rng_y)), 0) ax.plot((min_x, min_x), (min_y, max_y), c=robot_marker_color, linewidth=1) ax.plot((min_x, max_x), (max_y, max_y), c=robot_marker_color, linewidth=1) ax.plot((max_x, max_x), (max_y, min_y), c=robot_marker_color, linewidth=1) ax.plot((max_x, min_x), (min_y, min_y), c=robot_marker_color, linewidth=1) # Color bar cbar = fig.colorbar(im, ax=ax, fraction=0.046, pad=0.04) cbar.set_label("Normalized Probs") # Change coverage to 1dp plt.suptitle('Targets Found: {}/{} Coverage: {:.1f}% Steps: {}/{}'.format( self.num_targets_found, \ len(self.target_positions), self.explored_rate*100, step+1, NUM_EPS_STEPS), y=0.94, # Closer to plot ) plt.tight_layout() os.makedirs(save_dir, exist_ok=True) out_path = os.path.join(save_dir, f"{step}.png") # Save atomically: write to temp file then move into place so the poller never sees a partial file. tmp_path = out_path + ".tmp" fig.savefig(tmp_path, dpi=100, format='png') os.replace(tmp_path, out_path) # atomic on same filesystem plt.close(fig) return out_path #################### class CustomNorm(Normalize): """ A custom normalization that allocates a larger fraction of the colormap to the lower data range [vmin, mid] than to [mid, vmax]. Parameters ---------- vmin : float Minimum data value vmax : float Maximum data value mid : float Midpoint in data where we switch from 'lower' to 'upper' mapping lower_portion : float Fraction of the colormap to allocate for [vmin, mid]. For example, 0.8 => 80% of colors for [vmin, mid], 20% for [mid, vmax]. clip : bool Whether to clip data outside [vmin, vmax]. """ def __init__(self, vmin=None, vmax=None, mid=0.05, lower_portion=0.8, clip=False): self.mid = mid self.lower_portion = lower_portion super().__init__(vmin, vmax, clip) def __call__(self, value, clip=None): """Forward transform: data -> [0..1] color space.""" vmin, vmax, mid = self.vmin, self.vmax, self.mid lp = self.lower_portion value = np.asarray(value, dtype=np.float64) # Piecewise linear mapping: # [vmin..mid] => [0..lp] # [mid..vmax] => [lp..1] normed = np.where( value <= mid, lp * (value - vmin) / (mid - vmin), lp + (value - mid) / (vmax - mid) * (1 - lp) ) return np.clip(normed, 0, 1) def inverse(self, value): """ Inverse transform: [0..1] color space -> data space. Matplotlib's colorbar calls this to place ticks correctly. """ vmin, vmax, mid = self.vmin, self.vmax, self.mid lp = self.lower_portion value = np.asarray(value, dtype=np.float64) # For color space [0..lp], invert to [vmin..mid] # For color space [lp..1], invert to [mid..vmax] below = (value <= lp) above = ~below # Allocate array for results data = np.zeros_like(value, dtype=np.float64) # Invert lower segment data[below] = vmin + (value[below] / lp) * (mid - vmin) # Invert upper segment data[above] = mid + ((value[above] - lp) / (1 - lp)) * (vmax - mid) return data