diff --git a/config/config.go b/config/config.go index eb6d221..9cd8518 100644 --- a/config/config.go +++ b/config/config.go @@ -49,6 +49,7 @@ func LoadConfig() { cnf.SetConfigType("yml") cnf.AddConfigPath("./config/") cnf.AddConfigPath(".") + cnf.AddConfigPath("../config/") err := cnf.ReadInConfig() if err != nil { panic(fmt.Errorf("读取配置文件错误: %w", err)) diff --git a/mqtt/client.go b/mqtt/client.go index 6e69d4c..7f34c9d 100644 --- a/mqtt/client.go +++ b/mqtt/client.go @@ -78,7 +78,7 @@ func PubIotQdData(qd *mproto.IotQd) error { // 注册IOT采集数据处理 func RegIotCjHandler(h func(cj *mproto.IotCj)) { - iotcli.cc.Router.RegisterHandler(GetCmdTopic(), func(p *paho.Publish) { + iotcli.cc.Router.RegisterHandler(GetLogReqTopic(), func(p *paho.Publish) { cmd := &mproto.IotCj{} err := proto.Unmarshal(p.Payload, cmd) if err != nil { @@ -91,7 +91,7 @@ func RegIotCjHandler(h func(cj *mproto.IotCj)) { // 注册IOT驱动数据处理 func RegIotQdHandler(h func(qd *mproto.IotQd)) { - iotcli.cc.Router.RegisterHandler(GetCmdTopic(), func(p *paho.Publish) { + iotcli.cc.Router.RegisterHandler(GetLogReqTopic(), func(p *paho.Publish) { cmd := &mproto.IotQd{} err := proto.Unmarshal(p.Payload, cmd) if err != nil { @@ -103,15 +103,9 @@ func RegIotQdHandler(h func(qd *mproto.IotQd)) { } // 注册IOT日志查询请求处理 -func RegIotLogReqHandler(h func(cmd *mproto.IotServiceLogReq)) { - iotcli.cc.Router.RegisterHandler(GetCmdTopic(), func(p *paho.Publish) { - cmd := &mproto.IotServiceLogReq{} - err := proto.Unmarshal(p.Payload, cmd) - if err != nil { - slog.Error("RegIotReqHandler proto.Unmarshal异常", "error", err) - return - } - h(cmd) +func RegIotLogReqHandler(h func(req *mproto.IotServiceLogReq) *mproto.IotServiceLogResp) { + iotcli.cc.Router.RegisterHandler(GetLogReqTopic(), func(p *paho.Publish) { + reqHandle(p, h, &mproto.IotServiceLogReq{}) }) } @@ -127,8 +121,9 @@ func UnregAllHandler() { func subIotQc() { slog.Info("订阅Iot驱采") - sub(GetCjTopic()) // 订阅采集 - sub(GetQdTopic()) // 订阅驱动 + sub(GetCjTopic()) // 订阅采集 + sub(GetQdTopic()) // 订阅驱动 + sub(GetLogReqTopic()) // 订阅日志查询请求 } // 发起订阅 @@ -150,6 +145,32 @@ func sub(topic string) { } } +func reqHandle[T proto.Message, P proto.Message](p *paho.Publish, h func(T) P, r T) { + fmt.Printf("收到请求: %v\n", p) + if p.Properties != nil && p.Properties.CorrelationData != nil && p.Properties.ResponseTopic != "" { + err := proto.Unmarshal(p.Payload, r) + if err != nil { + slog.Error("Iot请求响应数据处理proto.Unmarshal异常", "error", err) + return + } + resp := h(r) + b, err := proto.Marshal(resp) + if err != nil { + slog.Error("Iot请求响应数据处理proto.Marshal异常", "error", err) + } + _, err = iotcli.cm.Publish(context.Background(), &paho.Publish{ + Topic: p.Properties.ResponseTopic, + Properties: &paho.PublishProperties{ + CorrelationData: p.Properties.CorrelationData, + }, + Payload: b, + }) + if err != nil { + slog.Error("Iot请求处理回复异常", "error", err) + } + } +} + // 发布数据 func pub(topic string, data protoreflect.ProtoMessage) error { if data == nil { @@ -159,17 +180,17 @@ func pub(topic string, data protoreflect.ProtoMessage) error { if err != nil { return err } - switch topic { - case GetIotServiceStateTopic(): - slog.Debug("发布Iot服务状态", "topic", topic, "data", data) - case GetCjTopic(): - slog.Debug("发布采集数据", "topic", topic, "data", data) - case GetQdTopic(): - slog.Debug("发布驱动数据", "topic", topic, "data", data) - default: - slog.Error("未知发布主题", "topic", topic, "data", data) - return fmt.Errorf("未知发布主题: topic=%s", topic) - } + // switch topic { + // case GetIotServiceStateTopic(): + // slog.Debug("发布Iot服务状态", "topic", topic, "data", data) + // case GetCjTopic(): + // slog.Debug("发布采集数据", "topic", topic, "data", data) + // case GetQdTopic(): + // slog.Debug("发布驱动数据", "topic", topic, "data", data) + // default: + // slog.Error("未知发布主题", "topic", topic, "data", data) + // return fmt.Errorf("未知发布主题: topic=%s", topic) + // } _, err = iotcli.cm.Publish(context.Background(), &paho.Publish{ Topic: topic, QoS: 0, diff --git a/mqtt/client_test.go b/mqtt/client_test.go new file mode 100644 index 0000000..c5b5b94 --- /dev/null +++ b/mqtt/client_test.go @@ -0,0 +1,148 @@ +package mqtt + +import ( + "context" + "fmt" + "log" + "net/url" + "sync" + "testing" + "time" + + "github.com/eclipse/paho.golang/autopaho" + "github.com/eclipse/paho.golang/autopaho/extensions/rpc" + "github.com/eclipse/paho.golang/paho" + "google.golang.org/protobuf/proto" + "joylink.club/iot/config" + mproto "joylink.club/iot/mqtt/proto" +) + +func TestRequest(t *testing.T) { + listen() + clientId := "iotlogreq_test" + logReqTopic := GetLogReqTopic() + cliCfg := getCmConfig(clientId, logReqTopic) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + cm, err := autopaho.NewConnection(ctx, cliCfg) + if err != nil { + panic(err) + } + log.Print("TEST") + + time.Sleep(3 * time.Second) + + h, err := rpc.NewHandler(ctx, rpc.HandlerOpts{ + Conn: cm, + Router: cliCfg.Router, + ResponseTopicFmt: "%s/iotlogresp", + ClientID: clientId, + }) + + if err != nil { + log.Fatal(err) + } + + req := &mproto.IotServiceLogReq{ + Count: 10, + } + b, err := proto.Marshal(req) + if err != nil { + log.Fatal(err) + } + resp, err := h.Request(ctx, &paho.Publish{ + Topic: logReqTopic, + Payload: b, + }) + if err != nil { + log.Fatal(err) + } + + fmt.Printf("请求结果: %v\n", resp) + log.Printf("Received response: %s", string(resp.Payload)) + + time.Sleep(3 * time.Second) +} + +func listen() { + var v sync.WaitGroup + + v.Add(1) + + go func() { + config.LoadConfig() + mqttcfg := config.Cfg.Mqtt + cmc := &IotMqttConfig{ + AppId: mqttcfg.Topic.App, + BrokerUrl: mqttcfg.Address, + ClientId: mqttcfg.ClientId, + Username: mqttcfg.Username, + Password: mqttcfg.Password, + KeepAlive: mqttcfg.KeepAlive, + ConnectRetryDelay: mqttcfg.ConnectRetryDelay, + ConnectTimeout: mqttcfg.ConnectTimeout, + } + err := Start(cmc) + if err != nil { + panic(err) + } + + time.Sleep(2 * time.Second) + + RegIotLogReqHandler(func(req *mproto.IotServiceLogReq) *mproto.IotServiceLogResp { + fmt.Printf("收到日志请求: %v\n", req) + resp := &mproto.IotServiceLogResp{ + Code: req.Code, + Logs: []string{"日志1", "日志2"}, + } + fmt.Printf("返回日志响应: %v\n", resp) + return resp + }) + + v.Done() + + for { + time.Sleep(1 * time.Second) + } + }() + + v.Wait() +} + +func getCmConfig(clientId, logReqTopic string) autopaho.ClientConfig { + addr, _ := url.Parse("tcp://192.168.3.233:1883") + cc := autopaho.ClientConfig{ + BrokerUrls: []*url.URL{addr}, + KeepAlive: 60, + OnConnectionUp: func(cm *autopaho.ConnectionManager, connAck *paho.Connack) { + fmt.Println("mqtt connection up") + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(5*time.Second)) + defer cancel() + if _, err := cm.Subscribe(ctx, &paho.Subscribe{ + Subscriptions: []paho.SubscribeOptions{ + {Topic: logReqTopic, QoS: 0}, + }, + }); err != nil { + fmt.Printf("failed to subscribe (%s). This is likely to mean no messages will be received.", err) + return + } + fmt.Println("mqtt subscription made") + }, + OnConnectError: func(err error) { fmt.Printf("error whilst attempting connection: %s\n", err) }, + ClientConfig: paho.ClientConfig{ + ClientID: clientId, + Router: paho.NewStandardRouter(), + OnClientError: func(err error) { fmt.Printf("%s requested disconnect: %s\n", clientId, err) }, + OnServerDisconnect: func(d *paho.Disconnect) { + if d.Properties != nil { + fmt.Printf("%s requested disconnect: %s\n", clientId, d.Properties.ReasonString) + } else { + fmt.Printf("%s requested disconnect; reason code: %d\n", clientId, d.ReasonCode) + } + }, + }, + } + cc.SetUsernamePassword("rtsts_service", []byte("joylink@0503")) + return cc +} diff --git a/mqtt/topic.go b/mqtt/topic.go index 20f0498..46a6d8e 100644 --- a/mqtt/topic.go +++ b/mqtt/topic.go @@ -3,10 +3,18 @@ package mqtt import "fmt" const ( + // IOT服务状态主题,第一个参数为应用编号,第二个参数为客户端编号 Topic_IotServiceState string = "/%s/%s/iotss" - Topic_IotLog string = "/%s/%s/iotlog" - Topic_IotQd string = "/%s/%s/iotqd" - Topic_IotCj string = "/%s/%s/iotcj" + // IOT服务启动(请求响应)主题,第一个参数为应用编号,第二个参数为客户端编号 + Topic_IotServiceStart string = "/%s/%s/iotstart" + // IOT服务停止(请求响应)主题,第一个参数为应用编号,第二个参数为客户端编号 + Topic_IotServiceStop string = "/%s/%s/iotstop" + // IOT日志服务(请求响应)主题,第一个参数为应用编号,第二个参数为客户端编号 + Topic_IotLog string = "/%s/%s/iotlog" + // IOT驱动数据主题,第一个参数为应用编号,第二个参数为客户端编号 + Topic_IotQd string = "/%s/%s/iotqd" + // IOT采集数据主题,第一个参数为应用编号,第二个参数为客户端编号 + Topic_IotCj string = "/%s/%s/iotcj" ) var topicMap = make(map[string]string, 4) @@ -22,7 +30,7 @@ func GetIotServiceStateTopic() string { return topicMap[Topic_IotServiceState] } -func GetCmdTopic() string { +func GetLogReqTopic() string { return topicMap[Topic_IotLog] }