194 lines
7.8 KiB
Python
194 lines
7.8 KiB
Python
from __future__ import annotations
|
||
|
||
import re
|
||
from decimal import Decimal
|
||
from typing import Any
|
||
|
||
from pydantic import BaseModel, Field
|
||
|
||
from app.services.vehicle_identity import normalize_license_plate, validate_vin
|
||
|
||
# Phrases (Russian and English) that mark a fill-up as going to a full tank.
FULL_TANK_RE = re.compile(
    r"(до\s+полного|полный\s+бак|залил\s+полный|full\s+tank)",
    re.IGNORECASE,
)
# Any unsigned number with an optional "." or "," decimal part.
NUMBER_RE = re.compile(r"(\d+(?:[.,]\d+)?)")
|
||
|
||
|
||
class ParsedRecord(BaseModel):
    # Outcome of parsing one free-text vehicle record.
    # (Kept as comments rather than a docstring so the model schema is unchanged.)

    # Detected record kind; the parsers in this module emit "fuel",
    # "vehicle_purchase", "service", "insurance", "fine", "tax" or "unknown".
    event_type: str

    # Heuristic certainty of the parse, constrained to the range [0, 1].
    confidence: float = Field(ge=0, le=1)

    # Names of fields the parser could not extract from the text.
    missing_fields: list[str] = Field(default_factory=list)

    # Human-readable notices for the user.
    warnings: list[str] = Field(default_factory=list)

    # Extracted values; the set of keys depends on event_type.
    data: dict[str, Any] = Field(default_factory=dict)
|
||
|
||
|
||
def decimal_from_match(value: str | None) -> Decimal | None:
|
||
if not value:
|
||
return None
|
||
return Decimal(value.replace(",", "."))
|
||
|
||
|
||
# A lone "л" is a litre unit only when it directly follows a number
# ("40 л", "40л"). Matching the bare substring "л " would also hit every word
# that ends in "л", e.g. "оплатил штраф".
_LITER_AFTER_NUMBER_RE = re.compile(r"\d\s*л\b")
# "ТО" (scheduled maintenance) must stand alone: as a substring it occurs in
# many unrelated words ("авто", "что-то").
_MAINTENANCE_WORD_RE = re.compile(r"(?<![\w-])то\b")


def parse_record_text(text: str) -> ParsedRecord:
    """Classify a free-text record and delegate to the matching parser.

    Whitespace is collapsed first. Categories are checked in a fixed order
    (purchase, fuel, insurance, fine, tax, service) and the first one whose
    keywords occur in the lower-cased text wins.

    Args:
        text: Raw user input.

    Returns:
        The ``ParsedRecord`` produced by the selected parser. Empty input
        yields an "unknown" record with confidence 0 and "text" listed as
        missing; unclassifiable input yields an "unknown" record with
        confidence 0.2 and a warning.
    """
    source = " ".join(text.strip().split())
    if not source:
        return ParsedRecord(event_type="unknown", confidence=0, missing_fields=["text"])

    lower = source.lower()
    vin = extract_vin(source)
    plate = extract_license_plate(source)

    def mentions(*stems: str) -> bool:
        # Stems are matched as substrings so inflected forms are covered.
        return any(stem in lower for stem in stems)

    if mentions("купил", "покупка", "кредит", "loan", "lease"):
        return parse_purchase(source, vin, plate)
    # "залил" is listed explicitly: it used to be caught only by accident
    # through the old "л " substring check.
    if mentions("заправ", "залил", "литр", "full tank", "бак") or _LITER_AFTER_NUMBER_RE.search(lower):
        return parse_fuel(source, vin, plate)
    if mentions("страхов", "полис", "osago", "каско"):
        return parse_expense(source, "insurance", vin, plate)
    if mentions("штраф", "fine"):
        return parse_expense(source, "fine", vin, plate)
    if mentions("налог", "tax"):
        return parse_expense(source, "tax", vin, plate)
    if mentions("сервис", "ремонт", "масл", "diagnostics", "repair") or _MAINTENANCE_WORD_RE.search(lower):
        return parse_service(source, vin, plate)

    return ParsedRecord(
        event_type="unknown",
        confidence=0.2,
        warnings=["Не удалось надежно определить тип записи. Откройте ручной ввод."],
        data=identity_payload(vin, plate),
    )
|
||
|
||
|
||
# Keyword-introduced amount, e.g. "на 2500", "сумма 2500", "total 45.5".
_FUEL_AMOUNT_RE = re.compile(r"(?:на|сумма|total|amount)\s*(\d+(?:[.,]\d+)?)", re.I)
# A unit right after a number that proves the number is a volume or a
# distance rather than money ("на 40 литров", "на 500 км").
_FUEL_NON_MONEY_TAIL_RE = re.compile(r"\s*(?:литр\w*|liters?|л|l|км|km)\b", re.I)


def _fuel_stated_amount(source: str) -> Decimal | None:
    """Return the first keyword-introduced number that is not a volume/distance."""
    for match in _FUEL_AMOUNT_RE.finditer(source):
        if _FUEL_NON_MONEY_TAIL_RE.match(source, match.end()):
            continue
        return decimal_from_match(match.group(1))
    return None


def _fuel_odometer(source: str) -> int | None:
    """Find an odometer reading given as "пробег N", "N км" or "км N"."""
    patterns = (
        r"(?:пробег|одометр|odo)\s*(\d{2,7})",
        # Unit after the number is the usual order; the lookbehind keeps the
        # match from starting in the middle of a longer number.
        r"(?<![\d.,])(\d{2,7})\s*(?:км|km)\b",
        # Unit before the number: kept for backward compatibility.
        r"(?:km|км)\s*(\d{2,7})",
    )
    for pattern in patterns:
        reading = find_int(pattern, source)
        if reading is not None:
            return reading
    return None


def parse_fuel(source: str, vin: str | None, plate: str | None) -> ParsedRecord:
    """Parse a refuelling record.

    Extracts litres, total amount, odometer reading and the full-tank flag,
    and derives the price per litre when both litres and amount are known
    and non-zero.

    Args:
        source: Whitespace-normalised record text.
        vin: VIN already extracted from the text, if any.
        plate: Licence plate already extracted from the text, if any.

    Returns:
        A "fuel" ``ParsedRecord``; confidence is 0.9 when litres, amount and
        odometer were all found and 0.55 otherwise.
    """
    # ``литр\w*`` / ``liters?`` accept inflected units ("40 литров", "40 liters"),
    # which a bare ``литр\b`` rejects.
    liters = find_decimal(r"(\d+(?:[.,]\d+)?)\s*(?:литр\w*|liters?|л|l)\b", source)
    odometer = _fuel_odometer(source)

    amount = _fuel_stated_amount(source)
    if amount is None:
        # Fall back to the largest number, but never the volume or the
        # odometer reading (the latter is usually the largest number around).
        not_money: set[Decimal | None] = {liters}
        if odometer is not None:
            not_money.add(Decimal(odometer))
        amount = largest_money_like_number(source, exclude=not_money)

    price_per_liter = None
    if liters and amount:  # truthiness also guards against division by zero
        price_per_liter = (amount / liters).quantize(Decimal("0.01"))

    missing = []
    if liters is None:
        missing.append("fuel_liters")
    if amount is None:
        missing.append("amount")
    if odometer is None:
        missing.append("odometer_km")

    return ParsedRecord(
        event_type="fuel",
        confidence=0.9 if not missing else 0.55,
        missing_fields=missing,
        data={
            **identity_payload(vin, plate),
            "is_full_tank": bool(FULL_TANK_RE.search(source)),
            "fuel_liters": float(liters) if liters is not None else None,
            "amount": float(amount) if amount is not None else None,
            "price_per_liter": float(price_per_liter) if price_per_liter is not None else None,
            "odometer_km": odometer,
        },
    )
|
||
|
||
|
||
# A loan is mentioned unless the keyword is negated ("без кредита").
# The input has single spaces only, so the fixed-width lookbehind is enough.
_PURCHASE_LOAN_MENTION_RE = re.compile(r"(?<!без\s)(?:кредит|loan)", re.I)


def parse_purchase(source: str, vin: str | None, plate: str | None) -> ParsedRecord:
    """Parse a vehicle purchase record.

    Extracts the purchase price, optional loan principal, loan term in
    months, annual interest rate and currency.

    Args:
        source: Whitespace-normalised record text.
        vin: VIN already extracted from the text, if any.
        plate: Licence plate already extracted from the text, if any.

    Returns:
        A "vehicle_purchase" ``ParsedRecord``; confidence is 0.86 when a
        purchase price was found and 0.45 otherwise.
    """
    purchase_price = find_decimal(r"(?:за|стоимость|цена)\s*(\d+(?:[.,]\d+)?)", source)
    loan_principal = find_decimal(r"(?:кредит|loan)\s*(\d+(?:[.,]\d+)?)", source)
    term = find_int(r"(?:на|срок)\s*(\d{1,3})\s*(?:мес|месяц|months)", source)
    rate = find_decimal(r"(?:под|ставк[аи]|rate)\s*(\d+(?:[.,]\d+)?)\s*%?", source)
    currency = detect_currency(source)

    # The purchase is on credit when a loan is mentioned at all, even if no
    # principal follows the keyword ("купил в кредит за 1500000").
    on_credit = bool(loan_principal) or _PURCHASE_LOAN_MENTION_RE.search(source) is not None

    missing = []
    if purchase_price is None:
        missing.append("purchase_price")

    return ParsedRecord(
        event_type="vehicle_purchase",
        confidence=0.86 if purchase_price is not None else 0.45,
        missing_fields=missing,
        data={
            **identity_payload(vin, plate),
            "purchase_price": float(purchase_price) if purchase_price is not None else None,
            "purchase_currency": currency,
            "purchase_type": "credit" if on_credit else "cash",
            "loan_principal": float(loan_principal) if loan_principal is not None else None,
            "loan_term_months": term,
            "annual_interest_rate": float(rate) if rate is not None else None,
        },
    )
|
||
|
||
|
||
def parse_expense(source: str, category: str, vin: str | None, plate: str | None) -> ParsedRecord:
    """Parse a simple expense record (insurance, fine, tax, ...).

    Args:
        source: Whitespace-normalised record text.
        category: Expense category; used both as the event type and as the
            "category" value in the payload.
        vin: VIN already extracted from the text, if any.
        plate: Licence plate already extracted from the text, if any.

    Returns:
        A ``ParsedRecord`` whose confidence is 0.75 when an amount was found
        and 0.5 otherwise (in which case "amount" is listed as missing).
    """
    amount = find_decimal(r"(?:на|сумма|оплатил|total|amount)\s*(\d+(?:[.,]\d+)?)", source)
    # Compare against None explicitly: an explicitly stated zero amount is a
    # valid parse and must not trigger the largest-number fallback.
    if amount is None:
        amount = largest_money_like_number(source)

    return ParsedRecord(
        event_type=category,
        confidence=0.75 if amount is not None else 0.5,
        missing_fields=[] if amount is not None else ["amount"],
        data={
            **identity_payload(vin, plate),
            "category": category,
            "amount": float(amount) if amount is not None else None,
            "currency": detect_currency(source),
        },
    )
|
||
|
||
|
||
# Keyword-introduced amount, e.g. "на 8000", "стоимость 8000".
_SERVICE_AMOUNT_RE = re.compile(r"(?:на|сумма|стоимость|total|amount)\s*(\d+(?:[.,]\d+)?)", re.I)
# A distance unit right after the number means it is a mileage, not money
# ("ТО на 15000 км").
_SERVICE_DISTANCE_TAIL_RE = re.compile(r"\s*(?:км|km)\b", re.I)


def _service_stated_amount(source: str) -> Decimal | None:
    """Return the first keyword-introduced number that is not a distance."""
    for match in _SERVICE_AMOUNT_RE.finditer(source):
        if _SERVICE_DISTANCE_TAIL_RE.match(source, match.end()):
            continue
        return decimal_from_match(match.group(1))
    return None


def _service_odometer(source: str) -> int | None:
    """Find an odometer reading given as "пробег N", "N км" or "км N"."""
    patterns = (
        r"(?:пробег|одометр|odo)\s*(\d{2,7})",
        # Unit after the number is the usual order; the lookbehind keeps the
        # match from starting in the middle of a longer number.
        r"(?<![\d.,])(\d{2,7})\s*(?:км|km)\b",
        # Unit before the number: kept for backward compatibility.
        r"(?:km|км)\s*(\d{2,7})",
    )
    for pattern in patterns:
        reading = find_int(pattern, source)
        if reading is not None:
            return reading
    return None


def parse_service(source: str, vin: str | None, plate: str | None) -> ParsedRecord:
    """Parse a service / repair record.

    Args:
        source: Whitespace-normalised record text.
        vin: VIN already extracted from the text, if any.
        plate: Licence plate already extracted from the text, if any.

    Returns:
        A "service" ``ParsedRecord`` with fixed confidence 0.72. The amount
        defaults to 0 when none is found; "odometer_km" is listed as missing
        when no odometer reading is found.
    """
    amount = _service_stated_amount(source)
    odometer = _service_odometer(source)

    # Any mention of oil is treated as an oil change (maintenance);
    # everything else is a generic service entry (repair).
    is_oil_change = re.search(r"масл", source, re.I) is not None
    title = "Замена масла" if is_oil_change else "Сервисная запись"

    return ParsedRecord(
        event_type="service",
        confidence=0.72,
        missing_fields=[] if odometer is not None else ["odometer_km"],
        data={
            **identity_payload(vin, plate),
            "title": title,
            "amount": float(amount) if amount is not None else 0,
            "odometer_km": odometer,
            "service_type": "maintenance" if is_oil_change else "repair",
        },
    )
|
||
|
||
|
||
def identity_payload(vin: str | None, plate: str | None) -> dict[str, str | None]:
|
||
return {"vin": vin, "license_plate": plate}
|
||
|
||
|
||
def extract_vin(source: str) -> str | None:
|
||
for candidate in re.findall(r"[A-HJ-NPR-Z0-9][A-HJ-NPR-Z0-9\s-]{15,25}[A-HJ-NPR-Z0-9]", source.upper()):
|
||
try:
|
||
return validate_vin(candidate)
|
||
except ValueError:
|
||
continue
|
||
return None
|
||
|
||
|
||
def extract_license_plate(source: str) -> str | None:
|
||
match = re.search(r"(?:номер|госномер|plate)\s*[:#]?\s*([A-ZА-Я0-9가-힣\-\s]{4,14})", source, re.I)
|
||
return normalize_license_plate(match.group(1)) if match else None
|
||
|
||
|
||
def find_decimal(pattern: str, source: str) -> Decimal | None:
|
||
match = re.search(pattern, source, re.I)
|
||
return decimal_from_match(match.group(1)) if match else None
|
||
|
||
|
||
def find_int(pattern: str, source: str) -> int | None:
|
||
match = re.search(pattern, source, re.I)
|
||
return int(match.group(1)) if match else None
|
||
|
||
|
||
def largest_money_like_number(source: str, exclude: set[Decimal | None] | None = None) -> Decimal | None:
    """Return the largest number found in *source*.

    Args:
        source: Text to scan with ``NUMBER_RE``.
        exclude: Values that must not be returned; ``None`` entries in the
            set are ignored.

    Returns:
        The largest remaining number, or ``None`` when there is none.
    """
    skip: set[Decimal] = set()
    if exclude:
        skip = {item for item in exclude if item is not None}

    best: Decimal | None = None
    for match in NUMBER_RE.finditer(source):
        number = decimal_from_match(match.group(1))
        if number is None or number in skip:
            continue
        # Strict comparison keeps the first of several equal maxima,
        # exactly as max() would.
        if best is None or number > best:
            best = number
    return best
|
||
|
||
|
||
# "вон" (won) must not be preceded by another Cyrillic letter, otherwise
# words such as "позвонил" or "звонок" would be reported as Korean won.
# A preceding digit is still accepted ("1000вон").
_KRW_WORD_RE = re.compile(r"(?<![а-яё])вон")


def detect_currency(source: str) -> str:
    """Guess the ISO currency code mentioned in *source*.

    Checks are made on the lower-cased text in the order KRW, USD, EUR.

    Args:
        source: Record text.

    Returns:
        "KRW", "USD" or "EUR" when a matching word, code or symbol is
        present; "RUB" otherwise.
    """
    lower = source.lower()
    if "krw" in lower or "₩" in lower or _KRW_WORD_RE.search(lower):
        return "KRW"
    if "usd" in lower or "$" in lower:
        return "USD"
    if "eur" in lower or "€" in lower:
        return "EUR"
    return "RUB"
|