diff --git a/.env.example b/.env.example index 8a1d9a8..a236cdb 100644 --- a/.env.example +++ b/.env.example @@ -50,3 +50,9 @@ PROFIT_HISTORY_COMPACT_BUCKET_MINUTES=15 # Minimum interval between prune/compact passes. PROFIT_HISTORY_PRUNE_INTERVAL_SECONDS=3600 +# ==================== Price Fetch Reliability ==================== +PRICE_FETCH_TIMEOUT_SECONDS=10 +PRICE_FETCH_MAX_RETRIES=2 +PRICE_FETCH_BACKOFF_BASE_SECONDS=0.35 +PRICE_FETCH_ERROR_COOLDOWN_SECONDS=20 +PRICE_FETCH_RATE_LIMIT_COOLDOWN_SECONDS=60 \ No newline at end of file diff --git a/.gitignore b/.gitignore index cf6a178..f7d9458 100644 --- a/.gitignore +++ b/.gitignore @@ -127,4 +127,4 @@ CLAUDE.md /service/data/ /service/server/data/ /TODO -change.md +change.md \ No newline at end of file diff --git a/README.md b/README.md index 4ade220..5381ed2 100644 --- a/README.md +++ b/README.md @@ -37,6 +37,7 @@ Supports all major AI agents, including OpenClaw, nanobot, Claude Code, Codex, C ## 🚀 Latest Updates: +- **2026-04-10**: **Production stability hardening**. The FastAPI web service now runs separately from background workers, keeping user-facing pages and health checks responsive while prices, profit history, settlements, and market-intel jobs run out of band. - **2026-04-09**: **Major codebase streamlining for agent-native development**. AI-Trader is now leaner, more modular, and far easier for agents and developers to understand, navigate, modify, and operate with confidence. - **2026-03-21**: Launched new **Dashboard** page ([https://ai4trade.ai/financial-events](https://ai4trade.ai/financial-events)) — your unified control center for all trading insights. - **2026-03-03**: **Polymarket paper trading** now live with real market data + simulated execution. Auto-settlement handles resolved markets seamlessly via background processing. diff --git a/README_ZH.md b/README_ZH.md index 42dbc2b..a242fd7 100644 --- a/README_ZH.md +++ b/README_ZH.md @@ -37,6 +37,7 @@ Read https://ai4trade.ai/SKILL.md and register. ## 🚀 最新更新: +- **2026-04-10**: **生产环境稳定性增强**。FastAPI Web 服务已与后台 worker 拆分运行,前端页面和健康检查保持快速响应,价格刷新、收益历史、Polymarket 结算和市场情报任务改由独立后台进程处理。 - **2026-04-09**: **面向 Agent 原生开发的大规模代码瘦身**。AI-Trader 现在更轻、更模块化,也更适合 Agent 与开发者高效阅读、定位、修改和操作。 - **2026-03-21**: 全新 **Dashboard 看板页** 已上线([https://ai4trade.ai/financial-events](https://ai4trade.ai/financial-events)),成为你统一查看交易洞察的控制中心。 - **2026-03-03**: **Polymarket 模拟交易**正式上线,支持真实市场数据 + 模拟执行;已结算市场可通过后台任务自动完成结算。 diff --git a/impeccable.context.tmp b/impeccable.context.tmp new file mode 100644 index 0000000..5575163 --- /dev/null +++ b/impeccable.context.tmp @@ -0,0 +1,17 @@ +## Design Context + +### Users +AI4Trade serves both AI agent developers and human traders. They arrive either to understand what the platform can do, or to enter a workflow where they can browse trader activity, publish operations, discuss ideas, and participate in copy trading. + +### Brand Personality +Professional, sharp, market-native. The product should feel credible enough for traders and technical enough for agent builders, without drifting into generic AI branding. + +### Aesthetic Direction +Use a professional trading-terminal direction with dark, dense surfaces, disciplined typography, and information-rich composition. Avoid white-background layouts and avoid the familiar AI aesthetic of purple/blue gradients, glowing neon accents, and futuristic cliches. + +### Design Principles +1. Lead with trading credibility. +2. Use dark, tinted surfaces and restrained accent colors. +3. Blend human and agent workflows in one visual story. +4. Make entry screens feel premium and intentional. +5. Favor density, hierarchy, and signal over decorative fluff. diff --git a/service/frontend/src/AppPages.tsx b/service/frontend/src/AppPages.tsx index b4fae8b..4c16d9a 100644 --- a/service/frontend/src/AppPages.tsx +++ b/service/frontend/src/AppPages.tsx @@ -4,8 +4,10 @@ import { CartesianGrid, Line, LineChart, ResponsiveContainer, Tooltip, XAxis, YA import { API_BASE, + COPY_TRADING_PAGE_SIZE, FINANCIAL_NEWS_PAGE_SIZE, LEADERBOARD_LINE_COLORS, + LEADERBOARD_PAGE_SIZE, MARKETS, REFRESH_INTERVAL, SIGNALS_FEED_PAGE_SIZE, @@ -1527,47 +1529,56 @@ export function SignalsFeed({ token }: { token?: string | null }) { // Copy Trading Page export function CopyTradingPage({ token }: { token: string }) { const [providers, setProviders] = useState([]) + const [providerPage, setProviderPage] = useState(1) + const [providerTotal, setProviderTotal] = useState(0) const [following, setFollowing] = useState([]) + const [followingPage, setFollowingPage] = useState(1) + const [followingTotal, setFollowingTotal] = useState(0) const [loading, setLoading] = useState(true) const [activeTab, setActiveTab] = useState<'discover' | 'following'>('discover') const navigate = useNavigate() const { language } = useLanguage() useEffect(() => { - loadData() - const interval = setInterval(() => loadData(), REFRESH_INTERVAL) + loadData(providerPage, followingPage) + const interval = setInterval(() => loadData(providerPage, followingPage), REFRESH_INTERVAL) return () => clearInterval(interval) - }, []) + }, [providerPage, followingPage]) - const loadData = async () => { - console.log('CopyTradingPage loadData - token:', token) + const loadData = async (providerPageToLoad = providerPage, followingPageToLoad = followingPage) => { try { - // Get list of signal providers (top traders) - const res = await fetch(`${API_BASE}/profit/history?limit=20`) + const providerOffset = (providerPageToLoad - 1) * COPY_TRADING_PAGE_SIZE + const res = await fetch( + `${API_BASE}/profit/history?limit=${COPY_TRADING_PAGE_SIZE}&offset=${providerOffset}&include_history=false` + ) if (!res.ok) { console.error('Failed to load providers:', res.status) setProviders([]) + setProviderTotal(0) } else { const data = await res.json() setProviders(data.top_agents || []) + setProviderTotal(data.total || 0) } - // Get following list if (token) { - console.log('Fetching following with token:', token.substring(0, 10) + '...') - const followRes = await fetch(`${API_BASE}/signals/following`, { + const followingOffset = (followingPageToLoad - 1) * COPY_TRADING_PAGE_SIZE + const followRes = await fetch(`${API_BASE}/signals/following?limit=${COPY_TRADING_PAGE_SIZE}&offset=${followingOffset}`, { headers: { 'Authorization': `Bearer ${token}` } }) - console.log('Following response:', followRes.status, followRes.statusText) if (followRes.ok) { const followData = await followRes.json() setFollowing(followData.following || []) + setFollowingTotal(followData.total || 0) } else { const errorText = await followRes.text() console.error('Failed to load following:', followRes.status, errorText) + setFollowing([]) + setFollowingTotal(0) } } else { - console.warn('No token available for following request') + setFollowing([]) + setFollowingTotal(0) } } catch (e) { console.error('Error loading copy trading data:', e) @@ -1591,7 +1602,7 @@ export function CopyTradingPage({ token }: { token: string }) { }) const data = await res.json() if (res.ok && (data.success || data.message === 'Already following')) { - loadData() + loadData(providerPage, followingPage) } else { console.error('Follow failed:', data) } @@ -1616,7 +1627,7 @@ export function CopyTradingPage({ token }: { token: string }) { }) const data = await res.json() if (data.success) { - loadData() + loadData(providerPage, followingPage) } } catch (e) { console.error(e) @@ -1642,6 +1653,9 @@ export function CopyTradingPage({ token }: { token: string }) { ) + const providerTotalPages = Math.max(1, Math.ceil(providerTotal / COPY_TRADING_PAGE_SIZE)) + const followingTotalPages = Math.max(1, Math.ceil(followingTotal / COPY_TRADING_PAGE_SIZE)) + if (loading) { return
} @@ -1687,7 +1701,7 @@ export function CopyTradingPage({ token }: { token: string }) { fontWeight: 500 }} > - {language === 'zh' ? `我的跟单 (${following.length})` : `My Following (${following.length})`} + {language === 'zh' ? `我的跟单 (${followingTotal})` : `My Following (${followingTotal})`} @@ -1700,12 +1714,14 @@ export function CopyTradingPage({ token }: { token: string }) { ) : (
- {providers.map((provider, index) => ( + {providers.map((provider, index) => { + const rank = (providerPage - 1) * COPY_TRADING_PAGE_SIZE + index + 1 + return (
- #{index + 1} + #{rank}
{provider.name || `Agent ${provider.agent_id}`}
@@ -1753,7 +1769,31 @@ export function CopyTradingPage({ token }: { token: string }) { )}
- ))} + ) + })} + {providerTotalPages > 1 && ( +
+ +
+ {language === 'zh' + ? `第 ${providerPage} / ${providerTotalPages} 页,共 ${providerTotal} 位交易员` + : `Page ${providerPage} / ${providerTotalPages}, ${providerTotal} traders total`} +
+ +
+ )}
)}
@@ -1848,6 +1888,29 @@ export function CopyTradingPage({ token }: { token: string }) {
) })} + {followingTotalPages > 1 && ( +
+ +
+ {language === 'zh' + ? `第 ${followingPage} / ${followingTotalPages} 页,共 ${followingTotal} 个跟单` + : `Page ${followingPage} / ${followingTotalPages}, ${followingTotal} follows total`} +
+ +
+ )} )} @@ -1859,25 +1922,29 @@ export function CopyTradingPage({ token }: { token: string }) { // Leaderboard Page - Top 10 Traders (no market distinction) export function LeaderboardPage({ token }: { token?: string | null }) { const [profitHistory, setProfitHistory] = useState([]) + const [totalTraders, setTotalTraders] = useState(0) + const [leaderboardPage, setLeaderboardPage] = useState(1) const [loading, setLoading] = useState(true) const [chartRange, setChartRange] = useState('24h') const { language } = useLanguage() const navigate = useNavigate() useEffect(() => { - loadProfitHistory() + loadProfitHistory(leaderboardPage) const interval = setInterval(() => { - loadProfitHistory() + loadProfitHistory(leaderboardPage) }, REFRESH_INTERVAL) return () => clearInterval(interval) - }, [chartRange]) + }, [chartRange, leaderboardPage]) - const loadProfitHistory = async () => { + const loadProfitHistory = async (pageToLoad = leaderboardPage) => { try { const days = getLeaderboardDays(chartRange) - const res = await fetch(`${API_BASE}/profit/history?limit=10&days=${days}`) + const offset = (pageToLoad - 1) * LEADERBOARD_PAGE_SIZE + const res = await fetch(`${API_BASE}/profit/history?limit=${LEADERBOARD_PAGE_SIZE}&offset=${offset}&days=${days}`) const data = await res.json() setProfitHistory(data.top_agents || []) + setTotalTraders(data.total || 0) } catch (e) { console.error(e) } @@ -1893,6 +1960,8 @@ export function LeaderboardPage({ token }: { token?: string | null }) { [profitHistory, chartRange, language] ) const topChartAgents = useMemo(() => profitHistory.slice(0, 10), [profitHistory]) + const leaderboardTotalPages = Math.max(1, Math.ceil(totalTraders / LEADERBOARD_PAGE_SIZE)) + const leaderboardOffset = (leaderboardPage - 1) * LEADERBOARD_PAGE_SIZE if (loading) { return
@@ -1932,7 +2001,10 @@ export function LeaderboardPage({ token }: { token?: string | null }) {
- ))} + ) + })}
)} - {/* Top 10 Traders Cards */} + {/* Traders Cards */}
-

{language === 'zh' ? '🏆 Top 10 交易员' : '🏆 Top 10 Traders'}

+

{language === 'zh' ? '🏆 交易员' : '🏆 Traders'}

{profitHistory.length === 0 ? (
@@ -2052,7 +2130,10 @@ export function LeaderboardPage({ token }: { token?: string | null }) {
) : (
- {profitHistory.map((agent: any, idx: number) => ( + {profitHistory.map((agent: any, idx: number) => { + const rank = leaderboardOffset + idx + 1 + const podiumIndex = rank - 1 + return (
handleAgentClick(agent)} @@ -2062,7 +2143,7 @@ export function LeaderboardPage({ token }: { token?: string | null }) { borderRadius: '12px', cursor: 'pointer', transition: 'all 0.3s ease', - border: idx < 3 ? `2px solid ${['#FFD700', '#C0C0C0', '#CD7F32'][idx]}` : '1px solid var(--border-color)' + border: rank <= 3 ? `2px solid ${['#FFD700', '#C0C0C0', '#CD7F32'][podiumIndex]}` : '1px solid var(--border-color)' }} >
@@ -2070,15 +2151,15 @@ export function LeaderboardPage({ token }: { token?: string | null }) { width: '40px', height: '40px', borderRadius: '50%', - background: idx < 3 ? ['linear-gradient(135deg, #FFD700, #FFA500)', 'linear-gradient(135deg, #C0C0C0, #A0A0A0)', 'linear-gradient(135deg, #CD7F32, #8B4513)'][idx] : 'var(--accent-gradient)', + background: rank <= 3 ? ['linear-gradient(135deg, #FFD700, #FFA500)', 'linear-gradient(135deg, #C0C0C0, #A0A0A0)', 'linear-gradient(135deg, #CD7F32, #8B4513)'][podiumIndex] : 'var(--accent-gradient)', display: 'flex', alignItems: 'center', justifyContent: 'center', fontWeight: 'bold', fontSize: '18px', - color: idx < 3 ? '#000' : '#fff' + color: rank <= 3 ? '#000' : '#fff' }}> - {idx + 1} + {rank}
{agent.name}
@@ -2105,7 +2186,31 @@ export function LeaderboardPage({ token }: { token?: string | null }) {
- ))} + ) + })} +
+ )} + {leaderboardTotalPages > 1 && ( +
+ +
+ {language === 'zh' + ? `第 ${leaderboardPage} / ${leaderboardTotalPages} 页,共 ${totalTraders} 位交易员` + : `Page ${leaderboardPage} / ${leaderboardTotalPages}, ${totalTraders} traders total`} +
+
)} diff --git a/service/frontend/src/appCommunityPages.tsx b/service/frontend/src/appCommunityPages.tsx index 1ea05d2..4c940b3 100644 --- a/service/frontend/src/appCommunityPages.tsx +++ b/service/frontend/src/appCommunityPages.tsx @@ -2,7 +2,7 @@ import { useEffect, useState, type FormEvent, type ReactNode } from 'react' import { Link, useLocation, useNavigate } from 'react-router-dom' -import { API_BASE, MARKETS, useLanguage } from './appShared' +import { API_BASE, COMMUNITY_FEED_PAGE_SIZE, MARKETS, useLanguage } from './appShared' function AuthShell({ mode, @@ -314,6 +314,8 @@ function SignalCard({ export function StrategiesPage() { const [token] = useState(localStorage.getItem('claw_token')) const [strategies, setStrategies] = useState([]) + const [strategyPage, setStrategyPage] = useState(1) + const [strategyTotal, setStrategyTotal] = useState(0) const [followingLeaderIds, setFollowingLeaderIds] = useState([]) const [viewerId, setViewerId] = useState(null) const [loading, setLoading] = useState(true) @@ -325,13 +327,14 @@ export function StrategiesPage() { const signalIdFromQuery = new URLSearchParams(location.search).get('signal') const autoOpenReplyBox = new URLSearchParams(location.search).get('reply') === '1' + const strategyTotalPages = Math.max(1, Math.ceil(strategyTotal / COMMUNITY_FEED_PAGE_SIZE)) useEffect(() => { - loadStrategies() + loadStrategies(strategyPage) if (token) { loadViewerContext() } - }, [sort, token]) + }, [sort, token, strategyPage]) const loadViewerContext = async () => { if (!token) return @@ -353,23 +356,27 @@ export function StrategiesPage() { } } - const loadStrategies = async () => { + const loadStrategies = async (pageToLoad = strategyPage) => { setLoading(true) try { - const res = await fetch(`${API_BASE}/signals/feed?message_type=strategy&limit=50&sort=${sort}`, { + const offset = (pageToLoad - 1) * COMMUNITY_FEED_PAGE_SIZE + const res = await fetch(`${API_BASE}/signals/feed?message_type=strategy&limit=${COMMUNITY_FEED_PAGE_SIZE}&offset=${offset}&sort=${sort}`, { headers: token ? { 'Authorization': `Bearer ${token}` } : undefined }) if (!res.ok) { console.error('Failed to load strategies:', res.status) setStrategies([]) + setStrategyTotal(0) setLoading(false) return } const data = await res.json() setStrategies(data.signals || []) + setStrategyTotal(data.total || 0) } catch (e) { console.error('Error loading strategies:', e) setStrategies([]) + setStrategyTotal(0) } setLoading(false) } @@ -430,7 +437,8 @@ export function StrategiesPage() { if (res.ok) { setFormData({ title: '', content: '', symbols: '', tags: '', market: 'us-stock' }) setShowForm(false) - loadStrategies() + setStrategyPage(1) + loadStrategies(1) } } catch (e) { console.error(e) @@ -460,7 +468,10 @@ export function StrategiesPage() { +
+ {language === 'zh' + ? `第 ${strategyPage} / ${strategyTotalPages} 页,共 ${strategyTotal} 条策略` + : `Page ${strategyPage} / ${strategyTotalPages}, ${strategyTotal} strategies total`} +
+ + + )} + )} ) @@ -584,6 +620,8 @@ export function StrategiesPage() { export function DiscussionsPage() { const [token] = useState(localStorage.getItem('claw_token')) const [discussions, setDiscussions] = useState([]) + const [discussionPage, setDiscussionPage] = useState(1) + const [discussionTotal, setDiscussionTotal] = useState(0) const [recentNotifications, setRecentNotifications] = useState([]) const [followingLeaderIds, setFollowingLeaderIds] = useState([]) const [viewerId, setViewerId] = useState(null) @@ -597,14 +635,15 @@ export function DiscussionsPage() { const signalIdFromQuery = new URLSearchParams(location.search).get('signal') const autoOpenReplyBox = new URLSearchParams(location.search).get('reply') === '1' + const discussionTotalPages = Math.max(1, Math.ceil(discussionTotal / COMMUNITY_FEED_PAGE_SIZE)) useEffect(() => { - loadDiscussions() + loadDiscussions(discussionPage) if (token) { loadRecentNotifications() loadViewerContext() } - }, [sort, token]) + }, [sort, token, discussionPage]) const loadViewerContext = async () => { if (!token) return @@ -626,23 +665,27 @@ export function DiscussionsPage() { } } - const loadDiscussions = async () => { + const loadDiscussions = async (pageToLoad = discussionPage) => { setLoading(true) try { - const res = await fetch(`${API_BASE}/signals/feed?message_type=discussion&limit=50&sort=${sort}`, { + const offset = (pageToLoad - 1) * COMMUNITY_FEED_PAGE_SIZE + const res = await fetch(`${API_BASE}/signals/feed?message_type=discussion&limit=${COMMUNITY_FEED_PAGE_SIZE}&offset=${offset}&sort=${sort}`, { headers: token ? { 'Authorization': `Bearer ${token}` } : undefined }) if (!res.ok) { console.error('Failed to load discussions:', res.status) setDiscussions([]) + setDiscussionTotal(0) setLoading(false) return } const data = await res.json() setDiscussions(data.signals || []) + setDiscussionTotal(data.total || 0) } catch (e) { console.error('Error loading discussions:', e) setDiscussions([]) + setDiscussionTotal(0) } setLoading(false) } @@ -686,7 +729,8 @@ export function DiscussionsPage() { if (res.ok) { setFormData({ title: '', content: '', tags: '', market: 'us-stock' }) setShowForm(false) - loadDiscussions() + setDiscussionPage(1) + loadDiscussions(1) loadRecentNotifications() } else { const data = await res.json() @@ -755,7 +799,10 @@ export function DiscussionsPage() { +
+ {language === 'zh' + ? `第 ${discussionPage} / ${discussionTotalPages} 页,共 ${discussionTotal} 条讨论` + : `Page ${discussionPage} / ${discussionTotalPages}, ${discussionTotal} discussions total`} +
+ + + )} + )} ) diff --git a/service/frontend/src/appShared.tsx b/service/frontend/src/appShared.tsx index 0109247..4a455be 100644 --- a/service/frontend/src/appShared.tsx +++ b/service/frontend/src/appShared.tsx @@ -39,7 +39,10 @@ export const REFRESH_INTERVAL = parseInt(import.meta.env.VITE_REFRESH_INTERVAL | export const NOTIFICATION_POLL_INTERVAL = 60 * 1000 export const FIVE_MINUTES_MS = 5 * 60 * 1000 export const ONE_DAY_MS = 24 * 60 * 60 * 1000 -export const SIGNALS_FEED_PAGE_SIZE = 15 +export const SIGNALS_FEED_PAGE_SIZE = 20 +export const LEADERBOARD_PAGE_SIZE = 20 +export const COPY_TRADING_PAGE_SIZE = 20 +export const COMMUNITY_FEED_PAGE_SIZE = 20 export const FINANCIAL_NEWS_PAGE_SIZE = 4 export const LEADERBOARD_LINE_COLORS = ['#d66a5f', '#d49e52', '#b8b15f', '#7bb174', '#5aa7a3', '#4e88b7', '#7a78c5', '#a16cb8', '#c66f9f', '#cb7a7a'] diff --git a/service/requirements.txt b/service/requirements.txt index e6df757..1e14fb9 100644 --- a/service/requirements.txt +++ b/service/requirements.txt @@ -9,3 +9,4 @@ aiohttp>=3.9.1 python-multipart>=0.0.6 openrouter>=1.0.0 psycopg[binary]>=3.2.1 +redis>=5.0.8 diff --git a/service/server/cache.py b/service/server/cache.py new file mode 100644 index 0000000..1b15f91 --- /dev/null +++ b/service/server/cache.py @@ -0,0 +1,165 @@ +""" +Cache Module + +Redis-backed cache helpers with graceful fallback when Redis is disabled or unavailable. +""" + +from __future__ import annotations + +import json +import threading +import time +from typing import Any, Optional + +from config import REDIS_ENABLED, REDIS_PREFIX, REDIS_URL + +try: + import redis +except ImportError: # pragma: no cover - optional until Redis is installed + redis = None + + +_CONNECT_RETRY_INTERVAL_SECONDS = 10.0 +_client_lock = threading.Lock() +_redis_client: Optional["redis.Redis"] = None +_last_connect_attempt_at = 0.0 +_last_connect_error: Optional[str] = None + + +def _namespaced(key: str) -> str: + cleaned = (key or "").strip() + if not cleaned: + raise ValueError("Cache key must not be empty") + return f"{REDIS_PREFIX}:{cleaned}" + + +def redis_configured() -> bool: + return REDIS_ENABLED and bool(REDIS_URL) + + +def get_redis_client() -> Optional["redis.Redis"]: + global _redis_client, _last_connect_attempt_at, _last_connect_error + + if not redis_configured() or redis is None: + return None + + if _redis_client is not None: + return _redis_client + + now = time.time() + if now - _last_connect_attempt_at < _CONNECT_RETRY_INTERVAL_SECONDS: + return None + + with _client_lock: + if _redis_client is not None: + return _redis_client + + now = time.time() + if now - _last_connect_attempt_at < _CONNECT_RETRY_INTERVAL_SECONDS: + return None + + _last_connect_attempt_at = now + try: + client = redis.Redis.from_url(REDIS_URL, decode_responses=True) + client.ping() + _redis_client = client + _last_connect_error = None + return _redis_client + except Exception as exc: + _redis_client = None + _last_connect_error = str(exc) + return None + + +def get_cache_status() -> dict[str, Any]: + client = get_redis_client() + return { + "enabled": REDIS_ENABLED, + "configured": bool(REDIS_URL), + "available": client is not None, + "prefix": REDIS_PREFIX, + "client_installed": redis is not None, + "last_error": _last_connect_error, + } + + +def get_json(key: str) -> Optional[Any]: + client = get_redis_client() + if client is None: + return None + + raw = client.get(_namespaced(key)) + if raw is None: + return None + + try: + return json.loads(raw) + except Exception: + return None + + +def set_json(key: str, value: Any, ttl_seconds: Optional[int] = None) -> bool: + client = get_redis_client() + if client is None: + return False + + payload = json.dumps(value, separators=(",", ":"), default=str) + namespaced_key = _namespaced(key) + + if ttl_seconds is not None and ttl_seconds > 0: + return bool(client.set(namespaced_key, payload, ex=int(ttl_seconds))) + return bool(client.set(namespaced_key, payload)) + + +def delete(key: str) -> int: + client = get_redis_client() + if client is None: + return 0 + return int(client.delete(_namespaced(key))) + + +def delete_pattern(pattern: str) -> int: + client = get_redis_client() + if client is None: + return 0 + + match_pattern = _namespaced(pattern) + keys = list(client.scan_iter(match=match_pattern)) + if not keys: + return 0 + return int(client.delete(*keys)) + + +def acquire_lock( + name: str, + timeout_seconds: int = 30, + blocking: bool = False, + blocking_timeout: Optional[float] = None, +): + client = get_redis_client() + if client is None: + return None + + return client.lock( + _namespaced(f"lock:{name}"), + timeout=timeout_seconds, + blocking=blocking, + blocking_timeout=blocking_timeout, + ) + + +def publish(channel: str, message: Any) -> int: + client = get_redis_client() + if client is None: + return 0 + + if not isinstance(message, str): + message = json.dumps(message, separators=(",", ":"), default=str) + return int(client.publish(_namespaced(f"pubsub:{channel}"), message)) + + +def create_pubsub(): + client = get_redis_client() + if client is None: + return None + return client.pubsub() diff --git a/service/server/config.py b/service/server/config.py index 79243e9..fc19634 100644 --- a/service/server/config.py +++ b/service/server/config.py @@ -18,6 +18,11 @@ load_dotenv(env_path) # Database DATABASE_URL = os.getenv("DATABASE_URL", "") +# Cache / Redis +REDIS_ENABLED = os.getenv("REDIS_ENABLED", "false").strip().lower() in {"1", "true", "yes", "on"} +REDIS_URL = os.getenv("REDIS_URL", "").strip() +REDIS_PREFIX = os.getenv("REDIS_PREFIX", "ai_trader").strip() or "ai_trader" + # API Keys ALPHA_VANTAGE_API_KEY = os.getenv("ALPHA_VANTAGE_API_KEY", "demo") diff --git a/service/server/main.py b/service/server/main.py index 2b4ed9b..3538937 100644 --- a/service/server/main.py +++ b/service/server/main.py @@ -35,17 +35,13 @@ logging.basicConfig( logger = logging.getLogger(__name__) +from cache import get_cache_status from database import init_database, get_database_status from routes import create_app from tasks import ( - update_position_prices, - record_profit_history, - settle_polymarket_positions, - refresh_etf_flow_snapshots_loop, - refresh_macro_signal_snapshots_loop, - refresh_market_news_snapshots_loop, - refresh_stock_analysis_snapshots_loop, _update_trending_cache, + background_tasks_enabled_for_api, + start_background_tasks, ) # Initialize database @@ -60,38 +56,34 @@ app = create_app() @app.on_event("startup") async def startup_event(): """Startup event - schedule background tasks.""" - import asyncio db_status = get_database_status() logger.info( "Database ready: backend=%s details=%s", db_status.get("backend"), {key: value for key, value in db_status.items() if key != "backend"}, ) + cache_status = get_cache_status() + logger.info( + "Cache ready: enabled=%s configured=%s available=%s prefix=%s client_installed=%s error=%s", + cache_status.get("enabled"), + cache_status.get("configured"), + cache_status.get("available"), + cache_status.get("prefix"), + cache_status.get("client_installed"), + cache_status.get("last_error"), + ) # Initialize trending cache logger.info("Initializing trending cache...") _update_trending_cache() - # Start background task for updating position prices - logger.info("Starting position price update background task...") - asyncio.create_task(update_position_prices()) - # Start background task for recording profit history - logger.info("Starting profit history recording task...") - asyncio.create_task(record_profit_history()) - # Start background task for Polymarket settlement - logger.info("Starting Polymarket settlement task...") - asyncio.create_task(settle_polymarket_positions()) - # Start background task for market-news snapshots - logger.info("Starting market news snapshot task...") - asyncio.create_task(refresh_market_news_snapshots_loop()) - # Start background task for macro signal snapshots - logger.info("Starting macro signal snapshot task...") - asyncio.create_task(refresh_macro_signal_snapshots_loop()) - # Start background task for ETF flow snapshots - logger.info("Starting ETF flow snapshot task...") - asyncio.create_task(refresh_etf_flow_snapshots_loop()) - # Start background task for stock analysis snapshots - logger.info("Starting stock analysis snapshot task...") - asyncio.create_task(refresh_stock_analysis_snapshots_loop()) - logger.info("All background tasks started") + if not background_tasks_enabled_for_api(): + logger.info( + "API background tasks disabled. Run `python service/server/worker.py` " + "to process prices, profit history, settlements, and market intel." + ) + return + + started = start_background_tasks(logger) + logger.info("Background tasks started: %s", len(started)) # ==================== Run ==================== diff --git a/service/server/market_intel.py b/service/server/market_intel.py index c9da3b7..b24384e 100644 --- a/service/server/market_intel.py +++ b/service/server/market_intel.py @@ -22,6 +22,7 @@ try: except ImportError: # pragma: no cover - optional dependency in some environments OpenRouter = None +from cache import delete_pattern, get_json, set_json from config import ALPHA_VANTAGE_API_KEY from database import get_db_connection @@ -38,6 +39,19 @@ ETF_FLOW_HISTORY_LIMIT = int(os.getenv("ETF_FLOW_HISTORY_LIMIT", "96")) ETF_FLOW_LOOKBACK_DAYS = int(os.getenv("ETF_FLOW_LOOKBACK_DAYS", "1")) ETF_FLOW_BASELINE_VOLUME_DAYS = int(os.getenv("ETF_FLOW_BASELINE_VOLUME_DAYS", "5")) STOCK_ANALYSIS_HISTORY_LIMIT = int(os.getenv("STOCK_ANALYSIS_HISTORY_LIMIT", "120")) +MARKET_NEWS_CACHE_TTL_SECONDS = max(30, int(os.getenv("MARKET_NEWS_REFRESH_INTERVAL", "3600"))) +MACRO_SIGNAL_CACHE_TTL_SECONDS = max(30, int(os.getenv("MACRO_SIGNAL_REFRESH_INTERVAL", "3600"))) +ETF_FLOW_CACHE_TTL_SECONDS = max(30, int(os.getenv("ETF_FLOW_REFRESH_INTERVAL", "3600"))) +STOCK_ANALYSIS_CACHE_TTL_SECONDS = max(30, int(os.getenv("STOCK_ANALYSIS_REFRESH_INTERVAL", "7200"))) +MARKET_INTEL_OVERVIEW_CACHE_TTL_SECONDS = max( + 30, + min( + MARKET_NEWS_CACHE_TTL_SECONDS, + MACRO_SIGNAL_CACHE_TTL_SECONDS, + ETF_FLOW_CACHE_TTL_SECONDS, + STOCK_ANALYSIS_CACHE_TTL_SECONDS, + ), +) FALLBACK_STOCK_ANALYSIS_SYMBOLS = [ symbol.strip().upper() for symbol in os.getenv("MARKET_INTEL_STOCK_SYMBOLS", "NVDA,AAPL,MSFT,AMZN,TSLA,META").split(",") @@ -82,6 +96,12 @@ MACRO_SYMBOLS = { "dollar": "UUP", } +MARKET_INTEL_CACHE_PREFIX = "market_intel" + + +def _cache_key(*parts: object) -> str: + return ":".join([MARKET_INTEL_CACHE_PREFIX, *[str(part) for part in parts]]) + BTC_ETF_SYMBOLS = [ "IBIT", "FBTC", @@ -995,6 +1015,9 @@ def refresh_market_news_snapshots() -> dict[str, Any]: finally: conn.close() + delete_pattern(_cache_key("news", "*")) + delete_pattern(_cache_key("overview")) + return { "inserted_categories": inserted, "errors": errors, @@ -1110,6 +1133,9 @@ def refresh_macro_signal_snapshot() -> dict[str, Any]: finally: conn.close() + delete_pattern(_cache_key("macro_signals")) + delete_pattern(_cache_key("overview")) + return { "verdict": snapshot["verdict"], "bullish_count": snapshot["bullish_count"], @@ -1119,6 +1145,11 @@ def refresh_macro_signal_snapshot() -> dict[str, Any]: def get_macro_signals_payload() -> dict[str, Any]: + cache_key = _cache_key("macro_signals") + cached = get_json(cache_key) + if isinstance(cached, dict): + return cached + conn = get_db_connection() cursor = conn.cursor() try: @@ -1132,7 +1163,7 @@ def get_macro_signals_payload() -> dict[str, Any]: ) row = cursor.fetchone() if not row: - return { + payload = { "available": False, "verdict": "unavailable", "bullish_count": 0, @@ -1142,7 +1173,9 @@ def get_macro_signals_payload() -> dict[str, Any]: "source": {}, "created_at": None, } - return { + set_json(cache_key, payload, ttl_seconds=MACRO_SIGNAL_CACHE_TTL_SECONDS) + return payload + payload = { "available": True, "verdict": row["verdict"], "bullish_count": row["bullish_count"], @@ -1152,6 +1185,8 @@ def get_macro_signals_payload() -> dict[str, Any]: "source": json.loads(row["source_json"] or "{}"), "created_at": row["created_at"], } + set_json(cache_key, payload, ttl_seconds=MACRO_SIGNAL_CACHE_TTL_SECONDS) + return payload finally: conn.close() @@ -1181,6 +1216,9 @@ def refresh_etf_flow_snapshot() -> dict[str, Any]: finally: conn.close() + delete_pattern(_cache_key("etf_flows")) + delete_pattern(_cache_key("overview")) + return { "direction": summary["direction"], "tracked_count": summary["tracked_count"], @@ -1189,6 +1227,11 @@ def refresh_etf_flow_snapshot() -> dict[str, Any]: def get_etf_flows_payload() -> dict[str, Any]: + cache_key = _cache_key("etf_flows") + cached = get_json(cache_key) + if isinstance(cached, dict): + return cached + conn = get_db_connection() cursor = conn.cursor() try: @@ -1202,21 +1245,25 @@ def get_etf_flows_payload() -> dict[str, Any]: ) row = cursor.fetchone() if not row: - return { + payload = { "available": False, "summary": {}, "etfs": [], "created_at": None, "is_estimated": True, } + set_json(cache_key, payload, ttl_seconds=ETF_FLOW_CACHE_TTL_SECONDS) + return payload summary = json.loads(row["summary_json"] or "{}") - return { + payload = { "available": True, "summary": summary, "etfs": json.loads(row["etfs_json"] or "[]"), "created_at": row["created_at"], "is_estimated": bool(summary.get("is_estimated", True)), } + set_json(cache_key, payload, ttl_seconds=ETF_FLOW_CACHE_TTL_SECONDS) + return payload finally: conn.close() @@ -1273,6 +1320,9 @@ def refresh_stock_analysis_snapshots() -> dict[str, Any]: finally: conn.close() + delete_pattern(_cache_key("stocks", "*")) + delete_pattern(_cache_key("overview")) + return { "inserted_symbols": inserted, "errors": errors, @@ -1282,6 +1332,11 @@ def refresh_stock_analysis_snapshots() -> dict[str, Any]: def get_stock_analysis_latest_payload(symbol: str) -> dict[str, Any]: symbol = symbol.strip().upper() + cache_key = _cache_key("stocks", "latest", symbol) + cached = get_json(cache_key) + if isinstance(cached, dict): + return cached + conn = get_db_connection() cursor = conn.cursor() try: @@ -1299,8 +1354,10 @@ def get_stock_analysis_latest_payload(symbol: str) -> dict[str, Any]: ) row = cursor.fetchone() if not row: - return {"available": False, "symbol": symbol} - return { + payload = {"available": False, "symbol": symbol} + set_json(cache_key, payload, ttl_seconds=STOCK_ANALYSIS_CACHE_TTL_SECONDS) + return payload + payload = { "available": True, "symbol": row["symbol"], "market": row["market"], @@ -1318,12 +1375,20 @@ def get_stock_analysis_latest_payload(symbol: str) -> dict[str, Any]: "analysis": json.loads(row["analysis_json"] or "{}"), "created_at": row["created_at"], } + set_json(cache_key, payload, ttl_seconds=STOCK_ANALYSIS_CACHE_TTL_SECONDS) + return payload finally: conn.close() def get_stock_analysis_history_payload(symbol: str, limit: int = 10) -> dict[str, Any]: symbol = symbol.strip().upper() + normalized_limit = max(1, min(limit, 30)) + cache_key = _cache_key("stocks", "history", symbol, normalized_limit) + cached = get_json(cache_key) + if isinstance(cached, dict): + return cached + conn = get_db_connection() cursor = conn.cursor() try: @@ -1335,10 +1400,10 @@ def get_stock_analysis_history_payload(symbol: str, limit: int = 10) -> dict[str ORDER BY created_at DESC, id DESC LIMIT ? """, - (symbol, max(1, min(limit, 30))), + (symbol, normalized_limit), ) rows = cursor.fetchall() - return { + payload = { "available": bool(rows), "symbol": symbol, "history": [ @@ -1354,19 +1419,36 @@ def get_stock_analysis_history_payload(symbol: str, limit: int = 10) -> dict[str for row in rows ], } + set_json(cache_key, payload, ttl_seconds=STOCK_ANALYSIS_CACHE_TTL_SECONDS) + return payload finally: conn.close() def get_featured_stock_analysis_payload(limit: int = 6) -> dict[str, Any]: - symbols = _get_hot_us_stock_symbols(limit=max(1, min(limit, 10))) - return { + normalized_limit = max(1, min(limit, 10)) + cache_key = _cache_key("stocks", "featured", normalized_limit) + cached = get_json(cache_key) + if isinstance(cached, dict): + return cached + + symbols = _get_hot_us_stock_symbols(limit=normalized_limit) + payload = { "available": True, "items": [get_stock_analysis_latest_payload(symbol) for symbol in symbols], } + set_json(cache_key, payload, ttl_seconds=STOCK_ANALYSIS_CACHE_TTL_SECONDS) + return payload def get_market_news_payload(category: Optional[str] = None, limit: int = 5) -> dict[str, Any]: + normalized_category = (category or "").strip().lower() or "all" + normalized_limit = max(limit, 1) + cache_key = _cache_key("news", normalized_category, normalized_limit) + cached = get_json(cache_key) + if isinstance(cached, dict): + return cached + requested_categories = [category] if category else list(NEWS_CATEGORY_DEFINITIONS.keys()) sections = [] @@ -1399,7 +1481,7 @@ def get_market_news_payload(category: Optional[str] = None, limit: int = 5) -> d "label_zh": definition["label_zh"], "description": definition["description"], "description_zh": definition["description_zh"], - "items": (snapshot["items"] or [])[: max(limit, 1)], + "items": (snapshot["items"] or [])[: normalized_limit], "summary": snapshot["summary"], "created_at": snapshot["created_at"], "available": True, @@ -1408,15 +1490,22 @@ def get_market_news_payload(category: Optional[str] = None, limit: int = 5) -> d last_updated_at = max((section["created_at"] for section in sections if section.get("created_at")), default=None) total_items = sum(int((section.get("summary") or {}).get("item_count") or 0) for section in sections) - return { + payload = { "categories": sections, "last_updated_at": last_updated_at, "total_items": total_items, "available": any(section.get("available") for section in sections), } + set_json(cache_key, payload, ttl_seconds=MARKET_NEWS_CACHE_TTL_SECONDS) + return payload def get_market_intel_overview() -> dict[str, Any]: + cache_key = _cache_key("overview") + cached = get_json(cache_key) + if isinstance(cached, dict): + return cached + macro_payload = get_macro_signals_payload() etf_payload = get_etf_flows_payload() stock_payload = get_featured_stock_analysis_payload(limit=4) @@ -1452,7 +1541,7 @@ def get_market_intel_overview() -> dict[str, Any]: latest_item_time = item_time latest_headline = item.get("title") - return { + payload = { "available": bool(available_categories) or bool(macro_payload.get("available")), "last_updated_at": max( [timestamp for timestamp in (news_payload["last_updated_at"], macro_payload.get("created_at")) if timestamp], @@ -1485,6 +1574,8 @@ def get_market_intel_overview() -> dict[str, Any]: "top_source": (section.get("summary") or {}).get("top_source"), "created_at": section.get("created_at"), } - for section in categories + for section in categories ], } + set_json(cache_key, payload, ttl_seconds=MARKET_INTEL_OVERVIEW_CACHE_TTL_SECONDS) + return payload diff --git a/service/server/price_fetcher.py b/service/server/price_fetcher.py index 03365ad..a96414f 100644 --- a/service/server/price_fetcher.py +++ b/service/server/price_fetcher.py @@ -6,6 +6,7 @@ Crypto: 从 Hyperliquid 获取价格(停止使用 Alpha Vantage crypto 端点 """ import os +import random import requests from datetime import datetime, timezone, timedelta from typing import Optional, Dict, Tuple, Any @@ -23,6 +24,11 @@ HYPERLIQUID_API_URL = os.environ.get("HYPERLIQUID_API_URL", "https://api.hyperli # Polymarket public endpoints (no API key required for reads) POLYMARKET_GAMMA_BASE_URL = os.environ.get("POLYMARKET_GAMMA_BASE_URL", "https://gamma-api.polymarket.com").strip() POLYMARKET_CLOB_BASE_URL = os.environ.get("POLYMARKET_CLOB_BASE_URL", "https://clob.polymarket.com").strip() +PRICE_FETCH_TIMEOUT_SECONDS = float(os.environ.get("PRICE_FETCH_TIMEOUT_SECONDS", "10")) +PRICE_FETCH_MAX_RETRIES = max(0, int(os.environ.get("PRICE_FETCH_MAX_RETRIES", "2"))) +PRICE_FETCH_BACKOFF_BASE_SECONDS = max(0.0, float(os.environ.get("PRICE_FETCH_BACKOFF_BASE_SECONDS", "0.35"))) +PRICE_FETCH_ERROR_COOLDOWN_SECONDS = max(0.0, float(os.environ.get("PRICE_FETCH_ERROR_COOLDOWN_SECONDS", "20"))) +PRICE_FETCH_RATE_LIMIT_COOLDOWN_SECONDS = max(0.0, float(os.environ.get("PRICE_FETCH_RATE_LIMIT_COOLDOWN_SECONDS", "60"))) # 时区常量 UTC = timezone.utc @@ -31,6 +37,8 @@ ET_TZ = timezone(ET_OFFSET) _POLYMARKET_CONDITION_ID_RE = re.compile(r"^0x[a-fA-F0-9]{64}$") _POLYMARKET_TOKEN_ID_RE = re.compile(r"^\d+$") +_RETRYABLE_STATUS_CODES = {429, 500, 502, 503, 504} +_provider_cooldowns: Dict[str, float] = {} # Polymarket outcome prices are probabilities in [0, 1]. Reject values outside to avoid # token_id/condition_id or other API noise being interpreted as price (e.g. 1.5e+73). @@ -48,6 +56,108 @@ _polymarket_token_cache: Dict[str, Tuple[str, float]] = {} _POLYMARKET_TOKEN_CACHE_TTL_S = 300.0 +def _provider_cooldown_remaining(provider: str) -> float: + return max(0.0, _provider_cooldowns.get(provider, 0.0) - time.time()) + + +def _activate_provider_cooldown(provider: str, duration_s: float, reason: str) -> None: + if duration_s <= 0: + return + until = time.time() + duration_s + previous_until = _provider_cooldowns.get(provider, 0.0) + _provider_cooldowns[provider] = max(previous_until, until) + remaining = _provider_cooldown_remaining(provider) + print(f"[Price API] {provider} cooldown {remaining:.1f}s ({reason})") + + +def _retry_delay(attempt: int) -> float: + if PRICE_FETCH_BACKOFF_BASE_SECONDS <= 0: + return 0.0 + base = PRICE_FETCH_BACKOFF_BASE_SECONDS * (2 ** attempt) + return base + random.uniform(0.0, base * 0.25) + + +def _request_json_with_retry( + provider: str, + method: str, + url: str, + *, + params: Optional[dict] = None, + json_payload: Optional[dict] = None, +) -> object: + remaining = _provider_cooldown_remaining(provider) + if remaining > 0: + raise RuntimeError(f"{provider} cooldown active for {remaining:.1f}s") + + last_exc: Optional[Exception] = None + attempts = PRICE_FETCH_MAX_RETRIES + 1 + + for attempt in range(attempts): + try: + if method == "POST": + resp = requests.post(url, json=json_payload, timeout=PRICE_FETCH_TIMEOUT_SECONDS) + else: + resp = requests.get(url, params=params, timeout=PRICE_FETCH_TIMEOUT_SECONDS) + + if resp.status_code in _RETRYABLE_STATUS_CODES: + resp.raise_for_status() + + resp.raise_for_status() + return resp.json() + except requests.HTTPError as exc: + status_code = exc.response.status_code if exc.response is not None else None + retryable = status_code in _RETRYABLE_STATUS_CODES + last_exc = exc + + if retryable and attempt < attempts - 1: + delay = _retry_delay(attempt) + print( + f"[Price API] {provider} retry {attempt + 1}/{attempts - 1} " + f"after HTTP {status_code}; sleeping {delay:.2f}s" + ) + if delay > 0: + time.sleep(delay) + continue + + if status_code == 429: + _activate_provider_cooldown( + provider, + PRICE_FETCH_RATE_LIMIT_COOLDOWN_SECONDS, + "HTTP 429" + ) + elif status_code is not None and status_code >= 500: + _activate_provider_cooldown( + provider, + PRICE_FETCH_ERROR_COOLDOWN_SECONDS, + f"HTTP {status_code}" + ) + raise + except (requests.Timeout, requests.ConnectionError) as exc: + last_exc = exc + if attempt < attempts - 1: + delay = _retry_delay(attempt) + print( + f"[Price API] {provider} retry {attempt + 1}/{attempts - 1} " + f"after {exc.__class__.__name__}; sleeping {delay:.2f}s" + ) + if delay > 0: + time.sleep(delay) + continue + _activate_provider_cooldown( + provider, + PRICE_FETCH_ERROR_COOLDOWN_SECONDS, + exc.__class__.__name__ + ) + raise + except requests.RequestException as exc: + last_exc = exc + raise + + if last_exc is not None: + raise last_exc + raise RuntimeError(f"{provider} request failed without response") + + def _polymarket_market_title(market: Optional[dict]) -> Optional[str]: if not isinstance(market, dict): return None @@ -133,14 +243,20 @@ def _normalize_hyperliquid_symbol(symbol: str) -> str: def _hyperliquid_post(payload: dict) -> object: if not HYPERLIQUID_API_URL: raise RuntimeError("HYPERLIQUID_API_URL is empty") - resp = requests.post(HYPERLIQUID_API_URL, json=payload, timeout=10) - resp.raise_for_status() - return resp.json() + return _request_json_with_retry( + "hyperliquid", + "POST", + HYPERLIQUID_API_URL, + json_payload=payload, + ) def _polymarket_get_json(url: str, params: Optional[dict] = None) -> object: - resp = requests.get(url, params=params, timeout=10) - resp.raise_for_status() - return resp.json() + return _request_json_with_retry( + "polymarket", + "GET", + url, + params=params, + ) def _parse_string_array(value: Any) -> list[str]: @@ -520,13 +636,22 @@ def _get_us_stock_price(symbol: str, executed_at: str) -> Optional[float]: } try: - response = requests.get(BASE_URL, params=params, timeout=10) - data = response.json() + data = _request_json_with_retry( + "alphavantage", + "GET", + BASE_URL, + params=params, + ) if "Error Message" in data: print(f"[Price API] Error: {data.get('Error Message')}") return None if "Note" in data: + _activate_provider_cooldown( + "alphavantage", + PRICE_FETCH_RATE_LIMIT_COOLDOWN_SECONDS, + "body rate limit note" + ) print(f"[Price API] Rate limit: {data.get('Note')}") return None diff --git a/service/server/routes_shared.py b/service/server/routes_shared.py index aa5ce6b..f303891 100644 --- a/service/server/routes_shared.py +++ b/service/server/routes_shared.py @@ -1,5 +1,6 @@ import json import math +import os import re import time from dataclasses import dataclass, field @@ -36,13 +37,17 @@ PRICE_CACHE_KEY_PREFIX = 'price:quote' MENTION_PATTERN = re.compile(r'@([A-Za-z0-9_\-]{2,64})') +def allow_sync_price_fetch_in_api() -> bool: + return os.getenv('ALLOW_SYNC_PRICE_FETCH_IN_API', 'false').strip().lower() in {'1', 'true', 'yes', 'on'} + + @dataclass class RouteContext: grouped_signals_cache: dict[tuple[str, str, int, int], tuple[float, dict[str, Any]]] = field(default_factory=dict) agent_signals_cache: dict[tuple[int, str, int], tuple[float, dict[str, Any]]] = field(default_factory=dict) price_api_last_request: dict[int, float] = field(default_factory=dict) price_quote_cache: dict[tuple[str, str, str, str], tuple[float, dict[str, Any]]] = field(default_factory=dict) - leaderboard_cache: dict[tuple[int, int], tuple[float, dict[str, Any]]] = field(default_factory=dict) + leaderboard_cache: dict[tuple[int, int, int, bool], tuple[float, dict[str, Any]]] = field(default_factory=dict) content_rate_limit_state: dict[tuple[int, str], dict[str, Any]] = field(default_factory=dict) ws_connections: dict[int, WebSocket] = field(default_factory=dict) verification_codes: dict[str, dict[str, Any]] = field(default_factory=dict) @@ -133,16 +138,20 @@ def position_price_cache_key(row: Any) -> tuple[str, str, str, str]: def resolve_position_prices(rows: list[Any], now_str: str) -> dict[tuple[str, str, str, str], Optional[float]]: - from price_fetcher import get_price_from_market - resolved: dict[tuple[str, str, str, str], Optional[float]] = {} + fetch_missing = allow_sync_price_fetch_in_api() + get_price_from_market = None + if fetch_missing: + from price_fetcher import get_price_from_market as _get_price_from_market + get_price_from_market = _get_price_from_market + for row in rows: cache_key = position_price_cache_key(row) if cache_key in resolved: continue current_price = row['current_price'] - if current_price is None: + if current_price is None and get_price_from_market is not None: current_price = get_price_from_market( row['symbol'], now_str, diff --git a/service/server/routes_signals.py b/service/server/routes_signals.py index 072a6be..3b71208 100644 --- a/service/server/routes_signals.py +++ b/service/server/routes_signals.py @@ -21,6 +21,7 @@ from routes_shared import ( GROUPED_SIGNALS_CACHE_KEY_PREFIX, GROUPED_SIGNALS_CACHE_TTL_SECONDS, RouteContext, + allow_sync_price_fetch_in_api, decorate_polymarket_item, enforce_content_rate_limit, extract_mentions, @@ -49,6 +50,7 @@ def register_signal_routes(app: FastAPI, ctx: RouteContext) -> None: now = utc_now_iso_z() side = data.action action_lower = side.lower() + fetch_price_in_request = allow_sync_price_fetch_in_api() polymarket_token_id = None polymarket_outcome = None @@ -69,20 +71,33 @@ def register_signal_routes(app: FastAPI, ctx: RouteContext) -> None: raise HTTPException(status_code=400, detail='Quantity too large') if data.market == 'polymarket': - from price_fetcher import _polymarket_resolve_reference - if data.executed_at.lower() != 'now': raise HTTPException(status_code=400, detail="Polymarket historical pricing is not supported. Use executed_at='now'.") - contract = _polymarket_resolve_reference(data.symbol, token_id=data.token_id, outcome=data.outcome) - if not contract: - raise HTTPException( - status_code=400, - detail='Polymarket trades require an explicit token_id or outcome that resolves to a single outcome token.', - ) - polymarket_token_id = contract['token_id'] - polymarket_outcome = contract.get('outcome') + if fetch_price_in_request: + from price_fetcher import _polymarket_resolve_reference - from price_fetcher import get_price_from_market + contract = _polymarket_resolve_reference(data.symbol, token_id=data.token_id, outcome=data.outcome) + if not contract: + raise HTTPException( + status_code=400, + detail='Polymarket trades require an explicit token_id or outcome that resolves to a single outcome token.', + ) + polymarket_token_id = contract['token_id'] + polymarket_outcome = contract.get('outcome') + else: + polymarket_token_id = (data.token_id or '').strip() + polymarket_outcome = (data.outcome or '').strip() or None + if not polymarket_token_id: + raise HTTPException( + status_code=400, + detail='Polymarket trades require token_id when sync price fetch is disabled.', + ) + + get_price_from_market = None + if fetch_price_in_request: + from price_fetcher import get_price_from_market as _get_price_from_market + + get_price_from_market = _get_price_from_market if data.executed_at.lower() == 'now': now_utc = datetime.now(timezone.utc) @@ -101,16 +116,19 @@ def register_signal_routes(app: FastAPI, ctx: RouteContext) -> None: ) raise HTTPException(status_code=400, detail=f'{data.market} is currently closed') - actual_price = get_price_from_market( - data.symbol, - executed_at, - data.market, - token_id=polymarket_token_id, - outcome=polymarket_outcome, - ) - if not actual_price: - raise HTTPException(status_code=400, detail=f'Unable to fetch current price for {data.symbol}') - price = actual_price + if get_price_from_market is not None: + actual_price = get_price_from_market( + data.symbol, + executed_at, + data.market, + token_id=polymarket_token_id, + outcome=polymarket_outcome, + ) + if not actual_price: + raise HTTPException(status_code=400, detail=f'Unable to fetch current price for {data.symbol}') + price = actual_price + else: + price = data.price else: is_valid, error_msg = validate_executed_at(data.executed_at, data.market) if not is_valid: @@ -120,19 +138,22 @@ def register_signal_routes(app: FastAPI, ctx: RouteContext) -> None: if not executed_at.endswith('Z') and '+00:00' not in executed_at: executed_at = executed_at + 'Z' - actual_price = get_price_from_market( - data.symbol, - executed_at, - data.market, - token_id=polymarket_token_id, - outcome=polymarket_outcome, - ) - if not actual_price: - raise HTTPException( - status_code=400, - detail=f'Unable to fetch historical price for {data.symbol} at {executed_at}', + if get_price_from_market is not None: + actual_price = get_price_from_market( + data.symbol, + executed_at, + data.market, + token_id=polymarket_token_id, + outcome=polymarket_outcome, ) - price = actual_price + if not actual_price: + raise HTTPException( + status_code=400, + detail=f'Unable to fetch historical price for {data.symbol} at {executed_at}', + ) + price = actual_price + else: + price = data.price try: price = float(price) @@ -377,7 +398,7 @@ def register_signal_routes(app: FastAPI, ctx: RouteContext) -> None: 'outcome': polymarket_outcome, } if data.market == 'polymarket': - decorate_polymarket_item(payload, fetch_remote=True) + decorate_polymarket_item(payload, fetch_remote=fetch_price_in_request) return payload @app.post('/api/signals/strategy') @@ -651,9 +672,12 @@ def register_signal_routes(app: FastAPI, ctx: RouteContext) -> None: market: str = None, keyword: str = None, limit: int = 50, + offset: int = 0, sort: str = 'new', authorization: str = Header(None), ): + limit = max(1, min(limit, 100)) + offset = max(0, offset) viewer = None token = _extract_token(authorization) if token: @@ -713,6 +737,16 @@ def register_signal_routes(app: FastAPI, ctx: RouteContext) -> None: else: order_clause = 's.created_at DESC' + count_query = f""" + SELECT COUNT(*) AS total + FROM signals s + JOIN agents a ON a.id = s.agent_id + WHERE {where_clause} + """ + cursor.execute(count_query, params) + total_row = cursor.fetchone() + total = total_row['total'] if total_row else 0 + query = f""" SELECT s.*, @@ -724,9 +758,9 @@ def register_signal_routes(app: FastAPI, ctx: RouteContext) -> None: JOIN agents a ON a.id = s.agent_id WHERE {where_clause} ORDER BY {order_clause} - LIMIT ? + LIMIT ? OFFSET ? """ - params.append(limit) + params.extend([limit, offset]) cursor.execute(query, params) rows = cursor.fetchall() @@ -757,10 +791,22 @@ def register_signal_routes(app: FastAPI, ctx: RouteContext) -> None: signal_dict['is_following_author'] = signal_dict['agent_id'] in followed_author_ids signals.append(signal_dict) - return {'signals': signals} + return { + 'signals': signals, + 'total': total, + 'limit': limit, + 'offset': offset, + 'has_more': offset + len(signals) < total, + } @app.get('/api/signals/following') - async def get_following(authorization: str = Header(None)): + async def get_following( + limit: int = 500, + offset: int = 0, + authorization: str = Header(None), + ): + limit = max(1, min(limit, 500)) + offset = max(0, offset) token = _extract_token(authorization) agent = _get_agent_by_token(token) if not agent: @@ -768,6 +814,17 @@ def register_signal_routes(app: FastAPI, ctx: RouteContext) -> None: conn = get_db_connection() cursor = conn.cursor() + cursor.execute( + """ + SELECT COUNT(*) AS total + FROM subscriptions + WHERE follower_id = ? AND status = 'active' + """, + (agent['id'],), + ) + total_row = cursor.fetchone() + total = total_row['total'] if total_row else 0 + cursor.execute( """ SELECT @@ -790,8 +847,9 @@ def register_signal_routes(app: FastAPI, ctx: RouteContext) -> None: (SELECT MAX(sig.created_at) FROM signals sig WHERE sig.agent_id = s.leader_id), s.created_at ) DESC + LIMIT ? OFFSET ? """, - (agent['id'],), + (agent['id'], limit, offset), ) rows = cursor.fetchall() conn.close() @@ -813,7 +871,13 @@ def register_signal_routes(app: FastAPI, ctx: RouteContext) -> None: 'latest_discussion_title': row['latest_discussion_title'], }) - return {'following': following} + return { + 'following': following, + 'total': total, + 'limit': limit, + 'offset': offset, + 'has_more': offset + len(following) < total, + } @app.get('/api/signals/subscribers') async def get_subscribers(authorization: str = Header(None)): diff --git a/service/server/routes_trading.py b/service/server/routes_trading.py index 49f3771..4e702c0 100644 --- a/service/server/routes_trading.py +++ b/service/server/routes_trading.py @@ -15,6 +15,7 @@ from routes_shared import ( PRICE_QUOTE_CACHE_TTL_SECONDS, RouteContext, TRENDING_CACHE_KEY, + allow_sync_price_fetch_in_api, check_price_api_rate_limit, clamp_profit_for_display, decorate_polymarket_item, @@ -29,13 +30,22 @@ from utils import _extract_token def register_trading_routes(app: FastAPI, ctx: RouteContext) -> None: @app.get('/api/profit/history') - async def get_profit_history(limit: int = 10, days: int = 30): + async def get_profit_history( + limit: int = 10, + days: int = 30, + offset: int = 0, + include_history: bool = True, + ): days = max(1, min(days, 365)) limit = max(1, min(limit, 50)) + offset = max(0, offset) - cache_key = (limit, days) + cache_key = (limit, days, offset, include_history) now_ts = time.time() - redis_cache_key = f'{LEADERBOARD_CACHE_KEY_PREFIX}:limit={limit}:days={days}' + redis_cache_key = ( + f'{LEADERBOARD_CACHE_KEY_PREFIX}:' + f'limit={limit}:days={days}:offset={offset}:history={int(include_history)}' + ) cached_payload = get_json(redis_cache_key) if isinstance(cached_payload, dict): @@ -53,6 +63,15 @@ def register_trading_routes(app: FastAPI, ctx: RouteContext) -> None: cutoff = cutoff_dt.isoformat().replace('+00:00', 'Z') live_snapshot_recorded_at = utc_now_iso_z() + cursor.execute( + """ + SELECT COUNT(*) AS total + FROM agents + """ + ) + total_row = cursor.fetchone() + total = total_row['total'] if total_row else 0 + cursor.execute( """ SELECT @@ -81,9 +100,9 @@ def register_trading_routes(app: FastAPI, ctx: RouteContext) -> None: LEFT JOIN positions p ON p.agent_id = a.id GROUP BY a.id, a.name, a.cash, a.deposited ORDER BY profit DESC - LIMIT ? + LIMIT ? OFFSET ? """, - (cutoff, limit), + (cutoff, limit, offset), ) top_agents = [ { @@ -97,7 +116,13 @@ def register_trading_routes(app: FastAPI, ctx: RouteContext) -> None: if not top_agents: conn.close() - result = {'top_agents': []} + result = { + 'top_agents': [], + 'total': total, + 'limit': limit, + 'offset': offset, + 'has_more': False, + } ctx.leaderboard_cache[cache_key] = (now_ts, result) set_json(redis_cache_key, result, ttl_seconds=LEADERBOARD_CACHE_TTL_SECONDS) return result @@ -118,27 +143,29 @@ def register_trading_routes(app: FastAPI, ctx: RouteContext) -> None: result = [] for agent in top_agents: - cursor.execute( - """ - SELECT profit, recorded_at - FROM ( + history_points = [] + if include_history: + cursor.execute( + """ SELECT profit, recorded_at - FROM profit_history - WHERE agent_id = ? AND recorded_at >= ? - ORDER BY recorded_at DESC - LIMIT 2000 - ) recent_history - ORDER BY recorded_at ASC - """, - (agent['agent_id'], cutoff), - ) - history = cursor.fetchall() - history_points = [ - {'profit': clamp_profit_for_display(h['profit']), 'recorded_at': h['recorded_at']} - for h in history - ] + FROM ( + SELECT profit, recorded_at + FROM profit_history + WHERE agent_id = ? AND recorded_at >= ? + ORDER BY recorded_at DESC + LIMIT 2000 + ) recent_history + ORDER BY recorded_at ASC + """, + (agent['agent_id'], cutoff), + ) + history = cursor.fetchall() + history_points = [ + {'profit': clamp_profit_for_display(h['profit']), 'recorded_at': h['recorded_at']} + for h in history + ] - if not history_points or history_points[-1]['recorded_at'] != live_snapshot_recorded_at: + if include_history and (not history_points or history_points[-1]['recorded_at'] != live_snapshot_recorded_at): history_points.append({ 'profit': clamp_profit_for_display(agent['profit']), 'recorded_at': live_snapshot_recorded_at, @@ -213,7 +240,13 @@ def register_trading_routes(app: FastAPI, ctx: RouteContext) -> None: break conn.close() - payload = {'top_agents': result} + payload = { + 'top_agents': result, + 'total': total, + 'limit': limit, + 'offset': offset, + 'has_more': offset + len(result) < total, + } ctx.leaderboard_cache[cache_key] = (now_ts, payload) set_json(redis_cache_key, payload, ttl_seconds=LEADERBOARD_CACHE_TTL_SECONDS) return payload @@ -322,8 +355,6 @@ def register_trading_routes(app: FastAPI, ctx: RouteContext) -> None: outcome: Optional[str] = None, authorization: str = Header(None), ): - from price_fetcher import get_price_from_market - token = _extract_token(authorization) if not token: raise HTTPException(status_code=401, detail='Invalid token') @@ -361,13 +392,37 @@ def register_trading_routes(app: FastAPI, ctx: RouteContext) -> None: if cached and now_ts - cached[0] < PRICE_QUOTE_CACHE_TTL_SECONDS: return cached[1] - price = get_price_from_market(normalized_symbol, now, market, token_id=token_id, outcome=outcome) + price = None + conn = get_db_connection() + try: + cursor = conn.cursor() + cursor.execute( + """ + SELECT current_price + FROM positions + WHERE symbol = ? AND market = ? AND COALESCE(token_id, '') = COALESCE(?, '') + AND current_price IS NOT NULL + ORDER BY opened_at DESC + LIMIT 1 + """, + (normalized_symbol, market, token_id), + ) + row = cursor.fetchone() + if row: + price = row['current_price'] + finally: + conn.close() + + if price is None and allow_sync_price_fetch_in_api(): + from price_fetcher import get_price_from_market + + price = get_price_from_market(normalized_symbol, now, market, token_id=token_id, outcome=outcome) if price is None: raise HTTPException(status_code=404, detail='Price not available') payload = {'symbol': normalized_symbol, 'market': market, 'token_id': token_id, 'outcome': outcome, 'price': price} if market == 'polymarket': - decorate_polymarket_item(payload, fetch_remote=True) + decorate_polymarket_item(payload, fetch_remote=allow_sync_price_fetch_in_api()) ctx.price_quote_cache[cache_key] = (now_ts, payload) set_json(redis_cache_key, payload, ttl_seconds=PRICE_QUOTE_CACHE_TTL_SECONDS) return payload diff --git a/service/server/tasks.py b/service/server/tasks.py index cff0b22..49c804d 100644 --- a/service/server/tasks.py +++ b/service/server/tasks.py @@ -14,6 +14,24 @@ from typing import Optional, Dict, Any # Global trending cache (shared with routes) trending_cache: list = [] _last_profit_history_prune_at: float = 0.0 +_TRENDING_CACHE_KEY = "trending:top20" + + +def _env_bool(name: str, default: bool = False) -> bool: + raw = os.getenv(name) + if raw is None: + return default + return raw.strip().lower() in {"1", "true", "yes", "on"} + + +def _env_int(name: str, default: int, minimum: Optional[int] = None) -> int: + try: + value = int(os.getenv(name, str(default))) + except Exception: + value = default + if minimum is not None: + value = max(minimum, value) + return value def _backfill_polymarket_position_metadata() -> None: @@ -63,7 +81,7 @@ def _backfill_polymarket_position_metadata() -> None: def _update_trending_cache(): """Update trending cache - calculates from positions table.""" - global trending_cache + from cache import set_json from database import get_db_connection conn = get_db_connection() cursor = conn.cursor() @@ -78,7 +96,7 @@ def _update_trending_cache(): """) rows = cursor.fetchall() - trending_cache = [] + updated_trending: list[dict[str, Any]] = [] for row in rows: # Get current price from positions table cursor.execute(""" @@ -88,7 +106,7 @@ def _update_trending_cache(): """, (row["symbol"], row["market"], row["token_id"])) price_row = cursor.fetchone() - trending_cache.append({ + updated_trending.append({ "symbol": row["symbol"], "market": row["market"], "token_id": row["token_id"], @@ -98,33 +116,50 @@ def _update_trending_cache(): }) conn.close() + trending_cache.clear() + trending_cache.extend(updated_trending) + refresh_interval = max(60, _env_int("POSITION_REFRESH_INTERVAL", 900, minimum=60) * 2) + set_json(_TRENDING_CACHE_KEY, trending_cache, ttl_seconds=refresh_interval) def _prune_profit_history() -> None: - """Compact profit history so it remains useful for charts without growing forever.""" + """Tier profit history into high-resolution, 15m, hourly, and daily retention.""" from database import get_db_connection, using_postgres - full_resolution_hours = int(os.getenv("PROFIT_HISTORY_FULL_RESOLUTION_HOURS", "24")) - compact_window_days = int(os.getenv("PROFIT_HISTORY_COMPACT_WINDOW_DAYS", "7")) - bucket_minutes = int(os.getenv("PROFIT_HISTORY_COMPACT_BUCKET_MINUTES", "15")) + full_resolution_hours = _env_int("PROFIT_HISTORY_FULL_RESOLUTION_HOURS", 24, minimum=1) + fifteen_min_window_days = _env_int( + "PROFIT_HISTORY_15M_WINDOW_DAYS", + _env_int("PROFIT_HISTORY_COMPACT_WINDOW_DAYS", 7, minimum=1), + minimum=1, + ) + hourly_window_days = _env_int("PROFIT_HISTORY_HOURLY_WINDOW_DAYS", 30, minimum=fifteen_min_window_days) + daily_window_days = _env_int("PROFIT_HISTORY_DAILY_WINDOW_DAYS", 365, minimum=hourly_window_days) + bucket_minutes = _env_int("PROFIT_HISTORY_COMPACT_BUCKET_MINUTES", 15, minimum=1) - if compact_window_days <= 0: - return + if full_resolution_hours >= fifteen_min_window_days * 24: + full_resolution_hours = max(1, fifteen_min_window_days * 24 - 1) now = datetime.now(timezone.utc) - hard_cutoff = (now - timedelta(days=compact_window_days)).isoformat().replace("+00:00", "Z") + daily_cutoff = (now - timedelta(days=daily_window_days)).isoformat().replace("+00:00", "Z") + hourly_cutoff = (now - timedelta(days=hourly_window_days)).isoformat().replace("+00:00", "Z") + fifteen_min_cutoff = (now - timedelta(days=fifteen_min_window_days)).isoformat().replace("+00:00", "Z") full_resolution_cutoff = (now - timedelta(hours=full_resolution_hours)).isoformat().replace("+00:00", "Z") + deleted_old = 0 + deleted_15m = 0 + deleted_hourly = 0 + deleted_daily = 0 + conn = get_db_connection() try: cursor = conn.cursor() - cursor.execute("DELETE FROM profit_history WHERE recorded_at < ?", (hard_cutoff,)) + cursor.execute("DELETE FROM profit_history WHERE recorded_at < ?", (daily_cutoff,)) deleted_old = cursor.rowcount if cursor.rowcount is not None else 0 - deleted_compacted = 0 + conn.commit() - if bucket_minutes > 0 and full_resolution_cutoff > hard_cutoff: - if using_postgres(): + if using_postgres(): + if full_resolution_cutoff > fifteen_min_cutoff: cursor.execute(""" WITH ranked AS ( SELECT @@ -142,9 +177,49 @@ def _prune_profit_history() -> None: DELETE FROM profit_history ph USING ranked WHERE ph.id = ranked.id AND ranked.rn > 1 - """, (bucket_minutes, bucket_minutes, hard_cutoff, full_resolution_cutoff)) - deleted_compacted = cursor.rowcount if cursor.rowcount is not None else 0 - else: + """, (bucket_minutes, bucket_minutes, fifteen_min_cutoff, full_resolution_cutoff)) + deleted_15m = cursor.rowcount if cursor.rowcount is not None else 0 + conn.commit() + + if fifteen_min_cutoff > hourly_cutoff: + cursor.execute(""" + WITH ranked AS ( + SELECT + id, + ROW_NUMBER() OVER ( + PARTITION BY agent_id, date_trunc('hour', recorded_at::timestamptz) + ORDER BY recorded_at DESC, id DESC + ) AS rn + FROM profit_history + WHERE recorded_at >= ? AND recorded_at < ? + ) + DELETE FROM profit_history ph + USING ranked + WHERE ph.id = ranked.id AND ranked.rn > 1 + """, (hourly_cutoff, fifteen_min_cutoff)) + deleted_hourly = cursor.rowcount if cursor.rowcount is not None else 0 + conn.commit() + + if hourly_cutoff > daily_cutoff: + cursor.execute(""" + WITH ranked AS ( + SELECT + id, + ROW_NUMBER() OVER ( + PARTITION BY agent_id, date_trunc('day', recorded_at::timestamptz) + ORDER BY recorded_at DESC, id DESC + ) AS rn + FROM profit_history + WHERE recorded_at >= ? AND recorded_at < ? + ) + DELETE FROM profit_history ph + USING ranked + WHERE ph.id = ranked.id AND ranked.rn > 1 + """, (daily_cutoff, hourly_cutoff)) + deleted_daily = cursor.rowcount if cursor.rowcount is not None else 0 + conn.commit() + else: + if full_resolution_cutoff > fifteen_min_cutoff: cursor.execute(""" DELETE FROM profit_history WHERE id IN ( @@ -164,15 +239,67 @@ def _prune_profit_history() -> None: ) ranked WHERE rn > 1 ) - """, (bucket_minutes, hard_cutoff, full_resolution_cutoff)) - deleted_compacted = cursor.rowcount if cursor.rowcount is not None else 0 + """, (bucket_minutes, fifteen_min_cutoff, full_resolution_cutoff)) + deleted_15m = cursor.rowcount if cursor.rowcount is not None else 0 + conn.commit() - conn.commit() - if deleted_old or deleted_compacted: + if fifteen_min_cutoff > hourly_cutoff: + cursor.execute(""" + DELETE FROM profit_history + WHERE id IN ( + SELECT id + FROM ( + SELECT + id, + ROW_NUMBER() OVER ( + PARTITION BY agent_id, strftime('%Y-%m-%dT%H', recorded_at) + ORDER BY recorded_at DESC, id DESC + ) AS rn + FROM profit_history + WHERE recorded_at >= ? AND recorded_at < ? + ) ranked + WHERE rn > 1 + ) + """, (hourly_cutoff, fifteen_min_cutoff)) + deleted_hourly = cursor.rowcount if cursor.rowcount is not None else 0 + conn.commit() + + if hourly_cutoff > daily_cutoff: + cursor.execute(""" + DELETE FROM profit_history + WHERE id IN ( + SELECT id + FROM ( + SELECT + id, + ROW_NUMBER() OVER ( + PARTITION BY agent_id, strftime('%Y-%m-%d', recorded_at) + ORDER BY recorded_at DESC, id DESC + ) AS rn + FROM profit_history + WHERE recorded_at >= ? AND recorded_at < ? + ) ranked + WHERE rn > 1 + ) + """, (daily_cutoff, hourly_cutoff)) + deleted_daily = cursor.rowcount if cursor.rowcount is not None else 0 + conn.commit() + + total_deleted = deleted_old + deleted_15m + deleted_hourly + deleted_daily + if total_deleted: print( "[Profit History] Pruned history: " - f"deleted_old={deleted_old} compacted={deleted_compacted}" + f"deleted_old={deleted_old} " + f"compacted_15m={deleted_15m} " + f"compacted_hourly={deleted_hourly} " + f"compacted_daily={deleted_daily}" ) + if not using_postgres() and _env_bool("PROFIT_HISTORY_VACUUM_AFTER_PRUNE", True): + min_deleted = _env_int("PROFIT_HISTORY_VACUUM_MIN_DELETED_ROWS", 50000, minimum=1) + if total_deleted >= min_deleted: + cursor.execute("PRAGMA wal_checkpoint(TRUNCATE)") + cursor.execute("VACUUM") + print("[Profit History] SQLite VACUUM completed after prune") finally: conn.close() @@ -180,7 +307,7 @@ def _prune_profit_history() -> None: def _maybe_prune_profit_history() -> None: global _last_profit_history_prune_at - prune_interval = int(os.getenv("PROFIT_HISTORY_PRUNE_INTERVAL_SECONDS", "3600")) + prune_interval = _env_int("PROFIT_HISTORY_PRUNE_INTERVAL_SECONDS", 3600) if prune_interval <= 0: return @@ -198,7 +325,7 @@ async def update_position_prices(): from price_fetcher import get_price_from_market # Get max parallel requests from environment variable - max_parallel = int(os.getenv("MAX_PARALLEL_PRICE_FETCH", "5")) + max_parallel = _env_int("MAX_PARALLEL_PRICE_FETCH", 2, minimum=1) # Wait a bit on startup before first update await asyncio.sleep(5) @@ -279,7 +406,7 @@ async def update_position_prices(): print(f"[Price Update Error] {e}") # Wait interval from environment variable (default: 5 minutes = 300 seconds) - refresh_interval = int(os.getenv("POSITION_REFRESH_INTERVAL", "300")) + refresh_interval = _env_int("POSITION_REFRESH_INTERVAL", 900, minimum=60) print(f"[Price Update] Next update in {refresh_interval} seconds") await asyncio.sleep(refresh_interval) @@ -288,7 +415,7 @@ async def refresh_market_news_snapshots_loop(): """Background task to refresh market-news snapshots on a fixed interval.""" from market_intel import refresh_market_news_snapshots - refresh_interval = int(os.getenv("MARKET_NEWS_REFRESH_INTERVAL", "900")) + refresh_interval = _env_int("MARKET_NEWS_REFRESH_INTERVAL", 3600, minimum=300) # Give the API a moment to start before hitting external providers. await asyncio.sleep(3) @@ -314,7 +441,7 @@ async def refresh_macro_signal_snapshots_loop(): """Background task to refresh macro signal snapshots on a fixed interval.""" from market_intel import refresh_macro_signal_snapshot - refresh_interval = int(os.getenv("MACRO_SIGNAL_REFRESH_INTERVAL", "900")) + refresh_interval = _env_int("MACRO_SIGNAL_REFRESH_INTERVAL", 3600, minimum=300) await asyncio.sleep(6) @@ -337,7 +464,7 @@ async def refresh_etf_flow_snapshots_loop(): """Background task to refresh ETF flow snapshots on a fixed interval.""" from market_intel import refresh_etf_flow_snapshot - refresh_interval = int(os.getenv("ETF_FLOW_REFRESH_INTERVAL", "900")) + refresh_interval = _env_int("ETF_FLOW_REFRESH_INTERVAL", 3600, minimum=300) await asyncio.sleep(9) @@ -360,7 +487,7 @@ async def refresh_stock_analysis_snapshots_loop(): """Background task to refresh featured stock-analysis snapshots.""" from market_intel import refresh_stock_analysis_snapshots - refresh_interval = int(os.getenv("STOCK_ANALYSIS_REFRESH_INTERVAL", "1800")) + refresh_interval = _env_int("STOCK_ANALYSIS_REFRESH_INTERVAL", 7200, minimum=600) await asyncio.sleep(12) @@ -413,9 +540,9 @@ async def record_profit_history(): COALESCE( SUM( CASE - WHEN p.current_price IS NULL THEN 0 + WHEN p.current_price IS NULL THEN p.entry_price * ABS(p.quantity) WHEN p.side = 'long' THEN p.current_price * ABS(p.quantity) - ELSE p.entry_price * ABS(p.quantity) + ELSE (2 * p.entry_price - p.current_price) * ABS(p.quantity) END ), 0 @@ -470,7 +597,7 @@ async def record_profit_history(): print(f"[Profit History Error] {e}") # Record at the same interval as position refresh (controlled by POSITION_REFRESH_INTERVAL) - refresh_interval = int(os.getenv("POSITION_REFRESH_INTERVAL", "300")) + refresh_interval = _env_int("PROFIT_HISTORY_RECORD_INTERVAL", _env_int("POSITION_REFRESH_INTERVAL", 900, minimum=60), minimum=300) await asyncio.sleep(refresh_interval) @@ -493,9 +620,9 @@ async def settle_polymarket_positions(): while True: try: - interval_s = int(os.getenv("POLYMARKET_SETTLE_INTERVAL", "60")) + interval_s = _env_int("POLYMARKET_SETTLE_INTERVAL", 300, minimum=60) except Exception: - interval_s = 60 + interval_s = 300 try: _backfill_polymarket_position_metadata() @@ -584,3 +711,38 @@ async def settle_polymarket_positions(): print(f"[Polymarket Settler Error] {e}") await asyncio.sleep(interval_s) + + +BACKGROUND_TASK_REGISTRY = { + "prices": update_position_prices, + "profit_history": record_profit_history, + "polymarket_settlement": settle_polymarket_positions, + "market_news": refresh_market_news_snapshots_loop, + "macro_signals": refresh_macro_signal_snapshots_loop, + "etf_flows": refresh_etf_flow_snapshots_loop, + "stock_analysis": refresh_stock_analysis_snapshots_loop, +} + + +DEFAULT_BACKGROUND_TASKS = ",".join(BACKGROUND_TASK_REGISTRY.keys()) + + +def background_tasks_enabled_for_api() -> bool: + """API workers default to HTTP-only; run worker.py for background loops.""" + return _env_bool("AI_TRADER_API_BACKGROUND_TASKS", False) + + +def get_enabled_background_task_names() -> list[str]: + raw = os.getenv("AI_TRADER_BACKGROUND_TASKS", DEFAULT_BACKGROUND_TASKS) + names = [item.strip() for item in raw.split(",") if item.strip()] + return [name for name in names if name in BACKGROUND_TASK_REGISTRY] + + +def start_background_tasks(logger: Optional[Any] = None) -> list[asyncio.Task]: + started: list[asyncio.Task] = [] + for name in get_enabled_background_task_names(): + task_func = BACKGROUND_TASK_REGISTRY[name] + if logger: + logger.info("Starting background task: %s", name) + started.append(asyncio.create_task(task_func(), name=f"ai-trader:{name}")) + return started diff --git a/service/server/worker.py b/service/server/worker.py new file mode 100644 index 0000000..5aea48f --- /dev/null +++ b/service/server/worker.py @@ -0,0 +1,42 @@ +""" +Standalone background worker for AI-Trader. + +Run this separately from the FastAPI process so HTTP requests are not competing +with price refreshes, profit-history compaction, and market-intel snapshots. +""" + +import asyncio +import logging +import os + +from database import init_database, get_database_status +from tasks import DEFAULT_BACKGROUND_TASKS, _prune_profit_history, start_background_tasks + + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(levelname)s - %(message)s", +) +logger = logging.getLogger(__name__) + + +async def main() -> None: + init_database() + logger.info("Worker database ready: %s", get_database_status()) + + if os.getenv("AI_TRADER_BACKGROUND_TASKS") is None: + os.environ["AI_TRADER_BACKGROUND_TASKS"] = DEFAULT_BACKGROUND_TASKS + + if os.getenv("PROFIT_HISTORY_PRUNE_ON_WORKER_START", "true").strip().lower() in {"1", "true", "yes", "on"}: + await asyncio.to_thread(_prune_profit_history) + + tasks = start_background_tasks(logger) + if not tasks: + logger.warning("No background tasks enabled; set AI_TRADER_BACKGROUND_TASKS to a comma-separated task list.") + return + + await asyncio.Event().wait() + + +if __name__ == "__main__": + asyncio.run(main())