anderson-ufrj committed on
Commit 7977566 · 1 Parent(s): 38a0e2a

test(agents): add comprehensive tests for new agents and orchestration


- Add integration tests for José Bonifácio and Maria Quitéria agents
- Test policy-security compliance workflow integration
- Create security-driven policy recommendations tests
- Add parallel agent execution tests
- Implement advanced orchestration tests with multiple patterns
- Add comprehensive performance tests with defined targets
- Test individual agent response times and throughput
- Implement multi-agent pipeline performance tests
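Both new suites tag their tests with the custom `@pytest.mark.integration` and `@pytest.mark.performance` markers. If the repository does not already declare these markers (for example in pyproject.toml or an existing conftest.py), a minimal registration sketch would look like the following; the file location and marker descriptions are assumptions, not part of this commit:

```python
# conftest.py (sketch) -- declare the custom markers used by the new suites so
# pytest runs them without "unknown marker" warnings and they can be selected
# with `-m integration` or `-m performance`.

def pytest_configure(config):
    config.addinivalue_line(
        "markers", "integration: multi-agent integration tests"
    )
    config.addinivalue_line(
        "markers", "performance: performance/load tests with timing targets"
    )
```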

tests/integration/test_new_agents_integration.py ADDED
@@ -0,0 +1,424 @@
1
+ """
2
+ Integration tests for José Bonifácio and Maria Quitéria agents.
3
+ Tests agent interactions, coordination, and complete workflows.
4
+ """
5
+
6
+ import pytest
7
+ import asyncio
8
+ from datetime import datetime
9
+ from uuid import uuid4
10
+ from unittest.mock import AsyncMock, patch
11
+
12
+ from src.agents import (
13
+ BonifacioAgent,
14
+ MariaQuiteriaAgent,
15
+ MasterAgent,
16
+ ZumbiAgent,
17
+ AgentContext,
18
+ AgentMessage
19
+ )
20
+ from src.models.agent import AgentStatus
21
+ from src.services.investigation_service import InvestigationService
22
+
23
+
24
+ @pytest.fixture
25
+ async def investigation_service():
26
+ """Create investigation service for tests."""
27
+ service = InvestigationService()
28
+ return service
29
+
30
+
31
+ @pytest.fixture
32
+ def investigation_context():
33
+ """Create investigation context for integration tests."""
34
+ return AgentContext(
35
+ investigation_id=str(uuid4()),
36
+ user_id="integration-tester",
37
+ session_id=str(uuid4()),
38
+ metadata={
39
+ "test_type": "integration",
40
+ "agents": ["bonifacio", "maria_quiteria"],
41
+ "timestamp": datetime.utcnow().isoformat()
42
+ }
43
+ )
44
+
45
+
46
+ class TestBonifacioMariaQuiteriaIntegration:
47
+ """Integration tests for Bonifácio and Maria Quitéria agents."""
48
+
49
+ @pytest.mark.integration
50
+ @pytest.mark.asyncio
51
+ async def test_policy_security_compliance_workflow(self, investigation_context):
52
+ """Test complete workflow: policy analysis + security compliance check."""
53
+ # Initialize agents
54
+ bonifacio = BonifacioAgent()
55
+ maria_quiteria = MariaQuiteriaAgent()
56
+
57
+ # Step 1: Analyze policy with Bonifácio
58
+ policy_message = AgentMessage(
59
+ type="policy_analysis",
60
+ data={
61
+ "policy_name": "Sistema Nacional de Dados Sensíveis",
62
+ "policy_area": "security",
63
+ "budget_data": {
64
+ "planned": 10_000_000,
65
+ "executed": 9_500_000
66
+ }
67
+ },
68
+ sender="master",
69
+ metadata={"step": "policy_analysis"}
70
+ )
71
+
72
+ policy_response = await bonifacio.process(policy_message, investigation_context)
73
+
74
+ assert policy_response.success is True
75
+ assert "policy_evaluation" in policy_response.data
76
+
77
+ # Step 2: Check security compliance based on policy
78
+ security_message = AgentMessage(
79
+ type="security_audit",
80
+ data={
81
+ "system_name": "Sistema Nacional de Dados Sensíveis",
82
+ "policy_requirements": policy_response.data["policy_evaluation"],
83
+ "compliance_frameworks": ["LGPD", "ISO27001"]
84
+ },
85
+ sender="bonifacio",
86
+ metadata={"step": "security_verification"}
87
+ )
88
+
89
+ security_response = await maria_quiteria.process(security_message, investigation_context)
90
+
91
+ assert security_response.success is True
92
+ assert "security_assessment" in security_response.data
93
+
94
+ # Verify cross-agent data flow
95
+ compliance_status = security_response.data["security_assessment"]["compliance_status"]
96
+ assert "LGPD" in compliance_status
97
+ assert compliance_status["LGPD"] > 0.7 # Policy should ensure good compliance
98
+
99
+ @pytest.mark.integration
100
+ @pytest.mark.asyncio
101
+ async def test_multi_agent_investigation_with_new_agents(self, investigation_service, investigation_context):
102
+ """Test complete investigation involving all agents including new ones."""
103
+
104
+ # Mock external data sources
105
+ with patch("src.services.data_service.TransparencyAPIClient") as mock_api:
106
+ mock_api.return_value.search_contracts.return_value = [{
107
+ "id": "contract-123",
108
+ "valor": 5_000_000,
109
+ "objeto": "Sistema de Segurança Digital com Compliance LGPD",
110
+ "fornecedor": "TechSec Solutions",
111
+ "modalidade": "Pregão Eletrônico",
112
+ "data_assinatura": "2024-01-15"
113
+ }]
114
+
115
+ # Create investigation request
116
+ investigation_request = {
117
+ "query": "Investigar contrato de sistema de segurança com compliance",
118
+ "investigation_type": "comprehensive",
119
+ "include_agents": ["zumbi", "bonifacio", "maria_quiteria"]
120
+ }
121
+
122
+ # Execute investigation
123
+ result = await investigation_service.create_investigation(
124
+ request=investigation_request,
125
+ user_id=investigation_context.user_id
126
+ )
127
+
128
+ investigation_id = result["investigation_id"]
129
+
130
+ # Wait for investigation to complete (with timeout)
131
+ max_attempts = 30
132
+ for _ in range(max_attempts):
133
+ status = await investigation_service.get_investigation_status(investigation_id)
134
+ if status["status"] in ["completed", "failed"]:
135
+ break
136
+ await asyncio.sleep(1)
137
+
138
+ # Verify investigation results
139
+ assert status["status"] == "completed"
140
+ assert "agents_involved" in status
141
+
142
+ # Check that all requested agents participated
143
+ agents_involved = status["agents_involved"]
144
+ assert "zumbi" in agents_involved
145
+ assert "bonifacio" in agents_involved
146
+ assert "maria_quiteria" in agents_involved
147
+
148
+ @pytest.mark.integration
149
+ @pytest.mark.asyncio
150
+ async def test_security_driven_policy_recommendations(self, investigation_context):
151
+ """Test workflow: security issues trigger policy recommendations."""
152
+
153
+ bonifacio = BonifacioAgent()
154
+ maria_quiteria = MariaQuiteriaAgent()
155
+
156
+ # Step 1: Security audit finds vulnerabilities
157
+ security_audit_msg = AgentMessage(
158
+ type="security_audit",
159
+ data={
160
+ "system_name": "Portal Transparência",
161
+ "audit_scope": "comprehensive"
162
+ },
163
+ sender="master",
164
+ metadata={}
165
+ )
166
+
167
+ security_result = await maria_quiteria.process(security_audit_msg, investigation_context)
168
+
169
+ # Step 2: Based on security findings, get policy recommendations
170
+ policy_request_msg = AgentMessage(
171
+ type="policy_analysis",
172
+ data={
173
+ "policy_name": "Política de Segurança Digital",
174
+ "security_findings": security_result.data["security_assessment"],
175
+ "focus_area": "security_improvements"
176
+ },
177
+ sender="maria_quiteria",
178
+ metadata={"triggered_by": "security_audit"}
179
+ )
180
+
181
+ policy_result = await bonifacio.process(policy_request_msg, investigation_context)
182
+
183
+ # Verify recommendations address security issues
184
+ recommendations = policy_result.data["strategic_recommendations"]
185
+ assert len(recommendations) > 0
186
+
187
+ # At least one recommendation should address security
188
+ security_related = any(
189
+ "security" in rec.get("area", "").lower() or
190
+ "compliance" in rec.get("area", "").lower()
191
+ for rec in recommendations
192
+ )
193
+ assert security_related is True
194
+
195
+ @pytest.mark.integration
196
+ @pytest.mark.asyncio
197
+ async def test_parallel_agent_execution(self, investigation_context):
198
+ """Test parallel execution of Bonifácio and Maria Quitéria."""
199
+
200
+ bonifacio = BonifacioAgent()
201
+ maria_quiteria = MariaQuiteriaAgent()
202
+
203
+ # Create messages for parallel execution
204
+ policy_msg = AgentMessage(
205
+ type="policy_analysis",
206
+ data="Programa Nacional de Cibersegurança",
207
+ sender="master",
208
+ metadata={}
209
+ )
210
+
211
+ security_msg = AgentMessage(
212
+ type="security_audit",
213
+ data="Sistema Nacional de Cibersegurança",
214
+ sender="master",
215
+ metadata={}
216
+ )
217
+
218
+ # Execute agents in parallel
219
+ start_time = datetime.utcnow()
220
+
221
+ policy_task = asyncio.create_task(
222
+ bonifacio.process(policy_msg, investigation_context)
223
+ )
224
+ security_task = asyncio.create_task(
225
+ maria_quiteria.process(security_msg, investigation_context)
226
+ )
227
+
228
+ # Wait for both to complete
229
+ policy_response, security_response = await asyncio.gather(
230
+ policy_task, security_task
231
+ )
232
+
233
+ end_time = datetime.utcnow()
234
+ execution_time = (end_time - start_time).total_seconds()
235
+
236
+ # Verify both completed successfully
237
+ assert policy_response.success is True
238
+ assert security_response.success is True
239
+
240
+ # Verify parallel execution (should take less time than sequential)
241
+ # Both agents have ~2-3 second simulated delays
242
+ assert execution_time < 5 # Should complete in under 5 seconds if parallel
243
+
244
+ @pytest.mark.integration
245
+ @pytest.mark.asyncio
246
+ async def test_agent_error_recovery(self, investigation_context):
247
+ """Test error handling and recovery between agents."""
248
+
249
+ bonifacio = BonifacioAgent()
250
+ maria_quiteria = MariaQuiteriaAgent()
251
+
252
+ # Force an error in Bonifácio
253
+ with patch.object(bonifacio, '_evaluate_policy', side_effect=Exception("Policy database error")):
254
+ policy_msg = AgentMessage(
255
+ type="policy_analysis",
256
+ data={"policy_name": "Test Policy"},
257
+ sender="master",
258
+ metadata={}
259
+ )
260
+
261
+ policy_response = await bonifacio.process(policy_msg, investigation_context)
262
+ assert policy_response.success is False
263
+
264
+ # Maria Quitéria should still work independently
265
+ security_msg = AgentMessage(
266
+ type="security_audit",
267
+ data={"system_name": "Test System"},
268
+ sender="master",
269
+ metadata={"note": "bonifacio_failed"}
270
+ )
271
+
272
+ security_response = await maria_quiteria.process(security_msg, investigation_context)
273
+ assert security_response.success is True
274
+
275
+ @pytest.mark.integration
276
+ @pytest.mark.asyncio
277
+ async def test_comprehensive_compliance_workflow(self, investigation_context):
278
+ """Test complete compliance verification workflow."""
279
+
280
+ # This tests the full cycle:
281
+ # 1. Zumbi detects anomaly
282
+ # 2. Maria Quitéria performs security audit
283
+ # 3. Bonifácio analyzes policy compliance
284
+ # 4. Results are consolidated
285
+
286
+ zumbi = ZumbiAgent()
287
+ maria_quiteria = MariaQuiteriaAgent()
288
+ bonifacio = BonifacioAgent()
289
+
290
+ # Step 1: Anomaly detection
291
+ anomaly_msg = AgentMessage(
292
+ type="analyze",
293
+ data={
294
+ "contract_data": {
295
+ "valor": 10_000_000,
296
+ "objeto": "Sistema de Compliance Integrado",
297
+ "fornecedor": "ComplianceTech"
298
+ }
299
+ },
300
+ sender="master",
301
+ metadata={}
302
+ )
303
+
304
+ with patch("src.agents.zumbi.ZumbiAgent._fetch_contract_details") as mock_fetch:
305
+ mock_fetch.return_value = {
306
+ "id": "123",
307
+ "valor": 10_000_000,
308
+ "objeto": "Sistema de Compliance Integrado"
309
+ }
310
+
311
+ anomaly_response = await zumbi.process(anomaly_msg, investigation_context)
312
+
313
+ # Step 2: Security audit based on anomaly
314
+ if anomaly_response.data.get("anomalies_detected", 0) > 0:
315
+ security_msg = AgentMessage(
316
+ type="security_audit",
317
+ data={
318
+ "system_name": "Sistema de Compliance Integrado",
319
+ "triggered_by": "anomaly_detection",
320
+ "anomaly_details": anomaly_response.data
321
+ },
322
+ sender="zumbi",
323
+ metadata={}
324
+ )
325
+
326
+ security_response = await maria_quiteria.process(security_msg, investigation_context)
327
+
328
+ # Step 3: Policy compliance check
329
+ policy_msg = AgentMessage(
330
+ type="policy_analysis",
331
+ data={
332
+ "policy_name": "Política de Compliance e Segurança",
333
+ "security_assessment": security_response.data,
334
+ "contract_value": 10_000_000
335
+ },
336
+ sender="maria_quiteria",
337
+ metadata={}
338
+ )
339
+
340
+ policy_response = await bonifacio.process(policy_msg, investigation_context)
341
+
342
+ # Verify complete workflow
343
+ assert anomaly_response.success is True
344
+ assert security_response.success is True
345
+ assert policy_response.success is True
346
+
347
+ # Results should be interconnected
348
+ assert "security_assessment" in security_response.data
349
+ assert "policy_evaluation" in policy_response.data
350
+
351
+
352
+ class TestAgentCoordinationPatterns:
353
+ """Test various agent coordination patterns."""
354
+
355
+ @pytest.mark.integration
356
+ @pytest.mark.asyncio
357
+ async def test_sequential_agent_pipeline(self, investigation_context):
358
+ """Test sequential processing pipeline with data passing."""
359
+
360
+ agents = [
361
+ ZumbiAgent(),
362
+ MariaQuiteriaAgent(),
363
+ BonifacioAgent()
364
+ ]
365
+
366
+ # Initial data
367
+ current_data = {
368
+ "investigation_subject": "Contrato de Software de Segurança",
369
+ "initial_value": 5_000_000
370
+ }
371
+
372
+ # Process through pipeline
373
+ for i, agent in enumerate(agents):
374
+ message = AgentMessage(
375
+ type="analyze",
376
+ data=current_data,
377
+ sender=f"agent_{i-1}" if i > 0 else "master",
378
+ metadata={"pipeline_step": i}
379
+ )
380
+
381
+ response = await agent.process(message, investigation_context)
382
+ assert response.success is True
383
+
384
+ # Pass data forward
385
+ current_data.update({
386
+ f"{agent.name}_result": response.data
387
+ })
388
+
389
+ # Verify all agents contributed
390
+ assert "InvestigatorAgent_result" in current_data
391
+ assert "MariaQuiteriaAgent_result" in current_data
392
+ assert "BonifacioAgent_result" in current_data
393
+
394
+ @pytest.mark.integration
395
+ @pytest.mark.asyncio
396
+ async def test_fan_out_fan_in_pattern(self, investigation_context):
397
+ """Test fan-out/fan-in pattern with result aggregation."""
398
+
399
+ # Master coordinates multiple specialized agents
400
+ master = MasterAgent()
401
+
402
+ # Create complex investigation request
403
+ investigation_msg = AgentMessage(
404
+ type="investigate",
405
+ data={
406
+ "query": "Análise completa de contrato com aspectos de segurança e compliance",
407
+ "contract_id": "complex-123",
408
+ "include_analysis": ["anomaly", "security", "policy"]
409
+ },
410
+ sender="user",
411
+ metadata={}
412
+ )
413
+
414
+ # Mock the investigation service to control agent responses
415
+ with patch("src.agents.abaporu.InvestigationService") as mock_service:
416
+ mock_service.return_value.create_investigation.return_value = {
417
+ "investigation_id": "test-123",
418
+ "status": "completed"
419
+ }
420
+
421
+ response = await master.process(investigation_msg, investigation_context)
422
+
423
+ assert response.success is True
424
+ assert "investigation_id" in response.data
tests/multiagent/test_advanced_orchestration.py ADDED
@@ -0,0 +1,522 @@
1
+ """
2
+ Advanced orchestration tests for multi-agent system.
3
+ Tests complex coordination patterns, failure handling, and performance.
4
+ """
5
+
6
+ import pytest
7
+ import asyncio
8
+ from datetime import datetime, timedelta
9
+ from unittest.mock import AsyncMock, MagicMock, patch
10
+ from uuid import uuid4
11
+ import numpy as np
12
+
13
+ from src.agents import (
14
+ MasterAgent,
15
+ ZumbiAgent,
16
+ AnitaAgent,
17
+ TiradentesAgent,
18
+ BonifacioAgent,
19
+ MariaQuiteriaAgent,
20
+ AgentContext,
21
+ AgentMessage,
22
+ AgentResponse
23
+ )
24
+ from src.services.agent_orchestrator import AgentOrchestrator
25
+ from src.models.agent import AgentStatus
26
+
27
+
28
+ @pytest.fixture
29
+ async def orchestrator():
30
+ """Create agent orchestrator for tests."""
31
+ orch = AgentOrchestrator()
32
+ await orch.initialize()
33
+ return orch
34
+
35
+
36
+ @pytest.fixture
37
+ def orchestration_context():
38
+ """Create orchestration context."""
39
+ return AgentContext(
40
+ investigation_id=str(uuid4()),
41
+ user_id="orchestration-tester",
42
+ session_id=str(uuid4()),
43
+ metadata={
44
+ "test_type": "orchestration",
45
+ "timestamp": datetime.utcnow().isoformat()
46
+ }
47
+ )
48
+
49
+
50
+ class TestAdvancedOrchestration:
51
+ """Test advanced orchestration patterns."""
52
+
53
+ @pytest.mark.asyncio
54
+ @pytest.mark.integration
55
+ async def test_dynamic_agent_selection(self, orchestrator, orchestration_context):
56
+ """Test dynamic agent selection based on task requirements."""
57
+
58
+ # Define tasks with different requirements
59
+ tasks = [
60
+ {
61
+ "type": "anomaly_detection",
62
+ "data": {"contract_value": 1_000_000},
63
+ "required_capabilities": ["anomaly_detection", "pattern_analysis"]
64
+ },
65
+ {
66
+ "type": "security_audit",
67
+ "data": {"system_name": "Portal"},
68
+ "required_capabilities": ["security_audit", "threat_detection"]
69
+ },
70
+ {
71
+ "type": "policy_analysis",
72
+ "data": {"policy_name": "Digital Gov"},
73
+ "required_capabilities": ["policy_analysis", "governance"]
74
+ }
75
+ ]
76
+
77
+ # Execute dynamic routing
78
+ results = []
79
+ for task in tasks:
80
+ agent = await orchestrator.select_best_agent(
81
+ task["required_capabilities"]
82
+ )
83
+
84
+ assert agent is not None
85
+
86
+ message = AgentMessage(
87
+ type=task["type"],
88
+ data=task["data"],
89
+ sender="orchestrator",
90
+ metadata={"dynamic_routing": True}
91
+ )
92
+
93
+ response = await agent.process(message, orchestration_context)
94
+ results.append({
95
+ "task": task["type"],
96
+ "agent": agent.name,
97
+ "success": response.success
98
+ })
99
+
100
+ # Verify correct agent selection
101
+ assert results[0]["agent"] in ["InvestigatorAgent", "AnalystAgent"] # Anomaly detection
102
+ assert results[1]["agent"] == "MariaQuiteriaAgent" # Security audit
103
+ assert results[2]["agent"] == "BonifacioAgent" # Policy analysis
104
+
105
+ @pytest.mark.asyncio
106
+ @pytest.mark.integration
107
+ async def test_adaptive_retry_with_fallback(self, orchestrator, orchestration_context):
108
+ """Test adaptive retry mechanism with agent fallback."""
109
+
110
+ primary_agent = MariaQuiteriaAgent()
111
+ fallback_agent = ZumbiAgent()
112
+
113
+ # Mock primary agent to fail initially
114
+ call_count = 0
115
+ original_process = primary_agent.process
116
+
117
+ async def failing_process(message, context):
118
+ nonlocal call_count
119
+ call_count += 1
120
+ if call_count < 3:
121
+ raise Exception("Temporary failure")
122
+ return await original_process(message, context)
123
+
124
+ primary_agent.process = failing_process
125
+
126
+ # Configure orchestrator with retry and fallback
127
+ orchestrator.configure_retry_policy({
128
+ "max_retries": 2,
129
+ "backoff_multiplier": 1.5,
130
+ "fallback_agents": {
131
+ "MariaQuiteriaAgent": "InvestigatorAgent"
132
+ }
133
+ })
134
+
135
+ message = AgentMessage(
136
+ type="security_audit",
137
+ data={"system_name": "Test System"},
138
+ sender="orchestrator",
139
+ metadata={}
140
+ )
141
+
142
+ # Execute with retry logic
143
+ result = await orchestrator.execute_with_retry(
144
+ primary_agent,
145
+ message,
146
+ orchestration_context,
147
+ fallback_agent=fallback_agent
148
+ )
149
+
150
+ # Should succeed after retries
151
+ assert result.success is True
152
+ assert call_count == 3 # Failed twice, succeeded on third
153
+
154
+ @pytest.mark.asyncio
155
+ @pytest.mark.integration
156
+ async def test_conditional_workflow_branching(self, orchestrator, orchestration_context):
157
+ """Test conditional workflow branching based on intermediate results."""
158
+
159
+ # Define workflow with conditions
160
+ workflow = {
161
+ "start": "anomaly_detection",
162
+ "steps": {
163
+ "anomaly_detection": {
164
+ "agent": "zumbi",
165
+ "next": {
166
+ "condition": "anomalies_found",
167
+ "true": "security_audit",
168
+ "false": "generate_report"
169
+ }
170
+ },
171
+ "security_audit": {
172
+ "agent": "maria_quiteria",
173
+ "next": {
174
+ "condition": "high_risk",
175
+ "true": "policy_review",
176
+ "false": "generate_report"
177
+ }
178
+ },
179
+ "policy_review": {
180
+ "agent": "bonifacio",
181
+ "next": "generate_report"
182
+ },
183
+ "generate_report": {
184
+ "agent": "tiradentes",
185
+ "next": None
186
+ }
187
+ }
188
+ }
189
+
190
+ # Execute conditional workflow
191
+ initial_data = {
192
+ "contract_id": "test-123",
193
+ "value": 10_000_000
194
+ }
195
+
196
+ execution_path = await orchestrator.execute_conditional_workflow(
197
+ workflow,
198
+ initial_data,
199
+ orchestration_context
200
+ )
201
+
202
+ # Verify execution followed correct path
203
+ assert len(execution_path) >= 2 # At least start and report
204
+ assert execution_path[0]["step"] == "anomaly_detection"
205
+ assert execution_path[-1]["step"] == "generate_report"
206
+
207
+ @pytest.mark.asyncio
208
+ @pytest.mark.integration
209
+ async def test_parallel_map_reduce_pattern(self, orchestrator, orchestration_context):
210
+ """Test map-reduce pattern for parallel data processing."""
211
+
212
+ # Data to process in parallel
213
+ contracts = [
214
+ {"id": f"contract-{i}", "value": np.random.randint(100_000, 10_000_000)}
215
+ for i in range(5)
216
+ ]
217
+
218
+ # Map phase: Process each contract with appropriate agent
219
+ async def process_contract(contract):
220
+ agent = ZumbiAgent()
221
+ message = AgentMessage(
222
+ type="analyze",
223
+ data={"contract_data": contract},
224
+ sender="mapper",
225
+ metadata={"map_task": True}
226
+ )
227
+ return await agent.process(message, orchestration_context)
228
+
229
+ # Execute map phase in parallel
230
+ map_results = await asyncio.gather(
231
+ *[process_contract(c) for c in contracts]
232
+ )
233
+
234
+ # Reduce phase: Aggregate results
235
+ aggregator = AnitaAgent()
236
+ reduce_message = AgentMessage(
237
+ type="aggregate_analysis",
238
+ data={
239
+ "individual_results": [r.data for r in map_results],
240
+ "aggregation_type": "anomaly_summary"
241
+ },
242
+ sender="reducer",
243
+ metadata={"reduce_task": True}
244
+ )
245
+
246
+ final_result = await aggregator.process(reduce_message, orchestration_context)
247
+
248
+ # Verify map-reduce completed
249
+ assert all(r.success for r in map_results)
250
+ assert final_result.success is True
251
+ assert len(map_results) == len(contracts)
252
+
253
+ @pytest.mark.asyncio
254
+ @pytest.mark.integration
255
+ async def test_agent_capability_discovery(self, orchestrator):
256
+ """Test dynamic agent capability discovery and registration."""
257
+
258
+ # Get all registered agents
259
+ available_agents = await orchestrator.discover_agents()
260
+
261
+ # Verify core agents are discovered
262
+ agent_names = [a["name"] for a in available_agents]
263
+ assert "InvestigatorAgent" in agent_names or "zumbi" in agent_names
264
+ assert "MariaQuiteriaAgent" in agent_names or "maria_quiteria" in agent_names
265
+ assert "BonifacioAgent" in agent_names or "bonifacio" in agent_names
266
+
267
+ # Test capability search
268
+ security_agents = await orchestrator.find_agents_with_capability("security_audit")
269
+ assert len(security_agents) >= 1
270
+ assert any("maria" in a["name"].lower() for a in security_agents)
271
+
272
+ policy_agents = await orchestrator.find_agents_with_capability("policy_analysis")
273
+ assert len(policy_agents) >= 1
274
+ assert any("bonifacio" in a["name"].lower() for a in policy_agents)
275
+
276
+ @pytest.mark.asyncio
277
+ @pytest.mark.integration
278
+ async def test_circuit_breaker_pattern(self, orchestrator, orchestration_context):
279
+ """Test circuit breaker pattern for failing agents."""
280
+
281
+ agent = MariaQuiteriaAgent()
282
+
283
+ # Configure circuit breaker
284
+ orchestrator.configure_circuit_breaker({
285
+ "failure_threshold": 3,
286
+ "recovery_timeout": 5,
287
+ "half_open_requests": 1
288
+ })
289
+
290
+ # Mock agent to fail consistently
291
+ agent.process = AsyncMock(side_effect=Exception("Service unavailable"))
292
+
293
+ message = AgentMessage(
294
+ type="security_audit",
295
+ data={"test": True},
296
+ sender="test",
297
+ metadata={}
298
+ )
299
+
300
+ # Attempt multiple requests
301
+ results = []
302
+ for i in range(5):
303
+ try:
304
+ result = await orchestrator.execute_with_circuit_breaker(
305
+ agent, message, orchestration_context
306
+ )
307
+ results.append(("success", result))
308
+ except Exception as e:
309
+ results.append(("failure", str(e)))
310
+
311
+ await asyncio.sleep(0.1)
312
+
313
+ # Circuit should open after threshold
314
+ failures = [r for r in results if r[0] == "failure"]
315
+ assert len(failures) >= 3
316
+
317
+ # Later requests should fail fast
318
+ assert any("Circuit breaker open" in r[1] for r in failures[3:])
319
+
320
+ @pytest.mark.asyncio
321
+ @pytest.mark.integration
322
+ async def test_agent_performance_monitoring(self, orchestrator, orchestration_context):
323
+ """Test agent performance monitoring and optimization."""
324
+
325
+ agents = [
326
+ ZumbiAgent(),
327
+ AnitaAgent(),
328
+ MariaQuiteriaAgent(),
329
+ BonifacioAgent()
330
+ ]
331
+
332
+ # Execute multiple requests and monitor performance
333
+ performance_stats = {}
334
+
335
+ for agent in agents:
336
+ stats = {
337
+ "response_times": [],
338
+ "success_rate": 0,
339
+ "total_requests": 10
340
+ }
341
+
342
+ success_count = 0
343
+ for i in range(stats["total_requests"]):
344
+ message = AgentMessage(
345
+ type="test_performance",
346
+ data={"iteration": i},
347
+ sender="performance_monitor",
348
+ metadata={}
349
+ )
350
+
351
+ start_time = datetime.utcnow()
352
+ try:
353
+ response = await agent.process(message, orchestration_context)
354
+ if response.success:
355
+ success_count += 1
356
+ except:
357
+ pass
358
+
359
+ elapsed = (datetime.utcnow() - start_time).total_seconds()
360
+ stats["response_times"].append(elapsed)
361
+
362
+ stats["success_rate"] = success_count / stats["total_requests"]
363
+ stats["avg_response_time"] = np.mean(stats["response_times"])
364
+ stats["p95_response_time"] = np.percentile(stats["response_times"], 95)
365
+
366
+ performance_stats[agent.name] = stats
367
+
368
+ # Verify performance metrics
369
+ for agent_name, stats in performance_stats.items():
370
+ assert stats["success_rate"] >= 0.9 # 90% success rate
371
+ assert stats["avg_response_time"] < 5 # Under 5 seconds average
372
+ assert stats["p95_response_time"] < 10 # P95 under 10 seconds
373
+
374
+ @pytest.mark.asyncio
375
+ @pytest.mark.integration
376
+ async def test_distributed_transaction_pattern(self, orchestrator, orchestration_context):
377
+ """Test distributed transaction pattern with compensation."""
378
+
379
+ # Define transaction steps
380
+ transaction_steps = [
381
+ {
382
+ "agent": ZumbiAgent(),
383
+ "action": "reserve_analysis_slot",
384
+ "compensation": "release_analysis_slot"
385
+ },
386
+ {
387
+ "agent": MariaQuiteriaAgent(),
388
+ "action": "allocate_security_resources",
389
+ "compensation": "deallocate_security_resources"
390
+ },
391
+ {
392
+ "agent": BonifacioAgent(),
393
+ "action": "lock_policy_review",
394
+ "compensation": "unlock_policy_review"
395
+ }
396
+ ]
397
+
398
+ completed_steps = []
399
+
400
+ try:
401
+ # Execute transaction steps
402
+ for step in transaction_steps:
403
+ message = AgentMessage(
404
+ type=step["action"],
405
+ data={"transaction_id": "tx-123"},
406
+ sender="transaction_manager",
407
+ metadata={"transaction": True}
408
+ )
409
+
410
+ response = await step["agent"].process(message, orchestration_context)
411
+
412
+ if not response.success:
413
+ raise Exception(f"Transaction step failed: {step['action']}")
414
+
415
+ completed_steps.append(step)
416
+
417
+ # Simulate failure on third step
418
+ if len(completed_steps) == 2:
419
+ raise Exception("Simulated transaction failure")
420
+
421
+ # Commit transaction (not reached in this test)
422
+ await orchestrator.commit_transaction("tx-123")
423
+
424
+ except Exception as e:
425
+ # Compensate completed steps in reverse order
426
+ for step in reversed(completed_steps):
427
+ compensation_message = AgentMessage(
428
+ type=step["compensation"],
429
+ data={"transaction_id": "tx-123"},
430
+ sender="transaction_manager",
431
+ metadata={"compensation": True}
432
+ )
433
+
434
+ await step["agent"].process(compensation_message, orchestration_context)
435
+
436
+ # Verify compensation occurred
437
+ assert len(completed_steps) == 2 # Two steps completed before failure
438
+
439
+
440
+ class TestOrchestrationPatterns:
441
+ """Test specific orchestration patterns."""
442
+
443
+ @pytest.mark.asyncio
444
+ @pytest.mark.integration
445
+ async def test_saga_pattern(self, orchestrator, orchestration_context):
446
+ """Test saga pattern for long-running transactions."""
447
+
448
+ saga_definition = {
449
+ "name": "investigation_saga",
450
+ "steps": [
451
+ {"service": "anomaly_detection", "agent": "zumbi"},
452
+ {"service": "pattern_analysis", "agent": "anita"},
453
+ {"service": "security_check", "agent": "maria_quiteria"},
454
+ {"service": "policy_review", "agent": "bonifacio"},
455
+ {"service": "report_generation", "agent": "tiradentes"}
456
+ ]
457
+ }
458
+
459
+ saga_state = await orchestrator.start_saga(
460
+ saga_definition,
461
+ {"investigation_id": "saga-123"},
462
+ orchestration_context
463
+ )
464
+
465
+ # Process saga steps
466
+ while not saga_state["completed"]:
467
+ next_step = saga_state["current_step"]
468
+ if next_step >= len(saga_definition["steps"]):
469
+ break
470
+
471
+ step = saga_definition["steps"][next_step]
472
+ saga_state = await orchestrator.execute_saga_step(
473
+ saga_state,
474
+ step,
475
+ orchestration_context
476
+ )
477
+
478
+ # Verify saga completed
479
+ assert saga_state["completed"] is True
480
+ assert len(saga_state["completed_steps"]) == len(saga_definition["steps"])
481
+
482
+ @pytest.mark.asyncio
483
+ @pytest.mark.integration
484
+ async def test_event_driven_choreography(self, orchestrator, orchestration_context):
485
+ """Test event-driven agent choreography."""
486
+
487
+ # Setup event bus
488
+ event_bus = orchestrator.get_event_bus()
489
+
490
+ # Register agent event handlers
491
+ agents_triggered = []
492
+
493
+ async def on_anomaly_detected(event):
494
+ agents_triggered.append("security_audit")
495
+ await event_bus.emit("security_audit_required", event.data)
496
+
497
+ async def on_security_audit_required(event):
498
+ agents_triggered.append("policy_review")
499
+ await event_bus.emit("policy_review_required", event.data)
500
+
501
+ async def on_policy_review_required(event):
502
+ agents_triggered.append("report_generation")
503
+ await event_bus.emit("report_ready", event.data)
504
+
505
+ event_bus.on("anomaly_detected", on_anomaly_detected)
506
+ event_bus.on("security_audit_required", on_security_audit_required)
507
+ event_bus.on("policy_review_required", on_policy_review_required)
508
+
509
+ # Trigger initial event
510
+ await event_bus.emit("anomaly_detected", {
511
+ "severity": "high",
512
+ "contract_id": "test-123"
513
+ })
514
+
515
+ # Allow events to propagate
516
+ await asyncio.sleep(0.5)
517
+
518
+ # Verify choreography executed
519
+ assert "security_audit" in agents_triggered
520
+ assert "policy_review" in agents_triggered
521
+ assert "report_generation" in agents_triggered
522
+ assert len(agents_triggered) == 3
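
The orchestration tests above drive retry, fallback, and circuit-breaker behaviour through `AgentOrchestrator` methods such as `execute_with_retry`, but the orchestrator implementation itself is not part of this commit. As a purely illustrative sketch (names, defaults, and error handling are assumptions inferred from the test calls, not the actual src/services/agent_orchestrator.py), a retry-with-fallback helper compatible with `test_adaptive_retry_with_fallback` could look like:

```python
# Sketch only: retry-with-fallback shaped like the execute_with_retry calls in
# tests/multiagent/test_advanced_orchestration.py.
import asyncio


async def execute_with_retry(agent, message, context, *, fallback_agent=None,
                             max_retries=2, base_delay=0.5, backoff_multiplier=1.5):
    delay = base_delay
    for attempt in range(max_retries + 1):  # initial attempt + max_retries retries
        try:
            return await agent.process(message, context)
        except Exception:
            if attempt == max_retries:
                break  # primary agent exhausted its attempts
            await asyncio.sleep(delay)  # simple exponential backoff between attempts
            delay *= backoff_multiplier
    if fallback_agent is not None:
        # Hand the same message to the configured fallback agent.
        return await fallback_agent.process(message, context)
    raise RuntimeError("primary agent failed after retries and no fallback was given")
```

With `max_retries=2`, the primary agent is called up to three times before the fallback is used, which matches the `call_count == 3` assertion in the test.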
tests/performance/test_agent_performance.py ADDED
@@ -0,0 +1,495 @@
1
+ """
2
+ Performance tests for agent system.
3
+ Tests agent response times, throughput, and resource usage.
4
+ """
5
+
6
+ import pytest
7
+ import asyncio
8
+ import time
9
+ import psutil
10
+ import statistics
11
+ from datetime import datetime
12
+ from concurrent.futures import ThreadPoolExecutor
13
+ from unittest.mock import patch
14
+ import numpy as np
15
+
16
+ from src.agents import (
17
+ ZumbiAgent,
18
+ AnitaAgent,
19
+ TiradentesAgent,
20
+ BonifacioAgent,
21
+ MariaQuiteriaAgent,
22
+ AgentContext,
23
+ AgentMessage
24
+ )
25
+ from src.services.agent_pool import AgentPool
26
+
27
+
28
+ class TestAgentPerformance:
29
+ """Performance tests for individual agents."""
30
+
31
+ @pytest.mark.performance
32
+ @pytest.mark.asyncio
33
+ async def test_agent_response_times(self):
34
+ """Test response times for all agents."""
35
+ agents = [
36
+ ("zumbi", ZumbiAgent()),
37
+ ("anita", AnitaAgent()),
38
+ ("tiradentes", TiradentesAgent()),
39
+ ("bonifacio", BonifacioAgent()),
40
+ ("maria_quiteria", MariaQuiteriaAgent())
41
+ ]
42
+
43
+ context = AgentContext(
44
+ investigation_id="perf-test",
45
+ user_id="perf-tester",
46
+ session_id="perf-session"
47
+ )
48
+
49
+ results = {}
50
+
51
+ for agent_name, agent in agents:
52
+ response_times = []
53
+
54
+ # Warm up
55
+ message = AgentMessage(
56
+ type="test",
57
+ data={"test": True},
58
+ sender="performance_tester",
59
+ metadata={}
60
+ )
61
+ await agent.process(message, context)
62
+
63
+ # Measure response times
64
+ for i in range(20):
65
+ start = time.time()
66
+
67
+ message = AgentMessage(
68
+ type="analyze",
69
+ data={
70
+ "iteration": i,
71
+ "data": {"value": np.random.randint(1000, 1000000)}
72
+ },
73
+ sender="performance_tester",
74
+ metadata={"test_run": i}
75
+ )
76
+
77
+ response = await agent.process(message, context)
78
+
79
+ end = time.time()
80
+ response_time = (end - start) * 1000 # Convert to milliseconds
81
+ response_times.append(response_time)
82
+
83
+ results[agent_name] = {
84
+ "mean": statistics.mean(response_times),
85
+ "median": statistics.median(response_times),
86
+ "p95": np.percentile(response_times, 95),
87
+ "p99": np.percentile(response_times, 99),
88
+ "min": min(response_times),
89
+ "max": max(response_times)
90
+ }
91
+
92
+ # Verify performance targets
93
+ for agent_name, metrics in results.items():
94
+ assert metrics["mean"] < 2000 # Mean under 2 seconds
95
+ assert metrics["p95"] < 3000 # P95 under 3 seconds
96
+ assert metrics["p99"] < 5000 # P99 under 5 seconds
97
+
98
+ print(f"\n{agent_name} Performance:")
99
+ print(f" Mean: {metrics['mean']:.2f}ms")
100
+ print(f" P95: {metrics['p95']:.2f}ms")
101
+ print(f" P99: {metrics['p99']:.2f}ms")
102
+
103
+ @pytest.mark.performance
104
+ @pytest.mark.asyncio
105
+ async def test_concurrent_agent_execution(self):
106
+ """Test agent performance under concurrent load."""
107
+ agent = ZumbiAgent()
108
+ context = AgentContext(
109
+ investigation_id="concurrent-test",
110
+ user_id="concurrent-tester",
111
+ session_id="concurrent-session"
112
+ )
113
+
114
+ async def process_request(request_id):
115
+ message = AgentMessage(
116
+ type="analyze",
117
+ data={
118
+ "request_id": request_id,
119
+ "contract_value": np.random.randint(100000, 10000000)
120
+ },
121
+ sender="load_tester",
122
+ metadata={"concurrent": True}
123
+ )
124
+
125
+ start = time.time()
126
+ response = await agent.process(message, context)
127
+ elapsed = time.time() - start
128
+
129
+ return {
130
+ "request_id": request_id,
131
+ "success": response.success,
132
+ "response_time": elapsed
133
+ }
134
+
135
+ # Test with different concurrency levels
136
+ concurrency_levels = [1, 5, 10, 20]
137
+ results = {}
138
+
139
+ for concurrency in concurrency_levels:
140
+ tasks = [
141
+ process_request(f"req-{i}")
142
+ for i in range(concurrency * 10)
143
+ ]
144
+
145
+ start_time = time.time()
146
+ responses = await asyncio.gather(*tasks)
147
+ total_time = time.time() - start_time
148
+
149
+ success_rate = sum(1 for r in responses if r["success"]) / len(responses)
150
+ avg_response_time = statistics.mean(r["response_time"] for r in responses)
151
+ throughput = len(responses) / total_time
152
+
153
+ results[concurrency] = {
154
+ "success_rate": success_rate,
155
+ "avg_response_time": avg_response_time,
156
+ "throughput": throughput,
157
+ "total_requests": len(responses)
158
+ }
159
+
160
+ # Verify performance doesn't degrade significantly
161
+ for concurrency, metrics in results.items():
162
+ assert metrics["success_rate"] >= 0.95 # 95% success rate
163
+ assert metrics["avg_response_time"] < 5 # Under 5 seconds
164
+
165
+ print(f"\nConcurrency {concurrency}:")
166
+ print(f" Success Rate: {metrics['success_rate']:.2%}")
167
+ print(f" Avg Response: {metrics['avg_response_time']:.2f}s")
168
+ print(f" Throughput: {metrics['throughput']:.2f} req/s")
169
+
170
+ @pytest.mark.performance
171
+ @pytest.mark.asyncio
172
+ async def test_agent_pool_performance(self):
173
+ """Test agent pool initialization and management performance."""
174
+ pool = AgentPool(
175
+ min_instances=1,
176
+ max_instances=10,
177
+ idle_timeout=60
178
+ )
179
+
180
+ await pool.initialize()
181
+
182
+ # Measure pool scaling performance
183
+ scaling_times = []
184
+
185
+ for i in range(5):
186
+ start = time.time()
187
+
188
+ # Request multiple agents to trigger scaling
189
+ agents = await asyncio.gather(*[
190
+ pool.get_agent("zumbi") for _ in range(5)
191
+ ])
192
+
193
+ scaling_time = time.time() - start
194
+ scaling_times.append(scaling_time)
195
+
196
+ # Return agents to pool
197
+ for agent in agents:
198
+ await pool.return_agent(agent)
199
+
200
+ # Verify pool performance
201
+ avg_scaling_time = statistics.mean(scaling_times)
202
+ assert avg_scaling_time < 1.0 # Scaling should be fast
203
+
204
+ stats = await pool.get_stats()
205
+ assert stats["total_instances"] <= 10 # Respects max instances
206
+ assert stats["cache_hit_rate"] > 0.5 # Good cache utilization
207
+
208
+ @pytest.mark.performance
209
+ @pytest.mark.asyncio
210
+ async def test_memory_usage_under_load(self):
211
+ """Test memory usage patterns under sustained load."""
212
+ process = psutil.Process()
213
+ initial_memory = process.memory_info().rss / 1024 / 1024 # MB
214
+
215
+ agents = [
216
+ ZumbiAgent(),
217
+ MariaQuiteriaAgent(),
218
+ BonifacioAgent()
219
+ ]
220
+
221
+ context = AgentContext(
222
+ investigation_id="memory-test",
223
+ user_id="memory-tester",
224
+ session_id="memory-session"
225
+ )
226
+
227
+ # Generate sustained load
228
+ memory_samples = []
229
+
230
+ for iteration in range(10):
231
+ # Process batch of requests
232
+ tasks = []
233
+ for agent in agents:
234
+ for i in range(20):
235
+ message = AgentMessage(
236
+ type="analyze",
237
+ data={
238
+ "iteration": iteration,
239
+ "request": i,
240
+ "large_data": "x" * 10000 # 10KB payload
241
+ },
242
+ sender="memory_tester",
243
+ metadata={}
244
+ )
245
+ tasks.append(agent.process(message, context))
246
+
247
+ await asyncio.gather(*tasks)
248
+
249
+ # Sample memory usage
250
+ current_memory = process.memory_info().rss / 1024 / 1024 # MB
251
+ memory_samples.append(current_memory)
252
+
253
+ # Allow garbage collection
254
+ await asyncio.sleep(0.1)
255
+
256
+ # Analyze memory usage
257
+ memory_increase = max(memory_samples) - initial_memory
258
+ memory_variance = statistics.variance(memory_samples)
259
+
260
+ # Verify no significant memory leaks
261
+ assert memory_increase < 500 # Less than 500MB increase
262
+ assert memory_variance < 10000 # Stable memory usage
263
+
264
+ print(f"\nMemory Usage:")
265
+ print(f" Initial: {initial_memory:.2f}MB")
266
+ print(f" Peak: {max(memory_samples):.2f}MB")
267
+ print(f" Increase: {memory_increase:.2f}MB")
268
+
269
+ @pytest.mark.performance
270
+ @pytest.mark.asyncio
271
+ async def test_agent_startup_times(self):
272
+ """Test agent initialization and startup times."""
273
+ agent_classes = [
274
+ ("zumbi", ZumbiAgent),
275
+ ("anita", AnitaAgent),
276
+ ("tiradentes", TiradentesAgent),
277
+ ("bonifacio", BonifacioAgent),
278
+ ("maria_quiteria", MariaQuiteriaAgent)
279
+ ]
280
+
281
+ results = {}
282
+
283
+ for agent_name, agent_class in agent_classes:
284
+ startup_times = []
285
+
286
+ for i in range(10):
287
+ start = time.time()
288
+ agent = agent_class()
289
+ if hasattr(agent, 'initialize'):
290
+ await agent.initialize()
291
+ startup_time = (time.time() - start) * 1000 # ms
292
+ startup_times.append(startup_time)
293
+
294
+ results[agent_name] = {
295
+ "mean": statistics.mean(startup_times),
296
+ "max": max(startup_times),
297
+ "min": min(startup_times)
298
+ }
299
+
300
+ # Verify fast startup
301
+ for agent_name, metrics in results.items():
302
+ assert metrics["mean"] < 100 # Under 100ms average
303
+ assert metrics["max"] < 200 # Under 200ms worst case
304
+
305
+ print(f"\n{agent_name} Startup:")
306
+ print(f" Mean: {metrics['mean']:.2f}ms")
307
+ print(f" Max: {metrics['max']:.2f}ms")
308
+
309
+ @pytest.mark.performance
310
+ @pytest.mark.asyncio
311
+ async def test_agent_throughput_limits(self):
312
+ """Test maximum throughput for each agent."""
313
+ agents = [
314
+ ("zumbi", ZumbiAgent()),
315
+ ("maria_quiteria", MariaQuiteriaAgent()),
316
+ ("bonifacio", BonifacioAgent())
317
+ ]
318
+
319
+ context = AgentContext(
320
+ investigation_id="throughput-test",
321
+ user_id="throughput-tester",
322
+ session_id="throughput-session"
323
+ )
324
+
325
+ results = {}
326
+
327
+ for agent_name, agent in agents:
328
+ # Test duration
329
+ test_duration = 10 # seconds
330
+ request_count = 0
331
+ error_count = 0
332
+
333
+ start_time = time.time()
334
+
335
+ while time.time() - start_time < test_duration:
336
+ message = AgentMessage(
337
+ type="analyze",
338
+ data={"request": request_count},
339
+ sender="throughput_tester",
340
+ metadata={}
341
+ )
342
+
343
+ try:
344
+ response = await agent.process(message, context)
345
+ if not response.success:
346
+ error_count += 1
347
+ except:
348
+ error_count += 1
349
+
350
+ request_count += 1
351
+
352
+ elapsed = time.time() - start_time
353
+ throughput = request_count / elapsed
354
+ error_rate = error_count / request_count if request_count > 0 else 0
355
+
356
+ results[agent_name] = {
357
+ "throughput": throughput,
358
+ "total_requests": request_count,
359
+ "error_rate": error_rate
360
+ }
361
+
362
+ # Verify minimum throughput
363
+ for agent_name, metrics in results.items():
364
+ assert metrics["throughput"] >= 10 # At least 10 req/s
365
+ assert metrics["error_rate"] < 0.01 # Less than 1% errors
366
+
367
+ print(f"\n{agent_name} Throughput:")
368
+ print(f" Rate: {metrics['throughput']:.2f} req/s")
369
+ print(f" Total: {metrics['total_requests']}")
370
+ print(f" Errors: {metrics['error_rate']:.2%}")
371
+
372
+
373
+ class TestMultiAgentPerformance:
374
+ """Performance tests for multi-agent scenarios."""
375
+
376
+ @pytest.mark.performance
377
+ @pytest.mark.asyncio
378
+ async def test_multi_agent_pipeline_performance(self):
379
+ """Test performance of multi-agent processing pipeline."""
380
+ # Create pipeline
381
+ pipeline = [
382
+ ZumbiAgent(),
383
+ AnitaAgent(),
384
+ TiradentesAgent()
385
+ ]
386
+
387
+ context = AgentContext(
388
+ investigation_id="pipeline-test",
389
+ user_id="pipeline-tester",
390
+ session_id="pipeline-session"
391
+ )
392
+
393
+ # Test different data sizes
394
+ data_sizes = [1, 10, 100] # KB
395
+ results = {}
396
+
397
+ for size_kb in data_sizes:
398
+ processing_times = []
399
+
400
+ for i in range(20):
401
+ # Create data payload
402
+ data = {
403
+ "iteration": i,
404
+ "payload": "x" * (size_kb * 1024),
405
+ "results": {}
406
+ }
407
+
408
+ start = time.time()
409
+
410
+ # Process through pipeline
411
+ for agent in pipeline:
412
+ message = AgentMessage(
413
+ type="process",
414
+ data=data,
415
+ sender="pipeline",
416
+ metadata={"stage": agent.name}
417
+ )
418
+
419
+ response = await agent.process(message, context)
420
+ data["results"][agent.name] = response.data
421
+
422
+ elapsed = time.time() - start
423
+ processing_times.append(elapsed)
424
+
425
+ results[f"{size_kb}KB"] = {
426
+ "mean": statistics.mean(processing_times),
427
+ "p95": np.percentile(processing_times, 95),
428
+ "throughput": 1 / statistics.mean(processing_times)
429
+ }
430
+
431
+ # Verify performance scales reasonably
432
+ for size, metrics in results.items():
433
+ print(f"\nPipeline Performance ({size}):")
434
+ print(f" Mean: {metrics['mean']:.3f}s")
435
+ print(f" P95: {metrics['p95']:.3f}s")
436
+ print(f" Throughput: {metrics['throughput']:.2f} ops/s")
437
+
438
+ @pytest.mark.performance
439
+ @pytest.mark.asyncio
440
+ async def test_agent_orchestration_overhead(self):
441
+ """Test overhead of agent orchestration layer."""
442
+ direct_times = []
443
+ orchestrated_times = []
444
+
445
+ agent = ZumbiAgent()
446
+ context = AgentContext(
447
+ investigation_id="overhead-test",
448
+ user_id="overhead-tester",
449
+ session_id="overhead-session"
450
+ )
451
+
452
+ # Direct agent calls
453
+ for i in range(50):
454
+ message = AgentMessage(
455
+ type="analyze",
456
+ data={"test": i},
457
+ sender="direct",
458
+ metadata={}
459
+ )
460
+
461
+ start = time.time()
462
+ await agent.process(message, context)
463
+ direct_times.append(time.time() - start)
464
+
465
+ # Orchestrated calls (with mock orchestrator overhead)
466
+ for i in range(50):
467
+ message = AgentMessage(
468
+ type="analyze",
469
+ data={"test": i},
470
+ sender="orchestrated",
471
+ metadata={}
472
+ )
473
+
474
+ start = time.time()
475
+
476
+ # Simulate orchestration overhead
477
+ await asyncio.sleep(0.001) # 1ms overhead
478
+ await agent.process(message, context)
479
+ await asyncio.sleep(0.001) # Post-processing
480
+
481
+ orchestrated_times.append(time.time() - start)
482
+
483
+ # Calculate overhead
484
+ direct_avg = statistics.mean(direct_times)
485
+ orchestrated_avg = statistics.mean(orchestrated_times)
486
+ overhead = orchestrated_avg - direct_avg
487
+ overhead_percentage = (overhead / direct_avg) * 100
488
+
489
+ # Verify acceptable overhead
490
+ assert overhead_percentage < 10 # Less than 10% overhead
491
+
492
+ print(f"\nOrchestration Overhead:")
493
+ print(f" Direct: {direct_avg*1000:.2f}ms")
494
+ print(f" Orchestrated: {orchestrated_avg*1000:.2f}ms")
495
+ print(f" Overhead: {overhead*1000:.2f}ms ({overhead_percentage:.1f}%)")