diff --git a/config/config.go b/config/config.go new file mode 100644 index 0000000..eb6d221 --- /dev/null +++ b/config/config.go @@ -0,0 +1,61 @@ +package config + +import ( + "flag" + "fmt" + "log/slog" + + "github.com/spf13/viper" +) + +type Config struct { + Mqtt mqtt +} + +// MQTT客户端配置 +type mqtt struct { + Topic topic + ClientId string + Address string + Username string + Password string + KeepAlive uint16 // 保活时间间隔,单位s + ConnectRetryDelay uint16 // 连接重试延时,单位s + ConnectTimeout uint16 // 连接操作超时,单位s +} + +type topic struct { + App string +} + +var Cfg Config + +// 获取配置文件名称,从运行flag参数config中获取,若未提供,使用默认'dev' +func getConfigName() string { + configName := "" + flag.StringVar(&configName, "config", "dev", "config name, eg: -config test") + flag.Parse() + if configName == "" { + configName = "dev" + } + slog.Info("读取配置文件", "配置文件名称", configName) + return configName +} + +// 加载配置 +func LoadConfig() { + cnf := viper.New() + cnf.SetConfigName(getConfigName()) + cnf.SetConfigType("yml") + cnf.AddConfigPath("./config/") + cnf.AddConfigPath(".") + err := cnf.ReadInConfig() + if err != nil { + panic(fmt.Errorf("读取配置文件错误: %w", err)) + } + err = cnf.Unmarshal(&Cfg) + if err != nil { + panic(fmt.Errorf("解析配置文件错误: %w", err)) + } + slog.Info("成功加载配置", "config", Cfg) +} diff --git a/config/dev.yml b/config/dev.yml new file mode 100644 index 0000000..c46c1a9 --- /dev/null +++ b/config/dev.yml @@ -0,0 +1,10 @@ +mqtt: + topic: + app: rtsts + address: tcp://192.168.3.233:1883 + clientId: cgy_unittec + username: rtsts_service + password: joylink@0503 + keepAlive: 30 # 保活时间间隔,单位s + connectRetryDelay: 5 # 连接重试延时,单位s + connectTimeout: 5 # 连接操作超时,单位s \ No newline at end of file diff --git a/go.mod b/go.mod index 5290812..3757b3e 100644 --- a/go.mod +++ b/go.mod @@ -2,9 +2,38 @@ module joylink.club/iot go 1.21 -require github.com/simonvetter/modbus v1.6.0 +require ( + github.com/eclipse/paho.golang v0.12.0 + github.com/simonvetter/modbus v1.6.0 +) + +require ( + github.com/fsnotify/fsnotify v1.7.0 // indirect + github.com/gorilla/websocket v1.5.0 // indirect + github.com/hashicorp/hcl v1.0.0 // indirect + github.com/magiconair/properties v1.8.7 // indirect + github.com/mitchellh/mapstructure v1.5.0 // indirect + github.com/pelletier/go-toml/v2 v2.1.0 // indirect + github.com/sagikazarmark/locafero v0.4.0 // indirect + github.com/sagikazarmark/slog-shim v0.1.0 // indirect + github.com/sourcegraph/conc v0.3.0 // indirect + github.com/spf13/afero v1.11.0 // indirect + github.com/spf13/cast v1.6.0 // indirect + github.com/spf13/pflag v1.0.5 // indirect + github.com/subosito/gotenv v1.6.0 // indirect + go.uber.org/atomic v1.9.0 // indirect + go.uber.org/multierr v1.9.0 // indirect + golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect + golang.org/x/net v0.19.0 // indirect + golang.org/x/sync v0.5.0 // indirect + golang.org/x/sys v0.15.0 // indirect + golang.org/x/text v0.14.0 // indirect + gopkg.in/ini.v1 v1.67.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) require ( github.com/goburrow/serial v0.1.0 // indirect - google.golang.org/protobuf v1.26.0 + github.com/spf13/viper v1.18.1 + google.golang.org/protobuf v1.31.0 ) diff --git a/go.sum b/go.sum index e0ae27a..b1f4cff 100644 --- a/go.sum +++ b/go.sum @@ -1,12 +1,89 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/eclipse/paho.golang v0.12.0 h1:EXQFJbJklDnUqW6lyAknMWRhM2NgpHxwrrL8riUmp3Q= +github.com/eclipse/paho.golang v0.12.0/go.mod h1:TSDCUivu9JnoR9Hl+H7sQMcHkejWH2/xKK1NJGtLbIE= +github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= +github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= +github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA= +github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= github.com/goburrow/serial v0.1.0 h1:v2T1SQa/dlUqQiYIT8+Cu7YolfqAi3K96UmhwYyuSrA= github.com/goburrow/serial v0.1.0/go.mod h1:sAiqG0nRVswsm1C97xsttiYCzSLBmUZ/VSlVLZJ8haA= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= -github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= +github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= +github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= +github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY= +github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0= +github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= +github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= +github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4= +github.com/pelletier/go-toml/v2 v2.1.0/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= +github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= +github.com/sagikazarmark/locafero v0.4.0 h1:HApY1R9zGo4DBgr7dqsTH/JJxLTTsOt7u6keLGt6kNQ= +github.com/sagikazarmark/locafero v0.4.0/go.mod h1:Pe1W6UlPYUk/+wc/6KFhbORCfqzgYEpgQ3O5fPuL3H4= +github.com/sagikazarmark/slog-shim v0.1.0 h1:diDBnUNK9N/354PgrxMywXnAwEr1QZcOr6gto+ugjYE= +github.com/sagikazarmark/slog-shim v0.1.0/go.mod h1:SrcSrq8aKtyuqEI1uvTDTK1arOWRIczQRv+GVI1AkeQ= github.com/simonvetter/modbus v1.6.0 h1:RDHJevtc7LDIVoHAbhDun8fy+QwnGe+ZU+sLm9ZZzjc= github.com/simonvetter/modbus v1.6.0/go.mod h1:hh90ZaTaPLcK2REj6/fpTbiV0J6S7GWmd8q+GVRObPw= -golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= +github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo= +github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0= +github.com/spf13/afero v1.11.0 h1:WJQKhtpdm3v2IzqG8VMqrr6Rf3UYpEF239Jy9wNepM8= +github.com/spf13/afero v1.11.0/go.mod h1:GH9Y3pIexgf1MTIWtNGyogA5MwRIDXGUr+hbWNoBjkY= +github.com/spf13/cast v1.6.0 h1:GEiTHELF+vaR5dhz3VqZfFSzZjYbgeKDpBxQVS4GYJ0= +github.com/spf13/cast v1.6.0/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo= +github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= +github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= +github.com/spf13/viper v1.18.1 h1:rmuU42rScKWlhhJDyXZRKJQHXFX02chSVW1IvkPGiVM= +github.com/spf13/viper v1.18.1/go.mod h1:EKmWIqdnk5lOcmR72yw6hS+8OPYcwD0jteitLMVB+yk= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8= +github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU= +go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= +go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A= +go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4= +go.uber.org/multierr v1.9.0 h1:7fIwc/ZtS0q++VgcfqFDxSBZVv/Xo49/SYnDFupUwlI= +go.uber.org/multierr v1.9.0/go.mod h1:X2jQV1h+kxSjClGpnseKVIxpmcjrj7MNnI0bnlfKTVQ= +golang.org/x/exp v0.0.0-20230905200255-921286631fa9 h1:GoHiUyI/Tp2nVkLI2mCxVkOjsbSXD66ic0XW0js0R9g= +golang.org/x/exp v0.0.0-20230905200255-921286631fa9/go.mod h1:S2oDrQGGwySpoQPVqRShND87VCbxmc6bL1Yd2oYrm6k= +golang.org/x/net v0.19.0 h1:zTwKpTd2XuCqf8huc7Fo2iSy+4RHPd10s4KzeTnVr1c= +golang.org/x/net v0.19.0/go.mod h1:CfAk/cbD4CthTvqiEl8NpboMuiuOYsAr/7NOjZJtv1U= +golang.org/x/sync v0.5.0 h1:60k92dhOjHxJkrqnwsfl8KuaHbn/5dl0lUPUklKo3qE= +golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc= +golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= -google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk= -google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= +google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA= +gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/main.go b/main.go index 9816576..b93d7c8 100644 --- a/main.go +++ b/main.go @@ -6,9 +6,12 @@ import ( "os" "time" - "joylink.club/iot/service" - "joylink.club/iot/service/model" - "joylink.club/iot/service/proto" + "github.com/eclipse/paho.golang/autopaho" + "github.com/eclipse/paho.golang/paho" + "google.golang.org/protobuf/proto" + "joylink.club/iot/config" + "joylink.club/iot/mqtt" + mproto "joylink.club/iot/mqtt/proto" ) func main() { @@ -16,50 +19,107 @@ func main() { Level: slog.LevelDebug, AddSource: false, }))) - dc := model.NewDC(make([]byte, 2), make([]byte, 2)) - mds, err := service.NewModbusQcService(&proto.ModbusConfig{ - Url: "tcp://127.0.0.1:502", - UnitId: 2, - Timeout: 500, - Interval: 1000, - Mapping: []*proto.ModbusDcMapping{ - { - // Function: proto.Modbus_ReadHoldingRegister, - Function: proto.Modbus_ReadCoil, - Addr: 0, - Quantity: 16, - Type: proto.DataType_CollectTable, - Start: 0, - }, - { - Function: proto.Modbus_WriteCoils, - Addr: 16, - Quantity: 16, - Type: proto.DataType_DriveTable, - WriteStrategy: proto.Modbus_OnUpdate, - Start: 0, - }, + config.LoadConfig() + mqttcfg := config.Cfg.Mqtt + mqtt.BuildTopics(mqttcfg.Topic.App, mqttcfg.ClientId) + cmc := &mqtt.ClientManageConfig{ + BrokerUrl: mqttcfg.Address, + ClientId: mqttcfg.ClientId, + Username: mqttcfg.Username, + Password: mqttcfg.Password, + KeepAlive: mqttcfg.KeepAlive, + ConnectRetryDelay: mqttcfg.ConnectRetryDelay, + ConnectTimeout: mqttcfg.ConnectTimeout, + OnConnectionUp: func(*autopaho.ConnectionManager, *paho.Connack) { + slog.Info("MQTT连接成功") + // err := mqtt.SubIotServiceState(mqtt.GetIotServiceStateTopic()) + // if err != nil { + // slog.Error("订阅IotServiceState失败", "error", err) + // os.Exit(1) + // } }, - }, dc) + } + err := mqtt.Start(cmc) if err != nil { - panic(err) + slog.Error("启动MQTT客户端失败", "error", err) + os.Exit(1) } + time.Sleep(time.Second * 3) + err = mqtt.SubIotServiceState(mqtt.GetIotServiceStateTopic()) + if err != nil { + slog.Error("订阅IotServiceState失败", "error", err) + os.Exit(1) + } + + i := 0 + mqtt.RegisterHandler(mqtt.GetIotServiceStateTopic(), func(m *paho.Publish) { + iss := &mproto.IotServiceState{} + err := proto.Unmarshal(m.Payload, iss) + if err != nil { + slog.Error("SubIotServiceState proto.Unmarshal异常", "error", err) + return + } + slog.Debug("收到IotServiceState发布消息", "state", iss) + i++ + fmt.Printf("%v次处理IotServiceState: %v\n", i, iss) + }) + go func() { - i := 0 for { - c := dc.GetCollect() - fmt.Printf("采集数据: %v\n", c) - i++ - if i%3 == 0 { - idx := i % 8 - dc.UpdateDriveByBytes(0, []byte{byte(1 << idx)}) - fmt.Printf("设置驱动数据: %v\n", dc.GetDrive()) - } time.Sleep(time.Second) + mqtt.PubIotServiceState(&mproto.IotServiceState{ + Code: config.Cfg.Mqtt.ClientId, + State: mproto.ServiceState_Normal, + }) } }() - time.Sleep(time.Minute * 2) - mds.Stop() + time.Sleep(time.Minute) + // dc := model.NewDC(make([]byte, 2), make([]byte, 2)) + // mds, err := service.NewModbusQcService(&proto.ModbusConfig{ + // Url: "tcp://127.0.0.1:502", + // UnitId: 2, + // Timeout: 500, + // Interval: 1000, + // Mapping: []*proto.ModbusDcMapping{ + // { + // // Function: proto.Modbus_ReadHoldingRegister, + // Function: proto.Modbus_ReadCoil, + // Addr: 0, + // Quantity: 16, + // Type: proto.DataType_CollectTable, + // Start: 0, + // }, + // { + // Function: proto.Modbus_WriteCoils, + // Addr: 16, + // Quantity: 16, + // Type: proto.DataType_DriveTable, + // WriteStrategy: proto.Modbus_OnUpdate, + // Start: 0, + // }, + // }, + // }, dc) + // if err != nil { + // panic(err) + // } + + // go func() { + // i := 0 + // for { + // c := dc.GetCollect() + // fmt.Printf("采集数据: %v\n", c) + // i++ + // if i%3 == 0 { + // idx := i % 8 + // dc.UpdateDriveByBytes(0, []byte{byte(1 << idx)}) + // fmt.Printf("设置驱动数据: %v\n", dc.GetDrive()) + // } + // time.Sleep(time.Second) + // } + // }() + + // time.Sleep(time.Minute * 2) + // mds.Stop() } diff --git a/mqtt/app_protocol.go b/mqtt/app_protocol.go new file mode 100644 index 0000000..2ac0ca9 --- /dev/null +++ b/mqtt/app_protocol.go @@ -0,0 +1,28 @@ +package mqtt + +import "joylink.club/iot/mqtt/proto" + +type IotServiceStateHandler func(state *proto.IotServiceState) + +type IotService interface { + PubIotServiceState(state *proto.IotServiceState) + PubIotQdData(qd *proto.IotQd) + SubIotQd() + RegIotQd(h func(qd *proto.IotQd)) + PubIotCjData(cj *proto.IotCj) + SubIotCj() + RegIotCj(h func(cj *proto.IotCj)) + SubIotReq(cmd *proto.IotServiceReq) +} + +type Service interface { + SubIotServiceState() + RegIotServiceState(h func(state *proto.IotServiceState)) + PubIotQdData(qd *proto.IotQd) + SubIotQd() + RegIotQd(h func(qd *proto.IotQd)) + PubIotCjData(cj *proto.IotCj) + SubIotCj() + RegIotCj(h func(cj *proto.IotCj)) + ReqIotService(cmd *proto.IotServiceReq) +} diff --git a/mqtt/client.go b/mqtt/client.go new file mode 100644 index 0000000..345d764 --- /dev/null +++ b/mqtt/client.go @@ -0,0 +1,87 @@ +package mqtt + +import ( + "context" + "log/slog" + + "github.com/eclipse/paho.golang/autopaho" + "github.com/eclipse/paho.golang/paho" + "google.golang.org/protobuf/proto" + mproto "joylink.club/iot/mqtt/proto" +) + +type Manager struct { + cc *autopaho.ClientConfig + cm *autopaho.ConnectionManager +} + +var manager *Manager + +func Start(cmc *ClientManageConfig) error { + cc, err := cmc.tryInto() + if err != nil { + return err + } + cm, err := autopaho.NewConnection(context.Background(), *cc) + if err != nil { + return err + } + manager = &Manager{ + cc: cc, + cm: cm, + } + return nil +} + +func Publish(ctx context.Context, publish *paho.Publish) (*paho.PublishResponse, error) { + return manager.cm.Publish(ctx, publish) +} + +func PubIotServiceState(s *mproto.IotServiceState) error { + return manager.PubIotServiceState(s) +} + +func SubIotServiceState(topic string) error { + return manager.SubIotServiceState(topic) +} + +func (m *Manager) PubIotServiceState(s *mproto.IotServiceState) error { + if s == nil { + return nil + } + slog.Debug("PubIotServiceState", "topic", GetIotServiceStateTopic(), "state", s) + b, err := proto.Marshal(s) + if err != nil { + return err + } + _, err = m.cm.Publish(context.Background(), &paho.Publish{ + Topic: GetIotServiceStateTopic(), + QoS: 0, + Payload: b, + }) + return err +} + +func RegisterHandler(topic string, h func(m *paho.Publish)) { + manager.cc.Router.RegisterHandler(topic, h) +} + +func (m *Manager) RegisterHandler(topic string, h func(m *paho.Publish)) { + m.cc.Router.RegisterHandler(topic, h) +} + +func (m *Manager) SubIotServiceState(topic string) error { + slog.Debug("订阅IotServiceState", "topic", topic) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + _, err := m.cm.Subscribe(ctx, &paho.Subscribe{ + Subscriptions: []paho.SubscribeOptions{ + { + Topic: topic, + QoS: 0, + // NoLocal: true, + }, + }, + }) + return err +} diff --git a/mqtt/config.go b/mqtt/config.go new file mode 100644 index 0000000..3752318 --- /dev/null +++ b/mqtt/config.go @@ -0,0 +1,61 @@ +package mqtt + +import ( + "fmt" + "log/slog" + "net/url" + "time" + + "github.com/eclipse/paho.golang/autopaho" + "github.com/eclipse/paho.golang/paho" +) + +type ClientManageConfig struct { + BrokerUrl string // Broker地址 + ClientId string // 客户端ID + Username string // 用户名 + Password string // 密码 + KeepAlive uint16 // 保活时间间隔,单位s,默认为60 + ConnectRetryDelay uint16 // 连接重试延时,单位s,默认为3 + ConnectTimeout uint16 // 连接操作超时,单位s,默认为3 + OnConnectionUp func(*autopaho.ConnectionManager, *paho.Connack) +} + +func (c *ClientManageConfig) tryInto() (*autopaho.ClientConfig, error) { + addr, err := url.Parse(c.BrokerUrl) + if err != nil { + return nil, fmt.Errorf("Mqtt.Address格式错误, %s: %w", c.BrokerUrl, err) + } + if c.KeepAlive == 0 { + c.KeepAlive = 60 + } + if c.ConnectRetryDelay == 0 { + c.ConnectRetryDelay = 3 + } + if c.ConnectTimeout == 0 { + c.ConnectTimeout = 3 + } + cc := &autopaho.ClientConfig{ + BrokerUrls: []*url.URL{ + addr, + }, + KeepAlive: c.KeepAlive, + ConnectRetryDelay: time.Duration(c.ConnectRetryDelay) * time.Second, + ConnectTimeout: time.Duration(c.ConnectTimeout) * time.Second, + OnConnectionUp: c.OnConnectionUp, + OnConnectError: func(err error) { + slog.Error("MQTT连接失败", "error", err) + }, + ClientConfig: paho.ClientConfig{ + ClientID: c.ClientId, + Router: paho.NewStandardRouter(), + OnClientError: func(err error) { fmt.Printf("%s Mqtt客户端发生错误: %s\n", c.ClientId, err) }, + OnServerDisconnect: func(d *paho.Disconnect) { + fmt.Printf("%s 连接断开; reason code: %d,properties: %v\n", c.ClientId, d.ReasonCode, d.Properties) + }, + }, + } + cc.SetUsernamePassword(c.Username, []byte(c.Password)) + cc.SetWillMessage(GetIotServiceStateTopic(), []byte("离线"), 1, true) + return cc, nil +} diff --git a/mqtt/proto/mqtt.pb.go b/mqtt/proto/mqtt.pb.go new file mode 100644 index 0000000..4c313f7 --- /dev/null +++ b/mqtt/proto/mqtt.pb.go @@ -0,0 +1,535 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.31.0 +// protoc v4.23.1 +// source: mqtt/mqtt.proto + +package proto + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type ServiceState int32 + +const ( + ServiceState_Normal ServiceState = 0 // 正常 + ServiceState_Offline ServiceState = 1 // 离线 +) + +// Enum value maps for ServiceState. +var ( + ServiceState_name = map[int32]string{ + 0: "Normal", + 1: "Offline", + } + ServiceState_value = map[string]int32{ + "Normal": 0, + "Offline": 1, + } +) + +func (x ServiceState) Enum() *ServiceState { + p := new(ServiceState) + *p = x + return p +} + +func (x ServiceState) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (ServiceState) Descriptor() protoreflect.EnumDescriptor { + return file_mqtt_mqtt_proto_enumTypes[0].Descriptor() +} + +func (ServiceState) Type() protoreflect.EnumType { + return &file_mqtt_mqtt_proto_enumTypes[0] +} + +func (x ServiceState) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use ServiceState.Descriptor instead. +func (ServiceState) EnumDescriptor() ([]byte, []int) { + return file_mqtt_mqtt_proto_rawDescGZIP(), []int{0} +} + +type ServiceRequest int32 + +const ( + ServiceRequest_Logs ServiceRequest = 0 // 日志 +) + +// Enum value maps for ServiceRequest. +var ( + ServiceRequest_name = map[int32]string{ + 0: "Logs", + } + ServiceRequest_value = map[string]int32{ + "Logs": 0, + } +) + +func (x ServiceRequest) Enum() *ServiceRequest { + p := new(ServiceRequest) + *p = x + return p +} + +func (x ServiceRequest) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (ServiceRequest) Descriptor() protoreflect.EnumDescriptor { + return file_mqtt_mqtt_proto_enumTypes[1].Descriptor() +} + +func (ServiceRequest) Type() protoreflect.EnumType { + return &file_mqtt_mqtt_proto_enumTypes[1] +} + +func (x ServiceRequest) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use ServiceRequest.Descriptor instead. +func (ServiceRequest) EnumDescriptor() ([]byte, []int) { + return file_mqtt_mqtt_proto_rawDescGZIP(), []int{1} +} + +// IOT服务状态 +type IotServiceState struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Code string `protobuf:"bytes,1,opt,name=code,proto3" json:"code,omitempty"` // 服务编号 + State ServiceState `protobuf:"varint,2,opt,name=state,proto3,enum=mqtt_api.ServiceState" json:"state,omitempty"` // 服务状态 +} + +func (x *IotServiceState) Reset() { + *x = IotServiceState{} + if protoimpl.UnsafeEnabled { + mi := &file_mqtt_mqtt_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *IotServiceState) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*IotServiceState) ProtoMessage() {} + +func (x *IotServiceState) ProtoReflect() protoreflect.Message { + mi := &file_mqtt_mqtt_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use IotServiceState.ProtoReflect.Descriptor instead. +func (*IotServiceState) Descriptor() ([]byte, []int) { + return file_mqtt_mqtt_proto_rawDescGZIP(), []int{0} +} + +func (x *IotServiceState) GetCode() string { + if x != nil { + return x.Code + } + return "" +} + +func (x *IotServiceState) GetState() ServiceState { + if x != nil { + return x.State + } + return ServiceState_Normal +} + +// IOT服务请求 +type IotServiceReq struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Code string `protobuf:"bytes,1,opt,name=code,proto3" json:"code,omitempty"` // 服务编号 + Req ServiceRequest `protobuf:"varint,2,opt,name=req,proto3,enum=mqtt_api.ServiceRequest" json:"req,omitempty"` // 服务请求 +} + +func (x *IotServiceReq) Reset() { + *x = IotServiceReq{} + if protoimpl.UnsafeEnabled { + mi := &file_mqtt_mqtt_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *IotServiceReq) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*IotServiceReq) ProtoMessage() {} + +func (x *IotServiceReq) ProtoReflect() protoreflect.Message { + mi := &file_mqtt_mqtt_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use IotServiceReq.ProtoReflect.Descriptor instead. +func (*IotServiceReq) Descriptor() ([]byte, []int) { + return file_mqtt_mqtt_proto_rawDescGZIP(), []int{1} +} + +func (x *IotServiceReq) GetCode() string { + if x != nil { + return x.Code + } + return "" +} + +func (x *IotServiceReq) GetReq() ServiceRequest { + if x != nil { + return x.Req + } + return ServiceRequest_Logs +} + +// IOT服务响应 +type IotServiceResp struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Code string `protobuf:"bytes,1,opt,name=code,proto3" json:"code,omitempty"` // 服务编号 +} + +func (x *IotServiceResp) Reset() { + *x = IotServiceResp{} + if protoimpl.UnsafeEnabled { + mi := &file_mqtt_mqtt_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *IotServiceResp) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*IotServiceResp) ProtoMessage() {} + +func (x *IotServiceResp) ProtoReflect() protoreflect.Message { + mi := &file_mqtt_mqtt_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use IotServiceResp.ProtoReflect.Descriptor instead. +func (*IotServiceResp) Descriptor() ([]byte, []int) { + return file_mqtt_mqtt_proto_rawDescGZIP(), []int{2} +} + +func (x *IotServiceResp) GetCode() string { + if x != nil { + return x.Code + } + return "" +} + +// IOT驱动数据 +type IotQd struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Code string `protobuf:"bytes,1,opt,name=code,proto3" json:"code,omitempty"` // 服务编号 + Data []byte `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"` // 驱动数据 +} + +func (x *IotQd) Reset() { + *x = IotQd{} + if protoimpl.UnsafeEnabled { + mi := &file_mqtt_mqtt_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *IotQd) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*IotQd) ProtoMessage() {} + +func (x *IotQd) ProtoReflect() protoreflect.Message { + mi := &file_mqtt_mqtt_proto_msgTypes[3] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use IotQd.ProtoReflect.Descriptor instead. +func (*IotQd) Descriptor() ([]byte, []int) { + return file_mqtt_mqtt_proto_rawDescGZIP(), []int{3} +} + +func (x *IotQd) GetCode() string { + if x != nil { + return x.Code + } + return "" +} + +func (x *IotQd) GetData() []byte { + if x != nil { + return x.Data + } + return nil +} + +// IOT采集数据 +type IotCj struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Code string `protobuf:"bytes,1,opt,name=code,proto3" json:"code,omitempty"` // 服务编号 + Data []byte `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"` // 采集数据 +} + +func (x *IotCj) Reset() { + *x = IotCj{} + if protoimpl.UnsafeEnabled { + mi := &file_mqtt_mqtt_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *IotCj) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*IotCj) ProtoMessage() {} + +func (x *IotCj) ProtoReflect() protoreflect.Message { + mi := &file_mqtt_mqtt_proto_msgTypes[4] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use IotCj.ProtoReflect.Descriptor instead. +func (*IotCj) Descriptor() ([]byte, []int) { + return file_mqtt_mqtt_proto_rawDescGZIP(), []int{4} +} + +func (x *IotCj) GetCode() string { + if x != nil { + return x.Code + } + return "" +} + +func (x *IotCj) GetData() []byte { + if x != nil { + return x.Data + } + return nil +} + +var File_mqtt_mqtt_proto protoreflect.FileDescriptor + +var file_mqtt_mqtt_proto_rawDesc = []byte{ + 0x0a, 0x0f, 0x6d, 0x71, 0x74, 0x74, 0x2f, 0x6d, 0x71, 0x74, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x12, 0x08, 0x6d, 0x71, 0x74, 0x74, 0x5f, 0x61, 0x70, 0x69, 0x22, 0x53, 0x0a, 0x0f, 0x49, + 0x6f, 0x74, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x12, + 0x0a, 0x04, 0x63, 0x6f, 0x64, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x63, 0x6f, + 0x64, 0x65, 0x12, 0x2c, 0x0a, 0x05, 0x73, 0x74, 0x61, 0x74, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, + 0x0e, 0x32, 0x16, 0x2e, 0x6d, 0x71, 0x74, 0x74, 0x5f, 0x61, 0x70, 0x69, 0x2e, 0x53, 0x65, 0x72, + 0x76, 0x69, 0x63, 0x65, 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, 0x05, 0x73, 0x74, 0x61, 0x74, 0x65, + 0x22, 0x4f, 0x0a, 0x0d, 0x49, 0x6f, 0x74, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x52, 0x65, + 0x71, 0x12, 0x12, 0x0a, 0x04, 0x63, 0x6f, 0x64, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x04, 0x63, 0x6f, 0x64, 0x65, 0x12, 0x2a, 0x0a, 0x03, 0x72, 0x65, 0x71, 0x18, 0x02, 0x20, 0x01, + 0x28, 0x0e, 0x32, 0x18, 0x2e, 0x6d, 0x71, 0x74, 0x74, 0x5f, 0x61, 0x70, 0x69, 0x2e, 0x53, 0x65, + 0x72, 0x76, 0x69, 0x63, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x52, 0x03, 0x72, 0x65, + 0x71, 0x22, 0x24, 0x0a, 0x0e, 0x49, 0x6f, 0x74, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x52, + 0x65, 0x73, 0x70, 0x12, 0x12, 0x0a, 0x04, 0x63, 0x6f, 0x64, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x04, 0x63, 0x6f, 0x64, 0x65, 0x22, 0x2f, 0x0a, 0x05, 0x49, 0x6f, 0x74, 0x51, 0x64, + 0x12, 0x12, 0x0a, 0x04, 0x63, 0x6f, 0x64, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, + 0x63, 0x6f, 0x64, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x02, 0x20, 0x01, + 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x22, 0x2f, 0x0a, 0x05, 0x49, 0x6f, 0x74, 0x43, + 0x6a, 0x12, 0x12, 0x0a, 0x04, 0x63, 0x6f, 0x64, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x04, 0x63, 0x6f, 0x64, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x02, 0x20, + 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x2a, 0x27, 0x0a, 0x0c, 0x53, 0x65, 0x72, + 0x76, 0x69, 0x63, 0x65, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x0a, 0x0a, 0x06, 0x4e, 0x6f, 0x72, + 0x6d, 0x61, 0x6c, 0x10, 0x00, 0x12, 0x0b, 0x0a, 0x07, 0x4f, 0x66, 0x66, 0x6c, 0x69, 0x6e, 0x65, + 0x10, 0x01, 0x2a, 0x1a, 0x0a, 0x0e, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x12, 0x08, 0x0a, 0x04, 0x4c, 0x6f, 0x67, 0x73, 0x10, 0x00, 0x42, 0x0e, + 0x5a, 0x0c, 0x2e, 0x2f, 0x6d, 0x71, 0x74, 0x74, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_mqtt_mqtt_proto_rawDescOnce sync.Once + file_mqtt_mqtt_proto_rawDescData = file_mqtt_mqtt_proto_rawDesc +) + +func file_mqtt_mqtt_proto_rawDescGZIP() []byte { + file_mqtt_mqtt_proto_rawDescOnce.Do(func() { + file_mqtt_mqtt_proto_rawDescData = protoimpl.X.CompressGZIP(file_mqtt_mqtt_proto_rawDescData) + }) + return file_mqtt_mqtt_proto_rawDescData +} + +var file_mqtt_mqtt_proto_enumTypes = make([]protoimpl.EnumInfo, 2) +var file_mqtt_mqtt_proto_msgTypes = make([]protoimpl.MessageInfo, 5) +var file_mqtt_mqtt_proto_goTypes = []interface{}{ + (ServiceState)(0), // 0: mqtt_api.ServiceState + (ServiceRequest)(0), // 1: mqtt_api.ServiceRequest + (*IotServiceState)(nil), // 2: mqtt_api.IotServiceState + (*IotServiceReq)(nil), // 3: mqtt_api.IotServiceReq + (*IotServiceResp)(nil), // 4: mqtt_api.IotServiceResp + (*IotQd)(nil), // 5: mqtt_api.IotQd + (*IotCj)(nil), // 6: mqtt_api.IotCj +} +var file_mqtt_mqtt_proto_depIdxs = []int32{ + 0, // 0: mqtt_api.IotServiceState.state:type_name -> mqtt_api.ServiceState + 1, // 1: mqtt_api.IotServiceReq.req:type_name -> mqtt_api.ServiceRequest + 2, // [2:2] is the sub-list for method output_type + 2, // [2:2] is the sub-list for method input_type + 2, // [2:2] is the sub-list for extension type_name + 2, // [2:2] is the sub-list for extension extendee + 0, // [0:2] is the sub-list for field type_name +} + +func init() { file_mqtt_mqtt_proto_init() } +func file_mqtt_mqtt_proto_init() { + if File_mqtt_mqtt_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_mqtt_mqtt_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*IotServiceState); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_mqtt_mqtt_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*IotServiceReq); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_mqtt_mqtt_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*IotServiceResp); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_mqtt_mqtt_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*IotQd); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_mqtt_mqtt_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*IotCj); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_mqtt_mqtt_proto_rawDesc, + NumEnums: 2, + NumMessages: 5, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_mqtt_mqtt_proto_goTypes, + DependencyIndexes: file_mqtt_mqtt_proto_depIdxs, + EnumInfos: file_mqtt_mqtt_proto_enumTypes, + MessageInfos: file_mqtt_mqtt_proto_msgTypes, + }.Build() + File_mqtt_mqtt_proto = out.File + file_mqtt_mqtt_proto_rawDesc = nil + file_mqtt_mqtt_proto_goTypes = nil + file_mqtt_mqtt_proto_depIdxs = nil +} diff --git a/mqtt/topic.go b/mqtt/topic.go new file mode 100644 index 0000000..cc77a26 --- /dev/null +++ b/mqtt/topic.go @@ -0,0 +1,35 @@ +package mqtt + +import "fmt" + +const ( + Topic_IotServiceState string = "/%s/%s/iotss" + Topic_IotCmd string = "/%s/%s/iotcmd" + Topic_IotQd string = "/%s/%s/iotqd" + Topic_IotCj string = "/%s/%s/iotcj" +) + +var topicMap = make(map[string]string, 4) + +func BuildTopics(sysCode, iotCode string) { + topicMap[Topic_IotServiceState] = fmt.Sprintf(Topic_IotServiceState, sysCode, iotCode) + topicMap[Topic_IotCmd] = fmt.Sprintf(Topic_IotCmd, sysCode, iotCode) + topicMap[Topic_IotQd] = fmt.Sprintf(Topic_IotQd, sysCode, iotCode) + topicMap[Topic_IotCj] = fmt.Sprintf(Topic_IotCj, sysCode, iotCode) +} + +func GetIotServiceStateTopic() string { + return topicMap[Topic_IotServiceState] +} + +func GetCmdTopic() string { + return topicMap[Topic_IotCmd] +} + +func GetQdTopic() string { + return topicMap[Topic_IotQd] +} + +func GetCjTopic() string { + return topicMap[Topic_IotCj] +} diff --git a/proto/src/mqtt/mqtt.proto b/proto/src/mqtt/mqtt.proto new file mode 100644 index 0000000..4b82dd8 --- /dev/null +++ b/proto/src/mqtt/mqtt.proto @@ -0,0 +1,52 @@ +syntax = "proto3"; + +package mqtt_api; +option go_package = "./mqtt/proto"; + + +enum ServiceState { + Normal = 0; // 正常 + Offline = 1; // 离线 +} + +enum ServiceRequest { + Logs = 0; // 日志 +} + +// IOT服务状态 +message IotServiceState { + string code = 1; // 服务编号 + ServiceState state = 2; // 服务状态 +} + +// IOT服务请求 +message IotServiceReq { + string code = 1; // 服务编号 + ServiceRequest req = 2; // 服务请求 +} + +// IOT服务响应 +message IotServiceResp { + string code = 1; // 服务编号 + int32 err = 2; // 错误码 + string errmsg = 3; // 错误信息 + oneof data { + IotServiceLog logs = 4; // 日志 + } +} + +message IotServiceLog { + repeated string logs = 1; // 日志 +} + +// IOT驱动数据 +message IotQd { + string code = 1; // 服务编号 + bytes data = 2; // 驱动数据 +} + +// IOT采集数据 +message IotCj { + string code = 1; // 服务编号 + bytes data = 2; // 采集数据 +} diff --git a/service/modbus_dc_mapping.go b/service/modbus_dc_mapping.go index d69f3e1..3931eca 100644 --- a/service/modbus_dc_mapping.go +++ b/service/modbus_dc_mapping.go @@ -19,6 +19,7 @@ type modbusQcService struct { cli modbus.MasterClient qc model.QC cancel context.CancelFunc + done chan struct{} // 服务协程退出信号 } func NewModbusQcService(config *sproto.ModbusConfig, dc model.QC) (IotService, error) { @@ -43,6 +44,7 @@ func NewModbusQcService(config *sproto.ModbusConfig, dc model.QC) (IotService, e config: config, cli: cli, qc: dc, + done: make(chan struct{}), } s.initOnUpdateTask() ctx, cancel := context.WithCancel(context.Background()) @@ -95,11 +97,14 @@ func isWriteFunction(modbus_Function sproto.Modbus_Function) bool { } func (m *modbusQcService) run(ctx context.Context) { + defer close(m.done) +mainLoop: for { select { case <-ctx.Done(): - slog.Info("Modbus驱动采集服务退出", "url", m.config.Url) - return + slog.Debug("Modbus驱采映射循环取消,关闭modbus客户端", "url", m.config.Url) + modbus.DeleteClient(m.config.Url) + break mainLoop default: } m.mappingTaskExecute() @@ -266,7 +271,8 @@ func (m *modbusQcService) Start() error { func (m *modbusQcService) Stop() error { m.cancel() - modbus.DeleteClient(m.config.Url) + <-m.done + slog.Info("Modbus驱采映射服务线程退出", "url", m.config.Url) return nil }