""" Module: tests.test_infrastructure.test_retry_policy Description: Tests for retry policies and circuit breaker Author: Anderson H. Silva Date: 2025-01-25 License: Proprietary - All rights reserved """ import pytest import asyncio from datetime import datetime, timedelta from unittest.mock import MagicMock, AsyncMock, patch from src.infrastructure.queue.retry_policy import ( RetryStrategy, RetryPolicy, RetryHandler, CircuitBreaker, DEFAULT_RETRY_POLICY, AGGRESSIVE_RETRY_POLICY, GENTLE_RETRY_POLICY ) class TestRetryPolicy: """Test suite for retry policies.""" def test_default_policy(self): """Test default retry policy settings.""" assert DEFAULT_RETRY_POLICY.strategy == RetryStrategy.EXPONENTIAL_BACKOFF assert DEFAULT_RETRY_POLICY.max_attempts == 3 assert DEFAULT_RETRY_POLICY.initial_delay == 1.0 assert DEFAULT_RETRY_POLICY.jitter is True def test_aggressive_policy(self): """Test aggressive retry policy.""" assert AGGRESSIVE_RETRY_POLICY.max_attempts == 5 assert AGGRESSIVE_RETRY_POLICY.initial_delay == 0.5 assert AGGRESSIVE_RETRY_POLICY.multiplier == 1.5 def test_gentle_policy(self): """Test gentle retry policy.""" assert GENTLE_RETRY_POLICY.strategy == RetryStrategy.LINEAR_BACKOFF assert GENTLE_RETRY_POLICY.max_attempts == 2 assert GENTLE_RETRY_POLICY.jitter is False class TestRetryHandler: """Test suite for retry handler.""" def test_should_retry_max_attempts(self): """Test retry decision based on max attempts.""" policy = RetryPolicy(max_attempts=3) handler = RetryHandler(policy) exception = ValueError("Test error") assert handler.should_retry(exception, 1) is True assert handler.should_retry(exception, 2) is True assert handler.should_retry(exception, 3) is False # Max reached def test_should_retry_exception_whitelist(self): """Test retry with specific exception types.""" policy = RetryPolicy( retry_on=[ValueError, TypeError] ) handler = RetryHandler(policy) assert handler.should_retry(ValueError("test"), 1) is True assert handler.should_retry(TypeError("test"), 1) is True assert handler.should_retry(RuntimeError("test"), 1) is False def test_should_retry_exception_blacklist(self): """Test retry with exception blacklist.""" policy = RetryPolicy( dont_retry_on=[RuntimeError, KeyError] ) handler = RetryHandler(policy) assert handler.should_retry(ValueError("test"), 1) is True assert handler.should_retry(RuntimeError("test"), 1) is False assert handler.should_retry(KeyError("test"), 1) is False def test_calculate_delay_fixed(self): """Test fixed delay calculation.""" policy = RetryPolicy( strategy=RetryStrategy.FIXED_DELAY, initial_delay=2.0, jitter=False ) handler = RetryHandler(policy) assert handler.calculate_delay(1) == 2.0 assert handler.calculate_delay(2) == 2.0 assert handler.calculate_delay(3) == 2.0 def test_calculate_delay_exponential(self): """Test exponential backoff calculation.""" policy = RetryPolicy( strategy=RetryStrategy.EXPONENTIAL_BACKOFF, initial_delay=1.0, multiplier=2.0, jitter=False ) handler = RetryHandler(policy) assert handler.calculate_delay(1) == 1.0 assert handler.calculate_delay(2) == 2.0 assert handler.calculate_delay(3) == 4.0 assert handler.calculate_delay(4) == 8.0 def test_calculate_delay_linear(self): """Test linear backoff calculation.""" policy = RetryPolicy( strategy=RetryStrategy.LINEAR_BACKOFF, initial_delay=2.0, jitter=False ) handler = RetryHandler(policy) assert handler.calculate_delay(1) == 2.0 assert handler.calculate_delay(2) == 4.0 assert handler.calculate_delay(3) == 6.0 def test_calculate_delay_fibonacci(self): """Test fibonacci backoff calculation.""" policy = RetryPolicy( strategy=RetryStrategy.FIBONACCI, initial_delay=1.0, jitter=False ) handler = RetryHandler(policy) assert handler.calculate_delay(1) == 1.0 # fib(1) = 1 assert handler.calculate_delay(2) == 1.0 # fib(2) = 1 assert handler.calculate_delay(3) == 2.0 # fib(3) = 2 assert handler.calculate_delay(4) == 3.0 # fib(4) = 3 assert handler.calculate_delay(5) == 5.0 # fib(5) = 5 def test_calculate_delay_with_jitter(self): """Test delay calculation with jitter.""" policy = RetryPolicy( strategy=RetryStrategy.FIXED_DELAY, initial_delay=10.0, jitter=True ) handler = RetryHandler(policy) # With jitter, delay should be within ±25% of base delays = [handler.calculate_delay(1) for _ in range(10)] assert all(7.5 <= d <= 12.5 for d in delays) # Should have some variation assert len(set(delays)) > 1 def test_calculate_delay_max_cap(self): """Test delay is capped at max_delay.""" policy = RetryPolicy( strategy=RetryStrategy.EXPONENTIAL_BACKOFF, initial_delay=10.0, multiplier=10.0, max_delay=50.0, jitter=False ) handler = RetryHandler(policy) assert handler.calculate_delay(1) == 10.0 assert handler.calculate_delay(2) == 50.0 # Would be 100 but capped assert handler.calculate_delay(3) == 50.0 # Would be 1000 but capped @pytest.mark.asyncio async def test_execute_with_retry_success(self): """Test successful execution without retry.""" policy = RetryPolicy() handler = RetryHandler(policy) async def successful_func(value): return value * 2 result = await handler.execute_with_retry(successful_func, 5) assert result == 10 @pytest.mark.asyncio async def test_execute_with_retry_eventual_success(self): """Test execution that succeeds after retries.""" policy = RetryPolicy( initial_delay=0.1, jitter=False ) handler = RetryHandler(policy) attempt_count = 0 async def flaky_func(): nonlocal attempt_count attempt_count += 1 if attempt_count < 3: raise ValueError("Temporary failure") return "success" result = await handler.execute_with_retry(flaky_func) assert result == "success" assert attempt_count == 3 @pytest.mark.asyncio async def test_execute_with_retry_max_attempts_exceeded(self): """Test execution that fails after max attempts.""" policy = RetryPolicy( max_attempts=2, initial_delay=0.1, jitter=False ) handler = RetryHandler(policy) async def always_failing_func(): raise ValueError("Always fails") with pytest.raises(ValueError) as exc_info: await handler.execute_with_retry(always_failing_func) assert str(exc_info.value) == "Always fails" @pytest.mark.asyncio async def test_execute_with_retry_callbacks(self): """Test retry callbacks.""" retry_calls = [] failure_calls = [] def on_retry(exc, attempt, delay): retry_calls.append((str(exc), attempt, delay)) def on_failure(exc, attempt, delay): failure_calls.append((str(exc), attempt)) policy = RetryPolicy( max_attempts=2, initial_delay=0.1, jitter=False, on_retry=on_retry, on_failure=on_failure ) handler = RetryHandler(policy) async def failing_func(): raise ValueError("Test error") with pytest.raises(ValueError): await handler.execute_with_retry(failing_func) # Should have one retry callback assert len(retry_calls) == 1 assert retry_calls[0][0] == "Test error" assert retry_calls[0][1] == 1 # Should have one failure callback assert len(failure_calls) == 1 assert failure_calls[0][0] == "Test error" assert failure_calls[0][1] == 2 def test_execute_with_retry_sync_function(self): """Test retry with synchronous function.""" policy = RetryPolicy(initial_delay=0.1) handler = RetryHandler(policy) attempt_count = 0 def sync_func(): nonlocal attempt_count attempt_count += 1 if attempt_count < 2: raise ValueError("Temporary") return "success" # Run in async context result = asyncio.run(handler.execute_with_retry(sync_func)) assert result == "success" assert attempt_count == 2 class TestCircuitBreaker: """Test suite for circuit breaker.""" def test_circuit_breaker_initialization(self): """Test circuit breaker initialization.""" breaker = CircuitBreaker( failure_threshold=3, recovery_timeout=30.0 ) assert breaker.state == CircuitBreaker.State.CLOSED assert breaker.failure_count == 0 assert breaker.failure_threshold == 3 assert breaker.recovery_timeout == 30.0 def test_circuit_breaker_success(self): """Test circuit breaker with successful calls.""" breaker = CircuitBreaker() def successful_func(): return "success" # Multiple successful calls for _ in range(10): result = breaker.call(successful_func) assert result == "success" assert breaker.state == CircuitBreaker.State.CLOSED assert breaker.failure_count == 0 def test_circuit_breaker_opens_on_failures(self): """Test circuit breaker opens after threshold.""" breaker = CircuitBreaker(failure_threshold=3) def failing_func(): raise ValueError("Always fails") # First failures for i in range(3): with pytest.raises(ValueError): breaker.call(failing_func) # Circuit should be open assert breaker.state == CircuitBreaker.State.OPEN assert breaker.failure_count == 3 # Next call should fail immediately with pytest.raises(Exception) as exc_info: breaker.call(failing_func) assert "Circuit breaker is OPEN" in str(exc_info.value) def test_circuit_breaker_half_open_recovery(self): """Test circuit breaker recovery through half-open state.""" breaker = CircuitBreaker( failure_threshold=2, recovery_timeout=0.1 # Short timeout for testing ) # Open the circuit def failing_func(): raise ValueError("Fails") for _ in range(2): with pytest.raises(ValueError): breaker.call(failing_func) assert breaker.state == CircuitBreaker.State.OPEN # Wait for recovery timeout import time time.sleep(0.2) # Next call should transition to half-open def successful_func(): return "success" # First success in half-open result = breaker.call(successful_func) assert result == "success" assert breaker.state == CircuitBreaker.State.HALF_OPEN # Need more successes to fully close for _ in range(2): breaker.call(successful_func) assert breaker.state == CircuitBreaker.State.CLOSED def test_circuit_breaker_half_open_failure(self): """Test circuit breaker returns to open on half-open failure.""" breaker = CircuitBreaker( failure_threshold=2, recovery_timeout=0.1 ) # Open the circuit def failing_func(): raise ValueError("Fails") for _ in range(2): with pytest.raises(ValueError): breaker.call(failing_func) # Wait for recovery import time time.sleep(0.2) # Fail in half-open state with pytest.raises(ValueError): breaker.call(failing_func) # Should return to open assert breaker.state == CircuitBreaker.State.OPEN @pytest.mark.asyncio async def test_circuit_breaker_async(self): """Test circuit breaker with async functions.""" breaker = CircuitBreaker(failure_threshold=2) async def async_failing(): raise ValueError("Async fail") # Open circuit for _ in range(2): with pytest.raises(ValueError): await breaker.call_async(async_failing) assert breaker.state == CircuitBreaker.State.OPEN # Next call should fail immediately with pytest.raises(Exception) as exc_info: await breaker.call_async(async_failing) assert "Circuit breaker is OPEN" in str(exc_info.value) def test_circuit_breaker_expected_exception(self): """Test circuit breaker only triggers on expected exceptions.""" breaker = CircuitBreaker( failure_threshold=2, expected_exception=ValueError ) def func_with_different_error(): raise TypeError("Different error") # These shouldn't trigger the breaker for _ in range(5): with pytest.raises(TypeError): breaker.call(func_with_different_error) assert breaker.state == CircuitBreaker.State.CLOSED assert breaker.failure_count == 0 # But ValueError should def func_with_expected_error(): raise ValueError("Expected error") for _ in range(2): with pytest.raises(ValueError): breaker.call(func_with_expected_error) assert breaker.state == CircuitBreaker.State.OPEN