# Source listing: drivers_bot/tests/test_platform.py
# Captured: 2026-05-14 19:43:51 +09:00
#
# Viewer metadata: 168 lines, 5.6 KiB, Python

from io import BytesIO
import pytest
@pytest.mark.asyncio
async def test_vin_validation_rejects_invalid_value(client, auth_headers) -> None:
    """Creating a vehicle with the VIN ``"IOQ123"`` must be answered with HTTP 422."""
    invalid_vehicle = {"name": "Bad VIN", "vin": "IOQ123"}

    rejection = await client.post(
        "/api/my/vehicles",
        headers=auth_headers,
        json=invalid_vehicle,
    )

    assert rejection.status_code == 422
@pytest.mark.asyncio
async def test_service_visit_owner_confirmation_and_change_request(
    client, auth_headers, admin_auth_headers, internal_headers
) -> None:
    """Drive a service visit from creation to an approved vehicle change request.

    Requests are sent in this order: create a vehicle, create a service
    center, post an admin user record, verify the center through the admin
    endpoint, open a visit, add a work item, complete the visit, confirm it,
    file a VIN change request and approve that request.

    Only the verification status is asserted in the middle of the flow; the
    checks on the work item, completion, confirmation and approval run after
    every request has been issued.
    """
    # --- setup: vehicle and service center ---------------------------------
    vehicle_payload = {"name": "Platform car", "license_plate": "12 가 3456", "license_plate_country": "KR"}
    vehicle_reply = await client.post(
        "/api/my/vehicles",
        headers=auth_headers,
        json=vehicle_payload,
    )
    vehicle = vehicle_reply.json()

    center_payload = {"display_name": "Careful Service", "country": "KR", "city": "Seoul"}
    center_reply = await client.post(
        "/api/service-centers",
        headers=auth_headers,
        json=center_payload,
    )
    center = center_reply.json()

    # --- admin verification of the center ----------------------------------
    # NOTE(review): presumably this is the account behind admin_auth_headers;
    # confirm against the fixtures. The response is deliberately not inspected.
    admin_user = {"telegram_id": 9001, "platform_role": "admin"}
    await client.post(
        "/api/users",
        headers=internal_headers,
        json=admin_user,
    )

    verify_url = f"/api/admin/service-centers/{center['id']}/verify"
    verify_response = await client.post(verify_url, headers=admin_auth_headers)
    assert verify_response.status_code == 200

    # --- visit with one work item -------------------------------------------
    visit_payload = {"vehicle_id": vehicle["id"], "visit_date": "2026-05-12", "odometer": 12345}
    visit_reply = await client.post(
        f"/api/service-centers/{center['id']}/visits",
        headers=auth_headers,
        json=visit_payload,
    )
    visit = visit_reply.json()

    work_item = {"work_type": "oil_change", "title": "Engine oil", "oil_viscosity": "5W-30", "price": 100}
    item_response = await client.post(
        f"/api/service-visits/{visit['id']}/work-items",
        headers=auth_headers,
        json=work_item,
    )

    # --- completion, owner confirmation -------------------------------------
    complete_response = await client.post(
        f"/api/service-visits/{visit['id']}/complete",
        headers=auth_headers,
    )
    confirm_response = await client.post(
        f"/api/service-visits/{visit['id']}/confirm",
        headers=auth_headers,
    )

    # --- vehicle change request and its approval ----------------------------
    change_payload = {"vehicle_id": vehicle["id"], "field_name": "vin", "new_value": "KMHCT41BAHU123456"}
    change_reply = await client.post(
        f"/api/service-visits/{visit['id']}/vehicle-change-requests",
        headers=auth_headers,
        json=change_payload,
    )
    change_request = change_reply.json()

    approve_response = await client.post(
        f"/api/vehicle-change-requests/{change_request['id']}/approve",
        headers=auth_headers,
    )

    # --- deferred checks, evaluated once the whole flow has been sent --------
    assert item_response.status_code == 201
    assert complete_response.json()["status"] == "pending_owner_confirmation"
    assert confirm_response.json()["status"] == "confirmed"
    assert approve_response.json()["status"] == "approved"
@pytest.mark.asyncio
async def test_pending_service_center_cannot_create_visit(client, auth_headers) -> None:
    """Opening a visit at a freshly created service center must return HTTP 403.

    The center is created but no verify request is sent for it before the
    visit is attempted.
    """
    vehicle_reply = await client.post(
        "/api/my/vehicles",
        headers=auth_headers,
        json={"name": "Client car"},
    )
    vehicle = vehicle_reply.json()

    center_reply = await client.post(
        "/api/service-centers",
        headers=auth_headers,
        json={"display_name": "Pending Service", "country": "KR", "city": "Seoul"},
    )
    center = center_reply.json()

    visit_payload = {"vehicle_id": vehicle["id"], "visit_date": "2026-05-12"}
    denied = await client.post(
        f"/api/service-centers/{center['id']}/visits",
        headers=auth_headers,
        json=visit_payload,
    )

    assert denied.status_code == 403
@pytest.mark.asyncio
async def test_public_service_center_and_review_flow(
    client, auth_headers, admin_auth_headers, internal_headers
) -> None:
    """Check public listing, reviews and owner vehicle links for a service center.

    The flow asserts, in order, that:

    * the public listing is empty right after the center is created;
    * after the admin verify request the listing returns HTTP 200 and its
      first entry is the new center;
    * posting a review returns HTTP 201 and the center then reports
      ``reviews_count == 1``;
    * an owner-attach vehicle link returns HTTP 200 with status ``"approved"``.
    """
    # --- create the center; it must not be listed publicly yet --------------
    center_payload = {
        "display_name": "Review Service",
        "country": "KR",
        "city": "Seoul",
        "description": "Clean workshop",
        "specializations": ["oil", "diagnostics"],
    }
    center_reply = await client.post(
        "/api/service-centers",
        headers=auth_headers,
        json=center_payload,
    )
    center = center_reply.json()

    pending_list = await client.get("/api/service-centers/public", headers=auth_headers)
    assert pending_list.json() == []

    # --- admin verification makes it publicly visible ------------------------
    # NOTE(review): presumably this is the account behind admin_auth_headers;
    # confirm against the fixtures. Neither response below is inspected.
    admin_user = {"telegram_id": 9001, "platform_role": "admin"}
    await client.post(
        "/api/users",
        headers=internal_headers,
        json=admin_user,
    )
    await client.post(
        f"/api/admin/service-centers/{center['id']}/verify",
        headers=admin_auth_headers,
    )

    public_list = await client.get("/api/service-centers/public", headers=auth_headers)
    assert public_list.status_code == 200
    first_listed = public_list.json()[0]
    assert first_listed["display_name"] == "Review Service"

    # --- a review is accepted and counted -------------------------------------
    review_payload = {"rating": 5, "text": "Accurate and transparent service"}
    review = await client.post(
        f"/api/service-centers/{center['id']}/reviews",
        headers=auth_headers,
        json=review_payload,
    )
    assert review.status_code == 201

    refreshed = await client.get(f"/api/service-centers/{center['id']}", headers=auth_headers)
    assert refreshed.json()["reviews_count"] == 1

    # --- the owner attaches a vehicle to the center ---------------------------
    vehicle_reply = await client.post(
        "/api/my/vehicles",
        headers=auth_headers,
        json={"name": "Review car"},
    )
    vehicle = vehicle_reply.json()

    link_payload = {"car_id": vehicle["id"], "access_level": "basic"}
    link = await client.post(
        f"/api/service-centers/{center['id']}/vehicle-links/owner-attach",
        headers=auth_headers,
        json=link_payload,
    )
    assert link.status_code == 200
    assert link.json()["status"] == "approved"
@pytest.mark.asyncio
async def test_ocr_candidates_do_not_write_vehicle_data(client, auth_headers) -> None:
    """Upload a text file containing a VIN to the OCR endpoint and check the reply.

    The endpoint must answer HTTP 200 and the first returned candidate must
    have type ``"vin"``.
    """
    # NOTE(review): the test name says no vehicle data is written, but only
    # the OCR response is checked here; consider also asserting on the stored
    # vehicles once the appropriate read endpoint is confirmed.
    upload = ("vin.txt", BytesIO(b"VIN KMHCT41BAHU123456"), "text/plain")

    ocr_result = await client.post(
        "/api/ocr/vin",
        headers=auth_headers,
        files={"file": upload},
    )

    assert ocr_result.status_code == 200
    first_candidate = ocr_result.json()["candidates"][0]
    assert first_candidate["type"] == "vin"