// Package mqtt wraps a single package-level Paho MQTT client: Startup connects
// it and Publish sends messages through it.
package mqtt

import (
	"errors"
	"fmt"
	"log/slog"
	"math/rand"
	"time"

	mqtt "github.com/eclipse/paho.mqtt.golang"
	"github.com/google/uuid"
)

// ErrNotStarted is returned by Publish when no client has been stored by
// Startup yet.
var ErrNotStarted = errors.New("mqtt: client not started")

// MqttOptions holds the connection settings for the MQTT client.
type MqttOptions struct {
	// Broker is the address of the MQTT broker to connect to.
	Broker string
	// Username is the user name used for authentication.
	Username string
	// Password is the password used for authentication.
	Password string
}

// NewMqttOptions builds an MqttOptions from the broker address and the
// authentication credentials.
func NewMqttOptions(address, username, password string) MqttOptions {
	return MqttOptions{
		Broker:   address,
		Username: username,
		Password: password,
	}
}

// messagePubHandler is installed as the default publish handler; it only logs
// the topic and payload of each received message at debug level.
var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
	slog.Debug("MQTT收到消息", "topic", msg.Topic(), "msg", string(msg.Payload()))
}

// connectHandler logs the client id once a connection has been established.
var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) {
	reader := client.OptionsReader()
	slog.Info("MQTT连接成功", "ClientID", reader.ClientID())
	// subs := make(map[string]byte)
	// subs["$SYS/brokers/+/clients/+/+"] = 0
	// client.SubscribeMultiple(subs, messagePubHandler)
}

// connectLostHandler logs the error reported when the connection is lost.
var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
	slog.Error("MQTT连接断开", "error", err)
}

var (
	// mqttClient is the connected client; it stays nil until Startup succeeds.
	mqttClient mqtt.Client
	// clientId is the generated client id; it is empty until initClientId runs.
	clientId string
)

// initClientId generates the MQTT client id on first use: the last five
// characters of a fresh UUID string followed by a random number below 1000.
// It does nothing when an id has already been generated.
func initClientId() {
	if clientId != "" {
		return
	}
	id := uuid.New().String()
	suffix := id[len(id)-5:]
	rng := rand.New(rand.NewSource(time.Now().UnixNano()))
	clientId = fmt.Sprintf("%s%d", suffix, rng.Int()%1000)
}

// GetClientId returns the MQTT client id, or the empty string if it has not
// been generated yet (it is generated by Startup).
func GetClientId() string {
	return clientId
}

// Startup creates the MQTT client from options, connects it to the broker and
// stores it for later use by Publish.
//
// It panics with the connection error if the connection attempt fails.
func Startup(options MqttOptions) {
	initClientId()

	opts := mqtt.NewClientOptions()
	opts.AddBroker(options.Broker)
	opts.SetClientID(clientId)
	opts.SetUsername(options.Username)
	opts.SetPassword(options.Password)
	opts.SetDefaultPublishHandler(messagePubHandler)
	opts.OnConnect = connectHandler
	opts.OnConnectionLost = connectLostHandler

	client := mqtt.NewClient(opts)
	if token := client.Connect(); token.Wait() && token.Error() != nil {
		panic(token.Error())
	}
	// Only expose the client once the connection attempt has succeeded.
	mqttClient = client
}

// Publish sends data to the topic dest with QoS 0 and the retained flag unset,
// waits for the publish token and returns its error, if any.
//
// It returns ErrNotStarted when Startup has not stored a client yet, instead
// of panicking on the nil client.
//
// NOTE(review): the accepted payload types are decided by the Paho client
// (presumably string, []byte or bytes.Buffer) — confirm before passing others.
func Publish(dest string, data any) error {
	if mqttClient == nil {
		return ErrNotStarted
	}
	token := mqttClient.Publish(dest, 0, false, data)
	if token.Wait() && token.Error() != nil {
		return token.Error()
	}
	return nil
}