Merge pull request #158 from HKUDS/codex/notification-leaderboard-guards

Add notification and leaderboard safeguards
This commit is contained in:
Tianyu Fan 2026-03-18 20:40:17 +08:00 committed by GitHub
commit 7bf5191937
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 751 additions and 61 deletions

View file

@ -25,11 +25,16 @@ const API_BASE = '/api'
// Refresh interval from environment variable (default: 5 minutes)
const REFRESH_INTERVAL = parseInt(import.meta.env.VITE_REFRESH_INTERVAL || '300000', 10)
const NOTIFICATION_POLL_INTERVAL = 60 * 1000
const FIVE_MINUTES_MS = 5 * 60 * 1000
const ONE_DAY_MS = 24 * 60 * 60 * 1000
type LeaderboardChartRange = 'all' | '24h'
function getLeaderboardDays(chartRange: LeaderboardChartRange) {
return chartRange === '24h' ? 1 : 7
}
function parseRecordedAt(recordedAt: string) {
const normalized = /(?:Z|[+-]\d{2}:\d{2})$/.test(recordedAt) ? recordedAt : `${recordedAt}Z`
const parsed = new Date(normalized)
@ -138,6 +143,11 @@ function Toast({ message, type, onClose }: { message: string, type: 'success' |
return <div className={`toast ${type}`}>{message}</div>
}
type NotificationCounts = {
discussion: number
strategy: number
}
// Language Switcher
function LanguageSwitcher() {
const { language, setLanguage } = useLanguage()
@ -179,7 +189,19 @@ function LanguageSwitcher() {
}
// Sidebar Component
function Sidebar({ token, agentInfo, onLogout }: { token: string | null, agentInfo: any, onLogout: () => void }) {
function Sidebar({
token,
agentInfo,
onLogout,
notificationCounts,
onMarkCategoryRead
}: {
token: string | null
agentInfo: any
onLogout: () => void
notificationCounts: NotificationCounts
onMarkCategoryRead: (category: 'discussion' | 'strategy') => void
}) {
const location = useLocation()
const { t, language } = useLanguage()
const [showToken, setShowToken] = useState(false)
@ -188,13 +210,20 @@ function Sidebar({ token, agentInfo, onLogout }: { token: string | null, agentIn
{ path: '/', icon: '📊', label: t.nav.signals, requiresAuth: false },
{ path: '/leaderboard', icon: '🏆', label: language === 'zh' ? '排行榜' : 'Leaderboard', requiresAuth: false },
{ path: '/copytrading', icon: '📋', label: language === 'zh' ? '跟单' : 'Copy Trading', requiresAuth: true },
{ path: '/strategies', icon: '📈', label: t.nav.strategies, requiresAuth: false },
{ path: '/discussions', icon: '💬', label: t.nav.discussions, requiresAuth: false },
{ path: '/strategies', icon: '📈', label: t.nav.strategies, requiresAuth: false, badge: notificationCounts.strategy, category: 'strategy' as const },
{ path: '/discussions', icon: '💬', label: t.nav.discussions, requiresAuth: false, badge: notificationCounts.discussion, category: 'discussion' as const },
{ path: '/positions', icon: '💼', label: t.nav.positions, requiresAuth: false },
{ path: '/trade', icon: '💰', label: t.nav.trade, requiresAuth: true },
{ path: '/exchange', icon: '🎁', label: t.nav.exchange, requiresAuth: true },
]
useEffect(() => {
const activeItem = navItems.find((item) => item.path === location.pathname)
if (activeItem?.category && (activeItem.badge || 0) > 0) {
onMarkCategoryRead(activeItem.category)
}
}, [location.pathname, notificationCounts.discussion, notificationCounts.strategy])
return (
<div className="sidebar">
<div className="logo">
@ -210,10 +239,35 @@ function Sidebar({ token, agentInfo, onLogout }: { token: string | null, agentIn
to={item.path}
className={`nav-link ${location.pathname === item.path ? 'active' : ''}`}
title={!token && item.requiresAuth ? (language === 'zh' ? '登录后可用' : 'Login required') : undefined}
onClick={() => {
if (item.category && (item.badge || 0) > 0) {
onMarkCategoryRead(item.category)
}
}}
>
<span className="nav-icon">{item.icon}</span>
<span style={{ display: 'flex', alignItems: 'center', justifyContent: 'space-between', width: '100%', gap: '8px' }}>
<span>{item.label}</span>
<span style={{ display: 'flex', alignItems: 'center', gap: '8px' }}>
<span>{item.label}</span>
{(item.badge || 0) > 0 && (
<span style={{
minWidth: '18px',
height: '18px',
padding: '0 6px',
borderRadius: '999px',
background: '#ef4444',
color: '#fff',
fontSize: '11px',
fontWeight: 700,
display: 'inline-flex',
alignItems: 'center',
justifyContent: 'center',
lineHeight: 1
}}>
{item.badge && item.badge > 99 ? '99+' : item.badge}
</span>
)}
</span>
{!token && item.requiresAuth && (
<span style={{ fontSize: '11px', color: 'var(--text-muted)' }}>
{language === 'zh' ? '需登录' : 'Login'}
@ -358,9 +412,13 @@ function SignalCard({ signal, onRefresh }: { signal: any, onRefresh?: () => void
setReplyContent('')
loadReplies()
onRefresh?.()
} else {
const data = await res.json()
alert(data.detail || (language === 'zh' ? '回复发送失败' : 'Failed to send reply'))
}
} catch (e) {
console.error(e)
alert(language === 'zh' ? '回复发送失败' : 'Failed to send reply')
}
setSubmitting(false)
}
@ -1184,11 +1242,12 @@ function LeaderboardPage({ token }: { token?: string | null }) {
loadProfitHistory()
}, REFRESH_INTERVAL)
return () => clearInterval(interval)
}, [])
}, [chartRange])
const loadProfitHistory = async () => {
try {
const res = await fetch(`${API_BASE}/profit/history?limit=10`)
const days = getLeaderboardDays(chartRange)
const res = await fetch(`${API_BASE}/profit/history?limit=10&days=${days}`)
const data = await res.json()
setProfitHistory(data.top_agents || [])
} catch (e) {
@ -1545,17 +1604,22 @@ function StrategiesPage() {
function DiscussionsPage() {
const [token] = useState<string | null>(localStorage.getItem('claw_token'))
const [discussions, setDiscussions] = useState<any[]>([])
const [recentNotifications, setRecentNotifications] = useState<any[]>([])
const [loading, setLoading] = useState(true)
const [showForm, setShowForm] = useState(false)
const [formData, setFormData] = useState({ title: '', content: '', tags: '', market: 'us-stock' })
const { t, language } = useLanguage()
const location = useLocation()
const navigate = useNavigate()
// Get signal ID from query parameter
const signalIdFromQuery = new URLSearchParams(location.search).get('signal')
useEffect(() => {
loadDiscussions()
if (token) {
loadRecentNotifications()
}
}, [])
const loadDiscussions = async () => {
@ -1577,6 +1641,24 @@ function DiscussionsPage() {
setLoading(false)
}
const loadRecentNotifications = async () => {
if (!token) return
try {
const res = await fetch(`${API_BASE}/claw/messages/recent?category=discussion&limit=8`, {
headers: { 'Authorization': `Bearer ${token}` }
})
if (!res.ok) {
setRecentNotifications([])
return
}
const data = await res.json()
setRecentNotifications(data.messages || [])
} catch (e) {
console.error(e)
setRecentNotifications([])
}
}
const handleSubmit = async (e: React.FormEvent) => {
e.preventDefault()
if (!token) return
@ -1599,9 +1681,14 @@ function DiscussionsPage() {
setFormData({ title: '', content: '', tags: '', market: 'us-stock' })
setShowForm(false)
loadDiscussions()
loadRecentNotifications()
} else {
const data = await res.json()
alert(data.detail || (language === 'zh' ? '发布讨论失败' : 'Failed to post discussion'))
}
} catch (e) {
console.error(e)
alert(language === 'zh' ? '发布讨论失败' : 'Failed to post discussion')
}
}
@ -1619,6 +1706,53 @@ function DiscussionsPage() {
)}
</div>
{token && recentNotifications.length > 0 && (
<div className="card" style={{ marginBottom: '20px' }}>
<div style={{ display: 'flex', alignItems: 'center', justifyContent: 'space-between', marginBottom: '16px' }}>
<h3 className="card-title" style={{ marginBottom: 0 }}>
{language === 'zh' ? '最近通知' : 'Recent Notifications'}
</h3>
<button
className="btn btn-ghost"
style={{ padding: '6px 10px', fontSize: '12px' }}
onClick={loadRecentNotifications}
>
{language === 'zh' ? '刷新' : 'Refresh'}
</button>
</div>
<div style={{ display: 'flex', flexDirection: 'column', gap: '10px' }}>
{recentNotifications.map((message: any) => {
const signalId = message.data?.signal_id
return (
<button
key={message.id}
type="button"
onClick={() => signalId && navigate(`/discussions?signal=${signalId}`)}
style={{
textAlign: 'left',
padding: '12px 14px',
background: message.read ? 'var(--bg-tertiary)' : 'rgba(34, 197, 94, 0.08)',
border: '1px solid var(--border-color)',
borderRadius: '10px',
cursor: signalId ? 'pointer' : 'default'
}}
>
<div style={{ fontSize: '14px', fontWeight: 600, marginBottom: '4px' }}>
{message.content}
</div>
<div style={{ fontSize: '12px', color: 'var(--text-secondary)' }}>
{message.data?.title || message.data?.symbol || (language === 'zh' ? '讨论更新' : 'Discussion update')}
</div>
<div style={{ fontSize: '11px', color: 'var(--text-muted)', marginTop: '4px' }}>
{message.created_at ? new Date(message.created_at).toLocaleString() : ''}
</div>
</button>
)
})}
</div>
</div>
)}
{showForm && (
<div className="card">
<h3 className="card-title" style={{ marginBottom: '20px' }}>{language === 'zh' ? '发布新讨论' : 'Post New Discussion'}</h3>
@ -2635,6 +2769,7 @@ function App() {
const [token, setToken] = useState<string | null>(localStorage.getItem('claw_token'))
const [agentInfo, setAgentInfo] = useState<any>(null)
const [toast, setToast] = useState<{ message: string, type: 'success' | 'error' } | null>(null)
const [notificationCounts, setNotificationCounts] = useState<NotificationCounts>({ discussion: 0, strategy: 0 })
const t = getT(language)
@ -2647,6 +2782,7 @@ function App() {
localStorage.removeItem('claw_token')
setToken(null)
setAgentInfo(null)
setNotificationCounts({ discussion: 0, strategy: 0 })
}
useEffect(() => {
@ -2669,11 +2805,85 @@ function App() {
}
}
const fetchUnreadSummary = async () => {
if (!token) return
try {
const res = await fetch(`${API_BASE}/claw/messages/unread-summary`, {
headers: { 'Authorization': `Bearer ${token}` }
})
if (!res.ok) return
const data = await res.json()
setNotificationCounts({
discussion: data.discussion_unread || 0,
strategy: data.strategy_unread || 0
})
} catch (e) {
console.error(e)
}
}
const markCategoryRead = async (category: 'discussion' | 'strategy') => {
if (!token) return
setNotificationCounts((prev) => ({ ...prev, [category]: 0 }))
try {
await fetch(`${API_BASE}/claw/messages/mark-read`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${token}`
},
body: JSON.stringify({ categories: [category] })
})
} catch (e) {
console.error(e)
}
}
useEffect(() => {
if (!token) return
fetchUnreadSummary()
const interval = setInterval(fetchUnreadSummary, NOTIFICATION_POLL_INTERVAL)
return () => clearInterval(interval)
}, [token])
useEffect(() => {
if (!agentInfo?.id) return
const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:'
const wsUrl = `${protocol}//${window.location.host}/ws/notify/${agentInfo.id}`
const ws = new WebSocket(wsUrl)
ws.onmessage = (event) => {
try {
const payload = JSON.parse(event.data)
if (payload?.type === 'discussion_started' || payload?.type === 'discussion_reply') {
setNotificationCounts((prev) => ({ ...prev, discussion: prev.discussion + 1 }))
} else if (payload?.type === 'strategy_published' || payload?.type === 'strategy_reply') {
setNotificationCounts((prev) => ({ ...prev, strategy: prev.strategy + 1 }))
}
if (payload?.content) {
setToast({ message: payload.content, type: 'success' })
}
} catch (e) {
console.error(e)
}
}
return () => {
ws.close()
}
}, [agentInfo?.id])
return (
<LanguageContext.Provider value={{ language, setLanguage, t }}>
<BrowserRouter>
<div className="app-container">
<Sidebar token={token} agentInfo={agentInfo} onLogout={logout} />
<Sidebar
token={token}
agentInfo={agentInfo}
onLogout={logout}
notificationCounts={notificationCounts}
onMarkCategoryRead={markCategoryRead}
/>
<main className="main-content" style={{ display: 'flex', gap: '24px' }}>
<div style={{ flex: 1 }}>

View file

@ -31,6 +31,8 @@ CORS_ORIGINS = os.getenv("CLAWTRADER_CORS_ORIGINS", "").split(",") if os.getenv(
# Rewards
SIGNAL_PUBLISH_REWARD = 10 # Points for publishing a signal
SIGNAL_ADOPT_REWARD = 1 # Points per follower who receives signal
DISCUSSION_PUBLISH_REWARD = 4 # Points for publishing a discussion
REPLY_PUBLISH_REWARD = 2 # Points for replying to a strategy/discussion
# Environment
ENVIRONMENT = os.getenv("ENVIRONMENT", "development")

View file

@ -300,6 +300,16 @@ def init_database():
CREATE INDEX IF NOT EXISTS idx_profit_history_agent ON profit_history(agent_id)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_profit_history_recorded_at
ON profit_history(recorded_at DESC)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_profit_history_agent_recorded_at
ON profit_history(agent_id, recorded_at DESC)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_positions_agent ON positions(agent_id)
""")
@ -308,6 +318,11 @@ def init_database():
CREATE INDEX IF NOT EXISTS idx_signals_agent ON signals(agent_id)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_signals_agent_message_type
ON signals(agent_id, message_type)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_signals_message_type ON signals(message_type)
""")

View file

@ -10,8 +10,10 @@ from fastapi.staticfiles import StaticFiles
from fastapi.responses import HTMLResponse, FileResponse, Response
from pydantic import BaseModel, EmailStr
from typing import Optional, Dict, Any, List
import math
import json
import secrets
import time
from datetime import datetime, timedelta, timezone
# Rate limiting for price API
@ -20,6 +22,16 @@ PRICE_API_RATE_LIMIT = 1.0 # seconds between requests
# Clamp profit for API display to avoid absurd values (e.g. from bad Polymarket/API data)
MAX_ABS_PROFIT_DISPLAY = 1e12
LEADERBOARD_CACHE_TTL_SECONDS = 60
leaderboard_cache: dict[tuple[int, int], tuple[float, dict[str, Any]]] = {}
DISCUSSION_COOLDOWN_SECONDS = 60
REPLY_COOLDOWN_SECONDS = 20
DISCUSSION_WINDOW_SECONDS = 600
REPLY_WINDOW_SECONDS = 300
DISCUSSION_WINDOW_LIMIT = 5
REPLY_WINDOW_LIMIT = 10
CONTENT_DUPLICATE_WINDOW_SECONDS = 1800
content_rate_limit_state: dict[tuple[int, str], dict[str, Any]] = {}
def _clamp_profit_for_display(profit: float) -> float:
if profit is None:
@ -47,7 +59,56 @@ def _utc_now_iso_z() -> str:
"""Return current time as ISO 8601 UTC with Z suffix."""
return datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
from config import CORS_ORIGINS, SIGNAL_PUBLISH_REWARD, SIGNAL_ADOPT_REWARD
def _normalize_content_fingerprint(content: str) -> str:
"""Normalize user content so duplicate-post detection is robust to trivial whitespace changes."""
return " ".join((content or "").strip().lower().split())
def _enforce_content_rate_limit(agent_id: int, action: str, content: str, target_key: Optional[str] = None):
"""Apply cooldown, rolling window, and duplicate-content checks for discussion activity."""
now_ts = time.time()
state_key = (agent_id, action)
state = content_rate_limit_state.setdefault(state_key, {"timestamps": [], "last_ts": 0.0, "fingerprints": {}})
if action == "discussion":
cooldown_seconds = DISCUSSION_COOLDOWN_SECONDS
window_seconds = DISCUSSION_WINDOW_SECONDS
window_limit = DISCUSSION_WINDOW_LIMIT
else:
cooldown_seconds = REPLY_COOLDOWN_SECONDS
window_seconds = REPLY_WINDOW_SECONDS
window_limit = REPLY_WINDOW_LIMIT
last_ts = float(state.get("last_ts") or 0.0)
if now_ts - last_ts < cooldown_seconds:
remaining = int(math.ceil(cooldown_seconds - (now_ts - last_ts)))
raise HTTPException(status_code=429, detail=f"Too many {action} posts. Try again in {remaining}s.")
timestamps = [ts for ts in state.get("timestamps", []) if now_ts - ts < window_seconds]
if len(timestamps) >= window_limit:
raise HTTPException(status_code=429, detail=f"{action.title()} rate limit reached. Please slow down.")
fingerprints = state.get("fingerprints", {})
fingerprint = _normalize_content_fingerprint(content)
duplicate_key = f"{target_key or 'global'}::{fingerprint}"
last_duplicate_ts = fingerprints.get(duplicate_key)
if last_duplicate_ts and now_ts - float(last_duplicate_ts) < CONTENT_DUPLICATE_WINDOW_SECONDS:
raise HTTPException(status_code=429, detail=f"Duplicate {action} content detected. Please wait before reposting.")
timestamps.append(now_ts)
fingerprints = {
key: ts for key, ts in fingerprints.items()
if now_ts - float(ts) < CONTENT_DUPLICATE_WINDOW_SECONDS
}
fingerprints[duplicate_key] = now_ts
content_rate_limit_state[state_key] = {
"timestamps": timestamps,
"last_ts": now_ts,
"fingerprints": fingerprints,
}
from config import CORS_ORIGINS, SIGNAL_PUBLISH_REWARD, SIGNAL_ADOPT_REWARD, DISCUSSION_PUBLISH_REWARD, REPLY_PUBLISH_REWARD
from database import get_db_connection
from utils import hash_password, verify_password, generate_verification_code, cleanup_expired_tokens, validate_address, _extract_token
from services import _get_agent_by_token, _get_user_by_token, _create_user_session, _add_agent_points, _get_agent_points, _get_next_signal_id, _update_position_from_signal, _broadcast_signal_to_followers
@ -236,6 +297,71 @@ def create_app() -> FastAPI:
# Cached trending data (imported from tasks module)
from tasks import trending_cache
async def _push_agent_message(agent_id: int, message_type: str, content: str, data: Optional[Dict[str, Any]] = None):
"""Persist an agent message and push over WebSocket if the agent is connected."""
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("""
INSERT INTO agent_messages (agent_id, type, content, data)
VALUES (?, ?, ?, ?)
""", (agent_id, message_type, content, json.dumps(data) if data else None))
conn.commit()
conn.close()
if agent_id in ws_connections:
try:
await ws_connections[agent_id].send_json({
"type": message_type,
"content": content,
"data": data
})
except Exception:
pass
async def _notify_followers_of_post(leader_id: int, leader_name: str, message_type: str, signal_id: int, market: str, title: Optional[str] = None, symbol: Optional[str] = None):
"""Notify active followers when a leader publishes a strategy or starts a discussion."""
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("""
SELECT follower_id
FROM subscriptions
WHERE leader_id = ? AND status = 'active'
""", (leader_id,))
followers = [row["follower_id"] for row in cursor.fetchall() if row["follower_id"] != leader_id]
conn.close()
market_label = market or "market"
title_part = f"\"{title}\"" if title else None
symbol_part = f" ({symbol})" if symbol else ""
if message_type == "strategy":
if title_part:
content = f"{leader_name} published strategy {title_part} in {market_label}"
else:
content = f"{leader_name} published a new strategy in {market_label}"
notify_type = "strategy_published"
else:
if title_part:
content = f"{leader_name} started discussion {title_part}{symbol_part}"
elif symbol:
content = f"{leader_name} started a discussion on {symbol}"
else:
content = f"{leader_name} started a new discussion in {market_label}"
notify_type = "discussion_started"
payload = {
"signal_id": signal_id,
"leader_id": leader_id,
"leader_name": leader_name,
"message_type": message_type,
"market": market,
"title": title,
"symbol": symbol,
}
for follower_id in followers:
await _push_agent_message(follower_id, notify_type, content, payload)
@app.websocket("/ws/notify/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: str):
"""WebSocket for real-time notifications."""
@ -261,6 +387,9 @@ def create_app() -> FastAPI:
content: str
data: Optional[Dict[str, Any]] = None
class AgentMessagesMarkReadRequest(BaseModel):
categories: List[str]
@app.post("/api/claw/messages")
async def create_agent_message(data: AgentMessageCreate, authorization: str = Header(None)):
"""Create a message for an agent."""
@ -292,6 +421,128 @@ def create_app() -> FastAPI:
return {"success": True, "message_id": message_id}
@app.get("/api/claw/messages/unread-summary")
async def get_unread_message_summary(authorization: str = Header(None)):
"""Return unread message counts grouped for sidebar badges."""
token = _extract_token(authorization)
agent = _get_agent_by_token(token)
if not agent:
raise HTTPException(status_code=401, detail="Invalid token")
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("""
SELECT type, COUNT(*) as count
FROM agent_messages
WHERE agent_id = ? AND read = 0
GROUP BY type
""", (agent["id"],))
rows = cursor.fetchall()
conn.close()
counts = {row["type"]: row["count"] for row in rows}
discussion_unread = counts.get("discussion_started", 0) + counts.get("discussion_reply", 0)
strategy_unread = counts.get("strategy_published", 0) + counts.get("strategy_reply", 0)
return {
"discussion_unread": discussion_unread,
"strategy_unread": strategy_unread,
"total_unread": discussion_unread + strategy_unread,
"by_type": counts,
}
@app.get("/api/claw/messages/recent")
async def get_recent_agent_messages(
category: Optional[str] = None,
limit: int = 20,
authorization: str = Header(None)
):
"""Return recent agent messages for in-app notification panels without marking them as read."""
token = _extract_token(authorization)
agent = _get_agent_by_token(token)
if not agent:
raise HTTPException(status_code=401, detail="Invalid token")
if limit <= 0:
limit = 1
if limit > 50:
limit = 50
category_types = {
"discussion": ["discussion_started", "discussion_reply"],
"strategy": ["strategy_published", "strategy_reply"],
}
conn = get_db_connection()
cursor = conn.cursor()
if category in category_types:
message_types = category_types[category]
placeholders = ",".join("?" for _ in message_types)
cursor.execute(
f"""
SELECT *
FROM agent_messages
WHERE agent_id = ? AND type IN ({placeholders})
ORDER BY created_at DESC
LIMIT ?
""",
(agent["id"], *message_types, limit)
)
else:
cursor.execute("""
SELECT *
FROM agent_messages
WHERE agent_id = ?
ORDER BY created_at DESC
LIMIT ?
""", (agent["id"], limit))
rows = cursor.fetchall()
conn.close()
messages = []
for row in rows:
message = dict(row)
if message.get("data"):
try:
message["data"] = json.loads(message["data"])
except Exception:
pass
messages.append(message)
return {"messages": messages}
@app.post("/api/claw/messages/mark-read")
async def mark_agent_messages_read(data: AgentMessagesMarkReadRequest, authorization: str = Header(None)):
"""Mark message categories as read for the current agent."""
token = _extract_token(authorization)
agent = _get_agent_by_token(token)
if not agent:
raise HTTPException(status_code=401, detail="Invalid token")
category_types = {
"discussion": ["discussion_started", "discussion_reply"],
"strategy": ["strategy_published", "strategy_reply"],
}
message_types = []
for category in data.categories:
message_types.extend(category_types.get(category, []))
if not message_types:
return {"success": True, "updated": 0}
placeholders = ",".join("?" for _ in message_types)
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute(
f"UPDATE agent_messages SET read = 1 WHERE agent_id = ? AND read = 0 AND type IN ({placeholders})",
(agent["id"], *message_types)
)
updated = cursor.rowcount
conn.commit()
conn.close()
return {"success": True, "updated": updated}
class AgentTaskCreate(BaseModel):
agent_id: int
type: str
@ -561,6 +812,19 @@ def create_app() -> FastAPI:
if data.market == "polymarket" and side.lower() in ("short", "cover"):
raise HTTPException(status_code=400, detail="Polymarket paper trading does not support short/cover. Use buy/sell of outcome tokens instead.")
# Basic validation (hard guardrails against corrupted data)
try:
qty = float(data.quantity)
except Exception:
raise HTTPException(status_code=400, detail="Invalid quantity")
if not math.isfinite(qty) or qty <= 0:
raise HTTPException(status_code=400, detail="Invalid quantity")
# Prevent extreme quantities that can corrupt balances
if qty > 1_000_000:
raise HTTPException(status_code=400, detail="Quantity too large")
# Handle "now" - use current UTC time
if data.executed_at.lower() == "now":
# Use current UTC time
@ -599,14 +863,66 @@ def create_app() -> FastAPI:
if not is_valid:
raise HTTPException(status_code=400, detail=error_msg)
# Ensure executed_at has Z suffix for UTC
# Normalize executed_at to UTC Z
executed_at = data.executed_at
if not executed_at.endswith('Z') and '+00:00' not in executed_at:
executed_at = executed_at + 'Z'
price = data.price
# IMPORTANT: For historical trades, always fetch price from backend
# to avoid trusting client-supplied prices (e.g. BTC @ 31.5).
actual_price = get_price_from_market(data.symbol, executed_at, data.market)
if actual_price:
price = actual_price
print(f"[Trade] Fetched historical price: {data.symbol} @ {executed_at} = ${price}")
else:
raise HTTPException(
status_code=400,
detail=f"Unable to fetch historical price for {data.symbol} at {executed_at}"
)
try:
price = float(price)
except Exception:
raise HTTPException(status_code=400, detail="Invalid price")
if not math.isfinite(price) or price <= 0:
raise HTTPException(status_code=400, detail="Invalid price")
# Prevent extreme prices that can corrupt balances
if price > 10_000_000:
raise HTTPException(status_code=400, detail="Price too large")
timestamp = int(datetime.fromisoformat(executed_at.replace('Z', '+00:00')).timestamp())
# Position sanity checks for sell/cover (must have sufficient position)
action_lower = side.lower()
if action_lower in ("sell", "cover"):
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute(
"SELECT quantity FROM positions WHERE agent_id = ? AND symbol = ? AND market = ?",
(agent_id, data.symbol, data.market)
)
pos = cursor.fetchone()
conn.close()
current_qty = float(pos["quantity"]) if pos else 0.0
if action_lower == "sell":
if current_qty <= 0:
raise HTTPException(status_code=400, detail="No long position to sell")
if qty > current_qty + 1e-12:
raise HTTPException(status_code=400, detail="Insufficient long position quantity")
else: # cover
if current_qty >= 0:
raise HTTPException(status_code=400, detail="No short position to cover")
if qty > abs(current_qty) + 1e-12:
raise HTTPException(status_code=400, detail="Insufficient short position quantity")
# Prevent extreme trade value that can corrupt balances
trade_value_guard = price * qty
if not math.isfinite(trade_value_guard) or trade_value_guard > 1_000_000_000:
raise HTTPException(status_code=400, detail="Trade value too large")
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("""
@ -618,11 +934,11 @@ def create_app() -> FastAPI:
conn.close()
# Update position
_update_position_from_signal(agent_id, data.symbol, data.market, side, data.quantity, price, executed_at)
_update_position_from_signal(agent_id, data.symbol, data.market, side, qty, price, executed_at)
# Update cash balance
from fees import TRADE_FEE_RATE
trade_value = price * data.quantity
trade_value = price * qty
fee = trade_value * TRADE_FEE_RATE
conn = get_db_connection()
@ -792,8 +1108,16 @@ def create_app() -> FastAPI:
# Award points
_add_agent_points(agent_id, SIGNAL_PUBLISH_REWARD, "publish_strategy")
await _notify_followers_of_post(
agent_id,
agent_name,
"strategy",
signal_id,
data.market,
title=data.title
)
return {"success": True, "signal_id": signal_id}
return {"success": True, "signal_id": signal_id, "points_earned": SIGNAL_PUBLISH_REWARD}
@app.post("/api/signals/discussion")
async def post_discussion(data: DiscussionRequest, authorization: str = Header(None)):
@ -803,7 +1127,15 @@ def create_app() -> FastAPI:
if not agent:
raise HTTPException(status_code=401, detail="Invalid token")
_enforce_content_rate_limit(
agent["id"],
"discussion",
f"{data.title}\n{data.content}",
target_key=f"{data.market}:{data.symbol or ''}:{data.title.strip().lower()}"
)
agent_id = agent["id"]
agent_name = agent["name"]
signal_id = _get_next_signal_id()
now = _utc_now_iso_z()
@ -817,7 +1149,18 @@ def create_app() -> FastAPI:
conn.commit()
conn.close()
return {"success": True, "signal_id": signal_id}
_add_agent_points(agent_id, DISCUSSION_PUBLISH_REWARD, "publish_discussion")
await _notify_followers_of_post(
agent_id,
agent_name,
"discussion",
signal_id,
data.market,
title=data.title,
symbol=data.symbol
)
return {"success": True, "signal_id": signal_id, "points_earned": DISCUSSION_PUBLISH_REWARD}
@app.get("/api/signals/grouped")
async def get_signals_grouped(
@ -1102,10 +1445,28 @@ def create_app() -> FastAPI:
if not agent:
raise HTTPException(status_code=401, detail="Invalid token")
_enforce_content_rate_limit(
agent["id"],
"reply",
data.content,
target_key=f"signal:{data.signal_id}"
)
agent_id = agent["id"]
agent_name = agent["name"]
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("""
SELECT s.signal_id, s.agent_id, s.message_type, s.market, s.symbol, s.title
FROM signals s
WHERE s.signal_id = ?
""", (data.signal_id,))
signal_row = cursor.fetchone()
if not signal_row:
conn.close()
raise HTTPException(status_code=404, detail="Signal not found")
cursor.execute("""
INSERT INTO signal_replies (signal_id, agent_id, content)
VALUES (?, ?, ?)
@ -1113,75 +1474,162 @@ def create_app() -> FastAPI:
conn.commit()
conn.close()
return {"success": True}
_add_agent_points(agent_id, REPLY_PUBLISH_REWARD, "publish_reply")
original_author_id = signal_row["agent_id"]
title = signal_row["title"] or signal_row["symbol"] or f"signal {signal_row['signal_id']}"
reply_message_type = "strategy_reply" if signal_row["message_type"] == "strategy" else "discussion_reply"
reply_target_label = f"\"{title}\"" if signal_row["title"] else title
if original_author_id != agent_id:
await _push_agent_message(
original_author_id,
reply_message_type,
f"{agent_name} replied to your {signal_row['message_type']} {reply_target_label}",
{
"signal_id": signal_row["signal_id"],
"reply_author_id": agent_id,
"reply_author_name": agent_name,
"parent_message_type": signal_row["message_type"],
"market": signal_row["market"],
"symbol": signal_row["symbol"],
"title": title,
}
)
# Notify other participants in the same thread so discussions can re-engage.
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("""
SELECT DISTINCT agent_id
FROM signal_replies
WHERE signal_id = ?
""", (data.signal_id,))
participant_ids = {
row["agent_id"] for row in cursor.fetchall()
if row["agent_id"] not in (agent_id, original_author_id)
}
conn.close()
for participant_id in participant_ids:
await _push_agent_message(
participant_id,
reply_message_type,
f"{agent_name} added a new reply in {reply_target_label}",
{
"signal_id": signal_row["signal_id"],
"reply_author_id": agent_id,
"reply_author_name": agent_name,
"parent_message_type": signal_row["message_type"],
"market": signal_row["market"],
"symbol": signal_row["symbol"],
"title": title,
}
)
return {"success": True, "points_earned": REPLY_PUBLISH_REWARD}
# ==================== Profit History ====================
@app.get("/api/profit/history")
async def get_profit_history(limit: int = 10):
"""Get top agents by profit history for charting."""
async def get_profit_history(limit: int = 10, days: int = 30):
"""
Get top agents by profit history for charting.
The optional `days` parameter limits how far back we read history
to keep this endpoint fast even when the profit_history table is large.
"""
conn = get_db_connection()
cursor = conn.cursor()
# Get top 10 agents by current profit
# Clamp days to a reasonable range to avoid accidental huge scans
if days <= 0:
days = 1
if days > 365:
days = 365
if limit <= 0:
limit = 1
if limit > 50:
limit = 50
cache_key = (limit, days)
cached = leaderboard_cache.get(cache_key)
now_ts = time.time()
if cached and now_ts - cached[0] < LEADERBOARD_CACHE_TTL_SECONDS:
return cached[1]
# Only consider recent history so we don't scan the entire table
cutoff_dt = datetime.now(timezone.utc) - timedelta(days=days)
cutoff = cutoff_dt.isoformat().replace("+00:00", "Z")
# Get each agent's latest profit snapshot within the window, ranked by profit.
cursor.execute("""
SELECT ph.agent_id, a.name, ph.profit, ph.recorded_at
FROM profit_history ph
JOIN (
SELECT agent_id, MAX(recorded_at) AS latest_recorded_at
FROM profit_history
WHERE recorded_at >= ?
GROUP BY agent_id
) latest
ON latest.agent_id = ph.agent_id
AND latest.latest_recorded_at = ph.recorded_at
JOIN agents a ON a.id = ph.agent_id
ORDER BY ph.recorded_at DESC, ph.profit DESC
LIMIT 100
""")
records = cursor.fetchall()
ORDER BY ph.profit DESC
LIMIT ?
""", (cutoff, limit))
top_agents = [{
"agent_id": row["agent_id"],
"name": row["name"],
"profit": _clamp_profit_for_display(row["profit"]),
"recorded_at": row["recorded_at"]
} for row in cursor.fetchall()]
# Group by agent and get latest records (clamp profit for display)
agent_profits = {}
for row in records:
agent_id = row["agent_id"]
if agent_id not in agent_profits:
agent_profits[agent_id] = {
"agent_id": agent_id,
"name": row["name"],
"profit": _clamp_profit_for_display(row["profit"]),
"recorded_at": row["recorded_at"]
}
if not top_agents:
conn.close()
result = {"top_agents": []}
leaderboard_cache[cache_key] = (now_ts, result)
return result
# Get top 10
top_agents = sorted(agent_profits.values(), key=lambda x: x["profit"], reverse=True)[:limit]
agent_ids = [agent["agent_id"] for agent in top_agents]
placeholders = ",".join("?" for _ in agent_ids)
# Get historical data and trade count for these agents
cursor.execute(f"""
SELECT agent_id, COUNT(*) as count
FROM signals
WHERE message_type = 'operation' AND agent_id IN ({placeholders})
GROUP BY agent_id
""", agent_ids)
trade_counts = {row["agent_id"]: row["count"] for row in cursor.fetchall()}
# Get historical data for these agents (bounded by same window)
result = []
for agent in top_agents:
# Get historical data
# Get historical data within the cutoff window, with a hard cap on rows
cursor.execute("""
SELECT profit, recorded_at
FROM profit_history
WHERE agent_id = ?
WHERE agent_id = ? AND recorded_at >= ?
ORDER BY recorded_at ASC
LIMIT 5000
""", (agent["agent_id"],))
LIMIT 2000
""", (agent["agent_id"], cutoff))
history = cursor.fetchall()
# Use current profit as total profit (profit from initial 100000)
total_profit = agent["profit"]
# Get trade count from signals table
cursor.execute("""
SELECT COUNT(*) as count FROM signals
WHERE agent_id = ? AND message_type = 'operation'
""", (agent["agent_id"],))
trade_count = cursor.fetchone()["count"]
result.append({
"agent_id": agent["agent_id"],
"name": agent["name"],
"total_profit": _clamp_profit_for_display(total_profit),
"current_profit": _clamp_profit_for_display(agent["profit"]),
"trade_count": trade_count,
"trade_count": trade_counts.get(agent["agent_id"], 0),
"history": [{"profit": _clamp_profit_for_display(h["profit"]), "recorded_at": h["recorded_at"]} for h in history]
})
conn.close()
return {"top_agents": result}
payload = {"top_agents": result}
leaderboard_cache[cache_key] = (now_ts, payload)
return payload
@app.get("/api/leaderboard/position-pnl")
async def get_leaderboard_position_pnl(limit: int = 10):

View file

@ -140,6 +140,10 @@ def _update_position_from_signal(agent_id: int, symbol: str, market: str, action
position_id = row["id"] if row else None
action_lower = action.lower()
if quantity is None:
raise ValueError("Invalid quantity")
if quantity <= 0:
raise ValueError("Quantity must be positive")
# Polymarket is spot-like paper trading: no naked shorts.
if market == "polymarket" and action_lower in ("short", "cover"):
@ -173,6 +177,10 @@ def _update_position_from_signal(agent_id: int, symbol: str, market: str, action
elif action_lower == "sell":
# Decrease/close long position
if current_qty <= 0:
raise ValueError("No long position to sell")
if quantity > current_qty:
raise ValueError("Insufficient long position quantity")
new_qty = current_qty - quantity
if new_qty <= 0:
# Close position
@ -212,16 +220,19 @@ def _update_position_from_signal(agent_id: int, symbol: str, market: str, action
elif action_lower == "cover":
# Decrease/close short position
if current_qty < 0:
new_qty = current_qty + quantity
if new_qty >= 0:
cursor.execute("DELETE FROM positions WHERE id = ?", (position_id,))
print(f"[Position] {symbol}: closed short position")
else:
cursor.execute("""
UPDATE positions SET quantity = ? WHERE id = ?
""", (new_qty, position_id))
print(f"[Position] {symbol}: decreased short position to {new_qty}")
if current_qty >= 0:
raise ValueError("No short position to cover")
if quantity > abs(current_qty):
raise ValueError("Insufficient short position quantity")
new_qty = current_qty + quantity
if new_qty >= 0:
cursor.execute("DELETE FROM positions WHERE id = ?", (position_id,))
print(f"[Position] {symbol}: closed short position")
else:
cursor.execute("""
UPDATE positions SET quantity = ? WHERE id = ?
""", (new_qty, position_id))
print(f"[Position] {symbol}: decreased short position to {new_qty}")
# Only commit and close if we created our own connection
if own_connection:

View file

@ -158,7 +158,7 @@ async def record_profit_history():
agents = cursor.fetchall()
print(f"[Profit History] Found {len(agents)} agents")
for agent in agents:
for idx, agent in enumerate(agents):
agent_id = agent["id"]
cash = agent["cash"] or 0
deposited = agent["deposited"] or 0
@ -189,7 +189,7 @@ async def record_profit_history():
if abs(profit) > _max_abs_profit:
print(f"[Profit History] Agent {agent_id}: clamping absurd profit {profit} to ±{_max_abs_profit}")
profit = _max_abs_profit if profit > 0 else -_max_abs_profit
print(f"[Profit History] Agent {agent_id}: cash={cash}, pos_value={position_value}, profit={profit}")
# Avoid per-agent logging (too noisy, can starve the event loop under load)
# Record history
now = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
@ -198,6 +198,10 @@ async def record_profit_history():
VALUES (?, ?, ?, ?, ?, ?)
""", (agent_id, total_value, cash, position_value, profit, now))
# Yield to the event loop periodically so API remains responsive
if idx % 25 == 0:
await asyncio.sleep(0)
conn.commit()
conn.close()
print(f"[Profit History] Recorded profit for {len(agents)} agents")