rts-sim-testing-service/dynamics/udp.go
walker 47e3b5c6f7 升级go版本到1.21
修改日志库使用slog
修改相关打印日志调用
2023-10-12 10:10:23 +08:00

270 lines
7.1 KiB
Go

package dynamics
import (
"container/list"
"encoding/binary"
"errors"
"fmt"
"log/slog"
"math"
"net"
"sync"
"time"
"github.com/panjf2000/gnet/v2"
"go.uber.org/zap"
"joylink.club/bj-rtsts-server/config"
)
func init() {
go func() {
for {
info := <-trainInfoChan
for e := handlerList.Front(); e != nil; e = e.Next() {
func() {
defer func() {
r := recover()
if r != nil {
slog.Error("列车信息处理函数报错", "error", r)
}
}()
handler := e.Value.(TrainInfoHandler)
handler(info)
}()
}
}
}()
}
// TurnoutInfoFunc supplies the turnout states to transmit. The function stored
// in turnoutInfoFunc is called by doDynamicsTurnoutUDPRun on each tick.
type TurnoutInfoFunc func() []*TurnoutInfo
var (
// running is checked by runSendTurnoutStateTask before it starts the
// turnout sender goroutine.
running bool
// mutex is held by Run and Stop while they modify the package-level state.
mutex sync.Mutex
// turnoutInfoFunc is the currently installed turnout-state provider; Stop
// resets it to nil.
turnoutInfoFunc TurnoutInfoFunc
// Life signal stamped on outgoing turnout messages.
turnoutLifeSignal uint16
// Life signal of the most recently accepted train message.
trainLifeSignal uint16
// Whether the train life signal has been initialised; Run resets it to false
// and OnTraffic sets it when it accepts the first train message.
trainLifeSignalInit bool
// Threshold used for the special cases that arise because the life signal is
// reused cyclically (it wraps around).
limit uint16 = 10000
// Train message queue: an unbuffered channel written by OnTraffic and drained
// by the goroutine started in init.
trainInfoChan chan *TrainInfo = make(chan *TrainInfo)
)
// handlerList holds the callbacks added through RegisterTrainInfoHandler.
var handlerList list.List
// TrainInfoHandler is a callback invoked with each received train message.
type TrainInfoHandler func(info *TrainInfo)
// udpServer is the gnet event handler used by RunUdpServer to receive train
// messages over UDP.
type udpServer struct {
// Embedded gnet type; presumably supplies default implementations for the
// gnet callbacks this type does not define itself — confirm against gnet.
gnet.BuiltinEventEngine
// eng is the engine passed to OnBoot.
eng gnet.Engine
// addr is the listen address handed to gnet.Run.
addr string
// multicore is forwarded to gnet.WithMulticore and logged by OnBoot.
multicore bool
}
// OnBoot is the gnet boot callback. It logs the multicore setting and listen
// address, stores the engine on the server and returns gnet.None.
func (server *udpServer) OnBoot(eng gnet.Engine) gnet.Action {
	slog.Info("udp server启动", "multicore", server.multicore, "listeningOn", server.addr)
	// Keep the engine so it stays reachable from the server value.
	server.eng = eng
	return gnet.None
}
// OnTraffic parses a datagram received from the dynamics system and queues the
// decoded TrainInfo on trainInfoChan.
//
// The first two bytes carry a big-endian life signal. It is compared with the
// last accepted value (trainLifeSignal) and the datagram is dropped when the
// value falls in the rejected range for the current position of the counter.
// The first datagram after Run resets trainLifeSignalInit is always accepted.
//
// Datagrams that cannot be read or are too short to decode are logged and
// dropped before any life-signal state is modified, so a truncated packet
// cannot disturb the sequence tracking. The method always returns gnet.None;
// any panic is recovered and logged.
func (server *udpServer) OnTraffic(c gnet.Conn) gnet.Action {
	defer func() {
		if r := recover(); r != nil {
			zap.L().Error("udp服务数据解析异常", zap.Any("panic", r))
		}
	}()

	// decoderDynamicsTrainInfo reads up to buf[58:62].
	const minTrainInfoLen = 62

	buf, err := c.Next(-1)
	if err != nil {
		slog.Error("读取列车信息失败", "error", err)
		return gnet.None
	}
	if len(buf) < minTrainInfoLen {
		slog.Error("丢弃长度不足的列车信息", "len", len(buf), "minLen", minTrainInfoLen)
		return gnet.None
	}

	lifeSignal := binary.BigEndian.Uint16(buf[0:2])
	switch {
	case !trainLifeSignalInit:
		// First message since Run: accept whatever value arrives.
		trainLifeSignalInit = true
	case trainLifeSignal < limit:
		// trainLifeSignal-limit wraps around in uint16 arithmetic, so this
		// rejects values below the last one and values above the wrapped bound.
		if lifeSignal < trainLifeSignal || lifeSignal > trainLifeSignal-limit {
			slog.Debug("丢弃列车信息", "lifeSignal", lifeSignal, "trainLifeSignal", trainLifeSignal)
			return gnet.None
		}
	case trainLifeSignal < math.MaxUint16-10000:
		// Middle of the range: reject anything smaller than the last value.
		if lifeSignal < trainLifeSignal {
			slog.Debug("丢弃列车信息", "lifeSignal", lifeSignal, "trainLifeSignal", trainLifeSignal)
			return gnet.None
		}
	default:
		// Near the top of the range trainLifeSignal+10000 may wrap around;
		// smaller values are rejected only when they lie above that sum.
		if lifeSignal < trainLifeSignal && lifeSignal > trainLifeSignal+10000 {
			slog.Debug("丢弃列车信息", "lifeSignal", lifeSignal, "trainLifeSignal", trainLifeSignal)
			return gnet.None
		}
	}

	// Decode before committing the new life signal so that a failed decode
	// leaves the last accepted value untouched.
	trainInfo := decoderDynamicsTrainInfo(buf)
	trainLifeSignal = lifeSignal
	trainInfoChan <- trainInfo
	return gnet.None
}
// RunUdpServer starts the gnet UDP server that receives train messages on the
// configured Dynamics.UdpLocalPort. It does nothing when Dynamics.Open is
// false.
//
// The call blocks inside gnet.Run. If gnet.Run returns an error the failure is
// logged at fatal level; if it returns nil the function simply returns instead
// of reporting a start-up failure that did not happen.
func RunUdpServer() {
	if !config.Config.Dynamics.Open {
		return
	}
	server := &udpServer{addr: fmt.Sprintf("udp://:%d", config.Config.Dynamics.UdpLocalPort), multicore: false}
	if err := gnet.Run(server, server.addr, gnet.WithMulticore(server.multicore)); err != nil {
		zap.L().Fatal("udp服务启动失败", zap.Error(err))
	}
}
// Run resets the train life-signal state and hands tiFunc to
// runSendTurnoutStateTask, returning that call's error. It returns nil without
// doing anything when Dynamics.Open is false. The package mutex is held for
// the duration of the reset and the task start.
func Run(tiFunc TurnoutInfoFunc) error {
	if enabled := config.Config.Dynamics.Open; !enabled {
		return nil
	}

	mutex.Lock()
	defer mutex.Unlock()

	// The next received train message is treated as the first one.
	trainLifeSignalInit = false
	err := runSendTurnoutStateTask(tiFunc)
	return err
}
// RegisterTrainInfoHandler appends handler to the end of handlerList, the list
// of callbacks the dispatch goroutine walks for every received train message.
func RegisterTrainInfoHandler(handler TrainInfoHandler) {
	// The list element returned by PushBack is not needed.
	_ = handlerList.PushBack(handler)
}
// Stop clears the installed turnout-state provider while holding the package
// mutex. With turnoutInfoFunc set to nil, doDynamicsTurnoutUDPRun skips its
// ticks.
func Stop() {
	mutex.Lock()
	turnoutInfoFunc = nil
	mutex.Unlock()
}
// sendDynamicsMsg sends buf as a single UDP datagram to the dynamics endpoint
// built from the configured Dynamics.Ip and Dynamics.UdpRemotePort.
//
// A UDP socket is dialed for every call and closed before returning. Address
// resolution, dial and write errors are returned to the caller; a close error
// is only logged. A panic raised while sending is recovered and logged, in
// which case the function returns a nil error.
func sendDynamicsMsg(buf []byte) error {
	defer func() {
		if r := recover(); r != nil {
			slog.Error("发送道岔信息失败", "error", r)
		}
	}()

	addr := fmt.Sprintf("%v:%v", config.Config.Dynamics.Ip, config.Config.Dynamics.UdpRemotePort)
	remoteAddr, err := net.ResolveUDPAddr("udp", addr)
	if err != nil {
		// Report the resolution failure itself instead of letting DialUDP
		// fail later on a nil address.
		slog.Error("UDP通信失败", "addr", addr, "error", err)
		return err
	}
	conn, err := net.DialUDP("udp", nil, remoteAddr)
	if err != nil {
		slog.Error("UDP通信失败", "error", err)
		return err
	}
	defer func() {
		if err := conn.Close(); err != nil {
			slog.Error("关闭UDP连接失败", "error", err)
		}
	}()

	_, err = conn.Write(buf)
	return err
}
// SendDynamicsTrainMsg sends train information to the dynamics system: buf is
// written as a single UDP datagram to the endpoint built from the configured
// Dynamics.Ip and Dynamics.UdpRemoteTrainPort.
//
// A UDP socket is dialed for every call and closed before returning. Address
// resolution, dial and write errors are returned to the caller; a close error
// is only logged. A panic raised while sending is recovered and logged, in
// which case the function returns a nil error.
func SendDynamicsTrainMsg(buf []byte) error {
	defer func() {
		if r := recover(); r != nil {
			// This is the train sender, so the message names train info
			// rather than turnout info.
			slog.Error("发送列车信息失败", "error", r)
		}
	}()

	addr := fmt.Sprintf("%v:%v", config.Config.Dynamics.Ip, config.Config.Dynamics.UdpRemoteTrainPort)
	remoteAddr, err := net.ResolveUDPAddr("udp", addr)
	if err != nil {
		// Report the resolution failure itself instead of letting DialUDP
		// fail later on a nil address.
		slog.Error("UDP通信失败", "addr", addr, "error", err)
		return err
	}
	conn, err := net.DialUDP("udp", nil, remoteAddr)
	if err != nil {
		slog.Error("UDP通信失败", "error", err)
		return err
	}
	defer func() {
		if err := conn.Close(); err != nil {
			slog.Error("关闭UDP连接失败", "error", err)
		}
	}()

	_, err = conn.Write(buf)
	return err
}
// runSendTurnoutStateTask installs tiFunc as the turnout-state provider and
// makes sure the periodic sender goroutine is running.
//
// It returns an error when tiFunc is nil. The provider is replaced on every
// successful call, but the goroutine is started only once: running is set the
// first time so later calls (for example Run after Stop) reuse the existing
// sender instead of adding another one. Run calls this function while holding
// mutex, which serialises access to running and turnoutInfoFunc here.
func runSendTurnoutStateTask(tiFunc TurnoutInfoFunc) error {
	if tiFunc == nil {
		return errors.New("tiFunc不能为空")
	}
	turnoutInfoFunc = tiFunc
	if running {
		return nil
	}
	running = true

	go func() {
		// Restart the send loop whenever it stops. Each attempt has its own
		// recover, so any number of panics is survived, not just the first.
		for {
			func() {
				defer func() {
					if r := recover(); r != nil {
						fmt.Println("Recovered from panic:", r)
					}
				}()
				// Dynamics UDP send loop.
				doDynamicsTurnoutUDPRun()
			}()
		}
	}()
	return nil
}
func doDynamicsTurnoutUDPRun() {
tick := time.Tick(50 * time.Millisecond)
for range tick {
if turnoutInfoFunc == nil {
continue
}
slice := turnoutInfoFunc()
for _, turnoutInfo := range slice {
turnoutInfo.lifeSignal = turnoutLifeSignal
// sendTurnoutInfo 发送道岔信息
err := sendDynamicsMsg(encoderDynamicsTurnout(turnoutInfo))
if err != nil {
slog.Error("发送道岔信息到动力学失败", "error", err)
}
}
turnoutLifeSignal++
}
}
// decoderDynamicsTrainInfo parses a train message from the dynamics system.
// All multi-byte fields are big-endian. Layout as read here:
//
//	[0:2]   LifeSignal (uint16)      [2]     Number
//	[3:5]   Len (uint16)             [5]     Link
//	[6:10]  LinkOffset (uint32)      [10:12] Slope (uint16)
//	[12]    flags: bit 7 UpSlope, bit 6 Up
//	[14:30] Total/Air/Slope/Curve resistance (four int32)
//	[30:62] Speed, HeadSpeed1, HeadSpeed2, TailSpeed1, TailSpeed2,
//	        HeadRadarSpeed, TailRadarSpeed, Acceleration (eight float32)
//
// Byte 13 is not read. The function indexes up to buf[58:62] without checking
// the length, so a shorter buf makes it panic.
func decoderDynamicsTrainInfo(buf []byte) *TrainInfo {
	order := binary.BigEndian
	// i32 and f32 reinterpret the four bytes starting at offset at.
	i32 := func(at int) int32 {
		return int32(order.Uint32(buf[at : at+4]))
	}
	f32 := func(at int) float32 {
		return math.Float32frombits(order.Uint32(buf[at : at+4]))
	}

	var decoded TrainInfo
	decoded.LifeSignal = order.Uint16(buf[:2])
	decoded.Number = buf[2]
	decoded.Len = order.Uint16(buf[3:5])
	decoded.Link = buf[5]
	decoded.LinkOffset = order.Uint32(buf[6:10])
	decoded.Slope = order.Uint16(buf[10:12])

	flags := buf[12]
	decoded.UpSlope = flags&0x80 != 0
	decoded.Up = flags&0x40 != 0

	decoded.TotalResistance = i32(14)
	decoded.AirResistance = i32(18)
	decoded.SlopeResistance = i32(22)
	decoded.CurveResistance = i32(26)

	decoded.Speed = f32(30)
	decoded.HeadSpeed1 = f32(34)
	decoded.HeadSpeed2 = f32(38)
	decoded.TailSpeed1 = f32(42)
	decoded.TailSpeed2 = f32(46)
	decoded.HeadRadarSpeed = f32(50)
	decoded.TailRadarSpeed = f32(54)
	decoded.Acceleration = f32(58)
	return &decoded
}
// encoderDynamicsTurnout converts a turnout state into the 5-byte message sent
// to the dynamics system: the life signal and the turnout code as big-endian
// uint16 values, followed by one flag byte with bit 7 set for NPosition and
// bit 6 set for RPosition.
func encoderDynamicsTurnout(info *TurnoutInfo) []byte {
	const (
		nPositionBit byte = 1 << 7
		rPositionBit byte = 1 << 6
	)

	msg := make([]byte, 5)
	binary.BigEndian.PutUint16(msg[0:2], info.lifeSignal)
	binary.BigEndian.PutUint16(msg[2:4], info.Code)
	if info.NPosition {
		msg[4] |= nPositionBit
	}
	if info.RPosition {
		msg[4] |= rPositionBit
	}
	return msg
}