"""Validation helpers for uploaded files: filename sanitising and type/size checks."""

from __future__ import annotations

import io
import mimetypes
from pathlib import PurePath

from fastapi import HTTPException

SAFE_IMAGE_TYPES = {"image/jpeg", "image/png", "image/webp", "image/heic", "image/heif"}
SAFE_TEXT_TYPES = {"text/plain", "application/pdf"}
BLOCKED_EXTENSIONS = {".exe", ".bat", ".cmd", ".sh", ".php", ".js", ".html", ".svg"}

# Name used when the client sends no filename or one that sanitises to nothing.
_DEFAULT_FILENAME = "upload.bin"
# Upper bound on the length of a sanitised filename.
_MAX_FILENAME_LENGTH = 160
# Upper bound on width * height accepted by validate_image.
_MAX_IMAGE_PIXELS = 24_000_000
# Non-alphanumeric characters that are kept as-is in a sanitised filename.
_SAFE_PUNCTUATION = frozenset("._-")


def sanitize_filename(filename: str | None) -> str:
    """Return a filesystem-safe version of a client-supplied filename.

    Only the final path component is kept. Every character that is not
    alphanumeric or one of ``.``, ``-``, ``_`` is replaced by ``_``, the
    result is cut to 160 characters and trailing dots are removed.

    Args:
        filename: Name sent by the client; may be ``None`` or empty.

    Returns:
        A non-empty name. ``"upload.bin"`` is returned when no name was
        given or when nothing usable is left after sanitising (for example
        ``"/"``, ``"."`` or ``".."``).
    """
    # Treat backslashes as separators so a Windows-style client path is
    # reduced to its last component regardless of the server's platform.
    raw = (filename or _DEFAULT_FILENAME).replace("\\", "/")
    name = PurePath(raw).name

    cleaned = "".join(
        char if char.isalnum() or char in _SAFE_PUNCTUATION else "_"
        for char in name
    )[:_MAX_FILENAME_LENGTH]

    # A trailing dot hides the real extension from PurePath.suffix
    # ("shell.php." has an empty suffix), and a name made only of dots is a
    # directory reference rather than a file name. Strip after truncating so
    # the cut itself cannot leave a trailing dot behind.
    cleaned = cleaned.rstrip(".")

    return cleaned or _DEFAULT_FILENAME


def validate_upload(
    *,
    content: bytes,
    filename: str | None,
    content_type: str | None,
    max_bytes: int,
    allowed_types: set[str],
) -> str:
    """Validate an uploaded file and return its sanitised filename.

    Args:
        content: Raw bytes of the upload.
        filename: Client-supplied filename, if any.
        content_type: Client-supplied Content-Type value, if any. Parameters
            such as ``; charset=utf-8`` are ignored.
        max_bytes: Largest accepted ``len(content)``.
        allowed_types: Media types accepted by the caller; compared against
            the lower-cased detected type.

    Returns:
        The sanitised filename.

    Raises:
        HTTPException: 413 when the content exceeds ``max_bytes`` or an image
            has too many pixels; 415 when the extension is blocked, the media
            type is not allowed, or an image cannot be verified.
    """
    safe_name = sanitize_filename(filename)
    suffix = PurePath(safe_name).suffix.lower()

    if len(content) > max_bytes:
        raise HTTPException(status_code=413, detail="File is too large")
    if suffix in BLOCKED_EXTENSIONS:
        raise HTTPException(status_code=415, detail="Executable or unsafe file type is not allowed")

    # Keep only the media type itself: a header such as
    # "text/plain; charset=utf-8" must match "text/plain" in allowed_types.
    declared_type = (content_type or "").split(";", 1)[0].strip().lower()
    if declared_type:
        detected_type = declared_type
    else:
        # No usable declared type: fall back to a guess from the extension.
        guessed_type = mimetypes.guess_type(safe_name)[0]
        detected_type = (guessed_type or "application/octet-stream").lower()

    if detected_type not in allowed_types:
        raise HTTPException(status_code=415, detail="Unsupported file type")

    # NOTE(review): the declared type comes from the client, so image checks
    # only run when the client labels the upload as an image.
    if detected_type in SAFE_IMAGE_TYPES:
        validate_image(content)

    return safe_name


def validate_image(content: bytes) -> None:
    """Check that ``content`` is a readable image of acceptable dimensions.

    The check is skipped entirely when Pillow is not installed.

    Args:
        content: Raw bytes of the image.

    Raises:
        HTTPException: 413 when width * height exceeds 24,000,000 pixels;
            415 when Pillow cannot open or verify the data.
    """
    try:
        from PIL import Image
    except ImportError:
        # Pillow is treated as optional: without it the image is accepted
        # unchecked rather than rejected.
        return

    # NOTE(review): Pillow may need an extra plugin to open HEIC/HEIF data;
    # confirm one is registered, otherwise such uploads end up reported as
    # corrupted below.
    try:
        with Image.open(io.BytesIO(content)) as image:
            width, height = image.size
            if width * height > _MAX_IMAGE_PIXELS:
                raise HTTPException(status_code=413, detail="Image dimensions are too large")
            image.verify()
    except HTTPException:
        # Let the dimension error through instead of relabelling it as
        # corruption in the generic handler below.
        raise
    except Exception as exc:
        # Any failure while opening or verifying counts as a bad upload.
        raise HTTPException(status_code=415, detail="Corrupted image file") from exc