""" Unit & Integration Tests for MVP Focus: Authorization, HMAC, JWT, RBAC, Financial Operations """ import pytest from fastapi.testclient import TestClient from sqlalchemy.orm import Session from datetime import datetime, timedelta from decimal import Decimal import json from app.main import app from app.security.jwt_manager import jwt_manager, TokenType from app.security.hmac_manager import hmac_manager from app.security.rbac import RBACEngine, MemberRole, Permission, UserContext from app.db.models import User, Family, FamilyMember, Wallet, Transaction # ========== JWT TESTS ========== class TestJWTManager: """JWT token generation and verification""" def test_create_access_token(self): """Test access token creation""" user_id = 123 token = jwt_manager.create_access_token(user_id=user_id) assert token assert isinstance(token, str) # Verify token payload = jwt_manager.verify_token(token) assert payload.sub == user_id assert payload.type == TokenType.ACCESS.value def test_token_expiration(self): """Test expired token rejection""" user_id = 123 # Create token with instant expiry token = jwt_manager.create_access_token( user_id=user_id, expires_delta=timedelta(seconds=-1) # Already expired ) with pytest.raises(ValueError): jwt_manager.verify_token(token) def test_create_refresh_token(self): """Test refresh token creation and type""" user_id = 123 token = jwt_manager.create_refresh_token(user_id=user_id) payload = jwt_manager.verify_token(token) assert payload.type == TokenType.REFRESH.value def test_service_token(self): """Test service-to-service token""" token = jwt_manager.create_service_token(service_name="telegram_bot") payload = jwt_manager.verify_token(token) assert payload.type == TokenType.SERVICE.value assert "telegram_bot" in payload.sub # ========== HMAC TESTS ========== class TestHMACManager: """HMAC signature verification""" def test_create_signature(self): """Test HMAC signature creation""" timestamp = int(datetime.utcnow().timestamp()) body = {"amount": 50.00, "category_id": 5} sig1 = hmac_manager.create_signature( method="POST", endpoint="/api/v1/transactions", timestamp=timestamp, body=body, ) # Same inputs should produce same signature sig2 = hmac_manager.create_signature( method="POST", endpoint="/api/v1/transactions", timestamp=timestamp, body=body, ) assert sig1 == sig2 def test_signature_mismatch(self): """Test signature verification with wrong secret""" timestamp = int(datetime.utcnow().timestamp()) body = {"amount": 50.00} sig_correct = hmac_manager.create_signature( method="POST", endpoint="/api/v1/transactions", timestamp=timestamp, body=body, ) # Wrong secret should fail is_valid, _ = hmac_manager.verify_signature( method="POST", endpoint="/api/v1/transactions", timestamp=timestamp, signature=sig_correct + "wrong", # Corrupt signature body=body, ) assert not is_valid def test_timestamp_tolerance(self): """Test timestamp freshness checking""" # Very old timestamp old_timestamp = int((datetime.utcnow() - timedelta(minutes=5)).timestamp()) is_valid, error = hmac_manager.verify_signature( method="GET", endpoint="/api/v1/wallets", timestamp=old_timestamp, signature="dummy", ) assert not is_valid assert "too old" in error.lower() # ========== RBAC TESTS ========== class TestRBACEngine: """Role-Based Access Control""" def test_owner_permissions(self): """Owner should have all permissions""" perms = RBACEngine.get_permissions(MemberRole.OWNER) assert Permission.CREATE_TRANSACTION in perms assert Permission.EDIT_ANY_TRANSACTION in perms assert Permission.DELETE_FAMILY in perms assert Permission.APPROVE_TRANSACTION in perms def test_member_permissions(self): """Member should have limited permissions""" perms = RBACEngine.get_permissions(MemberRole.MEMBER) assert Permission.CREATE_TRANSACTION in perms assert Permission.EDIT_OWN_TRANSACTION in perms assert Permission.DELETE_ANY_TRANSACTION not in perms assert Permission.DELETE_FAMILY not in perms def test_child_permissions(self): """Child should have very limited permissions""" perms = RBACEngine.get_permissions(MemberRole.CHILD) assert Permission.CREATE_TRANSACTION in perms assert Permission.VIEW_WALLET_BALANCE in perms assert Permission.EDIT_BUDGET not in perms assert Permission.DELETE_FAMILY not in perms def test_permission_check(self): """Test permission verification""" owner_context = UserContext( user_id=1, family_id=1, role=MemberRole.OWNER, permissions=RBACEngine.get_permissions(MemberRole.OWNER), family_ids=[1], ) # Owner should pass all checks assert RBACEngine.check_permission( owner_context, Permission.DELETE_FAMILY, raise_exception=False ) # Member should fail delete check member_context = UserContext( user_id=2, family_id=1, role=MemberRole.MEMBER, permissions=RBACEngine.get_permissions(MemberRole.MEMBER), family_ids=[1], ) assert not RBACEngine.check_permission( member_context, Permission.DELETE_FAMILY, raise_exception=False ) def test_family_access_control(self): """Test family isolation""" user_context = UserContext( user_id=1, family_id=1, role=MemberRole.MEMBER, permissions=RBACEngine.get_permissions(MemberRole.MEMBER), family_ids=[1, 3], # Can access families 1 and 3 ) # Can access family 1 assert RBACEngine.check_family_access(user_context, 1, raise_exception=False) # Cannot access family 2 assert not RBACEngine.check_family_access(user_context, 2, raise_exception=False) # ========== API ENDPOINT TESTS ========== class TestTransactionAPI: """Transaction creation and management API""" @pytest.fixture def client(self): """FastAPI test client""" return TestClient(app) @pytest.fixture def valid_token(self): """Valid JWT token for testing""" return jwt_manager.create_access_token(user_id=1) def test_create_transaction_unauthorized(self, client): """Request without token should fail""" response = client.post( "/api/v1/transactions", json={ "family_id": 1, "from_wallet_id": 10, "to_wallet_id": 11, "amount": 50.00, "description": "Test", } ) assert response.status_code == 401 def test_create_transaction_with_auth(self, client, valid_token): """Request with valid token should pass auth""" response = client.post( "/api/v1/transactions", json={ "family_id": 1, "from_wallet_id": 10, "to_wallet_id": 11, "amount": 50.00, "description": "Test", }, headers={"Authorization": f"Bearer {valid_token}"} ) # Should not be 401 (unauthorized) # May be 400/403 for validation, but not 401 assert response.status_code != 401 def test_create_large_transaction_requires_approval(self, client, valid_token): """Transaction > $500 should require approval""" # This test needs actual DB setup # Placeholder for integration test pass # ========== DATABASE TESTS ========== class TestDatabaseTransaction: """Database-level transaction tests""" def test_transaction_creates_event_log(self, db: Session): """Creating transaction should log event""" # Setup: Create user, family, wallets user = User(telegram_id=123, username="test", is_active=True) family = Family(owner_id=1, name="Test Family", currency="USD") wallet1 = Wallet(family_id=1, name="Cash", balance=Decimal("100")) wallet2 = Wallet(family_id=1, name="Bank", balance=Decimal("200")) db.add_all([user, family, wallet1, wallet2]) db.flush() # Create transaction tx = Transaction( family_id=1, created_by_id=user.id, from_wallet_id=wallet1.id, to_wallet_id=wallet2.id, amount=Decimal("50"), status="executed", created_at=datetime.utcnow(), ) db.add(tx) db.commit() # Verify balances updated db.refresh(wallet1) db.refresh(wallet2) assert wallet1.balance == Decimal("50") assert wallet2.balance == Decimal("250") def test_transaction_reversal(self, db: Session): """Test reversal creates compensation transaction""" # Setup similar to above # Create transaction # Create reverse transaction # Verify balances return to original pass # ========== SECURITY TESTS ========== class TestSecurityHeaders: """Test security headers in responses""" @pytest.fixture def client(self): return TestClient(app) def test_security_headers_present(self, client): """All responses should have security headers""" response = client.get("/health") assert "X-Content-Type-Options" in response.headers assert response.headers["X-Content-Type-Options"] == "nosniff" assert "X-Frame-Options" in response.headers assert response.headers["X-Frame-Options"] == "DENY" assert "Strict-Transport-Security" in response.headers if __name__ == "__main__": pytest.main([__file__, "-v"])