anderson-ufrj
feat(performance): implement advanced compression and connection pooling
190953c
raw
history blame
9.63 kB
"""
Advanced compression middleware for API responses with Gzip and Brotli support.
This middleware compresses responses to reduce bandwidth usage,
especially important for mobile applications and slow connections.
"""
import gzip
from typing import Callable, Optional
from io import BytesIO
from fastapi import Request, Response
from fastapi.responses import StreamingResponse
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.types import ASGIApp, Message, Receive, Scope, Send
from starlette.datastructures import MutableHeaders
from src.core import get_logger
from src.core.json_utils import dumps_bytes, loads
from src.services.compression_service import compression_service, CompressionAlgorithm
try:
import brotli
HAS_BROTLI = True
except ImportError:
HAS_BROTLI = False
brotli = None
logger = get_logger(__name__)
class CompressionMiddleware(BaseHTTPMiddleware):
"""
Middleware to compress responses using gzip.
Features:
- Automatic compression for responses > 1KB
- Respects Accept-Encoding header
- Excludes already compressed content
- Configurable compression level
"""
def __init__(
self,
app: ASGIApp,
minimum_size: int = 1024,
gzip_level: int = 6,
brotli_quality: int = 4,
brotli_mode: str = "text",
exclude_paths: Optional[set] = None
):
"""
Initialize compression middleware.
Args:
app: ASGI application
minimum_size: Minimum response size to compress (bytes)
gzip_level: Gzip compression level (1-9)
brotli_quality: Brotli quality level (0-11)
brotli_mode: Brotli mode - "text", "font", or "generic"
exclude_paths: Set of paths to exclude from compression
"""
super().__init__(app)
self.minimum_size = minimum_size
self.gzip_level = gzip_level
self.brotli_quality = brotli_quality
self.brotli_mode = brotli_mode
self.exclude_paths = exclude_paths or {'/metrics', '/health', '/health/metrics'}
# Brotli mode mapping
if HAS_BROTLI:
self.brotli_modes = {
"text": brotli.MODE_TEXT,
"font": brotli.MODE_FONT,
"generic": brotli.MODE_GENERIC,
}
# Content types to compress
self.compressible_types = {
"application/json",
"text/html",
"text/plain",
"text/css",
"text/javascript",
"application/javascript",
"application/xml",
"text/xml",
}
# Content types to never compress
self.excluded_types = {
"image/jpeg",
"image/png",
"image/gif",
"image/webp",
"video/mp4",
"application/pdf",
"application/zip",
"application/gzip",
}
async def dispatch(self, request: Request, call_next: Callable) -> Response:
"""Process request and potentially compress response."""
# Skip compression for excluded paths
if request.url.path in self.exclude_paths:
return await call_next(request)
# Check client's accepted encodings
accept_encoding = request.headers.get("accept-encoding", "").lower()
accepts_br = HAS_BROTLI and "br" in accept_encoding
accepts_gzip = "gzip" in accept_encoding
if not (accepts_br or accepts_gzip):
return await call_next(request)
# Process request
response = await call_next(request)
# Check if we should compress
if not self._should_compress(response):
return response
# Get response body
body = b""
async for chunk in response.body_iterator:
body += chunk
# Use compression service for optimal compression
compressed_body, encoding, metrics = compression_service.compress(
data=body,
content_type=response.media_type or "application/octet-stream",
accept_encoding=accept_encoding
)
# If no compression applied, return original
if encoding == "identity":
return Response(
content=body,
status_code=response.status_code,
headers=dict(response.headers),
media_type=response.media_type
)
# Log compression metrics
if metrics.get("ratio"):
logger.debug(
f"Compressed response with {encoding}: {metrics['original_size']}{metrics['compressed_size']} bytes "
f"({metrics['ratio']:.1%} reduction, {metrics.get('compression_time_ms', 0):.1f}ms)"
)
# Update headers
headers = MutableHeaders(response.headers)
headers["content-encoding"] = encoding
headers["content-length"] = str(len(compressed_body))
headers["vary"] = "Accept-Encoding"
# Optional debug headers
if logger.isEnabledFor(10): # DEBUG level
headers["x-uncompressed-size"] = str(len(body))
headers["x-compression-ratio"] = f"{compression_ratio:.1f}%"
# Remove content-length if streaming
if "transfer-encoding" in headers:
headers.pop("content-length", None)
return Response(
content=compressed_body,
status_code=response.status_code,
headers=headers,
media_type=response.media_type
)
def _should_compress(self, response: Response) -> bool:
"""Determine if response should be compressed."""
# Check if already compressed
if response.headers.get("content-encoding"):
return False
# Check content type
content_type = response.media_type or ""
base_type = content_type.split(";")[0].strip().lower()
# Skip if excluded type
if base_type in self.excluded_types:
return False
# Compress if compressible type
if base_type in self.compressible_types:
return True
# Compress text/* by default
if base_type.startswith("text/"):
return True
# Skip everything else
return False
def _compress_gzip(self, data: bytes) -> bytes:
"""Compress data using gzip."""
return gzip.compress(data, compresslevel=self.gzip_level)
def _compress_brotli(self, data: bytes) -> bytes:
"""Compress data using brotli."""
if not HAS_BROTLI:
raise RuntimeError("Brotli not available")
mode = self.brotli_modes.get(self.brotli_mode, brotli.MODE_TEXT)
return brotli.compress(data, quality=self.brotli_quality, mode=mode)
class StreamingCompressionMiddleware:
"""
Middleware for compressing streaming responses (like SSE).
"""
def __init__(self, app: ASGIApp, compression_level: int = 6):
self.app = app
self.compression_level = compression_level
async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
if scope["type"] != "http":
await self.app(scope, receive, send)
return
# Check for gzip support
headers = dict(scope.get("headers", []))
accept_encoding = headers.get(b"accept-encoding", b"").decode()
if "gzip" not in accept_encoding.lower():
await self.app(scope, receive, send)
return
# Intercept send to compress streaming responses
async def compressed_send(message: Message) -> None:
if message["type"] == "http.response.start":
# Check if this is a streaming response
headers = dict(message.get("headers", []))
content_type = headers.get(b"content-type", b"").decode()
if "text/event-stream" in content_type:
# Add compression header
new_headers = []
for name, value in message.get("headers", []):
if name.lower() != b"content-length":
new_headers.append((name, value))
new_headers.append((b"content-encoding", b"gzip"))
message["headers"] = new_headers
await send(message)
await self.app(scope, receive, compressed_send)
def add_compression_middleware(
app,
minimum_size: int = 1024,
gzip_level: int = 6,
brotli_quality: int = 4,
exclude_paths: Optional[set] = None
):
"""
Add compression middleware to FastAPI app.
Args:
app: FastAPI application
minimum_size: Minimum size to compress (bytes)
gzip_level: Gzip compression level (1-9)
brotli_quality: Brotli quality (0-11)
exclude_paths: Paths to exclude from compression
"""
app.add_middleware(
CompressionMiddleware,
minimum_size=minimum_size,
gzip_level=gzip_level,
brotli_quality=brotli_quality,
exclude_paths=exclude_paths
)
logger.info(
f"Compression middleware enabled "
f"(min_size={minimum_size}, gzip_level={gzip_level}, "
f"brotli={'enabled' if HAS_BROTLI else 'disabled'})"
)