package service import ( "errors" "fmt" "log/slog" "strings" "time" "joylink.club/iot/dto" "joylink.club/iot/protocol/modbus" "joylink.club/iot/service/model" ) // Modbus驱采服务 type modbusQcService struct { config *dto.ModbusConfig cli modbus.MasterClient qc model.QC tasks []IScheduledTask stopped bool } // ReportError implements IotQcMappingService. func (s *modbusQcService) ReportError() error { if !s.cli.IsConnected() { slog.Error("Modbus驱采服务映射任务Modbus客户端未连接,", "url", s.config.Url, "unitid", s.config.UnitId) return fmt.Errorf("modbus未连接或连接断开") } return nil } // RegisterQcDataHandleScheduleTask implements IotQcMappingService. func (s *modbusQcService) RegisterQcDataHandleScheduleTask(task func(), interval time.Duration) { if s.stopped { panic(fmt.Errorf("modbus驱采映射服务已停止")) } s.tasks = append(s.tasks, NewScheduledTask(task, interval)) } func (m *modbusQcService) Stop() error { if m.stopped { // 已经停止 return nil } m.stopped = true for _, task := range m.tasks { task.Stop() } m.tasks = nil slog.Debug("Modbus驱采映射循环取消,关闭modbus客户端", "url", m.config.Url) defer modbus.DeleteClient(m.config.Url) slog.Info("Modbus驱采映射服务线程退出", "url", m.config.Url) return nil } // GetCjBits implements IotQcMappingService. func (s *modbusQcService) GetCjBits() []bool { return s.qc.GetCjBits() } // GetCjBytes implements IotQcMappingService. func (s *modbusQcService) GetCjBytes() []byte { return s.qc.GetCjBytes() } // GetQdBits implements IotQcMappingService. func (s *modbusQcService) GetQdBits() []bool { return s.qc.GetQdBits() } // GetQdBytes implements IotQcMappingService. func (s *modbusQcService) GetQdBytes() []byte { return s.qc.GetQdBytes() } // WriteCjBytes implements IotQcMappingService. func (s *modbusQcService) WriteCjBytes(bytes []byte) error { if len(bytes) != len(s.qc.GetCjBytes()) { return fmt.Errorf("写入采集字节长度有误,应为%d,实为%d", len(s.qc.GetCjBytes()), len(bytes)) } err := s.onWrite(dto.DataType_CJ, bytes) if err == nil { s.qc.SetCjBytes(bytes) } return err } // WriteQdBytes implements IotQcMappingService. func (s *modbusQcService) WriteQdBytes(bytes []byte) error { if len(bytes) != len(s.qc.GetQdBytes()) { return fmt.Errorf("写入驱动字节长度有误,应为%d,实为%d", len(s.qc.GetQdBytes()), len(bytes)) } err := s.onWrite(dto.DataType_QD, bytes) if err == nil { s.qc.SetQdBytes(bytes) } return err } // 新建Modbus驱采映射处理服务 func NewModbusQcService(config *dto.ModbusConfig) (IotQcMappingService, error) { // 基础配置检查 if err := checkConfig(config); err != nil { return nil, err } // 映射范围配置检查 qd := make([]byte, config.Qdl) cj := make([]byte, config.Cjl) if err := checkConfigMappingRange(config, qd, cj); err != nil { return nil, err } _, ok := modbus.GetClient(config.Url) if ok { return nil, fmt.Errorf("modbus客户端已存在,url=%s", config.Url) } cli, err := modbus.NewClient(&modbus.ClientConfig{ Url: config.Url, Timeout: config.Timeout, }) if err != nil { return nil, errors.Join(err, fmt.Errorf("modbus客户端创建失败,url=%s", config.Url)) } cli.SetUnitId(uint8(config.UnitId)) cli.SetEndianness(convertEndianness(config.Endianness)) cli.Start() s := &modbusQcService{ config: config, cli: cli, qc: model.NewDC(qd, cj), } s.RegisterQcDataHandleScheduleTask(s.readTaskExecute, time.Duration(config.Interval)*time.Millisecond) return s, nil } func (m *modbusQcService) onWrite(dt dto.DataType, bytes []byte) error { mapping := m.config.Mapping for _, mdm := range mapping { if isWriteFunction(mdm.Function) { if mdm.Type == dt { if !m.cli.IsConnected() { slog.Warn("Modbus驱动采集服务数据写入失败,modbus客户端未连接", "url", m.config.Url, "unitid", m.config.UnitId, "Function", mdm.Function) return errors.New("数据写入失败,modbus客户端未连接") } switch mdm.Function { case dto.Modbus_WriteCoil, dto.Modbus_WriteCoils, dto.Modbus_RWCoils: data := getQcBits(bytes, mdm) err := m.cli.WriteCoils(uint16(mdm.Addr), data) if err != nil { slog.Error("Modbus驱动采集服务写入线圈失败", "url", m.config.Url, "unitid", m.config.UnitId, "error", err, "Function", mdm.Function) return err } else { slog.Info("Modbus驱动采集服务写入线圈成功", "url", m.config.Url, "unitid", m.config.UnitId, "Function", mdm.Function, "data", model.BitsDebug(data), "mapping", mdm) } case dto.Modbus_WriteRegister, dto.Modbus_WriteRegisters, dto.Modbus_RWRegisters: data := getQcBytes(bytes, mdm) err := m.cli.WriteRegisterBytes(uint16(mdm.Addr), data) if err != nil { slog.Error("Modbus驱动采集服务写入寄存器失败", "url", m.config.Url, "unitid", m.config.UnitId, "error", err, "Function", mdm.Function) return err } else { slog.Info("Modbus驱动采集服务写入寄存器成功", "url", m.config.Url, "unitid", m.config.UnitId, "Function", mdm.Function, "data", model.BytesDebug(data), "mapping", mdm) } } } } } return nil } func (m *modbusQcService) readTaskExecute() { if m.cli.IsConnected() { for _, mdm := range m.config.Mapping { switch mdm.Function { case dto.Modbus_ReadCoil, dto.Modbus_RWCoils: data, err := m.cli.ReadCoil(uint16(mdm.Addr), uint16(mdm.Quantity)) if err != nil { slog.Error("Modbus驱动采集服务读取线圈失败", "url", m.config.Url, "unitid", m.config.UnitId, "error", err) continue } err = m.updateDcByBits(mdm, data) if err != nil { slog.Error("Modbus驱动采集服务更新驱采数据失败", "url", m.config.Url, "unitid", m.config.UnitId, "error", err) continue } case dto.Modbus_ReadDiscreteInput: data, err := m.cli.ReadDiscreteInput(uint16(mdm.Addr), uint16(mdm.Quantity)) if err != nil { slog.Error("Modbus驱动采集服务读取离散输入失败", "url", m.config.Url, "unitid", m.config.UnitId, "error", err) continue } err = m.updateDcByBits(mdm, data) if err != nil { slog.Error("Modbus驱动采集服务更新驱采数据失败", "url", m.config.Url, "unitid", m.config.UnitId, "error", err) continue } case dto.Modbus_ReadHoldingRegister, dto.Modbus_RWRegisters: data, err := m.cli.ReadHoldingRegisterBytes(uint16(mdm.Addr), uint16(mdm.Quantity*2)) if err != nil { slog.Error("Modbus驱动采集服务读取保持寄存器失败", "url", m.config.Url, "unitid", m.config.UnitId, "error", err) continue } err = m.updateDcByBytes(mdm, data) if err != nil { slog.Error("Modbus驱动采集服务更新驱采数据失败", "url", m.config.Url, "unitid", m.config.UnitId, "error", err) continue } case dto.Modbus_ReadInputRegister: data, err := m.cli.ReadInputRegisterBytes(uint16(mdm.Addr), uint16(mdm.Quantity*2)) if err != nil { slog.Error("Modbus驱动采集服务读取输入寄存器失败", "url", m.config.Url, "unitid", m.config.UnitId, "error", err) continue } err = m.updateDcByBytes(mdm, data) if err != nil { slog.Error("Modbus驱动采集服务更新驱采数据失败", "url", m.config.Url, "unitid", m.config.UnitId, "error", err) continue } // case dto.Modbus_WriteCoil: // if mdm.WriteStrategy == dto.Modbus_OnScheduled { // 定时写 // bits := m.GetDcBits(mdm) // err := m.cli.WriteCoil(uint16(mdm.Addr), bits[0]) // if err != nil { // slog.Error("Modbus驱动采集服务写单线圈失败", "url", m.config.Url, "unitid", m.config.UnitId, "error", err) // continue // } // } // case dto.Modbus_WriteCoils: // if mdm.WriteStrategy == dto.Modbus_OnScheduled { // 定时写 // bits := m.GetDcBits(mdm) // err := m.cli.WriteCoils(uint16(mdm.Addr), bits) // if err != nil { // slog.Error("Modbus驱动采集服务写多线圈失败", "url", m.config.Url, "unitid", m.config.UnitId, "error", err) // continue // } // } // case dto.Modbus_WriteRegister: // if mdm.WriteStrategy == dto.Modbus_OnScheduled { // 定时写 // data := m.GetDcBytes(mdm) // err := m.cli.WriteRegisterBytes(uint16(mdm.Addr), data) // if err != nil { // slog.Error("Modbus驱动采集服务写单寄存器失败", "url", m.config.Url, "unitid", m.config.UnitId, "error", err) // continue // } // } // case dto.Modbus_WriteRegisters: // if mdm.WriteStrategy == dto.Modbus_OnScheduled { // 定时写 // data := m.GetDcBytes(mdm) // err := m.cli.WriteRegisterBytes(uint16(mdm.Addr), data) // if err != nil { // slog.Error("Modbus驱动采集服务写多寄存器失败", "url", m.config.Url, "unitid", m.config.UnitId, "error", err) // continue // } // } } } } } func getQcBits(bytes []byte, mdm *dto.ModbusDcMapping) []bool { bits := model.DecodeBools(bytes) start := mdm.Start quantity := mdm.Quantity if start+quantity > uint32(len(bits)) { panic(fmt.Errorf("getQcBits超出范围")) } return bits[start : start+quantity] } func getQcBytes(bytes []byte, mdm *dto.ModbusDcMapping) []byte { start := mdm.Start quantity := mdm.Quantity * 2 if start+quantity > uint32(len(bytes)) { panic(fmt.Errorf("getQcBytes超出范围")) } return bytes[start : start+quantity] } func (m *modbusQcService) updateDcByBits(mdm *dto.ModbusDcMapping, bits []bool) error { switch mdm.Type { case dto.DataType_CJ: // 采集数据 return m.qc.UpdateCjByBits(mdm.Start, bits) case dto.DataType_QD: // 驱动数据 return m.qc.UpdateQdByBits(mdm.Start, bits) } return nil } func (m *modbusQcService) updateDcByBytes(mdm *dto.ModbusDcMapping, bytes []byte) error { switch mdm.Type { case dto.DataType_CJ: // 采集数据 return m.qc.UpdateCjByBytes(mdm.Start, bytes) case dto.DataType_QD: // 驱动数据 return m.qc.UpdateQdByBytes(mdm.Start, bytes) } return nil } func checkConfig(config *dto.ModbusConfig) error { if config.Url == "" { return fmt.Errorf("Modbus配置未设置url") } if !strings.HasPrefix(config.Url, "tcp") { return fmt.Errorf("Modbus配置url必须以tcp开头") } if config.UnitId == 0 { return fmt.Errorf("Modbus配置未设置unitId") } if config.Interval == 0 { return fmt.Errorf("Modbus配置未设置interval") } if len(config.Mapping) == 0 { return fmt.Errorf("Modbus配置无映射配置") } return nil } func checkConfigMappingRange(modbusConfig *dto.ModbusConfig, qd []byte, cj []byte) error { for _, mdm := range modbusConfig.Mapping { if mdm.Type == dto.DataType_CJ { err := checkMappingOutRange(mdm, cj) if err != nil { return err } } else if mdm.Type == dto.DataType_QD { err := checkMappingOutRange(mdm, qd) if err != nil { return err } } } return nil } func checkMappingOutRange(mdm *dto.ModbusDcMapping, bytes []byte) error { f := mdm.Function start := mdm.Start quantity := mdm.Quantity if isCoilFunction(f) { end := start + quantity if end > uint32(len(bytes)*8) { return fmt.Errorf("modbus地址映射配置错误,采集表地址超出范围: 功能=%s,起始位地址=%d,位数量=%d,实际位长度=%d", f, start, quantity, len(bytes)*8) } } else { end := start + quantity*2 if end > uint32(len(bytes)) { return fmt.Errorf("modbus地址映射配置错误,采集表地址超出范围: 功能=%s,起始字节地址=%d,字数量=%d,实际位长度=%d", f, start, quantity, len(bytes)) } } return nil } func convertEndianness(endianness dto.Modbus_Endianness) modbus.Endianness { switch endianness { case dto.Modbus_BigEndian: return modbus.BigEndian case dto.Modbus_LittleEndian: return modbus.LittleEndian } return modbus.BigEndian } func isWriteFunction(modbus_Function dto.Modbus_Function) bool { return modbus_Function == dto.Modbus_WriteCoil || modbus_Function == dto.Modbus_WriteCoils || modbus_Function == dto.Modbus_WriteRegister || modbus_Function == dto.Modbus_WriteRegisters || modbus_Function == dto.Modbus_RWCoils || modbus_Function == dto.Modbus_RWRegisters } func isCoilFunction(modbus_Function dto.Modbus_Function) bool { return modbus_Function == dto.Modbus_ReadCoil || modbus_Function == dto.Modbus_ReadDiscreteInput || modbus_Function == dto.Modbus_RWCoils || modbus_Function == dto.Modbus_WriteCoil || modbus_Function == dto.Modbus_WriteCoils } // func isRegisterFunction(modbus_Function dto.Modbus_Function) bool { // return modbus_Function == dto.Modbus_ReadInputRegister || // modbus_Function == dto.Modbus_ReadHoldingRegister || // modbus_Function == dto.Modbus_RWRegisters || // modbus_Function == dto.Modbus_WriteRegister || // modbus_Function == dto.Modbus_WriteRegisters // }