rts-sim-testing-service/grpcproto/message.go
walker fa0167271f recover处添加堆栈打印
消息服务代码结构调整
2023-10-19 13:15:07 +08:00

154 lines
3.2 KiB
Go

package apiproto
import (
context "context"
"log/slog"
"runtime/debug"
"time"
)
// 消息服务
type IMsgServer interface {
// 获取通道名
getChannelName() string
// 发送消息间隔时间,单位ms
getInterval() time.Duration
// 定时发送的消息
onTick() []TopicMsg
}
// 消息实体
type TopicMsg struct {
// 通道名称
channalName string
// 消息信息
data []byte
}
type MsgServer struct {
IMsgServer
ctx context.Context
cancelFn context.CancelFunc
}
// 消息服务管理map
var servers map[string]*MsgServer = make(map[string]*MsgServer)
// 注册消息服务
func Register(server IMsgServer) *MsgServer {
ms := &MsgServer{
IMsgServer: server,
}
ctx, cancelFn := context.WithCancel(context.Background())
ms.ctx = ctx
ms.cancelFn = cancelFn
go run(ms)
servers[server.getChannelName()] = ms
return ms
}
// 注销消息服务
func Unregister(key string) {
server := servers[key]
if server == nil {
return
}
server.cancelFn()
delete(servers, key)
}
// 消息服务运行
func run(server *MsgServer) {
defer func() {
if err := recover(); err != nil {
slog.Error("消息服务运行异常", "serverChannelName", server.getChannelName(), "error", err, "stack", string(debug.Stack()))
debug.PrintStack()
}
}()
for {
select {
case <-server.ctx.Done():
slog.Info("消息服务退出", "serverChannelName", server.getChannelName())
return
default:
}
topicMsgs := server.onTick()
if len(topicMsgs) > 0 {
for _, msg := range topicMsgs {
PublishMsg(msg.channalName, msg.data)
}
}
time.Sleep(server.getInterval())
}
}
// // 消息类型服务集合
// var serverMap = make(map[string]*IMsgServer)
// // 消息服务退出通道
// var serverExitChannelMap = make(map[string]chan bool)
// // 服务运行
// func doServerRun(tick *time.Ticker, server IMsgServer, exitChannel chan bool) {
// // 循环推送信息
// for {
// <-tick.C
// topicMsgs := server.onTick()
// if len(topicMsgs) == 0 {
// continue
// }
// for _, msg := range topicMsgs {
// PublishMsg(msg.channalName, msg.data)
// }
// select {
// case <-exitChannel:
// return
// default:
// }
// }
// }
// // 注册服务
// func RegisterMsgServer(server IMsgServer) {
// if client == nil {
// InitClient()
// }
// serverMap[server.getChannelName()] = &server
// if server.getInterval() > 0 {
// exitChannel := make(chan bool)
// serverExitChannelMap[server.getChannelName()] = exitChannel
// tick := time.NewTicker(server.getInterval())
// go func() {
// defer func() {
// if r := recover(); r != nil {
// slog.Debug("定时器发生错误,%v\n", r)
// }
// // 重新启动,防止服务卡死
// doServerRun(tick, server, exitChannel)
// }()
// // 循环推送信息
// doServerRun(tick, server, exitChannel)
// }()
// }
// }
// // 注销消息服务
// func UnRegisterMsgServer(key string) {
// channel := serverExitChannelMap[key]
// if channel != nil {
// // 定时任务取消
// channel <- false
// delete(serverExitChannelMap, key)
// // 删除集合信息
// delete(serverMap, key)
// }
// }
// // 获取消息服务
// func GetMsgServer(key string) *IMsgServer {
// return serverMap[key]
// }