LeRobot documentation
Introduction to Processors
Introduction to Processors
In robotics, there’s a fundamental mismatch between the data that robots and humans produce and what machine learning models expect. Robots output raw sensor data like camera images and joint positions that need normalization, batching, and device placement before models can process them. Language instructions from humans must be tokenized into numerical representations, and different robots use different coordinate systems that need standardization.
The challenge extends to model outputs as well. Models might output end-effector positions while robots need joint-space commands, or teleoperators produce relative movements while robots expect absolute commands. Model predictions are often normalized and need conversion back to real-world scales.
Cross-domain translation adds another layer of complexity. Training data from one robot setup needs adaptation for deployment on different hardware, models trained with specific camera configurations must work with new arrangements, and datasets with different naming conventions need harmonization.
That’s where processors come in. They serve as universal translators that bridge these gaps, ensuring seamless data flow from sensors to models to actuators. Processors handle all the preprocessing and postprocessing steps needed to convert raw environment data into model-ready inputs and vice versa.
This means that your favorite policy can be used like this:
import torch
from lerobot.datasets.lerobot_dataset import LeRobotDataset
from lerobot.policies.factory import make_pre_post_processors
from lerobot.policies.your_policy import YourPolicy
from lerobot.processor.pipeline import RobotProcessorPipeline, PolicyProcessorPipeline
dataset = LeRobotDataset("hf_user/dataset", episodes=[0])
sample = dataset[10]
model = YourPolicy.from_pretrained(
"hf_user/model",
)
model.eval()
model.to("cuda")
preprocessor, postprocessor = make_pre_post_processors(model.config, pretrained_path="hf_user/model", dataset_stats=dataset.meta.stats)
preprocessed_sample = preprocessor(sample)
action = model.select_action(preprocessed_sample)
postprocessed_action = postprocessor(action)
What are Processors?
In robotics, data comes in many forms: images from cameras, joint positions from sensors, text instructions from users, and more. Each type of data requires specific transformations before a model can use it effectively. Models need this data to be:
- Normalized: Scaled to appropriate ranges for neural network processing
- Batched: Organized with proper dimensions for batch processing
- Tokenized: Text converted to numerical representations
- Device-placed: Moved to the right hardware (CPU/GPU)
- Type-converted: Cast to appropriate data types
Processors handle these transformations through composable, reusable steps that can be chained together into pipelines. Think of them as a modular assembly line where each station performs a specific transformation on your data.
Core Concepts
EnvTransition: The Universal Data Container
The EnvTransition
is the fundamental data structure that flows through all processors.
It’s a typed dictionary that represents a complete robot-environment interaction:
- OBSERVATION: All sensor data (images, states, proprioception)
- ACTION: The action to execute or that was executed
- REWARD: Reinforcement learning signal
- DONE/TRUNCATED: Episode boundary indicators
- INFO: Arbitrary metadata
- COMPLEMENTARY_DATA: Task descriptions, indices, padding flags, inter-step data
ProcessorStep: The Building Block
A ProcessorStep
is a single transformation unit that processes transitions. It’s an abstract base class with two required methods:
from lerobot.processor import ProcessorStep, EnvTransition
class MyProcessorStep(ProcessorStep):
"""Example processor step - inherit and implement abstract methods."""
def __call__(self, transition: EnvTransition) -> EnvTransition:
"""Transform the transition - REQUIRED abstract method."""
# Your processing logic here
return transition
def transform_features(self, features):
"""Declare how this step transforms feature shapes/types - REQUIRED abstract method."""
return features # Most processors return features unchanged
__call__
is the core of your processor step. It takes an EnvTransition
and returns a modified EnvTransition
.
transform_features
is used to declare how this step transforms feature shapes/types.
DataProcessorPipeline: The Generic Orchestrator
The DataProcessorPipeline[TInput, TOutput]
chains multiple ProcessorStep
instances together:
from lerobot.processor import RobotProcessorPipeline, PolicyProcessorPipeline
# For robot hardware (unbatched data)
robot_processor = RobotProcessorPipeline[RobotAction, RobotAction](
steps=[step1, step2, step3],
name="robot_pipeline"
)
# For model training/inference (batched data)
policy_processor = PolicyProcessorPipeline[dict[str, Any], dict[str, Any]](
steps=[step1, step2, step3],
name="policy_pipeline"
)
RobotProcessorPipeline vs PolicyProcessorPipeline
The key distinction is in the data structures they handle:
Aspect | RobotProcessorPipeline | PolicyProcessorPipeline |
---|---|---|
Input | dict[str, Any] - Individual robot values | dict[str, Any] - Batched tensors |
Output | dict[str, Any] - Individual robot commands | torch.Tensor - Policy predictions |
Use Case | Real-time robot control | Model training/inference |
Data Format | Unbatched, heterogeneous | Batched, homogeneous |
Examples | {"joint_1": 0.5} | {"observation.state": tensor([[0.5]])} |
Use RobotProcessorPipeline
for robot hardware interfaces:
# Robot data structures: dict[str, Any] for observations and actions
robot_obs: dict[str, Any] = {
"joint_1": 0.5, # Individual joint values
"joint_2": -0.3,
"camera_0": image_array # Raw camera data
}
robot_action: dict[str, Any] = {
"joint_1": 0.2, # Target joint positions
"joint_2": 0.1,
"gripper": 0.8
}
Use PolicyProcessorPipeline
for model training and batch processing:
# Policy data structures: batch dicts and tensors
policy_batch: dict[str, Any] = {
"observation.state": torch.tensor([[0.5, -0.3]]), # Batched states
"observation.images.camera0": torch.tensor(...), # Batched images
"action": torch.tensor([[0.2, 0.1, 0.8]]) # Batched actions
}
policy_action: torch.Tensor = torch.tensor([[0.2, 0.1, 0.8]]) # Model output tensor
Converter Functions
LeRobot provides converter functions to bridge different data formats in lerobot.processor.converters
. These functions handle the crucial translations between robot hardware data structures, policy model formats, and the internal EnvTransition
representation that flows through processor pipelines.
Category | Function | Description |
---|---|---|
Robot Hardware Converters | robot_action_to_transition | Robot dict → EnvTransition |
observation_to_transition | Robot obs → EnvTransition | |
transition_to_robot_action | EnvTransition → Robot dict | |
Policy/Training Converters | batch_to_transition | Batch dict → EnvTransition |
transition_to_batch | EnvTransition → Batch dict | |
policy_action_to_transition | Policy tensor → EnvTransition | |
transition_to_policy_action | EnvTransition → Policy tensor | |
Utilities | create_transition | Build transitions with defaults |
identity_transition | Pass-through converter |
The key insight is that robot hardware converters work with individual values and dictionaries, while policy/training converters work with batched tensors and model outputs. The converter functions automatically handle the structural differences, so your processor steps can focus on the core transformations without worrying about data format compatibility.
Processor Examples
The following examples demonstrate real-world processor configurations for policy training and inference.
Here is an example processor for policy training and inference:
# Training data preprocessing (optimized order for GPU performance)
training_preprocessor = PolicyProcessorPipeline[dict[str, Any], dict[str, Any]](
steps=[
RenameObservationsProcessorStep(rename_map={}), # Standardize keys
AddBatchDimensionProcessorStep(), # Add batch dims
TokenizerProcessorStep(tokenizer_name="...", ...), # Tokenize language
DeviceProcessorStep(device="cuda"), # Move to GPU first
NormalizerProcessorStep(features=..., stats=...), # Normalize on GPU
]
)
# Model output postprocessing
training_postprocessor = PolicyProcessorPipeline[torch.Tensor, torch.Tensor](
steps=[
DeviceProcessorStep(device="cpu"), # Move to CPU
UnnormalizerProcessorStep(features=..., stats=...), # Denormalize
]
to_transition=policy_action_to_transition,
to_output=transition_to_policy_action,
)
An interaction between a robot and a policy with processors
The most common real-world scenario combines both pipeline types robot hardware generates observations that need policy processing, and policy outputs need robot-compatible postprocessing:
# Real deployment: Robot sensors → Model → Robot commands
with torch.no_grad():
while not done:
raw_obs = robot.get_observation() # dict[str, Any]
# Add your robot observation to policy observation processor
policy_input = policy_preprocessor(raw_obs) # Batched dict
policy_output = policy.select_action(policy_input) # Policy tensor
policy_action = policy_postprocessor(policy_output)
# Add your robot action to policy action processor
robot.send_action(policy_action)
Feature Contracts: Shape and Type Transformation
Processors don’t just transform data - they can also change the data structure itself. The transform_features()
method declares these changes, which is crucial for dataset recording and policy creation.
Why Feature Contracts Matter
When building datasets or policies, LeRobot needs to know:
- What data fields will exist after processing
- What shapes and types each field will have
- How to configure models for the expected data structure
# Example: A processor that adds velocity to observations
class VelocityProcessor(ObservationProcessorStep):
def observation(self, obs):
new_obs = obs.copy()
if "observation.state" in obs:
# concatenate computed velocity field to the state
new_obs["observation.state"] = self._compute_velocity(obs["observation.state"])
return new_obs
def transform_features(self, features):
"""Declare the new velocity field we're adding."""
state_feature = features[PipelineFeatureType.OBSERVATION].get("observation.state")
if state_feature:
double_shape = (state_feature.shape[0] * 2,) if state_feature.shape else (2,)
features[PipelineFeatureType.OBSERVATION]["observation.state"] = PolicyFeature(
type=FeatureType.STATE, shape=double_shape
)
return features
Feature Specification Functions
create_initial_features()
and aggregate_pipeline_dataset_features()
solve a critical dataset creation problem: determining the exact final data structure before any data is processed.
Since processor pipelines can add new features (like velocity fields), change tensor shapes (like cropping images), or rename keys, datasets need to know the complete output specification upfront to allocate proper storage and define schemas.
These functions work together by starting with robot hardware specifications (create_initial_features()
) then simulating the entire pipeline transformation (aggregate_pipeline_dataset_features()
) to compute the final feature dictionary that gets passed to LeRobotDataset.create()
, ensuring perfect alignment between what processors output and what datasets expect to store.
from lerobot.datasets.pipeline_features import aggregate_pipeline_dataset_features
# Start with robot's raw features
initial_features = create_initial_features(
observation=robot.observation_features, # {"joint_1.pos": float, "camera_0": (480,640,3)}
action=robot.action_features # {"joint_1.pos": float, "gripper.pos": float}
)
# Apply processor pipeline to compute final features
final_features = aggregate_pipeline_dataset_features(
pipeline=my_processor_pipeline,
initial_features=initial_features,
use_videos=True
)
# Use for dataset creation
dataset = LeRobotDataset.create(
repo_id="my_dataset",
features=final_features, # Knows exactly what data to expect
...
)
Common Processor Steps
LeRobot provides many registered processor steps. Here are the most commonly used core processors:
Essential Processors
normalizer_processor
: Normalize observations/actions using dataset statistics (mean/std or min/max)device_processor
: Move tensors to CPU/GPU with optional dtype conversionto_batch_processor
: Add batch dimensions to transitions for model compatibilityrename_observations_processor
: Rename observation keys using mapping dictionariestokenizer_processor
: Tokenize natural language task descriptions into tokens and attention masks
Next Steps
- Implement Your Own Processor - Create custom processor steps
- Debug Your Pipeline - Troubleshoot and optimize pipelines
- Processors for Robots and Teleoperators - Real-world integration patterns
Summary
Processors solve the data translation problem in robotics by providing:
- Modular transformations: Composable, reusable processing steps
- Type safety: Generic pipelines with compile-time checking
- Performance optimization: GPU-accelerated operations
- Robot/Policy distinction: Separate pipelines for different data structures
- Comprehensive ecosystem: 30+ registered processors for common tasks
The key insight: RobotProcessorPipeline
handles unbatched robot hardware data, while PolicyProcessorPipeline
handles batched model data. Choose the right tool for your data structure!