Harden crypto and polymarket trade processing (#161)

This commit is contained in:
Tianyu Fan 2026-03-20 14:33:11 +08:00 committed by GitHub
parent 923adfa3b0
commit b0806d3621
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 484 additions and 243 deletions

View file

@ -204,6 +204,8 @@ def init_database():
market TEXT NOT NULL, -- 'us-stock', 'a-stock', 'crypto', 'polymarket', etc.
signal_type TEXT, -- 'position', 'trade', 'realtime' (for operation type)
symbol TEXT,
token_id TEXT,
outcome TEXT,
symbols TEXT, -- JSON array for multiple symbols
side TEXT, -- 'long', 'short'
entry_price REAL,
@ -254,6 +256,8 @@ def init_database():
leader_id INTEGER, -- null if self-opened
symbol TEXT NOT NULL,
market TEXT NOT NULL DEFAULT 'us-stock',
token_id TEXT,
outcome TEXT,
side TEXT NOT NULL,
quantity REAL NOT NULL,
entry_price REAL NOT NULL,
@ -264,12 +268,51 @@ def init_database():
)
""")
cursor.execute("""
CREATE TABLE IF NOT EXISTS signal_sequence (
id INTEGER PRIMARY KEY AUTOINCREMENT,
created_at TEXT DEFAULT (datetime('now'))
)
""")
cursor.execute("""
CREATE TABLE IF NOT EXISTS polymarket_settlements (
id INTEGER PRIMARY KEY AUTOINCREMENT,
position_id INTEGER NOT NULL,
agent_id INTEGER NOT NULL,
symbol TEXT NOT NULL,
token_id TEXT NOT NULL,
outcome TEXT,
quantity REAL NOT NULL,
entry_price REAL NOT NULL,
settlement_price REAL NOT NULL,
proceeds REAL NOT NULL,
market_slug TEXT,
resolved_outcome TEXT,
resolved_at TEXT,
settled_at TEXT DEFAULT (datetime('now')),
source_data TEXT,
FOREIGN KEY (position_id) REFERENCES positions(id),
FOREIGN KEY (agent_id) REFERENCES agents(id)
)
""")
# Add market column if it doesn't exist (for existing databases)
try:
cursor.execute("ALTER TABLE positions ADD COLUMN market TEXT NOT NULL DEFAULT 'us-stock'")
except:
pass # Column already exists
try:
cursor.execute("ALTER TABLE positions ADD COLUMN token_id TEXT")
except:
pass
try:
cursor.execute("ALTER TABLE positions ADD COLUMN outcome TEXT")
except:
pass
# Add cash column if it doesn't exist (for existing databases)
try:
cursor.execute("ALTER TABLE agents ADD COLUMN cash REAL DEFAULT 100000.0")
@ -282,6 +325,16 @@ def init_database():
except:
pass # Column already exists
try:
cursor.execute("ALTER TABLE signals ADD COLUMN token_id TEXT")
except:
pass
try:
cursor.execute("ALTER TABLE signals ADD COLUMN outcome TEXT")
except:
pass
# Profit history table - tracks agent profit over time
cursor.execute("""
CREATE TABLE IF NOT EXISTS profit_history (
@ -314,6 +367,16 @@ def init_database():
CREATE INDEX IF NOT EXISTS idx_positions_agent ON positions(agent_id)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_positions_market_symbol
ON positions(market, symbol)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_positions_polymarket_token
ON positions(market, token_id)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_signals_agent ON signals(agent_id)
""")
@ -331,6 +394,16 @@ def init_database():
CREATE INDEX IF NOT EXISTS idx_signals_created_at ON signals(created_at)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_signals_polymarket_token
ON signals(market, token_id)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_polymarket_settlements_agent
ON polymarket_settlements(agent_id, settled_at DESC)
""")
conn.commit()
conn.close()
print("[INFO] Database initialized")

View file

@ -8,7 +8,7 @@ Crypto: 从 Hyperliquid 获取价格(停止使用 Alpha Vantage crypto 端点
import os
import requests
from datetime import datetime, timezone, timedelta
from typing import Optional, Dict, Tuple
from typing import Optional, Dict, Tuple, Any
import re
import time
import json
@ -43,7 +43,7 @@ def _polymarket_price_valid(price: float) -> bool:
except (TypeError, ValueError):
return False
# In-memory cache for Polymarket reference -> (token_id, expiry_epoch_s)
# In-memory cache for Polymarket reference+outcome -> (token_id, expiry_epoch_s)
_polymarket_token_cache: Dict[str, Tuple[str, float]] = {}
_POLYMARKET_TOKEN_CACHE_TTL_S = 300.0
@ -108,38 +108,33 @@ def _polymarket_get_json(url: str, params: Optional[dict] = None) -> object:
return resp.json()
def _polymarket_resolve_token_id(reference: str) -> Optional[str]:
"""
Resolve a Polymarket reference into a concrete CLOB token_id.
def _parse_string_array(value: Any) -> list[str]:
if isinstance(value, list):
return [str(v).strip() for v in value if isinstance(v, (str, int)) and str(v).strip()]
if isinstance(value, str) and value.strip().startswith("["):
try:
parsed = json.loads(value)
if isinstance(parsed, list):
return [str(v).strip() for v in parsed if isinstance(v, (str, int)) and str(v).strip()]
except Exception:
return []
return []
Supported references (best-effort):
- tokenId: "123456" (preferred)
- conditionId: "0x..." (64-byte hex)
- slug: "will-btc-hit-100k-by-2026" (best-effort via Gamma /markets?slug=...)
Returns token_id string or None.
"""
ref = (reference or "").strip()
if not ref:
def _polymarket_fetch_market(reference: str) -> Optional[dict]:
if not POLYMARKET_GAMMA_BASE_URL:
return None
cached = _polymarket_token_cache.get(ref)
now = time.time()
if cached and cached[1] > now:
return cached[0]
# If already a token id, use directly.
if _POLYMARKET_TOKEN_ID_RE.match(ref):
_polymarket_token_cache[ref] = (ref, now + _POLYMARKET_TOKEN_CACHE_TTL_S)
return ref
if not POLYMARKET_GAMMA_BASE_URL:
ref = (reference or "").strip()
if not ref:
return None
url = f"{POLYMARKET_GAMMA_BASE_URL.rstrip('/')}/markets"
params = {"limit": "1"}
if _POLYMARKET_CONDITION_ID_RE.match(ref):
params["conditionId"] = ref
elif _POLYMARKET_TOKEN_ID_RE.match(ref):
params["clob_token_ids"] = ref
else:
params["slug"] = ref
@ -148,40 +143,82 @@ def _polymarket_resolve_token_id(reference: str) -> Optional[str]:
except Exception:
return None
if not isinstance(raw, list) or not raw:
if not isinstance(raw, list) or not raw or not isinstance(raw[0], dict):
return None
market = raw[0]
if not isinstance(market, dict):
return raw[0]
def _polymarket_extract_tokens(market: dict) -> list[dict[str, Optional[str]]]:
token_ids = _parse_string_array(market.get("clobTokenIds")) or _parse_string_array(market.get("clob_token_ids"))
outcomes = _parse_string_array(market.get("outcomes"))
extracted: list[dict[str, Optional[str]]] = []
for idx, token_id in enumerate(token_ids):
if token_id and _POLYMARKET_TOKEN_ID_RE.match(token_id):
extracted.append({
"token_id": token_id,
"outcome": outcomes[idx] if idx < len(outcomes) else None,
})
return extracted
def _polymarket_resolve_reference(reference: str, token_id: Optional[str] = None, outcome: Optional[str] = None) -> Optional[dict]:
"""
Resolve a Polymarket reference into an explicit outcome token.
For ambiguous references (slug/condition with multiple outcomes), caller must provide
either `token_id` or `outcome`.
"""
ref = (reference or "").strip()
if not ref:
return None
def _parse_string_array(value) -> list:
if isinstance(value, list):
return [str(v) for v in value if isinstance(v, (str, int)) and str(v).strip()]
if isinstance(value, str) and value.strip().startswith("["):
try:
parsed = json.loads(value)
if isinstance(parsed, list):
return [str(v) for v in parsed if isinstance(v, (str, int)) and str(v).strip()]
except Exception:
return []
return []
cache_key = f"{ref}::{(token_id or '').strip().lower()}::{(outcome or '').strip().lower()}"
cached = _polymarket_token_cache.get(cache_key)
now = time.time()
if cached and cached[1] > now:
return {
"token_id": cached[0],
"outcome": outcome,
"market": _polymarket_fetch_market(ref),
}
# Gamma sometimes exposes clobTokenIds as a JSON string; handle both.
token_ids = _parse_string_array(market.get("clobTokenIds"))
if not token_ids:
token_ids = _parse_string_array(market.get("clob_token_ids"))
token_id = None
if token_ids:
token_id = token_ids[0].strip()
market = _polymarket_fetch_market(ref)
if not market:
return None
if token_id and _POLYMARKET_TOKEN_ID_RE.match(token_id):
_polymarket_token_cache[ref] = (token_id, now + _POLYMARKET_TOKEN_CACHE_TTL_S)
return token_id
tokens = _polymarket_extract_tokens(market)
requested_token_id = (token_id or "").strip()
requested_outcome = (outcome or "").strip().lower()
return None
selected = None
if requested_token_id:
for candidate in tokens:
if candidate["token_id"] == requested_token_id:
selected = candidate
break
elif _POLYMARKET_TOKEN_ID_RE.match(ref):
selected = {"token_id": ref, "outcome": outcome}
elif requested_outcome:
for candidate in tokens:
if (candidate.get("outcome") or "").strip().lower() == requested_outcome:
selected = candidate
break
elif len(tokens) == 1:
selected = tokens[0]
if not selected or not selected.get("token_id"):
return None
resolved_token_id = str(selected["token_id"])
_polymarket_token_cache[cache_key] = (resolved_token_id, now + _POLYMARKET_TOKEN_CACHE_TTL_S)
return {
"token_id": resolved_token_id,
"outcome": selected.get("outcome"),
"market": market,
}
def _get_polymarket_mid_price(reference: str) -> Optional[float]:
def _get_polymarket_mid_price(reference: str, token_id: Optional[str] = None, outcome: Optional[str] = None) -> Optional[float]:
"""
Fetch a mid price for a Polymarket outcome token.
Price is derived from best bid/ask in the CLOB orderbook.
@ -189,14 +226,15 @@ def _get_polymarket_mid_price(reference: str) -> Optional[float]:
if not POLYMARKET_CLOB_BASE_URL:
return None
token_id = _polymarket_resolve_token_id(reference)
if not token_id:
contract = _polymarket_resolve_reference(reference, token_id=token_id, outcome=outcome)
if not contract:
return None
resolved_token_id = contract["token_id"]
url = f"{POLYMARKET_CLOB_BASE_URL.rstrip('/')}/book"
data = None
try:
data = _polymarket_get_json(url, params={"token_id": token_id})
data = _polymarket_get_json(url, params={"token_id": resolved_token_id})
except Exception:
data = None
@ -225,45 +263,30 @@ def _get_polymarket_mid_price(reference: str) -> Optional[float]:
return None
# Fallback: use Gamma market fields when CLOB orderbook is missing.
market_info = _polymarket_resolve(reference)
if not market_info:
market = contract.get("market")
if not isinstance(market, dict):
return None
# When unresolved, Gamma may still expose an indicative price; settlementPrice is only meaningful after resolve.
# We re-fetch the full market record to get lastTradePrice/outcomePrice/outcomePrices when available.
try:
url = f"{POLYMARKET_GAMMA_BASE_URL.rstrip('/')}/markets"
params = {"limit": "1"}
ref = reference.strip()
if _POLYMARKET_CONDITION_ID_RE.match(ref):
params["conditionId"] = ref
elif _POLYMARKET_TOKEN_ID_RE.match(ref):
params["clob_token_ids"] = ref
else:
params["slug"] = ref
raw = _polymarket_get_json(url, params=params)
if isinstance(raw, list) and raw and isinstance(raw[0], dict):
m = raw[0]
for key in ("lastTradePrice", "outcomePrice"):
v = m.get(key)
if isinstance(v, (int, float)):
outcome_prices = _parse_string_array(market.get("outcomePrices"))
outcomes = _parse_string_array(market.get("outcomes"))
target_outcome = (contract.get("outcome") or "").strip().lower()
if target_outcome and outcome_prices and outcomes:
for idx, label in enumerate(outcomes):
if label.strip().lower() == target_outcome and idx < len(outcome_prices):
p = float(f"{float(outcome_prices[idx]):.6f}")
if _polymarket_price_valid(p):
return p
for key in ("lastTradePrice", "outcomePrice"):
v = market.get(key)
if isinstance(v, (int, float)):
p = float(f"{float(v):.6f}")
if _polymarket_price_valid(p):
return p
if isinstance(v, str) and v.strip():
try:
p = float(f"{float(v):.6f}")
if _polymarket_price_valid(p):
return p
if isinstance(v, str) and v.strip():
try:
p = float(f"{float(v):.6f}")
if _polymarket_price_valid(p):
return p
except Exception:
pass
outcome_prices = m.get("outcomePrices")
if isinstance(outcome_prices, str) and outcome_prices.strip().startswith("["):
try:
parsed = json.loads(outcome_prices)
if isinstance(parsed, list) and parsed:
p = float(f"{float(parsed[0]):.6f}")
if _polymarket_price_valid(p):
return p
except Exception:
pass
except Exception:
@ -272,40 +295,20 @@ def _get_polymarket_mid_price(reference: str) -> Optional[float]:
return None
def _polymarket_resolve(reference: str) -> Optional[dict]:
def _polymarket_resolve(reference: str, token_id: Optional[str] = None, outcome: Optional[str] = None) -> Optional[dict]:
"""
Resolve a Polymarket market via Gamma.
Returns dict: { resolved: bool, outcome: Optional[str], settlementPrice: Optional[float] } or None.
"""
if not POLYMARKET_GAMMA_BASE_URL:
contract = _polymarket_resolve_reference(reference, token_id=token_id, outcome=outcome)
if not contract:
return None
ref = (reference or "").strip()
if not ref:
return None
url = f"{POLYMARKET_GAMMA_BASE_URL.rstrip('/')}/markets"
params = {"limit": "1"}
if _POLYMARKET_CONDITION_ID_RE.match(ref):
params["conditionId"] = ref
elif _POLYMARKET_TOKEN_ID_RE.match(ref):
params["clob_token_ids"] = ref
else:
params["slug"] = ref
try:
raw = _polymarket_get_json(url, params=params)
except Exception:
return None
if not isinstance(raw, list) or not raw:
return None
market = raw[0]
market = contract.get("market")
if not isinstance(market, dict):
return None
resolved_flag = bool(market.get("resolved"))
outcome = market.get("outcome") if isinstance(market.get("outcome"), str) else None
resolved_outcome = market.get("outcome") if isinstance(market.get("outcome"), str) else None
settlement_raw = market.get("settlementPrice")
settlement_price = None
if isinstance(settlement_raw, (int, float)):
@ -320,7 +323,10 @@ def _polymarket_resolve(reference: str) -> Optional[dict]:
return {
"resolved": resolved_flag,
"outcome": outcome,
"token_id": contract.get("token_id"),
"outcome": contract.get("outcome"),
"market_slug": market.get("slug"),
"resolved_outcome": resolved_outcome,
"settlementPrice": settlement_price,
}
@ -387,7 +393,7 @@ def _get_hyperliquid_candle_close(symbol: str, executed_at: str) -> Optional[flo
return None
closest = None
closest_diff = None
closest_ts = None
for candle in data:
if not isinstance(candle, dict):
continue
@ -400,9 +406,10 @@ def _get_hyperliquid_candle_close(symbol: str, executed_at: str) -> Optional[flo
close = float(c)
except Exception:
continue
diff = abs(target_ms - t_ms)
if closest_diff is None or diff < closest_diff:
closest_diff = diff
if t_ms > target_ms:
continue
if closest_ts is None or t_ms > closest_ts:
closest_ts = t_ms
closest = close
if closest is None:
@ -410,7 +417,13 @@ def _get_hyperliquid_candle_close(symbol: str, executed_at: str) -> Optional[flo
return float(f"{closest:.6f}")
def get_price_from_market(symbol: str, executed_at: str, market: str) -> Optional[float]:
def get_price_from_market(
symbol: str,
executed_at: str,
market: str,
token_id: Optional[str] = None,
outcome: Optional[str] = None,
) -> Optional[float]:
"""
根据市场获取价格
@ -430,7 +443,7 @@ def get_price_from_market(symbol: str, executed_at: str, market: str) -> Optiona
elif market == "polymarket":
# Polymarket pricing uses public Gamma + CLOB endpoints.
# We use the current orderbook mid price (paper trading).
price = _get_polymarket_mid_price(symbol)
price = _get_polymarket_mid_price(symbol, token_id=token_id, outcome=outcome)
else:
if not ALPHA_VANTAGE_API_KEY or ALPHA_VANTAGE_API_KEY == "demo":
print("Warning: ALPHA_VANTAGE_API_KEY not set, using agent-provided price")

View file

@ -111,7 +111,7 @@ def _enforce_content_rate_limit(agent_id: int, action: str, content: str, target
from config import CORS_ORIGINS, SIGNAL_PUBLISH_REWARD, SIGNAL_ADOPT_REWARD, DISCUSSION_PUBLISH_REWARD, REPLY_PUBLISH_REWARD
from database import get_db_connection
from utils import hash_password, verify_password, generate_verification_code, cleanup_expired_tokens, validate_address, _extract_token
from services import _get_agent_by_token, _get_user_by_token, _create_user_session, _add_agent_points, _get_agent_points, _get_next_signal_id, _update_position_from_signal, _broadcast_signal_to_followers
from services import _get_agent_by_token, _get_user_by_token, _create_user_session, _add_agent_points, _get_agent_points, _reserve_signal_id, _update_position_from_signal, _broadcast_signal_to_followers
from price_fetcher import get_price_from_market
from zoneinfo import ZoneInfo
@ -239,6 +239,8 @@ def create_app() -> FastAPI:
quantity: float
content: Optional[str] = None
executed_at: str
token_id: Optional[str] = None
outcome: Optional[str] = None
class StrategyRequest(BaseModel):
market: str
@ -807,11 +809,13 @@ def create_app() -> FastAPI:
agent_id = agent["id"]
agent_name = agent["name"]
signal_id = _get_next_signal_id()
now = _utc_now_iso_z()
# Store the actual action (buy/sell/short/cover)
side = data.action
action_lower = side.lower()
polymarket_token_id = None
polymarket_outcome = None
if data.market == "polymarket" and side.lower() in ("short", "cover"):
raise HTTPException(status_code=400, detail="Polymarket paper trading does not support short/cover. Use buy/sell of outcome tokens instead.")
@ -828,6 +832,19 @@ def create_app() -> FastAPI:
if qty > 1_000_000:
raise HTTPException(status_code=400, detail="Quantity too large")
if data.market == "polymarket":
from price_fetcher import _polymarket_resolve_reference
if data.executed_at.lower() != "now":
raise HTTPException(status_code=400, detail="Polymarket historical pricing is not supported. Use executed_at='now'.")
contract = _polymarket_resolve_reference(data.symbol, token_id=data.token_id, outcome=data.outcome)
if not contract:
raise HTTPException(
status_code=400,
detail="Polymarket trades require an explicit token_id or outcome that resolves to a single outcome token."
)
polymarket_token_id = contract["token_id"]
polymarket_outcome = contract.get("outcome")
# Handle "now" - use current UTC time
if data.executed_at.lower() == "now":
# Use current UTC time
@ -851,7 +868,13 @@ def create_app() -> FastAPI:
)
# Fetch current price from API (will handle timezone conversion internally)
actual_price = get_price_from_market(data.symbol, executed_at, data.market)
actual_price = get_price_from_market(
data.symbol,
executed_at,
data.market,
token_id=polymarket_token_id,
outcome=polymarket_outcome,
)
if actual_price:
price = actual_price
print(f"[Trade] Fetched price: {data.symbol} @ {executed_at} = ${price}")
@ -873,7 +896,13 @@ def create_app() -> FastAPI:
# IMPORTANT: For historical trades, always fetch price from backend
# to avoid trusting client-supplied prices (e.g. BTC @ 31.5).
actual_price = get_price_from_market(data.symbol, executed_at, data.market)
actual_price = get_price_from_market(
data.symbol,
executed_at,
data.market,
token_id=polymarket_token_id,
outcome=polymarket_outcome,
)
if actual_price:
price = actual_price
print(f"[Trade] Fetched historical price: {data.symbol} @ {executed_at} = ${price}")
@ -897,81 +926,102 @@ def create_app() -> FastAPI:
timestamp = int(datetime.fromisoformat(executed_at.replace('Z', '+00:00')).timestamp())
# Position sanity checks for sell/cover (must have sufficient position)
action_lower = side.lower()
if action_lower in ("sell", "cover"):
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute(
"SELECT quantity FROM positions WHERE agent_id = ? AND symbol = ? AND market = ?",
(agent_id, data.symbol, data.market)
)
pos = cursor.fetchone()
conn.close()
current_qty = float(pos["quantity"]) if pos else 0.0
if action_lower == "sell":
if current_qty <= 0:
raise HTTPException(status_code=400, detail="No long position to sell")
if qty > current_qty + 1e-12:
raise HTTPException(status_code=400, detail="Insufficient long position quantity")
else: # cover
if current_qty >= 0:
raise HTTPException(status_code=400, detail="No short position to cover")
if qty > abs(current_qty) + 1e-12:
raise HTTPException(status_code=400, detail="Insufficient short position quantity")
# Prevent extreme trade value that can corrupt balances
trade_value_guard = price * qty
if not math.isfinite(trade_value_guard) or trade_value_guard > 1_000_000_000:
raise HTTPException(status_code=400, detail="Trade value too large")
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("""
INSERT INTO signals
(signal_id, agent_id, message_type, market, signal_type, symbol, side, entry_price, quantity, content, timestamp, created_at, executed_at)
VALUES (?, ?, 'operation', ?, 'realtime', ?, ?, ?, ?, ?, ?, ?, ?)
""", (signal_id, agent_id, data.market, data.symbol, side, price, data.quantity, data.content, timestamp, now, executed_at))
conn.commit()
conn.close()
# Update position
_update_position_from_signal(agent_id, data.symbol, data.market, side, qty, price, executed_at)
# Update cash balance
signal_id = None
from fees import TRADE_FEE_RATE
trade_value = price * qty
fee = trade_value * TRADE_FEE_RATE
conn = get_db_connection()
cursor = conn.cursor()
try:
cursor.execute("BEGIN IMMEDIATE")
signal_id = _reserve_signal_id(cursor)
# Buy/Short: deduct cash + fee; Sell/Cover: add cash - fee
if side in ['buy', 'short']:
total_deduction = trade_value + fee
# Check if agent has enough cash
cursor.execute("SELECT cash FROM agents WHERE id = ?", (agent_id,))
row = cursor.fetchone()
current_cash = row["cash"] if row else 0
if action_lower in ("sell", "cover"):
if data.market == "polymarket":
cursor.execute(
"SELECT quantity FROM positions WHERE agent_id = ? AND market = ? AND token_id = ?",
(agent_id, data.market, polymarket_token_id)
)
else:
cursor.execute(
"SELECT quantity FROM positions WHERE agent_id = ? AND symbol = ? AND market = ?",
(agent_id, data.symbol, data.market)
)
pos = cursor.fetchone()
current_qty = float(pos["quantity"]) if pos else 0.0
if action_lower == "sell":
if current_qty <= 0:
raise HTTPException(status_code=400, detail="No long position to sell")
if qty > current_qty + 1e-12:
raise HTTPException(status_code=400, detail="Insufficient long position quantity")
else:
if current_qty >= 0:
raise HTTPException(status_code=400, detail="No short position to cover")
if qty > abs(current_qty) + 1e-12:
raise HTTPException(status_code=400, detail="Insufficient short position quantity")
if current_cash < total_deduction:
conn.close()
raise HTTPException(
status_code=400,
detail=f"Insufficient cash. Required: ${total_deduction:.2f} (trade: ${trade_value:.2f} + fee: ${fee:.2f}), Available: ${current_cash:.2f}"
)
if action_lower in ["buy", "short"]:
total_deduction = trade_value + fee
cursor.execute("SELECT cash FROM agents WHERE id = ?", (agent_id,))
row = cursor.fetchone()
current_cash = row["cash"] if row else 0
if current_cash < total_deduction:
raise HTTPException(
status_code=400,
detail=f"Insufficient cash. Required: ${total_deduction:.2f} (trade: ${trade_value:.2f} + fee: ${fee:.2f}), Available: ${current_cash:.2f}"
)
cursor.execute("""
UPDATE agents SET cash = cash - ? WHERE id = ?
""", (total_deduction, agent_id))
else: # sell, cover
net_proceeds = trade_value - fee
cursor.execute("""
UPDATE agents SET cash = cash + ? WHERE id = ?
""", (net_proceeds, agent_id))
INSERT INTO signals
(signal_id, agent_id, message_type, market, signal_type, symbol, token_id, outcome, side, entry_price, quantity, content, timestamp, created_at, executed_at)
VALUES (?, ?, 'operation', ?, 'realtime', ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
signal_id,
agent_id,
data.market,
data.symbol,
polymarket_token_id,
polymarket_outcome,
side,
price,
qty,
data.content,
timestamp,
now,
executed_at,
))
conn.commit()
_update_position_from_signal(
agent_id,
data.symbol,
data.market,
side,
qty,
price,
executed_at,
cursor=cursor,
token_id=polymarket_token_id,
outcome=polymarket_outcome,
)
if action_lower in ['buy', 'short']:
cursor.execute("UPDATE agents SET cash = cash - ? WHERE id = ?", (trade_value + fee, agent_id))
else:
cursor.execute("UPDATE agents SET cash = cash + ? WHERE id = ?", (trade_value - fee, agent_id))
conn.commit()
except HTTPException:
conn.rollback()
conn.close()
raise
except Exception as e:
conn.rollback()
conn.close()
raise HTTPException(status_code=500, detail=f"Failed to record trade: {e}")
conn.close()
# Award points
@ -983,6 +1033,7 @@ def create_app() -> FastAPI:
# Get all followers of this agent
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("BEGIN IMMEDIATE")
cursor.execute("""
SELECT follower_id FROM subscriptions
WHERE leader_id = ? AND status = 'active'
@ -998,7 +1049,7 @@ def create_app() -> FastAPI:
cursor.execute("SAVEPOINT follower_{}".format(follower_id))
# Check cash first before doing anything
if side in ['buy', 'short']:
if action_lower in ['buy', 'short']:
follower_fee = trade_value * TRADE_FEE_RATE
follower_total = trade_value + follower_fee
@ -1018,26 +1069,42 @@ def create_app() -> FastAPI:
data.symbol,
data.market,
side,
data.quantity,
qty,
price,
executed_at,
leader_id=agent_id,
cursor=cursor
cursor=cursor,
token_id=polymarket_token_id,
outcome=polymarket_outcome,
)
# Create signal record for follower (to show in their feed)
follower_signal_id = _get_next_signal_id()
follower_signal_id = _reserve_signal_id(cursor)
# Content indicates this is a copied signal
leader_name = agent['name'] if isinstance(agent, dict) else 'Leader'
copy_content = f"[Copied from {leader_name}] {data.content or ''}"
cursor.execute("""
INSERT INTO signals
(signal_id, agent_id, message_type, market, signal_type, symbol, side, entry_price, quantity, content, timestamp, created_at, executed_at)
VALUES (?, ?, 'operation', ?, 'realtime', ?, ?, ?, ?, ?, ?, ?, ?)
""", (follower_signal_id, follower_id, data.market, data.symbol, side, price, data.quantity, copy_content, int(datetime.now(timezone.utc).timestamp()), now, executed_at))
(signal_id, agent_id, message_type, market, signal_type, symbol, token_id, outcome, side, entry_price, quantity, content, timestamp, created_at, executed_at)
VALUES (?, ?, 'operation', ?, 'realtime', ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
follower_signal_id,
follower_id,
data.market,
data.symbol,
polymarket_token_id,
polymarket_outcome,
side,
price,
qty,
copy_content,
int(datetime.now(timezone.utc).timestamp()),
now,
executed_at,
))
# Deduct/add cash for follower (with fee) - in same transaction
if side in ['buy', 'short']:
if action_lower in ['buy', 'short']:
follower_fee = trade_value * TRADE_FEE_RATE
follower_total = trade_value + follower_fee
@ -1066,6 +1133,7 @@ def create_app() -> FastAPI:
except:
pass
conn.commit()
conn.close()
print(f"[Copy Trade] Copied signal to {follower_count} followers")
except Exception as e:
@ -1083,7 +1151,9 @@ def create_app() -> FastAPI:
"market": data.market,
"price": price,
"follower_count": follower_count,
"points_earned": SIGNAL_PUBLISH_REWARD
"points_earned": SIGNAL_PUBLISH_REWARD,
"token_id": polymarket_token_id,
"outcome": polymarket_outcome,
}
@app.post("/api/signals/strategy")
@ -1096,7 +1166,7 @@ def create_app() -> FastAPI:
agent_id = agent["id"]
agent_name = agent["name"]
signal_id = _get_next_signal_id()
signal_id = _reserve_signal_id()
now = _utc_now_iso_z()
conn = get_db_connection()
@ -1139,7 +1209,7 @@ def create_app() -> FastAPI:
agent_id = agent["id"]
agent_name = agent["name"]
signal_id = _get_next_signal_id()
signal_id = _reserve_signal_id()
now = _utc_now_iso_z()
conn = get_db_connection()
@ -1218,7 +1288,7 @@ def create_app() -> FastAPI:
# Get position summary
cursor.execute("""
SELECT symbol, market, side, quantity, entry_price, current_price
SELECT symbol, market, token_id, outcome, side, quantity, entry_price, current_price
FROM positions WHERE agent_id = ?
""", (agent_id,))
position_rows = cursor.fetchall()
@ -1238,6 +1308,8 @@ def create_app() -> FastAPI:
position_summary.append({
"symbol": pos_row["symbol"],
"market": pos_row["market"],
"token_id": pos_row["token_id"],
"outcome": pos_row["outcome"],
"side": pos_row["side"],
"quantity": pos_row["quantity"],
"current_price": current_price,
@ -1650,7 +1722,7 @@ def create_app() -> FastAPI:
# Get all positions for this agent
cursor.execute("""
SELECT symbol, market, side, quantity, entry_price, current_price
SELECT symbol, market, token_id, outcome, side, quantity, entry_price, current_price
FROM positions WHERE agent_id = ?
""", (agent_id,))
positions = cursor.fetchall()
@ -1694,9 +1766,9 @@ def create_app() -> FastAPI:
# Get symbols ranked by holder count with current prices
cursor.execute("""
SELECT symbol, market, COUNT(DISTINCT agent_id) as holder_count
SELECT symbol, market, token_id, outcome, COUNT(DISTINCT agent_id) as holder_count
FROM positions
GROUP BY symbol, market
GROUP BY symbol, market, token_id, outcome
ORDER BY holder_count DESC
LIMIT ?
""", (limit,))
@ -1707,14 +1779,16 @@ def create_app() -> FastAPI:
# Get current price from positions table
cursor.execute("""
SELECT current_price FROM positions
WHERE symbol = ? AND market = ?
WHERE symbol = ? AND market = ? AND COALESCE(token_id, '') = COALESCE(?, '')
LIMIT 1
""", (row["symbol"], row["market"]))
""", (row["symbol"], row["market"], row["token_id"]))
price_row = cursor.fetchone()
result.append({
"symbol": row["symbol"],
"market": row["market"],
"token_id": row["token_id"],
"outcome": row["outcome"],
"holder_count": row["holder_count"],
"current_price": price_row["current_price"] if price_row else None
})
@ -1729,6 +1803,8 @@ def create_app() -> FastAPI:
async def get_price(
symbol: str,
market: str = "us-stock",
token_id: Optional[str] = None,
outcome: Optional[str] = None,
authorization: str = Header(None)
):
"""Get current price for a symbol."""
@ -1748,10 +1824,11 @@ def create_app() -> FastAPI:
# Always use UTC timestamp to avoid server-local timezone drift
now = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
price = get_price_from_market(symbol.upper(), now, market)
normalized_symbol = symbol.upper() if market == "us-stock" else symbol
price = get_price_from_market(normalized_symbol, now, market, token_id=token_id, outcome=outcome)
if price:
return {"symbol": symbol.upper(), "market": market, "price": price}
return {"symbol": normalized_symbol, "market": market, "token_id": token_id, "outcome": outcome, "price": price}
else:
raise HTTPException(status_code=404, detail="Price not available")
@ -1788,7 +1865,13 @@ def create_app() -> FastAPI:
current_price = row["current_price"]
if not current_price:
current_price = get_price_from_market(symbol, now_str, market)
current_price = get_price_from_market(
symbol,
now_str,
market,
token_id=row["token_id"],
outcome=row["outcome"],
)
if current_price:
cursor.execute("UPDATE positions SET current_price = ? WHERE id = ?",
(current_price, row["id"]))
@ -1805,6 +1888,9 @@ def create_app() -> FastAPI:
positions.append({
"id": row["id"],
"symbol": row["symbol"],
"market": row["market"],
"token_id": row["token_id"],
"outcome": row["outcome"],
"side": row["side"],
"quantity": row["quantity"],
"entry_price": row["entry_price"],
@ -1833,7 +1919,7 @@ def create_app() -> FastAPI:
agent_cash = agent_row["cash"] if agent_row else 0
cursor.execute("""
SELECT symbol, market, side, quantity, entry_price, current_price
SELECT symbol, market, token_id, outcome, side, quantity, entry_price, current_price
FROM positions
WHERE agent_id = ?
ORDER BY opened_at DESC
@ -1852,7 +1938,13 @@ def create_app() -> FastAPI:
current_price = row["current_price"]
if not current_price:
current_price = get_price_from_market(symbol, now_str, market)
current_price = get_price_from_market(
symbol,
now_str,
market,
token_id=row["token_id"],
outcome=row["outcome"],
)
pnl = None
if current_price and row["entry_price"]:
@ -1867,6 +1959,8 @@ def create_app() -> FastAPI:
positions.append({
"symbol": symbol,
"market": market,
"token_id": row["token_id"],
"outcome": row["outcome"],
"side": row["side"],
"quantity": row["quantity"],
"entry_price": row["entry_price"],

View file

@ -99,19 +99,39 @@ def _get_agent_points(agent_id: int) -> int:
return row["points"] if row else 0
def _get_next_signal_id() -> int:
"""Get next signal ID."""
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("SELECT MAX(signal_id) as max_id FROM signals")
row = cursor.fetchone()
conn.close()
return (row["max_id"] or 0) + 1
def _reserve_signal_id(cursor=None) -> int:
"""Reserve a unique signal ID using an autoincrement sequence table."""
own_connection = False
if cursor is None:
conn = get_db_connection()
cursor = conn.cursor()
own_connection = True
cursor.execute("INSERT INTO signal_sequence DEFAULT VALUES")
signal_id = cursor.lastrowid
if own_connection:
conn.commit()
conn.close()
return signal_id
# ==================== Position Services ====================
def _update_position_from_signal(agent_id: int, symbol: str, market: str, action: str, quantity: float, price: float, executed_at: str, leader_id: int = None, cursor=None):
def _update_position_from_signal(
agent_id: int,
symbol: str,
market: str,
action: str,
quantity: float,
price: float,
executed_at: str,
leader_id: int = None,
cursor=None,
token_id: Optional[str] = None,
outcome: Optional[str] = None,
):
"""
Update position based on trading signal.
- buy: increase long position
@ -129,11 +149,21 @@ def _update_position_from_signal(agent_id: int, symbol: str, market: str, action
own_connection = True
# Get current position for this symbol
cursor.execute("""
query = """
SELECT id, quantity, entry_price
FROM positions
WHERE agent_id = ? AND symbol = ? AND market = ?
""", (agent_id, symbol, market))
WHERE agent_id = ? AND market = ?
"""
params = [agent_id, market]
if market == "polymarket":
if not token_id:
raise ValueError("Polymarket trades require token_id")
query += " AND token_id = ?"
params.append(token_id)
else:
query += " AND symbol = ?"
params.append(symbol)
cursor.execute(query, params)
row = cursor.fetchone()
current_qty = row["quantity"] if row else 0
@ -164,15 +194,15 @@ def _update_position_from_signal(agent_id: int, symbol: str, market: str, action
# Create new long position
if leader_id:
cursor.execute("""
INSERT INTO positions (agent_id, symbol, market, side, quantity, entry_price, opened_at, leader_id)
VALUES (?, ?, ?, 'long', ?, ?, ?, ?)
""", (agent_id, symbol, market, quantity, price, executed_at, leader_id))
INSERT INTO positions (agent_id, symbol, market, token_id, outcome, side, quantity, entry_price, opened_at, leader_id)
VALUES (?, ?, ?, ?, ?, 'long', ?, ?, ?, ?)
""", (agent_id, symbol, market, token_id, outcome, quantity, price, executed_at, leader_id))
print(f"[Position] {symbol}: created copied long position {quantity} from leader {leader_id}")
else:
cursor.execute("""
INSERT INTO positions (agent_id, symbol, market, side, quantity, entry_price, opened_at)
VALUES (?, ?, ?, 'long', ?, ?, ?)
""", (agent_id, symbol, market, quantity, price, executed_at))
INSERT INTO positions (agent_id, symbol, market, token_id, outcome, side, quantity, entry_price, opened_at)
VALUES (?, ?, ?, ?, ?, 'long', ?, ?, ?)
""", (agent_id, symbol, market, token_id, outcome, quantity, price, executed_at))
print(f"[Position] {symbol}: created long position {quantity}")
elif action_lower == "sell":
@ -207,15 +237,15 @@ def _update_position_from_signal(agent_id: int, symbol: str, market: str, action
# Create new short position (negative quantity for short)
if leader_id:
cursor.execute("""
INSERT INTO positions (agent_id, symbol, market, side, quantity, entry_price, opened_at, leader_id)
VALUES (?, ?, ?, 'short', ?, ?, ?, ?)
""", (agent_id, symbol, market, -quantity, price, executed_at, leader_id))
INSERT INTO positions (agent_id, symbol, market, token_id, outcome, side, quantity, entry_price, opened_at, leader_id)
VALUES (?, ?, ?, ?, ?, 'short', ?, ?, ?, ?)
""", (agent_id, symbol, market, token_id, outcome, -quantity, price, executed_at, leader_id))
print(f"[Position] {symbol}: created copied short position {quantity} from leader {leader_id}")
else:
cursor.execute("""
INSERT INTO positions (agent_id, symbol, market, side, quantity, entry_price, opened_at)
VALUES (?, ?, ?, 'short', ?, ?, ?)
""", (agent_id, symbol, market, -quantity, price, executed_at))
INSERT INTO positions (agent_id, symbol, market, token_id, outcome, side, quantity, entry_price, opened_at)
VALUES (?, ?, ?, ?, ?, 'short', ?, ?, ?)
""", (agent_id, symbol, market, token_id, outcome, -quantity, price, executed_at))
print(f"[Position] {symbol}: created short position {quantity}")
elif action_lower == "cover":

View file

@ -5,6 +5,7 @@ Tasks Module
"""
import asyncio
import json
import os
from datetime import datetime, timezone
from typing import Optional, Dict, Any
@ -22,9 +23,9 @@ def _update_trending_cache():
# Get symbols ranked by holder count with current prices
cursor.execute("""
SELECT symbol, market, COUNT(DISTINCT agent_id) as holder_count
SELECT symbol, market, token_id, outcome, COUNT(DISTINCT agent_id) as holder_count
FROM positions
GROUP BY symbol, market
GROUP BY symbol, market, token_id, outcome
ORDER BY holder_count DESC
LIMIT 20
""")
@ -35,14 +36,16 @@ def _update_trending_cache():
# Get current price from positions table
cursor.execute("""
SELECT current_price FROM positions
WHERE symbol = ? AND market = ?
WHERE symbol = ? AND market = ? AND COALESCE(token_id, '') = COALESCE(?, '')
LIMIT 1
""", (row["symbol"], row["market"]))
""", (row["symbol"], row["market"], row["token_id"]))
price_row = cursor.fetchone()
trending_cache.append({
"symbol": row["symbol"],
"market": row["market"],
"token_id": row["token_id"],
"outcome": row["outcome"],
"holder_count": row["holder_count"],
"current_price": price_row["current_price"] if price_row else None
})
@ -68,7 +71,7 @@ async def update_position_prices():
# Get all unique positions with symbol and market
cursor.execute("""
SELECT DISTINCT symbol, market
SELECT DISTINCT symbol, market, token_id, outcome
FROM positions
""")
unique_positions = cursor.fetchall()
@ -81,6 +84,8 @@ async def update_position_prices():
async def fetch_and_update(row):
symbol = row["symbol"]
market = row["market"]
token_id = row["token_id"]
outcome = row["outcome"]
async with semaphore:
# Run synchronous function in thread pool
@ -88,7 +93,7 @@ async def update_position_prices():
now = datetime.now(timezone.utc)
executed_at = now.strftime("%Y-%m-%dT%H:%M:%SZ")
price = await asyncio.to_thread(
get_price_from_market, symbol, executed_at, market
get_price_from_market, symbol, executed_at, market, token_id, outcome
)
if price:
@ -98,13 +103,13 @@ async def update_position_prices():
cursor2.execute("""
UPDATE positions
SET current_price = ?
WHERE symbol = ? AND market = ?
""", (price, symbol, market))
WHERE symbol = ? AND market = ? AND COALESCE(token_id, '') = COALESCE(?, '')
""", (price, symbol, market, token_id))
conn2.commit()
conn2.close()
print(f"[Price Update] {symbol} ({market}): ${price}")
print(f"[Price Update] {symbol} ({market}, token={token_id or '-'}): ${price}")
else:
print(f"[Price Update] Failed to get price for {symbol} ({market})")
print(f"[Price Update] Failed to get price for {symbol} ({market}, token={token_id or '-'})")
return price
@ -219,9 +224,10 @@ async def settle_polymarket_positions():
Background task to auto-settle resolved Polymarket positions.
When a Polymarket market resolves, Gamma exposes `resolved` and `settlementPrice`.
We treat the held outcome token as spot-like inventory:
We treat each held outcome token as explicit spot-like inventory:
- proceeds = quantity * settlementPrice
- credit proceeds to agent cash
- record an immutable settlement ledger entry
- delete the position
"""
from database import get_db_connection
@ -240,7 +246,7 @@ async def settle_polymarket_positions():
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("""
SELECT id, agent_id, symbol, quantity, entry_price
SELECT id, agent_id, symbol, token_id, outcome, quantity, entry_price
FROM positions
WHERE market = 'polymarket'
""")
@ -253,9 +259,15 @@ async def settle_polymarket_positions():
pos_id = row["id"]
agent_id = row["agent_id"]
symbol = row["symbol"]
token_id = row["token_id"]
outcome = row["outcome"]
qty = row["quantity"] or 0
resolution = _polymarket_resolve(symbol)
if not token_id:
skipped += 1
continue
resolution = _polymarket_resolve(symbol, token_id=token_id, outcome=outcome)
if not resolution or not resolution.get("resolved"):
skipped += 1
continue
@ -269,6 +281,25 @@ async def settle_polymarket_positions():
# Apply settlement atomically
cursor.execute("UPDATE agents SET cash = cash + ? WHERE id = ?", (proceeds, agent_id))
cursor.execute("""
INSERT INTO polymarket_settlements
(position_id, agent_id, symbol, token_id, outcome, quantity, entry_price, settlement_price, proceeds, market_slug, resolved_outcome, resolved_at, source_data)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
pos_id,
agent_id,
symbol,
token_id,
outcome,
qty,
row["entry_price"],
settlement_price,
proceeds,
resolution.get("market_slug"),
resolution.get("resolved_outcome"),
datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
json.dumps(resolution),
))
cursor.execute("DELETE FROM positions WHERE id = ?", (pos_id,))
settled += 1