# File: sfs/app/session_manager.py
# Listing metadata: 36 lines, 1.1 KiB, Python

import secrets
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Optional

from fastapi import HTTPException
class SessionManager:
    """In-memory store of opaque session tokens with a fixed time-to-live.

    Every token maps to a record ``{"user": <user id>, "expires": <datetime>}``
    kept in ``self._tokens``. Nothing is persisted: records exist only for the
    lifetime of this instance.
    """

    def __init__(self, ttl: int):
        """Create an empty manager.

        Args:
            ttl: Lifetime of each newly created session, in seconds.
        """
        self.ttl = ttl
        # token -> {"user": str, "expires": timezone-aware UTC datetime}
        self._tokens: Dict[str, Dict[str, Any]] = {}

    @staticmethod
    def _now() -> datetime:
        """Return the current time as a timezone-aware UTC datetime."""
        # datetime.utcnow() is deprecated (3.12+) and yields naive values;
        # every timestamp in this class goes through this single helper so
        # naive and aware datetimes are never mixed.
        return datetime.now(timezone.utc)

    def create(self, user_id: str) -> str:
        """Register a new session for *user_id* and return its token.

        The token is a URL-safe random string generated by ``secrets``; the
        session expires ``self.ttl`` seconds after creation.
        """
        token = secrets.token_urlsafe(32)
        self._tokens[token] = {
            "user": user_id,
            "expires": self._now() + timedelta(seconds=self.ttl),
        }
        return token

    def validate(self, token: Optional[str]) -> str:
        """Return the user id bound to *token*, purging expired sessions.

        Args:
            token: The session token presented by the caller; may be ``None``
                or empty when no token was supplied.

        Returns:
            The user id stored when the token was created.

        Raises:
            HTTPException: 401 "Not authenticated" if the token is missing or
                unknown; 401 "Session expired" if the token is known but its
                expiry time has passed.
        """
        # Look the token up BEFORE purging: cleanup() deletes expired records,
        # so checking afterwards would misreport an expired session as
        # "Not authenticated" and make the "Session expired" branch dead code.
        record = self._tokens.get(token) if token else None
        now = self._now()
        self.cleanup()

        if record is None:
            raise HTTPException(status_code=401, detail="Not authenticated")
        if record["expires"] < now:
            # cleanup() has normally removed it already; pop() keeps this safe
            # either way without risking a KeyError.
            self._tokens.pop(token, None)
            raise HTTPException(status_code=401, detail="Session expired")
        return record["user"]

    def cleanup(self) -> None:
        """Delete every session whose expiry time is strictly in the past."""
        now = self._now()
        # Collect first, delete second: a dict must not change size while it
        # is being iterated.
        expired = [
            token
            for token, record in self._tokens.items()
            if record["expires"] < now
        ]
        for token in expired:
            del self._tokens[token]