"""Service for importing transactions from CSV files."""

from pathlib import Path
from typing import List, Dict, Any, NamedTuple

from sqlalchemy.orm import Session
from sqlalchemy.exc import IntegrityError

from app.parsers import FidelityParser
from app.models import Transaction
from app.utils import generate_transaction_hash


class ImportResult(NamedTuple):
    """
    Result of an import operation.

    Attributes:
        imported: Number of successfully imported transactions
        skipped: Number of skipped duplicate transactions
        errors: List of error messages
        total_rows: Total number of rows processed
    """

    imported: int
    skipped: int
    errors: List[str]
    total_rows: int


class ImportService:
    """
    Service for importing transactions from brokerage CSV files.

    Handles parsing, deduplication, and database insertion.
    """

    def __init__(self, db: Session):
        """
        Initialize import service.

        Args:
            db: Database session
        """
        self.db = db
        self.parser = FidelityParser()  # Can be extended to support multiple parsers

    def import_from_file(self, file_path: Path, account_id: int) -> ImportResult:
        """
        Import transactions from a CSV file.

        Each parsed transaction is hashed, looked up by that hash, and inserted
        and committed on its own, so a failing row is rolled back without
        discarding rows that were already imported.

        Args:
            file_path: Path to CSV file
            account_id: ID of the account to import transactions for

        Returns:
            ImportResult with statistics. ``errors`` starts with the parser's
            own errors and gains one message per row that failed to import.

        Raises:
            FileNotFoundError: If file doesn't exist
            ValueError: If file format is invalid

            Both are presumably raised by the parser; this method does not
            catch exceptions from ``self.parser.parse``.
        """
        # Parsing happens outside the per-row try block, so parser exceptions
        # propagate to the caller.
        parse_result = self.parser.parse(file_path)

        imported = 0
        skipped = 0
        errors = list(parse_result.errors)  # copy: do not mutate the parser's list

        for txn_data in parse_result.transactions:
            try:
                # Generate deduplication hash
                unique_hash = generate_transaction_hash(
                    account_id=account_id,
                    run_date=txn_data["run_date"],
                    symbol=txn_data.get("symbol"),
                    action=txn_data["action"],
                    amount=txn_data.get("amount"),
                    quantity=txn_data.get("quantity"),
                    price=txn_data.get("price"),
                )

                # Check if transaction already exists
                existing = (
                    self.db.query(Transaction)
                    .filter(Transaction.unique_hash == unique_hash)
                    .first()
                )
                if existing:
                    skipped += 1
                    continue

                # Create new transaction
                transaction = Transaction(
                    account_id=account_id,
                    unique_hash=unique_hash,
                    **txn_data,
                )
                self.db.add(transaction)
                # Commit per row so one bad row cannot undo earlier inserts.
                self.db.commit()
                imported += 1

            except IntegrityError:
                # Duplicate hash (edge case if concurrent imports); presumably
                # relies on a unique constraint on unique_hash.
                # NOTE(review): every IntegrityError is counted as a skipped
                # duplicate, including other constraint violations - confirm
                # this is intended.
                self.db.rollback()
                skipped += 1
            except Exception as e:
                # Best-effort import: record the failure and keep going.
                self.db.rollback()
                errors.append(f"Failed to import transaction: {e}")

        return ImportResult(
            imported=imported,
            skipped=skipped,
            errors=errors,
            total_rows=parse_result.row_count,
        )

    def import_from_directory(
        self, directory: Path, account_id: int, pattern: str = "*.csv"
    ) -> Dict[str, ImportResult]:
        """
        Import transactions from all CSV files in a directory.

        Files are processed in sorted path order. Because import_from_file
        skips transactions whose hash is already stored, the processing order
        decides which file reports a shared transaction as imported and which
        as skipped; sorting keeps those per-file counts stable across runs.

        Args:
            directory: Path to directory containing CSV files
            account_id: ID of the account to import transactions for
            pattern: Glob pattern for matching files (default: *.csv)

        Returns:
            Dictionary mapping filename to ImportResult. Keys are bare
            filenames, so if the pattern matches two files with the same name
            in different subdirectories, the later one replaces the earlier
            entry. A file whose import raises is reported as an ImportResult
            with zero counts and the exception text as its only error.

        Raises:
            ValueError: If directory does not exist or is not a directory
        """
        # is_dir() is already False for a missing path, so it covers both cases.
        if not directory.is_dir():
            raise ValueError(f"Invalid directory: {directory}")

        results: Dict[str, ImportResult] = {}

        # glob() yields entries in filesystem-dependent order; sort for
        # reproducible processing and result ordering.
        for file_path in sorted(directory.glob(pattern)):
            try:
                result = self.import_from_file(file_path, account_id)
            except Exception as e:
                # One unreadable file must not abort the whole directory run.
                result = ImportResult(
                    imported=0,
                    skipped=0,
                    errors=[str(e)],
                    total_rows=0,
                )
            results[file_path.name] = result

        return results