ctpbee 深度解析、安装教程与策略示例
ctpbee是一个基于 Python 的、面向国内期货市场(CTP 接口)的量化交易框架。它旨在为个人开发者和小型量化团队提供一个快速、灵活、易于扩展的交易系统开发工具。ctpbee的名字来源于 CTP (Comprehensive Transaction Platform) 和 Bee (蜜蜂),寓意像蜜蜂一样勤劳地处理交易任务。ctpbee是一个功能强大且灵活的 Python 量化交易框架,特
ctpbee 深度解析、安装教程与策略示例
目录
1. ctpbee 简介
1.1 什么是 ctpbee?
ctpbee 是一个基于 Python 的、面向国内期货市场(CTP 接口)的量化交易框架。它旨在为个人开发者和小型量化团队提供一个快速、灵活、易于扩展的交易系统开发工具。ctpbee 的名字来源于 CTP (Comprehensive Transaction Platform) 和 Bee (蜜蜂),寓意像蜜蜂一样勤劳地处理交易任务。
1.2 核心特性
- 事件驱动 (Event-Driven): 基于事件回调机制,当市场行情、订单状态等发生变化时,框架会触发相应的处理函数。
- 异步 IO (AsyncIO): 底层利用 Python 的
asyncio库处理网络 IO 和并发任务,提高了系统在高并发行情下的处理效率。 - App 化结构: 采用 App (应用) 的概念来组织代码,用户可以将策略、风控、数据记录等功能模块化为独立的 App,易于管理和复用。
- 简洁的 API: 提供了一套相对简洁易懂的 API,用于处理行情、交易、查询等操作。
- 跨平台: 支持 Windows, Linux, macOS 等主流操作系统。
- 可扩展性: 良好的架构设计使得添加新功能、对接新接口或自定义模块变得相对容易。
1.3 目标用户
- 希望使用 Python 进行国内期货程序化交易的个人开发者。
- 需要快速搭建和验证交易策略的小型量化团队。
- 对交易系统底层细节有一定了解,并希望拥有更高灵活性的开发者。
2. 核心概念
2.1 事件驱动
ctpbee 的核心运行机制是事件驱动。外部事件(如交易所推送的行情 Tick 数据、订单成交回报、委托状态变化)和内部事件(如定时任务、策略信号)都会被放入一个事件队列。框架的主循环不断从队列中取出事件,并将其分发给注册了相应处理函数的 App。开发者无需关心事件循环的细节,只需在自己的 App 中实现对应的 on_xxx 方法即可。
2.2 App 结构
ctpbee 将所有功能单元都视为 App。框架本身提供了一些核心 App(如处理 CTP 连接、数据分发的底层 App),而用户的策略、风控逻辑、数据存储等也需要编写为 App。
Ctpbee类: 用户策略的主要载体,继承自Ctpbee基类。它包含了处理各种事件的回调方法(on_tick,on_order等)和执行交易操作的主动函数接口 (self.action)。CtpBeeApi/Beelayer: 框架的核心引擎,负责管理所有 App、事件分发、CTP 底层接口的交互等。通常用户直接与Ctpbee子类交互,较少直接操作CtpBeeApi。
2.3 异步与协程
ctpbee 底层使用了 asyncio,这意味着许多 IO 操作(如网络通信)是非阻塞的。这使得 ctpbee 在接收和处理大量市场数据时表现更佳。用户编写策略 App 时,通常不需要直接编写 async/await 代码(除非进行深度定制),框架已经封装好了异步逻辑。
2.4 数据流
- CTP -> CTP API: 交易所通过 CTP 协议将行情和交易回报推送到 CTP API (C++ 库)。
- CTP API -> Python Wrapper:
ctpbee(或其依赖的 CTP 封装库,如vnpy_ctp) 通过 Python C++ 绑定将 CTP API 的回调函数封装成 Python 可调用的接口。 - Python Wrapper -> CtpBeeApi: 核心引擎接收到底层数据,将其包装成标准化的事件对象 (如
TickData,OrderData)。 - CtpBeeApi -> App: 核心引擎根据事件类型,将事件对象分发给所有加载的、且实现了对应处理方法的 App (如
Ctpbee策略子类)。 - App -> Action -> CtpBeeApi -> … -> CTP: 用户在 App 中调用
self.action中的方法(如send_order),请求通过核心引擎、封装库最终发送到 CTP 服务器。
3. 安装教程
3.1 环境要求
- Python: 建议使用 Python 3.7 或更高版本 (64位)。
- 操作系统: Windows, Linux, macOS。
- CTP API 文件: 需要从期货公司获取 SimNow 或 实盘 的 CTP API 文件(通常包含
thosttraderapi_se.dll/so,thostmduserapi_se.dll/so等文件和*.h头文件)。请务必通过合法途径获取!
3.2 安装步骤
3.2.1 使用 pip 安装
最简单的安装方式是使用 pip:
pip install ctpbee -i https://pypi.tuna.tsinghua.edu.cn/simple
(建议使用国内镜像源如清华源加速下载)
这将安装 ctpbee 及其核心依赖。
3.2.2 安装 CTP API
ctpbee 本身不包含 CTP 的 C++ API 文件(因为这些文件有版权限制,且不同期货公司/版本可能不同)。你需要手动将获取到的 CTP API 文件放置到正确的目录下。
- 通常位置: 将
thosttraderapi_se.*和thostmduserapi_se.*文件(以及其他必要的.dll或.so文件)复制到 Python 环境的根目录 或 当前运行脚本的目录。- 查找 Python 环境根目录:
- 在 Python 解释器中运行:
import sys print(sys.executable) # 获取解释器路径 print(sys.prefix) # 获取环境根目录 (通常是这个) - 对于 Conda 环境,通常是
anaconda3/envs/your_env_name/ - 对于 venv 环境,通常是
your_env_folder/Scripts/(Windows) 或your_env_folder/bin/(Linux/macOS) 目录下,但更推荐放在环境根目录your_env_folder/。
- 在 Python 解释器中运行:
- 查找 Python 环境根目录:
- 重要: 确保 API 文件的 位数 (32位/64位) 与你的 Python 解释器位数 一致!通常现在都使用 64 位。
3.2.3 Windows 注意事项
- Visual C++ Redistributable: 可能需要安装 Microsoft Visual C++ Redistributable for Visual Studio (通常是 2015 或更高版本)。如果运行
import ctpbee或连接时报错提示缺少VCRUNTIME*.dll或类似文件,请安装它。 - API 文件: 将
.dll文件放到上述指定位置。
3.2.4 Linux / macOS 注意事项
- API 文件: 将
.so文件放到上述指定位置。 - 权限: 确保
.so文件具有可执行权限 (chmod +x *.so)。 - 依赖: 在 Linux 上,某些 CTP API 版本可能依赖特定的系统库(如
libstdc++.so.6的特定版本)。如果遇到问题,可能需要检查或更新系统库。
3.3 验证安装
创建一个简单的 Python 文件 test_ctpbee.py:
try:
import ctpbee
print(f"ctpbee version: {ctpbee.__version__}")
print("ctpbee imported successfully!")
except ImportError as e:
print(f"Failed to import ctpbee: {e}")
print("Please check your installation and CTP API file placement.")
# 你还可以尝试初始化一个简单的 App (不需要连接)
try:
from ctpbee import Ctpbee, CtpBeeApi, VnpyTrader, VnpyMdApi
from ctpbee.log import VLogger
# 注意:根据你的 ctpbee 版本,这里的导入和类名可能略有不同
# 例如,较新版本可能简化为:
# from ctpbee import CtpBee, CtpApp, Action, Event # (示例,请查阅文档)
class MinimalApp(Ctpbee):
def __init__(self, name):
super().__init__(name)
print("MinimalApp initialized.")
def on_init(self):
print("MinimalApp on_init called.")
# app = MinimalApp("test_app")
# print("Ctpbee app instance created.")
# # 此处不启动,仅测试初始化
except Exception as e:
print(f"Error during basic ctpbee initialization test: {e}")
运行 python test_ctpbee.py。如果看到 “ctpbee imported successfully!” 并且没有其他关于 ctpbee 内部初始化的错误,说明基本安装和导入是成功的。
4. 快速上手
下面是一个最简单的连接 CTP 并打印 Tick 数据的例子。
import sys
import time
from ctpbee import Ctpbee, CtpBeeApi, VnpyMdApi, VnpyTrader # 根据版本调整导入
from ctpbee.constant import Exchange, LogLevel
# SimNow 7x24 测试环境配置 (请替换为你自己的)
# 获取地址: http://simnow.com.cn/
CONNECT_INFO = {
"TD_API": "...", # 交易 User ID
"MD_API": "...", # 行情 User ID
"PASSWORD": "...", # 密码
"brokerid": "9999", # Broker ID
# 7x24 地址,如果是实盘或普通 SimNow 请修改
"td_address": "tcp://180.168.146.187:10202",
"md_address": "tcp://180.168.146.187:10212"
}
class QuickStartApp(Ctpbee):
def __init__(self, name):
super().__init__(name)
self.init_completed = False
print(f"策略实例 '{name}' 初始化完成")
# 1. 连接成功后会调用 on_connect
def on_connect(self):
print("交易和行情接口连接成功")
# 你可以在这里进行一些连接后的初始化操作
pass
# 2. 账户验证通过后会调用 on_login
def on_login(self):
print("交易和行情接口登录成功")
# 登录成功后才能进行订阅行情、查询等操作
# 3. 所有 App 初始化完成,框架准备就绪时调用 on_init
def on_init(self):
"""当一切准备就绪后调用"""
print("框架及所有 App 初始化完成,可以开始交易逻辑")
self.init_completed = True
# 在这里订阅行情
# 订阅 rb2409 合约的 Tick 行情
print("尝试订阅 rb2409...")
ret = self.action.subscribe("rb2409.SHFE")
if ret == 0:
print("订阅请求发送成功 (rb2409)")
else:
print(f"订阅请求发送失败 (rb2409),错误码: {ret}")
# 也可以使用完整 local_symbol
# ret = self.action.subscribe("rb2409.SHFE")
# print(f"订阅 rb2409.SHFE 结果: {ret}")
def on_tick(self, tick):
"""处理收到的 Tick 数据"""
# 确保初始化完成后再处理 tick,避免过早处理
if not self.init_completed:
return
print(f"收到Tick: {tick.datetime} - {tick.local_symbol} - LastPrice: {tick.last_price}")
def on_disconnected(self, result):
""" 处理断开事件 """
print(f"连接断开: {result}")
def on_error(self, error):
""" 处理错误信息 """
print(f"发生错误: {error}")
def on_order(self, order):
""" 处理订单回报 """
print(f"订单更新: {order.local_symbol} - {order.order_id} - {order.status} - {order.volume_traded}/{order.volume_total}")
def on_trade(self, trade):
""" 处理成交回报 """
print(f"成交回报: {trade.local_symbol} - {trade.order_id} - {trade.direction} - {trade.price}@{trade.volume}")
if __name__ == '__main__':
# 设置日志级别 (可选)
# from ctpbee.log import VLogger
# VLogger.set_level(LogLevel.INFO) # DEBUG, INFO, WARNING, ERROR
print("开始创建 CtpBeeApi 实例...")
# 创建核心引擎实例
# 注意:根据 ctpbee 版本,可能需要传入不同的 API 类
# 例如旧版本可能是 VnTrader=VnpyTrader, VnMd=VnpyMdApi
# 新版本可能简化了,或者需要不同的类名,请参考官方文档或示例
try:
bee_layer = CtpBeeApi(
"QuickStartExample",
VnTrader=VnpyTrader, # 指定交易API实现
VnMd=VnpyMdApi, # 指定行情API实现
log_output=True, # 在控制台打印日志
# log_level=LogLevel.DEBUG # 更详细的日志
)
print("CtpBeeApi 实例创建成功")
# 创建策略 App 实例
app = QuickStartApp("MyStrategy")
print("策略 App 实例创建成功")
# 将策略 App 添加到核心引擎
bee_layer.add_app(app)
print("策略 App 已添加到引擎")
# 配置连接信息
bee_layer.config.from_mapping(CONNECT_INFO)
print("连接信息配置完成")
print("启动 CtpBee...")
# 启动引擎,开始连接和事件循环
bee_layer.start()
# 让程序保持运行,以便接收事件
# 在实际应用中,这里可能是无限循环或其他逻辑
# while True:
# time.sleep(1)
# 这里简单等待一段时间用于演示
print("CtpBee 已启动,等待事件...")
try:
# 保持主线程运行,否则程序会立即退出
while True:
time.sleep(10)
except KeyboardInterrupt:
print("接收到 Ctrl+C,正在停止...")
bee_layer.stop()
print("CtpBee 已停止。")
sys.exit(0)
except Exception as e:
print(f"启动过程中发生严重错误: {e}")
import traceback
traceback.print_exc()
运行说明:
- 替换
CONNECT_INFO: 将CONNECT_INFO中的 SimNow 账号信息替换为你自己的。 - 放置 API 文件: 确保 CTP API 文件已放置在正确位置。
- 运行脚本:
python your_script_name.py - 观察输出: 你应该能看到连接、登录、初始化、订阅成功以及
rb2409的 Tick 数据(如果在交易时段内)等信息。
5. 策略开发详解
5.1 策略 App 结构
如上例所示,策略通常通过继承 ctpbee.Ctpbee 类来实现。
from ctpbee import Ctpbee
from ctpbee.constant import ContractData, TickData, BarData, OrderData, TradeData, PositionData, AccountData, Direction, Offset
class MyStrategy(Ctpbee):
def __init__(self, name, param1=None, param2=None):
# 1. 调用父类构造函数
super().__init__(name)
# 2. 初始化策略参数
self.param1 = param1
self.param2 = param2
# 3. 初始化策略内部状态变量
self.pos = 0 # 示例:持仓状态 (0: 空仓, 1: 多头, -1: 空头)
self.last_tick_price = 0.0
# ... 其他变量
# 4. 实现必要的事件回调方法
def on_tick(self, tick: TickData):
# 处理 Tick 数据
pass
def on_bar(self, bar: BarData):
# 处理 K 线数据 (如果生成了 K 线)
pass
def on_order(self, order: OrderData):
# 处理订单回报
pass
def on_trade(self, trade: TradeData):
# 处理成交回报, 更新持仓等
pass
def on_position(self, position: PositionData):
# 处理持仓查询回报 (主动查询或登录时推送)
pass
def on_account(self, account: AccountData):
# 处理账户资金查询回报 (主动查询或登录时推送)
pass
def on_init(self):
# 策略初始化完成,可以订阅行情、查询初始状态等
print(f"策略 {self.name} 初始化完成。")
# 示例: 查询账户资金和持仓
self.action.query_account()
self.action.query_position()
# 示例: 订阅合约
self.action.subscribe("rb2409.SHFE")
# 5. (可选) 实现其他辅助方法
def _calculate_signal(self, price):
# 内部计算逻辑
pass
def _update_position(self, trade: TradeData):
# 根据成交更新内部持仓状态
pass
5.2 关键事件回调函数
这些方法定义在你的 Ctpbee 子类中,当相应的事件发生时,框架会自动调用它们。
on_tick(self, tick: TickData)- 参数
tick:TickData对象,包含最新的市场快照信息(时间、最新价、买卖价、量等)。 - 用途:处理实时行情,执行高频逻辑。
- 参数
on_bar(self, bar: BarData)- 参数
bar:BarData对象,包含 K 线数据(时间、开高低收价、成交量)。 - 用途:处理基于 K 线的交易信号。
ctpbee可以内置或通过 App 扩展实现 Tick 合成 Bar 的功能。
- 参数
on_order(self, order: OrderData)- 参数
order:OrderData对象,包含订单的详细信息和状态更新(如已提交、部分成交、全部成交、已撤销、错误等)。 - 用途:跟踪订单状态,处理未成交、错单等情况。
- 参数
on_trade(self, trade: TradeData)- 参数
trade:TradeData对象,包含成交的详细信息(时间、合约、方向、价格、数量、订单号等)。 - 用途:确认成交,更新策略的内部持仓状态,计算盈亏。
- 参数
on_position(self, position: PositionData)- 参数
position:PositionData对象,包含单个合约的持仓信息(多头、空头、昨仓、今仓、保证金、持仓均价等)。 - 用途:初始化时获取初始持仓,或定期核对持仓信息。
- 参数
on_account(self, account: AccountData)- 参数
account:AccountData对象,包含账户资金信息(余额、可用资金、保证金、冻结资金、盈亏等)。 - 用途:监控账户风险,计算可开仓位。
- 参数
on_init(self)- 无参数。
- 用途:在框架和所有 App 初始化完成后调用。是执行订阅行情、查询初始持仓/资金等操作的理想位置。
on_realtime(self)(不常用)- 无参数。
- 用途:一个高频率触发的回调(可能比
on_tick更频繁,取决于底层实现),用于需要极低延迟响应的场景,但通常不推荐在其中执行复杂计算。
5.3 主动函数 (Action)
策略需要通过 self.action 对象来执行主动操作,例如发送订单、撤销订单、查询信息等。
常用方法包括:
-
发送订单:
self.action.send_order(price: float, volume: int, direction: Direction, offset: Offset, type: OrderType, local_symbol: str, **kwargs)price: 价格 (限价单需要,市价单通常填 0 或根据 API 要求)volume: 数量direction: 方向 (Direction.LONG或Direction.SHORT)offset: 开平仓 (Offset.OPEN,Offset.CLOSE,Offset.CLOSETODAY,Offset.CLOSEYESTERDAY)type: 订单类型 (OrderType.LIMIT,OrderType.MARKET等)local_symbol: 合约代码 (e.g., “rb2409.SHFE”)- 返回: 订单的
order_ref(本地订单引用,非 CTP 订单号)。
- 便捷函数:
self.action.buy(price, volume, local_symbol, **kwargs)(开多)self.action.short(price, volume, local_symbol, **kwargs)(开空)self.action.sell(price, volume, local_symbol, is_today=False, **kwargs)(平多)self.action.cover(price, volume, local_symbol, is_today=False, **kwargs)(平空)is_today: 用于区分平今/平昨,需要根据实际情况和 CTP API 版本设置。
-
撤销订单:
self.action.cancel_order(order_id: str)order_id: 要撤销的订单的order_ref或 CTP 返回的order_sys_id(取决于ctpbee版本和配置)。- 返回: 操作结果代码 (通常 0 表示成功提交撤单请求)。
self.action.cancel_all(): 撤销所有活动订单。
-
查询:
self.action.query_position(): 查询持仓。self.action.query_account(): 查询账户资金。
5.4 策略示例:双均线交叉策略
这是一个经典的趋势跟踪策略,当短期均线上穿长期均线时买入开仓,下穿时卖出平仓(反向类似)。
5.4.1 策略逻辑
- 选择一个合约(如
rb2409.SHFE)。 - 计算两条移动平均线 (MA),例如 10 周期 (MA10) 和 20 周期 (MA20)。
- 需要基于 K 线数据计算。这里假设我们使用分钟 K 线。
ctpbee需要配置或使用 App 来合成 K 线。为了简化示例,我们假设有一个机制可以获取或计算 Bar 数据。- 简化处理: 在这个示例中,我们将在
on_tick中累积数据并模拟 K 线计算,或者假设已有on_bar回调。为了代码清晰,我们假设on_bar可用。
- 简化处理: 在这个示例中,我们将在
- 开多信号: 当
MA10从下方上穿MA20时,如果当前无持仓,则买入开仓。 - 平多信号: 当
MA10从上方下穿MA20时,如果当前持有多头仓位,则卖出平仓。 - 开空/平空信号: (可选,可类似实现)
- 仓位管理: 每次只持有一个方向的仓位。
5.4.2 代码实现
import sys
import time
import numpy as np
from collections import deque
from ctpbee import Ctpbee, CtpBeeApi, VnpyMdApi, VnpyTrader # 根据版本调整导入
from ctpbee.constant import Exchange, LogLevel, Interval, Offset, Direction, OrderType, Status
from ctpbee.date import get_day_from_datetime
# --- 配置信息 (同上例) ---
CONNECT_INFO = {
"TD_API": "...", "MD_API": "...", "PASSWORD": "...", "brokerid": "9999",
"td_address": "tcp://180.168.146.187:10202", "md_address": "tcp://180.168.146.187:10212"
}
# --- 策略参数 ---
TARGET_SYMBOL = "rb2409.SHFE"
FAST_MA_PERIOD = 10
SLOW_MA_PERIOD = 20
TRADE_VOLUME = 1 # 每次交易手数
class MovingAverageCrossStrategy(Ctpbee):
def __init__(self, name):
super().__init__(name)
self.init_completed = False
self.subscribed = False
# K线数据存储 (使用 deque 高效存储固定长度序列)
self.prices = deque(maxlen=SLOW_MA_PERIOD + 5) # 存储收盘价,长度比最长均线周期稍大
self.fast_ma = 0.0
self.slow_ma = 0.0
self.prev_fast_ma = 0.0 # 上一根K线的均线值,用于判断交叉
self.prev_slow_ma = 0.0
# 持仓状态 (简化: 0=空仓, 1=持多)
self.position_state = 0
self.active_order_ref = None # 记录活动订单引用
# --- 用于 Tick 合成 Bar 的变量 ---
self.current_bar = None
self.last_tick = None
self.bar_interval_seconds = 60 # 假设合成 1 分钟 K 线
def on_init(self):
"""初始化完成后订阅行情"""
print("策略初始化完成,开始订阅行情...")
self.action.query_account() # 查询账户信息
self.action.query_position() # 查询初始持仓
ret = self.action.subscribe(TARGET_SYMBOL)
if ret == 0:
print(f"订阅 {TARGET_SYMBOL} 成功")
self.subscribed = True
else:
print(f"订阅 {TARGET_SYMBOL} 失败: {ret}")
self.init_completed = True
def on_position(self, position: PositionData):
""" 处理持仓回报,用于初始化持仓状态 """
if not self.init_completed: # 仅在启动时根据查询结果设置一次
if position.local_symbol == TARGET_SYMBOL:
if position.direction == Direction.LONG and position.volume > 0:
print(f"检测到初始多头持仓: {position.volume} 手")
self.position_state = 1
# 可添加空头持仓检测 (如果策略支持)
# elif position.direction == Direction.SHORT and position.volume > 0:
# print(f"检测到初始空头持仓: {position.volume} 手")
# self.position_state = -1
def on_tick(self, tick: TickData):
""" 使用 Tick 数据合成 Bar """
if not self.subscribed or tick.local_symbol != TARGET_SYMBOL or tick.last_price <= 0:
return
self.last_tick = tick # 保存最新 tick
# --- 简易 Tick 合成 1 分钟 Bar 逻辑 ---
# 注意:这是一个非常基础的实现,实际应用需要更健壮的 K 线合成器
if self.current_bar is None:
# 创建第一个 Bar
self.current_bar = {
"datetime": tick.datetime.replace(second=0, microsecond=0),
"open": tick.last_price,
"high": tick.last_price,
"low": tick.last_price,
"close": tick.last_price,
"volume": tick.volume # 注意:这里用 tick 的 volume,更准确应累加
}
else:
bar_start_time = self.current_bar["datetime"]
# 如果 tick 时间仍在当前 Bar 的时间范围内
if tick.datetime < bar_start_time + timedelta(seconds=self.bar_interval_seconds):
self.current_bar["high"] = max(self.current_bar["high"], tick.last_price)
self.current_bar["low"] = min(self.current_bar["low"], tick.last_price)
self.current_bar["close"] = tick.last_price
# self.current_bar["volume"] += (tick.volume - previous_tick_volume) # 需要更复杂的累加
else:
# 当前 Bar 结束,调用 on_bar 处理
# 将字典包装成 BarData 对象 (或者直接使用字典)
# 假设我们有一个 BarData 类或直接处理字典
self.handle_bar_logic(self.current_bar)
# 创建新的 Bar
self.current_bar = {
"datetime": tick.datetime.replace(second=0, microsecond=0),
"open": tick.last_price,
"high": tick.last_price,
"low": tick.last_price,
"close": tick.last_price,
"volume": tick.volume # 重置 volume
}
# def on_bar(self, bar: BarData): # 如果有 K 线合成器,会调用这里
# """ 处理 K 线数据的逻辑 """
# if bar.local_symbol != TARGET_SYMBOL:
# return
# self.handle_bar_logic(bar) # 调用核心逻辑
def handle_bar_logic(self, bar_data):
""" 核心的均线交叉逻辑 """
# bar_data 可以是 BarData 对象或包含 K 线数据的字典
close_price = bar_data["close"] if isinstance(bar_data, dict) else bar_data.close_price
if close_price <= 0: return
self.prices.append(close_price)
# 确保有足够的数据计算均线
if len(self.prices) < SLOW_MA_PERIOD:
return
# 计算当前 MA
current_prices = np.array(self.prices)
self.fast_ma = np.mean(current_prices[-FAST_MA_PERIOD:])
self.slow_ma = np.mean(current_prices[-SLOW_MA_PERIOD:])
print(f"Bar Close: {close_price:.2f} | MA{FAST_MA_PERIOD}: {self.fast_ma:.2f} | MA{SLOW_MA_PERIOD}: {self.slow_ma:.2f} | Pos: {self.position_state}")
# 判断交叉 (需要有上一周期的 MA 值)
if self.prev_fast_ma > 0 and self.prev_slow_ma > 0: # 确保不是第一次计算
# 金叉:快线上穿慢线
crossed_over = self.prev_fast_ma <= self.prev_slow_ma and self.fast_ma > self.slow_ma
# 死叉:快线下穿慢线
crossed_below = self.prev_fast_ma >= self.prev_slow_ma and self.fast_ma < self.slow_ma
# --- 交易逻辑 ---
if self.active_order_ref is None: # 如果没有正在进行的订单
# 金叉 且 当前空仓 -> 开多
if crossed_over and self.position_state == 0:
print(f"信号: 金叉 @ {close_price:.2f} -> 尝试开多 {TRADE_VOLUME} 手")
order_ref = self.action.buy(self.last_tick.limit_up, TRADE_VOLUME, TARGET_SYMBOL, type=OrderType.MARKET) # 使用市价单或涨停价买入
if order_ref:
print(f"开多单已发送,OrderRef: {order_ref}")
self.active_order_ref = order_ref
else:
print("开多单发送失败")
# 死叉 且 当前持多仓 -> 平多
elif crossed_below and self.position_state == 1:
print(f"信号: 死叉 @ {close_price:.2f} -> 尝试平多 {TRADE_VOLUME} 手")
# 注意: 平仓需要区分平今/平昨,这里简化处理,假设总是能平
# 实际中需要查询 PositionData 获取可平手数和类型
order_ref = self.action.sell(self.last_tick.limit_down, TRADE_VOLUME, TARGET_SYMBOL, type=OrderType.MARKET) # 使用市价单或跌停价卖出
if order_ref:
print(f"平多单已发送,OrderRef: {order_ref}")
self.active_order_ref = order_ref
else:
print("平多单发送失败")
# 更新上一周期的 MA 值
self.prev_fast_ma = self.fast_ma
self.prev_slow_ma = self.slow_ma
def on_order(self, order: OrderData):
""" 处理订单回报,更新活动订单状态 """
if order.order_ref == self.active_order_ref:
print(f"订单更新: Ref={order.order_ref}, Status={order.status}, Traded={order.volume_traded}/{order.volume_total}")
if order.is_active():
print("订单活动中...")
else:
# 订单完成(全部成交、已撤销、错误)
print("订单已结束。")
self.active_order_ref = None # 清除活动订单标记
if order.status == Status.ALLTRADED:
print("订单全部成交!")
elif order.status == Status.CANCELLED:
print("订单已撤销。")
elif order.status == Status.REJECTED:
print(f"订单被拒绝: {order.error_msg}")
def on_trade(self, trade: TradeData):
""" 处理成交回报,更新持仓状态 """
print(f"成交回报: {trade.local_symbol} {trade.direction} {trade.offset} {trade.volume}@{trade.price}")
if trade.local_symbol == TARGET_SYMBOL:
if trade.direction == Direction.LONG and trade.offset == Offset.OPEN:
self.position_state = 1 # 更新为持多仓
print("持仓状态更新为:多头")
elif trade.direction == Direction.SHORT and trade.offset in [Offset.CLOSE, Offset.CLOSETODAY, Offset.CLOSEYESTERDAY]:
if self.position_state == 1: # 确认是平多仓的成交
self.position_state = 0 # 更新为空仓
print("持仓状态更新为:空仓")
# 可以添加处理开空/平空的逻辑
# elif trade.direction == Direction.SHORT and trade.offset == Offset.OPEN:
# self.position_state = -1
# elif trade.direction == Direction.LONG and trade.offset in [Offset.CLOSE, ...]:
# if self.position_state == -1: self.position_state = 0
# --- K 线合成需要 datetime ---
from datetime import timedelta
# --- 主程序入口 (同上例) ---
if __name__ == '__main__':
print("开始创建 CtpBeeApi 实例...")
try:
bee_layer = CtpBeeApi(
"MACrossExample", VnTrader=VnpyTrader, VnMd=VnpyMdApi, log_output=True
)
print("CtpBeeApi 实例创建成功")
app = MovingAverageCrossStrategy("MA_Strategy")
print("策略 App 实例创建成功")
bee_layer.add_app(app)
print("策略 App 已添加到引擎")
bee_layer.config.from_mapping(CONNECT_INFO)
print("连接信息配置完成")
print("启动 CtpBee...")
bee_layer.start()
print("CtpBee 已启动,等待事件...")
try:
while True:
time.sleep(10)
except KeyboardInterrupt:
print("接收到 Ctrl+C,正在停止...")
bee_layer.stop()
print("CtpBee 已停止。")
sys.exit(0)
except Exception as e:
print(f"启动过程中发生严重错误: {e}")
import traceback
traceback.print_exc()
注意:
- K 线合成: 上述示例包含了一个非常简化的基于 Tick 的分钟 K 线合成逻辑。在实际应用中,建议使用
ctpbee内置的 K 线生成器(如果提供)或专门的 K 线合成 App,它们会处理时间对齐、数据完整性等更多细节。 - 平今/平昨:
action.sell和action.cover的is_today参数或offset参数需要根据实际情况正确设置(Offset.CLOSETODAYvsOffset.CLOSEYESTERDAY)。这通常需要策略记录持仓的开仓日期或查询PositionData中的yd_volume和td_volume。示例中简化了处理。 - 错误处理和鲁棒性: 实际策略需要更完善的错误处理(如订单被拒、网络中断)、状态管理(如防止重复发单)和风险控制。
- 市价单: 示例中使用了市价单 (
OrderType.MARKET)。注意市价单在 CTP 中的实现方式(通常是发送一个交易所允许范围内的最优价格限价单,如涨跌停价),并了解其滑点风险。 - 依赖:
numpy用于计算均线,需要确保已安装 (pip install numpy)。deque来自 Python 内置的collections模块。
5.4.3 运行策略
与快速上手示例类似:
- 替换
CONNECT_INFO中的账户信息。 - 确保 CTP API 文件和
numpy已安装。 - 将代码保存为
.py文件。 - 运行
python your_strategy_file.py。 - 观察日志输出,包括 K 线计算、均线值、交易信号和订单/成交回报。
6. 进阶功能
6.1 数据记录
ctpbee 通常提供或可以方便地集成数据记录功能,将 Tick、Bar、Order、Trade 等数据保存到文件 (CSV, HDF5) 或数据库 (MongoDB, PostgreSQL) 中,用于后续分析或回测。可以自行编写数据记录 App 或使用社区提供的 App。
6.2 回测系统 (looper)
ctpbee 包含一个名为 looper 的回测模块。它允许你在历史数据上运行你的策略 App,评估策略表现。
- 基本用法: 需要准备历史数据(通常是 Tick 或 Bar 数据),然后配置
looper使用这些数据来模拟实时事件流,驱动策略 App 的on_tick或on_bar方法。 - 配置: 需要设置回测的时间范围、手续费、滑点、初始资金等参数。
- 运行: 通常通过命令行或脚本启动
looper,并指定策略 App 和数据源。 - 结果分析: 回测结束后,
looper会输出 PnL 曲线、交易列表、绩效指标(夏普比率、最大回撤等)。
具体用法请参考 ctpbee 的官方文档或 looper 模块的示例。
6.3 风险管理
虽然 ctpbee 本身可能不包含复杂的内置风控引擎,但其 App 结构非常适合实现自定义的风控逻辑:
- 编写风控 App: 创建一个独立的 App,在
on_order发送前进行检查(如检查订单频率、累计亏损、持仓数量限制等)。 - 在策略 App 内实现: 在策略的
send_order调用前加入风控检查逻辑。 - 利用
on_account和on_position: 实时监控账户资金和持仓风险。
6.4 扩展性 (App 开发)
ctpbee 的核心优势之一是其扩展性。你可以开发自己的 App 来实现特定功能:
- 自定义 K 线合成器: 实现不同周期、不同计算方式的 Bar 生成。
- 指标计算 App: 将常用的技术指标计算封装成 App。
- GUI 界面: 开发一个简单的图形界面来展示行情、持仓和日志。
- 对接其他数据源: 编写 App 从 Web API 或其他来源获取数据。
- 高级订单执行算法: 实现 TWAP, VWAP 等算法交易逻辑。
开发自定义 App 需要对 ctpbee 的事件机制和 App 生命周期有更深入的理解。
7. 社区与资源
- GitHub 仓库: https://github.com/ctpbee/ctpbee (获取最新代码、报告问题、参与贡献)
- 官方文档: (通常在 GitHub Wiki 或 ReadTheDocs 上,请查找项目链接) 文档是学习 API 和功能细节的最佳途径。
- 示例代码: GitHub 仓库中的
examples目录通常包含各种用法示例。 - 交流社区: 查找是否有相关的 QQ 群、微信群或论坛,与其他用户交流经验。
8. 总结
ctpbee 是一个功能强大且灵活的 Python 量化交易框架,特别适合国内期货 CTP 市场。其事件驱动、异步 IO 和 App 化结构为开发者提供了高效、模块化的开发体验。通过本篇解析,你应该对 ctpbee 的安装、核心概念、策略开发流程以及一些进阶功能有了更深入的了解。
要精通 ctpbee,建议:
- 仔细阅读官方文档。
- 动手实践示例代码,并尝试修改和扩展。
- 理解事件驱动和 App 的核心思想。
- 逐步学习和应用回测、风控等进阶功能。
- 参与社区交流,学习他人经验。
希望这份深度解析能帮助你更好地入门和使用 ctpbee 进行量化交易开发!
更多推荐



所有评论(0)