""" Airflow API client for Mezura. """ import requests import logging import json import os from typing import Dict, Any, Optional import re from api.config import get_api_config, get_api_config_for_type, get_airflow_config # Set up logger logger = logging.getLogger(__name__) class AirflowClient: """ Client for interacting with Airflow API """ def __init__(self): """Initialize the Airflow API client""" # Get Airflow configuration from config module airflow_config = get_airflow_config() # Set the base URL for Airflow API self.airflow_base_url = airflow_config.get("base_url") if not self.airflow_base_url: raise ValueError("Airflow base URL not found in configuration") # Get auth credentials from config auth = airflow_config.get("auth", {}) self.username = auth.get("username") self.password = auth.get("password") # Validate required auth credentials if not self.username or not self.password: error_msg = "Airflow authentication credentials not found in configuration" # Check if environment variables are set properly if auth.get("use_env", False): username_env = auth.get("env_username", "MEZURA_API_USERNAME") password_env = auth.get("env_password", "MEZURA_API_PASSWORD") username_exists = os.environ.get(username_env) is not None password_exists = os.environ.get(password_env) is not None if not username_exists or not password_exists: missing_vars = [] if not username_exists: missing_vars.append(username_env) if not password_exists: missing_vars.append(password_env) error_msg = f"Required environment variables not set: {', '.join(missing_vars)}" raise ValueError(error_msg) # Get timeout and retry settings self.timeout = airflow_config.get("timeout", 30) self.retry_attempts = airflow_config.get("retry_attempts", 3) logger.info(f"Airflow API client initialized with base URL: {self.airflow_base_url}") # SECURITY: Commented out to prevent potential credential exposure # logger.info(f"Using auth credentials: {self.username}:***") def send_dag_request(self, dag_id: str, conf: Dict[str, Any]) -> Dict[str, Any]: """ Sends a request to start a DAG run. Args: dag_id: The ID of the DAG to run conf: The configuration for the DAG run Returns: Dict[str, Any]: DAG run response """ try: # Airflow API endpoint for triggering DAGs airflow_endpoint = f"{self.airflow_base_url}/api/v1/dags/{dag_id}/dagRuns" # Create a proper copy of the configuration conf_copy = conf.copy() if conf else {} # Sanitize the configuration to ensure all values are serializable # This is especially important for username which might be an object in some cases if "username" in conf_copy: # Username is required - if it's None, this is an error condition # We'll convert it to string if needed but not remove it if conf_copy["username"] is None: logger.error("Username is None but required for API request") raise ValueError("Username is required for benchmark submission") # If username is not a primitive type, convert it to string elif not isinstance(conf_copy["username"], (str, int, float, bool)): conf_copy["username"] = str(conf_copy["username"]) # Double check for "Logout (username)" format username_str = str(conf_copy["username"]) logout_pattern = re.compile(r'Logout \(([^)]+)\)') match = logout_pattern.search(username_str) if match: conf_copy["username"] = match.group(1) # Handle any string with parentheses elif '(' in username_str and ')' in username_str: try: start = username_str.rindex('(') + 1 end = username_str.find(')', start) if start < end: extracted = username_str[start:end].strip() if extracted: # Only use if not empty conf_copy["username"] = extracted except: pass # Keep original if extraction fails else: # Username is required logger.error("Username field missing from request configuration") raise ValueError("Username is required for benchmark submission") # Create request payload - e-posta değerini olduğu gibi kullan payload = {"conf": conf_copy} # Log the request we're about to send - maske YOK, gerçek değerleri loglanacak # logger.info(f"Sending POST request to: {airflow_endpoint}") # SECURITY: Commented out to prevent potential credential exposure # logger.info(f"Request payload: {json.dumps(payload)}") # Send the request response = requests.post( airflow_endpoint, json=payload, auth=(self.username, self.password), timeout=self.timeout, headers={ 'Content-Type': 'application/json', 'Accept': 'application/json' } ) # Log the response logger.info(f"Response status code: {response.status_code}") # Check if request was successful if response.status_code in (200, 201): try: data = response.json() logger.info(f"Response data: {json.dumps(data)}") run_id = data.get("dag_run_id", "unknown") logger.info(f"DAG run triggered: {run_id}") return { "run_id": run_id, "status": "submitted", "dag_id": dag_id } except Exception as e: logger.error(f"Error parsing response: {e}") return { "error": f"Error parsing response: {str(e)}", "status": "error", "dag_id": dag_id } else: error_msg = f"API Error: {response.status_code}, {response.text}" logger.error(error_msg) return { "error": error_msg, "status": "error", "dag_id": dag_id } except Exception as e: error_msg = f"Request failed: {str(e)}" logger.error(error_msg) return { "error": error_msg, "status": "error", "dag_id": dag_id } def send_status_request(self, dag_id: str, run_id: str) -> Dict[str, Any]: """ Sends a status request to check the status of a DAG run. Args: dag_id: The ID of the DAG run_id: The DAG run ID returned by the send_dag_request method Returns: Dict[str, Any]: Status information """ try: # Airflow DAG run status endpoint status_url = f"{self.airflow_base_url}/api/v1/dags/{dag_id}/dagRuns/{run_id}" # Log the request logger.info(f"Checking status for DAG run: {run_id}, URL: {status_url}") # Send the request response = requests.get( status_url, auth=(self.username, self.password), timeout=self.timeout, headers={'Accept': 'application/json'} ) # Log response status logger.info(f"Status response code: {response.status_code}") if response.status_code == 200: try: data = response.json() state = data.get("state", "unknown") # Map Airflow states to Mezura states status_mapping = { "running": "running", "success": "completed", "failed": "failed", "queued": "pending" } status_info = { "status": status_mapping.get(state, "unknown"), "progress": 100 if state == "success" else 0, "current_step": state, "error": None if state != "failed" else "DAG execution failed", "run_id": run_id, "dag_id": dag_id } logger.info(f"DAG run status: {state}") return status_info except Exception as e: error_msg = f"Error parsing status response: {str(e)}" logger.error(error_msg) return { "status": "error", "error": error_msg, "run_id": run_id, "dag_id": dag_id } else: error_msg = f"Status API Error: {response.status_code}, {response.text}" logger.error(error_msg) return { "status": "error", "error": error_msg, "run_id": run_id, "dag_id": dag_id } except Exception as e: error_msg = f"Status request failed: {str(e)}" logger.error(error_msg) return { "status": "error", "error": error_msg, "run_id": run_id, "dag_id": dag_id } def send_logs_request(self, dag_id: str, run_id: str, task_id: str = "process_results") -> Dict[str, Any]: """ Sends a request to get the logs of a DAG run. Args: dag_id: The ID of the DAG run_id: The DAG run ID task_id: The task ID to get logs for, defaults to process_results Returns: Dict[str, Any]: Log information """ try: # Log endpoint URL logs_url = f"{self.airflow_base_url}/api/v1/dags/{dag_id}/dagRuns/{run_id}/taskInstances/{task_id}/logs" # Log the request logger.info(f"Getting logs for DAG run ID: {run_id}, URL: {logs_url}") # Send the request response = requests.get( logs_url, auth=(self.username, self.password), timeout=self.timeout, headers={'Accept': 'application/json'} ) # Log response status logger.info(f"Logs response code: {response.status_code}") if response.status_code == 200: return { "logs": response.text, "status": "success", "run_id": run_id, "dag_id": dag_id } else: error_msg = f"Logs API Error: {response.status_code}, {response.text}" logger.error(error_msg) return { "status": "error", "error": error_msg, "run_id": run_id, "dag_id": dag_id, "logs": "Failed to retrieve logs" } except Exception as e: error_msg = f"Logs request failed: {str(e)}" logger.error(error_msg) return { "status": "error", "error": error_msg, "run_id": run_id, "dag_id": dag_id, "logs": "Failed to retrieve logs due to an error" }