rssp axle test

This commit is contained in:
xzb 2023-11-14 17:57:54 +08:00
parent 3f89c45891
commit 9595b11f52
7 changed files with 257 additions and 44 deletions

View File

@ -81,6 +81,7 @@ func (s *rsspAxle) Stop() {
s.messageManager = nil
}
func (s *rsspAxle) periodRun(runContext context.Context) {
time.Sleep(2 * time.Second)
defer func() {
if e := recover(); e != nil {
slog.Error(fmt.Sprintf("[%s-%s-%s]定时发送计轴区段状态任务异常", s.city, s.lineId, s.centralizedStation), "error", e, "stack", string(debug.Stack()))
@ -93,6 +94,7 @@ func (s *rsspAxle) periodRun(runContext context.Context) {
return
default:
}
//slog.Debug("计轴设备periodRun")
if s.messageManager == nil {
slog.Warn(fmt.Sprintf("[%s-%s-%s]定时发送计轴区段状态任务因messageManager不存在退出", s.city, s.lineId, s.centralizedStation))
return
@ -107,12 +109,12 @@ func (s *rsspAxle) periodRun(runContext context.Context) {
s.sendStatusMsg(msgPack)
}
} else {
slog.Debug(e.Error())
slog.Warn(e.Error())
}
//
time.Sleep(time.Duration(s.sendingPeriod) * time.Millisecond)
//更新周期性参数
s.rsspChannel.nextPeriod()
s.rsspChannel.NextPeriod()
}
}
@ -121,6 +123,7 @@ func (s *rsspAxle) sendStatusMsg(msg *message.SectionStatusMsgPack) {
data := msg.Encode()
//向主通道发送
if s.rsspChannel != nil {
s.rsspChannel.sendUserData(data)
//slog.Debug("计轴设备发送SectionStatusMsgPack", "区段状态个数", len(msg.Sms), "packLen", len(data))
s.rsspChannel.SendUserData(data)
}
}

View File

@ -3,6 +3,7 @@ package axle_device
import (
"joylink.club/bj-rtsts-server/config"
"joylink.club/bj-rtsts-server/third_party/message"
"log/slog"
)
//联锁集中站计轴与联锁通信管理
@ -20,11 +21,14 @@ var allRsspAxleServices []RsspAxle
func StartLineAllRsspAxleServices(ram AxleMessageManager) {
allRsspAxleServices = nil
cfgs := ram.GetLineAllRsspAxleCfgs()
cfgs = getTestConfig() //测试用
///////////////////////////////////////
for _, cfg := range cfgs {
if cfg.Open {
as := InitRsspAxle(&cfg)
allRsspAxleServices = append(allRsspAxleServices, as)
as.Start(ram)
slog.Debug("启动计轴设备", "city", cfg.City, "lineId", cfg.LineId, "CentralizedStation", cfg.CentralizedStation)
}
}
}
@ -33,3 +37,30 @@ func StopLineAllRsspAxleServices() {
as.Stop()
}
}
// 测试用
func getTestConfig() []config.RsspAxleConfig {
cfg := &config.RsspAxleConfig{}
cfg.Open = true
cfg.City = "北京"
cfg.LineId = "12"
cfg.CentralizedStation = "酒仙桥"
cfg.RsspCfg.SrcAddr = 0x02
cfg.RsspCfg.DstAddr = 0x01
cfg.RsspCfg.DataVer1 = 0x0011
cfg.RsspCfg.DataVer2 = 0x0012
cfg.RsspCfg.SID1 = 0x10000000
cfg.RsspCfg.SID2 = 0x00000001
cfg.RsspCfg.SINIT1 = 0x01
cfg.RsspCfg.SINIT2 = 0x02
cfg.RsspCfg.SendingPeriod = 500
cfg.RsspCfg.SsrRsspTimeout = 3
cfg.RsspCfg.Mtv = 3
cfg.RsspCfg.DeviceA = false
cfg.RsspCfg.PicType = message.PIC_MASTER
cfg.RsspCfg.RemoteIp = "192.168.3.5"
cfg.RsspCfg.RemoteUdpPort = 7777
cfg.RsspCfg.LocalUdpPort = 6666
//
return []config.RsspAxleConfig{*cfg}
}

View File

@ -41,6 +41,9 @@ type RsspChannel struct {
sendSseRecord *SseFireRecord
}
func (s *RsspChannel) SetRcvUserDataCallback(handleUserData HandleUserData) {
s.handleUserData = handleUserData
}
func (s *RsspChannel) Init(config *config.RsspConfig) *RsspChannel {
s.config = config
s.rsspTimer = &RsspTimer{t: 0}
@ -102,6 +105,7 @@ func (s *SseFireRecord) hasRecord() bool {
// 处理接收到的rssp报文
// 注意本函数由udp socket 协程执行
func (s *RsspChannel) handleRsspMsg(pack []byte) {
slog.Debug("接收到RSSP报文", "len", len(pack))
//报文头校验
head := &message.RsspHead{}
if !head.Parse(pack) { //解析报文头失败
@ -146,6 +150,7 @@ func (s *RsspChannel) handleRsspMsg(pack []byte) {
// 处理接收到的实时安全数据 RSD
func (s *RsspChannel) handleRsspRsd(rsd *message.RsspRsd) {
//slog.Debug("接收到的实时安全数据 RSD")
//如果为备机发送来的安全数据
if s.config.PicType == message.PIC_SLAVE { //备安全通道
slog.Debug("丢弃接收的RSSP-RSD报文舍弃在备安全通道中接收到的安全数据")
@ -220,12 +225,14 @@ func (s *RsspChannel) handleRsspSsr(ssr *message.RsspSsr) {
slog.Debug("丢弃接收的RSSP-SSR报文SSR与SSE不对应")
return
}
//恢复时序
//todo
//恢复时序?
s.rcvSn = ssr.SrSn
s.rcvCh1Ts = ssr.Tic1 ^ s.sendSseRecord.send.SeqEnq1 ^ s.config.SID1 ^ s.config.DataVer1
s.rcvCh2Ts = ssr.Tic2 ^ s.sendSseRecord.send.SeqEnq2 ^ s.config.SID2 ^ s.config.DataVer2
}
// 刷新与周期有关的:将序列号和时间戳更新到下一个值
func (s *RsspChannel) nextPeriod() {
// NextPeriod 刷新与周期有关的:将序列号和时间戳更新到下一个值
func (s *RsspChannel) NextPeriod() {
s.sn.GetAndAdd()
s.ch1Ts.GetAndMove()
s.ch2Ts.GetAndMove()
@ -269,8 +276,8 @@ func (s *RsspChannel) sendSse() *message.RsspSse {
return sse
}
// 发送用户数据即通过rssp安全通道发送应用层数据通过rssp的RSD报文发送
func (s *RsspChannel) sendUserData(userData []byte) {
// SendUserData 发送用户数据即通过rssp安全通道发送应用层数据通过rssp的RSD报文发送
func (s *RsspChannel) SendUserData(userData []byte) {
rsd := &message.RsspRsd{}
rsd.Pic = s.config.PicType
if s.config.DeviceA {

155
third_party/example/rssp/ci/ci_server.go vendored Normal file
View File

@ -0,0 +1,155 @@
package ci
import (
"context"
"fmt"
"joylink.club/bj-rtsts-server/config"
"joylink.club/bj-rtsts-server/third_party/axle_device"
"joylink.club/bj-rtsts-server/third_party/message"
"sort"
"sync"
"time"
)
type CiServer interface {
Start()
Stop()
//SendSectionReset 向计轴设备发送计轴命令
SendSectionReset(sectionId string, drst bool, pdrst bool)
//HandleSectionStatus 收到来自计轴设备的物理区段状态
HandleSectionStatus(status []*message.SectionStatusMsg)
}
// 北京12号线酒仙桥集中站物理区段码表
var codePointMap = map[int]string{0: "北京_12_酒仙桥_9G", 1: "北京_12_酒仙桥_1DG", 2: "北京_12_酒仙桥_11G", 3: "北京_12_酒仙桥_13G", 4: "北京_12_酒仙桥_15G", 5: "北京_12_酒仙桥_6G", 6: "北京_12_酒仙桥_8G", 7: "北京_12_酒仙桥_2DG", 8: "北京_12_酒仙桥_10G", 9: "北京_12_酒仙桥_12G", 10: "北京_12_酒仙桥_14G"}
var idRowMap = make(map[string]int)
var sectionCmds []*message.SectionCmdMsg
var cmdsLock = sync.Mutex{}
// rssp 测试
type ciServer struct {
//所属城市
city string
//所属线路
lineId string
//所属集中站
centralizedStation string
//接收方每个安全通信会话对应的发送周期值,单位ms
sendingPeriod uint32
//主安全通道
rsspChannel *axle_device.RsspChannel
//
cancel context.CancelFunc
}
func (s *ciServer) HandleSectionStatus(status []*message.SectionStatusMsg) {
if len(codePointMap) != len(status) {
fmt.Println("==>>接收到的区段状态与码表不对应:", "codePointsLen=", len(codePointMap), ",statusLen=", len(status))
return
}
fmt.Println("==>>接收到的区段状态----------------------------------------------------------------------------i")
for row, state := range status {
sectionId := codePointMap[row]
fmt.Printf("==>>[%d]区段[%s]状态: Clr=%t Occ=%t Rac=%t Rjo=%t Rjt=%t\n", row, sectionId, state.Clr, state.Occ, state.Rac, state.Rjo, state.Rjt)
}
fmt.Println("==>>接收到的区段状态----------------------------------------------------------------------------o")
}
func (s *ciServer) periodRun(runContext context.Context) {
defer func() {
fmt.Println("==>>CI Server 周期运行退出 ...")
}()
for {
select {
case <-runContext.Done():
return
default:
}
s.doSendSectionCmdMsg()
time.Sleep(time.Duration(s.sendingPeriod) * time.Millisecond)
//更新周期性参数
s.rsspChannel.NextPeriod()
}
}
func (s *ciServer) doSendSectionCmdMsg() {
if s.rsspChannel != nil {
data := s.collectSectionCmdsData()
fmt.Println("==>>CI发送区段指令数据", ",len=", len(data))
s.rsspChannel.SendUserData(data)
}
}
func (s *ciServer) collectSectionCmdsData() []byte {
defer func() {
cmdsLock.Unlock()
}()
cmdsLock.Lock()
//
data := (&message.SectionCmdMsgPack{Ck: 0x80, Scs: sectionCmds}).Encode()
return data
}
// SendSectionReset 发送区段复位指令
func (s *ciServer) SendSectionReset(sectionId string, drst bool, pdrst bool) {
defer func() {
cmdsLock.Unlock()
}()
cmdsLock.Lock()
//
sec := sectionCmds[idRowMap[sectionId]]
sec.Drst = drst
sec.Pdrst = pdrst
}
type rowId struct {
row int
id string
}
func (s *ciServer) Start() {
s.rsspChannel.SetRcvUserDataCallback(s.rcvUserData)
s.rsspChannel.Start()
ctx, cancel := context.WithCancel(context.Background())
go s.periodRun(ctx)
s.cancel = cancel
}
func (s *ciServer) Stop() {
if s.rsspChannel != nil {
s.rsspChannel.Stop()
s.rsspChannel = nil
}
if s.cancel != nil {
s.cancel()
}
}
func (s *ciServer) rcvUserData(data []byte) {
fmt.Println("==>>CI接收到区段状态数据", ",len=", len(data))
msg := &message.SectionStatusMsgPack{}
msg.Decode(data)
s.HandleSectionStatus(msg.Sms)
}
func NewRsspCiServer(cfg *config.RsspAxleConfig) CiServer {
ra := &ciServer{}
//
ra.city = cfg.City
ra.lineId = cfg.LineId
ra.centralizedStation = cfg.CentralizedStation
ra.sendingPeriod = cfg.RsspCfg.SendingPeriod
//
mrc := &axle_device.RsspChannel{}
ra.rsspChannel = mrc.Init(&cfg.RsspCfg)
//
return ra
}
func InitCiServerCodePoints() {
var ris []*rowId
for row, id := range codePointMap {
idRowMap[id] = row
ris = append(ris, &rowId{row: row, id: id})
}
sort.SliceStable(ris, func(i, j int) bool {
return ris[i].row < ris[j].row
})
for range ris {
sectionCmds = append(sectionCmds, &message.SectionCmdMsg{Drst: false, Pdrst: false})
}
}

View File

@ -1,46 +1,45 @@
package main
import (
"fmt"
"joylink.club/bj-rtsts-server/config"
"joylink.club/bj-rtsts-server/third_party/axle_device"
"joylink.club/bj-rtsts-server/third_party/example/rssp/ci"
"joylink.club/bj-rtsts-server/third_party/message"
"time"
)
type CiServer interface {
Start() error
Stop()
}
type CiMessageManager interface {
//SendSectionCmdMsg 向计轴设备发送计轴命令
SendSectionCmdMsg(cmdMsg *message.SectionCmdMsgPack)
//HandleSectionStatus 收到来自计轴设备的物理区段状态
HandleSectionStatus(status []*message.SectionStatusMsg)
}
// rssp 测试
type ciServer struct {
//所属城市
city string
//所属线路
lineId string
//所属集中站
centralizedStation string
//计轴区段码表
//key-行号value-区段id
sectionCodes map[int]string
//主安全通道
rsspChannel *axle_device.RsspChannel
}
func initRsspCiServer(cfg *config.RsspAxleConfig) any {
ra := &ciServer{}
func main() {
message.InitRsspCrcTable()
ci.InitCiServerCodePoints()
//
ra.city = cfg.City
ra.lineId = cfg.LineId
ra.centralizedStation = cfg.CentralizedStation
cfg := &config.RsspAxleConfig{}
cfg.City = "北京"
cfg.LineId = "12"
cfg.CentralizedStation = "酒仙桥"
cfg.RsspCfg.SrcAddr = 0x01
cfg.RsspCfg.DstAddr = 0x02
cfg.RsspCfg.DataVer1 = 0x0011
cfg.RsspCfg.DataVer2 = 0x0012
cfg.RsspCfg.SID1 = 0x10000000
cfg.RsspCfg.SID2 = 0x00000001
cfg.RsspCfg.SINIT1 = 0x01
cfg.RsspCfg.SINIT2 = 0x02
cfg.RsspCfg.SendingPeriod = 500
cfg.RsspCfg.SsrRsspTimeout = 3
cfg.RsspCfg.Mtv = 3
cfg.RsspCfg.DeviceA = true
cfg.RsspCfg.PicType = message.PIC_MASTER
cfg.RsspCfg.RemoteIp = "192.168.3.5"
cfg.RsspCfg.RemoteUdpPort = 6666
cfg.RsspCfg.LocalUdpPort = 7777
//
mrc := &axle_device.RsspChannel{}
ra.rsspChannel = mrc.Init(&cfg.RsspCfg)
//
return ra
ciServer := ci.NewRsspCiServer(cfg)
ciServer.Start()
fmt.Println("==>>ci server ...")
for {
time.Sleep(2 * time.Second)
ciServer.SendSectionReset("北京_12_酒仙桥_9G", true, false)
}
//
time.Sleep(3600 * time.Second)
}

View File

@ -169,6 +169,7 @@ func (s *VerifySimulation) CollectSectionStatus(city string, lineId string, cent
if len(codePoints) <= 0 {
return nil, fmt.Errorf("没有找到GetCentralizedStationRef[%s]的区段码表为空", stationUid)
}
//slog.Debug("收集计轴区段状态", "区段码表len", len(codePoints))
//
var msg []*message.SectionStatusMsg
var axleSectionIds []string
@ -176,10 +177,12 @@ func (s *VerifySimulation) CollectSectionStatus(city string, lineId string, cent
axleSectionIds = append(axleSectionIds, section.SectionId)
}
//
//slog.Debug("收集计轴区段状态", "计轴区段len", len(axleSectionIds), "axleSectionIds", axleSectionIds)
as, e := fi.FindAxleSectionsStatus(s.World, axleSectionIds)
if e != nil { //从仿真中收集计轴区段状态的失败列表
return nil, e
}
//slog.Debug("收集计轴区段状态", "仿真中计轴状态len", len(as))
//
stateMap := make(map[string]*fi.AxleSectionState)
for _, a := range as {
@ -205,6 +208,7 @@ func (s *VerifySimulation) CollectSectionStatus(city string, lineId string, cent
}
}
//
//slog.Debug("收集计轴区段状态", "区段计轴msgLen", len(msg))
return msg, nil
}

View File

@ -62,6 +62,20 @@ func CreateSimulation(projectId int32, mapIds []int32, runConfig *dto.ProjectRun
verifySimulation.World.StartUp()
// 启动仿真消息服务
message_server.Start(verifySimulation)
/*
//测试用
cps := verifySimulation.GetSectionCodePoints("北京", "12", "酒仙桥")
if len(cps) > 0 {
sort.SliceStable(cps, func(i, j int) bool {
return cps[i].Row < cps[j].Row
})
}
sb := strings.Builder{}
for row, cp := range cps {
sb.WriteString(fmt.Sprintf("%d:\"%s\",", row, cp.SectionId))
}
fmt.Println("====================@@@@@@@>>>>>>>>", sb.String())
*/
}
return simulationId, nil
}