rts-sim-testing-service/message_server/ms_manage/manage.go
walker 2abd4e4f92 添加mqtt发布客户端及配置
注销centrifugo发布客户端grpc及配置
调整go mod
2023-11-16 16:54:23 +08:00

75 lines
1.6 KiB
Go

package ms_manage
import (
"context"
"log/slog"
"runtime/debug"
"time"
"joylink.club/bj-rtsts-server/message_server/ms_api"
"joylink.club/bj-rtsts-server/mqtt"
)
type MsgServer struct {
ms_api.IMsgServer
ctx context.Context
cancelFn context.CancelFunc
}
// 消息服务管理map
var servers map[string]*MsgServer = make(map[string]*MsgServer)
// 注册消息服务
func Register(server ms_api.IMsgServer) *MsgServer {
ms := &MsgServer{
IMsgServer: server,
}
ctx, cancelFn := context.WithCancel(context.Background())
ms.ctx = ctx
ms.cancelFn = cancelFn
go run(ms)
servers[server.GetChannel()] = ms
return ms
}
// 注销消息服务
func Unregister(server ms_api.IMsgServer) {
if server == nil {
return
}
s := servers[server.GetChannel()]
s.cancelFn()
delete(servers, server.GetChannel())
}
// 消息服务运行
func run(server *MsgServer) {
defer func() {
if err := recover(); err != nil {
slog.Error("消息服务运行异常", "channel", server.GetChannel(), "error", err, "stack", string(debug.Stack()))
debug.PrintStack()
}
}()
for {
select {
case <-server.ctx.Done():
slog.Info("消息服务退出", "channel", server.GetChannel())
return
default:
}
topicMsgs, err := server.OnTick()
if err != nil {
server.OnError(err)
slog.Error("消息服务构建定时发送消息错误,服务退出", "channel", server.GetChannel(), "error", err)
break
}
if len(topicMsgs) > 0 {
for _, msg := range topicMsgs {
// apiproto.PublishMsg(msg.Channel, msg.Data)
mqtt.Publish(msg.Channel, msg.Data)
}
}
time.Sleep(server.GetInterval())
}
}