package dynamics import ( "container/list" "encoding/binary" "errors" "fmt" "math" "net" "sync" "time" "github.com/panjf2000/gnet/v2" "go.uber.org/zap" "joylink.club/bj-rtsts-server/config" ) func init() { go func() { for { info := <-trainInfoChan for e := handlerList.Front(); e != nil; e = e.Next() { func() { defer func() { r := recover() if r != nil { zap.S().Errorf("列车信息处理函数报错", r) } }() handler := e.Value.(TrainInfoHandler) handler(info) }() } } }() } type TurnoutInfoFunc func() []*TurnoutInfo var ( running bool mutex sync.Mutex turnoutInfoFunc TurnoutInfoFunc //道岔消息生命信号 turnoutLifeSignal uint16 //列车消息生命信号 trainLifeSignal uint16 //列车生命信号是否初始化 trainLifeSignalInit bool //用于处理生命信号循环使用产生的各种特殊处理 limit uint16 = 10000 //列车消息队列 trainInfoChan chan *TrainInfo = make(chan *TrainInfo) ) var handlerList list.List type TrainInfoHandler func(info *TrainInfo) type udpServer struct { gnet.BuiltinEventEngine eng gnet.Engine addr string multicore bool eventHandlers []gnet.EventHandler } func (server *udpServer) OnBoot(eng gnet.Engine) gnet.Action { server.eng = eng zap.S().Infof("udp server with multi-core=%t is listening on %s\n", server.multicore, server.addr) return gnet.None } // OnTraffic 接收到数据后的解析 func (server *udpServer) OnTraffic(c gnet.Conn) gnet.Action { defer func() { if r := recover(); r != nil { zap.L().Error("udp服务数据解析异常", zap.Any("panic", r)) } }() buf, _ := c.Next(-1) lifeSignal := binary.BigEndian.Uint16(buf[0:2]) if !trainLifeSignalInit { trainLifeSignalInit = true } else if trainLifeSignal < limit { if lifeSignal < trainLifeSignal || lifeSignal > trainLifeSignal-limit { zap.S().Debugf("丢弃列车信息[%d-%d]", lifeSignal, trainLifeSignal) return gnet.None } } else if trainLifeSignal < math.MaxUint16-10000 { if lifeSignal < trainLifeSignal { zap.S().Debugf("丢弃列车信息[%d-%d]", lifeSignal, trainLifeSignal) return gnet.None } } else { if lifeSignal < trainLifeSignal && lifeSignal > trainLifeSignal+10000 { zap.S().Debugf("丢弃列车信息[%d-%d]", lifeSignal, trainLifeSignal) return gnet.None } } trainLifeSignal = lifeSignal trainInfo := decoderDynamicsTrainInfo(buf) trainInfoChan <- trainInfo return gnet.None } func RunUdpServer() { server := &udpServer{addr: fmt.Sprintf("udp://:%d", config.Config.Dynamics.UdpLocalPort), multicore: false} err := gnet.Run(server, server.addr, gnet.WithMulticore(server.multicore)) zap.L().Fatal("udp服务启动失败", zap.Error(err)) } func Run(tiFunc TurnoutInfoFunc) error { mutex.Lock() defer mutex.Unlock() trainLifeSignalInit = false return runSendTurnoutStateTask(tiFunc) } // 注册数据操作 func RegisterTrainInfoHandler(handler TrainInfoHandler) { handlerList.PushBack(handler) } func Stop() { mutex.Lock() defer mutex.Unlock() turnoutInfoFunc = nil } // 动力学消息发送消息 func sendDynamicsMsg(buf []byte) error { defer func() { if r := recover(); r != nil { zap.S().Error("发送道岔信息失败", r) } }() addr := fmt.Sprintf("%v:%v", config.Config.Dynamics.Ip, config.Config.Dynamics.UdpRemotePort) remoteAddr, _ := net.ResolveUDPAddr("udp", addr) conn, err := net.DialUDP("udp", nil, remoteAddr) if err != nil { zap.S().Error("UDP通信失败", err) return err } defer func(conn *net.UDPConn) { err := conn.Close() if err != nil { zap.S().Error(err) } }(conn) _, err = conn.Write(buf) return err } // 给动力学发送列车信息 func SendDynamicsTrainMsg(buf []byte) error { defer func() { if r := recover(); r != nil { zap.S().Error("发送道岔信息失败", r) } }() addr := fmt.Sprintf("%v:%v", config.Config.Dynamics.Ip, config.Config.Dynamics.UdpRemoteTrainPort) remoteAddr, _ := net.ResolveUDPAddr("udp", addr) conn, err := net.DialUDP("udp", nil, remoteAddr) if err != nil { zap.S().Error("UDP通信失败", err) return err } defer func(conn *net.UDPConn) { err := conn.Close() if err != nil { zap.S().Error(err) } }(conn) _, err = conn.Write(buf) return err } // 发送道岔状态任务 func runSendTurnoutStateTask(tiFunc TurnoutInfoFunc) error { if running { return nil } if tiFunc == nil { return errors.New("tiFunc不能为空") } turnoutInfoFunc = tiFunc go func() { defer func() { if r := recover(); r != nil { fmt.Println("Recovered from panic:", r) } }() tick := time.Tick(50 * time.Millisecond) for range tick { if turnoutInfoFunc == nil { continue } slice := turnoutInfoFunc() for _, turnoutInfo := range slice { turnoutInfo.lifeSignal = turnoutLifeSignal // sendTurnoutInfo 发送道岔信息 err := sendDynamicsMsg(encoderDynamicsTurnout(turnoutInfo)) if err != nil { zap.S().Error(err) } } turnoutLifeSignal++ } }() return nil } // 解析动力学的列车信息 func decoderDynamicsTrainInfo(buf []byte) *TrainInfo { trainInfo := &TrainInfo{} trainInfo.LifeSignal = binary.BigEndian.Uint16(buf[0:2]) trainInfo.Number = buf[2] trainInfo.Len = binary.BigEndian.Uint16(buf[3:5]) trainInfo.Link = buf[5] trainInfo.LinkOffset = binary.BigEndian.Uint32(buf[6:10]) trainInfo.Slope = binary.BigEndian.Uint16(buf[10:12]) b := buf[12] trainInfo.UpSlope = (b & (1 << 7)) != 0 trainInfo.Up = (b & (1 << 6)) != 0 trainInfo.TotalResistance = binary.BigEndian.Uint32(buf[14:18]) trainInfo.AirResistance = binary.BigEndian.Uint32(buf[18:22]) trainInfo.SlopeResistance = binary.BigEndian.Uint32(buf[22:26]) trainInfo.CurveResistance = binary.BigEndian.Uint32(buf[26:30]) trainInfo.Speed = math.Float32frombits(binary.BigEndian.Uint32(buf[30:34])) trainInfo.HeadSpeed1 = math.Float32frombits(binary.BigEndian.Uint32(buf[34:38])) trainInfo.HeadSpeed2 = math.Float32frombits(binary.BigEndian.Uint32(buf[38:42])) trainInfo.TailSpeed1 = math.Float32frombits(binary.BigEndian.Uint32(buf[42:46])) trainInfo.TailSpeed2 = math.Float32frombits(binary.BigEndian.Uint32(buf[46:50])) trainInfo.HeadRadarSpeed = math.Float32frombits(binary.BigEndian.Uint32(buf[50:54])) trainInfo.TailRadarSpeed = math.Float32frombits(binary.BigEndian.Uint32(buf[54:58])) return trainInfo } // 将道岔转为动力学的消息 func encoderDynamicsTurnout(info *TurnoutInfo) []byte { var data []byte data = binary.BigEndian.AppendUint16(data, info.lifeSignal) data = binary.BigEndian.AppendUint16(data, info.Code) var b byte if info.NPosition { b |= 1 << 7 } if info.RPosition { b |= 1 << 6 } data = append(data, b) return data }