币安交易所合约量化交易思路
·39 分钟阅读·7757 字··作者:xinglei.wang
什么是合约量化交易
合约量化交易是指使用程序化方式,基于预设的数学模型和交易规则,在加密货币合约市场自动执行交易的方法。与手动交易相比,量化交易能够消除情绪影响、24小时不间断运行、快速响应市场变化。
币安永续合约特点
| 特性 | 说明 |
|---|---|
| 合约类型 | USDT 本位永续合约、币本位永续合约 |
| 杠杆倍数 | 1-125倍可调 |
| 交易时间 | 24/7 全天候 |
| 资金费率 | 每 8 小时结算一次 |
| 强平机制 | 逐仓/全仓模式 |
| API 限制 | 请求频率限制、订单数量限制 |
量化 vs 手动交易
手动交易的问题:
1. 情绪影响
- 恐惧: 该入场时不敢入场
- 贪婪: 该止盈时不舍得平仓
- FOMO: 追涨杀跌
2. 执行效率
- 反应速度慢
- 无法 24 小时盯盘
- 多币种难以兼顾
3. 一致性
- 同样的信号,不同的执行
- 容易违反交易纪律
量化交易的优势:
1. 纪律性
- 严格按规则执行
- 无情绪干扰
2. 效率
- 毫秒级响应
- 多币种并行监控
- 24/7 运行
3. 可回测
- 历史数据验证
- 策略优化迭代
量化交易系统架构
整体架构
flowchart TB
classDef data fill:#2b2d42,stroke:#8d99ae,stroke-width:2px,color:#edf2f4
classDef strategy fill:#7209b7,stroke:#3a0ca3,stroke-width:2px,color:#ffffff
classDef execution fill:#2ec4b6,stroke:#011627,stroke-width:2px,color:#ffffff
classDef risk fill:#e71d36,stroke:#011627,stroke-width:2px,color:#ffffff
classDef monitor fill:#fca311,stroke:#e85d04,stroke-width:2px,color:#ffffff
subgraph DataLayer ["数据层"]
MD["行情数据<br/>K线/Tick/深度"]:::data
FD["资金费率"]:::data
OI["持仓量数据"]:::data
News["消息/情绪"]:::data
end
subgraph StrategyLayer ["策略层"]
Signal["信号生成"]:::strategy
Filter["信号过滤"]:::strategy
Position["仓位计算"]:::strategy
end
subgraph ExecutionLayer ["执行层"]
Order["订单管理"]:::execution
Split["拆单算法"]:::execution
API["API 接口"]:::execution
end
subgraph RiskLayer ["风控层"]
StopLoss["止损控制"]:::risk
Exposure["敞口控制"]:::risk
Drawdown["回撤控制"]:::risk
end
subgraph MonitorLayer ["监控层"]
Log["日志系统"]:::monitor
Alert["告警系统"]:::monitor
Dashboard["仪表盘"]:::monitor
end
MD --> Signal
FD --> Signal
OI --> Signal
News --> Filter
Signal --> Filter
Filter --> Position
Position --> Order
Order --> Split
Split --> API
API --> StopLoss
StopLoss --> Exposure
Exposure --> Drawdown
API --> Log
Drawdown --> Alert
Log --> Dashboard
linkStyle default stroke:#8d99ae,stroke-width:2px,fill:none
核心模块
"""
量化交易系统核心模块
"""
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Dict, List, Optional
from enum import Enum
import asyncio
class Side(Enum):
LONG = "LONG"
SHORT = "SHORT"
class OrderType(Enum):
MARKET = "MARKET"
LIMIT = "LIMIT"
STOP_MARKET = "STOP_MARKET"
TAKE_PROFIT_MARKET = "TAKE_PROFIT_MARKET"
@dataclass
class Signal:
"""交易信号"""
symbol: str
side: Side
strength: float # 信号强度 0-1
price: float
timestamp: int
reason: str
@dataclass
class Position:
"""持仓信息"""
symbol: str
side: Side
size: float
entry_price: float
unrealized_pnl: float
leverage: int
@dataclass
class Order:
"""订单信息"""
symbol: str
side: Side
order_type: OrderType
quantity: float
price: Optional[float] = None
stop_price: Optional[float] = None
reduce_only: bool = False
class Strategy(ABC):
"""策略基类"""
@abstractmethod
def on_kline(self, symbol: str, kline: dict) -> Optional[Signal]:
"""K线更新回调"""
pass
@abstractmethod
def on_tick(self, symbol: str, tick: dict) -> Optional[Signal]:
"""Tick 更新回调"""
pass
class RiskManager(ABC):
"""风控基类"""
@abstractmethod
def check_order(self, order: Order) -> bool:
"""检查订单是否符合风控规则"""
pass
@abstractmethod
def get_position_size(self, signal: Signal) -> float:
"""计算仓位大小"""
pass
class Executor(ABC):
"""执行器基类"""
@abstractmethod
async def place_order(self, order: Order) -> dict:
"""下单"""
pass
@abstractmethod
async def cancel_order(self, order_id: str) -> bool:
"""撤单"""
pass
数据获取与处理
币安 API 连接
import hmac
import hashlib
import time
import aiohttp
import asyncio
from typing import Dict, List, Optional
import json
class BinanceFuturesClient:
"""币安合约 API 客户端"""
BASE_URL = "https://fapi.binance.com"
WS_URL = "wss://fstream.binance.com"
def __init__(self, api_key: str, api_secret: str):
self.api_key = api_key
self.api_secret = api_secret
self.session: Optional[aiohttp.ClientSession] = None
async def _init_session(self):
if self.session is None:
self.session = aiohttp.ClientSession()
def _sign(self, params: dict) -> str:
"""生成签名"""
query_string = '&'.join(f"{k}={v}" for k, v in sorted(params.items()))
signature = hmac.new(
self.api_secret.encode(),
query_string.encode(),
hashlib.sha256
).hexdigest()
return signature
async def _request(self, method: str, endpoint: str, params: dict = None, signed: bool = False) -> dict:
"""发送请求"""
await self._init_session()
url = f"{self.BASE_URL}{endpoint}"
headers = {"X-MBX-APIKEY": self.api_key}
if params is None:
params = {}
if signed:
params["timestamp"] = int(time.time() * 1000)
params["signature"] = self._sign(params)
async with self.session.request(method, url, params=params, headers=headers) as resp:
data = await resp.json()
if resp.status != 200:
raise Exception(f"API Error: {data}")
return data
# ========== 行情接口 ==========
async def get_klines(self, symbol: str, interval: str, limit: int = 500) -> List[dict]:
"""获取 K 线数据"""
data = await self._request("GET", "/fapi/v1/klines", {
"symbol": symbol,
"interval": interval,
"limit": limit
})
return [{
"open_time": k[0],
"open": float(k[1]),
"high": float(k[2]),
"low": float(k[3]),
"close": float(k[4]),
"volume": float(k[5]),
"close_time": k[6],
"quote_volume": float(k[7]),
"trades": k[8],
"taker_buy_volume": float(k[9]),
"taker_buy_quote_volume": float(k[10])
} for k in data]
async def get_ticker(self, symbol: str) -> dict:
"""获取最新价格"""
return await self._request("GET", "/fapi/v1/ticker/price", {"symbol": symbol})
async def get_orderbook(self, symbol: str, limit: int = 20) -> dict:
"""获取订单簿"""
return await self._request("GET", "/fapi/v1/depth", {
"symbol": symbol,
"limit": limit
})
async def get_funding_rate(self, symbol: str) -> dict:
"""获取资金费率"""
return await self._request("GET", "/fapi/v1/fundingRate", {
"symbol": symbol,
"limit": 1
})
async def get_open_interest(self, symbol: str) -> dict:
"""获取持仓量"""
return await self._request("GET", "/fapi/v1/openInterest", {"symbol": symbol})
# ========== 交易接口 ==========
async def place_order(
self,
symbol: str,
side: str,
order_type: str,
quantity: float,
price: float = None,
stop_price: float = None,
reduce_only: bool = False,
time_in_force: str = "GTC"
) -> dict:
"""下单"""
params = {
"symbol": symbol,
"side": side,
"type": order_type,
"quantity": quantity,
}
if price:
params["price"] = price
if stop_price:
params["stopPrice"] = stop_price
if reduce_only:
params["reduceOnly"] = "true"
if order_type == "LIMIT":
params["timeInForce"] = time_in_force
return await self._request("POST", "/fapi/v1/order", params, signed=True)
async def cancel_order(self, symbol: str, order_id: int) -> dict:
"""撤销订单"""
return await self._request("DELETE", "/fapi/v1/order", {
"symbol": symbol,
"orderId": order_id
}, signed=True)
async def get_position(self, symbol: str = None) -> List[dict]:
"""获取持仓"""
data = await self._request("GET", "/fapi/v2/positionRisk", {}, signed=True)
if symbol:
return [p for p in data if p["symbol"] == symbol]
return data
async def get_account(self) -> dict:
"""获取账户信息"""
return await self._request("GET", "/fapi/v2/account", {}, signed=True)
async def set_leverage(self, symbol: str, leverage: int) -> dict:
"""设置杠杆"""
return await self._request("POST", "/fapi/v1/leverage", {
"symbol": symbol,
"leverage": leverage
}, signed=True)
async def close(self):
"""关闭连接"""
if self.session:
await self.session.close()
WebSocket 实时数据
import websockets
import json
import asyncio
from typing import Callable, Dict
class BinanceFuturesWebSocket:
"""币安合约 WebSocket 客户端"""
WS_URL = "wss://fstream.binance.com/ws"
def __init__(self):
self.ws = None
self.callbacks: Dict[str, Callable] = {}
self.running = False
async def connect(self):
"""建立连接"""
self.ws = await websockets.connect(self.WS_URL)
self.running = True
async def subscribe_kline(self, symbol: str, interval: str, callback: Callable):
"""订阅 K 线"""
stream = f"{symbol.lower()}@kline_{interval}"
self.callbacks[stream] = callback
await self.ws.send(json.dumps({
"method": "SUBSCRIBE",
"params": [stream],
"id": 1
}))
async def subscribe_ticker(self, symbol: str, callback: Callable):
"""订阅 Ticker"""
stream = f"{symbol.lower()}@ticker"
self.callbacks[stream] = callback
await self.ws.send(json.dumps({
"method": "SUBSCRIBE",
"params": [stream],
"id": 2
}))
async def subscribe_depth(self, symbol: str, callback: Callable, level: int = 20):
"""订阅深度"""
stream = f"{symbol.lower()}@depth{level}@100ms"
self.callbacks[stream] = callback
await self.ws.send(json.dumps({
"method": "SUBSCRIBE",
"params": [stream],
"id": 3
}))
async def subscribe_agg_trade(self, symbol: str, callback: Callable):
"""订阅聚合成交"""
stream = f"{symbol.lower()}@aggTrade"
self.callbacks[stream] = callback
await self.ws.send(json.dumps({
"method": "SUBSCRIBE",
"params": [stream],
"id": 4
}))
async def listen(self):
"""监听消息"""
while self.running:
try:
msg = await asyncio.wait_for(self.ws.recv(), timeout=30)
data = json.loads(msg)
# 处理订阅确认
if "result" in data:
continue
# 处理数据
stream = data.get("stream") or data.get("e")
if stream in self.callbacks:
await self.callbacks[stream](data)
except asyncio.TimeoutError:
# 发送 ping 保持连接
await self.ws.ping()
except Exception as e:
print(f"WebSocket error: {e}")
break
async def close(self):
"""关闭连接"""
self.running = False
if self.ws:
await self.ws.close()
# 使用示例
async def main():
ws = BinanceFuturesWebSocket()
await ws.connect()
async def on_kline(data):
kline = data["k"]
print(f"K线: {kline['s']} {kline['i']} Close: {kline['c']}")
async def on_ticker(data):
print(f"Ticker: {data['s']} Price: {data['c']} Change: {data['P']}%")
await ws.subscribe_kline("BTCUSDT", "1m", on_kline)
await ws.subscribe_ticker("BTCUSDT", on_ticker)
await ws.listen()
# asyncio.run(main())
常见量化策略
1. 趋势跟踪策略
import numpy as np
from typing import Optional, List
class TrendFollowingStrategy(Strategy):
"""
趋势跟踪策略
原理:
- 使用移动平均线判断趋势方向
- 快线上穿慢线做多,下穿做空
- 结合 ADX 过滤震荡行情
"""
def __init__(
self,
fast_period: int = 10,
slow_period: int = 30,
adx_period: int = 14,
adx_threshold: float = 25
):
self.fast_period = fast_period
self.slow_period = slow_period
self.adx_period = adx_period
self.adx_threshold = adx_threshold
self.klines: Dict[str, List[dict]] = {}
def _calculate_ema(self, prices: List[float], period: int) -> float:
"""计算 EMA"""
if len(prices) < period:
return prices[-1]
multiplier = 2 / (period + 1)
ema = prices[0]
for price in prices[1:]:
ema = (price - ema) * multiplier + ema
return ema
def _calculate_adx(self, klines: List[dict], period: int) -> float:
"""计算 ADX (平均趋向指数)"""
if len(klines) < period + 1:
return 0
# 计算 +DM 和 -DM
plus_dm = []
minus_dm = []
tr_list = []
for i in range(1, len(klines)):
high = klines[i]["high"]
low = klines[i]["low"]
prev_high = klines[i-1]["high"]
prev_low = klines[i-1]["low"]
prev_close = klines[i-1]["close"]
# True Range
tr = max(high - low, abs(high - prev_close), abs(low - prev_close))
tr_list.append(tr)
# +DM, -DM
up_move = high - prev_high
down_move = prev_low - low
if up_move > down_move and up_move > 0:
plus_dm.append(up_move)
else:
plus_dm.append(0)
if down_move > up_move and down_move > 0:
minus_dm.append(down_move)
else:
minus_dm.append(0)
# 平滑处理
atr = self._calculate_ema(tr_list, period)
plus_di = 100 * self._calculate_ema(plus_dm, period) / atr if atr > 0 else 0
minus_di = 100 * self._calculate_ema(minus_dm, period) / atr if atr > 0 else 0
# 计算 DX 和 ADX
di_diff = abs(plus_di - minus_di)
di_sum = plus_di + minus_di
dx = 100 * di_diff / di_sum if di_sum > 0 else 0
return dx # 简化,实际应计算 ADX (DX 的 EMA)
def on_kline(self, symbol: str, kline: dict) -> Optional[Signal]:
"""K线更新"""
if symbol not in self.klines:
self.klines[symbol] = []
self.klines[symbol].append(kline)
# 保留足够的历史数据
max_period = max(self.slow_period, self.adx_period) + 10
if len(self.klines[symbol]) > max_period * 2:
self.klines[symbol] = self.klines[symbol][-max_period:]
if len(self.klines[symbol]) < self.slow_period:
return None
# 计算指标
closes = [k["close"] for k in self.klines[symbol]]
fast_ema = self._calculate_ema(closes, self.fast_period)
slow_ema = self._calculate_ema(closes, self.slow_period)
adx = self._calculate_adx(self.klines[symbol], self.adx_period)
# 计算前一根 K 线的 EMA
prev_closes = closes[:-1]
prev_fast_ema = self._calculate_ema(prev_closes, self.fast_period)
prev_slow_ema = self._calculate_ema(prev_closes, self.slow_period)
# 生成信号
signal = None
# ADX 过滤震荡
if adx < self.adx_threshold:
return None
# 金叉做多
if prev_fast_ema <= prev_slow_ema and fast_ema > slow_ema:
signal = Signal(
symbol=symbol,
side=Side.LONG,
strength=min(1.0, adx / 50),
price=kline["close"],
timestamp=kline["close_time"],
reason=f"Golden cross, ADX={adx:.2f}"
)
# 死叉做空
elif prev_fast_ema >= prev_slow_ema and fast_ema < slow_ema:
signal = Signal(
symbol=symbol,
side=Side.SHORT,
strength=min(1.0, adx / 50),
price=kline["close"],
timestamp=kline["close_time"],
reason=f"Death cross, ADX={adx:.2f}"
)
return signal
def on_tick(self, symbol: str, tick: dict) -> Optional[Signal]:
return None
2. 均值回归策略
class MeanReversionStrategy(Strategy):
"""
均值回归策略
原理:
- 价格总会回归到均值附近
- 使用布林带判断超买超卖
- 价格触及上轨做空,触及下轨做多
"""
def __init__(
self,
period: int = 20,
std_dev: float = 2.0,
rsi_period: int = 14,
rsi_oversold: float = 30,
rsi_overbought: float = 70
):
self.period = period
self.std_dev = std_dev
self.rsi_period = rsi_period
self.rsi_oversold = rsi_oversold
self.rsi_overbought = rsi_overbought
self.klines: Dict[str, List[dict]] = {}
def _calculate_bollinger_bands(self, prices: List[float]) -> tuple:
"""计算布林带"""
if len(prices) < self.period:
return None, None, None
recent_prices = prices[-self.period:]
middle = np.mean(recent_prices)
std = np.std(recent_prices)
upper = middle + self.std_dev * std
lower = middle - self.std_dev * std
return upper, middle, lower
def _calculate_rsi(self, prices: List[float]) -> float:
"""计算 RSI"""
if len(prices) < self.rsi_period + 1:
return 50
changes = [prices[i] - prices[i-1] for i in range(1, len(prices))]
recent_changes = changes[-self.rsi_period:]
gains = [c if c > 0 else 0 for c in recent_changes]
losses = [-c if c < 0 else 0 for c in recent_changes]
avg_gain = np.mean(gains)
avg_loss = np.mean(losses)
if avg_loss == 0:
return 100
rs = avg_gain / avg_loss
rsi = 100 - (100 / (1 + rs))
return rsi
def on_kline(self, symbol: str, kline: dict) -> Optional[Signal]:
"""K线更新"""
if symbol not in self.klines:
self.klines[symbol] = []
self.klines[symbol].append(kline)
# 保留历史数据
if len(self.klines[symbol]) > self.period * 3:
self.klines[symbol] = self.klines[symbol][-self.period * 2:]
if len(self.klines[symbol]) < self.period:
return None
closes = [k["close"] for k in self.klines[symbol]]
current_price = closes[-1]
# 计算指标
upper, middle, lower = self._calculate_bollinger_bands(closes)
rsi = self._calculate_rsi(closes)
signal = None
# 价格触及下轨 + RSI 超卖 → 做多
if current_price <= lower and rsi < self.rsi_oversold:
# 计算距离均值的偏离程度作为信号强度
deviation = (middle - current_price) / middle
signal = Signal(
symbol=symbol,
side=Side.LONG,
strength=min(1.0, deviation * 10),
price=current_price,
timestamp=kline["close_time"],
reason=f"Oversold, RSI={rsi:.2f}, Below BB lower"
)
# 价格触及上轨 + RSI 超买 → 做空
elif current_price >= upper and rsi > self.rsi_overbought:
deviation = (current_price - middle) / middle
signal = Signal(
symbol=symbol,
side=Side.SHORT,
strength=min(1.0, deviation * 10),
price=current_price,
timestamp=kline["close_time"],
reason=f"Overbought, RSI={rsi:.2f}, Above BB upper"
)
return signal
def on_tick(self, symbol: str, tick: dict) -> Optional[Signal]:
return None
3. 网格交易策略
from decimal import Decimal, ROUND_DOWN
from typing import Dict, List
class GridTradingStrategy:
"""
网格交易策略
原理:
- 在价格区间内设置多个买卖网格
- 价格下跌触发买入,上涨触发卖出
- 适合震荡行情,持续赚取网格利润
"""
def __init__(
self,
symbol: str,
upper_price: float,
lower_price: float,
grid_count: int,
total_investment: float,
leverage: int = 1
):
self.symbol = symbol
self.upper_price = Decimal(str(upper_price))
self.lower_price = Decimal(str(lower_price))
self.grid_count = grid_count
self.total_investment = Decimal(str(total_investment))
self.leverage = leverage
# 计算网格
self.grid_prices = self._calculate_grid_prices()
self.grid_size = self._calculate_grid_size()
# 订单状态
self.buy_orders: Dict[str, dict] = {} # price -> order
self.sell_orders: Dict[str, dict] = {}
def _calculate_grid_prices(self) -> List[Decimal]:
"""计算网格价格"""
price_range = self.upper_price - self.lower_price
grid_interval = price_range / self.grid_count
prices = []
for i in range(self.grid_count + 1):
price = self.lower_price + grid_interval * i
prices.append(price.quantize(Decimal('0.01'), rounding=ROUND_DOWN))
return prices
def _calculate_grid_size(self) -> Decimal:
"""计算每格仓位大小"""
# 每格投入金额
per_grid_investment = self.total_investment / self.grid_count
# 按中间价格计算数量
mid_price = (self.upper_price + self.lower_price) / 2
size = (per_grid_investment * self.leverage) / mid_price
return size.quantize(Decimal('0.001'), rounding=ROUND_DOWN)
def initialize_orders(self, current_price: float) -> List[Order]:
"""初始化网格订单"""
orders = []
current = Decimal(str(current_price))
for price in self.grid_prices:
if price < current:
# 低于当前价,挂买单
orders.append(Order(
symbol=self.symbol,
side=Side.LONG,
order_type=OrderType.LIMIT,
quantity=float(self.grid_size),
price=float(price)
))
elif price > current:
# 高于当前价,挂卖单
orders.append(Order(
symbol=self.symbol,
side=Side.SHORT,
order_type=OrderType.LIMIT,
quantity=float(self.grid_size),
price=float(price)
))
return orders
def on_order_filled(self, filled_order: dict) -> Optional[Order]:
"""
订单成交回调
买单成交后,在上一格挂卖单
卖单成交后,在下一格挂买单
"""
filled_price = Decimal(str(filled_order["price"]))
filled_side = filled_order["side"]
# 找到成交的网格位置
grid_index = None
for i, price in enumerate(self.grid_prices):
if abs(price - filled_price) < Decimal('0.01'):
grid_index = i
break
if grid_index is None:
return None
new_order = None
if filled_side == "BUY":
# 买单成交,在上一格挂卖单
if grid_index < len(self.grid_prices) - 1:
sell_price = self.grid_prices[grid_index + 1]
new_order = Order(
symbol=self.symbol,
side=Side.SHORT,
order_type=OrderType.LIMIT,
quantity=float(self.grid_size),
price=float(sell_price),
reduce_only=True
)
else: # SELL
# 卖单成交,在下一格挂买单
if grid_index > 0:
buy_price = self.grid_prices[grid_index - 1]
new_order = Order(
symbol=self.symbol,
side=Side.LONG,
order_type=OrderType.LIMIT,
quantity=float(self.grid_size),
price=float(buy_price)
)
return new_order
def get_grid_info(self) -> dict:
"""获取网格信息"""
return {
"symbol": self.symbol,
"upper_price": float(self.upper_price),
"lower_price": float(self.lower_price),
"grid_count": self.grid_count,
"grid_size": float(self.grid_size),
"grid_prices": [float(p) for p in self.grid_prices],
"price_interval": float(self.grid_prices[1] - self.grid_prices[0])
}
4. 资金费率套利策略
class FundingRateArbitrage:
"""
资金费率套利策略
原理:
- 永续合约每 8 小时结算一次资金费率
- 正费率时,多头付给空头
- 负费率时,空头付给多头
- 通过现货对冲,无风险收取资金费率
"""
def __init__(
self,
min_funding_rate: float = 0.0005, # 最小费率阈值 (0.05%)
position_size_usd: float = 10000,
leverage: int = 1
):
self.min_funding_rate = min_funding_rate
self.position_size_usd = position_size_usd
self.leverage = leverage
self.positions: Dict[str, dict] = {}
async def scan_opportunities(self, client: BinanceFuturesClient) -> List[dict]:
"""扫描套利机会"""
opportunities = []
# 获取所有交易对的资金费率
funding_rates = await client._request("GET", "/fapi/v1/premiumIndex")
for item in funding_rates:
symbol = item["symbol"]
funding_rate = float(item["lastFundingRate"])
# 过滤低费率
if abs(funding_rate) < self.min_funding_rate:
continue
# 计算年化收益
annual_return = funding_rate * 3 * 365 * 100 # 每天 3 次
opportunities.append({
"symbol": symbol,
"funding_rate": funding_rate,
"annual_return": annual_return,
"next_funding_time": item["nextFundingTime"],
"direction": "SHORT" if funding_rate > 0 else "LONG"
})
# 按费率绝对值排序
opportunities.sort(key=lambda x: abs(x["funding_rate"]), reverse=True)
return opportunities
def calculate_position(self, symbol: str, current_price: float) -> dict:
"""计算开仓参数"""
quantity = self.position_size_usd / current_price
return {
"symbol": symbol,
"quantity": quantity,
"leverage": self.leverage,
"spot_quantity": quantity, # 现货对冲数量
"futures_quantity": quantity
}
async def open_arbitrage_position(
self,
futures_client: BinanceFuturesClient,
spot_client, # 现货客户端
opportunity: dict
) -> dict:
"""
开启套利仓位
正费率:
- 现货买入
- 合约做空
负费率:
- 现货卖出 (需要有持仓)
- 合约做多
"""
symbol = opportunity["symbol"]
ticker = await futures_client.get_ticker(symbol)
current_price = float(ticker["price"])
position = self.calculate_position(symbol, current_price)
if opportunity["funding_rate"] > 0:
# 正费率: 做空合约 + 买入现货
# 1. 买入现货
# await spot_client.buy(symbol, position["spot_quantity"])
# 2. 做空合约
await futures_client.place_order(
symbol=symbol,
side="SELL",
order_type="MARKET",
quantity=position["futures_quantity"]
)
else:
# 负费率: 做多合约 + 卖出现货
# 需要先有现货持仓
await futures_client.place_order(
symbol=symbol,
side="BUY",
order_type="MARKET",
quantity=position["futures_quantity"]
)
self.positions[symbol] = {
"opportunity": opportunity,
"position": position,
"open_time": time.time()
}
return position
def calculate_pnl(self, symbol: str, current_price: float, funding_collected: float) -> dict:
"""计算套利收益"""
if symbol not in self.positions:
return None
pos = self.positions[symbol]
# 理论上价格变动带来的盈亏会被对冲
# 实际收益 = 资金费率收入 - 交易手续费 - 借贷利息
return {
"symbol": symbol,
"funding_income": funding_collected,
"position_pnl": 0, # 对冲后理论为 0
"total_pnl": funding_collected
}
风险控制
仓位管理
class PositionSizer:
"""
仓位计算器
核心原则:
1. 单笔最大亏损不超过账户的 X%
2. 总仓位不超过账户的 Y%
3. 单币种仓位不超过 Z%
"""
def __init__(
self,
max_risk_per_trade: float = 0.02, # 单笔最大风险 2%
max_total_exposure: float = 0.5, # 最大总敞口 50%
max_single_exposure: float = 0.2, # 单币种最大敞口 20%
max_leverage: int = 10
):
self.max_risk_per_trade = max_risk_per_trade
self.max_total_exposure = max_total_exposure
self.max_single_exposure = max_single_exposure
self.max_leverage = max_leverage
def calculate_position_size(
self,
account_balance: float,
entry_price: float,
stop_loss_price: float,
leverage: int = 1
) -> float:
"""
基于风险计算仓位大小
公式:
仓位 = (账户余额 × 最大风险比例) / (入场价 - 止损价) / 杠杆
"""
# 风险金额
risk_amount = account_balance * self.max_risk_per_trade
# 止损距离 (百分比)
stop_distance = abs(entry_price - stop_loss_price) / entry_price
if stop_distance == 0:
return 0
# 仓位价值
position_value = risk_amount / stop_distance
# 数量
quantity = position_value / entry_price
# 杠杆限制
leverage = min(leverage, self.max_leverage)
# 最大敞口限制
max_position_value = account_balance * self.max_single_exposure * leverage
max_quantity = max_position_value / entry_price
return min(quantity, max_quantity)
def validate_position(
self,
account_balance: float,
current_positions: List[Position],
new_position: Position
) -> tuple:
"""验证新仓位是否符合风控规则"""
errors = []
# 计算当前总敞口
current_exposure = sum(
p.size * p.entry_price for p in current_positions
)
new_exposure = new_position.size * new_position.entry_price
total_exposure = current_exposure + new_exposure
# 检查总敞口
if total_exposure > account_balance * self.max_total_exposure * new_position.leverage:
errors.append(f"总敞口超限: {total_exposure:.2f} > {account_balance * self.max_total_exposure:.2f}")
# 检查单币种敞口
symbol_exposure = sum(
p.size * p.entry_price
for p in current_positions
if p.symbol == new_position.symbol
) + new_exposure
max_symbol = account_balance * self.max_single_exposure * new_position.leverage
if symbol_exposure > max_symbol:
errors.append(f"单币种敞口超限: {symbol_exposure:.2f} > {max_symbol:.2f}")
# 检查杠杆
if new_position.leverage > self.max_leverage:
errors.append(f"杠杆超限: {new_position.leverage} > {self.max_leverage}")
return len(errors) == 0, errors
止损止盈
class StopLossManager:
"""
止损止盈管理器
止损类型:
1. 固定止损 - 固定价格或百分比
2. 追踪止损 - 随价格移动
3. 时间止损 - 超时平仓
4. ATR 止损 - 基于波动率
"""
def __init__(
self,
default_stop_loss_pct: float = 0.02, # 默认止损 2%
default_take_profit_pct: float = 0.04, # 默认止盈 4%
trailing_stop_pct: float = 0.01, # 追踪止损 1%
max_holding_hours: int = 24 # 最大持仓时间
):
self.default_stop_loss_pct = default_stop_loss_pct
self.default_take_profit_pct = default_take_profit_pct
self.trailing_stop_pct = trailing_stop_pct
self.max_holding_hours = max_holding_hours
self.trailing_stops: Dict[str, float] = {} # symbol -> highest/lowest price
def calculate_stop_loss(
self,
entry_price: float,
side: Side,
method: str = "fixed"
) -> float:
"""计算止损价格"""
if method == "fixed":
if side == Side.LONG:
return entry_price * (1 - self.default_stop_loss_pct)
else:
return entry_price * (1 + self.default_stop_loss_pct)
# 其他方法可以扩展...
return entry_price * (1 - self.default_stop_loss_pct)
def calculate_take_profit(
self,
entry_price: float,
side: Side
) -> float:
"""计算止盈价格"""
if side == Side.LONG:
return entry_price * (1 + self.default_take_profit_pct)
else:
return entry_price * (1 - self.default_take_profit_pct)
def update_trailing_stop(
self,
symbol: str,
current_price: float,
side: Side,
entry_price: float
) -> Optional[float]:
"""
更新追踪止损
多头: 追踪最高价,止损在最高价下方 X%
空头: 追踪最低价,止损在最低价上方 X%
"""
key = f"{symbol}_{side.value}"
if side == Side.LONG:
# 更新最高价
if key not in self.trailing_stops:
self.trailing_stops[key] = entry_price
if current_price > self.trailing_stops[key]:
self.trailing_stops[key] = current_price
# 计算追踪止损价
stop_price = self.trailing_stops[key] * (1 - self.trailing_stop_pct)
# 只有盈利后才启用追踪止损
if self.trailing_stops[key] > entry_price * (1 + self.trailing_stop_pct):
return stop_price
else: # SHORT
if key not in self.trailing_stops:
self.trailing_stops[key] = entry_price
if current_price < self.trailing_stops[key]:
self.trailing_stops[key] = current_price
stop_price = self.trailing_stops[key] * (1 + self.trailing_stop_pct)
if self.trailing_stops[key] < entry_price * (1 - self.trailing_stop_pct):
return stop_price
return None
def check_time_stop(self, position: Position, current_time: int) -> bool:
"""检查时间止损"""
holding_hours = (current_time - position.entry_time) / 3600
return holding_hours > self.max_holding_hours
def create_stop_orders(
self,
position: Position,
stop_loss: float,
take_profit: float
) -> List[Order]:
"""创建止损止盈订单"""
orders = []
# 止损单
orders.append(Order(
symbol=position.symbol,
side=Side.SHORT if position.side == Side.LONG else Side.LONG,
order_type=OrderType.STOP_MARKET,
quantity=position.size,
stop_price=stop_loss,
reduce_only=True
))
# 止盈单
orders.append(Order(
symbol=position.symbol,
side=Side.SHORT if position.side == Side.LONG else Side.LONG,
order_type=OrderType.TAKE_PROFIT_MARKET,
quantity=position.size,
stop_price=take_profit,
reduce_only=True
))
return orders
回撤控制
class DrawdownController:
"""
回撤控制器
功能:
1. 监控账户回撤
2. 达到阈值时降低仓位或暂停交易
3. 恢复后逐步放开限制
"""
def __init__(
self,
max_daily_drawdown: float = 0.05, # 日最大回撤 5%
max_total_drawdown: float = 0.15, # 总最大回撤 15%
recovery_threshold: float = 0.5 # 回撤恢复阈值
):
self.max_daily_drawdown = max_daily_drawdown
self.max_total_drawdown = max_total_drawdown
self.recovery_threshold = recovery_threshold
self.peak_balance = 0
self.daily_start_balance = 0
self.trading_enabled = True
self.position_scale = 1.0 # 仓位缩放比例
def update(self, current_balance: float) -> dict:
"""更新状态"""
# 更新峰值
if current_balance > self.peak_balance:
self.peak_balance = current_balance
# 计算回撤
total_drawdown = (self.peak_balance - current_balance) / self.peak_balance
daily_drawdown = (self.daily_start_balance - current_balance) / self.daily_start_balance if self.daily_start_balance > 0 else 0
status = {
"current_balance": current_balance,
"peak_balance": self.peak_balance,
"total_drawdown": total_drawdown,
"daily_drawdown": daily_drawdown,
"trading_enabled": self.trading_enabled,
"position_scale": self.position_scale
}
# 检查回撤阈值
if total_drawdown >= self.max_total_drawdown:
self.trading_enabled = False
self.position_scale = 0
status["alert"] = "CRITICAL: Max total drawdown reached, trading disabled"
elif daily_drawdown >= self.max_daily_drawdown:
self.trading_enabled = False
self.position_scale = 0
status["alert"] = "WARNING: Max daily drawdown reached, trading paused"
elif total_drawdown >= self.max_total_drawdown * 0.5:
# 回撤超过一半,降低仓位
self.position_scale = 0.5
status["warning"] = "Reducing position size due to drawdown"
else:
# 检查恢复
if not self.trading_enabled:
recovery = (current_balance - (self.peak_balance * (1 - self.max_total_drawdown))) / \
(self.peak_balance * self.max_total_drawdown)
if recovery >= self.recovery_threshold:
self.trading_enabled = True
self.position_scale = 0.5
status["info"] = "Trading resumed with reduced position"
return status
def reset_daily(self, current_balance: float):
"""每日重置"""
self.daily_start_balance = current_balance
def get_adjusted_quantity(self, original_quantity: float) -> float:
"""获取调整后的仓位"""
return original_quantity * self.position_scale
实战技巧
1. API 请求优化
import asyncio
from collections import deque
import time
class RateLimiter:
"""
请求频率限制器
币安 API 限制:
- 每分钟 1200 次请求权重
- 每秒 10 次订单
- 每天 200,000 次订单
"""
def __init__(
self,
requests_per_minute: int = 1200,
orders_per_second: int = 10,
orders_per_day: int = 200000
):
self.rpm = requests_per_minute
self.ops = orders_per_second
self.opd = orders_per_day
self.request_times = deque()
self.order_times = deque()
self.daily_orders = 0
self.daily_reset_time = time.time()
async def wait_for_request(self, weight: int = 1):
"""等待请求配额"""
now = time.time()
# 清理过期记录
while self.request_times and self.request_times[0] < now - 60:
self.request_times.popleft()
# 检查是否需要等待
current_weight = sum(1 for _ in self.request_times) * weight
if current_weight >= self.rpm:
wait_time = 60 - (now - self.request_times[0])
if wait_time > 0:
await asyncio.sleep(wait_time)
self.request_times.append(now)
async def wait_for_order(self):
"""等待订单配额"""
now = time.time()
# 重置每日计数
if now - self.daily_reset_time > 86400:
self.daily_orders = 0
self.daily_reset_time = now
# 检查每日限制
if self.daily_orders >= self.opd:
raise Exception("Daily order limit reached")
# 清理过期记录
while self.order_times and self.order_times[0] < now - 1:
self.order_times.popleft()
# 检查每秒限制
if len(self.order_times) >= self.ops:
wait_time = 1 - (now - self.order_times[0])
if wait_time > 0:
await asyncio.sleep(wait_time)
self.order_times.append(now)
self.daily_orders += 1
2. 订单执行优化
class SmartOrderExecutor:
"""
智能订单执行器
功能:
1. 大单拆分
2. 冰山订单
3. TWAP/VWAP
"""
def __init__(self, client: BinanceFuturesClient):
self.client = client
async def execute_twap(
self,
symbol: str,
side: str,
total_quantity: float,
duration_minutes: int = 10,
slices: int = 10
) -> List[dict]:
"""
TWAP (时间加权平均价格) 执行
将大单拆分为多个小单,在指定时间内均匀执行
"""
results = []
slice_quantity = total_quantity / slices
interval_seconds = (duration_minutes * 60) / slices
for i in range(slices):
try:
result = await self.client.place_order(
symbol=symbol,
side=side,
order_type="MARKET",
quantity=slice_quantity
)
results.append(result)
if i < slices - 1:
await asyncio.sleep(interval_seconds)
except Exception as e:
print(f"TWAP slice {i+1} failed: {e}")
return results
async def execute_iceberg(
self,
symbol: str,
side: str,
total_quantity: float,
visible_quantity: float,
price: float
) -> List[dict]:
"""
冰山订单执行
只显示部分数量,成交后自动补充
"""
results = []
remaining = total_quantity
while remaining > 0:
slice_qty = min(visible_quantity, remaining)
result = await self.client.place_order(
symbol=symbol,
side=side,
order_type="LIMIT",
quantity=slice_qty,
price=price
)
# 等待成交
order_id = result["orderId"]
while True:
await asyncio.sleep(1)
order = await self.client._request(
"GET", "/fapi/v1/order",
{"symbol": symbol, "orderId": order_id},
signed=True
)
if order["status"] == "FILLED":
results.append(order)
remaining -= float(order["executedQty"])
break
elif order["status"] in ["CANCELED", "REJECTED", "EXPIRED"]:
break
return results
async def execute_with_slippage_control(
self,
symbol: str,
side: str,
quantity: float,
max_slippage_pct: float = 0.001
) -> dict:
"""
带滑点控制的执行
检查订单簿深度,避免过大滑点
"""
# 获取订单簿
orderbook = await self.client.get_orderbook(symbol, limit=20)
if side == "BUY":
asks = orderbook["asks"]
# 计算买入 quantity 的平均价格
total_cost = 0
filled = 0
for ask in asks:
price = float(ask[0])
size = float(ask[1])
if filled + size >= quantity:
total_cost += (quantity - filled) * price
filled = quantity
break
else:
total_cost += size * price
filled += size
avg_price = total_cost / filled if filled > 0 else 0
best_price = float(asks[0][0])
else: # SELL
bids = orderbook["bids"]
total_value = 0
filled = 0
for bid in bids:
price = float(bid[0])
size = float(bid[1])
if filled + size >= quantity:
total_value += (quantity - filled) * price
filled = quantity
break
else:
total_value += size * price
filled += size
avg_price = total_value / filled if filled > 0 else 0
best_price = float(bids[0][0])
# 检查滑点
slippage = abs(avg_price - best_price) / best_price
if slippage > max_slippage_pct:
raise Exception(f"Slippage too high: {slippage:.4%} > {max_slippage_pct:.4%}")
# 执行订单
return await self.client.place_order(
symbol=symbol,
side=side,
order_type="MARKET",
quantity=quantity
)
3. 日志与监控
import logging
from datetime import datetime
import json
class TradingLogger:
"""交易日志管理器"""
def __init__(self, log_file: str = "trading.log"):
self.logger = logging.getLogger("trading")
self.logger.setLevel(logging.DEBUG)
# 文件处理器
fh = logging.FileHandler(log_file)
fh.setLevel(logging.DEBUG)
# 控制台处理器
ch = logging.StreamHandler()
ch.setLevel(logging.INFO)
# 格式
formatter = logging.Formatter(
'%(asctime)s - %(levelname)s - %(message)s'
)
fh.setFormatter(formatter)
ch.setFormatter(formatter)
self.logger.addHandler(fh)
self.logger.addHandler(ch)
# 交易记录
self.trades = []
def log_signal(self, signal: Signal):
"""记录信号"""
self.logger.info(f"SIGNAL: {signal.symbol} {signal.side.value} "
f"strength={signal.strength:.2f} price={signal.price} "
f"reason={signal.reason}")
def log_order(self, order: Order, result: dict):
"""记录订单"""
self.logger.info(f"ORDER: {order.symbol} {order.side.value} "
f"type={order.order_type.value} qty={order.quantity} "
f"orderId={result.get('orderId')}")
def log_trade(self, trade: dict):
"""记录成交"""
self.trades.append(trade)
self.logger.info(f"TRADE: {json.dumps(trade)}")
def log_position(self, position: Position):
"""记录持仓"""
self.logger.info(f"POSITION: {position.symbol} {position.side.value} "
f"size={position.size} entry={position.entry_price} "
f"pnl={position.unrealized_pnl}")
def log_error(self, error: str, exc_info=False):
"""记录错误"""
self.logger.error(f"ERROR: {error}", exc_info=exc_info)
def get_daily_summary(self) -> dict:
"""获取每日汇总"""
today = datetime.now().strftime("%Y-%m-%d")
today_trades = [t for t in self.trades if t.get("date", "").startswith(today)]
total_pnl = sum(t.get("pnl", 0) for t in today_trades)
win_trades = [t for t in today_trades if t.get("pnl", 0) > 0]
lose_trades = [t for t in today_trades if t.get("pnl", 0) < 0]
return {
"date": today,
"total_trades": len(today_trades),
"win_trades": len(win_trades),
"lose_trades": len(lose_trades),
"win_rate": len(win_trades) / len(today_trades) if today_trades else 0,
"total_pnl": total_pnl,
"avg_pnl": total_pnl / len(today_trades) if today_trades else 0
}
4. 回测框架
from dataclasses import dataclass, field
from typing import List, Dict
import pandas as pd
@dataclass
class BacktestResult:
"""回测结果"""
total_return: float
annual_return: float
max_drawdown: float
sharpe_ratio: float
win_rate: float
profit_factor: float
total_trades: int
equity_curve: List[float] = field(default_factory=list)
trades: List[dict] = field(default_factory=list)
class Backtester:
"""
回测引擎
功能:
1. 历史数据回放
2. 策略评估
3. 性能统计
"""
def __init__(
self,
initial_capital: float = 10000,
commission: float = 0.0004, # 手续费率
slippage: float = 0.0001 # 滑点
):
self.initial_capital = initial_capital
self.commission = commission
self.slippage = slippage
def run(
self,
strategy: Strategy,
data: pd.DataFrame,
symbol: str
) -> BacktestResult:
"""运行回测"""
capital = self.initial_capital
position = 0
entry_price = 0
equity_curve = [capital]
trades = []
for i, row in data.iterrows():
kline = {
"open": row["open"],
"high": row["high"],
"low": row["low"],
"close": row["close"],
"volume": row["volume"],
"close_time": row.get("timestamp", i)
}
# 获取信号
signal = strategy.on_kline(symbol, kline)
current_price = row["close"]
# 处理信号
if signal:
if signal.side == Side.LONG and position <= 0:
# 平空
if position < 0:
pnl = (entry_price - current_price) * abs(position)
pnl -= abs(position) * current_price * self.commission
capital += pnl
trades.append({
"type": "close_short",
"price": current_price,
"pnl": pnl
})
# 开多
position = capital * 0.9 / current_price # 90% 仓位
entry_price = current_price * (1 + self.slippage)
capital -= position * entry_price * self.commission
trades.append({
"type": "open_long",
"price": entry_price,
"size": position
})
elif signal.side == Side.SHORT and position >= 0:
# 平多
if position > 0:
pnl = (current_price - entry_price) * position
pnl -= position * current_price * self.commission
capital += pnl
trades.append({
"type": "close_long",
"price": current_price,
"pnl": pnl
})
# 开空
position = -capital * 0.9 / current_price
entry_price = current_price * (1 - self.slippage)
capital -= abs(position) * entry_price * self.commission
trades.append({
"type": "open_short",
"price": entry_price,
"size": position
})
# 更新权益
if position > 0:
unrealized = (current_price - entry_price) * position
elif position < 0:
unrealized = (entry_price - current_price) * abs(position)
else:
unrealized = 0
equity_curve.append(capital + unrealized)
# 计算统计指标
return self._calculate_statistics(equity_curve, trades)
def _calculate_statistics(
self,
equity_curve: List[float],
trades: List[dict]
) -> BacktestResult:
"""计算回测统计"""
equity = pd.Series(equity_curve)
# 收益率
total_return = (equity.iloc[-1] - equity.iloc[0]) / equity.iloc[0]
# 年化收益 (假设 365 天)
days = len(equity)
annual_return = (1 + total_return) ** (365 / days) - 1 if days > 0 else 0
# 最大回撤
peak = equity.expanding().max()
drawdown = (equity - peak) / peak
max_drawdown = drawdown.min()
# 夏普比率 (假设无风险利率 0)
returns = equity.pct_change().dropna()
sharpe_ratio = returns.mean() / returns.std() * (365 ** 0.5) if returns.std() > 0 else 0
# 胜率
pnl_trades = [t for t in trades if "pnl" in t]
win_trades = [t for t in pnl_trades if t["pnl"] > 0]
win_rate = len(win_trades) / len(pnl_trades) if pnl_trades else 0
# 盈亏比
total_profit = sum(t["pnl"] for t in win_trades) if win_trades else 0
lose_trades = [t for t in pnl_trades if t["pnl"] < 0]
total_loss = abs(sum(t["pnl"] for t in lose_trades)) if lose_trades else 1
profit_factor = total_profit / total_loss if total_loss > 0 else 0
return BacktestResult(
total_return=total_return,
annual_return=annual_return,
max_drawdown=max_drawdown,
sharpe_ratio=sharpe_ratio,
win_rate=win_rate,
profit_factor=profit_factor,
total_trades=len(trades),
equity_curve=equity_curve,
trades=trades
)
常见问题与建议
新手常见误区
1. 过度优化 (Overfitting)
问题: 策略在历史数据上完美,实盘效果很差
解决: 使用样本外测试,避免过多参数
2. 忽视手续费和滑点
问题: 回测盈利,实盘亏损
解决: 回测时加入真实的交易成本
3. 杠杆过高
问题: 一次大跌就爆仓
解决: 控制杠杆在 3-5 倍以内
4. 没有止损
问题: 小亏变大亏,无法翻身
解决: 每笔交易必须设置止损
5. 频繁修改策略
问题: 亏损后改参数,越改越乱
解决: 制定规则后严格执行,定期复盘
6. 单一策略/币种
问题: 行情不适合时持续亏损
解决: 多策略组合,分散风险
量化交易建议
| 方面 | 建议 |
|---|---|
| 资金管理 | 用可承受损失的资金,单笔风险 < 2% |
| 杠杆使用 | 新手 1-3 倍,熟练后最多 5-10 倍 |
| 策略开发 | 从简单策略开始,逐步迭代 |
| 回测验证 | 至少 6 个月数据,样本外测试 |
| 实盘启动 | 小资金试跑,验证后再加仓 |
| 风险控制 | 设置日/周/月最大亏损限制 |
| 心态管理 | 接受亏损是正常的,关注长期结果 |
进阶学习路径
flowchart TB
classDef beginner fill:#2ec4b6,stroke:#011627,stroke-width:2px,color:#ffffff
classDef intermediate fill:#fca311,stroke:#e85d04,stroke-width:2px,color:#ffffff
classDef advanced fill:#7209b7,stroke:#3a0ca3,stroke-width:2px,color:#ffffff
subgraph 入门 ["入门阶段"]
A1["学习 Python/编程基础"]:::beginner
A2["了解技术分析指标"]:::beginner
A3["学习 API 接口使用"]:::beginner
A4["实现简单策略"]:::beginner
end
subgraph 进阶 ["进阶阶段"]
B1["学习数学/统计基础"]:::intermediate
B2["掌握回测框架"]:::intermediate
B3["理解风险管理"]:::intermediate
B4["多策略组合"]:::intermediate
end
subgraph 高级 ["高级阶段"]
C1["机器学习应用"]:::advanced
C2["高频交易技术"]:::advanced
C3["跨市场套利"]:::advanced
C4["自建交易系统"]:::advanced
end
A1 --> A2 --> A3 --> A4
A4 --> B1 --> B2 --> B3 --> B4
B4 --> C1 --> C2 --> C3 --> C4
linkStyle default stroke:#8d99ae,stroke-width:2px,fill:none
总结
量化交易是一个需要持续学习和实践的领域:
核心要点
| 方面 | 关键点 |
|---|---|
| 策略 | 简单有效 > 复杂花哨,逻辑清晰可解释 |
| 风控 | 第一要务,活下来才能赚钱 |
| 执行 | 严格遵守规则,消除情绪影响 |
| 迭代 | 定期复盘,持续优化 |
成功量化的关键
- 系统化思维: 将交易变成可重复的流程
- 风险意识: 永远把风控放在第一位
- 持续学习: 市场在变,策略也要进化
- 心态稳定: 接受波动,关注长期
免责声明: 本文仅供学习参考,不构成任何投资建议。加密货币合约交易风险极高,可能导致全部本金损失。请在充分了解风险后谨慎决策。
相关推荐
个人量化策略开发思路
基于币安合约API的个人量化交易策略开发思路,包括合约币对获取、单币种监控和全币种并发扫描实现
·2 分钟·
#量化交易#币安
虚拟币交易中的均线指标
深入讲解加密货币交易中的均线指标,包括 SMA、EMA、WMA 等类型的计算方法、实战应用技巧以及经典均线交易策略
·16 分钟·
#加密货币#均线
虚拟币交易中的RSI指标
全面解析相对强弱指数(RSI)在加密货币交易中的应用,包括计算原理、超买超卖判断、背离信号识别以及多种RSI交易策略
·19 分钟·
#加密货币#RSI