Spaces:
Running
Running
File size: 6,146 Bytes
3aea7c6 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 |
from typing import Dict, Optional, List
from datetime import datetime, timezone
import logging
from .models import Robot, RobotStatus, DriverJointState
logger = logging.getLogger(__name__)
class RobotManager:
"""Manages robot lifecycle and state"""
def __init__(self):
self.robots: Dict[str, Robot] = {}
async def create_robot(self, robot_id: str, robot_type: str, name: str) -> Robot:
"""Create a new robot"""
if robot_id in self.robots:
raise ValueError(f"Robot {robot_id} already exists")
# Create demo joint configuration for so-arm100
joints = self._create_demo_joints(robot_type)
robot = Robot(
id=robot_id,
name=name,
robot_type=robot_type,
created_at=datetime.now(timezone.utc),
joints=joints
)
self.robots[robot_id] = robot
logger.info(f"Created robot {robot_id} ({robot_type}) with {len(joints)} joints")
return robot
def get_robot(self, robot_id: str) -> Optional[Robot]:
"""Get robot by ID"""
return self.robots.get(robot_id)
async def delete_robot(self, robot_id: str):
"""Delete a robot"""
if robot_id in self.robots:
del self.robots[robot_id]
logger.info(f"Deleted robot {robot_id}")
async def set_master_connected(self, robot_id: str, connection_id: str):
"""Mark robot as having a master connection"""
robot = self.robots.get(robot_id)
if robot:
robot.master_connected = True
robot.master_connection_id = connection_id
robot.last_command_source = "master"
robot.last_command_time = datetime.now(timezone.utc)
async def set_master_disconnected(self, robot_id: str):
"""Mark robot as having no master connection"""
robot = self.robots.get(robot_id)
if robot:
robot.master_connected = False
robot.master_connection_id = None
robot.last_command_source = "none"
async def add_slave_connection(self, robot_id: str, connection_id: str):
"""Add a slave connection to robot"""
robot = self.robots.get(robot_id)
if robot:
if connection_id not in robot.slave_connections:
robot.slave_connections.append(connection_id)
logger.info(f"Added slave {connection_id} to robot {robot_id} ({len(robot.slave_connections)} total)")
async def remove_slave_connection(self, robot_id: str, connection_id: str):
"""Remove a slave connection from robot"""
robot = self.robots.get(robot_id)
if robot:
try:
robot.slave_connections.remove(connection_id)
logger.info(f"Removed slave {connection_id} from robot {robot_id} ({len(robot.slave_connections)} remaining)")
except ValueError:
logger.warning(f"Slave {connection_id} not found in robot {robot_id} connections")
def get_robot_status(self, robot_id: str) -> Optional[RobotStatus]:
"""Get robot connection status"""
robot = self.robots.get(robot_id)
if not robot:
return None
return RobotStatus(
robot_id=robot_id,
master_connected=robot.master_connected,
slave_count=len(robot.slave_connections),
last_command_source=robot.last_command_source,
last_command_time=robot.last_command_time,
last_seen=datetime.now(timezone.utc)
)
def list_robots_with_masters(self) -> List[Robot]:
"""Get all robots that have master connections"""
return [robot for robot in self.robots.values() if robot.master_connected]
def list_robots_with_slaves(self) -> List[Robot]:
"""Get all robots that have slave connections"""
return [robot for robot in self.robots.values() if robot.slave_connections]
def _create_demo_joints(self, robot_type: str) -> List[DriverJointState]:
"""Create demo joint configuration based on robot type"""
if robot_type == "so-arm100":
return [
DriverJointState(
name="Rotation",
servo_id=1,
type="revolute",
virtual_value=0.0,
real_value=0.0
),
DriverJointState(
name="Pitch",
servo_id=2,
type="revolute",
virtual_value=0.0,
real_value=0.0
),
DriverJointState(
name="Elbow",
servo_id=3,
type="revolute",
virtual_value=0.0,
real_value=0.0
),
DriverJointState(
name="Wrist_Pitch",
servo_id=4,
type="revolute",
virtual_value=0.0,
real_value=0.0
),
DriverJointState(
name="Wrist_Roll",
servo_id=5,
type="revolute",
virtual_value=0.0,
real_value=0.0
),
DriverJointState(
name="Jaw",
servo_id=6,
type="revolute",
virtual_value=0.0,
real_value=0.0
)
]
else:
# Default generic robot
return [
DriverJointState(
name="joint_1",
servo_id=1,
type="revolute",
virtual_value=0.0,
real_value=0.0
),
DriverJointState(
name="joint_2",
servo_id=2,
type="revolute",
virtual_value=0.0,
real_value=0.0
)
] |