package vobc import ( "container/list" "encoding/binary" "fmt" "math" "net" "sync" "github.com/panjf2000/gnet/v2" "go.uber.org/zap" "joylink.club/bj-rtsts-server/config" ) var ( running bool mutex sync.Mutex receiveTrainLifeSignal uint16 //接收列车消息生命信号 sendTrainLifeSignal uint16 //发送的列车消息生命信号 trainLifeSignalInit bool //列车生命信号是否初始化 limit uint16 = 10000 //用于处理生命信号循环使用产生的各种特殊处理 vobcMsgInfoChan chan []byte = make(chan []byte) //列车消息队列 //处理方法列表 handlerList list.List ) func init() { go func() { for { info := <-vobcMsgInfoChan for e := handlerList.Front(); e != nil; e = e.Next() { func() { defer func() { r := recover() if r != nil { zap.S().Errorf("列车信息处理函数报错") } }() handler := e.Value.(VobcDataHandler) handler(info) }() } } }() } type VobcDataHandler func([]byte) type udpServer struct { gnet.BuiltinEventEngine eng gnet.Engine addr string multicore bool } // udp 启动时运行 func (server *udpServer) OnBoot(eng gnet.Engine) gnet.Action { server.eng = eng zap.S().Infof("vobc udp server with multi-core=%t is listening on %s", server.multicore, server.addr) return gnet.None } // OnTraffic 接收到数据后的解析 func (server *udpServer) OnTraffic(c gnet.Conn) gnet.Action { defer func() { if r := recover(); r != nil { zap.L().Error("vobc udp服务数据解析异常", zap.Any("panic", r)) } }() buf, _ := c.Next(-1) lifeSignal := binary.BigEndian.Uint16(buf[0:2]) if !trainLifeSignalInit { trainLifeSignalInit = true } else if receiveTrainLifeSignal < limit { if lifeSignal < receiveTrainLifeSignal || lifeSignal > receiveTrainLifeSignal-limit { zap.S().Debugf("丢弃列车信息[%d-%d]", lifeSignal, receiveTrainLifeSignal) return gnet.None } } else if receiveTrainLifeSignal < math.MaxUint16-10000 { if lifeSignal < receiveTrainLifeSignal { zap.S().Debugf("丢弃列车信息[%d-%d]", lifeSignal, receiveTrainLifeSignal) return gnet.None } } else { if lifeSignal < receiveTrainLifeSignal && lifeSignal > receiveTrainLifeSignal+10000 { zap.S().Debugf("丢弃列车信息[%d-%d]", lifeSignal, receiveTrainLifeSignal) return gnet.None } } receiveTrainLifeSignal = lifeSignal // trainInfo := decoderVobcTrainInfo(buf) //消息队列 vobcMsgInfoChan <- buf return gnet.None } // 注册处理vobc处理方法 func RegisterTrainInfoHandler(handler VobcDataHandler) { handlerList.PushBack(handler) } // 创建UDP服务 func RunUdpServer() { server := &udpServer{addr: fmt.Sprintf("udp://:%d", config.Config.Vobc.LocalPort), multicore: false} err := gnet.Run(server, server.addr, gnet.WithMulticore(server.multicore)) zap.L().Fatal("vobc udp服务启动失败", zap.Error(err)) } // 发送列车速度到VOBC func SendTrainSpeedTask(speed float32) error { if running { return nil } mutex.Lock() defer mutex.Unlock() trainInfo := &SendTrainInfo{ LifeSignal: sendTrainLifeSignal, Speed: uint16(speed), } err := sendVobcMsg(encoderVobcTrainInfo(trainInfo)) if err != nil { zap.S().Error(err) } sendTrainLifeSignal++ return err } // UDP停止 func Stop() { mutex.Lock() defer mutex.Unlock() running = false } // Vobc 消息发送消息 func sendVobcMsg(buf []byte) error { defer func() { if r := recover(); r != nil { zap.S().Error("发送列车速度信息失败", r) } }() addr := fmt.Sprintf("%v:%v", config.Config.Vobc.Ip, config.Config.Vobc.RemotePort) remoteAddr, _ := net.ResolveUDPAddr("udp", addr) conn, err := net.DialUDP("udp", nil, remoteAddr) if err != nil { zap.S().Error("UDP通信失败", err) return err } defer func(conn *net.UDPConn) { err := conn.Close() if err != nil { zap.S().Error(err) } }(conn) _, err = conn.Write(buf) return err } // 将道岔转为动力学的消息 func encoderVobcTrainInfo(info *SendTrainInfo) []byte { var data []byte data = binary.BigEndian.AppendUint16(data, info.LifeSignal) data = binary.BigEndian.AppendUint16(data, info.Speed) return data }