"""API tests for admin notifications, the admin data explorer and the audit log."""

import pytest
from conftest import make_init_data

from app.core.config import settings
from app.services import admin_notifications

# Endpoints hit repeatedly by the tests below.
_USERS_URL = "/api/users"
_CARS_URL = "/api/cars"
_FUEL_URL = "/api/fuel"
_SERVICE_CENTERS_URL = "/api/service-centers"
_OCR_VIN_URL = "/api/ocr/vin"
_NOTIFICATIONS_URL = "/api/admin/notifications?limit=100"
_DATA_QUERY_URL = "/api/admin/data/query"


async def ensure_admin(client, internal_headers) -> None:
    """POST an admin-role user (telegram_id 9001) to the users endpoint.

    The response is not inspected.
    NOTE(review): 9001 is presumably the identity behind the ``admin_auth_headers``
    fixture -- confirm in conftest.
    """
    admin_user = {"telegram_id": 9001, "first_name": "Admin", "platform_role": "admin"}
    await client.post(_USERS_URL, headers=internal_headers, json=admin_user)


async def ensure_analyst(client, internal_headers) -> dict[str, str]:
    """POST an analyst-role user (telegram_id 7001) and return init-data headers for it.

    The response of the POST is not inspected.
    """
    analyst_user = {"telegram_id": 7001, "first_name": "Analyst", "platform_role": "analyst"}
    await client.post(_USERS_URL, headers=internal_headers, json=analyst_user)
    return {"X-Telegram-Init-Data": make_init_data(7001, "Analyst")}


@pytest.mark.asyncio
async def test_new_user_creates_admin_notification(client, admin_auth_headers, internal_headers) -> None:
    """Creating a user yields a ``user_registered`` notification keyed by its telegram id."""
    await ensure_admin(client, internal_headers)

    new_user = {"telegram_id": 123456, "first_name": "Ivan", "username": "ivan"}
    created = await client.post(_USERS_URL, headers=internal_headers, json=new_user)
    feed = await client.get(_NOTIFICATIONS_URL, headers=admin_auth_headers)

    assert created.status_code == 200
    assert any(
        row["event_type"] == "user_registered" and row["idempotency_key"] == "user_registered:123456"
        for row in feed.json()["rows"]
    )


@pytest.mark.asyncio
async def test_admin_notification_idempotency_for_user_registration(
    client, admin_auth_headers, internal_headers
) -> None:
    """Posting the same user twice leaves exactly one notification with that idempotency key."""
    await ensure_admin(client, internal_headers)

    payload = {"telegram_id": 223344, "first_name": "Repeat"}
    for _ in range(2):
        await client.post(_USERS_URL, headers=internal_headers, json=payload)

    feed = await client.get(_NOTIFICATIONS_URL, headers=admin_auth_headers)
    same_key = [row for row in feed.json()["rows"] if row["idempotency_key"] == "user_registered:223344"]

    assert len(same_key) == 1


@pytest.mark.asyncio
async def test_telegram_admin_delivery_failure_does_not_break_user_flow(
    client, admin_auth_headers, internal_headers, monkeypatch
) -> None:
    """A raising Telegram HTTP client still lets user creation return 200; status is ``failed``."""

    class BrokenTelegramClient:
        """Async-context-manager stand-in for ``httpx.AsyncClient`` whose ``post`` always raises."""

        def __init__(self, *args, **kwargs):  # noqa: D107
            pass

        async def __aenter__(self):
            return self

        async def __aexit__(self, *args):
            return None

        async def post(self, *args, **kwargs):  # noqa: ARG002
            raise RuntimeError("telegram down")

    monkeypatch.setattr(settings, "admin_telegram_ids", "777")
    monkeypatch.setattr(admin_notifications.httpx, "AsyncClient", BrokenTelegramClient)

    new_user = {"telegram_id": 334455, "first_name": "Best Effort"}
    response = await client.post(_USERS_URL, headers=internal_headers, json=new_user)
    # The admin is created only after the user under test, as in the original flow.
    await ensure_admin(client, internal_headers)
    feed = await client.get(_NOTIFICATIONS_URL, headers=admin_auth_headers)

    assert response.status_code == 200
    matching = [row for row in feed.json()["rows"] if row["idempotency_key"] == "user_registered:334455"]
    created = matching[0]
    assert created["telegram_status"] == "failed"


@pytest.mark.asyncio
async def test_new_sto_application_creates_admin_notification(
    client, auth_headers, admin_auth_headers, internal_headers
) -> None:
    """Creating a service center yields an ``sto_application_created`` notification for its id."""
    await ensure_admin(client, internal_headers)

    application = {
        "display_name": "Auto Master",
        "country": "KR",
        "city": "Gwangju",
        "phone": "+82-10-0000-0000",
        "document_photo_urls": ["doc-a.jpg", "doc-b.jpg"],
    }
    center = await client.post(_SERVICE_CENTERS_URL, headers=auth_headers, json=application)
    feed = await client.get(_NOTIFICATIONS_URL, headers=admin_auth_headers)

    assert center.status_code == 201
    assert any(
        row["event_type"] == "sto_application_created" and row["entity_id"] == str(center.json()["id"])
        for row in feed.json()["rows"]
    )


@pytest.mark.asyncio
async def test_first_vehicle_and_first_record_create_admin_notifications(
    client, auth_headers, admin_auth_headers, internal_headers
) -> None:
    """Creating a car and then a fuel entry yields both first-time event notifications."""
    await ensure_admin(client, internal_headers)

    car_response = await client.post(
        _CARS_URL,
        headers=auth_headers,
        json={"name": "First admin-visible car", "current_odometer": 1500},
    )
    vehicle = car_response.json()

    fuel_payload = {
        "car_id": vehicle["id"],
        "entry_date": "2026-05-19",
        "odometer": 1510,
        "liters": 30,
        "price_per_liter": 2,
    }
    fuel = await client.post(_FUEL_URL, headers=auth_headers, json=fuel_payload)

    feed = await client.get(_NOTIFICATIONS_URL, headers=admin_auth_headers)
    seen_events = {row["event_type"] for row in feed.json()["rows"]}

    assert fuel.status_code == 201
    assert "vehicle_created" in seen_events
    assert "first_record_created" in seen_events


@pytest.mark.asyncio
async def test_admin_dashboard_requires_admin_role(client, auth_headers, admin_auth_headers, internal_headers) -> None:
    """The dashboard answers 403 for ``auth_headers`` and 200 with ``users_total`` for the admin."""
    dashboard_url = "/api/admin/dashboard"

    forbidden = await client.get(dashboard_url, headers=auth_headers)
    await ensure_admin(client, internal_headers)
    allowed = await client.get(dashboard_url, headers=admin_auth_headers)

    assert forbidden.status_code == 403
    assert allowed.status_code == 200
    assert "users_total" in allowed.json()


@pytest.mark.asyncio
async def test_data_explorer_rejects_unknown_source_and_field(
    client, admin_auth_headers, internal_headers
) -> None:
    """An unknown source is answered with 400; an extra ``sql`` key in the payload with 422."""
    await ensure_admin(client, internal_headers)

    bad_source_query = {"source": "raw_sql", "limit": 25}
    unknown_source = await client.post(_DATA_QUERY_URL, headers=admin_auth_headers, json=bad_source_query)

    extra_field_query = {"source": "users", "limit": 25, "sql": "select * from users"}
    forbidden_field = await client.post(_DATA_QUERY_URL, headers=admin_auth_headers, json=extra_field_query)

    assert unknown_source.status_code == 400
    assert forbidden_field.status_code == 422


@pytest.mark.asyncio
async def test_data_explorer_masks_sensitive_data_and_applies_limit(
    client, internal_headers
) -> None:
    """An analyst query with ``limit`` 1 returns one row whose telegram id is a non-raw string."""
    analyst_headers = await ensure_analyst(client, internal_headers)

    visible_user = {"telegram_id": 889900, "first_name": "Visible", "platform_role": "user"}
    await client.post(_USERS_URL, headers=internal_headers, json=visible_user)

    response = await client.post(
        _DATA_QUERY_URL,
        headers=analyst_headers,
        json={"source": "users", "limit": 1},
    )

    assert response.status_code == 200
    rows = response.json()["rows"]
    assert len(rows) == 1
    first_row = rows[0]
    assert isinstance(first_row["telegram_id"], str)
    assert first_row["telegram_id"] != "889900"


@pytest.mark.asyncio
async def test_sensitive_data_requires_admin_and_reason(
    client, admin_auth_headers, internal_headers
) -> None:
    """``include_sensitive`` without a reason is a 400; with one the raw telegram id is returned."""
    await ensure_admin(client, internal_headers)

    without_reason = {"source": "users", "include_sensitive": True, "limit": 25}
    missing_reason = await client.post(_DATA_QUERY_URL, headers=admin_auth_headers, json=without_reason)

    justified = {
        "source": "users",
        "include_sensitive": True,
        "reason": "support request",
        "telegram_id": 9001,
        "limit": 25,
    }
    with_reason = await client.post(_DATA_QUERY_URL, headers=admin_auth_headers, json=justified)

    assert missing_reason.status_code == 400
    assert with_reason.status_code == 200
    assert with_reason.json()["rows"][0]["telegram_id"] == 9001


@pytest.mark.asyncio
async def test_data_query_creates_audit_log(client, admin_auth_headers, internal_headers) -> None:
    """Running a data query leaves an ``admin.data.query`` entry in the audit log."""
    await ensure_admin(client, internal_headers)

    await client.post(
        _DATA_QUERY_URL,
        headers=admin_auth_headers,
        json={"source": "users", "limit": 25},
    )
    audit = await client.get(
        "/api/admin/audit-log?action=admin.data.query",
        headers=admin_auth_headers,
    )

    assert audit.status_code == 200
    assert any(entry["action"] == "admin.data.query" for entry in audit.json())


@pytest.mark.asyncio
async def test_admin_data_update_requires_reason_and_audits(
    client, admin_auth_headers, internal_headers
) -> None:
    """An update with an empty reason is a 400; with a reason it applies and is audited."""
    await ensure_admin(client, internal_headers)

    creation = await client.post(
        _USERS_URL,
        headers=internal_headers,
        json={"telegram_id": 456789, "first_name": "Before"},
    )
    user = creation.json()
    record_url = f"/api/admin/data/users/{user['id']}"

    missing_reason = await client.patch(
        record_url,
        headers=admin_auth_headers,
        json={"values": {"first_name": "After"}, "reason": ""},
    )
    updated = await client.patch(
        record_url,
        headers=admin_auth_headers,
        json={"values": {"first_name": "After"}, "reason": "support correction"},
    )
    audit = await client.get(
        "/api/admin/audit-log?action=admin.data.update",
        headers=admin_auth_headers,
    )

    assert missing_reason.status_code == 400
    assert updated.status_code == 200
    assert updated.json()["row"]["first_name"] == "After"
    assert any(entry["action"] == "admin.data.update" for entry in audit.json())


@pytest.mark.asyncio
async def test_admin_data_update_rejects_forbidden_field(
    client, admin_auth_headers, internal_headers
) -> None:
    """Trying to update ``telegram_id`` is answered with 400 and a "Forbidden fields" message."""
    await ensure_admin(client, internal_headers)

    creation = await client.post(
        _USERS_URL,
        headers=internal_headers,
        json={"telegram_id": 556677, "first_name": "Nope"},
    )
    user = creation.json()

    response = await client.patch(
        f"/api/admin/data/users/{user['id']}",
        headers=admin_auth_headers,
        json={"values": {"telegram_id": 1}, "reason": "support correction"},
    )

    assert response.status_code == 400
    assert "Forbidden fields" in response.text


@pytest.mark.asyncio
async def test_admin_data_delete_soft_blocks_user(
    client, admin_auth_headers, internal_headers
) -> None:
    """Deleting a user reports mode ``soft``, leaves the row with role ``blocked``, and is audited."""
    await ensure_admin(client, internal_headers)

    creation = await client.post(
        _USERS_URL,
        headers=internal_headers,
        json={"telegram_id": 667788, "first_name": "Blocked soon"},
    )
    user = creation.json()

    # client.request is used because the DELETE call carries a JSON body.
    deleted = await client.request(
        "DELETE",
        f"/api/admin/data/users/{user['id']}",
        headers=admin_auth_headers,
        json={"reason": "support requested block"},
    )
    query = await client.post(
        _DATA_QUERY_URL,
        headers=admin_auth_headers,
        json={"source": "users", "user_id": user["id"], "limit": 25},
    )
    audit = await client.get(
        "/api/admin/audit-log?action=admin.data.delete",
        headers=admin_auth_headers,
    )

    assert deleted.status_code == 200
    assert deleted.json()["mode"] == "soft"
    assert query.json()["rows"][0]["platform_role"] == "blocked"
    assert any(entry["action"] == "admin.data.delete" for entry in audit.json())


@pytest.mark.asyncio
async def test_admin_data_delete_hard_deletes_fuel_entry(
    client, auth_headers, admin_auth_headers, internal_headers
) -> None:
    """Deleting a fuel entry reports mode ``hard`` and the entry no longer shows up in queries."""
    await ensure_admin(client, internal_headers)

    car_response = await client.post(_CARS_URL, headers=auth_headers, json={"name": "Admin delete fuel"})
    car = car_response.json()

    fuel_payload = {
        "car_id": car["id"],
        "entry_date": "2026-05-18",
        "odometer": 1200,
        "liters": 35,
        "price_per_liter": 2,
    }
    fuel_response = await client.post(_FUEL_URL, headers=auth_headers, json=fuel_payload)
    fuel = fuel_response.json()

    # client.request is used because the DELETE call carries a JSON body.
    deleted = await client.request(
        "DELETE",
        f"/api/admin/data/fuel_entries/{fuel['id']}",
        headers=admin_auth_headers,
        json={"reason": "duplicate record cleanup"},
    )
    query = await client.post(
        _DATA_QUERY_URL,
        headers=admin_auth_headers,
        json={"source": "fuel_entries", "vehicle_id": car["id"], "limit": 25},
    )

    assert deleted.status_code == 200
    assert deleted.json()["mode"] == "hard"
    assert query.json()["rows"] == []


@pytest.mark.asyncio
async def test_pending_sto_queue_and_approve_audit(
    client, auth_headers, admin_auth_headers, internal_headers
) -> None:
    """A new service center is listed as pending, can be approved, and the approval is audited."""
    await ensure_admin(client, internal_headers)

    center_response = await client.post(
        _SERVICE_CENTERS_URL,
        headers=auth_headers,
        json={"display_name": "Pending Admin Queue", "country": "KR", "city": "Seoul"},
    )
    center = center_response.json()

    pending = await client.get("/api/admin/sto-applications", headers=admin_auth_headers)
    approved = await client.post(
        f"/api/admin/sto-applications/{center['id']}/approve",
        headers=admin_auth_headers,
        json={"comment": "ok"},
    )
    audit = await client.get(
        "/api/admin/audit-log?action=service_center.verify",
        headers=admin_auth_headers,
    )

    pending_ids = [row["id"] for row in pending.json()["rows"]]
    assert center["id"] in pending_ids
    assert approved.status_code == 200
    assert approved.json()["verification_status"] == "approved"
    assert any(entry["action"] == "service_center.verify" for entry in audit.json())


@pytest.mark.asyncio
async def test_blocked_ocr_upload_creates_admin_notification(
    client, auth_headers, admin_auth_headers, internal_headers
) -> None:
    """An upload answered with 415 yields an ``upload_blocked`` admin notification."""
    await ensure_admin(client, internal_headers)

    # File named .exe with non-image bytes, declared as image/jpeg.
    upload = {"file": ("invoice.exe", b"not an image", "image/jpeg")}
    response = await client.post(_OCR_VIN_URL, headers=auth_headers, files=upload)
    feed = await client.get(_NOTIFICATIONS_URL, headers=admin_auth_headers)

    assert response.status_code == 415
    assert any(row["event_type"] == "upload_blocked" for row in feed.json()["rows"])


@pytest.mark.asyncio
async def test_ocr_preview_is_available_in_admin_data_explorer(
    client, auth_headers, admin_auth_headers, internal_headers
) -> None:
    """A VIN OCR upload is queryable as an ``ocr_results`` row with scope ``vin``, status ``preview``."""
    await ensure_admin(client, internal_headers)

    upload = {"file": ("vin.txt", b"VIN KMHCT41BAHU123456", "text/plain")}
    response = await client.post(_OCR_VIN_URL, headers=auth_headers, files=upload)
    query = await client.post(
        _DATA_QUERY_URL,
        headers=admin_auth_headers,
        json={"source": "ocr_results", "category": "vin", "limit": 25},
    )

    assert response.status_code == 200
    assert query.status_code == 200
    first_row = query.json()["rows"][0]
    assert first_row["scope"] == "vin"
    assert first_row["status"] == "preview"


@pytest.mark.asyncio
async def test_rate_limit_creates_admin_notification(
    client, auth_headers, admin_auth_headers, internal_headers
) -> None:
    """The ninth consecutive OCR upload gets 429 and a ``rate_limit_exceeded`` notification exists."""
    await ensure_admin(client, internal_headers)

    last_response = None
    for attempt in range(9):
        upload = {"file": (f"vin-{attempt}.txt", b"VIN KMHCT41BAHU123456", "text/plain")}
        last_response = await client.post(_OCR_VIN_URL, headers=auth_headers, files=upload)

    feed = await client.get(_NOTIFICATIONS_URL, headers=admin_auth_headers)

    assert last_response is not None
    assert last_response.status_code == 429
    assert any(row["event_type"] == "rate_limit_exceeded" for row in feed.json()["rows"])


@pytest.mark.asyncio
async def test_admin_can_retry_notification_queues(
    client, admin_auth_headers, internal_headers
) -> None:
    """The retry endpoint returns the delivery counters plus ``limit`` and writes an audit entry."""
    await ensure_admin(client, internal_headers)

    response = await client.post("/api/admin/notifications/retry", headers=admin_auth_headers)
    audit = await client.get(
        "/api/admin/audit-log?action=admin.notifications.retry",
        headers=admin_auth_headers,
    )

    assert response.status_code == 200
    expected_keys = {"service_delivered", "admin_delivered", "limit"}
    assert expected_keys <= response.json().keys()
    assert any(entry["action"] == "admin.notifications.retry" for entry in audit.json())