91 lines
2.1 KiB
Go
91 lines
2.1 KiB
Go
package mqtt
|
|
|
|
import (
|
|
"fmt"
|
|
"log/slog"
|
|
"math/rand"
|
|
"time"
|
|
|
|
mqtt "github.com/eclipse/paho.mqtt.golang"
|
|
"github.com/google/uuid"
|
|
)
|
|
|
|
// MqttOptions holds the connection settings for the MQTT client.
type MqttOptions struct {
	Broker   string // MQTT broker address
	Username string // user name used for authentication
	Password string // password used for authentication
}
|
|
|
|
func NewMqttOptions(address, username, password string) MqttOptions {
|
|
return MqttOptions{
|
|
Broker: address,
|
|
Username: username,
|
|
Password: password,
|
|
}
|
|
}
|
|
|
|
var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
|
|
slog.Debug("MQTT收到消息", "topic", msg.Topic(), "msg", string(msg.Payload()))
|
|
}
|
|
|
|
var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) {
|
|
or := client.OptionsReader()
|
|
slog.Info("MQTT连接成功", "ClientID", or.ClientID())
|
|
// subs := make(map[string]byte)
|
|
// subs["$SYS/brokers/+/clients/+/+"] = 0
|
|
// client.SubscribeMultiple(subs, messagePubHandler)
|
|
}
|
|
|
|
var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
|
|
slog.Error("MQTT连接断开", "error", err)
|
|
}
|
|
|
|
var mqttClient mqtt.Client
|
|
var clientId string
|
|
|
|
// 初始化MQTT客户端id
|
|
func initClientId() {
|
|
if clientId == "" {
|
|
us := uuid.New().String()
|
|
usl := len(us)
|
|
sufix5 := us[usl-5 : usl]
|
|
clientId = fmt.Sprintf("%s%d", sufix5, rand.New(rand.NewSource(time.Now().UnixNano())).Int()%1000)
|
|
}
|
|
}
|
|
|
|
// 获取MQTT客户端id
|
|
func GetClientId() string {
|
|
return clientId
|
|
}
|
|
|
|
// 启动MQTT
|
|
func Startup(options MqttOptions) {
|
|
initClientId()
|
|
opts := mqtt.NewClientOptions()
|
|
opts.AddBroker(options.Broker)
|
|
opts.SetClientID(clientId)
|
|
opts.SetUsername(options.Username)
|
|
opts.SetPassword(options.Password)
|
|
opts.SetDefaultPublishHandler(messagePubHandler)
|
|
opts.OnConnect = connectHandler
|
|
opts.OnConnectionLost = connectLostHandler
|
|
client := mqtt.NewClient(opts)
|
|
if token := client.Connect(); token.Wait() && token.Error() != nil {
|
|
panic(token.Error())
|
|
}
|
|
mqttClient = client
|
|
}
|
|
|
|
// 发布消息
|
|
func Publish(dest string, data any) error {
|
|
token := mqttClient.Publish(dest, 0, false, data)
|
|
if token.Wait() && token.Error() != nil {
|
|
return token.Error()
|
|
}
|
|
return nil
|
|
}
|