|
|
""" |
|
|
Telemetry stub for HuggingFace deployment with minimal dependencies. |
|
|
Provides mock implementations when full OpenTelemetry is not available. |
|
|
""" |
|
|
|
|
|
import functools |
|
|
from typing import Any, Callable, Optional |
|
|
|
|
|
|
|
|
class MockTracer: |
|
|
"""Mock tracer for when OpenTelemetry is not available.""" |
|
|
|
|
|
def start_as_current_span(self, name: str, **kwargs): |
|
|
"""Mock span context manager.""" |
|
|
class MockSpan: |
|
|
def __enter__(self): |
|
|
return self |
|
|
def __exit__(self, *args): |
|
|
pass |
|
|
def set_attribute(self, key: str, value: Any): |
|
|
pass |
|
|
def set_status(self, status: Any): |
|
|
pass |
|
|
return MockSpan() |
|
|
|
|
|
|
|
|
class MockMeter: |
|
|
"""Mock meter for when OpenTelemetry is not available.""" |
|
|
|
|
|
def create_counter(self, name: str, **kwargs): |
|
|
"""Mock counter.""" |
|
|
class MockCounter: |
|
|
def add(self, amount: int = 1, attributes: Optional[dict] = None): |
|
|
pass |
|
|
return MockCounter() |
|
|
|
|
|
def create_histogram(self, name: str, **kwargs): |
|
|
"""Mock histogram.""" |
|
|
class MockHistogram: |
|
|
def record(self, amount: float, attributes: Optional[dict] = None): |
|
|
pass |
|
|
return MockHistogram() |
|
|
|
|
|
def create_up_down_counter(self, name: str, **kwargs): |
|
|
"""Mock up-down counter.""" |
|
|
class MockUpDownCounter: |
|
|
def add(self, amount: int = 1, attributes: Optional[dict] = None): |
|
|
pass |
|
|
return MockUpDownCounter() |
|
|
|
|
|
|
|
|
|
|
|
mock_tracer = MockTracer() |
|
|
mock_meter = MockMeter() |
|
|
|
|
|
|
|
|
def trace_method(span_name: Optional[str] = None) -> Callable: |
|
|
"""Decorator for tracing methods - no-op version.""" |
|
|
def decorator(func: Callable) -> Callable: |
|
|
@functools.wraps(func) |
|
|
async def async_wrapper(*args, **kwargs): |
|
|
return await func(*args, **kwargs) |
|
|
|
|
|
@functools.wraps(func) |
|
|
def sync_wrapper(*args, **kwargs): |
|
|
return func(*args, **kwargs) |
|
|
|
|
|
return async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper |
|
|
return decorator |
|
|
|
|
|
|
|
|
import asyncio |