Files
drivers_bot/app/api/ocr.py
VPN SaaS Dev 83ad880b9d
Some checks failed
ci / test (push) Has been cancelled
Mechanic's work place
2026-05-16 10:04:56 +09:00

265 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import re
from datetime import date
from decimal import Decimal
from fastapi import APIRouter, Depends, File, Request, UploadFile
from pydantic import BaseModel
from sqlalchemy.ext.asyncio import AsyncSession
from app.api.deps import get_current_telegram_user
from app.db.session import get_session
from app.models.user import User
from app.services.ocr_provider import get_ocr_provider
from app.services.rate_limit import check_rate_limit
from app.services.uploads import SAFE_IMAGE_TYPES, SAFE_TEXT_TYPES, validate_upload
router = APIRouter(prefix="/ocr", tags=["ocr"])
MAX_OCR_FILE_BYTES = 8 * 1024 * 1024
class ReceiptSuggestion(BaseModel):
entry_date: date | None = None
total_cost: Decimal | None = None
liters: Decimal | None = None
price_per_liter: Decimal | None = None
station: str | None = None
category: str | None = None
confidence: float
message: str
class OCRCandidateRead(BaseModel):
type: str
value: str
confidence: float
class OCRResultRead(BaseModel):
recognized_text: str
candidates: list[OCRCandidateRead]
provider: str = "heuristic"
@router.post("/parse-text-receipt", response_model=ReceiptSuggestion)
async def parse_text_receipt(
request: Request,
file: UploadFile = File(...),
current_user: User = Depends(get_current_telegram_user),
session: AsyncSession = Depends(get_session),
) -> ReceiptSuggestion:
await check_rate_limit(scope="ocr", limit=10, window_seconds=60, request=request, user=current_user, session=session)
content = await file.read()
validate_upload(
content=content,
filename=file.filename,
content_type=file.content_type,
max_bytes=MAX_OCR_FILE_BYTES,
allowed_types=SAFE_IMAGE_TYPES | SAFE_TEXT_TYPES,
)
content_type = (file.content_type or "").lower()
if content_type.startswith("image/") or content_type == "application/pdf":
result = await get_ocr_provider().recognize(content, file.filename)
if not result.recognized_text:
return ReceiptSuggestion(
confidence=0,
message="Не удалось уверенно распознать чек. Открылся ручной ввод: проверьте дату, сумму, литры и цену.",
)
return parse_receipt_text(result.recognized_text)
text = " ".join(
[
file.filename or "",
content.decode("utf-8", errors="ignore"),
]
)
return parse_receipt_text(text)
def parse_receipt_text(text: str) -> ReceiptSuggestion:
normalized = text.replace("\xa0", " ").replace(",", ".")
compact = re.sub(r"\s+", " ", normalized).strip()
numbers = [Decimal(item) for item in re.findall(r"\d+(?:\.\d+)?", compact)]
station = detect_station(compact)
entry_date = detect_date(compact)
liters = find_liters(compact, numbers)
price = find_price_per_liter(compact, numbers)
total = find_total(compact, numbers, liters, price)
if total and liters and not price and liters > 0:
price = (total / liters).quantize(Decimal("0.01"))
if liters and price and not total:
total = (liters * price).quantize(Decimal("0.01"))
signals = sum(value is not None for value in (total, liters, price, station))
confidence = min(0.88, 0.18 + signals * 0.17 + min(len(numbers), 12) * 0.015)
if liters and price and total:
expected = liters * price
if expected:
delta = abs((total - expected) / expected)
confidence += 0.1 if delta <= Decimal("0.08") else -0.08
confidence = max(0, min(float(confidence), 0.95))
return ReceiptSuggestion(
entry_date=entry_date,
total_cost=total,
liters=liters,
price_per_liter=price,
station=station,
category="fuel" if liters or price else None,
confidence=round(confidence, 2) if numbers else 0,
message=(
"Разобрал текст чека и заполнил форму. Проверь значения перед сохранением."
if numbers
else "Не удалось разобрать текст чека. Загрузите текстовый чек или заполните поля вручную."
),
)
@router.post("/fuel-receipt", response_model=ReceiptSuggestion, deprecated=True)
async def scan_fuel_receipt(
request: Request,
file: UploadFile = File(...),
current_user: User = Depends(get_current_telegram_user),
session: AsyncSession = Depends(get_session),
) -> ReceiptSuggestion:
return await parse_text_receipt(request, file, current_user, session)
@router.post("/license-plate", response_model=OCRResultRead)
async def recognize_license_plate(
request: Request,
file: UploadFile = File(...),
current_user: User = Depends(get_current_telegram_user),
session: AsyncSession = Depends(get_session),
) -> OCRResultRead:
await check_rate_limit(scope="ocr_license_plate", limit=8, window_seconds=60, request=request, user=current_user, session=session)
content = await file.read()
validate_upload(content=content, filename=file.filename, content_type=file.content_type, max_bytes=MAX_OCR_FILE_BYTES, allowed_types=SAFE_IMAGE_TYPES | SAFE_TEXT_TYPES)
result = await get_ocr_provider().recognize(content, file.filename)
return OCRResultRead(
recognized_text=result.recognized_text,
candidates=[OCRCandidateRead(**item.__dict__) for item in result.candidates if item.type == "license_plate"],
provider=result.provider,
)
@router.post("/vin", response_model=OCRResultRead)
async def recognize_vin(
request: Request,
file: UploadFile = File(...),
current_user: User = Depends(get_current_telegram_user),
session: AsyncSession = Depends(get_session),
) -> OCRResultRead:
await check_rate_limit(scope="ocr_vin", limit=8, window_seconds=60, request=request, user=current_user, session=session)
content = await file.read()
validate_upload(content=content, filename=file.filename, content_type=file.content_type, max_bytes=MAX_OCR_FILE_BYTES, allowed_types=SAFE_IMAGE_TYPES | SAFE_TEXT_TYPES)
result = await get_ocr_provider().recognize(content, file.filename)
return OCRResultRead(
recognized_text=result.recognized_text,
candidates=[OCRCandidateRead(**item.__dict__) for item in result.candidates if item.type == "vin"],
provider=result.provider,
)
@router.post("/service-document", response_model=OCRResultRead)
async def recognize_service_document(
request: Request,
file: UploadFile = File(...),
current_user: User = Depends(get_current_telegram_user),
session: AsyncSession = Depends(get_session),
) -> OCRResultRead:
await check_rate_limit(scope="ocr_service_document", limit=8, window_seconds=60, request=request, user=current_user, session=session)
content = await file.read()
validate_upload(content=content, filename=file.filename, content_type=file.content_type, max_bytes=MAX_OCR_FILE_BYTES, allowed_types=SAFE_IMAGE_TYPES | SAFE_TEXT_TYPES)
result = await get_ocr_provider().recognize(content, file.filename)
return OCRResultRead(
recognized_text=result.recognized_text,
candidates=[OCRCandidateRead(**item.__dict__) for item in result.candidates],
provider=result.provider,
)
def detect_station(text: str) -> str | None:
stations = {
"shell": "Shell",
"lukoil": "Lukoil",
"лукойл": "Lukoil",
"gazprom": "Gazprom",
"газпром": "Gazprom",
"rosneft": "Rosneft",
"роснефть": "Rosneft",
"neste": "Neste",
}
lower = text.lower()
for needle, name in stations.items():
if needle in lower:
return name
return None
def detect_date(text: str) -> date | None:
for pattern in (
r"\b(\d{4})[-/.](\d{1,2})[-/.](\d{1,2})\b",
r"\b(\d{1,2})[-/.](\d{1,2})[-/.](\d{4})\b",
):
match = re.search(pattern, text)
if not match:
continue
first, second, third = [int(item) for item in match.groups()]
try:
if first > 1900:
return date(first, second, third)
return date(third, second, first)
except ValueError:
continue
return None
def decimal_from_match(match: re.Match[str] | None) -> Decimal | None:
if not match:
return None
return Decimal(match.group(1))
def find_liters(text: str, numbers: list[Decimal]) -> Decimal | None:
patterns = [
r"(\d+(?:\.\d+)?)\s*(?:l|литр|литра|литров|л)\b",
r"(?:volume|qty|кол-?во|количество|объем)\D{0,12}(\d+(?:\.\d+)?)",
]
for pattern in patterns:
value = decimal_from_match(re.search(pattern, text, re.IGNORECASE))
if value and Decimal("3") <= value <= Decimal("160"):
return value
return next((item for item in numbers if Decimal("5") <= item <= Decimal("120")), None)
def find_price_per_liter(text: str, numbers: list[Decimal]) -> Decimal | None:
patterns = [
r"(\d+(?:\.\d+)?)\s*(?:/|за)\s*(?:l|литр|л)\b",
r"(?:price|цена|ppu|руб/л|₽/л)\D{0,12}(\d+(?:\.\d+)?)",
]
for pattern in patterns:
value = decimal_from_match(re.search(pattern, text, re.IGNORECASE))
if value and Decimal("0.1") <= value <= Decimal("500"):
return value
candidates = [item for item in numbers if Decimal("0.1") <= item <= Decimal("500")]
return candidates[-1] if candidates else None
def find_total(
text: str,
numbers: list[Decimal],
liters: Decimal | None,
price: Decimal | None,
) -> Decimal | None:
patterns = [
r"(?:total|sum|amount|итого|сумма|к\s*оплате)\D{0,16}(\d+(?:\.\d+)?)",
r"(\d+(?:\.\d+)?)\s*(?:rub|₽|руб|krw|₩)",
]
for pattern in patterns:
value = decimal_from_match(re.search(pattern, text, re.IGNORECASE))
if value and value > Decimal("50"):
return value
ignored = {value for value in (liters, price) if value is not None}
candidates = [item for item in numbers if item > Decimal("50") and item not in ignored]
return max(candidates) if candidates else None