File size: 7,397 Bytes
190953c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 |
"""Tests for compression middleware and service."""
import pytest
import gzip
import json
from fastapi import FastAPI, Response
from fastapi.responses import StreamingResponse
from httpx import AsyncClient
import asyncio
from src.services.compression_service import CompressionService, CompressionAlgorithm
from src.api.middleware.compression import CompressionMiddleware
from src.api.middleware.streaming_compression import compress_streaming_response
class TestCompressionService:
    """Unit tests that call ``CompressionService.compress`` directly."""

    @pytest.fixture
    def compression_service(self):
        """Provide a fresh, default-constructed ``CompressionService`` per test."""
        service = CompressionService()
        return service

    def test_compress_gzip(self, compression_service):
        """A repetitive text payload is gzip-compressed and round-trips intact."""
        # Highly repetitive input so the compressed form is clearly smaller.
        payload = b"Hello World! " * 100

        body, chosen, stats = compression_service.compress(
            data=payload,
            content_type="text/plain",
            accept_encoding="gzip",
        )

        assert chosen == "gzip"
        assert len(body) < len(payload)
        assert stats["algorithm"] == CompressionAlgorithm.GZIP
        # NOTE(review): presumably "ratio" is the fraction of bytes saved;
        # confirm against CompressionService before relying on that reading.
        assert stats["ratio"] > 0.5

        # The output must be a valid gzip stream carrying the original bytes.
        assert gzip.decompress(body) == payload

    def test_compress_below_threshold(self, compression_service):
        """A five-byte payload is returned unchanged with the identity encoding."""
        payload = b"Small"

        body, chosen, stats = compression_service.compress(
            data=payload,
            content_type="text/plain",
            accept_encoding="gzip",
        )

        assert chosen == "identity"
        assert body == payload
        assert stats["reason"] == "below_min_size"

    def test_algorithm_selection(self, compression_service):
        """With several encodings offered, the service picks br or gzip."""
        payload = b"Test data " * 100

        # The header offers gzip, deflate and br (the latter with q=0.9).
        body, chosen, _ = compression_service.compress(
            data=payload,
            content_type="application/json",
            accept_encoding="gzip, deflate, br;q=0.9",
        )

        # Either outcome is accepted; which one wins depends on the service.
        assert chosen in ("br", "gzip")
        assert len(body) < len(payload)

    def test_content_type_profiles(self, compression_service):
        """A repetitive JSON payload is gzip-compressed with a ratio above 0.8."""
        payload = b'{"key": "value"}' * 100

        _, chosen, stats = compression_service.compress(
            data=payload,
            content_type="application/json",
            accept_encoding="gzip",
        )

        assert chosen == "gzip"
        # NOTE(review): threshold assumes a JSON-specific profile; confirm.
        assert stats["ratio"] > 0.8

    def test_metrics_tracking(self, compression_service):
        """Five compress calls are reflected in the aggregate metrics."""
        call_kwargs = {
            "data": b"Test data " * 100,
            "content_type": "text/plain",
            "accept_encoding": "gzip",
        }
        for _ in range(5):
            compression_service.compress(**call_kwargs)

        totals = compression_service.get_metrics()

        assert totals["total_requests"] == 5
        assert totals["total_bytes_saved"] > 0
        assert "text/plain" in totals["content_types"]
        assert CompressionAlgorithm.GZIP in totals["algorithms"]
@pytest.mark.asyncio
class TestCompressionMiddleware:
    """End-to-end tests for ``CompressionMiddleware`` via an in-process client.

    Two httpx behaviours shape these tests:

    * ``response.content`` / ``.text`` / ``.json()`` are already
      content-decoded, so the bytes actually sent by the app must be read
      with ``aiter_raw()`` on a streamed response.
    * The client sends a default ``Accept-Encoding`` header, which has to be
      removed explicitly to exercise the "no Accept-Encoding" path.
    """

    @pytest.fixture
    def app(self):
        """Build a FastAPI app with the middleware and four sample routes."""
        app = FastAPI()

        # Same middleware settings for every test in this class.
        app.add_middleware(
            CompressionMiddleware,
            minimum_size=100,
            gzip_level=6,
        )

        @app.get("/text")
        def get_text():
            return Response(
                content="Hello World! " * 50,
                media_type="text/plain",
            )

        @app.get("/json")
        def get_json():
            return {"data": "value " * 50}

        @app.get("/small")
        def get_small():
            # A bare str is serialised by FastAPI as a JSON string.
            return "Small"

        @app.get("/stream")
        async def get_stream():
            async def generate():
                for i in range(10):
                    yield f"Chunk {i}\n" * 10
                    await asyncio.sleep(0.01)

            return compress_streaming_response(
                generate(),
                content_type="text/plain",
            )

        return app

    @staticmethod
    def _client(app):
        """Return an ``AsyncClient`` that calls *app* through ``ASGITransport``.

        The ``AsyncClient(app=...)`` shortcut was deprecated in httpx 0.27 and
        removed in 0.28; passing an explicit transport works on both.
        """
        # Local import so this block does not depend on the file's import list.
        from httpx import ASGITransport

        return AsyncClient(
            transport=ASGITransport(app=app),
            base_url="http://test",
        )

    @classmethod
    async def _get_raw(cls, app, path, headers):
        """GET *path* and return ``(response, raw_body)``.

        ``raw_body`` is the undecoded payload as emitted by the app, i.e.
        before httpx applies any ``Content-Encoding`` decoding. The returned
        response is closed; only its status and headers should be used.
        """
        async with cls._client(app) as client:
            async with client.stream("GET", path, headers=headers) as response:
                raw_body = b"".join(
                    [chunk async for chunk in response.aiter_raw()]
                )
        return response, raw_body

    async def test_text_compression(self, app):
        """A large text response is gzip-encoded and smaller on the wire."""
        expected = ("Hello World! " * 50).encode()

        response, raw_body = await self._get_raw(
            app, "/text", {"Accept-Encoding": "gzip"}
        )

        assert response.status_code == 200
        assert response.headers.get("content-encoding") == "gzip"
        assert "vary" in response.headers
        # Compare the undecoded bytes: the decoded body always has full size.
        assert len(raw_body) < len(expected)
        assert gzip.decompress(raw_body) == expected

    async def test_json_compression(self, app):
        """A large JSON response is gzip-encoded and still parses as JSON."""
        async with self._client(app) as client:
            response = await client.get(
                "/json",
                headers={"Accept-Encoding": "gzip"},
            )

        assert response.status_code == 200
        assert response.headers.get("content-encoding") == "gzip"
        # httpx decodes the gzip body before parsing.
        data = response.json()
        assert "data" in data

    async def test_no_compression_small(self, app):
        """A response below ``minimum_size`` is sent without Content-Encoding."""
        async with self._client(app) as client:
            response = await client.get(
                "/small",
                headers={"Accept-Encoding": "gzip"},
            )

        assert response.status_code == 200
        assert response.headers.get("content-encoding") is None
        # The route returns a str, which FastAPI sends as a JSON string, so
        # compare the parsed value rather than the quoted raw text.
        assert response.json() == "Small"

    async def test_no_accept_encoding(self, app):
        """A request carrying no Accept-Encoding header is not compressed."""
        async with self._client(app) as client:
            # Drop httpx's default Accept-Encoding so the header is truly absent.
            client.headers.pop("accept-encoding", None)
            response = await client.get("/text")

        assert response.status_code == 200
        assert response.headers.get("content-encoding") is None

    async def test_streaming_compression(self, app):
        """A streamed response is gzip-encoded and decompresses to all chunks."""
        response, raw_body = await self._get_raw(
            app, "/stream", {"Accept-Encoding": "gzip"}
        )

        assert response.status_code == 200
        assert response.headers.get("content-encoding") == "gzip"

        # Decompress the undecoded bytes and check the first and last chunks.
        content = gzip.decompress(raw_body).decode()
        assert "Chunk 0" in content
        assert "Chunk 9" in content