Spaces:
Running
Running
from typing import Dict, Optional, List | |
from datetime import datetime, timezone | |
import logging | |
from .models import Robot, RobotStatus, DriverJointState | |
logger = logging.getLogger(__name__) | |
class RobotManager: | |
"""Manages robot lifecycle and state""" | |
def __init__(self): | |
self.robots: Dict[str, Robot] = {} | |
async def create_robot(self, robot_id: str, robot_type: str, name: str) -> Robot: | |
"""Create a new robot""" | |
if robot_id in self.robots: | |
raise ValueError(f"Robot {robot_id} already exists") | |
# Create demo joint configuration for so-arm100 | |
joints = self._create_demo_joints(robot_type) | |
robot = Robot( | |
id=robot_id, | |
name=name, | |
robot_type=robot_type, | |
created_at=datetime.now(timezone.utc), | |
joints=joints | |
) | |
self.robots[robot_id] = robot | |
logger.info(f"Created robot {robot_id} ({robot_type}) with {len(joints)} joints") | |
return robot | |
def get_robot(self, robot_id: str) -> Optional[Robot]: | |
"""Get robot by ID""" | |
return self.robots.get(robot_id) | |
async def delete_robot(self, robot_id: str): | |
"""Delete a robot""" | |
if robot_id in self.robots: | |
del self.robots[robot_id] | |
logger.info(f"Deleted robot {robot_id}") | |
async def set_master_connected(self, robot_id: str, connection_id: str): | |
"""Mark robot as having a master connection""" | |
robot = self.robots.get(robot_id) | |
if robot: | |
robot.master_connected = True | |
robot.master_connection_id = connection_id | |
robot.last_command_source = "master" | |
robot.last_command_time = datetime.now(timezone.utc) | |
async def set_master_disconnected(self, robot_id: str): | |
"""Mark robot as having no master connection""" | |
robot = self.robots.get(robot_id) | |
if robot: | |
robot.master_connected = False | |
robot.master_connection_id = None | |
robot.last_command_source = "none" | |
async def add_slave_connection(self, robot_id: str, connection_id: str): | |
"""Add a slave connection to robot""" | |
robot = self.robots.get(robot_id) | |
if robot: | |
if connection_id not in robot.slave_connections: | |
robot.slave_connections.append(connection_id) | |
logger.info(f"Added slave {connection_id} to robot {robot_id} ({len(robot.slave_connections)} total)") | |
async def remove_slave_connection(self, robot_id: str, connection_id: str): | |
"""Remove a slave connection from robot""" | |
robot = self.robots.get(robot_id) | |
if robot: | |
try: | |
robot.slave_connections.remove(connection_id) | |
logger.info(f"Removed slave {connection_id} from robot {robot_id} ({len(robot.slave_connections)} remaining)") | |
except ValueError: | |
logger.warning(f"Slave {connection_id} not found in robot {robot_id} connections") | |
def get_robot_status(self, robot_id: str) -> Optional[RobotStatus]: | |
"""Get robot connection status""" | |
robot = self.robots.get(robot_id) | |
if not robot: | |
return None | |
return RobotStatus( | |
robot_id=robot_id, | |
master_connected=robot.master_connected, | |
slave_count=len(robot.slave_connections), | |
last_command_source=robot.last_command_source, | |
last_command_time=robot.last_command_time, | |
last_seen=datetime.now(timezone.utc) | |
) | |
def list_robots_with_masters(self) -> List[Robot]: | |
"""Get all robots that have master connections""" | |
return [robot for robot in self.robots.values() if robot.master_connected] | |
def list_robots_with_slaves(self) -> List[Robot]: | |
"""Get all robots that have slave connections""" | |
return [robot for robot in self.robots.values() if robot.slave_connections] | |
def _create_demo_joints(self, robot_type: str) -> List[DriverJointState]: | |
"""Create demo joint configuration based on robot type""" | |
if robot_type == "so-arm100": | |
return [ | |
DriverJointState( | |
name="Rotation", | |
servo_id=1, | |
type="revolute", | |
virtual_value=0.0, | |
real_value=0.0 | |
), | |
DriverJointState( | |
name="Pitch", | |
servo_id=2, | |
type="revolute", | |
virtual_value=0.0, | |
real_value=0.0 | |
), | |
DriverJointState( | |
name="Elbow", | |
servo_id=3, | |
type="revolute", | |
virtual_value=0.0, | |
real_value=0.0 | |
), | |
DriverJointState( | |
name="Wrist_Pitch", | |
servo_id=4, | |
type="revolute", | |
virtual_value=0.0, | |
real_value=0.0 | |
), | |
DriverJointState( | |
name="Wrist_Roll", | |
servo_id=5, | |
type="revolute", | |
virtual_value=0.0, | |
real_value=0.0 | |
), | |
DriverJointState( | |
name="Jaw", | |
servo_id=6, | |
type="revolute", | |
virtual_value=0.0, | |
real_value=0.0 | |
) | |
] | |
else: | |
# Default generic robot | |
return [ | |
DriverJointState( | |
name="joint_1", | |
servo_id=1, | |
type="revolute", | |
virtual_value=0.0, | |
real_value=0.0 | |
), | |
DriverJointState( | |
name="joint_2", | |
servo_id=2, | |
type="revolute", | |
virtual_value=0.0, | |
real_value=0.0 | |
) | |
] |