From e9e0ee07720d560a026c6fcf39e2499c6df46af6 Mon Sep 17 00:00:00 2001 From: walker Date: Thu, 21 Dec 2023 17:13:05 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BD=9C=E4=B8=BA=E5=AD=90=E7=B3=BB=E7=BB=9F?= =?UTF-8?q?=E4=BD=BF=E7=94=A8=E7=9A=84=E4=BE=8B=E5=AD=90=E7=A8=8B=E5=BA=8F?= =?UTF-8?q?=E5=AE=9E=E7=8E=B0(example/subsys=5Fuse/main.go)=20=E4=BF=AE?= =?UTF-8?q?=E6=94=B9IOT-MQTT=E6=8E=A5=E5=8F=A3bug=20=E5=AE=8C=E5=96=84IOT?= =?UTF-8?q?=E5=90=AF=E5=8A=A8/=E5=81=9C=E6=AD=A2=E9=A9=B1=E9=87=87?= =?UTF-8?q?=E6=9C=8D=E5=8A=A1=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- example/subsys_use/main.go | 262 ++++++++++++++++++++++++++++++----- mqtt/client.go | 31 +++-- mqtt/config.go | 8 +- server/server.go | 73 +++++++++- service/modbus_qc_mapping.go | 82 +---------- 5 files changed, 325 insertions(+), 131 deletions(-) diff --git a/example/subsys_use/main.go b/example/subsys_use/main.go index 92dda10..809b8ed 100644 --- a/example/subsys_use/main.go +++ b/example/subsys_use/main.go @@ -2,8 +2,9 @@ package main import ( "context" + "errors" "fmt" - "log" + "log/slog" "net/url" "time" @@ -13,49 +14,244 @@ import ( "google.golang.org/protobuf/proto" "joylink.club/iot/dto" "joylink.club/iot/mqtt" + "joylink.club/iot/server" + "joylink.club/iot/service" + "joylink.club/iot/service/model" ) // 作为子系统使用方式 func main() { - clientId := "iotlogreq_test" - logReqTopic := mqtt.GetLogReqTopic() - cliCfg := getCmConfig(clientId, logReqTopic) + go server.StartIotQcServer() - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - cm, err := autopaho.NewConnection(ctx, cliCfg) - if err != nil { - panic(err) + time.Sleep(2 * time.Second) // 等待mqtt主题初始化 + ac := initAppClient() + time.Sleep(3 * time.Second) // 等待应用mqtt连接 + for i := 0; i < 4; i++ { + ac.startIotQcService() // 启动IOT驱采服务 + time.Sleep(10 * time.Second) + ac.stopIotQcService() // 停止IOT驱采服务 + time.Sleep(5 * time.Second) } - h, err := rpc.NewHandler(ctx, rpc.HandlerOpts{ - Conn: cm, - Router: cliCfg.Router, - ResponseTopicFmt: "%s/iotlogresp", - ClientID: clientId, + time.Sleep(10 * time.Second) + ac.disconnect() +} + +type AppClient struct { + cfg *autopaho.ClientConfig + cm *autopaho.ConnectionManager + task service.IScheduledTask +} + +func (app *AppClient) stopIotQcService() { + resp, err := app.iotStopReq(&dto.IotQcServiceStopReq{}) + if err != nil { + panic(fmt.Errorf("停止服务请求错误, err: %v", err)) + } + if resp.Code != 0 { + panic(fmt.Errorf("停止服务请求响应错误, code: %d, msg: %s", resp.Code, resp.Msg)) + } + slog.Info("应用停止iot服务成功", "resp", resp) + app.cfg.Router.UnregisterHandler(mqtt.GetCjTopic()) + app.cfg.Router.UnregisterHandler(mqtt.GetQdTopic()) + t := app.task + app.task = nil + if t != nil { + t.Stop() + } +} + +func (app *AppClient) startIotQcService() { + modbusCfg := &dto.ModbusConfig{ + Url: "tcp://127.0.0.1:502", + UnitId: 2, + Timeout: 500, + Interval: 1000, + Qdl: 2, // 驱动数据字节数 + Cjl: 2, // 采集数据字节数 + Mapping: []*dto.ModbusDcMapping{ + { + // Function: proto.Modbus_ReadHoldingRegister, + Function: dto.Modbus_ReadCoil, + Addr: 0, + Quantity: 16, + Type: dto.DataType_CJ, + Start: 0, + }, + { + Function: dto.Modbus_RWCoils, + Addr: 16, + Quantity: 16, + Type: dto.DataType_QD, + Start: 0, + }, + }, + } + resp, err := app.iotServiceStartReq(&dto.IotQcServiceStartReq{ + Config: modbusCfg, }) if err != nil { - log.Fatal(err) + panic(fmt.Errorf("启动服务请求错误, err: %v", err)) } + if resp.Code != 0 { + panic(fmt.Errorf("启动服务请求响应错误, code: %d, msg: %s", resp.Code, resp.Msg)) + } + slog.Info("应用启动iot服务成功", "resp", resp) + app.RegIotCjDataHandler(func(cj *dto.IotCj) { + slog.Info("应用收到采集数据", "cj", model.BytesDebug(cj.Data)) + }) + app.RegIotQdDataHandler(func(qd *dto.IotQd) { + slog.Info("应用收到驱动数据", "qd", model.BytesDebug(qd.Data)) + }) - req := &dto.IotServiceLogReq{ - Count: 10, + i := 0 + writeTask := service.NewScheduledTask(func() { + i++ + idx := i % 8 + err := app.PubIotQdData(&dto.IotQd{Data: []byte{byte(1 << idx), byte(3 << idx)}}) + if err != nil { + slog.Error("发布写入驱动数据错误", "error", err) + } + }, time.Second) + app.task = writeTask +} + +func (app *AppClient) disconnect() { + slog.Info("断开应用MQTT客户端") + ctx, cancle := context.WithTimeout(context.Background(), 5*time.Second) + defer cancle() + err := app.cm.Disconnect(ctx) + if err != nil { + slog.Error("断开MQTT客户端失败", "err", err) } +} +func (app *AppClient) newRpcHandler(respTopicFmt string) (*rpc.Handler, error) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) + defer cancel() + h, err := rpc.NewHandler(ctx, rpc.HandlerOpts{ + Conn: app.cm, + Router: app.cfg.Router, + ResponseTopicFmt: respTopicFmt, + ClientID: app.cfg.ClientID, + }) + if err != nil { + slog.Error("创建RPC处理器失败", "err", err) + return nil, err + } + return h, nil +} + +// 注册IOT服务启动请求处理 +func (app *AppClient) iotServiceStartReq(req *dto.IotQcServiceStartReq) (*dto.IotQcServiceCommonResp, error) { + h, err := app.newRpcHandler("%s/iotstartresp") + if err != nil { + return nil, err + } + ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) + defer cancel() b, err := proto.Marshal(req) if err != nil { - log.Fatal(err) + slog.Error("序列化请求消息失败", "err", err) + return nil, err } resp, err := h.Request(ctx, &paho.Publish{ - Topic: logReqTopic, + Topic: mqtt.GetIotServiceStartTopic(), Payload: b, }) if err != nil { - log.Fatal(err) + return nil, errors.Join(fmt.Errorf("发送启动服务请求错误"), err) } - fmt.Printf("请求结果: %v\n", resp) + result := &dto.IotQcServiceCommonResp{} + proto.Unmarshal(resp.Payload, result) + return result, nil } -func getCmConfig(clientId, logReqTopic string) autopaho.ClientConfig { +func (app *AppClient) iotStopReq(req *dto.IotQcServiceStopReq) (*dto.IotQcServiceCommonResp, error) { + h, err := app.newRpcHandler("%s/iotstopresp") + if err != nil { + return nil, err + } + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + b, err := proto.Marshal(req) + if err != nil { + return nil, errors.Join(fmt.Errorf("序列化IOT服务停止请求消息失败"), err) + } + resp, err := h.Request(ctx, &paho.Publish{ + Topic: mqtt.GetIotServiceStopTopic(), + Payload: b, + }) + if err != nil { + return nil, errors.Join(fmt.Errorf("发送停止IOT服务请求错误"), err) + } + result := &dto.IotQcServiceCommonResp{} + proto.Unmarshal(resp.Payload, result) + return result, nil +} + +func (app *AppClient) RegIotCjDataHandler(h func(*dto.IotCj)) { + app.cfg.Router.RegisterHandler(mqtt.GetCjTopic(), func(p *paho.Publish) { + cj := &dto.IotCj{} + err := proto.Unmarshal(p.Payload, cj) + if err != nil { + slog.Error("采集数据proto.Unmarshal异常", "error", err) + return + } + h(cj) + }) +} + +func (app *AppClient) RegIotQdDataHandler(h func(*dto.IotQd)) { + app.cfg.Router.RegisterHandler(mqtt.GetQdTopic(), func(p *paho.Publish) { + qd := &dto.IotQd{} + err := proto.Unmarshal(p.Payload, qd) + if err != nil { + slog.Error("驱动数据proto.Unmarshal异常", "error", err) + return + } + h(qd) + }) +} + +func (app *AppClient) PubIotCjData(cj *dto.IotCj) error { + return app.pub(mqtt.GetCjTopic(), cj) +} + +func (app *AppClient) PubIotQdData(qd *dto.IotQd) error { + slog.Warn("应用发布驱动数据", "topic", mqtt.GetQdTopic(), "data", model.BytesDebug(qd.Data)) + return app.pub(mqtt.GetQdTopic(), qd) +} + +func (app *AppClient) pub(topic string, data proto.Message) error { + b, err := proto.Marshal(data) + if err != nil { + return err + } + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + _, err = app.cm.Publish(ctx, &paho.Publish{ + Topic: topic, + Payload: b, + }) + return err +} + +func initAppClient() *AppClient { + clientId := "iotstartreq_test" + topics := []string{mqtt.GetIotServiceStateTopic(), mqtt.GetCjTopic(), mqtt.GetQdTopic()} + cfg := getCmConfig(clientId, topics) + + // ctx, cancel := context.WithCancel(context.Background()) + // defer cancel() + cm, err := autopaho.NewConnection(context.Background(), cfg) + if err != nil { + panic(err) + } + ac := &AppClient{cfg: &cfg, cm: cm} + return ac +} + +func getCmConfig(clientId string, subTopics []string) autopaho.ClientConfig { addr, _ := url.Parse("tcp://192.168.3.233:1883") cc := autopaho.ClientConfig{ BrokerUrls: []*url.URL{addr}, @@ -64,13 +260,15 @@ func getCmConfig(clientId, logReqTopic string) autopaho.ClientConfig { fmt.Println("mqtt connection up") ctx, cancel := context.WithTimeout(context.Background(), time.Duration(5*time.Second)) defer cancel() - if _, err := cm.Subscribe(ctx, &paho.Subscribe{ - Subscriptions: []paho.SubscribeOptions{ - {Topic: logReqTopic, QoS: 0}, - }, - }); err != nil { - fmt.Printf("failed to subscribe (%s). This is likely to mean no messages will be received.", err) - return + for _, v := range subTopics { + if _, err := cm.Subscribe(ctx, &paho.Subscribe{ + Subscriptions: []paho.SubscribeOptions{ + {Topic: v, QoS: 0, NoLocal: true}, + }, + }); err != nil { + fmt.Printf("failed to subscribe (%s). This is likely to mean no messages will be received.", err) + return + } } fmt.Println("mqtt subscription made") }, @@ -78,12 +276,12 @@ func getCmConfig(clientId, logReqTopic string) autopaho.ClientConfig { ClientConfig: paho.ClientConfig{ ClientID: clientId, Router: paho.NewStandardRouter(), - OnClientError: func(err error) { fmt.Printf("%s requested disconnect: %s\n", clientId, err) }, + OnClientError: func(err error) { fmt.Printf("%s 客户端错误: %s\n", clientId, err) }, OnServerDisconnect: func(d *paho.Disconnect) { if d.Properties != nil { - fmt.Printf("%s requested disconnect: %s\n", clientId, d.Properties.ReasonString) + fmt.Printf("%s 服务断联: %v\n", clientId, d) } else { - fmt.Printf("%s requested disconnect; reason code: %d\n", clientId, d.ReasonCode) + fmt.Printf("%s 服务断联; reason code: %d\n", clientId, d.ReasonCode) } }, }, diff --git a/mqtt/client.go b/mqtt/client.go index cebbbd1..3c630cd 100644 --- a/mqtt/client.go +++ b/mqtt/client.go @@ -83,12 +83,12 @@ func PubIotCjData(cj *dto.IotCj) error { // 发布IOT驱动数据 func PubIotQdData(qd *dto.IotQd) error { - return pub(GetCjTopic(), qd) + return pub(GetQdTopic(), qd) } // 注册IOT采集数据处理 func RegIotCjHandler(h func(cj *dto.IotCj)) { - iotcli.cc.Router.RegisterHandler(GetLogReqTopic(), func(p *paho.Publish) { + iotcli.cc.Router.RegisterHandler(GetCjTopic(), func(p *paho.Publish) { cmd := &dto.IotCj{} err := proto.Unmarshal(p.Payload, cmd) if err != nil { @@ -97,11 +97,12 @@ func RegIotCjHandler(h func(cj *dto.IotCj)) { } h(cmd) }) + slog.Info("注册IOT采集数据处理success") } // 注册IOT驱动数据处理 func RegIotQdHandler(h func(qd *dto.IotQd)) { - iotcli.cc.Router.RegisterHandler(GetLogReqTopic(), func(p *paho.Publish) { + iotcli.cc.Router.RegisterHandler(GetQdTopic(), func(p *paho.Publish) { cmd := &dto.IotQd{} err := proto.Unmarshal(p.Payload, cmd) if err != nil { @@ -110,6 +111,7 @@ func RegIotQdHandler(h func(qd *dto.IotQd)) { } h(cmd) }) + slog.Info("注册IOT驱动数据处理success") } // 注册IOT服务启动请求处理 @@ -172,7 +174,7 @@ func sub(topic string) { } func reqHandle[T proto.Message, P proto.Message](p *paho.Publish, h func(T) P, r T) { - fmt.Printf("收到请求: %v\n", p) + slog.Info("IOT-MQTT服务收到请求", "publish", p) if p.Properties != nil && p.Properties.CorrelationData != nil && p.Properties.ResponseTopic != "" { err := proto.Unmarshal(p.Payload, r) if err != nil { @@ -206,17 +208,16 @@ func pub(topic string, data protoreflect.ProtoMessage) error { if err != nil { return err } - // switch topic { - // case GetIotServiceStateTopic(): - // slog.Debug("发布Iot服务状态", "topic", topic, "data", data) - // case GetCjTopic(): - // slog.Debug("发布采集数据", "topic", topic, "data", data) - // case GetQdTopic(): - // slog.Debug("发布驱动数据", "topic", topic, "data", data) - // default: - // slog.Error("未知发布主题", "topic", topic, "data", data) - // return fmt.Errorf("未知发布主题: topic=%s", topic) - // } + switch topic { + case GetIotServiceStateTopic(): + slog.Debug("IOT-MQTT发布Iot服务状态", "topic", topic, "data", data) + case GetCjTopic(): + slog.Debug("IOT-MQTT发布采集数据", "topic", topic, "data", data) + case GetQdTopic(): + slog.Debug("IOT-MQTT发布驱动数据", "topic", topic, "data", data) + default: + return fmt.Errorf("IOT-MQTT未知发布主题: topic=%s", topic) + } _, err = iotcli.cm.Publish(context.Background(), &paho.Publish{ Topic: topic, QoS: 0, diff --git a/mqtt/config.go b/mqtt/config.go index b86830a..38f2295 100644 --- a/mqtt/config.go +++ b/mqtt/config.go @@ -43,18 +43,18 @@ func (c *IotMqttConfig) tryInto() (*autopaho.ClientConfig, error) { ConnectRetryDelay: time.Duration(c.ConnectRetryDelay) * time.Second, ConnectTimeout: time.Duration(c.ConnectTimeout) * time.Second, OnConnectionUp: func(*autopaho.ConnectionManager, *paho.Connack) { - slog.Info("MQTT连接成功") + slog.Info("MQTT连接成功", "url", c.BrokerUrl) subIotQc() }, OnConnectError: func(err error) { - slog.Error("MQTT连接失败", "error", err) + slog.Error("MQTT连接失败", "url", c.BrokerUrl, "error", err) }, ClientConfig: paho.ClientConfig{ ClientID: c.ClientId, Router: paho.NewStandardRouter(), - OnClientError: func(err error) { fmt.Printf("%s Mqtt客户端发生错误: %s\n", c.ClientId, err) }, + OnClientError: func(err error) { slog.Error("MQTT客户端发生错误", "clientId", c.ClientId, "err", err) }, OnServerDisconnect: func(d *paho.Disconnect) { - fmt.Printf("%s 连接断开; reason code: %d,properties: %v\n", c.ClientId, d.ReasonCode, d.Properties) + slog.Error("MQTT连接断开", "clientId", c.ClientId, "reasonCode", d.ReasonCode, "properties", d.Properties) }, }, } diff --git a/server/server.go b/server/server.go index 6ad5780..9b072ff 100644 --- a/server/server.go +++ b/server/server.go @@ -1,6 +1,7 @@ package server import ( + "context" "log/slog" "os" "time" @@ -15,8 +16,10 @@ var iqcs *IotQcServer type IotQcServer struct { qcMappingService service.IotQcMappingService + qcDataPubTask service.IScheduledTask tasks []service.IScheduledTask state *dto.IotServiceState + cancel context.CancelFunc } func (s *IotQcServer) start() error { @@ -25,9 +28,35 @@ func (s *IotQcServer) start() error { s.registerReqHandlers() // 启动服务状态发布定时任务 iqcs.tasks = append(iqcs.tasks, service.NewScheduledTask(pubServerState, 1*time.Second)) + ctx, cancel := context.WithCancel(context.Background()) + s.serve(ctx) + s.cancel = cancel return nil } +func (s *IotQcServer) serve(ctx context.Context) { + defer s.stop() + for { + <-ctx.Done() + time.Sleep(10 * time.Millisecond) + } +} + +func (s *IotQcServer) stop() error { + if s.qcDataPubTask != nil { + s.qcDataPubTask.Stop() + } + if s.qcMappingService != nil { + s.qcMappingService.Stop() + } + for _, task := range s.tasks { + task.Stop() + } + mqtt.Stop() + return nil +} + +// 服务状态监测 func (s *IotQcServer) stateMonitor() *dto.IotServiceState { if s.qcMappingService != nil { if err := s.qcMappingService.ReportError(); err != nil { @@ -43,10 +72,22 @@ func (s *IotQcServer) stateMonitor() *dto.IotServiceState { } } +// 注册服务请求处理 func (s *IotQcServer) registerReqHandlers() { mqtt.RegIotQcServiceStartReqHandler(s.startIotQcMappingService) mqtt.RegIotQcServiceStopReqHandler(s.stopIotQcMappingService) mqtt.RegIotLogReqHandler(GetIotLog) + // 注册驱采数据写入处理 + mqtt.RegIotQdHandler(s.handleQdWrite) + mqtt.RegIotCjHandler(s.handleCjWrite) +} + +func (s *IotQcServer) pubQcData() { + service := s.qcMappingService + if service != nil { + mqtt.PubIotCjData(&dto.IotCj{Data: service.GetCjBytes()}) + mqtt.PubIotQdData(&dto.IotQd{Data: service.GetQdBytes()}) + } } func (s *IotQcServer) startIotQcMappingService(req *dto.IotQcServiceStartReq) *dto.IotQcServiceCommonResp { @@ -56,13 +97,34 @@ func (s *IotQcServer) startIotQcMappingService(req *dto.IotQcServiceStartReq) *d return &dto.IotQcServiceCommonResp{Code: 1, Msg: err.Error()} } s.qcMappingService = mqcs + s.qcDataPubTask = service.NewScheduledTask(s.pubQcData, time.Duration(req.Config.Interval)*time.Millisecond) return &dto.IotQcServiceCommonResp{Code: 0, Msg: "成功"} } +func (s *IotQcServer) handleQdWrite(qd *dto.IotQd) { + if s.qcMappingService != nil { + slog.Info("IOT收到并执行写入驱动数据", "data", qd.Data) + s.qcMappingService.WriteQdBytes(qd.Data) + } +} + +func (s *IotQcServer) handleCjWrite(cj *dto.IotCj) { + if s.qcMappingService != nil { + slog.Info("IOT收到并执行写入采集数据", "data", cj.Data) + s.qcMappingService.WriteCjBytes(cj.Data) + } +} + func (s *IotQcServer) stopIotQcMappingService(req *dto.IotQcServiceStopReq) *dto.IotQcServiceCommonResp { - if err := s.qcMappingService.Stop(); err != nil { - slog.Error("停止Modbus驱采映射服务失败", "err", err) - return &dto.IotQcServiceCommonResp{Code: 1, Msg: err.Error()} + task := s.qcDataPubTask + s.qcDataPubTask = nil + if task != nil { + task.Stop() + } + service := s.qcMappingService + s.qcMappingService = nil + if service != nil { + service.Stop() } return &dto.IotQcServiceCommonResp{Code: 0, Msg: "成功"} } @@ -77,8 +139,13 @@ func StartIotQcServer() { iqcs.start() } +func StopIotQcServer() { + iqcs.cancel() +} + func pubServerState() { state := iqcs.stateMonitor() + slog.Debug("发布服务状态", "state", state.State, "msg", state.ErrMsg) mqtt.PubIotServiceState(state) } diff --git a/service/modbus_qc_mapping.go b/service/modbus_qc_mapping.go index 595dd2e..5ae7105 100644 --- a/service/modbus_qc_mapping.go +++ b/service/modbus_qc_mapping.go @@ -24,7 +24,8 @@ type modbusQcService struct { // ReportError implements IotQcMappingService. func (s *modbusQcService) ReportError() error { if !s.cli.IsConnected() { - return fmt.Errorf("modbus连接断开") + slog.Error("Modbus驱采服务映射任务Modbus客户端未连接,", "url", s.config.Url, "unitid", s.config.UnitId) + return fmt.Errorf("modbus未连接或连接断开") } return nil } @@ -96,6 +97,7 @@ func (s *modbusQcService) WriteQdBytes(bytes []byte) error { return err } +// 新建Modbus驱采映射处理服务 func NewModbusQcService(config *dto.ModbusConfig) (IotQcMappingService, error) { // 基础配置检查 if err := checkConfig(config); err != nil { @@ -147,7 +149,7 @@ func (m *modbusQcService) onWrite(dt dto.DataType, bytes []byte) error { slog.Error("Modbus驱动采集服务写入线圈失败", "url", m.config.Url, "unitid", m.config.UnitId, "error", err, "Function", mdm.Function) return err } else { - slog.Debug("Modbus驱动采集服务写入线圈成功", "url", m.config.Url, "unitid", m.config.UnitId, "Function", mdm.Function, "data", model.BitsDebug(data), "mapping", mdm) + slog.Info("Modbus驱动采集服务写入线圈成功", "url", m.config.Url, "unitid", m.config.UnitId, "Function", mdm.Function, "data", model.BitsDebug(data), "mapping", mdm) } case dto.Modbus_WriteRegister, dto.Modbus_WriteRegisters, dto.Modbus_RWRegisters: data := getQcBytes(bytes, mdm) @@ -156,7 +158,7 @@ func (m *modbusQcService) onWrite(dt dto.DataType, bytes []byte) error { slog.Error("Modbus驱动采集服务写入寄存器失败", "url", m.config.Url, "unitid", m.config.UnitId, "error", err, "Function", mdm.Function) return err } else { - slog.Debug("Modbus驱动采集服务写入寄存器成功", "url", m.config.Url, "unitid", m.config.UnitId, "Function", mdm.Function, "data", model.BytesDebug(data), "mapping", mdm) + slog.Info("Modbus驱动采集服务写入寄存器成功", "url", m.config.Url, "unitid", m.config.UnitId, "Function", mdm.Function, "data", model.BytesDebug(data), "mapping", mdm) } } } @@ -165,56 +167,6 @@ func (m *modbusQcService) onWrite(dt dto.DataType, bytes []byte) error { return nil } -// func (m *modbusQcService) initOnUpdateTask() { -// mapping := m.config.Mapping -// for _, mdm := range mapping { -// if mdm.WriteStrategy == dto.Modbus_OnUpdate && isWriteFunction(mdm.Function) { -// et := model.DCE_Drive_Update -// if mdm.Type == dto.DataType_CollectTable { -// et = model.DCE_Collect_Update -// } -// m.qc.On(et, func(d model.QC) { -// if !m.cli.IsConnected() { -// slog.Warn("Modbus驱动采集服务数据更新写入失败,modbus客户端未连接", "url", m.config.Url, "Function", mdm.Function) -// return -// } -// switch mdm.Function { -// case dto.Modbus_WriteCoil, dto.Modbus_WriteCoils, dto.Modbus_RWCoils: -// err := m.cli.WriteCoils(uint16(mdm.Addr), m.GetDcBits(mdm)) -// if err != nil { -// slog.Error("Modbus驱动采集服务写入线圈失败", "url", m.config.Url, "error", err, "Function", mdm.Function) -// } else { -// slog.Info("Modbus驱动采集服务写入线圈成功", "url", m.config.Url, "Function", mdm.Function) -// } -// case dto.Modbus_WriteRegister, dto.Modbus_WriteRegisters, dto.Modbus_RWRegisters: -// err := m.cli.WriteRegisterBytes(uint16(mdm.Addr), m.GetDcBytes(mdm)) -// if err != nil { -// slog.Error("Modbus驱动采集服务写入寄存器失败", "url", m.config.Url, "error", err, "Function", mdm.Function) -// } else { -// slog.Info("Modbus驱动采集服务写入寄存器成功", "url", m.config.Url, "Function", mdm.Function) -// } -// } -// }) -// } -// } -// } - -// func (m *modbusQcService) run(ctx context.Context) { -// defer close(m.done) -// mainLoop: -// for { -// select { -// case <-ctx.Done(): -// slog.Debug("Modbus驱采映射循环取消,关闭modbus客户端", "url", m.config.Url) -// modbus.DeleteClient(m.config.Url) -// break mainLoop -// default: -// } -// m.readTaskExecute() -// time.Sleep(time.Millisecond * time.Duration(m.config.Interval)) -// } -// } - func (m *modbusQcService) readTaskExecute() { if m.cli.IsConnected() { for _, mdm := range m.config.Mapping { @@ -301,8 +253,6 @@ func (m *modbusQcService) readTaskExecute() { // } } } - } else { - slog.Error("Modbus驱动采集服务映射任务执行失败,Modbus未连接", "url", m.config.Url, "unitid", m.config.UnitId) } } @@ -325,28 +275,6 @@ func getQcBytes(bytes []byte, mdm *dto.ModbusDcMapping) []byte { return bytes[start : start+quantity] } -// func (m *modbusQcService) GetDcBits(mdm *dto.ModbusDcMapping) []bool { -// switch mdm.Type { -// case dto.DataType_CollectTable: // 采集数据 -// return m.qc.GetCjBitsOf(mdm.Start, mdm.Quantity) -// case dto.DataType_DriveTable: // 驱动数据 -// return m.qc.GetQdBitsOf(mdm.Start, mdm.Quantity) -// default: -// panic("未知数据类型") -// } -// } - -// func (m *modbusQcService) GetDcBytes(mdm *dto.ModbusDcMapping) []byte { -// switch mdm.Type { -// case dto.DataType_CollectTable: // 采集数据 -// return m.qc.GetCjBytesOf(mdm.Start, mdm.Quantity*2) -// case dto.DataType_DriveTable: // 驱动数据 -// return m.qc.GetQdBytesOf(mdm.Start, mdm.Quantity*2) -// default: -// panic("未知数据类型") -// } -// } - func (m *modbusQcService) updateDcByBits(mdm *dto.ModbusDcMapping, bits []bool) error { switch mdm.Type { case dto.DataType_CJ: // 采集数据