Full-stack iOS options trading assistant: - Python FastAPI backend with SQLite, APScheduler (15-min position monitor), APNs push notifications, and yfinance market data integration - Signal engine: IV Rank (rolling HV proxy), SMA-50/200, swing-based support/resistance, earnings detection, signal strength scoring and noise-resistant SHA hash for change detection - Recommendation engine: covered call and cash-secured put strike/expiry selection across 0DTE, 1DTE, weekly, and monthly horizons - REST API: /devices, /portfolio, /recommendations, /positions, /signals, /alerts - iOS SwiftUI app (iOS 17+): dashboard, recommendations, trades, portfolio, and alerts tabs with push notification deep-linking - Unit + integration tests for signal engine and API layer Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
125 lines
3.4 KiB
Python
125 lines
3.4 KiB
Python
"""
|
|
apns_service.py — Send push notifications via APNs HTTP/2.
|
|
|
|
Uses JWT authentication with a .p8 key. JWT is cached for 50 minutes
|
|
and auto-renewed before the 60-minute APNs expiry.
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
import time
|
|
from datetime import datetime, timedelta
|
|
from typing import Optional
|
|
|
|
import httpx
|
|
import jwt as pyjwt
|
|
|
|
from app.config import settings
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
_APNS_HOST_PROD = "https://api.push.apple.com"
|
|
_APNS_HOST_SANDBOX = "https://api.sandbox.push.apple.com"
|
|
|
|
# Cached JWT state
|
|
_jwt_token: Optional[str] = None
|
|
_jwt_issued_at: Optional[float] = None
|
|
_JWT_TTL_SECONDS = 50 * 60 # 50 minutes — renew before Apple's 60-min limit
|
|
|
|
|
|
def _get_apns_host() -> str:
|
|
return _APNS_HOST_SANDBOX if settings.apns_use_sandbox else _APNS_HOST_PROD
|
|
|
|
|
|
def _load_private_key() -> str:
|
|
try:
|
|
with open(settings.apns_key_path, "r") as f:
|
|
return f.read()
|
|
except FileNotFoundError:
|
|
logger.warning(f"APNs key not found at {settings.apns_key_path} — push disabled")
|
|
return ""
|
|
|
|
|
|
def _get_jwt() -> Optional[str]:
|
|
global _jwt_token, _jwt_issued_at
|
|
|
|
now = time.time()
|
|
if _jwt_token and _jwt_issued_at and (now - _jwt_issued_at) < _JWT_TTL_SECONDS:
|
|
return _jwt_token
|
|
|
|
private_key = _load_private_key()
|
|
if not private_key:
|
|
return None
|
|
|
|
if not settings.apns_key_id or not settings.apns_team_id:
|
|
logger.warning("APNs key ID or team ID not configured — push disabled")
|
|
return None
|
|
|
|
payload = {
|
|
"iss": settings.apns_team_id,
|
|
"iat": int(now),
|
|
}
|
|
headers = {
|
|
"alg": "ES256",
|
|
"kid": settings.apns_key_id,
|
|
}
|
|
|
|
try:
|
|
token = pyjwt.encode(payload, private_key, algorithm="ES256", headers=headers)
|
|
_jwt_token = token if isinstance(token, str) else token.decode("utf-8")
|
|
_jwt_issued_at = now
|
|
return _jwt_token
|
|
except Exception as e:
|
|
logger.error(f"Failed to generate APNs JWT: {e}")
|
|
return None
|
|
|
|
|
|
async def send_push(
|
|
apns_token: str,
|
|
title: str,
|
|
body: str,
|
|
payload: Optional[dict] = None,
|
|
) -> bool:
|
|
"""
|
|
Send a push notification to a device.
|
|
Returns True on success, False on failure.
|
|
"""
|
|
jwt_token = _get_jwt()
|
|
if not jwt_token:
|
|
logger.warning("APNs push skipped — no valid JWT (check .p8 key config)")
|
|
return False
|
|
|
|
apns_payload = {
|
|
"aps": {
|
|
"alert": {"title": title, "body": body},
|
|
"badge": 1,
|
|
"sound": "default",
|
|
"category": "POSITION_ALERT",
|
|
}
|
|
}
|
|
if payload:
|
|
apns_payload.update(payload)
|
|
|
|
url = f"{_get_apns_host()}/3/device/{apns_token}"
|
|
headers = {
|
|
"authorization": f"bearer {jwt_token}",
|
|
"apns-topic": settings.apns_bundle_id,
|
|
"apns-push-type": "alert",
|
|
"content-type": "application/json",
|
|
}
|
|
|
|
try:
|
|
async with httpx.AsyncClient(http2=True, timeout=10.0) as client:
|
|
response = await client.post(url, headers=headers, content=json.dumps(apns_payload))
|
|
|
|
if response.status_code == 200:
|
|
logger.info(f"Push sent to {apns_token[:8]}... — {title}")
|
|
return True
|
|
else:
|
|
logger.error(f"APNs rejected push: {response.status_code} {response.text}")
|
|
return False
|
|
|
|
except Exception as e:
|
|
logger.error(f"APNs push failed: {e}")
|
|
return False
|