jl-iot/mqtt/client.go
walker 0b20dcdf7a modbus驱采映射服务功能调整重构
1,添加驱动采集获取和写入接口
2,创建服务接口调整接收驱采字节数组
mqtt客户端功能代码调整,未完
2023-12-18 15:34:10 +08:00

180 lines
4.0 KiB
Go

package mqtt
import (
"context"
"fmt"
"log/slog"
"github.com/eclipse/paho.golang/autopaho"
"github.com/eclipse/paho.golang/paho"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protoreflect"
mproto "joylink.club/iot/mqtt/proto"
)
// iotcli is the package-level MQTT client instance. It is assigned by Start
// and remains nil until Start has completed successfully.
var iotcli *IotClient
// IotClient bundles the configuration and connection objects of the MQTT client.
type IotClient struct {
// cmc is the configuration that was passed to Start.
cmc *IotMqttConfig
// cc is the autopaho client configuration produced by cmc.tryInto.
cc *autopaho.ClientConfig
// cm is the connection manager returned by autopaho.NewConnection.
cm *autopaho.ConnectionManager
}
// Start initializes and starts the MQTT client service.
//
// It validates cmc with checkConfig, calls BuildTopics with the application
// and client IDs, converts cmc into an autopaho client configuration and opens
// the connection. On success the resulting client is stored in the
// package-level iotcli variable.
//
// The first error reported by checkConfig, cmc.tryInto or
// autopaho.NewConnection is returned unchanged; iotcli is not assigned in
// that case.
func Start(cmc *IotMqttConfig) error {
	err := checkConfig(cmc)
	if err != nil {
		return err
	}
	BuildTopics(cmc.AppId, cmc.ClientId)

	clientCfg, err := cmc.tryInto()
	if err != nil {
		return err
	}

	// NewConnection receives the configuration by value.
	connMgr, err := autopaho.NewConnection(context.Background(), *clientCfg)
	if err != nil {
		return err
	}

	iotcli = &IotClient{cmc: cmc, cc: clientCfg, cm: connMgr}
	return nil
}
// checkConfig verifies that cmc is non-nil and that each mandatory field
// (AppId, ClientId, BrokerUrl, Username, Password) is a non-empty string.
//
// It returns an error describing the first missing item, checked in the order
// listed above, or nil when the configuration is complete.
func checkConfig(cmc *IotMqttConfig) error {
	// Reject a nil pointer explicitly so the caller gets an error instead of
	// a nil-pointer panic on the field accesses below.
	if cmc == nil {
		return fmt.Errorf("MQTT配置不能为空")
	}
	switch {
	case cmc.AppId == "":
		return fmt.Errorf("应用编号不能为空")
	case cmc.ClientId == "":
		return fmt.Errorf("客户端编号不能为空")
	case cmc.BrokerUrl == "":
		return fmt.Errorf("MQTT代理服务地址不能为空")
	case cmc.Username == "":
		return fmt.Errorf("MQTT用户名不能为空")
	case cmc.Password == "":
		return fmt.Errorf("MQTT密码不能为空")
	}
	return nil
}
// PubIotServiceState publishes the IoT service state message s on the topic
// returned by GetIotServiceStateTopic. It returns whatever error pub reports.
func PubIotServiceState(s *mproto.IotServiceState) error {
	topic := GetIotServiceStateTopic()
	return pub(topic, s)
}
// PubIotCjData publishes the IoT collect (cj) data message on the topic
// returned by GetCjTopic. It returns whatever error pub reports.
func PubIotCjData(cj *mproto.IotCj) error {
	topic := GetCjTopic()
	return pub(topic, cj)
}
// PubIotQdData publishes the IoT drive (qd) data message on the topic
// returned by GetQdTopic. It returns whatever error pub reports.
func PubIotQdData(qd *mproto.IotQd) error {
	// Drive data belongs on the drive topic; publishing it on the collect
	// topic would mix IotQd payloads into the IotCj stream.
	return pub(GetQdTopic(), qd)
}
// RegIotCjHandler registers h as the handler for IoT collect (cj) data.
//
// The handler is attached to the topic returned by GetCjTopic, the same topic
// subIotQc subscribes to for collect data. Each received payload is decoded as
// an mproto.IotCj message and passed to h; payloads that fail to decode are
// logged and dropped.
func RegIotCjHandler(h func(cj *mproto.IotCj)) {
	iotcli.cc.Router.RegisterHandler(GetCjTopic(), func(p *paho.Publish) {
		cj := &mproto.IotCj{}
		if err := proto.Unmarshal(p.Payload, cj); err != nil {
			slog.Error("采集数据proto.Unmarshal异常", "error", err)
			return
		}
		h(cj)
	})
}
// RegIotQdHandler registers h as the handler for IoT drive (qd) data.
//
// The handler is attached to the topic returned by GetQdTopic, the same topic
// subIotQc subscribes to for drive data. Each received payload is decoded as
// an mproto.IotQd message and passed to h; payloads that fail to decode are
// logged and dropped.
func RegIotQdHandler(h func(qd *mproto.IotQd)) {
	iotcli.cc.Router.RegisterHandler(GetQdTopic(), func(p *paho.Publish) {
		qd := &mproto.IotQd{}
		if err := proto.Unmarshal(p.Payload, qd); err != nil {
			slog.Error("驱动数据proto.Unmarshal异常", "error", err)
			return
		}
		h(qd)
	})
}
// RegIotLogReqHandler registers h as the handler for IoT log query requests.
//
// The handler is attached to the topic returned by GetCmdTopic. Each received
// payload is decoded as an mproto.IotServiceLogReq message and passed to h;
// payloads that fail to decode are logged and dropped.
func RegIotLogReqHandler(h func(cmd *mproto.IotServiceLogReq)) {
	onPublish := func(p *paho.Publish) {
		req := new(mproto.IotServiceLogReq)
		if err := proto.Unmarshal(p.Payload, req); err != nil {
			slog.Error("RegIotReqHandler proto.Unmarshal异常", "error", err)
			return
		}
		h(req)
	}
	iotcli.cc.Router.RegisterHandler(GetCmdTopic(), onPublish)
}
// UnregHandler removes the handlers registered for topic from the router held
// in the client configuration.
func UnregHandler(topic string) {
	router := iotcli.cc.Router
	router.UnregisterHandler(topic)
}
// 注销所有IOT处理
func UnregAllHandler() {
iotcli.cc.Router = paho.NewStandardRouter()
}
func subIotQc() {
slog.Info("订阅Iot驱采")
sub(GetCjTopic()) // 订阅采集
sub(GetQdTopic()) // 订阅驱动
}
// 发起订阅
func sub(topic string) {
slog.Info("发起订阅", "topic", topic)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
_, err := iotcli.cm.Subscribe(ctx, &paho.Subscribe{
Subscriptions: []paho.SubscribeOptions{
{
Topic: topic,
QoS: 0,
NoLocal: true,
},
},
})
if err != nil {
slog.Error("订阅失败", "topic", topic, "error", err)
}
}
// pub serializes data with protobuf and publishes it on topic with QoS 0.
//
// Only the topics returned by GetIotServiceStateTopic, GetCjTopic and
// GetQdTopic are accepted; any other topic is logged and rejected.
//
// It returns an error when data is nil, when the client has not been started,
// when marshalling fails, when the topic is unknown, or when the connection
// manager reports a publish failure.
func pub(topic string, data protoreflect.ProtoMessage) error {
	if data == nil {
		return fmt.Errorf("发布数据引用为nil")
	}
	// Publishing before Start has succeeded would dereference a nil client;
	// report it as an error instead of panicking.
	if iotcli == nil || iotcli.cm == nil {
		return fmt.Errorf("MQTT客户端未启动")
	}

	payload, err := proto.Marshal(data)
	if err != nil {
		return err
	}

	switch topic {
	case GetIotServiceStateTopic():
		slog.Debug("发布Iot服务状态", "topic", topic, "data", data)
	case GetCjTopic():
		slog.Debug("发布采集数据", "topic", topic, "data", data)
	case GetQdTopic():
		slog.Debug("发布驱动数据", "topic", topic, "data", data)
	default:
		slog.Error("未知发布主题", "topic", topic, "data", data)
		return fmt.Errorf("未知发布主题: topic=%s", topic)
	}

	_, err = iotcli.cm.Publish(context.Background(), &paho.Publish{
		Topic:   topic,
		QoS:     0,
		Payload: payload,
	})
	return err
}