币安交易所合约量化交易思路

·39 分钟阅读·7757··作者:xinglei.wang

什么是合约量化交易

合约量化交易是指使用程序化方式,基于预设的数学模型和交易规则,在加密货币合约市场自动执行交易的方法。与手动交易相比,量化交易能够消除情绪影响、24小时不间断运行、快速响应市场变化。

币安永续合约特点

特性 说明
合约类型 USDT 本位永续合约、币本位永续合约
杠杆倍数 1-125倍可调
交易时间 24/7 全天候
资金费率 每 8 小时结算一次
强平机制 逐仓/全仓模式
API 限制 请求频率限制、订单数量限制

量化 vs 手动交易

手动交易的问题:

1. 情绪影响
   - 恐惧: 该入场时不敢入场
   - 贪婪: 该止盈时不舍得平仓
   - FOMO: 追涨杀跌

2. 执行效率
   - 反应速度慢
   - 无法 24 小时盯盘
   - 多币种难以兼顾

3. 一致性
   - 同样的信号,不同的执行
   - 容易违反交易纪律

量化交易的优势:

1. 纪律性
   - 严格按规则执行
   - 无情绪干扰

2. 效率
   - 毫秒级响应
   - 多币种并行监控
   - 24/7 运行

3. 可回测
   - 历史数据验证
   - 策略优化迭代

量化交易系统架构

整体架构

flowchart TB
    classDef data fill:#2b2d42,stroke:#8d99ae,stroke-width:2px,color:#edf2f4
    classDef strategy fill:#7209b7,stroke:#3a0ca3,stroke-width:2px,color:#ffffff
    classDef execution fill:#2ec4b6,stroke:#011627,stroke-width:2px,color:#ffffff
    classDef risk fill:#e71d36,stroke:#011627,stroke-width:2px,color:#ffffff
    classDef monitor fill:#fca311,stroke:#e85d04,stroke-width:2px,color:#ffffff

    subgraph DataLayer ["数据层"]
        MD["行情数据<br/>K线/Tick/深度"]:::data
        FD["资金费率"]:::data
        OI["持仓量数据"]:::data
        News["消息/情绪"]:::data
    end

    subgraph StrategyLayer ["策略层"]
        Signal["信号生成"]:::strategy
        Filter["信号过滤"]:::strategy
        Position["仓位计算"]:::strategy
    end

    subgraph ExecutionLayer ["执行层"]
        Order["订单管理"]:::execution
        Split["拆单算法"]:::execution
        API["API 接口"]:::execution
    end

    subgraph RiskLayer ["风控层"]
        StopLoss["止损控制"]:::risk
        Exposure["敞口控制"]:::risk
        Drawdown["回撤控制"]:::risk
    end

    subgraph MonitorLayer ["监控层"]
        Log["日志系统"]:::monitor
        Alert["告警系统"]:::monitor
        Dashboard["仪表盘"]:::monitor
    end

    MD --> Signal
    FD --> Signal
    OI --> Signal
    News --> Filter

    Signal --> Filter
    Filter --> Position
    Position --> Order

    Order --> Split
    Split --> API

    API --> StopLoss
    StopLoss --> Exposure
    Exposure --> Drawdown

    API --> Log
    Drawdown --> Alert
    Log --> Dashboard

    linkStyle default stroke:#8d99ae,stroke-width:2px,fill:none

核心模块

"""
量化交易系统核心模块
"""

from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Dict, List, Optional
from enum import Enum
import asyncio

class Side(Enum):
    LONG = "LONG"
    SHORT = "SHORT"

class OrderType(Enum):
    MARKET = "MARKET"
    LIMIT = "LIMIT"
    STOP_MARKET = "STOP_MARKET"
    TAKE_PROFIT_MARKET = "TAKE_PROFIT_MARKET"

@dataclass
class Signal:
    """交易信号"""
    symbol: str
    side: Side
    strength: float  # 信号强度 0-1
    price: float
    timestamp: int
    reason: str

@dataclass
class Position:
    """持仓信息"""
    symbol: str
    side: Side
    size: float
    entry_price: float
    unrealized_pnl: float
    leverage: int

@dataclass
class Order:
    """订单信息"""
    symbol: str
    side: Side
    order_type: OrderType
    quantity: float
    price: Optional[float] = None
    stop_price: Optional[float] = None
    reduce_only: bool = False

class Strategy(ABC):
    """策略基类"""

    @abstractmethod
    def on_kline(self, symbol: str, kline: dict) -> Optional[Signal]:
        """K线更新回调"""
        pass

    @abstractmethod
    def on_tick(self, symbol: str, tick: dict) -> Optional[Signal]:
        """Tick 更新回调"""
        pass

class RiskManager(ABC):
    """风控基类"""

    @abstractmethod
    def check_order(self, order: Order) -> bool:
        """检查订单是否符合风控规则"""
        pass

    @abstractmethod
    def get_position_size(self, signal: Signal) -> float:
        """计算仓位大小"""
        pass

class Executor(ABC):
    """执行器基类"""

    @abstractmethod
    async def place_order(self, order: Order) -> dict:
        """下单"""
        pass

    @abstractmethod
    async def cancel_order(self, order_id: str) -> bool:
        """撤单"""
        pass

数据获取与处理

币安 API 连接

import hmac
import hashlib
import time
import aiohttp
import asyncio
from typing import Dict, List, Optional
import json

class BinanceFuturesClient:
    """币安合约 API 客户端"""

    BASE_URL = "https://fapi.binance.com"
    WS_URL = "wss://fstream.binance.com"

    def __init__(self, api_key: str, api_secret: str):
        self.api_key = api_key
        self.api_secret = api_secret
        self.session: Optional[aiohttp.ClientSession] = None

    async def _init_session(self):
        if self.session is None:
            self.session = aiohttp.ClientSession()

    def _sign(self, params: dict) -> str:
        """生成签名"""
        query_string = '&'.join(f"{k}={v}" for k, v in sorted(params.items()))
        signature = hmac.new(
            self.api_secret.encode(),
            query_string.encode(),
            hashlib.sha256
        ).hexdigest()
        return signature

    async def _request(self, method: str, endpoint: str, params: dict = None, signed: bool = False) -> dict:
        """发送请求"""
        await self._init_session()

        url = f"{self.BASE_URL}{endpoint}"
        headers = {"X-MBX-APIKEY": self.api_key}

        if params is None:
            params = {}

        if signed:
            params["timestamp"] = int(time.time() * 1000)
            params["signature"] = self._sign(params)

        async with self.session.request(method, url, params=params, headers=headers) as resp:
            data = await resp.json()
            if resp.status != 200:
                raise Exception(f"API Error: {data}")
            return data

    # ========== 行情接口 ==========

    async def get_klines(self, symbol: str, interval: str, limit: int = 500) -> List[dict]:
        """获取 K 线数据"""
        data = await self._request("GET", "/fapi/v1/klines", {
            "symbol": symbol,
            "interval": interval,
            "limit": limit
        })

        return [{
            "open_time": k[0],
            "open": float(k[1]),
            "high": float(k[2]),
            "low": float(k[3]),
            "close": float(k[4]),
            "volume": float(k[5]),
            "close_time": k[6],
            "quote_volume": float(k[7]),
            "trades": k[8],
            "taker_buy_volume": float(k[9]),
            "taker_buy_quote_volume": float(k[10])
        } for k in data]

    async def get_ticker(self, symbol: str) -> dict:
        """获取最新价格"""
        return await self._request("GET", "/fapi/v1/ticker/price", {"symbol": symbol})

    async def get_orderbook(self, symbol: str, limit: int = 20) -> dict:
        """获取订单簿"""
        return await self._request("GET", "/fapi/v1/depth", {
            "symbol": symbol,
            "limit": limit
        })

    async def get_funding_rate(self, symbol: str) -> dict:
        """获取资金费率"""
        return await self._request("GET", "/fapi/v1/fundingRate", {
            "symbol": symbol,
            "limit": 1
        })

    async def get_open_interest(self, symbol: str) -> dict:
        """获取持仓量"""
        return await self._request("GET", "/fapi/v1/openInterest", {"symbol": symbol})

    # ========== 交易接口 ==========

    async def place_order(
        self,
        symbol: str,
        side: str,
        order_type: str,
        quantity: float,
        price: float = None,
        stop_price: float = None,
        reduce_only: bool = False,
        time_in_force: str = "GTC"
    ) -> dict:
        """下单"""
        params = {
            "symbol": symbol,
            "side": side,
            "type": order_type,
            "quantity": quantity,
        }

        if price:
            params["price"] = price
        if stop_price:
            params["stopPrice"] = stop_price
        if reduce_only:
            params["reduceOnly"] = "true"
        if order_type == "LIMIT":
            params["timeInForce"] = time_in_force

        return await self._request("POST", "/fapi/v1/order", params, signed=True)

    async def cancel_order(self, symbol: str, order_id: int) -> dict:
        """撤销订单"""
        return await self._request("DELETE", "/fapi/v1/order", {
            "symbol": symbol,
            "orderId": order_id
        }, signed=True)

    async def get_position(self, symbol: str = None) -> List[dict]:
        """获取持仓"""
        data = await self._request("GET", "/fapi/v2/positionRisk", {}, signed=True)
        if symbol:
            return [p for p in data if p["symbol"] == symbol]
        return data

    async def get_account(self) -> dict:
        """获取账户信息"""
        return await self._request("GET", "/fapi/v2/account", {}, signed=True)

    async def set_leverage(self, symbol: str, leverage: int) -> dict:
        """设置杠杆"""
        return await self._request("POST", "/fapi/v1/leverage", {
            "symbol": symbol,
            "leverage": leverage
        }, signed=True)

    async def close(self):
        """关闭连接"""
        if self.session:
            await self.session.close()

WebSocket 实时数据

import websockets
import json
import asyncio
from typing import Callable, Dict

class BinanceFuturesWebSocket:
    """币安合约 WebSocket 客户端"""

    WS_URL = "wss://fstream.binance.com/ws"

    def __init__(self):
        self.ws = None
        self.callbacks: Dict[str, Callable] = {}
        self.running = False

    async def connect(self):
        """建立连接"""
        self.ws = await websockets.connect(self.WS_URL)
        self.running = True

    async def subscribe_kline(self, symbol: str, interval: str, callback: Callable):
        """订阅 K 线"""
        stream = f"{symbol.lower()}@kline_{interval}"
        self.callbacks[stream] = callback

        await self.ws.send(json.dumps({
            "method": "SUBSCRIBE",
            "params": [stream],
            "id": 1
        }))

    async def subscribe_ticker(self, symbol: str, callback: Callable):
        """订阅 Ticker"""
        stream = f"{symbol.lower()}@ticker"
        self.callbacks[stream] = callback

        await self.ws.send(json.dumps({
            "method": "SUBSCRIBE",
            "params": [stream],
            "id": 2
        }))

    async def subscribe_depth(self, symbol: str, callback: Callable, level: int = 20):
        """订阅深度"""
        stream = f"{symbol.lower()}@depth{level}@100ms"
        self.callbacks[stream] = callback

        await self.ws.send(json.dumps({
            "method": "SUBSCRIBE",
            "params": [stream],
            "id": 3
        }))

    async def subscribe_agg_trade(self, symbol: str, callback: Callable):
        """订阅聚合成交"""
        stream = f"{symbol.lower()}@aggTrade"
        self.callbacks[stream] = callback

        await self.ws.send(json.dumps({
            "method": "SUBSCRIBE",
            "params": [stream],
            "id": 4
        }))

    async def listen(self):
        """监听消息"""
        while self.running:
            try:
                msg = await asyncio.wait_for(self.ws.recv(), timeout=30)
                data = json.loads(msg)

                # 处理订阅确认
                if "result" in data:
                    continue

                # 处理数据
                stream = data.get("stream") or data.get("e")
                if stream in self.callbacks:
                    await self.callbacks[stream](data)

            except asyncio.TimeoutError:
                # 发送 ping 保持连接
                await self.ws.ping()
            except Exception as e:
                print(f"WebSocket error: {e}")
                break

    async def close(self):
        """关闭连接"""
        self.running = False
        if self.ws:
            await self.ws.close()


# 使用示例
async def main():
    ws = BinanceFuturesWebSocket()
    await ws.connect()

    async def on_kline(data):
        kline = data["k"]
        print(f"K线: {kline['s']} {kline['i']} Close: {kline['c']}")

    async def on_ticker(data):
        print(f"Ticker: {data['s']} Price: {data['c']} Change: {data['P']}%")

    await ws.subscribe_kline("BTCUSDT", "1m", on_kline)
    await ws.subscribe_ticker("BTCUSDT", on_ticker)

    await ws.listen()

# asyncio.run(main())

常见量化策略

1. 趋势跟踪策略

import numpy as np
from typing import Optional, List

class TrendFollowingStrategy(Strategy):
    """
    趋势跟踪策略

    原理:
    - 使用移动平均线判断趋势方向
    - 快线上穿慢线做多,下穿做空
    - 结合 ADX 过滤震荡行情
    """

    def __init__(
        self,
        fast_period: int = 10,
        slow_period: int = 30,
        adx_period: int = 14,
        adx_threshold: float = 25
    ):
        self.fast_period = fast_period
        self.slow_period = slow_period
        self.adx_period = adx_period
        self.adx_threshold = adx_threshold
        self.klines: Dict[str, List[dict]] = {}

    def _calculate_ema(self, prices: List[float], period: int) -> float:
        """计算 EMA"""
        if len(prices) < period:
            return prices[-1]

        multiplier = 2 / (period + 1)
        ema = prices[0]
        for price in prices[1:]:
            ema = (price - ema) * multiplier + ema
        return ema

    def _calculate_adx(self, klines: List[dict], period: int) -> float:
        """计算 ADX (平均趋向指数)"""
        if len(klines) < period + 1:
            return 0

        # 计算 +DM 和 -DM
        plus_dm = []
        minus_dm = []
        tr_list = []

        for i in range(1, len(klines)):
            high = klines[i]["high"]
            low = klines[i]["low"]
            prev_high = klines[i-1]["high"]
            prev_low = klines[i-1]["low"]
            prev_close = klines[i-1]["close"]

            # True Range
            tr = max(high - low, abs(high - prev_close), abs(low - prev_close))
            tr_list.append(tr)

            # +DM, -DM
            up_move = high - prev_high
            down_move = prev_low - low

            if up_move > down_move and up_move > 0:
                plus_dm.append(up_move)
            else:
                plus_dm.append(0)

            if down_move > up_move and down_move > 0:
                minus_dm.append(down_move)
            else:
                minus_dm.append(0)

        # 平滑处理
        atr = self._calculate_ema(tr_list, period)
        plus_di = 100 * self._calculate_ema(plus_dm, period) / atr if atr > 0 else 0
        minus_di = 100 * self._calculate_ema(minus_dm, period) / atr if atr > 0 else 0

        # 计算 DX 和 ADX
        di_diff = abs(plus_di - minus_di)
        di_sum = plus_di + minus_di
        dx = 100 * di_diff / di_sum if di_sum > 0 else 0

        return dx  # 简化,实际应计算 ADX (DX 的 EMA)

    def on_kline(self, symbol: str, kline: dict) -> Optional[Signal]:
        """K线更新"""
        if symbol not in self.klines:
            self.klines[symbol] = []

        self.klines[symbol].append(kline)

        # 保留足够的历史数据
        max_period = max(self.slow_period, self.adx_period) + 10
        if len(self.klines[symbol]) > max_period * 2:
            self.klines[symbol] = self.klines[symbol][-max_period:]

        if len(self.klines[symbol]) < self.slow_period:
            return None

        # 计算指标
        closes = [k["close"] for k in self.klines[symbol]]
        fast_ema = self._calculate_ema(closes, self.fast_period)
        slow_ema = self._calculate_ema(closes, self.slow_period)
        adx = self._calculate_adx(self.klines[symbol], self.adx_period)

        # 计算前一根 K 线的 EMA
        prev_closes = closes[:-1]
        prev_fast_ema = self._calculate_ema(prev_closes, self.fast_period)
        prev_slow_ema = self._calculate_ema(prev_closes, self.slow_period)

        # 生成信号
        signal = None

        # ADX 过滤震荡
        if adx < self.adx_threshold:
            return None

        # 金叉做多
        if prev_fast_ema <= prev_slow_ema and fast_ema > slow_ema:
            signal = Signal(
                symbol=symbol,
                side=Side.LONG,
                strength=min(1.0, adx / 50),
                price=kline["close"],
                timestamp=kline["close_time"],
                reason=f"Golden cross, ADX={adx:.2f}"
            )

        # 死叉做空
        elif prev_fast_ema >= prev_slow_ema and fast_ema < slow_ema:
            signal = Signal(
                symbol=symbol,
                side=Side.SHORT,
                strength=min(1.0, adx / 50),
                price=kline["close"],
                timestamp=kline["close_time"],
                reason=f"Death cross, ADX={adx:.2f}"
            )

        return signal

    def on_tick(self, symbol: str, tick: dict) -> Optional[Signal]:
        return None

2. 均值回归策略

class MeanReversionStrategy(Strategy):
    """
    均值回归策略

    原理:
    - 价格总会回归到均值附近
    - 使用布林带判断超买超卖
    - 价格触及上轨做空,触及下轨做多
    """

    def __init__(
        self,
        period: int = 20,
        std_dev: float = 2.0,
        rsi_period: int = 14,
        rsi_oversold: float = 30,
        rsi_overbought: float = 70
    ):
        self.period = period
        self.std_dev = std_dev
        self.rsi_period = rsi_period
        self.rsi_oversold = rsi_oversold
        self.rsi_overbought = rsi_overbought
        self.klines: Dict[str, List[dict]] = {}

    def _calculate_bollinger_bands(self, prices: List[float]) -> tuple:
        """计算布林带"""
        if len(prices) < self.period:
            return None, None, None

        recent_prices = prices[-self.period:]
        middle = np.mean(recent_prices)
        std = np.std(recent_prices)

        upper = middle + self.std_dev * std
        lower = middle - self.std_dev * std

        return upper, middle, lower

    def _calculate_rsi(self, prices: List[float]) -> float:
        """计算 RSI"""
        if len(prices) < self.rsi_period + 1:
            return 50

        changes = [prices[i] - prices[i-1] for i in range(1, len(prices))]
        recent_changes = changes[-self.rsi_period:]

        gains = [c if c > 0 else 0 for c in recent_changes]
        losses = [-c if c < 0 else 0 for c in recent_changes]

        avg_gain = np.mean(gains)
        avg_loss = np.mean(losses)

        if avg_loss == 0:
            return 100

        rs = avg_gain / avg_loss
        rsi = 100 - (100 / (1 + rs))

        return rsi

    def on_kline(self, symbol: str, kline: dict) -> Optional[Signal]:
        """K线更新"""
        if symbol not in self.klines:
            self.klines[symbol] = []

        self.klines[symbol].append(kline)

        # 保留历史数据
        if len(self.klines[symbol]) > self.period * 3:
            self.klines[symbol] = self.klines[symbol][-self.period * 2:]

        if len(self.klines[symbol]) < self.period:
            return None

        closes = [k["close"] for k in self.klines[symbol]]
        current_price = closes[-1]

        # 计算指标
        upper, middle, lower = self._calculate_bollinger_bands(closes)
        rsi = self._calculate_rsi(closes)

        signal = None

        # 价格触及下轨 + RSI 超卖 → 做多
        if current_price <= lower and rsi < self.rsi_oversold:
            # 计算距离均值的偏离程度作为信号强度
            deviation = (middle - current_price) / middle
            signal = Signal(
                symbol=symbol,
                side=Side.LONG,
                strength=min(1.0, deviation * 10),
                price=current_price,
                timestamp=kline["close_time"],
                reason=f"Oversold, RSI={rsi:.2f}, Below BB lower"
            )

        # 价格触及上轨 + RSI 超买 → 做空
        elif current_price >= upper and rsi > self.rsi_overbought:
            deviation = (current_price - middle) / middle
            signal = Signal(
                symbol=symbol,
                side=Side.SHORT,
                strength=min(1.0, deviation * 10),
                price=current_price,
                timestamp=kline["close_time"],
                reason=f"Overbought, RSI={rsi:.2f}, Above BB upper"
            )

        return signal

    def on_tick(self, symbol: str, tick: dict) -> Optional[Signal]:
        return None

3. 网格交易策略

from decimal import Decimal, ROUND_DOWN
from typing import Dict, List

class GridTradingStrategy:
    """
    网格交易策略

    原理:
    - 在价格区间内设置多个买卖网格
    - 价格下跌触发买入,上涨触发卖出
    - 适合震荡行情,持续赚取网格利润
    """

    def __init__(
        self,
        symbol: str,
        upper_price: float,
        lower_price: float,
        grid_count: int,
        total_investment: float,
        leverage: int = 1
    ):
        self.symbol = symbol
        self.upper_price = Decimal(str(upper_price))
        self.lower_price = Decimal(str(lower_price))
        self.grid_count = grid_count
        self.total_investment = Decimal(str(total_investment))
        self.leverage = leverage

        # 计算网格
        self.grid_prices = self._calculate_grid_prices()
        self.grid_size = self._calculate_grid_size()

        # 订单状态
        self.buy_orders: Dict[str, dict] = {}   # price -> order
        self.sell_orders: Dict[str, dict] = {}

    def _calculate_grid_prices(self) -> List[Decimal]:
        """计算网格价格"""
        price_range = self.upper_price - self.lower_price
        grid_interval = price_range / self.grid_count

        prices = []
        for i in range(self.grid_count + 1):
            price = self.lower_price + grid_interval * i
            prices.append(price.quantize(Decimal('0.01'), rounding=ROUND_DOWN))

        return prices

    def _calculate_grid_size(self) -> Decimal:
        """计算每格仓位大小"""
        # 每格投入金额
        per_grid_investment = self.total_investment / self.grid_count
        # 按中间价格计算数量
        mid_price = (self.upper_price + self.lower_price) / 2
        size = (per_grid_investment * self.leverage) / mid_price
        return size.quantize(Decimal('0.001'), rounding=ROUND_DOWN)

    def initialize_orders(self, current_price: float) -> List[Order]:
        """初始化网格订单"""
        orders = []
        current = Decimal(str(current_price))

        for price in self.grid_prices:
            if price < current:
                # 低于当前价,挂买单
                orders.append(Order(
                    symbol=self.symbol,
                    side=Side.LONG,
                    order_type=OrderType.LIMIT,
                    quantity=float(self.grid_size),
                    price=float(price)
                ))
            elif price > current:
                # 高于当前价,挂卖单
                orders.append(Order(
                    symbol=self.symbol,
                    side=Side.SHORT,
                    order_type=OrderType.LIMIT,
                    quantity=float(self.grid_size),
                    price=float(price)
                ))

        return orders

    def on_order_filled(self, filled_order: dict) -> Optional[Order]:
        """
        订单成交回调

        买单成交后,在上一格挂卖单
        卖单成交后,在下一格挂买单
        """
        filled_price = Decimal(str(filled_order["price"]))
        filled_side = filled_order["side"]

        # 找到成交的网格位置
        grid_index = None
        for i, price in enumerate(self.grid_prices):
            if abs(price - filled_price) < Decimal('0.01'):
                grid_index = i
                break

        if grid_index is None:
            return None

        new_order = None

        if filled_side == "BUY":
            # 买单成交,在上一格挂卖单
            if grid_index < len(self.grid_prices) - 1:
                sell_price = self.grid_prices[grid_index + 1]
                new_order = Order(
                    symbol=self.symbol,
                    side=Side.SHORT,
                    order_type=OrderType.LIMIT,
                    quantity=float(self.grid_size),
                    price=float(sell_price),
                    reduce_only=True
                )

        else:  # SELL
            # 卖单成交,在下一格挂买单
            if grid_index > 0:
                buy_price = self.grid_prices[grid_index - 1]
                new_order = Order(
                    symbol=self.symbol,
                    side=Side.LONG,
                    order_type=OrderType.LIMIT,
                    quantity=float(self.grid_size),
                    price=float(buy_price)
                )

        return new_order

    def get_grid_info(self) -> dict:
        """获取网格信息"""
        return {
            "symbol": self.symbol,
            "upper_price": float(self.upper_price),
            "lower_price": float(self.lower_price),
            "grid_count": self.grid_count,
            "grid_size": float(self.grid_size),
            "grid_prices": [float(p) for p in self.grid_prices],
            "price_interval": float(self.grid_prices[1] - self.grid_prices[0])
        }

4. 资金费率套利策略

class FundingRateArbitrage:
    """
    资金费率套利策略

    原理:
    - 永续合约每 8 小时结算一次资金费率
    - 正费率时,多头付给空头
    - 负费率时,空头付给多头
    - 通过现货对冲,无风险收取资金费率
    """

    def __init__(
        self,
        min_funding_rate: float = 0.0005,  # 最小费率阈值 (0.05%)
        position_size_usd: float = 10000,
        leverage: int = 1
    ):
        self.min_funding_rate = min_funding_rate
        self.position_size_usd = position_size_usd
        self.leverage = leverage
        self.positions: Dict[str, dict] = {}

    async def scan_opportunities(self, client: BinanceFuturesClient) -> List[dict]:
        """扫描套利机会"""
        opportunities = []

        # 获取所有交易对的资金费率
        funding_rates = await client._request("GET", "/fapi/v1/premiumIndex")

        for item in funding_rates:
            symbol = item["symbol"]
            funding_rate = float(item["lastFundingRate"])

            # 过滤低费率
            if abs(funding_rate) < self.min_funding_rate:
                continue

            # 计算年化收益
            annual_return = funding_rate * 3 * 365 * 100  # 每天 3 次

            opportunities.append({
                "symbol": symbol,
                "funding_rate": funding_rate,
                "annual_return": annual_return,
                "next_funding_time": item["nextFundingTime"],
                "direction": "SHORT" if funding_rate > 0 else "LONG"
            })

        # 按费率绝对值排序
        opportunities.sort(key=lambda x: abs(x["funding_rate"]), reverse=True)

        return opportunities

    def calculate_position(self, symbol: str, current_price: float) -> dict:
        """计算开仓参数"""
        quantity = self.position_size_usd / current_price

        return {
            "symbol": symbol,
            "quantity": quantity,
            "leverage": self.leverage,
            "spot_quantity": quantity,  # 现货对冲数量
            "futures_quantity": quantity
        }

    async def open_arbitrage_position(
        self,
        futures_client: BinanceFuturesClient,
        spot_client,  # 现货客户端
        opportunity: dict
    ) -> dict:
        """
        开启套利仓位

        正费率:
        - 现货买入
        - 合约做空

        负费率:
        - 现货卖出 (需要有持仓)
        - 合约做多
        """
        symbol = opportunity["symbol"]
        ticker = await futures_client.get_ticker(symbol)
        current_price = float(ticker["price"])

        position = self.calculate_position(symbol, current_price)

        if opportunity["funding_rate"] > 0:
            # 正费率: 做空合约 + 买入现货
            # 1. 买入现货
            # await spot_client.buy(symbol, position["spot_quantity"])

            # 2. 做空合约
            await futures_client.place_order(
                symbol=symbol,
                side="SELL",
                order_type="MARKET",
                quantity=position["futures_quantity"]
            )

        else:
            # 负费率: 做多合约 + 卖出现货
            # 需要先有现货持仓
            await futures_client.place_order(
                symbol=symbol,
                side="BUY",
                order_type="MARKET",
                quantity=position["futures_quantity"]
            )

        self.positions[symbol] = {
            "opportunity": opportunity,
            "position": position,
            "open_time": time.time()
        }

        return position

    def calculate_pnl(self, symbol: str, current_price: float, funding_collected: float) -> dict:
        """计算套利收益"""
        if symbol not in self.positions:
            return None

        pos = self.positions[symbol]
        # 理论上价格变动带来的盈亏会被对冲
        # 实际收益 = 资金费率收入 - 交易手续费 - 借贷利息

        return {
            "symbol": symbol,
            "funding_income": funding_collected,
            "position_pnl": 0,  # 对冲后理论为 0
            "total_pnl": funding_collected
        }

风险控制

仓位管理

class PositionSizer:
    """
    仓位计算器

    核心原则:
    1. 单笔最大亏损不超过账户的 X%
    2. 总仓位不超过账户的 Y%
    3. 单币种仓位不超过 Z%
    """

    def __init__(
        self,
        max_risk_per_trade: float = 0.02,   # 单笔最大风险 2%
        max_total_exposure: float = 0.5,     # 最大总敞口 50%
        max_single_exposure: float = 0.2,    # 单币种最大敞口 20%
        max_leverage: int = 10
    ):
        self.max_risk_per_trade = max_risk_per_trade
        self.max_total_exposure = max_total_exposure
        self.max_single_exposure = max_single_exposure
        self.max_leverage = max_leverage

    def calculate_position_size(
        self,
        account_balance: float,
        entry_price: float,
        stop_loss_price: float,
        leverage: int = 1
    ) -> float:
        """
        基于风险计算仓位大小

        公式:
        仓位 = (账户余额 × 最大风险比例) / (入场价 - 止损价) / 杠杆
        """
        # 风险金额
        risk_amount = account_balance * self.max_risk_per_trade

        # 止损距离 (百分比)
        stop_distance = abs(entry_price - stop_loss_price) / entry_price

        if stop_distance == 0:
            return 0

        # 仓位价值
        position_value = risk_amount / stop_distance

        # 数量
        quantity = position_value / entry_price

        # 杠杆限制
        leverage = min(leverage, self.max_leverage)

        # 最大敞口限制
        max_position_value = account_balance * self.max_single_exposure * leverage
        max_quantity = max_position_value / entry_price

        return min(quantity, max_quantity)

    def validate_position(
        self,
        account_balance: float,
        current_positions: List[Position],
        new_position: Position
    ) -> tuple:
        """验证新仓位是否符合风控规则"""
        errors = []

        # 计算当前总敞口
        current_exposure = sum(
            p.size * p.entry_price for p in current_positions
        )
        new_exposure = new_position.size * new_position.entry_price
        total_exposure = current_exposure + new_exposure

        # 检查总敞口
        if total_exposure > account_balance * self.max_total_exposure * new_position.leverage:
            errors.append(f"总敞口超限: {total_exposure:.2f} > {account_balance * self.max_total_exposure:.2f}")

        # 检查单币种敞口
        symbol_exposure = sum(
            p.size * p.entry_price
            for p in current_positions
            if p.symbol == new_position.symbol
        ) + new_exposure

        max_symbol = account_balance * self.max_single_exposure * new_position.leverage
        if symbol_exposure > max_symbol:
            errors.append(f"单币种敞口超限: {symbol_exposure:.2f} > {max_symbol:.2f}")

        # 检查杠杆
        if new_position.leverage > self.max_leverage:
            errors.append(f"杠杆超限: {new_position.leverage} > {self.max_leverage}")

        return len(errors) == 0, errors

止损止盈

class StopLossManager:
    """
    止损止盈管理器

    止损类型:
    1. 固定止损 - 固定价格或百分比
    2. 追踪止损 - 随价格移动
    3. 时间止损 - 超时平仓
    4. ATR 止损 - 基于波动率
    """

    def __init__(
        self,
        default_stop_loss_pct: float = 0.02,    # 默认止损 2%
        default_take_profit_pct: float = 0.04,  # 默认止盈 4%
        trailing_stop_pct: float = 0.01,        # 追踪止损 1%
        max_holding_hours: int = 24             # 最大持仓时间
    ):
        self.default_stop_loss_pct = default_stop_loss_pct
        self.default_take_profit_pct = default_take_profit_pct
        self.trailing_stop_pct = trailing_stop_pct
        self.max_holding_hours = max_holding_hours

        self.trailing_stops: Dict[str, float] = {}  # symbol -> highest/lowest price

    def calculate_stop_loss(
        self,
        entry_price: float,
        side: Side,
        method: str = "fixed"
    ) -> float:
        """计算止损价格"""
        if method == "fixed":
            if side == Side.LONG:
                return entry_price * (1 - self.default_stop_loss_pct)
            else:
                return entry_price * (1 + self.default_stop_loss_pct)

        # 其他方法可以扩展...
        return entry_price * (1 - self.default_stop_loss_pct)

    def calculate_take_profit(
        self,
        entry_price: float,
        side: Side
    ) -> float:
        """计算止盈价格"""
        if side == Side.LONG:
            return entry_price * (1 + self.default_take_profit_pct)
        else:
            return entry_price * (1 - self.default_take_profit_pct)

    def update_trailing_stop(
        self,
        symbol: str,
        current_price: float,
        side: Side,
        entry_price: float
    ) -> Optional[float]:
        """
        更新追踪止损

        多头: 追踪最高价,止损在最高价下方 X%
        空头: 追踪最低价,止损在最低价上方 X%
        """
        key = f"{symbol}_{side.value}"

        if side == Side.LONG:
            # 更新最高价
            if key not in self.trailing_stops:
                self.trailing_stops[key] = entry_price

            if current_price > self.trailing_stops[key]:
                self.trailing_stops[key] = current_price

            # 计算追踪止损价
            stop_price = self.trailing_stops[key] * (1 - self.trailing_stop_pct)

            # 只有盈利后才启用追踪止损
            if self.trailing_stops[key] > entry_price * (1 + self.trailing_stop_pct):
                return stop_price

        else:  # SHORT
            if key not in self.trailing_stops:
                self.trailing_stops[key] = entry_price

            if current_price < self.trailing_stops[key]:
                self.trailing_stops[key] = current_price

            stop_price = self.trailing_stops[key] * (1 + self.trailing_stop_pct)

            if self.trailing_stops[key] < entry_price * (1 - self.trailing_stop_pct):
                return stop_price

        return None

    def check_time_stop(self, position: Position, current_time: int) -> bool:
        """检查时间止损"""
        holding_hours = (current_time - position.entry_time) / 3600
        return holding_hours > self.max_holding_hours

    def create_stop_orders(
        self,
        position: Position,
        stop_loss: float,
        take_profit: float
    ) -> List[Order]:
        """创建止损止盈订单"""
        orders = []

        # 止损单
        orders.append(Order(
            symbol=position.symbol,
            side=Side.SHORT if position.side == Side.LONG else Side.LONG,
            order_type=OrderType.STOP_MARKET,
            quantity=position.size,
            stop_price=stop_loss,
            reduce_only=True
        ))

        # 止盈单
        orders.append(Order(
            symbol=position.symbol,
            side=Side.SHORT if position.side == Side.LONG else Side.LONG,
            order_type=OrderType.TAKE_PROFIT_MARKET,
            quantity=position.size,
            stop_price=take_profit,
            reduce_only=True
        ))

        return orders

回撤控制

class DrawdownController:
    """
    回撤控制器

    功能:
    1. 监控账户回撤
    2. 达到阈值时降低仓位或暂停交易
    3. 恢复后逐步放开限制
    """

    def __init__(
        self,
        max_daily_drawdown: float = 0.05,    # 日最大回撤 5%
        max_total_drawdown: float = 0.15,    # 总最大回撤 15%
        recovery_threshold: float = 0.5       # 回撤恢复阈值
    ):
        self.max_daily_drawdown = max_daily_drawdown
        self.max_total_drawdown = max_total_drawdown
        self.recovery_threshold = recovery_threshold

        self.peak_balance = 0
        self.daily_start_balance = 0
        self.trading_enabled = True
        self.position_scale = 1.0  # 仓位缩放比例

    def update(self, current_balance: float) -> dict:
        """更新状态"""
        # 更新峰值
        if current_balance > self.peak_balance:
            self.peak_balance = current_balance

        # 计算回撤
        total_drawdown = (self.peak_balance - current_balance) / self.peak_balance
        daily_drawdown = (self.daily_start_balance - current_balance) / self.daily_start_balance if self.daily_start_balance > 0 else 0

        status = {
            "current_balance": current_balance,
            "peak_balance": self.peak_balance,
            "total_drawdown": total_drawdown,
            "daily_drawdown": daily_drawdown,
            "trading_enabled": self.trading_enabled,
            "position_scale": self.position_scale
        }

        # 检查回撤阈值
        if total_drawdown >= self.max_total_drawdown:
            self.trading_enabled = False
            self.position_scale = 0
            status["alert"] = "CRITICAL: Max total drawdown reached, trading disabled"

        elif daily_drawdown >= self.max_daily_drawdown:
            self.trading_enabled = False
            self.position_scale = 0
            status["alert"] = "WARNING: Max daily drawdown reached, trading paused"

        elif total_drawdown >= self.max_total_drawdown * 0.5:
            # 回撤超过一半,降低仓位
            self.position_scale = 0.5
            status["warning"] = "Reducing position size due to drawdown"

        else:
            # 检查恢复
            if not self.trading_enabled:
                recovery = (current_balance - (self.peak_balance * (1 - self.max_total_drawdown))) / \
                          (self.peak_balance * self.max_total_drawdown)
                if recovery >= self.recovery_threshold:
                    self.trading_enabled = True
                    self.position_scale = 0.5
                    status["info"] = "Trading resumed with reduced position"

        return status

    def reset_daily(self, current_balance: float):
        """每日重置"""
        self.daily_start_balance = current_balance

    def get_adjusted_quantity(self, original_quantity: float) -> float:
        """获取调整后的仓位"""
        return original_quantity * self.position_scale

实战技巧

1. API 请求优化

import asyncio
from collections import deque
import time

class RateLimiter:
    """
    请求频率限制器

    币安 API 限制:
    - 每分钟 1200 次请求权重
    - 每秒 10 次订单
    - 每天 200,000 次订单
    """

    def __init__(
        self,
        requests_per_minute: int = 1200,
        orders_per_second: int = 10,
        orders_per_day: int = 200000
    ):
        self.rpm = requests_per_minute
        self.ops = orders_per_second
        self.opd = orders_per_day

        self.request_times = deque()
        self.order_times = deque()
        self.daily_orders = 0
        self.daily_reset_time = time.time()

    async def wait_for_request(self, weight: int = 1):
        """等待请求配额"""
        now = time.time()

        # 清理过期记录
        while self.request_times and self.request_times[0] < now - 60:
            self.request_times.popleft()

        # 检查是否需要等待
        current_weight = sum(1 for _ in self.request_times) * weight
        if current_weight >= self.rpm:
            wait_time = 60 - (now - self.request_times[0])
            if wait_time > 0:
                await asyncio.sleep(wait_time)

        self.request_times.append(now)

    async def wait_for_order(self):
        """等待订单配额"""
        now = time.time()

        # 重置每日计数
        if now - self.daily_reset_time > 86400:
            self.daily_orders = 0
            self.daily_reset_time = now

        # 检查每日限制
        if self.daily_orders >= self.opd:
            raise Exception("Daily order limit reached")

        # 清理过期记录
        while self.order_times and self.order_times[0] < now - 1:
            self.order_times.popleft()

        # 检查每秒限制
        if len(self.order_times) >= self.ops:
            wait_time = 1 - (now - self.order_times[0])
            if wait_time > 0:
                await asyncio.sleep(wait_time)

        self.order_times.append(now)
        self.daily_orders += 1

2. 订单执行优化

class SmartOrderExecutor:
    """
    智能订单执行器

    功能:
    1. 大单拆分
    2. 冰山订单
    3. TWAP/VWAP
    """

    def __init__(self, client: BinanceFuturesClient):
        self.client = client

    async def execute_twap(
        self,
        symbol: str,
        side: str,
        total_quantity: float,
        duration_minutes: int = 10,
        slices: int = 10
    ) -> List[dict]:
        """
        TWAP (时间加权平均价格) 执行

        将大单拆分为多个小单,在指定时间内均匀执行
        """
        results = []
        slice_quantity = total_quantity / slices
        interval_seconds = (duration_minutes * 60) / slices

        for i in range(slices):
            try:
                result = await self.client.place_order(
                    symbol=symbol,
                    side=side,
                    order_type="MARKET",
                    quantity=slice_quantity
                )
                results.append(result)

                if i < slices - 1:
                    await asyncio.sleep(interval_seconds)

            except Exception as e:
                print(f"TWAP slice {i+1} failed: {e}")

        return results

    async def execute_iceberg(
        self,
        symbol: str,
        side: str,
        total_quantity: float,
        visible_quantity: float,
        price: float
    ) -> List[dict]:
        """
        冰山订单执行

        只显示部分数量,成交后自动补充
        """
        results = []
        remaining = total_quantity

        while remaining > 0:
            slice_qty = min(visible_quantity, remaining)

            result = await self.client.place_order(
                symbol=symbol,
                side=side,
                order_type="LIMIT",
                quantity=slice_qty,
                price=price
            )

            # 等待成交
            order_id = result["orderId"]
            while True:
                await asyncio.sleep(1)
                order = await self.client._request(
                    "GET", "/fapi/v1/order",
                    {"symbol": symbol, "orderId": order_id},
                    signed=True
                )

                if order["status"] == "FILLED":
                    results.append(order)
                    remaining -= float(order["executedQty"])
                    break
                elif order["status"] in ["CANCELED", "REJECTED", "EXPIRED"]:
                    break

        return results

    async def execute_with_slippage_control(
        self,
        symbol: str,
        side: str,
        quantity: float,
        max_slippage_pct: float = 0.001
    ) -> dict:
        """
        带滑点控制的执行

        检查订单簿深度,避免过大滑点
        """
        # 获取订单簿
        orderbook = await self.client.get_orderbook(symbol, limit=20)

        if side == "BUY":
            asks = orderbook["asks"]
            # 计算买入 quantity 的平均价格
            total_cost = 0
            filled = 0
            for ask in asks:
                price = float(ask[0])
                size = float(ask[1])

                if filled + size >= quantity:
                    total_cost += (quantity - filled) * price
                    filled = quantity
                    break
                else:
                    total_cost += size * price
                    filled += size

            avg_price = total_cost / filled if filled > 0 else 0
            best_price = float(asks[0][0])

        else:  # SELL
            bids = orderbook["bids"]
            total_value = 0
            filled = 0
            for bid in bids:
                price = float(bid[0])
                size = float(bid[1])

                if filled + size >= quantity:
                    total_value += (quantity - filled) * price
                    filled = quantity
                    break
                else:
                    total_value += size * price
                    filled += size

            avg_price = total_value / filled if filled > 0 else 0
            best_price = float(bids[0][0])

        # 检查滑点
        slippage = abs(avg_price - best_price) / best_price
        if slippage > max_slippage_pct:
            raise Exception(f"Slippage too high: {slippage:.4%} > {max_slippage_pct:.4%}")

        # 执行订单
        return await self.client.place_order(
            symbol=symbol,
            side=side,
            order_type="MARKET",
            quantity=quantity
        )

3. 日志与监控

import logging
from datetime import datetime
import json

class TradingLogger:
    """交易日志管理器"""

    def __init__(self, log_file: str = "trading.log"):
        self.logger = logging.getLogger("trading")
        self.logger.setLevel(logging.DEBUG)

        # 文件处理器
        fh = logging.FileHandler(log_file)
        fh.setLevel(logging.DEBUG)

        # 控制台处理器
        ch = logging.StreamHandler()
        ch.setLevel(logging.INFO)

        # 格式
        formatter = logging.Formatter(
            '%(asctime)s - %(levelname)s - %(message)s'
        )
        fh.setFormatter(formatter)
        ch.setFormatter(formatter)

        self.logger.addHandler(fh)
        self.logger.addHandler(ch)

        # 交易记录
        self.trades = []

    def log_signal(self, signal: Signal):
        """记录信号"""
        self.logger.info(f"SIGNAL: {signal.symbol} {signal.side.value} "
                        f"strength={signal.strength:.2f} price={signal.price} "
                        f"reason={signal.reason}")

    def log_order(self, order: Order, result: dict):
        """记录订单"""
        self.logger.info(f"ORDER: {order.symbol} {order.side.value} "
                        f"type={order.order_type.value} qty={order.quantity} "
                        f"orderId={result.get('orderId')}")

    def log_trade(self, trade: dict):
        """记录成交"""
        self.trades.append(trade)
        self.logger.info(f"TRADE: {json.dumps(trade)}")

    def log_position(self, position: Position):
        """记录持仓"""
        self.logger.info(f"POSITION: {position.symbol} {position.side.value} "
                        f"size={position.size} entry={position.entry_price} "
                        f"pnl={position.unrealized_pnl}")

    def log_error(self, error: str, exc_info=False):
        """记录错误"""
        self.logger.error(f"ERROR: {error}", exc_info=exc_info)

    def get_daily_summary(self) -> dict:
        """获取每日汇总"""
        today = datetime.now().strftime("%Y-%m-%d")
        today_trades = [t for t in self.trades if t.get("date", "").startswith(today)]

        total_pnl = sum(t.get("pnl", 0) for t in today_trades)
        win_trades = [t for t in today_trades if t.get("pnl", 0) > 0]
        lose_trades = [t for t in today_trades if t.get("pnl", 0) < 0]

        return {
            "date": today,
            "total_trades": len(today_trades),
            "win_trades": len(win_trades),
            "lose_trades": len(lose_trades),
            "win_rate": len(win_trades) / len(today_trades) if today_trades else 0,
            "total_pnl": total_pnl,
            "avg_pnl": total_pnl / len(today_trades) if today_trades else 0
        }

4. 回测框架

from dataclasses import dataclass, field
from typing import List, Dict
import pandas as pd

@dataclass
class BacktestResult:
    """回测结果"""
    total_return: float
    annual_return: float
    max_drawdown: float
    sharpe_ratio: float
    win_rate: float
    profit_factor: float
    total_trades: int
    equity_curve: List[float] = field(default_factory=list)
    trades: List[dict] = field(default_factory=list)

class Backtester:
    """
    回测引擎

    功能:
    1. 历史数据回放
    2. 策略评估
    3. 性能统计
    """

    def __init__(
        self,
        initial_capital: float = 10000,
        commission: float = 0.0004,  # 手续费率
        slippage: float = 0.0001     # 滑点
    ):
        self.initial_capital = initial_capital
        self.commission = commission
        self.slippage = slippage

    def run(
        self,
        strategy: Strategy,
        data: pd.DataFrame,
        symbol: str
    ) -> BacktestResult:
        """运行回测"""
        capital = self.initial_capital
        position = 0
        entry_price = 0
        equity_curve = [capital]
        trades = []

        for i, row in data.iterrows():
            kline = {
                "open": row["open"],
                "high": row["high"],
                "low": row["low"],
                "close": row["close"],
                "volume": row["volume"],
                "close_time": row.get("timestamp", i)
            }

            # 获取信号
            signal = strategy.on_kline(symbol, kline)

            current_price = row["close"]

            # 处理信号
            if signal:
                if signal.side == Side.LONG and position <= 0:
                    # 平空
                    if position < 0:
                        pnl = (entry_price - current_price) * abs(position)
                        pnl -= abs(position) * current_price * self.commission
                        capital += pnl
                        trades.append({
                            "type": "close_short",
                            "price": current_price,
                            "pnl": pnl
                        })

                    # 开多
                    position = capital * 0.9 / current_price  # 90% 仓位
                    entry_price = current_price * (1 + self.slippage)
                    capital -= position * entry_price * self.commission
                    trades.append({
                        "type": "open_long",
                        "price": entry_price,
                        "size": position
                    })

                elif signal.side == Side.SHORT and position >= 0:
                    # 平多
                    if position > 0:
                        pnl = (current_price - entry_price) * position
                        pnl -= position * current_price * self.commission
                        capital += pnl
                        trades.append({
                            "type": "close_long",
                            "price": current_price,
                            "pnl": pnl
                        })

                    # 开空
                    position = -capital * 0.9 / current_price
                    entry_price = current_price * (1 - self.slippage)
                    capital -= abs(position) * entry_price * self.commission
                    trades.append({
                        "type": "open_short",
                        "price": entry_price,
                        "size": position
                    })

            # 更新权益
            if position > 0:
                unrealized = (current_price - entry_price) * position
            elif position < 0:
                unrealized = (entry_price - current_price) * abs(position)
            else:
                unrealized = 0

            equity_curve.append(capital + unrealized)

        # 计算统计指标
        return self._calculate_statistics(equity_curve, trades)

    def _calculate_statistics(
        self,
        equity_curve: List[float],
        trades: List[dict]
    ) -> BacktestResult:
        """计算回测统计"""
        equity = pd.Series(equity_curve)

        # 收益率
        total_return = (equity.iloc[-1] - equity.iloc[0]) / equity.iloc[0]

        # 年化收益 (假设 365 天)
        days = len(equity)
        annual_return = (1 + total_return) ** (365 / days) - 1 if days > 0 else 0

        # 最大回撤
        peak = equity.expanding().max()
        drawdown = (equity - peak) / peak
        max_drawdown = drawdown.min()

        # 夏普比率 (假设无风险利率 0)
        returns = equity.pct_change().dropna()
        sharpe_ratio = returns.mean() / returns.std() * (365 ** 0.5) if returns.std() > 0 else 0

        # 胜率
        pnl_trades = [t for t in trades if "pnl" in t]
        win_trades = [t for t in pnl_trades if t["pnl"] > 0]
        win_rate = len(win_trades) / len(pnl_trades) if pnl_trades else 0

        # 盈亏比
        total_profit = sum(t["pnl"] for t in win_trades) if win_trades else 0
        lose_trades = [t for t in pnl_trades if t["pnl"] < 0]
        total_loss = abs(sum(t["pnl"] for t in lose_trades)) if lose_trades else 1
        profit_factor = total_profit / total_loss if total_loss > 0 else 0

        return BacktestResult(
            total_return=total_return,
            annual_return=annual_return,
            max_drawdown=max_drawdown,
            sharpe_ratio=sharpe_ratio,
            win_rate=win_rate,
            profit_factor=profit_factor,
            total_trades=len(trades),
            equity_curve=equity_curve,
            trades=trades
        )

常见问题与建议

新手常见误区

1. 过度优化 (Overfitting)
   问题: 策略在历史数据上完美,实盘效果很差
   解决: 使用样本外测试,避免过多参数

2. 忽视手续费和滑点
   问题: 回测盈利,实盘亏损
   解决: 回测时加入真实的交易成本

3. 杠杆过高
   问题: 一次大跌就爆仓
   解决: 控制杠杆在 3-5 倍以内

4. 没有止损
   问题: 小亏变大亏,无法翻身
   解决: 每笔交易必须设置止损

5. 频繁修改策略
   问题: 亏损后改参数,越改越乱
   解决: 制定规则后严格执行,定期复盘

6. 单一策略/币种
   问题: 行情不适合时持续亏损
   解决: 多策略组合,分散风险

量化交易建议

方面 建议
资金管理 用可承受损失的资金,单笔风险 < 2%
杠杆使用 新手 1-3 倍,熟练后最多 5-10 倍
策略开发 从简单策略开始,逐步迭代
回测验证 至少 6 个月数据,样本外测试
实盘启动 小资金试跑,验证后再加仓
风险控制 设置日/周/月最大亏损限制
心态管理 接受亏损是正常的,关注长期结果

进阶学习路径

flowchart TB
    classDef beginner fill:#2ec4b6,stroke:#011627,stroke-width:2px,color:#ffffff
    classDef intermediate fill:#fca311,stroke:#e85d04,stroke-width:2px,color:#ffffff
    classDef advanced fill:#7209b7,stroke:#3a0ca3,stroke-width:2px,color:#ffffff

    subgraph 入门 ["入门阶段"]
        A1["学习 Python/编程基础"]:::beginner
        A2["了解技术分析指标"]:::beginner
        A3["学习 API 接口使用"]:::beginner
        A4["实现简单策略"]:::beginner
    end

    subgraph 进阶 ["进阶阶段"]
        B1["学习数学/统计基础"]:::intermediate
        B2["掌握回测框架"]:::intermediate
        B3["理解风险管理"]:::intermediate
        B4["多策略组合"]:::intermediate
    end

    subgraph 高级 ["高级阶段"]
        C1["机器学习应用"]:::advanced
        C2["高频交易技术"]:::advanced
        C3["跨市场套利"]:::advanced
        C4["自建交易系统"]:::advanced
    end

    A1 --> A2 --> A3 --> A4
    A4 --> B1 --> B2 --> B3 --> B4
    B4 --> C1 --> C2 --> C3 --> C4

    linkStyle default stroke:#8d99ae,stroke-width:2px,fill:none

总结

量化交易是一个需要持续学习和实践的领域:

核心要点

方面 关键点
策略 简单有效 > 复杂花哨,逻辑清晰可解释
风控 第一要务,活下来才能赚钱
执行 严格遵守规则,消除情绪影响
迭代 定期复盘,持续优化

成功量化的关键

  1. 系统化思维: 将交易变成可重复的流程
  2. 风险意识: 永远把风控放在第一位
  3. 持续学习: 市场在变,策略也要进化
  4. 心态稳定: 接受波动,关注长期

免责声明: 本文仅供学习参考,不构成任何投资建议。加密货币合约交易风险极高,可能导致全部本金损失。请在充分了解风险后谨慎决策。

相关推荐

个人量化策略开发思路

基于币安合约API的个人量化交易策略开发思路,包括合约币对获取、单币种监控和全币种并发扫描实现

·2 分钟·
#量化交易#币安

虚拟币交易中的均线指标

深入讲解加密货币交易中的均线指标,包括 SMA、EMA、WMA 等类型的计算方法、实战应用技巧以及经典均线交易策略

·16 分钟·
#加密货币#均线

虚拟币交易中的RSI指标

全面解析相对强弱指数(RSI)在加密货币交易中的应用,包括计算原理、超买超卖判断、背离信号识别以及多种RSI交易策略

·19 分钟·
#加密货币#RSI