LeRobot documentation

Introduction to Processors

Hugging Face's logo
Join the Hugging Face community

and get access to the augmented documentation experience

to get started

Introduction to Processors

In robotics, there’s a fundamental mismatch between the data that robots and humans produce and what machine learning models expect. Robots output raw sensor data like camera images and joint positions that need normalization, batching, and device placement before models can process them. Language instructions from humans must be tokenized into numerical representations, and different robots use different coordinate systems that need standardization.

The challenge extends to model outputs as well. Models might output end-effector positions while robots need joint-space commands, or teleoperators produce relative movements while robots expect absolute commands. Model predictions are often normalized and need conversion back to real-world scales.

Cross-domain translation adds another layer of complexity. Training data from one robot setup needs adaptation for deployment on different hardware, models trained with specific camera configurations must work with new arrangements, and datasets with different naming conventions need harmonization.

That’s where processors come in. They serve as universal translators that bridge these gaps, ensuring seamless data flow from sensors to models to actuators. Processors handle all the preprocessing and postprocessing steps needed to convert raw environment data into model-ready inputs and vice versa.

This means that your favorite policy can be used like this:

import torch

from lerobot.datasets.lerobot_dataset import LeRobotDataset
from lerobot.policies.factory import make_pre_post_processors
from lerobot.policies.your_policy import YourPolicy
from lerobot.processor.pipeline import RobotProcessorPipeline, PolicyProcessorPipeline
dataset = LeRobotDataset("hf_user/dataset", episodes=[0])
sample = dataset[10]

model = YourPolicy.from_pretrained(
    "hf_user/model",
)
model.eval()
model.to("cuda")
preprocessor, postprocessor = make_pre_post_processors(model.config, pretrained_path="hf_user/model", dataset_stats=dataset.meta.stats)

preprocessed_sample = preprocessor(sample)
action = model.select_action(preprocessed_sample)
postprocessed_action = postprocessor(action)

What are Processors?

In robotics, data comes in many forms: images from cameras, joint positions from sensors, text instructions from users, and more. Each type of data requires specific transformations before a model can use it effectively. Models need this data to be:

  • Normalized: Scaled to appropriate ranges for neural network processing
  • Batched: Organized with proper dimensions for batch processing
  • Tokenized: Text converted to numerical representations
  • Device-placed: Moved to the right hardware (CPU/GPU)
  • Type-converted: Cast to appropriate data types

Processors handle these transformations through composable, reusable steps that can be chained together into pipelines. Think of them as a modular assembly line where each station performs a specific transformation on your data.

Core Concepts

EnvTransition: The Universal Data Container

The EnvTransition is the fundamental data structure that flows through all processors. It’s a typed dictionary that represents a complete robot-environment interaction:

  • OBSERVATION: All sensor data (images, states, proprioception)
  • ACTION: The action to execute or that was executed
  • REWARD: Reinforcement learning signal
  • DONE/TRUNCATED: Episode boundary indicators
  • INFO: Arbitrary metadata
  • COMPLEMENTARY_DATA: Task descriptions, indices, padding flags, inter-step data

ProcessorStep: The Building Block

A ProcessorStep is a single transformation unit that processes transitions. It’s an abstract base class with two required methods:

from lerobot.processor import ProcessorStep, EnvTransition

class MyProcessorStep(ProcessorStep):
    """Example processor step - inherit and implement abstract methods."""

    def __call__(self, transition: EnvTransition) -> EnvTransition:
        """Transform the transition - REQUIRED abstract method."""
        # Your processing logic here
        return transition

    def transform_features(self, features):
        """Declare how this step transforms feature shapes/types - REQUIRED abstract method."""
        return features  # Most processors return features unchanged

__call__ is the core of your processor step. It takes an EnvTransition and returns a modified EnvTransition.

transform_features is used to declare how this step transforms feature shapes/types.

DataProcessorPipeline: The Generic Orchestrator

The DataProcessorPipeline[TInput, TOutput] chains multiple ProcessorStep instances together:

from lerobot.processor import RobotProcessorPipeline, PolicyProcessorPipeline

# For robot hardware (unbatched data)
robot_processor = RobotProcessorPipeline[RobotAction, RobotAction](
    steps=[step1, step2, step3],
    name="robot_pipeline"
)

# For model training/inference (batched data)
policy_processor = PolicyProcessorPipeline[dict[str, Any], dict[str, Any]](
    steps=[step1, step2, step3],
    name="policy_pipeline"
)

RobotProcessorPipeline vs PolicyProcessorPipeline

The key distinction is in the data structures they handle:

Aspect RobotProcessorPipeline PolicyProcessorPipeline
Input dict[str, Any] - Individual robot values dict[str, Any] - Batched tensors
Output dict[str, Any] - Individual robot commands torch.Tensor - Policy predictions
Use Case Real-time robot control Model training/inference
Data Format Unbatched, heterogeneous Batched, homogeneous
Examples {"joint_1": 0.5} {"observation.state": tensor([[0.5]])}

Use RobotProcessorPipeline for robot hardware interfaces:

# Robot data structures: dict[str, Any] for observations and actions
robot_obs: dict[str, Any] = {
    "joint_1": 0.5,           # Individual joint values
    "joint_2": -0.3,
    "camera_0": image_array   # Raw camera data
}

robot_action: dict[str, Any] = {
    "joint_1": 0.2,          # Target joint positions
    "joint_2": 0.1,
    "gripper": 0.8
}

Use PolicyProcessorPipeline for model training and batch processing:

# Policy data structures: batch dicts and tensors
policy_batch: dict[str, Any] = {
    "observation.state": torch.tensor([[0.5, -0.3]]),      # Batched states
    "observation.images.camera0": torch.tensor(...),        # Batched images
    "action": torch.tensor([[0.2, 0.1, 0.8]])              # Batched actions
}

policy_action: torch.Tensor = torch.tensor([[0.2, 0.1, 0.8]])  # Model output tensor

Converter Functions

LeRobot provides converter functions to bridge different data formats in lerobot.processor.converters. These functions handle the crucial translations between robot hardware data structures, policy model formats, and the internal EnvTransition representation that flows through processor pipelines.

Category Function Description
Robot Hardware Converters robot_action_to_transition Robot dict → EnvTransition
observation_to_transition Robot obs → EnvTransition
transition_to_robot_action EnvTransition → Robot dict
Policy/Training Converters batch_to_transition Batch dict → EnvTransition
transition_to_batch EnvTransition → Batch dict
policy_action_to_transition Policy tensor → EnvTransition
transition_to_policy_action EnvTransition → Policy tensor
Utilities create_transition Build transitions with defaults
identity_transition Pass-through converter

The key insight is that robot hardware converters work with individual values and dictionaries, while policy/training converters work with batched tensors and model outputs. The converter functions automatically handle the structural differences, so your processor steps can focus on the core transformations without worrying about data format compatibility.

Processor Examples

The following examples demonstrate real-world processor configurations for policy training and inference.

Here is an example processor for policy training and inference:

# Training data preprocessing (optimized order for GPU performance)
training_preprocessor = PolicyProcessorPipeline[dict[str, Any], dict[str, Any]](
    steps=[
        RenameObservationsProcessorStep(rename_map={}),     # Standardize keys
        AddBatchDimensionProcessorStep(),                   # Add batch dims
        TokenizerProcessorStep(tokenizer_name="...", ...),  # Tokenize language
        DeviceProcessorStep(device="cuda"),                 # Move to GPU first
        NormalizerProcessorStep(features=..., stats=...),   # Normalize on GPU
    ]
)

# Model output postprocessing
training_postprocessor = PolicyProcessorPipeline[torch.Tensor, torch.Tensor](
    steps=[
        DeviceProcessorStep(device="cpu"),                  # Move to CPU
        UnnormalizerProcessorStep(features=..., stats=...), # Denormalize
    ]
    to_transition=policy_action_to_transition,
    to_output=transition_to_policy_action,
)

An interaction between a robot and a policy with processors

The most common real-world scenario combines both pipeline types robot hardware generates observations that need policy processing, and policy outputs need robot-compatible postprocessing:

# Real deployment: Robot sensors → Model → Robot commands
with torch.no_grad():
    while not done:
        raw_obs = robot.get_observation()  # dict[str, Any]

        # Add your robot observation to policy observation processor

        policy_input = policy_preprocessor(raw_obs)  # Batched dict

        policy_output = policy.select_action(policy_input)  # Policy tensor

        policy_action = policy_postprocessor(policy_output)

        # Add your robot action to policy action processor

        robot.send_action(policy_action)

Feature Contracts: Shape and Type Transformation

Processors don’t just transform data - they can also change the data structure itself. The transform_features() method declares these changes, which is crucial for dataset recording and policy creation.

Why Feature Contracts Matter

When building datasets or policies, LeRobot needs to know:

  • What data fields will exist after processing
  • What shapes and types each field will have
  • How to configure models for the expected data structure
# Example: A processor that adds velocity to observations
class VelocityProcessor(ObservationProcessorStep):
    def observation(self, obs):
        new_obs = obs.copy()
        if "observation.state" in obs:
            # concatenate computed velocity field to the state
            new_obs["observation.state"] = self._compute_velocity(obs["observation.state"])
        return new_obs

    def transform_features(self, features):
        """Declare the new velocity field we're adding."""
        state_feature = features[PipelineFeatureType.OBSERVATION].get("observation.state")
        if state_feature:
            double_shape = (state_feature.shape[0] * 2,) if state_feature.shape else (2,)
            features[PipelineFeatureType.OBSERVATION]["observation.state"] = PolicyFeature(
                type=FeatureType.STATE, shape=double_shape
            )
        return features

Feature Specification Functions

create_initial_features() and aggregate_pipeline_dataset_features() solve a critical dataset creation problem: determining the exact final data structure before any data is processed. Since processor pipelines can add new features (like velocity fields), change tensor shapes (like cropping images), or rename keys, datasets need to know the complete output specification upfront to allocate proper storage and define schemas. These functions work together by starting with robot hardware specifications (create_initial_features()) then simulating the entire pipeline transformation (aggregate_pipeline_dataset_features()) to compute the final feature dictionary that gets passed to LeRobotDataset.create(), ensuring perfect alignment between what processors output and what datasets expect to store.

from lerobot.datasets.pipeline_features import aggregate_pipeline_dataset_features

# Start with robot's raw features
initial_features = create_initial_features(
    observation=robot.observation_features,  # {"joint_1.pos": float, "camera_0": (480,640,3)}
    action=robot.action_features            # {"joint_1.pos": float, "gripper.pos": float}
)

# Apply processor pipeline to compute final features
final_features = aggregate_pipeline_dataset_features(
    pipeline=my_processor_pipeline,
    initial_features=initial_features,
    use_videos=True
)

# Use for dataset creation
dataset = LeRobotDataset.create(
    repo_id="my_dataset",
    features=final_features,  # Knows exactly what data to expect
    ...
)

Common Processor Steps

LeRobot provides many registered processor steps. Here are the most commonly used core processors:

Essential Processors

  • normalizer_processor: Normalize observations/actions using dataset statistics (mean/std or min/max)
  • device_processor: Move tensors to CPU/GPU with optional dtype conversion
  • to_batch_processor: Add batch dimensions to transitions for model compatibility
  • rename_observations_processor: Rename observation keys using mapping dictionaries
  • tokenizer_processor: Tokenize natural language task descriptions into tokens and attention masks

Next Steps

Summary

Processors solve the data translation problem in robotics by providing:

  • Modular transformations: Composable, reusable processing steps
  • Type safety: Generic pipelines with compile-time checking
  • Performance optimization: GPU-accelerated operations
  • Robot/Policy distinction: Separate pipelines for different data structures
  • Comprehensive ecosystem: 30+ registered processors for common tasks

The key insight: RobotProcessorPipeline handles unbatched robot hardware data, while PolicyProcessorPipeline handles batched model data. Choose the right tool for your data structure!

< > Update on GitHub