rts-sim-testing-service/mqtt/config.go
2024-05-07 10:52:29 +08:00

90 lines
2.7 KiB
Go

package mqtt
import (
"fmt"
"log/slog"
"math/rand"
"net/url"
"time"
"github.com/eclipse/paho.golang/autopaho"
"github.com/eclipse/paho.golang/paho"
"github.com/google/uuid"
"joylink.club/bj-rtsts-server/config"
)
// MqttOptions holds the settings from which the MQTT client configuration is
// built (see tryInto).
type MqttOptions struct {
	AppId             string // ID of the application this client belongs to
	BrokerUrl         string // broker address
	ClientId          string // client ID
	Username          string // user name
	Password          string // password
	KeepAlive         uint16 // keep-alive interval in seconds; defaults to 60
	ConnectRetryDelay uint16 // delay between connection retries in seconds; defaults to 3
	ConnectTimeout    uint16 // timeout of a connection attempt in seconds; defaults to 3

	// route is assigned by tryInto; it is the router that the generated
	// configuration passes received publishes to.
	route *paho.StandardRouter
}
// NewMqttOptions builds an MqttOptions for the given broker address and
// credentials.
//
// AppId is taken from config.SystemName. ClientId is generated on every call
// as the last five characters of a new UUID string followed by a random
// integer in [0, 1000). KeepAlive, ConnectRetryDelay and ConnectTimeout are
// left at zero.
func NewMqttOptions(address, username, password string) *MqttOptions {
	id := uuid.New().String()
	// The top-level math/rand functions are randomly seeded by the runtime
	// (Go 1.20+), so there is no need to build a time-seeded generator for
	// each call.
	clientID := fmt.Sprintf("%s%d", id[len(id)-5:], rand.Intn(1000))

	return &MqttOptions{
		AppId:     config.SystemName,
		BrokerUrl: address,
		Username:  username,
		Password:  password,
		ClientId:  clientID,
	}
}
// tryInto converts the options into an autopaho.ClientConfig.
//
// Zero-valued KeepAlive, ConnectRetryDelay and ConnectTimeout are replaced on
// the receiver with their defaults (60, 3 and 3 seconds). A new router is
// created, invoked for every received publish via OnPublishReceived, and
// stored in c.route.
//
// It returns an error if BrokerUrl cannot be parsed or carries no scheme.
func (c *MqttOptions) tryInto() (*autopaho.ClientConfig, error) {
	addr, err := url.Parse(c.BrokerUrl)
	if err != nil {
		return nil, fmt.Errorf("Mqtt.Address格式错误, %s: %w", c.BrokerUrl, err)
	}
	// url.Parse accepts an empty or scheme-less string without error; reject
	// it here instead of handing the client a URL with no transport scheme.
	if addr.Scheme == "" {
		return nil, fmt.Errorf("Mqtt.Address格式错误, %q: 缺少协议前缀", c.BrokerUrl)
	}

	// Apply defaults for unset timing options.
	if c.KeepAlive == 0 {
		c.KeepAlive = 60
	}
	if c.ConnectRetryDelay == 0 {
		c.ConnectRetryDelay = 3
	}
	if c.ConnectTimeout == 0 {
		c.ConnectTimeout = 3
	}

	router := paho.NewStandardRouter()
	cc := &autopaho.ClientConfig{
		BrokerUrls:        []*url.URL{addr},
		KeepAlive:         c.KeepAlive,
		ConnectRetryDelay: time.Duration(c.ConnectRetryDelay) * time.Second,
		ConnectTimeout:    time.Duration(c.ConnectTimeout) * time.Second,
		OnConnectionUp: func(*autopaho.ConnectionManager, *paho.Connack) {
			slog.Info("MQTT连接成功")
		},
		OnConnectError: func(err error) {
			slog.Error("MQTT连接失败", "error", err)
		},
		ClientConfig: paho.ClientConfig{
			ClientID: c.ClientId,
			// NOTE(review): this is a separate, empty router, not the one
			// used by OnPublishReceived and stored in c.route. It is kept in
			// case other code relies on cc.Router being non-nil; confirm
			// whether it is still needed.
			Router: paho.NewStandardRouter(),
			// Report through slog, like the connection callbacks above, so
			// these events reach the configured log handler rather than stdout.
			OnClientError: func(err error) {
				slog.Error("Mqtt客户端发生错误", "clientId", c.ClientId, "error", err)
			},
			OnServerDisconnect: func(d *paho.Disconnect) {
				slog.Warn("MQTT连接断开", "clientId", c.ClientId, "reasonCode", d.ReasonCode, "properties", d.Properties)
			},
			// Route received publishes through the router so that
			// subscriptions can register handlers on it.
			OnPublishReceived: []func(paho.PublishReceived) (bool, error){
				func(pr paho.PublishReceived) (bool, error) {
					router.Route(pr.Packet.Packet())
					return true, nil // we assume that the router handles all messages (todo: amend router API)
				},
			},
		},
	}
	cc.SetUsernamePassword(c.Username, []byte(c.Password))
	c.route = router
	return cc, nil
}