55 lines
1.9 KiB
Python
55 lines
1.9 KiB
Python
from __future__ import annotations
|
|
|
|
import mimetypes
|
|
from pathlib import PurePath
|
|
|
|
from fastapi import HTTPException
|
|
|
|
# MIME types handled as images: validate_upload runs validate_image on
# content whose detected type is in this set.
SAFE_IMAGE_TYPES = {
    "image/jpeg",
    "image/png",
    "image/webp",
    "image/heic",
    "image/heif",
}

# Non-image document MIME types. Not referenced elsewhere in this file;
# presumably supplied by callers as part of ``allowed_types`` - confirm.
SAFE_TEXT_TYPES = {
    "text/plain",
    "application/pdf",
}

# Lower-cased filename suffixes that validate_upload rejects regardless of
# the declared MIME type.
BLOCKED_EXTENSIONS = {
    ".exe",
    ".bat",
    ".cmd",
    ".sh",
    ".php",
    ".js",
    ".html",
    ".svg",
}
|
|
|
|
|
|
def sanitize_filename(filename: str | None) -> str:
|
|
name = PurePath(filename or "upload.bin").name
|
|
return "".join(char if char.isalnum() or char in {".", "-", "_"} else "_" for char in name)[:160]
|
|
|
|
|
|
def validate_upload(
    *,
    content: bytes,
    filename: str | None,
    content_type: str | None,
    max_bytes: int,
    allowed_types: set[str],
) -> str:
    """Validate an uploaded file and return its sanitized filename.

    Checks run in this order: size limit, blocked extension, MIME type
    allow-list, and - for types listed in ``SAFE_IMAGE_TYPES`` - image
    validation through ``validate_image``.

    Args:
        content: Raw bytes of the upload.
        filename: Client-supplied filename; sanitized before any check.
        content_type: Client-supplied ``Content-Type`` value. Media-type
            parameters (``; charset=...``) are ignored. When missing or
            blank, the type is guessed from the sanitized filename.
        max_bytes: Largest accepted ``len(content)``.
        allowed_types: MIME types accepted by the caller; compared against
            the lower-cased detected type.

    Returns:
        The sanitized filename.

    Raises:
        HTTPException: 413 when ``content`` exceeds ``max_bytes``; 415 when
            the extension is blocked or the detected type is not allowed;
            plus whatever ``validate_image`` raises for image types.
    """
    safe_name = sanitize_filename(filename)
    # Ignore trailing dots when reading the extension: for "shell.php."
    # PurePath.suffix would not report ".php", letting it past the block list.
    suffix = PurePath(safe_name.rstrip(".")).suffix.lower()

    if len(content) > max_bytes:
        raise HTTPException(status_code=413, detail="File is too large")
    if suffix in BLOCKED_EXTENSIONS:
        raise HTTPException(status_code=415, detail="Executable or unsafe file type is not allowed")

    # A Content-Type header may carry parameters ("text/plain; charset=utf-8");
    # only the bare media type is meaningful for the allow-list comparison.
    declared_type = (content_type or "").split(";", 1)[0].strip()
    detected_type = (
        declared_type
        or mimetypes.guess_type(safe_name)[0]
        or "application/octet-stream"
    ).lower()

    if detected_type not in allowed_types:
        raise HTTPException(status_code=415, detail="Unsupported file type")
    if detected_type in SAFE_IMAGE_TYPES:
        validate_image(content)
    return safe_name
|
|
|
|
|
|
def validate_image(content: bytes) -> None:
    """Check that ``content`` is a readable image of acceptable dimensions.

    Validation is best-effort: when Pillow cannot be imported the function
    returns without inspecting ``content`` at all.

    Args:
        content: Raw bytes of the candidate image.

    Raises:
        HTTPException: 413 when ``width * height`` exceeds 24,000,000 pixels
            or Pillow rejects the image as a decompression bomb; 415 when
            Pillow cannot open or verify the data.
    """
    import io

    try:
        from PIL import Image
    except ImportError:
        # Pillow is optional; without it images pass through unchecked.
        return

    try:
        with Image.open(io.BytesIO(content)) as image:
            width, height = image.size
            if width * height > 24_000_000:
                raise HTTPException(status_code=413, detail="Image dimensions are too large")
            image.verify()
    except HTTPException:
        # Our own size rejection must not be rewritten by the generic handler.
        raise
    except Image.DecompressionBombError as exc:
        # Image.open refuses extreme pixel counts before the size check above
        # can run; report that as "too large" rather than "corrupted".
        raise HTTPException(status_code=413, detail="Image dimensions are too large") from exc
    except Exception as exc:
        raise HTTPException(status_code=415, detail="Corrupted image file") from exc
|