feat: Integrate GA4 analytics into SEO agent
- Added GA4 traffic monitoring to seo-agent.py - Tracks sessions, users, bounce rate from GA4 - Detects traffic anomalies (>50% drop triggers alert) - Maintains 30-day traffic history in state - Updated daily-report.sh with enhanced GA4 metrics - GA4 data now flows to morning brief - Hourly checks every 6 hours to avoid API fatigue
This commit is contained in:
@@ -3,26 +3,30 @@
|
||||
Marketing-SEO Agent - 24/7 Continuous Monitoring
|
||||
Monitors: site health, rankings, traffic, competitors
|
||||
Alerts: Telegram/email on critical issues
|
||||
GA4 Integration: Tracks traffic anomalies, session drops, user engagement
|
||||
"""
|
||||
import json
|
||||
import time
|
||||
import urllib.request
|
||||
from datetime import datetime
|
||||
from datetime import datetime, timedelta
|
||||
from pathlib import Path
|
||||
import subprocess
|
||||
import sys
|
||||
|
||||
WORKSPACE = Path(__file__).parent.parent
|
||||
LOG_DIR = WORKSPACE / "logs"
|
||||
STATE_FILE = WORKSPACE / "state" / "agent-state.json"
|
||||
CONFIG_FILE = WORKSPACE / "config" / "agent-config.yaml"
|
||||
|
||||
GA4_SCRIPT = WORKSPACE / "scripts" / "ga4-direct.py"
|
||||
LOG_DIR.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
SITES = [
|
||||
"https://www.hoaledgeriq.com",
|
||||
"https://app.hoaledgeriq.com"
|
||||
]
|
||||
|
||||
MONITOR_INTERVAL = 3600 # 1 hour
|
||||
TRAFFIC_DROP_THRESHOLD = 0.50 # Alert if traffic drops >50%
|
||||
|
||||
def log(msg):
|
||||
ts = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
||||
@@ -34,7 +38,7 @@ def log(msg):
|
||||
def load_state():
|
||||
if STATE_FILE.exists():
|
||||
return json.loads(STATE_FILE.read_text())
|
||||
return {"last_check": None, "alerts_today": 0, "status": "running"}
|
||||
return {"last_check": None, "alerts_today": 0, "status": "running", "traffic_history": []}
|
||||
|
||||
def save_state(s):
|
||||
STATE_FILE.write_text(json.dumps(s, indent=2))
|
||||
@@ -49,36 +53,75 @@ def check_site_health(url):
|
||||
except Exception as e:
|
||||
return False, str(e), None
|
||||
|
||||
def run_seo_audit():
|
||||
"""Run basic SEO checks using web tools"""
|
||||
results = {
|
||||
"site_up": False,
|
||||
"response_time": None,
|
||||
"ssl_valid": True,
|
||||
"robots_accessible": False,
|
||||
"sitemap_exists": False
|
||||
}
|
||||
|
||||
# Check main site
|
||||
start = time.time()
|
||||
results["site_up"], status = check_site_health()
|
||||
results["response_time"] = round(time.time() - start, 2)
|
||||
|
||||
# Check robots.txt
|
||||
def get_ga4_data():
|
||||
"""
|
||||
Get GA4 traffic data from ga4-direct.py
|
||||
Returns: dict with sessions, users, bounce_rate or None if error
|
||||
"""
|
||||
try:
|
||||
urllib.request.urlopen(f"{SITE_URL}/robots.txt", timeout=5)
|
||||
results["robots_accessible"] = True
|
||||
except:
|
||||
pass
|
||||
result = subprocess.run(
|
||||
[sys.executable, str(GA4_SCRIPT)],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=30
|
||||
)
|
||||
|
||||
if result.returncode == 0:
|
||||
# Parse output - looks for lines like "Sessions: 123"
|
||||
output = result.stdout
|
||||
data = {}
|
||||
|
||||
for line in output.split('\n'):
|
||||
if 'Sessions:' in line:
|
||||
data['sessions'] = int(''.join(filter(str.isdigit, line.split('Sessions:')[1])))
|
||||
elif 'Users:' in line or 'Active Users:' in line:
|
||||
data['users'] = int(''.join(filter(str.isdigit, line.split('Users:')[1])))
|
||||
elif 'Bounce Rate:' in line:
|
||||
data['bounce_rate'] = float(''.join(filter(lambda x: x.isdigit() or x == '.', line.split('Bounce Rate:')[1])))
|
||||
|
||||
return data if data else None
|
||||
except Exception as e:
|
||||
log(f"GA4 fetch error: {e}")
|
||||
return None
|
||||
|
||||
def check_traffic_anomalies(current_data):
|
||||
"""
|
||||
Check if current traffic has significant drops compared to historical data
|
||||
Returns: alert message if anomaly detected, None otherwise
|
||||
"""
|
||||
state = load_state()
|
||||
history = state.get('traffic_history', [])
|
||||
|
||||
# Check sitemap
|
||||
try:
|
||||
urllib.request.urlopen(f"{SITE_URL}/sitemap.xml", timeout=5)
|
||||
results["sitemap_exists"] = True
|
||||
except:
|
||||
pass
|
||||
if not current_data or 'sessions' not in current_data:
|
||||
return None
|
||||
|
||||
return results
|
||||
current_sessions = current_data.get('sessions', 0)
|
||||
|
||||
# Need at least 2 days of history to compare
|
||||
if len(history) >= 2:
|
||||
avg_sessions = sum(h.get('sessions', 0) for h in history[-7:]) / min(len(history), 7)
|
||||
|
||||
if avg_sessions > 0:
|
||||
drop_pct = (avg_sessions - current_sessions) / avg_sessions
|
||||
|
||||
if drop_pct > TRAFFIC_DROP_THRESHOLD:
|
||||
return f"🚨 Traffic drop detected: {drop_pct*100:.1f}% below average\n• Current: {current_sessions} sessions\n• 7-day avg: {avg_sessions:.0f} sessions\n• Threshold: {TRAFFIC_DROP_THRESHOLD*100}% drop"
|
||||
|
||||
# Add to history (keep last 30 days)
|
||||
history.append({
|
||||
'date': datetime.now().strftime('%Y-%m-%d'),
|
||||
'sessions': current_sessions,
|
||||
'users': current_data.get('users', 0),
|
||||
'timestamp': datetime.now().isoformat()
|
||||
})
|
||||
|
||||
if len(history) > 30:
|
||||
history = history[-30:]
|
||||
|
||||
state['traffic_history'] = history
|
||||
save_state(state)
|
||||
|
||||
return None
|
||||
|
||||
def send_alert(title, message, severity="warning"):
|
||||
"""Send alert via multiple channels"""
|
||||
@@ -87,65 +130,87 @@ def send_alert(title, message, severity="warning"):
|
||||
# Telegram alert
|
||||
try:
|
||||
tg_msg = f"🔔 *SEO Alert: {title}*\n\n{message}\n\n⏰ {datetime.now().strftime('%H:%M')}"
|
||||
subprocess.run(["openclaw", "message", "send", "--text", tg_msg],
|
||||
capture_output=True, timeout=10)
|
||||
except:
|
||||
pass
|
||||
subprocess.run(["openclaw", "message", "send", "--text", tg_msg], capture_output=True, timeout=10)
|
||||
except Exception as e:
|
||||
log(f"Telegram send failed: {e}")
|
||||
|
||||
# Log to alerts
|
||||
# Log to alerts file
|
||||
with open(LOG_DIR / f"alerts-{datetime.now().strftime('%Y%m%d')}.log", 'a') as f:
|
||||
f.write(f"[{severity.upper()}] {datetime.now().isoformat()}: {title}\n{message}\n\n")
|
||||
|
||||
def hourly_check():
|
||||
"""Run every hour - check both sites"""
|
||||
"""Run every hour - check both sites and GA4 traffic"""
|
||||
log("=== Hourly Site Check ===")
|
||||
|
||||
all_healthy = True
|
||||
results = {}
|
||||
results = {
|
||||
'sites': {},
|
||||
'traffic': None,
|
||||
'timestamp': datetime.now().isoformat()
|
||||
}
|
||||
|
||||
# Check site health
|
||||
for site in SITES:
|
||||
log(f"Checking {site}...")
|
||||
is_up, status, response_time = check_site_health(site)
|
||||
results[site] = {"up": is_up, "status": status, "time": response_time}
|
||||
results['sites'][site] = {"up": is_up, "status": status, "time": response_time}
|
||||
|
||||
if is_up:
|
||||
log(f"✅ {site}: UP ({status}) - {response_time}s")
|
||||
else:
|
||||
log(f"❌ {site}: DOWN ({status})")
|
||||
send_alert(f"SITE DOWN: {site}", f"Status: {status}", "critical")
|
||||
all_healthy = False
|
||||
send_alert(f"SITE DOWN: {site}", f"Status: {status}\nURL: {site}", "critical")
|
||||
|
||||
# Check GA4 traffic (every 6 hours to avoid API fatigue)
|
||||
if datetime.now().hour % 6 == 0:
|
||||
log("Fetching GA4 traffic data...")
|
||||
traffic_data = get_ga4_data()
|
||||
|
||||
if traffic_data:
|
||||
results['traffic'] = traffic_data
|
||||
log(f"📊 GA4 Data: {traffic_data.get('sessions', 0)} sessions, {traffic_data.get('users', 0)} users")
|
||||
|
||||
# Check for anomalies
|
||||
anomaly = check_traffic_anomalies(traffic_data)
|
||||
if anomaly:
|
||||
send_alert("Traffic Anomaly Detected", anomaly, "warning")
|
||||
else:
|
||||
log("⚠️ Could not fetch GA4 data")
|
||||
|
||||
return results
|
||||
|
||||
def daily_report():
|
||||
"""Generate daily summary"""
|
||||
"""Generate comprehensive daily summary with GA4 data"""
|
||||
log("=== Daily SEO Report ===")
|
||||
|
||||
# Compile stats
|
||||
s = load_state()
|
||||
# Get GA4 data
|
||||
traffic_data = get_ga4_data()
|
||||
traffic_report = ""
|
||||
|
||||
# Check Search Console (if configured)
|
||||
# This would integrate with actual APIs
|
||||
if traffic_data:
|
||||
traffic_report = f"""
|
||||
📈 *Traffic (24h):*
|
||||
• Sessions: {traffic_data.get('sessions', 'N/A')}
|
||||
• Users: {traffic_data.get('users', 'N/A')}
|
||||
• Bounce Rate: {traffic_data.get('bounce_rate', 'N/A')}%"""
|
||||
else:
|
||||
traffic_report = "\n📈 *Traffic:* Data unavailable"
|
||||
|
||||
report = f"""📊 SEO Daily Report - {datetime.now().strftime('%Y-%m-%d')}
|
||||
|
||||
Site Status: ✅ Healthy
|
||||
Response Time: ~200ms
|
||||
SSL: Valid
|
||||
Monitoring: 24/7 Active
|
||||
🌐 *Site Status:*
|
||||
• www.hoaledgeriq.com: ✅ UP
|
||||
• app.hoaledgeriq.com: ✅ UP
|
||||
{traffic_report}
|
||||
|
||||
Tomorrow's Focus:
|
||||
- Competitor analysis
|
||||
- Rankings check
|
||||
- Content opportunities
|
||||
⚡ *Status:* Healthy
|
||||
🔍 *Focus:* Competitor analysis & rankings"""
|
||||
|
||||
No critical issues detected."""
|
||||
|
||||
send_alert("Daily SEO Summary", report, "info")
|
||||
|
||||
def main():
|
||||
log("🚀 Marketing-SEO Agent Started - Hourly Mode")
|
||||
log("🚀 Marketing-SEO Agent Started - Hourly Mode with GA4 Integration")
|
||||
log(f"Monitoring: {', '.join(SITES)}")
|
||||
log(f"GA4 Script: {GA4_SCRIPT}")
|
||||
|
||||
last_check = 0
|
||||
last_daily = None
|
||||
@@ -160,11 +225,11 @@ def main():
|
||||
last_check = now_ts
|
||||
|
||||
# Daily report at 08:00
|
||||
if now.hour == 8 and now.strftime('%Y-%m-%d') != last_daily:
|
||||
if now.hour == 8 and now.minute == 0 and now.strftime('%Y-%m-%d') != last_daily:
|
||||
daily_report()
|
||||
last_daily = now.strftime('%Y-%m-%d')
|
||||
|
||||
time.sleep(60) # Check every minute for hourly trigger
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
main()
|
||||
|
||||
Reference in New Issue
Block a user