""" Transaction Service - Core business logic Handles transaction creation, approval, reversal with audit trail """ from datetime import datetime from typing import Optional, Dict, Any from decimal import Decimal import logging from sqlalchemy.orm import Session from app.db.models import Transaction, Account, Family, User from app.security.rbac import RBACEngine, Permission, UserContext logger = logging.getLogger(__name__) class TransactionService: """ Manages financial transactions with approval workflow and audit. Features: - Draft → Pending Approval → Executed workflow - Reversal (compensation transactions) - Event logging for all changes - Family-level isolation """ # Configuration APPROVAL_THRESHOLD = Decimal("500") # Amount requiring approval def __init__(self, db: Session): self.db = db async def create_transaction( self, user_context: UserContext, family_id: int, from_wallet_id: Optional[int], to_wallet_id: Optional[int], amount: Decimal, category_id: Optional[int], description: str, requires_approval: bool = False, ) -> Dict[str, Any]: """ Create new transaction with approval workflow. Returns: { "id": 123, "status": "draft" | "executed", "confirmation_required": false, "created_at": "2023-12-10T12:30:00Z" } Raises: - PermissionError: User lacks permission - ValueError: Invalid wallets/amounts - Exception: Database error """ # Permission check RBACEngine.check_permission(user_context, Permission.CREATE_TRANSACTION) RBACEngine.check_family_access(user_context, family_id) # Validate wallets belong to family wallets = self._validate_wallets(family_id, from_wallet_id, to_wallet_id) from_wallet, to_wallet = wallets # Determine if approval required needs_approval = requires_approval or (amount > self.APPROVAL_THRESHOLD and user_context.role.value != "owner") # Create transaction tx_status = "pending_approval" if needs_approval else "executed" transaction = Transaction( family_id=family_id, created_by_id=user_context.user_id, from_wallet_id=from_wallet_id, to_wallet_id=to_wallet_id, amount=amount, category_id=category_id, description=description, status=tx_status, confirmation_required=needs_approval, created_at=datetime.utcnow(), ) self.db.add(transaction) self.db.flush() # Get ID without commit # Update wallet balances if executed immediately if not needs_approval: self._execute_transaction(transaction, from_wallet, to_wallet) transaction.executed_at = datetime.utcnow() # Log event await self._log_event( family_id=family_id, entity_type="transaction", entity_id=transaction.id, action="create", actor_id=user_context.user_id, new_values={ "id": transaction.id, "amount": str(amount), "status": tx_status, }, ip_address=getattr(user_context, "ip_address", None), ) self.db.commit() return { "id": transaction.id, "status": tx_status, "amount": str(amount), "confirmation_required": needs_approval, "created_at": transaction.created_at.isoformat() + "Z", } async def confirm_transaction( self, user_context: UserContext, transaction_id: int, confirmation_token: Optional[str] = None, ) -> Dict[str, Any]: """ Approve pending transaction for execution. Only owner or approver can confirm. """ # Load transaction tx = self.db.query(Transaction).filter( Transaction.id == transaction_id, Transaction.family_id == user_context.family_id, ).first() if not tx: raise ValueError(f"Transaction not found: {transaction_id}") if tx.status != "pending_approval": raise ValueError(f"Transaction status is {tx.status}, cannot approve") # Permission check RBACEngine.check_permission(user_context, Permission.APPROVE_TRANSACTION) # Execute transaction from_wallet = self.db.query(Wallet).get(tx.from_wallet_id) if tx.from_wallet_id else None to_wallet = self.db.query(Wallet).get(tx.to_wallet_id) if tx.to_wallet_id else None self._execute_transaction(tx, from_wallet, to_wallet) # Update transaction status tx.status = "executed" tx.executed_at = datetime.utcnow() tx.approved_by_id = user_context.user_id tx.approved_at = datetime.utcnow() tx.confirmation_token = None # Log event await self._log_event( family_id=user_context.family_id, entity_type="transaction", entity_id=tx.id, action="execute", actor_id=user_context.user_id, new_values={ "status": "executed", "approved_by": user_context.user_id, }, ) self.db.commit() return { "id": tx.id, "status": "executed", "executed_at": tx.executed_at.isoformat() + "Z", } async def reverse_transaction( self, user_context: UserContext, transaction_id: int, reason: str = None, ) -> Dict[str, Any]: """ Reverse (cancel) executed transaction by creating compensation transaction. Original transaction status changes to "reversed". New negative transaction created to compensate. """ # Load transaction tx = self.db.query(Transaction).filter( Transaction.id == transaction_id, Transaction.family_id == user_context.family_id, ).first() if not tx: raise ValueError(f"Transaction not found: {transaction_id}") # Only creator or owner can reverse is_owner = user_context.role.value == "owner" is_creator = tx.created_by_id == user_context.user_id if not (is_owner or is_creator): raise PermissionError("Only creator or owner can reverse transaction") # Create compensation transaction reverse_tx = Transaction( family_id=user_context.family_id, created_by_id=user_context.user_id, from_wallet_id=tx.to_wallet_id, to_wallet_id=tx.from_wallet_id, amount=tx.amount, category_id=tx.category_id, description=f"Reversal of transaction #{tx.id}: {reason or 'No reason provided'}", status="executed", original_transaction_id=tx.id, executed_at=datetime.utcnow(), created_at=datetime.utcnow(), ) # Execute reverse transaction from_wallet = self.db.query(Wallet).get(reverse_tx.from_wallet_id) to_wallet = self.db.query(Wallet).get(reverse_tx.to_wallet_id) self._execute_transaction(reverse_tx, from_wallet, to_wallet) # Mark original as reversed tx.status = "reversed" tx.reversed_at = datetime.utcnow() tx.reversal_reason = reason self.db.add(reverse_tx) # Log events await self._log_event( family_id=user_context.family_id, entity_type="transaction", entity_id=tx.id, action="reverse", actor_id=user_context.user_id, reason=reason, new_values={"status": "reversed", "reversed_at": datetime.utcnow().isoformat()}, ) await self._log_event( family_id=user_context.family_id, entity_type="transaction", entity_id=reverse_tx.id, action="create", actor_id=user_context.user_id, new_values={ "original_transaction_id": tx.id, "status": "executed", }, ) self.db.commit() return { "original_transaction_id": tx.id, "reversal_transaction_id": reverse_tx.id, "reversed_at": tx.reversed_at.isoformat() + "Z", } def _validate_wallets( self, family_id: int, from_wallet_id: Optional[int], to_wallet_id: Optional[int], ) -> tuple: """Validate wallets exist and belong to family""" from_wallet = None to_wallet = None if from_wallet_id: from_wallet = self.db.query(Wallet).filter( Wallet.id == from_wallet_id, Wallet.family_id == family_id, ).first() if not from_wallet: raise ValueError(f"Wallet not found: {from_wallet_id}") if to_wallet_id: to_wallet = self.db.query(Wallet).filter( Wallet.id == to_wallet_id, Wallet.family_id == family_id, ).first() if not to_wallet: raise ValueError(f"Wallet not found: {to_wallet_id}") return from_wallet, to_wallet def _execute_transaction( self, transaction: Transaction, from_wallet: Optional[Wallet], to_wallet: Optional[Wallet], ): """Execute transaction (update wallet balances)""" if from_wallet: from_wallet.balance -= transaction.amount if to_wallet: to_wallet.balance += transaction.amount async def _log_event( self, family_id: int, entity_type: str, entity_id: int, action: str, actor_id: int, new_values: Dict[str, Any] = None, old_values: Dict[str, Any] = None, ip_address: str = None, reason: str = None, ): """Log event to audit trail""" event = EventLog( family_id=family_id, entity_type=entity_type, entity_id=entity_id, action=action, actor_id=actor_id, old_values=old_values, new_values=new_values, ip_address=ip_address, reason=reason, created_at=datetime.utcnow(), ) self.db.add(event)