36 lines
1.1 KiB
Python
36 lines
1.1 KiB
Python
from fastapi import HTTPException
|
|
from datetime import datetime, timedelta
|
|
import secrets
|
|
from typing import Dict, Optional
|
|
|
|
class SessionManager:
|
|
def __init__(self, ttl: int):
|
|
self.ttl = ttl
|
|
self._tokens: Dict[str, Dict[str, datetime]] = {}
|
|
|
|
def create(self, user_id: str) -> str:
|
|
token = secrets.token_urlsafe(32)
|
|
self._tokens[token] = {
|
|
"user": user_id,
|
|
"expires": datetime.utcnow() + timedelta(seconds=self.ttl),
|
|
}
|
|
return token
|
|
|
|
def validate(self, token: Optional[str]) -> str:
|
|
self.cleanup()
|
|
if not token or token not in self._tokens:
|
|
raise HTTPException(status_code=401, detail="Not authenticated")
|
|
|
|
token_data = self._tokens[token]
|
|
if token_data["expires"] < datetime.utcnow():
|
|
del self._tokens[token]
|
|
raise HTTPException(status_code=401, detail="Session expired")
|
|
|
|
return token_data["user"]
|
|
|
|
def cleanup(self) -> None:
|
|
now = datetime.utcnow()
|
|
expired = [t for t, data in self._tokens.items() if data["expires"] < now]
|
|
for t in expired:
|
|
del self._tokens[t]
|