// This program demonstrates using the IoT drive/collect (QC) service as an
// embedded subsystem: it starts the server in-process and then drives it
// through an application-side MQTT client.
package main

import (
	"context"
	"errors"
	"fmt"
	"log/slog"
	"net/url"
	"time"

	"github.com/eclipse/paho.golang/autopaho"
	"github.com/eclipse/paho.golang/autopaho/extensions/rpc"
	"github.com/eclipse/paho.golang/paho"
	"google.golang.org/protobuf/proto"
	"joylink.club/iot/dto"
	"joylink.club/iot/mqtt"
	"joylink.club/iot/server"
	"joylink.club/iot/service"
	"joylink.club/iot/service/model"
)

// main shows the "used as a subsystem" flow: start the IoT server in a
// goroutine, connect an application client, run four start / log-request /
// stop cycles, and finally disconnect the client.
func main() {
	go server.StartIotQcServer()
	time.Sleep(2 * time.Second) // wait for the MQTT topics to be initialized

	ac := initAppClient()
	time.Sleep(3 * time.Second) // wait for the application MQTT connection

	for i := 0; i < 4; i++ {
		ac.startIotQcService() // start the IoT drive/collect service

		resp, err := ac.iotLogReq(&dto.IotServiceLogReq{Count: 10})
		if err != nil {
			slog.Error("应用请求日志错误", "err", err)
		} else {
			slog.Info("应用请求日志成功", "resp", resp)
		}

		time.Sleep(10 * time.Second)
		ac.stopIotQcService() // stop the IoT drive/collect service
		time.Sleep(5 * time.Second)
	}

	time.Sleep(10 * time.Second)
	ac.disconnect()
}

// AppClient is the application-side MQTT client used by this demo.
type AppClient struct {
	cfg  *autopaho.ClientConfig      // client configuration; its Router receives the topic handlers
	cm   *autopaho.ConnectionManager // connection used for publishing, RPC requests and disconnect
	task service.IScheduledTask      // periodic drive-data writer; nil while the service is stopped
}

// stopIotQcService sends the stop request, unregisters the collect/drive data
// handlers and stops the periodic write task if one is set.
//
// It panics when the request fails or the response carries a non-zero code.
func (app *AppClient) stopIotQcService() {
	resp, err := app.iotStopReq(&dto.IotQcServiceStopReq{})
	if err != nil {
		panic(fmt.Errorf("停止服务请求错误, err: %w", err))
	}
	if resp.Code != 0 {
		panic(fmt.Errorf("停止服务请求响应错误, code: %d, msg: %s", resp.Code, resp.Msg))
	}
	slog.Info("应用停止iot服务成功", "resp", resp)

	app.cfg.Router.UnregisterHandler(mqtt.GetCjTopic())
	app.cfg.Router.UnregisterHandler(mqtt.GetQdTopic())

	// Clear the field before stopping so the client never holds a stopped task.
	t := app.task
	app.task = nil
	if t != nil {
		t.Stop()
	}
}

// startIotQcService sends the start request with a fixed Modbus
// configuration, registers logging handlers for collect and drive data, and
// creates the scheduled task that publishes drive data.
//
// It panics when the request fails or the response carries a non-zero code.
func (app *AppClient) startIotQcService() {
	modbusCfg := &dto.ModbusConfig{
		Url:      "tcp://127.0.0.1:502",
		UnitId:   2,
		Timeout:  500,
		Interval: 1000,
		Qdl:      2, // number of drive-data bytes
		Cjl:      2, // number of collected-data bytes
		Mapping: []*dto.ModbusDcMapping{
			{
				// Function: proto.Modbus_ReadHoldingRegister,
				Function: dto.Modbus_ReadCoil,
				Addr:     0,
				Quantity: 16,
				Type:     dto.DataType_CJ,
				Start:    0,
			},
			{
				Function: dto.Modbus_RWCoils,
				Addr:     16,
				Quantity: 16,
				Type:     dto.DataType_QD,
				Start:    0,
			},
		},
	}

	resp, err := app.iotServiceStartReq(&dto.IotQcServiceStartReq{
		Config: modbusCfg,
	})
	if err != nil {
		panic(fmt.Errorf("启动服务请求错误, err: %w", err))
	}
	if resp.Code != 0 {
		panic(fmt.Errorf("启动服务请求响应错误, code: %d, msg: %s", resp.Code, resp.Msg))
	}
	slog.Info("应用启动iot服务成功", "resp", resp)

	app.RegIotCjDataHandler(func(cj *dto.IotCj) {
		slog.Info("应用收到采集数据", "cj", model.BytesDebug(cj.Data))
	})
	app.RegIotQdDataHandler(func(qd *dto.IotQd) {
		slog.Info("应用收到驱动数据", "qd", model.BytesDebug(qd.Data))
	})

	// Each run shifts the two bit patterns one position further, wrapping
	// after eight runs. Presumably the task fires once per second, given the
	// time.Second argument — confirm against service.NewScheduledTask.
	i := 0
	writeTask := service.NewScheduledTask(func() {
		i++
		idx := i % 8
		data := []byte{byte(1 << idx), byte(3 << idx)}
		if err := app.PubIotQdData(&dto.IotQd{Data: data}); err != nil {
			slog.Error("发布写入驱动数据错误", "error", err)
		}
	}, time.Second)
	app.task = writeTask
}

// disconnect closes the MQTT connection, waiting at most five seconds.
// A failure is logged rather than returned.
func (app *AppClient) disconnect() {
	slog.Info("断开应用MQTT客户端")
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	if err := app.cm.Disconnect(ctx); err != nil {
		slog.Error("断开MQTT客户端失败", "err", err)
	}
}

// newRpcHandler builds an RPC handler bound to this client's connection and
// router. respTopicFmt is passed through as the handler's ResponseTopicFmt;
// the callers in this file use formats with a single %s verb.
//
// NOTE(review): every request creates a new handler on the shared router.
// Consider caching one handler per response topic — confirm how
// rpc.NewHandler registers and subscribes before changing this.
func (app *AppClient) newRpcHandler(respTopicFmt string) (*rpc.Handler, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	h, err := rpc.NewHandler(ctx, rpc.HandlerOpts{
		Conn:             app.cm,
		Router:           app.cfg.Router,
		ResponseTopicFmt: respTopicFmt,
		ClientID:         app.cfg.ClientID,
	})
	if err != nil {
		slog.Error("创建RPC处理器失败", "err", err)
		return nil, err
	}
	return h, nil
}

// iotServiceStartReq sends the IoT service start request over MQTT RPC and
// returns the decoded common response. The request itself is bounded by a
// three-second timeout.
//
// An error is returned when the handler cannot be created, the request cannot
// be serialized or sent, or the response payload cannot be decoded.
func (app *AppClient) iotServiceStartReq(req *dto.IotQcServiceStartReq) (*dto.IotQcServiceCommonResp, error) {
	h, err := app.newRpcHandler("%s/iotstartresp")
	if err != nil {
		return nil, err
	}

	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	b, err := proto.Marshal(req)
	if err != nil {
		return nil, errors.Join(fmt.Errorf("序列化IOT服务启动请求消息失败"), err)
	}
	resp, err := h.Request(ctx, &paho.Publish{
		Topic:   mqtt.GetIotServiceStartTopic(),
		Payload: b,
	})
	if err != nil {
		return nil, errors.Join(fmt.Errorf("发送启动服务请求错误"), err)
	}

	result := &dto.IotQcServiceCommonResp{}
	// A payload that fails to decode must not be reported as a zero-valued
	// response, because Code == 0 is what callers treat as success.
	if err := proto.Unmarshal(resp.Payload, result); err != nil {
		return nil, errors.Join(fmt.Errorf("解析启动服务响应消息失败"), err)
	}
	return result, nil
}

// iotStopReq sends the IoT service stop request over MQTT RPC and returns the
// decoded common response. The request itself is bounded by a five-second
// timeout.
//
// An error is returned when the handler cannot be created, the request cannot
// be serialized or sent, or the response payload cannot be decoded.
func (app *AppClient) iotStopReq(req *dto.IotQcServiceStopReq) (*dto.IotQcServiceCommonResp, error) {
	h, err := app.newRpcHandler("%s/iotstopresp")
	if err != nil {
		return nil, err
	}

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	b, err := proto.Marshal(req)
	if err != nil {
		return nil, errors.Join(fmt.Errorf("序列化IOT服务停止请求消息失败"), err)
	}
	resp, err := h.Request(ctx, &paho.Publish{
		Topic:   mqtt.GetIotServiceStopTopic(),
		Payload: b,
	})
	if err != nil {
		return nil, errors.Join(fmt.Errorf("发送停止IOT服务请求错误"), err)
	}

	result := &dto.IotQcServiceCommonResp{}
	// See iotServiceStartReq: an undecodable payload must surface as an error.
	if err := proto.Unmarshal(resp.Payload, result); err != nil {
		return nil, errors.Join(fmt.Errorf("解析停止IOT服务响应消息失败"), err)
	}
	return result, nil
}

// iotLogReq sends the IoT service log request over MQTT RPC and returns the
// decoded log response. The request itself is bounded by a five-second
// timeout.
//
// An error is returned when the handler cannot be created, the request cannot
// be serialized or sent, or the response payload cannot be decoded.
func (app *AppClient) iotLogReq(req *dto.IotServiceLogReq) (*dto.IotServiceLogResp, error) {
	h, err := app.newRpcHandler("%s/iotlogresp")
	if err != nil {
		return nil, err
	}

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	b, err := proto.Marshal(req)
	if err != nil {
		return nil, errors.Join(fmt.Errorf("序列化IOT服务日志请求消息失败"), err)
	}
	resp, err := h.Request(ctx, &paho.Publish{
		Topic:   mqtt.GetLogReqTopic(),
		Payload: b,
	})
	if err != nil {
		return nil, errors.Join(fmt.Errorf("发送IOT服务日志请求错误"), err)
	}

	result := &dto.IotServiceLogResp{}
	if err := proto.Unmarshal(resp.Payload, result); err != nil {
		return nil, errors.Join(fmt.Errorf("解析IOT服务日志响应消息失败"), err)
	}
	return result, nil
}

// RegIotCjDataHandler registers h on the collect-data topic. Each incoming
// payload is decoded into a dto.IotCj before h is called; payloads that fail
// to decode are logged and dropped.
func (app *AppClient) RegIotCjDataHandler(h func(*dto.IotCj)) {
	app.cfg.Router.RegisterHandler(mqtt.GetCjTopic(), func(p *paho.Publish) {
		cj := &dto.IotCj{}
		if err := proto.Unmarshal(p.Payload, cj); err != nil {
			slog.Error("采集数据proto.Unmarshal异常", "error", err)
			return
		}
		h(cj)
	})
}

// RegIotQdDataHandler registers h on the drive-data topic. Each incoming
// payload is decoded into a dto.IotQd before h is called; payloads that fail
// to decode are logged and dropped.
func (app *AppClient) RegIotQdDataHandler(h func(*dto.IotQd)) {
	app.cfg.Router.RegisterHandler(mqtt.GetQdTopic(), func(p *paho.Publish) {
		qd := &dto.IotQd{}
		if err := proto.Unmarshal(p.Payload, qd); err != nil {
			slog.Error("驱动数据proto.Unmarshal异常", "error", err)
			return
		}
		h(qd)
	})
}

// PubIotCjData publishes cj on the collect-data topic.
func (app *AppClient) PubIotCjData(cj *dto.IotCj) error {
	return app.pub(mqtt.GetCjTopic(), cj)
}

// PubIotQdData logs the outgoing drive data and publishes qd on the
// drive-data topic.
func (app *AppClient) PubIotQdData(qd *dto.IotQd) error {
	slog.Warn("应用发布驱动数据", "topic", mqtt.GetQdTopic(), "data", model.BytesDebug(qd.Data))
	return app.pub(mqtt.GetQdTopic(), qd)
}

// pub serializes data and publishes it on topic, waiting at most five seconds.
// It returns the serialization or publish error, if any.
func (app *AppClient) pub(topic string, data proto.Message) error {
	b, err := proto.Marshal(data)
	if err != nil {
		return err
	}

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	_, err = app.cm.Publish(ctx, &paho.Publish{
		Topic:   topic,
		Payload: b,
	})
	return err
}

// initAppClient creates the application client, subscribed to the service
// state, collect-data and drive-data topics. It panics if the connection
// manager cannot be created.
func initAppClient() *AppClient {
	clientId := "iotstartreq_test"
	topics := []string{mqtt.GetIotServiceStateTopic(), mqtt.GetCjTopic(), mqtt.GetQdTopic()}
	cfg := getCmConfig(clientId, topics)

	// ctx, cancel := context.WithCancel(context.Background())
	// defer cancel()
	// NOTE(review): presumably left disabled because cancelling on return
	// would end the connection as soon as this function exits — confirm.
	cm, err := autopaho.NewConnection(context.Background(), cfg)
	if err != nil {
		panic(err)
	}
	return &AppClient{cfg: &cfg, cm: cm}
}

// getCmConfig builds the autopaho configuration for clientId. Every time the
// connection comes up, the topics in subTopics are subscribed one by one with
// QoS 0 and NoLocal set; the first failure is printed and aborts the rest.
//
// It panics if the built-in broker address cannot be parsed.
func getCmConfig(clientId string, subTopics []string) autopaho.ClientConfig {
	const brokerAddr = "tcp://192.168.3.233:1883"

	addr, err := url.Parse(brokerAddr)
	if err != nil {
		// A nil URL in BrokerUrls would only fail later and far from the
		// cause, so fail fast here.
		panic(fmt.Errorf("parsing MQTT broker address %q: %w", brokerAddr, err))
	}

	cc := autopaho.ClientConfig{
		BrokerUrls: []*url.URL{addr},
		KeepAlive:  60,
		OnConnectionUp: func(cm *autopaho.ConnectionManager, connAck *paho.Connack) {
			fmt.Println("mqtt connection up")
			ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
			defer cancel()
			for _, v := range subTopics {
				if _, err := cm.Subscribe(ctx, &paho.Subscribe{
					Subscriptions: []paho.SubscribeOptions{
						{Topic: v, QoS: 0, NoLocal: true},
					},
				}); err != nil {
					fmt.Printf("failed to subscribe (%s). This is likely to mean no messages will be received.", err)
					return
				}
			}
			fmt.Println("mqtt subscription made")
		},
		OnConnectError: func(err error) {
			fmt.Printf("error whilst attempting connection: %s\n", err)
		},
		ClientConfig: paho.ClientConfig{
			ClientID: clientId,
			Router:   paho.NewStandardRouter(),
			OnClientError: func(err error) {
				fmt.Printf("%s 客户端错误: %s\n", clientId, err)
			},
			OnServerDisconnect: func(d *paho.Disconnect) {
				if d.Properties != nil {
					fmt.Printf("%s 服务断联: %v\n", clientId, d)
				} else {
					fmt.Printf("%s 服务断联; reason code: %d\n", clientId, d.ReasonCode)
				}
			},
		},
	}
	// NOTE(review): broker credentials are hard-coded in source; consider
	// loading them from configuration or the environment.
	cc.SetUsernamePassword("rtsts_service", []byte("joylink@0503"))
	return cc
}