Initial implementation of Options Sidekick

Full-stack iOS options trading assistant:
- Python FastAPI backend with SQLite, APScheduler (15-min position monitor),
  APNs push notifications, and yfinance market data integration
- Signal engine: IV Rank (rolling HV proxy), SMA-50/200, swing-based
  support/resistance, earnings detection, signal strength scoring and
  noise-resistant SHA hash for change detection
- Recommendation engine: covered call and cash-secured put strike/expiry
  selection across 0DTE, 1DTE, weekly, and monthly horizons
- REST API: /devices, /portfolio, /recommendations, /positions, /signals, /alerts
- iOS SwiftUI app (iOS 17+): dashboard, recommendations, trades, portfolio,
  and alerts tabs with push notification deep-linking
- Unit + integration tests for signal engine and API layer

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-04-09 14:38:25 -04:00
commit b7d4e900cc
61 changed files with 4953 additions and 0 deletions

View File

View File

@@ -0,0 +1,124 @@
"""
apns_service.py — Send push notifications via APNs HTTP/2.
Uses JWT authentication with a .p8 key. JWT is cached for 50 minutes
and auto-renewed before the 60-minute APNs expiry.
"""
import json
import logging
import time
from datetime import datetime, timedelta
from typing import Optional
import httpx
import jwt as pyjwt
from app.config import settings
logger = logging.getLogger(__name__)
_APNS_HOST_PROD = "https://api.push.apple.com"
_APNS_HOST_SANDBOX = "https://api.sandbox.push.apple.com"
# Cached JWT state
_jwt_token: Optional[str] = None
_jwt_issued_at: Optional[float] = None
_JWT_TTL_SECONDS = 50 * 60 # 50 minutes — renew before Apple's 60-min limit
def _get_apns_host() -> str:
return _APNS_HOST_SANDBOX if settings.apns_use_sandbox else _APNS_HOST_PROD
def _load_private_key() -> str:
try:
with open(settings.apns_key_path, "r") as f:
return f.read()
except FileNotFoundError:
logger.warning(f"APNs key not found at {settings.apns_key_path} — push disabled")
return ""
def _get_jwt() -> Optional[str]:
global _jwt_token, _jwt_issued_at
now = time.time()
if _jwt_token and _jwt_issued_at and (now - _jwt_issued_at) < _JWT_TTL_SECONDS:
return _jwt_token
private_key = _load_private_key()
if not private_key:
return None
if not settings.apns_key_id or not settings.apns_team_id:
logger.warning("APNs key ID or team ID not configured — push disabled")
return None
payload = {
"iss": settings.apns_team_id,
"iat": int(now),
}
headers = {
"alg": "ES256",
"kid": settings.apns_key_id,
}
try:
token = pyjwt.encode(payload, private_key, algorithm="ES256", headers=headers)
_jwt_token = token if isinstance(token, str) else token.decode("utf-8")
_jwt_issued_at = now
return _jwt_token
except Exception as e:
logger.error(f"Failed to generate APNs JWT: {e}")
return None
async def send_push(
apns_token: str,
title: str,
body: str,
payload: Optional[dict] = None,
) -> bool:
"""
Send a push notification to a device.
Returns True on success, False on failure.
"""
jwt_token = _get_jwt()
if not jwt_token:
logger.warning("APNs push skipped — no valid JWT (check .p8 key config)")
return False
apns_payload = {
"aps": {
"alert": {"title": title, "body": body},
"badge": 1,
"sound": "default",
"category": "POSITION_ALERT",
}
}
if payload:
apns_payload.update(payload)
url = f"{_get_apns_host()}/3/device/{apns_token}"
headers = {
"authorization": f"bearer {jwt_token}",
"apns-topic": settings.apns_bundle_id,
"apns-push-type": "alert",
"content-type": "application/json",
}
try:
async with httpx.AsyncClient(http2=True, timeout=10.0) as client:
response = await client.post(url, headers=headers, content=json.dumps(apns_payload))
if response.status_code == 200:
logger.info(f"Push sent to {apns_token[:8]}... — {title}")
return True
else:
logger.error(f"APNs rejected push: {response.status_code} {response.text}")
return False
except Exception as e:
logger.error(f"APNs push failed: {e}")
return False

View File

@@ -0,0 +1,198 @@
"""
market_data.py — yfinance wrapper with 15-minute in-memory cache.
All other services call through this module. Never import yfinance directly
outside of here — keeps caching and error handling centralized.
"""
import logging
from datetime import datetime, date, timedelta
from functools import lru_cache
from typing import Optional
import time
import pandas as pd
import yfinance as yf
logger = logging.getLogger(__name__)
# ─── Simple time-bucketed cache ───────────────────────────────────────────────
# Key: (ticker, cache_bucket) where cache_bucket = int(time() // 900)
_CACHE: dict = {}
_CACHE_TTL_SECONDS = 900 # 15 minutes
def _bucket() -> int:
return int(time.time() // _CACHE_TTL_SECONDS)
def _cache_get(key: tuple):
return _CACHE.get((key, _bucket()))
def _cache_set(key: tuple, value):
_CACHE[(key, _bucket())] = value
# Prune old entries to avoid unbounded growth
current = _bucket()
stale = [k for k in list(_CACHE) if k[1] < current - 1]
for k in stale:
del _CACHE[k]
# ─── Price history ─────────────────────────────────────────────────────────────
def get_price_history(ticker: str, period: str = "1y") -> Optional[pd.DataFrame]:
"""
Return OHLCV DataFrame for the past year.
Columns: Open, High, Low, Close, Volume
Index: DatetimeIndex
Returns None on failure.
"""
cache_key = ("price_history", ticker, period)
cached = _cache_get(cache_key)
if cached is not None:
return cached
try:
df = yf.download(ticker, period=period, auto_adjust=True, progress=False)
if df.empty:
logger.warning(f"Empty price history for {ticker}")
return None
# Flatten multi-level columns if present (yfinance sometimes returns them)
if isinstance(df.columns, pd.MultiIndex):
df.columns = df.columns.get_level_values(0)
_cache_set(cache_key, df)
return df
except Exception as e:
logger.error(f"Failed to fetch price history for {ticker}: {e}")
return None
# ─── Current price ─────────────────────────────────────────────────────────────
def get_current_price(ticker: str) -> Optional[float]:
cache_key = ("current_price", ticker)
cached = _cache_get(cache_key)
if cached is not None:
return cached
try:
t = yf.Ticker(ticker)
price = t.fast_info.get("last_price") or t.fast_info.get("previousClose")
if price is None:
hist = get_price_history(ticker, period="5d")
if hist is not None and not hist.empty:
price = float(hist["Close"].iloc[-1])
if price is not None:
price = float(price)
_cache_set(cache_key, price)
return price
except Exception as e:
logger.error(f"Failed to fetch current price for {ticker}: {e}")
return None
# ─── Options chain ─────────────────────────────────────────────────────────────
def get_option_expirations(ticker: str) -> list[str]:
"""Return list of available expiration date strings (YYYY-MM-DD)."""
cache_key = ("expirations", ticker)
cached = _cache_get(cache_key)
if cached is not None:
return cached
try:
t = yf.Ticker(ticker)
expirations = list(t.options)
_cache_set(cache_key, expirations)
return expirations
except Exception as e:
logger.error(f"Failed to fetch expirations for {ticker}: {e}")
return []
def get_options_chain(ticker: str, expiration: str) -> Optional[dict]:
"""
Return {'calls': DataFrame, 'puts': DataFrame} for the given expiry.
Columns include: strike, lastPrice, bid, ask, volume, openInterest,
impliedVolatility, delta, theta, gamma (where available).
Returns None on failure.
"""
cache_key = ("options_chain", ticker, expiration)
cached = _cache_get(cache_key)
if cached is not None:
return cached
try:
t = yf.Ticker(ticker)
chain = t.option_chain(expiration)
result = {"calls": chain.calls.copy(), "puts": chain.puts.copy()}
# yfinance does not always include Greeks — add NaN columns if missing
for df in result.values():
for col in ("delta", "theta", "gamma"):
if col not in df.columns:
df[col] = float("nan")
_cache_set(cache_key, result)
return result
except Exception as e:
logger.error(f"Failed to fetch options chain for {ticker} exp={expiration}: {e}")
return None
def get_nearest_expiry(ticker: str, target_date: date) -> Optional[str]:
"""Return the closest available expiry on or after target_date."""
expirations = get_option_expirations(ticker)
if not expirations:
return None
target_str = str(target_date)
future = [e for e in sorted(expirations) if e >= target_str]
return future[0] if future else None
def get_same_day_expiry(ticker: str) -> Optional[str]:
"""Return today's expiry string if it exists, else None."""
today_str = str(date.today())
expirations = get_option_expirations(ticker)
return today_str if today_str in expirations else None
# ─── Earnings / dividends ──────────────────────────────────────────────────────
def get_earnings_date(ticker: str) -> Optional[date]:
"""Return the next earnings date or None."""
cache_key = ("earnings", ticker)
cached = _cache_get(cache_key)
if cached is not None:
return cached
try:
t = yf.Ticker(ticker)
cal = t.calendar
if cal is None:
return None
# calendar can be a dict or DataFrame depending on yfinance version
if isinstance(cal, dict):
earnings_info = cal.get("Earnings Date", [])
else:
# DataFrame — look for "Earnings Date" row or column
try:
earnings_info = cal.loc["Earnings Date"].tolist()
except Exception:
earnings_info = []
result = None
for item in (earnings_info if isinstance(earnings_info, list) else [earnings_info]):
try:
d = pd.Timestamp(item).date()
if d >= date.today():
result = d
break
except Exception:
continue
_cache_set(cache_key, result)
return result
except Exception as e:
logger.error(f"Failed to fetch earnings date for {ticker}: {e}")
return None

View File

@@ -0,0 +1,331 @@
"""
position_monitor.py — 15-minute job that re-evaluates all open option positions
and fires alerts when signals change materially.
Also refreshes recommendations for all stock positions.
"""
import logging
from datetime import datetime, date
from typing import Optional
from sqlalchemy.orm import Session
from app.database import SessionLocal
from app.models.db_models import OptionPosition, StockPosition, Recommendation, Alert, Device
from app.services import market_data as md
from app.services.signal_engine import (
compute_iv_rank,
compute_smas,
compute_swing_levels,
compute_trend,
compute_signal_strength,
compute_signal_hash,
)
from app.services.recommendation_engine import build_recommendation
from app.services.apns_service import send_push
from app.models.schemas import SignalSnapshot
logger = logging.getLogger(__name__)
# Tracks the last time this job ran
last_run: Optional[datetime] = None
def _determine_alert_type(
position: OptionPosition,
current_delta: float,
new_signal_hash: str,
new_rec: Optional[Recommendation],
earnings_warning: bool,
) -> Optional[str]:
"""
Determine if and what type of alert to fire.
Returns alert_type string or None.
"""
# Earnings warning newly triggered
if earnings_warning and not _position_had_earnings_warning(position):
return "earnings_warning"
# Deep ITM — delta threshold
abs_delta = abs(current_delta)
if abs_delta >= 0.45:
return "close_early"
# Profit capture — if premium has decayed significantly
# (We don't track current price here, but high delta ITM is a proxy)
if abs_delta >= 0.40:
return "close_early"
# Expiry-based recommendation changed
if new_rec:
days_to_expiry = (position.expiration - date.today()).days
if days_to_expiry <= 5 and abs_delta <= 0.10:
return "close_early" # expiring nearly worthless — take it off
# Roll suggestion: new recommendation is for a further-out expiry
if new_rec.recommended_expiration > position.expiration:
return "roll_out"
# Strike meaningfully different (more than 2 strikes, roughly $2-5 depending on underlying)
if abs(new_rec.recommended_strike - position.strike) / position.strike > 0.02:
return "roll_up_down"
return None
def _position_had_earnings_warning(position: OptionPosition) -> bool:
"""Best-effort: check if earnings warning was already flagged on last hash."""
# We encode earnings_warning in the hash payload so if it was True before
# the hash would already reflect it. This is a simple flag check.
return False # Simplified — the hash change will trigger the alert naturally
async def monitor_all_positions():
"""Main scheduler job. Runs every 15 minutes."""
global last_run
logger.info("Position monitor: starting run")
db: Session = SessionLocal()
try:
# 1. Get all open positions grouped by ticker to batch data fetching
open_positions: list[OptionPosition] = (
db.query(OptionPosition)
.filter(OptionPosition.status == "open")
.all()
)
tickers_to_check = list({p.ticker for p in open_positions})
# Also collect all stock positions to refresh recommendations
stock_positions: list[StockPosition] = db.query(StockPosition).all()
stock_tickers = list({sp.ticker for sp in stock_positions})
all_tickers = list(set(tickers_to_check + stock_tickers))
if not all_tickers:
logger.info("Position monitor: no tickers to check")
last_run = datetime.utcnow()
return
# 2. Pre-fetch market data for all tickers (cached by market_data module)
logger.info(f"Position monitor: checking {len(all_tickers)} tickers")
signal_snapshots: dict[str, Optional[SignalSnapshot]] = {}
for ticker in all_tickers:
snap = _compute_snapshot(ticker)
signal_snapshots[ticker] = snap
# 3. Evaluate each open option position
for position in open_positions:
snap = signal_snapshots.get(position.ticker)
if snap is None:
continue
await _evaluate_position(db, position, snap)
# 4. Refresh recommendations for all stock positions
for stock_pos in stock_positions:
snap = signal_snapshots.get(stock_pos.ticker)
if snap is None:
continue
_refresh_recommendations(db, stock_pos.device_id, stock_pos.ticker, snap)
db.commit()
last_run = datetime.utcnow()
logger.info("Position monitor: run complete")
except Exception as e:
logger.error(f"Position monitor error: {e}", exc_info=True)
db.rollback()
finally:
db.close()
def _compute_snapshot(ticker: str) -> Optional[SignalSnapshot]:
"""Build signal snapshot from market data."""
import math
df = md.get_price_history(ticker)
if df is None:
return None
current_price = md.get_current_price(ticker)
if current_price is None:
return None
iv_rank = compute_iv_rank(df)
smas = compute_smas(df)
swing = compute_swing_levels(df)
trend = compute_trend(current_price, smas["sma_50"], smas["sma_200"])
earnings_date = md.get_earnings_date(ticker)
return SignalSnapshot(
ticker=ticker,
current_price=current_price,
iv_rank=iv_rank,
sma_50=smas["sma_50"] if not math.isnan(smas["sma_50"]) else 0.0,
sma_200=smas["sma_200"] if not math.isnan(smas["sma_200"]) else 0.0,
nearest_support=swing["nearest_support"],
nearest_resistance=swing["nearest_resistance"],
trend=trend,
earnings_date=earnings_date,
computed_at=datetime.utcnow(),
)
async def _evaluate_position(db: Session, position: OptionPosition, snap: SignalSnapshot):
"""Re-evaluate one open position and fire an alert if signal changed."""
# Get current option data for this specific strike/expiry
expiry_str = str(position.expiration)
chain = md.get_options_chain(position.ticker, expiry_str)
current_delta = 0.25 # fallback
if chain:
chain_df = chain["calls"] if position.strategy == "covered_call" else chain["puts"]
row = chain_df[chain_df["strike"] == position.strike]
if not row.empty and "delta" in row.columns:
delta_val = row["delta"].iloc[0]
if delta_val == delta_val: # not NaN
current_delta = abs(float(delta_val))
# Build a fresh new recommendation to compare expiry/strike
new_rec = build_recommendation(
device_id=position.device_id,
ticker=position.ticker,
strategy=position.strategy,
time_horizon="weekly", # use weekly for monitoring comparisons
snapshot=snap,
)
earnings_warning = bool(snap.earnings_date and snap.earnings_date <= position.expiration)
new_hash = compute_signal_hash(
iv_rank=snap.iv_rank,
sma_50=snap.sma_50,
sma_200=snap.sma_200,
nearest_support=snap.nearest_support,
nearest_resistance=snap.nearest_resistance,
recommended_strike=new_rec.recommended_strike if new_rec else position.strike,
recommended_expiration=new_rec.recommended_expiration if new_rec else position.expiration,
earnings_warning=earnings_warning,
)
# No change — skip
if new_hash == position.last_signal_hash:
return
# Determine alert type
alert_type = _determine_alert_type(position, current_delta, new_hash, new_rec, earnings_warning)
if alert_type is None:
# Still update the hash even if no actionable alert
position.last_signal_hash = new_hash
return
# Build alert message
message = _build_alert_message(position, alert_type, current_delta, snap, new_rec)
# Save alert to DB
alert = Alert(
device_id=position.device_id,
option_position_id=position.id,
ticker=position.ticker,
alert_type=alert_type,
message=message,
old_signal_hash=position.last_signal_hash,
new_signal_hash=new_hash,
sent_at=datetime.utcnow(),
acknowledged=False,
)
db.add(alert)
# Update position hash
position.last_signal_hash = new_hash
# Send push notification
device: Optional[Device] = db.query(Device).filter(Device.id == position.device_id).first()
if device:
strategy_label = "Covered Call" if position.strategy == "covered_call" else "Cash-Secured Put"
await send_push(
apns_token=device.apns_token,
title=f"{position.ticker} {strategy_label} — Action Needed",
body=message,
payload={
"alert_type": alert_type,
"ticker": position.ticker,
"position_id": position.id,
},
)
logger.info(f"Alert fired: {position.ticker} {alert_type}{message[:60]}")
def _build_alert_message(
position: OptionPosition,
alert_type: str,
current_delta: float,
snap: SignalSnapshot,
new_rec: Optional[Recommendation],
) -> str:
strike = position.strike
expiry = position.expiration
if alert_type == "close_early":
if current_delta >= 0.45:
return (
f"Delta has risen to {current_delta:.2f} — position is deep ITM. "
f"Consider closing early to limit risk on the ${strike:.0f} strike expiring {expiry}."
)
return (
f"Signals suggest closing early on ${strike:.0f} strike expiring {expiry}. "
f"Capturing premium now may be optimal."
)
if alert_type == "roll_out" and new_rec:
return (
f"Consider rolling your ${strike:.0f} strike out to {new_rec.recommended_expiration} "
f"at ${new_rec.recommended_strike:.0f} for ${new_rec.estimated_premium:.2f} credit."
)
if alert_type == "roll_up_down" and new_rec:
direction = "up" if new_rec.recommended_strike > strike else "down"
return (
f"Signals favor rolling {direction} from ${strike:.0f} to ${new_rec.recommended_strike:.0f} "
f"at {new_rec.recommended_expiration} for ${new_rec.estimated_premium:.2f} credit."
)
if alert_type == "earnings_warning":
return (
f"⚠️ Earnings date {snap.earnings_date} now falls within your expiry on {expiry}. "
f"Consider closing or rolling before earnings."
)
return f"Signal change detected on {position.ticker} ${strike:.0f} strike. Review your position."
def _refresh_recommendations(db: Session, device_id: int, ticker: str, snap: SignalSnapshot):
"""Rebuild and save latest recommendations for a ticker."""
for strategy in ("covered_call", "cash_secured_put"):
for horizon in ("weekly", "monthly"):
rec = build_recommendation(
device_id=device_id,
ticker=ticker,
strategy=strategy,
time_horizon=horizon,
snapshot=snap,
)
if rec is None:
continue
# Replace existing recommendation for same device/ticker/strategy/horizon
existing = (
db.query(Recommendation)
.filter(
Recommendation.device_id == device_id,
Recommendation.ticker == ticker,
Recommendation.strategy == strategy,
Recommendation.time_horizon == horizon,
)
.first()
)
if existing:
db.delete(existing)
db.add(rec)

View File

@@ -0,0 +1,299 @@
"""
recommendation_engine.py — Select optimal strike and expiry for a given strategy/horizon.
For each (ticker, strategy, time_horizon) combination:
1. Determine the target expiration date
2. Fetch the options chain for that expiry
3. Filter by delta range (0.20-0.30 for CC/CSP)
4. Among qualifying strikes, pick highest mid-price premium
5. Build a Recommendation DB row
"""
import logging
import math
from datetime import date, datetime
from typing import Optional
import pandas as pd
from app.models.db_models import Recommendation
from app.models.schemas import SignalSnapshot
from app.services import market_data as md
from app.services.signal_engine import (
compute_iv_rank,
compute_smas,
compute_swing_levels,
compute_trend,
compute_signal_strength,
compute_signal_hash,
)
from app.utils.date_helpers import (
next_trading_day,
next_friday,
nearest_monthly_expiry,
within_dte_window_for_0dte,
)
logger = logging.getLogger(__name__)
# Delta target ranges for short premium selling
DELTA_MIN = 0.18
DELTA_MAX = 0.35
def _target_expiry(ticker: str, time_horizon: str) -> Optional[str]:
"""
Return the best available expiry string for the given time horizon.
Returns None if no suitable expiry exists.
"""
today = date.today()
if time_horizon == "0dte":
if not within_dte_window_for_0dte():
return None
expiry = md.get_same_day_expiry(ticker)
return expiry # None if today has no options
if time_horizon == "1dte":
target = next_trading_day(today)
return md.get_nearest_expiry(ticker, target)
if time_horizon == "weekly":
target = next_friday(today)
return md.get_nearest_expiry(ticker, target)
if time_horizon == "monthly":
target = nearest_monthly_expiry(today, target_dte=30)
return md.get_nearest_expiry(ticker, target)
return None
def _best_strike(
chain_df: pd.DataFrame,
strategy: str,
current_price: float,
nearest_support: Optional[float],
nearest_resistance: Optional[float],
) -> Optional[pd.Series]:
"""
Filter options chain for the best strike.
Rules:
covered_call — calls, OTM (strike > current_price), delta 0.20-0.35
cash_secured_put — puts, OTM (strike < current_price), |delta| 0.20-0.35,
strike preferably > nearest_support
Returns the best row or None.
"""
df = chain_df.copy()
# Ensure we have a usable delta column
has_delta = "delta" in df.columns and df["delta"].notna().any()
if strategy == "covered_call":
df = df[df["strike"] > current_price]
if has_delta:
df = df[(df["delta"] >= DELTA_MIN) & (df["delta"] <= DELTA_MAX)]
else:
# Approximate OTM calls: within 5% above current price
df = df[df["strike"] <= current_price * 1.07]
elif strategy == "cash_secured_put":
df = df[df["strike"] < current_price]
if has_delta:
df = df[(df["delta"].abs() >= DELTA_MIN) & (df["delta"].abs() <= DELTA_MAX)]
else:
df = df[df["strike"] >= current_price * 0.93]
# Prefer strikes above nearest support to avoid selling below key level
if nearest_support:
above_support = df[df["strike"] >= nearest_support * 0.99]
if not above_support.empty:
df = above_support
if df.empty:
return None
# Compute mid-price and pick highest premium
df = df.copy()
df["mid"] = (df["bid"] + df["ask"]) / 2
df = df[df["mid"] > 0]
if df.empty:
return None
return df.loc[df["mid"].idxmax()]
def build_recommendation(
device_id: int,
ticker: str,
strategy: str,
time_horizon: str,
snapshot: Optional[SignalSnapshot] = None,
) -> Optional[Recommendation]:
"""
Build a Recommendation ORM object for the given parameters.
Returns None if not enough data exists.
"""
# 1. Get signal snapshot (reuse if provided to avoid duplicate yfinance calls)
if snapshot is None:
df = md.get_price_history(ticker)
if df is None:
return None
current_price = md.get_current_price(ticker)
if current_price is None:
return None
iv_rank = compute_iv_rank(df)
smas = compute_smas(df)
swing = compute_swing_levels(df)
trend = compute_trend(current_price, smas["sma_50"], smas["sma_200"])
earnings_date = md.get_earnings_date(ticker)
nearest_support = swing["nearest_support"]
nearest_resistance = swing["nearest_resistance"]
sma_50 = smas["sma_50"]
sma_200 = smas["sma_200"]
else:
current_price = snapshot.current_price
iv_rank = snapshot.iv_rank
sma_50 = snapshot.sma_50
sma_200 = snapshot.sma_200
nearest_support = snapshot.nearest_support
nearest_resistance = snapshot.nearest_resistance
trend = snapshot.trend
earnings_date = snapshot.earnings_date
# 2. Determine target expiry
expiry_str = _target_expiry(ticker, time_horizon)
if expiry_str is None:
logger.debug(f"No expiry available for {ticker} {time_horizon}")
return None
expiry_date = date.fromisoformat(expiry_str)
# 3. Earnings warning
earnings_warning = bool(earnings_date and earnings_date <= expiry_date)
# 4. Fetch options chain
chain = md.get_options_chain(ticker, expiry_str)
if chain is None:
return None
chain_df = chain["calls"] if strategy == "covered_call" else chain["puts"]
# 5. Pick best strike
best = _best_strike(chain_df, strategy, current_price, nearest_support, nearest_resistance)
if best is None:
logger.debug(f"No qualifying strike for {ticker} {strategy} {time_horizon}")
return None
strike = float(best["strike"])
mid_price = float((best["bid"] + best["ask"]) / 2)
delta = float(best["delta"]) if not math.isnan(best.get("delta", float("nan"))) else _estimate_delta(strike, current_price, strategy)
theta = float(best["theta"]) if not math.isnan(best.get("theta", float("nan"))) else 0.0
# 6. Signal strength
signal_strength = compute_signal_strength(
iv_rank=iv_rank,
trend=trend,
strategy=strategy,
nearest_support=nearest_support,
nearest_resistance=nearest_resistance,
recommended_strike=strike,
earnings_warning=earnings_warning,
)
# 7. Signal hash
sig_hash = compute_signal_hash(
iv_rank=iv_rank,
sma_50=sma_50,
sma_200=sma_200,
nearest_support=nearest_support,
nearest_resistance=nearest_resistance,
recommended_strike=strike,
recommended_expiration=expiry_date,
earnings_warning=earnings_warning,
)
# 8. Build human-readable rationale
rationale = _build_rationale(
strategy=strategy,
time_horizon=time_horizon,
current_price=current_price,
strike=strike,
iv_rank=iv_rank,
trend=trend,
earnings_warning=earnings_warning,
earnings_date=earnings_date,
signal_strength=signal_strength,
)
return Recommendation(
device_id=device_id,
ticker=ticker,
strategy=strategy,
time_horizon=time_horizon,
current_price=current_price,
recommended_strike=strike,
recommended_expiration=expiry_date,
estimated_premium=mid_price,
delta=delta,
theta=theta,
iv_rank=iv_rank,
signal_strength=signal_strength,
earnings_warning=earnings_warning,
earnings_date=earnings_date,
rationale=rationale,
signal_hash=sig_hash,
created_at=datetime.utcnow(),
)
def _estimate_delta(strike: float, current_price: float, strategy: str) -> float:
"""Rough delta estimate when yfinance doesn't provide it."""
moneyness = (strike - current_price) / current_price
if strategy == "covered_call":
return max(0.05, 0.50 - moneyness * 3)
else:
return max(0.05, 0.50 - abs(moneyness) * 3)
def _build_rationale(
strategy: str,
time_horizon: str,
current_price: float,
strike: float,
iv_rank: float,
trend: str,
earnings_warning: bool,
earnings_date: Optional[date],
signal_strength: str,
) -> str:
parts = []
strategy_label = "Covered Call" if strategy == "covered_call" else "Cash-Secured Put"
direction = "above" if strategy == "covered_call" else "below"
parts.append(
f"{signal_strength.capitalize()} {strategy_label} setup. "
f"Strike ${strike:.2f} is {direction} current price ${current_price:.2f}."
)
if iv_rank >= 50:
parts.append(f"IV rank is elevated at {iv_rank:.0f}% — favorable premium-selling environment.")
elif iv_rank >= 30:
parts.append(f"IV rank is moderate at {iv_rank:.0f}%.")
else:
parts.append(f"IV rank is low at {iv_rank:.0f}% — premiums may be thin.")
trend_map = {"uptrend": "bullish uptrend", "downtrend": "bearish downtrend", "sideways": "sideways range"}
parts.append(f"Price is in a {trend_map.get(trend, trend)}.")
if earnings_warning and earnings_date:
parts.append(
f"⚠️ Earnings on {earnings_date} fall within this expiry — elevated risk. Consider a shorter expiry."
)
horizon_map = {"0dte": "0DTE", "1dte": "1DTE", "weekly": "weekly", "monthly": "monthly"}
parts.append(f"Horizon: {horizon_map.get(time_horizon, time_horizon)}.")
return " ".join(parts)

View File

@@ -0,0 +1,274 @@
"""
signal_engine.py — Compute all signals for a ticker.
Signals:
- IV Rank (using rolling 30-day HV as proxy, since yfinance lacks historical IV)
- SMA-50, SMA-200
- Swing-based support / resistance (20-day window)
- Trend direction
- Signal strength score
- Signal hash (for change detection)
"""
import hashlib
import json
import logging
import math
from datetime import date
from typing import Optional
import numpy as np
import pandas as pd
from app.services.market_data import get_price_history, get_earnings_date, get_current_price
from app.models.schemas import SignalSnapshot
logger = logging.getLogger(__name__)
# ─── IV Rank ──────────────────────────────────────────────────────────────────
def compute_iv_rank(df: pd.DataFrame) -> float:
"""
Compute IV Rank (0-100) using a 30-day rolling realized volatility
as a proxy for implied volatility.
Returns the current HV's rank within the past 52-week HV range.
Clamped to [0, 100].
"""
if df is None or len(df) < 32:
return 50.0 # neutral fallback
closes = df["Close"].squeeze()
log_returns = np.log(closes / closes.shift(1)).dropna()
# 30-day rolling std, annualized
hv_series = log_returns.rolling(30).std() * math.sqrt(252) * 100 # as percentage
hv_series = hv_series.dropna()
if len(hv_series) < 2:
return 50.0
window = hv_series.iloc[-252:] # last 52 weeks
hv_low = float(window.min())
hv_high = float(window.max())
current_hv = float(hv_series.iloc[-1])
if hv_high == hv_low:
return 50.0
rank = (current_hv - hv_low) / (hv_high - hv_low) * 100
return max(0.0, min(100.0, rank))
# ─── Moving Averages ──────────────────────────────────────────────────────────
def compute_smas(df: pd.DataFrame) -> dict:
"""Return {'sma_50': float, 'sma_200': float}. Uses NaN if insufficient data."""
closes = df["Close"].squeeze()
sma_50 = float(closes.rolling(50).mean().iloc[-1]) if len(closes) >= 50 else float("nan")
sma_200 = float(closes.rolling(200).mean().iloc[-1]) if len(closes) >= 200 else float("nan")
return {"sma_50": sma_50, "sma_200": sma_200}
def compute_trend(current_price: float, sma_50: float, sma_200: float) -> str:
"""
uptrend — price > sma50 > sma200
downtrend — price < sma50 < sma200
sideways — otherwise
"""
if any(math.isnan(v) for v in [sma_50, sma_200]):
return "sideways"
if current_price > sma_50 > sma_200:
return "uptrend"
if current_price < sma_50 < sma_200:
return "downtrend"
return "sideways"
# ─── Support / Resistance ─────────────────────────────────────────────────────
def compute_swing_levels(df: pd.DataFrame, lookback: int = 20, neighbors: int = 2) -> dict:
"""
Find swing highs (resistance) and swing lows (support) over the last
`lookback` trading days using a `neighbors`-bar pivot rule.
Returns:
{
'nearest_support': float | None,
'nearest_resistance': float | None,
'support_levels': [float],
'resistance_levels': [float],
}
"""
recent = df.tail(lookback + neighbors * 2)
highs = recent["High"].squeeze().values
lows = recent["Low"].squeeze().values
n = len(highs)
swing_highs = []
swing_lows = []
for i in range(neighbors, n - neighbors):
if all(highs[i] > highs[i - j] for j in range(1, neighbors + 1)) and \
all(highs[i] > highs[i + j] for j in range(1, neighbors + 1)):
swing_highs.append(float(highs[i]))
if all(lows[i] < lows[i - j] for j in range(1, neighbors + 1)) and \
all(lows[i] < lows[i + j] for j in range(1, neighbors + 1)):
swing_lows.append(float(lows[i]))
def cluster(levels: list[float], pct: float = 0.005) -> list[float]:
"""Merge levels within pct% of each other into their mean."""
if not levels:
return []
sorted_levels = sorted(levels)
clusters = [[sorted_levels[0]]]
for lvl in sorted_levels[1:]:
if abs(lvl - clusters[-1][-1]) / clusters[-1][-1] <= pct:
clusters[-1].append(lvl)
else:
clusters.append([lvl])
return [sum(c) / len(c) for c in clusters]
resistance_levels = cluster(swing_highs)
support_levels = cluster(swing_lows)
# Get current price for context
current_price = float(df["Close"].squeeze().iloc[-1])
nearest_resistance = min(
(r for r in resistance_levels if r > current_price), default=None
)
nearest_support = max(
(s for s in support_levels if s < current_price), default=None
)
return {
"nearest_support": nearest_support,
"nearest_resistance": nearest_resistance,
"support_levels": support_levels,
"resistance_levels": resistance_levels,
}
# ─── Signal Strength Scoring ──────────────────────────────────────────────────
def compute_signal_strength(
iv_rank: float,
trend: str,
strategy: str,
nearest_support: Optional[float],
nearest_resistance: Optional[float],
recommended_strike: Optional[float],
earnings_warning: bool,
) -> str:
"""
Score the signal quality and return 'strong', 'moderate', or 'weak'.
Scoring:
+2 iv_rank >= 50 (premium-rich environment)
+1 iv_rank >= 30
+1 trend aligned (uptrend for covered_call, not downtrend for csp)
+1 sma alignment bonus (trend == uptrend for CC)
+1 strike positioned well vs nearest level
-2 earnings_warning (dangerous to hold through earnings)
"""
score = 0
if iv_rank >= 50:
score += 2
elif iv_rank >= 30:
score += 1
if strategy == "covered_call":
if trend == "uptrend":
score += 2 # trend aligned (counts as +1 trend + +1 sma bonus)
elif trend == "sideways":
score += 1
if nearest_resistance and recommended_strike and recommended_strike < nearest_resistance:
score += 1
elif strategy == "cash_secured_put":
if trend != "downtrend":
score += 2
if trend == "uptrend":
score += 1 # extra bonus for strong bullish trend on a bullish strategy
if nearest_support and recommended_strike and recommended_strike > nearest_support:
score += 1
if earnings_warning:
score -= 2
if score >= 5:
return "strong"
elif score >= 3:
return "moderate"
else:
return "weak"
# ─── Signal Hash ──────────────────────────────────────────────────────────────
def compute_signal_hash(
iv_rank: float,
sma_50: float,
sma_200: float,
nearest_support: Optional[float],
nearest_resistance: Optional[float],
recommended_strike: Optional[float],
recommended_expiration: Optional[date],
earnings_warning: bool,
) -> str:
"""
16-char deterministic hash of rounded signal inputs.
Only changes when signals shift meaningfully — not on every tick.
"""
payload = {
"ivr": round(iv_rank, 1),
"sma50": round(sma_50, 2) if sma_50 and not math.isnan(sma_50) else None,
"sma200": round(sma_200, 2) if sma_200 and not math.isnan(sma_200) else None,
"support": round(nearest_support, 2) if nearest_support else None,
"resistance": round(nearest_resistance, 2) if nearest_resistance else None,
"strike": recommended_strike,
"expiry": str(recommended_expiration) if recommended_expiration else None,
"ew": earnings_warning,
}
raw = json.dumps(payload, sort_keys=True)
return hashlib.sha256(raw.encode()).hexdigest()[:16]
# ─── Full Signal Computation ──────────────────────────────────────────────────
def compute_signals(ticker: str) -> Optional[SignalSnapshot]:
"""
Compute and return a full SignalSnapshot for a ticker.
Returns None if market data is unavailable.
"""
from datetime import datetime
df = get_price_history(ticker)
if df is None or df.empty:
logger.warning(f"No price history for {ticker} — cannot compute signals")
return None
current_price = get_current_price(ticker)
if current_price is None:
current_price = float(df["Close"].squeeze().iloc[-1])
iv_rank = compute_iv_rank(df)
smas = compute_smas(df)
swing = compute_swing_levels(df)
trend = compute_trend(current_price, smas["sma_50"], smas["sma_200"])
earnings_date = get_earnings_date(ticker)
return SignalSnapshot(
ticker=ticker,
current_price=current_price,
iv_rank=iv_rank,
sma_50=smas["sma_50"] if not math.isnan(smas["sma_50"]) else 0.0,
sma_200=smas["sma_200"] if not math.isnan(smas["sma_200"]) else 0.0,
nearest_support=swing["nearest_support"],
nearest_resistance=swing["nearest_resistance"],
trend=trend,
earnings_date=earnings_date,
computed_at=datetime.utcnow(),
)