Files
drivers_bot/app/api/service_centers.py
VPN SaaS Dev ac5845d5a0
Some checks failed
ci / test (push) Has been cancelled
Gate STO workplace by role
2026-05-16 10:33:33 +09:00

930 lines
37 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import secrets
from datetime import UTC, datetime, timedelta
from fastapi import APIRouter, Depends, Header, HTTPException, Request, status
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.api.deps import (
ensure_service_employee,
get_current_telegram_user,
get_or_create_telegram_user,
log_audit,
require_internal_api_token,
)
from app.db.session import get_session
from app.models.car import (
Car,
CarServiceLink,
ServiceCenter,
ServiceCenterReview,
ServiceCenterReviewComment,
ServiceCenterVerification,
ServiceEmployee,
ServiceInboxMessage,
ServiceVisit,
)
from app.models.user import User
from app.schemas.service_center import (
CarServiceLinkCreate,
CarServiceLinkRead,
ServiceCenterAccessRequest,
ServiceCenterCreate,
ServiceCenterPublicRead,
ServiceCenterRead,
ServiceCenterReviewCommentCreate,
ServiceCenterReviewCommentRead,
ServiceCenterReviewCreate,
ServiceCenterReviewRead,
ServiceCenterVerificationCreate,
ServiceCenterVerificationRead,
ServiceEmployeeInvite,
ServiceEmployeeRead,
ServiceEmployeeUpdate,
ServiceInboxCreate,
ServiceInboxRead,
ServiceVisitCreate,
ServiceVisitRead,
VehicleSearchRequest,
VehicleSearchResult,
)
from app.services.notifications import notify_platform_moderators
from app.services.odometer import validate_odometer_change
from app.services.rate_limit import check_rate_limit
from app.services.vehicle_identity import mask_license_plate, mask_vin
# Router for every endpoint in this module; all paths are mounted under /service-centers.
router = APIRouter(prefix="/service-centers", tags=["service-centers"])
# Verification statuses that the checks in this module treat as "approved".
APPROVED_SERVICE_STATUSES = {"approved", "verified"}
# Role names accepted by validate_employee_role().
SERVICE_EMPLOYEE_ROLES = {"owner", "manager", "receptionist", "mechanic"}
# Status names accepted by validate_employee_status().
SERVICE_EMPLOYEE_STATUSES = {"active", "invited", "inactive", "revoked", "expired"}
def validate_employee_role(role: str) -> str:
    """Return the trimmed, lower-cased role name.

    Raises:
        HTTPException: 400 when the normalized value is not one of
            SERVICE_EMPLOYEE_ROLES.
    """
    normalized = role.strip().lower()
    if normalized in SERVICE_EMPLOYEE_ROLES:
        return normalized
    raise HTTPException(status_code=400, detail="Unsupported service employee role")
def validate_employee_status(status_value: str) -> str:
    """Return the trimmed, lower-cased employee status.

    Raises:
        HTTPException: 400 when the normalized value is not one of
            SERVICE_EMPLOYEE_STATUSES.
    """
    normalized = status_value.strip().lower()
    if normalized in SERVICE_EMPLOYEE_STATUSES:
        return normalized
    raise HTTPException(status_code=400, detail="Unsupported service employee status")
async def attach_employee_user_fields(session: AsyncSession, employees: list[ServiceEmployee]) -> list[ServiceEmployee]:
    """Copy Telegram profile fields from each employee's User row onto the employee.

    All users are loaded with a single query. Employees whose user row is
    not found are left untouched. The same list object is returned.
    """
    if employees:
        wanted_ids = [member.user_id for member in employees]
        rows = await session.execute(select(User).where(User.id.in_(wanted_ids)))
        users_by_id = {account.id: account for account in rows.scalars()}
        for member in employees:
            account = users_by_id.get(member.user_id)
            if account is not None:
                member.telegram_id = account.telegram_id
                member.username = account.username
                member.first_name = account.first_name
                member.last_name = account.last_name
    return employees
@router.post("", response_model=ServiceCenterRead, status_code=status.HTTP_201_CREATED)
async def create_service_center(
    payload: ServiceCenterCreate,
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> ServiceCenter:
    """Register a new service center owned by the calling user.

    Creates the center with ``verification_status="pending"``, an initial
    verification record built from the submitted photo URLs, and an active
    "owner" employee row for the caller. After the commit, platform
    moderators are notified about the new application.
    """
    # Fields copied from the payload to the model without transformation.
    passthrough_fields = (
        "legal_name",
        "city",
        "address",
        "phone",
        "telegram_chat_id",
        "business_registration_number",
        "description",
        "specializations",
        "working_hours",
        "facade_photo_url",
        "document_photo_urls",
        "additional_photo_urls",
        "contact_person",
    )
    country_code = payload.country.upper() if payload.country else None
    center = ServiceCenter(
        name=payload.display_name,
        display_name=payload.display_name,
        country=country_code,
        # The contact phone falls back to the main phone when omitted.
        contact_phone=payload.contact_phone or payload.phone,
        owner_user_id=current_user.id,
        verification_status="pending",
        **{field: getattr(payload, field) for field in passthrough_fields},
    )
    session.add(center)
    # Flush before reading center.id for the dependent rows below.
    await session.flush()

    initial_documents = [
        {"type": "registration", "urls": payload.document_photo_urls or []},
        {"type": "facade", "url": payload.facade_photo_url},
        {"type": "additional", "urls": payload.additional_photo_urls or []},
    ]
    application = ServiceCenterVerification(
        service_center_id=center.id,
        submitted_documents=initial_documents,
        comment="Initial service center application",
        status="pending",
    )
    session.add(application)

    owner_membership = ServiceEmployee(
        service_center_id=center.id,
        user_id=current_user.id,
        role="owner",
        status="active",
    )
    session.add(owner_membership)

    await log_audit(
        session,
        actor=current_user,
        action="service_center.create",
        target_type="service_center",
        target_id=center.id,
    )
    await session.commit()
    await session.refresh(center)

    moderator_message = (
        f"Новая заявка СТО #{center.id}: {center.display_name or center.name}. Откройте /admin_sto_pending для модерации."
    )
    await notify_platform_moderators(session, moderator_message)
    return center
@router.patch("/{service_center_id}", response_model=ServiceCenterRead)
async def update_service_center_application(
    service_center_id: int,
    payload: ServiceCenterCreate,
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> ServiceCenter:
    """Apply a partial update to a service center application.

    Only fields explicitly present in the request are written. The caller
    must pass ``ensure_service_employee`` with the owner/manager roles.
    When the center is in ``draft``, ``needs_changes`` or ``rejected``, it
    is moved back to ``pending`` and a new verification record is stored
    from the center's current photo URLs.

    Raises:
        HTTPException: 404 when the service center does not exist.
    """
    await ensure_service_employee(session, service_center_id, current_user, {"owner", "manager"})
    center = await session.get(ServiceCenter, service_center_id)
    if center is None:
        raise HTTPException(status_code=404, detail="Service center not found")

    data = payload.model_dump(exclude_unset=True)
    for field, value in data.items():
        if field == "display_name":
            # `name` mirrors `display_name`, as on creation.
            center.display_name = value
            center.name = value
        elif field == "country":
            # Normalize like create_service_center does: upper-case code,
            # empty value stored as None.
            center.country = value.upper() if value else None
        elif hasattr(center, field):
            setattr(center, field, value)

    if center.verification_status in {"draft", "needs_changes", "rejected"}:
        center.verification_status = "pending"
        session.add(
            ServiceCenterVerification(
                service_center_id=center.id,
                submitted_documents=[
                    {"type": "registration", "urls": center.document_photo_urls or []},
                    {"type": "facade", "url": center.facade_photo_url},
                    {"type": "additional", "urls": center.additional_photo_urls or []},
                ],
                comment="Resubmitted service center application",
                status="pending",
            )
        )

    await log_audit(
        session,
        actor=current_user,
        action="service_center.update",
        target_type="service_center",
        target_id=center.id,
    )
    await session.commit()
    await session.refresh(center)
    return center
@router.get("/my", response_model=list[ServiceCenterRead])
async def my_service_centers(
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> list[ServiceCenter]:
    """List centers where the caller is an active employee, newest first.

    Each returned center carries the caller's ``employee_role`` and
    ``employee_status`` as extra attributes.
    """
    membership_query = (
        select(ServiceCenter, ServiceEmployee.role, ServiceEmployee.status)
        .join(ServiceEmployee, ServiceEmployee.service_center_id == ServiceCenter.id)
        .where(ServiceEmployee.user_id == current_user.id, ServiceEmployee.status == "active")
        .order_by(ServiceCenter.created_at.desc())
    )
    rows = (await session.execute(membership_query)).all()

    annotated: list[ServiceCenter] = []
    for row in rows:
        center, role, employee_status = row
        center.employee_role = role
        center.employee_status = employee_status
        annotated.append(center)
    return annotated
@router.get("/public", response_model=list[ServiceCenterPublicRead])
async def public_service_centers(
    city: str | None = None,
    specialization: str | None = None,
    limit: int = 50,
    offset: int = 0,
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> list[ServiceCenter]:
    """List approved service centers, best rated first.

    Args:
        city: Optional case-insensitive substring filter on the city.
        specialization: Optional case-insensitive substring filter on the
            center's specializations.
        limit: Page size, clamped to 1..200.
        offset: Rows to skip; negative values are treated as 0.
    """
    limit = min(max(limit, 1), 200)
    stmt = select(ServiceCenter).where(ServiceCenter.verification_status.in_(APPROVED_SERVICE_STATUSES))
    if city:
        # Escape LIKE wildcards so user-supplied "%" or "_" match literally
        # instead of acting as patterns.
        escaped_city = city.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
        stmt = stmt.where(ServiceCenter.city.ilike(f"%{escaped_city}%", escape="\\"))
    result = await session.execute(
        stmt.order_by(
            ServiceCenter.rating_avg.desc().nullslast(),
            ServiceCenter.display_name.asc(),
            # Unique tie-breaker keeps limit/offset pages stable when rating
            # and name are equal.
            ServiceCenter.id.asc(),
        )
        .limit(limit)
        .offset(max(offset, 0))
    )
    centers = list(result.scalars())
    if specialization:
        # NOTE(review): this filter runs after limit/offset, so a page may
        # hold fewer than `limit` rows and matches outside the window are
        # skipped. Moving it into SQL depends on the column type of
        # `specializations`, which is not visible here.
        needle = specialization.lower()
        centers = [
            center
            for center in centers
            if any(needle in item.lower() for item in (center.specializations or []))
        ]
    return centers
@router.get("/{service_center_id}", response_model=ServiceCenterPublicRead)
async def get_public_service_center(
    service_center_id: int,
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> ServiceCenter:
    """Return one service center if the caller is allowed to see it.

    Approved centers are visible to everyone. A center in any other status
    is visible only to its active employees and its owner; for everyone
    else it is reported as not found.

    Raises:
        HTTPException: 404 when the center is missing or hidden from the caller.
    """
    center = await session.get(ServiceCenter, service_center_id)
    if center is None:
        raise HTTPException(status_code=404, detail="Service center not found")
    if center.verification_status in APPROVED_SERVICE_STATUSES:
        return center

    membership = await service_employee_or_none(session, service_center_id, current_user)
    if membership or center.owner_user_id == current_user.id:
        return center
    raise HTTPException(status_code=404, detail="Service center not found")
@router.get("", response_model=list[ServiceCenterRead])
async def list_service_centers(
    session: AsyncSession = Depends(get_session),
    x_internal_api_token: str | None = Header(default=None, alias="X-Internal-API-Token"),
) -> list[ServiceCenter]:
    """Return every service center ordered by name (internal API).

    The ``X-Internal-API-Token`` header is checked with
    ``require_internal_api_token`` before any query runs.
    """
    require_internal_api_token(x_internal_api_token)
    all_centers = select(ServiceCenter).order_by(ServiceCenter.name)
    rows = await session.execute(all_centers)
    return list(rows.scalars())
@router.post("/{service_center_id}/verification", response_model=ServiceCenterVerificationRead)
async def submit_verification(
    service_center_id: int,
    payload: ServiceCenterVerificationCreate,
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> ServiceCenterVerification:
    """Store a new verification submission and mark the center as pending.

    The caller must pass ``ensure_service_employee`` with the owner/manager
    roles.
    """
    await ensure_service_employee(session, service_center_id, current_user, {"owner", "manager"})

    submission = ServiceCenterVerification(
        service_center_id=service_center_id,
        submitted_documents=payload.submitted_documents,
        comment=payload.comment,
        status="pending",
    )
    session.add(submission)

    center = await session.get(ServiceCenter, service_center_id)
    if center:
        # NOTE(review): this resets the status unconditionally, including
        # for centers that are already approved - confirm that is intended.
        center.verification_status = "pending"

    await log_audit(
        session,
        actor=current_user,
        action="service_center.verification.submit",
        target_type="service_center",
        target_id=service_center_id,
    )
    await session.commit()
    await session.refresh(submission)
    return submission
@router.post("/{service_center_id}/employees/invite", response_model=ServiceEmployeeRead)
async def invite_employee(
    service_center_id: int,
    payload: ServiceEmployeeInvite,
    request: Request,
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> ServiceEmployee:
    """Invite a Telegram user to the service center, or re-invite an existing member.

    The call is rate limited (scope ``employee_invite``, 10 per hour),
    requires an approved center and an owner/manager caller. A fresh
    invite token and expiry are issued on every call; an existing
    non-owner employee row is reset to ``invited``.

    Raises:
        HTTPException: 400 for an unsupported role; 403 when a non-owner
            tries to grant the "owner" role; 409 when the target is already
            the owner of this center.
    """
    await check_rate_limit(scope="employee_invite", limit=10, window_seconds=3600, request=request, user=current_user, session=session)
    await ensure_service_center_approved(session, service_center_id)
    actor = await ensure_service_employee(session, service_center_id, current_user, {"owner", "manager"})
    role = validate_employee_role(payload.role)
    # Without this check a manager could hand out the "owner" role and
    # escalate privileges through an invited account.
    if role == "owner" and actor.role != "owner":
        raise HTTPException(status_code=403, detail="Only an owner can grant the owner role")

    user = await get_or_create_telegram_user(session, telegram_id=payload.telegram_id)
    result = await session.execute(
        select(ServiceEmployee).where(
            ServiceEmployee.service_center_id == service_center_id,
            ServiceEmployee.user_id == user.id,
        )
    )
    employee = result.scalar_one_or_none()
    if employee is not None and employee.role == "owner":
        raise HTTPException(status_code=409, detail="Owner role cannot be replaced by invite")

    invite_token = secrets.token_urlsafe(32)
    invite_expires_at = datetime.now(UTC) + timedelta(hours=payload.expires_in_hours)
    if employee is None:
        employee = ServiceEmployee(
            service_center_id=service_center_id,
            user_id=user.id,
            role=role,
            permissions=payload.permissions,
            status="invited",
            invite_token=invite_token,
            invite_expires_at=invite_expires_at,
        )
        session.add(employee)
    else:
        # Re-invite: overwrite the previous membership state entirely.
        employee.role = role
        employee.permissions = payload.permissions
        employee.status = "invited"
        employee.invite_token = invite_token
        employee.invite_expires_at = invite_expires_at
        employee.invite_revoked_at = None
        employee.activated_at = None

    await log_audit(
        session,
        actor=current_user,
        action="service_employee.invite",
        target_type="service_center",
        target_id=service_center_id,
        metadata={"telegram_id": payload.telegram_id},
    )
    await session.commit()
    await session.refresh(employee)
    await attach_employee_user_fields(session, [employee])
    return employee
@router.get("/{service_center_id}/employees", response_model=list[ServiceEmployeeRead])
async def list_service_employees(
    service_center_id: int,
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> list[ServiceEmployee]:
    """List all employee rows of an approved center, ordered by role then creation time.

    Requires an owner/manager caller. Telegram profile fields are attached
    to each row before returning.
    """
    await ensure_service_center_approved(session, service_center_id)
    await ensure_service_employee(session, service_center_id, current_user, {"owner", "manager"})

    staff_query = (
        select(ServiceEmployee)
        .where(ServiceEmployee.service_center_id == service_center_id)
        .order_by(ServiceEmployee.role.asc(), ServiceEmployee.created_at.asc())
    )
    staff = list((await session.execute(staff_query)).scalars())
    return await attach_employee_user_fields(session, staff)
@router.post("/employees/invites/{invite_token}/accept", response_model=ServiceEmployeeRead)
async def accept_employee_invite(
    invite_token: str,
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> ServiceEmployee:
    """Activate the employee row identified by an invite token.

    Raises:
        HTTPException: 404 for an unknown token; 403 when the invite was
            issued to a different user; 409 when the invite is not in the
            ``invited`` state, was revoked, or has expired. An expired
            invite is also persisted as ``expired``.
    """
    lookup = await session.execute(
        select(ServiceEmployee).where(ServiceEmployee.invite_token == invite_token)
    )
    invitation = lookup.scalar_one_or_none()
    if invitation is None:
        raise HTTPException(status_code=404, detail="Invite not found")
    if invitation.user_id != current_user.id:
        raise HTTPException(status_code=403, detail="Invite belongs to another Telegram account")
    if invitation.status != "invited":
        raise HTTPException(status_code=409, detail="Invite is not active")
    if invitation.invite_revoked_at is not None:
        raise HTTPException(status_code=409, detail="Invite was revoked")

    deadline = invitation.invite_expires_at or None
    if deadline is not None and deadline.tzinfo is None:
        # A naive timestamp is interpreted as UTC so it can be compared
        # with an aware "now".
        deadline = deadline.replace(tzinfo=UTC)

    if deadline and deadline <= datetime.now(UTC):
        invitation.status = "expired"
        await log_audit(
            session,
            actor=current_user,
            action="service_employee.invite_expired",
            target_type="service_employee",
            target_id=invitation.id,
        )
        # Persist the expiry before reporting the conflict.
        await session.commit()
        raise HTTPException(status_code=409, detail="Invite expired")

    invitation.status = "active"
    invitation.activated_at = datetime.now(UTC)
    # The token is single-use.
    invitation.invite_token = None
    await log_audit(
        session,
        actor=current_user,
        action="service_employee.invite_accept",
        target_type="service_employee",
        target_id=invitation.id,
    )
    await session.commit()
    await session.refresh(invitation)
    await attach_employee_user_fields(session, [invitation])
    return invitation
@router.patch("/employees/{employee_id}", response_model=ServiceEmployeeRead)
async def update_service_employee(
    employee_id: int,
    payload: ServiceEmployeeUpdate,
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> ServiceEmployee:
    """Change role, status and/or permissions of a service employee.

    Requires an approved center and an owner/manager caller. Role and
    status are normalized before any rule is evaluated, so values such as
    "Active" or " owner " are handled like their canonical form.

    Raises:
        HTTPException: 404 for an unknown employee; 400 for an unsupported
            role or status; 409 when the change would demote or deactivate
            the owner; 403 when a non-owner tries to grant the "owner" role.
    """
    employee = await session.get(ServiceEmployee, employee_id)
    if employee is None:
        raise HTTPException(status_code=404, detail="Employee not found")
    await ensure_service_center_approved(session, employee.service_center_id)
    actor = await ensure_service_employee(session, employee.service_center_id, current_user, {"owner", "manager"})

    data = payload.model_dump(exclude_unset=True)
    new_role = validate_employee_role(data["role"]) if data.get("role") is not None else None
    new_status = validate_employee_status(data["status"]) if data.get("status") is not None else None

    if employee.role == "owner":
        if new_role is not None and new_role != "owner":
            raise HTTPException(status_code=409, detail="Owner role cannot be changed")
        if new_status is not None and new_status != "active":
            raise HTTPException(status_code=409, detail="Owner cannot be deactivated")
    elif new_role == "owner" and actor.role != "owner":
        # Without this check a manager could promote any employee,
        # including themselves, to owner.
        raise HTTPException(status_code=403, detail="Only an owner can grant the owner role")

    if new_role is not None:
        employee.role = new_role
    if new_status is not None:
        employee.status = new_status
        if employee.status != "invited":
            # An invite token is meaningful only while the invite is open.
            employee.invite_token = None
        if employee.status == "revoked":
            employee.invite_revoked_at = datetime.now(UTC)
    if "permissions" in data:
        employee.permissions = data["permissions"]

    await log_audit(
        session,
        actor=current_user,
        action="service_employee.update",
        target_type="service_employee",
        target_id=employee.id,
        metadata=data,
    )
    await session.commit()
    await session.refresh(employee)
    await attach_employee_user_fields(session, [employee])
    return employee
@router.post("/employees/{employee_id}/revoke-invite", response_model=ServiceEmployeeRead)
async def revoke_employee_invite(
    employee_id: int,
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> ServiceEmployee:
    """Revoke a pending invite: mark the row ``revoked`` and drop its token.

    Requires an owner/manager caller in the employee's center.

    Raises:
        HTTPException: 404 for an unknown employee; 409 when the employee
            is not in the ``invited`` state.
    """
    invitee = await session.get(ServiceEmployee, employee_id)
    if invitee is None:
        raise HTTPException(status_code=404, detail="Employee not found")
    await ensure_service_employee(session, invitee.service_center_id, current_user, {"owner", "manager"})
    if invitee.status != "invited":
        raise HTTPException(status_code=409, detail="Only invited employees can be revoked")

    invitee.status = "revoked"
    invitee.invite_revoked_at = datetime.now(UTC)
    invitee.invite_token = None

    await log_audit(
        session,
        actor=current_user,
        action="service_employee.invite_revoke",
        target_type="service_employee",
        target_id=invitee.id,
    )
    await session.commit()
    await session.refresh(invitee)
    await attach_employee_user_fields(session, [invitee])
    return invitee
@router.get("/{service_center_id}/visits", response_model=list[ServiceVisitRead])
async def service_center_visits(
    service_center_id: int,
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> list[ServiceVisit]:
    """List visits of a service center, latest visit date (then highest id) first.

    The caller must pass ``ensure_service_employee`` for this center; no
    role restriction is passed.
    """
    await ensure_service_employee(session, service_center_id, current_user)
    visits_query = (
        select(ServiceVisit)
        .where(ServiceVisit.service_center_id == service_center_id)
        .order_by(ServiceVisit.visit_date.desc(), ServiceVisit.id.desc())
    )
    rows = await session.execute(visits_query)
    return list(rows.scalars())
async def service_employee_or_none(
    session: AsyncSession, service_center_id: int, user: User
) -> ServiceEmployee | None:
    """Return the user's active employee row in the given center, or None."""
    active_membership = select(ServiceEmployee).where(
        ServiceEmployee.service_center_id == service_center_id,
        ServiceEmployee.user_id == user.id,
        ServiceEmployee.status == "active",
    )
    rows = await session.execute(active_membership)
    return rows.scalar_one_or_none()
async def ensure_service_center_approved(session: AsyncSession, service_center_id: int) -> ServiceCenter:
    """Load a service center and require an approved verification status.

    Raises:
        HTTPException: 404 when the center does not exist; 403 when its
            status is not in APPROVED_SERVICE_STATUSES.
    """
    center = await session.get(ServiceCenter, service_center_id)
    if center is None:
        raise HTTPException(status_code=404, detail="Service center not found")
    if center.verification_status in APPROVED_SERVICE_STATUSES:
        return center
    raise HTTPException(status_code=403, detail="Service center is awaiting approval")
async def ensure_center_vehicle_access(
    session: AsyncSession, service_center_id: int, vehicle: Car, user: User
) -> None:
    """Require that the center may work with the vehicle on behalf of ``user``.

    The vehicle's owner always passes. Anyone else needs an approved and
    active link between the vehicle and the service center.

    Raises:
        HTTPException: 403 when no such link exists.
    """
    if vehicle.owner_id != user.id:
        approved_link = select(CarServiceLink).where(
            CarServiceLink.car_id == vehicle.id,
            CarServiceLink.service_center_id == service_center_id,
            CarServiceLink.status == "approved",
            CarServiceLink.is_active.is_(True),
        )
        found = (await session.execute(approved_link)).scalar_one_or_none()
        if found is None:
            raise HTTPException(status_code=403, detail="Vehicle access is not confirmed by owner")
@router.post("/{service_center_id}/visits", response_model=ServiceVisitRead, status_code=status.HTTP_201_CREATED)
async def create_visit(
    service_center_id: int,
    payload: ServiceVisitCreate,
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> ServiceVisit:
    """Create a draft service visit for a vehicle.

    Requires an owner/manager/receptionist caller, an approved center and
    confirmed access to the vehicle. Authorization is checked before the
    odometer is validated, so a caller without access cannot learn
    anything about the vehicle's odometer from validation errors.

    Raises:
        HTTPException: 404 when the vehicle or center does not exist; 403
            when the center is not approved or has no access to the
            vehicle. Whatever ``validate_odometer_change`` raises
            propagates unchanged.
    """
    employee = await ensure_service_employee(session, service_center_id, current_user, {"owner", "manager", "receptionist"})
    vehicle = await session.get(Car, payload.vehicle_id)
    if vehicle is None:
        raise HTTPException(status_code=404, detail="Vehicle not found")

    # Authorize first, validate second.
    await ensure_service_center_approved(session, service_center_id)
    await ensure_center_vehicle_access(session, service_center_id, vehicle, current_user)
    validate_odometer_change(vehicle, payload.odometer, source_record_type="service_visit")

    visit = ServiceVisit(
        service_center_id=service_center_id,
        vehicle_id=payload.vehicle_id,
        owner_id=vehicle.owner_id,
        created_by_employee_id=employee.id,
        visit_date=payload.visit_date,
        odometer=payload.odometer,
        notes=payload.notes,
        total_cost=payload.total_cost,
        currency=payload.currency,
        status="draft",
    )
    session.add(visit)
    # Flush so the audit entry can reference the new visit's id.
    await session.flush()
    await log_audit(
        session,
        actor=current_user,
        action="service_visit.create",
        target_type="service_visit",
        target_id=visit.id,
        metadata={"vehicle_id": payload.vehicle_id},
    )
    await session.commit()
    await session.refresh(visit)
    return visit
@router.post("/{service_center_id}/vehicle-access/request")
async def request_vehicle_access(
    service_center_id: int,
    payload: VehicleSearchRequest,
    request: Request,
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> VehicleSearchResult:
    """Look up a vehicle by VIN or license plate and request owner-approved access.

    The call is rate limited (scope ``vehicle_access_request``, 20 per
    hour) and requires an owner/manager/receptionist caller in an approved
    center. Every lookup is audited. When a vehicle is found, a pending
    "basic" link is upserted and a masked summary is returned; otherwise
    the result only says ``not_found``.

    Raises:
        HTTPException: 400 when neither ``vin`` nor ``license_plate`` is given.
    """
    await check_rate_limit(scope="vehicle_access_request", limit=20, window_seconds=3600, request=request, user=current_user, session=session)
    await ensure_service_employee(session, service_center_id, current_user, {"owner", "manager", "receptionist"})
    await ensure_service_center_approved(session, service_center_id)

    if not payload.vin and not payload.license_plate:
        raise HTTPException(status_code=400, detail="license_plate or vin is required")

    lookup = select(Car)
    if payload.vin:
        # A VIN takes precedence over the plate when both are supplied.
        lookup = lookup.where(Car.vin_normalized == payload.vin)
    else:
        lookup = lookup.where(Car.license_plate_normalized == payload.license_plate)
        if payload.country_code:
            lookup = lookup.where(Car.license_plate_country == payload.country_code.upper())
    vehicle = (await session.execute(lookup.limit(1))).scalar_one_or_none()
    found = vehicle is not None

    await log_audit(
        session,
        actor=current_user,
        action="vehicle_access.request",
        target_type="vehicle",
        target_id=vehicle.id if vehicle else None,
        metadata={"service_center_id": service_center_id, "found": bool(vehicle)},
    )

    link = None
    if found:
        link = await upsert_service_link(
            session,
            car_id=vehicle.id,
            service_center_id=service_center_id,
            requested_by_user_id=current_user.id,
            access_level="basic",
            external_vehicle_ref=None,
            status_value="pending",
        )
    await session.commit()

    if not found:
        return VehicleSearchResult(access_status="not_found")

    access_status = "pending_owner_confirmation" if link else "request_logged"
    return VehicleSearchResult(
        vehicle_id=vehicle.id,
        make=vehicle.make,
        model=vehicle.model,
        year=vehicle.year,
        masked_license_plate=mask_license_plate(vehicle.license_plate_display or vehicle.plate_number),
        masked_vin=mask_vin(vehicle.vin_normalized or vehicle.vin),
        access_status=access_status,
    )
@router.post("/{service_center_id}/vehicle-links/request", response_model=CarServiceLinkRead)
async def request_vehicle_link(
    service_center_id: int,
    payload: ServiceCenterAccessRequest,
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> CarServiceLink:
    """Create or refresh a pending link request between a known vehicle and the center.

    Requires an owner/manager/receptionist caller in an approved center.

    Raises:
        HTTPException: 404 when the vehicle does not exist.
    """
    await ensure_service_employee(session, service_center_id, current_user, {"owner", "manager", "receptionist"})
    await ensure_service_center_approved(session, service_center_id)

    car = await session.get(Car, payload.car_id)
    if car is None:
        raise HTTPException(status_code=404, detail="Vehicle not found")

    pending_link = await upsert_service_link(
        session,
        car_id=payload.car_id,
        service_center_id=service_center_id,
        requested_by_user_id=current_user.id,
        access_level=payload.access_level,
        external_vehicle_ref=payload.external_vehicle_ref,
        status_value="pending",
    )
    audit_details = {"car_id": payload.car_id, "service_center_id": service_center_id}
    await log_audit(
        session,
        actor=current_user,
        action="car_service_link.request",
        target_type="car_service_link",
        target_id=pending_link.id,
        metadata=audit_details,
    )
    await session.commit()
    await session.refresh(pending_link)
    return pending_link
@router.post("/{service_center_id}/vehicle-links/owner-attach", response_model=CarServiceLinkRead)
async def owner_attach_vehicle_link(
    service_center_id: int,
    payload: ServiceCenterAccessRequest,
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> CarServiceLink:
    """Let a vehicle owner attach their car to an approved center directly.

    The link is upserted as ``approved`` with the owner recorded as the
    approver, and any earlier revocation timestamp is cleared.

    Raises:
        HTTPException: 404 when the vehicle does not exist; 403 when the
            caller does not own it.
    """
    await ensure_service_center_approved(session, service_center_id)

    car = await session.get(Car, payload.car_id)
    if car is None:
        raise HTTPException(status_code=404, detail="Vehicle not found")
    if car.owner_id != current_user.id:
        raise HTTPException(status_code=403, detail="Forbidden")

    approved_link = await upsert_service_link(
        session,
        car_id=payload.car_id,
        service_center_id=service_center_id,
        requested_by_user_id=current_user.id,
        access_level=payload.access_level,
        external_vehicle_ref=payload.external_vehicle_ref,
        status_value="approved",
    )
    approved_link.approved_by_user_id = current_user.id
    approved_link.approved_at = datetime.now(UTC)
    approved_link.revoked_at = None

    audit_details = {"car_id": payload.car_id, "service_center_id": service_center_id}
    await log_audit(
        session,
        actor=current_user,
        action="car_service_link.owner_attach",
        target_type="car_service_link",
        target_id=approved_link.id,
        metadata=audit_details,
    )
    await session.commit()
    await session.refresh(approved_link)
    return approved_link
@router.post("/links/{link_id}/approve", response_model=CarServiceLinkRead)
async def approve_vehicle_link(
    link_id: int,
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> CarServiceLink:
    """Approve a vehicle link as the vehicle's owner.

    Raises:
        HTTPException: 404 when the link, its vehicle or the service center
            is missing; 403 when the caller is not the owner or the center
            is not approved.
    """
    link = await session.get(CarServiceLink, link_id)
    if link is None:
        raise HTTPException(status_code=404, detail="Vehicle link not found")

    car = await session.get(Car, link.car_id)
    if car is None:
        raise HTTPException(status_code=404, detail="Vehicle not found")
    if car.owner_id != current_user.id:
        raise HTTPException(status_code=403, detail="Forbidden")

    await ensure_service_center_approved(session, link.service_center_id)

    link.status = "approved"
    link.is_active = True
    link.approved_by_user_id = current_user.id
    link.approved_at = datetime.now(UTC)
    link.revoked_at = None

    await log_audit(
        session,
        actor=current_user,
        action="car_service_link.approve",
        target_type="car_service_link",
        target_id=link.id,
    )
    await session.commit()
    await session.refresh(link)
    return link
@router.post("/links/{link_id}/revoke", response_model=CarServiceLinkRead)
async def revoke_vehicle_link(
    link_id: int,
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> CarServiceLink:
    """Revoke a vehicle link as the vehicle's owner.

    Raises:
        HTTPException: 404 when the link or its vehicle is missing; 403
            when the caller is not the vehicle's owner.
    """
    link = await session.get(CarServiceLink, link_id)
    if link is None:
        raise HTTPException(status_code=404, detail="Vehicle link not found")

    car = await session.get(Car, link.car_id)
    if car is None:
        raise HTTPException(status_code=404, detail="Vehicle not found")
    if car.owner_id != current_user.id:
        raise HTTPException(status_code=403, detail="Forbidden")

    link.status = "revoked"
    link.is_active = False
    link.revoked_at = datetime.now(UTC)

    await log_audit(
        session,
        actor=current_user,
        action="car_service_link.revoke",
        target_type="car_service_link",
        target_id=link.id,
    )
    await session.commit()
    await session.refresh(link)
    return link
async def upsert_service_link(
    session: AsyncSession,
    *,
    car_id: int,
    service_center_id: int,
    requested_by_user_id: int | None,
    access_level: str,
    external_vehicle_ref: str | None,
    status_value: str,
) -> CarServiceLink:
    """Create or update the link between a car and a service center.

    A missing link is inserted and flushed so its id is available to the
    caller. An existing link is overwritten with the new request, with one
    exception: a "pending" request for a link that is already approved,
    active and at the same access level leaves the approval in place.
    Previously such a repeated request downgraded the link to pending and
    inactive, silently revoking access the owner had already granted.

    The caller is responsible for committing.
    """
    result = await session.execute(
        select(CarServiceLink).where(
            CarServiceLink.car_id == car_id,
            CarServiceLink.service_center_id == service_center_id,
        )
    )
    link = result.scalar_one_or_none()
    if link is None:
        link = CarServiceLink(
            car_id=car_id,
            service_center_id=service_center_id,
            requested_by_user_id=requested_by_user_id,
            access_level=access_level,
            external_vehicle_ref=external_vehicle_ref,
            status=status_value,
            is_active=status_value == "approved",
        )
        session.add(link)
        await session.flush()
        return link

    already_granted = (
        link.status == "approved"
        and bool(link.is_active)
        and link.access_level == access_level
    )
    if status_value == "pending" and already_granted:
        # Nothing new to approve: keep the grant, only refresh the
        # external reference if a new one was supplied.
        link.external_vehicle_ref = external_vehicle_ref or link.external_vehicle_ref
        return link

    link.requested_by_user_id = requested_by_user_id
    link.access_level = access_level
    link.external_vehicle_ref = external_vehicle_ref or link.external_vehicle_ref
    link.status = status_value
    link.is_active = status_value == "approved"
    return link
@router.get("/{service_center_id}/reviews", response_model=list[ServiceCenterReviewRead])
async def service_center_reviews(
    service_center_id: int,
    sort: str = "new",
    limit: int = 50,
    offset: int = 0,
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> list[ServiceCenterReview]:
    """List published reviews of a service center the caller may see.

    Args:
        sort: "high" (best rating first), "low" (worst rating first);
            any other value sorts newest first.
        limit: Page size, clamped to 1..200.
        offset: Rows to skip; negative values are treated as 0.
    """
    # Reuse the endpoint's visibility rules; raises 404 for hidden centers.
    await get_public_service_center(service_center_id, session, current_user)

    page_size = min(max(limit, 1), 200)
    newest_first = ServiceCenterReview.created_at.desc()
    if sort == "high":
        ordering = (ServiceCenterReview.rating.desc(), newest_first)
    elif sort == "low":
        ordering = (ServiceCenterReview.rating.asc(), newest_first)
    else:
        ordering = (newest_first,)

    reviews_query = (
        select(ServiceCenterReview)
        .where(
            ServiceCenterReview.service_center_id == service_center_id,
            ServiceCenterReview.status == "published",
        )
        .order_by(*ordering)
        .limit(page_size)
        .offset(max(offset, 0))
    )
    rows = await session.execute(reviews_query)
    return list(rows.scalars())
@router.post("/{service_center_id}/reviews", response_model=ServiceCenterReviewRead, status_code=status.HTTP_201_CREATED)
async def create_service_center_review(
    service_center_id: int,
    payload: ServiceCenterReviewCreate,
    request: Request,
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> ServiceCenterReview:
    """Create the caller's review of an approved center, or overwrite their existing one.

    The call is rate limited (scope ``service_review``, 10 per hour). An
    existing review gets the new rating, text and photos and is set back
    to ``published``. The center's rating aggregate is recalculated before
    the commit.
    """
    await check_rate_limit(scope="service_review", limit=10, window_seconds=3600, request=request, user=current_user, session=session)
    await ensure_service_center_approved(session, service_center_id)

    own_review_query = select(ServiceCenterReview).where(
        ServiceCenterReview.service_center_id == service_center_id,
        ServiceCenterReview.user_id == current_user.id,
    )
    review = (await session.execute(own_review_query)).scalar_one_or_none()
    if review is not None:
        review.rating = payload.rating
        review.text = payload.text
        review.photo_urls = payload.photo_urls
        review.status = "published"
    else:
        review = ServiceCenterReview(
            service_center_id=service_center_id,
            user_id=current_user.id,
            **payload.model_dump(),
        )
        session.add(review)

    await log_audit(
        session,
        actor=current_user,
        action="service_review.upsert",
        target_type="service_center",
        target_id=service_center_id,
    )
    # Flush so the aggregate query below sees this review.
    await session.flush()
    await refresh_service_rating(session, service_center_id)
    await session.commit()
    await session.refresh(review)
    return review
@router.post("/reviews/{review_id}/comments", response_model=ServiceCenterReviewCommentRead, status_code=status.HTTP_201_CREATED)
async def create_review_comment(
    review_id: int,
    payload: ServiceCenterReviewCommentCreate,
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> ServiceCenterReviewComment:
    """Add the caller's comment to a published review.

    Raises:
        HTTPException: 404 when the review is missing or not published.
    """
    review = await session.get(ServiceCenterReview, review_id)
    is_commentable = review is not None and review.status == "published"
    if not is_commentable:
        raise HTTPException(status_code=404, detail="Review not found")

    new_comment = ServiceCenterReviewComment(
        review_id=review_id,
        user_id=current_user.id,
        text=payload.text,
    )
    session.add(new_comment)
    await log_audit(
        session,
        actor=current_user,
        action="service_review.comment",
        target_type="service_review",
        target_id=review_id,
    )
    await session.commit()
    await session.refresh(new_comment)
    return new_comment
@router.post("/reviews/{review_id}/respond", response_model=ServiceCenterReviewRead)
async def respond_to_review(
    review_id: int,
    payload: ServiceCenterReviewCommentCreate,
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_telegram_user),
) -> ServiceCenterReview:
    """Store the service center's official response on a review.

    Requires an owner/manager caller in the reviewed center. A previous
    response is overwritten.

    Raises:
        HTTPException: 404 when the review does not exist.
    """
    target_review = await session.get(ServiceCenterReview, review_id)
    if target_review is None:
        raise HTTPException(status_code=404, detail="Review not found")
    await ensure_service_employee(session, target_review.service_center_id, current_user, {"owner", "manager"})

    target_review.service_response = payload.text
    target_review.service_responded_at = datetime.now(UTC)

    await log_audit(
        session,
        actor=current_user,
        action="service_review.respond",
        target_type="service_review",
        target_id=review_id,
    )
    await session.commit()
    await session.refresh(target_review)
    return target_review
async def refresh_service_rating(session: AsyncSession, service_center_id: int) -> None:
    """Recompute ``rating_avg`` and ``reviews_count`` from published reviews.

    The average is rounded to two decimals, or set to None when there are
    no published reviews. Nothing happens if the center does not exist.
    The caller is responsible for committing.
    """
    stats_query = select(
        func.avg(ServiceCenterReview.rating),
        func.count(ServiceCenterReview.id),
    ).where(
        ServiceCenterReview.service_center_id == service_center_id,
        ServiceCenterReview.status == "published",
    )
    mean_rating, published_total = (await session.execute(stats_query)).one()

    center = await session.get(ServiceCenter, service_center_id)
    if center is None:
        return
    center.rating_avg = None if mean_rating is None else round(mean_rating, 2)
    center.reviews_count = int(published_total or 0)
@router.post("/links", response_model=CarServiceLinkRead, status_code=status.HTTP_201_CREATED)
async def link_car_to_service(
    payload: CarServiceLinkCreate,
    session: AsyncSession = Depends(get_session),
    x_internal_api_token: str | None = Header(default=None, alias="X-Internal-API-Token"),
) -> CarServiceLink:
    """Create an approved car/service-center link (internal API).

    If a link for the same car and center already exists it is updated and
    re-approved instead of inserting a second row. Duplicate rows would
    break the single-row lookups (``scalar_one_or_none``) used by
    ``upsert_service_link`` and ``ensure_center_vehicle_access``.

    Raises:
        HTTPException: 404 when the car or the service center does not exist.
    """
    require_internal_api_token(x_internal_api_token)
    if await session.get(Car, payload.car_id) is None:
        raise HTTPException(status_code=404, detail="Car not found")
    if await session.get(ServiceCenter, payload.service_center_id) is None:
        raise HTTPException(status_code=404, detail="Service center not found")

    data = payload.model_dump()
    existing = await session.execute(
        select(CarServiceLink).where(
            CarServiceLink.car_id == payload.car_id,
            CarServiceLink.service_center_id == payload.service_center_id,
        )
    )
    # first() rather than scalar_one_or_none(): tolerate duplicates that
    # may already exist from before this check was added.
    link = existing.scalars().first()
    if link is None:
        link = CarServiceLink(**data, status="approved", approved_at=datetime.now(UTC))
        session.add(link)
    else:
        for field, value in data.items():
            setattr(link, field, value)
        link.status = "approved"
        link.approved_at = datetime.now(UTC)
        link.revoked_at = None
        if "is_active" not in data:
            # Re-approving a previously revoked link must reactivate it.
            link.is_active = True

    await session.commit()
    await session.refresh(link)
    return link
@router.post("/inbox", response_model=ServiceInboxRead, status_code=status.HTTP_201_CREATED)
async def receive_service_message(
    payload: ServiceInboxCreate,
    session: AsyncSession = Depends(get_session),
    x_internal_api_token: str | None = Header(default=None, alias="X-Internal-API-Token"),
) -> ServiceInboxMessage:
    """Store an incoming service message for later parsing (internal API).

    When the payload has no ``service_center_id``, the center is resolved
    from ``source_chat_id``. The message is attributed only if exactly one
    center uses that chat id; with zero or several matches it is stored
    unassigned rather than failing the request or picking one arbitrarily.
    """
    require_internal_api_token(x_internal_api_token)
    service_center_id = payload.service_center_id
    if not service_center_id and payload.source_chat_id:
        result = await session.execute(
            select(ServiceCenter.id)
            .where(ServiceCenter.telegram_chat_id == payload.source_chat_id)
            # Two rows are enough to tell "unique" from "ambiguous".
            .limit(2)
        )
        matching_ids = list(result.scalars())
        service_center_id = matching_ids[0] if len(matching_ids) == 1 else None

    message = ServiceInboxMessage(
        source_chat_id=payload.source_chat_id,
        raw_text=payload.raw_text,
        car_id=payload.car_id,
        service_center_id=service_center_id,
        parsed_status="pending",
    )
    session.add(message)
    await session.commit()
    await session.refresh(message)
    return message