Files
sfs/app/api.py
2025-08-27 12:29:38 +03:00

83 lines
2.6 KiB
Python

from fastapi import FastAPI, File, UploadFile, Depends, HTTPException, Security
from fastapi.responses import FileResponse, PlainTextResponse
from fastapi.security import APIKeyHeader
from sqlalchemy import exists
import hashlib
from . import db
from dotenv import load_dotenv
import os
load_dotenv()
FILES_DIR = os.getenv("FILES_DIR")
API_KEY = os.getenv("API_KEY")
api_key_header = APIKeyHeader(name="X-API-Key")
def verify_api_key(api_key: str = Security(api_key_header)):
if api_key != API_KEY:
raise HTTPException(status_code=403, detail="Forbidden")
return api_key
def compute_hash(data: bytes, algorithm="sha256") -> str:
h = hashlib.new(algorithm)
h.update(data)
return h.hexdigest()
app = FastAPI()
@app.get("/")
async def root():
return {"message": "hiii from sfs"}
@app.post("/file")
async def save_file(file: UploadFile = File(...), api_key: str = Depends(verify_api_key)):
contents = await file.read()
hash = compute_hash(contents)
existed_url = db.file_exists(file.size, hash)
if not existed_url:
file_url = db.add_file(file.filename, file.content_type, file.size, hash)
try:
with open(f"{FILES_DIR}/{file_url}", "wb") as f:
f.write(contents)
return {"status": "saved", "filename": file_url}
except Exception as e:
db.remove_file(file_url)
return {"status": "error", "message": f"Could not save file {file_url}: {e}"}
else:
return {"status": "file_exists", "filename": existed_url}
@app.get("/file/{filename}")
async def get_file(filename: str, raw: bool = False, api_key: str = Depends(verify_api_key)):
file_path = os.path.join(FILES_DIR, filename)
if not os.path.exists(file_path):
raise HTTPException(status_code=404, detail="File not found")
if raw:
with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
return PlainTextResponse(f.read())
return FileResponse(file_path, filename=filename)
@app.delete("/file/{filename}")
async def delete_file(filename: str, api_key: str = Depends(verify_api_key)):
if db.remove_file(filename):
file_path = f"{FILES_DIR}/{filename}"
if os.path.exists(file_path):
os.remove(file_path)
return {"status": "deleted"}
return {"status": "error", "message": "no file like that"}
@app.get("/files/")
async def get_list_of_files(api_key: str = Depends(verify_api_key)):
return db.get_all_files()
@app.get("/healthchecker")
async def healthchecker():
return {"message": "Howdy, all is fine :3"}