mqtt客户端添加请求处理公共实现

基于请求处理实现Iot日志查询请求处理器注册接口
This commit is contained in:
walker 2023-12-19 16:15:21 +08:00
parent 70c8f9a41d
commit 5f94a5a146
4 changed files with 206 additions and 28 deletions

View File

@ -49,6 +49,7 @@ func LoadConfig() {
cnf.SetConfigType("yml")
cnf.AddConfigPath("./config/")
cnf.AddConfigPath(".")
cnf.AddConfigPath("../config/")
err := cnf.ReadInConfig()
if err != nil {
panic(fmt.Errorf("读取配置文件错误: %w", err))

View File

@ -78,7 +78,7 @@ func PubIotQdData(qd *mproto.IotQd) error {
// 注册IOT采集数据处理
func RegIotCjHandler(h func(cj *mproto.IotCj)) {
iotcli.cc.Router.RegisterHandler(GetCmdTopic(), func(p *paho.Publish) {
iotcli.cc.Router.RegisterHandler(GetLogReqTopic(), func(p *paho.Publish) {
cmd := &mproto.IotCj{}
err := proto.Unmarshal(p.Payload, cmd)
if err != nil {
@ -91,7 +91,7 @@ func RegIotCjHandler(h func(cj *mproto.IotCj)) {
// 注册IOT驱动数据处理
func RegIotQdHandler(h func(qd *mproto.IotQd)) {
iotcli.cc.Router.RegisterHandler(GetCmdTopic(), func(p *paho.Publish) {
iotcli.cc.Router.RegisterHandler(GetLogReqTopic(), func(p *paho.Publish) {
cmd := &mproto.IotQd{}
err := proto.Unmarshal(p.Payload, cmd)
if err != nil {
@ -103,15 +103,9 @@ func RegIotQdHandler(h func(qd *mproto.IotQd)) {
}
// 注册IOT日志查询请求处理
func RegIotLogReqHandler(h func(cmd *mproto.IotServiceLogReq)) {
iotcli.cc.Router.RegisterHandler(GetCmdTopic(), func(p *paho.Publish) {
cmd := &mproto.IotServiceLogReq{}
err := proto.Unmarshal(p.Payload, cmd)
if err != nil {
slog.Error("RegIotReqHandler proto.Unmarshal异常", "error", err)
return
}
h(cmd)
func RegIotLogReqHandler(h func(req *mproto.IotServiceLogReq) *mproto.IotServiceLogResp) {
iotcli.cc.Router.RegisterHandler(GetLogReqTopic(), func(p *paho.Publish) {
reqHandle(p, h, &mproto.IotServiceLogReq{})
})
}
@ -127,8 +121,9 @@ func UnregAllHandler() {
func subIotQc() {
slog.Info("订阅Iot驱采")
sub(GetCjTopic()) // 订阅采集
sub(GetQdTopic()) // 订阅驱动
sub(GetCjTopic()) // 订阅采集
sub(GetQdTopic()) // 订阅驱动
sub(GetLogReqTopic()) // 订阅日志查询请求
}
// 发起订阅
@ -150,6 +145,32 @@ func sub(topic string) {
}
}
func reqHandle[T proto.Message, P proto.Message](p *paho.Publish, h func(T) P, r T) {
fmt.Printf("收到请求: %v\n", p)
if p.Properties != nil && p.Properties.CorrelationData != nil && p.Properties.ResponseTopic != "" {
err := proto.Unmarshal(p.Payload, r)
if err != nil {
slog.Error("Iot请求响应数据处理proto.Unmarshal异常", "error", err)
return
}
resp := h(r)
b, err := proto.Marshal(resp)
if err != nil {
slog.Error("Iot请求响应数据处理proto.Marshal异常", "error", err)
}
_, err = iotcli.cm.Publish(context.Background(), &paho.Publish{
Topic: p.Properties.ResponseTopic,
Properties: &paho.PublishProperties{
CorrelationData: p.Properties.CorrelationData,
},
Payload: b,
})
if err != nil {
slog.Error("Iot请求处理回复异常", "error", err)
}
}
}
// 发布数据
func pub(topic string, data protoreflect.ProtoMessage) error {
if data == nil {
@ -159,17 +180,17 @@ func pub(topic string, data protoreflect.ProtoMessage) error {
if err != nil {
return err
}
switch topic {
case GetIotServiceStateTopic():
slog.Debug("发布Iot服务状态", "topic", topic, "data", data)
case GetCjTopic():
slog.Debug("发布采集数据", "topic", topic, "data", data)
case GetQdTopic():
slog.Debug("发布驱动数据", "topic", topic, "data", data)
default:
slog.Error("未知发布主题", "topic", topic, "data", data)
return fmt.Errorf("未知发布主题: topic=%s", topic)
}
// switch topic {
// case GetIotServiceStateTopic():
// slog.Debug("发布Iot服务状态", "topic", topic, "data", data)
// case GetCjTopic():
// slog.Debug("发布采集数据", "topic", topic, "data", data)
// case GetQdTopic():
// slog.Debug("发布驱动数据", "topic", topic, "data", data)
// default:
// slog.Error("未知发布主题", "topic", topic, "data", data)
// return fmt.Errorf("未知发布主题: topic=%s", topic)
// }
_, err = iotcli.cm.Publish(context.Background(), &paho.Publish{
Topic: topic,
QoS: 0,

148
mqtt/client_test.go Normal file
View File

@ -0,0 +1,148 @@
package mqtt
import (
"context"
"fmt"
"log"
"net/url"
"sync"
"testing"
"time"
"github.com/eclipse/paho.golang/autopaho"
"github.com/eclipse/paho.golang/autopaho/extensions/rpc"
"github.com/eclipse/paho.golang/paho"
"google.golang.org/protobuf/proto"
"joylink.club/iot/config"
mproto "joylink.club/iot/mqtt/proto"
)
func TestRequest(t *testing.T) {
listen()
clientId := "iotlogreq_test"
logReqTopic := GetLogReqTopic()
cliCfg := getCmConfig(clientId, logReqTopic)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cm, err := autopaho.NewConnection(ctx, cliCfg)
if err != nil {
panic(err)
}
log.Print("TEST")
time.Sleep(3 * time.Second)
h, err := rpc.NewHandler(ctx, rpc.HandlerOpts{
Conn: cm,
Router: cliCfg.Router,
ResponseTopicFmt: "%s/iotlogresp",
ClientID: clientId,
})
if err != nil {
log.Fatal(err)
}
req := &mproto.IotServiceLogReq{
Count: 10,
}
b, err := proto.Marshal(req)
if err != nil {
log.Fatal(err)
}
resp, err := h.Request(ctx, &paho.Publish{
Topic: logReqTopic,
Payload: b,
})
if err != nil {
log.Fatal(err)
}
fmt.Printf("请求结果: %v\n", resp)
log.Printf("Received response: %s", string(resp.Payload))
time.Sleep(3 * time.Second)
}
func listen() {
var v sync.WaitGroup
v.Add(1)
go func() {
config.LoadConfig()
mqttcfg := config.Cfg.Mqtt
cmc := &IotMqttConfig{
AppId: mqttcfg.Topic.App,
BrokerUrl: mqttcfg.Address,
ClientId: mqttcfg.ClientId,
Username: mqttcfg.Username,
Password: mqttcfg.Password,
KeepAlive: mqttcfg.KeepAlive,
ConnectRetryDelay: mqttcfg.ConnectRetryDelay,
ConnectTimeout: mqttcfg.ConnectTimeout,
}
err := Start(cmc)
if err != nil {
panic(err)
}
time.Sleep(2 * time.Second)
RegIotLogReqHandler(func(req *mproto.IotServiceLogReq) *mproto.IotServiceLogResp {
fmt.Printf("收到日志请求: %v\n", req)
resp := &mproto.IotServiceLogResp{
Code: req.Code,
Logs: []string{"日志1", "日志2"},
}
fmt.Printf("返回日志响应: %v\n", resp)
return resp
})
v.Done()
for {
time.Sleep(1 * time.Second)
}
}()
v.Wait()
}
func getCmConfig(clientId, logReqTopic string) autopaho.ClientConfig {
addr, _ := url.Parse("tcp://192.168.3.233:1883")
cc := autopaho.ClientConfig{
BrokerUrls: []*url.URL{addr},
KeepAlive: 60,
OnConnectionUp: func(cm *autopaho.ConnectionManager, connAck *paho.Connack) {
fmt.Println("mqtt connection up")
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(5*time.Second))
defer cancel()
if _, err := cm.Subscribe(ctx, &paho.Subscribe{
Subscriptions: []paho.SubscribeOptions{
{Topic: logReqTopic, QoS: 0},
},
}); err != nil {
fmt.Printf("failed to subscribe (%s). This is likely to mean no messages will be received.", err)
return
}
fmt.Println("mqtt subscription made")
},
OnConnectError: func(err error) { fmt.Printf("error whilst attempting connection: %s\n", err) },
ClientConfig: paho.ClientConfig{
ClientID: clientId,
Router: paho.NewStandardRouter(),
OnClientError: func(err error) { fmt.Printf("%s requested disconnect: %s\n", clientId, err) },
OnServerDisconnect: func(d *paho.Disconnect) {
if d.Properties != nil {
fmt.Printf("%s requested disconnect: %s\n", clientId, d.Properties.ReasonString)
} else {
fmt.Printf("%s requested disconnect; reason code: %d\n", clientId, d.ReasonCode)
}
},
},
}
cc.SetUsernamePassword("rtsts_service", []byte("joylink@0503"))
return cc
}

View File

@ -3,10 +3,18 @@ package mqtt
import "fmt"
const (
// IOT服务状态主题,第一个参数为应用编号,第二个参数为客户端编号
Topic_IotServiceState string = "/%s/%s/iotss"
Topic_IotLog string = "/%s/%s/iotlog"
Topic_IotQd string = "/%s/%s/iotqd"
Topic_IotCj string = "/%s/%s/iotcj"
// IOT服务启动(请求响应)主题,第一个参数为应用编号,第二个参数为客户端编号
Topic_IotServiceStart string = "/%s/%s/iotstart"
// IOT服务停止(请求响应)主题,第一个参数为应用编号,第二个参数为客户端编号
Topic_IotServiceStop string = "/%s/%s/iotstop"
// IOT日志服务(请求响应)主题,第一个参数为应用编号,第二个参数为客户端编号
Topic_IotLog string = "/%s/%s/iotlog"
// IOT驱动数据主题第一个参数为应用编号第二个参数为客户端编号
Topic_IotQd string = "/%s/%s/iotqd"
// IOT采集数据主题第一个参数为应用编号第二个参数为客户端编号
Topic_IotCj string = "/%s/%s/iotcj"
)
var topicMap = make(map[string]string, 4)
@ -22,7 +30,7 @@ func GetIotServiceStateTopic() string {
return topicMap[Topic_IotServiceState]
}
func GetCmdTopic() string {
func GetLogReqTopic() string {
return topicMap[Topic_IotLog]
}