"""Chainlit front end for the RangDong sales agent.

The module keeps one ``ConversationState`` per Chainlit session, forwards chat
messages to a backend HTTP API (``API_BASE_URL``) and renders the structured
parts of the reply (specifications, advantages, packages) on demand through
action buttons.
"""

import asyncio
import os
import re
import time
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional, Callable

import chainlit as cl
import httpx
import pandas as pd

API_BASE_URL = os.getenv("API_BASE_URL")

# Model used for new sessions and after a state reset.
DEFAULT_MODEL = "Gemini 2.0 Flash"

# Models offered by the "change model" menu. The position in this list is the
# suffix of the matching ``select_model_<i>`` action callback.
AVAILABLE_MODELS = ["Gemini 2.0 Flash", "Gemini 2.5 Flash Lite", "Gemini 2.0 Flash Lite"]

# Spinner frames shared by both typing animations.
TYPING_FRAMES = ["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]

# Matches one product entry:  * **[Name](url)**\n\n ![alt](img_url)
_PRODUCT_ENTRY_RE = re.compile(
    r'\*\s+\*\*\[(.*?)\]\((.*?)\)\*\*\s*\n\s*!\[(.*?)\]\((.*?)\)'
)


@dataclass
class ConversationState:
    """Per-session conversation data plus delayed-cleanup bookkeeping."""

    session_id: Optional[str] = None
    specs_advantages: Dict[str, Any] = field(default_factory=dict)
    solution_packages: List[str] = field(default_factory=list)
    raw_documents: Optional[Dict[str, Any]] = None
    outputs: Optional[Dict[str, Any]] = None
    selected_model: str = DEFAULT_MODEL
    product_model_search: bool = False
    # Delayed-cleanup bookkeeping (asyncio based).
    pending_cleanup: bool = False
    cleanup_task: Optional[asyncio.Task] = None
    last_activity: datetime = field(default_factory=datetime.now)

    def reset(self):
        """Reset conversation data to its initial values.

        ``cleanup_task`` is deliberately left untouched; use
        ``cancel_cleanup_task`` to stop a scheduled cleanup.
        """
        self.session_id = None
        self.specs_advantages = {}
        self.solution_packages = []
        self.raw_documents = None
        self.outputs = None
        self.selected_model = DEFAULT_MODEL
        self.product_model_search = False
        self.pending_cleanup = False
        self.last_activity = datetime.now()

    def cancel_cleanup_task(self):
        """Cancel the scheduled cleanup task if one is still running."""
        if self.cleanup_task and not self.cleanup_task.done():
            self.cleanup_task.cancel()
            self.cleanup_task = None
            print(f"🚫 Cancelled cleanup task for session: {self.session_id}")


class StateManager:
    """Manages conversation state with per-session isolation and delayed cleanup."""

    # Class-level storage so every browser session gets its own state object.
    _session_states: Dict[str, ConversationState] = {}
    # NOTE: asyncio.Lock is not reentrant; never await a method that takes the
    # lock while already holding it.
    _lock = asyncio.Lock()

    @staticmethod
    async def get_or_create_session_state(session_id: str) -> ConversationState:
        """Return the state for ``session_id``, creating it when missing.

        Retrieving an existing state counts as user activity: a pending
        delayed cleanup is cancelled and the activity timestamp is refreshed.
        """
        async with StateManager._lock:
            state = StateManager._session_states.get(session_id)
            if state is None:
                state = ConversationState()
                state.session_id = session_id
                StateManager._session_states[session_id] = state
                print(f"🆕 Created new session state for: {session_id}")
            else:
                print(f"🔄 Retrieved existing session state for: {session_id}")
                # The user is active again, so a scheduled cleanup must not run.
                if state.pending_cleanup:
                    state.cancel_cleanup_task()
                    state.pending_cleanup = False
                    print(f"♻️ User activity detected! Cancelled pending cleanup for: {session_id}")
            state.last_activity = datetime.now()
            return state

    @staticmethod
    async def schedule_delayed_cleanup(session_id: str, delay_seconds: int = 3600):
        """Schedule cleanup of a session after ``delay_seconds`` (default 1 hour).

        Any previously scheduled cleanup for the session is replaced. Nothing
        is scheduled when the session is unknown.
        """
        async with StateManager._lock:
            state = StateManager._session_states.get(session_id)
            if state is None:
                print(f"⚠️ Cannot schedule cleanup for non-existent session: {session_id}")
                return

            state.cancel_cleanup_task()
            state.pending_cleanup = True

            async def delayed_cleanup():
                try:
                    await asyncio.sleep(delay_seconds)
                    print(f"⏰ Executing delayed cleanup for session: {session_id}")
                    await StateManager._perform_actual_cleanup(session_id)
                except asyncio.CancelledError:
                    print(f"🚫 Cleanup task cancelled for session: {session_id}")
                    raise
                except Exception as e:
                    print(f"❌ Error in delayed cleanup for {session_id}: {e}")

            state.cleanup_task = asyncio.create_task(delayed_cleanup())
            print(f"⏱️ Scheduled cleanup in {delay_seconds}s for session: {session_id} (likely disconnect)")

    @staticmethod
    async def _post_clear_memory(session_id: str) -> int:
        """POST ``/clear_memory`` for ``session_id`` and return the HTTP status code."""
        payload = {
            "reset_cache": True,
            "reset_model": False,
            "session_id": session_id,
        }
        async with httpx.AsyncClient(timeout=30.0) as client:
            response = await client.post(f"{API_BASE_URL}/clear_memory", json=payload)
        return response.status_code

    @staticmethod
    async def _perform_actual_cleanup(session_id: str, force: bool = False):
        """Clear backend memory for a session and drop its local state.

        Args:
            session_id: Session to clean up.
            force: When False (default) the cleanup only runs if the session
                is still flagged ``pending_cleanup``; when True it runs
                regardless of that flag.

        A failing ``/clear_memory`` call is logged and does not prevent the
        local state from being removed.
        """
        async with StateManager._lock:
            state = StateManager._session_states.get(session_id)
            if state is None:
                print(f"⚠️ Session already cleaned or doesn't exist: {session_id}")
                return

            # The user may have sent a message while the timer was running.
            if not force and not state.pending_cleanup:
                print(f"🚫 Cleanup cancelled - user activity detected for: {session_id}")
                return

            try:
                if API_BASE_URL:
                    status_code = await StateManager._post_clear_memory(session_id)
                    print(f"Clear memory response for {session_id}: {status_code}")
            except Exception as e:
                print(f"Warning: clear_memory failed for {session_id}: {e}")

            del StateManager._session_states[session_id]
            print(f"🗑️ Successfully cleaned up session: {session_id}")

    @staticmethod
    async def cleanup_session_immediate(session_id: str):
        """Clean up a session right away (for testing or forced cleanup)."""
        async with StateManager._lock:
            state = StateManager._session_states.get(session_id)
            if state is not None:
                state.cancel_cleanup_task()
        # The lock must be released first: _perform_actual_cleanup acquires it
        # again and asyncio.Lock is not reentrant. ``force`` skips the
        # pending_cleanup check, which is not set for an immediate cleanup.
        await StateManager._perform_actual_cleanup(session_id, force=True)

    @staticmethod
    async def clear_chat_state(state: ConversationState):
        """Clear backend memory and reset ``state`` while keeping its session id."""
        if state.session_id is not None and API_BASE_URL:
            try:
                status_code = await StateManager._post_clear_memory(state.session_id)
                print(f"Clear memory response: {status_code}")
            except Exception as e:
                print(f"Warning: clear_memory failed: {e}")

        # reset() wipes the session id, so restore it afterwards.
        session_id = state.session_id
        state.reset()
        state.session_id = session_id

    @staticmethod
    async def change_model(state: ConversationState, model_name: str):
        """Select ``model_name`` for the session and refresh the activity timestamp."""
        state.selected_model = model_name
        state.last_activity = datetime.now()

    @staticmethod
    async def toggle_product_model_search(state: ConversationState):
        """Flip product-model search mode and refresh the activity timestamp."""
        state.product_model_search = not state.product_model_search
        state.last_activity = datetime.now()

    @staticmethod
    async def get_session_status() -> Dict[str, Dict[str, Any]]:
        """Return a debugging snapshot of every known session, keyed by session id."""
        async with StateManager._lock:
            return {
                session_id: {
                    "pending_cleanup": state.pending_cleanup,
                    "has_task": state.cleanup_task is not None and not state.cleanup_task.done(),
                    "last_activity": state.last_activity.isoformat(),
                    "selected_model": state.selected_model,
                    "product_model_search": state.product_model_search,
                }
                for session_id, state in StateManager._session_states.items()
            }


class ChatService:
    """Handles chat-related operations with async HTTP calls."""

    @staticmethod
    async def respond_to_chat(
        state: ConversationState,
        message: str,
        image_path: Optional[str] = None
    ) -> str:
        """Send ``message`` (and optionally an image) to the API and format the reply.

        Args:
            state: Session state; its result fields are updated from the reply.
            message: User text.
            image_path: Path of an uploaded image, or None for text-only chat.

        Returns:
            The reply text with product listings laid out as a grid and the
            elapsed time appended, or a short ``Error: ...`` string when the
            configuration is incomplete. HTTP and transport failures are
            reported inside the returned text rather than raised.
        """
        print(f"🔄 === DEBUG STATE ===\n Chat request with model: {state.selected_model}, Product Model Search: {state.product_model_search}, Session ID: {state.session_id}")

        # Refreshing the timestamp marks the session as actively used.
        state.last_activity = datetime.now()
        start = time.perf_counter()

        if not API_BASE_URL:
            return "Error: API_BASE_URL not configured"
        if not state.session_id:
            return "Error: Session ID not initialized"

        # Both endpoints expect the same form fields.
        form = {
            "message": message,
            "session_id": state.session_id,
            "debug": "Normal",
            "product_model_search": str(state.product_model_search).lower(),
            "llm_model": state.selected_model,
        }
        specs_advantages = solution_packages = raw_documents = outputs = None

        try:
            async with httpx.AsyncClient(timeout=600.0) as client:
                if image_path:
                    with open(image_path, "rb") as f:
                        image_bytes = f.read()
                    # Multipart upload; the API receives a fixed name and MIME type.
                    files = {"image": ("image.jpg", image_bytes, "image/jpeg")}
                    resp = await client.post(
                        f"{API_BASE_URL}/chat_with_image", files=files, data=form
                    )
                else:
                    resp = await client.post(f"{API_BASE_URL}/chat", data=form)

            if resp.status_code == 200:
                j = resp.json()
                response = j.get("response", "")
                specs_advantages = j.get("specs_advantages")
                solution_packages = j.get("solution_packages")
                raw_documents = j.get("raw_documents")
                outputs = j.get("outputs")
            else:
                print(f"API Error: {resp.status_code} - {resp.text}")
                response = f"Error: API status {resp.status_code}"
        except Exception as e:
            print(f"Exception calling API: {e}")
            response = f"Error calling API: {e}"

        end = time.perf_counter()

        # Only overwrite state fields the API actually returned.
        if specs_advantages is not None:
            state.specs_advantages = specs_advantages
        if solution_packages is not None:
            state.solution_packages = solution_packages
        if raw_documents is not None:
            state.raw_documents = raw_documents
        if outputs is not None:
            state.outputs = outputs

        if state.specs_advantages is not None:
            await ChatService.get_specific_product_from_query(message, state)

        formatted_response = ChatService.format_product_grid(response)
        return formatted_response + f"\n\n*Thời gian xử lí: {end - start:.6f}s*"

    @staticmethod
    def format_product_grid(response_text: str, columns: int = 2) -> str:
        """Lay out product listings as markdown tables of ``columns`` columns.

        A product entry is ``* **[Name](url)**`` followed by ``![alt](img)``.
        Text before the first entry and after the last entry is kept
        (stripped); anything located between the first and the last entry that
        is not itself an entry is not carried over. Text without entries is
        returned unchanged.

        Args:
            response_text: Markdown text returned by the API.
            columns: Products per table row; values below 1 are treated as 1.
        """
        matches = list(_PRODUCT_ENTRY_RE.finditer(response_text))
        if not matches:
            return response_text

        columns = max(1, columns)
        intro_text = response_text[:matches[0].start()].strip()
        rest_text = response_text[matches[-1].end():].strip()

        products = [m.groups() for m in matches]  # (name, url, alt, img)

        parts = ["\n\n"]
        for i in range(0, len(products), columns):
            chunk = products[i:i + columns]
            titles = " | ".join(f"**[{name}]({url})**" for name, url, _, _ in chunk)
            images = " | ".join(f"![{alt}]({img})" for _, _, alt, img in chunk)
            # Each row group is its own small table: title row, centered
            # alignment row, image row.
            parts.append(f"| {titles} |\n")
            parts.append("|" + "|".join(":---:" for _ in chunk) + "|\n")
            parts.append(f"| {images} |\n\n")

        return intro_text + "".join(parts) + rest_text

    @staticmethod
    async def get_specific_product_from_query(query, state):
        """Narrow ``state.specs_advantages`` to the models mentioned in ``query``.

        Matching is a case-insensitive literal search of each product's
        ``model`` value. When no model is mentioned the state is left as is.
        """
        specs_map = state.specs_advantages or {}

        # Empty models are skipped: an empty pattern matches every query and
        # would force filtering on each message.
        found_models = set()
        for data in specs_map.values():
            model = data.get("model")
            if not isinstance(model, str) or not model:
                continue
            if re.search(re.escape(model), query, re.IGNORECASE):
                found_models.add(model)

        if found_models:
            state.specs_advantages = {
                prod_id: data
                for prod_id, data in specs_map.items()
                if data.get("model") in found_models
            }


class DisplayService:
    """Handles display-related operations with async HTTP calls."""

    @staticmethod
    def _display_name(data: Dict[str, Any]) -> str:
        """Return the bold "name model" label, linked when the item has a url."""
        label = f"{data.get('name', '')} {data.get('model', '')}"
        url = data.get("url", "")
        if url:
            return f"**[{label}]({url})**"
        return f"**{label}**"

    @staticmethod
    def _parse_package_table(spec: str):
        """Yield (header, cell) pairs from the markdown tables inside ``spec``.

        The first ``|...|`` line of a table is its header row; every later row
        with at least as many cells yields one pair per header.
        """
        in_table = False
        headers = []
        for raw_line in spec.split('\n'):
            line = raw_line.strip()
            is_row = line.startswith('|') and line.endswith('|') and '---' not in line
            if is_row:
                cells = [cell.strip() for cell in line.split('|')[1:-1]]
                if not in_table:
                    headers = cells
                    in_table = True
                elif len(cells) >= len(headers):
                    yield from zip(headers, cells)
            elif in_table and not line.startswith('|'):
                # A blank or non-table line ends the current table.
                in_table = False

    @staticmethod
    def _parse_spec_items(spec: str):
        """Yield (name, value) pairs from ``key: value`` items split on ';' or newline."""
        for item in re.split(r';|\n', spec):
            if ":" not in item:
                continue
            key, value = item.split(':', 1)
            spec_key = key.strip().capitalize()
            # Normalize a known misspelling coming from the data source.
            if spec_key == "Vậtl iệu":
                spec_key = "Vật liệu"
            yield spec_key, value.strip()

    @staticmethod
    async def show_specs(state: ConversationState) -> str:
        """Render the specifications of all current products as a markdown table.

        One row per specification name, one column per product or solution
        package; cells without a value are left empty.
        """
        specs_map = state.specs_advantages
        if not specs_map:
            return "📄 **Thông số kỹ thuật**\n\nKhông có thông số kỹ thuật nào."
        print(specs_map)

        columns = ["Thông số"]
        rows: Dict[str, Dict[str, str]] = {}  # spec name -> row, insertion ordered

        for data in specs_map.values():
            full_name = DisplayService._display_name(data)
            if full_name not in columns:
                columns.append(full_name)

            spec = data.get("specification", None)
            if not spec:
                continue

            # Solution packages carry a markdown table; products carry
            # "key: value" items.
            if "### 📦" in spec:
                pairs = DisplayService._parse_package_table(spec)
            else:
                pairs = DisplayService._parse_spec_items(spec)

            for name, value in pairs:
                rows.setdefault(name, {"Thông số": name})[full_name] = value

        if rows:
            df = pd.DataFrame(list(rows.values()), columns=columns)
            df = df.fillna("").replace("None", "").replace("nan", "")
        else:
            # The placeholder row must have exactly one cell per column.
            placeholder = ["Không có thông số kỹ thuật"] + [""] * (len(columns) - 1)
            df = pd.DataFrame([placeholder], columns=columns)

        markdown_table = df.to_markdown(index=False)
        return f"📄 **Thông số kỹ thuật**\n\n{markdown_table}"

    @staticmethod
    async def show_advantages(state: ConversationState) -> str:
        """Render product advantages as one bullet list per product."""
        specs_map = state.specs_advantages
        if not specs_map:
            return "💡 **Ưu điểm nổi trội**\n\nKhông có ưu điểm nào."

        content = "💡 **Ưu điểm nổi trội**\n\n"
        for data in specs_map.values():
            adv = data.get("advantages", "Không có ưu điểm")
            if adv in ["Không có ưu điểm", "", None]:
                continue
            content += f"### {DisplayService._display_name(data)}\n"
            # One bullet per non-empty line.
            for line in adv.split('\n'):
                advantage = line.strip()
                if advantage:
                    content += f"- {advantage}\n"
            content += "\n"
        return content

    @staticmethod
    async def show_solution_packages(state: ConversationState) -> str:
        """Return the stored solution packages joined by blank lines."""
        packages = state.solution_packages
        if not packages:
            return "📦 **Gói sản phẩm**\n\nKhông có gói sản phẩm nào"
        return "\n\n".join(packages)

    @staticmethod
    async def show_all_products_table(state: ConversationState):
        """Fetch the all-products markdown table for the stored ``outputs``.

        Returns a fallback message when there is no data or the API answers
        with a non-200 status, and an ``Error: ...`` string on failure.
        """
        outputs = state.outputs or {}
        if not outputs:
            return "Không có dữ liệu sản phẩm"
        if not API_BASE_URL:
            return "Error: API_BASE_URL not configured"

        try:
            payload = {"outputs": outputs}
            async with httpx.AsyncClient(timeout=60.0) as client:
                resp = await client.post(f"{API_BASE_URL}/products_by_category", json=payload)

            if resp.status_code == 200:
                return resp.json().get("markdown_table", "Không có dữ liệu sản phẩm")
            print(f"All products API error: {resp.status_code} - {resp.text}")
            return "Không có dữ liệu sản phẩm"
        except Exception as e:
            print(f"Exception in show_all_products_table: {e}")
            return f"Error: {e}"


class UIService:
    """Handles UI-related operations."""

    @staticmethod
    def _search_toggle_label(state: ConversationState) -> str:
        """Return the toggle-button label reflecting the current search mode."""
        if state.product_model_search:
            return "🔍 Tìm theo mã sản phẩm (Đang bật)"
        return "🔍 Tìm theo mã sản phẩm (Đang tắt)"

    @staticmethod
    def create_action_buttons(state: ConversationState):
        """Create the full set of action buttons attached to replies."""
        search_status = UIService._search_toggle_label(state)
        return [
            cl.Action(name="show_specs", value="specs", label="📄 Thông số kỹ thuật", payload={"action": "specs"}),
            cl.Action(name="show_advantages", value="advantages", label="💡 Ưu điểm nổi trội", payload={"action": "advantages"}),
            cl.Action(name="show_packages", value="packages", label="📦 Gói sản phẩm", payload={"action": "packages"}),
            cl.Action(name="show_all_products", value="all_products", label="🛒 Tất cả sản phẩm", payload={"action": "all_products"}),
            cl.Action(name="toggle_product_search", value="toggle_search", label=search_status, payload={"action": "toggle_search"}),
            cl.Action(name="change_model", value="model", label="🔄 Đổi model", payload={"action": "model"}),
        ]

    @staticmethod
    def create_start_buttons(state: ConversationState):
        """Create the configuration buttons shown at chat start."""
        search_status = UIService._search_toggle_label(state)
        return [
            cl.Action(name="toggle_product_search", value="toggle_search", label=search_status, payload={"action": "toggle_search"}),
            cl.Action(name="change_model", value="model", label="🔄 Đổi model", payload={"action": "model"}),
        ]

    @staticmethod
    async def send_message_with_buttons(content: str, state: ConversationState, actions=None, author="assistant"):
        """Send ``content``; the default action buttons are used when ``actions`` is None."""
        if actions is None:
            actions = UIService.create_action_buttons(state)
        await cl.Message(content=content, actions=actions, author=author).send()

    @staticmethod
    async def create_typing_animation():
        """Show a fixed-length typing animation (legacy, kept for compatibility).

        Returns the message that carried the animation.
        """
        msg = cl.Message(content="", author="assistant")
        await msg.send()

        # 27 frames at 0.25 s each: roughly 7 seconds of animation.
        for i in range(27):
            frame = TYPING_FRAMES[i % len(TYPING_FRAMES)]
            msg.content = f"{frame} Đang suy nghĩ..."
            await msg.update()
            await asyncio.sleep(0.25)

        return msg


async def run_typing_animation(msg: cl.Message):
    """Animate ``msg`` with a spinner until the surrounding task is cancelled."""
    frame_index = 0
    try:
        while True:
            frame = TYPING_FRAMES[frame_index % len(TYPING_FRAMES)]
            msg.content = f"{frame} Đang suy nghĩ..."
            await msg.update()
            await asyncio.sleep(0.25)
            frame_index += 1
    except asyncio.CancelledError:
        # Cancellation is the normal way this animation ends.
        print("🎬 Animation cancelled - API response received")
        raise


async def _stop_animation(animation_task: asyncio.Task):
    """Cancel the typing animation and give it a moment to wind down."""
    animation_task.cancel()
    try:
        await asyncio.wait_for(animation_task, timeout=0.1)
    except (asyncio.CancelledError, asyncio.TimeoutError):
        pass
    except Exception as e:
        # An animation failure must never replace the real reply.
        print(f"⚠️ Typing animation failed: {e}")


# HELPER FUNCTIONS: session management with async error handling

async def ensure_session_state() -> Optional[ConversationState]:
    """Return the state of the current Chainlit session, creating it if needed.

    Returns None when the session id is missing or the lookup fails.
    """
    try:
        session_id = cl.user_session.get("session_id")
        if not session_id:
            print("Lỗi: Không lấy được session id ở ensure_session_state")
            return None
        return await StateManager.get_or_create_session_state(session_id)
    except Exception as e:
        print(f"⚠️ Error ensuring session state: {e}")
        return None


async def get_current_session_state() -> Optional[ConversationState]:
    """Return the current session state looked up via Chainlit's user session.

    Returns None when the session id is missing or the lookup fails.
    """
    try:
        chainlit_session_id = cl.user_session.get("session_id")
        if chainlit_session_id:
            return await StateManager.get_or_create_session_state(chainlit_session_id)
        print("⚠️ No Chainlit session ID found")
        return None
    except Exception as e:
        print(f"⚠️ Error getting session state: {e}")
        return None


async def _require_session_state() -> Optional[ConversationState]:
    """Return the session state, telling the user when it is unavailable."""
    app_state = await ensure_session_state()
    if app_state is None:
        await cl.Message(content="Error: Session state not found", author="assistant").send()
    return app_state


async def _select_model(model_name: str):
    """Switch the session to ``model_name`` and confirm it to the user."""
    app_state = await _require_session_state()
    if app_state is None:
        return
    await StateManager.change_model(app_state, model_name)
    await UIService.send_message_with_buttons(
        f"✅ Đã chuyển sang **{model_name}**", app_state, author="assistant"
    )


@cl.on_chat_start
async def on_chat_start():
    """Initialize the chat session and greet the user."""
    session_id = cl.user_session.get("session_id")
    if not session_id:
        session_id = str(uuid.uuid4())
        cl.user_session.set("session_id", session_id)
        print(f"🆕 Generated new session_id: {session_id}")
    else:
        print(f"🔄 Reusing existing session_id: {session_id}")

    app_state = await StateManager.get_or_create_session_state(session_id)

    await cl.Message(
        content=f"🛍️ **RangDong Sales Agent** (Session: {session_id[:8]}...)\n\n"
                f"Xin chào! Tôi có thể giúp bạn tìm kiếm và tư vấn sản phẩm RangDong. Hãy thử các câu hỏi mẫu:\n\n"
                f"- Tìm sản phẩm bình giữ nhiệt dung tích dưới 2 lít\n"
                f"- Tìm sản phẩm ổ cắm thông minh\n"
                f"- Tư vấn cho tôi đèn học chống cận cho con gái của tôi học lớp 6",
        author="assistant"
    ).send()

    actions = UIService.create_start_buttons(app_state)
    await cl.Message(
        content="Sử dụng nút bên dưới để cấu hình:",
        actions=actions,
        author="assistant"
    ).send()


@cl.on_chat_end
async def on_chat_end():
    """Schedule delayed cleanup when the chat session ends."""
    try:
        session_id = cl.user_session.get("session_id")
        print(f"📤 on_chat_end triggered for session {session_id}")

        if session_id:
            # Cleanup is delayed by one hour rather than done immediately,
            # because the end of a chat is likely just a temporary disconnect.
            await StateManager.schedule_delayed_cleanup(session_id, delay_seconds=3600)
            print(f"⏳ Scheduled delayed cleanup for session {session_id} (1h delay for disconnect tolerance)")
        else:
            print("⚠️ No session_id found in on_chat_end")
    except Exception as e:
        print(f"⚠️ Error during on_chat_end: {e}")


# ACTION CALLBACKS - all resolve the session through ensure_session_state()

@cl.action_callback("show_specs")
async def on_show_specs(action):
    """Handle the show-specifications action."""
    app_state = await _require_session_state()
    if app_state is None:
        return
    specs_content = await DisplayService.show_specs(app_state)
    await UIService.send_message_with_buttons(specs_content, app_state, author="assistant")


@cl.action_callback("show_advantages")
async def on_show_advantages(action):
    """Handle the show-advantages action."""
    app_state = await _require_session_state()
    if app_state is None:
        return
    adv_content = await DisplayService.show_advantages(app_state)
    await UIService.send_message_with_buttons(adv_content, app_state, author="assistant")


@cl.action_callback("show_packages")
async def on_show_packages(action):
    """Handle the show-packages action."""
    app_state = await _require_session_state()
    if app_state is None:
        return
    pkg_content = await DisplayService.show_solution_packages(app_state)
    await UIService.send_message_with_buttons(pkg_content, app_state, author="assistant")


@cl.action_callback("show_all_products")
async def on_show_all_products(action):
    """Handle the show-all-products action."""
    app_state = await _require_session_state()
    if app_state is None:
        return
    all_products_content = await DisplayService.show_all_products_table(app_state)
    await UIService.send_message_with_buttons(all_products_content, app_state, author="assistant")


@cl.action_callback("toggle_product_search")
async def on_toggle_product_search(action):
    """Handle the toggle of product-model search mode."""
    app_state = await _require_session_state()
    if app_state is None:
        return

    await StateManager.toggle_product_model_search(app_state)

    if app_state.product_model_search:
        status_message = (
            "✅ **Đã bật tìm kiếm theo mã sản phẩm**\n\n"
            "Khi bạn nhắc đến mã/model cụ thể trong câu hỏi, hệ thống sẽ tìm kiếm chính xác theo mã đó."
        )
    else:
        status_message = (
            "✅ **Đã tắt tìm kiếm theo mã sản phẩm**\n\n"
            "Hệ thống sẽ tìm kiếm sản phẩm theo cách thông thường."
        )

    await UIService.send_message_with_buttons(status_message, app_state, author="assistant")


@cl.action_callback("change_model")
async def on_change_model(action):
    """Show the model selection menu."""
    app_state = await _require_session_state()
    if app_state is None:
        return

    model_actions = [
        cl.Action(name=f"select_model_{i}", value=model, label=model, payload={"model": model})
        for i, model in enumerate(AVAILABLE_MODELS)
    ]
    model_actions.append(
        cl.Action(name="back_to_main", value="back", label="🔙 Quay lại", payload={"action": "back"})
    )

    await cl.Message(
        content=f"**Model hiện tại**: {app_state.selected_model}\n**Tìm kiếm theo mã**: {'Đang bật' if app_state.product_model_search else 'Đang tắt'}\n\nChọn model mới:",
        actions=model_actions,
        author="assistant"
    ).send()


@cl.action_callback("back_to_main")
async def on_back_to_main(action):
    """Return to the main menu."""
    app_state = await _require_session_state()
    if app_state is None:
        return

    actions = UIService.create_action_buttons(app_state)
    await cl.Message(
        content="📋 **Menu chính**\n\nSử dụng các nút bên dưới để:",
        actions=actions,
        author="assistant"
    ).send()


@cl.action_callback("select_model_0")
async def on_select_model_0(action):
    """Select the first entry of AVAILABLE_MODELS."""
    await _select_model(AVAILABLE_MODELS[0])


@cl.action_callback("select_model_1")
async def on_select_model_1(action):
    """Select the second entry of AVAILABLE_MODELS."""
    await _select_model(AVAILABLE_MODELS[1])


@cl.action_callback("select_model_2")
async def on_select_model_2(action):
    """Select the third entry of AVAILABLE_MODELS."""
    await _select_model(AVAILABLE_MODELS[2])


# DEBUG ENDPOINTS (optional - for monitoring session status)

@cl.action_callback("debug_sessions")
async def on_debug_sessions(action):
    """Send a summary of all tracked sessions (debug builds)."""
    try:
        status = await StateManager.get_session_status()

        debug_content = "🔍 **Debug: Session Status**\n\n"
        if not status:
            debug_content += "No active sessions."
        else:
            for session_id, info in status.items():
                debug_content += f"**Session: {session_id[:8]}...**\n"
                debug_content += f"- Pending cleanup: {info['pending_cleanup']}\n"
                debug_content += f"- Has task: {info['has_task']}\n"
                debug_content += f"- Last activity: {info['last_activity']}\n"
                debug_content += f"- Model: {info['selected_model']}\n"
                debug_content += f"- Product search: {info['product_model_search']}\n\n"

        await cl.Message(content=debug_content, author="assistant").send()
    except Exception as e:
        await cl.Message(content=f"Debug error: {e}", author="assistant").send()


@cl.on_message
async def main(message: cl.Message):
    """Answer a user message while a typing animation runs concurrently."""
    app_state = await _require_session_state()
    if app_state is None:
        return

    # Use the first attached image, if any.
    image_path = None
    if message.elements:
        for element in message.elements:
            if isinstance(element, cl.Image):
                image_path = element.path
                break

    # The placeholder message is animated until the reply replaces it.
    msg = cl.Message(content="", author="assistant")
    await msg.send()

    animation_task = asyncio.create_task(run_typing_animation(msg))
    try:
        response = await ChatService.respond_to_chat(app_state, message.content, image_path)
    except Exception as e:
        response = f"Error: {e}"
    finally:
        # Always stop the spinner, including when this handler is cancelled.
        await _stop_animation(animation_task)

    msg.content = response
    msg.actions = UIService.create_action_buttons(app_state)
    await msg.update()