94 lines
2.0 KiB
Go
94 lines
2.0 KiB
Go
package apiproto
|
||
|
||
import (
|
||
"log"
|
||
"time"
|
||
)
|
||
|
||
// 消息服务
|
||
type IMsgServer interface {
|
||
// 获取通道名
|
||
getChannelName() string
|
||
|
||
// 发送消息间隔时间,单位ms
|
||
getInterval() time.Duration
|
||
|
||
// 全量信息
|
||
allMsgData() []byte
|
||
|
||
// 定时发送的消息
|
||
onTick() []TopicMsg
|
||
}
|
||
|
||
// 消息实体
|
||
type TopicMsg struct {
|
||
// 通道名称
|
||
channalName string
|
||
|
||
// 消息信息
|
||
data []byte
|
||
}
|
||
|
||
// 消息类型服务集合
|
||
var serverMap = make(map[string]*IMsgServer)
|
||
|
||
// 消息服务退出通道
|
||
var serverExitChannelMap = make(map[string]chan bool)
|
||
|
||
// 注册服务
|
||
func RegisterMsgServer(server IMsgServer) {
|
||
if client == nil { // 客户端没有初始化的情况下,调用初始化方法,init方使用时,环境参数还未初始化
|
||
InitClient()
|
||
}
|
||
serverMap[server.getChannelName()] = &server
|
||
if server.getInterval() > 0 {
|
||
exitChannel := make(chan bool)
|
||
serverExitChannelMap[server.getChannelName()] = exitChannel
|
||
ticker := time.NewTicker(server.getInterval())
|
||
go func() {
|
||
defer func() {
|
||
if r := recover(); r != nil {
|
||
log.Fatalf("定时器发生错误,%v\n", r)
|
||
}
|
||
ticker.Stop() // 意外退出时关闭定时器
|
||
}()
|
||
// 循环推送信息
|
||
for range ticker.C {
|
||
select {
|
||
case <-ticker.C:
|
||
topicMsgs := server.onTick()
|
||
if topicMsgs != nil && len(topicMsgs) != 0 {
|
||
for _, msg := range topicMsgs {
|
||
PublishMsg(msg.channalName, msg.data)
|
||
}
|
||
}
|
||
case <-exitChannel:
|
||
return
|
||
}
|
||
}
|
||
}()
|
||
}
|
||
}
|
||
|
||
// 用户初次进入系统后执行消息发送(这里可以使用对个人消息发送,但目前命名空间没有调试通)
|
||
func Subscription() {
|
||
for key, server := range serverMap {
|
||
data := (*server).allMsgData()
|
||
if data != nil && len(data) > 0 {
|
||
PublishMsg(key, data)
|
||
}
|
||
}
|
||
}
|
||
|
||
// 注销消息服务
|
||
func UnRegisterMsgServer(key string) {
|
||
channel := serverExitChannelMap[key]
|
||
if channel != nil {
|
||
// 定时任务取消
|
||
channel <- false
|
||
delete(serverExitChannelMap, key)
|
||
// 删除集合信息
|
||
delete(serverMap, key)
|
||
}
|
||
}
|