package service import ( "context" "errors" "fmt" "log/slog" "strings" "time" "joylink.club/iot/modbus" "joylink.club/iot/service/model" sproto "joylink.club/iot/service/proto" ) type modbusDcService struct { config *sproto.ModbusConfig cli modbus.MasterClient dc model.DC cancel context.CancelFunc } func NewModbusDcService(config *sproto.ModbusConfig, dc model.DC) (IotService, error) { if err := checkConfig(config); err != nil { return nil, err } _, ok := modbus.GetClient(config.Url) if ok { return nil, fmt.Errorf("modbus客户端已存在,url=%s", config.Url) } cli, err := modbus.NewClient(config.Url) if err != nil { return nil, errors.Join(err, fmt.Errorf("modbus客户端创建失败,url=%s", config.Url)) } cli.SetUnitId(uint8(config.UnitId)) s := &modbusDcService{ config: config, cli: cli, dc: dc, } s.initOnUpdateTask() ctx, cancel := context.WithCancel(context.Background()) go s.run(ctx) s.cancel = cancel return s, nil } func (m *modbusDcService) initOnUpdateTask() { mapping := m.config.Mapping for _, mdm := range mapping { if mdm.WriteStrategy == sproto.Modbus_OnUpdate && isWriteFunction(mdm.Function) { et := model.DCE_Drive_Update if mdm.Type == sproto.DataType_CollectTable { et = model.DCE_Collect_Update } m.dc.On(et, func(d model.DC) { if !m.cli.IsConnected() { slog.Warn("Modbus驱动采集服务数据更新写入失败,modbus客户端未连接", "id", m.config.Id, "url", m.config.Url, "Function", mdm.Function) return } switch mdm.Function { case sproto.Modbus_WriteCoil, sproto.Modbus_WriteCoils, sproto.Modbus_RWCoils: err := m.cli.WriteCoils(uint16(mdm.Addr), m.GetDcBits(mdm)) if err != nil { slog.Error("Modbus驱动采集服务写入线圈失败", "id", m.config.Id, "url", m.config.Url, "error", err, "Function", mdm.Function) } else { slog.Info("Modbus驱动采集服务写入线圈成功", "id", m.config.Id, "url", m.config.Url, "Function", mdm.Function) } case sproto.Modbus_WriteRegister, sproto.Modbus_WriteRegisters, sproto.Modbus_RWRegisters: err := m.cli.WriteRegisterBytes(uint16(mdm.Addr), m.GetDcBytes(mdm)) if err != nil { slog.Error("Modbus驱动采集服务写入寄存器失败", "id", m.config.Id, "url", m.config.Url, "error", err, "Function", mdm.Function) } else { slog.Info("Modbus驱动采集服务写入寄存器成功", "id", m.config.Id, "url", m.config.Url, "Function", mdm.Function) } } }) } } } func isWriteFunction(modbus_Function sproto.Modbus_Function) bool { return modbus_Function == sproto.Modbus_WriteCoil || modbus_Function == sproto.Modbus_WriteCoils || modbus_Function == sproto.Modbus_WriteRegister || modbus_Function == sproto.Modbus_WriteRegisters || modbus_Function == sproto.Modbus_RWCoils || modbus_Function == sproto.Modbus_RWRegisters } func (m *modbusDcService) run(ctx context.Context) { m.cli.Start() for { select { case <-ctx.Done(): slog.Info("Modbus驱动采集服务退出", "id", m.config.Id, "url", m.config.Url) return default: } m.mappingTaskExecute() time.Sleep(time.Millisecond * time.Duration(m.config.Interval)) } } func (m *modbusDcService) mappingTaskExecute() { if m.cli.IsConnected() { for _, mdm := range m.config.Mapping { switch mdm.Function { case sproto.Modbus_ReadCoil, sproto.Modbus_RWCoils: data, err := m.cli.ReadCoil(uint16(mdm.Addr), uint16(mdm.Quantity)) if err != nil { slog.Error("Modbus驱动采集服务读取线圈失败", "id", m.config.Id, "url", m.config.Url, "error", err) continue } err = m.updateDcByBits(mdm, data) if err != nil { slog.Error("Modbus驱动采集服务更新驱采数据失败", "id", m.config.Id, "url", m.config.Url, "error", err) continue } case sproto.Modbus_ReadDiscreteInput: data, err := m.cli.ReadDiscreteInput(uint16(mdm.Addr), uint16(mdm.Quantity)) if err != nil { slog.Error("Modbus驱动采集服务读取离散输入失败", "id", m.config.Id, "url", m.config.Url, "error", err) continue } err = m.updateDcByBits(mdm, data) if err != nil { slog.Error("Modbus驱动采集服务更新驱采数据失败", "id", m.config.Id, "url", m.config.Url, "error", err) continue } case sproto.Modbus_ReadHoldingRegister, sproto.Modbus_RWRegisters: data, err := m.cli.ReadHoldingRegisterBytes(uint16(mdm.Addr), uint16(mdm.Quantity*2)) if err != nil { slog.Error("Modbus驱动采集服务读取保持寄存器失败", "id", m.config.Id, "url", m.config.Url, "error", err) continue } err = m.updateDcByBytes(mdm, data) if err != nil { slog.Error("Modbus驱动采集服务更新驱采数据失败", "id", m.config.Id, "url", m.config.Url, "error", err) continue } case sproto.Modbus_ReadInputRegister: data, err := m.cli.ReadInputRegisterBytes(uint16(mdm.Addr), uint16(mdm.Quantity*2)) if err != nil { slog.Error("Modbus驱动采集服务读取输入寄存器失败", "id", m.config.Id, "url", m.config.Url, "error", err) continue } err = m.updateDcByBytes(mdm, data) if err != nil { slog.Error("Modbus驱动采集服务更新驱采数据失败", "id", m.config.Id, "url", m.config.Url, "error", err) continue } case sproto.Modbus_WriteCoil: if mdm.WriteStrategy == sproto.Modbus_OnScheduled { // 定时写 bits := m.GetDcBits(mdm) err := m.cli.WriteCoil(uint16(mdm.Addr), bits[0]) if err != nil { slog.Error("Modbus驱动采集服务写单线圈失败", "id", m.config.Id, "url", m.config.Url, "error", err) continue } } case sproto.Modbus_WriteCoils: if mdm.WriteStrategy == sproto.Modbus_OnScheduled { // 定时写 bits := m.GetDcBits(mdm) err := m.cli.WriteCoils(uint16(mdm.Addr), bits) if err != nil { slog.Error("Modbus驱动采集服务写多线圈失败", "id", m.config.Id, "url", m.config.Url, "error", err) continue } } case sproto.Modbus_WriteRegister: if mdm.WriteStrategy == sproto.Modbus_OnScheduled { // 定时写 data := m.GetDcBytes(mdm) err := m.cli.WriteRegisterBytes(uint16(mdm.Addr), data) if err != nil { slog.Error("Modbus驱动采集服务写单寄存器失败", "id", m.config.Id, "url", m.config.Url, "error", err) continue } } case sproto.Modbus_WriteRegisters: if mdm.WriteStrategy == sproto.Modbus_OnScheduled { // 定时写 data := m.GetDcBytes(mdm) err := m.cli.WriteRegisterBytes(uint16(mdm.Addr), data) if err != nil { slog.Error("Modbus驱动采集服务写多寄存器失败", "id", m.config.Id, "url", m.config.Url, "error", err) continue } } } } } else { slog.Error("Modbus驱动采集服务映射任务执行失败,Modbus未连接", "id", m.config.Id, "url", m.config.Url) } } func (m *modbusDcService) GetDcBits(mdm *sproto.ModbusDcMapping) []bool { switch mdm.Type { case sproto.DataType_CollectTable: // 采集数据 return m.dc.GetCollectBits(mdm.Start, mdm.Quantity) case sproto.DataType_DriveTable: // 驱动数据 return m.dc.GetDriveBits(mdm.Start, mdm.Quantity) default: panic("未知数据类型") } } func (m *modbusDcService) GetDcBytes(mdm *sproto.ModbusDcMapping) []byte { switch mdm.Type { case sproto.DataType_CollectTable: // 采集数据 return m.dc.GetCollectBytes(mdm.Start, mdm.Quantity*2) case sproto.DataType_DriveTable: // 驱动数据 return m.dc.GetDriveBytes(mdm.Start, mdm.Quantity*2) default: panic("未知数据类型") } } func (m *modbusDcService) updateDcByBits(mdm *sproto.ModbusDcMapping, bits []bool) error { switch mdm.Type { case sproto.DataType_CollectTable: // 采集数据 return m.dc.UpdateCollectByBits(mdm.Start, bits) case sproto.DataType_DriveTable: // 驱动数据 return m.dc.UpdateDriveByBits(mdm.Start, bits) } return nil } func (m *modbusDcService) updateDcByBytes(mdm *sproto.ModbusDcMapping, bytes []byte) error { switch mdm.Type { case sproto.DataType_CollectTable: // 采集数据 return m.dc.UpdateCollectByBytes(mdm.Start, bytes) case sproto.DataType_DriveTable: // 驱动数据 return m.dc.UpdateDriveByBytes(mdm.Start, bytes) } return nil } func checkConfig(config *sproto.ModbusConfig) error { if config.Url == "" { return fmt.Errorf("Modbus配置未设置url") } if !strings.HasPrefix(config.Url, "tcp") { return fmt.Errorf("Modbus配置url必须以tcp开头") } if config.UnitId == 0 { return fmt.Errorf("Modbus配置未设置unitId") } if config.Interval == 0 { return fmt.Errorf("Modbus配置未设置interval") } if len(config.Mapping) == 0 { return fmt.Errorf("Modbus配置无映射配置") } return nil } func (m *modbusDcService) Start() error { return nil } func (m *modbusDcService) Stop() error { m.cancel() modbus.DeleteClient(m.config.Url) return nil }