让策略程序真正并发执行,给JavaScript策略增加系统底层多线程支持

Author: 雨幕(youquant), Created: 2023-03-06 18:00:53, Updated: 2023-11-15 19:53:12

img

让策略程序真正并发执行,给JavaScript策略增加系统底层多线程支持

在使用JavaScript语言在FMZ上开发策略时,由于策略架构是轮询的。如果有并发设计的场景是通过exchange.Go函数来进行一些接口的并发调用,从而实现一些并发场景的需求。但是如果希望真正单独创建一个线程执行一系列的操作是无法实现的,例如像Python语言一样,使用threading库去做一些并发设计。

基于这种需求,FMZ平台升级了系统底层。给JavaScript语言也增加了真正的多线程支持。详细的功能包括:

  • 创建线程并发执行自定义的函数。
  • 线程间通信。
  • 共享线程间储存的变量。
  • 等待线程执行结束回收资源并返回执行结果。
  • 强制结束线程,并回收资源。
  • 在并发的线程执行函数中获取当前线程ID。

接下来小编带您一起逐个理解每一项功能。

创建线程并发执行自定义的函数

__Thread函数可以创建一个线程,并发执行一个函数。例如需要创建一个并发的函数func1func1函数做什么工作呢?我们可以让它从0累加到9,为了看到这个逐步累加的过程,在func1函数中使用for循环每次暂停(Sleep函数用来休眠一定毫秒数)一定时间。

function func1(sleepMilliseconds) {
    var sum = 0 
    for (var i = 0 ; i < 10 ; i++) {
        sum += i 
        Sleep(sleepMilliseconds)
        Log("sum:", sum)
    }
    
    return sum
}

function main() {
    // 使用__Thread函数并发创建一个线程,参数200即为func1函数的参数,
    // 如果func1函数有多个参数,这里就具体传对应的参数
    var thread1Id = __Thread(func1, 200)
    
    // 这里需要等待线程Id为thread1Id的线程执行结果,否则main函数执行完就直接释放所有线程
    var ret = __threadJoin(thread1Id)
    Log("ret:", ret)
}

实际应用场景中,我们可以这样并发进行http请求:

function main() {
    let threads = [
        "https://www.baidu.com",
        "https://www.163.com"
    ].map(function(url) {
        return __Thread(function(url) {
            Log("GET", url)
            return HttpQuery(url)
        }, url)
    })
    threads.forEach(function(tid) {
        Log(__threadJoin(tid))
    })
}

等待线程执行结束回收资源并返回执行结果

以上例子中我们在main函数中最后使用了__threadJoin函数来等待并发的线程执行完毕,变量ret接收__threadJoin函数的返回值,我们打印了这个返回值,可以观察这个并发的线程执行的具体结果。

// id:线程ID,terminated:是否被强制停止,elapsed:耗时(纳秒),ret:线程执行函数的返回值
ret: {"id":1,"terminated":false,"elapsed":2004884301,"ret":45}

强制结束线程,并回收资源

function func1(sleepMilliseconds) {
    var sum = 0 
    for (var i = 0 ; i < 10 ; i++) {
        sum += i 
        Sleep(sleepMilliseconds)
        Log("sum:", sum)
    }
    
    return sum
}

function main() {
    var thread1Id = __Thread(func1, 200)
    Sleep(1000)
    retThreadTerminate = __threadTerminate(thread1Id)
    Log(retThreadTerminate)   // true
}

还是以刚才的例子,在创建线程后,可以在等待1秒之后就强制终止线程执行。

线程间通信

线程间通信主要使用__threadPostMessage函数和__threadPeekMessage函数。我们来看以下简单例子:

function func1() {
    var id = __threadId()
    while (true) {
        var postMsg = "来自id:" + id + "的线程函数func1的消息"
        __threadPostMessage(0, postMsg)              // 发送消息到主线程
        var peekMsg = __threadPeekMessage(0)         // 接收来自主线程的消息
        Log(peekMsg)
        Sleep(5000)
    }
}

function main() {
    var threadId = __Thread(func1)
    
    while (true) {
        var postMsg = "来自主线程的main函数的消息"
        __threadPostMessage(threadId, postMsg)
        var peekMsg = __threadPeekMessage(threadId)
        Log(peekMsg, "#FF0000")                     // #FF0000 , 设置日志为红色用于区分
        Sleep(5000)
    }
}

__threadPostMessage函数用于向某个线程发送消息,第一个参数是具体发送到哪个线程的ID,第二个参数是发送的消息,可以是字符串、数值、数组、JSON对象等。可以在并发的线程函数中向主线程发送消息,主线程的ID定义为0。

__threadPeekMessage函数用于监听某个线程发送来的消息,第一个参数是监听具体哪个ID的线程,第二个参数可以设置超时时间(毫秒数),也可以设置为-1表示阻塞,一直监听到有消息才返回。可以在并发的线程函数中监听主线程发送到当前线程的消息,主线程的ID定义为0。

当然,除了并发的线程和主线程通信。并发的线程之间也可以直接相互通信。

在并发的线程执行函数中获取当前线程ID

在上面的例子中,使用了var id = __threadId()__threadId()函数可以获取当前线程的ID。

共享线程间储存的变量

除了线程间的通信,还可以使用共享变量进行交互。

function testFunc() {
    __threadSetData(0, "testFunc", 100)   // 储存在当前线程环境,键值对 testFunc : 100 
    Log("testFunc执行完毕")
}

function main() {
    // threadId为1 ,创建的threadId为1的线程会先执行完,只要线程资源没有被回收,线程本地储存的变量就有效
    var testThread = __Thread(testFunc)
    
    Sleep(1000)

    // 输出 in main, get testFunc: 100
    Log("in main, get testFunc:", __threadGetData(testThread, "testFunc"))   // 取出键名为testFunc的值
}

以上就是所有功能的简单演示,下面我们来看一个稍微复杂一点点的测试例子。

原生多线程JavaScript与WASM性能比较

这个测试策略地址:https://www.youquant.com/strategy/379243

猛地一看可能不知道这个测试策略是做什么的,没关系我们一点一点来说明。首先让我们了解一下“什么是WASM”。

WebAssemblyWASM, WebAssembly是一种新的编码格式并且可以在浏览器中运行,WASM可以与JavaScript并存,WASM更类似一种低级的汇编语言。

那么这个测试策略就是为了比较wasm和javascript执行效率的高低,只不过比较的时候即可以让两种执行方式先后执行,统计各自耗时。也可以让两种执行方式并发执行,统计耗时。既然已经支持了JavaScript语言策略的底层并发实现,那么这个测试策略自然就使用并发的方式去比较,比较相同算法执行的快慢。

  • c语言版本的算法,fib函数
// C语言的斐波纳契数字的递归算法
int fib(int f) {
    if (f < 2) return f;
    return fib(f - 1) + fib(f - 2);
}
  • JavaScript语言版本的算法,fib函数
// 相同的斐波纳契数字的递归算法,使用JavaScript编写
function fib(f) {
    if (f < 2) return f
    return fib(f - 1) + fib(f - 2)
}

可以看到两种fib函数算法逻辑完全一样,以下是这个测试策略的源码:

function main() {
    // 为了方便看代码,我直接把注释写在以下代码上:
    let cycle = 100    // 测试执行100次循环
    let input = 30     // 将要传入算法fib函数的参数
    let threads = [
        __Thread(function(cycle, input) {           // 并发一个线程,使用JavaScript版本的fib函数执行计算
            function fib(f) {                       // 具体测试用的算法,fib函数
                if (f < 2) return f
                return fib(f - 1) + fib(f - 2)
            }
            let ret = 0
            for (let i = 0; i < cycle; i++) {       // 循环100次 
                ret = fib(input);                   // 调用JavaScript语言的fib函数 
                Log("javascript progress: ", i)
            }
            return 'javascript fib: ' + ret
        }, cycle, input),
        
        __Thread(function(cycle, input) {           // 并发一个线程,使用wasm版本的fib函数执行计算
            let data = 'data:hex,0061736d010000000186808080000160017f017f0382808080000100048480808000017000000583808080000100010681808080000007908080800002066d656d6f727902000366696200000aa480808000019e80808000000240200041024e0d0020000f0b2000417f6a10002000417e6a10006a0b'
            let m = wasm.parseModule(data)          // data变量中为wasm编码后的C语言的fib函数的hex字符串,使用wasm.parseModule创建wasm模型m

            let instance = wasm.buildInstance(m, {  // 模型实例化,分配一定堆栈空间
                stack_size: 65 * 1024 * 1024,
            })

            let ret = 0
            for (let i = 0; i < cycle; i++) {                // 循环100次
                ret = instance.callFunction('fib', input)    // 调用wasm实例中的fib函数代码,相当于调用了int fib(int f)函数 
                Log("wasm progress: ", i)
            }

            return 'wasm fib: ' + ret
        }, cycle, input)
    ]
    
    // threads数组中的元素是__Thread函数返回的ID
    threads.forEach(function(tid) {
        let info = __threadJoin(tid)                         // 使用__threadJoin函数等待两个并发的线程执行,获取执行结果
        Log('#'+tid, info.ret, 'elapsed:', info.elapsed / 1e6, "#ff0000")   // 输出执行结果
    })
}

简单说WASM就是一种执行效率更高的程序编码,例子中我们将「斐波纳契数字的递归算法」的c语言代码转换为WASM。过程是这样的:

  • 1、由一段C语言函数代码,编译为wasm编码。

    可以使用这个网站转换: https://wasdk.github.io/WasmFiddle/

    // C语言的斐波纳契数字的递归算法
    int fib(int f) {
        if (f < 2) return f;
        return fib(f - 1) + fib(f - 2);
    }
    
  • 2、把wasm编码进一步编码为hex字符串。

    可以使用以下命令:

    python -c "print('data:hex,'+bytes.hex(open('program.wasm','rb').read()))"
    

    编码后的hex字符串即代码中的let data = 'data:hex,0061736d0100000001868...

  • 3、然后通过FMZ集成的函数wasm.parseModule()解析为wasm模型。

  • 4、再通过FMZ集成的函数wasm.buildInstance()创建为wasm模型实例。

  • 5、再调用这个wasm模型实例中的fib函数,即:ret = instance.callFunction('fib', input)

创建实盘运行测试

这个测试策略只能实盘测试,目前JavaScript多线程功能不支持回测。

wasmJavaScript执行对比,最后执行结果:

2023-03-06 11:00:33		信息	#2 wasm fib: 832040 elapsed: 13283.773019
2023-03-06 11:00:33		信息	#1 javascript fib: 832040 elapsed: 21266.326974

看来wasm耗时更少,更胜一筹。


更多内容