Full-stack iOS options trading assistant: - Python FastAPI backend with SQLite, APScheduler (15-min position monitor), APNs push notifications, and yfinance market data integration - Signal engine: IV Rank (rolling HV proxy), SMA-50/200, swing-based support/resistance, earnings detection, signal strength scoring and noise-resistant SHA hash for change detection - Recommendation engine: covered call and cash-secured put strike/expiry selection across 0DTE, 1DTE, weekly, and monthly horizons - REST API: /devices, /portfolio, /recommendations, /positions, /signals, /alerts - iOS SwiftUI app (iOS 17+): dashboard, recommendations, trades, portfolio, and alerts tabs with push notification deep-linking - Unit + integration tests for signal engine and API layer Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
275 lines
9.5 KiB
Python
275 lines
9.5 KiB
Python
"""
|
|
signal_engine.py — Compute all signals for a ticker.
|
|
|
|
Signals:
|
|
- IV Rank (using rolling 30-day HV as proxy, since yfinance lacks historical IV)
|
|
- SMA-50, SMA-200
|
|
- Swing-based support / resistance (20-day window)
|
|
- Trend direction
|
|
- Signal strength score
|
|
- Signal hash (for change detection)
|
|
"""
|
|
|
|
import hashlib
|
|
import json
|
|
import logging
|
|
import math
|
|
from datetime import date
|
|
from typing import Optional
|
|
|
|
import numpy as np
|
|
import pandas as pd
|
|
|
|
from app.services.market_data import get_price_history, get_earnings_date, get_current_price
|
|
from app.models.schemas import SignalSnapshot
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# ─── IV Rank ──────────────────────────────────────────────────────────────────
|
|
|
|
def compute_iv_rank(df: pd.DataFrame) -> float:
    """
    Estimate IV Rank on a 0-100 scale from realized volatility.

    Historical volatility (30-day rolling standard deviation of daily log
    returns, annualized with sqrt(252) and expressed as a percentage) stands
    in for implied volatility. The result is where the most recent HV value
    sits between the lowest and highest HV of the last 252 observations.

    Returns 50.0 (neutral) when `df` is None, has fewer than 32 rows, yields
    fewer than two HV observations, or when the HV range is flat.
    The result is clamped to [0, 100].
    """
    neutral = 50.0

    if df is None or len(df) < 32:
        return neutral

    prices = df["Close"].squeeze()
    daily_log_returns = np.log(prices / prices.shift(1)).dropna()

    # Annualized 30-day realized volatility, in percent.
    realized_vol = (daily_log_returns.rolling(30).std() * math.sqrt(252) * 100).dropna()
    if len(realized_vol) < 2:
        return neutral

    # Roughly one trading year of HV observations.
    past_year = realized_vol.iloc[-252:]
    floor, ceiling = float(past_year.min()), float(past_year.max())
    if ceiling == floor:
        return neutral

    latest = float(realized_vol.iloc[-1])
    position = (latest - floor) / (ceiling - floor) * 100
    return max(0.0, min(100.0, position))
|
|
|
|
|
|
# ─── Moving Averages ──────────────────────────────────────────────────────────
|
|
|
|
def compute_smas(df: pd.DataFrame) -> dict:
    """
    Return the latest 50- and 200-period simple moving averages of the close.

    Args:
        df: Price history with a "Close" column. If selecting "Close" yields
            a one-column DataFrame (e.g. MultiIndex columns), that column is
            used.

    Returns:
        {'sma_50': float, 'sma_200': float}. A value is NaN when fewer closes
        than the window length are available, including an empty or
        single-row frame.
    """
    closes = df["Close"]
    if isinstance(closes, pd.DataFrame):
        # Collapse the column axis only. A bare .squeeze() would also turn a
        # one-row selection into a scalar, and len() on a scalar raises.
        closes = closes.squeeze(axis=1)

    def latest_sma(window: int) -> float:
        # Not enough history for a full window -> NaN, per the contract above.
        if len(closes) < window:
            return float("nan")
        return float(closes.rolling(window).mean().iloc[-1])

    return {"sma_50": latest_sma(50), "sma_200": latest_sma(200)}
|
|
|
|
|
|
def compute_trend(current_price: float, sma_50: float, sma_200: float) -> str:
    """
    Classify the trend from the price and its two moving averages.

    uptrend   - sma_200 < sma_50 < current_price
    downtrend - sma_200 > sma_50 > current_price
    sideways  - any other ordering, or when either SMA is NaN
    """
    # Without both averages there is nothing to compare against.
    if math.isnan(sma_50) or math.isnan(sma_200):
        return "sideways"

    if sma_200 < sma_50 < current_price:
        return "uptrend"
    if sma_200 > sma_50 > current_price:
        return "downtrend"
    return "sideways"
|
|
|
|
|
|
# ─── Support / Resistance ─────────────────────────────────────────────────────
|
|
|
|
def compute_swing_levels(df: pd.DataFrame, lookback: int = 20, neighbors: int = 2) -> dict:
    """
    Find swing highs (resistance) and swing lows (support) near the end of
    the price history using a `neighbors`-bar pivot rule.

    The last `lookback + 2 * neighbors` rows are inspected. A bar is a swing
    high when its High is strictly greater than the High of each of the
    `neighbors` bars on both sides; swing lows are the mirror image on Low.
    Levels lying within 0.5% of each other are merged into their mean.

    Args:
        df: Price history with "High", "Low" and "Close" columns.
        lookback: Number of bars that can qualify as pivots.
        neighbors: Bars required on each side of a pivot.

    Returns:
        {
            'nearest_support': float | None,     # highest support below the last close
            'nearest_resistance': float | None,  # lowest resistance above the last close
            'support_levels': [float],           # ascending
            'resistance_levels': [float],        # ascending
        }
        An empty or None frame yields None / empty lists instead of raising.
    """
    if df is None or len(df) == 0:
        return {
            "nearest_support": None,
            "nearest_resistance": None,
            "support_levels": [],
            "resistance_levels": [],
        }

    def column(frame: pd.DataFrame, name: str) -> pd.Series:
        """Select a price column as a Series, even when the frame has one row."""
        selected = frame[name]
        if isinstance(selected, pd.DataFrame):
            # Collapse the column axis only; a bare .squeeze() would turn a
            # one-row selection into a scalar with no .iloc / array view.
            selected = selected.squeeze(axis=1)
        return selected

    def cluster(levels: list[float], pct: float = 0.005) -> list[float]:
        """Merge sorted levels whose relative gap is <= pct (a fraction) into their mean."""
        if not levels:
            return []
        ordered = sorted(levels)
        groups = [[ordered[0]]]
        for level in ordered[1:]:
            anchor = groups[-1][-1]  # compare against the previous level in the group
            if abs(level - anchor) / anchor <= pct:
                groups[-1].append(level)
            else:
                groups.append([level])
        return [sum(group) / len(group) for group in groups]

    recent = df.tail(lookback + neighbors * 2)
    highs = column(recent, "High").to_numpy()
    lows = column(recent, "Low").to_numpy()
    offsets = range(1, neighbors + 1)

    swing_highs = []
    swing_lows = []
    # Skip the first/last `neighbors` bars: they lack a full set of neighbours.
    for i in range(neighbors, len(highs) - neighbors):
        if all(highs[i] > highs[i - j] and highs[i] > highs[i + j] for j in offsets):
            swing_highs.append(float(highs[i]))
        if all(lows[i] < lows[i - j] and lows[i] < lows[i + j] for j in offsets):
            swing_lows.append(float(lows[i]))

    resistance_levels = cluster(swing_highs)
    support_levels = cluster(swing_lows)

    # The last close in `df` is the reference for "nearest".
    current_price = float(column(df, "Close").iloc[-1])

    nearest_resistance = min(
        (level for level in resistance_levels if level > current_price), default=None
    )
    nearest_support = max(
        (level for level in support_levels if level < current_price), default=None
    )

    return {
        "nearest_support": nearest_support,
        "nearest_resistance": nearest_resistance,
        "support_levels": support_levels,
        "resistance_levels": resistance_levels,
    }
|
|
|
|
|
|
# ─── Signal Strength Scoring ──────────────────────────────────────────────────
|
|
|
|
def compute_signal_strength(
    iv_rank: float,
    trend: str,
    strategy: str,
    nearest_support: Optional[float],
    nearest_resistance: Optional[float],
    recommended_strike: Optional[float],
    earnings_warning: bool,
) -> str:
    """
    Grade a recommendation as 'strong', 'moderate', or 'weak'.

    Points awarded:
        IV rank             +2 when iv_rank >= 50, otherwise +1 when iv_rank >= 30
        covered_call        +2 in an uptrend, +1 when sideways
                            +1 when strike and nearest resistance are both set
                               and the strike is below that resistance
        cash_secured_put    +2 when the trend is not a downtrend,
                            +1 more when it is an uptrend
                            +1 when strike and nearest support are both set
                               and the strike is above that support
        earnings_warning    -2

    Any other strategy earns only the IV rank and earnings adjustments.

    Grades: total >= 5 is 'strong', total >= 3 is 'moderate', else 'weak'.
    """
    points = 0

    # Volatility environment.
    if iv_rank >= 30:
        points += 2 if iv_rank >= 50 else 1

    # Strategy-specific trend and strike placement.
    if strategy == "covered_call":
        points += {"uptrend": 2, "sideways": 1}.get(trend, 0)
        if nearest_resistance and recommended_strike and recommended_strike < nearest_resistance:
            points += 1
    elif strategy == "cash_secured_put":
        if trend == "uptrend":
            points += 3  # 2 for "not a downtrend" plus 1 for the bullish trend
        elif trend != "downtrend":
            points += 2
        if nearest_support and recommended_strike and recommended_strike > nearest_support:
            points += 1

    if earnings_warning:
        points -= 2

    for threshold, grade in ((5, "strong"), (3, "moderate")):
        if points >= threshold:
            return grade
    return "weak"
|
|
|
|
|
|
# ─── Signal Hash ──────────────────────────────────────────────────────────────
|
|
|
|
def compute_signal_hash(
    iv_rank: float,
    sma_50: float,
    sma_200: float,
    nearest_support: Optional[float],
    nearest_resistance: Optional[float],
    recommended_strike: Optional[float],
    recommended_expiration: Optional[date],
    earnings_warning: bool,
) -> str:
    """
    Build a 16-character deterministic fingerprint of the signal inputs.

    Inputs are rounded before hashing (IV rank to 1 decimal; SMAs and
    support/resistance to 2 decimals), so the fingerprint stays the same
    while values move within the rounding step. Falsy or NaN SMAs and falsy
    support/resistance are hashed as null. The strike is hashed as given and
    the expiration as its string form.

    Returns the first 16 hex characters of the SHA-256 digest of the
    key-sorted JSON payload.
    """

    def two_decimals(value):
        # Falsy values (None, 0) are treated as "no level".
        return round(value, 2) if value else None

    def sma_or_none(value):
        # Same as above, but a NaN average also counts as missing.
        return round(value, 2) if value and not math.isnan(value) else None

    fields = {
        "ivr": round(iv_rank, 1),
        "sma50": sma_or_none(sma_50),
        "sma200": sma_or_none(sma_200),
        "support": two_decimals(nearest_support),
        "resistance": two_decimals(nearest_resistance),
        "strike": recommended_strike,
        "expiry": str(recommended_expiration) if recommended_expiration else None,
        "ew": earnings_warning,
    }
    serialized = json.dumps(fields, sort_keys=True)
    digest = hashlib.sha256(serialized.encode())
    return digest.hexdigest()[:16]
|
|
|
|
|
|
# ─── Full Signal Computation ──────────────────────────────────────────────────
|
|
|
|
def compute_signals(ticker: str) -> Optional[SignalSnapshot]:
    """
    Compute and return a full SignalSnapshot for a ticker.

    Fetches price history, then derives IV rank, SMA-50/200, swing-based
    support/resistance and trend, and attaches the earnings date.

    Args:
        ticker: Symbol passed through to the market-data helpers.

    Returns:
        A SignalSnapshot, or None when no price history is available.
        SMAs that could not be computed (NaN) are stored as 0.0.
        `computed_at` is a naive datetime holding the current UTC time.
    """
    from datetime import datetime, timezone

    df = get_price_history(ticker)
    if df is None or df.empty:
        logger.warning("No price history for %s — cannot compute signals", ticker)
        return None

    current_price = get_current_price(ticker)
    if current_price is None:
        # Fall back to the most recent close in the history.
        closes = df["Close"]
        if isinstance(closes, pd.DataFrame):
            # Collapse the column axis only; a bare .squeeze() turns a
            # one-row selection into a scalar that has no .iloc.
            closes = closes.squeeze(axis=1)
        current_price = float(closes.iloc[-1])

    iv_rank = compute_iv_rank(df)
    smas = compute_smas(df)
    swing = compute_swing_levels(df)
    # Trend is judged on the raw SMAs (NaN -> "sideways"), before the
    # NaN -> 0.0 substitution applied to the snapshot fields below.
    trend = compute_trend(current_price, smas["sma_50"], smas["sma_200"])
    earnings_date = get_earnings_date(ticker)

    sma_50 = 0.0 if math.isnan(smas["sma_50"]) else smas["sma_50"]
    sma_200 = 0.0 if math.isnan(smas["sma_200"]) else smas["sma_200"]

    return SignalSnapshot(
        ticker=ticker,
        current_price=current_price,
        iv_rank=iv_rank,
        sma_50=sma_50,
        sma_200=sma_200,
        nearest_support=swing["nearest_support"],
        nearest_resistance=swing["nearest_resistance"],
        trend=trend,
        earnings_date=earnings_date,
        # Naive UTC timestamp, same value shape as the deprecated
        # datetime.utcnow() without the deprecation warning.
        computed_at=datetime.now(timezone.utc).replace(tzinfo=None),
    )
|