【修改MQTT消息发送逻辑】

This commit is contained in:
weizhihong 2023-12-20 10:37:54 +08:00
parent ee4c04cf3d
commit 1ff7a8f28c
12 changed files with 574 additions and 516 deletions

View File

@ -1,11 +1,10 @@
package message_server
import (
"fmt"
"time"
"google.golang.org/protobuf/proto"
"joylink.club/bj-rtsts-server/message_server/ms_api"
"joylink.club/bj-rtsts-server/mqtt"
"joylink.club/bj-rtsts-server/ts/protos/graphicData"
"joylink.club/bj-rtsts-server/ts/protos/state"
"joylink.club/bj-rtsts-server/ts/simulation/wayside/memory"
@ -15,68 +14,42 @@ import (
)
// 综合后备盘IBP消息服务
type IbpMs struct {
vs *memory.VerifySimulation
mapId int32
}
func NewIBPMs(vs *memory.VerifySimulation, mapId int32) *IbpMs {
return &IbpMs{vs: vs, mapId: mapId}
}
func (ms *IbpMs) GetChannel() string {
return SimulationIbpTopic
// return "simulation-ibp-%s_%d_%s-status"
}
func (ms *IbpMs) GetInterval() time.Duration {
return 200 * time.Millisecond
}
func (ms *IbpMs) OnTick() ([]*ms_api.TopicMsg, error) {
var msgArr []*ms_api.TopicMsg
mapData := memory.QueryGiData[*graphicData.RtssGraphicStorage](ms.mapId)
func NewIBPMs(vs *memory.VerifySimulation, mapId int32) ms_api.MsgTask {
mapData := memory.QueryGiData[*graphicData.RtssGraphicStorage](mapId)
return ms_api.NewScheduleTask("综合后备盘IBP", func() error {
for _, station := range mapData.Stations {
sid := memory.GetMapElementId(station.Common)
channel := ms.handlerIBPChannelName(sid)
stationIbpState, err := ms.collectStationIbpState(station)
stationIbpState, err := collectStationIbpState(mapId, vs.World, station)
if err != nil {
return nil, err
return err
}
b, err := proto.Marshal(stationIbpState)
if err != nil {
return nil, fmt.Errorf("IBP设备状态消息服务数据序列化失败: %s", err)
mqtt.PubIBPState(vs.SimulationId, mapId, sid, stationIbpState)
}
msgArr = append(msgArr, ms_api.NewTopicMsg(channel, b))
}
return msgArr, nil
return nil
}, 200*time.Millisecond)
}
// 当发生错误时执行的逻辑
func (ms *IbpMs) OnError(err error) {
}
func (ms *IbpMs) collectStationIbpState(station *graphicData.Station) (*state.PushedDevicesStatus, error) {
func collectStationIbpState(mapId int32, world ecs.World, station *graphicData.Station) (*state.PushedDevicesStatus, error) {
if station.RefIbpMapCode == "" {
return nil, nil
}
sid := memory.GetMapElementId(station.Common)
stationUid := memory.QueryUidByMidAndComId(ms.mapId, sid, &graphicData.Station{})
stationUid := memory.QueryUidByMidAndComId(mapId, sid, &graphicData.Station{})
ibpMapId, ibpStorage := memory.QueryGiDataByName[*graphicData.IBPGraphicStorage](station.RefIbpMapCode)
ibpUidsMap := memory.QueryUidStructure[*memory.IBPUidStructure](ibpMapId)
buttonStates, err := ms.collectIBPButtonState(stationUid, ibpUidsMap, ibpStorage.IbpButtons)
buttonStates, err := collectIBPButtonState(world, stationUid, ibpUidsMap, ibpStorage.IbpButtons)
if err != nil {
return nil, err
}
alarmStates, err := ms.collectIBPAlarmState(stationUid, ibpUidsMap, ibpStorage.IbpAlarms)
alarmStates, err := collectIBPAlarmState(world, stationUid, ibpUidsMap, ibpStorage.IbpAlarms)
if err != nil {
return nil, err
}
lightStates, err := ms.collectIBPLightState(stationUid, ibpUidsMap, ibpStorage.IbpLights)
lightStates, err := collectIBPLightState(world, stationUid, ibpUidsMap, ibpStorage.IbpLights)
if err != nil {
return nil, err
}
keyStates, err := ms.collectIBPKeyState(stationUid, ibpUidsMap, ibpStorage.IbpKeys)
keyStates, err := collectIBPKeyState(world, stationUid, ibpUidsMap, ibpStorage.IbpKeys)
if err != nil {
return nil, err
}
@ -92,12 +65,12 @@ func (ms *IbpMs) collectStationIbpState(station *graphicData.Station) (*state.Pu
}
// 收集IBP按钮状态
func (ms *IbpMs) collectIBPButtonState(stationUid string, uidsMap *memory.IBPUidStructure, ibpButtons []*graphicData.IBPButton) ([]*state.ButtonState, error) {
func collectIBPButtonState(world ecs.World, stationUid string, uidsMap *memory.IBPUidStructure, ibpButtons []*graphicData.IBPButton) ([]*state.ButtonState, error) {
var states []*state.ButtonState
for _, data := range ibpButtons { // 按钮
did := memory.GetMapElementId(data.Common)
uid := stationUid + "_" + uidsMap.IbpButtonIds[did].Uid
entry, ok := entity.GetEntityByUid(ms.vs.World, uid)
entry, ok := entity.GetEntityByUid(world, uid)
if !ok {
continue
}
@ -122,12 +95,12 @@ func getIBPButtonState(id uint32, entry *ecs.Entry) *state.ButtonState {
}
// 收集报警器状态
func (ms *IbpMs) collectIBPAlarmState(stationUid string, uidsMap *memory.IBPUidStructure, ibpAlarms []*graphicData.IbpAlarm) ([]*state.AlarmState, error) {
func collectIBPAlarmState(world ecs.World, stationUid string, uidsMap *memory.IBPUidStructure, ibpAlarms []*graphicData.IbpAlarm) ([]*state.AlarmState, error) {
var states []*state.AlarmState
for _, data := range ibpAlarms { // 报警器
did := memory.GetMapElementId(data.Common)
uid := stationUid + "_" + uidsMap.IbpAlarmIds[did].Uid
entry, ok := entity.GetEntityByUid(ms.vs.World, uid)
entry, ok := entity.GetEntityByUid(world, uid)
if !ok {
continue
}
@ -139,12 +112,12 @@ func (ms *IbpMs) collectIBPAlarmState(stationUid string, uidsMap *memory.IBPUidS
}
// 收集灯状态信息
func (ms *IbpMs) collectIBPLightState(stationUid string, uidsMap *memory.IBPUidStructure, ibpLights []*graphicData.IbpLight) ([]*state.LightState, error) {
func collectIBPLightState(world ecs.World, stationUid string, uidsMap *memory.IBPUidStructure, ibpLights []*graphicData.IbpLight) ([]*state.LightState, error) {
var states []*state.LightState
for _, data := range ibpLights { // 指示灯
did := memory.GetMapElementId(data.Common)
uid := stationUid + "_" + uidsMap.IbpLightIds[did].Uid
entry, ok := entity.GetEntityByUid(ms.vs.World, uid)
entry, ok := entity.GetEntityByUid(world, uid)
if !ok {
continue
}
@ -156,12 +129,12 @@ func (ms *IbpMs) collectIBPLightState(stationUid string, uidsMap *memory.IBPUidS
}
// 收集钥匙状态
func (ms *IbpMs) collectIBPKeyState(stationUid string, uidsMap *memory.IBPUidStructure, ibpKeys []*graphicData.IbpKey) ([]*state.KeyState, error) {
func collectIBPKeyState(world ecs.World, stationUid string, uidsMap *memory.IBPUidStructure, ibpKeys []*graphicData.IbpKey) ([]*state.KeyState, error) {
var states []*state.KeyState
for _, data := range ibpKeys { // 钥匙
did := memory.GetMapElementId(data.Common)
uid := stationUid + "_" + uidsMap.IbpKeyIds[did].Uid
entry, ok := entity.GetEntityByUid(ms.vs.World, uid)
entry, ok := entity.GetEntityByUid(world, uid)
if !ok {
continue
}
@ -171,8 +144,3 @@ func (ms *IbpMs) collectIBPKeyState(stationUid string, uidsMap *memory.IBPUidStr
}
return states, nil
}
// 处理订阅通道名称
func (ms *IbpMs) handlerIBPChannelName(stationId uint32) string {
return fmt.Sprintf(SimulationIbpTopic, ms.vs.SimulationId, ms.mapId, stationId)
}

View File

@ -1,34 +1,86 @@
package ms_api
import "time"
import (
"context"
"fmt"
"time"
// 消息服务
type IMsgServer interface {
// 获取消息服务名
GetChannel() string
"joylink.club/bj-rtsts-server/sys_error"
)
// 发送消息间隔时间,单位ms
GetInterval() time.Duration
// 构造定时发送的消息
OnTick() ([]*TopicMsg, error)
// 当发生错误时执行的逻辑
OnError(err error)
type MsgTask interface {
// 停止
Stop()
}
// 消息实体
type TopicMsg struct {
// 通道
Channel string
// 消息内容
Data []byte
// 监控型任务
type msgMonitorTask struct {
name string
fn func()
}
func NewTopicMsg(channel string, data []byte) *TopicMsg {
return &TopicMsg{
Channel: channel,
Data: data,
// 监控型任务停止
func (t *msgMonitorTask) Stop() {
fmt.Printf("【%s】处理任务线程退出", t.name)
}
// 定时型任务
type msgScheduleTask struct {
name string
fn func() error
interval time.Duration
cancel context.CancelFunc
done chan struct{} // 服务协程退出信号
}
// Stop
func (t *msgScheduleTask) Stop() {
t.cancel()
<-t.done
fmt.Printf("【%s】处理任务线程退出", t.name)
}
// 定时任务运行
func (t *msgScheduleTask) run(ctx context.Context) {
defer close(t.done)
mainLoop:
for {
select {
case <-ctx.Done():
break mainLoop
default:
}
err := t.fn()
if err != nil {
panic(sys_error.New(fmt.Sprintf("仿真任务【%s】状态消息收集异常", t.name), err))
}
time.Sleep(t.interval)
}
}
// 创建定时任务
func NewScheduleTask(name string, run func() error, interval time.Duration) MsgTask {
if interval <= 0 {
interval = time.Second
}
task := &msgScheduleTask{
name: name,
fn: run,
interval: interval,
done: make(chan struct{}),
}
ctx, cancel := context.WithCancel(context.Background())
go task.run(ctx)
task.cancel = cancel
return task
}
// 创建监控任务
func NewMonitorTask(name string, run func()) MsgTask {
task := &msgMonitorTask{
name: name,
fn: run,
}
go task.fn()
return task
}

View File

@ -1,74 +0,0 @@
package ms_manage
import (
"context"
"log/slog"
"runtime/debug"
"time"
"joylink.club/bj-rtsts-server/message_server/ms_api"
"joylink.club/bj-rtsts-server/mqtt"
)
type MsgServer struct {
ms_api.IMsgServer
ctx context.Context
cancelFn context.CancelFunc
}
// 消息服务管理map
var servers map[string]*MsgServer = make(map[string]*MsgServer)
// 注册消息服务
func Register(server ms_api.IMsgServer) *MsgServer {
ms := &MsgServer{
IMsgServer: server,
}
ctx, cancelFn := context.WithCancel(context.Background())
ms.ctx = ctx
ms.cancelFn = cancelFn
go run(ms)
servers[server.GetChannel()] = ms
return ms
}
// 注销消息服务
func Unregister(server ms_api.IMsgServer) {
if server == nil {
return
}
s := servers[server.GetChannel()]
s.cancelFn()
delete(servers, server.GetChannel())
}
// 消息服务运行
func run(server *MsgServer) {
defer func() {
if err := recover(); err != nil {
slog.Error("消息服务运行异常", "channel", server.GetChannel(), "error", err, "stack", string(debug.Stack()))
debug.PrintStack()
}
}()
for {
select {
case <-server.ctx.Done():
slog.Info("消息服务退出", "channel", server.GetChannel())
return
default:
}
topicMsgs, err := server.OnTick()
if err != nil {
server.OnError(err)
slog.Error("消息服务构建定时发送消息错误,服务退出", "channel", server.GetChannel(), "error", err)
break
}
if len(topicMsgs) > 0 {
for _, msg := range topicMsgs {
// apiproto.PublishMsg(msg.Channel, msg.Data)
mqtt.Publish(msg.Channel, msg.Data)
}
}
time.Sleep(server.GetInterval())
}
}

View File

@ -4,66 +4,34 @@ import (
"fmt"
"time"
"joylink.club/ecs"
"joylink.club/rtsssimulation/component"
"joylink.club/rtsssimulation/entity"
"google.golang.org/protobuf/proto"
"joylink.club/bj-rtsts-server/message_server/ms_api"
"joylink.club/bj-rtsts-server/mqtt"
"joylink.club/bj-rtsts-server/ts/protos/graphicData"
"joylink.club/bj-rtsts-server/ts/protos/state"
"joylink.club/bj-rtsts-server/ts/simulation/wayside/memory"
"joylink.club/ecs"
"joylink.club/rtsssimulation/component"
"joylink.club/rtsssimulation/entity"
)
// PSL或门控箱消息服务
type PslMs struct {
vs *memory.VerifySimulation
mapId int32
}
func NewPSLMs(vs *memory.VerifySimulation, mapId int32) *PslMs {
return &PslMs{vs: vs, mapId: mapId}
}
func (p *PslMs) GetChannel() string {
return SimulationPslTopic
// return "simulation-psl-%s_%d_%s-status"
}
func (p *PslMs) GetInterval() time.Duration {
return 200 * time.Millisecond
}
func (p *PslMs) OnTick() ([]*ms_api.TopicMsg, error) {
var msgArr []*ms_api.TopicMsg
mapData := memory.QueryGiData[*graphicData.RtssGraphicStorage](p.mapId)
func NewPSLMs(vs *memory.VerifySimulation, mapId int32) ms_api.MsgTask {
mapData := memory.QueryGiData[*graphicData.RtssGraphicStorage](mapId)
return ms_api.NewScheduleTask("综合后备盘IBP", func() error {
for _, box := range mapData.GateBoxs {
did := memory.GetMapElementId(box.Common)
channel := p.handlerPSLChannelName(did)
state, err := p.collectGateBoxPSLState(box)
state, err := collectGateBoxPSLState(vs.World, mapId, box)
if err != nil {
return nil, err
return err
}
b, err := proto.Marshal(state)
if err != nil {
return nil, fmt.Errorf("PSL设备状态消息服务数据序列化失败: %s", err)
mqtt.PubPSLState(vs.SimulationId, mapId, did, state)
}
msgArr = append(msgArr, ms_api.NewTopicMsg(channel, b))
}
return msgArr, nil
return nil
}, 200*time.Millisecond)
}
func (p *PslMs) OnError(err error) {}
// 处理订阅通道名称
func (p *PslMs) handlerPSLChannelName(gateBoxId uint32) string {
return fmt.Sprintf(SimulationPslTopic, p.vs.SimulationId, p.mapId, gateBoxId)
}
func (p *PslMs) collectGateBoxPSLState(box *graphicData.GatedBox) (*state.PushedDevicesStatus, error) {
world := p.vs.World
func collectGateBoxPSLState(world ecs.World, mapId int32, box *graphicData.GatedBox) (*state.PushedDevicesStatus, error) {
did := memory.GetMapElementId(box.Common)
uidStructure := memory.QueryUidStructure[*memory.StationUidStructure](p.mapId)
uidStructure := memory.QueryUidStructure[*memory.StationUidStructure](mapId)
boxUid := uidStructure.GateBoxIds[did].Uid
mkxEntry, ok := entity.GetEntityByUid(world, boxUid)
if !ok {

View File

@ -1,48 +1,23 @@
package message_server
import (
"fmt"
"time"
"google.golang.org/protobuf/proto"
"joylink.club/bj-rtsts-server/message_server/ms_api"
"joylink.club/bj-rtsts-server/mqtt"
"joylink.club/bj-rtsts-server/ts/protos/graphicData"
"joylink.club/bj-rtsts-server/ts/protos/state"
"joylink.club/bj-rtsts-server/ts/simulation/wayside/memory"
"joylink.club/ecs"
"joylink.club/rtsssimulation/component"
"joylink.club/rtsssimulation/entity"
)
// 继电器组合柜布置图消息服务
type RccMs struct {
vs *memory.VerifySimulation
mapId int32
channel string
}
func NewRccMs(vs *memory.VerifySimulation, mapId int32) *RccMs {
return &RccMs{
vs: vs,
mapId: mapId,
channel: fmt.Sprintf(SimulationRccTopic, vs.SimulationId, mapId),
}
}
// 获取消息服务名
func (r *RccMs) GetChannel() string {
return r.channel
}
// 发送消息间隔时间,单位ms
func (r *RccMs) GetInterval() time.Duration {
return 200 * time.Millisecond
}
// 构造定时发送的消息
func (r *RccMs) OnTick() ([]*ms_api.TopicMsg, error) {
relayStates, err := r.collectRelayState()
func NewRccMs(vs *memory.VerifySimulation, mapId int32) ms_api.MsgTask {
return ms_api.NewScheduleTask("继电器状态", func() error {
relayStates, err := collectRelayState(vs.World, mapId)
if err != nil {
return nil, err
return err
}
ststes := &state.PushedDevicesStatus{
All: true,
@ -50,24 +25,18 @@ func (r *RccMs) OnTick() ([]*ms_api.TopicMsg, error) {
ReplyState: relayStates,
},
}
b, err := proto.Marshal(ststes)
if err != nil {
return nil, fmt.Errorf("信号布置图设备状态消息服务数据序列化失败, %s", err)
}
return []*ms_api.TopicMsg{ms_api.NewTopicMsg(r.channel, b)}, nil
}
// 当发生错误时执行的逻辑
func (r *RccMs) OnError(err error) {
mqtt.PubRCCState(vs.SimulationId, mapId, ststes)
return nil
}, 200*time.Millisecond)
}
// 获取仿真地图的继电器状态,前端推送
func (r *RccMs) collectRelayState() ([]*state.ReplyState, error) {
func collectRelayState(world ecs.World, mapId int32) ([]*state.ReplyState, error) {
// 获取本地图下的继电器信息
uidMap := memory.QueryMapUidMapByType(r.mapId, &graphicData.Relay{})
uidMap := memory.QueryMapUidMapByType(mapId, &graphicData.Relay{})
var replyStateArr []*state.ReplyState
for _, u := range uidMap {
entry, ok := entity.GetEntityByUid(r.vs.World, u.Uid)
entry, ok := entity.GetEntityByUid(world, u.Uid)
if !ok {
// 暂时注释,很多继电器都没初始化
//return nil, fmt.Errorf("继电器实体不存在: World id=%d, uid=%s", r.vs.World.Id(), u.Uid)

View File

@ -5,8 +5,8 @@ import (
"strings"
"time"
"google.golang.org/protobuf/proto"
"joylink.club/bj-rtsts-server/message_server/ms_api"
"joylink.club/bj-rtsts-server/mqtt"
"joylink.club/bj-rtsts-server/ts/protos/graphicData"
"joylink.club/bj-rtsts-server/ts/protos/state"
"joylink.club/bj-rtsts-server/ts/simulation/wayside/memory"
@ -16,59 +16,36 @@ import (
)
// 信号平面布置图消息服务
type SfpMs struct {
vs *memory.VerifySimulation
mapId int32
channel string
}
func NewSfpMs(vs *memory.VerifySimulation, mapId int32) *SfpMs {
return &SfpMs{
vs: vs,
mapId: mapId,
channel: fmt.Sprintf(SimulationSfpTopic, vs.SimulationId, mapId),
}
}
// 获取通道名
func (ms *SfpMs) GetChannel() string {
return ms.channel
}
// 发送消息间隔时间,单位ms
func (ms *SfpMs) GetInterval() time.Duration {
return 200 * time.Millisecond
}
// 定时发送的消息
func (ms *SfpMs) OnTick() ([]*ms_api.TopicMsg, error) {
turnoutStates, err := ms.collectTurnoutStates()
func NewSfpMs(vs *memory.VerifySimulation, mapId int32) ms_api.MsgTask {
return ms_api.NewScheduleTask("信号平面布置图", func() error {
turnoutStates, err := collectTurnoutStates(vs, mapId)
if err != nil {
return nil, err
return err
}
signalStates, err := ms.collectSignalStates()
signalStates, err := collectSignalStates(vs.World, mapId)
if err != nil {
return nil, err
return err
}
buttonStates, err := ms.collectStationButtonStates()
buttonStates, err := collectStationButtonStates(vs.World, mapId)
if err != nil {
return nil, err
return err
}
psdStates, err := ms.collectPsdStates()
psdStates, err := collectPsdStates(vs.World, mapId)
if err != nil {
return nil, err
return err
}
sectionStates, err := ms.collectSectionStates()
sectionStates, err := collectSectionStates(vs.World, mapId)
if err != nil {
return nil, err
return err
}
platformStates, err := ms.collectPlatformStates()
platformStates, err := collectPlatformStates(vs.World, mapId)
if err != nil {
return nil, err
return err
}
trainState, err := ms.collectTrainStates()
trainState, err := collectTrainStates(vs)
if err != nil {
return nil, err
return err
}
ststes := &state.PushedDevicesStatus{
All: true,
@ -82,22 +59,15 @@ func (ms *SfpMs) OnTick() ([]*ms_api.TopicMsg, error) {
PlatformState: platformStates,
},
}
b, err := proto.Marshal(ststes)
if err != nil {
return nil, fmt.Errorf("信号布置图设备状态消息服务数据序列化失败, %s", err)
}
return []*ms_api.TopicMsg{ms_api.NewTopicMsg(ms.channel, b)}, nil
}
func (ms *SfpMs) OnError(err error) {
// TODO: 错误处理
mqtt.PubSfpState(vs.SimulationId, mapId, ststes)
return nil
}, 200*time.Millisecond)
}
// 收集屏蔽门状态
func (ms *SfpMs) collectPsdStates() ([]*state.PsdState, error) {
world := ms.vs.World
uidStructure := memory.QueryUidStructure[*memory.StationUidStructure](ms.mapId)
data := memory.QueryGiData[*graphicData.RtssGraphicStorage](ms.mapId)
func collectPsdStates(world ecs.World, mapId int32) ([]*state.PsdState, error) {
uidStructure := memory.QueryUidStructure[*memory.StationUidStructure](mapId)
data := memory.QueryGiData[*graphicData.RtssGraphicStorage](mapId)
var psdStateArr []*state.PsdState
for _, door := range data.ScreenDoors {
did := memory.GetMapElementId(door.Common)
@ -126,11 +96,11 @@ func (ms *SfpMs) collectPsdStates() ([]*state.PsdState, error) {
}
// 收集区段状态
func (ms *SfpMs) collectSectionStates() ([]*state.SectionState, error) {
uidMap := memory.QueryMapUidMapByType(ms.mapId, &graphicData.Section{})
func collectSectionStates(world ecs.World, mapId int32) ([]*state.SectionState, error) {
uidMap := memory.QueryMapUidMapByType(mapId, &graphicData.Section{})
var sectionArr []*state.SectionState
for _, u := range uidMap {
s := handlerSectionState(ms.vs.World, u.Uid)
s := handlerSectionState(world, u.Uid)
if s == nil {
continue
}
@ -157,14 +127,14 @@ func handlerSectionState(w ecs.World, uid string) *state.SectionState {
}
// 收集车站按钮状态
func (ms *SfpMs) collectStationButtonStates() ([]*state.ButtonState, error) {
func collectStationButtonStates(world ecs.World, mapId int32) ([]*state.ButtonState, error) {
// 获取地图上的按钮状态
uidMap := memory.QueryMapUidMapByType(ms.mapId, &graphicData.EsbButton{})
uidMap := memory.QueryMapUidMapByType(mapId, &graphicData.EsbButton{})
var btnStateArr []*state.ButtonState
for _, u := range uidMap {
entry, ok := entity.GetEntityByUid(ms.vs.World, u.Uid)
entry, ok := entity.GetEntityByUid(world, u.Uid)
if !ok {
return nil, fmt.Errorf("ESB按钮实体不存在: World id=%d, uid=%s", ms.vs.World.Id(), u.Uid)
return nil, fmt.Errorf("ESB按钮实体不存在: World id=%d, uid=%s", world.Id(), u.Uid)
}
if entry.HasComponent(component.ButtonTag) { // 按钮
bit := component.BitStateType.Get(entry)
@ -175,11 +145,11 @@ func (ms *SfpMs) collectStationButtonStates() ([]*state.ButtonState, error) {
}
// 收集信号机状态
func (ms *SfpMs) collectSignalStates() ([]*state.SignalState, error) {
uidMap := memory.QueryMapUidMapByType(ms.mapId, &graphicData.Signal{})
func collectSignalStates(world ecs.World, mapId int32) ([]*state.SignalState, error) {
uidMap := memory.QueryMapUidMapByType(mapId, &graphicData.Signal{})
var signalArr []*state.SignalState
for _, u := range uidMap {
s, err := handlerSignalState(ms.vs.World, u.Uid)
s, err := handlerSignalState(world, u.Uid)
if err != nil {
return nil, err
}
@ -238,8 +208,8 @@ func handlerSignalState(w ecs.World, uid string) (*state.SignalState, error) {
}
// 收集列车状态
func (ms *SfpMs) collectTrainStates() ([]*state.TrainMapState, error) {
allTrainMap := &ms.vs.Memory.Status.TrainStateMap
func collectTrainStates(vs *memory.VerifySimulation) ([]*state.TrainMapState, error) {
allTrainMap := &vs.Memory.Status.TrainStateMap
var trainArr []*state.TrainMapState
allTrainMap.Range(func(_, v any) bool {
trainArr = append(trainArr, convertTrainState(v.(*state.TrainState)))
@ -318,9 +288,8 @@ func convertTrainState(v *state.TrainState) *state.TrainMapState {
}
// 收集道岔状态
func (ms *SfpMs) collectTurnoutStates() ([]*state.SwitchState, error) {
sim := ms.vs
uidMap := memory.QueryMapUidMapByType(ms.mapId, &graphicData.Turnout{})
func collectTurnoutStates(sim *memory.VerifySimulation, mapId int32) ([]*state.SwitchState, error) {
uidMap := memory.QueryMapUidMapByType(mapId, &graphicData.Turnout{})
wd := entity.GetWorldData(sim.World)
var switchArr []*state.SwitchState
for _, u := range uidMap {
@ -394,10 +363,10 @@ func (ms *SfpMs) collectTurnoutStates() ([]*state.SwitchState, error) {
}
// 收集站台状态
func (ms *SfpMs) collectPlatformStates() ([]*state.PlatformState, error) {
func collectPlatformStates(world ecs.World, mapId int32) ([]*state.PlatformState, error) {
var states []*state.PlatformState
mapData := memory.QueryGiData[*graphicData.RtssGraphicStorage](ms.mapId)
uidsMap := memory.QueryUidStructure[*memory.StationUidStructure](ms.mapId)
mapData := memory.QueryGiData[*graphicData.RtssGraphicStorage](mapId)
uidsMap := memory.QueryUidStructure[*memory.StationUidStructure](mapId)
platformScreenDoorMap := wrapScreenDoorToPlatform(mapData)
for _, platform := range mapData.Platforms {
pid := memory.GetMapElementId(platform.Common)
@ -409,9 +378,9 @@ func (ms *SfpMs) collectPlatformStates() ([]*state.PlatformState, error) {
if uidInfo == nil {
return nil, fmt.Errorf("车站实体不存在uid映射:id=%v", stationCommonId)
}
entry, ok := entity.GetEntityByUid(ms.vs.World, uidInfo.Uid)
entry, ok := entity.GetEntityByUid(world, uidInfo.Uid)
if !ok {
return nil, fmt.Errorf("车站实体不存在: World id=%d, uid=%s", ms.vs.World.Id(), uidInfo.Uid)
return nil, fmt.Errorf("车站实体不存在: World id=%d, uid=%s", world.Id(), uidInfo.Uid)
}
sta := &state.PlatformState{Id: pid}
isX := strings.Contains(platform.Code, "下行站台") //下行站台
@ -440,9 +409,9 @@ func (ms *SfpMs) collectPlatformStates() ([]*state.PlatformState, error) {
if !ok {
continue
}
psdEntry, ok := entity.GetEntityByUid(ms.vs.World, psdUid.Uid)
psdEntry, ok := entity.GetEntityByUid(world, psdUid.Uid)
if !ok {
return nil, fmt.Errorf("屏蔽门实体不存在: World id=%d, uid=%s", ms.vs.World.Id(), psdUid.Uid)
return nil, fmt.Errorf("屏蔽门实体不存在: World id=%d, uid=%s", world.Id(), psdUid.Uid)
}
if psdEntry.HasComponent(component.PlatformMkxCircuitType) {
mkxCircuit := component.PlatformMkxCircuitType.Get(psdEntry)

View File

@ -2,36 +2,19 @@ package message_server
import (
"sync"
"time"
"joylink.club/bj-rtsts-server/config"
"joylink.club/bj-rtsts-server/message_server/ms_api"
"joylink.club/bj-rtsts-server/message_server/ms_manage"
"joylink.club/bj-rtsts-server/ts/protos/graphicData"
"joylink.club/bj-rtsts-server/ts/simulation/wayside/memory"
)
const (
SimulationTopicPrefix = "/" + config.SystemName + "/simulation/%s/"
// 仿真状态消息topic
SimulationStateTopic = SimulationTopicPrefix + "state"
// 信号布置图设备状态消息topic
SimulationSfpTopic = SimulationTopicPrefix + "sfp/%d"
// 继电器组合柜布置图设备状态消息topic
SimulationRccTopic = SimulationTopicPrefix + "rcc/%d"
// PSL设备状态消息topic
SimulationPslTopic = SimulationTopicPrefix + "psl/%d/%d"
// IBP设备状态消息topic
SimulationIbpTopic = SimulationTopicPrefix + "ibp/%d/%d"
)
var smsMap sync.Map
// 仿真消息服务
// 管理仿真消息服务,整体可以作为一个消息服务,也可以每个消息子服务各自作为一个消息服务,暂时先按整体作为一个消息服务的方式使用
type SimulationMs struct {
vs *memory.VerifySimulation
mss []ms_api.IMsgServer
tasks []ms_api.MsgTask
}
// 启动仿真所需的消息服务
@ -40,21 +23,21 @@ func Start(vs *memory.VerifySimulation) {
if !ok {
sms := &SimulationMs{
vs: vs,
mss: []ms_api.IMsgServer{},
tasks: []ms_api.MsgTask{},
//mss: []ms_api.IMsgServer{},
}
for _, mapId := range vs.MapIds {
t := memory.QueryGiType(mapId)
switch t {
case graphicData.PictureType_StationLayout: // 平面布置图
// 添加车站关联的平面布置图、IBP、PSL信息
sms.mss = append(sms.mss, NewSfpMs(vs, mapId), NewIBPMs(vs, mapId), NewPSLMs(vs, mapId))
sms.tasks = append(sms.tasks, NewSfpMs(vs, mapId), NewIBPMs(vs, mapId), NewPSLMs(vs, mapId))
case graphicData.PictureType_RelayCabinetLayout: // 继电器柜
sms.mss = append(sms.mss, NewRccMs(vs, mapId))
sms.tasks = append(sms.tasks, NewRccMs(vs, mapId))
}
}
// 启动仿真状态服务
NewSimulationStateMs(vs)
ms_manage.Register(sms)
sms.tasks = append(sms.tasks, NewStateMs(vs))
smsMap.Store(vs.SimulationId, sms)
}
}
@ -63,36 +46,11 @@ func Start(vs *memory.VerifySimulation) {
func Close(vs *memory.VerifySimulation) {
sms, ok := smsMap.Load(vs.SimulationId)
if ok {
ms_manage.Unregister(sms.(*SimulationMs))
ms := sms.(*SimulationMs)
for _, task := range ms.tasks {
task.Stop()
}
ms.tasks = nil
smsMap.Delete(vs.SimulationId)
}
}
// 获取通道
func (sms *SimulationMs) GetChannel() string {
return sms.vs.SimulationId
}
// 发送消息间隔时间,单位ms
func (sms *SimulationMs) GetInterval() time.Duration {
return 200 * time.Millisecond
}
// 构造定时发送的消息
func (sms *SimulationMs) OnTick() ([]*ms_api.TopicMsg, error) {
var tmList []*ms_api.TopicMsg
for _, ms := range sms.mss {
tm, err := ms.OnTick()
if err != nil {
return nil, err
}
if len(tm) > 0 {
tmList = append(tmList, tm...)
}
}
return tmList, nil
}
func (sms *SimulationMs) OnError(err error) {
// TODO: 仿真消息错误处理
}

View File

@ -1,54 +1,33 @@
package message_server
import (
"fmt"
"google.golang.org/protobuf/proto"
"joylink.club/bj-rtsts-server/message_server/ms_api"
"joylink.club/bj-rtsts-server/mqtt"
"joylink.club/bj-rtsts-server/ts/protos/state"
"joylink.club/bj-rtsts-server/ts/simulation/wayside/memory"
"joylink.club/ecs"
)
// 世界状态变化消息服务
type SimulationStateMs struct {
vs *memory.VerifySimulation
channel string
}
func NewSimulationStateMs(vs *memory.VerifySimulation) *SimulationStateMs {
ms := &SimulationStateMs{
vs: vs,
channel: fmt.Sprintf(SimulationStateTopic, vs.SimulationId),
}
ecs.WorldStateChangeEvent.Subscribe(ms.vs.World, func(_ ecs.World, e ecs.WorldStateChange) {
func NewStateMs(vs *memory.VerifySimulation) ms_api.MsgTask {
return ms_api.NewMonitorTask("仿真状态", func() {
ecs.WorldStateChangeEvent.Subscribe(vs.World, func(_ ecs.World, e ecs.WorldStateChange) {
switch e.NewState {
case ecs.WorldClose:
doSendState(ms.channel, &state.SimulationStatus{
mqtt.PubSimulationState(vs.SimulationId, &state.SimulationStatus{
SimulationId: vs.SimulationId,
State: state.SimulationStatus_DESTROY,
})
case ecs.WorldError:
doSendState(ms.channel, &state.SimulationStatus{
mqtt.PubSimulationState(vs.SimulationId, &state.SimulationStatus{
SimulationId: vs.SimulationId,
State: state.SimulationStatus_ERROR,
})
case ecs.WorldPause:
doSendState(ms.channel, &state.SimulationStatus{
mqtt.PubSimulationState(vs.SimulationId, &state.SimulationStatus{
SimulationId: vs.SimulationId,
State: state.SimulationStatus_PAUSE,
})
}
})
return ms
}
func doSendState(channel string, status *state.SimulationStatus) error {
b, err := proto.Marshal(status)
if err != nil {
return fmt.Errorf("仿真状态消息服务数据序列化失败, %s", err)
}
mqtt.Publish(channel, b)
// apiproto.PublishMsg(channel, b)
return nil
})
}

View File

@ -1,52 +1,69 @@
package mqtt
import (
"context"
"fmt"
"log/slog"
"math/rand"
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/eclipse/paho.golang/autopaho"
"github.com/eclipse/paho.golang/paho"
"github.com/google/uuid"
"github.com/sagikazarmark/slog-shim"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protoreflect"
"joylink.club/bj-rtsts-server/ts/protos/state"
)
// MQTT客户端连接配置
type MqttOptions struct {
// MQTT Broker 代理
Broker string
// 认证用户名
Username string
// 认证密码
Password string
}
func NewMqttOptions(address, username, password string) MqttOptions {
return MqttOptions{
Broker: address,
Username: username,
Password: password,
}
}
var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
slog.Debug("MQTT收到消息", "topic", msg.Topic(), "msg", string(msg.Payload()))
}
var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) {
or := client.OptionsReader()
slog.Info("MQTT连接成功", "ClientID", or.ClientID())
// subs := make(map[string]byte)
// subs["$SYS/brokers/+/clients/+/+"] = 0
// client.SubscribeMultiple(subs, messagePubHandler)
}
var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
slog.Error("MQTT连接断开", "error", err)
}
var mqttClient mqtt.Client
var mqttClient *MqttClient
var clientId string
// 客户端
type MqttClient struct {
cmc *MqttOptions
cc *autopaho.ClientConfig
cm *autopaho.ConnectionManager
}
// 初始化并启动MQTT客户端服务
func Startup(cmc *MqttOptions) error {
initClientId()
cmc.ClientId = clientId
if err := checkConfig(cmc); err != nil {
return err
}
cc, err := cmc.tryInto()
if err != nil {
return err
}
cm, err := autopaho.NewConnection(context.Background(), *cc)
if err != nil {
return err
}
mqttClient = &MqttClient{cmc: cmc, cc: cc, cm: cm}
return nil
}
// 检查配置信息
func checkConfig(cmc *MqttOptions) error {
if cmc.AppId == "" {
return fmt.Errorf("应用编号不能为空")
}
if cmc.ClientId == "" {
return fmt.Errorf("客户端编号不能为空")
}
if cmc.BrokerUrl == "" {
return fmt.Errorf("MQTT代理服务地址不能为空")
}
if cmc.Username == "" {
return fmt.Errorf("MQTT用户名不能为空")
}
if cmc.Password == "" {
return fmt.Errorf("MQTT密码不能为空")
}
return nil
}
// 初始化MQTT客户端id
func initClientId() {
if clientId == "" {
@ -62,29 +79,48 @@ func GetClientId() string {
return clientId
}
// 启动MQTT
func Startup(options MqttOptions) {
initClientId()
opts := mqtt.NewClientOptions()
opts.AddBroker(options.Broker)
opts.SetClientID(clientId)
opts.SetUsername(options.Username)
opts.SetPassword(options.Password)
opts.SetDefaultPublishHandler(messagePubHandler)
opts.OnConnect = connectHandler
opts.OnConnectionLost = connectLostHandler
client := mqtt.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
panic(token.Error())
// 发布数据
func pub(topic string, data protoreflect.ProtoMessage) error {
if data == nil {
return fmt.Errorf("发布数据引用为nil")
}
mqttClient = client
b, err := proto.Marshal(data)
if err != nil {
return err
}
if !MatchTopic(topic) {
slog.Error("未知发布主题", "topic", topic, "data", data)
return fmt.Errorf("未知发布主题: topic=%s", topic)
}
_, err = mqttClient.cm.Publish(context.Background(), &paho.Publish{
Topic: topic,
QoS: 0,
Payload: b,
})
return err
}
// 发布消息
func Publish(dest string, data any) error {
token := mqttClient.Publish(dest, 0, false, data)
if token.Wait() && token.Error() != nil {
return token.Error()
}
return nil
// 发送仿真状态数据
func PubSimulationState(simulationId string, msg *state.SimulationStatus) error {
return pub(GetStateTopic(simulationId), msg)
}
// 发送IBP状态数据
func PubIBPState(simulationId string, mapId int32, stationId uint32, msg *state.PushedDevicesStatus) error {
return pub(GetIbpTopic(simulationId, mapId, stationId), msg)
}
// 发送PSL状态数据
func PubPSLState(simulationId string, mapId int32, boxId uint32, msg *state.PushedDevicesStatus) error {
return pub(GetPslTopic(simulationId, mapId, boxId), msg)
}
// 发送继电器状态数据
func PubRCCState(simulationId string, mapId int32, msg *state.PushedDevicesStatus) error {
return pub(GetRccTopic(simulationId, mapId), msg)
}
// 发送站场图状态数据
func PubSfpState(simulationId string, mapId int32, msg *state.PushedDevicesStatus) error {
return pub(GetSfpTopic(simulationId, mapId), msg)
}

72
mqtt/config.go Normal file
View File

@ -0,0 +1,72 @@
package mqtt
import (
"fmt"
"log/slog"
"net/url"
"time"
"github.com/eclipse/paho.golang/autopaho"
"github.com/eclipse/paho.golang/paho"
"joylink.club/bj-rtsts-server/config"
)
type MqttOptions struct {
AppId string // 所属应用编号
BrokerUrl string // Broker地址
ClientId string // 客户端ID
Username string // 用户名
Password string // 密码
KeepAlive uint16 // 保活时间间隔,单位s,默认为60
ConnectRetryDelay uint16 // 连接重试延时,单位s,默认为3
ConnectTimeout uint16 // 连接操作超时,单位s,默认为3
}
func NewMqttOptions(address, username, password string) *MqttOptions {
return &MqttOptions{
AppId: config.SystemName,
BrokerUrl: address,
Username: username,
Password: password,
}
}
func (c *MqttOptions) tryInto() (*autopaho.ClientConfig, error) {
addr, err := url.Parse(c.BrokerUrl)
if err != nil {
return nil, fmt.Errorf("Mqtt.Address格式错误, %s: %w", c.BrokerUrl, err)
}
if c.KeepAlive == 0 {
c.KeepAlive = 60
}
if c.ConnectRetryDelay == 0 {
c.ConnectRetryDelay = 3
}
if c.ConnectTimeout == 0 {
c.ConnectTimeout = 3
}
cc := &autopaho.ClientConfig{
BrokerUrls: []*url.URL{
addr,
},
KeepAlive: c.KeepAlive,
ConnectRetryDelay: time.Duration(c.ConnectRetryDelay) * time.Second,
ConnectTimeout: time.Duration(c.ConnectTimeout) * time.Second,
OnConnectionUp: func(*autopaho.ConnectionManager, *paho.Connack) {
slog.Info("MQTT连接成功")
},
OnConnectError: func(err error) {
slog.Error("MQTT连接失败", "error", err)
},
ClientConfig: paho.ClientConfig{
ClientID: c.ClientId,
Router: paho.NewStandardRouter(),
OnClientError: func(err error) { fmt.Printf("%s Mqtt客户端发生错误: %s\n", c.ClientId, err) },
OnServerDisconnect: func(d *paho.Disconnect) {
fmt.Printf("%s 连接断开; reason code: %d,properties: %v\n", c.ClientId, d.ReasonCode, d.Properties)
},
},
}
cc.SetUsernamePassword(c.Username, []byte(c.Password))
return cc, nil
}

75
mqtt/topic.go Normal file
View File

@ -0,0 +1,75 @@
package mqtt
import (
"fmt"
"strings"
"joylink.club/bj-rtsts-server/config"
)
const (
topicPrefix = "/" + config.SystemName + "/simulation/%s/"
stateTopic = topicPrefix + "state"
sfpTopic = topicPrefix + "sfp/%d"
rccTopic = topicPrefix + "rcc/%d"
pslTopic = topicPrefix + "psl/%d/%d"
ibpTopic = topicPrefix + "ibp/%d/%d"
)
var topicMap = map[string]string{
"state": stateTopic,
"sfp": sfpTopic,
"rcc": rccTopic,
"psl": pslTopic,
"ibp": ibpTopic,
}
// 检测topic是否合法
func MatchTopic(topic string) bool {
topicArr := strings.Split(topic, "/")
for k, v := range topicMap {
result := strings.Contains(topic, "/"+k)
if result {
fmtArr := strings.Split(v, "/")
for i, f := range fmtArr {
if f == "%s" || f == "%d" {
continue
} else {
result = topicArr[i] == f
}
if !result {
break
}
}
}
if result {
return true
}
}
return false
}
// 仿真状态消息topic
func GetStateTopic(simulationId string) string {
return fmt.Sprintf(stateTopic, simulationId)
}
// 信号布置图设备状态消息topic
func GetSfpTopic(simulationId string, mapId int32) string {
return fmt.Sprintf(sfpTopic, simulationId, mapId)
}
// 继电器组合柜布置图设备状态消息topic
func GetRccTopic(simulationId string, mapId int32) string {
return fmt.Sprintf(rccTopic, simulationId, mapId)
}
// PSL设备状态消息topic
func GetPslTopic(simulationId string, mapId int32, boxId uint32) string {
return fmt.Sprintf(pslTopic, simulationId, mapId, boxId)
}
// IBP设备状态消息topic
func GetIbpTopic(simulationId string, mapId int32, stationId uint32) string {
return fmt.Sprintf(ibpTopic, simulationId, mapId, stationId)
}

86
tmp/proto_test.go Normal file
View File

@ -0,0 +1,86 @@
package memory_test
import (
"fmt"
"log/slog"
"testing"
"time"
"google.golang.org/protobuf/proto"
"joylink.club/bj-rtsts-server/ts/protos/state"
)
func TestTrainProto(t *testing.T) {
st := &state.TrainState{
Id: "1",
TrainLength: 96000,
Show: true,
HeadDeviceId: 682,
HeadOffset: 1017073,
PointTo: true,
RunDirection: true,
HeadDirection: true,
DynamicState: &state.TrainDynamicState{
Heartbeat: 24512,
HeadLinkId: "9",
HeadLinkOffset: 1095653,
TailLinkId: "9",
TailLinkOffset: 999653,
Slope: 5290,
RunningUp: true,
RunningResistanceSum: 1.787,
AirResistance: 9.136,
RampResistance: -8.295,
CurveResistance: 0.947,
Speed: 8011,
HeadSensorSpeed1: 8011,
HeadSensorSpeed2: 8011,
TailSensorSpeed1: 8011,
TailSensorSpeed2: 8011,
Acceleration: -0.011171325,
},
VobcState: &state.TrainVobcState{
LifeSignal: 23457,
Tc2Active: true,
TractionForce: 9040,
BrakeForce: 9040,
TrainLoad: 16000,
UpdateTime: 1702534082337,
},
TrainKilometer: 126984703,
ControlDelayTime: 22,
WheelDiameter: 800,
}
stopTime := time.Now().Add(5 * time.Second)
for {
if stopTime.Before(time.Now()) {
break
}
st.VobcState.UpdateTime = time.Now().Unix()
d, err := proto.Marshal(st)
if err != nil {
slog.Error("转换出错", err)
}
fmt.Println(d)
time.Sleep(20 * time.Millisecond)
}
// st2 := &state.TrainState{}
// dd := []byte{
// 10, 1, 49, 32, 128, 238, 5, 40, 1, 48, 170, 5, 56, 241, 137,
// 62, 72, 1, 80, 1, 88, 1, 98, 67, 8, 192, 191, 1, 18, 1, 57, 24,
// 229, 239, 66, 34, 1, 57, 40, 229, 129, 61, 56, 170, 41, 72, 1,
// 85, 106, 188, 228, 63, 93, 14, 45, 18, 65, 101, 82, 184, 4, 193,
// 109, 152, 110, 114, 63, 112, 203, 62, 120, 203, 62, 128, 1, 203,
// 62, 136, 1, 203, 62, 144, 1, 203, 62, 181, 1, 239, 7, 55, 188, 106,
// 35, 8, 161, 183, 1, 24, 1, 32, 1, 72, 1, 112, 1, 128, 1, 1, 136, 1, 208,
// 70, 144, 1, 208, 70, 152, 1, 128, 125, 208, 1, 247, 223, 193, 183,
// 198, 49, 112, 255, 195, 198, 60, 120, 22, 128, 1, 160, 6,
// }
// err2 := proto.Unmarshal(dd, st2)
// if err2 != nil {
// slog.Error("转换出错", err2)
// }
// fmt.Println(st2)
}