package mqtt

import (
	"context"
	"fmt"
	"log"
	"net/url"
	"sync"
	"testing"
	"time"

	"github.com/eclipse/paho.golang/autopaho"
	"github.com/eclipse/paho.golang/autopaho/extensions/rpc"
	"github.com/eclipse/paho.golang/paho"
	"google.golang.org/protobuf/proto"
	"joylink.club/iot/config"
	"joylink.club/iot/dto"
)

// TestRequest performs a request/response round trip for the IoT log service
// over MQTT: it starts the responder via listen, connects a second client,
// publishes an IotServiceLogReq on the log-request topic through the paho RPC
// handler and decodes the reply as an IotServiceLogResp.
//
// The test needs a reachable MQTT broker (see getCmConfig) and the application
// configuration loaded by listen.
func TestRequest(t *testing.T) {
	listen()

	clientId := "iotlogreq_test"
	logReqTopic := GetLogReqTopic()
	cliCfg := getCmConfig(clientId, logReqTopic)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Failures are reported with t.Fatalf rather than panic/log.Fatal so that
	// the deferred cancel still runs and the failure is attributed to this test
	// instead of terminating the whole test binary.
	cm, err := autopaho.NewConnection(ctx, cliCfg)
	if err != nil {
		t.Fatalf("creating mqtt connection: %v", err)
	}
	log.Print("TEST")
	// NOTE(review): presumably gives the connection time to come up before the
	// RPC handler is created — confirm whether an explicit wait would be
	// preferable.
	time.Sleep(3 * time.Second)

	h, err := rpc.NewHandler(ctx, rpc.HandlerOpts{
		Conn:             cm,
		Router:           cliCfg.Router,
		ResponseTopicFmt: "%s/iotlogresp",
		ClientID:         clientId,
	})
	if err != nil {
		t.Fatalf("creating rpc handler: %v", err)
	}

	req := &dto.IotServiceLogReq{
		Count: 10,
	}
	b, err := proto.Marshal(req)
	if err != nil {
		t.Fatalf("marshalling log request: %v", err)
	}

	resp, err := h.Request(ctx, &paho.Publish{
		Topic:   logReqTopic,
		Payload: b,
	})
	if err != nil {
		t.Fatalf("sending log request: %v", err)
	}
	fmt.Printf("请求结果: %v\n", resp)

	lr := &dto.IotServiceLogResp{}
	// The decode error used to be dropped, which let a malformed response pass
	// the test silently.
	if err := proto.Unmarshal(resp.Payload, lr); err != nil {
		t.Fatalf("unmarshalling log response: %v", err)
	}
	log.Printf("Received response: %s\n", lr)

	// NOTE(review): trailing delay kept from the original; its purpose is not
	// evident from this file.
	time.Sleep(3 * time.Second)
}

// listen starts the responder side used by TestRequest. In a background
// goroutine it loads the application configuration, starts the package MQTT
// client with the configured broker settings, waits two seconds and registers
// a log-request handler that answers every request with two fixed log lines.
//
// listen returns once the handler has been registered. It panics if the MQTT
// client cannot be started.
func listen() {
	var ready sync.WaitGroup
	ready.Add(1)

	go func() {
		config.LoadConfig()
		mqttcfg := config.Cfg.Mqtt
		cmc := &IotMqttConfig{
			AppId:             mqttcfg.Topic.App,
			BrokerUrl:         mqttcfg.Address,
			ClientId:          mqttcfg.ClientId,
			Username:          mqttcfg.Username,
			Password:          mqttcfg.Password,
			KeepAlive:         mqttcfg.KeepAlive,
			ConnectRetryDelay: mqttcfg.ConnectRetryDelay,
			ConnectTimeout:    mqttcfg.ConnectTimeout,
		}
		if err := Start(cmc); err != nil {
			// A panic (not a silent return) is required here: returning
			// without calling Done would leave the caller blocked forever.
			panic(err)
		}
		// NOTE(review): presumably waits for the connection to be established
		// before the handler is registered — confirm.
		time.Sleep(2 * time.Second)

		RegIotLogReqHandler(func(req *dto.IotServiceLogReq) *dto.IotServiceLogResp {
			fmt.Printf("收到日志请求: %v\n", req)
			resp := &dto.IotServiceLogResp{
				Logs: []string{"日志1", "日志2"},
			}
			fmt.Printf("返回日志响应: %v\n", resp)
			return resp
		})
		ready.Done()

		// NOTE(review): this goroutine never exits; it idles for the lifetime
		// of the test process. Kept as in the original.
		for {
			time.Sleep(1 * time.Second)
		}
	}()

	ready.Wait()
}

// getCmConfig builds the autopaho client configuration for the requesting side
// of the test. The returned configuration connects to a fixed broker address,
// subscribes to logReqTopic (QoS 0) every time the connection comes up and
// prints connection, client and server-disconnect events to stdout.
//
// NOTE(review): the broker address and the credentials are hard-coded; consider
// sourcing them from configuration or the environment.
func getCmConfig(clientId, logReqTopic string) autopaho.ClientConfig {
	addr, err := url.Parse("tcp://192.168.3.233:1883")
	if err != nil {
		// The address is a constant, so a parse failure is a programming
		// error; fail loudly instead of handing a nil URL to autopaho.
		panic(fmt.Errorf("parsing broker url: %w", err))
	}

	cc := autopaho.ClientConfig{
		BrokerUrls: []*url.URL{addr},
		KeepAlive:  60,
		OnConnectionUp: func(cm *autopaho.ConnectionManager, connAck *paho.Connack) {
			fmt.Println("mqtt connection up")
			ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
			defer cancel()
			if _, err := cm.Subscribe(ctx, &paho.Subscribe{
				Subscriptions: []paho.SubscribeOptions{
					{Topic: logReqTopic, QoS: 0},
				},
			}); err != nil {
				fmt.Printf("failed to subscribe (%s). This is likely to mean no messages will be received.", err)
				return
			}
			fmt.Println("mqtt subscription made")
		},
		OnConnectError: func(err error) {
			fmt.Printf("error whilst attempting connection: %s\n", err)
		},
		ClientConfig: paho.ClientConfig{
			ClientID: clientId,
			Router:   paho.NewStandardRouter(),
			OnClientError: func(err error) {
				fmt.Printf("%s requested disconnect: %s\n", clientId, err)
			},
			OnServerDisconnect: func(d *paho.Disconnect) {
				if d.Properties != nil {
					fmt.Printf("%s requested disconnect: %s\n", clientId, d.Properties.ReasonString)
				} else {
					fmt.Printf("%s requested disconnect; reason code: %d\n", clientId, d.ReasonCode)
				}
			},
		},
	}
	cc.SetUsernamePassword("rtsts_service", []byte("joylink@0503"))
	return cc
}