"""
Module: api.middleware.streaming_compression
Description: Compression middleware for streaming responses (SSE, WebSocket)
Author: Anderson H. Silva
Date: 2025-01-25
License: Proprietary - All rights reserved
"""
import gzip
from io import BytesIO
from typing import AsyncIterator

from starlette.responses import StreamingResponse
from starlette.types import ASGIApp, Message, Receive, Scope, Send

from src.core import get_logger

logger = get_logger(__name__)


class GzipStream:
    """Streaming gzip compressor."""

    def __init__(self, level: int = 6):
        self.level = level
        self._buffer = BytesIO()
        self._gzip = gzip.GzipFile(
            fileobj=self._buffer,
            mode='wb',
            compresslevel=level,
        )

    def compress(self, data: bytes) -> bytes:
        """Compress a chunk of data."""
        self._gzip.write(data)
        self._gzip.flush()

        # Get compressed data
        self._buffer.seek(0)
        compressed = self._buffer.read()

        # Reset buffer
        self._buffer.seek(0)
        self._buffer.truncate()

        return compressed

    def close(self) -> bytes:
        """Finish compression and return the final data (gzip trailer)."""
        self._gzip.close()
        self._buffer.seek(0)
        return self._buffer.read()
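

# Illustrative sketch (not part of the module API): compressing a stream in
# chunks and verifying the round-trip with the standard library. The names
# below are local to the example only.
#
#     stream = GzipStream(level=6)
#     parts = [stream.compress(b"data: hello\n\n"), stream.close()]
#     assert gzip.decompress(b"".join(parts)) == b"data: hello\n\n"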


class StreamingCompressionMiddleware:
    """
    Middleware for compressing streaming responses.

    Handles:
    - Server-Sent Events (SSE)
    - Large file downloads
    - Chunked responses
    """

    def __init__(
        self,
        app: ASGIApp,
        minimum_size: int = 256,
        compression_level: int = 6,
        chunk_size: int = 8192,
    ):
        self.app = app
        # Note: minimum_size and chunk_size are stored as configuration but are
        # not yet applied in __call__; all eligible streaming responses are
        # compressed with compression_level.
        self.minimum_size = minimum_size
        self.compression_level = compression_level
        self.chunk_size = chunk_size
    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return

        # Check accept-encoding
        headers = dict(scope.get("headers", []))
        accept_encoding = headers.get(b"accept-encoding", b"").decode().lower()

        if "gzip" not in accept_encoding:
            await self.app(scope, receive, send)
            return

        # Intercept send
        compressor = None
        content_type = None
        should_compress = False

        async def wrapped_send(message: Message) -> None:
            nonlocal compressor, content_type, should_compress

            if message["type"] == "http.response.start":
                # Check content type
                headers_dict = dict(message.get("headers", []))
                content_type = headers_dict.get(b"content-type", b"").decode()

                # Determine if we should compress
                if self._should_compress_stream(content_type):
                    should_compress = True
                    compressor = GzipStream(self.compression_level)

                    # Update headers: drop content-length (unknown for a
                    # compressed stream) and any existing content-encoding
                    new_headers = []
                    for name, value in message.get("headers", []):
                        if name.lower() not in (b"content-length", b"content-encoding"):
                            new_headers.append((name, value))

                    new_headers.extend([
                        (b"content-encoding", b"gzip"),
                        (b"vary", b"Accept-Encoding"),
                    ])

                    message["headers"] = new_headers

                    logger.debug(
                        "streaming_compression_enabled",
                        content_type=content_type,
                    )

            elif message["type"] == "http.response.body" and should_compress:
                body = message.get("body", b"")
                more_body = message.get("more_body", False)

                if body:
                    # Compress chunk
                    message["body"] = compressor.compress(body)

                if not more_body and compressor:
                    # Final chunk: flush the compressor and append the gzip
                    # trailer after the last compressed chunk so the stream
                    # terminates with a valid gzip footer.
                    final_data = compressor.close()
                    if final_data:
                        message["body"] = message.get("body", b"") + final_data
                    compressor = None

            await send(message)

        await self.app(scope, receive, wrapped_send)
    def _should_compress_stream(self, content_type: str) -> bool:
        """Check if streaming content should be compressed."""
        content_type = content_type.lower()

        # Always compress SSE
        if "text/event-stream" in content_type:
            return True

        # Compress JSON streams
        if "application/json" in content_type and "stream" in content_type:
            return True

        # Compress text streams
        if content_type.startswith("text/") and "stream" in content_type:
            return True

        # Compress CSV exports
        if "text/csv" in content_type:
            return True

        # Compress NDJSON (newline-delimited JSON)
        if "application/x-ndjson" in content_type:
            return True

        return False
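

# Illustrative sketch of wiring the middleware into an application. The `app`
# object (a FastAPI/Starlette instance) is assumed to be defined elsewhere and
# is not part of this module.
#
#     app.add_middleware(
#         StreamingCompressionMiddleware,
#         compression_level=6,
#     )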


async def compress_streaming_response(
    response_iterator: AsyncIterator[str],
    content_type: str = "text/plain",
    encoding: str = "gzip",
) -> StreamingResponse:
    """
    Create a compressed streaming response.

    Args:
        response_iterator: Async iterator yielding response chunks
        content_type: Content type of response
        encoding: Compression encoding (only gzip supported currently)

    Returns:
        StreamingResponse with compression
    """
    async def compressed_iterator():
        compressor = GzipStream()
        try:
            async for chunk in response_iterator:
                if isinstance(chunk, str):
                    chunk = chunk.encode('utf-8')
                compressed = compressor.compress(chunk)
                if compressed:
                    yield compressed

            # Yield final compressed data
            final = compressor.close()
            if final:
                yield final
        except Exception as e:
            logger.error(
                "streaming_compression_error",
                error=str(e),
                exc_info=True,
            )
            raise

    headers = {
        "Content-Type": content_type,
        "Content-Encoding": encoding,
        "Vary": "Accept-Encoding",
    }

    return StreamingResponse(
        compressed_iterator(),
        headers=headers,
    )
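

# Illustrative usage sketch. The `/export` path and the `router` object
# (a FastAPI APIRouter) are assumptions for the example, not part of this
# module.
#
#     @router.get("/export")
#     async def export_csv() -> StreamingResponse:
#         async def rows() -> AsyncIterator[str]:
#             yield "id,name\n"
#             yield "1,Ana\n"
#
#         return await compress_streaming_response(rows(), content_type="text/csv")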