输入/搜索内容
欢迎使用优宽量化交易平台
编程语言
JavaScript
TypeScript
Python
C++
My语言(麦语言)
PINE语言
Blockly可视化
Workflow工作流
支持的协议
密钥安全性
实盘
策略库
托管者
部署托管者
一键租用托管者
手动部署托管者
托管者操作注意事项
全局指定IP地址
命令行版本托管者程序的参数
实盘数据迁移
托管者监控
交易所
策略编辑器
回测系统
策略入口函数
策略框架与API函数
模板类库
策略参数
交互控件
商品期货
期权交易
股票证券
C++策略编写说明
JavaScript策略编写说明
内置库
扩展API接口
MCP 服务
交易终端
数据探索
Alpha因子分析工具
调试工具
远程编辑
完整策略的导入与导出
多语言支持
实盘、策略分组
实盘展示
策略分享与出租
实盘消息推送
实盘报错、异常退出的常见原因
交易所特殊说明、兼容记录

功能概述

策略实盘间通信功能允许不同的实盘策略之间进行数据共享和状态同步。通过频道机制,一个实盘可以将自己的状态数据广播给其他实盘,实现跨实盘、跨托管者、跨服务器的数据通信。

核心概念

  • 频道(Channel):每个实盘都有一个独立的频道,频道ID即为实盘ID
  • 广播端:使用SetChannelData()函数在频道上发布数据的实盘
  • 订阅端:使用GetChannelData()函数订阅其他实盘频道数据的实盘
  • 状态覆盖:频道上仅保存最新状态,新数据会覆盖旧数据,而非消息队列

主要特性

  • 非阻塞通信:所有函数调用均为非阻塞式,不会影响策略主流程
  • 跨平台支持:支持跨实盘、跨托管者、跨服务器进行数据传输
  • 多频道订阅:单个实盘可同时订阅多个不同实盘的频道
  • 灵活的数据格式:支持任意可JSON序列化的数据结构

应用场景

  • 主从策略协同:主策略分析市场并广播信号,从策略接收信号执行交易
  • 多账户同步:在多个交易账户之间同步交易信号和仓位信息
  • 策略监控:监控策略广播运行状态,监控实盘订阅并进行展示或告警
  • 数据共享:共享行情分析、指标计算等结果,避免重复计算

基本用法

示例

  • 广播端示例 - 发布市场数据

    javascript
    function main() { var updateId = 0 var robotId = _G() // 获取当前实盘ID while(true) { if (!exchange.IO("status")) { Sleep(1000) continue } // 获取市场数据 var ticker = exchange.GetTicker("rb888") if (!ticker) { Sleep(5000) continue } // 准备频道状态数据 var channelState = { robotId: robotId, updateId: ++updateId, timestamp: Date.now(), symbol: "rb888", lastPrice: ticker.Last, volume: ticker.Volume, high: ticker.High, low: ticker.Low } // 在频道上发布最新状态(覆盖旧状态) SetChannelData(channelState) // 显示当前频道状态 LogStatus("频道广播端 [实盘ID: " + robotId + "]\n" + "更新ID: #" + channelState.updateId + "\n" + "时间: " + _D(channelState.timestamp) + "\n" + "交易对: " + channelState.symbol + "\n" + "最新价: $" + channelState.lastPrice.toFixed(2)) Sleep(60000) // 每分钟更新一次频道状态 } }
    python
    def main(): updateId = 0 robotId = _G() # 获取当前实盘ID while True: if not exchange.IO("status"): Sleep(1000) continue # 获取市场数据 ticker = exchange.GetTicker("rb888") if not ticker: Sleep(5000) continue # 准备频道状态数据 channelState = { "robotId": robotId, "updateId": updateId + 1, "timestamp": time.time() * 1000, "symbol": "rb888", "lastPrice": ticker["Last"], "volume": ticker["Volume"], "high": ticker["High"], "low": ticker["Low"] } updateId += 1 # 在频道上发布最新状态(覆盖旧状态) SetChannelData(channelState) # 显示当前频道状态 LogStatus("频道广播端 [实盘ID: {}]\n".format(robotId) + "更新ID: #{}\n".format(channelState["updateId"]) + "时间: {}\n".format(_D(channelState["timestamp"])) + "最新价: ${:.2f}".format(channelState["lastPrice"])) Sleep(60000) # 每分钟更新一次频道状态
    c++
  • 订阅端示例 - 订阅多个频道

    javascript
    function main() { // 需要订阅的两个频道ID(根据实际情况修改) var channelId1 = "632799" // 频道1的实盘ID var channelId2 = "632800" // 频道2的实盘ID while(true) { // 订阅频道1的当前状态 var state1 = GetChannelData(channelId1) // 订阅频道2的当前状态 var state2 = GetChannelData(channelId2) // 构建状态显示 var statusMsg = "频道订阅端 - 当前订阅状态\n\n" // 显示频道1状态 statusMsg += "═══ 频道1 [" + channelId1 + "] ═══\n" if (state1 !== null) { statusMsg += "更新ID: #" + state1.updateId + "\n" statusMsg += "时间: " + _D(state1.timestamp) + "\n" statusMsg += "交易对: " + state1.symbol + "\n" statusMsg += "最新价: $" + state1.lastPrice.toFixed(2) + "\n" } else { statusMsg += "状态: 等待中...(首次调用返回 null)\n" } statusMsg += "\n" // 显示频道2状态 statusMsg += "═══ 频道2 [" + channelId2 + "] ═══\n" if (state2 !== null) { statusMsg += "更新ID: #" + state2.updateId + "\n" statusMsg += "时间: " + _D(state2.timestamp) + "\n" statusMsg += "最新价: $" + state2.lastPrice.toFixed(2) + "\n" } else { statusMsg += "状态: 等待中...(首次调用返回 null)\n" } LogStatus(statusMsg) Sleep(5000) // 每5秒订阅一次频道 } }
    python
    def main(): # 需要订阅的两个频道ID(根据实际情况修改) channelId1 = "632799" # 频道1的实盘ID channelId2 = "632800" # 频道2的实盘ID while True: # 订阅频道1的当前状态 state1 = GetChannelData(channelId1) # 订阅频道2的当前状态 state2 = GetChannelData(channelId2) # 构建状态显示 statusMsg = "频道订阅端 - 当前订阅状态\n\n" # 显示频道1状态 statusMsg += "═══ 频道1 [{}] ═══\n".format(channelId1) if state1 is not None: statusMsg += "更新ID: #{}\n".format(state1["updateId"]) statusMsg += "时间: {}\n".format(_D(state1["timestamp"])) statusMsg += "最新价: ${:.2f}\n".format(state1["lastPrice"]) else: statusMsg += "状态: 等待中...(首次调用返回 None)\n" statusMsg += "\n" # 显示频道2状态 statusMsg += "═══ 频道2 [{}] ═══\n".format(channelId2) if state2 is not None: statusMsg += "更新ID: #{}\n".format(state2["updateId"]) statusMsg += "时间: {}\n".format(_D(state2["timestamp"])) statusMsg += "最新价: ${:.2f}\n".format(state2["lastPrice"]) else: statusMsg += "状态: 等待中...(首次调用返回 None)\n" LogStatus(statusMsg) Sleep(5000) # 每5秒订阅一次频道
    c++
  • 实际应用场景

    场景1:主从策略协同交易

    主策略(信号广播端)

    javascript
    function main() { var robotId = _G() Log("主策略启动,实盘ID:", robotId) while(true) { if (!exchange.IO("status")) { Sleep(1000) continue } // 分析市场,生成交易信号 var records = exchange.GetRecords("rb888") if (!records || records.length < 20) { Sleep(5000) continue } // 简单的均线策略 var ma5 = TA.MA(records, 5) var ma20 = TA.MA(records, 20) var signal = "HOLD" if (ma5[ma5.length-1] > ma20[ma20.length-1] && ma5[ma5.length-2] <= ma20[ma20.length-2]) { signal = "BUY" } else if (ma5[ma5.length-1] < ma20[ma20.length-1] && ma5[ma5.length-2] >= ma20[ma20.length-2]) { signal = "SELL" } // 广播交易信号 var signalData = { timestamp: Date.now(), symbol: "rb888", signal: signal, price: records[records.length-1].Close, ma5: ma5[ma5.length-1], ma20: ma20[ma20.length-1] } SetChannelData(signalData) LogStatus("主策略 - 信号广播\n" + "信号: " + signal + "\n" + "价格: $" + signalData.price.toFixed(2) + "\n" + "MA5: " + signalData.ma5.toFixed(2) + "\n" + "MA20: " + signalData.ma20.toFixed(2)) Sleep(60000) } }
    python
    def main(): robotId = _G() Log("主策略启动,实盘ID:", robotId) while True: if not exchange.IO("status"): Sleep(1000) continue # 分析市场,生成交易信号 records = exchange.GetRecords("rb888") if not records or len(records) < 20: Sleep(5000) continue # 简单的均线策略 ma5 = TA.MA(records, 5) ma20 = TA.MA(records, 20) signal = "HOLD" if ma5[-1] > ma20[-1] and ma5[-2] <= ma20[-2]: signal = "BUY" elif ma5[-1] < ma20[-1] and ma5[-2] >= ma20[-2]: signal = "SELL" # 广播交易信号 signalData = { "timestamp": time.time() * 1000, "symbol": "rb888", "signal": signal, "price": records[-1]["Close"], "ma5": ma5[-1], "ma20": ma20[-1] } SetChannelData(signalData) LogStatus("主策略 - 信号广播\n" + "信号: {}\n".format(signal) + "价格: ${:.2f}\n".format(signalData["price"]) + "MA5: {:.2f}\n".format(signalData["ma5"]) + "MA20: {:.2f}".format(signalData["ma20"])) Sleep(60000)
    c++
  • 实际应用场景

    场景1:主从策略协同交易

    从策略(信号接收执行端)

    javascript
    function main() { var masterRobotId = "632799" // 主策略的实盘ID var lastSignal = null Log("从策略启动,订阅主策略:", masterRobotId) while(true) { // 获取主策略的信号 var signalData = GetChannelData(masterRobotId) if (signalData === null) { LogStatus("等待主策略信号...") Sleep(5000) continue } // 检查是否有新信号 if (lastSignal !== signalData.signal) { Log("收到新信号:", signalData.signal, "价格:", signalData.price) // 执行交易 if (signalData.signal === "BUY") { var ticker = exchange.GetTicker(signalData.symbol) if (ticker) { exchange.Buy(ticker.Last, 0.01) Log("执行买入,价格:", ticker.Last) } } else if (signalData.signal === "SELL") { var ticker = exchange.GetTicker(signalData.symbol) if (ticker) { exchange.Sell(ticker.Last, 0.01) Log("执行卖出,价格:", ticker.Last) } } lastSignal = signalData.signal } LogStatus("从策略 - 跟随主策略\n" + "当前信号: " + signalData.signal + "\n" + "信号价格: $" + signalData.price.toFixed(2) + "\n" + "信号时间: " + _D(signalData.timestamp)) Sleep(5000) } }
    python
    def main(): masterRobotId = "632799" # 主策略的实盘ID lastSignal = None Log("从策略启动,订阅主策略:", masterRobotId) while True: # 获取主策略的信号 signalData = GetChannelData(masterRobotId) if signalData is None: LogStatus("等待主策略信号...") Sleep(5000) continue # 检查是否有新信号 if lastSignal != signalData["signal"]: Log("收到新信号:", signalData["signal"], "价格:", signalData["price"]) # 执行交易 if signalData["signal"] == "BUY": ticker = exchange.GetTicker(signalData["symbol"]) if ticker: exchange.Buy(ticker["Last"], 0.01) Log("执行买入,价格:", ticker["Last"]) elif signalData["signal"] == "SELL": ticker = exchange.GetTicker(signalData["symbol"]) if ticker: exchange.Sell(ticker["Last"], 0.01) Log("执行卖出,价格:", ticker["Last"]) lastSignal = signalData["signal"] LogStatus("从策略 - 跟随主策略\n" + "当前信号: {}\n".format(signalData["signal"]) + "信号价格: ${:.2f}\n".format(signalData["price"]) + "信号时间: {}".format(_D(signalData["timestamp"]))) Sleep(5000)
    c++
  • 场景2:多策略状态监控

    监控策略

    javascript
    function main() { // 需要监控的策略实盘ID列表 var monitorList = ["632799", "632800", "632801"] while(true) { var table = { type: "table", title: "策略运行状态监控", cols: ["实盘ID", "状态", "最后更新", "交易对", "当前价格", "盈亏"], rows: [] } for (var i = 0; i < monitorList.length; i++) { var robotId = monitorList[i] var data = GetChannelData(robotId) if (data !== null) { var updateTime = _D(data.timestamp) var timeDiff = Date.now() - data.timestamp var status = timeDiff < 120000 ? "运行中" : "异常" table.rows.push([ robotId, status, updateTime, data.symbol || "-", data.lastPrice ? "$" + data.lastPrice.toFixed(2) : "-", data.profit ? data.profit.toFixed(2) + "%" : "-" ]) } else { table.rows.push([ robotId, "等待数据", "-", "-", "-", "-" ]) } } LogStatus("`" + JSON.stringify(table) + "`") Sleep(10000) } }
    python
    def main(): # 需要监控的策略实盘ID列表 monitorList = ["632799", "632800", "632801"] while True: table = { "type": "table", "title": "策略运行状态监控", "cols": ["实盘ID", "状态", "最后更新", "交易对", "当前价格", "盈亏"], "rows": [] } for robotId in monitorList: data = GetChannelData(robotId) if data is not None: updateTime = _D(data["timestamp"]) timeDiff = time.time() * 1000 - data["timestamp"] status = "运行中" if timeDiff < 120000 else "异常" table["rows"].append([ robotId, status, updateTime, data.get("symbol", "-"), "${:.2f}".format(data["lastPrice"]) if "lastPrice" in data else "-", "{:.2f}%".format(data["profit"]) if "profit" in data else "-" ]) else: table["rows"].append([ robotId, "等待数据", "-", "-", "-", "-" ]) LogStatus("`" + json.dumps(table) + "`") Sleep(10000)
    c++
  • API函数说明

    SetChannelData(data)

    功能:在频道上发布最新状态数据

    参数

    • data:需要发布的数据,可以是任何可JSON序列化的数据结构

    返回值:无

    特性

    • 非阻塞调用
    • 覆盖之前的数据,不累积历史
    • 自动使用当前实盘ID作为频道ID

    数据长度限制

    • JSON序列化后不超过1024字节
    • 建议仅传输必要的状态信息

    详细文档SetChannelData

    GetChannelData(robotId)

    功能:订阅指定实盘的频道数据

    参数

    • robotId:要订阅的实盘ID(字符串或数字)

    返回值

    • 首次调用返回null,需要重试
    • 成功后返回频道的最新数据

    特性

    • 非阻塞调用
    • 可订阅多个频道
    • 可订阅自己的频道

    详细文档GetChannelData

    注意事项

    • 首次调用返回nullGetChannelData()函数首次调用时会返回null,这是正常现象,需要等待数据同步完成。建议在代码中进行null判断。

    • 数据覆盖机制:频道上仅保存最新状态,调用SetChannelData()会覆盖之前的数据。如需保存历史数据,应在订阅端自行记录。

    • 非阻塞特性:所有频道通信函数均为非阻塞的,不会影响策略的主流程执行。但这也意味着无法保证数据的即时性。

    • 数据大小限制:传入SetChannelData的数据在JSON序列化后不得超过1024字节。应仅传输必要的状态信息,如交易信号、价格、持仓等关键数据,避免传输完整的K线数组或大量历史数据。

    • 实盘环境限制:频道通信功能主要适用于实盘环境,在回测系统中可能受限或不可用。

    • 实盘ID获取:可通过_G()函数获取当前实盘ID,也可在平台界面查看实盘ID。

    • 安全性考虑:频道数据可能被其他有权限的实盘订阅,请勿在频道中传输敏感信息(如API密钥等)。

    最佳实践

    • 合理的更新频率:根据实际需求设置数据更新频率,避免过于频繁的更新造成资源浪费。

    • 数据结构设计:设计清晰的数据结构,包含必要的元数据(如时间戳、版本号等),便于订阅端处理。

    • 错误处理:订阅端应处理null返回值,广播端应确保数据格式正确。

    • 状态版本控制:在数据中包含版本号或更新ID,帮助订阅端判断是否有新数据。

    • 监控与告警:对于关键的通信链路,建议实现超时监控和告警机制。

    • 测试验证:在正式使用前,先在测试环境验证频道通信的稳定性和延迟。

    • 文档记录:记录频道数据格式和通信协议,便于后续维护和多人协作。

参考