Spaces:
Sleeping
Sleeping
import os | |
import sys | |
import asyncio | |
from pathlib import Path | |
from core.utils.translations import translations | |
from datetime import datetime | |
from typing import Tuple | |
import gradio as gr | |
# Add parent directory to path for imports | |
sys.path.append(str(Path(__file__).parent.parent)) | |
from core.ai_engine import OptimizedGazaRAGSystem | |
from ui.components import ( | |
get_custom_css, | |
create_header_section, | |
create_query_input_section, | |
create_response_output_section, | |
create_quick_access_section, | |
create_example_scenarios, | |
gradio_user_selector, | |
gradio_sidebar_controls, | |
gradio_show_response | |
) | |
# import logging | |
# logger = logging.getLogger(__name__) | |
# logging.basicConfig(level=logging.INFO) | |
from core.utils.translator import ArabicTranslator | |
from core.utils.logger import logger | |
# Global system instance | |
optimized_rag_system = None | |
def initialize_optimized_system(vector_store_dir: str = "./vector_store"): | |
global optimized_rag_system | |
if optimized_rag_system is None: | |
try: | |
optimized_rag_system = OptimizedGazaRAGSystem(vector_store_dir) | |
optimized_rag_system.initialize() | |
logger.info("β Optimized Gaza RAG System initialized successfully") | |
except Exception as e: | |
logger.error(f"β Failed to initialize optimized system: {e}") | |
raise | |
return optimized_rag_system | |
def process_medical_query_with_progress(query: str,language, progress=gr.Progress()) -> Tuple[str, str, str, str, str]: | |
from core.utils.translations import translations | |
t = translations.get(language, translations["English"]) | |
if not query.strip(): | |
return "Please enter a medical question.", "", "β οΈ No query provided", {}, {} | |
try: | |
progress(0.05, desc="π§ Initializing optimized system...") | |
system = initialize_optimized_system() | |
loop = asyncio.new_event_loop() | |
asyncio.set_event_loop(loop) | |
def progress_callback(value, desc): | |
progress(value, desc=desc) | |
try: | |
result = loop.run_until_complete( | |
system.generate_response_async(query, progress_callback, language=language) | |
) | |
finally: | |
loop.close() | |
response = result["response"] | |
metadata_parts = [ | |
f"π― Confidence: {result.get('confidence', 0):.1%}", | |
f"β±οΈ Response: {result.get('response_time', 0)}s", | |
f"π Sources: {result.get('search_results_count', 0)} found" | |
] | |
if result.get("cached"): | |
metadata_parts.append("πΎ Cached") | |
if result.get("sources"): | |
metadata_parts.append(f"π Refs: {', '.join(result['sources'][:2])}") | |
metadata = " | ".join(metadata_parts) | |
status_parts = [] | |
if result.get("safety_warnings"): | |
status_parts.append(f"β οΈ {len(result['safety_warnings'])} warnings") | |
if result.get("safety_issues"): | |
status_parts.append(f"π¨ {len(result['safety_issues'])} issues") | |
if not status_parts: | |
status_parts.append("β Safe response") | |
status = " | ".join(status_parts) | |
return (response, metadata, status, gr.Markdown(f"### AI Response\n{response}"), gr.Markdown(f"### Metadata\n{metadata}"), gr.Markdown(f"### Safety Check\n{status}")) | |
except Exception as e: | |
logger.error(f"β Error processing query: {e}") | |
error_response = f"β οΈ Error processing your query: {str(e)}\n\nπ¨ For medical emergencies, seek immediate professional help." | |
error_metadata = f"β Error at {datetime.now().strftime('%H:%M:%S')}" | |
error_status = "π¨ System error occurred" | |
return (error_response, error_metadata, error_status, gr.update(value=""), gr.update(value=""), gr.update(value="")) | |
def get_system_stats() -> str: | |
try: | |
system = initialize_optimized_system() | |
stats = system.knowledge_base.get_stats() | |
if stats["status"] == "initialized": | |
return f""" | |
π **System Statistics:** | |
- Status: β Initialized | |
- Total Chunks: {stats['total_chunks']:,} | |
- Vector Dimension: {stats['embedding_dimension']} | |
- Index Type: {stats['index_type']} | |
- Sources: {len(stats['sources'])} documents | |
- Available Sources: {', '.join(stats['sources'][:5])}{'...' if len(stats['sources']) > 5 else ''} | |
""" | |
else: | |
return "π System Status: β Not Initialized" | |
except Exception as e: | |
return f"π System Status: β Error - {str(e)}" | |
def create_optimized_gradio_interface(): | |
with gr.Blocks( | |
css=get_custom_css() + """ | |
#role-selection-box { | |
display: flex; | |
flex-direction: column; | |
align-items: center; | |
justify-content: center; | |
height: 90vh; | |
gap: 1rem; | |
} | |
.gr-button { | |
width: 350px; | |
font-size: 1.1rem; | |
} | |
..highlight-flash { | |
animation: flash-highlight 1.6s ease-in-out; | |
} | |
""", | |
title="π₯ Optimized Gaza First Aid Assistant", | |
theme=gr.themes.Soft(primary_hue="blue", secondary_hue="green", neutral_hue="slate") | |
) as interface: | |
user_role = gr.State() | |
# Role Selection UI | |
with gr.Column(elem_id="role-selection-box", visible=True) as role_selection_group: | |
role_title = gr.Markdown("### π§ββοΈ Select Your Role to Begin") | |
volunteer_btn = gr.Button("π― I'm a Volunteer") | |
organizer_btn = gr.Button("π I'm an Event Organizer") | |
divider = gr.Markdown("---") | |
# Main UI (Hidden at first) | |
with gr.Column(visible=False) as main_ui: | |
with gr.Row(elem_classes=["main-container"]): | |
create_header_section() | |
with gr.Row(elem_classes=["main-container"]): | |
with gr.Group(elem_classes=["stats-container"]): | |
stats_display = gr.Markdown( | |
value=get_system_stats(), | |
label="π System Status" | |
) | |
with gr.Row(elem_classes=["main-container"]): | |
with gr.Column(scale=2): | |
query_input, submit_btn, clear_btn = create_query_input_section() | |
create_example_scenarios(query_input) | |
with gr.Column(scale=1): | |
create_quick_access_section() | |
user_type_dropdown = gradio_user_selector() | |
language_dropdown = gradio_sidebar_controls() | |
with gr.Row(elem_classes=["main-container"]): | |
with gr.Column(): | |
response_output, metadata_output, status_output = create_response_output_section() | |
show_response_output = gr.Markdown(label="AI Response", elem_classes=["highlight-flash"]) | |
show_metadata_output = gr.Markdown(label="Metadata") | |
show_safety_output = gr.Markdown(label="Safety Check") | |
# with gr.Row(elem_classes=["main-container"]): | |
# create_example_scenarios(query_input) | |
submit_btn.click( | |
process_medical_query_with_progress, | |
inputs=[query_input, language_dropdown], | |
outputs=[ | |
response_output, | |
metadata_output, | |
status_output, | |
show_response_output, | |
show_metadata_output, | |
show_safety_output | |
], | |
show_progress=True | |
) | |
query_input.submit( | |
process_medical_query_with_progress, | |
inputs=[query_input, language_dropdown] , | |
outputs=[ | |
response_output, | |
metadata_output, | |
status_output, | |
show_response_output, | |
show_metadata_output, | |
show_safety_output | |
], | |
show_progress=True | |
) | |
clear_btn.click( | |
lambda: ("", "", "", gr.update(value=""), gr.update(value=""), gr.update(value="")), | |
outputs=[ | |
query_input, | |
response_output, | |
metadata_output, | |
status_output, | |
show_response_output, | |
show_metadata_output, | |
show_safety_output | |
] | |
) | |
refresh_stats_btn = gr.Button("π Refresh System Stats", variant="secondary") | |
refresh_stats_btn.click( | |
fn=lambda: get_system_stats(), | |
outputs=stats_display | |
) | |
# Button Logic | |
def set_user_role(role): | |
return ( | |
role, | |
gr.update(visible=True), # show main UI | |
gr.update(visible=False), # hide role title | |
gr.update(visible=False), # hide volunteer_btn | |
gr.update(visible=False), # hide organizer_btn | |
gr.update(visible=False) # hide full group | |
) | |
volunteer_btn.click( | |
lambda: set_user_role("volunteer"), | |
outputs=[ | |
user_role, | |
main_ui, | |
role_title, | |
volunteer_btn, | |
organizer_btn, | |
role_selection_group | |
] | |
) | |
organizer_btn.click( | |
lambda: set_user_role("organizer"), | |
outputs=[ | |
user_role, | |
main_ui, | |
role_title, | |
volunteer_btn, | |
organizer_btn, | |
role_selection_group | |
] | |
) | |
return interface | |
gr.HTML(""" | |
<script> | |
function scrollToResponse() { | |
const target = document.getElementById('ai-response-output'); | |
if (target) { | |
target.scrollIntoView({ behavior: 'smooth', block: 'start' }); | |
target.classList.add('highlight-flash'); | |
setTimeout(() => target.classList.remove('highlight-flash'), 1600); | |
} | |
} | |
</script> | |
""") | |
def main(): | |
logger.info("π Starting Optimized Gaza First Aid Assistant") | |
try: | |
vector_store_dir = "./vector_store" | |
if not Path(vector_store_dir).exists(): | |
alt_paths = ["./results/vector_store", "./results/vector_store_extracted"] | |
for alt_path in alt_paths: | |
if Path(alt_path).exists(): | |
vector_store_dir = alt_path | |
logger.info(f"π Found vector store at: {vector_store_dir}") | |
break | |
else: | |
raise FileNotFoundError("Vector store directory not found. Please ensure pre-made assets are available.") | |
logger.info(f"π§ Loading optimized system from: {vector_store_dir}") | |
system = initialize_optimized_system(vector_store_dir) | |
stats = system.knowledge_base.get_stats() | |
logger.info(f"β Knowledge base loaded: {stats['total_chunks']} chunks, {stats['embedding_dimension']}D") | |
logger.info(f"β Sources: {len(stats['sources'])} documents") | |
logger.info("β Medical fact checker ready") | |
logger.info("β Optimized FAISS indexing active") | |
logger.info("π¨ Creating optimized Gradio interface...") | |
interface = create_optimized_gradio_interface() | |
logger.info("π Launching optimized interface...") | |
interface.launch( | |
server_name="0.0.0.0", | |
server_port=7860, | |
share=False, | |
max_threads=6, | |
show_error=True, | |
quiet=False | |
) | |
except Exception as e: | |
logger.error(f"β Failed to start Optimized Gaza First Aid Assistant: {e}") | |
print(f"\nπ¨ STARTUP ERROR: {e}") | |
print("\nπ§ Troubleshooting Steps:") | |
print("1. Ensure vector_store directory exists with index.faiss, chunks.txt, and metadata.pkl") | |
print("2. Check if all dependencies are installed: pip install -r requirements.txt") | |
print("3. Verify sufficient memory is available (minimum 4GB RAM recommended)") | |
print("4. Check system logs for detailed error information") | |
print("\nπ For technical support, check the application logs above..") | |
sys.exit(1) | |
if __name__ == "__main__": | |
main() | |