rts-sim-testing-service/third_party/axle_device/beijing12/service.go

539 lines
17 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package beijing12
import (
"context"
"fmt"
"joylink.club/bj-rtsts-server/config"
"joylink.club/bj-rtsts-server/sys_error"
"joylink.club/bj-rtsts-server/third_party/axle_device/beijing12/msg"
"joylink.club/bj-rtsts-server/third_party/message"
"joylink.club/bj-rtsts-server/third_party/udp"
"joylink.club/bj-rtsts-server/ts/simulation/wayside/memory"
"joylink.club/rtsssimulation/component"
"joylink.club/rtsssimulation/entity"
"joylink.club/rtsssimulation/fi"
"joylink.club/rtsssimulation/repository/model/proto"
"log/slog"
"runtime/debug"
"strconv"
"sync"
"time"
)
// Logging state for this package.

// logTag is attached to the package logger and prefixed to panic messages.
var logTag = "[北京12号线计轴通信]"

// privateLogger is the lazily created package logger; obtain it via logger().
var privateLogger *slog.Logger

// loggerInit makes sure privateLogger is built only once.
var loggerInit sync.Once
var (
	// contextMap holds the running service contexts, keyed by the
	// centralized-station code.
	contextMap = make(map[string]*serviceContext)
	// mu is held by Start and Stop while they read or modify contextMap.
	mu sync.Mutex
)
// serviceContext carries everything one running axle-counter communication
// service needs: the simulation it reads from, the UDP endpoints, the
// addressing/identity values parsed from configuration, and the mutable
// sequence state used when building and validating frames.
type serviceContext struct {
	sim                   *memory.VerifySimulation   // simulation whose world is read and driven
	config                config.RsspAxleConfig      // run configuration captured at Start
	server                udp.UdpServer              // UDP listener, assigned in Start
	client                udp.UdpClient              // UDP sender, assigned in Start
	cancelFunc            context.CancelFunc         // cancels the background tasks
	ciSectionIndexConfigs []*proto.CiSectionCodePoint // section code points of the centralized station
	remoteAddr            uint16                     // interlocking address, converted from the hex string in the config
	localAddr             uint16                     // axle-counter address, converted from the hex string in the config
	sid1                  uint32                     // interlocking SID1, converted from the hex string in the config
	sid2                  uint32                     // interlocking SID2, converted from the hex string in the config
	localSid1             uint32                     // axle-counter SID1, converted from the hex string in the config
	localSid2             uint32                     // axle-counter SID2, converted from the hex string in the config
	sinit1                uint32                     // SINIT1, converted from the hex string in the config
	sinit2                uint32                     // SINIT2, converted from the hex string in the config
	dataver1              uint32                     // DATAVER1, converted from the hex string in the config
	dataver2              uint32                     // DATAVER2, converted from the hex string in the config
	msgChan               <-chan []byte              // queue of received datagrams
	seqNum                uint32                     // current sequence number
	lastSeqParam1         uint32                     // most recent valid sequence parameter sid1^t1(n)
	lastSeqParam2         uint32                     // most recent valid sequence parameter sid2^t2(n)
	lfsr1                 *lfsr                      // LFSR used to compute the channel-1 timestamp
	lfsr2                 *lfsr                      // LFSR used to compute the channel-2 timestamp
	sseMsg                *msg.SseMsg                // sequence-check request (SSE) that was sent out; nil when none is pending
	sseWaitTimer          <-chan time.Time           // SSE timeout timer
}
// Start launches the axle-counter communication service for the centralized
// station named in the simulation's RsspAxleConfig.
//
// It returns without doing anything when the configuration is not open, or
// (after logging a warning) when the station has no section code points.
// It panics with a sys_error when the service for that station is already
// running, the station or its centralized-station reference cannot be found,
// a hexadecimal configuration value cannot be parsed, or the UDP server
// cannot start listening.
//
// On success the new context is registered in contextMap and two background
// goroutines are started: the periodic state collector and the message
// handler.
func Start(simulation *memory.VerifySimulation) {
	mu.Lock()
	defer mu.Unlock()

	// Check the preconditions for starting the service.
	rsspConfig := simulation.GetRunConfig().RsspAxleConfig
	if !rsspConfig.Open {
		return
	}
	stationCode := rsspConfig.StationCode
	if contextMap[stationCode] != nil {
		panic(sys_error.New(fmt.Sprintf("%s集中站[%s]服务已启动", logTag, stationCode)))
	}
	station := simulation.Repo.FindStationByStationName(stationCode)
	if station == nil {
		panic(sys_error.New(fmt.Sprintf("%s集中站[%s]不存在", logTag, stationCode)))
	}
	ref := simulation.Repo.GetCentralizedStationRef(station.Id())
	if ref == nil {
		panic(sys_error.New(fmt.Sprintf("%s集中站[%s]关联数据不存在", logTag, stationCode)))
	}
	if len(ref.SectionCodePoints) == 0 {
		logger().Warn(fmt.Sprintf("集中站[%s]无区段编码数据,服务不启动", stationCode))
		return
	}

	netAConfig := rsspConfig.NetAConfig

	// parseHex converts one hexadecimal configuration string and panics with
	// the station code, the field label and the offending text on failure.
	parseHex := func(label, raw string, bitSize int) uint64 {
		value, err := strconv.ParseUint(raw, 16, bitSize)
		if err != nil {
			panic(sys_error.New(fmt.Sprintf("%s集中站[%s]解析%s[%s]出错", logTag, stationCode, label, raw)))
		}
		return value
	}
	// The order below is the order in which a bad configuration is reported.
	remoteAddr := uint16(parseHex("源地址", netAConfig.RemoteAddr, 16))
	localAddr := uint16(parseHex("目的地址", netAConfig.LocalAddr, 16))
	sid1 := uint32(parseHex("联锁SID1", netAConfig.Sid1, 32))
	sid2 := uint32(parseHex("联锁SID2", netAConfig.Sid2, 32))
	localSid1 := uint32(parseHex("计轴SID1", netAConfig.LocalSid1, 32))
	localSid2 := uint32(parseHex("计轴SID2", netAConfig.LocalSid2, 32))
	sinit1 := uint32(parseHex("SINT1", netAConfig.Sinit1, 32))
	sinit2 := uint32(parseHex("SINT2", netAConfig.Sinit2, 32))
	dataver1 := uint32(parseHex("DATAVER_1", netAConfig.DataVer1, 32))
	dataver2 := uint32(parseHex("DATAVER_2", netAConfig.DataVer2, 32))

	// Initialise and start the service.
	msgChan := make(chan []byte, 100)
	serviceCtx := &serviceContext{
		sim:                   simulation,
		config:                rsspConfig,
		ciSectionIndexConfigs: ref.SectionCodePoints,
		remoteAddr:            remoteAddr,
		localAddr:             localAddr,
		sid1:                  sid1,
		sid2:                  sid2,
		localSid1:             localSid1,
		localSid2:             localSid2,
		sinit1:                sinit1,
		sinit2:                sinit2,
		dataver1:              dataver1,
		dataver2:              dataver2,
		msgChan:               msgChan,
		lfsr1:                 &lfsr{value: localSid1, poly: msg.T_POLY_1},
		lfsr2:                 &lfsr{value: localSid2, poly: msg.T_POLY_2},
	}

	server := udp.NewServer(fmt.Sprintf("%s:%d", netAConfig.LocalIp, netAConfig.LocalPort), func(b []byte) {
		logger().Info(fmt.Sprintf("收到数据:%x", b))
		msgChan <- b
	})
	if err := server.Listen(); err != nil {
		// Keep the underlying cause in the log; the panic message does not carry it.
		logger().Error("UDP监听失败", "error", err)
		panic(sys_error.New(fmt.Sprintf("%s集中站[%s]服务启动失败", logTag, stationCode)))
	}
	logger().Info(fmt.Sprintf("监听[:%d]", netAConfig.LocalPort))

	// The client is created only once listening has succeeded, so a failed
	// start does not leave an unclosed client behind.
	client := udp.NewClient(fmt.Sprintf("%s:%d", netAConfig.RemoteIp, netAConfig.RemotePort))

	serviceCtx.server = server
	serviceCtx.client = client
	cancelCtx, cancelFunc := context.WithCancel(context.Background())
	serviceCtx.cancelFunc = cancelFunc
	serviceCtx.runCollectTask(cancelCtx)
	serviceCtx.runHandleMsgTask(cancelCtx)
	contextMap[stationCode] = serviceCtx

	// SSE test scaffolding; delete once testing is finished.
	//time.Sleep(1 * time.Second)
	//serviceCtx.seqNum++
	//serviceCtx.lfsr1.add(0)
	//serviceCtx.lfsr2.add(0)
	//serviceCtx.startSeeProgress()
}
// Stop shuts down the service registered for the simulation's configured
// centralized station and removes it from contextMap. It does nothing when
// no service is registered for that station.
func Stop(simulation *memory.VerifySimulation) {
	mu.Lock()
	defer mu.Unlock()

	code := simulation.GetRunConfig().RsspAxleConfig.StationCode
	if running := contextMap[code]; running != nil {
		running.stop()
		delete(contextMap, code)
	}
}
// stop closes the UDP server and client when they are set, then cancels the
// context shared by the background tasks.
func (s *serviceContext) stop() {
	if srv := s.server; srv != nil {
		srv.Close()
	}
	if cli := s.client; cli != nil {
		cli.Close()
	}
	s.cancelFunc()
}
// runCollectTask starts a goroutine that, once per configured period, builds
// a state frame with collect and sends it through the UDP client. The
// goroutine exits as soon as ctx is cancelled. If the loop panics, the panic
// is logged together with a stack trace and the task is started again.
//
// A non-positive period is reported and no goroutine is started, because a
// ticker cannot be created for it.
func (s *serviceContext) runCollectTask(ctx context.Context) {
	period := time.Duration(s.config.NetAConfig.Period) * time.Millisecond
	if period <= 0 {
		logger().Error(fmt.Sprintf("状态收集周期[%d]无效", s.config.NetAConfig.Period))
		return
	}
	go func() {
		defer func() {
			if err := recover(); err != nil {
				logger().Error("状态收集任务出错,记录后重启", "error", err, "stack", string(debug.Stack()))
				s.runCollectTask(ctx)
			}
		}()
		// A stoppable ticker is released on exit and on every panic-restart.
		ticker := time.NewTicker(period)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				frame := s.collect()
				data := frame.Encode()
				if err := s.client.Send(data); err != nil {
					logger().Error("发送状态数据失败", "error", err)
				} else {
					logger().Info(fmt.Sprintf("发送数据:%x", data))
				}
			}
		}
	}()
}
// runHandleMsgTask starts the goroutine that consumes received datagrams and
// dispatches them by message type. It also clears the pending SSE request
// when the SSE wait timer fires, and returns when ctx is cancelled. A panic
// inside the loop is logged with its stack trace and the task is restarted.
func (s *serviceContext) runHandleMsgTask(ctx context.Context) {
	loop := func() {
		defer func() {
			cause := recover()
			if cause == nil {
				return
			}
			logger().Error("消息处理任务出错,记录后重启", "error", cause, "stack", string(debug.Stack()))
			s.runHandleMsgTask(ctx)
		}()
		for {
			select {
			case <-ctx.Done():
				return
			case <-s.sseWaitTimer:
				// Waiting for the SSE answer timed out.
				s.sseMsg = nil
			case payload := <-s.msgChan:
				switch kind := msg.GetMessageType(payload); kind {
				case msg.MessageType_A, msg.MessageType_B:
					s.handleRsdMsg(payload)
				case msg.MessageType_SSE:
					s.handleSseMsg(payload)
				case msg.MessageType_SSR:
					s.handleSsrMsg(payload)
				default:
					logger().Warn(fmt.Sprintf("未知的消息类型[%x]", kind))
				}
			}
		}
	}
	go loop()
}
// collect builds the next outgoing RSD frame from the current simulation
// state. For every configured section whose entity can be found it records
// the clear/occupied state and the section's Rac/Rjo/Rjt runtime values.
// It then advances the sequence number and both timestamp LFSRs and returns
// a builder carrying the header, sequence number, both SVC values and the
// user data.
func (s *serviceContext) collect() *msg.RsdMsgBuilder {
	worldData := entity.GetWorldData(s.sim.World)
	amdEntry := entity.FindAxleManageDevice(worldData, s.config.StationCode)
	amd := component.AxleManageDeviceType.Get(amdEntry)
	stateInfos := msg.StateInfos{}
	for _, cfg := range s.ciSectionIndexConfigs {
		sectionEntry, ok := entity.GetEntityByUid(s.sim.World, cfg.SectionId)
		if !ok {
			// Check the lookup result before the entry is used.
			continue
		}
		sectionRuntime := amd.Adrs[cfg.SectionId]
		sectionState := component.PhysicalSectionStateType.Get(sectionEntry)
		stateInfos = append(stateInfos, &msg.StateInfo{
			CLR: !sectionState.Occ,
			OCC: sectionState.Occ,
			RAC: sectionRuntime.Rac,
			RJO: sectionRuntime.Rjo,
			RJT: sectionRuntime.Rjt,
		})
	}
	userData := []byte{0x00} // check byte
	userData = append(userData, stateInfos.Encode()...)

	// Advance the sequence number and the timestamps.
	s.seqNum++
	s.lfsr1.add(0)
	s.lfsr2.add(0)

	// Build the message.
	builder := &msg.RsdMsgBuilder{
		MsgHeader: msg.MsgHeader{
			ProtocolType: msg.ProtocolType_Sync,
			MessageType:  msg.MessageType_A,
			SourceAddr:   s.localAddr,
			TargetAddr:   s.remoteAddr,
		},
		SeqNum:   s.seqNum,
		Svc1:     s.calculateSvc1(userData),
		Svc2:     s.calculateSvc2(userData),
		UserData: userData,
	}
	return builder
}
// calculateSvc1 returns the channel-1 SVC for userData: the channel-1 CRC
// XORed with the local SID1, the current lfsr1 value and SCW_1.
func (s *serviceContext) calculateSvc1(userData []byte) uint32 {
	svc := message.Rssp_I_Crc32C1(userData)
	svc ^= s.localSid1
	svc ^= s.lfsr1.value
	svc ^= msg.SCW_1
	return svc
}
// calculateSvc2 returns the channel-2 SVC for userData: the channel-2 CRC
// XORed with the local SID2, the current lfsr2 value and SCW_2.
func (s *serviceContext) calculateSvc2(userData []byte) uint32 {
	svc := message.Rssp_I_Crc32C2(userData)
	svc ^= s.localSid2
	svc ^= s.lfsr2.value
	svc ^= msg.SCW_2
	return svc
}
// handleRsdMsg processes a received RSD frame. Frames are ignored while a
// sequence correction is pending (s.sseMsg is set). Otherwise the frame is
// decoded and validated, and on success each decoded command drives the
// direct-reset and pre-reset inputs of the section at the same index in the
// section configuration. Decode, validation and drive errors are logged.
func (s *serviceContext) handleRsdMsg(data []byte) {
	if s.sseMsg != nil { // a sequence correction is in progress
		return
	}
	rsdMsg := &msg.RsdMsg{}
	if err := rsdMsg.Decode(data); err != nil {
		logger().Error("解析RSD数据出错", "error", err)
		return
	}

	// Validate.
	switch s.validateRsdMsg(rsdMsg) {
	case 0:
		return
	case 2:
		// NOTE(review): the sequence correction itself is not started here;
		// only the failure is logged. Confirm whether startSeeProgress
		// should be called at this point.
		logger().Error("时序校验失败,开始时序校正")
		return
	}

	// Decode the commands; the first user-data byte is the check byte.
	cmdInfos := msg.CmdInfos{}
	if err := cmdInfos.Decode(rsdMsg.UserData[1:]); err != nil {
		logger().Error("解析命令信息出错", "error", err)
		return
	}

	// Drive the simulation.
	for i, cmdInfo := range cmdInfos {
		if i >= len(s.ciSectionIndexConfigs) {
			// Never index past the configured sections.
			logger().Warn(fmt.Sprintf("命令信息数量[%d]超出区段配置数量[%d]", len(cmdInfos), len(s.ciSectionIndexConfigs)))
			break
		}
		sectionIndexConfig := s.ciSectionIndexConfigs[i]
		if err := fi.AxleSectionDrstDrive(s.sim.World, sectionIndexConfig.SectionId, cmdInfo.DRST); err != nil {
			logger().Error("驱动计轴直接复位出错", "error", err)
		}
		if err := fi.AxleSectionPdrstDrive(s.sim.World, sectionIndexConfig.SectionId, cmdInfo.PDRST); err != nil {
			logger().Error("驱动计轴预复位出错", "error", err)
		}
	}
}
// handleSseMsg answers a received SSE (sequence-check request) with an SSR.
// The request is decoded and validated; the reply carries the current local
// sequence number, the request's sequence number and one SeqInit value per
// channel. After the reply has been sent successfully, the request's
// SeqEnq1/SeqEnq2 are stored as the latest sequence parameters.
func (s *serviceContext) handleSseMsg(data []byte) {
	sseMsg := &msg.SseMsg{}
	if err := sseMsg.Decode(data); err != nil {
		logger().Error("解析SSE数据出错", "error", err)
		return
	}

	// Validate.
	if !s.validateSseMsg(sseMsg) {
		logger().Error("SSE数据校验失败")
		return
	}
	logger().Info("SSE数据通过校验")

	// Reply. This is an outgoing frame, so it is addressed from the local
	// address to the remote one, as collect does.
	ssrMsg := msg.SsrMsg{
		MsgHeader: msg.MsgHeader{
			ProtocolType: msg.ProtocolType_Sync,
			MessageType:  msg.MessageType_SSR,
			SourceAddr:   s.localAddr,
			TargetAddr:   s.remoteAddr,
		},
		SeqNumSsr: s.seqNum,
		SeqNumSse: sseMsg.SeqNum,
		SeqInit1:  s.calculateSeqInit1(sseMsg.SeqEnq1),
		// Channel 2 uses its own request value and its own calculation.
		SeqInit2: s.calculateSeqInit2(sseMsg.SeqEnq2),
		DataVer:  0x01,
	}
	ssrBytes := ssrMsg.Encode()
	if err := s.client.Send(ssrBytes); err != nil {
		logger().Error("发送SSR数据失败", "error", err)
		return
	}
	logger().Info(fmt.Sprintf("发送SSR数据%x", ssrBytes))

	// Update the local sequence parameters.
	s.lastSeqParam1 = sseMsg.SeqEnq1
	s.lastSeqParam2 = sseMsg.SeqEnq2
}
// handleSsrMsg consumes the SSR that answers a pending SSE request. The SSR
// is dropped with a warning when no request is pending. After successful
// decoding and validation the pending request is cleared and lastSeqParam1
// is set to the SSR's sequence number.
func (s *serviceContext) handleSsrMsg(data []byte) {
	// No sequence correction is in progress.
	if s.sseMsg == nil {
		logger().Warn("不在时序校正流程中丢弃SSR数据")
		return
	}

	reply := new(msg.SsrMsg)
	if decodeErr := reply.Decode(data); decodeErr != nil {
		logger().Error("解析SSR数据出错", "error", decodeErr)
		return
	}

	// Validate.
	if valid := s.validateSsrMsg(reply); !valid {
		logger().Error("SSR数据校验失败")
		return
	}
	logger().Info("SSR数据通过校验")

	// The sequence correction is complete.
	s.sseMsg = nil
	s.lastSeqParam1 = reply.SeqNumSsr
}
// startSeeProgress starts the SSE (sequence-check request) procedure: it
// sends an SSE carrying the current sequence number and both SeqEnq values.
// When the send succeeds, the request is remembered in s.sseMsg and the SSE
// wait timer is armed for Period*Twait_sse milliseconds. A send failure is
// logged and leaves the state untouched.
func (s *serviceContext) startSeeProgress() {
	// This is an outgoing frame, so it is addressed from the local address
	// to the remote one, as collect does.
	sseMsg := &msg.SseMsg{
		MsgHeader: msg.MsgHeader{
			ProtocolType: msg.ProtocolType_Sync,
			MessageType:  msg.MessageType_SSE,
			SourceAddr:   s.localAddr,
			TargetAddr:   s.remoteAddr,
		},
		SeqNum:  s.seqNum,
		SeqEnq1: s.calculateSeqEnq1(),
		SeqEnq2: s.calculateSeqEnq2(),
	}
	sseBytes := sseMsg.Encode()
	if err := s.client.Send(sseBytes); err != nil {
		logger().Error("发送SSE数据失败", "error", err)
		return
	}
	logger().Info(fmt.Sprintf("发送SSE数据%x", sseBytes))
	s.sseMsg = sseMsg
	s.sseWaitTimer = time.After(time.Duration(s.config.NetAConfig.Period*msg.Twait_sse) * time.Millisecond)
}
// validateRsdMsg validates a decoded RSD frame.
//
// It returns 0 for a failure other than the sequence check (wrong source or
// target address, user-data length not matching the section configuration,
// or a bad frame tail), 1 when the frame is valid, and 2 when the SVC1 or
// SVC2 sequence check fails. Every failure is logged.
func (s *serviceContext) validateRsdMsg(rsdMsg *msg.RsdMsg) int {
	if rsdMsg.SourceAddr != s.remoteAddr {
		logger().Error(fmt.Sprintf("源地址[%x]不正确[%s]", rsdMsg.SourceAddr, s.config.NetAConfig.RemoteAddr))
		return 0
	}
	if rsdMsg.TargetAddr != s.localAddr {
		logger().Error(fmt.Sprintf("目的地址[%x]不正确[%s]", rsdMsg.TargetAddr, s.config.NetAConfig.LocalAddr))
		return 0
	}
	// The first user-data byte is the check byte.
	if len(rsdMsg.UserData)-1 != len(s.ciSectionIndexConfigs) {
		logger().Error(fmt.Sprintf("命令数据长度[%d]与配置长度[%d]不符", len(rsdMsg.UserData), len(s.ciSectionIndexConfigs)))
		return 0
	}
	if message.Rssp_I_Crc16(rsdMsg.UserData) != rsdMsg.Tail {
		logger().Error("报文尾验证失败")
		return 0
	}
	// validateSvc1/validateSvc2 return true when the check passes.
	if !s.validateSvc1(rsdMsg.Svc1, rsdMsg.UserData) {
		logger().Error(fmt.Sprintf("SVC1[%x]校验未通过", rsdMsg.Svc1))
		return 2
	}
	if !s.validateSvc2(rsdMsg.Svc2, rsdMsg.UserData) {
		logger().Error(fmt.Sprintf("SVC2[%x]校验未通过", rsdMsg.Svc2))
		return 2
	}
	return 1
}
// validateSseMsg reports whether a received SSE message is acceptable.
// It currently performs no checks and always returns true.
func (s *serviceContext) validateSseMsg(sseMsg *msg.SseMsg) bool {
	return true
}
// validateSsrMsg reports whether the SSR answers the pending SSE request,
// i.e. whether its SeqNumSse equals the sequence number of s.sseMsg. A
// mismatch is logged. The caller must ensure s.sseMsg is not nil.
func (s *serviceContext) validateSsrMsg(ssrMsg *msg.SsrMsg) bool {
	if s.sseMsg.SeqNum == ssrMsg.SeqNumSse {
		return true
	}
	logger().Error(fmt.Sprintf("SSR的Ne[%d]与请求方不符[%d]", ssrMsg.SeqNumSse, s.sseMsg.SeqNum))
	return false
}
// validateSvc1 runs the sequence check for channel 1, using the channel-1
// CRC of userData, SCW_1, sid1, sinit1 and lastSeqParam1.
func (s *serviceContext) validateSvc1(svc uint32, userData []byte) bool {
	crc := message.Rssp_I_Crc32C1(userData)
	return s.validateSvc(svc, msg.SCW_1, crc, s.sid1, s.sinit1, &s.lastSeqParam1)
}
// validateSvc2 runs the sequence check for channel 2, using the channel-2
// CRC of userData, SCW_2, sid2, sinit2 and lastSeqParam2.
func (s *serviceContext) validateSvc2(svc uint32, userData []byte) bool {
	crc := message.Rssp_I_Crc32C2(userData)
	return s.validateSvc(svc, msg.SCW_2, crc, s.sid2, s.sinit2, &s.lastSeqParam2)
}
// validateSvc performs the sequence check for one channel and returns true
// when the received SVC is accepted.
//
// seqParam is derived as crc1 ^ svc ^ scw. When no previous parameter is
// stored (*lastSeqParam == 0) the value is accepted and stored. Otherwise up
// to NetAConfig.MaxDeviation candidate offsets are tried; on the first match
// *lastSeqParam is updated and true is returned. If no offset matches, or
// MaxDeviation is not positive, it returns false.
//
// NOTE(review): both lfsr values below are created without poly, so with
// the add method in this file every add call leaves value at 0. The
// comparison is then 0 == 0 and passes on the first iteration.
// NOTE(review): constLfsr is advanced but never read by the comparison,
// which calls seqLfsr.add on both sides. It looks like the right-hand side
// was meant to use constLfsr; confirm against the RSSP-I specification.
func (s *serviceContext) validateSvc(svc uint32, scw uint32, crc1 uint32, sid uint32, sinit uint32, lastSeqParam *uint32) bool {
	seqParam := crc1 ^ svc ^ scw
	if *lastSeqParam == 0 {
		// First frame seen: nothing to compare against yet.
		*lastSeqParam = seqParam
		return true
	}
	for i := 0; i < s.config.NetAConfig.MaxDeviation; i++ {
		seqLfsr := lfsr{value: sinit}
		seqLfsr.add(*lastSeqParam)
		constLfsr := lfsr{value: sinit}
		constLfsr.add(sid)
		// Step both registers i extra times to try an offset of i.
		for j := 0; j < i; j++ {
			seqLfsr.add(0)
			constLfsr.add(0)
		}
		if seqLfsr.add(seqParam) == seqLfsr.add(sid) {
			*lastSeqParam = seqParam
			return true
		}
	}
	return false
}
// calculateSeqEnq1 returns the channel-1 SeqEnq value: the local SID1 XORed
// with the current lfsr1 value.
func (s *serviceContext) calculateSeqEnq1() uint32 {
	enq := s.localSid1
	enq ^= s.lfsr1.value
	return enq
}
// calculateSeqEnq2 returns the channel-2 SeqEnq value: the local SID2 XORed
// with the current lfsr2 value.
func (s *serviceContext) calculateSeqEnq2() uint32 {
	enq := s.localSid2
	enq ^= s.lfsr2.value
	return enq
}
// calculateSeqInit1 returns the channel-1 SeqInit value for an SSR: seqEnq1
// XORed with sid1, dataver1 and the current lfsr1 value.
func (s *serviceContext) calculateSeqInit1(seqEnq1 uint32) uint32 {
	init := seqEnq1 ^ s.sid1
	init ^= s.dataver1
	init ^= s.lfsr1.value
	return init
}
// calculateSeqInit2 returns the channel-2 SeqInit value for an SSR: seqEnq2
// XORed with sid2, dataver2 and the current lfsr2 value.
func (s *serviceContext) calculateSeqInit2(seqEnq2 uint32) uint32 {
	init := seqEnq2 ^ s.sid2
	init ^= s.dataver2
	init ^= s.lfsr2.value
	return init
}
// lfsr is a 32-bit linear feedback shift register.
type lfsr struct {
	value uint32 // current register contents
	poly  uint32 // polynomial used to generate the timestamp
}

// add XORs x into the register, then shifts the register left 32 times,
// XORing in poly each time the bit shifted out was set. It stores and
// returns the resulting register value.
func (l *lfsr) add(x uint32) uint32 {
	state := l.value ^ x
	for step := 0; step < 32; step++ {
		shiftedOut := state >> 31
		state <<= 1
		if shiftedOut == 1 {
			state ^= l.poly
		}
	}
	l.value = state
	return state
}
func logger() *slog.Logger {
loggerInit.Do(func() {
privateLogger = slog.Default().With("tag", logTag)
})
return privateLogger
}