# RoboticsDiffusionTransformer / data/hdf5_maniskill_dataset.py
# Uploaded by euijinrnd: "Add files using upload-large-folder tool"
# Commit 9de9fbf (verified)
import os
import h5py
import yaml
import numpy as np
# Assuming STATE_VEC_IDX_MAPPING is a dictionary mapping state variable names to indices
from configs.state_vec import STATE_VEC_IDX_MAPPING
import glob
from scipy.interpolate import interp1d
from PIL import Image
def interpolate_action_sequence(action_sequence, target_size):
    """
    Extend the action sequence to `target_size` by linear interpolation.

    The original steps are placed at integer positions 0..N-1 and the output
    is sampled at `target_size` evenly spaced positions over that same range,
    so the first and last output rows equal the first and last input rows.

    Args:
        action_sequence (np.ndarray): original action sequence, shape (N, D).
        target_size (int): target sequence length.

    Returns:
        extended_sequence (np.ndarray): extended action sequence,
            shape (target_size, D).

    Raises:
        ValueError: If `action_sequence` is not 2-D or contains no steps.
    """
    action_sequence = np.asarray(action_sequence)
    num_steps, _ = action_sequence.shape
    if num_steps == 0:
        raise ValueError("action_sequence must contain at least one step")
    if num_steps == 1:
        # With a single step there is no interval to interpolate over; linear
        # interpolation would divide by a zero-width interval. Repeating the
        # step is the only meaningful extension.
        return np.repeat(action_sequence.astype(np.float64), target_size, axis=0)

    indices_old = np.arange(num_steps)
    indices_new = np.linspace(0, num_steps - 1, target_size)
    interp_func = interp1d(indices_old, action_sequence,
                           kind='linear', axis=0, assume_sorted=True)
    return interp_func(indices_new)
class HDF5VLADataset:
    """
    This class is used to sample episodes from the embodiment dataset
    stored in HDF5 files.

    The joint states and actions of every trajectory of the configured tasks
    are loaded into memory at construction time. Camera frames are read from
    PNG files on demand when a sample is drawn.
    """

    def __init__(self):
        """
        Load the configuration, all trajectories and the normalization statistics.

        Raises:
            FileNotFoundError: If a task directory contains no ``*.h5`` file.
        """
        # The name of your dataset
        self.DATASET_NAME = "agilex"

        self.data_dir = "data/datasets/rdt-ft-data/demo_1k"
        # Multiple tasks
        self.tasks = ['PickCube-v1', 'StackCube-v1', 'PlugCharger-v1', 'PushCube-v1', 'PegInsertionSide-v1']

        # Load configuration from YAML file
        with open('configs/base.yaml', 'r') as file:
            config = yaml.safe_load(file)
        self.CHUNK_SIZE = config['common']['action_chunk_size']
        self.IMG_HISTORY_SIZE = config['common']['img_history_size']
        self.STATE_DIM = config['common']['state_dim']

        # NOTE: the flat episode index used by `parse_hdf5_file` assumes that
        # every task contributes exactly this many trajectories, in task order.
        self.num_episode_per_task = 1000

        self.img = []
        self.state = []
        self.action = []

        # Read the HDF5 files into memory to speed up the data loading
        for task in self.tasks:
            pattern = os.path.join(self.data_dir, task, 'motionplanning', '*.h5')
            matches = sorted(glob.glob(pattern))
            if not matches:
                raise FileNotFoundError(f"No HDF5 file found matching {pattern}")
            file_path = matches[0]
            with h5py.File(file_path, "r") as f:
                # Keys look like traj_0, traj_1, ...; sort by the traj number
                trajs = sorted(f.keys(), key=lambda x: int(x.split('_')[-1]))
                for traj in trajs:
                    self.state.append(f[traj]['obs']['agent']['qpos'][:])
                    self.action.append(f[traj]['actions'][:])

        # Concatenate once and derive every statistic from the same arrays
        all_states = np.concatenate(self.state)
        all_actions = np.concatenate(self.action)
        self.state_min = all_states.min(axis=0)
        self.state_max = all_states.max(axis=0)
        self.action_min = all_actions.min(axis=0)
        self.action_max = all_actions.max(axis=0)
        self.action_std = all_actions.std(axis=0)
        self.action_mean = all_actions.mean(axis=0)

        # NOTE(review): the PlugCharger-v1 instruction reads like a description
        # of a different task; left untouched because it is runtime text --
        # confirm against the data the model was trained with.
        self.task2lang = {
            "PegInsertionSide-v1": "Pick up a orange-white peg and insert the orange end into the box with a hole in it.",
            "PickCube-v1": "Grasp a red cube and move it to a target goal position.",
            "StackCube-v1": "Pick up a red cube and stack it on top of a green cube and let go of the cube without it falling.",
            "PlugCharger-v1": "Pick up one of the misplaced shapes on the board/kit and insert it into the correct empty slot.",
            "PushCube-v1": "Push and move a cube to a goal region in front of it."
        }

    def __len__(self):
        """Return the nominal episode count: tasks times episodes per task."""
        # Assume each task contains `num_episode_per_task` episodes
        return len(self.tasks) * self.num_episode_per_task

    def get_dataset_name(self):
        """Return the dataset name."""
        return self.DATASET_NAME

    def get_item(self, index=None):
        """
        Get a training sample at a random timestep.

        Args:
            index (int, optional): The index of the episode.
                If not provided, a random episode will be selected.
                If the requested episode is invalid, random episodes are
                drawn until a valid one is found.

        Returns:
            sample (dict): A dictionary containing the training sample.
        """
        while True:
            if index is None:
                index = np.random.randint(0, len(self))
            valid, sample = self.parse_hdf5_file(index)
            if valid:
                return sample
            # The episode was rejected; retry with a random one
            index = np.random.randint(0, len(self))

    @staticmethod
    def _normalize(values, lower, upper):
        """Min-max scale `values` to [-1, 1]; constant dimensions map to -1 instead of NaN."""
        span = upper - lower
        # Guard against division by zero where a dimension never changes
        safe_span = np.where(span == 0, np.ones_like(span), span)
        return (values - lower) / safe_span * 2 - 1

    def _fill_in_state(self, values):
        """Scatter the last axis of `values` into a zero vector of length STATE_DIM."""
        uni_state_indices = [
            STATE_VEC_IDX_MAPPING[f"right_arm_joint_{i}_pos"] for i in range(7)
        ] + [
            STATE_VEC_IDX_MAPPING["right_gripper_open"]
        ]
        uni_vec = np.zeros(values.shape[:-1] + (self.STATE_DIM,))
        uni_vec[..., uni_state_indices] = values
        return uni_vec

    def parse_hdf5_file(self, index):
        """
        Generate a training sample at a random timestep of one in-memory episode.

        Args:
            index (int): The flat episode index, i.e.
                ``task_index * num_episode_per_task + index_within_task``.

        Returns:
            valid (bool): Whether the episode is valid.
            dict: A dictionary containing the training sample,
                or None when the episode is invalid.
        """
        num_steps = len(self.action[index])
        step_index = np.random.randint(0, num_steps)

        task_index = index // self.num_episode_per_task
        task_name = self.tasks[task_index]
        language = self.task2lang[task_name]
        task_inner_index = index % self.num_episode_per_task

        # Skip these episodes since in the eef version dataset they are invalid.
        if task_name == 'PegInsertionSide-v1' and task_inner_index > 400:
            return False, None

        # Frames are stored on disk as <proc_index>/<episode_index>/<frame>.png
        proc_index = task_inner_index // 100
        episode_index = task_inner_index % 100

        # Normalize to [-1, 1]
        states = self._normalize(self.state[index], self.state_min, self.state_max)
        states = states[:, :-1]  # remove the last state as it is replicate of the -2 state
        actions = self._normalize(self.action[index], self.action_min, self.action_max)

        # Get image history
        start_img_idx = max(0, step_index - self.IMG_HISTORY_SIZE + 1)
        end_img_idx = step_index + 1
        episode_dir = os.path.join(
            self.data_dir, task_name, 'motionplanning', f'{proc_index}', f'{episode_index}')
        img_history = []
        for i in range(start_img_idx, end_img_idx):
            # Frame files are numbered from 1, hence the `i + 1`
            with Image.open(os.path.join(episode_dir, f"{i + 1}.png")) as img:
                img_history.append(np.array(img))
        img_history = np.array(img_history)
        img_valid_len = img_history.shape[0]

        # Pad images if necessary by repeating the earliest frame at the front
        if img_valid_len < self.IMG_HISTORY_SIZE:
            padding = np.tile(img_history[0:1], (self.IMG_HISTORY_SIZE - img_valid_len, 1, 1, 1))
            img_history = np.concatenate([padding, img_history], axis=0)
        img_history_mask = np.array(
            [False] * (self.IMG_HISTORY_SIZE - img_valid_len) + [True] * img_valid_len
        )

        # Compute state statistics over the whole (normalized) episode
        state_std = np.std(states, axis=0)
        state_mean = np.mean(states, axis=0)
        state_norm = np.sqrt(np.mean(states ** 2, axis=0))

        # Get state and action at the specified timestep
        state = states[step_index: step_index + 1]
        # Only a quarter of the chunk is taken from the data; the rest is
        # produced by the linear interpolation below.
        runtime_chunksize = self.CHUNK_SIZE // 4
        action_sequence = actions[step_index: step_index + runtime_chunksize]
        # Pad action sequence if necessary by repeating the last action
        if action_sequence.shape[0] < runtime_chunksize:
            padding = np.tile(action_sequence[-1:], (runtime_chunksize - action_sequence.shape[0], 1))
            action_sequence = np.concatenate([action_sequence, padding], axis=0)
        # Stretch the sequence to the full chunk size by linear interpolation
        action_sequence = interpolate_action_sequence(action_sequence, self.CHUNK_SIZE)

        # Fill state and action into unified vectors
        state_indicator = self._fill_in_state(np.ones_like(state_std))
        state = self._fill_in_state(state)
        state_std = self._fill_in_state(state_std)
        state_mean = self._fill_in_state(state_mean)
        state_norm = self._fill_in_state(state_norm)
        action_sequence = self._fill_in_state(action_sequence)

        # Assemble the meta information
        meta = {
            "dataset_name": self.DATASET_NAME,
            "#steps": num_steps,
            "step_id": step_index,
            "instruction": language
        }

        # Return the resulting sample
        return True, {
            "meta": meta,
            "state": state,
            "state_std": state_std,
            "state_mean": state_mean,
            "state_norm": state_norm,
            "actions": action_sequence,
            "state_indicator": state_indicator,
            "cam_high": img_history,  # Assuming images0 are high-level camera images
            "cam_high_mask": img_history_mask,
            # No wrist cameras are provided: empty images with all-False masks
            "cam_left_wrist": np.zeros((self.IMG_HISTORY_SIZE, 0, 0, 0)),
            "cam_left_wrist_mask": np.zeros(self.IMG_HISTORY_SIZE, dtype=bool),
            "cam_right_wrist": np.zeros((self.IMG_HISTORY_SIZE, 0, 0, 0)),
            "cam_right_wrist_mask": np.zeros(self.IMG_HISTORY_SIZE, dtype=bool),
        }
if __name__ == "__main__":
    # Build the dataset (this loads every trajectory into memory) and print
    # the min/max normalization bounds as plain Python lists.
    ds = HDF5VLADataset()
    json_data = {
        'state_min': ds.state_min.tolist(),
        'state_max': ds.state_max.tolist(),
        'action_min': ds.action_min.tolist(),
        'action_max': ds.action_max.tolist(),
    }
    print(json_data)