rts-sim-testing-service/third_party/dynamics/dynamics.go

291 lines
7.3 KiB
Go

package dynamics
import (
"bytes"
"context"
"encoding/json"
"fmt"
"log/slog"
"net/http"
"runtime/debug"
"sync"
"time"
"joylink.club/bj-rtsts-server/config"
"joylink.club/bj-rtsts-server/sys_error"
"joylink.club/bj-rtsts-server/third_party/message"
"joylink.club/bj-rtsts-server/third_party/udp"
)
// DynamicsMessageManager is the set of callbacks the dynamics service needs
// from its owner. An implementation is handed to Dynamics.Start.
type DynamicsMessageManager interface {
// CollectDynamicsTurnoutInfo returns the turnout state to send; it is called
// on every cycle of the periodic turnout-state task.
CollectDynamicsTurnoutInfo() *message.DynamicsTurnoutInfo
// HandleDynamicsTrainInfo receives each train-info message decoded from the
// train-info UDP server.
HandleDynamicsTrainInfo(info *message.DynamicsTrainInfo)
// GetDynamicsRunConfig returns the run configuration read by Start
// (Ip, Open, HTTP/UDP ports).
GetDynamicsRunConfig() *config.DynamicsConfig
// GetDynamicsRunRepository returns the line base data that is JSON-encoded
// and posted with the start-simulation request.
GetDynamicsRunRepository() *message.LineBaseInfo
}
// Dynamics is the interface to the dynamics service.
type Dynamics interface {
// RequestAddTrain asks the dynamics service to add a train.
RequestAddTrain(info *message.InitTrainInfo) error
// RequestRemoveTrain asks the dynamics service to remove a train.
RequestRemoveTrain(req *message.RemoveTrainReq) error
// TrainOperationConfig asks the dynamics service to change train parameters.
TrainOperationConfig(req *message.TrainOperationConfig) error
// Start starts the dynamics messaging function using the given manager.
Start(manager DynamicsMessageManager) error
// Stop stops the dynamics messaging function.
Stop()
// SendTrainControlMessage sends a train control message.
SendTrainControlMessage(b []byte)
}
var (
	// _default is the lazily created package-wide Dynamics instance.
	_default Dynamics
	// initMutex guards reads and writes of _default.
	initMutex sync.Mutex
)

// Default returns the package-wide Dynamics instance, creating it on first
// use. Access to the instance pointer is serialised by initMutex.
func Default() Dynamics {
	initMutex.Lock()
	defer initMutex.Unlock()
	if _default != nil {
		return _default
	}
	_default = &dynamics{}
	return _default
}
// dynamics is the Dynamics implementation returned by Default. Its fields are
// populated by Start/initDynamics and torn down by Stop.
type dynamics struct {
// trainInfoUdpServer listens on the configured UdpLocalPort and passes each
// received datagram to handleDynamicsTrainInfo.
trainInfoUdpServer udp.UdpServer
// turnoutStateUdpClient sends turnout state to Ip:UdpRemotePort.
turnoutStateUdpClient udp.UdpClient
// trainControlUdpClient sends train control messages to Ip:UdpRemoteTrainPort.
trainControlUdpClient udp.UdpClient
// baseUrl is the HTTP base URL built by getUrlBase from the run config.
baseUrl string
// httpClient is the HTTP client (5s timeout) used for all API requests.
httpClient *http.Client
// manager is the callback provider set by Start and cleared by Stop.
manager DynamicsMessageManager
// turnoutTaskCancel cancels the context of the periodic turnout-state task.
turnoutTaskCancel context.CancelFunc
// runConfig is the configuration obtained from the manager in Start.
runConfig *config.DynamicsConfig
}
// handleDynamicsTrainInfo decodes a datagram into a message.DynamicsTrainInfo
// and forwards it to the registered manager, if one is set.
//
// It is installed as the receive callback of the train-info UDP server, so a
// decode failure is logged and the datagram dropped rather than panicking:
// a single malformed packet must not raise a panic in the receive path.
func (d *dynamics) handleDynamicsTrainInfo(b []byte) {
	trainInfo := &message.DynamicsTrainInfo{}
	if err := trainInfo.Decode(b); err != nil {
		slog.Error("动力学列车信息解码失败", "error", err, "length", len(b))
		return
	}
	// Read the field once: Stop sets d.manager to nil, and the value checked
	// must be the value called.
	if handler := d.manager; handler != nil {
		handler.HandleDynamicsTrainInfo(trainInfo)
	}
}
// getUrlBase builds the HTTP base URL "http://<Ip>[:<HttpPort>]" from c.
// The port suffix is omitted when c.HttpPort is zero.
func getUrlBase(c *config.DynamicsConfig) string {
	if c.HttpPort == 0 {
		return "http://" + c.Ip
	}
	return "http://" + c.Ip + fmt.Sprintf(":%d", c.HttpPort)
}
// buildUrl returns the absolute URL for uri by appending it to d.baseUrl.
// No separator is inserted; uri is expected to carry its own leading slash.
func (d *dynamics) buildUrl(uri string) string {
	full := d.baseUrl
	full += uri
	return full
}
// requestStartSimulation asks the dynamics service to start a simulation by
// POSTing base, JSON-encoded, to /api/start/.
//
// It returns nil without sending anything when there is no run config, the
// config is not Open, or the HTTP client has not been created. Failures are
// wrapped with sys_error.New.
//
// NOTE(review): the HTTP status code is not inspected; only transport and
// read errors are reported.
func (d *dynamics) requestStartSimulation(base *message.LineBaseInfo) error {
	if d.runConfig == nil || !d.runConfig.Open || d.httpClient == nil {
		return nil
	}
	data, err := json.Marshal(base)
	if err != nil {
		return sys_error.New("动力学开始仿真请求数据序列化错误", err)
	}
	resp, err := d.httpClient.Post(d.buildUrl("/api/start/"), "application/json", bytes.NewReader(data))
	if err != nil {
		return sys_error.New("动力学开始仿真请求发送错误", err)
	}
	defer resp.Body.Close()
	// Read the body to EOF. Reading into a nil slice consumed nothing and
	// reported io.EOF as a failure when the response body was empty.
	var body bytes.Buffer
	if _, err := body.ReadFrom(resp.Body); err != nil {
		return sys_error.New("动力学开始仿真请求响应错误", err)
	}
	return nil
}
// requestStopSimulation asks the dynamics service to end the simulation by
// POSTing an empty body to /api/end/.
//
// It returns nil without sending anything when there is no run config, the
// config is not Open, or the HTTP client has not been created.
//
// NOTE(review): the HTTP status code is not inspected; only transport and
// read errors are reported.
func (d *dynamics) requestStopSimulation() error {
	if d.runConfig == nil || !d.runConfig.Open || d.httpClient == nil {
		return nil
	}
	resp, err := d.httpClient.Post(d.buildUrl("/api/end/"), "application/json", nil)
	if err != nil {
		return fmt.Errorf("停止仿真请求异常: %w", err)
	}
	defer resp.Body.Close()
	// Read the body to EOF. Reading into a nil slice consumed nothing and
	// reported io.EOF as a failure when the response body was empty.
	var body bytes.Buffer
	if _, err := body.ReadFrom(resp.Body); err != nil {
		return fmt.Errorf("停止仿真响应读取异常: %w", err)
	}
	return nil
}
// RequestAddTrain asks the dynamics service to add a train by POSTing info,
// JSON-encoded, to /api/aerodynamics/init/train/.
//
// It returns nil without sending anything when there is no run config, the
// config is not Open, or the HTTP client has not been created (Start was not
// called or returned early), instead of dereferencing a nil pointer.
//
// NOTE(review): the HTTP status code is not inspected; only transport and
// read errors are reported.
func (d *dynamics) RequestAddTrain(info *message.InitTrainInfo) error {
	if d.runConfig == nil || !d.runConfig.Open || d.httpClient == nil {
		return nil
	}
	data, err := json.Marshal(info)
	if err != nil {
		return fmt.Errorf("动力学添加列车请求序列化异常: %w", err)
	}
	resp, err := d.httpClient.Post(d.buildUrl("/api/aerodynamics/init/train/"), "application/json", bytes.NewReader(data))
	if err != nil {
		return fmt.Errorf("动力学添加列车请求异常: %w", err)
	}
	defer resp.Body.Close()
	// Read the body to EOF. Reading into a nil slice consumed nothing and
	// reported io.EOF as a failure when the response body was empty.
	var body bytes.Buffer
	if _, err := body.ReadFrom(resp.Body); err != nil {
		return fmt.Errorf("动力学添加列车响应读取异常: %w", err)
	}
	return nil
}
// RequestRemoveTrain asks the dynamics service to remove a train by POSTing
// req, JSON-encoded, to /api/aerodynamics/remove/train/.
//
// It returns nil without sending anything when there is no run config, the
// config is not Open, or the HTTP client has not been created (Start was not
// called or returned early), instead of dereferencing a nil pointer.
//
// NOTE(review): the HTTP status code is not inspected; only transport and
// read errors are reported.
func (d *dynamics) RequestRemoveTrain(req *message.RemoveTrainReq) error {
	if d.runConfig == nil || !d.runConfig.Open || d.httpClient == nil {
		return nil
	}
	data, err := json.Marshal(req)
	if err != nil {
		return fmt.Errorf("动力学移除列车请求序列化异常: %w", err)
	}
	resp, err := d.httpClient.Post(d.buildUrl("/api/aerodynamics/remove/train/"), "application/json", bytes.NewReader(data))
	if err != nil {
		return fmt.Errorf("动力学移除列车请求异常: %w", err)
	}
	defer resp.Body.Close()
	// Read the body to EOF. Reading into a nil slice consumed nothing and
	// reported io.EOF as a failure when the response body was empty.
	var body bytes.Buffer
	if _, err := body.ReadFrom(resp.Body); err != nil {
		return fmt.Errorf("动力学移除列车响应读取异常: %w", err)
	}
	return nil
}
// TrainOperationConfig asks the dynamics service to change train parameters
// by POSTing req, JSON-encoded, to /api/config.
//
// It returns nil without sending anything when there is no run config, the
// config is not Open, or the HTTP client has not been created (Start was not
// called or returned early).
//
// NOTE(review): the HTTP status code is not inspected; only transport and
// read errors are reported.
func (d *dynamics) TrainOperationConfig(req *message.TrainOperationConfig) error {
	if d.runConfig == nil || !d.runConfig.Open || d.httpClient == nil {
		return nil
	}
	data, err := json.Marshal(req)
	if err != nil {
		return fmt.Errorf("动力学变更列车参数请求序列化异常: %w", err)
	}
	resp, err := d.httpClient.Post(d.buildUrl("/api/config"), "application/json", bytes.NewReader(data))
	if err != nil {
		return fmt.Errorf("动力学变更列车参数请求异常: %w", err)
	}
	// Register Close only after the error check: resp is nil when Post fails,
	// and deferring Close on it would panic.
	defer resp.Body.Close()
	// Read the body to EOF. Reading into a nil slice consumed nothing and
	// reported io.EOF as a failure when the response body was empty.
	var body bytes.Buffer
	if _, err := body.ReadFrom(resp.Body); err != nil {
		return fmt.Errorf("动力学变更列车参数响应读取异常: %w", err)
	}
	return nil
}
// Start attaches manager to the dynamics service and begins exchanging
// messages with it.
//
// It panics when manager is nil and returns an error when a manager is
// already attached. The run config is read from manager and stored; when it
// is nil, has an empty Ip, or is not Open, Start returns nil without
// initialising anything. Otherwise it creates the clients/server, sends the
// start-simulation request, and launches the periodic turnout-state task.
//
// NOTE(review): a failure of the start-simulation request is reported by
// calling Stop and then panicking with the error, not via the error return.
func (d *dynamics) Start(manager DynamicsMessageManager) error {
	if manager == nil {
		panic("启动动力学消息服务错误: DynamicsMessageManager不能为nil")
	}
	if d.manager != nil {
		return fmt.Errorf("启动动力学消息服务错误: 存在正在运行的任务")
	}

	cfg := manager.GetDynamicsRunConfig()
	d.runConfig = cfg
	enabled := cfg != nil && cfg.Ip != "" && cfg.Open
	if !enabled {
		return nil
	}

	d.manager = manager
	// Set up clients and the UDP server.
	d.initDynamics()
	// Send the data the dynamics service needs to run.
	if err := d.initDynamicsRunRepository(); err != nil {
		d.Stop() // tear everything down again after the failure
		panic(err)
	}

	taskCtx, cancel := context.WithCancel(context.Background())
	go d.sendTurnoutStateTask(taskCtx)
	d.turnoutTaskCancel = cancel
	return nil
}
// initDynamics creates the UDP clients, the HTTP client and base URL, and the
// train-info UDP server from d.runConfig, then starts the server listening.
func (d *dynamics) initDynamics() {
	cfg := d.runConfig

	turnoutAddr := fmt.Sprintf("%v:%v", cfg.Ip, cfg.UdpRemotePort)
	d.turnoutStateUdpClient = udp.NewClient(turnoutAddr)

	trainControlAddr := fmt.Sprintf("%v:%v", cfg.Ip, cfg.UdpRemoteTrainPort)
	d.trainControlUdpClient = udp.NewClient(trainControlAddr)

	d.baseUrl = getUrlBase(cfg)
	d.httpClient = &http.Client{Timeout: 5 * time.Second}

	listenAddr := fmt.Sprintf(":%d", cfg.UdpLocalPort)
	d.trainInfoUdpServer = udp.NewServer(listenAddr, d.handleDynamicsTrainInfo)
	d.trainInfoUdpServer.Listen()
}
// initDynamicsRunRepository sends the data the dynamics service needs to run:
// it fetches the line base info from the manager and posts it with the
// start-simulation request, returning that request's error unchanged.
func (d *dynamics) initDynamicsRunRepository() error {
	repository := d.manager.GetDynamicsRunRepository()
	return d.requestStartSimulation(repository)
}
// Stop shuts the dynamics service down: it resets the package default
// instance, cancels the periodic turnout-state task, sends the
// stop-simulation request, closes the UDP clients and server, and detaches
// the manager. The whole sequence runs under initMutex.
//
// Stop has no error return, so a failed stop-simulation request is logged
// rather than silently discarded.
func (d *dynamics) Stop() {
	initMutex.Lock()
	defer initMutex.Unlock()
	_default = nil

	// Cancel the sender first so it stops before its UDP client is closed.
	if d.turnoutTaskCancel != nil {
		d.turnoutTaskCancel()
	}
	if d.httpClient != nil {
		if err := d.requestStopSimulation(); err != nil {
			slog.Error("动力学停止仿真请求失败", "error", err)
		}
		d.httpClient = nil
	}
	if d.turnoutStateUdpClient != nil {
		d.turnoutStateUdpClient.Close()
	}
	if d.trainControlUdpClient != nil {
		d.trainControlUdpClient.Close()
	}
	if d.trainInfoUdpServer != nil {
		d.trainInfoUdpServer.Close()
	}
	d.manager = nil
}
const (
// TurnoutMessageSendInterval is the interval between turnout messages, in
// milliseconds.
TurnoutMessageSendInterval = 50
)
// turnoutStateLifeSignal is the life-signal counter for turnout messages; it
// is incremented before each send and copied into the outgoing message.
var turnoutStateLifeSignal uint16 // life signal of turnout messages
// sendTurnoutStateTask periodically collects the turnout state from the
// manager, stamps it with the next life signal, and sends it through the
// turnout-state UDP client. It sends once immediately and then every
// TurnoutMessageSendInterval milliseconds until ctx is cancelled.
//
// A panic inside the loop is recovered and logged with its stack; the task
// then ends.
func (d *dynamics) sendTurnoutStateTask(ctx context.Context) {
	defer func() {
		if err := recover(); err != nil {
			slog.Error("定时发送道岔状态任务异常", "error", err, "stack", string(debug.Stack()))
			debug.PrintStack()
		}
	}()

	// Capture the collaborators once. Stop sets d.manager to nil while this
	// goroutine may still be mid-iteration; re-reading the field each cycle
	// turned every shutdown into a possible nil-call panic.
	manager, client := d.manager, d.turnoutStateUdpClient
	if manager == nil || client == nil {
		return
	}

	ticker := time.NewTicker(time.Millisecond * TurnoutMessageSendInterval)
	defer ticker.Stop()
	for {
		// Never send after cancellation, even if a tick is also pending.
		if ctx.Err() != nil {
			return
		}
		// Skip the cycle when there is nothing to send instead of
		// dereferencing a nil message.
		if turnoutState := manager.CollectDynamicsTurnoutInfo(); turnoutState != nil {
			turnoutStateLifeSignal++
			turnoutState.LifeSignal = turnoutStateLifeSignal
			client.SendMsg(turnoutState)
		}
		// Wait for the next tick, waking immediately on cancellation rather
		// than sleeping out the full interval.
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
		}
	}
}
// SendTrainControlMessage sends b through the train-control UDP client.
//
// When the client has not been created (Start was not called, or returned
// early because dynamics is not enabled) the message is dropped, matching the
// HTTP request methods, which do nothing when dynamics is not open.
func (d *dynamics) SendTrainControlMessage(b []byte) {
	client := d.trainControlUdpClient
	if client == nil {
		return
	}
	client.Send(b)
}