// Package btm_vobc implements the UDP exchange between the simulated BTM and
// the line-11 VOBC: it caches the balise telegrams a train has passed, answers
// VOBC command/request frames with either a balise telegram or a free (all
// zero) telegram, and resends frames the VOBC reports as corrupted.
package btm_vobc

import (
	"context"
	"encoding/hex"
	"encoding/json"
	"fmt"
	"log/slog"
	"math"
	"reflect"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/google/uuid"
	"github.com/snksoft/crc"
	"joylink.club/bj-rtsts-server/config"
	"joylink.club/bj-rtsts-server/const/balise_const"
	"joylink.club/bj-rtsts-server/dto/state_proto"
	"joylink.club/bj-rtsts-server/third_party/message"
	"joylink.club/bj-rtsts-server/third_party/udp"
)

// BtmVobcManage is the view of the simulation that the BTM/VOBC service needs:
// its configuration and access to the running trains.
type BtmVobcManage interface {
	GetBtmVobcConfig() config.BtmVobcConfig
	GetAllTrain() []*state_proto.TrainState
	GetConnVobcTrain() *state_proto.TrainState
}

// BtmVobcService is the public API of the BTM/VOBC link.
type BtmVobcService interface {
	Start(btmVobcManage BtmVobcManage)
	Stop()
	SendData(data []byte)
	AppendBaliseMsgForTrain(vobcBtm *state_proto.VobcBtmState, baliseId string, baliseSource []byte, arriveTime int64)
	UpdateTrainLeave(vobcBtm *state_proto.VobcBtmState, baliseId string, leaveTime int64)
	RemoveBaliseFirst(vobcBtm *state_proto.VobcBtmState)
}

// BtmVobcClient is the BtmVobcService implementation. It owns the UDP server
// that receives VOBC frames, the UDP client that answers them, and the cancel
// function of the link watchdog goroutine.
type BtmVobcClient struct {
	calFun context.CancelFunc
	client udp.UdpClient
	server udp.UdpServer
	manage BtmVobcManage
}

// MAX_SEND_COUNT is the send count above which a frame reported as corrupted
// is dropped instead of being resent.
const MAX_SEND_COUNT = 3

const (
	// maxCachedTelegrams bounds the per-train telegram cache; the oldest entry
	// is evicted when a new telegram arrives and the cache is full.
	maxCachedTelegrams = 8
	// linkTimeoutMillis is both the silence period after which the VOBC link is
	// considered interrupted and the maximum age of a cached telegram that
	// survives such an interruption.
	linkTimeoutMillis = 1000
)

var (
	btmVobcLocker       sync.Mutex
	btmVobcClient       *BtmVobcClient
	btmVobcBaliseLocker sync.Mutex
	// reviceTimeStamp is the time (Unix milliseconds) the most recent VOBC
	// frame was received. It is written by the UDP handler and read by the
	// watchdog goroutine, so it must only be accessed through sync/atomic.
	reviceTimeStamp int64
)

// Default returns the process-wide BtmVobcService, creating it on first use.
func Default() BtmVobcService {
	btmVobcLocker.Lock()
	defer btmVobcLocker.Unlock()
	if btmVobcClient == nil {
		btmVobcClient = &BtmVobcClient{}
	}
	return btmVobcClient
}

// AppendBaliseMsgForTrain caches the telegram of a balise the train has just
// reached. A telegram whose CRC32 matches an already cached telegram is
// ignored; when the cache already holds maxCachedTelegrams entries the oldest
// one is evicted.
func (b *BtmVobcClient) AppendBaliseMsgForTrain(vobcBtm *state_proto.VobcBtmState, baliseId string, baliseSource []byte, arriveTime int64) {
	btmVobcBaliseLocker.Lock()
	defer btmVobcBaliseLocker.Unlock()

	sum := crc.CalculateCRC(crc.CRC32, baliseSource)
	for _, cached := range vobcBtm.TelegramState {
		raw, err := hex.DecodeString(cached.Telegram)
		if err != nil {
			// A corrupt cache entry can never equal a real telegram.
			continue
		}
		if crc.CalculateCRC(crc.CRC32, raw) == sum {
			return
		}
	}

	tel := &state_proto.VobcBtmState_TelegramState{
		Telegram:   hex.EncodeToString(baliseSource),
		BaliseId:   baliseId,
		ArriveTime: arriveTime,
	}
	if len(vobcBtm.TelegramState) >= maxCachedTelegrams {
		vobcBtm.TelegramState = append(vobcBtm.TelegramState[1:], tel)
		return
	}
	vobcBtm.TelegramState = append(vobcBtm.TelegramState, tel)
}

// UpdateTrainLeave records leaveTime on every cached telegram that belongs to
// baliseId.
func (b *BtmVobcClient) UpdateTrainLeave(vobcBtm *state_proto.VobcBtmState, baliseId string, leaveTime int64) {
	btmVobcBaliseLocker.Lock()
	defer btmVobcBaliseLocker.Unlock()
	for _, tel := range vobcBtm.TelegramState {
		if tel.BaliseId == baliseId {
			tel.LeaveTime = leaveTime
		}
	}
}

// RemoveBaliseFirst drops the oldest cached telegram, if there is one.
func (b *BtmVobcClient) RemoveBaliseFirst(vobcBtm *state_proto.VobcBtmState) {
	btmVobcBaliseLocker.Lock()
	defer btmVobcBaliseLocker.Unlock()
	if len(vobcBtm.TelegramState) == 0 {
		return
	}
	vobcBtm.TelegramState = vobcBtm.TelegramState[1:]
}

// Start opens the UDP server and client described by the manager's
// configuration and launches the link watchdog. It does nothing when the
// configuration is not enabled or the server cannot listen.
func (b *BtmVobcClient) Start(btmVobcManage BtmVobcManage) {
	cfg := btmVobcManage.GetBtmVobcConfig()
	if !cfg.Open {
		slog.Info("11号线 btm vobc配置未开启...")
		return
	}
	udpServer := udp.NewServer(fmt.Sprintf("%v:%d", cfg.LocalUdpIp, cfg.LocalUdpPort), b.handleBtmVobcFrames)
	if err := udpServer.Listen(); err != nil {
		slog.Error("11号线 btm VOBC 服务启动失败...", "err", err)
		return
	}
	udpClient := udp.NewClient(fmt.Sprintf("%s:%d", cfg.RemoteIp, cfg.RemoteUdpPort))
	b.manage = btmVobcManage
	b.server = udpServer
	b.client = udpClient
	atomic.StoreInt64(&reviceTimeStamp, time.Now().UnixMilli())

	ctx, calFun := context.WithCancel(context.Background())
	b.calFun = calFun
	go b.checkTrainTTLIsOver(ctx)
}

// checkTrainTTLIsOver is the link watchdog. Once per second, until ctx is
// cancelled, it checks whether a VOBC frame has been received within
// linkTimeoutMillis; if not, the link is treated as interrupted and stale
// telegrams are pruned from every train.
func (b *BtmVobcClient) checkTrainTTLIsOver(ctx context.Context) {
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
		}
		if time.Now().UnixMilli()-atomic.LoadInt64(&reviceTimeStamp) < linkTimeoutMillis {
			// Link is alive; keep watching.
			continue
		}
		b.dropExpiredTelegrams()
	}
}

// dropExpiredTelegrams removes, for every train, the cached telegrams whose
// arrival is at least linkTimeoutMillis old. The cache lock is held only for
// the duration of one pass so that the Append/Update/Remove methods are never
// blocked by the watchdog's lifetime.
func (b *BtmVobcClient) dropExpiredTelegrams() {
	trains := b.manage.GetAllTrain()
	if len(trains) == 0 {
		return
	}

	btmVobcBaliseLocker.Lock()
	defer btmVobcBaliseLocker.Unlock()

	now := time.Now().UnixMilli()
	dropped := 0
	for _, train := range trains {
		if train == nil || train.VobcBtm == nil || len(train.VobcBtm.TelegramState) == 0 {
			continue
		}
		cached := train.VobcBtm.TelegramState
		fresh := make([]*state_proto.VobcBtmState_TelegramState, 0, len(cached))
		for _, state := range cached {
			if now-state.ArriveTime < linkTimeoutMillis {
				fresh = append(fresh, state)
			}
		}
		if len(fresh) != len(cached) {
			dropped += len(cached) - len(fresh)
			train.VobcBtm.TelegramState = fresh
		}
	}
	// Log only when something was actually pruned to avoid one line per second
	// while the VOBC is simply not connected.
	if dropped > 0 {
		slog.Info("检测网络中断情况 前沿ttl 超时...")
	}
}

// handleBtmVobcFrames is the UDP server callback. It decodes one raw VOBC
// frame and dispatches it by frame type: an ID command frame is answered
// directly, a request frame goes through RequestFramePackets.
func (b *BtmVobcClient) handleBtmVobcFrames(cfs []byte) {
	atomic.StoreInt64(&reviceTimeStamp, time.Now().UnixMilli())
	train := b.manage.GetConnVobcTrain()
	if train == nil {
		// No train is currently connected to the VOBC; nothing to answer.
		return
	}
	requestId := uuid.New().String()
	slog.Info(fmt.Sprintf("获取到vobc btm原始数据:%v, requestId:%v", hex.EncodeToString(cfs), requestId))
	if train.VobcBtm == nil {
		slog.Error(fmt.Sprintf("vobc btm 列车缺少VobcBtm状态,requestId:%v", requestId))
		return
	}

	decodeStart := time.Now().UnixMicro()
	frameType, dataText, err := message.BtmVobcDecode(cfs)
	if err != nil {
		slog.Error(fmt.Sprintf("%v,请求id:%v", err, requestId))
		return
	}
	receiveDataTime := time.Now().UnixMicro()
	// Time spent decoding, in units of 100 microseconds.
	decodePayMicoTime := (receiveDataTime - decodeStart) / 100

	switch frameType {
	case message.COMMAND_TYPE:
		idCommand := &message.BtmVobcIdCommand{}
		idCommand.Decode(dataText)
		slog.Info(fmt.Sprintf("成功接受btm vobc的id命令帧,requestId:%v,接受时间(微秒):%v", requestId, receiveDataTime))
		b.packets(requestId, idCommand.VobcLifeNum, idCommand.AutoIdFrame, receiveDataTime, decodePayMicoTime, train.VobcBtm)
	case message.REQUEST_TYPE:
		slog.Info(fmt.Sprintf("成功接受btm vobc的请求帧,requestId:%v,接受时间(微秒):%v", requestId, receiveDataTime))
		req := &message.BtmVobcReq{}
		req.Decode(dataText)
		b.RequestFramePackets(req.VobcLifeNum, req.AutoIdFrame, requestId, receiveDataTime, decodePayMicoTime, req, train.VobcBtm)
	default:
		slog.Error(fmt.Sprintf("btm vobc 解析未知命令帧类型:0x%v,原始数据:%v,长度:%v,requestId:%v", strconv.FormatInt(int64(frameType), 16), hex.EncodeToString(cfs), len(cfs), requestId))
	}
}

// createFreeBalisePacketString returns the hex text of a free (all zero)
// user telegram.
func createFreeBalisePacketString() string {
	return strings.Repeat("00", balise_const.UserTelegramByteLen)
}

// createFreeBalisePackets returns the bytes of a free (all zero) user
// telegram.
func createFreeBalisePackets() []byte {
	return make([]byte, balise_const.UserTelegramByteLen)
}

// decodeTelegram decodes a cached hex telegram and right-pads it with zero
// bytes up to balise_const.UserTelegramByteLen.
func decodeTelegram(telegramHex string) ([]byte, error) {
	data, err := hex.DecodeString(telegramHex)
	if err != nil {
		return nil, err
	}
	if pad := balise_const.UserTelegramByteLen - len(data); pad > 0 {
		data = append(data, make([]byte, pad)...)
	}
	return data, nil
}

// ttlMillis returns |nowMilli-eventMilli| saturated to the uint16 range.
func ttlMillis(nowMilli, eventMilli int64) uint16 {
	d := int64(math.Abs(float64(nowMilli - eventMilli)))
	if d >= math.MaxUint16 {
		return math.MaxUint16
	}
	return uint16(d)
}

// responseTimeUnits returns the time elapsed since receiveTimeMicro (Unix
// microseconds) in units of 100 microseconds, saturated to the byte range.
func responseTimeUnits(receiveTimeMicro int64) byte {
	units := (time.Now().UnixMicro() - receiveTimeMicro) / 100
	switch {
	case units < 0:
		return 0
	case units > math.MaxUint8:
		return math.MaxUint8
	default:
		return byte(units)
	}
}

// saveHistory stores a sent frame under its serial so it can be resent or
// acknowledged later. The map is created lazily because it is otherwise only
// initialised when a boot request frame is handled.
func saveHistory(vobcbtm *state_proto.VobcBtmState, serial uint32, entry *state_proto.VobcBtmState_VobcBtmHistoryState) {
	if vobcbtm.History == nil {
		vobcbtm.History = make(map[uint32]*state_proto.VobcBtmState_VobcBtmHistoryState)
	}
	vobcbtm.History[serial] = entry
}

// RequestFramePackets handles a VOBC request frame:
//   - boot status and boot message type: reset the telegram cache and history,
//     then answer with the current packet;
//   - frame OK: forget the acknowledged frame;
//   - frame error: resend the frame, or replace it once it has been sent more
//     than MAX_SEND_COUNT times.
//
// Any other combination is ignored.
func (b *BtmVobcClient) RequestFramePackets(vobcLifeNum uint32, autoId byte, requestId string, receiveTime int64, decodePayMicoTime int64, req *message.BtmVobcReq, vobcbtm *state_proto.VobcBtmState) {
	switch {
	case req.FrameStatus == message.REQ_FRAME_STATUS_BOOT && req.MessageType == message.REQ_PACKETS_TYPE_BOOT:
		slog.Info(fmt.Sprintf("接受请求帧,准备发送空闲帧数据 帧状态:0x%v,消息类型:0x%v ,requestId:%v", strconv.FormatInt(int64(req.FrameStatus), 16), strconv.FormatInt(int64(req.MessageType), 16), requestId))
		vobcbtm.TelegramState = make([]*state_proto.VobcBtmState_TelegramState, 0)
		vobcbtm.History = make(map[uint32]*state_proto.VobcBtmState_VobcBtmHistoryState)
		b.packets(requestId, vobcLifeNum, autoId, receiveTime, decodePayMicoTime, vobcbtm)
	case req.FrameStatus == message.REQ_FRAME_STATUS_OK:
		// The frame arrived intact: the stored copy is no longer needed.
		slog.Info(fmt.Sprintf("接受请求帧,帧正确,删除之前发送的数据requestId:%v", requestId))
		delete(vobcbtm.History, uint32(req.MessageSerial))
	case req.FrameStatus == message.REQ_FRAME_STATUS_ERROR:
		b.handleFrameError(requestId, receiveTime, decodePayMicoTime, req, vobcbtm)
	}
}

// handleFrameError reacts to a request frame that reports a corrupted frame.
// While the frame has not been sent more than MAX_SEND_COUNT times it is
// resent; afterwards it is discarded and the next packet is sent instead.
func (b *BtmVobcClient) handleFrameError(requestId string, receiveTime int64, decodePayMicoTime int64, req *message.BtmVobcReq, vobcbtm *state_proto.VobcBtmState) {
	serial := uint32(req.MessageSerial)
	dd := vobcbtm.History[serial]
	if dd == nil {
		// Nothing is stored under this serial (already acknowledged, or the
		// history was reset), so there is nothing to resend.
		slog.Warn(fmt.Sprintf("接受请求帧,帧不正确,未找到已发送的数据 帧系列id:%v,requestId:%v", serial, requestId))
		return
	}

	if dd.SendCount > MAX_SEND_COUNT {
		slog.Info(fmt.Sprintf("接受请求帧,帧不正确,重发超过2次,删除之前发送的数据,准备发送新数据 requestId:%v", requestId))
		sendBalisePacket := dd.BalisePacket
		delete(vobcbtm.History, serial)
		if len(vobcbtm.TelegramState) > 0 && vobcbtm.TelegramState[0].Telegram == sendBalisePacket {
			b.RemoveBaliseFirst(vobcbtm)
		}
		if len(vobcbtm.TelegramState) == 0 {
			b.balisePacketsFree(requestId, time.Now().UnixMicro(), req.VobcLifeNum, req.AutoIdFrame, vobcbtm)
		} else {
			b.balisePackets(requestId, vobcbtm.TelegramState[0], receiveTime, decodePayMicoTime, req.VobcLifeNum, req.AutoIdFrame, vobcbtm)
		}
		return
	}

	dd.SendCount++
	if dd.IsFreePacket {
		freeMsg := &message.BtmVobcMsgFree{}
		if b.unmarshalJson(dd.SendTelegram, freeMsg) != nil {
			return
		}
		freeMsg.FreeMsg = createFreeBalisePackets()
		repeatData := freeMsg.Encode()
		logStr := fmt.Sprintf("重新发送空闲帧数据,发送次数:%v,帧系列id:%v,数据:%v,requestId:%v", dd.SendCount, freeMsg.MsgSerial, hex.EncodeToString(repeatData), requestId)
		slog.Info(logStr)
		if err := b.client.Send(repeatData); err != nil {
			slog.Error(logStr, "err", err)
		}
		return
	}

	packetMsg := &message.BtmVobcMessage{}
	if b.unmarshalJson(dd.SendTelegram, packetMsg) != nil {
		return
	}
	// The stored JSON was taken before the telegram bytes were attached, so
	// they are rebuilt from the stored hex text exactly as for the first send.
	balise, err := decodeTelegram(dd.BalisePacket)
	if err != nil {
		slog.Error(fmt.Sprintf("解析应答器报文失败应答器报文长度:%v", dd.BalisePacket), "err", err)
		return
	}
	packetMsg.BtmMsg = balise
	baliseData := packetMsg.Encode()
	// NOTE(review): this message says "free frame" although a balise telegram
	// is being resent; kept as-is in case log consumers match on it.
	logStr := fmt.Sprintf("重新发送空闲帧数据,发送次数:%v,帧系列id:%v 数据:%v ,requestId:%v", dd.SendCount, packetMsg.MsgSerial, hex.EncodeToString(baliseData), requestId)
	slog.Info(logStr)
	if err := b.client.Send(baliseData); err != nil {
		slog.Error(logStr, "err", err)
	}
}

// unmarshalJson decodes jsonStr into obj, logging and returning the error on
// failure.
func (b *BtmVobcClient) unmarshalJson(jsonStr string, obj any) error {
	if err := json.Unmarshal([]byte(jsonStr), obj); err != nil {
		// Type.String is used rather than Type.Name because Name is empty for
		// the pointer types passed here.
		slog.Error(fmt.Sprintf("vobc btm 数据转换失败 source:%v,对象:%v", jsonStr, reflect.TypeOf(obj).String()), "err", err)
		return err
	}
	return nil
}

// balisePackets answers the VOBC with the balise telegram tel and records the
// sent frame in the history so that it can be resent on request.
func (b *BtmVobcClient) balisePackets(requestId string, tel *state_proto.VobcBtmState_TelegramState, receiveTime int64, decodePayMicoTime int64, vobcLifeNum uint32, autoCommandId byte, vobcbtm *state_proto.VobcBtmState) {
	data, err := decodeTelegram(tel.Telegram)
	if err != nil {
		slog.Error(fmt.Sprintf("解析应答器报文失败应答器报文长度:%v", tel.Telegram), "err", err)
		return
	}

	now := time.Now().UnixMilli()
	// Front TTL: milliseconds since the train reached the balise.
	fttl := ttlMillis(now, tel.ArriveTime)
	// NOTE(review): the back TTL is also derived from ArriveTime, which looks
	// like a copy-paste of the front TTL; LeaveTime may have been intended.
	// Left unchanged until the protocol semantics are confirmed.
	bttl := ttlMillis(now, tel.ArriveTime)

	baliseMsg := &message.BtmVobcMessage{
		FontTtl:      fttl,
		BtmStatus:    message.BTM_STSTUS_NORMAL,
		DecodeTime:   uint16(decodePayMicoTime),
		BackTtl:      bttl,
		ResponseTime: responseTimeUnits(receiveTime),
		MsgSerial:    message.GetAutoMessageId(),
		VobcLifeNum:  vobcLifeNum,
		BaseBtmVobc:  message.BaseBtmVobc{AutoIdFrame: autoCommandId},
	}
	u32MsgId := uint32(baliseMsg.MsgSerial)
	// The header is serialised before the telegram bytes are attached; the
	// telegram itself is kept separately as hex text in BalisePacket.
	jsonArr, err := json.Marshal(baliseMsg)
	if err != nil {
		slog.Error(fmt.Sprintf("vobc btm 数据转换失败 报文序列id:%v,requestId:%v", u32MsgId, requestId), "err", err)
	}
	baliseMsg.BtmMsg = data
	baliseData := baliseMsg.Encode()

	saveHistory(vobcbtm, u32MsgId, &state_proto.VobcBtmState_VobcBtmHistoryState{
		SendCount:    1,
		PacketSendId: u32MsgId,
		VobcLifeNum:  vobcLifeNum,
		IsFreePacket: false,
		SendTelegram: string(jsonArr),
		BalisePacket: tel.Telegram,
	})

	logStr := fmt.Sprintf("发送btm vobc 报文数据 报文序列id:%v 报文内容:%v 长度:%v ,requestId:%v", u32MsgId, hex.EncodeToString(baliseData), len(baliseData), requestId)
	slog.Info(logStr)
	if err := b.client.Send(baliseData); err != nil {
		slog.Error(logStr, "err", err)
	}
}

// balisePacketsFree answers the VOBC with a free (all zero) telegram, used
// when no balise telegram is cached, and records the sent frame in the
// history.
func (b *BtmVobcClient) balisePacketsFree(requestId string, receiveTime int64, vobcLifeNum uint32, autoCommandId byte, vobcbtm *state_proto.VobcBtmState) {
	freeMsg := &message.BtmVobcMsgFree{
		BtmStatus:   message.BTM_STSTUS_NORMAL,
		Fun1:        0xffff,
		Fun2:        0x00CF,
		Fun3:        uint16(0),
		Fun4:        uint16(0),
		FreeMsg:     createFreeBalisePackets(),
		RespTime:    responseTimeUnits(receiveTime),
		MsgSerial:   message.GetAutoMessageId(),
		VobcLifeNum: vobcLifeNum,
		BaseBtmVobc: message.BaseBtmVobc{AutoIdFrame: autoCommandId},
	}
	u32MsgId := uint32(freeMsg.MsgSerial)
	jsonArr, err := json.Marshal(freeMsg)
	if err != nil {
		slog.Error(fmt.Sprintf("vobc btm 数据转换失败 报文序列id:%v,requestId:%v", u32MsgId, requestId), "err", err)
	}
	saveHistory(vobcbtm, u32MsgId, &state_proto.VobcBtmState_VobcBtmHistoryState{
		SendCount:    1,
		PacketSendId: u32MsgId,
		VobcLifeNum:  vobcLifeNum,
		IsFreePacket: true,
		SendTelegram: string(jsonArr),
	})

	freeData := freeMsg.Encode()
	logStr := fmt.Sprintf("发送btm vobc 报文序列id:%v 空闲帧报文:%v 长度:%v ,requestId:%v", u32MsgId, hex.EncodeToString(freeData), len(freeData), requestId)
	slog.Info(logStr)
	if err := b.client.Send(freeData); err != nil {
		slog.Error(logStr, "err", err)
	}
}

// packets sends the oldest cached balise telegram, or a free telegram when the
// cache is empty.
func (b *BtmVobcClient) packets(requestId string, vobcLifeNum uint32, autoIdFrame byte, receiveTime int64, decodePayMicoTime int64, vobcbtm *state_proto.VobcBtmState) {
	if len(vobcbtm.TelegramState) == 0 {
		b.balisePacketsFree(requestId, receiveTime, vobcLifeNum, autoIdFrame, vobcbtm)
		return
	}
	b.balisePackets(requestId, vobcbtm.TelegramState[0], receiveTime, decodePayMicoTime, vobcLifeNum, autoIdFrame, vobcbtm)
}

// SendData sends raw bytes to the VOBC. It is a no-op when the service has not
// been started (or has been stopped).
func (b *BtmVobcClient) SendData(data []byte) {
	if b.client == nil {
		return
	}
	slog.Info(fmt.Sprintf("发送btm vobc 报文:%v 长度:%v", hex.EncodeToString(data), len(data)))
	if err := b.client.Send(data); err != nil {
		slog.Error("发送btm vobc 报文失败:", "err", err)
	}
}

// Stop cancels the watchdog and closes the UDP server and client. The fields
// are cleared afterwards so that a repeated Stop or a later SendData does not
// touch closed connections.
func (b *BtmVobcClient) Stop() {
	if b.server != nil {
		if b.calFun != nil {
			b.calFun()
			b.calFun = nil
		}
		b.server.Close()
		b.server = nil
	}
	if b.client != nil {
		b.client.Close()
		b.client = nil
	}
}