"""Service for tracking and calculating trading positions.""" from sqlalchemy.orm import Session from sqlalchemy import and_ from typing import List, Optional, Dict from decimal import Decimal from collections import defaultdict from datetime import datetime import re from app.models import Transaction, Position, PositionTransaction from app.models.position import PositionType, PositionStatus from app.utils import parse_option_symbol class PositionTracker: """ Service for tracking trading positions from transactions. Matches opening and closing transactions using FIFO (First-In-First-Out) method. Handles stocks, calls, and puts including complex scenarios like assignments and expirations. """ def __init__(self, db: Session): """ Initialize position tracker. Args: db: Database session """ self.db = db def rebuild_positions(self, account_id: int) -> int: """ Rebuild all positions for an account from transactions. Deletes existing positions and recalculates from scratch. Args: account_id: Account ID to rebuild positions for Returns: Number of positions created """ # Delete existing positions self.db.query(Position).filter(Position.account_id == account_id).delete() self.db.commit() # Get all transactions ordered by date transactions = ( self.db.query(Transaction) .filter(Transaction.account_id == account_id) .order_by(Transaction.run_date, Transaction.id) .all() ) # Group transactions by symbol and option details # For options, we need to group by the full option contract (symbol + strike + expiration) # For stocks, we group by symbol only symbol_txns = defaultdict(list) for txn in transactions: if txn.symbol: # Create a unique grouping key grouping_key = self._get_grouping_key(txn) symbol_txns[grouping_key].append(txn) # Process each symbol/contract group position_count = 0 for grouping_key, txns in symbol_txns.items(): positions = self._process_symbol_transactions(account_id, grouping_key, txns) position_count += len(positions) self.db.commit() return position_count def _process_symbol_transactions( self, account_id: int, symbol: str, transactions: List[Transaction] ) -> List[Position]: """ Process all transactions for a single symbol to create positions. Args: account_id: Account ID symbol: Trading symbol transactions: List of transactions for this symbol Returns: List of created Position objects """ positions = [] # Determine position type from first transaction position_type = self._determine_position_type_from_txn(transactions[0]) if transactions else PositionType.STOCK # Track open positions using FIFO open_positions: List[Dict] = [] for txn in transactions: action = txn.action.upper() # Determine if this is an opening or closing transaction if self._is_opening_transaction(action): # Create new open position open_pos = { "transactions": [txn], "quantity": abs(txn.quantity) if txn.quantity else Decimal("0"), "entry_price": txn.price, "open_date": txn.run_date, "is_short": "SELL" in action or "SOLD" in action, } open_positions.append(open_pos) elif self._is_closing_transaction(action): # Close positions using FIFO close_quantity = abs(txn.quantity) if txn.quantity else Decimal("0") remaining_to_close = close_quantity while remaining_to_close > 0 and open_positions: open_pos = open_positions[0] open_qty = open_pos["quantity"] if open_qty <= remaining_to_close: # Close entire position open_pos["transactions"].append(txn) position = self._create_position( account_id, symbol, position_type, open_pos, close_date=txn.run_date, exit_price=txn.price, close_quantity=open_qty, ) positions.append(position) open_positions.pop(0) remaining_to_close -= open_qty else: # Partially close position # Split into closed portion closed_portion = { "transactions": open_pos["transactions"] + [txn], "quantity": remaining_to_close, "entry_price": open_pos["entry_price"], "open_date": open_pos["open_date"], "is_short": open_pos["is_short"], } position = self._create_position( account_id, symbol, position_type, closed_portion, close_date=txn.run_date, exit_price=txn.price, close_quantity=remaining_to_close, ) positions.append(position) # Update open position with remaining quantity open_pos["quantity"] -= remaining_to_close remaining_to_close = Decimal("0") elif self._is_expiration(action): # Handle option expirations expire_quantity = abs(txn.quantity) if txn.quantity else Decimal("0") remaining_to_expire = expire_quantity while remaining_to_expire > 0 and open_positions: open_pos = open_positions[0] open_qty = open_pos["quantity"] if open_qty <= remaining_to_expire: # Expire entire position open_pos["transactions"].append(txn) position = self._create_position( account_id, symbol, position_type, open_pos, close_date=txn.run_date, exit_price=Decimal("0"), # Expired worthless close_quantity=open_qty, ) positions.append(position) open_positions.pop(0) remaining_to_expire -= open_qty else: # Partially expire closed_portion = { "transactions": open_pos["transactions"] + [txn], "quantity": remaining_to_expire, "entry_price": open_pos["entry_price"], "open_date": open_pos["open_date"], "is_short": open_pos["is_short"], } position = self._create_position( account_id, symbol, position_type, closed_portion, close_date=txn.run_date, exit_price=Decimal("0"), close_quantity=remaining_to_expire, ) positions.append(position) open_pos["quantity"] -= remaining_to_expire remaining_to_expire = Decimal("0") # Create positions for any remaining open positions for open_pos in open_positions: position = self._create_position( account_id, symbol, position_type, open_pos ) positions.append(position) return positions def _create_position( self, account_id: int, symbol: str, position_type: PositionType, position_data: Dict, close_date: Optional[datetime] = None, exit_price: Optional[Decimal] = None, close_quantity: Optional[Decimal] = None, ) -> Position: """ Create a Position database object. Args: account_id: Account ID symbol: Trading symbol position_type: Type of position position_data: Dictionary with position information close_date: Close date (if closed) exit_price: Exit price (if closed) close_quantity: Quantity closed (if closed) Returns: Created Position object """ is_closed = close_date is not None quantity = close_quantity if close_quantity else position_data["quantity"] # Calculate P&L if closed realized_pnl = None if is_closed and position_data["entry_price"] and exit_price is not None: if position_data["is_short"]: # Short position: profit when price decreases realized_pnl = ( position_data["entry_price"] - exit_price ) * quantity * 100 else: # Long position: profit when price increases realized_pnl = ( exit_price - position_data["entry_price"] ) * quantity * 100 # Subtract fees and commissions for txn in position_data["transactions"]: if txn.commission: realized_pnl -= txn.commission if txn.fees: realized_pnl -= txn.fees # Extract option symbol from first transaction if this is an option option_symbol = None if position_type != PositionType.STOCK and position_data["transactions"]: first_txn = position_data["transactions"][0] # Try to extract option details from description option_symbol = self._extract_option_symbol_from_description( first_txn.description, first_txn.action, symbol ) # Create position position = Position( account_id=account_id, symbol=symbol, option_symbol=option_symbol, position_type=position_type, status=PositionStatus.CLOSED if is_closed else PositionStatus.OPEN, open_date=position_data["open_date"], close_date=close_date, total_quantity=quantity if not position_data["is_short"] else -quantity, avg_entry_price=position_data["entry_price"], avg_exit_price=exit_price, realized_pnl=realized_pnl, ) self.db.add(position) self.db.flush() # Get position ID # Link transactions to position for txn in position_data["transactions"]: link = PositionTransaction( position_id=position.id, transaction_id=txn.id ) self.db.add(link) return position def _extract_option_symbol_from_description( self, description: str, action: str, base_symbol: str ) -> Optional[str]: """ Extract option symbol from transaction description. Example: "CALL (TGT) TARGET CORP JAN 16 26 $95 (100 SHS)" Returns: "-TGT260116C95" Args: description: Transaction description action: Transaction action base_symbol: Underlying symbol Returns: Option symbol in standard format, or None if can't parse """ if not description: return None # Determine if CALL or PUT call_or_put = None if "CALL" in description.upper(): call_or_put = "C" elif "PUT" in description.upper(): call_or_put = "P" else: return None # Extract date and strike: "JAN 16 26 $95" # Pattern: MONTH DAY YY $STRIKE date_strike_pattern = r'([A-Z]{3})\s+(\d{1,2})\s+(\d{2})\s+\$([\d.]+)' match = re.search(date_strike_pattern, description) if not match: return None month_abbr, day, year, strike = match.groups() # Convert month abbreviation to number month_map = { 'JAN': '01', 'FEB': '02', 'MAR': '03', 'APR': '04', 'MAY': '05', 'JUN': '06', 'JUL': '07', 'AUG': '08', 'SEP': '09', 'OCT': '10', 'NOV': '11', 'DEC': '12' } month = month_map.get(month_abbr.upper()) if not month: return None # Format: -SYMBOL + YYMMDD + C/P + STRIKE # Remove decimal point from strike if it's a whole number strike_num = float(strike) strike_str = str(int(strike_num)) if strike_num.is_integer() else strike.replace('.', '') option_symbol = f"-{base_symbol}{year}{month}{day.zfill(2)}{call_or_put}{strike_str}" return option_symbol def _determine_position_type_from_txn(self, txn: Transaction) -> PositionType: """ Determine position type from transaction action/description. Args: txn: Transaction to analyze Returns: PositionType (STOCK, CALL, or PUT) """ # Check action and description for option indicators action_upper = txn.action.upper() if txn.action else "" desc_upper = txn.description.upper() if txn.description else "" # Look for CALL or PUT keywords if "CALL" in action_upper or "CALL" in desc_upper: return PositionType.CALL elif "PUT" in action_upper or "PUT" in desc_upper: return PositionType.PUT # Fall back to checking symbol format (for backwards compatibility) if txn.symbol and txn.symbol.startswith("-"): option_info = parse_option_symbol(txn.symbol) if option_info: return ( PositionType.CALL if option_info.option_type == "CALL" else PositionType.PUT ) return PositionType.STOCK def _get_base_symbol(self, symbol: str) -> str: """Extract base symbol from option symbol.""" if symbol.startswith("-"): option_info = parse_option_symbol(symbol) if option_info: return option_info.underlying_symbol return symbol def _is_opening_transaction(self, action: str) -> bool: """Check if action represents opening a position.""" opening_keywords = [ "OPENING TRANSACTION", "YOU BOUGHT OPENING", "YOU SOLD OPENING", ] return any(keyword in action for keyword in opening_keywords) def _is_closing_transaction(self, action: str) -> bool: """Check if action represents closing a position.""" closing_keywords = [ "CLOSING TRANSACTION", "YOU BOUGHT CLOSING", "YOU SOLD CLOSING", "ASSIGNED", ] return any(keyword in action for keyword in closing_keywords) def _is_expiration(self, action: str) -> bool: """Check if action represents an expiration.""" return "EXPIRED" in action def _get_grouping_key(self, txn: Transaction) -> str: """ Create a unique grouping key for transactions. For options, returns: symbol + option details (e.g., "TGT-JAN16-100C") For stocks, returns: just the symbol (e.g., "TGT") Args: txn: Transaction to create key for Returns: Grouping key string """ # Determine if this is an option transaction action_upper = txn.action.upper() if txn.action else "" desc_upper = txn.description.upper() if txn.description else "" is_option = "CALL" in action_upper or "CALL" in desc_upper or "PUT" in action_upper or "PUT" in desc_upper if not is_option or not txn.description: # Stock transaction - group by symbol only return txn.symbol # Option transaction - extract strike and expiration to create unique key # Pattern: "CALL (TGT) TARGET CORP JAN 16 26 $100 (100 SHS)" date_strike_pattern = r'([A-Z]{3})\s+(\d{1,2})\s+(\d{2})\s+\$([\d.]+)' match = re.search(date_strike_pattern, txn.description) if not match: # Can't parse option details, fall back to symbol only return txn.symbol month_abbr, day, year, strike = match.groups() # Determine call or put call_or_put = "C" if "CALL" in desc_upper else "P" # Create key: SYMBOL-MONTHDAY-STRIKEC/P # e.g., "TGT-JAN16-100C" strike_num = float(strike) strike_str = str(int(strike_num)) if strike_num.is_integer() else strike grouping_key = f"{txn.symbol}-{month_abbr}{day}-{strike_str}{call_or_put}" return grouping_key