用于原始的Socket
访问,支持tcp
、udp
、tls
、unix
协议。
支持4种流行的通信协议:mqtt
、nats
、amqp
、kafka
。
支持连接数据库,支持的数据库包括:sqlite3
、mysql
、postgres
、clickhouse
。
如果超时,Dial()
函数返回空值。正常调用时返回一个连接对象,该对象包含三个方法:read
、write
、close
。read
方法用于读取数据,write
方法用于发送数据,close
方法用于关闭连接。
- 不传参数时,阻塞直到接收到消息后返回。例如:```ws.read()```。
- 传入参数时,单位为毫秒,指定消息等待超时时间。例如:```ws.read(2000)```指定超时时间为2秒(2000毫秒)。
- 以下两个参数仅对WebSocket有效:
传入参数```-1```时,无论是否有消息,函数都立即返回。例如:```ws.read(-1)```。
传入参数```-2```时,无论是否有消息,函数都立即返回,但只返回最新的消息,缓冲区中的其他消息将被丢弃。例如:```ws.read(-2)```。
```read()```函数缓冲区说明:
WebSocket协议推送的数据,如果策略中```read()```函数调用的时间间隔过长,可能会造成数据累积。这些数据存储在缓冲区中,缓冲区的数据结构为队列,上限为2000条。超过2000条后,最新的数据进入缓冲区,最旧的数据将被清除。
| 场景 | 无参数 | 参数:-1 | 参数:-2 | 参数:2000,单位是毫秒 |
| - | - | - | - | - |
| 缓冲区已有数据 | 立即返回最旧数据 | 立即返回最旧数据 | 立即返回最新数据 | 立即返回最旧数据 |
| 缓冲区无数据 | 阻塞直到有数据时返回 | 立即返回空值 | 立即返回空值 | 等待2000毫秒,无数据返回空值,有数据则返回 |
| WebSocket连接断开或底层重连时 | read()函数返回空字符串,即:"",write()函数返回0。检测到该情况后,可以使用close()函数关闭连接。如果设置了自动重连,则无需关闭,系统底层会自动重连。 | - | - | - |
object
Dial(address)
Dial(address, timeout)
Dial(address, options)
请求地址。
address
true
string
超时时间(秒)。
timeout
false
number
配置选项。
options
false
object
```javascript
function main(){
// Dial支持tcp://、udp://、tls://、unix://协议,可添加一个参数指定超时秒数
var client = Dial("tls://www.baidu.com:443")
if (client) {
// write可再添加一个数字参数指定超时,write返回成功发送的字节数
client.write("GET / HTTP/1.1\nConnection: Closed\n\n")
while (true) {
// read可再添加一个数字参数指定超时,单位:毫秒。返回null表示错误、超时或socket已关闭
var buf = client.read()
if (!buf) {
break
}
Log(buf)
}
client.close()
}
}```
```python
def main():
client = Dial("tls://www.baidu.com:443")
if client:
client.write("GET / HTTP/1.1\nConnection: Closed\n\n")
while True:
buf = client.read()
if not buf:
break
Log(buf)
client.close()```
```cpp
void main() {
auto client = Dial("tls://www.baidu.com:443");
if(client.Valid) {
client.write("GET / HTTP/1.1\nConnection: Closed\n\n");
while(true) {
auto buf = client.read();
if(buf == "") {
break;
}
Log(buf);
}
client.close();
}
}```
Dial函数调用示例:
```javascript
var client = null
function main() {
// client = Dial("sqlite3://:memory:") // 使用内存数据库
client = Dial("sqlite3://test1.db") // 打开/连接托管者所在目录的数据库文件
// 记录句柄
var sqlite3Handle = client.fd()
Log("sqlite3Handle:", sqlite3Handle)
// 查询数据库中的表
var ret = client.exec("SELECT name FROM sqlite_master WHERE type='table'")
Log(ret)
}
function onexit() {
Log("执行client.close()")
client.close()
}```
```python
// 不支持```
```cpp
// 不支持```
Dial函数连接数据库时返回的连接对象具有2个独有的方法:
- ```exec(sqlString)```: 用于执行SQL语句,使用方式类似于```DBExec()```函数。
- ```fd()```: ```fd()```函数返回一个句柄(例如:句柄变量为handle),用于其他线程重连(即使Dial创建的对象已经执行```close()```函数关闭连接),将句柄传入```Dial()```函数,例如:```Dial(handle)```重用连接。
以下是Dial函数连接```sqlite3```数据库的示例。
```address```参数的详细说明:在标准地址```wss://xxx.xxx.xxx:10441/websocket?compress```后,使用```|```符号作为分隔符。如果参数字符串中包含```|```字符,则使用```||```作为分隔符。分隔符后的部分为功能参数设置,各参数之间使用```&```字符连接。
例如,同时设置```socks5```代理和压缩参数时可以写作:
```Dial("wss://baidu.com/stream|proxy=socks5://xxx:9999&compress=gzip_raw&mode=recv")```
|Dial函数的address参数支持的功能|参数说明|
| - | - |
|WebSocket协议数据压缩相关的参数:compress=参数值|compress为压缩方式,compress参数可选gzip_raw、gzip等。如果gzip方式为非标准gzip,可以使用扩展方式:gzip_raw|
|WebSocket协议数据压缩相关的参数:mode=参数值|mode为压缩模式,mode参数可选dual、send、recv三种。dual为双向压缩,发送和接收压缩数据。send为仅发送压缩数据。recv为仅接收压缩数据,本地进行解压缩。|
|WebSocket协议启用compression设置:enableCompression=true|使用enableCompression=false关闭该设置,默认不启用。|
|WebSocket协议设置底层自动重连相关的参数:reconnect=参数值|reconnect用于设置是否重连,reconnect=true为启用重连。不设置该参数时默认不重连。|
|WebSocket协议设置底层自动重连相关的参数:interval=参数值|interval为重试时间间隔,单位为毫秒,interval=10000表示重试间隔为10秒,不设置时默认为1秒,即interval=1000。|
|WebSocket协议设置底层自动重连相关的参数:payload=参数值|payload为WebSocket重连时需要发送的订阅消息,例如:payload=okok。|
|socks5代理的相关参数:proxy=参数值|proxy为socks5代理设置,参数值格式:socks5://name:pwd@192.168.0.1:1080,其中name为socks5服务端用户名,pwd为socks5服务端登录密码,1080为socks5服务端口。|
```Dial()```函数仅支持实盘环境。
使用Dial函数连接数据库时,连接字符串的编写请参考各数据库的Go语言驱动项目。
| 支持的数据库 | 驱动项目 | 连接字符串(Connection String) | 备注 |
| - | - | - | - |
| sqlite3 | github.com/mattn/go-sqlite3 | sqlite3://file:test.db?cache=shared&mode=memory | ```sqlite3://```前缀表示使用sqlite3数据库,调用示例:```Dial("sqlite3://test1.db")``` |
| mysql | github.com/go-sql-driver/mysql | mysql://username:yourpassword@tcp(localhost:3306)/yourdatabase?charset=utf8mb4 | -- |
| postgres | github.com/lib/pq | postgres://user=postgres dbname=yourdatabase sslmode=disable password=yourpassword host=localhost port=5432 | -- |
| clickhouse | github.com/ClickHouse/clickhouse-go | clickhouse://tcp://host:9000?username=username&password=yourpassword&database=youdatabase | -- |
目前仅JavaScript语言支持Dial函数中使用```mqtt```、```nats```、```amqp```、```kafka```通信协议。以下为JavaScript语言策略代码示例,展示```mqtt```、```nats```、```amqp```、```kafka```四种协议的使用方法:
```js
// 需要先配置、部署完成各个协议的代理服务器
// 为了便于演示,主题test_topic的订阅(read操作)、发布(write操作)都在当前这个策略中进行
var arrConn = []
var arrName = []
function main() {
LogReset(1)
conn_nats = Dial("nats://admin@127.0.0.1:4222?topic=test_topic")
conn_mqtt = Dial("mqtt://127.0.0.1:1883?topic=test_topic")
conn_amqp = Dial("amqp://q:admin@127.0.0.1:5672/?queue=test_Queue")
conn_kafka = Dial("kafka://localhost:9092/test_topic")
arrConn = [conn_nats, conn_amqp, conn_mqtt, conn_kafka]
arrName = ["nats", "amqp", "mqtt", "kafka"]
while (true) {
for (var i in arrConn) {
var conn = arrConn[i]
var name = arrName[i]
// 写数据
conn.write(name + ", time: " + _D() + ", test msg.")
// 读数据
var readMsg = conn.read(1000)
Log(name + " readMsg: ", readMsg, "#FF0000")
}
Sleep(1000)
}
}
function onexit() {
for (var i in arrConn) {
arrConn[i].close()
Log("关闭", arrName[i], "连接")
}
}
Dial函数用于访问WebSocket接口时,支持设置WSS请求头:
let options = {"headers": {"Authorization": "Bearer token123"}}
let conn = Dial("wss://api.example.com/ws", options)
options = {"headers": {"Authorization": "Bearer token123"}}
conn = Dial("wss://api.example.com/ws", options)
// 暂不支持