rts-sim-testing-service/grpcproto/message.go
2023-08-31 16:16:18 +08:00

104 lines
2.0 KiB
Go

package apiproto
import (
"time"
"go.uber.org/zap"
)
// 消息服务
type IMsgServer interface {
// 获取通道名
getChannelName() string
// 发送消息间隔时间,单位ms
getInterval() time.Duration
// 全量信息
allMsgData(params map[string]string) []byte
// 定时发送的消息
onTick() []TopicMsg
}
// 消息实体
type TopicMsg struct {
// 通道名称
channalName string
// 消息信息
data []byte
}
// 消息类型服务集合
var serverMap = make(map[string]*IMsgServer)
// 消息服务退出通道
var serverExitChannelMap = make(map[string]chan bool)
// 注册服务
func RegisterMsgServer(server IMsgServer) {
if client == nil {
InitClient()
}
serverMap[server.getChannelName()] = &server
if server.getInterval() > 0 {
exitChannel := make(chan bool)
serverExitChannelMap[server.getChannelName()] = exitChannel
tick := time.NewTicker(server.getInterval())
go func() {
defer func() {
if r := recover(); r != nil {
zap.S().Debug("定时器发生错误,%v\n", r)
}
// ticker.Stop() // 意外退出时关闭定时器
}()
// 循环推送信息
for {
<-tick.C
topicMsgs := server.onTick()
if len(topicMsgs) == 0 {
continue
}
for _, msg := range topicMsgs {
PublishMsg(msg.channalName, msg.data)
}
select {
case <-exitChannel:
return
default:
}
}
}()
}
}
// 用户初次进入系统后执行消息发送(这里可以使用对个人消息发送,但目前命名空间没有调试通)
// 这个有问题暂时注释掉
//func Subscription() {
// for key, server := range serverMap {
// data := (*server).allMsgData()
// if data != nil && len(data) > 0 {
// PublishMsg(key, data)
// }
// }
//}
// 注销消息服务
func UnRegisterMsgServer(key string) {
channel := serverExitChannelMap[key]
if channel != nil {
// 定时任务取消
channel <- false
delete(serverExitChannelMap, key)
// 删除集合信息
delete(serverMap, key)
}
}
// 获取消息服务
func GetMsgServer(key string) *IMsgServer {
return serverMap[key]
}