recover处添加堆栈打印

消息服务代码结构调整
This commit is contained in:
walker 2023-10-19 13:15:07 +08:00
parent ce10a3198c
commit fa0167271f
4 changed files with 126 additions and 63 deletions

View File

@ -39,10 +39,11 @@ func InitSimulationRouter(api *gin.RouterGroup, authMiddleware *jwt.GinJWTMiddle
// 初始化地图信息
initPublishMapInfo()
apiproto.RegisterMsgServer(&apiproto.SimulationServer{})
apiproto.RegisterMsgServer(&apiproto.SimulationIBPServer{})
apiproto.RegisterMsgServer(&apiproto.SimulationPSLServer{})
apiproto.RegisterMsgServer(&apiproto.MemoryChangeServer{SimulationMap: make(map[string]*state.SimulationStatus)})
apiproto.InitClient()
apiproto.Register(&apiproto.SimulationServer{})
apiproto.Register(&apiproto.SimulationIBPServer{})
apiproto.Register(&apiproto.SimulationPSLServer{})
apiproto.Register(&apiproto.MemoryChangeServer{SimulationMap: make(map[string]*state.SimulationStatus)})
}
func initPublishMapInfo() {

View File

@ -1,7 +1,9 @@
package apiproto
import (
context "context"
"log/slog"
"runtime/debug"
"time"
)
@ -26,69 +28,126 @@ type TopicMsg struct {
data []byte
}
// 消息类型服务集合
var serverMap = make(map[string]*IMsgServer)
// 消息服务退出通道
var serverExitChannelMap = make(map[string]chan bool)
// 服务运行
func doServerRun(tick *time.Ticker, server IMsgServer, exitChannel chan bool) {
// 循环推送信息
for {
<-tick.C
topicMsgs := server.onTick()
if len(topicMsgs) == 0 {
continue
}
for _, msg := range topicMsgs {
PublishMsg(msg.channalName, msg.data)
}
select {
case <-exitChannel:
return
default:
}
}
type MsgServer struct {
IMsgServer
ctx context.Context
cancelFn context.CancelFunc
}
// 注册服务
func RegisterMsgServer(server IMsgServer) {
if client == nil {
InitClient()
}
serverMap[server.getChannelName()] = &server
if server.getInterval() > 0 {
exitChannel := make(chan bool)
serverExitChannelMap[server.getChannelName()] = exitChannel
tick := time.NewTicker(server.getInterval())
go func() {
defer func() {
if r := recover(); r != nil {
slog.Debug("定时器发生错误,%v\n", r)
}
// 重新启动,防止服务卡死
doServerRun(tick, server, exitChannel)
}()
// 循环推送信息
doServerRun(tick, server, exitChannel)
}()
// 消息服务管理map
var servers map[string]*MsgServer = make(map[string]*MsgServer)
// 注册消息服务
func Register(server IMsgServer) *MsgServer {
ms := &MsgServer{
IMsgServer: server,
}
ctx, cancelFn := context.WithCancel(context.Background())
ms.ctx = ctx
ms.cancelFn = cancelFn
go run(ms)
servers[server.getChannelName()] = ms
return ms
}
// 注销消息服务
func UnRegisterMsgServer(key string) {
channel := serverExitChannelMap[key]
if channel != nil {
// 定时任务取消
channel <- false
delete(serverExitChannelMap, key)
// 删除集合信息
delete(serverMap, key)
func Unregister(key string) {
server := servers[key]
if server == nil {
return
}
server.cancelFn()
delete(servers, key)
}
// 消息服务运行
func run(server *MsgServer) {
defer func() {
if err := recover(); err != nil {
slog.Error("消息服务运行异常", "serverChannelName", server.getChannelName(), "error", err, "stack", string(debug.Stack()))
debug.PrintStack()
}
}()
for {
select {
case <-server.ctx.Done():
slog.Info("消息服务退出", "serverChannelName", server.getChannelName())
return
default:
}
topicMsgs := server.onTick()
if len(topicMsgs) > 0 {
for _, msg := range topicMsgs {
PublishMsg(msg.channalName, msg.data)
}
}
time.Sleep(server.getInterval())
}
}
// 获取消息服务
func GetMsgServer(key string) *IMsgServer {
return serverMap[key]
}
// // 消息类型服务集合
// var serverMap = make(map[string]*IMsgServer)
// // 消息服务退出通道
// var serverExitChannelMap = make(map[string]chan bool)
// // 服务运行
// func doServerRun(tick *time.Ticker, server IMsgServer, exitChannel chan bool) {
// // 循环推送信息
// for {
// <-tick.C
// topicMsgs := server.onTick()
// if len(topicMsgs) == 0 {
// continue
// }
// for _, msg := range topicMsgs {
// PublishMsg(msg.channalName, msg.data)
// }
// select {
// case <-exitChannel:
// return
// default:
// }
// }
// }
// // 注册服务
// func RegisterMsgServer(server IMsgServer) {
// if client == nil {
// InitClient()
// }
// serverMap[server.getChannelName()] = &server
// if server.getInterval() > 0 {
// exitChannel := make(chan bool)
// serverExitChannelMap[server.getChannelName()] = exitChannel
// tick := time.NewTicker(server.getInterval())
// go func() {
// defer func() {
// if r := recover(); r != nil {
// slog.Debug("定时器发生错误,%v\n", r)
// }
// // 重新启动,防止服务卡死
// doServerRun(tick, server, exitChannel)
// }()
// // 循环推送信息
// doServerRun(tick, server, exitChannel)
// }()
// }
// }
// // 注销消息服务
// func UnRegisterMsgServer(key string) {
// channel := serverExitChannelMap[key]
// if channel != nil {
// // 定时任务取消
// channel <- false
// delete(serverExitChannelMap, key)
// // 删除集合信息
// delete(serverMap, key)
// }
// }
// // 获取消息服务
// func GetMsgServer(key string) *IMsgServer {
// return serverMap[key]
// }

View File

@ -98,6 +98,7 @@ func CustomRecoveryWithSlog(logger *slog.Logger, stack bool, recovery gin.Recove
zap.String("request", string(httpRequest)),
zap.String("stack", string(debug.Stack())),
)
debug.PrintStack()
} else {
logger.Error("[Recovery from panic]",
zap.Time("time", time.Now()),

View File

@ -7,6 +7,7 @@ import (
"fmt"
"log/slog"
"net/http"
"runtime/debug"
"time"
"joylink.club/bj-rtsts-server/config"
@ -214,7 +215,8 @@ var turnoutStateLifeSignal uint16 //道岔消息生命信号
func (d *dynamics) sendTurnoutStateTask(ctx context.Context) {
defer func() {
if err := recover(); err != nil {
slog.Error("定时发送道岔状态任务异常", err)
slog.Error("定时发送道岔状态任务异常", "error", err, "stack", string(debug.Stack()))
debug.PrintStack()
}
}()
for {
@ -224,7 +226,7 @@ func (d *dynamics) sendTurnoutStateTask(ctx context.Context) {
default:
}
turnoutStates := d.manager.CollectDynamicsTurnoutInfo()
slog.Debug("发送道岔状态", "count", len(turnoutStates))
// slog.Debug("发送道岔状态", "count", len(turnoutStates))
for _, state := range turnoutStates {
turnoutStateLifeSignal++
state.LifeSignal = turnoutStateLifeSignal