package ms_manage

import (
	"context"
	"log/slog"
	"runtime/debug"
	"time"

	apiproto "joylink.club/bj-rtsts-server/grpcproto"
	"joylink.club/bj-rtsts-server/message_server/ms_api"
)

// MsgServer couples an ms_api.IMsgServer with the cancellable context that
// bounds the lifetime of its background tick goroutine.
type MsgServer struct {
	ms_api.IMsgServer
	ctx      context.Context
	cancelFn context.CancelFunc
}

// servers is the registry of running message servers, keyed by the value
// returned from GetChannel.
//
// NOTE(review): no lock guards this map in this file, so Register and
// Unregister must not be called concurrently with each other.
var servers map[string]*MsgServer = make(map[string]*MsgServer)

// Register wraps server in a MsgServer, starts its tick loop in a new
// goroutine, and records it in the registry under server.GetChannel().
//
// If another server is already registered under the same channel, that
// server's context is cancelled first so its goroutine does not keep running
// with no remaining way to stop it.
func Register(server ms_api.IMsgServer) *MsgServer {
	ctx, cancelFn := context.WithCancel(context.Background())
	ms := &MsgServer{
		IMsgServer: server,
		ctx:        ctx,
		cancelFn:   cancelFn,
	}

	channel := server.GetChannel()
	if prev := servers[channel]; prev != nil {
		// Stop the server being replaced; otherwise it becomes unreachable
		// from the registry and its goroutine leaks.
		prev.cancelFn()
	}
	servers[channel] = ms

	go run(ms)
	return ms
}

// Unregister cancels the message server registered under server.GetChannel()
// and removes it from the registry. It is a no-op when server is nil or when
// nothing is registered under that channel.
func Unregister(server ms_api.IMsgServer) {
	if server == nil {
		return
	}
	channel := server.GetChannel()
	ms := servers[channel]
	if ms == nil {
		// Unknown channel: nothing to cancel. Without this guard the
		// cancelFn call below would dereference a nil *MsgServer.
		return
	}
	ms.cancelFn()
	delete(servers, channel)
}

// run is the tick loop for a single message server. On every iteration it
// calls OnTick, publishes each returned message through apiproto.PublishMsg,
// and then waits for GetInterval before ticking again.
//
// The loop ends when the server's context is cancelled, when OnTick returns an
// error (after reporting it through OnError), or when a panic is recovered.
func run(server *MsgServer) {
	// Release the context's resources on every exit path. cancelFn may be
	// called more than once, so a later Unregister remains safe.
	defer server.cancelFn()
	defer func() {
		if err := recover(); err != nil {
			slog.Error("消息服务运行异常", "channel", server.GetChannel(), "error", err, "stack", string(debug.Stack()))
			debug.PrintStack()
		}
	}()

	for {
		select {
		case <-server.ctx.Done():
			slog.Info("消息服务退出", "channel", server.GetChannel())
			return
		default:
		}

		topicMsgs, err := server.OnTick()
		if err != nil {
			server.OnError(err)
			slog.Error("消息服务构建定时发送消息错误,服务退出", "channel", server.GetChannel(), "error", err)
			return
		}
		for _, msg := range topicMsgs {
			apiproto.PublishMsg(msg.Channel, msg.Data)
		}

		// Wait out the interval but wake early on cancellation; the check at
		// the top of the loop then logs the exit and returns.
		waitInterval(server.ctx, server.GetInterval())
	}
}

// waitInterval blocks until d has elapsed or ctx is done, whichever happens
// first. A non-positive d returns immediately.
func waitInterval(ctx context.Context, d time.Duration) {
	timer := time.NewTimer(d)
	defer timer.Stop()
	select {
	case <-ctx.Done():
	case <-timer.C:
	}
}