autostream-ai-agent/backend/tools.py
2026-04-12 21:03:52 +05:30

64 lines
No EOL
2.3 KiB
Python

"""
tools.py — Tool functions for AutoStream Agent
Includes lead capture, email validation, and logging.
"""
import re
import logging
import time
logger = logging.getLogger("autostream.tools")
# ── Email validator ────────────────────────────────────────────────────────────
EMAIL_RE = re.compile(r"^[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}$")
def is_valid_email(value: str) -> bool:
return bool(EMAIL_RE.match(value.strip()))
# ── Lead capture ───────────────────────────────────────────────────────────────
_lead_log: list[dict] = [] # in-memory store; swap for DB in production
def mock_lead_capture(name: str, email: str, platform: str) -> str:
"""
Simulates saving a lead to CRM / database.
Returns a confirmation string.
"""
entry = {
"name": name,
"email": email,
"platform": platform,
"captured_at": time.strftime("%Y-%m-%dT%H:%M:%S"),
}
_lead_log.append(entry)
logger.info(f"[LEAD CAPTURED] {entry}")
print(f"\n✅ Lead captured: {name} | {email} | {platform}\n")
return (
f"🎉 You're all set, {name}! "
f"Our team will reach out at {email} shortly. "
f"Can't wait to supercharge your {platform} content!"
)
def get_all_leads() -> list[dict]:
"""Return all captured leads (for admin/debug use)."""
return list(_lead_log)
# ── Platform normaliser ────────────────────────────────────────────────────────
PLATFORM_ALIASES = {
"yt": "YouTube", "youtube": "YouTube",
"ig": "Instagram", "insta": "Instagram", "instagram": "Instagram",
"tt": "TikTok", "tiktok": "TikTok",
"tw": "Twitter", "twitter": "Twitter", "x": "Twitter/X",
"fb": "Facebook", "facebook": "Facebook",
"li": "LinkedIn", "linkedin": "LinkedIn",
"twitch": "Twitch",
}
def normalize_platform(value: str) -> str:
key = value.strip().lower().rstrip(".")
return PLATFORM_ALIASES.get(key, value.strip().title())