package mqtt import ( "context" "fmt" "github.com/eclipse/paho.golang/autopaho" "github.com/eclipse/paho.golang/paho" "github.com/sagikazarmark/slog-shim" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/reflect/protoreflect" "joylink.club/bj-rtsts-server/dto/state_proto" ) var mqttClient *MqttClient // 客户端 type MqttClient struct { cc *autopaho.ClientConfig cm *autopaho.ConnectionManager } // 初始化并启动MQTT客户端服务 func Startup(cmc *MqttOptions) error { if err := checkConfig(cmc); err != nil { return err } cc, err := cmc.tryInto() if err != nil { return err } ctx := context.Background() cm, err := autopaho.NewConnection(ctx, *cc) if err != nil { return err } //添加等待连接,如果连接失败,则等待重连 if err = cm.AwaitConnection(ctx); err != nil { panic(err) } mqttClient = &MqttClient{cc: cc, cm: cm} return nil } // 检查配置信息 func checkConfig(cmc *MqttOptions) error { if cmc.AppId == "" { return fmt.Errorf("应用编号不能为空") } if cmc.ClientId == "" { return fmt.Errorf("客户端编号不能为空") } if cmc.BrokerUrl == "" { return fmt.Errorf("MQTT代理服务地址不能为空") } if cmc.Username == "" { return fmt.Errorf("MQTT用户名不能为空") } if cmc.Password == "" { return fmt.Errorf("MQTT密码不能为空") } return nil } func GetMsgClient() *MqttClient { return mqttClient } // 获取MQTT客户端id func GetClientId() string { return mqttClient.cc.ClientConfig.ClientID } // 发布数据 func (client *MqttClient) pub(topic string, data protoreflect.ProtoMessage) error { if data == nil { return fmt.Errorf("发布数据引用为nil") } b, err := proto.Marshal(data) if err != nil { return err } if !MatchTopic(topic) { slog.Error("未知发布主题", "topic", topic, "data", data) return fmt.Errorf("未知发布主题: topic=%s", topic) } // else { // slog.Debug("发布主题", "topic", topic, "data", data) // } _, err = client.cm.Publish(context.Background(), &paho.Publish{ Topic: topic, QoS: 0, Payload: b, }) return err } // 订阅mqtt服务 /*func subHandle(p *paho.Publish) { fmt.Println(fmt.Sprintf("%v-%v"), hex.EncodeToString(p.Payload), len(p.Payload)) } func (client *MqttClient) 
SubSimulationState(simulationId string) (*paho.Suback, error) {
	topIc := GetStateTopic(simulationId)
	_, err2 := client.cm.Subscribe(context.Background(), &paho.Subscribe{Subscriptions: []paho.SubscribeOptions{{Topic: topIc, QoS: 0}}})
	if err2 != nil {
		fmt.Println(err2)
	}
	client.cc.Router.RegisterHandler(topIc, subHandle)
	return nil, nil
}*/

// PubSimulationState sends simulation state data: msg is published to the
// topic that GetStateTopic yields for simulationId.
func (c *MqttClient) PubSimulationState(simulationId string, msg *state_proto.SimulationStatus) error {
	topic := GetStateTopic(simulationId)
	return c.pub(topic, msg)
}

// PubTpapiServiceState publishes msg to the topic that GetTpapiServiceTopic
// yields for simulationId.
func (c *MqttClient) PubTpapiServiceState(simulationId string, msg *state_proto.SimulationThirdPartyApiService) error {
	topic := GetTpapiServiceTopic(simulationId)
	return c.pub(topic, msg)
}

// PubIBPState sends IBP state data: msg is published to the topic that
// GetIbpTopic yields for simulationId, mapId and ibpId.
func (c *MqttClient) PubIBPState(simulationId string, mapId int32, ibpId uint32, msg *state_proto.PushedDevicesStatus) error {
	topic := GetIbpTopic(simulationId, mapId, ibpId)
	return c.pub(topic, msg)
}

// PubPSLState sends PSL state data: msg is published to the topic that
// GetPslTopic yields for simulationId, mapId and boxId.
func (c *MqttClient) PubPSLState(simulationId string, mapId int32, boxId uint32, msg *state_proto.PushedDevicesStatus) error {
	topic := GetPslTopic(simulationId, mapId, boxId)
	return c.pub(topic, msg)
}

// PubRCCState sends relay state data: msg is published to the topic that
// GetRccTopic yields for simulationId and mapId.
func (c *MqttClient) PubRCCState(simulationId string, mapId int32, msg *state_proto.PushedDevicesStatus) error {
	topic := GetRccTopic(simulationId, mapId)
	return c.pub(topic, msg)
}

// PubSfpState sends station layout diagram state data: msg is published to
// the topic that GetSfpTopic yields for simulationId and mapId.
func (c *MqttClient) PubSfpState(simulationId string, mapId int32, msg *state_proto.PushedDevicesStatus) error {
	topic := GetSfpTopic(simulationId, mapId)
	return c.pub(topic, msg)
}

// PubTrainControlState sends train control state: msg is published to the
// topic that GetTrainControlTopic yields for simulationId and trainId.
func (c *MqttClient) PubTrainControlState(simulationId string, trainId string, msg *state_proto.TrainControlStateMsg) error {
	topic := GetTrainControlTopic(simulationId, trainId)
	return c.pub(topic, msg)
}