75 lines
1.6 KiB
Go
75 lines
1.6 KiB
Go
package ms_manage
|
|
|
|
import (
|
|
"context"
|
|
"log/slog"
|
|
"runtime/debug"
|
|
"time"
|
|
|
|
"joylink.club/bj-rtsts-server/message_server/ms_api"
|
|
"joylink.club/bj-rtsts-server/mqtt"
|
|
)
|
|
|
|
type MsgServer struct {
|
|
ms_api.IMsgServer
|
|
ctx context.Context
|
|
cancelFn context.CancelFunc
|
|
}
|
|
|
|
// 消息服务管理map
|
|
var servers map[string]*MsgServer = make(map[string]*MsgServer)
|
|
|
|
// 注册消息服务
|
|
func Register(server ms_api.IMsgServer) *MsgServer {
|
|
ms := &MsgServer{
|
|
IMsgServer: server,
|
|
}
|
|
ctx, cancelFn := context.WithCancel(context.Background())
|
|
ms.ctx = ctx
|
|
ms.cancelFn = cancelFn
|
|
go run(ms)
|
|
servers[server.GetChannel()] = ms
|
|
return ms
|
|
}
|
|
|
|
// 注销消息服务
|
|
func Unregister(server ms_api.IMsgServer) {
|
|
if server == nil {
|
|
return
|
|
}
|
|
s := servers[server.GetChannel()]
|
|
s.cancelFn()
|
|
delete(servers, server.GetChannel())
|
|
}
|
|
|
|
// 消息服务运行
|
|
func run(server *MsgServer) {
|
|
defer func() {
|
|
if err := recover(); err != nil {
|
|
slog.Error("消息服务运行异常", "channel", server.GetChannel(), "error", err, "stack", string(debug.Stack()))
|
|
debug.PrintStack()
|
|
}
|
|
}()
|
|
for {
|
|
select {
|
|
case <-server.ctx.Done():
|
|
slog.Info("消息服务退出", "channel", server.GetChannel())
|
|
return
|
|
default:
|
|
}
|
|
topicMsgs, err := server.OnTick()
|
|
if err != nil {
|
|
server.OnError(err)
|
|
slog.Error("消息服务构建定时发送消息错误,服务退出", "channel", server.GetChannel(), "error", err)
|
|
break
|
|
}
|
|
if len(topicMsgs) > 0 {
|
|
for _, msg := range topicMsgs {
|
|
// apiproto.PublishMsg(msg.Channel, msg.Data)
|
|
mqtt.Publish(msg.Channel, msg.Data)
|
|
}
|
|
}
|
|
time.Sleep(server.GetInterval())
|
|
}
|
|
}
|