d45752ddb5
All checks were successful
local-test分支打包构建docker并发布运行 / Docker-Build (push) Successful in 1m40s
367 lines
9.4 KiB
Go
367 lines
9.4 KiB
Go
package dynamics
|
|
|
|
import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log/slog"
	"net/http"
	"runtime/debug"
	"sync"
	"time"

	"joylink.club/bj-rtsts-server/config"
	"joylink.club/bj-rtsts-server/sys_error"
	"joylink.club/bj-rtsts-server/third_party/message"
	"joylink.club/bj-rtsts-server/third_party/tpapi"
	"joylink.club/bj-rtsts-server/third_party/udp"
)
|
|
|
|
type DynamicsMessageManager interface {
|
|
CollectDynamicsTurnoutInfo() *message.DynamicsTurnoutInfo
|
|
|
|
CollectTrainControlState() []message.TrainControlMsg
|
|
HandleDynamicsTrainInfo(info *message.DynamicsTrainInfo)
|
|
GetDynamicsRunConfig() *config.DynamicsConfig
|
|
GetDynamicsRunRepository() *message.LineBaseInfo
|
|
}
|
|
|
|
// 动力学接口
|
|
type Dynamics interface {
|
|
tpapi.ThirdPartyApiService
|
|
// 请求添加列车
|
|
RequestAddTrain(info *message.InitTrainInfo) error
|
|
// 请求移除列车
|
|
RequestRemoveTrain(req *message.RemoveTrainReq) error
|
|
//请求变更列车参数
|
|
TrainOperationConfig(req *message.TrainOperationConfig) error
|
|
// 启动动力学消息功能
|
|
Start(manager DynamicsMessageManager) error
|
|
// 停止动力学消息功能
|
|
Stop()
|
|
|
|
// 发送列车控制消息
|
|
SendTrainControlMessage(b []byte)
|
|
|
|
SendTrainControl(cm *message.TrainControlMsg)
|
|
}
|
|
|
|
var _default *dynamics
|
|
var initMutex sync.Mutex
|
|
|
|
const Name = "动力学"
|
|
const Interval = 15
|
|
|
|
func Default() Dynamics {
|
|
initMutex.Lock()
|
|
defer initMutex.Unlock()
|
|
if _default == nil {
|
|
_default = &dynamics{
|
|
udpDelayRecorder: tpapi.NewUdpDelayRecorder(Interval, func(err error) {
|
|
if err != nil {
|
|
_default.updateState(tpapi.ThirdPartyState_Broken)
|
|
} else {
|
|
_default.updateState(tpapi.ThirdPartyState_Normal)
|
|
}
|
|
}),
|
|
}
|
|
}
|
|
return _default
|
|
}
|
|
|
|
type dynamics struct {
|
|
tpapi.ThirdPartyApiService
|
|
trainInfoUdpServer udp.UdpServer
|
|
turnoutStateUdpClient udp.UdpClient
|
|
trainControlUdpClient udp.UdpClient
|
|
|
|
state tpapi.ThirdPartyApiServiceState
|
|
udpDelayRecorder *tpapi.UdpDelayRecorder
|
|
|
|
baseUrl string
|
|
httpClient *http.Client
|
|
manager DynamicsMessageManager
|
|
turnoutTaskCancel context.CancelFunc
|
|
runConfig *config.DynamicsConfig
|
|
}
|
|
|
|
func (d *dynamics) updateState(state tpapi.ThirdPartyApiServiceState) {
|
|
d.state = state
|
|
}
|
|
|
|
func (d *dynamics) State() tpapi.ThirdPartyApiServiceState {
|
|
return d.state
|
|
}
|
|
|
|
func (d *dynamics) Name() string {
|
|
return Name
|
|
}
|
|
|
|
// 解码列车信息并处理
|
|
func (d *dynamics) handleDynamicsTrainInfo(b []byte) {
|
|
d.udpDelayRecorder.RecordInterval()
|
|
// slog.Debug("动力学列车信息近期消息间隔", "intervals", d.udpDelayRecorder.GetIntervals())
|
|
trainInfo := &message.DynamicsTrainInfo{}
|
|
err := trainInfo.Decode(b)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
handler := d.manager
|
|
if handler != nil {
|
|
handler.HandleDynamicsTrainInfo(trainInfo)
|
|
}
|
|
}
|
|
|
|
func getUrlBase(c *config.DynamicsConfig) string {
|
|
ip := c.Ip
|
|
var port string
|
|
if c.HttpPort != 0 {
|
|
port = fmt.Sprintf(":%d", c.HttpPort)
|
|
}
|
|
urlBase := "http://" + ip + port
|
|
return urlBase
|
|
}
|
|
|
|
func (d *dynamics) buildUrl(uri string) string {
|
|
return d.baseUrl + uri
|
|
}
|
|
|
|
func (d *dynamics) requestStartSimulation(base *message.LineBaseInfo) error {
|
|
if !d.runConfig.Open {
|
|
return nil
|
|
}
|
|
url := d.buildUrl("/api/start/")
|
|
data, _ := json.Marshal(base)
|
|
resp, err := d.httpClient.Post(url, "application/json", bytes.NewBuffer(data))
|
|
if err != nil {
|
|
return sys_error.New("动力学开始仿真请求发送错误", err)
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
var buf []byte
|
|
_, err = resp.Body.Read(buf)
|
|
if err != nil {
|
|
return sys_error.New("动力学开始仿真请求响应错误", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (d *dynamics) requestStopSimulation() error {
|
|
if !d.runConfig.Open {
|
|
return nil
|
|
}
|
|
url := d.buildUrl("/api/end/")
|
|
resp, err := d.httpClient.Post(url, "application/json", nil)
|
|
if err != nil {
|
|
return fmt.Errorf("停止仿真请求异常: %v", err)
|
|
}
|
|
defer resp.Body.Close()
|
|
var buf []byte
|
|
_, err = resp.Body.Read(buf)
|
|
if err != nil {
|
|
return fmt.Errorf("停止仿真响应读取异常: %v", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (d *dynamics) RequestAddTrain(info *message.InitTrainInfo) error {
|
|
if !d.runConfig.Open {
|
|
return nil
|
|
}
|
|
url := d.buildUrl("/api/aerodynamics/init/train/")
|
|
data, _ := json.Marshal(info)
|
|
resp, err := d.httpClient.Post(url, "application/json", bytes.NewBuffer(data))
|
|
if err != nil {
|
|
return fmt.Errorf("动力学添加列车请求异常: %v", err)
|
|
}
|
|
defer resp.Body.Close()
|
|
var buf []byte
|
|
_, err = resp.Body.Read(buf)
|
|
if err != nil {
|
|
return fmt.Errorf("动力学添加列车响应读取异常: %v", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (d *dynamics) RequestRemoveTrain(req *message.RemoveTrainReq) error {
|
|
if !d.runConfig.Open {
|
|
return nil
|
|
}
|
|
url := d.buildUrl("/api/aerodynamics/remove/train/")
|
|
data, _ := json.Marshal(req)
|
|
resp, err := d.httpClient.Post(url, "application/json", bytes.NewBuffer(data))
|
|
if err != nil {
|
|
return fmt.Errorf("动力学移除列车请求异常: %v", err)
|
|
}
|
|
defer resp.Body.Close()
|
|
var buf []byte
|
|
_, err = resp.Body.Read(buf)
|
|
if err != nil {
|
|
return fmt.Errorf("动力学移除列车响应读取异常: %v", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// 动力学数据设置
|
|
func (d *dynamics) TrainOperationConfig(req *message.TrainOperationConfig) error {
|
|
if !d.runConfig.Open {
|
|
return nil
|
|
}
|
|
data, _ := json.Marshal(req)
|
|
url := d.buildUrl("/api/config/")
|
|
resp, err := d.httpClient.Post(url, "application/json", bytes.NewBuffer(data))
|
|
if err != nil {
|
|
return fmt.Errorf("动力学移除列车请求异常: %v", err)
|
|
}
|
|
defer resp.Body.Close()
|
|
var bodyData []byte
|
|
_, err = resp.Body.Read(bodyData)
|
|
if err != nil {
|
|
return fmt.Errorf("动力学移除列车响应读取异常: %v", err)
|
|
}
|
|
return nil
|
|
}
|
|
func (d *dynamics) Start(manager DynamicsMessageManager) error {
|
|
if manager == nil {
|
|
panic("启动动力学消息服务错误: DynamicsMessageManager不能为nil")
|
|
}
|
|
if d.manager != nil {
|
|
return fmt.Errorf("启动动力学消息服务错误: 存在正在运行的任务")
|
|
}
|
|
d.runConfig = manager.GetDynamicsRunConfig()
|
|
if d.runConfig == nil || d.runConfig.Ip == "" || !d.runConfig.Open {
|
|
return nil
|
|
}
|
|
d.manager = manager
|
|
// 初始化客户端信息
|
|
err := d.initDynamics()
|
|
if err != nil {
|
|
d.Stop()
|
|
return err
|
|
}
|
|
// 初始化运行资源
|
|
err = d.initDynamicsRunRepository()
|
|
if err != nil {
|
|
d.Stop() // 发送错误后将信息销毁
|
|
return err
|
|
}
|
|
ctx, cancle := context.WithCancel(context.Background())
|
|
go d.sendTurnoutStateTask(ctx)
|
|
go d.sendTrainControlStateTask(ctx)
|
|
d.turnoutTaskCancel = cancle
|
|
d.updateState(tpapi.ThirdPartyState_Normal)
|
|
d.udpDelayRecorder.Start()
|
|
return nil
|
|
}
|
|
|
|
// 初始化客户端、服务等信息
|
|
func (d *dynamics) initDynamics() error {
|
|
d.turnoutStateUdpClient = udp.NewClient(fmt.Sprintf("%v:%v", d.runConfig.Ip, d.runConfig.UdpRemotePort))
|
|
d.trainControlUdpClient = udp.NewClient(fmt.Sprintf("%v:%v", d.runConfig.Ip, d.runConfig.UdpRemoteTrainPort))
|
|
d.baseUrl = getUrlBase(d.runConfig)
|
|
d.httpClient = &http.Client{Timeout: time.Second * 5}
|
|
d.trainInfoUdpServer = udp.NewServer(fmt.Sprintf(":%d", d.runConfig.UdpLocalPort), d.handleDynamicsTrainInfo)
|
|
return d.trainInfoUdpServer.Listen()
|
|
}
|
|
|
|
// 动力学运行所需数据
|
|
func (d *dynamics) initDynamicsRunRepository() error {
|
|
// 动力学接口调用
|
|
err := d.requestStartSimulation(d.manager.GetDynamicsRunRepository())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (d *dynamics) Stop() {
|
|
initMutex.Lock()
|
|
defer initMutex.Unlock()
|
|
slog.Debug("动力学服务停止")
|
|
// 停止网络监听
|
|
d.udpDelayRecorder.Stop()
|
|
if d.turnoutTaskCancel != nil {
|
|
d.turnoutTaskCancel()
|
|
}
|
|
if d.httpClient != nil {
|
|
d.requestStopSimulation()
|
|
d.httpClient = nil
|
|
}
|
|
if d.turnoutStateUdpClient != nil {
|
|
d.turnoutStateUdpClient.Close()
|
|
d.turnoutStateUdpClient = nil
|
|
}
|
|
if d.trainControlUdpClient != nil {
|
|
d.trainControlUdpClient.Close()
|
|
d.trainControlUdpClient = nil
|
|
}
|
|
if d.trainInfoUdpServer != nil {
|
|
d.trainInfoUdpServer.Close()
|
|
d.trainInfoUdpServer = nil
|
|
}
|
|
d.manager = nil
|
|
d.updateState(tpapi.ThirdPartyState_Closed)
|
|
}
|
|
|
|
// Pauses between two rounds of the periodic send tasks, in milliseconds
// (the tasks multiply these values by time.Millisecond).
const (
	// TurnoutMessageSendInterval is the pause of the turnout state task.
	TurnoutMessageSendInterval = 50
	// TrainControlMessageSendInterval is the pause of the train control task.
	TrainControlMessageSendInterval = 15
)

// turnoutStateLifeSignal is the life signal of the turnout messages; the
// turnout state task increments it before every send.
var turnoutStateLifeSignal uint16
|
|
// 定时发送列车控制状态
|
|
func (d *dynamics) sendTrainControlStateTask(ctx context.Context) {
|
|
defer func() {
|
|
if err := recover(); err != nil {
|
|
slog.Error("定时发送列车控制状态任务异常", "error", err, "stack", string(debug.Stack()))
|
|
debug.PrintStack()
|
|
}
|
|
}()
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
default:
|
|
}
|
|
tccs := d.manager.CollectTrainControlState()
|
|
for _, tcc := range tccs {
|
|
d.SendTrainControl(&tcc)
|
|
}
|
|
time.Sleep(time.Millisecond * TrainControlMessageSendInterval)
|
|
}
|
|
}
|
|
|
|
// 定时发送道岔状态任务
|
|
func (d *dynamics) sendTurnoutStateTask(ctx context.Context) {
|
|
defer func() {
|
|
if err := recover(); err != nil {
|
|
slog.Error("定时发送道岔状态任务异常", "error", err, "stack", string(debug.Stack()))
|
|
debug.PrintStack()
|
|
}
|
|
}()
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
default:
|
|
}
|
|
turnoutState := d.manager.CollectDynamicsTurnoutInfo()
|
|
turnoutStateLifeSignal++
|
|
turnoutState.LifeSignal = turnoutStateLifeSignal
|
|
d.turnoutStateUdpClient.SendMsg(turnoutState)
|
|
time.Sleep(time.Millisecond * TurnoutMessageSendInterval)
|
|
}
|
|
}
|
|
|
|
func (d *dynamics) SendTrainControlMessage(b []byte) {
|
|
if d.trainControlUdpClient != nil {
|
|
d.trainControlUdpClient.Send(b)
|
|
}
|
|
|
|
}
|
|
func (d *dynamics) SendTrainControl(cm *message.TrainControlMsg) {
|
|
d.SendTrainControlMessage(cm.Encode())
|
|
}
|