rts-sim-testing-service/grpcproto/message.go
2023-09-26 16:38:14 +08:00

99 lines
1.9 KiB
Go

package apiproto
import (
"time"
"go.uber.org/zap"
)
// 消息服务
type IMsgServer interface {
// 获取通道名
getChannelName() string
// 发送消息间隔时间,单位ms
getInterval() time.Duration
// 全量信息
allMsgData(params map[string]string) []byte
// 定时发送的消息
onTick() []TopicMsg
}
// 消息实体
type TopicMsg struct {
// 通道名称
channalName string
// 消息信息
data []byte
}
// 消息类型服务集合
var serverMap = make(map[string]*IMsgServer)
// 消息服务退出通道
var serverExitChannelMap = make(map[string]chan bool)
// 服务运行
func doServerRun(tick *time.Ticker, server IMsgServer, exitChannel chan bool) {
// 循环推送信息
for {
<-tick.C
topicMsgs := server.onTick()
if len(topicMsgs) == 0 {
continue
}
for _, msg := range topicMsgs {
PublishMsg(msg.channalName, msg.data)
}
select {
case <-exitChannel:
return
default:
}
}
}
// 注册服务
func RegisterMsgServer(server IMsgServer) {
if client == nil {
InitClient()
}
serverMap[server.getChannelName()] = &server
if server.getInterval() > 0 {
exitChannel := make(chan bool)
serverExitChannelMap[server.getChannelName()] = exitChannel
tick := time.NewTicker(server.getInterval())
go func() {
defer func() {
if r := recover(); r != nil {
zap.S().Debug("定时器发生错误,%v\n", r)
}
// 重新启动,防止服务卡死
doServerRun(tick, server, exitChannel)
}()
// 循环推送信息
doServerRun(tick, server, exitChannel)
}()
}
}
// 注销消息服务
func UnRegisterMsgServer(key string) {
channel := serverExitChannelMap[key]
if channel != nil {
// 定时任务取消
channel <- false
delete(serverExitChannelMap, key)
// 删除集合信息
delete(serverMap, key)
}
}
// 获取消息服务
func GetMsgServer(key string) *IMsgServer {
return serverMap[key]
}