package apiproto import ( "time" "go.uber.org/zap" ) // 消息服务 type IMsgServer interface { // 获取通道名 getChannelName() string // 发送消息间隔时间,单位ms getInterval() time.Duration // 全量信息 allMsgData(params map[string]string) []byte // 定时发送的消息 onTick() []TopicMsg } // 消息实体 type TopicMsg struct { // 通道名称 channalName string // 消息信息 data []byte } // 消息类型服务集合 var serverMap = make(map[string]*IMsgServer) // 消息服务退出通道 var serverExitChannelMap = make(map[string]chan bool) // 服务运行 func doServerRun(tick *time.Ticker, server IMsgServer, exitChannel chan bool) { // 循环推送信息 for { <-tick.C topicMsgs := server.onTick() if len(topicMsgs) == 0 { continue } for _, msg := range topicMsgs { PublishMsg(msg.channalName, msg.data) } select { case <-exitChannel: return default: } } } // 注册服务 func RegisterMsgServer(server IMsgServer) { if client == nil { InitClient() } serverMap[server.getChannelName()] = &server if server.getInterval() > 0 { exitChannel := make(chan bool) serverExitChannelMap[server.getChannelName()] = exitChannel tick := time.NewTicker(server.getInterval()) go func() { defer func() { if r := recover(); r != nil { zap.S().Debug("定时器发生错误,%v\n", r) } // 重新启动,防止服务卡死 doServerRun(tick, server, exitChannel) }() // 循环推送信息 doServerRun(tick, server, exitChannel) }() } } // 注销消息服务 func UnRegisterMsgServer(key string) { channel := serverExitChannelMap[key] if channel != nil { // 定时任务取消 channel <- false delete(serverExitChannelMap, key) // 删除集合信息 delete(serverMap, key) } } // 获取消息服务 func GetMsgServer(key string) *IMsgServer { return serverMap[key] }