rts-sim-testing-service/third_party/dynamics/dynamics.go

243 lines
6.0 KiB
Go

package dynamics
import (
"bytes"
"context"
"encoding/json"
"fmt"
"log/slog"
"net/http"
"runtime/debug"
"time"
"joylink.club/bj-rtsts-server/config"
"joylink.club/bj-rtsts-server/third_party/message"
"joylink.club/bj-rtsts-server/third_party/udp"
)
type DynamicsMessageManager interface {
CollectDynamicsTurnoutInfo() []*message.DynamicsTurnoutInfo
HandleDynamicsTrainInfo(info *message.DynamicsTrainInfo)
}
// 动力学接口
type Dynamics interface {
// 请求启动仿真
RequestStartSimulation(base *message.LineBaseInfo) error
// 请求停止仿真
RequestStopSimulation() error
// 请求添加列车
RequestAddTrain(info *message.InitTrainInfo) error
// 请求移除列车
RequestRemoveTrain(req *message.RemoveTrainReq) error
// 启动动力学消息功能
Start(manager DynamicsMessageManager)
// 停止动力学消息功能
Stop()
// 发送列车控制消息
SendTrainControlMessage(b []byte)
}
var _default Dynamics
func Default() Dynamics {
if !config.Config.Dynamics.Open {
panic("动力学接口模块未开启")
}
return _default
}
func Init() {
if !config.Config.Dynamics.Open {
return
}
slog.Info("初始化动力学接口模块")
_default = newDynamics()
}
type dynamics struct {
trainInfoUdpServer udp.UdpServer
turnoutStateUdpClient udp.UdpClient
trainControlUdpClient udp.UdpClient
baseUrl string
httpClient *http.Client
manager DynamicsMessageManager
turnoutTaskCancel context.CancelFunc
}
func newDynamics() Dynamics {
d := &dynamics{
turnoutStateUdpClient: udp.NewClient(fmt.Sprintf("%v:%v", config.Config.Dynamics.Ip, config.Config.Dynamics.UdpRemotePort)),
trainControlUdpClient: udp.NewClient(fmt.Sprintf("%v:%v", config.Config.Dynamics.Ip, config.Config.Dynamics.UdpRemoteTrainPort)),
baseUrl: getUrlBase(),
httpClient: &http.Client{
Timeout: time.Second * 5,
},
}
d.trainInfoUdpServer = udp.NewServer(fmt.Sprintf(":%d", config.Config.Dynamics.UdpLocalPort), d.handleDynamicsTrainInfo)
d.trainInfoUdpServer.Listen()
return d
}
// 解码列车信息并处理
func (d *dynamics) handleDynamicsTrainInfo(b []byte) {
trainInfo := &message.DynamicsTrainInfo{}
err := trainInfo.Decode(b)
if err != nil {
panic(err)
}
handler := d.manager
if handler != nil {
handler.HandleDynamicsTrainInfo(trainInfo)
}
}
func getUrlBase() string {
ip := config.Config.Dynamics.Ip
var port string
if config.Config.Dynamics.HttpPort != 0 {
port = fmt.Sprintf(":%d", config.Config.Dynamics.HttpPort)
}
urlBase := "http://" + ip + port
return urlBase
}
func (d *dynamics) buildUrl(uri string) string {
return d.baseUrl + uri
}
func (d *dynamics) RequestStartSimulation(base *message.LineBaseInfo) error {
if !config.Config.Dynamics.Open {
return nil
}
url := d.buildUrl("/api/start/")
data, _ := json.Marshal(base)
resp, err := d.httpClient.Post(url, "application/json", bytes.NewBuffer(data))
if err != nil {
return fmt.Errorf("请求启动仿真异常: %v", err)
}
defer resp.Body.Close()
var buf []byte
_, err = resp.Body.Read(buf)
if err != nil {
return fmt.Errorf("请求启动仿真读取相应异常: %v", err)
}
return nil
}
func (d *dynamics) RequestStopSimulation() error {
if !config.Config.Dynamics.Open {
return nil
}
url := d.buildUrl("/api/end/")
resp, err := d.httpClient.Post(url, "application/json", nil)
if err != nil {
return fmt.Errorf("停止仿真请求异常: %v", err)
}
defer resp.Body.Close()
var buf []byte
_, err = resp.Body.Read(buf)
if err != nil {
return fmt.Errorf("停止仿真响应读取异常: %v", err)
}
return nil
}
func (d *dynamics) RequestAddTrain(info *message.InitTrainInfo) error {
if !config.Config.Dynamics.Open {
return nil
}
url := d.buildUrl("/api/aerodynamics/init/train/")
data, _ := json.Marshal(info)
resp, err := d.httpClient.Post(url, "application/json", bytes.NewBuffer(data))
if err != nil {
return fmt.Errorf("动力学添加列车请求异常: %v", err)
}
defer resp.Body.Close()
var buf []byte
_, err = resp.Body.Read(buf)
if err != nil {
return fmt.Errorf("动力学添加列车响应读取异常: %v", err)
}
return nil
}
func (d *dynamics) RequestRemoveTrain(req *message.RemoveTrainReq) error {
if !config.Config.Dynamics.Open {
return nil
}
url := d.buildUrl("/api/aerodynamics/remove/train/")
data, _ := json.Marshal(req)
resp, err := d.httpClient.Post(url, "application/json", bytes.NewBuffer(data))
if err != nil {
return fmt.Errorf("动力学移除列车请求异常: %v", err)
}
defer resp.Body.Close()
var buf []byte
_, err = resp.Body.Read(buf)
if err != nil {
return fmt.Errorf("动力学移除列车响应读取异常: %v", err)
}
return nil
}
func (d *dynamics) Start(manager DynamicsMessageManager) {
if manager == nil {
panic("启动动力学消息服务错误: DynamicsMessageManager不能为nil")
}
if d.manager != nil {
panic("启动动力学消息服务错误: 存在正在运行的任务")
}
d.manager = manager
ctx, cancle := context.WithCancel(context.Background())
go d.sendTurnoutStateTask(ctx)
d.turnoutTaskCancel = cancle
}
func (d *dynamics) Stop() {
if d.turnoutTaskCancel != nil {
d.turnoutTaskCancel()
d.manager = nil
}
}
const (
// 道岔消息发送间隔,单位ms
TurnoutMessageSendInterval = 50
)
var turnoutStateLifeSignal uint16 //道岔消息生命信号
// 定时发送道岔状态任务
func (d *dynamics) sendTurnoutStateTask(ctx context.Context) {
defer func() {
if err := recover(); err != nil {
slog.Error("定时发送道岔状态任务异常", "error", err, "stack", string(debug.Stack()))
debug.PrintStack()
}
}()
for {
select {
case <-ctx.Done():
return
default:
}
turnoutStates := d.manager.CollectDynamicsTurnoutInfo()
// slog.Debug("发送道岔状态", "count", len(turnoutStates))
for _, state := range turnoutStates {
turnoutStateLifeSignal++
state.LifeSignal = turnoutStateLifeSignal
d.turnoutStateUdpClient.SendMsg(state)
}
time.Sleep(time.Millisecond * TurnoutMessageSendInterval)
}
}
func (d *dynamics) SendTrainControlMessage(b []byte) {
d.trainControlUdpClient.Send(b)
}