- Added GA4 traffic monitoring to seo-agent.py - Tracks sessions, users, bounce rate from GA4 - Detects traffic anomalies (>50% drop triggers alert) - Maintains 30-day traffic history in state - Updated daily-report.sh with enhanced GA4 metrics - GA4 data now flows to morning brief - GA4 is polled from the hourly check only every 6 hours to avoid API fatigue
236 lines
7.8 KiB
Python
Executable File
236 lines
7.8 KiB
Python
Executable File
#!/usr/bin/env python3
"""
Marketing-SEO Agent - 24/7 Continuous Monitoring
Monitors: site health, rankings, traffic, competitors
Alerts: Telegram/email on critical issues
GA4 Integration: Tracks traffic anomalies, session drops, user engagement
"""
|
|
import json
import re
import subprocess
import sys
import time
import urllib.error
import urllib.request
from datetime import datetime, timedelta
from pathlib import Path
|
|
|
|
# --- Filesystem layout ------------------------------------------------------
# The workspace root is the parent of the directory that contains this script.
WORKSPACE = Path(__file__).parent.parent
LOG_DIR = WORKSPACE.joinpath("logs")
STATE_FILE = WORKSPACE.joinpath("state", "agent-state.json")
CONFIG_FILE = WORKSPACE.joinpath("config", "agent-config.yaml")
GA4_SCRIPT = WORKSPACE.joinpath("scripts", "ga4-direct.py")

# Import-time side effect: make sure the log directory exists before any
# function tries to append to a log file in it.
LOG_DIR.mkdir(parents=True, exist_ok=True)

# --- Monitoring targets -----------------------------------------------------
SITES = [
    "https://www.hoaledgeriq.com",
    "https://app.hoaledgeriq.com",
]

# --- Tuning -----------------------------------------------------------------
MONITOR_INTERVAL = 60 * 60  # seconds between site checks (1 hour)
TRAFFIC_DROP_THRESHOLD = 0.5  # alert when sessions fall more than 50% below average
|
|
|
|
def log(msg):
    """Print a timestamped message and append it to today's agent log file.

    Args:
        msg: Text to record; it is interpolated into the log line as-is.

    The line is written to ``LOG_DIR/seo-agent-YYYYMMDD.log`` in append mode.
    """
    # Read the clock once so the timestamp inside the line and the date in the
    # file name always agree, even when the call straddles midnight.
    now = datetime.now()
    line = f"[{now.strftime('%Y-%m-%d %H:%M:%S')}] {msg}"
    print(line)

    log_path = LOG_DIR / f"seo-agent-{now.strftime('%Y%m%d')}.log"
    # Messages in this file contain emoji; pin UTF-8 so the write does not
    # depend on the platform's default encoding.
    with open(log_path, 'a', encoding='utf-8') as f:
        f.write(line + '\n')
|
|
|
|
def load_state():
    """Load the persisted agent state, falling back to defaults.

    Returns:
        dict: The JSON object stored in ``STATE_FILE`` when the file exists
        and parses to a dict. Otherwise a fresh default state with the keys
        ``last_check``, ``alerts_today``, ``status`` and ``traffic_history``.
    """
    default_state = {
        "last_check": None,
        "alerts_today": 0,
        "status": "running",
        "traffic_history": [],
    }

    if not STATE_FILE.exists():
        return default_state

    try:
        state = json.loads(STATE_FILE.read_text(encoding="utf-8"))
    except (OSError, ValueError) as e:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueError
        # subclasses. A damaged state file must not take the agent down.
        log(f"State file unreadable, using defaults: {e}")
        return default_state

    if not isinstance(state, dict):
        # Callers use dict methods on the result, so reject any other JSON type.
        log("State file does not contain a JSON object, using defaults")
        return default_state

    return state
|
|
|
|
def save_state(s):
    """Persist the agent state to ``STATE_FILE`` as indented JSON.

    Args:
        s: JSON-serialisable state object (a dict in this file's callers).

    Raises:
        OSError: If the directory or file cannot be written.
        TypeError: If ``s`` contains values ``json.dumps`` cannot serialise.
    """
    # Only LOG_DIR is created at import time; create the state directory here
    # so the first save on a fresh workspace does not fail.
    STATE_FILE.parent.mkdir(parents=True, exist_ok=True)

    # Serialise first, write to a sibling temp file, then swap it into place,
    # so an interrupted write never leaves a truncated state file behind.
    payload = json.dumps(s, indent=2)
    tmp_path = STATE_FILE.with_name(STATE_FILE.name + ".tmp")
    tmp_path.write_text(payload, encoding="utf-8")
    tmp_path.replace(STATE_FILE)
|
|
|
|
def check_site_health(url):
    """Probe ``url`` with an HTTP GET and report whether it answered 200.

    Args:
        url: Absolute URL to request.

    Returns:
        tuple: ``(is_up, status, response_time)`` where
        - ``is_up`` is True only when the final response code is 200;
        - ``status`` is the HTTP status code (int) when the server answered,
          or the error text (str) when no HTTP response was obtained;
        - ``response_time`` is the elapsed seconds rounded to 2 decimals, or
          None when no HTTP response was obtained.
    """
    # monotonic() is immune to wall-clock adjustments during the request.
    start = time.monotonic()
    try:
        req = urllib.request.Request(url, headers={"User-Agent": "SEO-Agent/1.0"})
        with urllib.request.urlopen(req, timeout=15) as r:
            code = r.getcode()
            return code == 200, code, round(time.monotonic() - start, 2)
    except urllib.error.HTTPError as e:
        # The server did answer, just not with success: keep the numeric code
        # and the timing instead of collapsing them into an error string.
        code = e.code
        e.close()
        return False, code, round(time.monotonic() - start, 2)
    except Exception as e:
        # Best-effort probe: DNS failures, timeouts, malformed URLs, etc. are
        # reported as "down" with the error text rather than raised.
        return False, str(e), None
|
|
|
|
# First numeric token in a string: digits with optional thousands separators
# and an optional decimal part, or a bare ".5"-style fraction.
_GA4_NUMBER_RE = re.compile(r"\d[\d,]*(?:\.\d+)?|\.\d+")

# (label looked for in a line, key in the result dict, converter).
# Labels are tried in this order and the first one found claims the line.
_GA4_FIELDS = (
    ("Sessions:", "sessions", int),
    ("Users:", "users", int),  # also matches "Active Users:"
    ("Bounce Rate:", "bounce_rate", float),
)


def _parse_ga4_output(output):
    """Extract sessions, users and bounce_rate from the GA4 script's text output."""
    data = {}
    for line in output.splitlines():
        for label, key, convert in _GA4_FIELDS:
            if label not in line:
                continue
            # Take only the first number after the label. Joining every digit
            # on the line would turn "Sessions: 12, Users: 5" into 125.
            match = _GA4_NUMBER_RE.search(line.split(label, 1)[1])
            if match:
                data[key] = convert(float(match.group().replace(",", "")))
            # A labelled line without a number is skipped rather than
            # invalidating the values parsed from other lines.
            break
    return data


def get_ga4_data():
    """
    Get GA4 traffic data by running ga4-direct.py with the current interpreter.

    The script's stdout is scanned for lines such as "Sessions: 123",
    "Users: 45" / "Active Users: 45" and "Bounce Rate: 12.3%".
    NOTE(review): the exact output format of ga4-direct.py is assumed from
    these labels - confirm against that script.

    Returns: dict with whichever of sessions, users, bounce_rate were found,
    or None when the script fails, times out (30 s) or yields none of them.
    """
    try:
        result = subprocess.run(
            [sys.executable, str(GA4_SCRIPT)],
            capture_output=True,
            text=True,
            timeout=30,
        )
    except (OSError, subprocess.SubprocessError, ValueError) as e:
        # SubprocessError covers the timeout; ValueError covers undecodable
        # output. A GA4 failure must not take the monitoring loop down.
        log(f"GA4 fetch error: {e}")
        return None

    if result.returncode != 0:
        # Surface script failures instead of silently returning nothing.
        log(f"GA4 fetch error: script exited with code {result.returncode}: {result.stderr.strip()}")
        return None

    data = _parse_ga4_output(result.stdout)
    return data if data else None
|
|
|
|
def check_traffic_anomalies(current_data):
    """
    Check the current session count against recent daily history and record it.

    Args:
        current_data: dict as returned by get_ga4_data(); must contain
            'sessions' and may contain 'users'. None, an empty dict or a dict
            without 'sessions' is ignored.

    Returns: alert message if sessions are more than TRAFFIC_DROP_THRESHOLD
    below the average of up to 7 previous days, None otherwise.

    Side effects: when no alert is returned, today's sample is stored in
    state['traffic_history'] (one entry per calendar date, newest 30 dates
    kept) and the state is saved.
    """
    if not current_data or 'sessions' not in current_data:
        return None

    state = load_state()
    history = state.get('traffic_history', [])
    if not isinstance(history, list):
        history = []

    now = datetime.now()
    today = now.strftime('%Y-%m-%d')
    current_sessions = current_data['sessions']

    # This function runs several times a day, but the comparison and the alert
    # text are expressed in days. Collapse the history to one entry per date
    # (latest sample wins) and leave today out of the baseline.
    by_date = {}
    for entry in history:
        if isinstance(entry, dict):
            by_date[entry.get('date')] = entry
    by_date.pop(today, None)
    previous_days = list(by_date.values())

    # Need at least 2 days of history to compare
    baseline = previous_days[-7:]
    if len(baseline) >= 2:
        avg_sessions = sum(h.get('sessions', 0) for h in baseline) / len(baseline)

        if avg_sessions > 0:
            drop_pct = (avg_sessions - current_sessions) / avg_sessions

            if drop_pct > TRAFFIC_DROP_THRESHOLD:
                # NOTE(review): as in the original code, an anomalous sample is
                # not written to history - confirm this is intended.
                return f"🚨 Traffic drop detected: {drop_pct*100:.1f}% below average\n• Current: {current_sessions} sessions\n• 7-day avg: {avg_sessions:.0f} sessions\n• Threshold: {TRAFFIC_DROP_THRESHOLD*100}% drop"

    # Add (or refresh) today's entry and keep the last 30 days
    previous_days.append({
        'date': today,
        'sessions': current_sessions,
        'users': current_data.get('users', 0),
        'timestamp': now.isoformat(),
    })

    state['traffic_history'] = previous_days[-30:]
    save_state(state)

    return None
|
|
|
|
def send_alert(title, message, severity="warning"):
    """Send alert via multiple channels.

    The alert is (1) written to the agent log, (2) pushed through the
    ``openclaw message send`` CLI (the Telegram channel in this file), and
    (3) appended to ``LOG_DIR/alerts-YYYYMMDD.log``.

    Args:
        title: Short headline for the alert.
        message: Alert body text.
        severity: Free-form label; it is logged as given and upper-cased in
            the alerts file. Defaults to "warning".
    """
    now = datetime.now()
    log(f"🔔 ALERT [{severity}]: {title}")

    # Telegram alert
    try:
        tg_msg = f"🔔 *SEO Alert: {title}*\n\n{message}\n\n⏰ {now.strftime('%H:%M')}"
        result = subprocess.run(
            ["openclaw", "message", "send", "--text", tg_msg],
            capture_output=True,
            timeout=10,
        )
        if result.returncode != 0:
            # A non-zero exit does not raise; report it explicitly so failed
            # deliveries are visible in the log.
            stderr_text = result.stderr.decode("utf-8", errors="replace").strip()
            log(f"Telegram send failed: exit code {result.returncode}: {stderr_text}")
    except Exception as e:
        # Delivery is best-effort: a missing CLI or a timeout must not stop
        # the alert from being written to the alerts file below.
        log(f"Telegram send failed: {e}")

    # Log to alerts file (UTF-8 pinned because alert text contains emoji)
    with open(LOG_DIR / f"alerts-{now.strftime('%Y%m%d')}.log", 'a', encoding='utf-8') as f:
        f.write(f"[{severity.upper()}] {now.isoformat()}: {title}\n{message}\n\n")
|
|
|
|
def hourly_check():
    """Probe every configured site and, during GA4 hours, inspect traffic.

    Each URL in SITES is checked with check_site_health(); a site that is not
    up triggers a "critical" alert. GA4 data is fetched only when the current
    wall-clock hour is a multiple of 6 (0, 6, 12, 18), and a detected traffic
    anomaly triggers a "warning" alert.

    Returns:
        dict: ``{'sites': {url: {'up', 'status', 'time'}}, 'traffic': dict or
        None, 'timestamp': ISO-8601 string}``.
    """
    log("=== Hourly Site Check ===")

    report = {
        'sites': {},
        'traffic': None,
        'timestamp': datetime.now().isoformat(),
    }

    # --- Site availability ---
    for url in SITES:
        log(f"Checking {url}...")
        up, code, elapsed = check_site_health(url)
        report['sites'][url] = {"up": up, "status": code, "time": elapsed}

        if not up:
            log(f"❌ {url}: DOWN ({code})")
            send_alert(f"SITE DOWN: {url}", f"Status: {code}\nURL: {url}", "critical")
            continue

        log(f"✅ {url}: UP ({code}) - {elapsed}s")

    # --- GA4 traffic: skipped outside the 6-hour slots to limit API calls ---
    if datetime.now().hour % 6 != 0:
        return report

    log("Fetching GA4 traffic data...")
    traffic = get_ga4_data()

    if not traffic:
        log("⚠️ Could not fetch GA4 data")
        return report

    report['traffic'] = traffic
    sessions = traffic.get('sessions', 0)
    users = traffic.get('users', 0)
    log(f"📊 GA4 Data: {sessions} sessions, {users} users")

    # Compare against stored history and alert on a significant drop.
    anomaly_text = check_traffic_anomalies(traffic)
    if anomaly_text:
        send_alert("Traffic Anomaly Detected", anomaly_text, "warning")

    return report
|
|
|
|
def daily_report():
    """Generate and send the daily summary: live site status plus GA4 data.

    Every URL in SITES is probed with check_site_health() so the report shows
    the real status instead of a fixed "UP". The overall status is "Healthy"
    only when all sites are up. The report is sent through send_alert() with
    severity "info".
    """
    log("=== Daily SEO Report ===")

    # Site status: probe now rather than assuming everything is up.
    status_lines = []
    all_up = True
    for site in SITES:
        is_up, status, _ = check_site_health(site)
        host = site.split("://", 1)[-1]  # show the host without the scheme
        if is_up:
            status_lines.append(f"• {host}: ✅ UP")
        else:
            all_up = False
            status_lines.append(f"• {host}: ❌ DOWN ({status})")

    # Get GA4 data
    traffic_data = get_ga4_data()
    if traffic_data:
        bounce = traffic_data.get('bounce_rate')
        # Append the percent sign only when there is a number to attach it to.
        bounce_text = f"{bounce}%" if bounce is not None else "N/A"
        traffic_lines = [
            "📈 *Traffic (24h):*",
            f"• Sessions: {traffic_data.get('sessions', 'N/A')}",
            f"• Users: {traffic_data.get('users', 'N/A')}",
            f"• Bounce Rate: {bounce_text}",
        ]
    else:
        traffic_lines = ["📈 *Traffic:* Data unavailable"]

    overall = "Healthy" if all_up else "Degraded"

    # Assemble line by line so the message layout does not depend on how this
    # source file is indented.
    report = "\n".join([
        f"📊 SEO Daily Report - {datetime.now().strftime('%Y-%m-%d')}",
        "",
        "🌐 *Site Status:*",
        *status_lines,
        "",
        *traffic_lines,
        "",
        f"⚡ *Status:* {overall}",
        "🔍 *Focus:* Competitor analysis & rankings",
    ])

    send_alert("Daily SEO Summary", report, "info")
|
|
|
|
def main():
    """Run the monitoring loop forever.

    The loop wakes roughly once a minute. It runs hourly_check() whenever at
    least MONITOR_INTERVAL seconds have passed since the previous run (and
    immediately on startup), and runs daily_report() once per calendar date
    during the 08:00 hour. Failures inside either task are logged and the
    loop keeps running. This function does not return.
    """
    log("🚀 Marketing-SEO Agent Started - Hourly Mode with GA4 Integration")
    log(f"Monitoring: {', '.join(SITES)}")
    log(f"GA4 Script: {GA4_SCRIPT}")

    last_check = 0
    last_daily = None

    while True:
        now = datetime.now()
        now_ts = int(now.timestamp())

        # Hourly check
        if now_ts - last_check >= MONITOR_INTERVAL:
            # Stamp first so a failing check is retried on the next interval
            # instead of on every one-minute tick.
            last_check = now_ts
            try:
                hourly_check()
            except Exception as e:
                # Top-level boundary: one bad check must not stop monitoring.
                log(f"Hourly check failed: {e}")

        # Daily report during the 08:00 hour. Matching minute == 0 exactly can
        # miss the slot when an iteration (sleep plus a slow check) takes more
        # than 60 seconds, so the whole hour is accepted and last_daily
        # prevents a second report on the same date.
        today = now.strftime('%Y-%m-%d')
        if now.hour == 8 and today != last_daily:
            last_daily = today
            try:
                daily_report()
            except Exception as e:
                log(f"Daily report failed: {e}")

        time.sleep(60)  # Check every minute for hourly trigger
|
|
|
|
# Script entry point: start the monitoring loop only when this file is
# executed directly, not when it is imported as a module.
if __name__ == "__main__":
    main()
|