Compare commits

...

5 Commits

10 changed files with 200 additions and 99 deletions

View File

@ -5403,8 +5403,8 @@ type LianSuoIndexData struct {
sizeCache protoimpl.SizeCache sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields unknownFields protoimpl.UnknownFields
Id int32 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"` // 设备id Id uint32 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"` // 设备id
Index int32 `protobuf:"varint,2,opt,name=index,proto3" json:"index,omitempty"` //设备联锁编号 Index uint32 `protobuf:"varint,2,opt,name=index,proto3" json:"index,omitempty"` //设备联锁编号
} }
func (x *LianSuoIndexData) Reset() { func (x *LianSuoIndexData) Reset() {
@ -5439,14 +5439,14 @@ func (*LianSuoIndexData) Descriptor() ([]byte, []int) {
return file_stationLayoutGraphics_proto_rawDescGZIP(), []int{54} return file_stationLayoutGraphics_proto_rawDescGZIP(), []int{54}
} }
func (x *LianSuoIndexData) GetId() int32 { func (x *LianSuoIndexData) GetId() uint32 {
if x != nil { if x != nil {
return x.Id return x.Id
} }
return 0 return 0
} }
func (x *LianSuoIndexData) GetIndex() int32 { func (x *LianSuoIndexData) GetIndex() uint32 {
if x != nil { if x != nil {
return x.Index return x.Index
} }
@ -6661,8 +6661,8 @@ var file_stationLayoutGraphics_proto_rawDesc = []byte{
0x12, 0x10, 0x0a, 0x03, 0x69, 0x64, 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0d, 0x52, 0x03, 0x69, 0x12, 0x10, 0x0a, 0x03, 0x69, 0x64, 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0d, 0x52, 0x03, 0x69,
0x64, 0x73, 0x22, 0x38, 0x0a, 0x10, 0x4c, 0x69, 0x61, 0x6e, 0x53, 0x75, 0x6f, 0x49, 0x6e, 0x64, 0x64, 0x73, 0x22, 0x38, 0x0a, 0x10, 0x4c, 0x69, 0x61, 0x6e, 0x53, 0x75, 0x6f, 0x49, 0x6e, 0x64,
0x65, 0x78, 0x44, 0x61, 0x74, 0x61, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x65, 0x78, 0x44, 0x61, 0x74, 0x61, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01,
0x28, 0x05, 0x52, 0x02, 0x69, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x18, 0x28, 0x0d, 0x52, 0x02, 0x69, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x18,
0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x05, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x22, 0x83, 0x06, 0x0a, 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x05, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x22, 0x83, 0x06, 0x0a,
0x0b, 0x4c, 0x69, 0x61, 0x6e, 0x53, 0x75, 0x6f, 0x44, 0x61, 0x74, 0x61, 0x12, 0x39, 0x0a, 0x08, 0x0b, 0x4c, 0x69, 0x61, 0x6e, 0x53, 0x75, 0x6f, 0x44, 0x61, 0x74, 0x61, 0x12, 0x39, 0x0a, 0x08,
0x73, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1d, 0x73, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1d,
0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x69, 0x63, 0x44, 0x61, 0x74, 0x61, 0x2e, 0x4c, 0x69, 0x61, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x69, 0x63, 0x44, 0x61, 0x74, 0x61, 0x2e, 0x4c, 0x69, 0x61,

View File

@ -20,7 +20,7 @@ var (
func main() { func main() {
//先安装以下插件 //先安装以下插件
//go install google.golang.org/protobuf/cmd/protoc-gen-go@latest //go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.33.0
protoFiles := getProtoFiles() protoFiles := getProtoFiles()
// 编译proto文件为Go文件 // 编译proto文件为Go文件

@ -1 +1 @@
Subproject commit c1499de7bb6e41b56f53c982d516da30988fbd44 Subproject commit 0844608c8674ec2988c06ba61e73e7b6afe7d660

View File

@ -1,63 +0,0 @@
// Package beijing11 北京11号线联锁通信
package beijing11
import (
"encoding/json"
"fmt"
"joylink.club/bj-rtsts-server/config"
"joylink.club/bj-rtsts-server/third_party/udp"
"log/slog"
"sync"
)
var (
initMutex = sync.Mutex{}
logTag = "[北京11号线联锁通信]"
running = false
server udp.UdpServer
)
func Start(interlockConfig *config.InterlockConfig) {
if interlockConfig == nil || interlockConfig.Ip == "" || !interlockConfig.Open {
return
}
if running {
return
}
initMutex.Lock()
defer initMutex.Unlock()
if running {
return
}
//UDP通信设施
server = udp.NewServer(fmt.Sprintf(":%d", interlockConfig.LocalPort), func(b []byte) {
slog.Info(fmt.Sprintf("%s收到消息%x", logTag, b))
frame := &FromInterlockFrame{}
err := frame.Decode(b)
if err != nil {
slog.Error(fmt.Sprintf("%s解析数据出错%s", logTag, err))
} else {
marshal, err := json.Marshal(frame)
if err != nil {
slog.Error(fmt.Sprintf("%s解析为json出错%s", logTag, err))
} else {
slog.Info(fmt.Sprintf("%s解析为json%s", logTag, string(marshal)))
}
}
})
err := server.Listen()
if err != nil {
panic(fmt.Sprintf("%s启动UDP服务失败%s", logTag, err))
}
running = true
}
func Stop() {
initMutex.Lock()
defer initMutex.Unlock()
running = false
if server != nil {
server.Close()
server = nil
}
}

34
third_party/interlock/beijing11/repo.go vendored Normal file
View File

@ -0,0 +1,34 @@
package beijing11
// StationDeviceIndexRepo 联锁站设备索引映射
type StationDeviceIndexRepo struct {
StationName string
TurnoutMap map[uint32]string
PsdMap map[uint32]string
EsbMap map[uint32]string
HoldTrainMap map[uint32]string
SignalMap map[uint32]string
AxleSectionMap map[uint32]string
WrzfMap map[uint32]string
FymMap map[uint32]string
SpksMap map[uint32]string
CkmMap map[uint32]string
XcjMap map[uint32]string
}
func NewStationDeviceIndexRepo() *StationDeviceIndexRepo {
return &StationDeviceIndexRepo{
StationName: "",
TurnoutMap: make(map[uint32]string),
PsdMap: make(map[uint32]string),
EsbMap: make(map[uint32]string),
HoldTrainMap: make(map[uint32]string),
SignalMap: make(map[uint32]string),
AxleSectionMap: make(map[uint32]string),
WrzfMap: make(map[uint32]string),
FymMap: make(map[uint32]string),
SpksMap: make(map[uint32]string),
CkmMap: make(map[uint32]string),
XcjMap: make(map[uint32]string),
}
}

View File

@ -0,0 +1,107 @@
// Package beijing11 北京11号线联锁通信
package beijing11
import (
"encoding/json"
"fmt"
"joylink.club/bj-rtsts-server/config"
"joylink.club/bj-rtsts-server/dto/data_proto"
"joylink.club/bj-rtsts-server/third_party/udp"
"joylink.club/bj-rtsts-server/ts/simulation/wayside/memory"
"log/slog"
"sync"
)
const logTag = "[北京11号线联锁通信]"
var (
initMutex = sync.Mutex{}
running = false
server udp.UdpServer
sim *memory.VerifySimulation //启动服务所使用的仿真
iConfig *config.InterlockConfig //启动服务使用的联锁配置
//联锁区设备的联锁编号与uid的映射
stationMap map[string]*StationDeviceIndexRepo
)
func init() {
memory.RegisterListener(func(uidStructure *memory.StationUidStructure, data *data_proto.RtssGraphicStorage) {
//if data.LianSuoData == nil {
// return
//}
////初始化所有集中站设备映射结构体
//for _, station := range data.Stations {
// stationMap[station.StationName] = NewStationDeviceIndexRepo()
//}
////填充
//stationIdMap := uidStructure.StationIds
////道岔
//turnoutUidMap := uidStructure.TurnoutIds
//turnoutIndexMap := make(map[uint32]uint32)
//for _, turnout := range data.LianSuoData.Switchs {
// turnoutIndexMap[turnout.Id] =
//}
//for _, turnout := range data.Turnouts {
// for _, stationCid := range turnout.CentralizedStations {
// stationIdStruct := stationIdMap[stationCid]
// stationDeviceIndexRepo := stationMap[stationIdStruct.Code]
// stationDeviceIndexRepo.TurnoutMap[]
// }
//}
})
}
func Start(interlockConfig *config.InterlockConfig, simulation *memory.VerifySimulation) {
if interlockConfig == nil || interlockConfig.Ip == "" || !interlockConfig.Open {
return
}
if running {
return
}
initMutex.Lock()
defer initMutex.Unlock()
if running {
return
}
//UDP通信设施
server = udp.NewServer(fmt.Sprintf(":%d", interlockConfig.LocalPort), func(b []byte) {
slog.Info(fmt.Sprintf("%s收到消息%x", logTag, b))
frame := &FromInterlockFrame{}
err := frame.Decode(b)
if err != nil {
slog.Error(fmt.Sprintf("%s解析数据出错%s", logTag, err))
} else {
marshal, err := json.Marshal(frame)
if err != nil {
slog.Error(fmt.Sprintf("%s解析为json出错%s", logTag, err))
} else {
slog.Info(fmt.Sprintf("%s解析为json%s", logTag, string(marshal)))
}
}
})
err := server.Listen()
if err != nil {
panic(fmt.Sprintf("%s启动UDP服务失败%s", logTag, err))
}
running = true
sim = simulation
iConfig = interlockConfig
}
func Stop() {
initMutex.Lock()
defer initMutex.Unlock()
running = false
if server != nil {
server.Close()
server = nil
}
}
//func CollectRelayInfo() *FromInterlockFrame {
// sim.World
//}
//
//func HandleDriveInfo(frame *FromInterlockFrame) {
//
//}

View File

@ -4,6 +4,7 @@ package beijing12
import ( import (
"context" "context"
"fmt" "fmt"
"joylink.club/bj-rtsts-server/third_party/tcp"
"log/slog" "log/slog"
"runtime/debug" "runtime/debug"
"sync" "sync"
@ -11,9 +12,10 @@ import (
"joylink.club/bj-rtsts-server/config" "joylink.club/bj-rtsts-server/config"
"joylink.club/bj-rtsts-server/third_party/message" "joylink.club/bj-rtsts-server/third_party/message"
"joylink.club/bj-rtsts-server/third_party/udp"
) )
const logTag = "[北京12号线联锁通信]"
// 联锁代理通信接口 // 联锁代理通信接口
type InterlockMessageManager interface { type InterlockMessageManager interface {
CollectInterlockRelayInfo(code string) *message.InterlockSendMsgPkg CollectInterlockRelayInfo(code string) *message.InterlockSendMsgPkg
@ -26,8 +28,6 @@ type InterlockProxy interface {
Start(manager InterlockMessageManager) Start(manager InterlockMessageManager)
// 停止联锁消息功能 // 停止联锁消息功能
Stop() Stop()
// 发送联锁采集消息
SendCollectMessage(b []byte)
} }
var interlockMap = make(map[string]InterlockProxy) var interlockMap = make(map[string]InterlockProxy)
@ -43,8 +43,7 @@ func Default(c *config.InterlockConfig) InterlockProxy {
} }
type interlockProxy struct { type interlockProxy struct {
driveInfoUdpServer udp.UdpServer tcpClient *tcp.TcpClient
sendCollectUdpClient udp.UdpClient
manager InterlockMessageManager manager InterlockMessageManager
collectInfoTaskCancel context.CancelFunc collectInfoTaskCancel context.CancelFunc
@ -53,7 +52,7 @@ type interlockProxy struct {
// 驱动信息进行转发 // 驱动信息进行转发
func (i *interlockProxy) handleDriverInfo(b []byte) { func (i *interlockProxy) handleDriverInfo(b []byte) {
slog.Info(fmt.Sprintf("收到联锁驱动继电器数据:%x", b)) slog.Info(fmt.Sprintf("%s收到联锁驱动继电器数据:%x", logTag, b))
handler := i.manager handler := i.manager
if handler != nil { if handler != nil {
handler.HandleInterlockDriverInfo(i.runConfig.Code, b) handler.HandleInterlockDriverInfo(i.runConfig.Code, b)
@ -65,17 +64,17 @@ func (i *interlockProxy) Start(manager InterlockMessageManager) {
return return
} }
if manager == nil { if manager == nil {
panic("启动联锁消息服务错误: InterlockMessageManager不能为nil") panic(fmt.Sprintf("%s启动联锁消息服务错误: InterlockMessageManager不能为nil", logTag))
} }
if i.manager != nil { if i.manager != nil {
panic("启动联锁消息服务错误: 存在正在运行的任务") panic(fmt.Sprintf("%s启动联锁消息服务错误: 存在正在运行的任务", logTag))
} }
i.manager = manager i.manager = manager
// 初始化客户端、服务端 // 初始化客户端、服务端
i.initInterlockProxy() i.initInterlockProxy()
ctx, cancle := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
go i.collectInfoStateTask(ctx) go i.collectInfoStateTask(ctx)
i.collectInfoTaskCancel = cancle i.collectInfoTaskCancel = cancel
} }
// 采集电路状态发送间隔,单位ms // 采集电路状态发送间隔,单位ms
@ -88,7 +87,7 @@ var serialNumber uint8
func (i *interlockProxy) collectInfoStateTask(ctx context.Context) { func (i *interlockProxy) collectInfoStateTask(ctx context.Context) {
defer func() { defer func() {
if err := recover(); err != nil { if err := recover(); err != nil {
slog.Error("定时发送道岔状态任务异常", "error", err, "stack", string(debug.Stack())) slog.Error(logTag+"定时发送道岔状态任务异常", "error", err, "stack", string(debug.Stack()))
debug.PrintStack() debug.PrintStack()
} }
}() }()
@ -102,11 +101,11 @@ func (i *interlockProxy) collectInfoStateTask(ctx context.Context) {
if collectInfoState != nil { if collectInfoState != nil {
serialNumber++ serialNumber++
collectInfoState.SetSerialNumber(serialNumber) collectInfoState.SetSerialNumber(serialNumber)
err := i.sendCollectUdpClient.SendMsg(collectInfoState) err := i.tcpClient.Send(collectInfoState.Encode())
if err != nil { if err != nil {
slog.Error("向联锁发送继电器状态失败:", err) slog.Error(fmt.Sprintf("%s向联锁发送继电器状态失败%s", logTag, err))
} else { } else {
slog.Info(fmt.Sprintf("向联锁发送继电器数据成功:%x", collectInfoState.Encode())) slog.Info(fmt.Sprintf("%s向联锁发送继电器数据成功:%x", logTag, collectInfoState.Encode()))
} }
} }
time.Sleep(time.Millisecond * InterlockMessageSendInterval) time.Sleep(time.Millisecond * InterlockMessageSendInterval)
@ -117,11 +116,9 @@ func (i *interlockProxy) Stop() {
initMutex.Lock() initMutex.Lock()
defer initMutex.Unlock() defer initMutex.Unlock()
delete(interlockMap, i.runConfig.Code) delete(interlockMap, i.runConfig.Code)
if i.sendCollectUdpClient != nil { i.collectInfoTaskCancel()
i.sendCollectUdpClient.Close() if i.tcpClient != nil {
} i.tcpClient.Close()
if i.driveInfoUdpServer != nil {
i.driveInfoUdpServer.Close()
} }
if i.collectInfoTaskCancel != nil { if i.collectInfoTaskCancel != nil {
i.collectInfoTaskCancel() i.collectInfoTaskCancel()
@ -129,12 +126,21 @@ func (i *interlockProxy) Stop() {
i.manager = nil i.manager = nil
} }
func (i *interlockProxy) SendCollectMessage(b []byte) { func (i *interlockProxy) initInterlockProxy() {
i.sendCollectUdpClient.Send(b) client, err := tcp.StartTcpClient(fmt.Sprintf("%s:%d", i.runConfig.Ip, i.runConfig.RemotePort), i.handleDriveInfo, func(err error) {
slog.Error(fmt.Sprintf("%sTCP客户端读取数据出错终止通信服务%s", logTag, err))
i.Stop()
})
if err != nil {
panic(fmt.Sprintf("%s启动TCP客户端失败%s", logTag, err))
}
i.tcpClient = client
} }
func (i *interlockProxy) initInterlockProxy() { func (i *interlockProxy) handleDriveInfo(n int, data []byte) {
i.sendCollectUdpClient = udp.NewClient(fmt.Sprintf("%v:%v", i.runConfig.Ip, i.runConfig.RemotePort)) data = data[:n]
i.driveInfoUdpServer = udp.NewServer(fmt.Sprintf(":%d", i.runConfig.LocalPort), i.handleDriverInfo) slog.Info(fmt.Sprintf("%s收到联锁驱动继电器数据%x", logTag, data))
i.driveInfoUdpServer.Listen() if i.manager != nil {
i.manager.HandleInterlockDriverInfo(i.runConfig.Code, data)
}
} }

View File

@ -43,10 +43,14 @@ func StartTcpClient(rAddr string, handler func(n int, data []byte), readErr func
slog.Error(fmt.Sprintf("TCP客户端[rAddr:%s]读取数据异常连接可能断开:", rAddr), opErr) slog.Error(fmt.Sprintf("TCP客户端[rAddr:%s]读取数据异常连接可能断开:", rAddr), opErr)
client.conning = false client.conning = false
readErr(readDataErr) readErr(readDataErr)
} else if err == io.EOF { } else if readDataErr == io.EOF {
slog.Warn(fmt.Sprintf("TCP客户端[rAddr:%s]断开连接:", rAddr)) slog.Warn(fmt.Sprintf("TCP客户端[rAddr:%s]断开连接:", rAddr))
client.conning = false client.conning = false
readErr(err) readErr(readDataErr)
} else {
slog.Error(fmt.Sprintf("TCP客户端[rAddr:%s]读数据出错:%s", raddr, readDataErr))
client.conning = false
readErr(readDataErr)
} }
return return
} }

View File

@ -13,6 +13,15 @@ import (
"joylink.club/rtsssimulation/repository" "joylink.club/rtsssimulation/repository"
) )
// 地图数据处理方法。目前用于不同通信协议所需数据的预处理
type MapDataHandler func(uidStructure *StationUidStructure, data *data_proto.RtssGraphicStorage)
var mapDataHandlerList = make([]MapDataHandler, 0)
func RegisterListener(handler MapDataHandler) {
mapDataHandlerList = append(mapDataHandlerList, handler)
}
var giUidMap sync.Map var giUidMap sync.Map
type elementIdStructure struct { type elementIdStructure struct {
@ -426,6 +435,10 @@ func initStationUid(data *data_proto.RtssGraphicStorage) *StationUidStructure {
Uid: GenerateElementUid(city, lineId, nil, xcj.Code), Uid: GenerateElementUid(city, lineId, nil, xcj.Code),
} }
} }
//通信协议预处理所需数据
for _, handler := range mapDataHandlerList {
handler(gus, data)
}
return gus return gus
} }

View File

@ -124,7 +124,7 @@ func runThirdParty(s *memory.VerifySimulation) error {
for _, c := range s.GetInterlockCodes() { for _, c := range s.GetInterlockCodes() {
switch c.Line { switch c.Line {
case "11": case "11":
beijing11.Start(c) beijing11.Start(c, s)
default: default:
beijing12.Default(c).Start(s) beijing12.Default(c).Start(s)
} }