diff --git a/third_party/tcp/tcp_client.go b/third_party/tcp/tcp_client.go index dacc61b..3f9031f 100644 --- a/third_party/tcp/tcp_client.go +++ b/third_party/tcp/tcp_client.go @@ -9,14 +9,13 @@ import ( ) type TcpClient struct { - conn *net.TCPConn - handler func(n int, data []byte) - ctx context.CancelFunc - conning bool - properties map[string]interface{} + conn *net.TCPConn + handler func(n int, data []byte) + ctx context.CancelFunc + conning bool } -func StartTcpClient(rAddr string, properties map[string]interface{}, handler func(n int, data []byte, clientProperties map[string]interface{}), readErr func(err error)) (*TcpClient, error) { +func StartTcpClient(rAddr string, handler func(n int, data []byte), readErr func(err error)) (*TcpClient, error) { raddr, addErr := net.ResolveTCPAddr("tcp", rAddr) if addErr != nil { return nil, addErr @@ -27,7 +26,7 @@ func StartTcpClient(rAddr string, properties map[string]interface{}, handler fun if err != nil { return nil, err } - client := &TcpClient{conn: conn, ctx: ctxFun, properties: properties} + client := &TcpClient{conn: conn, ctx: ctxFun} go func() { for { select { @@ -50,7 +49,7 @@ func StartTcpClient(rAddr string, properties map[string]interface{}, handler fun } } client.conning = true - handler(l, data, client.properties) + handler(l, data) } }() client.conning = true diff --git a/third_party/train_pc_sim/train_pc_sim.go b/third_party/train_pc_sim/train_pc_sim.go index fcfbc55..0a0e9ee 100644 --- a/third_party/train_pc_sim/train_pc_sim.go +++ b/third_party/train_pc_sim/train_pc_sim.go @@ -58,6 +58,40 @@ type TrainPcSimManage interface { // TrainBtmQuery 处理列车btm查询 TrainBtmQuery(connType state_proto.TrainConnState_TrainConnType, data []byte) } +type trainPcReciverData struct { + clientKey string + tcpClient *tcp.TcpClient + pcSimManage TrainPcSimManage +} + +func (rd *trainPcReciverData) reciverDataHandle(n int, data []byte) { + + connType := state_proto.TrainConnState_PC_SIM_A + if rd.clientKey == "B" { + connType = state_proto.TrainConnState_PC_SIM_B + } + baseMsg := &message.TrainPcSimBaseMessage{} + err := baseMsg.Decode(data) + if err != nil { + slog.Error("车载pc仿真接受数据解析失败 ") + return + } + + switch baseMsg.Type { + //case RECIVE_TRAIN_CREATE_REMOVE: + // pc.trainPcSimManage.TrainPcSimConnOrRemoveHandle(baseMsg.Data[0]) + case RECIVE_TRAIN_INTERFACE_CABINET_OUTR: + rd.pcSimManage.TrainPcSimDigitalOutInfoHandle(connType, baseMsg.Data) + case RECIVE_TRAIN_INTERFACE_CABINET_OUTR_BACK: + rd.pcSimManage.TrainPcSimDigitalReportHandle(connType, baseMsg.Data) + case RECIVE_TRAIN_QUERY_STATUS: + rd.pcSimManage.TrainBtmQuery(connType, baseMsg.Data) + case RECIVE_TRAIN_MOCK_DATA: + rd.pcSimManage.TrainPcSimMockInfo(connType, baseMsg.Data) + //case RECIVE_TRAIN_DOOR_MODE: + // pc.trainPcSimManage.TrainDoorModeHandle(baseMsg.Data[0]) + } +} const Name = "车载pc仿真" const CLIENT_KEY = "clientKey" @@ -97,11 +131,12 @@ func Default() TrainPcSim { type trainPcSimService struct { state tpapi.ThirdPartyApiServiceState //pcSimClient *tcp.TcpClient - pcSimClientMap map[string]*tcp.TcpClient - cancleContext context.CancelFunc - trainPcSimManage TrainPcSimManage - speedPlace *message.TrainSpeedPlaceReportMsg - configs []config.VehiclePCSimConfig + //pcSimClientMap map[string]*tcp.TcpClient + newPcSimclientMap map[string]*trainPcReciverData + cancleContext context.CancelFunc + trainPcSimManage TrainPcSimManage + speedPlace *message.TrainSpeedPlaceReportMsg + configs []config.VehiclePCSimConfig } // 接受来自pc仿真的消息 @@ -110,22 +145,39 @@ func (d *trainPcSimService) readError(err error) { d.updateState(tpapi.ThirdPartyState_Broken) } - -func (d *trainPcSimService) closeAllConn() { - for key, client := range d.pcSimClientMap { - if client != nil { - client.Close() +func (d *trainPcSimService) newCloseAllConn() { + for _, rd := range d.newPcSimclientMap { + if rd != nil { + rd.tcpClient.Close() } - delete(d.pcSimClientMap, key) } +} -} -func (d *trainPcSimService) closeConn(clientKey string) { - if d.pcSimClientMap[clientKey] != nil { - d.pcSimClientMap[clientKey].Close() - delete(d.pcSimClientMap, clientKey) +/* + func (d *trainPcSimService) closeAllConn() { + for key, client := range d.pcSimClientMap { + if client != nil { + client.Close() + } + delete(d.pcSimClientMap, key) + } + } +*/ +func (d *trainPcSimService) newCloseConn(clientKey string) { + rd := d.newPcSimclientMap[clientKey] + if rd != nil { + rd.tcpClient.Close() } } + +/* + func (d *trainPcSimService) closeConn(clientKey string) { + if d.pcSimClientMap[clientKey] != nil { + d.pcSimClientMap[clientKey].Close() + delete(d.pcSimClientMap, clientKey) + } + } +*/ func (d *trainPcSimService) findConfig(tcChar string) (*config.VehiclePCSimConfig, error) { configFlag := false if tcChar == "A" { @@ -144,13 +196,13 @@ func (d *trainPcSimService) findConfig(tcChar string) (*config.VehiclePCSimConfi return nil, fmt.Errorf("") } func (d *trainPcSimService) connTrainPcSim(ctx context.Context) { - //reconnIndex := 0 - //ctx, ctxFun := context.WithCancel(context.Background()) + go func() { for { select { case <-ctx.Done(): - d.closeAllConn() + //d.closeAllConn() + d.newCloseAllConn() return default: } @@ -160,15 +212,22 @@ func (d *trainPcSimService) connTrainPcSim(ctx context.Context) { clientKey := FindTrainPcSimClientKey(t) if clientKey == "" { slog.Error("未找到对应的pc仿真连接,trainId:", t.Id, "删除对应客户端") - d.closeConn(clientKey) + //d.closeConn(clientKey) + + d.newCloseConn(clientKey) continue } - client := d.pcSimClientMap[clientKey] - if !client.IsConning() { - client.Close() - d.initConn(clientKey) + //client := d.pcSimClientMap[clientKey] + rd := d.newPcSimclientMap[clientKey] + if rd == nil { + d.newPcSimclientMap[clientKey] = &trainPcReciverData{pcSimManage: d.trainPcSimManage, clientKey: clientKey, tcpClient: &tcp.TcpClient{}} + } + if !rd.tcpClient.IsConning() { + //client.Close() + d.newCloseConn(clientKey) + d.initConn(clientKey) } } } @@ -181,29 +240,45 @@ func (d *trainPcSimService) connTrainPcSim(ctx context.Context) { func (d *trainPcSimService) initConn(clientKey string) { - client := d.pcSimClientMap[clientKey] - if d.pcSimClientMap[clientKey] == nil { - client = &tcp.TcpClient{} - d.pcSimClientMap[clientKey] = client + rd := d.newPcSimclientMap[clientKey] + if rd == nil { + rd = &trainPcReciverData{pcSimManage: d.trainPcSimManage, clientKey: clientKey, tcpClient: &tcp.TcpClient{}} + d.newPcSimclientMap[clientKey] = rd } - config, _ := d.findConfig(clientKey) - addr := fmt.Sprintf("%v:%v", config.PcSimIp, config.PcSimPort) - properties := map[string]interface{}{ - CLIENT_KEY: clientKey, - } - fmt.Println(properties[CLIENT_KEY]) - client2, err := tcp.StartTcpClient(addr, properties, d.reivceData, d.readError) + cfg, _ := d.findConfig(clientKey) + addr := fmt.Sprintf("%v:%v", cfg.PcSimIp, cfg.PcSimPort) + + client2, err := tcp.StartTcpClient(addr, rd.reciverDataHandle, d.readError) if err != nil { slog.Error("车载pc连接失败 clientKey:", clientKey, "error:", err.Error()) d.updateState(tpapi.ThirdPartyState_Broken) - } else { - d.pcSimClientMap[clientKey] = client2 + rd.tcpClient = client2 } + + /*if d.pcSimClientMap[clientKey] != nil { + return + } + + cfg, _ := d.findConfig(clientKey) + addr := fmt.Sprintf("%v:%v", cfg.PcSimIp, cfg.PcSimPort) + + properties := map[string]interface{}{ + CLIENT_KEY: clientKey, + } + client2, err := tcp.StartTcpClient(addr, properties, d.reivceData, d.readError) + if err != nil { + slog.Error("车载pc连接失败 clientKey:", clientKey, "error:", err.Error()) + d.updateState(tpapi.ThirdPartyState_Broken) + } else { + d.pcSimClientMap[clientKey] = client2 + }*/ + } func (d *trainPcSimService) Start(pcSimManage TrainPcSimManage) { configs := pcSimManage.GetTrainPcSimConfig() - d.pcSimClientMap = map[string]*tcp.TcpClient{} + d.newPcSimclientMap = make(map[string]*trainPcReciverData) + //d.pcSimClientMap = map[string]*tcp.TcpClient{} if len(configs) <= 0 { slog.Info("车载pc仿真配置未开启") return @@ -218,7 +293,6 @@ func (d *trainPcSimService) Start(pcSimManage TrainPcSimManage) { slog.Info("车载pc仿真配置未开启") return } - d.configs = configs ctx, ctxFun := context.WithCancel(context.Background()) d.cancleContext = ctxFun @@ -235,7 +309,8 @@ func (d *trainPcSimService) Stop() { d.cancleContext() d.cancleContext = nil } - d.closeAllConn() + //d.closeAllConn() + d.newCloseAllConn() } func (d *trainPcSimService) CreateOrRemoveSpeedPLace(train *state_proto.TrainState) { if train.ConnState.Conn && (train.ConnState.ConnType == state_proto.TrainConnState_PC_SIM_A || train.ConnState.ConnType == state_proto.TrainConnState_PC_SIM_B) { @@ -251,15 +326,24 @@ func (d *trainPcSimService) CreateOrRemoveTrain(train *state_proto.TrainState, m if msgType == RECIVE_TRAIN_CREATE_REMOVE && data[0] == 0x01 { d.initConn(clientKey) } - msg := &message.TrainPcSimBaseMessage{Data: data, Type: uint16(msgType)} - client := d.pcSimClientMap[clientKey] + rd := d.newPcSimclientMap[clientKey] + if rd != nil { + err := rd.tcpClient.Send(msg.Encode()) + if err != nil { + return err + } + if data[0] != 0x01 { + d.newCloseConn(clientKey) + } + } + /*client := d.pcSimClientMap[clientKey] err := client.Send(msg.Encode()) if data[0] != 0x01 { d.closeConn(clientKey) - } + }*/ - return err + return nil } // 依据文档80ms发送列车速度位置 @@ -274,15 +358,24 @@ func (d *trainPcSimService) sendTrainLocationAndSpeedTask(ctx context.Context) { for _, train := range trains { if train.ConnState.Conn && train.PluseCount != nil { clientKey := FindTrainPcSimClientKey(train) + rd := d.newPcSimclientMap[clientKey] - client := d.pcSimClientMap[clientKey] s1, s2 := train.PluseCount.PulseCount1, train.PluseCount.PulseCount2 d.speedPlace.ParsePulseCount1(s1, s2) data := d.speedPlace.Encode(train.TrainRunUp, s1, s2) bm := &message.TrainPcSimBaseMessage{Type: SENDER_TRAIN_LOCATION_INFO, Data: data} train.PluseCount.PulseCount1 = 0 train.PluseCount.PulseCount2 = 0 - client.Send(bm.Encode()) + rd.tcpClient.Send(bm.Encode()) + + /*client := d.pcSimClientMap[clientKey] + s1, s2 := train.PluseCount.PulseCount1, train.PluseCount.PulseCount2 + d.speedPlace.ParsePulseCount1(s1, s2) + data := d.speedPlace.Encode(train.TrainRunUp, s1, s2) + bm := &message.TrainPcSimBaseMessage{Type: SENDER_TRAIN_LOCATION_INFO, Data: data} + train.PluseCount.PulseCount1 = 0 + train.PluseCount.PulseCount2 = 0 + client.Send(bm.Encode())*/ } } @@ -296,7 +389,8 @@ func (d *trainPcSimService) SendDriverActive(train *state_proto.TrainState) { vobc := train.VobcState clientKey := FindTrainPcSimClientKey(train) - client := d.pcSimClientMap[clientKey] + //client := d.pcSimClientMap[clientKey] + rd := d.newPcSimclientMap[clientKey] defulatBuf := make([]byte, 0) msg := &message.TrainPcSimBaseMessage{Data: defulatBuf} if train.TrainRunUp { @@ -314,7 +408,8 @@ func (d *trainPcSimService) SendDriverActive(train *state_proto.TrainState) { } da := msg.Encode() slog.Info("发送驾驶激活 列车", train.Id, "数据", hex.EncodeToString(da)) - err := client.Send(da) + err := rd.tcpClient.Send(da) + //err := client.Send(da) if err != nil { slog.Error("发送驾驶激活 列车", train.Id, "数据", hex.EncodeToString(da), err) } @@ -324,7 +419,9 @@ func (d *trainPcSimService) SendHandleSwitch(oldTraction, oldBrakeForce int64, t if tc.Conn { vobc := train.VobcState clientKey := FindTrainPcSimClientKey(train) - client := d.pcSimClientMap[clientKey] + rd := d.newPcSimclientMap[clientKey] + + //client := d.pcSimClientMap[clientKey] msg := &message.TrainPcSimBaseMessage{} newTraction := vobc.TractionForce newBrake := -vobc.BrakeForce @@ -355,7 +452,8 @@ func (d *trainPcSimService) SendHandleSwitch(oldTraction, oldBrakeForce int64, t } da := msg.Encode() slog.Info("发送列车手柄消息", "clientKey", clientKey, "msg", hex.EncodeToString(da)) - err := client.Send(da) + err := rd.tcpClient.Send(da) + //err := client.Send(da) if err != nil { slog.Error("发送列车手柄消息失败", "clientKey", clientKey, "msg", hex.EncodeToString(da)) } @@ -373,11 +471,13 @@ func (d *trainPcSimService) SendTrainDirection(train *state_proto.TrainState, tr baseMsgs = append(baseMsgs, &message.TrainPcSimBaseMessage{Type: RECIVE_TRAIN_HAND_KEY_BACKWARD}) } clientKey := FindTrainPcSimClientKey(train) - client := d.pcSimClientMap[clientKey] + rd := d.newPcSimclientMap[clientKey] + //client := d.pcSimClientMap[clientKey] for _, msg := range baseMsgs { da := msg.Encode() slog.Info("发送列车方向列车", train.Id, "数据", hex.EncodeToString(da)) - err := client.Send(da) + err := rd.tcpClient.Send(da) + //err := client.Send(da) if err != nil { slog.Error("发送列车方向失败列车", train.Id, "数据", hex.EncodeToString(da)) } @@ -389,11 +489,13 @@ func (d *trainPcSimService) SendBaliseData(train *state_proto.TrainState, msgTyp msg.Type = msgType msg.Data = data clientKey := FindTrainPcSimClientKey(train) - client := d.pcSimClientMap[clientKey] + rd := d.newPcSimclientMap[clientKey] + + //client := d.pcSimClientMap[clientKey] da := msg.Encode() slog.Info("发送列车PC仿真应答器信息,数据", hex.EncodeToString(da)) - err := client.Send(da) + err := rd.tcpClient.Send(da) if err != nil { slog.Info("发送列车PC仿真应答器信息失败,数据", hex.EncodeToString(da)) } @@ -414,20 +516,23 @@ func (d *trainPcSimService) PublishTrainControlEvent(train *state_proto.TrainSta return } clientKey := FindTrainPcSimClientKey(train) - client := d.pcSimClientMap[clientKey] + rd := d.newPcSimclientMap[clientKey] + + //client := d.pcSimClientMap[clientKey] for _, event := range events { msg := &message.TrainPcSimBaseMessage{} msg.Type = SENDER_TRAIN_OUTR_INFO data := []byte{event.Command, event.Status} msg.Data = data - client.Send(msg.Encode()) + rd.tcpClient.Send(msg.Encode()) + //client.Send(msg.Encode()) //FireTrainControlEventType.Publish(world, &event) } } // 接受来自pc仿真的消息 -func (d *trainPcSimService) reivceData(len int, data []byte, properties map[string]interface{}) { +/*func (d *trainPcSimService) reivceData(len int, data []byte, properties map[string]interface{}) { clientKey := properties[CLIENT_KEY] ck := fmt.Sprintf("%v", clientKey) if d.pcSimClientMap[ck] == nil { @@ -460,4 +565,4 @@ func (d *trainPcSimService) reivceData(len int, data []byte, properties map[stri //case RECIVE_TRAIN_DOOR_MODE: // pc.trainPcSimManage.TrainDoorModeHandle(baseMsg.Data[0]) } -} +}*/