import os
import glob

import h5py
import yaml
import numpy as np
from scipy.interpolate import interp1d
from PIL import Image

from configs.state_vec import STATE_VEC_IDX_MAPPING

def interpolate_action_sequence(action_sequence, target_size):
    """
    Extend the action sequence to `target_size` by linear interpolation.

    Args:
        action_sequence (np.ndarray): original action sequence, shape (N, D).
        target_size (int): target sequence length.

    Returns:
        extended_sequence (np.ndarray): extended action sequence, shape (target_size, D).
    """
    N, D = action_sequence.shape
    indices_old = np.arange(N)
    indices_new = np.linspace(0, N - 1, target_size)

    # Interpolate every action dimension independently along the time axis.
    interp_func = interp1d(indices_old, action_sequence,
                           kind='linear', axis=0, assume_sorted=True)
    action_sequence_new = interp_func(indices_new)

    return action_sequence_new

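# Illustrative sketch (not part of the pipeline; `demo` is a made-up array):
# upsampling keeps the endpoints and fills intermediate steps linearly, e.g.
#   demo = np.arange(8.0).reshape(4, 2)          # (N=4, D=2)
#   interpolate_action_sequence(demo, 8).shape   # -> (8, 2)
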
class HDF5VLADataset:
    """
    This class is used to sample episodes from the embodiment dataset
    stored in HDF5 files.
    """
    def __init__(self):
        self.DATASET_NAME = "agilex"

        self.data_dir = "data/datasets/rdt-ft-data/demo_1k"
        # The directory listing is immediately overridden by an explicit task
        # list so that the task order (and hence the index-to-task mapping) is fixed.
        self.tasks = os.listdir(self.data_dir)
        self.tasks = ['PickCube-v1', 'StackCube-v1', 'PlugCharger-v1', 'PushCube-v1', 'PegInsertionSide-v1']

        with open('configs/base.yaml', 'r') as file:
            config = yaml.safe_load(file)
        self.CHUNK_SIZE = config['common']['action_chunk_size']
        self.IMG_HISTORY_SIZE = config['common']['img_history_size']
        self.STATE_DIM = config['common']['state_dim']

        self.num_episode_per_task = 1000
        # Per-episode buffers filled while scanning the HDF5 files below.
        self.img = []
        self.state = []
        self.action = []

        # Load every trajectory's proprioceptive states and actions into memory.
        for task in self.tasks:
            file_path = glob.glob(os.path.join(self.data_dir, task, 'motionplanning', '*.h5'))[0]
            with h5py.File(file_path, "r") as f:
                trajs = f.keys()
                # Sort trajectories numerically, e.g. "traj_2" before "traj_10".
                trajs = sorted(trajs, key=lambda x: int(x.split('_')[-1]))
                for traj in trajs:
                    states = f[traj]['obs']['agent']['qpos'][:]
                    actions = f[traj]['actions'][:]
                    self.state.append(states)
                    self.action.append(actions)

        # Dataset-level statistics used for min-max normalization.
        all_states = np.concatenate(self.state)
        all_actions = np.concatenate(self.action)
        self.state_min = all_states.min(axis=0)
        self.state_max = all_states.max(axis=0)
        self.action_min = all_actions.min(axis=0)
        self.action_max = all_actions.max(axis=0)
        self.action_std = all_actions.std(axis=0)
        self.action_mean = all_actions.mean(axis=0)
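        # These bounds feed the min-max normalization in parse_hdf5_file:
        #   x_norm = (x - x_min) / (x_max - x_min) * 2 - 1   # maps to [-1, 1]
        # action_mean / action_std are not used for normalization in this file.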

        self.task2lang = {
            "PegInsertionSide-v1": "Pick up a orange-white peg and insert the orange end into the box with a hole in it.",
            "PickCube-v1": "Grasp a red cube and move it to a target goal position.",
            "StackCube-v1": "Pick up a red cube and stack it on top of a green cube and let go of the cube without it falling.",
            "PlugCharger-v1": "Pick up one of the misplaced shapes on the board/kit and insert it into the correct empty slot.",
            "PushCube-v1": "Push and move a cube to a goal region in front of it."
        }

    def __len__(self):
        return len(self.tasks) * self.num_episode_per_task

    def get_dataset_name(self):
        return self.DATASET_NAME

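    # Note: __len__ reports len(self.tasks) * self.num_episode_per_task logical
    # indices (5 * 1000 with the settings above); indices rejected by
    # parse_hdf5_file (e.g. PegInsertionSide-v1 beyond in-task index 400) are
    # simply resampled inside get_item.
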
    def get_item(self, index=None):
        """
        Get a training sample at a random timestep.

        Args:
            index (int, optional): The index of the episode.
                If not provided, a random episode will be selected.

        Returns:
            sample (dict): A dictionary containing the training sample.
        """
        while True:
            if index is None:
                index = np.random.randint(0, self.__len__())
            valid, sample = self.parse_hdf5_file(index)
            if valid:
                return sample
            else:
                # Invalid episode: resample another index and try again.
                index = np.random.randint(0, self.__len__())

    def parse_hdf5_file(self, index):
        """
        Parse the indexed episode to generate a training sample at a random timestep.

        Args:
            index (int): The global episode index,
                i.e. task index * num_episode_per_task + in-task episode index.

        Returns:
            valid (bool): Whether the episode is valid.
            dict: A dictionary containing the training sample.
        """
        num_steps = len(self.action[index])
        step_index = np.random.randint(0, num_steps)
        # Decompose the global index into (task, in-task episode).
        task_index = index // self.num_episode_per_task
        language = self.task2lang[self.tasks[task_index]]
        task_inner_index = index % self.num_episode_per_task

        # PegInsertionSide-v1 episodes with in-task index above 400 are not used.
        if self.tasks[task_index] == 'PegInsertionSide-v1' and task_inner_index > 400:
            return False, None
        # Image folders are grouped 100 episodes at a time: <proc_index>/<episode_index>/.
        proc_index = task_inner_index // 100
        episode_index = task_inner_index % 100

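        # Example: with num_episode_per_task = 1000, index 2345 maps to
        # task_index = 2 ('PlugCharger-v1'), task_inner_index = 345,
        # proc_index = 3, episode_index = 45.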
        # Min-max normalize states and actions to [-1, 1] with the dataset-level bounds.
        states = (self.state[index] - self.state_min) / (self.state_max - self.state_min) * 2 - 1
        # Drop the last qpos entry; the remaining values are mapped below as
        # 7 arm joints + 1 gripper.
        states = states[:, :-1]
        actions = (self.action[index] - self.action_min) / (self.action_max - self.action_min) * 2 - 1

        # Collect up to IMG_HISTORY_SIZE frames ending at the current timestep.
        start_img_idx = max(0, step_index - self.IMG_HISTORY_SIZE + 1)
        end_img_idx = step_index + 1
        img_history = []
        for i in range(start_img_idx, end_img_idx):
            # Frames on disk are 1-indexed PNG files.
            img_path = os.path.join(self.data_dir, self.tasks[task_index], 'motionplanning',
                                    f'{proc_index}', f'{episode_index}', f"{i + 1}.png")
            img = np.array(Image.open(img_path))
            img_history.append(img)
        img_history = np.array(img_history)

        img_valid_len = img_history.shape[0]

        # Pad the front of the history with copies of the earliest frame.
        if img_valid_len < self.IMG_HISTORY_SIZE:
            padding = np.tile(img_history[0:1], (self.IMG_HISTORY_SIZE - img_valid_len, 1, 1, 1))
            img_history = np.concatenate([padding, img_history], axis=0)

        # Mark which frames are real (True) and which are padding (False).
        img_history_mask = np.array(
            [False] * (self.IMG_HISTORY_SIZE - img_valid_len) + [True] * img_valid_len
        )

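        # Example: with IMG_HISTORY_SIZE = 2 and step_index = 0 only one real
        # frame exists, so img_history becomes [frame_1, frame_1] (front-padded)
        # and img_history_mask == [False, True].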
        # Per-episode statistics of the normalized states.
        state_std = np.std(states, axis=0)
        state_mean = np.mean(states, axis=0)
        state_norm = np.sqrt(np.mean(states ** 2, axis=0))

        # Current state and the future action chunk starting at this timestep.
        state = states[step_index: step_index + 1]
        runtime_chunksize = self.CHUNK_SIZE // 4
        action_sequence = actions[step_index: step_index + runtime_chunksize]

        # If the episode ends early, pad the chunk by repeating the last action.
        if action_sequence.shape[0] < runtime_chunksize:
            padding = np.tile(action_sequence[-1:], (runtime_chunksize - action_sequence.shape[0], 1))
            action_sequence = np.concatenate([action_sequence, padding], axis=0)

        # Upsample the chunk from runtime_chunksize back to CHUNK_SIZE steps.
        action_sequence = interpolate_action_sequence(action_sequence, self.CHUNK_SIZE)

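        # Net effect: only CHUNK_SIZE // 4 raw future actions are consumed and
        # then linearly interpolated up to CHUNK_SIZE steps. For example, with
        # CHUNK_SIZE = 64 the 16-step slice is stretched to 64 steps, i.e. the
        # training chunk covers a 4x shorter horizon at 4x finer resolution.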
        def fill_in_state(values):
            # Scatter the 8 local dimensions (7 right-arm joints + gripper) into
            # the unified STATE_DIM-dimensional vector; all other entries stay zero.
            UNI_STATE_INDICES = [
                STATE_VEC_IDX_MAPPING[f"right_arm_joint_{i}_pos"] for i in range(7)
            ] + [
                STATE_VEC_IDX_MAPPING["right_gripper_open"]
            ]
            uni_vec = np.zeros(values.shape[:-1] + (self.STATE_DIM,))
            uni_vec[..., UNI_STATE_INDICES] = values
            return uni_vec

        # Map states, statistics, and actions into the unified state vector format.
        state_indicator = fill_in_state(np.ones_like(state_std))
        state = fill_in_state(state)
        state_std = fill_in_state(state_std)
        state_mean = fill_in_state(state_mean)
        state_norm = fill_in_state(state_norm)
        action_sequence = fill_in_state(action_sequence)

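        # state_indicator therefore has ones exactly at the 8 unified indices
        # filled above and zeros elsewhere, marking which of the STATE_DIM
        # dimensions carry valid data for this embodiment.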
        meta = {
            "dataset_name": self.DATASET_NAME,
            "#steps": num_steps,
            "step_id": step_index,
            "instruction": language
        }

        return True, {
            "meta": meta,
            "state": state,
            "state_std": state_std,
            "state_mean": state_mean,
            "state_norm": state_norm,
            "actions": action_sequence,
            "state_indicator": state_indicator,
            "cam_high": img_history,
            "cam_high_mask": img_history_mask,
            # Only a single (high) camera view is available in this dataset, so
            # the wrist cameras are returned empty with all-False masks.
            "cam_left_wrist": np.zeros((self.IMG_HISTORY_SIZE, 0, 0, 0)),
            "cam_left_wrist_mask": np.zeros(self.IMG_HISTORY_SIZE, dtype=bool),
            "cam_right_wrist": np.zeros((self.IMG_HISTORY_SIZE, 0, 0, 0)),
            "cam_right_wrist_mask": np.zeros(self.IMG_HISTORY_SIZE, dtype=bool),
        }

if __name__ == "__main__":
    ds = HDF5VLADataset()

    # Print the dataset-level normalization bounds.
    json_data = {
        'state_min': ds.state_min.tolist(),
        'state_max': ds.state_max.tolist(),
        'action_min': ds.action_min.tolist(),
        'action_max': ds.action_max.tolist(),
    }
    print(json_data)
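
    # Illustrative check (assumes the HDF5 files and rendered PNG frames exist
    # under ds.data_dir): draw one random sample and report the shapes of the
    # main fields returned by get_item().
    sample = ds.get_item()
    for key in ("state", "actions", "state_indicator", "cam_high", "cam_high_mask"):
        print(key, np.asarray(sample[key]).shape)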