Merge remote-tracking branch 'origin/master'

This commit is contained in:
joylink_zhangsai 2024-01-23 16:46:20 +08:00
commit 007ec388b8
17 changed files with 950 additions and 730 deletions

File diff suppressed because it is too large Load Diff

View File

@ -10,6 +10,7 @@ import (
"joylink.club/bj-rtsts-server/docs" "joylink.club/bj-rtsts-server/docs"
"joylink.club/bj-rtsts-server/middleware" "joylink.club/bj-rtsts-server/middleware"
"joylink.club/bj-rtsts-server/mqtt" "joylink.club/bj-rtsts-server/mqtt"
"joylink.club/bj-rtsts-server/third_party"
) )
// @title CBTC测试系统API // @title CBTC测试系统API
@ -26,6 +27,7 @@ import (
func main() { func main() {
engine := InitServer() engine := InitServer()
mqtt.Startup(mqtt.NewMqttOptions(config.Config.Messaging.Mqtt.Address, config.Config.Messaging.Mqtt.Username, config.Config.Messaging.Mqtt.Password)) mqtt.Startup(mqtt.NewMqttOptions(config.Config.Messaging.Mqtt.Address, config.Config.Messaging.Mqtt.Username, config.Config.Messaging.Mqtt.Password))
third_party.Init()
authMiddleware := middleware.InitGinJwtMiddleware() authMiddleware := middleware.InitGinJwtMiddleware()
router := engine.Group("/api") router := engine.Group("/api")
api.InitUserRouter(router, authMiddleware) api.InitUserRouter(router, authMiddleware)

View File

@ -2,11 +2,12 @@ package message_server
import ( import (
"fmt" "fmt"
"joylink.club/bj-rtsts-server/dto/common_proto"
"reflect" "reflect"
"strings" "strings"
"time" "time"
"joylink.club/bj-rtsts-server/dto/common_proto"
"joylink.club/bj-rtsts-server/dto/request_proto" "joylink.club/bj-rtsts-server/dto/request_proto"
"joylink.club/bj-rtsts-server/dto/state_proto" "joylink.club/bj-rtsts-server/dto/state_proto"
"joylink.club/rtsssimulation/repository/model/proto" "joylink.club/rtsssimulation/repository/model/proto"
@ -161,6 +162,18 @@ func handlerSectionState(w ecs.World, uid string) *state_proto.SectionState {
axleState := component.PhysicalSectionStateType.Get(entry) axleState := component.PhysicalSectionStateType.Get(entry)
sectionState.Occupied = axleState.Occ sectionState.Occupied = axleState.Occ
sectionState.AxleFault = entry.HasComponent(component.AxleSectionFaultTag) sectionState.AxleFault = entry.HasComponent(component.AxleSectionFaultTag)
wd := entity.GetWorldData(w)
sectionModel := wd.Repo.FindPhysicalSection(uid)
faDcAxleDeviceEntry := entity.FindAxleManageDevice(wd, sectionModel.CentralizedStation())
if faDcAxleDeviceEntry != nil {
faDcAxleDevice := component.AxleManageDeviceType.Get(faDcAxleDeviceEntry)
axleRuntime := faDcAxleDevice.FindAdr(uid)
if axleRuntime != nil {
sectionState.AxleDrst = axleRuntime.Drst
sectionState.AxlePdrst = axleRuntime.Pdrst
}
}
return sectionState return sectionState
} }
return nil return nil

View File

@ -1,26 +1,56 @@
package message_server package message_server
import ( import (
"log/slog"
"sync"
"joylink.club/bj-rtsts-server/dto/data_proto" "joylink.club/bj-rtsts-server/dto/data_proto"
"joylink.club/bj-rtsts-server/mqtt" "joylink.club/bj-rtsts-server/message_server/ms_api"
"joylink.club/bj-rtsts-server/ts/simulation/wayside/memory" "joylink.club/bj-rtsts-server/ts/simulation/wayside/memory"
) )
var _default = &publishTaskManage{tasks: make(map[string][]ms_api.MsgTask)}
// 仿真消息任务管理
type publishTaskManage struct {
mu sync.Mutex
tasks map[string][]ms_api.MsgTask
}
func (m *publishTaskManage) AddTasks(simulationId string, tasks ...ms_api.MsgTask) {
m.mu.Lock()
defer m.mu.Unlock()
m.tasks[simulationId] = append(m.tasks[simulationId], tasks...)
}
func (m *publishTaskManage) RemoveTasks(simulationId string) {
m.mu.Lock()
defer m.mu.Unlock()
slog.Info("停止并移除仿真消息服务", "simulationId", simulationId, "tasks", m.tasks[simulationId])
for _, task := range m.tasks[simulationId] {
task.Stop()
}
delete(m.tasks, simulationId)
}
// 启动仿真所需的消息服务 // 启动仿真所需的消息服务
func Start(vs *memory.VerifySimulation) { func Start(vs *memory.VerifySimulation) {
// 添加仿真状态消息服务
_default.AddTasks(vs.SimulationId, NewStateMs(vs))
_default.AddTasks(vs.SimulationId, NewTpapiServiceMs(vs))
for _, mapId := range vs.MapIds { for _, mapId := range vs.MapIds {
t := memory.QueryGiType(mapId) t := memory.QueryGiType(mapId)
switch t { switch t {
case data_proto.PictureType_StationLayout: // 平面布置图 case data_proto.PictureType_StationLayout: // 平面布置图
// 添加车站关联的平面布置图、IBP、PSL信息 // 添加车站关联的平面布置图、IBP、PSL信息
mqtt.GetMsgClient().PublishTask(vs.SimulationId, NewSfpMs(vs, mapId), NewIBPMs(vs, mapId), NewPSLMs(vs, mapId)) _default.AddTasks(vs.SimulationId, NewSfpMs(vs, mapId), NewIBPMs(vs, mapId), NewPSLMs(vs, mapId))
case data_proto.PictureType_RelayCabinetLayout: // 继电器柜 case data_proto.PictureType_RelayCabinetLayout: // 继电器柜
mqtt.GetMsgClient().PublishTask(vs.SimulationId, NewRccMs(vs, mapId)) _default.AddTasks(vs.SimulationId, NewRccMs(vs, mapId))
} }
} }
} }
// 关闭仿真消息服务 // 关闭仿真消息服务
func Close(vs *memory.VerifySimulation) { func Close(vs *memory.VerifySimulation) {
mqtt.GetMsgClient().CloseTask(vs.SimulationId) _default.RemoveTasks(vs.SimulationId)
} }

View File

@ -2,6 +2,8 @@ package message_server
import ( import (
"fmt" "fmt"
"time"
"joylink.club/bj-rtsts-server/dto/state_proto" "joylink.club/bj-rtsts-server/dto/state_proto"
"joylink.club/bj-rtsts-server/message_server/ms_api" "joylink.club/bj-rtsts-server/message_server/ms_api"
@ -11,20 +13,54 @@ import (
) )
func NewStateMs(vs *memory.VerifySimulation) ms_api.MsgTask { func NewStateMs(vs *memory.VerifySimulation) ms_api.MsgTask {
return ms_api.NewMonitorTask(fmt.Sprintf("仿真[%s]状态", vs.SimulationId), func() { return ms_api.NewScheduleTask(fmt.Sprintf("仿真[%s]状态", vs.SimulationId), func() error {
ecs.WorldStateChangeEvent.Subscribe(vs.World, func(_ ecs.World, e ecs.WorldStateChange) { s := &state_proto.SimulationStatus{
s := &state_proto.SimulationStatus{SimulationId: vs.SimulationId} SimulationId: vs.SimulationId,
switch e.NewState { State: convertWorldState(vs.World.State()),
case ecs.WorldClosed: }
s.State = state_proto.SimulationStatus_DESTROY mqtt.GetMsgClient().PubSimulationState(vs.SimulationId, s)
case ecs.WorldError: return nil
s.State = state_proto.SimulationStatus_ERROR }, 1000*time.Millisecond)
case ecs.WorldPause: // return ms_api.NewMonitorTask(fmt.Sprintf("仿真[%s]状态", vs.SimulationId), func() {
s.State = state_proto.SimulationStatus_PAUSE // ecs.WorldStateChangeEvent.Subscribe(vs.World, func(_ ecs.World, e ecs.WorldStateChange) {
default: // s := &state_proto.SimulationStatus{SimulationId: vs.SimulationId}
return // switch e.NewState {
} // case ecs.WorldClosed:
mqtt.GetMsgClient().PubSimulationState(vs.SimulationId, s) // s.State = state_proto.SimulationStatus_DESTROY
}) // case ecs.WorldError:
}) // s.State = state_proto.SimulationStatus_ERROR
// case ecs.WorldPause:
// s.State = state_proto.SimulationStatus_PAUSE
// default:
// return
// }
// mqtt.GetMsgClient().PubSimulationState(vs.SimulationId, s)
// })
// })
}
func convertWorldState(state ecs.WorldState) state_proto.SimulationStatus_SimulationState {
switch state {
case ecs.WorldInit:
return state_proto.SimulationStatus_Init
case ecs.WorldRunning:
return state_proto.SimulationStatus_Running
case ecs.WorldPause:
return state_proto.SimulationStatus_Pause
case ecs.WorldError:
return state_proto.SimulationStatus_Error
case ecs.WorldClosed:
return state_proto.SimulationStatus_Destroy
default:
panic("unknown world state")
}
}
// 发布仿真销毁消息
func PubSimulationDestroyMsg(simulationId string) {
s := &state_proto.SimulationStatus{
SimulationId: simulationId,
State: state_proto.SimulationStatus_Destroy,
}
mqtt.GetMsgClient().PubSimulationState(simulationId, s)
} }

View File

@ -0,0 +1,19 @@
package message_server
import (
"fmt"
"time"
"joylink.club/bj-rtsts-server/message_server/ms_api"
"joylink.club/bj-rtsts-server/mqtt"
"joylink.club/bj-rtsts-server/third_party"
"joylink.club/bj-rtsts-server/ts/simulation/wayside/memory"
)
func NewTpapiServiceMs(vs *memory.VerifySimulation) ms_api.MsgTask {
return ms_api.NewScheduleTask(fmt.Sprintf("仿真第三方API服务[%s]状态", vs.SimulationId), func() error {
rss := third_party.GetRunningServiceStates()
mqtt.GetMsgClient().PubTpapiServiceState(vs.SimulationId, rss)
return nil
}, 1000*time.Millisecond)
}

View File

@ -10,16 +10,14 @@ import (
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protoreflect" "google.golang.org/protobuf/reflect/protoreflect"
"joylink.club/bj-rtsts-server/dto/state_proto" "joylink.club/bj-rtsts-server/dto/state_proto"
"joylink.club/bj-rtsts-server/message_server/ms_api"
) )
var mqttClient *MqttClient var mqttClient *MqttClient
// 客户端 // 客户端
type MqttClient struct { type MqttClient struct {
cc *autopaho.ClientConfig cc *autopaho.ClientConfig
cm *autopaho.ConnectionManager cm *autopaho.ConnectionManager
tasks map[string][]ms_api.MsgTask
} }
// 初始化并启动MQTT客户端服务 // 初始化并启动MQTT客户端服务
@ -35,7 +33,7 @@ func Startup(cmc *MqttOptions) error {
if err != nil { if err != nil {
return err return err
} }
mqttClient = &MqttClient{cc: cc, cm: cm, tasks: make(map[string][]ms_api.MsgTask)} mqttClient = &MqttClient{cc: cc, cm: cm}
return nil return nil
} }
@ -81,6 +79,9 @@ func (client *MqttClient) pub(topic string, data protoreflect.ProtoMessage) erro
slog.Error("未知发布主题", "topic", topic, "data", data) slog.Error("未知发布主题", "topic", topic, "data", data)
return fmt.Errorf("未知发布主题: topic=%s", topic) return fmt.Errorf("未知发布主题: topic=%s", topic)
} }
// else {
// slog.Debug("发布主题", "topic", topic, "data", data)
// }
_, err = client.cm.Publish(context.Background(), &paho.Publish{ _, err = client.cm.Publish(context.Background(), &paho.Publish{
Topic: topic, Topic: topic,
QoS: 0, QoS: 0,
@ -89,28 +90,15 @@ func (client *MqttClient) pub(topic string, data protoreflect.ProtoMessage) erro
return err return err
} }
// 发布任务
func (client *MqttClient) PublishTask(simulationId string, tasks ...ms_api.MsgTask) {
client.tasks[simulationId] = append(client.tasks[simulationId], tasks...)
}
// 停止任务
func (client *MqttClient) CloseTask(simulationId string) {
tasks, ok := client.tasks[simulationId]
if !ok {
return
}
for _, task := range tasks {
task.Stop()
}
client.tasks[simulationId] = nil
}
// 发送仿真状态数据 // 发送仿真状态数据
func (client *MqttClient) PubSimulationState(simulationId string, msg *state_proto.SimulationStatus) error { func (client *MqttClient) PubSimulationState(simulationId string, msg *state_proto.SimulationStatus) error {
return client.pub(GetStateTopic(simulationId), msg) return client.pub(GetStateTopic(simulationId), msg)
} }
func (c *MqttClient) PubTpapiServiceState(simulationId string, msg *state_proto.SimulationThirdPartyApiService) error {
return c.pub(GetTpapiServiceTopic(simulationId), msg)
}
// 发送IBP状态数据 // 发送IBP状态数据
func (client *MqttClient) PubIBPState(simulationId string, mapId int32, stationId uint32, msg *state_proto.PushedDevicesStatus) error { func (client *MqttClient) PubIBPState(simulationId string, mapId int32, stationId uint32, msg *state_proto.PushedDevicesStatus) error {
return client.pub(GetIbpTopic(simulationId, mapId, stationId), msg) return client.pub(GetIbpTopic(simulationId, mapId, stationId), msg)

View File

@ -8,16 +8,18 @@ import (
) )
const ( const (
topicPrefix = "/" + config.SystemName + "/simulation/%s/" // 公共部分 仿真ID topicPrefix = "/" + config.SystemName + "/simulation/%s/" // 公共部分 仿真ID
stateTopic = topicPrefix + "state" // 仿真状态topic stateTopic = topicPrefix + "state" // 仿真状态topic
sfpTopic = topicPrefix + "sfp/%d" // 平面布置图设备状态topic 地图ID tpapiServiceTopic = topicPrefix + "tpis" // 第三方API服务状态topic
rccTopic = topicPrefix + "rcc/%d" // 继电器柜继电器状态topic 地图ID sfpTopic = topicPrefix + "sfp/%d" // 平面布置图设备状态topic 地图ID
pslTopic = topicPrefix + "psl/%d/%d" // psl状态topic 地图ID/门控箱ID rccTopic = topicPrefix + "rcc/%d" // 继电器柜继电器状态topic 地图ID
ibpTopic = topicPrefix + "ibp/%d/%d" // ibp盘状态topic 地图ID/车站ID pslTopic = topicPrefix + "psl/%d/%d" // psl状态topic 地图ID/门控箱ID
ibpTopic = topicPrefix + "ibp/%d/%d" // ibp盘状态topic 地图ID/车站ID
) )
var topicMap = map[string]string{ var topicMap = map[string]string{
"state": stateTopic, "state": stateTopic,
"tpis": tpapiServiceTopic,
"sfp": sfpTopic, "sfp": sfpTopic,
"rcc": rccTopic, "rcc": rccTopic,
"psl": pslTopic, "psl": pslTopic,
@ -54,6 +56,10 @@ func GetStateTopic(simulationId string) string {
return fmt.Sprintf(stateTopic, simulationId) return fmt.Sprintf(stateTopic, simulationId)
} }
func GetTpapiServiceTopic(simulation string) string {
return fmt.Sprintf(tpapiServiceTopic, simulation)
}
// 信号布置图设备状态消息topic // 信号布置图设备状态消息topic
func GetSfpTopic(simulationId string, mapId int32) string { func GetSfpTopic(simulationId string, mapId int32) string {
return fmt.Sprintf(sfpTopic, simulationId, mapId) return fmt.Sprintf(sfpTopic, simulationId, mapId)

@ -1 +1 @@
Subproject commit d94a8335dcffbb44d7d865d8b7200da335e2241f Subproject commit fa7211d7bdf68be1df571f222a047ddd6cd764b0

View File

@ -43,9 +43,10 @@ type Dynamics interface {
SendTrainControlMessage(b []byte) SendTrainControlMessage(b []byte)
} }
var _default Dynamics var _default *dynamics
var initMutex sync.Mutex var initMutex sync.Mutex
const Name = "动力学"
const Interval = 15 const Interval = 15
func Default() Dynamics { func Default() Dynamics {
@ -53,12 +54,11 @@ func Default() Dynamics {
defer initMutex.Unlock() defer initMutex.Unlock()
if _default == nil { if _default == nil {
_default = &dynamics{ _default = &dynamics{
ThirdPartyApiService: tpapi.NewThirdPartyApiService(),
udpDelayRecorder: tpapi.NewUdpDelayRecorder(Interval, func(err error) { udpDelayRecorder: tpapi.NewUdpDelayRecorder(Interval, func(err error) {
if err != nil { if err != nil {
_default.UpdateState(tpapi.ThirdPartyState_Broken) _default.updateState(tpapi.ThirdPartyState_Broken)
} else { } else {
_default.UpdateState(tpapi.ThirdPartyState_Normal) _default.updateState(tpapi.ThirdPartyState_Normal)
} }
}), }),
} }
@ -72,6 +72,7 @@ type dynamics struct {
turnoutStateUdpClient udp.UdpClient turnoutStateUdpClient udp.UdpClient
trainControlUdpClient udp.UdpClient trainControlUdpClient udp.UdpClient
state tpapi.ThirdPartyApiServiceState
udpDelayRecorder *tpapi.UdpDelayRecorder udpDelayRecorder *tpapi.UdpDelayRecorder
baseUrl string baseUrl string
@ -81,6 +82,18 @@ type dynamics struct {
runConfig *config.DynamicsConfig runConfig *config.DynamicsConfig
} }
func (d *dynamics) updateState(state tpapi.ThirdPartyApiServiceState) {
d.state = state
}
func (d *dynamics) State() tpapi.ThirdPartyApiServiceState {
return d.state
}
func (d *dynamics) Name() string {
return Name
}
// 解码列车信息并处理 // 解码列车信息并处理
func (d *dynamics) handleDynamicsTrainInfo(b []byte) { func (d *dynamics) handleDynamicsTrainInfo(b []byte) {
d.udpDelayRecorder.RecordInterval() d.udpDelayRecorder.RecordInterval()
@ -227,7 +240,7 @@ func (d *dynamics) Start(manager DynamicsMessageManager) error {
ctx, cancle := context.WithCancel(context.Background()) ctx, cancle := context.WithCancel(context.Background())
go d.sendTurnoutStateTask(ctx) go d.sendTurnoutStateTask(ctx)
d.turnoutTaskCancel = cancle d.turnoutTaskCancel = cancle
d.UpdateState(tpapi.ThirdPartyState_Normal) d.updateState(tpapi.ThirdPartyState_Normal)
d.udpDelayRecorder.Start() d.udpDelayRecorder.Start()
return nil return nil
} }
@ -274,7 +287,7 @@ func (d *dynamics) Stop() {
d.turnoutTaskCancel() d.turnoutTaskCancel()
} }
d.manager = nil d.manager = nil
d.UpdateState(tpapi.ThirdPartyState_Closed) d.updateState(tpapi.ThirdPartyState_Closed)
} }
const ( const (

View File

@ -24,15 +24,16 @@ type Radar struct {
Tail byte Tail byte
} }
type RadarData struct { type RadarData struct {
SourceData byte SourceData byte //接收源数据
valRange byte data uint16 //移位后的数据
valRange byte //数据取值范围
} }
type RadarState struct { type RadarState struct {
SourceState byte SourceState byte //原数据
Model string Model string // 天线模式
SyntheticalState string SyntheticalState string //综合状态
DirState string DirState string //方向状态
Dir string Dir string //方向
} }
func (r *Radar) Decode(data []byte) error { func (r *Radar) Decode(data []byte) error {
@ -70,7 +71,7 @@ func (rd *RadarData) getSumVal() byte {
return rd.SourceData + rd.valRange return rd.SourceData + rd.valRange
} }
func (s *RadarState) parseState() { func (s *RadarState) parseState() {
//第6位 == SW_Mode0, 第7位 == SW_Mode1 //第7位 == SW_Mode0, 第6位 == SW_Mode1
// 11:两个天线和双通道都OK // 11:两个天线和双通道都OK
// 10DRS05_Single-Mode 40度,50度的天线或通道故障 // 10DRS05_Single-Mode 40度,50度的天线或通道故障
// 01DRS05_Single-Mode 50度,40度的天线或通道故障 // 01DRS05_Single-Mode 50度,40度的天线或通道故障
@ -78,7 +79,7 @@ func (s *RadarState) parseState() {
// 模式的工作差别工作在11.模式时效果最好。单模式10或01时可信度下降。 // 模式的工作差别工作在11.模式时效果最好。单模式10或01时可信度下降。
arr := s.getBitsStateArr() arr := s.getBitsStateArr()
s.Model = bitStateStr(arr[6:]) s.Model = bitStateStr(arr[6:])
// 第3位=计算状态位,第4位=信号质量标志位,第5位=Black5out标志位 // 第5位=计算状态位,第4位=信号质量标志位,第3位=Black5out标志位
// 110计算状态高质量 // 110计算状态高质量
// 地面信号反射良好,高精度的信号计算 // 地面信号反射良好,高精度的信号计算
// 100 计算状态,低质量 // 100 计算状态,低质量
@ -98,20 +99,24 @@ func (s *RadarState) parseState() {
s.Dir = bitStateStr(arr[0:1]) s.Dir = bitStateStr(arr[0:1])
} }
func (s *RadarState) getBitsStateArr() []byte { func (s *RadarState) getBitsStateArr() [8]byte {
bits := make([]byte, 8) //bits := make([]byte, 8)
var bits2 [8]byte = [8]byte{}
for i := 0; i < 8; i++ { for i := 0; i < 8; i++ {
bit := s.SourceState >> uint(i) & 1 bit := s.SourceState >> uint(i) & 1
bits[i] = bit bits2[i] = bit
} }
return bits return bits2
} }
func bitStateStr(data []byte) string { func bitStateStr(data []byte) string {
var build = strings.Builder{} var build = strings.Builder{}
for _, d := range data { for i := len(data) - 1; i >= 0; i-- {
build.WriteString(fmt.Sprintf("%v", d)) build.WriteString(fmt.Sprintf("%v", data[i]))
} }
/*for _, d := range data {
build.WriteString(fmt.Sprintf("%v", d))
}*/
return build.String() return build.String()
} }
func (r *Radar) checkTail() bool { func (r *Radar) checkTail() bool {
@ -143,7 +148,7 @@ func readSpeedOrCounter(buf *bytes.Buffer) *RadarData {
}*/ }*/
ss, _ := buf.ReadByte() ss, _ := buf.ReadByte()
limit, _ := buf.ReadByte() limit, _ := buf.ReadByte()
return &RadarData{SourceData: ss, valRange: limit} return &RadarData{SourceData: ss, valRange: limit, data: uint16(ss) << 8}
} }
func readRadarInnerData(buf *bytes.Buffer) (byte, byte) { func readRadarInnerData(buf *bytes.Buffer) (byte, byte) {

47
third_party/message/radar_test.go vendored Normal file
View File

@ -0,0 +1,47 @@
package message
import (
"fmt"
"testing"
)
func TestRadar(t *testing.T) {
data := make([]byte, 0)
data = append(data, radar_head1) //0
data = append(data, radar_head2) //1
data = append(data, 1) //自增
data = append(data, 1) //速度3
data = append(data, 0) //4
data = append(data, 1) //s1 5
data = append(data, 0) //6
data = append(data, 1) //s2 7
data = append(data, 0) //8
data = append(data, 0) //内部使用 9
data = append(data, 0) //内部使用 9
data = append(data, createState())
var sum int
for _, s := range data {
sum += int(s)
}
data = append(data, byte(^sum+1))
radar := &Radar{}
radar.Decode(data)
fmt.Println(radar)
}
func createState() byte {
var b byte = 0
b1 := b | (byte(1) << 7) | (byte(1) << 6) | (byte(1) << 5) | (byte(1) << 4) | (byte(1) << 1) | (byte(1) << 1)
return b1
}
func TestR1(t *testing.T) {
var b byte = 0
fmt.Printf("%08b\n", b)
//6,7位 11
//3,4,5位 011
// 1位 1
//0位 1
b1 := b | (byte(1) << 7) | (byte(1) << 6) | (byte(1) << 5) | (byte(1) << 4) | (byte(1) << 1) | (byte(1) << 0)
fmt.Printf("%08b\n", b1)
fmt.Println(b1)
}

View File

@ -34,12 +34,25 @@ type semiPhysicalTrainImpl struct {
trainControlUdpServer udp.UdpServer trainControlUdpServer udp.UdpServer
trainSpeedInfoUdpClient udp.UdpClient trainSpeedInfoUdpClient udp.UdpClient
state tpapi.ThirdPartyApiServiceState
udpDelayRecorder *tpapi.UdpDelayRecorder udpDelayRecorder *tpapi.UdpDelayRecorder
manager SemiPhysicalMessageManager manager SemiPhysicalMessageManager
runConfig *config.VobcConfig runConfig *config.VobcConfig
} }
func (s *semiPhysicalTrainImpl) updateState(state tpapi.ThirdPartyApiServiceState) {
s.state = state
}
func (s *semiPhysicalTrainImpl) State() tpapi.ThirdPartyApiServiceState {
return s.state
}
func (s *semiPhysicalTrainImpl) Name() string {
return Name
}
func (s *semiPhysicalTrainImpl) handleTrainControlMsg(b []byte) { func (s *semiPhysicalTrainImpl) handleTrainControlMsg(b []byte) {
s.udpDelayRecorder.RecordInterval() s.udpDelayRecorder.RecordInterval()
slog.Debug(fmt.Sprintf("半实物列车控制消息近期消息间隔: %v", s.udpDelayRecorder.GetIntervals())) slog.Debug(fmt.Sprintf("半实物列车控制消息近期消息间隔: %v", s.udpDelayRecorder.GetIntervals()))
@ -63,7 +76,7 @@ func (s *semiPhysicalTrainImpl) Start(manager SemiPhysicalMessageManager) {
// 初始化客户端、服务端 // 初始化客户端、服务端
s.initSemiPhysical() s.initSemiPhysical()
s.manager = manager s.manager = manager
s.UpdateState(tpapi.ThirdPartyState_Normal) s.updateState(tpapi.ThirdPartyState_Normal)
s.udpDelayRecorder.Start() s.udpDelayRecorder.Start()
} }
@ -78,7 +91,7 @@ func (s *semiPhysicalTrainImpl) Stop() {
s.trainSpeedInfoUdpClient.Close() s.trainSpeedInfoUdpClient.Close()
} }
s.manager = nil s.manager = nil
s.UpdateState(tpapi.ThirdPartyState_Closed) s.updateState(tpapi.ThirdPartyState_Closed)
} }
func (s *semiPhysicalTrainImpl) SendTrainControlMessage(info *message.DynamicsTrainInfo) { func (s *semiPhysicalTrainImpl) SendTrainControlMessage(info *message.DynamicsTrainInfo) {
@ -93,9 +106,10 @@ func (s *semiPhysicalTrainImpl) initSemiPhysical() {
s.trainControlUdpServer.Listen() s.trainControlUdpServer.Listen()
} }
var _default SemiPhysicalTrain var _default *semiPhysicalTrainImpl
var initMutex sync.Mutex var initMutex sync.Mutex
const Name = "半实物仿真列车"
const Interval int = 20 const Interval int = 20
func Default() SemiPhysicalTrain { func Default() SemiPhysicalTrain {
@ -103,12 +117,11 @@ func Default() SemiPhysicalTrain {
defer initMutex.Unlock() defer initMutex.Unlock()
if _default == nil { if _default == nil {
_default = &semiPhysicalTrainImpl{ _default = &semiPhysicalTrainImpl{
ThirdPartyApiService: tpapi.NewThirdPartyApiService(),
udpDelayRecorder: tpapi.NewUdpDelayRecorder(Interval, func(err error) { udpDelayRecorder: tpapi.NewUdpDelayRecorder(Interval, func(err error) {
if err != nil { if err != nil {
_default.UpdateState(tpapi.ThirdPartyState_Broken) _default.updateState(tpapi.ThirdPartyState_Broken)
} else { } else {
_default.UpdateState(tpapi.ThirdPartyState_Normal) _default.updateState(tpapi.ThirdPartyState_Normal)
} }
}), }),
} }

View File

@ -1,24 +1,56 @@
package third_party package third_party
import ( import (
"log/slog"
"joylink.club/bj-rtsts-server/dto/state_proto" "joylink.club/bj-rtsts-server/dto/state_proto"
"joylink.club/bj-rtsts-server/third_party/dynamics" "joylink.club/bj-rtsts-server/third_party/dynamics"
"joylink.club/bj-rtsts-server/third_party/semi_physical_train"
"joylink.club/bj-rtsts-server/third_party/tpapi" "joylink.club/bj-rtsts-server/third_party/tpapi"
) )
func GetRunningServiceStates(t state_proto.SimulationThirdPartyApiService_Type) *state_proto.SimulationThirdPartyApiService { var tpapiService []tpapi.ThirdPartyApiService
func Init() {
tpapiService = append(
tpapiService, dynamics.Default(),
semi_physical_train.Default(),
)
}
func GetAllServices() []tpapi.ThirdPartyApiService {
return tpapiService
}
func convertServiceName(name string) state_proto.SimulationThirdPartyApiService_Type {
switch name {
case dynamics.Name:
return state_proto.SimulationThirdPartyApiService_Dynamics
case semi_physical_train.Name:
return state_proto.SimulationThirdPartyApiService_SemiPhysicalTrain
}
return state_proto.SimulationThirdPartyApiService_Undefined
}
func GetRunningServiceStates() *state_proto.SimulationThirdPartyApiService {
ss := &state_proto.SimulationThirdPartyApiService{} ss := &state_proto.SimulationThirdPartyApiService{}
switch dynamics.Default().State() { for _, tpas := range tpapiService {
case tpapi.ThirdPartyState_Normal: t := convertServiceName(tpas.Name())
ss.States = append(ss.States, &state_proto.SimulationThirdPartyApiServiceState{ if t == state_proto.SimulationThirdPartyApiService_Undefined {
Type: state_proto.SimulationThirdPartyApiService_Dynamics, slog.Error("未知的第三方接口服务类型", "name", tpas.Name())
State: state_proto.SimulationThirdPartyApiService_Normal, }
}) switch tpas.State() {
case tpapi.ThirdPartyState_Broken: case tpapi.ThirdPartyState_Normal:
ss.States = append(ss.States, &state_proto.SimulationThirdPartyApiServiceState{ ss.States = append(ss.States, &state_proto.SimulationThirdPartyApiServiceState{
Type: state_proto.SimulationThirdPartyApiService_Dynamics, Type: t,
State: state_proto.SimulationThirdPartyApiService_Error, State: state_proto.SimulationThirdPartyApiService_Normal,
}) })
case tpapi.ThirdPartyState_Broken:
ss.States = append(ss.States, &state_proto.SimulationThirdPartyApiServiceState{
Type: t,
State: state_proto.SimulationThirdPartyApiService_Error,
})
}
} }
return ss return ss
} }

View File

@ -20,29 +20,28 @@ const (
// 第三方对接接口服务 // 第三方对接接口服务
type ThirdPartyApiService interface { type ThirdPartyApiService interface {
Name() string
// 服务状态 // 服务状态
State() ThirdPartyApiServiceState State() ThirdPartyApiServiceState
// 更新状态
UpdateState(state ThirdPartyApiServiceState)
} }
func NewThirdPartyApiService() ThirdPartyApiService { // func NewThirdPartyApiService() ThirdPartyApiService {
return &thirdPartyApiService{ // return &thirdPartyApiService{
state: ThirdPartyState_Closed, // state: ThirdPartyState_Closed,
} // }
} // }
type thirdPartyApiService struct { // type thirdPartyApiService struct {
state ThirdPartyApiServiceState // state ThirdPartyApiServiceState
} // }
func (s *thirdPartyApiService) UpdateState(state ThirdPartyApiServiceState) { // func (s *thirdPartyApiService) UpdateState(state ThirdPartyApiServiceState) {
s.state = state // s.state = state
} // }
func (s *thirdPartyApiService) State() ThirdPartyApiServiceState { // func (s *thirdPartyApiService) State() ThirdPartyApiServiceState {
return s.state // return s.state
} // }
// UDP通信延时记录 // UDP通信延时记录
type UdpDelayRecorder struct { type UdpDelayRecorder struct {
@ -59,7 +58,7 @@ type UdpDelayRecorder struct {
// 长时间未收到消息或恢复处理回调 // 长时间未收到消息或恢复处理回调
handler func(err error) handler func(err error)
cancelFunc func() cancelFunc context.CancelFunc
done chan struct{} // 服务协程退出信号 done chan struct{} // 服务协程退出信号
} }
@ -110,6 +109,9 @@ func (r *UdpDelayRecorder) checkTimeout(ctx context.Context) {
func (r *UdpDelayRecorder) Stop() { func (r *UdpDelayRecorder) Stop() {
r.mu.Lock() r.mu.Lock()
defer r.mu.Unlock() defer r.mu.Unlock()
if r.cancelFunc == nil {
return
}
r.cancelFunc() r.cancelFunc()
<-r.done <-r.done
r.lastReceiedTime = 0 r.lastReceiedTime = 0

View File

@ -46,8 +46,6 @@ type VerifySimulation struct {
uidMap map[string]*elementIdStructure uidMap map[string]*elementIdStructure
// 运行环境配置 // 运行环境配置
runConfig *config.ThridPartyConfig runConfig *config.ThridPartyConfig
// 运行状态
runState state_proto.SimulationStatus_SimulationState
} }
// 轨旁仿真内存模型 // 轨旁仿真内存模型
@ -102,7 +100,6 @@ func CreateSimulation(projectId int32, mapIds []int32, runConfig *dto.ProjectRun
verifySimulation := &VerifySimulation{ verifySimulation := &VerifySimulation{
ProjectId: projectId, ProjectId: projectId,
MapIds: mapIds, MapIds: mapIds,
runState: state_proto.SimulationStatus_PAUSE,
} }
// 设置运行环境 // 设置运行环境
err := verifySimulation.initRunConfig(runConfig) err := verifySimulation.initRunConfig(runConfig)
@ -117,25 +114,25 @@ func CreateSimulation(projectId int32, mapIds []int32, runConfig *dto.ProjectRun
return verifySimulation, nil return verifySimulation, nil
} }
// 启动 // // 启动
func (s *VerifySimulation) Start() { // func (s *VerifySimulation) Start() {
s.runState = state_proto.SimulationStatus_START // s.runState = state_proto.SimulationStatus_START
} // }
// 销毁 // // 销毁
func (s *VerifySimulation) Destroy() { // func (s *VerifySimulation) Destroy() {
s.runState = state_proto.SimulationStatus_DESTROY // s.runState = state_proto.SimulationStatus_DESTROY
} // }
// 暂停 // // 暂停
func (s *VerifySimulation) Pause() { // func (s *VerifySimulation) Pause() {
s.runState = state_proto.SimulationStatus_PAUSE // s.runState = state_proto.SimulationStatus_PAUSE
} // }
// 获取状态 // // 获取状态
func (s *VerifySimulation) GetRunState() state_proto.SimulationStatus_SimulationState { // func (s *VerifySimulation) GetRunState() state_proto.SimulationStatus_SimulationState {
return s.runState // return s.runState
} // }
// 获取仿真世界信息 // 获取仿真世界信息
func (s *VerifySimulation) GetComIdByUid(uid string) uint32 { func (s *VerifySimulation) GetComIdByUid(uid string) uint32 {

View File

@ -63,7 +63,7 @@ func CreateSimulation(projectId int32, mapIds []int32, runConfig *dto.ProjectRun
return "", err return "", err
} }
simulationMap.Store(simulationId, verifySimulation) simulationMap.Store(simulationId, verifySimulation)
verifySimulation.Start() // verifySimulation.Start()
// 全部成功,启动仿真 // 全部成功,启动仿真
verifySimulation.World.StartUp() verifySimulation.World.StartUp()
// 启动仿真消息服务 // 启动仿真消息服务
@ -80,12 +80,14 @@ func DestroySimulation(simulationId string) {
} }
simulationMap.Delete(simulationId) simulationMap.Delete(simulationId)
simulationInfo := s.(*memory.VerifySimulation) simulationInfo := s.(*memory.VerifySimulation)
simulationInfo.Destroy() // simulationInfo.Destroy()
// 停止ecs world // 停止ecs world
simulationInfo.World.Close() simulationInfo.World.Close()
message_server.Close(simulationInfo)
// 发布销毁消息
message_server.PubSimulationDestroyMsg(simulationId)
// 停止第三方 // 停止第三方
stopThirdParty(simulationInfo) stopThirdParty(simulationInfo)
message_server.Close(simulationInfo)
} }
// 创建world // 创建world