// rts-sim-testing-service/mqtt/client.go
// 2023-12-20 10:37:54 +08:00
//
// 127 lines
// 3.0 KiB
// Go

package mqtt
import (
"context"
"fmt"
"math/rand"
"time"
"github.com/eclipse/paho.golang/autopaho"
"github.com/eclipse/paho.golang/paho"
"github.com/google/uuid"
"github.com/sagikazarmark/slog-shim"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protoreflect"
"joylink.club/bj-rtsts-server/ts/protos/state"
)
// Package-level MQTT state shared by the functions in this file.
var (
	// mqttClient is the client instance assigned by Startup; pub publishes
	// through its connection manager.
	mqttClient *MqttClient
	// clientId is the identifier generated by initClientId and returned by
	// GetClientId.
	clientId string
)
// MqttClient is the MQTT client. Startup builds one instance and stores it in
// the package-level mqttClient variable.
type MqttClient struct {
// cmc holds the options that were passed to Startup.
cmc *MqttOptions
// cc is the autopaho client configuration produced by cmc.tryInto().
cc *autopaho.ClientConfig
// cm is the connection manager returned by autopaho.NewConnection; pub
// publishes through it.
cm *autopaho.ConnectionManager
}
// Startup initializes and starts the MQTT client service.
//
// It generates the client id on first use and stores it in cmc.ClientId
// (mutating the caller's options), validates the options with checkConfig,
// converts them with cmc.tryInto, and hands the result to
// autopaho.NewConnection. On success the new client is stored in the
// package-level mqttClient variable.
//
// It returns an error if cmc is nil, if validation or conversion fails, or if
// autopaho.NewConnection fails.
func Startup(cmc *MqttOptions) error {
	// A nil options pointer would otherwise panic on the ClientId assignment
	// below; report it as an error instead.
	if cmc == nil {
		return fmt.Errorf("MQTT配置不能为空")
	}

	initClientId()
	cmc.ClientId = clientId
	if err := checkConfig(cmc); err != nil {
		return err
	}

	cc, err := cmc.tryInto()
	if err != nil {
		return err
	}

	cm, err := autopaho.NewConnection(context.Background(), *cc)
	if err != nil {
		return err
	}

	mqttClient = &MqttClient{cmc: cmc, cc: cc, cm: cm}
	return nil
}
// checkConfig validates the configuration. The fields are checked in the order
// AppId, ClientId, BrokerUrl, Username, Password; the error for the first empty
// one is returned, or nil when none of them is empty.
func checkConfig(cmc *MqttOptions) error {
	switch {
	case cmc.AppId == "":
		return fmt.Errorf("应用编号不能为空")
	case cmc.ClientId == "":
		return fmt.Errorf("客户端编号不能为空")
	case cmc.BrokerUrl == "":
		return fmt.Errorf("MQTT代理服务地址不能为空")
	case cmc.Username == "":
		return fmt.Errorf("MQTT用户名不能为空")
	case cmc.Password == "":
		return fmt.Errorf("MQTT密码不能为空")
	}
	return nil
}
// initClientId initializes the package-level MQTT client id. It does nothing
// when the id is already set. Otherwise the id is the last five characters of
// a newly generated UUID string followed by a pseudo-random number in
// [0, 1000) drawn from a time-seeded source.
func initClientId() {
	if clientId != "" {
		return
	}

	id := uuid.New().String()
	tail := id[len(id)-5:]
	rng := rand.New(rand.NewSource(time.Now().UnixNano()))
	clientId = fmt.Sprintf("%s%d", tail, rng.Int()%1000)
}
// GetClientId returns the package-level MQTT client id. The value is the empty
// string until initClientId has run; Startup calls initClientId before doing
// anything else with its options.
func GetClientId() string {
return clientId
}
// pub serializes data with proto.Marshal and publishes the bytes to topic at
// QoS 0 through the package-level MQTT client.
//
// It returns an error when data is nil (a nil interface or a nil pointer of a
// concrete message type), when marshaling fails, when MatchTopic rejects the
// topic, when the client has not been created by Startup, or when Publish
// itself fails.
func pub(topic string, data protoreflect.ProtoMessage) error {
	// The Pub* helpers pass typed pointers, so a nil message arrives here as a
	// non-nil interface and a plain "data == nil" test misses it. IsValid
	// reports false for a nil pointer of a generated message type.
	if data == nil || !data.ProtoReflect().IsValid() {
		return fmt.Errorf("发布数据引用为nil")
	}

	b, err := proto.Marshal(data)
	if err != nil {
		return err
	}

	if !MatchTopic(topic) {
		slog.Error("未知发布主题", "topic", topic, "data", data)
		return fmt.Errorf("未知发布主题: topic=%s", topic)
	}

	// Publishing before Startup has succeeded would dereference a nil client;
	// surface that as an error rather than a panic.
	if mqttClient == nil || mqttClient.cm == nil {
		return fmt.Errorf("MQTT客户端未初始化")
	}

	_, err = mqttClient.cm.Publish(context.Background(), &paho.Publish{
		Topic:   topic,
		QoS:     0,
		Payload: b,
	})
	return err
}
// PubSimulationState sends simulation state data: it publishes msg on the
// topic that GetStateTopic builds from simulationId and returns the error
// reported by pub.
func PubSimulationState(simulationId string, msg *state.SimulationStatus) error {
	topic := GetStateTopic(simulationId)
	return pub(topic, msg)
}
// PubIBPState sends IBP state data: it publishes msg on the topic that
// GetIbpTopic builds from simulationId, mapId and stationId, and returns the
// error reported by pub.
func PubIBPState(simulationId string, mapId int32, stationId uint32, msg *state.PushedDevicesStatus) error {
	topic := GetIbpTopic(simulationId, mapId, stationId)
	return pub(topic, msg)
}
// PubPSLState sends PSL state data: it publishes msg on the topic that
// GetPslTopic builds from simulationId, mapId and boxId, and returns the error
// reported by pub.
func PubPSLState(simulationId string, mapId int32, boxId uint32, msg *state.PushedDevicesStatus) error {
	topic := GetPslTopic(simulationId, mapId, boxId)
	return pub(topic, msg)
}
// PubRCCState sends relay state data: it publishes msg on the topic that
// GetRccTopic builds from simulationId and mapId, and returns the error
// reported by pub.
func PubRCCState(simulationId string, mapId int32, msg *state.PushedDevicesStatus) error {
	topic := GetRccTopic(simulationId, mapId)
	return pub(topic, msg)
}
// PubSfpState sends station layout diagram state data: it publishes msg on the
// topic that GetSfpTopic builds from simulationId and mapId, and returns the
// error reported by pub.
func PubSfpState(simulationId string, mapId int32, msg *state.PushedDevicesStatus) error {
	topic := GetSfpTopic(simulationId, mapId)
	return pub(topic, msg)
}