From f62bd4b69c7a8d61e9ccb84e2c0983f74c19c338 Mon Sep 17 00:00:00 2001 From: tianyufan Date: Thu, 12 Mar 2026 15:10:58 +0800 Subject: [PATCH] Use Hyperliquid for crypto pricing; standardize timestamps to UTC. Replace Alpha Vantage crypto intraday lookups with Hyperliquid public info endpoints (L2 book + candle snapshots). Normalize server-generated timestamps and expiry handling to explicit UTC (Z) to avoid system-local timezone drift. Made-with: Cursor --- .env.example | 4 + service/server/config.py | 4 + service/server/price_fetcher.py | 219 +++++++++++++++++++++++--------- service/server/routes.py | 34 +++-- service/server/services.py | 6 +- service/server/tasks.py | 6 +- service/server/utils.py | 4 +- 7 files changed, 192 insertions(+), 85 deletions(-) diff --git a/.env.example b/.env.example index 2f16d55..858fcbf 100644 --- a/.env.example +++ b/.env.example @@ -15,6 +15,10 @@ ENVIRONMENT=development # Get your free API key at: https://www.alphavantage.co/support/#api-key ALPHA_VANTAGE_API_KEY=demo +# ==================== Hyperliquid (Crypto pricing) ==================== +# Public endpoint, no API key required +HYPERLIQUID_API_URL=https://api.hyperliquid.xyz/info + # ==================== CORS ==================== # Server IP or domain for CORS (comma-separated) CLAWTRADER_CORS_ORIGINS=http://localhost:3000,ttp://ai4trade.ai,https://ai4trade.ai diff --git a/service/server/config.py b/service/server/config.py index 696ec37..be4c7e3 100644 --- a/service/server/config.py +++ b/service/server/config.py @@ -21,6 +21,10 @@ DATABASE_URL = os.getenv("DATABASE_URL", "") # API Keys ALPHA_VANTAGE_API_KEY = os.getenv("ALPHA_VANTAGE_API_KEY", "demo") +# Market data endpoints +# Hyperliquid public info endpoint (used for crypto quotes; no API key required) +HYPERLIQUID_API_URL = os.getenv("HYPERLIQUID_API_URL", "https://api.hyperliquid.xyz/info") + # CORS CORS_ORIGINS = os.getenv("CLAWTRADER_CORS_ORIGINS", "").split(",") if os.getenv("CLAWTRADER_CORS_ORIGINS") else ["http://localhost:3000"] diff --git a/service/server/price_fetcher.py b/service/server/price_fetcher.py index 6ad864c..a8bc26b 100644 --- a/service/server/price_fetcher.py +++ b/service/server/price_fetcher.py @@ -1,7 +1,8 @@ """ Stock Price Fetcher for Server -从 Alpha Vantage 获取价格 +US Stock: 从 Alpha Vantage 获取价格 +Crypto: 从 Hyperliquid 获取价格(停止使用 Alpha Vantage crypto 端点) """ import os @@ -13,11 +14,154 @@ from typing import Optional ALPHA_VANTAGE_API_KEY = os.environ.get("ALPHA_VANTAGE_API_KEY", "demo") BASE_URL = "https://www.alphavantage.co/query" +# Hyperliquid public info endpoint (no API key required for reads) +HYPERLIQUID_API_URL = os.environ.get("HYPERLIQUID_API_URL", "https://api.hyperliquid.xyz/info").strip() + # 时区常量 UTC = timezone.utc ET_OFFSET = timedelta(hours=-4) # EDT is UTC-4 ET_TZ = timezone(ET_OFFSET) +def _parse_executed_at_to_utc(executed_at: str) -> Optional[datetime]: + """ + Parse executed_at into an aware UTC datetime. + Accepts: + - 2026-03-07T14:30:00Z + - 2026-03-07T14:30:00+00:00 + - 2026-03-07T14:30:00 (treated as UTC) + """ + try: + cleaned = executed_at.strip() + if cleaned.endswith("Z"): + cleaned = cleaned.replace("Z", "+00:00") + dt = datetime.fromisoformat(cleaned) + if dt.tzinfo is None: + return dt.replace(tzinfo=UTC) + return dt.astimezone(UTC) + except Exception: + return None + + +def _normalize_hyperliquid_symbol(symbol: str) -> str: + """ + Best-effort normalization for Hyperliquid 'coin' identifiers. + Examples: + - 'btc' -> 'BTC' + - 'BTC-USD' -> 'BTC' + - 'BTC/USD' -> 'BTC' + - 'BTC-PERP' -> 'BTC' + - 'xyz:NVDA' -> 'xyz:NVDA' (keep dex-prefixed builder listings) + """ + raw = symbol.strip() + if ":" in raw: + return raw # builder/dex symbols are case sensitive upstream; keep as-is + + s = raw.upper() + for suffix in ("-PERP", "PERP"): + if s.endswith(suffix): + s = s[: -len(suffix)] + break + + for sep in ("-USD", "/USD"): + if s.endswith(sep): + s = s[: -len(sep)] + break + + return s.strip() + + +def _hyperliquid_post(payload: dict) -> object: + if not HYPERLIQUID_API_URL: + raise RuntimeError("HYPERLIQUID_API_URL is empty") + resp = requests.post(HYPERLIQUID_API_URL, json=payload, timeout=10) + resp.raise_for_status() + return resp.json() + + +def _get_hyperliquid_mid_price(symbol: str) -> Optional[float]: + """ + Fetch mid price from Hyperliquid L2 book. + This is used for 'now' style queries. + """ + coin = _normalize_hyperliquid_symbol(symbol) + data = _hyperliquid_post({"type": "l2Book", "coin": coin}) + if not isinstance(data, dict) or "levels" not in data: + return None + levels = data.get("levels") + if not isinstance(levels, list) or len(levels) < 2: + return None + bids = levels[0] if isinstance(levels[0], list) else [] + asks = levels[1] if isinstance(levels[1], list) else [] + best_bid = None + best_ask = None + if bids and isinstance(bids[0], dict) and "px" in bids[0]: + try: + best_bid = float(bids[0]["px"]) + except Exception: + best_bid = None + if asks and isinstance(asks[0], dict) and "px" in asks[0]: + try: + best_ask = float(asks[0]["px"]) + except Exception: + best_ask = None + if best_bid is None and best_ask is None: + return None + if best_bid is not None and best_ask is not None: + return float(f"{((best_bid + best_ask) / 2):.6f}") + return float(f"{(best_bid if best_bid is not None else best_ask):.6f}") + + +def _get_hyperliquid_candle_close(symbol: str, executed_at: str) -> Optional[float]: + """ + Fetch a 1m candle around executed_at via candleSnapshot and return the closest close. + This approximates "price at time" without requiring any private keys. + """ + dt = _parse_executed_at_to_utc(executed_at) + if not dt: + return None + + # Query a small window around the target time (±10 minutes) + target_ms = int(dt.timestamp() * 1000) + start_ms = target_ms - 10 * 60 * 1000 + end_ms = target_ms + 10 * 60 * 1000 + + coin = _normalize_hyperliquid_symbol(symbol) + payload = { + "type": "candleSnapshot", + "req": { + "coin": coin, + "interval": "1m", + "startTime": start_ms, + "endTime": end_ms, + }, + } + data = _hyperliquid_post(payload) + if not isinstance(data, list) or len(data) == 0: + return None + + closest = None + closest_diff = None + for candle in data: + if not isinstance(candle, dict): + continue + t = candle.get("t") + c = candle.get("c") + if t is None or c is None: + continue + try: + t_ms = int(float(t)) + close = float(c) + except Exception: + continue + diff = abs(target_ms - t_ms) + if closest_diff is None or diff < closest_diff: + closest_diff = diff + closest = close + + if closest is None: + return None + return float(f"{closest:.6f}") + def get_price_from_market(symbol: str, executed_at: str, market: str) -> Optional[float]: """ @@ -31,14 +175,15 @@ def get_price_from_market(symbol: str, executed_at: str, market: str) -> Optiona Returns: 查询到的价格,如果失败返回 None """ - if not ALPHA_VANTAGE_API_KEY or ALPHA_VANTAGE_API_KEY == "demo": - print("Warning: ALPHA_VANTAGE_API_KEY not set, using agent-provided price") - return None - try: if market == "crypto": - price = _get_crypto_price(symbol, executed_at) + # Crypto pricing now uses Hyperliquid public endpoints. + # Try historical candle (when executed_at is provided), then fall back to mid price. + price = _get_hyperliquid_candle_close(symbol, executed_at) or _get_hyperliquid_mid_price(symbol) else: + if not ALPHA_VANTAGE_API_KEY or ALPHA_VANTAGE_API_KEY == "demo": + print("Warning: ALPHA_VANTAGE_API_KEY not set, using agent-provided price") + return None price = _get_us_stock_price(symbol, executed_at) if price is None: @@ -121,60 +266,8 @@ def _get_us_stock_price(symbol: str, executed_at: str) -> Optional[float]: def _get_crypto_price(symbol: str, executed_at: str) -> Optional[float]: - """获取加密货币价格""" - # Alpha Vantage crypto API 返回 UTC 时间,executed_at 也应该是 UTC - try: - # 解析为 UTC 时间 (不是 ET) - dt_naive = datetime.fromisoformat(executed_at.replace('Z', '')) - dt_utc = dt_naive.replace(tzinfo=UTC) - except ValueError: - return None - - params = { - "function": "CRYPTO_INTRADAY", - "symbol": symbol, - "market": "USD", - "interval": "1min", - "outputsize": "compact", - "apikey": ALPHA_VANTAGE_API_KEY - } - - try: - response = requests.get(BASE_URL, params=params, timeout=10) - data = response.json() - - if "Error Message" in data: - print(f"[Price API] Error: {data.get('Error Message')}") - return None - if "Note" in data: - print(f"[Price API] Rate limit: {data.get('Note')}") - return None - - time_series_key = "Time Series Crypto (1min)" - if time_series_key not in data: - print(f"[Price API] No time series data for {symbol}") - return None - - time_series = data[time_series_key] - sorted_times = sorted(time_series.keys()) - - # 找到最接近的时间 - min_diff = float('inf') - closest_price = None - - for time_key in sorted_times: - values = time_series[time_key] - # Alpha Vantage 返回的也是 UTC 时间 - time_utc = datetime.strptime(time_key, "%Y-%m-%d %H:%M:%S").replace(tzinfo=UTC) - diff = abs((dt_utc.replace(tzinfo=None) - time_utc.replace(tzinfo=None)).total_seconds()) - if diff < min_diff: - min_diff = diff - closest_price = float(values.get("4. close", 0)) - - if closest_price: - print(f"[Price API] Found closest price for {symbol}: ${closest_price} ({int(min_diff)}s earlier)") - return closest_price - - except Exception as e: - print(f"[Price API] Exception while fetching crypto {symbol}: {e}") - return None + """ + Backwards-compat shim. + AI-Trader 已停止使用 Alpha Vantage 的 crypto 端点;此函数保留仅为避免旧代码引用时报错。 + """ + return _get_hyperliquid_candle_close(symbol, executed_at) or _get_hyperliquid_mid_price(symbol) diff --git a/service/server/routes.py b/service/server/routes.py index 66ad7dd..3f83147 100644 --- a/service/server/routes.py +++ b/service/server/routes.py @@ -21,13 +21,18 @@ PRICE_API_RATE_LIMIT = 1.0 # seconds between requests def check_price_api_rate_limit(agent_id: int) -> bool: """Check if agent can query price API. Returns True if allowed.""" global price_api_last_request - now = datetime.now().timestamp() + now = datetime.now(timezone.utc).timestamp() last = price_api_last_request.get(agent_id, 0) if now - last >= PRICE_API_RATE_LIMIT: price_api_last_request[agent_id] = now return True return False + +def _utc_now_iso_z() -> str: + """Return current time as ISO 8601 UTC with Z suffix.""" + return datetime.now(timezone.utc).isoformat().replace("+00:00", "Z") + from config import CORS_ORIGINS, SIGNAL_PUBLISH_REWARD, SIGNAL_ADOPT_REWARD from database import get_db_connection from utils import hash_password, verify_password, generate_verification_code, cleanup_expired_tokens, validate_address, _extract_token @@ -205,7 +210,7 @@ def create_app() -> FastAPI: @app.get("/health") async def health_check(): - return {"status": "ok", "timestamp": datetime.now().isoformat()} + return {"status": "ok", "timestamp": _utc_now_iso_z()} # ==================== WebSocket Notifications ==================== @@ -425,7 +430,7 @@ def create_app() -> FastAPI: """, (token, agent_id)) # Create initial positions if provided - now = datetime.now().isoformat() + now = _utc_now_iso_z() if data.positions: for pos in data.positions: cursor.execute(""" @@ -535,7 +540,7 @@ def create_app() -> FastAPI: agent_id = agent["id"] agent_name = agent["name"] signal_id = _get_next_signal_id() - now = datetime.now().isoformat() + now = _utc_now_iso_z() # Store the actual action (buy/sell/short/cover) side = data.action @@ -694,7 +699,7 @@ def create_app() -> FastAPI: INSERT INTO signals (signal_id, agent_id, message_type, market, signal_type, symbol, side, entry_price, quantity, content, timestamp, created_at, executed_at) VALUES (?, ?, 'operation', ?, 'realtime', ?, ?, ?, ?, ?, ?, ?, ?) - """, (follower_signal_id, follower_id, data.market, data.symbol, side, price, data.quantity, copy_content, int(datetime.now().timestamp()), now, executed_at)) + """, (follower_signal_id, follower_id, data.market, data.symbol, side, price, data.quantity, copy_content, int(datetime.now(timezone.utc).timestamp()), now, executed_at)) # Deduct/add cash for follower (with fee) - in same transaction if side in ['buy', 'short']: @@ -757,7 +762,7 @@ def create_app() -> FastAPI: agent_id = agent["id"] agent_name = agent["name"] signal_id = _get_next_signal_id() - now = datetime.now().isoformat() + now = _utc_now_iso_z() conn = get_db_connection() cursor = conn.cursor() @@ -765,7 +770,7 @@ def create_app() -> FastAPI: INSERT INTO signals (signal_id, agent_id, message_type, market, signal_type, title, content, symbols, tags, timestamp, created_at) VALUES (?, ?, 'strategy', ?, 'strategy', ?, ?, ?, ?, ?, ?) - """, (signal_id, agent_id, data.market, data.title, data.content, data.symbols, data.tags, int(datetime.now().timestamp()), now)) + """, (signal_id, agent_id, data.market, data.title, data.content, data.symbols, data.tags, int(datetime.now(timezone.utc).timestamp()), now)) conn.commit() conn.close() @@ -784,7 +789,7 @@ def create_app() -> FastAPI: agent_id = agent["id"] signal_id = _get_next_signal_id() - now = datetime.now().isoformat() + now = _utc_now_iso_z() conn = get_db_connection() cursor = conn.cursor() @@ -792,7 +797,7 @@ def create_app() -> FastAPI: INSERT INTO signals (signal_id, agent_id, message_type, market, signal_type, symbol, title, content, timestamp, created_at) VALUES (?, ?, 'discussion', ?, 'discussion', ?, ?, ?, ?, ?) - """, (signal_id, agent_id, data.market, data.symbol, data.title, data.content, int(datetime.now().timestamp()), now)) + """, (signal_id, agent_id, data.market, data.symbol, data.title, data.content, int(datetime.now(timezone.utc).timestamp()), now)) conn.commit() conn.close() @@ -1274,7 +1279,8 @@ def create_app() -> FastAPI: if not check_price_api_rate_limit(agent["id"]): raise HTTPException(status_code=429, detail="Rate limit exceeded. Please wait 1 second between requests.") - now = datetime.now().strftime("%Y-%m-%dT%H:%M:%S") + # Always use UTC timestamp to avoid server-local timezone drift + now = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") price = get_price_from_market(symbol.upper(), now, market) if price: @@ -1307,7 +1313,7 @@ def create_app() -> FastAPI: rows = cursor.fetchall() positions = [] - now_str = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S") + now_str = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") for row in rows: symbol = row["symbol"] @@ -1371,7 +1377,7 @@ def create_app() -> FastAPI: positions = [] total_pnl = 0 - now_str = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S") + now_str = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") for row in rows: symbol = row["symbol"] @@ -1501,7 +1507,7 @@ def create_app() -> FastAPI: # Store code (expires in 5 minutes) verification_codes[data.email] = { "code": code, - "expires_at": datetime.now() + timedelta(minutes=5) + "expires_at": datetime.now(timezone.utc) + timedelta(minutes=5) } # In production, send email here @@ -1517,7 +1523,7 @@ def create_app() -> FastAPI: raise HTTPException(status_code=400, detail="No code sent") stored = verification_codes[data.email] - if stored["expires_at"] < datetime.now(): + if stored["expires_at"] < datetime.now(timezone.utc): raise HTTPException(status_code=400, detail="Code expired") if stored["code"] != data.code: diff --git a/service/server/services.py b/service/server/services.py index cad8f47..38db2ae 100644 --- a/service/server/services.py +++ b/service/server/services.py @@ -5,7 +5,7 @@ Services Module """ import json -from datetime import datetime +from datetime import datetime, timezone from typing import Optional, Dict, Any, List from database import get_db_connection @@ -44,10 +44,10 @@ def _get_user_by_token(token: str) -> Optional[Dict]: def _create_user_session(user_id: int) -> str: """Create a new session for user.""" import secrets - from datetime import datetime, timedelta + from datetime import timedelta token = secrets.token_urlsafe(32) - expires_at = (datetime.now() + timedelta(days=7)).isoformat() + expires_at = (datetime.now(timezone.utc) + timedelta(days=7)).isoformat().replace("+00:00", "Z") conn = get_db_connection() cursor = conn.cursor() diff --git a/service/server/tasks.py b/service/server/tasks.py index 025b555..9eb6cd2 100644 --- a/service/server/tasks.py +++ b/service/server/tasks.py @@ -84,9 +84,9 @@ async def update_position_prices(): async with semaphore: # Run synchronous function in thread pool - # Use UTC time since Alpha Vantage returns UTC + # Use UTC time for consistent pricing timestamps now = datetime.now(timezone.utc) - executed_at = now.strftime("%Y-%m-%dT%H:%M:%S") + executed_at = now.strftime("%Y-%m-%dT%H:%M:%SZ") price = await asyncio.to_thread( get_price_from_market, symbol, executed_at, market ) @@ -187,7 +187,7 @@ async def record_profit_history(): print(f"[Profit History] Agent {agent_id}: cash={cash}, pos_value={position_value}, profit={profit}") # Record history - now = datetime.now(timezone.utc).isoformat() + now = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z") cursor.execute(""" INSERT INTO profit_history (agent_id, total_value, cash, position_value, profit, recorded_at) VALUES (?, ?, ?, ?, ?, ?) diff --git a/service/server/utils.py b/service/server/utils.py index 3c2539f..c25aefd 100644 --- a/service/server/utils.py +++ b/service/server/utils.py @@ -36,11 +36,11 @@ def generate_verification_code() -> str: def cleanup_expired_tokens(): """Clean up expired user tokens.""" from database import get_db_connection - from datetime import datetime + from datetime import datetime, timezone conn = get_db_connection() cursor = conn.cursor() - now = datetime.now().isoformat() + now = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z") cursor.execute("DELETE FROM user_tokens WHERE expires_at < ?", (now,)) deleted = cursor.rowcount