最近,优宽量化交易平台支持了数据库接口,所以使用JavaScript语言也可以很方便的实现一个K线行情数据收集器了。 有了需求,马上行动~
构建JavaScript版本行情收集器之前,我们先来熟悉优宽的数据库接口DBExec
。
先熟悉以下几种操作:
function main() {
/* 创建表
var strSql = [
"CREATE TABLE RECORDS_MIN1(",
"TS INT PRIMARY KEY NOT NULL,",
"HIGH REAL NOT NULL,",
"OPEN REAL NOT NULL,",
"LOW REAL NOT NULL,",
"CLOSE REAL NOT NULL,",
"VOLUME REAL NOT NULL)"
].join("")
// DBExec函数返回:{"rowsAffected":1,"lastInsertId":18}
*/
/* 表写入数据
// INSERT INTO COMPANY (ID,NAME,AGE,ADDRESS,SALARY)
// VALUES (1, 'Paul', 32, 'California', 20000.00 );
var strSql = [
"INSERT INTO RECORDS_MIN1 (TS, HIGH, OPEN, LOW, CLOSE, VOLUME)",
"VALUES (111111, 111.11, 111.12, 111.13, 111.14, 111.16);"
].join("")
// DBExec函数返回:{"rowsAffected":1,"lastInsertId":1}
*/
/* 查询表中数据
// SELECT * FROM RECORDS_MIN1;
var strSql = ["SELECT * FROM RECORDS_MIN1;"].join("")
// DBExec函数返回:{"columns":["TS","HIGH","OPEN","LOW","CLOSE","VOLUME"],"values":[[111111,111.11,111.12,111.13,111.14,111.16]]}
*/
var ret = DBExec(strSql) // 执行SQL语句
Log(ret)
}
利用优宽的数据库接口DBExec
可以实现收集交易所的K线数据。
例如有些策略基于很长的K线数据计算指标,很不容易收集数据让K线数据长度足够计算指标,但是遇到策略程序设计不完善实盘异常停止、临时调整代码、临时调整策略参数等需要重启实盘的场景。此时实盘一旦重启收集的数据就没有了(程序变量中保存)。所以使用数据库接口,保存收集的行情数据是一个非常好的解决办法。
我们的需求也十分简单:
简单的数据收集器源码:
var collecter = {}
collecter.init = function(tableName) {
this.preBarTS = 0
this.tableName = tableName
this.tableAvaliable = false
// 检测tableName表
if (typeof(tableName) == "undefined" || typeof(tableName) != "string") {
Log(tableName)
throw "tableName error!"
}
// SELECT * FROM RECORDS_MIN1 LIMIT 1
var strSql = "SELECT * FROM " + tableName + " LIMIT 1"
var ret = DBExec(strSql)
if (!ret) {
// 表不存在,创建表
Log("尝试读取表", this.tableName, "的数据,读取失败,开始创建表", this.tableName)
var strSql = [
"CREATE TABLE " + tableName + "(",
"TS INT PRIMARY KEY NOT NULL,",
"HIGH REAL NOT NULL,",
"OPEN REAL NOT NULL,",
"LOW REAL NOT NULL,",
"CLOSE REAL NOT NULL,",
"VOLUME REAL NOT NULL)"
].join("")
ret = DBExec(strSql)
if (!ret) {
throw "创建" + tableName + "表失败!"
}
Log("创建", tableName, "表", ret)
}
Log(this.tableName, ret)
this.tableAvaliable = true
}
collecter.run = function(records) {
if (!this.tableAvaliable) {
return
}
var len = records.length
var lastBar = records[len - 1]
var beginBar = records[0]
var ret = null
if (this.preBarTS == 0) {
// 初始
/*
DELETE FROM table_name WHERE [condition];
*/
var strSql = "DELETE FROM " + this.tableName + " WHERE TS >= " + beginBar.Time + ";"
ret = DBExec(strSql)
Log("删除与当前记录重复部分", ret)
// 写入
ret = DBExec("BEGIN")
Log("BEGIN:", ret)
for (var i = 0 ; i < len - 1 ; i++) {
var strSql = [
"INSERT INTO " + this.tableName + " (TS, HIGH, OPEN, LOW, CLOSE, VOLUME) ",
`VALUES (${records[i].Time}, ${records[i].High}, ${records[i].Open}, ${records[i].Low}, ${records[i].Close}, ${records[i].Volume});`
].join("")
DBExec(strSql)
}
ret = DBExec("COMMIT")
Log("COMMIT:", ret)
this.preBarTS = lastBar.Time
} else if(this.preBarTS != lastBar.Time) {
// 更新
var strSql = [
"INSERT INTO " + this.tableName + " (TS, HIGH, OPEN, LOW, CLOSE, VOLUME) ",
`VALUES (${records[len-2].Time}, ${records[len-2].High}, ${records[len-2].Open}, ${records[len-2].Low}, ${records[len-2].Close}, ${records[len-2].Volume});`
].join("")
ret = DBExec(strSql)
Log("INSERT:", ret)
this.preBarTS = lastBar.Time
}
}
collecter.getRecords = function() {
// 读取数据库
var strSql = "SELECT * FROM " + this.tableName + ";"
var ret = DBExec(strSql)
// Log("SELECT * FROM .. :", ret)
// SELECT * FROM .. : {"columns":["TS","HIGH","OPEN","LOW","CLOSE","VOLUME"],"values":[[1616110200000,58085.9,57716.2,57664.3,57757.6,24.216], ...]}
var arr = ret.values
var r = []
for (var i = 0 ; i < arr.length ; i++) {
r.push({
Time : arr[i][0],
High : arr[i][1],
Open : arr[i][2],
Low : arr[i][3],
Close : arr[i][4],
Volume : arr[i][5]
})
}
return r
}
collecter.deleteTable = function() {
// DROP TABLE database_name.table_name;
var strSql = "DROP TABLE " + this.tableName
var ret = DBExec(strSql)
if (!ret) {
Log("删除表", this.tableName, "失败:", ret)
} else {
Log("删除表", this.tableName, " DROP TABLE:", ret)
this.tableAvaliable = false
}
}
function main() {
collecter.init(tableName)
while(true) {
var r = _C(exchange.GetRecords)
// records tbl
var rTbl = {
type : "table",
title : "数据",
cols : ["strTime", "Time", "High", "Open", "Low", "Close", "Volume"],
rows : []
}
var arrR = []
if (collecter.tableAvaliable) {
arrR = collecter.getRecords()
}
for (var i = arrR.length - 1; (i > arrR.length - 1 - 9) && (i >= 0); i--) {
var bar = arrR[i]
rTbl.rows.push([_D(bar.Time), bar.Time, bar.High, bar.Open, bar.Low, bar.Close, bar.Volume])
}
LogStatus(_D(), "获取的K线数据长度:", arrR.length, ", collecter.tableAvaliable:", collecter.tableAvaliable, "\n",
"`" + JSON.stringify(rTbl) + "`")
collecter.run(r)
// 交互测试
var cmd = GetCommand()
if(cmd) {
// 处理交互
Log("交互命令:", cmd)
var arr = cmd.split(":")
// 从数据库中读取K线数据,刷新图表
if(arr[0] == "refreshRecords") {
if (collecter.tableAvaliable) {
var records = collecter.getRecords()
$.PlotRecords(records, collecter.tableName) // 使用画线类库画图
} else {
Log("对应的数据库表不存在 collecter.tableAvaliable:", collecter.tableAvaliable)
}
} else if (arr[0] == "deleteBDTable") { // 删除数据库表
collecter.deleteTable()
} else if (arr[0] == "initCollecter") { // 初始化收集器
Log("初始化收集器")
collecter.init(tableName)
}
}
Sleep(5000)
}
}
对比数据