package mqtt

import (
	"context"
	"fmt"
	"log/slog"
	"time"

	"github.com/eclipse/paho.golang/autopaho"
	"github.com/eclipse/paho.golang/paho"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protoreflect"
	"joylink.club/iot/dto"
)

// iotcli is the package-level client instance. It is nil until Start
// completes successfully.
var iotcli *IotClient

// IotClient bundles the configuration supplied by the caller, the autopaho
// client configuration derived from it, and the resulting connection manager.
type IotClient struct {
	cmc *IotMqttConfig
	cc  *autopaho.ClientConfig
	cm  *autopaho.ConnectionManager
}

// Start initializes and starts the MQTT client service.
//
// It validates cmc, builds the topics from cmc.AppId and cmc.ClientId,
// converts cmc into an autopaho client configuration, opens the connection
// with a background context, and stores the result in the package-level
// client. Any error from validation, conversion or connection setup is
// returned unchanged.
func Start(cmc *IotMqttConfig) error {
	if err := checkConfig(cmc); err != nil {
		return err
	}
	BuildTopics(cmc.AppId, cmc.ClientId)
	cc, err := cmc.tryInto()
	if err != nil {
		return err
	}
	// Note: the connection manager receives a copy of *cc.
	cm, err := autopaho.NewConnection(context.Background(), *cc)
	if err != nil {
		return err
	}
	iotcli = &IotClient{
		cmc: cmc,
		cc:  cc,
		cm:  cm,
	}
	return nil
}

// Stop disconnects the MQTT client, waiting at most five seconds.
//
// It returns an error if the client was never started, or the error reported
// by the connection manager's Disconnect.
func Stop() error {
	slog.Info("停止MQTT客户端")
	// Guard against Stop being called before a successful Start, which would
	// otherwise dereference a nil client.
	if iotcli == nil || iotcli.cm == nil {
		return fmt.Errorf("MQTT客户端未启动")
	}
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	return iotcli.cm.Disconnect(ctx)
}

// checkConfig verifies that cmc is non-nil and that the application id,
// client id, broker URL, username and password are all non-empty. It returns
// an error describing the first missing item, or nil when all are present.
func checkConfig(cmc *IotMqttConfig) error {
	if cmc == nil {
		return fmt.Errorf("MQTT配置不能为空")
	}
	if cmc.AppId == "" {
		return fmt.Errorf("应用编号不能为空")
	}
	if cmc.ClientId == "" {
		return fmt.Errorf("客户端编号不能为空")
	}
	if cmc.BrokerUrl == "" {
		return fmt.Errorf("MQTT代理服务地址不能为空")
	}
	if cmc.Username == "" {
		return fmt.Errorf("MQTT用户名不能为空")
	}
	if cmc.Password == "" {
		return fmt.Errorf("MQTT密码不能为空")
	}
	return nil
}

// PubIotServiceState publishes the IOT service state to the service-state topic.
func PubIotServiceState(s *dto.IotServiceState) error {
	return pub(GetIotServiceStateTopic(), s)
}

// PubIotCjData publishes IOT collection data to the collection topic.
func PubIotCjData(cj *dto.IotCj) error {
	return pub(GetCjTopic(), cj)
}

// PubIotQdData publishes IOT drive data to the drive topic.
func PubIotQdData(qd *dto.IotQd) error {
	return pub(GetQdTopic(), qd)
}

// RegIotCjHandler registers h as the handler for IOT collection data.
//
// Each message received on the collection topic is decoded into a dto.IotCj
// and passed to h. Messages that fail to decode are logged and dropped.
// The package-level client must already be initialized (see Start).
func RegIotCjHandler(h func(cj *dto.IotCj)) {
	iotcli.cc.Router.RegisterHandler(GetCjTopic(), func(p *paho.Publish) {
		cmd := &dto.IotCj{}
		if err := proto.Unmarshal(p.Payload, cmd); err != nil {
			slog.Error("采集数据proto.Unmarshal异常", "error", err)
			return
		}
		h(cmd)
	})
	slog.Info("注册IOT采集数据处理success")
}

// RegIotQdHandler registers h as the handler for IOT drive data.
//
// Each message received on the drive topic is decoded into a dto.IotQd and
// passed to h. Messages that fail to decode are logged and dropped.
// The package-level client must already be initialized (see Start).
func RegIotQdHandler(h func(qd *dto.IotQd)) {
	iotcli.cc.Router.RegisterHandler(GetQdTopic(), func(p *paho.Publish) {
		cmd := &dto.IotQd{}
		if err := proto.Unmarshal(p.Payload, cmd); err != nil {
			slog.Error("驱动数据proto.Unmarshal异常", "error", err)
			return
		}
		h(cmd)
	})
	slog.Info("注册IOT驱动数据处理success")
}

// RegIotQcServiceStartReqHandler registers h as the handler for IOT service
// start requests. The value returned by h is sent back as the reply.
func RegIotQcServiceStartReqHandler(h func(req *dto.IotQcServiceStartReq) *dto.IotQcServiceCommonResp) {
	iotcli.cc.Router.RegisterHandler(GetIotServiceStartTopic(), func(p *paho.Publish) {
		reqHandle(p, h, &dto.IotQcServiceStartReq{})
	})
}

// RegIotQcServiceStopReqHandler registers h as the handler for IOT service
// stop requests. The value returned by h is sent back as the reply.
func RegIotQcServiceStopReqHandler(h func(req *dto.IotQcServiceStopReq) *dto.IotQcServiceCommonResp) {
	iotcli.cc.Router.RegisterHandler(GetIotServiceStopTopic(), func(p *paho.Publish) {
		reqHandle(p, h, &dto.IotQcServiceStopReq{})
	})
}

// RegIotLogReqHandler registers h as the handler for IOT log query requests.
// The value returned by h is sent back as the reply.
func RegIotLogReqHandler(h func(req *dto.IotServiceLogReq) *dto.IotServiceLogResp) {
	iotcli.cc.Router.RegisterHandler(GetLogReqTopic(), func(p *paho.Publish) {
		reqHandle(p, h, &dto.IotServiceLogReq{})
	})
}

// UnregHandler removes the handler registered for topic.
func UnregHandler(topic string) {
	iotcli.cc.Router.UnregisterHandler(topic)
}

// UnregAllHandler drops all registered handlers by replacing the router held
// in the client configuration with a fresh standard router.
//
// NOTE(review): Start hands a copy of *cc to autopaho.NewConnection, so
// swapping the Router field here may not affect the router that the live
// connection dispatches to — confirm against the autopaho version in use.
func UnregAllHandler() {
	iotcli.cc.Router = paho.NewStandardRouter()
}

// subIotQc subscribes to every topic this client handles.
func subIotQc() {
	slog.Info("订阅Iot驱采")
	sub(GetCjTopic())              // collection data
	sub(GetQdTopic())              // drive data
	sub(GetIotServiceStartTopic()) // service start requests
	sub(GetIotServiceStopTopic())  // service stop requests
	sub(GetLogReqTopic())          // log query requests
}

// sub subscribes to topic with QoS 0 and NoLocal set. A failed subscription
// is logged and not returned to the caller.
func sub(topic string) {
	slog.Info("发起订阅", "topic", topic)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	_, err := iotcli.cm.Subscribe(ctx, &paho.Subscribe{
		Subscriptions: []paho.SubscribeOptions{
			{
				Topic:   topic,
				QoS:     0,
				NoLocal: true,
			},
		},
	})
	if err != nil {
		slog.Error("订阅失败", "topic", topic, "error", err)
	}
}

// reqHandle serves one request/response exchange.
//
// The request is processed only when p carries properties with both
// correlation data and a response topic; otherwise it is ignored after being
// logged. The payload is decoded into r, passed to h, and the encoded result
// is published to the response topic with the same correlation data.
// Decode, encode and publish failures are logged; no reply is sent when
// decoding or encoding fails.
func reqHandle[T proto.Message, P proto.Message](p *paho.Publish, h func(T) P, r T) {
	slog.Info("IOT-MQTT服务收到请求", "publish", p)
	if p.Properties == nil || p.Properties.CorrelationData == nil || p.Properties.ResponseTopic == "" {
		return
	}
	if err := proto.Unmarshal(p.Payload, r); err != nil {
		slog.Error("Iot请求响应数据处理proto.Unmarshal异常", "error", err)
		return
	}
	resp := h(r)
	b, err := proto.Marshal(resp)
	if err != nil {
		slog.Error("Iot请求响应数据处理proto.Marshal异常", "error", err)
		// Do not publish a reply with an empty payload when encoding failed.
		return
	}
	_, err = iotcli.cm.Publish(context.Background(), &paho.Publish{
		Topic: p.Properties.ResponseTopic,
		Properties: &paho.PublishProperties{
			CorrelationData: p.Properties.CorrelationData,
		},
		Payload: b,
	})
	if err != nil {
		slog.Error("Iot请求处理回复异常", "error", err)
	}
}

// pub encodes data and publishes it to topic with QoS 0.
//
// Only the service-state, collection and drive topics are accepted; any other
// topic yields an error. An error is also returned when data is nil, when
// encoding fails, when the client has not been started, or when the publish
// itself fails.
func pub(topic string, data protoreflect.ProtoMessage) error {
	if data == nil {
		return fmt.Errorf("发布数据引用为nil")
	}
	b, err := proto.Marshal(data)
	if err != nil {
		return err
	}
	switch topic {
	case GetIotServiceStateTopic():
		slog.Debug("IOT-MQTT发布Iot服务状态", "topic", topic, "data", data)
	case GetCjTopic():
		slog.Debug("IOT-MQTT发布采集数据", "topic", topic, "data", data)
	case GetQdTopic():
		slog.Debug("IOT-MQTT发布驱动数据", "topic", topic, "data", data)
	default:
		return fmt.Errorf("IOT-MQTT未知发布主题: topic=%s", topic)
	}
	// Guard against publishing before a successful Start, which would
	// otherwise dereference a nil client.
	if iotcli == nil || iotcli.cm == nil {
		return fmt.Errorf("MQTT客户端未启动")
	}
	_, err = iotcli.cm.Publish(context.Background(), &paho.Publish{
		Topic:   topic,
		QoS:     0,
		Payload: b,
	})
	return err
}