""" Open Source Multimodal Tools This module provides multimodal tool capabilities using open-source models: - BLIP-2 and Mistral Vision models for image analysis - Faster-Whisper for European audio transcription - DistilBERT for document question answering - Hugging Face transformers for various tasks - No dependency on proprietary OpenAI models Key Features: - Image analysis using BLIP-2 or Mistral Vision - Audio transcription using Faster-Whisper (European community-driven) - Text generation using Mistral models - Document processing and analysis - All capabilities using open-source models with no API dependencies """ import os import logging import base64 import io from typing import Dict, Any, List, Optional, Union from pathlib import Path import requests from PIL import Image # Environment setup from utils.environment_setup import get_api_key, has_api_key, should_suppress_warnings # Mistral and open-source model imports try: # Try new API first (recommended) from mistralai import Mistral as MistralClient from mistralai import UserMessage MISTRAL_AVAILABLE = True MISTRAL_CLIENT_TYPE = "new" except ImportError: try: # Fallback to old API (deprecated) from mistralai.client import MistralClient from mistralai import UserMessage MISTRAL_AVAILABLE = True MISTRAL_CLIENT_TYPE = "old" except ImportError: MistralClient = None UserMessage = None MISTRAL_AVAILABLE = False MISTRAL_CLIENT_TYPE = None # European Community-Driven Audio Processing try: # Faster-Whisper - Community-driven European alternative # Optimized, CPU-friendly, 4x faster than original Whisper # Developed by European open-source community import faster_whisper FASTER_WHISPER_AVAILABLE = True except ImportError: FASTER_WHISPER_AVAILABLE = False # Audio processing availability (European community solution only) AUDIO_AVAILABLE = FASTER_WHISPER_AVAILABLE # Hugging Face transformers for additional capabilities try: from transformers import pipeline, AutoProcessor, AutoModel import torch TRANSFORMERS_AVAILABLE = True except ImportError: TRANSFORMERS_AVAILABLE = False # AGNO framework from agno.tools.toolkit import Toolkit # Response formatting from utils.response_formatter import ( ResponseFormatter, ResponseType, FormatConfig, FormatStandard, ) logger = logging.getLogger(__name__) class OpenSourceMultimodalTools(Toolkit): """ Open-source multimodal tools using Mistral and other open models. This is a tool collection, not an agent. It provides multimodal capabilities that can be integrated into AGNO agents. Capabilities: - Image analysis using BLIP-2 and Mistral Vision - Audio transcription using Faster-Whisper (European community-driven) - Document analysis using DistilBERT - Text generation using Mistral models - All using open-source models with no proprietary dependencies """ def __init__(self): """Initialize the Mistral-based multimodal agent.""" logger.info("🚀 Initializing Mistral Multimodal Agent (Open Source)...") # Load environment variables from .env file self._load_env_file() # Initialize response formatter self._init_response_formatter() # Initialize Mistral client self.mistral_client = None self.mistral_api_key = get_api_key('mistral') if self.mistral_api_key and MISTRAL_AVAILABLE and MistralClient: try: if MISTRAL_CLIENT_TYPE == "new": # New API initialization self.mistral_client = MistralClient(api_key=self.mistral_api_key) logger.info("✅ Mistral client initialized (new API)") else: # Old API initialization (deprecated) self.mistral_client = MistralClient(api_key=self.mistral_api_key) logger.info("✅ Mistral client initialized (old API - deprecated)") except Exception as e: if not should_suppress_warnings(): logger.warning(f"âš ī¸ Mistral client initialization failed: {e}") else: if not should_suppress_warnings(): if not MISTRAL_AVAILABLE: logger.info("â„šī¸ Mistral library not available - using fallback models") elif not self.mistral_api_key: logger.info("â„šī¸ MISTRAL_API_KEY not found - using open-source alternatives") # Initialize open-source models self.whisper_model = None self.vision_pipeline = None self.document_pipeline = None self._init_open_source_models() # Track available capabilities self.capabilities = self._assess_capabilities() # Build tools list for AGNO registration tools = [ self.analyze_image, self.transcribe_audio, self.analyze_document ] # Initialize the toolkit with auto-registration enabled super().__init__(name="multimodal_tools", tools=tools) logger.info("✅ Mistral Multimodal Agent initialized") logger.info(f"📊 Available capabilities: {list(self.capabilities.keys())}") logger.info(f"🔧 Registered AGNO tools: {[tool.__name__ for tool in tools]}") def _load_env_file(self): """Load environment variables from .env file if it exists.""" from pathlib import Path env_file = Path('.env') if env_file.exists(): with open(env_file, 'r') as f: for line in f: line = line.strip() if line and not line.startswith('#') and '=' in line: key, value = line.split('=', 1) os.environ[key.strip()] = value.strip() logger.info("✅ Environment variables loaded from .env file") # Reload the environment manager to pick up new variables from utils.environment_setup import env_manager env_manager._load_environment() def _init_response_formatter(self): """Initialize response formatter for consistent output.""" format_config = FormatConfig( format_standard=FormatStandard.HF_EVALUATION, remove_markdown=True, remove_prefixes=True, strip_whitespace=True, normalize_spaces=True ) self.response_formatter = ResponseFormatter(config=format_config) def _init_open_source_models(self): """Initialize open-source models for multimodal capabilities.""" # Initialize Faster-Whisper (European community-driven alternative) self.whisper_model = None if FASTER_WHISPER_AVAILABLE: try: # Use CPU-optimized configuration for European deployment self.whisper_model = faster_whisper.WhisperModel( "base", # Lightweight model for efficiency device="cpu", # CPU-friendly for European servers compute_type="int8", # Memory-efficient quantization num_workers=1 # Conservative resource usage ) logger.info("✅ Faster-Whisper loaded (European community-driven alternative)") logger.info("đŸ‡ĒđŸ‡ē Using CPU-optimized configuration for European deployment") except Exception as e: logger.warning(f"âš ī¸ Faster-Whisper loading failed: {e}") if not self.whisper_model: logger.warning("âš ī¸ No audio transcription available") logger.info("💡 Install: pip install faster-whisper (European community alternative)") # Initialize vision pipeline using open models if TRANSFORMERS_AVAILABLE: try: # Use BLIP-2 for image captioning (open source) self.vision_pipeline = pipeline( "image-to-text", model="Salesforce/blip-image-captioning-base", device=0 if torch.cuda.is_available() else -1 ) logger.info("✅ Vision pipeline initialized (BLIP-2)") except Exception as e: logger.warning(f"âš ī¸ Vision pipeline initialization failed: {e}") try: # Document analysis pipeline self.document_pipeline = pipeline( "question-answering", model="distilbert-base-cased-distilled-squad" ) logger.info("✅ Document analysis pipeline initialized") except Exception as e: logger.warning(f"âš ī¸ Document pipeline initialization failed: {e}") def _assess_capabilities(self) -> Dict[str, bool]: """Assess what multimodal capabilities are available.""" return { 'text_generation': self.mistral_client is not None, 'image_analysis': self.vision_pipeline is not None or self.mistral_client is not None, 'audio_transcription': self.whisper_model is not None, 'document_analysis': self.document_pipeline is not None, 'vision_reasoning': self.mistral_client is not None, # Mistral Vision } def analyze_image(self, image_input: Union[str, bytes, Image.Image, dict], question: str = None) -> str: """ Analyze an image using open-source models. Args: image_input: Image file path, bytes, PIL Image, or dict with file_path question: Optional specific question about the image Returns: Analysis result as string """ try: # Convert input to PIL Image if isinstance(image_input, dict): # Handle AGNO tool format: {'file_path': 'image.png'} if 'file_path' in image_input: image_path = image_input['file_path'] if os.path.exists(image_path): image = Image.open(image_path) else: return f"Error: Image file not found: {image_path}" else: return "Error: Dictionary input must contain 'file_path' key" elif isinstance(image_input, str): if os.path.exists(image_input): image = Image.open(image_input) else: # Assume it's a URL response = requests.get(image_input) image = Image.open(io.BytesIO(response.content)) elif isinstance(image_input, bytes): image = Image.open(io.BytesIO(image_input)) elif isinstance(image_input, Image.Image): image = image_input else: return "Error: Unsupported image input format" # Try Mistral Vision first (if available) if self.mistral_client and question: try: result = self._analyze_with_mistral_vision(image, question) if result: return result except Exception as e: logger.warning(f"Mistral Vision failed: {e}") # Fallback to open-source vision pipeline if self.vision_pipeline: try: # Generate image caption caption_result = self.vision_pipeline(image) caption = caption_result[0]['generated_text'] if caption_result else "Unable to generate caption" if question: # Use Mistral to reason about the image based on caption if self.mistral_client: reasoning_prompt = f""" Image Description: {caption} Question: {question} Based on the image description, please answer the question about the image. """ if MISTRAL_CLIENT_TYPE == "new": response = self.mistral_client.chat.complete( model="mistral-large-latest", messages=[UserMessage(content=reasoning_prompt)] ) else: # Old API format (deprecated) response = self.mistral_client.chat( model="mistral-large-latest", messages=[UserMessage(content=reasoning_prompt)] ) return response.choices[0].message.content else: return f"Image shows: {caption}. Question: {question} (Unable to reason without Mistral API)" else: return f"Image analysis: {caption}" except Exception as e: logger.error(f"Vision pipeline failed: {e}") return f"Error analyzing image: {e}" return "Error: No image analysis capabilities available" except Exception as e: logger.error(f"Image analysis failed: {e}") return f"Error: {e}" def _analyze_with_mistral_vision(self, image: Image.Image, question: str) -> Optional[str]: """ Analyze image using Mistral Vision model. Args: image: PIL Image object question: Question about the image Returns: Analysis result or None if failed """ try: # Convert image to base64 buffer = io.BytesIO() image.save(buffer, format='PNG') image_b64 = base64.b64encode(buffer.getvalue()).decode() # Create message with image - compatible with both API versions messages = [ UserMessage( content=[ { "type": "text", "text": question }, { "type": "image_url", "image_url": f"data:image/png;base64,{image_b64}" } ] ) ] # Use Mistral Vision model - different API call formats if MISTRAL_CLIENT_TYPE == "new": response = self.mistral_client.chat.complete( model="pixtral-12b-2409", # Mistral's vision model messages=messages ) else: # Old API format (deprecated) response = self.mistral_client.chat( model="pixtral-12b-2409", # Mistral's vision model messages=messages ) return response.choices[0].message.content except Exception as e: logger.warning(f"Mistral Vision analysis failed: {e}") return None def transcribe_audio(self, audio_input: Union[str, bytes, dict]) -> str: """ Transcribe audio using Faster-Whisper (European community-driven alternative). Args: audio_input: Audio file path, bytes, or dict with 'file_path' key Returns: Transcription text """ if not self.whisper_model: return "Error: Audio transcription not available (Faster-Whisper not loaded)" try: # Handle different input types from AGNO framework if isinstance(audio_input, dict): # AGNO passes {'file_path': '/path/to/file'} if 'file_path' in audio_input: file_path = audio_input['file_path'] else: return "Error: Invalid audio input format - expected 'file_path' key in dict" elif isinstance(audio_input, str): # Direct file path file_path = audio_input elif isinstance(audio_input, bytes): # Handle bytes input - save to temporary file import tempfile with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as tmp: tmp.write(audio_input) tmp.flush() file_path = tmp.name else: return f"Error: Unsupported audio input type: {type(audio_input)}" # Transcribe using Faster-Whisper segments, info = self.whisper_model.transcribe(file_path) transcription = " ".join([segment.text for segment in segments]) # Clean up temporary file if we created one if isinstance(audio_input, bytes): os.unlink(file_path) logger.info(f"đŸ‡ĒđŸ‡ē Audio transcribed using Faster-Whisper (European community)") return transcription.strip() except Exception as e: logger.error(f"Audio transcription failed: {e}") return f"Error transcribing audio: {e}" def analyze_document(self, document_text: str, question: str) -> str: """ Analyze document content and answer questions. Args: document_text: Text content of document question: Question about the document Returns: Answer based on document analysis """ try: # Use Mistral for complex reasoning if available if self.mistral_client: prompt = f""" Document Content: {document_text[:4000]} # Limit length Question: {question} Please analyze the document and answer the question based on the content provided. """ if MISTRAL_CLIENT_TYPE == "new": response = self.mistral_client.chat.complete( model="mistral-large-latest", messages=[UserMessage(content=prompt)] ) else: # Old API format (deprecated) response = self.mistral_client.chat( model="mistral-large-latest", messages=[UserMessage(content=prompt)] ) return response.choices[0].message.content # Fallback to simple QA pipeline elif self.document_pipeline: result = self.document_pipeline( question=question, context=document_text[:1000] # Limit context length ) return result['answer'] else: return "Error: Document analysis not available" except Exception as e: logger.error(f"Document analysis failed: {e}") return f"Error analyzing document: {e}" def generate_text(self, prompt: str, max_tokens: int = 500) -> str: """ Generate text using Mistral model. Args: prompt: Input prompt max_tokens: Maximum tokens to generate Returns: Generated text """ if not self.mistral_client: return "Error: Text generation not available (Mistral API key required)" try: if MISTRAL_CLIENT_TYPE == "new": response = self.mistral_client.chat.complete( model="mistral-large-latest", messages=[UserMessage(content=prompt)], max_tokens=max_tokens ) else: # Old API format (deprecated) response = self.mistral_client.chat( model="mistral-large-latest", messages=[UserMessage(content=prompt)], max_tokens=max_tokens ) return response.choices[0].message.content except Exception as e: logger.error(f"Text generation failed: {e}") return f"Error generating text: {e}" def __call__(self, question: str, **kwargs) -> str: """ Main interface for the multimodal agent. Args: question: User question/request **kwargs: Additional parameters (image, audio, document, etc.) Returns: Formatted response """ try: logger.info(f"🤔 Processing multimodal question: {question[:100]}...") # Check for multimodal inputs if 'image' in kwargs: result = self.analyze_image(kwargs['image'], question) elif 'audio' in kwargs: # First transcribe, then process transcription = self.transcribe_audio(kwargs['audio']) combined_question = f"Audio transcription: {transcription}\nQuestion: {question}" result = self.generate_text(combined_question) elif 'document' in kwargs: result = self.analyze_document(kwargs['document'], question) else: # Pure text generation result = self.generate_text(question) # Format response formatted_result = self.response_formatter.format_response( result, response_type=ResponseType.DIRECT_ANSWER ) logger.info(f"📤 Mistral Multimodal Agent response: {formatted_result[:100]}...") return formatted_result except Exception as e: logger.error(f"Multimodal processing failed: {e}") return "Error processing multimodal request" def get_capabilities_status(self) -> Dict[str, Any]: """Get detailed status of multimodal capabilities.""" return { 'agent_type': 'mistral_multimodal', 'capabilities': self.capabilities, 'models': { 'text_generation': 'mistral-large-latest' if self.mistral_client else None, 'vision': 'pixtral-12b-2409' if self.mistral_client else 'BLIP-2', 'audio': 'faster-whisper-base' if self.whisper_model else None, 'document_qa': 'distilbert-base-cased' if self.document_pipeline else None, }, 'dependencies': { 'mistral_api': self.mistral_client is not None, 'whisper': FASTER_WHISPER_AVAILABLE and self.whisper_model is not None, 'transformers': TRANSFORMERS_AVAILABLE, 'vision_pipeline': self.vision_pipeline is not None, } } # Convenience function for easy import def create_mistral_multimodal_agent(): """Create and return an open-source multimodal tools instance.""" return OpenSourceMultimodalTools() def create_open_source_multimodal_tools(): """Create and return an open-source multimodal tools instance.""" return OpenSourceMultimodalTools()