// Package server wires the MQTT client, the request handlers and the
// scheduled publishing tasks together into the IoT drive/collect ("QC")
// server process.
package server

import (
	"context"
	"log/slog"
	"os"
	"time"

	"joylink.club/iot/config"
	"joylink.club/iot/dto"
	"joylink.club/iot/log"
	"joylink.club/iot/mqtt"
	"joylink.club/iot/service"
)

// iqcs is the process-wide server instance created by StartIotQcServer.
var iqcs *IotQcServer

// IotQcServer holds the currently running drive/collect mapping service, the
// scheduled tasks owned by the server, and the function used to shut it down.
type IotQcServer struct {
	// qcMappingService is the active mapping service, or nil when none runs.
	qcMappingService service.IotQcMappingService
	// qcDataPubTask periodically publishes the mapping service data; nil when
	// no mapping service is running.
	qcDataPubTask service.IScheduledTask
	// tasks are server-lifetime scheduled tasks, stopped in stop().
	tasks []service.IScheduledTask
	state *dto.IotServiceState
	// cancel ends serve(); it is set by start() before serve() blocks.
	cancel context.CancelFunc
}

// start connects the MQTT client, registers the request handlers, schedules
// the service-state publishing task and then blocks in serve until the server
// is cancelled. It currently always returns nil.
func (s *IotQcServer) start() error {
	startMqttClient()
	// Register the service request handlers.
	s.registerReqHandlers()
	// Schedule the service-state publishing task (once per second).
	s.tasks = append(s.tasks, service.NewScheduledTask(pubServerState, 1*time.Second))

	ctx, cancel := context.WithCancel(context.Background())
	// Publish cancel BEFORE blocking in serve; otherwise StopIotQcServer
	// would find a nil cancel function and the server could never be stopped.
	s.cancel = cancel
	s.serve(ctx)
	return nil
}

// serve blocks until ctx is cancelled and then releases all server resources
// through stop.
func (s *IotQcServer) serve(ctx context.Context) {
	defer s.stop()
	<-ctx.Done()
}

// stop stops the mapping service and its publishing task (if any), every
// server-lifetime task, and finally the MQTT client. It always returns nil.
func (s *IotQcServer) stop() error {
	s.shutdownQcMapping()
	for _, task := range s.tasks {
		task.Stop()
	}
	mqtt.Stop()
	return nil
}

// shutdownQcMapping stops the data publishing task first and the mapping
// service second, clearing each field before stopping so that no stopped
// instance remains reachable through the server.
func (s *IotQcServer) shutdownQcMapping() {
	task := s.qcDataPubTask
	s.qcDataPubTask = nil
	if task != nil {
		task.Stop()
	}
	svc := s.qcMappingService
	s.qcMappingService = nil
	if svc != nil {
		svc.Stop()
	}
}

// stateMonitor reports the service state: an error state carrying the message
// from the mapping service's ReportError when it returns one, otherwise the
// normal state.
func (s *IotQcServer) stateMonitor() *dto.IotServiceState {
	if s.qcMappingService != nil {
		if err := s.qcMappingService.ReportError(); err != nil {
			// The error is surfaced through the published state rather than logged here.
			return &dto.IotServiceState{
				State:  dto.ServiceState_Error,
				ErrMsg: err.Error(),
			}
		}
	}
	return &dto.IotServiceState{
		State: dto.ServiceState_Normal,
	}
}

// registerReqHandlers registers the MQTT handlers for service start/stop
// requests, log requests, and drive/collect data writes.
func (s *IotQcServer) registerReqHandlers() {
	mqtt.RegIotQcServiceStartReqHandler(s.startIotQcMappingService)
	mqtt.RegIotQcServiceStopReqHandler(s.stopIotQcMappingService)
	mqtt.RegIotLogReqHandler(GetIotLog)
	// Register the drive/collect data write handlers.
	mqtt.RegIotQdHandler(s.handleQdWrite)
	mqtt.RegIotCjHandler(s.handleCjWrite)
}

// pubQcData publishes the current collect (Cj) and drive (Qd) bytes of the
// running mapping service; it does nothing when no service is running.
func (s *IotQcServer) pubQcData() {
	// Take a local copy so both publishes use the same service instance.
	svc := s.qcMappingService
	if svc == nil {
		return
	}
	mqtt.PubIotCjData(&dto.IotCj{Data: svc.GetCjBytes()})
	mqtt.PubIotQdData(&dto.IotQd{Data: svc.GetQdBytes()})
}

// startIotQcMappingService creates a Modbus mapping service from req.Config
// and schedules periodic data publishing every req.Config.Interval
// milliseconds. It answers with Code 1 and the error text when the service
// cannot be created, and with Code 0 on success.
//
// A previously running mapping service and its publishing task are stopped
// once the new service has been created, so repeated start requests do not
// leak the old instances. If creation fails the previous service is left
// untouched.
func (s *IotQcServer) startIotQcMappingService(req *dto.IotQcServiceStartReq) *dto.IotQcServiceCommonResp {
	mqcs, err := service.NewModbusQcService(req.Config)
	if err != nil {
		slog.Error("创建Modbus驱采映射服务失败", "err", err)
		return &dto.IotQcServiceCommonResp{Code: 1, Msg: err.Error()}
	}
	s.shutdownQcMapping()
	s.qcMappingService = mqcs
	s.qcDataPubTask = service.NewScheduledTask(s.pubQcData, time.Duration(req.Config.Interval)*time.Millisecond)
	return &dto.IotQcServiceCommonResp{Code: 0, Msg: "成功"}
}

// handleQdWrite forwards received drive data to the running mapping service;
// the message is ignored when no service is running.
func (s *IotQcServer) handleQdWrite(qd *dto.IotQd) {
	if s.qcMappingService != nil {
		slog.Info("IOT收到并执行写入驱动数据", "data", qd.Data)
		s.qcMappingService.WriteQdBytes(qd.Data)
	}
}

// handleCjWrite forwards received collect data to the running mapping
// service; the message is ignored when no service is running.
func (s *IotQcServer) handleCjWrite(cj *dto.IotCj) {
	if s.qcMappingService != nil {
		slog.Info("IOT收到并执行写入采集数据", "data", cj.Data)
		s.qcMappingService.WriteCjBytes(cj.Data)
	}
}

// stopIotQcMappingService stops the data publishing task and the mapping
// service if they are running and always answers with Code 0. The request
// payload is not inspected.
func (s *IotQcServer) stopIotQcMappingService(req *dto.IotQcServiceStopReq) *dto.IotQcServiceCommonResp {
	s.shutdownQcMapping()
	return &dto.IotQcServiceCommonResp{Code: 0, Msg: "成功"}
}

// StartIotQcServer initialises logging, creates the global server instance
// and runs it. The call blocks until StopIotQcServer is invoked (from another
// goroutine), after which all server resources have been released.
func StartIotQcServer() {
	log.InitLog()
	iqcs = &IotQcServer{
		tasks: []service.IScheduledTask{},
		state: &dto.IotServiceState{
			State: dto.ServiceState_Normal,
		},
	}
	if err := iqcs.start(); err != nil {
		slog.Error("start iot qc server", "err", err)
	}
}

// StopIotQcServer asks the running server to shut down. It is a no-op when
// the server has not been started yet.
func StopIotQcServer() {
	if iqcs == nil || iqcs.cancel == nil {
		return
	}
	iqcs.cancel()
}

// pubServerState publishes the state currently reported by the global
// server's stateMonitor.
func pubServerState() {
	state := iqcs.stateMonitor()
	slog.Debug("发布服务状态", "state", state.State, "msg", state.ErrMsg)
	mqtt.PubIotServiceState(state)
}

// startMqttClient loads the configuration and starts the MQTT client with
// the configured broker settings. The process exits with status 1 when the
// client cannot be started.
func startMqttClient() {
	config.LoadConfig()
	mqttcfg := config.Cfg.Mqtt
	cmc := &mqtt.IotMqttConfig{
		AppId:             mqttcfg.Topic.App,
		BrokerUrl:         mqttcfg.Address,
		ClientId:          mqttcfg.ClientId,
		Username:          mqttcfg.Username,
		Password:          mqttcfg.Password,
		KeepAlive:         mqttcfg.KeepAlive,
		ConnectRetryDelay: mqttcfg.ConnectRetryDelay,
		ConnectTimeout:    mqttcfg.ConnectTimeout,
	}
	if err := mqtt.Start(cmc); err != nil {
		slog.Error("启动MQTT客户端失败", "error", err)
		os.Exit(1)
	}
}