rts-sim-testing-service/third_party/dynamics/dynamics.go
2024-01-25 09:30:19 +08:00

334 lines
8.5 KiB
Go

package dynamics
import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log/slog"
	"net/http"
	"runtime/debug"
	"sync"
	"time"

	"joylink.club/bj-rtsts-server/config"
	"joylink.club/bj-rtsts-server/sys_error"
	"joylink.club/bj-rtsts-server/third_party/message"
	"joylink.club/bj-rtsts-server/third_party/tpapi"
	"joylink.club/bj-rtsts-server/third_party/udp"
)
// DynamicsMessageManager is the callback surface through which the dynamics
// service obtains its configuration and exchanges data with its owner.
type DynamicsMessageManager interface {
// CollectDynamicsTurnoutInfo returns the turnout info that the periodic
// turnout-state task sends on each cycle.
CollectDynamicsTurnoutInfo() *message.DynamicsTurnoutInfo
// HandleDynamicsTrainInfo receives every train-info message decoded from the
// train-info UDP server.
HandleDynamicsTrainInfo(info *message.DynamicsTrainInfo)
// GetDynamicsRunConfig returns the configuration that Start reads to decide
// whether, and against which endpoints, the service runs.
GetDynamicsRunConfig() *config.DynamicsConfig
// GetDynamicsRunRepository returns the line base info that is posted with the
// start-simulation request.
GetDynamicsRunRepository() *message.LineBaseInfo
}
// Dynamics is the interface of the dynamics (train physics) third-party service.
type Dynamics interface {
tpapi.ThirdPartyApiService
// RequestAddTrain asks the dynamics service to add a train.
RequestAddTrain(info *message.InitTrainInfo) error
// RequestRemoveTrain asks the dynamics service to remove a train.
RequestRemoveTrain(req *message.RemoveTrainReq) error
// TrainOperationConfig asks the dynamics service to change train parameters.
TrainOperationConfig(req *message.TrainOperationConfig) error
// Start starts the dynamics messaging for the given manager.
Start(manager DynamicsMessageManager) error
// Stop stops the dynamics messaging.
Stop()
// SendTrainControlMessage sends a train control message.
SendTrainControlMessage(b []byte)
}
var (
	// _default is the lazily created instance handed out by Default.
	_default *dynamics
	// initMutex serializes creation in Default; Stop also holds it.
	initMutex sync.Mutex
)

const (
	// Name is the display name reported by the service.
	Name = "动力学"
	// Interval is passed to tpapi.NewUdpDelayRecorder in Default.
	// NOTE(review): the unit is not visible from this file — confirm in tpapi.
	Interval = 15
)
// Default returns the package-wide Dynamics instance, creating it on the
// first call. Creation is guarded by initMutex.
func Default() Dynamics {
	initMutex.Lock()
	defer initMutex.Unlock()

	if _default != nil {
		return _default
	}

	// Callback for the UDP delay recorder: a reported error marks the service
	// as broken, a nil error marks it as normal.
	onDelayReport := func(err error) {
		var next tpapi.ThirdPartyApiServiceState = tpapi.ThirdPartyState_Normal
		if err != nil {
			next = tpapi.ThirdPartyState_Broken
		}
		_default.updateState(next)
	}

	_default = &dynamics{
		udpDelayRecorder: tpapi.NewUdpDelayRecorder(Interval, onDelayReport),
	}
	return _default
}
// dynamics is the concrete Dynamics implementation returned by Default.
type dynamics struct {
tpapi.ThirdPartyApiService
// trainInfoUdpServer listens on the configured local UDP port and passes
// received datagrams to handleDynamicsTrainInfo.
trainInfoUdpServer udp.UdpServer
// turnoutStateUdpClient is used by the periodic turnout-state task.
turnoutStateUdpClient udp.UdpClient
// trainControlUdpClient is used by SendTrainControlMessage.
trainControlUdpClient udp.UdpClient
// state is the value last stored by updateState and returned by State.
state tpapi.ThirdPartyApiServiceState
// udpDelayRecorder is notified on every received train-info datagram; its
// callback (installed in Default) updates state.
udpDelayRecorder *tpapi.UdpDelayRecorder
// baseUrl is the HTTP base URL derived from the run configuration.
baseUrl string
// httpClient is created in initDynamics and reset to nil by Stop.
httpClient *http.Client
// manager is non-nil only between a successful Start and the next Stop.
manager DynamicsMessageManager
// turnoutTaskCancel cancels the context of the turnout-state goroutine.
turnoutTaskCancel context.CancelFunc
// runConfig is the configuration obtained from the manager in Start.
runConfig *config.DynamicsConfig
}
// updateState stores the given service state; State returns it afterwards.
func (d *dynamics) updateState(state tpapi.ThirdPartyApiServiceState) {
d.state = state
}
// State returns the service state most recently stored by updateState.
func (d *dynamics) State() tpapi.ThirdPartyApiServiceState {
return d.state
}
// Name returns the package-level Name constant.
func (d *dynamics) Name() string {
return Name
}
// handleDynamicsTrainInfo decodes a train-info datagram and forwards it to the
// manager. It is registered as the callback of the train-info UDP server.
//
// The arrival is recorded on the delay recorder before decoding. A datagram
// that fails to decode is logged and dropped rather than panicking, so bad
// network input cannot take down the receive path.
func (d *dynamics) handleDynamicsTrainInfo(b []byte) {
	d.udpDelayRecorder.RecordInterval()

	trainInfo := &message.DynamicsTrainInfo{}
	if err := trainInfo.Decode(b); err != nil {
		slog.Error("动力学列车信息解码失败", "error", err, "len", len(b))
		return
	}

	// manager is cleared by Stop; read it once so the nil check and the call
	// use the same value.
	if handler := d.manager; handler != nil {
		handler.HandleDynamicsTrainInfo(trainInfo)
	}
}
// getUrlBase builds the HTTP base URL "http://<ip>[:<port>]" from the
// configuration. The port suffix is omitted when HttpPort is zero.
func getUrlBase(c *config.DynamicsConfig) string {
	if c.HttpPort == 0 {
		return "http://" + c.Ip
	}
	return fmt.Sprintf("http://%s:%d", c.Ip, c.HttpPort)
}
// buildUrl appends uri to the base URL; no separator is inserted between them.
func (d *dynamics) buildUrl(uri string) string {
return d.baseUrl + uri
}
// requestStartSimulation posts the line base info as JSON to "/api/start/".
//
// It is a no-op returning nil when dynamics is disabled in the run
// configuration or when the service has not been initialised (no
// configuration or no HTTP client), instead of dereferencing nil.
//
// It returns an error when the payload cannot be marshalled, the request
// cannot be sent, or the response body cannot be read.
func (d *dynamics) requestStartSimulation(base *message.LineBaseInfo) error {
	// Read the client once: Stop resets the field to nil.
	client := d.httpClient
	if d.runConfig == nil || !d.runConfig.Open || client == nil {
		return nil
	}

	data, err := json.Marshal(base)
	if err != nil {
		return sys_error.New("动力学开始仿真请求数据序列化错误", err)
	}

	resp, err := client.Post(d.buildUrl("/api/start/"), "application/json", bytes.NewReader(data))
	if err != nil {
		return sys_error.New("动力学开始仿真请求发送错误", err)
	}
	defer resp.Body.Close()

	// Drain the body so the connection can be reused. A zero-length Read
	// consumes nothing and reports io.EOF as a failure on an empty body.
	if _, err := io.Copy(io.Discard, resp.Body); err != nil {
		return sys_error.New("动力学开始仿真请求响应错误", err)
	}
	return nil
}
// requestStopSimulation posts an empty request to "/api/end/".
//
// It is a no-op returning nil when dynamics is disabled in the run
// configuration or when the service has not been initialised (no
// configuration or no HTTP client), instead of dereferencing nil.
//
// It returns an error when the request cannot be sent or the response body
// cannot be read.
func (d *dynamics) requestStopSimulation() error {
	// Read the client once: Stop resets the field to nil.
	client := d.httpClient
	if d.runConfig == nil || !d.runConfig.Open || client == nil {
		return nil
	}

	resp, err := client.Post(d.buildUrl("/api/end/"), "application/json", nil)
	if err != nil {
		return fmt.Errorf("停止仿真请求异常: %w", err)
	}
	defer resp.Body.Close()

	// Drain the body so the connection can be reused. A zero-length Read
	// consumes nothing and reports io.EOF as a failure on an empty body.
	if _, err := io.Copy(io.Discard, resp.Body); err != nil {
		return fmt.Errorf("停止仿真响应读取异常: %w", err)
	}
	return nil
}
// RequestAddTrain posts the train initialisation info as JSON to
// "/api/aerodynamics/init/train/".
//
// It is a no-op returning nil when dynamics is disabled in the run
// configuration or when the service is not running (no configuration or no
// HTTP client, e.g. before Start or after Stop), instead of dereferencing nil.
//
// It returns an error when the payload cannot be marshalled, the request
// cannot be sent, or the response body cannot be read.
func (d *dynamics) RequestAddTrain(info *message.InitTrainInfo) error {
	// Read the client once: Stop resets the field to nil.
	client := d.httpClient
	if d.runConfig == nil || !d.runConfig.Open || client == nil {
		return nil
	}

	data, err := json.Marshal(info)
	if err != nil {
		return fmt.Errorf("动力学添加列车请求数据序列化异常: %w", err)
	}

	resp, err := client.Post(d.buildUrl("/api/aerodynamics/init/train/"), "application/json", bytes.NewReader(data))
	if err != nil {
		return fmt.Errorf("动力学添加列车请求异常: %w", err)
	}
	defer resp.Body.Close()

	// Drain the body so the connection can be reused. A zero-length Read
	// consumes nothing and reports io.EOF as a failure on an empty body.
	if _, err := io.Copy(io.Discard, resp.Body); err != nil {
		return fmt.Errorf("动力学添加列车响应读取异常: %w", err)
	}
	return nil
}
// RequestRemoveTrain posts the remove-train request as JSON to
// "/api/aerodynamics/remove/train/".
//
// It is a no-op returning nil when dynamics is disabled in the run
// configuration or when the service is not running (no configuration or no
// HTTP client, e.g. before Start or after Stop), instead of dereferencing nil.
//
// It returns an error when the payload cannot be marshalled, the request
// cannot be sent, or the response body cannot be read.
func (d *dynamics) RequestRemoveTrain(req *message.RemoveTrainReq) error {
	// Read the client once: Stop resets the field to nil.
	client := d.httpClient
	if d.runConfig == nil || !d.runConfig.Open || client == nil {
		return nil
	}

	data, err := json.Marshal(req)
	if err != nil {
		return fmt.Errorf("动力学移除列车请求数据序列化异常: %w", err)
	}

	resp, err := client.Post(d.buildUrl("/api/aerodynamics/remove/train/"), "application/json", bytes.NewReader(data))
	if err != nil {
		return fmt.Errorf("动力学移除列车请求异常: %w", err)
	}
	defer resp.Body.Close()

	// Drain the body so the connection can be reused. A zero-length Read
	// consumes nothing and reports io.EOF as a failure on an empty body.
	if _, err := io.Copy(io.Discard, resp.Body); err != nil {
		return fmt.Errorf("动力学移除列车响应读取异常: %w", err)
	}
	return nil
}
// TrainOperationConfig posts the train operation configuration as JSON to
// "/api/config/".
//
// It is a no-op returning nil when dynamics is disabled in the run
// configuration or when the service is not running (no configuration or no
// HTTP client, e.g. before Start or after Stop), instead of dereferencing nil.
//
// It returns an error when the payload cannot be marshalled, the request
// cannot be sent, or the response body cannot be read. The error messages name
// the configuration request; they previously reused the remove-train wording.
func (d *dynamics) TrainOperationConfig(req *message.TrainOperationConfig) error {
	// Read the client once: Stop resets the field to nil.
	client := d.httpClient
	if d.runConfig == nil || !d.runConfig.Open || client == nil {
		return nil
	}

	data, err := json.Marshal(req)
	if err != nil {
		return fmt.Errorf("动力学列车参数配置请求数据序列化异常: %w", err)
	}

	resp, err := client.Post(d.buildUrl("/api/config/"), "application/json", bytes.NewReader(data))
	if err != nil {
		return fmt.Errorf("动力学列车参数配置请求异常: %w", err)
	}
	defer resp.Body.Close()

	// Drain the body so the connection can be reused. A zero-length Read
	// consumes nothing and reports io.EOF as a failure on an empty body.
	if _, err := io.Copy(io.Discard, resp.Body); err != nil {
		return fmt.Errorf("动力学列车参数配置响应读取异常: %w", err)
	}
	return nil
}
// Start brings the dynamics messaging up for the given manager.
//
// It panics when manager is nil and returns an error when a manager is already
// registered. When the run configuration is missing, has no IP, or is not
// open, it stores the configuration and returns nil without starting anything.
// Otherwise it initialises the clients and server, issues the start-simulation
// request, launches the turnout-state goroutine, marks the state as normal and
// starts the UDP delay recorder. A failing initialisation step triggers Stop
// before its error is returned.
func (d *dynamics) Start(manager DynamicsMessageManager) error {
	if manager == nil {
		panic("启动动力学消息服务错误: DynamicsMessageManager不能为nil")
	}
	if d.manager != nil {
		return fmt.Errorf("启动动力学消息服务错误: 存在正在运行的任务")
	}

	cfg := manager.GetDynamicsRunConfig()
	d.runConfig = cfg
	if cfg == nil || cfg.Ip == "" || !cfg.Open {
		return nil
	}
	d.manager = manager

	// Set up the UDP clients, the HTTP client and the UDP server.
	if err := d.initDynamics(); err != nil {
		d.Stop()
		return err
	}
	// Send the data the dynamics side needs to run; tear down on failure.
	if err := d.initDynamicsRunRepository(); err != nil {
		d.Stop()
		return err
	}

	taskCtx, cancel := context.WithCancel(context.Background())
	go d.sendTurnoutStateTask(taskCtx)
	d.turnoutTaskCancel = cancel

	d.updateState(tpapi.ThirdPartyState_Normal)
	d.udpDelayRecorder.Start()
	return nil
}
// initDynamics creates the two UDP clients, the base URL, the HTTP client
// (5 second timeout) and the train-info UDP server from the run configuration,
// then starts listening. It returns the error reported by Listen.
func (d *dynamics) initDynamics() error {
	cfg := d.runConfig

	turnoutAddr := fmt.Sprintf("%v:%v", cfg.Ip, cfg.UdpRemotePort)
	trainControlAddr := fmt.Sprintf("%v:%v", cfg.Ip, cfg.UdpRemoteTrainPort)
	listenAddr := fmt.Sprintf(":%d", cfg.UdpLocalPort)

	d.turnoutStateUdpClient = udp.NewClient(turnoutAddr)
	d.trainControlUdpClient = udp.NewClient(trainControlAddr)
	d.baseUrl = getUrlBase(cfg)
	d.httpClient = &http.Client{Timeout: 5 * time.Second}
	d.trainInfoUdpServer = udp.NewServer(listenAddr, d.handleDynamicsTrainInfo)

	return d.trainInfoUdpServer.Listen()
}
// initDynamicsRunRepository sends the manager's run repository to the dynamics
// side through the start-simulation request and returns that request's error.
func (d *dynamics) initDynamicsRunRepository() error {
	return d.requestStartSimulation(d.manager.GetDynamicsRunRepository())
}
// Stop shuts the dynamics messaging down while holding initMutex.
//
// In order it: stops the delay recorder, cancels the turnout-state task,
// sends the stop-simulation request, closes both UDP clients and the UDP
// server, clears the manager, and marks the state as closed. Fields are reset
// to nil as they are released.
//
// A failing stop-simulation request is logged instead of being silently
// dropped; shutdown continues regardless.
func (d *dynamics) Stop() {
	initMutex.Lock()
	defer initMutex.Unlock()
	slog.Debug("动力学服务停止")

	d.udpDelayRecorder.Stop()

	if d.turnoutTaskCancel != nil {
		d.turnoutTaskCancel()
		d.turnoutTaskCancel = nil
	}

	if d.httpClient != nil {
		if err := d.requestStopSimulation(); err != nil {
			slog.Warn("动力学停止仿真请求失败", "error", err)
		}
		d.httpClient = nil
	}

	if d.turnoutStateUdpClient != nil {
		d.turnoutStateUdpClient.Close()
		d.turnoutStateUdpClient = nil
	}
	if d.trainControlUdpClient != nil {
		d.trainControlUdpClient.Close()
		d.trainControlUdpClient = nil
	}
	if d.trainInfoUdpServer != nil {
		d.trainInfoUdpServer.Close()
		d.trainInfoUdpServer = nil
	}

	d.manager = nil
	d.updateState(tpapi.ThirdPartyState_Closed)
}
// TurnoutMessageSendInterval is the interval between turnout messages, in
// milliseconds.
const TurnoutMessageSendInterval = 50

// turnoutStateLifeSignal is the life-signal counter stamped on each turnout
// message.
var turnoutStateLifeSignal uint16
// sendTurnoutStateTask periodically collects the turnout info from the manager,
// stamps it with the next life signal and sends it over the turnout-state UDP
// client, until ctx is cancelled. It sends once immediately and then every
// TurnoutMessageSendInterval milliseconds.
//
// The manager and the client are captured once at start-up. Stop resets both
// fields to nil, and re-reading them on every cycle could dereference nil
// while shutting down. A nil turnout info is skipped instead of panicking,
// because a panic ends the task for good. Waiting is done in a select, so
// cancellation takes effect without sleeping out the interval.
//
// Any panic is recovered and logged with its stack trace.
func (d *dynamics) sendTurnoutStateTask(ctx context.Context) {
	defer func() {
		if err := recover(); err != nil {
			slog.Error("定时发送道岔状态任务异常", "error", err, "stack", string(debug.Stack()))
			debug.PrintStack()
		}
	}()

	manager, client := d.manager, d.turnoutStateUdpClient
	if manager == nil || client == nil {
		return
	}

	ticker := time.NewTicker(time.Millisecond * TurnoutMessageSendInterval)
	defer ticker.Stop()

	for {
		if ctx.Err() != nil {
			return
		}

		if turnoutState := manager.CollectDynamicsTurnoutInfo(); turnoutState != nil {
			turnoutStateLifeSignal++
			turnoutState.LifeSignal = turnoutStateLifeSignal
			client.SendMsg(turnoutState)
		}

		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
		}
	}
}
// SendTrainControlMessage sends the raw train control message b over the
// train-control UDP client.
//
// The message is dropped when the client does not exist (the service was not
// started, or has been stopped), matching the HTTP request methods, which are
// no-ops when dynamics is disabled, instead of panicking on nil.
func (d *dynamics) SendTrainControlMessage(b []byte) {
	// Read the client once: Stop resets the field to nil.
	client := d.trainControlUdpClient
	if client == nil {
		return
	}
	client.Send(b)
}