LeRobot-Arena / src-python /src /robot_manager.py
blanchon's picture
squash: initial commit
3aea7c6
from typing import Dict, Optional, List
from datetime import datetime, timezone
import logging
from .models import Robot, RobotStatus, DriverJointState
logger = logging.getLogger(__name__)
class RobotManager:
"""Manages robot lifecycle and state"""
def __init__(self):
self.robots: Dict[str, Robot] = {}
async def create_robot(self, robot_id: str, robot_type: str, name: str) -> Robot:
"""Create a new robot"""
if robot_id in self.robots:
raise ValueError(f"Robot {robot_id} already exists")
# Create demo joint configuration for so-arm100
joints = self._create_demo_joints(robot_type)
robot = Robot(
id=robot_id,
name=name,
robot_type=robot_type,
created_at=datetime.now(timezone.utc),
joints=joints
)
self.robots[robot_id] = robot
logger.info(f"Created robot {robot_id} ({robot_type}) with {len(joints)} joints")
return robot
def get_robot(self, robot_id: str) -> Optional[Robot]:
"""Get robot by ID"""
return self.robots.get(robot_id)
async def delete_robot(self, robot_id: str):
"""Delete a robot"""
if robot_id in self.robots:
del self.robots[robot_id]
logger.info(f"Deleted robot {robot_id}")
async def set_master_connected(self, robot_id: str, connection_id: str):
"""Mark robot as having a master connection"""
robot = self.robots.get(robot_id)
if robot:
robot.master_connected = True
robot.master_connection_id = connection_id
robot.last_command_source = "master"
robot.last_command_time = datetime.now(timezone.utc)
async def set_master_disconnected(self, robot_id: str):
"""Mark robot as having no master connection"""
robot = self.robots.get(robot_id)
if robot:
robot.master_connected = False
robot.master_connection_id = None
robot.last_command_source = "none"
async def add_slave_connection(self, robot_id: str, connection_id: str):
"""Add a slave connection to robot"""
robot = self.robots.get(robot_id)
if robot:
if connection_id not in robot.slave_connections:
robot.slave_connections.append(connection_id)
logger.info(f"Added slave {connection_id} to robot {robot_id} ({len(robot.slave_connections)} total)")
async def remove_slave_connection(self, robot_id: str, connection_id: str):
"""Remove a slave connection from robot"""
robot = self.robots.get(robot_id)
if robot:
try:
robot.slave_connections.remove(connection_id)
logger.info(f"Removed slave {connection_id} from robot {robot_id} ({len(robot.slave_connections)} remaining)")
except ValueError:
logger.warning(f"Slave {connection_id} not found in robot {robot_id} connections")
def get_robot_status(self, robot_id: str) -> Optional[RobotStatus]:
"""Get robot connection status"""
robot = self.robots.get(robot_id)
if not robot:
return None
return RobotStatus(
robot_id=robot_id,
master_connected=robot.master_connected,
slave_count=len(robot.slave_connections),
last_command_source=robot.last_command_source,
last_command_time=robot.last_command_time,
last_seen=datetime.now(timezone.utc)
)
def list_robots_with_masters(self) -> List[Robot]:
"""Get all robots that have master connections"""
return [robot for robot in self.robots.values() if robot.master_connected]
def list_robots_with_slaves(self) -> List[Robot]:
"""Get all robots that have slave connections"""
return [robot for robot in self.robots.values() if robot.slave_connections]
def _create_demo_joints(self, robot_type: str) -> List[DriverJointState]:
"""Create demo joint configuration based on robot type"""
if robot_type == "so-arm100":
return [
DriverJointState(
name="Rotation",
servo_id=1,
type="revolute",
virtual_value=0.0,
real_value=0.0
),
DriverJointState(
name="Pitch",
servo_id=2,
type="revolute",
virtual_value=0.0,
real_value=0.0
),
DriverJointState(
name="Elbow",
servo_id=3,
type="revolute",
virtual_value=0.0,
real_value=0.0
),
DriverJointState(
name="Wrist_Pitch",
servo_id=4,
type="revolute",
virtual_value=0.0,
real_value=0.0
),
DriverJointState(
name="Wrist_Roll",
servo_id=5,
type="revolute",
virtual_value=0.0,
real_value=0.0
),
DriverJointState(
name="Jaw",
servo_id=6,
type="revolute",
virtual_value=0.0,
real_value=0.0
)
]
else:
# Default generic robot
return [
DriverJointState(
name="joint_1",
servo_id=1,
type="revolute",
virtual_value=0.0,
real_value=0.0
),
DriverJointState(
name="joint_2",
servo_id=2,
type="revolute",
virtual_value=0.0,
real_value=0.0
)
]