diff --git a/api/simulation.go b/api/simulation.go index 8d91e30..a226b62 100644 --- a/api/simulation.go +++ b/api/simulation.go @@ -39,10 +39,11 @@ func InitSimulationRouter(api *gin.RouterGroup, authMiddleware *jwt.GinJWTMiddle // 初始化地图信息 initPublishMapInfo() - apiproto.RegisterMsgServer(&apiproto.SimulationServer{}) - apiproto.RegisterMsgServer(&apiproto.SimulationIBPServer{}) - apiproto.RegisterMsgServer(&apiproto.SimulationPSLServer{}) - apiproto.RegisterMsgServer(&apiproto.MemoryChangeServer{SimulationMap: make(map[string]*state.SimulationStatus)}) + apiproto.InitClient() + apiproto.Register(&apiproto.SimulationServer{}) + apiproto.Register(&apiproto.SimulationIBPServer{}) + apiproto.Register(&apiproto.SimulationPSLServer{}) + apiproto.Register(&apiproto.MemoryChangeServer{SimulationMap: make(map[string]*state.SimulationStatus)}) } func initPublishMapInfo() { diff --git a/grpcproto/message.go b/grpcproto/message.go index 3a2371c..9aa4797 100644 --- a/grpcproto/message.go +++ b/grpcproto/message.go @@ -1,7 +1,9 @@ package apiproto import ( + context "context" "log/slog" + "runtime/debug" "time" ) @@ -26,69 +28,126 @@ type TopicMsg struct { data []byte } -// 消息类型服务集合 -var serverMap = make(map[string]*IMsgServer) - -// 消息服务退出通道 -var serverExitChannelMap = make(map[string]chan bool) - -// 服务运行 -func doServerRun(tick *time.Ticker, server IMsgServer, exitChannel chan bool) { - // 循环推送信息 - for { - <-tick.C - topicMsgs := server.onTick() - if len(topicMsgs) == 0 { - continue - } - for _, msg := range topicMsgs { - PublishMsg(msg.channalName, msg.data) - } - select { - case <-exitChannel: - return - default: - } - } +type MsgServer struct { + IMsgServer + ctx context.Context + cancelFn context.CancelFunc } -// 注册服务 -func RegisterMsgServer(server IMsgServer) { - if client == nil { - InitClient() - } - serverMap[server.getChannelName()] = &server - if server.getInterval() > 0 { - exitChannel := make(chan bool) - serverExitChannelMap[server.getChannelName()] = exitChannel - tick := time.NewTicker(server.getInterval()) - go func() { - defer func() { - if r := recover(); r != nil { - slog.Debug("定时器发生错误,%v\n", r) - } - // 重新启动,防止服务卡死 - doServerRun(tick, server, exitChannel) - }() - // 循环推送信息 - doServerRun(tick, server, exitChannel) - }() +// 消息服务管理map +var servers map[string]*MsgServer = make(map[string]*MsgServer) + +// 注册消息服务 +func Register(server IMsgServer) *MsgServer { + ms := &MsgServer{ + IMsgServer: server, } + ctx, cancelFn := context.WithCancel(context.Background()) + ms.ctx = ctx + ms.cancelFn = cancelFn + go run(ms) + servers[server.getChannelName()] = ms + return ms } // 注销消息服务 -func UnRegisterMsgServer(key string) { - channel := serverExitChannelMap[key] - if channel != nil { - // 定时任务取消 - channel <- false - delete(serverExitChannelMap, key) - // 删除集合信息 - delete(serverMap, key) +func Unregister(key string) { + server := servers[key] + if server == nil { + return + } + server.cancelFn() + delete(servers, key) +} + +// 消息服务运行 +func run(server *MsgServer) { + defer func() { + if err := recover(); err != nil { + slog.Error("消息服务运行异常", "serverChannelName", server.getChannelName(), "error", err, "stack", string(debug.Stack())) + debug.PrintStack() + } + }() + for { + select { + case <-server.ctx.Done(): + slog.Info("消息服务退出", "serverChannelName", server.getChannelName()) + return + default: + } + topicMsgs := server.onTick() + if len(topicMsgs) > 0 { + for _, msg := range topicMsgs { + PublishMsg(msg.channalName, msg.data) + } + } + time.Sleep(server.getInterval()) } } -// 获取消息服务 -func GetMsgServer(key string) *IMsgServer { - return serverMap[key] -} +// // 消息类型服务集合 +// var serverMap = make(map[string]*IMsgServer) + +// // 消息服务退出通道 +// var serverExitChannelMap = make(map[string]chan bool) + +// // 服务运行 +// func doServerRun(tick *time.Ticker, server IMsgServer, exitChannel chan bool) { +// // 循环推送信息 +// for { +// <-tick.C +// topicMsgs := server.onTick() +// if len(topicMsgs) == 0 { +// continue +// } +// for _, msg := range topicMsgs { +// PublishMsg(msg.channalName, msg.data) +// } +// select { +// case <-exitChannel: +// return +// default: +// } +// } +// } + +// // 注册服务 +// func RegisterMsgServer(server IMsgServer) { +// if client == nil { +// InitClient() +// } +// serverMap[server.getChannelName()] = &server +// if server.getInterval() > 0 { +// exitChannel := make(chan bool) +// serverExitChannelMap[server.getChannelName()] = exitChannel +// tick := time.NewTicker(server.getInterval()) +// go func() { +// defer func() { +// if r := recover(); r != nil { +// slog.Debug("定时器发生错误,%v\n", r) +// } +// // 重新启动,防止服务卡死 +// doServerRun(tick, server, exitChannel) +// }() +// // 循环推送信息 +// doServerRun(tick, server, exitChannel) +// }() +// } +// } + +// // 注销消息服务 +// func UnRegisterMsgServer(key string) { +// channel := serverExitChannelMap[key] +// if channel != nil { +// // 定时任务取消 +// channel <- false +// delete(serverExitChannelMap, key) +// // 删除集合信息 +// delete(serverMap, key) +// } +// } + +// // 获取消息服务 +// func GetMsgServer(key string) *IMsgServer { +// return serverMap[key] +// } diff --git a/init.go b/init.go index d1bb973..a969e63 100644 --- a/init.go +++ b/init.go @@ -98,6 +98,7 @@ func CustomRecoveryWithSlog(logger *slog.Logger, stack bool, recovery gin.Recove zap.String("request", string(httpRequest)), zap.String("stack", string(debug.Stack())), ) + debug.PrintStack() } else { logger.Error("[Recovery from panic]", zap.Time("time", time.Now()), diff --git a/third_party/dynamics/dynamics.go b/third_party/dynamics/dynamics.go index a151c0f..52d928f 100644 --- a/third_party/dynamics/dynamics.go +++ b/third_party/dynamics/dynamics.go @@ -7,6 +7,7 @@ import ( "fmt" "log/slog" "net/http" + "runtime/debug" "time" "joylink.club/bj-rtsts-server/config" @@ -214,7 +215,8 @@ var turnoutStateLifeSignal uint16 //道岔消息生命信号 func (d *dynamics) sendTurnoutStateTask(ctx context.Context) { defer func() { if err := recover(); err != nil { - slog.Error("定时发送道岔状态任务异常", err) + slog.Error("定时发送道岔状态任务异常", "error", err, "stack", string(debug.Stack())) + debug.PrintStack() } }() for { @@ -224,7 +226,7 @@ func (d *dynamics) sendTurnoutStateTask(ctx context.Context) { default: } turnoutStates := d.manager.CollectDynamicsTurnoutInfo() - slog.Debug("发送道岔状态", "count", len(turnoutStates)) + // slog.Debug("发送道岔状态", "count", len(turnoutStates)) for _, state := range turnoutStates { turnoutStateLifeSignal++ state.LifeSignal = turnoutStateLifeSignal