【仿真消息修改】

This commit is contained in:
weizhihong 2023-12-25 14:15:22 +08:00
parent e179497de9
commit 7e9c9c969d
8 changed files with 62 additions and 86 deletions

View File

@ -23,7 +23,7 @@ func NewIBPMs(vs *memory.VerifySimulation, mapId int32) ms_api.MsgTask {
if err != nil {
return err
}
mqtt.PubIBPState(vs.SimulationId, mapId, sid, stationIbpState)
mqtt.GetMsgClient().PubIBPState(vs.SimulationId, mapId, sid, stationIbpState)
}
return nil
}, 200*time.Millisecond)

View File

@ -23,7 +23,7 @@ func NewPSLMs(vs *memory.VerifySimulation, mapId int32) ms_api.MsgTask {
if err != nil {
return err
}
mqtt.PubPSLState(vs.SimulationId, mapId, did, state)
mqtt.GetMsgClient().PubPSLState(vs.SimulationId, mapId, did, state)
}
return nil
}, 200*time.Millisecond)

View File

@ -25,7 +25,7 @@ func NewRccMs(vs *memory.VerifySimulation, mapId int32) ms_api.MsgTask {
ReplyState: relayStates,
},
}
mqtt.PubRCCState(vs.SimulationId, mapId, ststes)
mqtt.GetMsgClient().PubRCCState(vs.SimulationId, mapId, ststes)
return nil
}, 200*time.Millisecond)
}

View File

@ -59,7 +59,7 @@ func NewSfpMs(vs *memory.VerifySimulation, mapId int32) ms_api.MsgTask {
PlatformState: platformStates,
},
}
mqtt.PubSfpState(vs.SimulationId, mapId, ststes)
mqtt.GetMsgClient().PubSfpState(vs.SimulationId, mapId, ststes)
return nil
}, 200*time.Millisecond)
}

View File

@ -1,55 +1,26 @@
package message_server
import (
"sync"
"joylink.club/bj-rtsts-server/message_server/ms_api"
"joylink.club/bj-rtsts-server/mqtt"
"joylink.club/bj-rtsts-server/ts/protos/graphicData"
"joylink.club/bj-rtsts-server/ts/simulation/wayside/memory"
)
var smsMap sync.Map
// 仿真消息服务
// 管理仿真消息服务,整体可以作为一个消息服务,也可以每个消息子服务各自作为一个消息服务,暂时先按整体作为一个消息服务的方式使用
type SimulationMs struct {
vs *memory.VerifySimulation
tasks []ms_api.MsgTask
}
// 启动仿真所需的消息服务
func Start(vs *memory.VerifySimulation) {
_, ok := smsMap.Load(vs.SimulationId)
if !ok {
sms := &SimulationMs{
vs: vs,
tasks: []ms_api.MsgTask{},
}
for _, mapId := range vs.MapIds {
t := memory.QueryGiType(mapId)
switch t {
case graphicData.PictureType_StationLayout: // 平面布置图
// 添加车站关联的平面布置图、IBP、PSL信息
sms.tasks = append(sms.tasks, NewSfpMs(vs, mapId), NewIBPMs(vs, mapId), NewPSLMs(vs, mapId))
mqtt.GetMsgClient().PublishTask(NewSfpMs(vs, mapId), NewIBPMs(vs, mapId), NewPSLMs(vs, mapId))
case graphicData.PictureType_RelayCabinetLayout: // 继电器柜
sms.tasks = append(sms.tasks, NewRccMs(vs, mapId))
mqtt.GetMsgClient().PublishTask(NewRccMs(vs, mapId))
}
}
// 启动仿真状态服务
sms.tasks = append(sms.tasks, NewStateMs(vs))
smsMap.Store(vs.SimulationId, sms)
}
}
// 关闭仿真消息服务
func Close(vs *memory.VerifySimulation) {
sms, ok := smsMap.Load(vs.SimulationId)
if ok {
ms := sms.(*SimulationMs)
for _, task := range ms.tasks {
task.Stop()
}
ms.tasks = nil
smsMap.Delete(vs.SimulationId)
}
mqtt.GetMsgClient().CloseTask()
}

View File

@ -11,23 +11,18 @@ import (
func NewStateMs(vs *memory.VerifySimulation) ms_api.MsgTask {
return ms_api.NewMonitorTask("仿真状态", func() {
ecs.WorldStateChangeEvent.Subscribe(vs.World, func(_ ecs.World, e ecs.WorldStateChange) {
s := &state.SimulationStatus{SimulationId: vs.SimulationId}
switch e.NewState {
case ecs.WorldClose:
mqtt.PubSimulationState(vs.SimulationId, &state.SimulationStatus{
SimulationId: vs.SimulationId,
State: state.SimulationStatus_DESTROY,
})
s.State = state.SimulationStatus_DESTROY
case ecs.WorldError:
mqtt.PubSimulationState(vs.SimulationId, &state.SimulationStatus{
SimulationId: vs.SimulationId,
State: state.SimulationStatus_ERROR,
})
s.State = state.SimulationStatus_ERROR
case ecs.WorldPause:
mqtt.PubSimulationState(vs.SimulationId, &state.SimulationStatus{
SimulationId: vs.SimulationId,
State: state.SimulationStatus_PAUSE,
})
s.State = state.SimulationStatus_PAUSE
default:
return
}
mqtt.GetMsgClient().PubSimulationState(vs.SimulationId, s)
})
})
}

View File

@ -3,32 +3,27 @@ package mqtt
import (
"context"
"fmt"
"math/rand"
"time"
"github.com/eclipse/paho.golang/autopaho"
"github.com/eclipse/paho.golang/paho"
"github.com/google/uuid"
"github.com/sagikazarmark/slog-shim"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protoreflect"
"joylink.club/bj-rtsts-server/message_server/ms_api"
"joylink.club/bj-rtsts-server/ts/protos/state"
)
var mqttClient *MqttClient
var clientId string
// 客户端
type MqttClient struct {
cmc *MqttOptions
cc *autopaho.ClientConfig
cm *autopaho.ConnectionManager
tasks []ms_api.MsgTask
}
// 初始化并启动MQTT客户端服务
func Startup(cmc *MqttOptions) error {
initClientId()
cmc.ClientId = clientId
if err := checkConfig(cmc); err != nil {
return err
}
@ -40,7 +35,7 @@ func Startup(cmc *MqttOptions) error {
if err != nil {
return err
}
mqttClient = &MqttClient{cmc: cmc, cc: cc, cm: cm}
mqttClient = &MqttClient{cc: cc, cm: cm}
return nil
}
@ -64,23 +59,17 @@ func checkConfig(cmc *MqttOptions) error {
return nil
}
// 初始化MQTT客户端id
func initClientId() {
if clientId == "" {
us := uuid.New().String()
usl := len(us)
sufix5 := us[usl-5 : usl]
clientId = fmt.Sprintf("%s%d", sufix5, rand.New(rand.NewSource(time.Now().UnixNano())).Int()%1000)
}
func GetMsgClient() *MqttClient {
return mqttClient
}
// 获取MQTT客户端id
func GetClientId() string {
return clientId
return mqttClient.cc.ClientConfig.ClientID
}
// 发布数据
func pub(topic string, data protoreflect.ProtoMessage) error {
func (client *MqttClient) pub(topic string, data protoreflect.ProtoMessage) error {
if data == nil {
return fmt.Errorf("发布数据引用为nil")
}
@ -92,7 +81,7 @@ func pub(topic string, data protoreflect.ProtoMessage) error {
slog.Error("未知发布主题", "topic", topic, "data", data)
return fmt.Errorf("未知发布主题: topic=%s", topic)
}
_, err = mqttClient.cm.Publish(context.Background(), &paho.Publish{
_, err = client.cm.Publish(context.Background(), &paho.Publish{
Topic: topic,
QoS: 0,
Payload: b,
@ -100,27 +89,40 @@ func pub(topic string, data protoreflect.ProtoMessage) error {
return err
}
// 发布任务
func (client *MqttClient) PublishTask(tasks ...ms_api.MsgTask) {
client.tasks = append(client.tasks, tasks...)
}
// 停止任务
func (client *MqttClient) CloseTask() {
for _, task := range client.tasks {
task.Stop()
}
client.tasks = nil
}
// 发送仿真状态数据
func PubSimulationState(simulationId string, msg *state.SimulationStatus) error {
return pub(GetStateTopic(simulationId), msg)
func (client *MqttClient) PubSimulationState(simulationId string, msg *state.SimulationStatus) error {
return client.pub(GetStateTopic(simulationId), msg)
}
// 发送IBP状态数据
func PubIBPState(simulationId string, mapId int32, stationId uint32, msg *state.PushedDevicesStatus) error {
return pub(GetIbpTopic(simulationId, mapId, stationId), msg)
func (client *MqttClient) PubIBPState(simulationId string, mapId int32, stationId uint32, msg *state.PushedDevicesStatus) error {
return client.pub(GetIbpTopic(simulationId, mapId, stationId), msg)
}
// 发送PSL状态数据
func PubPSLState(simulationId string, mapId int32, boxId uint32, msg *state.PushedDevicesStatus) error {
return pub(GetPslTopic(simulationId, mapId, boxId), msg)
func (client *MqttClient) PubPSLState(simulationId string, mapId int32, boxId uint32, msg *state.PushedDevicesStatus) error {
return client.pub(GetPslTopic(simulationId, mapId, boxId), msg)
}
// 发送继电器状态数据
func PubRCCState(simulationId string, mapId int32, msg *state.PushedDevicesStatus) error {
return pub(GetRccTopic(simulationId, mapId), msg)
func (client *MqttClient) PubRCCState(simulationId string, mapId int32, msg *state.PushedDevicesStatus) error {
return client.pub(GetRccTopic(simulationId, mapId), msg)
}
// 发送站场图状态数据
func PubSfpState(simulationId string, mapId int32, msg *state.PushedDevicesStatus) error {
return pub(GetSfpTopic(simulationId, mapId), msg)
func (client *MqttClient) PubSfpState(simulationId string, mapId int32, msg *state.PushedDevicesStatus) error {
return client.pub(GetSfpTopic(simulationId, mapId), msg)
}

View File

@ -3,11 +3,13 @@ package mqtt
import (
"fmt"
"log/slog"
"math/rand"
"net/url"
"time"
"github.com/eclipse/paho.golang/autopaho"
"github.com/eclipse/paho.golang/paho"
"github.com/google/uuid"
"joylink.club/bj-rtsts-server/config"
)
@ -28,6 +30,12 @@ func NewMqttOptions(address, username, password string) *MqttOptions {
BrokerUrl: address,
Username: username,
Password: password,
ClientId: (func() string { // 初始化MQTT客户端id
us := uuid.New().String()
usl := len(us)
sufix5 := us[usl-5 : usl]
return fmt.Sprintf("%s%d", sufix5, rand.New(rand.NewSource(time.Now().UnixNano())).Int()%1000)
})(),
}
}