227 lines
5.7 KiB
Go
227 lines
5.7 KiB
Go
package mqtt
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log/slog"
|
|
"time"
|
|
|
|
"github.com/eclipse/paho.golang/autopaho"
|
|
"github.com/eclipse/paho.golang/paho"
|
|
"google.golang.org/protobuf/proto"
|
|
"google.golang.org/protobuf/reflect/protoreflect"
|
|
"joylink.club/iot/dto"
|
|
)
|
|
|
|
// iotcli is the package-level MQTT client used by every function in this
// file. Within this file it is assigned only by Start, so it is nil until
// Start has completed successfully.
var iotcli *IotClient
|
|
|
|
type IotClient struct {
|
|
cmc *IotMqttConfig
|
|
cc *autopaho.ClientConfig
|
|
cm *autopaho.ConnectionManager
|
|
}
|
|
|
|
// 初始化并启动MQTT客户端服务
|
|
func Start(cmc *IotMqttConfig) error {
|
|
if err := checkConfig(cmc); err != nil {
|
|
return err
|
|
}
|
|
BuildTopics(cmc.AppId, cmc.ClientId)
|
|
cc, err := cmc.tryInto()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
cm, err := autopaho.NewConnection(context.Background(), *cc)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
iotcli = &IotClient{
|
|
cmc: cmc,
|
|
cc: cc,
|
|
cm: cm,
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// 断开MQTT客户端
|
|
func Stop() error {
|
|
slog.Info("停止MQTT客户端")
|
|
ctx, cancle := context.WithTimeout(context.Background(), 5*time.Second)
|
|
defer cancle()
|
|
err := iotcli.cm.Disconnect(ctx)
|
|
return err
|
|
}
|
|
|
|
func checkConfig(cmc *IotMqttConfig) error {
|
|
if cmc.AppId == "" {
|
|
return fmt.Errorf("应用编号不能为空")
|
|
}
|
|
if cmc.ClientId == "" {
|
|
return fmt.Errorf("客户端编号不能为空")
|
|
}
|
|
if cmc.BrokerUrl == "" {
|
|
return fmt.Errorf("MQTT代理服务地址不能为空")
|
|
}
|
|
if cmc.Username == "" {
|
|
return fmt.Errorf("MQTT用户名不能为空")
|
|
}
|
|
if cmc.Password == "" {
|
|
return fmt.Errorf("MQTT密码不能为空")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// 发布IOT服务状态
|
|
func PubIotServiceState(s *dto.IotServiceState) error {
|
|
return pub(GetIotServiceStateTopic(), s)
|
|
}
|
|
|
|
// 发布IOT采集数据
|
|
func PubIotCjData(cj *dto.IotCj) error {
|
|
return pub(GetCjTopic(), cj)
|
|
}
|
|
|
|
// 发布IOT驱动数据
|
|
func PubIotQdData(qd *dto.IotQd) error {
|
|
return pub(GetCjTopic(), qd)
|
|
}
|
|
|
|
// 注册IOT采集数据处理
|
|
func RegIotCjHandler(h func(cj *dto.IotCj)) {
|
|
iotcli.cc.Router.RegisterHandler(GetLogReqTopic(), func(p *paho.Publish) {
|
|
cmd := &dto.IotCj{}
|
|
err := proto.Unmarshal(p.Payload, cmd)
|
|
if err != nil {
|
|
slog.Error("采集数据proto.Unmarshal异常", "error", err)
|
|
return
|
|
}
|
|
h(cmd)
|
|
})
|
|
}
|
|
|
|
// 注册IOT驱动数据处理
|
|
func RegIotQdHandler(h func(qd *dto.IotQd)) {
|
|
iotcli.cc.Router.RegisterHandler(GetLogReqTopic(), func(p *paho.Publish) {
|
|
cmd := &dto.IotQd{}
|
|
err := proto.Unmarshal(p.Payload, cmd)
|
|
if err != nil {
|
|
slog.Error("驱动数据proto.Unmarshal异常", "error", err)
|
|
return
|
|
}
|
|
h(cmd)
|
|
})
|
|
}
|
|
|
|
// 注册IOT服务启动请求处理
|
|
func RegIotQcServiceStartReqHandler(h func(req *dto.IotQcServiceStartReq) *dto.IotQcServiceCommonResp) {
|
|
iotcli.cc.Router.RegisterHandler(GetIotServiceStartTopic(), func(p *paho.Publish) {
|
|
reqHandle(p, h, &dto.IotQcServiceStartReq{})
|
|
})
|
|
}
|
|
|
|
// 注册IOT服务停止请求处理
|
|
func RegIotQcServiceStopReqHandler(h func(req *dto.IotQcServiceStopReq) *dto.IotQcServiceCommonResp) {
|
|
iotcli.cc.Router.RegisterHandler(GetIotServiceStopTopic(), func(p *paho.Publish) {
|
|
reqHandle(p, h, &dto.IotQcServiceStopReq{})
|
|
})
|
|
}
|
|
|
|
// 注册IOT日志查询请求处理
|
|
func RegIotLogReqHandler(h func(req *dto.IotServiceLogReq) *dto.IotServiceLogResp) {
|
|
iotcli.cc.Router.RegisterHandler(GetLogReqTopic(), func(p *paho.Publish) {
|
|
reqHandle(p, h, &dto.IotServiceLogReq{})
|
|
})
|
|
}
|
|
|
|
// 注销IOT处理
|
|
func UnregHandler(topic string) {
|
|
iotcli.cc.Router.UnregisterHandler(topic)
|
|
}
|
|
|
|
// 注销所有IOT处理
|
|
func UnregAllHandler() {
|
|
iotcli.cc.Router = paho.NewStandardRouter()
|
|
}
|
|
|
|
func subIotQc() {
|
|
slog.Info("订阅Iot驱采")
|
|
sub(GetCjTopic()) // 订阅采集
|
|
sub(GetQdTopic()) // 订阅驱动
|
|
sub(GetIotServiceStartTopic()) // 订阅启动请求
|
|
sub(GetIotServiceStopTopic()) // 订阅停止请求
|
|
sub(GetLogReqTopic()) // 订阅日志查询请求
|
|
}
|
|
|
|
// 发起订阅
|
|
func sub(topic string) {
|
|
slog.Info("发起订阅", "topic", topic)
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
_, err := iotcli.cm.Subscribe(ctx, &paho.Subscribe{
|
|
Subscriptions: []paho.SubscribeOptions{
|
|
{
|
|
Topic: topic,
|
|
QoS: 0,
|
|
NoLocal: true,
|
|
},
|
|
},
|
|
})
|
|
if err != nil {
|
|
slog.Error("订阅失败", "topic", topic, "error", err)
|
|
}
|
|
}
|
|
|
|
func reqHandle[T proto.Message, P proto.Message](p *paho.Publish, h func(T) P, r T) {
|
|
fmt.Printf("收到请求: %v\n", p)
|
|
if p.Properties != nil && p.Properties.CorrelationData != nil && p.Properties.ResponseTopic != "" {
|
|
err := proto.Unmarshal(p.Payload, r)
|
|
if err != nil {
|
|
slog.Error("Iot请求响应数据处理proto.Unmarshal异常", "error", err)
|
|
return
|
|
}
|
|
resp := h(r)
|
|
b, err := proto.Marshal(resp)
|
|
if err != nil {
|
|
slog.Error("Iot请求响应数据处理proto.Marshal异常", "error", err)
|
|
}
|
|
_, err = iotcli.cm.Publish(context.Background(), &paho.Publish{
|
|
Topic: p.Properties.ResponseTopic,
|
|
Properties: &paho.PublishProperties{
|
|
CorrelationData: p.Properties.CorrelationData,
|
|
},
|
|
Payload: b,
|
|
})
|
|
if err != nil {
|
|
slog.Error("Iot请求处理回复异常", "error", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
// 发布数据
|
|
func pub(topic string, data protoreflect.ProtoMessage) error {
|
|
if data == nil {
|
|
return fmt.Errorf("发布数据引用为nil")
|
|
}
|
|
b, err := proto.Marshal(data)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
// switch topic {
|
|
// case GetIotServiceStateTopic():
|
|
// slog.Debug("发布Iot服务状态", "topic", topic, "data", data)
|
|
// case GetCjTopic():
|
|
// slog.Debug("发布采集数据", "topic", topic, "data", data)
|
|
// case GetQdTopic():
|
|
// slog.Debug("发布驱动数据", "topic", topic, "data", data)
|
|
// default:
|
|
// slog.Error("未知发布主题", "topic", topic, "data", data)
|
|
// return fmt.Errorf("未知发布主题: topic=%s", topic)
|
|
// }
|
|
_, err = iotcli.cm.Publish(context.Background(), &paho.Publish{
|
|
Topic: topic,
|
|
QoS: 0,
|
|
Payload: b,
|
|
})
|
|
return err
|
|
}
|