package mqtt import ( "fmt" "log/slog" "math/rand" "net/url" "time" "github.com/eclipse/paho.golang/autopaho" "github.com/eclipse/paho.golang/paho" "github.com/google/uuid" "joylink.club/bj-rtsts-server/config" ) type MqttOptions struct { AppId string // 所属应用编号 BrokerUrl string // Broker地址 ClientId string // 客户端ID Username string // 用户名 Password string // 密码 KeepAlive uint16 // 保活时间间隔,单位s,默认为60 ConnectRetryDelay uint16 // 连接重试延时,单位s,默认为3 ConnectTimeout uint16 // 连接操作超时,单位s,默认为3 route *paho.StandardRouter } func NewMqttOptions(address, username, password string) *MqttOptions { return &MqttOptions{ AppId: config.SystemName, BrokerUrl: address, Username: username, Password: password, ClientId: (func() string { // 初始化MQTT客户端id us := uuid.New().String() usl := len(us) sufix5 := us[usl-5 : usl] return fmt.Sprintf("%s%d", sufix5, rand.New(rand.NewSource(time.Now().UnixNano())).Int()%1000) })(), } } func (c *MqttOptions) tryInto() (*autopaho.ClientConfig, error) { addr, err := url.Parse(c.BrokerUrl) if err != nil { return nil, fmt.Errorf("Mqtt.Address格式错误, %s: %w", c.BrokerUrl, err) } if c.KeepAlive == 0 { c.KeepAlive = 60 } if c.ConnectRetryDelay == 0 { c.ConnectRetryDelay = 3 } if c.ConnectTimeout == 0 { c.ConnectTimeout = 3 } router := paho.NewStandardRouter() cc := &autopaho.ClientConfig{ BrokerUrls: []*url.URL{ addr, }, KeepAlive: c.KeepAlive, ConnectRetryDelay: time.Duration(c.ConnectRetryDelay) * time.Second, ConnectTimeout: time.Duration(c.ConnectTimeout) * time.Second, OnConnectionUp: func(*autopaho.ConnectionManager, *paho.Connack) { slog.Info("MQTT连接成功") }, OnConnectError: func(err error) { slog.Error("MQTT连接失败", "error", err) }, ClientConfig: paho.ClientConfig{ ClientID: c.ClientId, Router: paho.NewStandardRouter(), OnClientError: func(err error) { fmt.Printf("%s Mqtt客户端发生错误: %s\n", c.ClientId, err) }, OnServerDisconnect: func(d *paho.Disconnect) { fmt.Printf("%s 连接断开; reason code: %d,properties: %v\n", c.ClientId, d.ReasonCode, d.Properties) }, //添加订阅路由功能,以支持订阅 OnPublishReceived: []func(paho.PublishReceived) (bool, error){ func(pr paho.PublishReceived) (bool, error) { router.Route(pr.Packet.Packet()) return true, nil // we assume that the router handles all messages (todo: amend router API) }}, }, } cc.SetUsernamePassword(c.Username, []byte(c.Password)) c.route = router return cc, nil }