from datetime import datetime, timedelta, timezone
import secrets
from typing import Any, Dict, Optional

from fastapi import HTTPException


class SessionManager:
    """In-memory store of opaque session tokens with a fixed time-to-live.

    Each token maps to a record ``{"user": <user id>, "expires": <datetime>}``
    kept in a plain dict on the instance. No locking is performed and nothing
    is persisted outside this object.
    """

    def __init__(self, ttl: int):
        """Create an empty manager.

        Args:
            ttl: Lifetime of every issued token, in seconds.
        """
        self.ttl = ttl
        # token -> {"user": str, "expires": timezone-aware UTC datetime}
        self._tokens: Dict[str, Dict[str, Any]] = {}

    @staticmethod
    def _now() -> datetime:
        """Return the current time as a timezone-aware UTC datetime.

        Single clock source for the class, so stored expiry times and the
        values they are compared against are always both timezone-aware.
        """
        return datetime.now(timezone.utc)

    def create(self, user_id: str) -> str:
        """Issue a new token for ``user_id`` and return it.

        The token is generated with ``secrets.token_urlsafe(32)`` and expires
        ``ttl`` seconds after the moment of creation.
        """
        token = secrets.token_urlsafe(32)
        self._tokens[token] = {
            "user": user_id,
            "expires": self._now() + timedelta(seconds=self.ttl),
        }
        return token

    def validate(self, token: Optional[str]) -> str:
        """Return the user id bound to ``token``.

        Expired tokens (including the one being validated) are purged from the
        store as a side effect of every call.

        Raises:
            HTTPException: 401 "Not authenticated" when ``token`` is empty or
                unknown; 401 "Session expired" when the token is known but
                its expiry time has passed.
        """
        # Classify the token *before* purging. Purging first would delete an
        # expired token and make it indistinguishable from an unknown one, so
        # the "Session expired" response could never be produced.
        session = self._tokens.get(token) if token else None
        is_expired = session is not None and session["expires"] < self._now()

        self.cleanup()

        if session is None:
            raise HTTPException(status_code=401, detail="Not authenticated")
        if is_expired:
            # cleanup() normally removed it already; pop defensively in case
            # cleanup() is overridden by a subclass.
            self._tokens.pop(token, None)
            raise HTTPException(status_code=401, detail="Session expired")
        return session["user"]

    def cleanup(self) -> None:
        """Delete every token whose expiry time is strictly in the past."""
        now = self._now()
        # Collect first: a dict must not change size while being iterated.
        stale = [
            token
            for token, session in self._tokens.items()
            if session["expires"] < now
        ]
        for token in stale:
            del self._tokens[token]