package apiproto import ( "time" "go.uber.org/zap" ) // 消息服务 type IMsgServer interface { // 获取通道名 getChannelName() string // 发送消息间隔时间,单位ms getInterval() time.Duration // 全量信息 allMsgData(params map[string]string) []byte // 定时发送的消息 onTick() []TopicMsg } // 消息实体 type TopicMsg struct { // 通道名称 channalName string // 消息信息 data []byte } // 消息类型服务集合 var serverMap = make(map[string]*IMsgServer) // 消息服务退出通道 var serverExitChannelMap = make(map[string]chan bool) // 注册服务 func RegisterMsgServer(server IMsgServer) { if client == nil { InitClient() } serverMap[server.getChannelName()] = &server if server.getInterval() > 0 { exitChannel := make(chan bool) serverExitChannelMap[server.getChannelName()] = exitChannel tick := time.Tick(server.getInterval()) go func() { defer func() { if r := recover(); r != nil { zap.S().Debug("定时器发生错误,%v\n", r) } // ticker.Stop() // 意外退出时关闭定时器 }() // 循环推送信息 for { <-tick topicMsgs := server.onTick() if topicMsgs != nil && len(topicMsgs) != 0 { for _, msg := range topicMsgs { PublishMsg(msg.channalName, msg.data) } } select { case <-exitChannel: return default: } } }() } } // 用户初次进入系统后执行消息发送(这里可以使用对个人消息发送,但目前命名空间没有调试通) // 这个有问题暂时注释掉 //func Subscription() { // for key, server := range serverMap { // data := (*server).allMsgData() // if data != nil && len(data) > 0 { // PublishMsg(key, data) // } // } //} // 注销消息服务 func UnRegisterMsgServer(key string) { channel := serverExitChannelMap[key] if channel != nil { // 定时任务取消 channel <- false delete(serverExitChannelMap, key) // 删除集合信息 delete(serverMap, key) } } // 获取消息服务 func GetMsgServer(key string) *IMsgServer { return serverMap[key] }