- Created monitor.py to replace bash script - Checks ROI Calculator API (interest form endpoint not found) - Properly parses JSON responses and tracks processed leads - Sends Telegram notifications for NEW leads only - Fixed timeout issues from bash script - State file now tracks both ROI and interest form leads - 3 test leads (IDs 5, 6, 7) detected and notifications sent
172 lines
5.2 KiB
Python
172 lines
5.2 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
HOA Ledger IQ Sales Lead Monitor
|
|
Checks both ROI Calculator and Interest Form endpoints
|
|
Sends Telegram notifications for new leads
|
|
"""
|
|
import json
|
|
import urllib.request
|
|
import subprocess
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
STATE_FILE = Path(__file__).parent / "state.json"
|
|
LOG_FILE = Path(__file__).parent / "monitor.log"
|
|
API_KEY = "K9mP2vL8x4qR7nZ"
|
|
TELEGRAM_TARGET = "telegram:8269921691"
|
|
|
|
# API Endpoints
|
|
ENDPOINTS = {
|
|
"roi_calculator": "https://www.hoaledgeriq.com/api/calc-submissions",
|
|
"interest_form": "https://www.hoaledgeriq.com/api/interest-submissions"
|
|
}
|
|
|
|
def log(message):
|
|
"""Log message with timestamp"""
|
|
ts = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
|
log_line = f"[{ts}] {message}"
|
|
print(log_line)
|
|
with open(LOG_FILE, 'a') as f:
|
|
f.write(log_line + '\n')
|
|
|
|
def load_state():
|
|
"""Load processed lead IDs from state file"""
|
|
if STATE_FILE.exists():
|
|
data = json.loads(STATE_FILE.read_text())
|
|
# Ensure both keys exist
|
|
if 'processed_roi' not in data:
|
|
data['processed_roi'] = data.get('processed_calc_ids', [])
|
|
if 'processed_interest' not in data:
|
|
data['processed_interest'] = []
|
|
return data
|
|
return {
|
|
"processed_roi": [],
|
|
"processed_interest": [],
|
|
"last_check": None
|
|
}
|
|
|
|
def save_state(state):
|
|
"""Save state to file"""
|
|
state['last_check'] = datetime.now().isoformat()
|
|
STATE_FILE.write_text(json.dumps(state, indent=2))
|
|
|
|
def fetch_endpoint(url):
|
|
"""Fetch data from API endpoint"""
|
|
try:
|
|
req = urllib.request.Request(
|
|
url,
|
|
headers={"x-admin-key": API_KEY}
|
|
)
|
|
with urllib.request.urlopen(req, timeout=10) as response:
|
|
return json.loads(response.read().decode())
|
|
except Exception as e:
|
|
log(f"❌ Error fetching {url}: {e}")
|
|
return None
|
|
|
|
def send_telegram_notification(lead_id, email, source, property_type=None, homesites=None):
|
|
"""Send Telegram notification for new lead"""
|
|
message = f"🎉 *NEW LEAD ALERT*\n\n"
|
|
message += f"*Source:* {source}\n"
|
|
message += f"*Lead ID:* {lead_id}\n"
|
|
message += f"*Email:* `{email}`\n"
|
|
|
|
if property_type:
|
|
message += f"*Property Type:* {property_type}\n"
|
|
if homesites:
|
|
message += f"*Homesites:* {homesites:,}\n"
|
|
|
|
message += f"\n*Time:* {datetime.now().strftime('%Y-%m-%d %H:%M')}"
|
|
|
|
try:
|
|
subprocess.run([
|
|
"openclaw", "message", "send",
|
|
"--channel", "telegram",
|
|
"--target", TELEGRAM_TARGET,
|
|
"--message", message
|
|
], capture_output=True, timeout=30)
|
|
log(f"✅ Notification sent for lead {lead_id} ({email})")
|
|
except Exception as e:
|
|
log(f"❌ Failed to send notification: {e}")
|
|
|
|
def check_roi_calculator(state):
|
|
"""Check ROI Calculator submissions"""
|
|
log("📊 Checking ROI Calculator...")
|
|
data = fetch_endpoint(ENDPOINTS["roi_calculator"])
|
|
|
|
if not data or 'submissions' not in data:
|
|
log(" No submissions found")
|
|
return state
|
|
|
|
new_count = 0
|
|
for submission in data['submissions']:
|
|
lead_id = submission.get('id')
|
|
email = submission.get('email', 'unknown')
|
|
|
|
if lead_id not in state['processed_roi']:
|
|
log(f" 🎯 NEW: ID {lead_id} - {email}")
|
|
send_telegram_notification(
|
|
lead_id=lead_id,
|
|
email=email,
|
|
source="ROI Calculator",
|
|
property_type=submission.get('property_type', '').upper(),
|
|
homesites=submission.get('homesites')
|
|
)
|
|
state['processed_roi'].append(lead_id)
|
|
new_count += 1
|
|
|
|
if new_count == 0:
|
|
log(f" ✓ No new submissions (last ID: {data['submissions'][0]['id'] if data['submissions'] else 'none'})")
|
|
|
|
return state
|
|
|
|
def check_interest_form(state):
|
|
"""Check Interest Form submissions"""
|
|
log("📝 Checking Interest Form...")
|
|
data = fetch_endpoint(ENDPOINTS["interest_form"])
|
|
|
|
if not data or 'submissions' not in data:
|
|
log(" No submissions found")
|
|
return state
|
|
|
|
new_count = 0
|
|
for submission in data['submissions']:
|
|
lead_id = submission.get('id')
|
|
email = submission.get('email', 'unknown')
|
|
|
|
if lead_id not in state['processed_interest']:
|
|
log(f" 🎯 NEW: ID {lead_id} - {email}")
|
|
send_telegram_notification(
|
|
lead_id=lead_id,
|
|
email=email,
|
|
source="Interest Form",
|
|
property_type=submission.get('property_type', ''),
|
|
homesites=submission.get('homesites')
|
|
)
|
|
state['processed_interest'].append(lead_id)
|
|
new_count += 1
|
|
|
|
if new_count == 0:
|
|
log(f" ✓ No new submissions")
|
|
|
|
return state
|
|
|
|
def main():
|
|
log("=" * 50)
|
|
log("Starting Sales Lead Monitor Check")
|
|
|
|
# Load state
|
|
state = load_state()
|
|
|
|
# Check both endpoints
|
|
state = check_roi_calculator(state)
|
|
state = check_interest_form(state)
|
|
|
|
# Save updated state
|
|
save_state(state)
|
|
|
|
log("✓ Monitor check complete")
|
|
log("=" * 50)
|
|
|
|
if __name__ == "__main__":
|
|
main()
|