From 731aea8c8d6fc58136254685015b7f89188a9215 Mon Sep 17 00:00:00 2001 From: thesai <1021828630@qq.com> Date: Thu, 4 Jul 2024 13:59:39 +0800 Subject: [PATCH] =?UTF-8?q?[=E9=87=8D=E5=86=99]=E5=8C=97=E4=BA=AC12?= =?UTF-8?q?=E5=8F=B7=E7=BA=BF=E8=AE=A1=E8=BD=B4=E9=80=9A=E4=BF=A1=E4=BA=A4?= =?UTF-8?q?=E4=BA=92=E9=80=BB=E8=BE=91=EF=BC=88=E6=9C=AA=E5=AE=8C=EF=BC=8C?= =?UTF-8?q?=E4=B8=BB=E8=A6=81=E6=98=AF=E5=AE=89=E5=85=A8=E6=A0=A1=E9=AA=8C?= =?UTF-8?q?=E5=9F=9F=E8=AE=A1=E7=AE=97=E9=80=BB=E8=BE=91=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- api/projectRunConfig.go | 2 +- config/config.go | 68 ++- rts-sim-module | 2 +- rts-sim-testing-message | 2 +- .../axle_device/beijing12/msg/common.go | 61 +++ third_party/axle_device/beijing12/msg/rsd.go | 142 ++++++ third_party/axle_device/beijing12/msg/sse.go | 30 ++ third_party/axle_device/beijing12/msg/ssr.go | 34 ++ third_party/axle_device/beijing12/service.go | 416 ++++++++++++++++++ third_party/axle_device/rssp_axle.go | 129 ------ third_party/axle_device/rssp_axle_manage.go | 65 --- third_party/axle_device/rssp_channel.go | 315 ------------- third_party/example/rssp/ci/ci_server.go | 155 ------- third_party/example/rssp/main.go | 108 +++-- third_party/message/rssp_code.go | 6 +- third_party/message/rssp_code_test.go | 20 + .../wayside/memory/wayside_simulation.go | 14 +- ts/test_simulation_manage.go | 8 +- 18 files changed, 824 insertions(+), 753 deletions(-) create mode 100644 third_party/axle_device/beijing12/msg/common.go create mode 100644 third_party/axle_device/beijing12/msg/rsd.go create mode 100644 third_party/axle_device/beijing12/msg/sse.go create mode 100644 third_party/axle_device/beijing12/msg/ssr.go create mode 100644 third_party/axle_device/beijing12/service.go delete mode 100644 third_party/axle_device/rssp_axle.go delete mode 100644 third_party/axle_device/rssp_axle_manage.go delete mode 100644 third_party/axle_device/rssp_channel.go delete mode 100644 third_party/example/rssp/ci/ci_server.go create mode 100644 third_party/message/rssp_code_test.go diff --git a/api/projectRunConfig.go b/api/projectRunConfig.go index 6fd9fd2..537c952 100644 --- a/api/projectRunConfig.go +++ b/api/projectRunConfig.go @@ -190,7 +190,7 @@ func deleteProjectRunConfig(c *gin.Context) { // @Failure 500 {object} dto.ErrorDto // @Router /api/v1/runconfig/description [get] func getRunCofigDescription(c *gin.Context) { - c.JSON(http.StatusOK, parseRunCofigStruct(&config.ThridPartyConfig{})) + c.JSON(http.StatusOK, parseRunCofigStruct(&config.ThirdPartyConfig{})) } // 解析环境配置结构 diff --git a/config/config.go b/config/config.go index 30d878a..4aac4d9 100644 --- a/config/config.go +++ b/config/config.go @@ -59,12 +59,12 @@ type mqtt struct { // } // 第三方配置结构 -type ThridPartyConfig struct { - Id int32 `json:"id"` - Dynamics DynamicsConfig `json:"dynamics" description:"动力学配置"` - Vobc VobcConfig `json:"vobc" description:"半实物配置"` - Interlocks []InterlockConfig `json:"interlock" description:"联锁配置"` - RsspAxleCfgs []RsspAxleConfig `json:"rsspAxleCfgs" description:"所有联锁集中站计轴RSSP-I配置"` +type ThirdPartyConfig struct { + Id int32 `json:"id"` + Dynamics DynamicsConfig `json:"dynamics" description:"动力学配置"` + Vobc VobcConfig `json:"vobc" description:"半实物配置"` + Interlocks []InterlockConfig `json:"interlock" description:"联锁配置"` + RsspAxleConfig RsspAxleConfig `json:"rsspAxleCfgs" description:"计轴通信配置"` //ElectricMachinery ElectricMachineryConfig `json:"electricMachinery" description:"电机配置"` ElectricMachinerys []ElectricMachineryConfig `json:"electricMachinerys" description:"电机配置"` BtmCanet BtmCanetConfig `json:"btmCanet" description:"BTM关联的网关设备CANET配置"` @@ -151,36 +151,6 @@ type BtmCanetConfig struct { Open bool `json:"open" description:"是否开启"` } -// RsspAxleConfig 计轴区段与联锁安全通信配置 -type RsspAxleConfig struct { - Open bool `json:"open" description:"是否开启"` - City string `json:"city" description:"所属城市"` - LineId string `json:"lineId" description:"所属线路"` - CentralizedStation string `json:"centralizedStation" description:"所属集中站"` - RsspCfg RsspConfig `json:"rsspCfg" description:"安全通道配置"` -} - -// RsspConfig CI系统与计轴设备的安全通信协议配置参数 -// 计轴设备(管理一个集中站的所有计轴器)配置 -type RsspConfig struct { - SrcAddr uint16 `json:"srcAddr" description:"16位源地址,本地地址"` //16位源地址,本地地址 - DstAddr uint16 `json:"dstAddr" description:"16位目的地址,远程地址"` //16位目的地址,远程地址 - DataVer1 uint32 `json:"dataVer1" description:"通道1数据版本"` //通道1数据版本 - DataVer2 uint32 `json:"dataVer2" description:"通道2数据版本"` //通道2数据版本 - SID1 uint32 `json:"sID1" description:"通道1源标识"` //通道1源标识 - SID2 uint32 `json:"sID2" description:"通道2源标识"` //通道2源标识 - SINIT1 uint32 `json:"sINIT1" description:"通道1序列初始"` //通道1序列初始 - SINIT2 uint32 `json:"sINIT2" description:"通道2序列初始"` //通道2序列初始 - SendingPeriod uint32 `json:"sendingPeriod" description:"发送周期值"` //接收方每个安全通信会话对应的发送周期值,单位ms - SsrRsspTimeout uint32 `json:"ssrRsspTimeout" description:"等待SSR回应的定时器超时值"` //等待SSR回应的定时器超时值,为RsspTimer时间,1=SendingPeriod - Mtv uint32 `json:"mtv" description:"最大时序偏差"` //每个安全通信会话可容忍的最大时序偏差,即当前接收的RSD的序列号与上一次RSD的序列号最大允许差值 - Udl uint32 `json:"udl" description:"RSD应用数据长度配置值"` //每个安全通信会话RSD应用数据长度发送和接收的配置值(支持固定长度和可变长度);0-可变长度,大于0即固定长度 - DeviceA bool `json:"deviceA" description:"true-A机;false-B机"` //true-A机;false-B机 - PicType byte `json:"picType" description:"协议交互类别"` //协议交互类别,message.PicType - RemoteIp string `json:"remoteIp" description:"远程服务器ip"` //远程服务器ip - RemoteUdpPort int `json:"remoteUdpPort" description:"远程服务器端口"` //远程服务器端口 - LocalUdpPort int `json:"localUdpPort" description:"本地服务器端口"` //本地服务器端口 -} type VehiclePCSimConfig2 struct { TrainEnds bool `json:"trainEnds" description:"列车端点A"` Open bool `json:"open" description:"是否开启"` @@ -197,9 +167,29 @@ type VehiclePCSimConfig struct { //LocalTestingPort uint32 `json:"localTestingPort" description:"本地测试端口"` } -// CheckAddress 检测目标源地址目的地址是否在配置中 -func (c *RsspConfig) CheckAddress(srcAddr uint16, dstAddr uint16) bool { - return true +// RsspAxleConfig Rssp计轴通信配置 +type RsspAxleConfig struct { + Open bool `json:"open" description:"开启"` + StationCode string `json:"stationCode" description:"集中站编号"` + NetAConfig RsspNetConfig `json:"netAConfig" description:"A网配置"` +} + +// RsspNetConfig 计轴通信配置 +type RsspNetConfig struct { + RemoteIp string `json:"remoteIp" description:"远端IP"` + RemotePort int `json:"remotePort" description:"远端端口"` + LocalPort int `json:"localPort" description:"本地端口"` + SourceAddr string `json:"sourceAddr" description:"源地址(16进制,2字节)"` + TargetAddr string `json:"targetAddr" description:"目的地址(16进制,2字节)"` + Sid1 string `json:"sid1" description:"SID_1(16进制,4字节)"` + Sid2 string `json:"sid2" description:"SID_2(16进制,4字节)"` + Sinit1 string `json:"sinit1" description:"SINIT_1(16进制,4字节)"` + Sinit2 string `json:"sinit2" description:"SINIT_2(16进制,4字节)"` + DataVer1 string `json:"dataVer1" description:"DATAVER_1(16进制,4字节)"` + DataVer2 string `json:"dataVer2" description:"DATAVER_2(16进制,4字节)"` + MaxDeviation uint32 `json:"maxDeviation" description:"可容忍的最大时序偏差"` + WaitSSRTimeout int `json:"waitSSRTimeout" description:"等待SSR回应的定时器超时值(ms)"` + Period int `json:"period" description:"RSD发送周期(ms)"` } /////////////////////////////////////////////////////////////////////////////////////// diff --git a/rts-sim-module b/rts-sim-module index 181dd99..ef7e469 160000 --- a/rts-sim-module +++ b/rts-sim-module @@ -1 +1 @@ -Subproject commit 181dd9951b16a0c1665779567d5d040568382615 +Subproject commit ef7e469175d27182823461972998049692fb425f diff --git a/rts-sim-testing-message b/rts-sim-testing-message index 547b0b1..c0aa3ab 160000 --- a/rts-sim-testing-message +++ b/rts-sim-testing-message @@ -1 +1 @@ -Subproject commit 547b0b1baf218f46e667e98852147a078be884e3 +Subproject commit c0aa3ab5b7e5e4819d4ce0147ef3ae6947b72daf diff --git a/third_party/axle_device/beijing12/msg/common.go b/third_party/axle_device/beijing12/msg/common.go new file mode 100644 index 0000000..d30c12b --- /dev/null +++ b/third_party/axle_device/beijing12/msg/common.go @@ -0,0 +1,61 @@ +package msg + +import ( + "bytes" + "encoding/binary" +) + +const ( + ProtocolType_Sync = 0x01 + ProtocolType_NoSync = 0x02 +) + +const ( + MessageType_A = 0x80 + MessageType_B = 0x81 + MessageType_SSE = 0x90 + MessageType_SSR = 0x91 +) + +const ( + CRC_POLY_1 = 0x100D4E63 //通道1的CRC多项式 + CRC_POLY_2 = 0x8CE56011 //通道2的CRC多项式 +) + +const ( + SCW_1 = 0xAE390B5A //通道1的SCW + SCW_2 = 0xC103589C //通道2的SCW +) + +const ( + T_POLY_1 = 0x0FC22F87 //通道1的时间戳生成多项式 + T_POLY_2 = 0xC3E887E1 //通道2的时间戳生成多项式 +) + +const Twait_sse = 3 //默认sse等待回应的周期数 + +func GetMessageType(data []byte) byte { + return data[1] +} + +type MsgHeader struct { + ProtocolType byte //协议交互类别 + MessageType byte //报文类型 + SourceAddr uint16 //源地址 + TargetAddr uint16 //目的地址 +} + +func (m *MsgHeader) encode() []byte { + var data []byte + data = append(data, m.ProtocolType) + data = append(data, m.MessageType) + data = binary.LittleEndian.AppendUint16(data, m.SourceAddr) + data = binary.LittleEndian.AppendUint16(data, m.TargetAddr) + return data +} + +func (m *MsgHeader) decode(data []byte) error { + buf := bytes.NewBuffer(data) + err := binary.Read(buf, binary.LittleEndian, m) + return err +} diff --git a/third_party/axle_device/beijing12/msg/rsd.go b/third_party/axle_device/beijing12/msg/rsd.go new file mode 100644 index 0000000..38a4254 --- /dev/null +++ b/third_party/axle_device/beijing12/msg/rsd.go @@ -0,0 +1,142 @@ +package msg + +import ( + "bytes" + "encoding/binary" + "joylink.club/bj-rtsts-server/third_party/message" +) + +// 实时安全数据消息 +type RsdMsg struct { + MsgHeader + SeqNum uint32 //序列号 + UserDataLen uint16 //用户数据包字节总数+8 + Svc1 uint32 //CRC1^SID1^T1(N)^SCW1 + Svc2 uint32 //CRC2^SID2^T2(N)^SCW2 + + UserData []byte //用户数据包 + + Tail uint16 //报文尾 CRC16 +} + +func (r *RsdMsg) Encode() []byte { + data := r.MsgHeader.encode() + data = binary.BigEndian.AppendUint32(data, r.SeqNum) + data = binary.BigEndian.AppendUint16(data, r.UserDataLen) + data = binary.BigEndian.AppendUint32(data, r.Svc1) + data = binary.BigEndian.AppendUint32(data, r.Svc2) + data = append(data, r.UserData...) + r.Tail = message.Rssp_I_Crc16(r.UserData) + data = binary.BigEndian.AppendUint16(data, r.Tail) + return data +} + +func (r *RsdMsg) Decode(data []byte) error { + err := r.MsgHeader.decode(data) + if err != nil { + return err + } + buf := bytes.NewBuffer(data[6:]) //去掉报文头的6个字节 + fields := []any{&r.SeqNum, &r.UserDataLen, &r.Svc1, &r.Svc2} + for _, field := range fields { + err := binary.Read(buf, binary.LittleEndian, field) + if err != nil { + return err + } + } + r.UserData = data[len(data)-buf.Len() : len(data)-2] + r.Tail = binary.LittleEndian.Uint16(data[len(data)-2:]) + return nil +} + +// RsdMsgBuilder 用来构建RSD,将无需用户赋值的字段去掉了 +type RsdMsgBuilder struct { + MsgHeader + SeqNum uint32 + Svc1 uint32 + Svc2 uint32 + UserData []byte +} + +func (b *RsdMsgBuilder) Build() *RsdMsg { + return &RsdMsg{ + MsgHeader: b.MsgHeader, + SeqNum: b.SeqNum, + UserDataLen: uint16(len(b.UserData) + 8), + Svc1: b.Svc1, + Svc2: b.Svc2, + UserData: b.UserData, + Tail: message.Rssp_I_Crc16(b.UserData), + } +} + +func (b *RsdMsgBuilder) Encode() []byte { + return b.Build().Encode() +} + +// CmdInfos 来自联锁的数据帧 +type CmdInfos []*cmdInfo + +func (f *CmdInfos) Decode(data []byte) error { + for _, b := range data { + cmdInfo := &cmdInfo{} + cmdInfo.decode(b) + *f = append(*f, cmdInfo) + } + return nil +} + +// StateInfos 发给联锁的数据帧 +type StateInfos []*StateInfo + +func (t StateInfos) Encode() []byte { + var data []byte + for _, info := range t { + data = append(data, info.encode()...) + } + return data +} + +type cmdInfo struct { + PRST bool //不使用 + RST bool //不使用 + DRST bool //直接复位 位索引5 + RRST bool //不使用 + RSTR bool //不使用 + PDRST bool //预复位 位索引2 + PRRST bool //不使用 +} + +func (c *cmdInfo) decode(data byte) { + c.DRST = (data>>5)&1 == 1 + c.PDRST = (data>>2)&1 == 1 +} + +type StateInfo struct { + CLR bool //计轴出清 位索引0-7(0字节的7位) + OCC bool //计轴占用 0-6 + RAC bool //计轴复位反馈 1-6 + RJO bool //运营原因拒绝计轴复位 1-5 + RJT bool //技术原因拒绝计轴复位 1-4 +} + +func (s *StateInfo) encode() []byte { + var b0 byte + var b1 byte + if s.CLR { + b0 = b0 | 1<<7 + } + if s.OCC { + b0 = b0 | 1<<6 + } + if s.RAC { + b1 = b1 | 1<<6 + } + if s.RJO { + b1 = b1 | 1<<5 + } + if s.RJT { + b1 = b1 | 1<<4 + } + return []byte{b0, b1} +} diff --git a/third_party/axle_device/beijing12/msg/sse.go b/third_party/axle_device/beijing12/msg/sse.go new file mode 100644 index 0000000..ca6f7de --- /dev/null +++ b/third_party/axle_device/beijing12/msg/sse.go @@ -0,0 +1,30 @@ +package msg + +import ( + "bytes" + "encoding/binary" + "joylink.club/bj-rtsts-server/third_party/message" +) + +type SseMsg struct { + MsgHeader + SeqNum uint32 // 序列号 + SeqEnq1 uint32 //时序校正请求通道1 SID_1^T_1(NE) + SeqEnq2 uint32 // 时序校正请求通道2 SID_2^T_2(NE) + Tail uint16 //报文位 CRC16 +} + +func (s *SseMsg) Encode() []byte { + data := s.MsgHeader.encode() + data = binary.LittleEndian.AppendUint32(data, s.SeqNum) + data = binary.LittleEndian.AppendUint32(data, s.SeqEnq1) + data = binary.LittleEndian.AppendUint32(data, s.SeqEnq2) + s.Tail = message.Rssp_I_Crc16(data) + data = binary.LittleEndian.AppendUint16(data, s.Tail) + return data +} + +func (s *SseMsg) Decode(data []byte) error { + buf := bytes.NewBuffer(data) + return binary.Read(buf, binary.LittleEndian, s) +} diff --git a/third_party/axle_device/beijing12/msg/ssr.go b/third_party/axle_device/beijing12/msg/ssr.go new file mode 100644 index 0000000..ac3245b --- /dev/null +++ b/third_party/axle_device/beijing12/msg/ssr.go @@ -0,0 +1,34 @@ +package msg + +import ( + "bytes" + "encoding/binary" + "joylink.club/bj-rtsts-server/third_party/message" +) + +type SsrMsg struct { + MsgHeader + SeqNumSsr uint32 // 应答方的序列号 + SeqNumSse uint32 // 请求方的序列号 + SeqInit1 uint32 // 时序初始化通道1 SEQENQ_1^SID_1^T_1(NR)^DATAVER_1 + SeqInit2 uint32 // 时序初始化通道2 SEQENQ_2^SID_2^T_2(NR)^DATAVER_2 + DataVer byte // 数据版本号 预留固定值0x01 + Tail uint16 // 报文位 CRC16 +} + +func (s *SsrMsg) Encode() []byte { + data := s.MsgHeader.encode() + data = binary.LittleEndian.AppendUint32(data, s.SeqNumSsr) + data = binary.LittleEndian.AppendUint32(data, s.SeqNumSse) + data = binary.LittleEndian.AppendUint32(data, s.SeqInit1) + data = binary.LittleEndian.AppendUint32(data, s.SeqInit2) + data = append(data, s.DataVer) + s.Tail = message.Rssp_I_Crc16(data) + data = binary.LittleEndian.AppendUint16(data, s.Tail) + return data +} + +func (s *SsrMsg) Decode(data []byte) error { + buf := bytes.NewBuffer(data) + return binary.Read(buf, binary.LittleEndian, s) +} diff --git a/third_party/axle_device/beijing12/service.go b/third_party/axle_device/beijing12/service.go new file mode 100644 index 0000000..48af7d5 --- /dev/null +++ b/third_party/axle_device/beijing12/service.go @@ -0,0 +1,416 @@ +package beijing12 + +import ( + "context" + "fmt" + "joylink.club/bj-rtsts-server/config" + "joylink.club/bj-rtsts-server/sys_error" + "joylink.club/bj-rtsts-server/third_party/axle_device/beijing12/msg" + "joylink.club/bj-rtsts-server/third_party/message" + "joylink.club/bj-rtsts-server/third_party/udp" + "joylink.club/bj-rtsts-server/ts/simulation/wayside/memory" + "joylink.club/rtsssimulation/component" + "joylink.club/rtsssimulation/entity" + "joylink.club/rtsssimulation/fi" + "joylink.club/rtsssimulation/repository/model/proto" + "log/slog" + "runtime/debug" + "strconv" + "sync" + "time" +) + +var ( //日志 + logTag = "[北京12号线计轴通信]" + privateLogger *slog.Logger + loggerInit sync.Once +) + +// key-集中站编号 +var contextMap = make(map[string]*serviceContext) +var mu = sync.Mutex{} + +type serviceContext struct { + sim *memory.VerifySimulation + config config.RsspAxleConfig + server udp.UdpServer + client udp.UdpClient + cancelFunc context.CancelFunc + ciSectionIndexConfigs []*proto.CiSectionCodePoint + + sourceAddr uint16 //源地址 从配置中的16进制字符串转来的 + targetAddr uint16 //目的地址 从配置中的16进制字符串转来的 + sid1 uint32 //SID1 从配置中的16进制字符串转来的 + sid2 uint32 //SID2 从配置中的16进制字符串转来的 + msgChan <-chan []byte //消息队列 + seqNum uint32 //当前的序列号 + lastSeqNum uint32 //最近一次收到的序列号 + lastTimeSeqParam uint32 //最近一次的有效时序参数 + sseMsg *msg.SseMsg //发送出去的时序校验请求 + sseWaitTimer <-chan time.Time //sse超时定时器 +} + +func Start(simulation *memory.VerifySimulation) { + mu.Lock() + defer mu.Unlock() + //检查服务启动条件 + rsspConfig := simulation.GetRunConfig().RsspAxleConfig + if !rsspConfig.Open { + return + } + if contextMap[rsspConfig.StationCode] != nil { + panic(sys_error.New(fmt.Sprintf("%s集中站[%s]服务已启动", logTag, rsspConfig.StationCode))) + } + station := simulation.Repo.FindStationByStationName(rsspConfig.StationCode) + if station == nil { + panic(sys_error.New(fmt.Sprintf("%s集中站[%s]不存在", logTag, rsspConfig.StationCode))) + } + ref := simulation.Repo.GetCentralizedStationRef(station.Id()) + if ref == nil { + panic(sys_error.New(fmt.Sprintf("%s集中站[%s]关联数据不存在", logTag, rsspConfig.StationCode))) + } + if len(ref.SectionCodePoints) == 0 { + logger().Warn(fmt.Sprintf("集中站[%s]无区段编码数据,服务不启动", rsspConfig.StationCode)) + return + } + sourceAddr, err := strconv.ParseUint(rsspConfig.NetAConfig.SourceAddr, 16, 16) + if err != nil { + panic(sys_error.New(fmt.Sprintf("%s集中站[%s]解析源地址[%s]出错", logTag, rsspConfig.StationCode, rsspConfig.NetAConfig.SourceAddr))) + } + targetAddr, err := strconv.ParseUint(rsspConfig.NetAConfig.TargetAddr, 16, 16) + if err != nil { + panic(sys_error.New(fmt.Sprintf("%s集中站[%s]解析目的地址[%s]出错", logTag, rsspConfig.StationCode, rsspConfig.NetAConfig.TargetAddr))) + } + sid1, err := strconv.ParseUint(rsspConfig.NetAConfig.Sid1, 16, 32) + if err != nil { + panic(sys_error.New(fmt.Sprintf("%s集中站[%s]解析SID1[%s]出错", logTag, rsspConfig.StationCode, rsspConfig.NetAConfig.Sid1))) + } + sid2, err := strconv.ParseUint(rsspConfig.NetAConfig.Sid2, 16, 32) + if err != nil { + panic(sys_error.New(fmt.Sprintf("%s集中站[%s]解析SID2[%s]出错", logTag, rsspConfig.StationCode, rsspConfig.NetAConfig.Sid2))) + } + //服务初始化及启动 + msgChan := make(chan []byte, 100) + serviceCtx := &serviceContext{ + sim: simulation, + config: rsspConfig, + ciSectionIndexConfigs: ref.SectionCodePoints, + sourceAddr: uint16(sourceAddr), + targetAddr: uint16(targetAddr), + sid1: uint32(sid1), + sid2: uint32(sid2), + msgChan: msgChan, + } + netAConfig := rsspConfig.NetAConfig + server := udp.NewServer(fmt.Sprintf(":%d", netAConfig.LocalPort), func(b []byte) { + msgChan <- b + }) + client := udp.NewClient(fmt.Sprintf("%s:%d", netAConfig.RemoteIp, netAConfig.RemotePort)) + err = server.Listen() + if err != nil { + panic(sys_error.New(fmt.Sprintf("%s集中站[%s]服务启动失败", logTag, rsspConfig.StationCode))) + } else { + logger().Info(fmt.Sprintf("监听[:%d]", netAConfig.LocalPort)) + } + serviceCtx.server = server + serviceCtx.client = client + cancelCtx, cancelFunc := context.WithCancel(context.Background()) + serviceCtx.cancelFunc = cancelFunc + serviceCtx.runCollectTask(cancelCtx) + serviceCtx.runHandleMsgTask(cancelCtx) + contextMap[rsspConfig.StationCode] = serviceCtx +} + +func Stop(simulation *memory.VerifySimulation) { + mu.Lock() + defer mu.Unlock() + rsspConfig := simulation.GetRunConfig().RsspAxleConfig + serviceCtx := contextMap[rsspConfig.StationCode] + if serviceCtx == nil { + return + } + serviceCtx.stop() + delete(contextMap, rsspConfig.StationCode) +} + +func (s *serviceContext) stop() { + if s.server != nil { + s.server.Close() + } + if s.client != nil { + s.client.Close() + } + s.cancelFunc() +} + +func (s *serviceContext) runCollectTask(ctx context.Context) { + go func() { + defer func() { + if err := recover(); err != nil { + logger().Error("状态收集任务出错,记录后重启", "error", err, "stack", string(debug.Stack())) + s.runCollectTask(ctx) + } + }() + for range time.Tick(time.Millisecond * time.Duration(s.config.NetAConfig.Period)) { + select { + case <-ctx.Done(): + return + default: + frame := s.collect() + err := s.client.Send(frame.Encode()) + if err != nil { + logger().Error("发送状态数据失败", "error", err) + } + } + } + }() +} + +func (s *serviceContext) collect() *msg.RsdMsgBuilder { + worldData := entity.GetWorldData(s.sim.World) + amdEntry := entity.FindAxleManageDevice(worldData, s.config.StationCode) + amd := component.AxleManageDeviceType.Get(amdEntry) + stateInfos := msg.StateInfos{} + for _, cfg := range s.ciSectionIndexConfigs { + sectionRuntime := amd.Adrs[cfg.SectionId] + sectionEntry, ok := entity.GetEntityByUid(s.sim.World, cfg.SectionId) + sectionState := component.PhysicalSectionStateType.Get(sectionEntry) + if !ok { + continue + } + stateInfos = append(stateInfos, &msg.StateInfo{ + CLR: !sectionState.Occ, + OCC: sectionState.Occ, + RAC: sectionRuntime.Rac, + RJO: sectionRuntime.Rjo, + RJT: sectionRuntime.Rjt, + }) + } + userData := stateInfos.Encode() + builder := &msg.RsdMsgBuilder{ + MsgHeader: msg.MsgHeader{ + ProtocolType: msg.ProtocolType_Sync, + MessageType: msg.MessageType_A, + SourceAddr: s.sourceAddr, + TargetAddr: s.targetAddr, + }, + SeqNum: s.seqNum, + Svc1: s.calculateSvc1(userData, s.seqNum), + Svc2: s.calculateSvc2(userData, s.seqNum), + UserData: userData, + } + s.seqNum++ + return builder +} + +func (s *serviceContext) calculateSvc1(userData []byte, seqNum uint32) uint32 { + return 0 +} + +func (s *serviceContext) calculateSvc2(userData []byte, seqNum uint32) uint32 { + return 0 +} + +func (s *serviceContext) runHandleMsgTask(ctx context.Context) { + go func() { + defer func() { + if err := recover(); err != nil { + logger().Error("消息处理任务出错,记录后重启", "error", err, "stack", string(debug.Stack())) + s.runHandleMsgTask(ctx) + } + }() + for { + select { + case <-ctx.Done(): + return + case <-s.sseWaitTimer: //SSE消息等待超时 + s.sseMsg = nil + case data := <-s.msgChan: + messageType := msg.GetMessageType(data) + switch messageType { + case msg.MessageType_A, msg.MessageType_B: + s.handleRsdMsg(data) + case msg.MessageType_SSE: + s.handleSseMsg(data) + case msg.MessageType_SSR: + s.handleSsrMsg(data) + default: + logger().Warn(fmt.Sprintf("未知的消息类型[%x]", messageType)) + } + } + } + }() +} + +func (s *serviceContext) handleRsdMsg(data []byte) { + if s.sseMsg == nil { //正在时序校正过程中 + return + } + rsdMsg := &msg.RsdMsg{} + err := rsdMsg.Decode(data) + if err != nil { + logger().Error("解析RSD数据出错", "error", err) + return + } + //校验 + if !s.validateRsdMsg(rsdMsg) { + return + } + //流程处理 + seqDeviation := rsdMsg.SeqNum - s.lastSeqNum + if s.lastSeqNum == 0 { + seqDeviation = 0 + } else if seqDeviation < 0 || seqDeviation > s.config.NetAConfig.MaxDeviation { //序列号减小或时序差超出容忍限度 + s.startSeeProgress() + return + } + s.lastSeqNum = rsdMsg.SeqNum + cmdInfos := msg.CmdInfos{} + err = cmdInfos.Decode(rsdMsg.UserData) + if err != nil { + logger().Error("解析命令信息出错", "error", err) + return + } + //驱动 + for i, cmdInfo := range cmdInfos { + sectionIndexConfig := s.ciSectionIndexConfigs[i] + err := fi.AxleSectionDrstDrive(s.sim.World, sectionIndexConfig.SectionId, cmdInfo.DRST) + if err != nil { + logger().Error("驱动计轴直接复位出错", "error", err) + } + err = fi.AxleSectionPdrstDrive(s.sim.World, sectionIndexConfig.SectionId, cmdInfo.PDRST) + if err != nil { + logger().Error("驱动计轴预复位出错", "error", err) + } + } +} + +func (s *serviceContext) handleSseMsg(data []byte) { + sseMsg := &msg.SseMsg{} + err := sseMsg.Decode(data) + if err != nil { + logger().Error("解析SSE数据出错", "error", err) + return + } + //校验 + if !s.validateSseMsg(sseMsg) { + return + } + //回复 + s.lastSeqNum = sseMsg.SeqNum + ssrMsg := msg.SsrMsg{ + MsgHeader: msg.MsgHeader{ + ProtocolType: msg.ProtocolType_Sync, + MessageType: msg.MessageType_SSR, + SourceAddr: s.sourceAddr, + TargetAddr: s.targetAddr, + }, + SeqNumSsr: s.seqNum, + SeqNumSse: sseMsg.SeqNum, + SeqInit1: s.calculateSeqInit1(sseMsg.SeqEnq1), + SeqInit2: s.calculateSeqInit1(sseMsg.SeqEnq1), + DataVer: 0x01, + } + err = s.client.Send(ssrMsg.Encode()) + if err != nil { + logger().Error("发送SSR数据失败", "error", err) + } +} + +func (s *serviceContext) handleSsrMsg(data []byte) { + if s.sseMsg == nil { //不在时序校正过程中 + return + } + ssrMsg := &msg.SsrMsg{} + err := ssrMsg.Decode(data) + if err != nil { + logger().Error("解析SSR数据出错", "error", err) + return + } + //校验 + if !s.validateSsrMsg(ssrMsg) { + return + } + //完成校正时序 + s.sseMsg = nil + s.lastSeqNum = ssrMsg.SeqNumSsr +} + +// 启动SSE流程 +func (s *serviceContext) startSeeProgress() { + sseMsg := &msg.SseMsg{ + MsgHeader: msg.MsgHeader{ + ProtocolType: msg.ProtocolType_Sync, + MessageType: msg.MessageType_SSE, + SourceAddr: s.sourceAddr, + TargetAddr: s.targetAddr, + }, + SeqNum: s.seqNum, + SeqEnq1: s.calculateSeqEnq1(), + SeqEnq2: s.calculateSeqEnq2(), + } + err := s.client.Send(sseMsg.Encode()) + if err != nil { + logger().Error("发送SSE数据失败", "error", err) + } else { + s.sseMsg = sseMsg + s.sseWaitTimer = time.After(time.Duration(s.config.NetAConfig.Period*msg.Twait_sse) * time.Millisecond) + } +} + +func (s *serviceContext) validateRsdMsg(rsdMsg *msg.RsdMsg) bool { + sourceAddr, _ := strconv.ParseUint(s.config.NetAConfig.SourceAddr, 16, 16) + if rsdMsg.SourceAddr != uint16(sourceAddr) { + logger().Error(fmt.Sprintf("源地址[%x]不正确[%s]", rsdMsg.SourceAddr, s.config.NetAConfig.SourceAddr)) + return false + } + targetAddr, _ := strconv.ParseUint(s.config.NetAConfig.TargetAddr, 16, 16) + if rsdMsg.TargetAddr != uint16(targetAddr) { + logger().Error(fmt.Sprintf("目的地址[%x]不正确[%s]", rsdMsg.TargetAddr, s.config.NetAConfig.TargetAddr)) + return false + } + if len(rsdMsg.UserData) != len(s.ciSectionIndexConfigs) { + logger().Error(fmt.Sprintf("用户数据长度[%d]与配置长度[%d]不符", len(rsdMsg.UserData), len(s.ciSectionIndexConfigs))) + return false + } + if message.Rssp_I_Crc16(rsdMsg.UserData) != rsdMsg.Tail { + logger().Error(fmt.Sprintf("报文位验证失败")) + return false + } + return true +} + +func (s *serviceContext) validateSseMsg(sseMsg *msg.SseMsg) bool { + return true +} + +func (s *serviceContext) validateSsrMsg(ssrMsg *msg.SsrMsg) bool { + if s.sseMsg.SeqNum != ssrMsg.SeqNumSse { + logger().Error(fmt.Sprintf("SSR的Ne[%d]与请求方不符[%d]", ssrMsg.SeqNumSse, s.sseMsg.SeqNum)) + return false + } + return true +} + +func (s *serviceContext) calculateSeqEnq1() uint32 { + return 0 +} + +func (s *serviceContext) calculateSeqEnq2() uint32 { + return 0 +} + +func (s *serviceContext) calculateSeqInit1(seqEnq1 uint32) uint32 { + return 0 +} + +func (s *serviceContext) calculateSeqInit2(seqEnq2 uint32) uint32 { + return 0 +} + +func logger() *slog.Logger { + loggerInit.Do(func() { + privateLogger = slog.Default().With("tag", logTag) + }) + return privateLogger +} diff --git a/third_party/axle_device/rssp_axle.go b/third_party/axle_device/rssp_axle.go deleted file mode 100644 index 236e572..0000000 --- a/third_party/axle_device/rssp_axle.go +++ /dev/null @@ -1,129 +0,0 @@ -package axle_device - -import ( - "context" - "fmt" - "joylink.club/bj-rtsts-server/config" - "joylink.club/bj-rtsts-server/third_party/message" - "log/slog" - "runtime/debug" - "time" -) - -//计轴设备与联锁系统安全通信应用层实现 - -type RsspAxle interface { - //Start 启动计轴设备与联锁系统安全通信服务 - Start(amm AxleMessageManager) error - //Stop 停止计轴设备与联锁系统安全通信服务 - Stop() -} - -type rsspAxle struct { - //所属城市 - city string - //所属线路 - lineId string - //所属集中站 - centralizedStation string - //接收方每个安全通信会话对应的发送周期值,单位ms - sendingPeriod uint32 - //主安全通道 - rsspChannel *RsspChannel - //收到应用层消息回调 - messageManager AxleMessageManager - //发送区段状态任务 - cancelSendStatus context.CancelFunc -} - -func InitRsspAxle(cfg *config.RsspAxleConfig) RsspAxle { - ra := &rsspAxle{} - // - ra.city = cfg.City - ra.lineId = cfg.LineId - ra.centralizedStation = cfg.CentralizedStation - ra.sendingPeriod = cfg.RsspCfg.SendingPeriod - // - mrc := &RsspChannel{} - ra.rsspChannel = mrc.Init(&cfg.RsspCfg) - // - return ra -} - -// rssp 安全层执行 -func (s *rsspAxle) rcvCmdMsg(data []byte) { - msg := &message.SectionCmdMsgPack{} - msg.Decode(data) - s.messageManager.HandleSectionCmdMsg(s.city, s.lineId, s.centralizedStation, msg) -} -func (s *rsspAxle) Start(amm AxleMessageManager) error { - s.messageManager = amm - //设置安全通道层 - if s.rsspChannel != nil { - s.rsspChannel.handleUserData = s.rcvCmdMsg - s.rsspChannel.Start() - } - // - sendContext, sendCancel := context.WithCancel(context.Background()) - go s.periodRun(sendContext) - s.cancelSendStatus = sendCancel - // - return nil -} -func (s *rsspAxle) Stop() { - if s.rsspChannel != nil { - s.rsspChannel.Stop() - } - // - if s.cancelSendStatus != nil { - s.cancelSendStatus() - } - s.messageManager = nil -} -func (s *rsspAxle) periodRun(runContext context.Context) { - time.Sleep(2 * time.Second) - defer func() { - if e := recover(); e != nil { - slog.Error(fmt.Sprintf("[%s-%s-%s]定时发送计轴区段状态任务异常", s.city, s.lineId, s.centralizedStation), "error", e, "stack", string(debug.Stack())) - debug.PrintStack() - } - }() - for { - select { - case <-runContext.Done(): - return - default: - } - //slog.Debug("计轴设备periodRun") - if s.messageManager == nil { - slog.Warn(fmt.Sprintf("[%s-%s-%s]定时发送计轴区段状态任务因messageManager不存在退出", s.city, s.lineId, s.centralizedStation)) - return - } - //收集区段状态 - sectionStatusMsg, e := s.messageManager.CollectSectionStatus(s.city, s.lineId, s.centralizedStation) - if e == nil { - if sectionStatusMsg != nil { - msgPack := &message.SectionStatusMsgPack{} - msgPack.Ck = 0 //暂时无用 - msgPack.Sms = sectionStatusMsg - s.sendStatusMsg(msgPack) - } - } else { - slog.Warn(e.Error()) - } - // - time.Sleep(time.Duration(s.sendingPeriod) * time.Millisecond) - //更新周期性参数 - s.rsspChannel.NextPeriod() - } -} - -// 发送计轴区段状态给联锁 -func (s *rsspAxle) sendStatusMsg(msg *message.SectionStatusMsgPack) { - data := msg.Encode() - //向主通道发送 - if s.rsspChannel != nil { - //slog.Debug("计轴设备发送SectionStatusMsgPack", "区段状态个数", len(msg.Sms), "packLen", len(data)) - s.rsspChannel.SendUserData(data) - } -} diff --git a/third_party/axle_device/rssp_axle_manage.go b/third_party/axle_device/rssp_axle_manage.go deleted file mode 100644 index eea5236..0000000 --- a/third_party/axle_device/rssp_axle_manage.go +++ /dev/null @@ -1,65 +0,0 @@ -package axle_device - -import ( - "joylink.club/bj-rtsts-server/config" - "joylink.club/bj-rtsts-server/third_party/message" - "log/slog" -) - -//联锁集中站计轴与联锁通信管理 - -type AxleMessageManager interface { - GetLineAllRsspAxleCfgs() []config.RsspAxleConfig - //HandleSectionCmdMsg 计轴设备接收到联锁发送来的控制命令 - HandleSectionCmdMsg(city string, lineId string, centralizedStation string, msg *message.SectionCmdMsgPack) - //CollectSectionStatus 收集仿真中计轴区段状态 - CollectSectionStatus(city string, lineId string, centralizedStation string) ([]*message.SectionStatusMsg, error) -} - -var allRsspAxleServices []RsspAxle - -func StartLineAllRsspAxleServices(ram AxleMessageManager) { - allRsspAxleServices = nil - cfgs := ram.GetLineAllRsspAxleCfgs() - //cfgs = getTestConfig() //测试用 - for _, cfg := range cfgs { - if cfg.Open { - as := InitRsspAxle(&cfg) - allRsspAxleServices = append(allRsspAxleServices, as) - as.Start(ram) - slog.Debug("启动计轴设备", "city", cfg.City, "lineId", cfg.LineId, "CentralizedStation", cfg.CentralizedStation) - } - } -} -func StopLineAllRsspAxleServices() { - for _, as := range allRsspAxleServices { - as.Stop() - } -} - -// 测试用 -func getTestConfig() []config.RsspAxleConfig { - cfg := &config.RsspAxleConfig{} - cfg.Open = true - cfg.City = "北京" - cfg.LineId = "12" - cfg.CentralizedStation = "酒仙桥" - cfg.RsspCfg.SrcAddr = 0x02 - cfg.RsspCfg.DstAddr = 0x01 - cfg.RsspCfg.DataVer1 = 0x0011 - cfg.RsspCfg.DataVer2 = 0x0012 - cfg.RsspCfg.SID1 = 0x10000000 - cfg.RsspCfg.SID2 = 0x00000001 - cfg.RsspCfg.SINIT1 = 0x01 - cfg.RsspCfg.SINIT2 = 0x02 - cfg.RsspCfg.SendingPeriod = 500 - cfg.RsspCfg.SsrRsspTimeout = 3 - cfg.RsspCfg.Mtv = 3 - cfg.RsspCfg.DeviceA = false - cfg.RsspCfg.PicType = message.PIC_MASTER - cfg.RsspCfg.RemoteIp = "192.168.3.5" - cfg.RsspCfg.RemoteUdpPort = 7777 - cfg.RsspCfg.LocalUdpPort = 6666 - // - return []config.RsspAxleConfig{*cfg} -} diff --git a/third_party/axle_device/rssp_channel.go b/third_party/axle_device/rssp_channel.go deleted file mode 100644 index 1e0d8f9..0000000 --- a/third_party/axle_device/rssp_channel.go +++ /dev/null @@ -1,315 +0,0 @@ -package axle_device - -import ( - "fmt" - "log/slog" - - "joylink.club/bj-rtsts-server/config" - "joylink.club/bj-rtsts-server/third_party/message" - "joylink.club/bj-rtsts-server/third_party/udp" -) - -// 铁路信号安全通信协议实现 - -// HandleUserData 回调应用层 -type HandleUserData func([]byte) - -// RsspChannel 实现rssp通信 -type RsspChannel struct { - //udp - udpServer udp.UdpServer - //udp - udpClient udp.UdpClient - //回调应用层 - handleUserData HandleUserData - //rssp安全通信配置 - config *config.RsspConfig - //rssp时钟 - rsspTimer *RsspTimer - //批次编号,发送序列号 - sn *message.RsspSn - //安全通道1时间戳 - ch1Ts *message.RsspLFSR - //安全通道2时间戳 - ch2Ts *message.RsspLFSR - //最近一次接收到的报文的安全通道1时间戳 - rcvCh1Ts uint32 - //最近一次接收到的报文的安全通道2时间戳 - rcvCh2Ts uint32 - //最近一次接收到的报文的序列号 - rcvSn uint32 - //时序校验请求发送记录 - sendSseRecord *SseFireRecord -} - -func (s *RsspChannel) SetRcvUserDataCallback(handleUserData HandleUserData) { - s.handleUserData = handleUserData -} -func (s *RsspChannel) Init(config *config.RsspConfig) *RsspChannel { - s.config = config - s.rsspTimer = &RsspTimer{t: 0} - s.sn = message.NewRsspSn(1) - s.ch1Ts = message.NewRsspLFSR(message.RSSP_I_C1_TS, 32, s.config.SID1, false) - s.ch2Ts = message.NewRsspLFSR(message.RSSP_I_C2_TS, 32, s.config.SID2, false) - return s -} - -// Start 启动安全通道 -func (s *RsspChannel) Start() { - // - s.udpServer = udp.NewServer(fmt.Sprintf(":%d", s.config.LocalUdpPort), s.handleRsspMsg) - s.udpServer.Listen() - // - s.udpClient = udp.NewClient(fmt.Sprintf("%s:%d", s.config.RemoteIp, s.config.RemoteUdpPort)) -} - -// Stop 关闭安全通道 -func (s *RsspChannel) Stop() { - if s.udpServer != nil { - s.udpServer.Close() - } - if s.udpClient != nil { - s.udpClient.Close() - } -} - -// RsspTimer rssp时钟,每一个tick周期为config.SendingPeriod -type RsspTimer struct { - t uint64 -} - -func (s *RsspTimer) tick() { - s.t++ -} -func (s *RsspTimer) now() uint64 { - return s.t -} - -// SseFireRecord 发送时序校验请求的记录 -type SseFireRecord struct { - send *message.RsspSse //已经发送的时序校验请求 - rsspTime uint64 //发送时序校验请求时的rssp时间 -} - -func (s *SseFireRecord) record(send *message.RsspSse, rsspTime uint64) { - s.send = send - s.rsspTime = rsspTime -} -func (s *SseFireRecord) clear() { - s.send = nil - s.rsspTime = 0 -} -func (s *SseFireRecord) hasRecord() bool { - return s.send != nil -} - -// 处理接收到的rssp报文 -// 注意本函数由udp socket 协程执行 -func (s *RsspChannel) handleRsspMsg(pack []byte) { - slog.Debug("接收到RSSP报文", "len", len(pack)) - //报文头校验 - head := &message.RsspHead{} - if !head.Parse(pack) { //解析报文头失败 - slog.Debug("丢弃接收的RSSP报文:解析报文头失败") - return - } - if !message.RsspHeadMcCheck(head) { //报文类别检测未通过 - slog.Debug("丢弃接收的RSSP报文:报文类别检测未通过") - return - } - if !message.RsspHeadPicCheck(head) { //协议交互类别检测未通过 - slog.Debug("丢弃接收的RSSP报文:协议交互类别检测未通过") - return - } - if !s.config.CheckAddress(head.Sa, head.Da) { //校验报文头中源地址和目的地址是否包含在已配置列表中 - slog.Debug("丢弃接收的RSSP报文:报文头中源地址或目的地址不在在已配置列表中") - return - } - //报文尾校验 - if !message.RsspPackCrc16Check(pack) { //整个报文crc16校验未通过 - slog.Debug("丢弃接收的RSSP报文:报文尾CRC16校验未通过") - return - } - //解析得到RSD、SSE或SRE - rssp := message.ParseRsspPack(head, pack) - if rssp == nil { //解析具体rssp包失败 - slog.Debug("丢弃接收的RSSP报文:解析具体类别包失败") - return - } - //处理接收到的具体类别RSSP包 - switch rssp.Type() { - case message.RSD_A: - fallthrough - case message.RSD_B: - s.handleRsspRsd(rssp.(*message.RsspRsd)) - case message.SSE: - s.handleRsspSse(rssp.(*message.RsspSse)) - case message.SSR: - s.handleRsspSsr(rssp.(*message.RsspSsr)) - } -} - -// 处理接收到的实时安全数据 RSD -func (s *RsspChannel) handleRsspRsd(rsd *message.RsspRsd) { - //slog.Debug("接收到的实时安全数据 RSD") - //如果为备机发送来的安全数据 - if s.config.PicType == message.PIC_SLAVE { //备安全通道 - slog.Debug("丢弃接收的RSSP-RSD报文:舍弃在备安全通道中接收到的安全数据") - //备安全通道收到安全数据,表示该物理通道连接正常 - return - } - //如果为主机发送来的安全数据 - if s.config.PicType == message.PIC_MASTER { //主安全通道 - if !rsd.IsMaster() { - slog.Debug("丢弃接收的RSSP-RSD报文:舍弃在主安全通道中收到的非主机发送来的安全数据") - return - } - } - //序列号校验 - //接收的序列号小于最近一次有效序列号,则触发SSE时序校验 - if rsd.Sn < s.rcvSn { - slog.Debug("丢弃接收的RSSP-RSD报文:当前接收RSD的序列号小于最近一次接收的RSD的序列号,触发SSE") - s.fireSse(rsd) - return - } - dSn := rsd.Sn - s.rcvSn - if dSn > s.config.Mtv { - slog.Debug("丢弃接收的RSSP-RSD报文:当前接收RSD的序列号与最近一次接收的RSD的序列号差值过大,触发SSE") - s.fireSse(rsd) - return - } - //SVC校验 - c1Crc32 := message.Rssp_I_Crc32C1(rsd.Sad) - c1SidTs := c1Crc32 ^ rsd.Svc1 ^ message.RSSP_I_C1_SCW //T(n) - // - c2Crc32 := message.Rssp_I_Crc32C2(rsd.Sad) - c2SidTs := c2Crc32 ^ rsd.Svc2 ^ message.RSSP_I_C2_SCW //T(n) - //todo ... SVC校验待完善 - _ = c1SidTs - _ = c2SidTs - //校验通过 - //记录本次接收RSD的序列号和安全校验通道时间戳 - s.rcvSn = rsd.Sn - s.rcvCh1Ts = c1SidTs ^ s.config.SID1 - s.rcvCh2Ts = c2SidTs ^ s.config.SID2 - //通知应用层接收应用数据 - s.handleUserData(rsd.Sad) -} - -// 触发时序校正请求 -func (s *RsspChannel) fireSse(rsd *message.RsspRsd) { - s.sendSseRecord = &SseFireRecord{send: s.sendSse(), rsspTime: s.rsspTimer.now()} -} - -// 接收到时序校正请求 -func (s *RsspChannel) handleRsspSse(sse *message.RsspSse) { - if s.config.PicType != message.PIC_MASTER { - slog.Debug("丢弃接收的RSSP-SSE报文:在非主安全通道中收到时序校正请求SSE") - return - } - //发送时序校正响应 - s.sendSsr(sse) -} - -// 接收到时序校正应答 -func (s *RsspChannel) handleRsspSsr(ssr *message.RsspSsr) { - //SSR校验 - if !s.sendSseRecord.hasRecord() { - slog.Debug("丢弃接收的RSSP-SSR报文:未发起过SSE时序校正请求") - return - } - if s.rsspTimer.t-s.sendSseRecord.rsspTime > uint64(s.config.SsrRsspTimeout) { - slog.Debug("丢弃接收的RSSP-SSR报文:等待SSE响应超时") - return - } - if ssr.SeSn != s.sendSseRecord.send.Sn { - slog.Debug("丢弃接收的RSSP-SSR报文:SSR与SSE不对应") - return - } - //恢复时序? - s.rcvSn = ssr.SrSn - s.rcvCh1Ts = ssr.Tic1 ^ s.sendSseRecord.send.SeqEnq1 ^ s.config.SID1 ^ s.config.DataVer1 - s.rcvCh2Ts = ssr.Tic2 ^ s.sendSseRecord.send.SeqEnq2 ^ s.config.SID2 ^ s.config.DataVer2 -} - -// NextPeriod 刷新与周期有关的:将序列号和时间戳更新到下一个值 -func (s *RsspChannel) NextPeriod() { - s.sn.GetAndAdd() - s.ch1Ts.GetAndMove() - s.ch2Ts.GetAndMove() - s.rsspTimer.tick() -} - -// 发送时序校正应答 -func (s *RsspChannel) sendSsr(sse *message.RsspSse) { - ssr := &message.RsspSsr{} - // - ssr.Pic = message.PIC_MASTER - ssr.Mc = message.SSR - ssr.Sa = s.config.SrcAddr - ssr.Da = s.config.DstAddr - ssr.SrSn = s.sn.Get() //当前序列号 - ssr.SeSn = sse.Sn - ssr.Tic1 = sse.SeqEnq1 ^ s.config.SID1 ^ s.ch1Ts.Get() ^ s.config.DataVer1 - ssr.Tic2 = sse.SeqEnq2 ^ s.config.SID2 ^ s.ch2Ts.Get() ^ s.config.DataVer2 - ssr.Dvn = 0x01 //预留固定值 - // - rsspPack := ssr.Encode() - s.sendPack(rsspPack) -} - -// 发送时序校正请求 -func (s *RsspChannel) sendSse() *message.RsspSse { - sse := &message.RsspSse{} - // - sse.Pic = message.PIC_MASTER - sse.Mc = message.SSE - sse.Sa = s.config.SrcAddr - sse.Da = s.config.DstAddr - //时序校正请求,把最近一次接收到的报文中的序列号时间戳发送给发送方 - sse.Sn = s.rcvSn - sse.SeqEnq1 = s.createSeqNeq(s.config.SID1, s.rcvCh1Ts) - sse.SeqEnq2 = s.createSeqNeq(s.config.SID2, s.rcvCh2Ts) - // - rsspPack := sse.Encode() - s.sendPack(rsspPack) - // - return sse -} - -// SendUserData 发送用户数据,即通过rssp安全通道发送应用层数据,通过rssp的RSD报文发送 -func (s *RsspChannel) SendUserData(userData []byte) { - rsd := &message.RsspRsd{} - rsd.Pic = s.config.PicType - if s.config.DeviceA { - rsd.Mc = message.RSD_A - } else { - rsd.Mc = message.RSD_B - } - rsd.Sa = s.config.SrcAddr - rsd.Da = s.config.DstAddr - // - rsd.Sn = s.sn.Get() - rsd.Sdl = uint16(len(userData) + 8) - //安全校验通道SVC_1 - crc_c1 := message.Rssp_I_Crc32C1(userData) - rsd.Svc1 = s.createSvcCode(crc_c1, s.config.SID1, s.ch1Ts.Get(), message.RSSP_I_C1_SCW) - //安全校验通道SVC_2 - crc_c2 := message.Rssp_I_Crc32C2(userData) - rsd.Svc1 = s.createSvcCode(crc_c2, s.config.SID2, s.ch2Ts.Get(), message.RSSP_I_C2_SCW) - rsd.Sad = userData - // - rsspPack := rsd.Encode() - s.sendPack(rsspPack) -} -func (s *RsspChannel) createSvcCode(crc32 uint32, sid uint32, ts uint32, scw uint32) uint32 { - return crc32 ^ sid ^ ts ^ scw -} -func (s *RsspChannel) createSeqNeq(sid uint32, ts uint32) uint32 { - return sid ^ ts -} - -// 通过网络发送数据 -func (s *RsspChannel) sendPack(rsspPack []byte) { - s.udpClient.Send(rsspPack) -} diff --git a/third_party/example/rssp/ci/ci_server.go b/third_party/example/rssp/ci/ci_server.go deleted file mode 100644 index 64736af..0000000 --- a/third_party/example/rssp/ci/ci_server.go +++ /dev/null @@ -1,155 +0,0 @@ -package ci - -import ( - "context" - "fmt" - "joylink.club/bj-rtsts-server/config" - "joylink.club/bj-rtsts-server/third_party/axle_device" - "joylink.club/bj-rtsts-server/third_party/message" - "sort" - "sync" - "time" -) - -type CiServer interface { - Start() - Stop() - //SendSectionReset 向计轴设备发送计轴命令 - SendSectionReset(sectionId string, drst bool, pdrst bool) - //HandleSectionStatus 收到来自计轴设备的物理区段状态 - HandleSectionStatus(status []*message.SectionStatusMsg) -} - -// 北京12号线酒仙桥集中站物理区段码表 -var codePointMap = map[int]string{0: "北京_12_酒仙桥_9G", 1: "北京_12_酒仙桥_1DG", 2: "北京_12_酒仙桥_11G", 3: "北京_12_酒仙桥_13G", 4: "北京_12_酒仙桥_15G", 5: "北京_12_酒仙桥_6G", 6: "北京_12_酒仙桥_8G", 7: "北京_12_酒仙桥_2DG", 8: "北京_12_酒仙桥_10G", 9: "北京_12_酒仙桥_12G", 10: "北京_12_酒仙桥_14G"} -var idRowMap = make(map[string]int) -var sectionCmds []*message.SectionCmdMsg -var cmdsLock = sync.Mutex{} - -// rssp 测试 -type ciServer struct { - //所属城市 - city string - //所属线路 - lineId string - //所属集中站 - centralizedStation string - //接收方每个安全通信会话对应的发送周期值,单位ms - sendingPeriod uint32 - //主安全通道 - rsspChannel *axle_device.RsspChannel - // - cancel context.CancelFunc -} - -func (s *ciServer) HandleSectionStatus(status []*message.SectionStatusMsg) { - if len(codePointMap) != len(status) { - fmt.Println("==>>接收到的区段状态与码表不对应:", "codePointsLen=", len(codePointMap), ",statusLen=", len(status)) - return - } - fmt.Println("==>>接收到的区段状态----------------------------------------------------------------------------i") - for row, state := range status { - sectionId := codePointMap[row] - fmt.Printf("==>>[%d]区段[%s]状态: Clr=%t Occ=%t Rac=%t Rjo=%t Rjt=%t\n", row, sectionId, state.Clr, state.Occ, state.Rac, state.Rjo, state.Rjt) - } - fmt.Println("==>>接收到的区段状态----------------------------------------------------------------------------o") -} -func (s *ciServer) periodRun(runContext context.Context) { - defer func() { - fmt.Println("==>>CI Server 周期运行退出 ...") - }() - for { - select { - case <-runContext.Done(): - return - default: - } - s.doSendSectionCmdMsg() - time.Sleep(time.Duration(s.sendingPeriod) * time.Millisecond) - //更新周期性参数 - s.rsspChannel.NextPeriod() - } -} -func (s *ciServer) doSendSectionCmdMsg() { - if s.rsspChannel != nil { - data := s.collectSectionCmdsData() - fmt.Println("==>>CI发送区段指令数据", ",len=", len(data)) - s.rsspChannel.SendUserData(data) - } -} -func (s *ciServer) collectSectionCmdsData() []byte { - defer func() { - cmdsLock.Unlock() - }() - cmdsLock.Lock() - // - data := (&message.SectionCmdMsgPack{Ck: 0x80, Scs: sectionCmds}).Encode() - return data -} - -// SendSectionReset 发送区段复位指令 -func (s *ciServer) SendSectionReset(sectionId string, drst bool, pdrst bool) { - defer func() { - cmdsLock.Unlock() - }() - cmdsLock.Lock() - // - sec := sectionCmds[idRowMap[sectionId]] - sec.Drst = drst - sec.Pdrst = pdrst -} - -type rowId struct { - row int - id string -} - -func (s *ciServer) Start() { - s.rsspChannel.SetRcvUserDataCallback(s.rcvUserData) - s.rsspChannel.Start() - ctx, cancel := context.WithCancel(context.Background()) - go s.periodRun(ctx) - s.cancel = cancel -} -func (s *ciServer) Stop() { - if s.rsspChannel != nil { - s.rsspChannel.Stop() - s.rsspChannel = nil - } - if s.cancel != nil { - s.cancel() - } -} -func (s *ciServer) rcvUserData(data []byte) { - fmt.Println("==>>CI接收到区段状态数据", ",len=", len(data)) - msg := &message.SectionStatusMsgPack{} - msg.Decode(data) - s.HandleSectionStatus(msg.Sms) -} -func NewRsspCiServer(cfg *config.RsspAxleConfig) CiServer { - ra := &ciServer{} - // - ra.city = cfg.City - ra.lineId = cfg.LineId - ra.centralizedStation = cfg.CentralizedStation - ra.sendingPeriod = cfg.RsspCfg.SendingPeriod - // - mrc := &axle_device.RsspChannel{} - ra.rsspChannel = mrc.Init(&cfg.RsspCfg) - // - return ra -} - -func InitCiServerCodePoints() { - var ris []*rowId - for row, id := range codePointMap { - idRowMap[id] = row - ris = append(ris, &rowId{row: row, id: id}) - } - sort.SliceStable(ris, func(i, j int) bool { - return ris[i].row < ris[j].row - }) - for range ris { - sectionCmds = append(sectionCmds, &message.SectionCmdMsg{Drst: false, Pdrst: false}) - } -} diff --git a/third_party/example/rssp/main.go b/third_party/example/rssp/main.go index fcf37d8..be755c8 100644 --- a/third_party/example/rssp/main.go +++ b/third_party/example/rssp/main.go @@ -2,45 +2,81 @@ package main import ( "fmt" - "time" - - "joylink.club/bj-rtsts-server/config" - "joylink.club/bj-rtsts-server/third_party/example/rssp/ci" "joylink.club/bj-rtsts-server/third_party/message" ) +//func main() { +// var scw1 uint32 = 0xAE390B5A +// var t_p uint32 = 0x0FC22F87 +// var sid1 uint32 = 0xa2bcfc8c +// var sinit uint32 = 0xb763ec88 +// +// var svcn_1 uint32 = 0x08b12b3b +// //var svcn_1 uint32 = 0x3b2bb108 +// var svcn uint32 = 0x547fd6ca +// //var svcn uint32 = 0xcad67f54 +// var userData []byte = []byte{0x80, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00} +// crc1 := crc32Encode(userData) +// +// m := func(svc1 uint32) uint32 { +// return (svc1 ^ crc1) ^ scw1 +// } +// +// rn_1 := m(svcn_1) +// rn := m(svcn) +// l1 := &Lfsr{ +// value: sinit, +// p: t_p, +// } +// //l1.add(sid1) +// l1.add(rn_1) +// l1.add(rn) +// fmt.Printf("%x\n", l1.value) +// +// //sinit~>ci server ...") - for { - time.Sleep(2 * time.Second) - ciServer.SendSectionReset("北京_12_酒仙桥_9G", true, false) + var scw uint32 = 0xAE390B5A + var t_p uint32 = 0x0FC22F87 + var sid uint32 = 0xa2bcfc8c + //var sinit uint32 = 0xb763ec88 + + lfsr := Lfsr{ + value: sid, + p: t_p, } - // - time.Sleep(3600 * time.Second) + lfsr.add(1) + var userData []byte = []byte{0x80, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00} + crc1 := crc32Encode(userData) + svc := crc1 ^ sid ^ lfsr.value ^ scw + fmt.Printf("%x\n", svc) +} + +func crc32Encode(data []byte) uint32 { + return message.Rssp_I_Crc32C1(data) +} + +type Lfsr struct { + value uint32 + p uint32 +} + +func (l *Lfsr) add(x uint32) uint32 { + l.value = l.value ^ x + for i := 0; i < 1; i++ { + l.value = l.value << 1 + if l.value>>31 == 1 { + l.value ^= l.p + } + } + return l.value } diff --git a/third_party/message/rssp_code.go b/third_party/message/rssp_code.go index 070aba6..626884e 100644 --- a/third_party/message/rssp_code.go +++ b/third_party/message/rssp_code.go @@ -20,11 +20,11 @@ const ( var ( // crc16多项式为G(x)=X16+X11+X4+1 - RSSP_I_CRC16 = &crc.Parameters{Width: 16, Polynomial: 0x0811, Init: 0x0, ReflectIn: false, ReflectOut: false, FinalXor: 0x0} + RSSP_I_CRC16 = &crc.Parameters{Width: 16, Polynomial: 0x0811, Init: 0x0, ReflectIn: true, ReflectOut: true, FinalXor: 0x0} // 通道1 crc32多项式为0x100d4e63 - RSSP_I_C1_CRC32 = &crc.Parameters{Width: 32, Polynomial: 0x100d4e63, Init: 0x0, ReflectIn: false, ReflectOut: false, FinalXor: 0x0} + RSSP_I_C1_CRC32 = &crc.Parameters{Width: 32, Polynomial: 0x100d4e63, Init: 0x0, ReflectIn: true, ReflectOut: true, FinalXor: 0x0} // 通道2 crc32多项式为0x8ce56011 - RSSP_I_C2_CRC32 = &crc.Parameters{Width: 32, Polynomial: 0x8ce56011, Init: 0x0, ReflectIn: false, ReflectOut: false, FinalXor: 0x0} + RSSP_I_C2_CRC32 = &crc.Parameters{Width: 32, Polynomial: 0x8ce56011, Init: 0x0, ReflectIn: true, ReflectOut: true, FinalXor: 0x0} ) // Rssp_I_Crc16计算 diff --git a/third_party/message/rssp_code_test.go b/third_party/message/rssp_code_test.go new file mode 100644 index 0000000..3a9907d --- /dev/null +++ b/third_party/message/rssp_code_test.go @@ -0,0 +1,20 @@ +package message + +import ( + "fmt" + "testing" +) + +func TestNewRsspLFSR(t *testing.T) { + lfsr := NewRsspLFSR(0x0FC22F87, 32, 0x7665986c, false) + for i := 0; i < 341; i++ { + lfsr.GetAndMove() + } + fmt.Printf("%x", lfsr.Get()) +} + +func TestRssp_I_Crc16(t *testing.T) { + bytes := []byte{0x01, 0x80, 0x3a, 0x30, 0x9e, 0x30, 0x24, 0x85, 00, 00, 0x23, 00, 0x3b, 0x2b, 0xb1, 0x08, 0xf8, 0xc0, 0x6c, 0x16, 0x80, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00} + crc := Rssp_I_Crc16(bytes) + fmt.Printf("crc16: %x\n", crc) +} diff --git a/ts/simulation/wayside/memory/wayside_simulation.go b/ts/simulation/wayside/memory/wayside_simulation.go index cb96044..e1a4518 100644 --- a/ts/simulation/wayside/memory/wayside_simulation.go +++ b/ts/simulation/wayside/memory/wayside_simulation.go @@ -45,7 +45,7 @@ type VerifySimulation struct { //设备UID映射 key-uid UidMap map[string]*elementIdStructure // 运行环境配置 - runConfig *config.ThridPartyConfig + runConfig *config.ThirdPartyConfig } // 轨旁仿真内存模型 @@ -146,6 +146,13 @@ func (s *VerifySimulation) GetComIdByUid(uid string) uint32 { return es[uid].CommonId } +func (s *VerifySimulation) GetRunConfig() config.ThirdPartyConfig { + if s.runConfig == nil { + return config.ThirdPartyConfig{} + } + return *s.runConfig +} + // GetBtmCanetConfig 获取CANET配置信息 func (s *VerifySimulation) GetBtmCanetConfig() config.BtmCanetConfig { return s.runConfig.BtmCanet @@ -175,9 +182,6 @@ func (s *VerifySimulation) GetConnVobcTrain() *state_proto.TrainState { func (s *VerifySimulation) GetBtmVobcConfig() config.BtmVobcConfig { return s.runConfig.BtmVobc } -func (s *VerifySimulation) GetLineAllRsspAxleCfgs() []config.RsspAxleConfig { - return s.runConfig.RsspAxleCfgs -} // GetSectionCodePoints 获取集中站的区段码表 func (s *VerifySimulation) GetSectionCodePoints(city string, lineId string, centralizedStation string) []*proto.CiSectionCodePoint { @@ -555,7 +559,7 @@ func (s *VerifySimulation) initRunConfig(runConfig *dto.ProjectRunConfigDto) err if runConfig == nil || runConfig.ConfigContent == "" { return nil } - var configMap config.ThridPartyConfig + var configMap config.ThirdPartyConfig err := json.Unmarshal([]byte(runConfig.ConfigContent), &configMap) if err != nil { return sys_error.New("配置信息格式错误", err) diff --git a/ts/test_simulation_manage.go b/ts/test_simulation_manage.go index dbec7c9..aba4fab 100644 --- a/ts/test_simulation_manage.go +++ b/ts/test_simulation_manage.go @@ -3,6 +3,7 @@ package ts import ( "fmt" "joylink.club/bj-rtsts-server/third_party/acc" + axleBeijing12 "joylink.club/bj-rtsts-server/third_party/axle_device/beijing12" "joylink.club/bj-rtsts-server/third_party/btm_vobc" "joylink.club/bj-rtsts-server/third_party/interlock/beijing11" "joylink.club/bj-rtsts-server/third_party/interlock/beijing12" @@ -16,7 +17,6 @@ import ( "joylink.club/bj-rtsts-server/third_party/can_btm" cidcmodbus "joylink.club/bj-rtsts-server/third_party/cidc_modbus" - "joylink.club/bj-rtsts-server/third_party/axle_device" "joylink.club/bj-rtsts-server/third_party/electrical_machinery" "joylink.club/bj-rtsts-server/message_server" @@ -130,7 +130,8 @@ func runThirdParty(s *memory.VerifySimulation) error { } } // 计轴RSSP启动 - axle_device.StartLineAllRsspAxleServices(s) + axleBeijing12.Start(s) + //obsolete.StartLineAllRsspAxleServices(s) // 电机UDP启动 electrical_machinery.Default().Start(s) // 车载BTM启动 @@ -166,7 +167,8 @@ func stopThirdParty(s *memory.VerifySimulation) { } } //计轴RSSP启动销毁 - axle_device.StopLineAllRsspAxleServices() + axleBeijing12.Stop(s) + //obsolete.StopLineAllRsspAxleServices() // 电机UDP停止 electrical_machinery.Default().Stop() // 车载BTM停止