// Package dynamics talks to the external train dynamics service: it issues
// HTTP requests to start/stop a simulation and add/remove trains, receives
// train information over UDP, and periodically sends turnout states and train
// control messages over UDP.
package dynamics

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log/slog"
	"net/http"
	"runtime/debug"
	"sync"
	"time"

	"joylink.club/bj-rtsts-server/config"
	"joylink.club/bj-rtsts-server/sys_error"
	"joylink.club/bj-rtsts-server/third_party/message"
	"joylink.club/bj-rtsts-server/third_party/udp"
)

// DynamicsMessageManager is the callback surface the dynamics service needs
// from its owner: it supplies the run configuration and line data, provides
// the turnout states to send, and consumes decoded train information.
type DynamicsMessageManager interface {
	CollectDynamicsTurnoutInfo() []*message.DynamicsTurnoutInfo
	HandleDynamicsTrainInfo(info *message.DynamicsTrainInfo)
	GetDynamicsRunConfig() *config.DynamicsConfig
	GetDynamicsRunRepository() *message.LineBaseInfo
}

// Dynamics is the interface to the dynamics service.
type Dynamics interface {
	// RequestAddTrain asks the dynamics service to add a train.
	RequestAddTrain(info *message.InitTrainInfo) error
	// RequestRemoveTrain asks the dynamics service to remove a train.
	RequestRemoveTrain(req *message.RemoveTrainReq) error
	// Start starts the dynamics messaging function.
	Start(manager DynamicsMessageManager) error
	// Stop stops the dynamics messaging function.
	Stop()
	// SendTrainControlMessage sends a raw train control message.
	SendTrainControlMessage(b []byte)
}

var _default Dynamics
var initMutex sync.Mutex

// Default returns the package-wide Dynamics instance, creating it on first
// use. Creation is guarded by initMutex.
func Default() Dynamics {
	initMutex.Lock()
	defer initMutex.Unlock()
	if _default == nil {
		_default = &dynamics{}
	}
	return _default
}

// dynamics is the default Dynamics implementation.
type dynamics struct {
	trainInfoUdpServer    udp.UdpServer
	turnoutStateUdpClient udp.UdpClient
	trainControlUdpClient udp.UdpClient
	baseUrl               string
	httpClient            *http.Client
	manager               DynamicsMessageManager
	turnoutTaskCancel     context.CancelFunc
	runConfig             *config.DynamicsConfig
}

// handleDynamicsTrainInfo decodes a received train-info datagram and forwards
// it to the manager, if one is registered. A datagram that fails to decode is
// logged and dropped so that one bad packet cannot take the receiver down.
func (d *dynamics) handleDynamicsTrainInfo(b []byte) {
	trainInfo := &message.DynamicsTrainInfo{}
	if err := trainInfo.Decode(b); err != nil {
		slog.Error("动力学列车信息解码失败", "error", err)
		return
	}
	if handler := d.manager; handler != nil {
		handler.HandleDynamicsTrainInfo(trainInfo)
	}
}

// getUrlBase builds "http://<ip>[:<port>]" from the configuration. The port
// suffix is omitted when HttpPort is 0.
func getUrlBase(c *config.DynamicsConfig) string {
	ip := c.Ip
	var port string
	if c.HttpPort != 0 {
		port = fmt.Sprintf(":%d", c.HttpPort)
	}
	urlBase := "http://" + ip + port
	return urlBase
}

// buildUrl joins the configured base URL with the given request path.
func (d *dynamics) buildUrl(uri string) string {
	return d.baseUrl + uri
}

// enabled reports whether HTTP requests can be issued: a configuration is
// loaded, it is marked open, and the HTTP client has been initialised (it is
// nil before Start and after Stop).
func (d *dynamics) enabled() bool {
	return d.runConfig != nil && d.runConfig.Open && d.httpClient != nil
}

// drainResponse reads the response body to the end and rejects non-2xx
// status codes. It does not close the body; the caller does.
func drainResponse(resp *http.Response) error {
	if _, err := io.Copy(io.Discard, resp.Body); err != nil {
		return err
	}
	if resp.StatusCode < http.StatusOK || resp.StatusCode >= http.StatusMultipleChoices {
		return fmt.Errorf("HTTP状态码异常: %s", resp.Status)
	}
	return nil
}

// requestStartSimulation posts the line base data to /api/start/.
// It is a no-op returning nil when the service is not enabled.
func (d *dynamics) requestStartSimulation(base *message.LineBaseInfo) error {
	if !d.enabled() {
		return nil
	}
	data, err := json.Marshal(base)
	if err != nil {
		return sys_error.New("动力学开始仿真请求数据序列化错误", err)
	}
	resp, err := d.httpClient.Post(d.buildUrl("/api/start/"), "application/json", bytes.NewReader(data))
	if err != nil {
		return sys_error.New("动力学开始仿真请求发送错误", err)
	}
	defer resp.Body.Close()
	if err := drainResponse(resp); err != nil {
		return sys_error.New("动力学开始仿真请求响应错误", err)
	}
	return nil
}

// requestStopSimulation posts an empty request to /api/end/.
// It is a no-op returning nil when the service is not enabled.
func (d *dynamics) requestStopSimulation() error {
	if !d.enabled() {
		return nil
	}
	resp, err := d.httpClient.Post(d.buildUrl("/api/end/"), "application/json", nil)
	if err != nil {
		return fmt.Errorf("停止仿真请求异常: %w", err)
	}
	defer resp.Body.Close()
	if err := drainResponse(resp); err != nil {
		return fmt.Errorf("停止仿真响应读取异常: %w", err)
	}
	return nil
}

// RequestAddTrain posts the train initialisation data to
// /api/aerodynamics/init/train/.
// It is a no-op returning nil when the service is not enabled.
func (d *dynamics) RequestAddTrain(info *message.InitTrainInfo) error {
	if !d.enabled() {
		return nil
	}
	data, err := json.Marshal(info)
	if err != nil {
		return fmt.Errorf("动力学添加列车请求数据序列化异常: %w", err)
	}
	resp, err := d.httpClient.Post(d.buildUrl("/api/aerodynamics/init/train/"), "application/json", bytes.NewReader(data))
	if err != nil {
		return fmt.Errorf("动力学添加列车请求异常: %w", err)
	}
	defer resp.Body.Close()
	if err := drainResponse(resp); err != nil {
		return fmt.Errorf("动力学添加列车响应读取异常: %w", err)
	}
	return nil
}

// RequestRemoveTrain posts the removal request to
// /api/aerodynamics/remove/train/.
// It is a no-op returning nil when the service is not enabled.
func (d *dynamics) RequestRemoveTrain(req *message.RemoveTrainReq) error {
	if !d.enabled() {
		return nil
	}
	data, err := json.Marshal(req)
	if err != nil {
		return fmt.Errorf("动力学移除列车请求数据序列化异常: %w", err)
	}
	resp, err := d.httpClient.Post(d.buildUrl("/api/aerodynamics/remove/train/"), "application/json", bytes.NewReader(data))
	if err != nil {
		return fmt.Errorf("动力学移除列车请求异常: %w", err)
	}
	defer resp.Body.Close()
	if err := drainResponse(resp); err != nil {
		return fmt.Errorf("动力学移除列车响应读取异常: %w", err)
	}
	return nil
}

// Start loads the run configuration from manager and, when the configuration
// is present, has an IP and is open, initialises the UDP/HTTP endpoints,
// issues the start-simulation request and launches the periodic turnout-state
// task. It returns nil without doing anything when the configuration is
// missing, has no IP or is not open.
//
// Start panics when manager is nil or when a previous Start has not been
// stopped, since both are caller errors. A failed start-simulation request
// releases everything that was initialised and is returned as an error.
func (d *dynamics) Start(manager DynamicsMessageManager) error {
	if manager == nil {
		panic("启动动力学消息服务错误: DynamicsMessageManager不能为nil")
	}
	if d.manager != nil {
		panic("启动动力学消息服务错误: 存在正在运行的任务")
	}
	d.runConfig = manager.GetDynamicsRunConfig()
	if d.runConfig == nil || d.runConfig.Ip == "" || !d.runConfig.Open {
		return nil
	}
	d.manager = manager
	// Initialise the clients and the train-info server.
	d.initDynamics()
	// Push the run data to the dynamics service.
	if err := d.initDynamicsRunRepository(); err != nil {
		// Roll back so that a later Start can succeed and no socket leaks.
		d.httpClient = nil
		d.closeTransports()
		d.manager = nil
		return err
	}
	ctx, cancel := context.WithCancel(context.Background())
	d.turnoutTaskCancel = cancel
	// The task gets its own references so Stop can clear the struct fields
	// without the goroutine observing nil values.
	go d.sendTurnoutStateTask(ctx, manager, d.turnoutStateUdpClient)
	return nil
}

// initDynamics creates the two UDP clients, the HTTP client (5s timeout) and
// the train-info UDP server from the run configuration, and starts the server
// listening.
func (d *dynamics) initDynamics() {
	d.turnoutStateUdpClient = udp.NewClient(fmt.Sprintf("%v:%v", d.runConfig.Ip, d.runConfig.UdpRemotePort))
	d.trainControlUdpClient = udp.NewClient(fmt.Sprintf("%v:%v", d.runConfig.Ip, d.runConfig.UdpRemoteTrainPort))
	d.baseUrl = getUrlBase(d.runConfig)
	d.httpClient = &http.Client{Timeout: time.Second * 5}
	d.trainInfoUdpServer = udp.NewServer(fmt.Sprintf(":%d", d.runConfig.UdpLocalPort), d.handleDynamicsTrainInfo)
	d.trainInfoUdpServer.Listen()
}

// initDynamicsRunRepository sends the manager's line base data to the
// dynamics service via the start-simulation request.
func (d *dynamics) initDynamicsRunRepository() error {
	return d.requestStartSimulation(d.manager.GetDynamicsRunRepository())
}

// closeTransports closes whichever UDP endpoints are open and clears the
// fields so that a repeated call is a no-op.
func (d *dynamics) closeTransports() {
	if d.turnoutStateUdpClient != nil {
		d.turnoutStateUdpClient.Close()
		d.turnoutStateUdpClient = nil
	}
	if d.trainControlUdpClient != nil {
		d.trainControlUdpClient.Close()
		d.trainControlUdpClient = nil
	}
	if d.trainInfoUdpServer != nil {
		d.trainInfoUdpServer.Close()
		d.trainInfoUdpServer = nil
	}
}

// Stop cancels the turnout-state task, issues the stop-simulation request,
// closes the UDP endpoints and clears the manager. It is safe to call when
// nothing is running and safe to call more than once.
func (d *dynamics) Stop() {
	// Cancel the sender before its UDP client is closed.
	if d.turnoutTaskCancel != nil {
		d.turnoutTaskCancel()
		d.turnoutTaskCancel = nil
	}
	if d.httpClient != nil {
		// Stop has no error return; a failed request is logged, not dropped.
		if err := d.requestStopSimulation(); err != nil {
			slog.Error("动力学停止仿真请求失败", "error", err)
		}
		d.httpClient = nil
	}
	d.closeTransports()
	d.manager = nil
}

const (
	// TurnoutMessageSendInterval is the turnout message send interval, in ms.
	TurnoutMessageSendInterval = 50
)

// turnoutStateLifeSignal is the life signal stamped on turnout messages. It
// is incremented once per message and wraps around as a uint16.
var turnoutStateLifeSignal uint16

// sendTurnoutStateTask collects turnout states from manager and sends each
// one through client every TurnoutMessageSendInterval milliseconds until ctx
// is cancelled. A panic inside the loop is recovered and logged with its
// stack, which ends the task.
func (d *dynamics) sendTurnoutStateTask(ctx context.Context, manager DynamicsMessageManager, client udp.UdpClient) {
	defer func() {
		if err := recover(); err != nil {
			slog.Error("定时发送道岔状态任务异常", "error", err, "stack", string(debug.Stack()))
			debug.PrintStack()
		}
	}()
	ticker := time.NewTicker(time.Millisecond * TurnoutMessageSendInterval)
	defer ticker.Stop()
	for {
		// Check before sending so a cancelled task never touches the client.
		select {
		case <-ctx.Done():
			return
		default:
		}
		for _, state := range manager.CollectDynamicsTurnoutInfo() {
			turnoutStateLifeSignal++
			state.LifeSignal = turnoutStateLifeSignal
			client.SendMsg(state)
		}
		// Wait for the next tick, but wake immediately on cancellation.
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
		}
	}
}

// SendTrainControlMessage sends b through the train-control UDP client. The
// message is dropped when the client does not exist, i.e. before a successful
// Start or after Stop.
func (d *dynamics) SendTrainControlMessage(b []byte) {
	client := d.trainControlUdpClient
	if client == nil {
		return
	}
	client.Send(b)
}