Files
drivers_bot/tests/test_admin_control_center.py
2026-05-18 18:17:53 +09:00

301 lines
10 KiB
Python

import pytest
from conftest import make_init_data
from app.core.config import settings
from app.services import admin_notifications
async def ensure_admin(client, internal_headers) -> None:
await client.post(
"/api/users",
headers=internal_headers,
json={"telegram_id": 9001, "first_name": "Admin", "platform_role": "admin"},
)
async def ensure_analyst(client, internal_headers) -> dict[str, str]:
await client.post(
"/api/users",
headers=internal_headers,
json={"telegram_id": 7001, "first_name": "Analyst", "platform_role": "analyst"},
)
return {"X-Telegram-Init-Data": make_init_data(7001, "Analyst")}
@pytest.mark.asyncio
async def test_new_user_creates_admin_notification(client, admin_auth_headers, internal_headers) -> None:
await ensure_admin(client, internal_headers)
response = await client.post(
"/api/users",
headers=internal_headers,
json={"telegram_id": 123456, "first_name": "Ivan", "username": "ivan"},
)
notifications = await client.get("/api/admin/notifications?limit=100", headers=admin_auth_headers)
assert response.status_code == 200
assert any(
item["event_type"] == "user_registered" and item["idempotency_key"] == "user_registered:123456"
for item in notifications.json()["rows"]
)
@pytest.mark.asyncio
async def test_admin_notification_idempotency_for_user_registration(
client, admin_auth_headers, internal_headers
) -> None:
await ensure_admin(client, internal_headers)
payload = {"telegram_id": 223344, "first_name": "Repeat"}
await client.post("/api/users", headers=internal_headers, json=payload)
await client.post("/api/users", headers=internal_headers, json=payload)
notifications = await client.get("/api/admin/notifications?limit=100", headers=admin_auth_headers)
matches = [
item
for item in notifications.json()["rows"]
if item["idempotency_key"] == "user_registered:223344"
]
assert len(matches) == 1
@pytest.mark.asyncio
async def test_telegram_admin_delivery_failure_does_not_break_user_flow(
client, admin_auth_headers, internal_headers, monkeypatch
) -> None:
class BrokenTelegramClient:
def __init__(self, *args, **kwargs): # noqa: D107
pass
async def __aenter__(self):
return self
async def __aexit__(self, *args):
return None
async def post(self, *args, **kwargs): # noqa: ARG002
raise RuntimeError("telegram down")
monkeypatch.setattr(settings, "admin_telegram_ids", "777")
monkeypatch.setattr(admin_notifications.httpx, "AsyncClient", BrokenTelegramClient)
response = await client.post(
"/api/users",
headers=internal_headers,
json={"telegram_id": 334455, "first_name": "Best Effort"},
)
await ensure_admin(client, internal_headers)
notifications = await client.get("/api/admin/notifications?limit=100", headers=admin_auth_headers)
assert response.status_code == 200
created = [
item
for item in notifications.json()["rows"]
if item["idempotency_key"] == "user_registered:334455"
][0]
assert created["telegram_status"] == "failed"
@pytest.mark.asyncio
async def test_new_sto_application_creates_admin_notification(
client, auth_headers, admin_auth_headers, internal_headers
) -> None:
await ensure_admin(client, internal_headers)
center = await client.post(
"/api/service-centers",
headers=auth_headers,
json={
"display_name": "Auto Master",
"country": "KR",
"city": "Gwangju",
"phone": "+82-10-0000-0000",
"document_photo_urls": ["doc-a.jpg", "doc-b.jpg"],
},
)
notifications = await client.get("/api/admin/notifications?limit=100", headers=admin_auth_headers)
assert center.status_code == 201
assert any(
item["event_type"] == "sto_application_created"
and item["entity_id"] == str(center.json()["id"])
for item in notifications.json()["rows"]
)
@pytest.mark.asyncio
async def test_admin_dashboard_requires_admin_role(client, auth_headers, admin_auth_headers, internal_headers) -> None:
forbidden = await client.get("/api/admin/dashboard", headers=auth_headers)
await ensure_admin(client, internal_headers)
allowed = await client.get("/api/admin/dashboard", headers=admin_auth_headers)
assert forbidden.status_code == 403
assert allowed.status_code == 200
assert "users_total" in allowed.json()
@pytest.mark.asyncio
async def test_data_explorer_rejects_unknown_source_and_field(
client, admin_auth_headers, internal_headers
) -> None:
await ensure_admin(client, internal_headers)
unknown_source = await client.post(
"/api/admin/data/query",
headers=admin_auth_headers,
json={"source": "raw_sql", "limit": 25},
)
forbidden_field = await client.post(
"/api/admin/data/query",
headers=admin_auth_headers,
json={"source": "users", "limit": 25, "sql": "select * from users"},
)
assert unknown_source.status_code == 400
assert forbidden_field.status_code == 422
@pytest.mark.asyncio
async def test_data_explorer_masks_sensitive_data_and_applies_limit(
client, internal_headers
) -> None:
analyst_headers = await ensure_analyst(client, internal_headers)
await client.post(
"/api/users",
headers=internal_headers,
json={"telegram_id": 889900, "first_name": "Visible", "platform_role": "user"},
)
response = await client.post(
"/api/admin/data/query",
headers=analyst_headers,
json={"source": "users", "limit": 1},
)
assert response.status_code == 200
rows = response.json()["rows"]
assert len(rows) == 1
assert isinstance(rows[0]["telegram_id"], str)
assert rows[0]["telegram_id"] != "889900"
@pytest.mark.asyncio
async def test_sensitive_data_requires_admin_and_reason(
client, admin_auth_headers, internal_headers
) -> None:
await ensure_admin(client, internal_headers)
missing_reason = await client.post(
"/api/admin/data/query",
headers=admin_auth_headers,
json={"source": "users", "include_sensitive": True, "limit": 25},
)
with_reason = await client.post(
"/api/admin/data/query",
headers=admin_auth_headers,
json={
"source": "users",
"include_sensitive": True,
"reason": "support request",
"telegram_id": 9001,
"limit": 25,
},
)
assert missing_reason.status_code == 400
assert with_reason.status_code == 200
assert with_reason.json()["rows"][0]["telegram_id"] == 9001
@pytest.mark.asyncio
async def test_data_query_creates_audit_log(client, admin_auth_headers, internal_headers) -> None:
await ensure_admin(client, internal_headers)
await client.post(
"/api/admin/data/query",
headers=admin_auth_headers,
json={"source": "users", "limit": 25},
)
audit = await client.get("/api/admin/audit-log?action=admin.data.query", headers=admin_auth_headers)
assert audit.status_code == 200
assert any(item["action"] == "admin.data.query" for item in audit.json())
@pytest.mark.asyncio
async def test_pending_sto_queue_and_approve_audit(
client, auth_headers, admin_auth_headers, internal_headers
) -> None:
await ensure_admin(client, internal_headers)
center = (
await client.post(
"/api/service-centers",
headers=auth_headers,
json={"display_name": "Pending Admin Queue", "country": "KR", "city": "Seoul"},
)
).json()
pending = await client.get("/api/admin/sto-applications", headers=admin_auth_headers)
approved = await client.post(
f"/api/admin/sto-applications/{center['id']}/approve",
headers=admin_auth_headers,
json={"comment": "ok"},
)
audit = await client.get("/api/admin/audit-log?action=service_center.verify", headers=admin_auth_headers)
assert center["id"] in [item["id"] for item in pending.json()["rows"]]
assert approved.status_code == 200
assert approved.json()["verification_status"] == "approved"
assert any(item["action"] == "service_center.verify" for item in audit.json())
@pytest.mark.asyncio
async def test_blocked_ocr_upload_creates_admin_notification(
client, auth_headers, admin_auth_headers, internal_headers
) -> None:
await ensure_admin(client, internal_headers)
response = await client.post(
"/api/ocr/vin",
headers=auth_headers,
files={"file": ("invoice.exe", b"not an image", "image/jpeg")},
)
notifications = await client.get("/api/admin/notifications?limit=100", headers=admin_auth_headers)
assert response.status_code == 415
assert any(item["event_type"] == "upload_blocked" for item in notifications.json()["rows"])
@pytest.mark.asyncio
async def test_rate_limit_creates_admin_notification(
client, auth_headers, admin_auth_headers, internal_headers
) -> None:
await ensure_admin(client, internal_headers)
last_response = None
for index in range(9):
last_response = await client.post(
"/api/ocr/vin",
headers=auth_headers,
files={"file": (f"vin-{index}.txt", b"VIN KMHCT41BAHU123456", "text/plain")},
)
notifications = await client.get("/api/admin/notifications?limit=100", headers=admin_auth_headers)
assert last_response is not None
assert last_response.status_code == 429
assert any(item["event_type"] == "rate_limit_exceeded" for item in notifications.json()["rows"])
@pytest.mark.asyncio
async def test_admin_can_retry_notification_queues(
client, admin_auth_headers, internal_headers
) -> None:
await ensure_admin(client, internal_headers)
response = await client.post("/api/admin/notifications/retry", headers=admin_auth_headers)
audit = await client.get("/api/admin/audit-log?action=admin.notifications.retry", headers=admin_auth_headers)
assert response.status_code == 200
assert {"service_delivered", "admin_delivered", "limit"} <= response.json().keys()
assert any(item["action"] == "admin.notifications.retry" for item in audit.json())