import os
import fnmatch
import json

import h5py
import yaml
import cv2
import numpy as np

from configs.state_vec import STATE_VEC_IDX_MAPPING

TABLETOP_6D_INDICES_NAMES = [
    'left_eef_pos_x', 'left_eef_pos_y', 'left_eef_pos_z',
    'left_eef_angle_0', 'left_eef_angle_1', 'left_eef_angle_2',
    'left_eef_angle_3', 'left_eef_angle_4', 'left_eef_angle_5',
    'left_gripper_open',
    'right_eef_pos_x', 'right_eef_pos_y', 'right_eef_pos_z',
    'right_eef_angle_0', 'right_eef_angle_1', 'right_eef_angle_2',
    'right_eef_angle_3', 'right_eef_angle_4', 'right_eef_angle_5',
    'right_gripper_open'
]
TABLETOP_6D_INDICES = [STATE_VEC_IDX_MAPPING[n] for n in TABLETOP_6D_INDICES_NAMES]


class TabletopHDF5VLADataset:
    """
    This class is used to sample episodes from the embodiment dataset
    stored in HDF5.
    """
    def __init__(self, task_name) -> None:
        # [Modify] The path to the HDF5 dataset directory.
        # Each HDF5 file contains one episode.
        dataset_name = task_name
        HDF5_DIR = f"/data5/jellyho/tabletop/{dataset_name}/"
        self.DATASET_NAME = dataset_name

        self.file_paths = []
        for root, _, files in os.walk(HDF5_DIR):
            for filename in fnmatch.filter(files, '*.hdf5'):
                file_path = os.path.join(root, filename)
                self.file_paths.append(file_path)

        # Load the config
        with open('configs/base.yaml', 'r') as file:
            config = yaml.safe_load(file)
        self.CHUNK_SIZE = config['common']['action_chunk_size']
        self.IMG_HISORY_SIZE = config['common']['img_history_size']
        self.STATE_DIM = config['common']['state_dim']

        # Get each episode's length
        episode_lens = []
        for file_path in self.file_paths:
            valid, res = self.parse_hdf5_file_state_only(file_path)
            _len = res['state'].shape[0] if valid else 0
            episode_lens.append(_len)
        self.episode_sample_weights = np.array(episode_lens) / np.sum(episode_lens)

    def __len__(self):
        return len(self.file_paths)

    def get_dataset_name(self):
        return self.DATASET_NAME

    def get_item(self, index: int = None, state_only=False):
        """Get a training sample at a random timestep.

        Args:
            index (int, optional): the index of the episode.
                If not provided, a random episode will be selected.
            state_only (bool, optional): Whether to return only the state.
                In this case, the sample will contain a complete trajectory rather
                than a single timestep. Defaults to False.

        Returns:
            sample (dict): a dictionary containing the training sample.
        """
        while True:
            if index is None:
                file_path = np.random.choice(self.file_paths, p=self.episode_sample_weights)
            else:
                file_path = self.file_paths[index]
            valid, sample = self.parse_hdf5_file(file_path) \
                if not state_only else self.parse_hdf5_file_state_only(file_path)
            if valid:
                return sample
            else:
                index = np.random.randint(0, len(self.file_paths))

    def parse_hdf5_file(self, file_path):
        """[Modify] Parse an HDF5 file to generate a training sample at
            a random timestep.

        Args:
            file_path (str): the path to the HDF5 file

        Returns:
            valid (bool): whether the episode is valid, which is useful for filtering.
                If False, this episode will be dropped.
            dict: a dictionary containing the training sample,
                {
                    "meta": {
                        "dataset_name": str,    # the name of your dataset.
                        "#steps": int,          # the number of steps in the episode,
                                                # also the total timesteps.
                        "instruction": str      # the language instruction for this episode.
                    },
                    "step_id": int,             # the index of the sampled step,
                                                # also the timestep t.
                    "state": ndarray,           # state[t], (1, STATE_DIM).
                    "state_std": ndarray,       # std(state[:]), (STATE_DIM,).
                    "state_mean": ndarray,      # mean(state[:]), (STATE_DIM,).
                    "state_norm": ndarray,      # norm(state[:]), (STATE_DIM,).
                    "actions": ndarray,         # action[t:t+CHUNK_SIZE], (CHUNK_SIZE, STATE_DIM).
"state_indicator", ndarray, # indicates the validness of each dim, (STATE_DIM,). "cam_high": ndarray, # external camera image, (IMG_HISORY_SIZE, H, W, 3) # or (IMG_HISORY_SIZE, 0, 0, 0) if unavailable. "cam_high_mask": ndarray, # indicates the validness of each timestep, (IMG_HISORY_SIZE,) boolean array. # For the first IMAGE_HISTORY_SIZE-1 timesteps, the mask should be False. "cam_left_wrist": ndarray, # left wrist camera image, (IMG_HISORY_SIZE, H, W, 3). # or (IMG_HISORY_SIZE, 0, 0, 0) if unavailable. "cam_left_wrist_mask": ndarray, "cam_right_wrist": ndarray, # right wrist camera image, (IMG_HISORY_SIZE, H, W, 3). # or (IMG_HISORY_SIZE, 0, 0, 0) if unavailable. # If only one wrist, make it right wrist, plz. "cam_right_wrist_mask": ndarray } or None if the episode is invalid. """ with h5py.File(file_path, 'r') as f: states = f['observations']['states']['ee_6d_pos'][:] actions = f['actions']['ee_6d_pos'][:] num_steps = states.shape[0] # [Optional] We drop too-short episode if num_steps < 20: return False, None # We randomly sample a timestep step_id = np.random.randint(0, num_steps) # You can also use precomputed language embeddings (recommended) if self.DATASET_NAME == 'aloha_box_into_pot_easy': instruction = f['observations']['states']['language_instruction'][0].decode('utf-8') else: instruction = f"lang_embed/{self.DATASET_NAME}.pt" # Assemble the meta meta = { "dataset_name": self.DATASET_NAME, "#steps": num_steps, "step_id": step_id, "instruction": instruction } # Rescale gripper to [0, 1] states = states / np.array( [[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]] ) actions = actions[step_id:step_id+self.CHUNK_SIZE] / np.array( [[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]] ) # Parse the state and action state = states[step_id:step_id+1] state_std = np.std(states, axis=0) state_mean = np.mean(states, axis=0) state_norm = np.sqrt(np.mean(states**2, axis=0)) if actions.shape[0] < self.CHUNK_SIZE: # Pad the actions using the last action actions = np.concatenate([ actions, np.tile(actions[-1:], (self.CHUNK_SIZE-actions.shape[0], 1)) ], axis=0) # Fill the state/action into the unified vector def fill_in_state(values): uni_vec = np.zeros(values.shape[:-1] + (self.STATE_DIM,)) uni_vec[..., TABLETOP_6D_INDICES] = values return uni_vec state = fill_in_state(state) state_indicator = fill_in_state(np.ones_like(state_std)) state_std = fill_in_state(state_std) state_mean = fill_in_state(state_mean) state_norm = fill_in_state(state_norm) # If action's format is different from state's, # you may implement fill_in_action() actions = fill_in_state(actions) # Parse the images def parse_img(key): imgs = [] for i in range(max(step_id-self.IMG_HISORY_SIZE+1, 0), step_id+1): img = f['observations']['images'][key][i] # imgs.append(cv2.imdecode(np.frombuffer(img, np.uint8), cv2.IMREAD_COLOR)) imgs.append(img) # print(imgs) imgs = np.stack(imgs) if imgs.shape[0] < self.IMG_HISORY_SIZE: # Pad the images using the first image imgs = np.concatenate([ np.tile(imgs[:1], (self.IMG_HISORY_SIZE-imgs.shape[0], 1, 1, 1)), imgs ], axis=0) return imgs # `cam_high` is the external camera image cam_high = parse_img('back') # For step_id = first_idx - 1, the valid_len should be one valid_len = min(step_id + 1, self.IMG_HISORY_SIZE) cam_high_mask = np.array( [False] * (self.IMG_HISORY_SIZE - valid_len) + [True] * valid_len ) cam_left_wrist = parse_img('wrist_left') cam_left_wrist_mask = cam_high_mask.copy() cam_right_wrist = parse_img('wrist_right') cam_right_wrist_mask = cam_high_mask.copy() 
            # Return the resulting sample.
            # For unavailable images, return zero-shape arrays, i.e., (IMG_HISORY_SIZE, 0, 0, 0).
            # E.g., return np.zeros((self.IMG_HISORY_SIZE, 0, 0, 0)) for the key "cam_left_wrist"
            # if the left-wrist camera is unavailable on your robot.
            return True, {
                "meta": meta,
                "state": state,
                "state_std": state_std,
                "state_mean": state_mean,
                "state_norm": state_norm,
                "actions": actions,
                "state_indicator": state_indicator,
                "cam_high": cam_high,
                "cam_high_mask": cam_high_mask,
                "cam_left_wrist": cam_left_wrist,
                "cam_left_wrist_mask": cam_left_wrist_mask,
                "cam_right_wrist": cam_right_wrist,
                "cam_right_wrist_mask": cam_right_wrist_mask
            }

    def parse_hdf5_file_state_only(self, file_path):
        """[Modify] Parse an HDF5 file to generate a state trajectory.

        Args:
            file_path (str): the path to the HDF5 file

        Returns:
            valid (bool): whether the episode is valid, which is useful for filtering.
                If False, this episode will be dropped.
            dict: a dictionary containing the training sample,
                {
                    "state": ndarray,   # state[:], (T, STATE_DIM).
                    "action": ndarray,  # action[t:t+CHUNK_SIZE] at a random timestep t,
                                        # (<=CHUNK_SIZE, STATE_DIM).
                } or None if the episode is invalid.
        """
        with h5py.File(file_path, 'r') as f:
            states = f['observations']['states']['ee_6d_pos'][:]
            actions = f['actions']['ee_6d_pos'][:]
            num_steps = states.shape[0]
            step_id = np.random.randint(0, num_steps)

            # Rescale the gripper to [0, 1]
            # (identity placeholder; see parse_hdf5_file)
            states = states / np.array(
                [[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]]
            )
            actions = actions[step_id:step_id + self.CHUNK_SIZE] / np.array(
                [[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]]
            )

            # Fill the state/action into the unified vector
            def fill_in_state(values):
                uni_vec = np.zeros(values.shape[:-1] + (self.STATE_DIM,))
                uni_vec[..., TABLETOP_6D_INDICES] = values
                return uni_vec

            state = fill_in_state(states)
            action = fill_in_state(actions)

            # Return the resulting sample
            return True, {
                "state": state,
                "action": action
            }


class AnubisHDF5VLADataset:
    """
    This class is used to sample episodes from the embodiment dataset
    stored in HDF5.
    """
    def __init__(self, task_name) -> None:
        # [Modify] The path to the HDF5 dataset directory.
        # Each HDF5 file contains one episode.
        dataset_name = task_name
        HDF5_DIR = f"/data5/jellyho/anubis_hdf5/{dataset_name}/"
        self.DATASET_NAME = dataset_name

        self.file_paths = []
        for root, _, files in os.walk(HDF5_DIR):
            for filename in fnmatch.filter(files, '*.hdf5'):
                file_path = os.path.join(root, filename)
                self.file_paths.append(file_path)

        # Load the config
        with open('configs/base.yaml', 'r') as file:
            config = yaml.safe_load(file)
        self.CHUNK_SIZE = config['common']['action_chunk_size']
        self.IMG_HISORY_SIZE = config['common']['img_history_size']
        self.STATE_DIM = config['common']['state_dim']

        # Get each episode's length
        episode_lens = []
        for file_path in self.file_paths:
            valid, res = self.parse_hdf5_file_state_only(file_path)
            _len = res['state'].shape[0] if valid else 0
            episode_lens.append(_len)
        self.episode_sample_weights = np.array(episode_lens) / np.sum(episode_lens)

    def __len__(self):
        return len(self.file_paths)

    def get_dataset_name(self):
        return self.DATASET_NAME

    def get_item(self, index: int = None, state_only=False):
        """Get a training sample at a random timestep.

        Args:
            index (int, optional): the index of the episode.
                If not provided, a random episode will be selected.
            state_only (bool, optional): Whether to return only the state.
                In this case, the sample will contain a complete trajectory rather
                than a single timestep. Defaults to False.

        Returns:
            sample (dict): a dictionary containing the training sample.
        """
        while True:
            if index is None:
                file_path = np.random.choice(self.file_paths, p=self.episode_sample_weights)
            else:
                file_path = self.file_paths[index]
            valid, sample = self.parse_hdf5_file(file_path) \
                if not state_only else self.parse_hdf5_file_state_only(file_path)
            if valid:
                return sample
            else:
                index = np.random.randint(0, len(self.file_paths))

    def parse_hdf5_file(self, file_path):
        """[Modify] Parse an HDF5 file to generate a training sample at
            a random timestep.

        Args:
            file_path (str): the path to the HDF5 file

        Returns:
            valid (bool): whether the episode is valid, which is useful for filtering.
                If False, this episode will be dropped.
            dict: a dictionary containing the training sample,
                {
                    "meta": {
                        "dataset_name": str,    # the name of your dataset.
                        "#steps": int,          # the number of steps in the episode,
                                                # also the total timesteps.
                        "instruction": str      # the language instruction for this episode.
                    },
                    "step_id": int,             # the index of the sampled step,
                                                # also the timestep t.
                    "state": ndarray,           # state[t], (1, STATE_DIM).
                    "state_std": ndarray,       # std(state[:]), (STATE_DIM,).
                    "state_mean": ndarray,      # mean(state[:]), (STATE_DIM,).
                    "state_norm": ndarray,      # norm(state[:]), (STATE_DIM,).
                    "actions": ndarray,         # action[t:t+CHUNK_SIZE], (CHUNK_SIZE, STATE_DIM).
                    "state_indicator": ndarray, # indicates the validness of each dim, (STATE_DIM,).
                    "cam_high": ndarray,        # external camera image, (IMG_HISORY_SIZE, H, W, 3),
                                                # or (IMG_HISORY_SIZE, 0, 0, 0) if unavailable.
                    "cam_high_mask": ndarray,   # indicates the validness of each timestep, (IMG_HISORY_SIZE,) boolean array.
                                                # For the first IMG_HISORY_SIZE-1 timesteps, the mask should be False.
                    "cam_left_wrist": ndarray,  # left wrist camera image, (IMG_HISORY_SIZE, H, W, 3),
                                                # or (IMG_HISORY_SIZE, 0, 0, 0) if unavailable.
                    "cam_left_wrist_mask": ndarray,
                    "cam_right_wrist": ndarray, # right wrist camera image, (IMG_HISORY_SIZE, H, W, 3),
                                                # or (IMG_HISORY_SIZE, 0, 0, 0) if unavailable.
                                                # If only one wrist camera is available, treat it as the right wrist.
                    "cam_right_wrist_mask": ndarray
                } or None if the episode is invalid.
""" with h5py.File(file_path, 'r') as f: states = f['observation']['eef_pose'][:] actions = f['action']['eef_pose'][:] num_steps = states.shape[0] # [Optional] We drop too-short episode if num_steps < 20: return False, None # We randomly sample a timestep step_id = np.random.randint(0, num_steps) # You can also use precomputed language embeddings (recommended) if self.DATASET_NAME == 'aloha_box_into_pot_easy': instruction = f['observations']['states']['language_instruction'][0].decode('utf-8') else: instruction = f"lang_embed/{self.DATASET_NAME}.pt" # Assemble the meta meta = { "dataset_name": self.DATASET_NAME, "#steps": num_steps, "step_id": step_id, "instruction": instruction } # Rescale gripper to [0, 1] states = states / np.array( [[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]] ) actions = actions[step_id:step_id+self.CHUNK_SIZE] / np.array( [[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]] ) # Parse the state and action state = states[step_id:step_id+1] state_std = np.std(states, axis=0) state_mean = np.mean(states, axis=0) state_norm = np.sqrt(np.mean(states**2, axis=0)) if actions.shape[0] < self.CHUNK_SIZE: # Pad the actions using the last action actions = np.concatenate([ actions, np.tile(actions[-1:], (self.CHUNK_SIZE-actions.shape[0], 1)) ], axis=0) # Fill the state/action into the unified vector def fill_in_state(values): uni_vec = np.zeros(values.shape[:-1] + (self.STATE_DIM,)) uni_vec[..., TABLETOP_6D_INDICES] = values return uni_vec state = fill_in_state(state) state_indicator = fill_in_state(np.ones_like(state_std)) state_std = fill_in_state(state_std) state_mean = fill_in_state(state_mean) state_norm = fill_in_state(state_norm) # If action's format is different from state's, # you may implement fill_in_action() actions = fill_in_state(actions) # Parse the images def parse_img(key): imgs = [] for i in range(max(step_id-self.IMG_HISORY_SIZE+1, 0), step_id+1): img = f['observation'][key][i] # imgs.append(cv2.imdecode(np.frombuffer(img, np.uint8), cv2.IMREAD_COLOR)) imgs.append(img) # print(imgs) imgs = np.stack(imgs) if imgs.shape[0] < self.IMG_HISORY_SIZE: # Pad the images using the first image imgs = np.concatenate([ np.tile(imgs[:1], (self.IMG_HISORY_SIZE-imgs.shape[0], 1, 1, 1)), imgs ], axis=0) return imgs # `cam_high` is the external camera image cam_high = parse_img('agentview_image') # For step_id = first_idx - 1, the valid_len should be one valid_len = min(step_id + 1, self.IMG_HISORY_SIZE) cam_high_mask = np.array( [False] * (self.IMG_HISORY_SIZE - valid_len) + [True] * valid_len ) cam_left_wrist = parse_img('wrist_left_image') cam_left_wrist_mask = cam_high_mask.copy() cam_right_wrist = parse_img('wrist_right_image') cam_right_wrist_mask = cam_high_mask.copy() # print(cam_left_wrist is not None, cam_right_wrist is not None, cam_high is not None) # Return the resulting sample # For unavailable images, return zero-shape arrays, i.e., (IMG_HISORY_SIZE, 0, 0, 0) # E.g., return np.zeros((self.IMG_HISORY_SIZE, 0, 0, 0)) for the key "cam_left_wrist", # if the left-wrist camera is unavailable on your robot return True, { "meta": meta, "state": state, "state_std": state_std, "state_mean": state_mean, "state_norm": state_norm, "actions": actions, "state_indicator": state_indicator, "cam_high": cam_high, "cam_high_mask": cam_high_mask, "cam_left_wrist": cam_left_wrist, "cam_left_wrist_mask": cam_left_wrist_mask, "cam_right_wrist": cam_right_wrist, "cam_right_wrist_mask": cam_right_wrist_mask } def parse_hdf5_file_state_only(self, file_path): 
"""[Modify] Parse a hdf5 file to generate a state trajectory. Args: file_path (str): the path to the hdf5 file Returns: valid (bool): whether the episode is valid, which is useful for filtering. If False, this episode will be dropped. dict: a dictionary containing the training sample, { "state": ndarray, # state[:], (T, STATE_DIM). "action": ndarray, # action[:], (T, STATE_DIM). } or None if the episode is invalid. """ with h5py.File(file_path, 'r') as f: states = f['observation']['eef_pose'][:] actions = f['action']['eef_pose'][:] num_steps = states.shape[0] step_id = np.random.randint(0, num_steps) # Rescale gripper to [0, 1] states = states / np.array( [[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]] ) actions = actions[step_id:step_id+self.CHUNK_SIZE] / np.array( [[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]] ) # Fill the state/action into the unified vector def fill_in_state(values): uni_vec = np.zeros(values.shape[:-1] + (self.STATE_DIM,)) uni_vec[..., TABLETOP_6D_INDICES] = values return uni_vec state = fill_in_state(states) action = fill_in_state(actions) # Return the resulting sample return True, { "state": state, "action": action } if __name__ == "__main__": ds = TabletopHDF5VLADataset() for i in range(len(ds)): print(f"Processing episode {i}/{len(ds)}...") ds.get_item(i)