cidadao.ai-backend/tests/integration/test_cache_service_integration.py
anderson-ufrj
test(cache): add comprehensive tests for cache service
4230b3c
"""Integration tests for cache service with real Redis"""
import asyncio
import os
import time
from datetime import datetime, timezone

import pytest

from src.services.cache_service import CacheService, cache_result
# These tests need a live Redis; they are skipped unless REDIS_URL is defined
# in the environment (its value is not inspected here, only its presence).
redis_available = "REDIS_URL" in os.environ
@pytest.mark.skipif(not redis_available, reason="Redis not available")
class TestCacheServiceIntegration:
    """Integration tests that run CacheService against a real Redis instance.

    The whole class is skipped when ``redis_available`` is false.
    """

    @pytest.fixture
    async def cache_service(self):
        """Yield an initialized CacheService; flush Redis and close afterwards."""
        # NOTE(review): a plain ``pytest.fixture`` on an async generator
        # presumably relies on pytest-asyncio's auto mode -- confirm the
        # project's pytest configuration.
        service = CacheService()
        await service.initialize()
        yield service
        # Cleanup: always close the service, even if flushing the DB fails,
        # so a failing flush cannot leak the connection.
        try:
            await service.redis.flushdb()
        finally:
            await service.close()

    @pytest.mark.asyncio
    async def test_basic_operations(self, cache_service):
        """Test basic get/set/delete operations."""
        # Set value
        assert await cache_service.set("test_key", {"data": "value"}, ttl=10)

        # Get value
        result = await cache_service.get("test_key")
        assert result == {"data": "value"}

        # Delete value
        assert await cache_service.delete("test_key")

        # Verify deleted
        result = await cache_service.get("test_key")
        assert result is None

    @pytest.mark.asyncio
    async def test_ttl_expiration(self, cache_service):
        """Test that a key set with a TTL is gone after the TTL elapses."""
        # Set with 1 second TTL
        await cache_service.set("expire_key", "value", ttl=1)

        # Should exist immediately
        assert await cache_service.get("expire_key") == "value"

        # Wait past the 1 second TTL, with some margin
        await asyncio.sleep(1.5)

        # Should be expired
        assert await cache_service.get("expire_key") is None

    @pytest.mark.asyncio
    async def test_compression(self, cache_service):
        """Test that a large value round-trips through compress/decompress."""
        large_data = {"data": "x" * 10000}  # ~10KB

        # Set with compression
        await cache_service.set("compressed_key", large_data, compress=True)

        # Get with decompression
        result = await cache_service.get("compressed_key", decompress=True)
        assert result == large_data

    @pytest.mark.asyncio
    async def test_chat_response_caching(self, cache_service):
        """Test the chat response caching workflow."""
        response = {
            "message": "This is a test response",
            "confidence": 0.95,
            "agent": "test_agent",
        }

        # Cache response
        assert await cache_service.cache_chat_response(
            "Hello world", response, "greeting"
        )

        # Get cached response
        cached = await cache_service.get_cached_chat_response(
            "Hello world", "greeting"
        )
        assert cached == response

        # Lookup is expected to be case insensitive and to ignore
        # surrounding whitespace
        cached2 = await cache_service.get_cached_chat_response(
            " hello WORLD ", "greeting"
        )
        assert cached2 == response

    @pytest.mark.asyncio
    async def test_session_management(self, cache_service):
        """Test saving, reading and updating session state."""
        session_id = "test_session_123"
        initial_state = {
            "user_id": "user_456",
            # Timezone-aware timestamp; datetime.utcnow() is deprecated.
            "started_at": datetime.now(timezone.utc).isoformat(),
            "context": {"intent": "help"},
        }

        # Save session
        assert await cache_service.save_session_state(session_id, initial_state)

        # Get session
        state = await cache_service.get_session_state(session_id)
        assert state["user_id"] == "user_456"
        assert state["context"]["intent"] == "help"
        assert "last_updated" in state

        # Update field
        assert await cache_service.update_session_field(
            session_id, "messages_count", 5
        )

        # Verify update
        state = await cache_service.get_session_state(session_id)
        assert state["messages_count"] == 5
        assert state["user_id"] == "user_456"  # Original fields preserved

    @pytest.mark.asyncio
    async def test_investigation_caching(self, cache_service):
        """Test investigation result caching."""
        investigation_id = "inv_789"
        results = {
            "anomalies_found": 3,
            "contracts_analyzed": 150,
            "findings": [
                {"type": "price_anomaly", "severity": "high"},
                {"type": "vendor_concentration", "severity": "medium"},
            ],
        }

        # Cache investigation
        assert await cache_service.cache_investigation_result(
            investigation_id, results
        )

        # Retrieve investigation
        cached = await cache_service.get_cached_investigation(investigation_id)
        assert cached == results

    @pytest.mark.asyncio
    async def test_search_results_caching(self, cache_service):
        """Test search results caching keyed by query and filters."""
        query = "contratos suspeitos"
        filters = {"year": 2024, "min_value": 100000, "status": "active"}
        results = [
            {"id": "1", "title": "Contrato A", "value": 150000},
            {"id": "2", "title": "Contrato B", "value": 200000},
        ]

        # Cache search results
        assert await cache_service.cache_search_results(query, filters, results)

        # Get cached results
        cached = await cache_service.get_cached_search_results(query, filters)
        assert cached == results

        # Different filters should not hit cache
        different_filters = {"year": 2023, "min_value": 100000, "status": "active"}
        cached2 = await cache_service.get_cached_search_results(
            query, different_filters
        )
        assert cached2 is None

    @pytest.mark.asyncio
    async def test_agent_context_management(self, cache_service):
        """Test agent context storage and retrieval."""
        agent_id = "zumbi"
        session_id = "session_123"
        context = {
            "investigation_id": "inv_456",
            "findings": ["anomaly1", "anomaly2"],
            "confidence": 0.85,
        }

        # Save context
        assert await cache_service.save_agent_context(agent_id, session_id, context)

        # Get context
        retrieved = await cache_service.get_agent_context(agent_id, session_id)
        assert retrieved == context

    @pytest.mark.asyncio
    async def test_concurrent_operations(self, cache_service):
        """Test 50 concurrent sets followed by 50 concurrent gets."""
        keys = [f"concurrent_{i}" for i in range(50)]

        # Write every key concurrently
        await asyncio.gather(
            *(cache_service.set(key, f"value_{i}") for i, key in enumerate(keys))
        )

        # Read every key concurrently; gather preserves the input order
        results = await asyncio.gather(*(cache_service.get(key) for key in keys))

        # Verify all values
        for i, result in enumerate(results):
            assert result == f"value_{i}"

    @pytest.mark.asyncio
    async def test_cache_stats(self, cache_service):
        """Test the cache statistics report."""
        # Add some data, one entry for each key type checked below
        await cache_service.cache_chat_response("test1", {"msg": "response1"})
        await cache_service.save_session_state("session1", {"data": "test"})
        await cache_service.cache_investigation_result("inv1", {"results": "data"})

        # Get stats
        stats = await cache_service.get_cache_stats()
        assert stats["connected"] is True
        assert stats["total_keys"] >= 3
        assert "memory_used" in stats
        assert "hit_rate" in stats
        assert stats["keys_by_type"]["chat"] >= 1
        assert stats["keys_by_type"]["session"] >= 1
        assert stats["keys_by_type"]["investigation"] >= 1

    @pytest.mark.asyncio
    async def test_stampede_protection(self, cache_service):
        """Test cache stampede protection under concurrent requests."""
        refresh_count = 0

        async def slow_refresh():
            """Count invocations and simulate a slow recomputation."""
            nonlocal refresh_count
            refresh_count += 1
            await asyncio.sleep(0.1)
            return {"refreshed": refresh_count}

        # Set initial value with short TTL
        await cache_service.set("stampede_key", {"initial": "value"}, ttl=2)

        # Multiple concurrent requests near expiration
        await asyncio.sleep(1.5)  # Close to expiration
        results = await asyncio.gather(
            *(
                cache_service.get_with_stampede_protection(
                    "stampede_key",
                    10,
                    slow_refresh,
                )
                for _ in range(10)
            )
        )

        # All should get same value
        assert all(r == results[0] for r in results)

        # Refresh should be called only once or twice (not 10 times)
        assert refresh_count <= 2
@pytest.mark.skipif(not redis_available, reason="Redis not available")
class TestCacheDecoratorIntegration:
    """Integration tests for the ``cache_result`` decorator.

    The whole class is skipped when ``redis_available`` is false.
    """

    @pytest.fixture
    async def cleanup_redis(self):
        """Flush the Redis database once the test has finished."""
        yield
        service = CacheService()
        await service.initialize()
        # Always close the service, even if flushing the DB fails, so a
        # failing flush cannot leak the connection.
        try:
            await service.redis.flushdb()
        finally:
            await service.close()

    @pytest.mark.asyncio
    async def test_decorator_caching(self, cleanup_redis):
        """Test function caching with the decorator."""
        call_count = 0

        @cache_result("expensive", ttl=5)
        async def expensive_function(param1, param2):
            """Record each real execution in ``call_count``."""
            nonlocal call_count
            call_count += 1
            await asyncio.sleep(0.1)  # Simulate expensive operation
            return {"result": f"{param1}-{param2}", "count": call_count}

        # First call - should execute
        result1 = await expensive_function("test", "123")
        assert result1["count"] == 1

        # Second call - should use cache
        result2 = await expensive_function("test", "123")
        assert result2["count"] == 1  # Same count, not executed again

        # Different parameters - should execute
        result3 = await expensive_function("test", "456")
        assert result3["count"] == 2

        # Original parameters again - should use cache
        result4 = await expensive_function("test", "123")
        assert result4["count"] == 1