rts-sim-testing-service/third_party/axle_device/beijing12/service.go

503 lines
15 KiB
Go

package beijing12
import (
"context"
"fmt"
"joylink.club/bj-rtsts-server/config"
"joylink.club/bj-rtsts-server/sys_error"
"joylink.club/bj-rtsts-server/third_party/axle_device/beijing12/msg"
"joylink.club/bj-rtsts-server/third_party/message"
"joylink.club/bj-rtsts-server/third_party/udp"
"joylink.club/bj-rtsts-server/ts/simulation/wayside/memory"
"joylink.club/rtsssimulation/component"
"joylink.club/rtsssimulation/entity"
"joylink.club/rtsssimulation/fi"
"joylink.club/rtsssimulation/repository/model/proto"
"log/slog"
"runtime/debug"
"strconv"
"sync"
"time"
)
// Logging state for this package.
var (
// logTag identifies this service: it is attached as the "tag" attribute of
// the package logger and is used as a prefix in the start-up panic messages.
logTag = "[北京12号线计轴通信]"
// privateLogger is the lazily created package logger; obtain it via logger().
privateLogger *slog.Logger
// loggerInit makes sure privateLogger is built exactly once.
loggerInit sync.Once
)
// Registry of running services and the lock that guards it.
var (
	// contextMap holds one serviceContext per started service, keyed by the
	// centralized-station code.
	contextMap = map[string]*serviceContext{}
	// mu is held by Start and Stop while they read or modify contextMap.
	mu sync.Mutex
)
// serviceContext is the per-station state of one running axle-counter
// communication service: its configuration, its UDP endpoints and the
// bookkeeping used for the SVC / SSE / SSR exchanges.
type serviceContext struct {
// sim is the simulation whose world is read by collect and driven by handleRsdMsg.
sim *memory.VerifySimulation
// config is the RSSP axle configuration taken from the simulation's run config.
config config.RsspAxleConfig
// server receives UDP datagrams; its callback forwards them to msgChan.
server udp.UdpServer
// client sends outgoing frames to the configured remote address.
client udp.UdpClient
// cancelFunc cancels the context shared by the background tasks.
cancelFunc context.CancelFunc
// ciSectionIndexConfigs are the section code points of the centralized
// station; collect and handleRsdMsg use them positionally.
ciSectionIndexConfigs []*proto.CiSectionCodePoint
// sourceAddr is the source address, parsed from the hex string in the config.
sourceAddr uint16
// targetAddr is the destination address, parsed from the hex string in the config.
targetAddr uint16
// sid1 is the interlocking SID1, parsed from the hex string in the config.
sid1 uint32
// sid2 is the interlocking SID2, parsed from the hex string in the config.
sid2 uint32
// localSid1 is the axle-counter SID1, parsed from the hex string in the config.
localSid1 uint32
// localSid2 is the axle-counter SID2, parsed from the hex string in the config.
localSid2 uint32
// sinit1 is parsed from the hex string NetAConfig.Sinit1.
sinit1 uint32
// sinit2 is parsed from the hex string NetAConfig.Sinit2.
sinit2 uint32
// msgChan is the receive side of the queue of incoming datagrams.
msgChan <-chan []byte
// seqNum is the sequence number placed in the next outgoing frame.
seqNum uint32
// lastSeqParam is the most recent valid sequence parameter; 0 means none yet.
lastSeqParam uint32
// lfsr1 is the register used to compute the timestamp term of SVC1.
lfsr1 lfsr
// lfsr2 is the register used to compute the timestamp term of SVC2.
lfsr2 lfsr
// sseMsg is the sequence-check request that was sent and is still awaiting
// its reply; nil when no request is outstanding.
sseMsg *msg.SseMsg
// sseWaitTimer fires when the wait for the reply to sseMsg times out.
sseWaitTimer <-chan time.Time
}
// Start launches the axle-counter communication service for the centralized
// station named in the simulation's RsspAxleConfig and registers it in
// contextMap.
//
// It returns without doing anything when the configuration is not open, or
// (after a warning) when the station has no section code points. It panics
// with a sys_error when the service is already running for that station, when
// the station or its centralized-station data cannot be found, when one of
// the hexadecimal configuration values cannot be parsed, or when the UDP
// server cannot start listening.
func Start(simulation *memory.VerifySimulation) {
	mu.Lock()
	defer mu.Unlock()

	// Check the start-up preconditions.
	rsspConfig := simulation.GetRunConfig().RsspAxleConfig
	if !rsspConfig.Open {
		return
	}
	stationCode := rsspConfig.StationCode
	if contextMap[stationCode] != nil {
		panic(sys_error.New(fmt.Sprintf("%s集中站[%s]服务已启动", logTag, stationCode)))
	}
	station := simulation.Repo.FindStationByStationName(stationCode)
	if station == nil {
		panic(sys_error.New(fmt.Sprintf("%s集中站[%s]不存在", logTag, stationCode)))
	}
	ref := simulation.Repo.GetCentralizedStationRef(station.Id())
	if ref == nil {
		panic(sys_error.New(fmt.Sprintf("%s集中站[%s]关联数据不存在", logTag, stationCode)))
	}
	if len(ref.SectionCodePoints) == 0 {
		logger().Warn(fmt.Sprintf("集中站[%s]无区段编码数据,服务不启动", stationCode))
		return
	}

	// parseHex converts one hexadecimal configuration string and panics with
	// the field's label and raw value when it is not a valid number of the
	// requested bit size.
	parseHex := func(label string, raw string, bitSize int) uint64 {
		value, err := strconv.ParseUint(raw, 16, bitSize)
		if err != nil {
			panic(sys_error.New(fmt.Sprintf("%s集中站[%s]解析%s[%s]出错", logTag, stationCode, label, raw)))
		}
		return value
	}
	netAConfig := rsspConfig.NetAConfig
	sourceAddr := uint16(parseHex("源地址", netAConfig.SourceAddr, 16))
	targetAddr := uint16(parseHex("目的地址", netAConfig.TargetAddr, 16))
	sid1 := uint32(parseHex("联锁SID1", netAConfig.Sid1, 32))
	sid2 := uint32(parseHex("联锁SID2", netAConfig.Sid2, 32))
	localSid1 := uint32(parseHex("计轴SID1", netAConfig.LocalSid1, 32))
	localSid2 := uint32(parseHex("计轴SID2", netAConfig.LocalSid2, 32))
	sinit1 := uint32(parseHex("SINT1", netAConfig.Sinit1, 32))
	sinit2 := uint32(parseHex("SINT2", netAConfig.Sinit2, 32))

	// Bring up the receiving side first; the client is only created once
	// listening has succeeded so that nothing is left open on failure.
	msgChan := make(chan []byte, 100)
	server := udp.NewServer(fmt.Sprintf(":%d", netAConfig.LocalPort), func(b []byte) {
		msgChan <- b
	})
	if err := server.Listen(); err != nil {
		// The panic message is kept unchanged; the cause goes to the log so
		// that it is not lost.
		logger().Error("监听失败", "port", netAConfig.LocalPort, "error", err)
		panic(sys_error.New(fmt.Sprintf("%s集中站[%s]服务启动失败", logTag, stationCode)))
	}
	logger().Info(fmt.Sprintf("监听[:%d]", netAConfig.LocalPort))
	client := udp.NewClient(fmt.Sprintf("%s:%d", netAConfig.RemoteIp, netAConfig.RemotePort))

	cancelCtx, cancelFunc := context.WithCancel(context.Background())
	serviceCtx := &serviceContext{
		sim:                   simulation,
		config:                rsspConfig,
		server:                server,
		client:                client,
		cancelFunc:            cancelFunc,
		ciSectionIndexConfigs: ref.SectionCodePoints,
		sourceAddr:            sourceAddr,
		targetAddr:            targetAddr,
		sid1:                  sid1,
		sid2:                  sid2,
		localSid1:             localSid1,
		localSid2:             localSid2,
		sinit1:                sinit1,
		sinit2:                sinit2,
		msgChan:               msgChan,
		seqNum:                1,
		lfsr1:                 lfsr{value: localSid1},
		lfsr2:                 lfsr{value: localSid2},
	}
	serviceCtx.runCollectTask(cancelCtx)
	serviceCtx.runHandleMsgTask(cancelCtx)
	contextMap[stationCode] = serviceCtx
}
// Stop shuts down the service registered for the centralized station named in
// the simulation's RsspAxleConfig and removes it from contextMap. It does
// nothing when no service is registered for that station.
func Stop(simulation *memory.VerifySimulation) {
	mu.Lock()
	defer mu.Unlock()

	stationCode := simulation.GetRunConfig().RsspAxleConfig.StationCode
	if running := contextMap[stationCode]; running != nil {
		running.stop()
		delete(contextMap, stationCode)
	}
}
// stop tears the service down: it cancels the context of the background tasks
// and then closes the UDP server and client.
func (s *serviceContext) stop() {
	// Signal the tasks first so they stop using the sockets that are closed
	// below; a task that is already mid-cycle may still log one send error.
	if s.cancelFunc != nil {
		s.cancelFunc()
	}
	if s.server != nil {
		s.server.Close()
	}
	if s.client != nil {
		s.client.Close()
	}
}
// runCollectTask starts the background goroutine that, every
// NetAConfig.Period milliseconds, builds a state frame with collect and sends
// it through the UDP client. The goroutine ends when ctx is cancelled.
//
// A panic inside the loop is logged with its stack and the task is started
// again, unless ctx has already been cancelled. A non-positive period is
// rejected with an error log and no goroutine is started.
func (s *serviceContext) runCollectTask(ctx context.Context) {
	period := time.Millisecond * time.Duration(s.config.NetAConfig.Period)
	if period <= 0 {
		// time.NewTicker panics on a non-positive duration, which together
		// with the restart-on-panic below would loop forever.
		logger().Error("状态收集周期配置无效,任务不启动", "period", s.config.NetAConfig.Period)
		return
	}
	go func() {
		defer func() {
			if err := recover(); err != nil {
				logger().Error("状态收集任务出错,记录后重启", "error", err, "stack", string(debug.Stack()))
				if ctx.Err() == nil {
					s.runCollectTask(ctx)
				}
			}
		}()
		ticker := time.NewTicker(period)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				frame := s.collect()
				if err := s.client.Send(frame.Encode()); err != nil {
					logger().Error("发送状态数据失败", "error", err)
				}
			}
		}
	}()
}
// runHandleMsgTask starts the background goroutine that consumes msgChan and
// hands each datagram to the handler matching its message type. It also
// clears the outstanding SSE request when sseWaitTimer fires, and ends when
// ctx is cancelled. A panic is logged with its stack and the task is started
// again.
func (s *serviceContext) runHandleMsgTask(ctx context.Context) {
	go func() {
		defer func() {
			cause := recover()
			if cause == nil {
				return
			}
			logger().Error("消息处理任务出错,记录后重启", "error", cause, "stack", string(debug.Stack()))
			s.runHandleMsgTask(ctx)
		}()
		for {
			select {
			case data := <-s.msgChan:
				switch kind := msg.GetMessageType(data); kind {
				case msg.MessageType_A, msg.MessageType_B:
					s.handleRsdMsg(data)
				case msg.MessageType_SSE:
					s.handleSseMsg(data)
				case msg.MessageType_SSR:
					s.handleSsrMsg(data)
				default:
					logger().Warn(fmt.Sprintf("未知的消息类型[%x]", kind))
				}
			case <-s.sseWaitTimer:
				// Timed out waiting for the reply to the SSE request.
				s.sseMsg = nil
			case <-ctx.Done():
				return
			}
		}
	}()
}
// collect builds the next outgoing state frame. For every configured section
// it reads the physical section state and the axle-device runtime record of
// the station's axle management device, encodes them as user data, and wraps
// the result in a type-A message carrying the current sequence number and
// both SVC values. The sequence number is incremented afterwards.
//
// NOTE(review): a section whose entity cannot be found is skipped, so the
// encoded list is then shorter than ciSectionIndexConfigs and later entries
// move up one position — confirm the receiver tolerates that.
func (s *serviceContext) collect() *msg.RsdMsgBuilder {
	worldData := entity.GetWorldData(s.sim.World)
	amdEntry := entity.FindAxleManageDevice(worldData, s.config.StationCode)
	amd := component.AxleManageDeviceType.Get(amdEntry)
	stateInfos := make(msg.StateInfos, 0, len(s.ciSectionIndexConfigs))
	for _, cfg := range s.ciSectionIndexConfigs {
		sectionEntry, ok := entity.GetEntityByUid(s.sim.World, cfg.SectionId)
		if !ok {
			// Check the lookup result before touching the entry: reading a
			// component from an entry that was not found is not safe.
			continue
		}
		sectionState := component.PhysicalSectionStateType.Get(sectionEntry)
		sectionRuntime := amd.Adrs[cfg.SectionId]
		stateInfos = append(stateInfos, &msg.StateInfo{
			CLR: !sectionState.Occ,
			OCC: sectionState.Occ,
			RAC: sectionRuntime.Rac,
			RJO: sectionRuntime.Rjo,
			RJT: sectionRuntime.Rjt,
		})
	}
	userData := stateInfos.Encode()
	builder := &msg.RsdMsgBuilder{
		MsgHeader: msg.MsgHeader{
			ProtocolType: msg.ProtocolType_Sync,
			MessageType:  msg.MessageType_A,
			SourceAddr:   s.sourceAddr,
			TargetAddr:   s.targetAddr,
		},
		SeqNum:   s.seqNum,
		Svc1:     s.calculateSvc1(userData),
		Svc2:     s.calculateSvc2(userData),
		UserData: userData,
	}
	s.seqNum++
	return builder
}
// calculateSvc1 returns the channel-1 safety check value for userData and
// advances the channel-1 timestamp register (lfsr1) by one step.
func (s *serviceContext) calculateSvc1(userData []byte) uint32 {
	return s.nextSvc(message.Rssp_I_Crc32C1(userData), s.localSid1, msg.SCW_1, &s.lfsr1)
}

// calculateSvc2 returns the channel-2 safety check value for userData and
// advances the channel-2 timestamp register (lfsr2) by one step. It uses the
// channel-2 CRC, matching what validateSvc2 applies to received frames.
func (s *serviceContext) calculateSvc2(userData []byte) uint32 {
	return s.nextSvc(message.Rssp_I_Crc32C2(userData), s.localSid2, msg.SCW_2, &s.lfsr2)
}

// calculateSvc computes a check value from the channel-1 CRC of userData and
// the given sid, scw and register. The register is received by value, so the
// caller's copy is not advanced; calculateSvc1 and calculateSvc2 use nextSvc
// instead. Kept with its original signature and behaviour.
func (s *serviceContext) calculateSvc(userData []byte, sid uint32, scw uint32, l lfsr) uint32 {
	return s.nextSvc(message.Rssp_I_Crc32C1(userData), sid, scw, &l)
}

// nextSvc steps the register behind l once and returns
// crc ^ sid ^ (new register value) ^ scw. Because l is a pointer the step is
// kept, so consecutive frames carry different timestamp terms.
func (s *serviceContext) nextSvc(crc uint32, sid uint32, scw uint32, l *lfsr) uint32 {
	l.add(0)
	return crc ^ sid ^ l.value ^ scw
}
// handleRsdMsg processes a received type-A/B data frame: it decodes and
// validates the frame, decodes the per-section commands from its user data
// and drives the direct-reset and pre-reset inputs of the matching sections.
//
// Frames are ignored while a sequence-correction request is outstanding
// (sseMsg is non-nil). Decode errors and drive errors are logged.
func (s *serviceContext) handleRsdMsg(data []byte) {
	if s.sseMsg != nil { // a sequence correction is in progress
		return
	}
	rsdMsg := &msg.RsdMsg{}
	if err := rsdMsg.Decode(data); err != nil {
		logger().Error("解析RSD数据出错", "error", err)
		return
	}
	// validateRsdMsg returns 0 for a failure unrelated to sequencing, 1 for
	// success and 2 for a sequence anomaly. Only 0 rejects the frame here:
	// starting the sequence-correction flow on 2 is not wired up yet, so such
	// a frame is still processed.
	if s.validateRsdMsg(rsdMsg) == 0 {
		return
	}
	cmdInfos := msg.CmdInfos{}
	if err := cmdInfos.Decode(rsdMsg.UserData); err != nil {
		logger().Error("解析命令信息出错", "error", err)
		return
	}
	// Commands are positional: command i belongs to configured section i.
	for i, cmdInfo := range cmdInfos {
		if i >= len(s.ciSectionIndexConfigs) {
			logger().Error(fmt.Sprintf("命令数量[%d]超出区段配置数量[%d]", len(cmdInfos), len(s.ciSectionIndexConfigs)))
			break
		}
		sectionID := s.ciSectionIndexConfigs[i].SectionId
		if err := fi.AxleSectionDrstDrive(s.sim.World, sectionID, cmdInfo.DRST); err != nil {
			logger().Error("驱动计轴直接复位出错", "error", err)
		}
		if err := fi.AxleSectionPdrstDrive(s.sim.World, sectionID, cmdInfo.PDRST); err != nil {
			logger().Error("驱动计轴预复位出错", "error", err)
		}
	}
}
// handleSseMsg answers a received sequence-check request (SSE). After
// decoding and validating the request it records the request's sequence
// number as lastSeqParam and sends back an SSR reply carrying this side's
// current sequence number, the request's sequence number and one init value
// per channel. Decode and send errors are logged.
func (s *serviceContext) handleSseMsg(data []byte) {
	sseMsg := &msg.SseMsg{}
	if err := sseMsg.Decode(data); err != nil {
		logger().Error("解析SSE数据出错", "error", err)
		return
	}
	if !s.validateSseMsg(sseMsg) {
		return
	}
	s.lastSeqParam = sseMsg.SeqNum
	ssrMsg := msg.SsrMsg{
		MsgHeader: msg.MsgHeader{
			ProtocolType: msg.ProtocolType_Sync,
			MessageType:  msg.MessageType_SSR,
			SourceAddr:   s.sourceAddr,
			TargetAddr:   s.targetAddr,
		},
		SeqNumSsr: s.seqNum,
		SeqNumSse: sseMsg.SeqNum,
		SeqInit1:  s.calculateSeqInit1(sseMsg.SeqEnq1),
		// Channel 2 is derived from the channel-2 enquiry value.
		SeqInit2: s.calculateSeqInit2(sseMsg.SeqEnq2),
		DataVer:  0x01,
	}
	if err := s.client.Send(ssrMsg.Encode()); err != nil {
		logger().Error("发送SSR数据失败", "error", err)
	}
}
// handleSsrMsg processes a received sequence-check reply (SSR). It is ignored
// unless an SSE request is outstanding. A reply that decodes and validates
// completes the correction: the outstanding request is cleared and the
// reply's sequence number becomes lastSeqParam.
func (s *serviceContext) handleSsrMsg(data []byte) {
	if s.sseMsg == nil {
		// No sequence correction is in progress.
		return
	}
	reply := new(msg.SsrMsg)
	if err := reply.Decode(data); err != nil {
		logger().Error("解析SSR数据出错", "error", err)
		return
	}
	if s.validateSsrMsg(reply) {
		s.sseMsg = nil
		s.lastSeqParam = reply.SeqNumSsr
	}
}
// startSeeProgress starts the SSE flow: it sends a sequence-check request
// carrying the current sequence number and both enquiry values. When the send
// succeeds the request is remembered in sseMsg and sseWaitTimer is armed for
// Period*msg.Twait_sse milliseconds; a send error is only logged.
func (s *serviceContext) startSeeProgress() {
	header := msg.MsgHeader{
		ProtocolType: msg.ProtocolType_Sync,
		MessageType:  msg.MessageType_SSE,
		SourceAddr:   s.sourceAddr,
		TargetAddr:   s.targetAddr,
	}
	request := &msg.SseMsg{
		MsgHeader: header,
		SeqNum:    s.seqNum,
		SeqEnq1:   s.calculateSeqEnq1(),
		SeqEnq2:   s.calculateSeqEnq2(),
	}
	if err := s.client.Send(request.Encode()); err != nil {
		logger().Error("发送SSE数据失败", "error", err)
		return
	}
	s.sseMsg = request
	timeout := time.Duration(s.config.NetAConfig.Period*msg.Twait_sse) * time.Millisecond
	s.sseWaitTimer = time.After(timeout)
}
// validateRsdMsg checks a decoded data frame.
//
// It returns 0 when a check unrelated to sequencing fails (addresses, user
// data length, tail CRC), 2 when either SVC check fails (sequence anomaly)
// and 1 when every check passes. Each failure is logged.
//
// NOTE(review): the frame's source and target addresses are compared with the
// same sourceAddr/targetAddr values that are written into outgoing headers —
// confirm that this is the intended direction for received frames.
func (s *serviceContext) validateRsdMsg(rsdMsg *msg.RsdMsg) int {
	const (
		resultInvalid  = 0 // failed a check other than the sequence check
		resultValid    = 1 // all checks passed
		resultSeqError = 2 // SVC check failed
	)
	// sourceAddr/targetAddr hold the already parsed configuration values, so
	// the hex strings do not have to be re-parsed for every frame.
	if rsdMsg.SourceAddr != s.sourceAddr {
		logger().Error(fmt.Sprintf("源地址[%x]不正确[%s]", rsdMsg.SourceAddr, s.config.NetAConfig.SourceAddr))
		return resultInvalid
	}
	if rsdMsg.TargetAddr != s.targetAddr {
		logger().Error(fmt.Sprintf("目的地址[%x]不正确[%s]", rsdMsg.TargetAddr, s.config.NetAConfig.TargetAddr))
		return resultInvalid
	}
	if len(rsdMsg.UserData) != len(s.ciSectionIndexConfigs) {
		logger().Error(fmt.Sprintf("用户数据长度[%d]与配置长度[%d]不符", len(rsdMsg.UserData), len(s.ciSectionIndexConfigs)))
		return resultInvalid
	}
	if message.Rssp_I_Crc16(rsdMsg.UserData) != rsdMsg.Tail {
		logger().Error("报文尾验证失败")
		return resultInvalid
	}
	// validateSvc1/validateSvc2 report true on success, so a failure is the
	// negated result.
	if !s.validateSvc1(rsdMsg.Svc1, rsdMsg.UserData) {
		logger().Error(fmt.Sprintf("SVC1[%x]校验未通过", rsdMsg.Svc1))
		return resultSeqError
	}
	if !s.validateSvc2(rsdMsg.Svc2, rsdMsg.UserData) {
		logger().Error(fmt.Sprintf("SVC2[%x]校验未通过", rsdMsg.Svc2))
		return resultSeqError
	}
	return resultValid
}
// validateSseMsg reports whether a received SSE request should be answered.
// It currently accepts every request without inspecting it.
func (s *serviceContext) validateSseMsg(sseMsg *msg.SseMsg) bool {
return true
}
// validateSsrMsg reports whether a received SSR reply answers the outstanding
// SSE request, i.e. whether the request sequence number echoed in the reply
// equals the one that was sent. A mismatch is logged. The caller must ensure
// an SSE request is outstanding (sseMsg is non-nil).
func (s *serviceContext) validateSsrMsg(ssrMsg *msg.SsrMsg) bool {
	sent, echoed := s.sseMsg.SeqNum, ssrMsg.SeqNumSse
	if sent == echoed {
		return true
	}
	logger().Error(fmt.Sprintf("SSR的Ne[%d]与请求方不符[%d]", echoed, sent))
	return false
}
// validateSvc1 checks a received channel-1 SVC against userData, using the
// channel-1 CRC, msg.SCW_1 and the sid1/sinit1 values.
func (s *serviceContext) validateSvc1(svc uint32, userData []byte) bool {
	crc := message.Rssp_I_Crc32C1(userData)
	return s.validateSvc(svc, msg.SCW_1, crc, s.sid1, s.sinit1)
}
// validateSvc2 checks a received channel-2 SVC against userData, using the
// channel-2 CRC, msg.SCW_2 and the sid2/sinit2 values.
func (s *serviceContext) validateSvc2(svc uint32, userData []byte) bool {
	crc := message.Rssp_I_Crc32C2(userData)
	return s.validateSvc(svc, msg.SCW_2, crc, s.sid2, s.sinit2)
}
// validateSvc checks a received SVC for one channel.
//
// The sequence parameter is recovered as crc1 ^ svc ^ scw. When no sequence
// parameter has been recorded yet (lastSeqParam is 0) it is stored and
// accepted. Otherwise two registers seeded with sinit are compared for each
// tolerated deviation i in [0, MaxDeviation):
//
//	sinit + lastSeqParam, i zero steps, + seqParam
//	sinit + sid,          i zero steps, + sid
//
// where "+" is lfsr.add. The check passes as soon as both end values agree.
//
// NOTE(review): lastSeqParam is not updated after a successful comparison
// and is shared by both channels — confirm this is intended.
func (s *serviceContext) validateSvc(svc uint32, scw uint32, crc1 uint32, sid uint32, sinit uint32) bool {
	seqParam := crc1 ^ svc ^ scw
	if s.lastSeqParam == 0 {
		s.lastSeqParam = seqParam
		return true
	}
	// Both registers are advanced one zero step per iteration instead of
	// being rebuilt from sinit for every deviation.
	seqLfsr := lfsr{value: sinit}
	seqLfsr.add(s.lastSeqParam)
	constLfsr := lfsr{value: sinit}
	constLfsr.add(sid)
	for i := 0; i < s.config.NetAConfig.MaxDeviation; i++ {
		// Work on copies so that the final step does not disturb the
		// running registers.
		seqTry, constTry := seqLfsr, constLfsr
		if seqTry.add(seqParam) == constTry.add(sid) {
			return true
		}
		seqLfsr.add(0)
		constLfsr.add(0)
	}
	return false
}
// calculateSeqEnq1 supplies the SeqEnq1 field of an outgoing SSE request.
// Placeholder: it currently always returns 0.
func (s *serviceContext) calculateSeqEnq1() uint32 {
return 0
}
// calculateSeqEnq2 supplies the SeqEnq2 field of an outgoing SSE request.
// Placeholder: it currently always returns 0.
func (s *serviceContext) calculateSeqEnq2() uint32 {
return 0
}
// calculateSeqInit1 supplies an init value for an outgoing SSR reply from the
// enquiry value of the received SSE request.
// Placeholder: it ignores seqEnq1 and currently always returns 0.
func (s *serviceContext) calculateSeqInit1(seqEnq1 uint32) uint32 {
return 0
}
// calculateSeqInit2 is the counterpart of calculateSeqInit1 taking the second
// enquiry value.
// Placeholder: it ignores seqEnq2 and currently always returns 0.
func (s *serviceContext) calculateSeqInit2(seqEnq2 uint32) uint32 {
return 0
}
// lfsr is a 32-bit shift register with feedback; see add for the step
// function.
type lfsr struct {
// value is the current register content.
value uint32
}
// add XORs x into the register and then shifts the register left 32 times,
// XORing msg.T_POLY_1 into it after every shift that pushed a set top bit
// out. The new register content is stored and returned.
func (l *lfsr) add(x uint32) uint32 {
	reg := l.value ^ x
	for step := 0; step < 32; step++ {
		topBitSet := reg&0x80000000 != 0
		reg <<= 1
		if topBitSet {
			reg ^= msg.T_POLY_1
		}
	}
	l.value = reg
	return reg
}
func logger() *slog.Logger {
loggerInit.Do(func() {
privateLogger = slog.Default().With("tag", logTag)
})
return privateLogger
}