All checks were successful
Build and Push OCI GenAI Gateway Docker Image / docker-build-push (push) Successful in 34s
157 lines
5.7 KiB
Python
157 lines
5.7 KiB
Python
"""
Logging middleware for request/response debugging.
"""
|
|
import json
import logging
import time
import uuid
from typing import Callable

from fastapi import Request, Response
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import StreamingResponse

from core.config import get_settings
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class LoggingMiddleware(BaseHTTPMiddleware):
    """
    Middleware that logs detailed request and response information.

    Requests are logged when ``settings.log_requests`` or ``settings.debug``
    is set; responses are logged when ``settings.log_responses`` or
    ``settings.debug`` is set. All detail lines are emitted at DEBUG level,
    so nothing is read or buffered unless the module logger has DEBUG enabled.
    """

    # Header names (lower-case) whose values are credentials and must never
    # be written to the log.
    _SENSITIVE_HEADERS = frozenset(
        {
            "authorization",
            "proxy-authorization",
            "cookie",
            "set-cookie",
            "x-api-key",
            "api-key",
        }
    )

    # Content types that are delivered incrementally; buffering them for
    # logging would hold the whole stream back from the client.
    _STREAMING_CONTENT_TYPES = ("text/event-stream", "application/x-ndjson")

    async def dispatch(self, request: Request, call_next: Callable) -> Response:
        """
        Log the request, run the downstream handler, then log the response.

        Args:
            request: Incoming request.
            call_next: Callable that forwards the request downstream and
                returns the response.

        Returns:
            The downstream response. Its body iterator may have been replaced
            by a buffered copy when the body was logged.
        """
        settings = get_settings()

        # A millisecond timestamp alone collides for concurrent requests;
        # the random suffix keeps log lines of different requests apart.
        request_id = f"req_{int(time.time() * 1000)}_{uuid.uuid4().hex[:8]}"

        if settings.log_requests or settings.debug:
            await self._log_request(request, request_id)

        # perf_counter is monotonic, so the duration cannot go negative or
        # jump when the system clock is adjusted.
        started = time.perf_counter()
        response = await call_next(request)
        process_time = time.perf_counter() - started

        if self._is_streaming(response):
            logger.debug(f"[{request_id}] Response: Streaming response (not logged)")
        elif settings.log_responses or settings.debug:
            await self._log_response(response, request_id, process_time)

        return response

    @classmethod
    def _is_streaming(cls, response: Response) -> bool:
        """
        Return True when the response body must not be buffered for logging.

        NOTE(review): call_next may hand back its own wrapper type instead of
        the handler's StreamingResponse, in which case the isinstance check
        alone misses real streams - confirm against the installed Starlette
        version. The content-type check covers that case.
        """
        if isinstance(response, StreamingResponse):
            return True
        content_type = response.headers.get("content-type", "").lower()
        return content_type.startswith(cls._STREAMING_CONTENT_TYPES)

    @classmethod
    def _sanitize_headers(cls, raw_headers) -> dict:
        """Return the headers as a plain dict with credential values masked."""
        headers = dict(raw_headers)
        for name in headers:
            lowered = name.lower()
            if lowered == "authorization":
                headers[name] = "Bearer ***"
            elif lowered in cls._SENSITIVE_HEADERS:
                headers[name] = "***"
        return headers

    async def _log_request(self, request: Request, request_id: str):
        """
        Log method, path, query parameters, masked headers and body.

        The body is only read for POST, PUT and PATCH. Any failure is logged
        and swallowed so that logging never breaks request handling.
        """
        # Every line below is DEBUG; skip reading the body when it would be
        # discarded anyway.
        if not logger.isEnabledFor(logging.DEBUG):
            return

        try:
            logger.debug("=" * 80)
            logger.debug(f"[{request_id}] 📨 INCOMING REQUEST")
            logger.debug("=" * 80)
            logger.debug(f"Method: {request.method}")
            logger.debug(f"URL: {request.url.path}")
            logger.debug(f"Query Params: {dict(request.query_params)}")

            headers = self._sanitize_headers(request.headers)
            logger.debug(f"Headers: {json.dumps(headers, indent=2)}")

            if request.method in ("POST", "PUT", "PATCH"):
                body = await self._read_body(request)
                if body:
                    try:
                        # Pretty-print when the body is JSON.
                        body_json = json.loads(body)
                        logger.debug(f"Request Body:\n{json.dumps(body_json, indent=2, ensure_ascii=False)}")
                    except json.JSONDecodeError:
                        # Not JSON: log the first 1000 characters as-is.
                        logger.debug(f"Request Body (raw): {body[:1000]}...")

            logger.debug("=" * 80)
        except Exception as e:
            logger.error(f"Error logging request: {e}")

    async def _log_response(self, response: Response, request_id: str, process_time: float):
        """
        Log status, timing, masked headers and body of a response.

        The body iterator is drained for logging and replaced with an
        iterator over the buffered bytes so the response can still be sent.
        Any failure is logged and swallowed.
        """
        # Avoid buffering the whole body when nothing would be logged.
        if not logger.isEnabledFor(logging.DEBUG):
            return

        try:
            logger.debug("=" * 80)
            logger.debug(f"[{request_id}] 📤 OUTGOING RESPONSE")
            logger.debug("=" * 80)
            logger.debug(f"Status Code: {response.status_code}")
            logger.debug(f"Processing Time: {process_time:.3f}s")

            headers = self._sanitize_headers(response.headers)
            logger.debug(f"Headers: {json.dumps(headers, indent=2)}")

            body_iterator = getattr(response, "body_iterator", None)
            if body_iterator is None:
                # Response without an iterator: fall back to its body
                # attribute, if it has one.
                body = getattr(response, "body", b"") or b""
            else:
                charset = getattr(response, "charset", "utf-8")
                chunks = []
                try:
                    async for chunk in body_iterator:
                        if isinstance(chunk, str):
                            chunk = chunk.encode(charset)
                        chunks.append(bytes(chunk))
                finally:
                    # Always hand back what was read; otherwise a failure
                    # here would leave the response with a drained iterator.
                    body = b"".join(chunks)
                    response.body_iterator = self._create_body_iterator(body)

            if body:
                try:
                    # Pretty-print when the body is JSON.
                    body_json = json.loads(body.decode())
                    logger.debug(f"Response Body:\n{json.dumps(body_json, indent=2, ensure_ascii=False)}")
                except (json.JSONDecodeError, UnicodeDecodeError):
                    # Not JSON / not text: log the first 1000 bytes as-is.
                    logger.debug(f"Response Body (raw): {body[:1000]}...")

            logger.debug("=" * 80)
        except Exception as e:
            logger.error(f"Error logging response: {e}")

    async def _read_body(self, request: Request) -> str:
        """
        Read the request body and keep it available to downstream handlers.

        Returns:
            The body decoded as UTF-8 (undecodable bytes are replaced), or an
            empty string when the body could not be read.
        """
        try:
            body = await request.body()
        except Exception as e:
            logger.error(f"Error reading request body: {e}")
            return ""

        upstream_receive = request._receive
        replayed = False

        async def receive():
            # Replay the buffered body exactly once, then defer to the real
            # channel so later events (e.g. a disconnect) still arrive.
            # Replaying forever would feed "http.request" to every caller.
            nonlocal replayed
            if not replayed:
                replayed = True
                return {"type": "http.request", "body": body, "more_body": False}
            return await upstream_receive()

        request._receive = receive

        # errors="replace" keeps binary payloads loggable instead of raising.
        return body.decode("utf-8", errors="replace")

    def _create_body_iterator(self, body: bytes):
        """Return an async iterator that yields ``body`` as a single chunk."""

        async def iterator():
            yield body

        return iterator()
|
|
|
|
|
|
def setup_logging_middleware(app):
    """
    Register LoggingMiddleware on the app when any logging flag is set.

    The middleware is added when ``log_requests``, ``log_responses`` or
    ``debug`` is truthy on the settings object; otherwise the app is left
    untouched. An INFO line reports which directions are being logged.
    """
    cfg = get_settings()
    requests_on = cfg.log_requests or cfg.debug
    responses_on = cfg.log_responses or cfg.debug

    # Nothing to log in either direction: leave the app as it is.
    if not (requests_on or responses_on):
        return

    app.add_middleware(LoggingMiddleware)
    logger.info("🔍 Request/Response logging middleware enabled")
    if requests_on:
        logger.info(" - Request logging: ON")
    if responses_on:
        logger.info(" - Response logging: ON")
|