package apiproto import ( context "context" "log/slog" "runtime/debug" "time" ) // 消息服务 type IMsgServer interface { // 获取通道名 getChannelName() string // 发送消息间隔时间,单位ms getInterval() time.Duration // 定时发送的消息 onTick() []TopicMsg } // 消息实体 type TopicMsg struct { // 通道名称 channalName string // 消息信息 data []byte } type MsgServer struct { IMsgServer ctx context.Context cancelFn context.CancelFunc } // 消息服务管理map var servers map[string]*MsgServer = make(map[string]*MsgServer) // 注册消息服务 func Register(server IMsgServer) *MsgServer { ms := &MsgServer{ IMsgServer: server, } ctx, cancelFn := context.WithCancel(context.Background()) ms.ctx = ctx ms.cancelFn = cancelFn go run(ms) servers[server.getChannelName()] = ms return ms } // 注销消息服务 func Unregister(key string) { server := servers[key] if server == nil { return } server.cancelFn() delete(servers, key) } // 消息服务运行 func run(server *MsgServer) { defer func() { if err := recover(); err != nil { slog.Error("消息服务运行异常", "serverChannelName", server.getChannelName(), "error", err, "stack", string(debug.Stack())) debug.PrintStack() } }() for { select { case <-server.ctx.Done(): slog.Info("消息服务退出", "serverChannelName", server.getChannelName()) return default: } topicMsgs := server.onTick() if len(topicMsgs) > 0 { for _, msg := range topicMsgs { PublishMsg(msg.channalName, msg.data) } } time.Sleep(server.getInterval()) } } // // 消息类型服务集合 // var serverMap = make(map[string]*IMsgServer) // // 消息服务退出通道 // var serverExitChannelMap = make(map[string]chan bool) // // 服务运行 // func doServerRun(tick *time.Ticker, server IMsgServer, exitChannel chan bool) { // // 循环推送信息 // for { // <-tick.C // topicMsgs := server.onTick() // if len(topicMsgs) == 0 { // continue // } // for _, msg := range topicMsgs { // PublishMsg(msg.channalName, msg.data) // } // select { // case <-exitChannel: // return // default: // } // } // } // // 注册服务 // func RegisterMsgServer(server IMsgServer) { // if client == nil { // InitClient() // } // serverMap[server.getChannelName()] = &server // if server.getInterval() > 0 { // exitChannel := make(chan bool) // serverExitChannelMap[server.getChannelName()] = exitChannel // tick := time.NewTicker(server.getInterval()) // go func() { // defer func() { // if r := recover(); r != nil { // slog.Debug("定时器发生错误,%v\n", r) // } // // 重新启动,防止服务卡死 // doServerRun(tick, server, exitChannel) // }() // // 循环推送信息 // doServerRun(tick, server, exitChannel) // }() // } // } // // 注销消息服务 // func UnRegisterMsgServer(key string) { // channel := serverExitChannelMap[key] // if channel != nil { // // 定时任务取消 // channel <- false // delete(serverExitChannelMap, key) // // 删除集合信息 // delete(serverMap, key) // } // } // // 获取消息服务 // func GetMsgServer(key string) *IMsgServer { // return serverMap[key] // }