package service import ( "context" "errors" "fmt" "log/slog" "strings" "time" "joylink.club/iot/protocol/modbus" "joylink.club/iot/service/model" sproto "joylink.club/iot/service/proto" ) // Modbus驱采服务 type modbusQcService struct { config *sproto.ModbusConfig cli modbus.MasterClient qc model.QC cancel context.CancelFunc done chan struct{} // 服务协程退出信号 } func (m *modbusQcService) Stop() error { m.cancel() <-m.done slog.Info("Modbus驱采映射服务线程退出", "url", m.config.Url) return nil } // GetCjBits implements IotQcMappingService. func (s *modbusQcService) GetCjBits() []bool { return s.qc.GetCjBits() } // GetCjBytes implements IotQcMappingService. func (s *modbusQcService) GetCjBytes() []byte { return s.qc.GetCjBytes() } // GetQdBits implements IotQcMappingService. func (s *modbusQcService) GetQdBits() []bool { return s.qc.GetQdBits() } // GetQdBytes implements IotQcMappingService. func (s *modbusQcService) GetQdBytes() []byte { return s.qc.GetQdBytes() } // WriteCjBytes implements IotQcMappingService. func (s *modbusQcService) WriteCjBytes(bytes []byte) error { if len(bytes) != len(s.qc.GetCjBytes()) { return fmt.Errorf("写入采集字节长度有误,应为%d,实为%d", len(s.qc.GetCjBytes()), len(bytes)) } err := s.onWrite(sproto.DataType_CollectTable, bytes) if err == nil { s.qc.SetCjBytes(bytes) } return err } // WriteQdBytes implements IotQcMappingService. func (s *modbusQcService) WriteQdBytes(bytes []byte) error { if len(bytes) != len(s.qc.GetQdBytes()) { return fmt.Errorf("写入驱动字节长度有误,应为%d,实为%d", len(s.qc.GetQdBytes()), len(bytes)) } err := s.onWrite(sproto.DataType_DriveTable, bytes) if err == nil { s.qc.SetQdBytes(bytes) } return err } func NewModbusQcService(config *sproto.ModbusConfig, qd []byte, cj []byte) (IotQcMappingService, error) { if err := checkConfig(config); err != nil { return nil, err } _, ok := modbus.GetClient(config.Url) if ok { return nil, fmt.Errorf("modbus客户端已存在,url=%s", config.Url) } cli, err := modbus.NewClient(&modbus.ClientConfig{ Url: config.Url, Timeout: config.Timeout, }) if err != nil { return nil, errors.Join(err, fmt.Errorf("modbus客户端创建失败,url=%s", config.Url)) } cli.SetUnitId(uint8(config.UnitId)) cli.SetEndianness(convertEndianness(config.Endianness)) cli.Start() s := &modbusQcService{ config: config, cli: cli, qc: model.NewDC(qd, cj), done: make(chan struct{}), } // s.initOnUpdateTask() ctx, cancel := context.WithCancel(context.Background()) go s.run(ctx) s.cancel = cancel return s, nil } func (m *modbusQcService) onWrite(dt sproto.DataType, bytes []byte) error { mapping := m.config.Mapping for _, mdm := range mapping { if isWriteFunction(mdm.Function) { if mdm.Type == dt { if !m.cli.IsConnected() { slog.Warn("Modbus驱动采集服务数据写入失败,modbus客户端未连接", "url", m.config.Url, "unitid", m.config.UnitId, "Function", mdm.Function) return errors.New("数据写入失败,modbus客户端未连接") } switch mdm.Function { case sproto.Modbus_WriteCoil, sproto.Modbus_WriteCoils, sproto.Modbus_RWCoils: data := getQcBits(bytes, mdm) err := m.cli.WriteCoils(uint16(mdm.Addr), data) if err != nil { slog.Error("Modbus驱动采集服务写入线圈失败", "url", m.config.Url, "unitid", m.config.UnitId, "error", err, "Function", mdm.Function) return err } else { slog.Info("Modbus驱动采集服务写入线圈成功", "url", m.config.Url, "unitid", m.config.UnitId, "Function", mdm.Function, "data", data, "mapping", mdm) } case sproto.Modbus_WriteRegister, sproto.Modbus_WriteRegisters, sproto.Modbus_RWRegisters: data := getQcBytes(bytes, mdm) err := m.cli.WriteRegisterBytes(uint16(mdm.Addr), data) if err != nil { slog.Error("Modbus驱动采集服务写入寄存器失败", "url", m.config.Url, "unitid", m.config.UnitId, "error", err, "Function", mdm.Function) return err } else { slog.Info("Modbus驱动采集服务写入寄存器成功", "url", m.config.Url, "unitid", m.config.UnitId, "Function", mdm.Function, "data", data, "mapping", mdm) } } } } } return nil } // func (m *modbusQcService) initOnUpdateTask() { // mapping := m.config.Mapping // for _, mdm := range mapping { // if mdm.WriteStrategy == sproto.Modbus_OnUpdate && isWriteFunction(mdm.Function) { // et := model.DCE_Drive_Update // if mdm.Type == sproto.DataType_CollectTable { // et = model.DCE_Collect_Update // } // m.qc.On(et, func(d model.QC) { // if !m.cli.IsConnected() { // slog.Warn("Modbus驱动采集服务数据更新写入失败,modbus客户端未连接", "url", m.config.Url, "Function", mdm.Function) // return // } // switch mdm.Function { // case sproto.Modbus_WriteCoil, sproto.Modbus_WriteCoils, sproto.Modbus_RWCoils: // err := m.cli.WriteCoils(uint16(mdm.Addr), m.GetDcBits(mdm)) // if err != nil { // slog.Error("Modbus驱动采集服务写入线圈失败", "url", m.config.Url, "error", err, "Function", mdm.Function) // } else { // slog.Info("Modbus驱动采集服务写入线圈成功", "url", m.config.Url, "Function", mdm.Function) // } // case sproto.Modbus_WriteRegister, sproto.Modbus_WriteRegisters, sproto.Modbus_RWRegisters: // err := m.cli.WriteRegisterBytes(uint16(mdm.Addr), m.GetDcBytes(mdm)) // if err != nil { // slog.Error("Modbus驱动采集服务写入寄存器失败", "url", m.config.Url, "error", err, "Function", mdm.Function) // } else { // slog.Info("Modbus驱动采集服务写入寄存器成功", "url", m.config.Url, "Function", mdm.Function) // } // } // }) // } // } // } func isWriteFunction(modbus_Function sproto.Modbus_Function) bool { return modbus_Function == sproto.Modbus_WriteCoil || modbus_Function == sproto.Modbus_WriteCoils || modbus_Function == sproto.Modbus_WriteRegister || modbus_Function == sproto.Modbus_WriteRegisters || modbus_Function == sproto.Modbus_RWCoils || modbus_Function == sproto.Modbus_RWRegisters } func (m *modbusQcService) run(ctx context.Context) { defer close(m.done) mainLoop: for { select { case <-ctx.Done(): slog.Debug("Modbus驱采映射循环取消,关闭modbus客户端", "url", m.config.Url) modbus.DeleteClient(m.config.Url) break mainLoop default: } m.mappingTaskExecute() time.Sleep(time.Millisecond * time.Duration(m.config.Interval)) } } func (m *modbusQcService) mappingTaskExecute() { if m.cli.IsConnected() { for _, mdm := range m.config.Mapping { switch mdm.Function { case sproto.Modbus_ReadCoil, sproto.Modbus_RWCoils: data, err := m.cli.ReadCoil(uint16(mdm.Addr), uint16(mdm.Quantity)) if err != nil { slog.Error("Modbus驱动采集服务读取线圈失败", "url", m.config.Url, "unitid", m.config.UnitId, "error", err) continue } err = m.updateDcByBits(mdm, data) if err != nil { slog.Error("Modbus驱动采集服务更新驱采数据失败", "url", m.config.Url, "unitid", m.config.UnitId, "error", err) continue } case sproto.Modbus_ReadDiscreteInput: data, err := m.cli.ReadDiscreteInput(uint16(mdm.Addr), uint16(mdm.Quantity)) if err != nil { slog.Error("Modbus驱动采集服务读取离散输入失败", "url", m.config.Url, "unitid", m.config.UnitId, "error", err) continue } err = m.updateDcByBits(mdm, data) if err != nil { slog.Error("Modbus驱动采集服务更新驱采数据失败", "url", m.config.Url, "unitid", m.config.UnitId, "error", err) continue } case sproto.Modbus_ReadHoldingRegister, sproto.Modbus_RWRegisters: data, err := m.cli.ReadHoldingRegisterBytes(uint16(mdm.Addr), uint16(mdm.Quantity*2)) if err != nil { slog.Error("Modbus驱动采集服务读取保持寄存器失败", "url", m.config.Url, "unitid", m.config.UnitId, "error", err) continue } err = m.updateDcByBytes(mdm, data) if err != nil { slog.Error("Modbus驱动采集服务更新驱采数据失败", "url", m.config.Url, "unitid", m.config.UnitId, "error", err) continue } case sproto.Modbus_ReadInputRegister: data, err := m.cli.ReadInputRegisterBytes(uint16(mdm.Addr), uint16(mdm.Quantity*2)) if err != nil { slog.Error("Modbus驱动采集服务读取输入寄存器失败", "url", m.config.Url, "unitid", m.config.UnitId, "error", err) continue } err = m.updateDcByBytes(mdm, data) if err != nil { slog.Error("Modbus驱动采集服务更新驱采数据失败", "url", m.config.Url, "unitid", m.config.UnitId, "error", err) continue } // case sproto.Modbus_WriteCoil: // if mdm.WriteStrategy == sproto.Modbus_OnScheduled { // 定时写 // bits := m.GetDcBits(mdm) // err := m.cli.WriteCoil(uint16(mdm.Addr), bits[0]) // if err != nil { // slog.Error("Modbus驱动采集服务写单线圈失败", "url", m.config.Url, "unitid", m.config.UnitId, "error", err) // continue // } // } // case sproto.Modbus_WriteCoils: // if mdm.WriteStrategy == sproto.Modbus_OnScheduled { // 定时写 // bits := m.GetDcBits(mdm) // err := m.cli.WriteCoils(uint16(mdm.Addr), bits) // if err != nil { // slog.Error("Modbus驱动采集服务写多线圈失败", "url", m.config.Url, "unitid", m.config.UnitId, "error", err) // continue // } // } // case sproto.Modbus_WriteRegister: // if mdm.WriteStrategy == sproto.Modbus_OnScheduled { // 定时写 // data := m.GetDcBytes(mdm) // err := m.cli.WriteRegisterBytes(uint16(mdm.Addr), data) // if err != nil { // slog.Error("Modbus驱动采集服务写单寄存器失败", "url", m.config.Url, "unitid", m.config.UnitId, "error", err) // continue // } // } // case sproto.Modbus_WriteRegisters: // if mdm.WriteStrategy == sproto.Modbus_OnScheduled { // 定时写 // data := m.GetDcBytes(mdm) // err := m.cli.WriteRegisterBytes(uint16(mdm.Addr), data) // if err != nil { // slog.Error("Modbus驱动采集服务写多寄存器失败", "url", m.config.Url, "unitid", m.config.UnitId, "error", err) // continue // } // } } } } else { slog.Error("Modbus驱动采集服务映射任务执行失败,Modbus未连接", "url", m.config.Url, "unitid", m.config.UnitId) } } func getQcBits(bytes []byte, mdm *sproto.ModbusDcMapping) []bool { bits := model.DecodeBools(bytes) start := mdm.Start quantity := mdm.Quantity if start+quantity > uint32(len(bits)) { panic(fmt.Errorf("getQcBits超出范围")) } return bits[start : start+quantity] } func getQcBytes(bytes []byte, mdm *sproto.ModbusDcMapping) []byte { start := mdm.Start quantity := mdm.Quantity * 2 if start+quantity > uint32(len(bytes)) { panic(fmt.Errorf("getQcBytes超出范围")) } return bytes[start : start+quantity] } // func (m *modbusQcService) GetDcBits(mdm *sproto.ModbusDcMapping) []bool { // switch mdm.Type { // case sproto.DataType_CollectTable: // 采集数据 // return m.qc.GetCjBitsOf(mdm.Start, mdm.Quantity) // case sproto.DataType_DriveTable: // 驱动数据 // return m.qc.GetQdBitsOf(mdm.Start, mdm.Quantity) // default: // panic("未知数据类型") // } // } // func (m *modbusQcService) GetDcBytes(mdm *sproto.ModbusDcMapping) []byte { // switch mdm.Type { // case sproto.DataType_CollectTable: // 采集数据 // return m.qc.GetCjBytesOf(mdm.Start, mdm.Quantity*2) // case sproto.DataType_DriveTable: // 驱动数据 // return m.qc.GetQdBytesOf(mdm.Start, mdm.Quantity*2) // default: // panic("未知数据类型") // } // } func (m *modbusQcService) updateDcByBits(mdm *sproto.ModbusDcMapping, bits []bool) error { switch mdm.Type { case sproto.DataType_CollectTable: // 采集数据 return m.qc.UpdateCjByBits(mdm.Start, bits) case sproto.DataType_DriveTable: // 驱动数据 return m.qc.UpdateQdByBits(mdm.Start, bits) } return nil } func (m *modbusQcService) updateDcByBytes(mdm *sproto.ModbusDcMapping, bytes []byte) error { switch mdm.Type { case sproto.DataType_CollectTable: // 采集数据 return m.qc.UpdateCjByBytes(mdm.Start, bytes) case sproto.DataType_DriveTable: // 驱动数据 return m.qc.UpdateQdByBytes(mdm.Start, bytes) } return nil } func checkConfig(config *sproto.ModbusConfig) error { if config.Url == "" { return fmt.Errorf("Modbus配置未设置url") } if !strings.HasPrefix(config.Url, "tcp") { return fmt.Errorf("Modbus配置url必须以tcp开头") } if config.UnitId == 0 { return fmt.Errorf("Modbus配置未设置unitId") } if config.Interval == 0 { return fmt.Errorf("Modbus配置未设置interval") } if len(config.Mapping) == 0 { return fmt.Errorf("Modbus配置无映射配置") } return nil } func convertEndianness(endianness sproto.Modbus_Endianness) modbus.Endianness { switch endianness { case sproto.Modbus_BigEndian: return modbus.BigEndian case sproto.Modbus_LittleEndian: return modbus.LittleEndian } return modbus.BigEndian }