From 9595b11f52ccb90c7473e0ae8ca9db68bed90306 Mon Sep 17 00:00:00 2001 From: xzb <223@qq.com> Date: Tue, 14 Nov 2023 17:57:54 +0800 Subject: [PATCH] rssp axle test --- third_party/axle_device/rssp_axle.go | 9 +- third_party/axle_device/rssp_axle_manage.go | 31 ++++ third_party/axle_device/rssp_channel.go | 19 ++- third_party/example/rssp/ci/ci_server.go | 155 ++++++++++++++++++ third_party/example/rssp/main.go | 69 ++++---- .../wayside/memory/wayside_simulation.go | 4 + ts/test_simulation_manage.go | 14 ++ 7 files changed, 257 insertions(+), 44 deletions(-) create mode 100644 third_party/example/rssp/ci/ci_server.go diff --git a/third_party/axle_device/rssp_axle.go b/third_party/axle_device/rssp_axle.go index 71862bb..236e572 100644 --- a/third_party/axle_device/rssp_axle.go +++ b/third_party/axle_device/rssp_axle.go @@ -81,6 +81,7 @@ func (s *rsspAxle) Stop() { s.messageManager = nil } func (s *rsspAxle) periodRun(runContext context.Context) { + time.Sleep(2 * time.Second) defer func() { if e := recover(); e != nil { slog.Error(fmt.Sprintf("[%s-%s-%s]定时发送计轴区段状态任务异常", s.city, s.lineId, s.centralizedStation), "error", e, "stack", string(debug.Stack())) @@ -93,6 +94,7 @@ func (s *rsspAxle) periodRun(runContext context.Context) { return default: } + //slog.Debug("计轴设备periodRun") if s.messageManager == nil { slog.Warn(fmt.Sprintf("[%s-%s-%s]定时发送计轴区段状态任务因messageManager不存在退出", s.city, s.lineId, s.centralizedStation)) return @@ -107,12 +109,12 @@ func (s *rsspAxle) periodRun(runContext context.Context) { s.sendStatusMsg(msgPack) } } else { - slog.Debug(e.Error()) + slog.Warn(e.Error()) } // time.Sleep(time.Duration(s.sendingPeriod) * time.Millisecond) //更新周期性参数 - s.rsspChannel.nextPeriod() + s.rsspChannel.NextPeriod() } } @@ -121,6 +123,7 @@ func (s *rsspAxle) sendStatusMsg(msg *message.SectionStatusMsgPack) { data := msg.Encode() //向主通道发送 if s.rsspChannel != nil { - s.rsspChannel.sendUserData(data) + //slog.Debug("计轴设备发送SectionStatusMsgPack", "区段状态个数", len(msg.Sms), "packLen", len(data)) + s.rsspChannel.SendUserData(data) } } diff --git a/third_party/axle_device/rssp_axle_manage.go b/third_party/axle_device/rssp_axle_manage.go index ad5f6ab..d8aa1e5 100644 --- a/third_party/axle_device/rssp_axle_manage.go +++ b/third_party/axle_device/rssp_axle_manage.go @@ -3,6 +3,7 @@ package axle_device import ( "joylink.club/bj-rtsts-server/config" "joylink.club/bj-rtsts-server/third_party/message" + "log/slog" ) //联锁集中站计轴与联锁通信管理 @@ -20,11 +21,14 @@ var allRsspAxleServices []RsspAxle func StartLineAllRsspAxleServices(ram AxleMessageManager) { allRsspAxleServices = nil cfgs := ram.GetLineAllRsspAxleCfgs() + cfgs = getTestConfig() //测试用 + /////////////////////////////////////// for _, cfg := range cfgs { if cfg.Open { as := InitRsspAxle(&cfg) allRsspAxleServices = append(allRsspAxleServices, as) as.Start(ram) + slog.Debug("启动计轴设备", "city", cfg.City, "lineId", cfg.LineId, "CentralizedStation", cfg.CentralizedStation) } } } @@ -33,3 +37,30 @@ func StopLineAllRsspAxleServices() { as.Stop() } } + +// 测试用 +func getTestConfig() []config.RsspAxleConfig { + cfg := &config.RsspAxleConfig{} + cfg.Open = true + cfg.City = "北京" + cfg.LineId = "12" + cfg.CentralizedStation = "酒仙桥" + cfg.RsspCfg.SrcAddr = 0x02 + cfg.RsspCfg.DstAddr = 0x01 + cfg.RsspCfg.DataVer1 = 0x0011 + cfg.RsspCfg.DataVer2 = 0x0012 + cfg.RsspCfg.SID1 = 0x10000000 + cfg.RsspCfg.SID2 = 0x00000001 + cfg.RsspCfg.SINIT1 = 0x01 + cfg.RsspCfg.SINIT2 = 0x02 + cfg.RsspCfg.SendingPeriod = 500 + cfg.RsspCfg.SsrRsspTimeout = 3 + cfg.RsspCfg.Mtv = 3 + cfg.RsspCfg.DeviceA = false + cfg.RsspCfg.PicType = message.PIC_MASTER + cfg.RsspCfg.RemoteIp = "192.168.3.5" + cfg.RsspCfg.RemoteUdpPort = 7777 + cfg.RsspCfg.LocalUdpPort = 6666 + // + return []config.RsspAxleConfig{*cfg} +} diff --git a/third_party/axle_device/rssp_channel.go b/third_party/axle_device/rssp_channel.go index 4b45bcd..cfcd442 100644 --- a/third_party/axle_device/rssp_channel.go +++ b/third_party/axle_device/rssp_channel.go @@ -41,6 +41,9 @@ type RsspChannel struct { sendSseRecord *SseFireRecord } +func (s *RsspChannel) SetRcvUserDataCallback(handleUserData HandleUserData) { + s.handleUserData = handleUserData +} func (s *RsspChannel) Init(config *config.RsspConfig) *RsspChannel { s.config = config s.rsspTimer = &RsspTimer{t: 0} @@ -102,6 +105,7 @@ func (s *SseFireRecord) hasRecord() bool { // 处理接收到的rssp报文 // 注意本函数由udp socket 协程执行 func (s *RsspChannel) handleRsspMsg(pack []byte) { + slog.Debug("接收到RSSP报文", "len", len(pack)) //报文头校验 head := &message.RsspHead{} if !head.Parse(pack) { //解析报文头失败 @@ -146,6 +150,7 @@ func (s *RsspChannel) handleRsspMsg(pack []byte) { // 处理接收到的实时安全数据 RSD func (s *RsspChannel) handleRsspRsd(rsd *message.RsspRsd) { + //slog.Debug("接收到的实时安全数据 RSD") //如果为备机发送来的安全数据 if s.config.PicType == message.PIC_SLAVE { //备安全通道 slog.Debug("丢弃接收的RSSP-RSD报文:舍弃在备安全通道中接收到的安全数据") @@ -220,12 +225,14 @@ func (s *RsspChannel) handleRsspSsr(ssr *message.RsspSsr) { slog.Debug("丢弃接收的RSSP-SSR报文:SSR与SSE不对应") return } - //恢复时序 - //todo + //恢复时序? + s.rcvSn = ssr.SrSn + s.rcvCh1Ts = ssr.Tic1 ^ s.sendSseRecord.send.SeqEnq1 ^ s.config.SID1 ^ s.config.DataVer1 + s.rcvCh2Ts = ssr.Tic2 ^ s.sendSseRecord.send.SeqEnq2 ^ s.config.SID2 ^ s.config.DataVer2 } -// 刷新与周期有关的:将序列号和时间戳更新到下一个值 -func (s *RsspChannel) nextPeriod() { +// NextPeriod 刷新与周期有关的:将序列号和时间戳更新到下一个值 +func (s *RsspChannel) NextPeriod() { s.sn.GetAndAdd() s.ch1Ts.GetAndMove() s.ch2Ts.GetAndMove() @@ -269,8 +276,8 @@ func (s *RsspChannel) sendSse() *message.RsspSse { return sse } -// 发送用户数据,即通过rssp安全通道发送应用层数据,通过rssp的RSD报文发送 -func (s *RsspChannel) sendUserData(userData []byte) { +// SendUserData 发送用户数据,即通过rssp安全通道发送应用层数据,通过rssp的RSD报文发送 +func (s *RsspChannel) SendUserData(userData []byte) { rsd := &message.RsspRsd{} rsd.Pic = s.config.PicType if s.config.DeviceA { diff --git a/third_party/example/rssp/ci/ci_server.go b/third_party/example/rssp/ci/ci_server.go new file mode 100644 index 0000000..64736af --- /dev/null +++ b/third_party/example/rssp/ci/ci_server.go @@ -0,0 +1,155 @@ +package ci + +import ( + "context" + "fmt" + "joylink.club/bj-rtsts-server/config" + "joylink.club/bj-rtsts-server/third_party/axle_device" + "joylink.club/bj-rtsts-server/third_party/message" + "sort" + "sync" + "time" +) + +type CiServer interface { + Start() + Stop() + //SendSectionReset 向计轴设备发送计轴命令 + SendSectionReset(sectionId string, drst bool, pdrst bool) + //HandleSectionStatus 收到来自计轴设备的物理区段状态 + HandleSectionStatus(status []*message.SectionStatusMsg) +} + +// 北京12号线酒仙桥集中站物理区段码表 +var codePointMap = map[int]string{0: "北京_12_酒仙桥_9G", 1: "北京_12_酒仙桥_1DG", 2: "北京_12_酒仙桥_11G", 3: "北京_12_酒仙桥_13G", 4: "北京_12_酒仙桥_15G", 5: "北京_12_酒仙桥_6G", 6: "北京_12_酒仙桥_8G", 7: "北京_12_酒仙桥_2DG", 8: "北京_12_酒仙桥_10G", 9: "北京_12_酒仙桥_12G", 10: "北京_12_酒仙桥_14G"} +var idRowMap = make(map[string]int) +var sectionCmds []*message.SectionCmdMsg +var cmdsLock = sync.Mutex{} + +// rssp 测试 +type ciServer struct { + //所属城市 + city string + //所属线路 + lineId string + //所属集中站 + centralizedStation string + //接收方每个安全通信会话对应的发送周期值,单位ms + sendingPeriod uint32 + //主安全通道 + rsspChannel *axle_device.RsspChannel + // + cancel context.CancelFunc +} + +func (s *ciServer) HandleSectionStatus(status []*message.SectionStatusMsg) { + if len(codePointMap) != len(status) { + fmt.Println("==>>接收到的区段状态与码表不对应:", "codePointsLen=", len(codePointMap), ",statusLen=", len(status)) + return + } + fmt.Println("==>>接收到的区段状态----------------------------------------------------------------------------i") + for row, state := range status { + sectionId := codePointMap[row] + fmt.Printf("==>>[%d]区段[%s]状态: Clr=%t Occ=%t Rac=%t Rjo=%t Rjt=%t\n", row, sectionId, state.Clr, state.Occ, state.Rac, state.Rjo, state.Rjt) + } + fmt.Println("==>>接收到的区段状态----------------------------------------------------------------------------o") +} +func (s *ciServer) periodRun(runContext context.Context) { + defer func() { + fmt.Println("==>>CI Server 周期运行退出 ...") + }() + for { + select { + case <-runContext.Done(): + return + default: + } + s.doSendSectionCmdMsg() + time.Sleep(time.Duration(s.sendingPeriod) * time.Millisecond) + //更新周期性参数 + s.rsspChannel.NextPeriod() + } +} +func (s *ciServer) doSendSectionCmdMsg() { + if s.rsspChannel != nil { + data := s.collectSectionCmdsData() + fmt.Println("==>>CI发送区段指令数据", ",len=", len(data)) + s.rsspChannel.SendUserData(data) + } +} +func (s *ciServer) collectSectionCmdsData() []byte { + defer func() { + cmdsLock.Unlock() + }() + cmdsLock.Lock() + // + data := (&message.SectionCmdMsgPack{Ck: 0x80, Scs: sectionCmds}).Encode() + return data +} + +// SendSectionReset 发送区段复位指令 +func (s *ciServer) SendSectionReset(sectionId string, drst bool, pdrst bool) { + defer func() { + cmdsLock.Unlock() + }() + cmdsLock.Lock() + // + sec := sectionCmds[idRowMap[sectionId]] + sec.Drst = drst + sec.Pdrst = pdrst +} + +type rowId struct { + row int + id string +} + +func (s *ciServer) Start() { + s.rsspChannel.SetRcvUserDataCallback(s.rcvUserData) + s.rsspChannel.Start() + ctx, cancel := context.WithCancel(context.Background()) + go s.periodRun(ctx) + s.cancel = cancel +} +func (s *ciServer) Stop() { + if s.rsspChannel != nil { + s.rsspChannel.Stop() + s.rsspChannel = nil + } + if s.cancel != nil { + s.cancel() + } +} +func (s *ciServer) rcvUserData(data []byte) { + fmt.Println("==>>CI接收到区段状态数据", ",len=", len(data)) + msg := &message.SectionStatusMsgPack{} + msg.Decode(data) + s.HandleSectionStatus(msg.Sms) +} +func NewRsspCiServer(cfg *config.RsspAxleConfig) CiServer { + ra := &ciServer{} + // + ra.city = cfg.City + ra.lineId = cfg.LineId + ra.centralizedStation = cfg.CentralizedStation + ra.sendingPeriod = cfg.RsspCfg.SendingPeriod + // + mrc := &axle_device.RsspChannel{} + ra.rsspChannel = mrc.Init(&cfg.RsspCfg) + // + return ra +} + +func InitCiServerCodePoints() { + var ris []*rowId + for row, id := range codePointMap { + idRowMap[id] = row + ris = append(ris, &rowId{row: row, id: id}) + } + sort.SliceStable(ris, func(i, j int) bool { + return ris[i].row < ris[j].row + }) + for range ris { + sectionCmds = append(sectionCmds, &message.SectionCmdMsg{Drst: false, Pdrst: false}) + } +} diff --git a/third_party/example/rssp/main.go b/third_party/example/rssp/main.go index cc0a987..50d3b5c 100644 --- a/third_party/example/rssp/main.go +++ b/third_party/example/rssp/main.go @@ -1,46 +1,45 @@ package main import ( + "fmt" "joylink.club/bj-rtsts-server/config" - "joylink.club/bj-rtsts-server/third_party/axle_device" + "joylink.club/bj-rtsts-server/third_party/example/rssp/ci" "joylink.club/bj-rtsts-server/third_party/message" + "time" ) -type CiServer interface { - Start() error - Stop() -} -type CiMessageManager interface { - //SendSectionCmdMsg 向计轴设备发送计轴命令 - SendSectionCmdMsg(cmdMsg *message.SectionCmdMsgPack) - //HandleSectionStatus 收到来自计轴设备的物理区段状态 - HandleSectionStatus(status []*message.SectionStatusMsg) -} - -// rssp 测试 -type ciServer struct { - //所属城市 - city string - //所属线路 - lineId string - //所属集中站 - centralizedStation string - //计轴区段码表 - //key-行号,value-区段id - sectionCodes map[int]string - //主安全通道 - rsspChannel *axle_device.RsspChannel -} - -func initRsspCiServer(cfg *config.RsspAxleConfig) any { - ra := &ciServer{} +func main() { + message.InitRsspCrcTable() + ci.InitCiServerCodePoints() // - ra.city = cfg.City - ra.lineId = cfg.LineId - ra.centralizedStation = cfg.CentralizedStation + cfg := &config.RsspAxleConfig{} + cfg.City = "北京" + cfg.LineId = "12" + cfg.CentralizedStation = "酒仙桥" + cfg.RsspCfg.SrcAddr = 0x01 + cfg.RsspCfg.DstAddr = 0x02 + cfg.RsspCfg.DataVer1 = 0x0011 + cfg.RsspCfg.DataVer2 = 0x0012 + cfg.RsspCfg.SID1 = 0x10000000 + cfg.RsspCfg.SID2 = 0x00000001 + cfg.RsspCfg.SINIT1 = 0x01 + cfg.RsspCfg.SINIT2 = 0x02 + cfg.RsspCfg.SendingPeriod = 500 + cfg.RsspCfg.SsrRsspTimeout = 3 + cfg.RsspCfg.Mtv = 3 + cfg.RsspCfg.DeviceA = true + cfg.RsspCfg.PicType = message.PIC_MASTER + cfg.RsspCfg.RemoteIp = "192.168.3.5" + cfg.RsspCfg.RemoteUdpPort = 6666 + cfg.RsspCfg.LocalUdpPort = 7777 // - mrc := &axle_device.RsspChannel{} - ra.rsspChannel = mrc.Init(&cfg.RsspCfg) + ciServer := ci.NewRsspCiServer(cfg) + ciServer.Start() + fmt.Println("==>>ci server ...") + for { + time.Sleep(2 * time.Second) + ciServer.SendSectionReset("北京_12_酒仙桥_9G", true, false) + } // - return ra + time.Sleep(3600 * time.Second) } diff --git a/ts/simulation/wayside/memory/wayside_simulation.go b/ts/simulation/wayside/memory/wayside_simulation.go index 88f535a..27af094 100644 --- a/ts/simulation/wayside/memory/wayside_simulation.go +++ b/ts/simulation/wayside/memory/wayside_simulation.go @@ -169,6 +169,7 @@ func (s *VerifySimulation) CollectSectionStatus(city string, lineId string, cent if len(codePoints) <= 0 { return nil, fmt.Errorf("没有找到GetCentralizedStationRef[%s]的区段码表为空", stationUid) } + //slog.Debug("收集计轴区段状态", "区段码表len", len(codePoints)) // var msg []*message.SectionStatusMsg var axleSectionIds []string @@ -176,10 +177,12 @@ func (s *VerifySimulation) CollectSectionStatus(city string, lineId string, cent axleSectionIds = append(axleSectionIds, section.SectionId) } // + //slog.Debug("收集计轴区段状态", "计轴区段len", len(axleSectionIds), "axleSectionIds", axleSectionIds) as, e := fi.FindAxleSectionsStatus(s.World, axleSectionIds) if e != nil { //从仿真中收集计轴区段状态的失败列表 return nil, e } + //slog.Debug("收集计轴区段状态", "仿真中计轴状态len", len(as)) // stateMap := make(map[string]*fi.AxleSectionState) for _, a := range as { @@ -205,6 +208,7 @@ func (s *VerifySimulation) CollectSectionStatus(city string, lineId string, cent } } // + //slog.Debug("收集计轴区段状态", "区段计轴msgLen", len(msg)) return msg, nil } diff --git a/ts/test_simulation_manage.go b/ts/test_simulation_manage.go index febf782..084a8fc 100644 --- a/ts/test_simulation_manage.go +++ b/ts/test_simulation_manage.go @@ -62,6 +62,20 @@ func CreateSimulation(projectId int32, mapIds []int32, runConfig *dto.ProjectRun verifySimulation.World.StartUp() // 启动仿真消息服务 message_server.Start(verifySimulation) + /* + //测试用 + cps := verifySimulation.GetSectionCodePoints("北京", "12", "酒仙桥") + if len(cps) > 0 { + sort.SliceStable(cps, func(i, j int) bool { + return cps[i].Row < cps[j].Row + }) + } + sb := strings.Builder{} + for row, cp := range cps { + sb.WriteString(fmt.Sprintf("%d:\"%s\",", row, cp.SectionId)) + } + fmt.Println("====================@@@@@@@>>>>>>>>", sb.String()) + */ } return simulationId, nil }