MQTT相关功能开发,未完成

This commit is contained in:
walker 2023-12-15 18:08:06 +08:00
parent 694ece6350
commit 4dd10ccddb
12 changed files with 1089 additions and 48 deletions

61
config/config.go Normal file
View File

@ -0,0 +1,61 @@
package config
import (
"flag"
"fmt"
"log/slog"
"github.com/spf13/viper"
)
type Config struct {
Mqtt mqtt
}
// MQTT客户端配置
type mqtt struct {
Topic topic
ClientId string
Address string
Username string
Password string
KeepAlive uint16 // 保活时间间隔,单位s
ConnectRetryDelay uint16 // 连接重试延时,单位s
ConnectTimeout uint16 // 连接操作超时,单位s
}
type topic struct {
App string
}
var Cfg Config
// 获取配置文件名称,从运行flag参数config中获取若未提供使用默认'dev'
func getConfigName() string {
configName := ""
flag.StringVar(&configName, "config", "dev", "config name, eg: -config test")
flag.Parse()
if configName == "" {
configName = "dev"
}
slog.Info("读取配置文件", "配置文件名称", configName)
return configName
}
// 加载配置
func LoadConfig() {
cnf := viper.New()
cnf.SetConfigName(getConfigName())
cnf.SetConfigType("yml")
cnf.AddConfigPath("./config/")
cnf.AddConfigPath(".")
err := cnf.ReadInConfig()
if err != nil {
panic(fmt.Errorf("读取配置文件错误: %w", err))
}
err = cnf.Unmarshal(&Cfg)
if err != nil {
panic(fmt.Errorf("解析配置文件错误: %w", err))
}
slog.Info("成功加载配置", "config", Cfg)
}

10
config/dev.yml Normal file
View File

@ -0,0 +1,10 @@
mqtt:
topic:
app: rtsts
address: tcp://192.168.3.233:1883
clientId: cgy_unittec
username: rtsts_service
password: joylink@0503
keepAlive: 30 # 保活时间间隔,单位s
connectRetryDelay: 5 # 连接重试延时,单位s
connectTimeout: 5 # 连接操作超时,单位s

33
go.mod
View File

@ -2,9 +2,38 @@ module joylink.club/iot
go 1.21 go 1.21
require github.com/simonvetter/modbus v1.6.0 require (
github.com/eclipse/paho.golang v0.12.0
github.com/simonvetter/modbus v1.6.0
)
require (
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/pelletier/go-toml/v2 v2.1.0 // indirect
github.com/sagikazarmark/locafero v0.4.0 // indirect
github.com/sagikazarmark/slog-shim v0.1.0 // indirect
github.com/sourcegraph/conc v0.3.0 // indirect
github.com/spf13/afero v1.11.0 // indirect
github.com/spf13/cast v1.6.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/subosito/gotenv v1.6.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.9.0 // indirect
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect
golang.org/x/net v0.19.0 // indirect
golang.org/x/sync v0.5.0 // indirect
golang.org/x/sys v0.15.0 // indirect
golang.org/x/text v0.14.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
require ( require (
github.com/goburrow/serial v0.1.0 // indirect github.com/goburrow/serial v0.1.0 // indirect
google.golang.org/protobuf v1.26.0 github.com/spf13/viper v1.18.1
google.golang.org/protobuf v1.31.0
) )

85
go.sum
View File

@ -1,12 +1,89 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/eclipse/paho.golang v0.12.0 h1:EXQFJbJklDnUqW6lyAknMWRhM2NgpHxwrrL8riUmp3Q=
github.com/eclipse/paho.golang v0.12.0/go.mod h1:TSDCUivu9JnoR9Hl+H7sQMcHkejWH2/xKK1NJGtLbIE=
github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8=
github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA=
github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM=
github.com/goburrow/serial v0.1.0 h1:v2T1SQa/dlUqQiYIT8+Cu7YolfqAi3K96UmhwYyuSrA= github.com/goburrow/serial v0.1.0 h1:v2T1SQa/dlUqQiYIT8+Cu7YolfqAi3K96UmhwYyuSrA=
github.com/goburrow/serial v0.1.0/go.mod h1:sAiqG0nRVswsm1C97xsttiYCzSLBmUZ/VSlVLZJ8haA= github.com/goburrow/serial v0.1.0/go.mod h1:sAiqG0nRVswsm1C97xsttiYCzSLBmUZ/VSlVLZJ8haA=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY=
github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0=
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4=
github.com/pelletier/go-toml/v2 v2.1.0/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
github.com/sagikazarmark/locafero v0.4.0 h1:HApY1R9zGo4DBgr7dqsTH/JJxLTTsOt7u6keLGt6kNQ=
github.com/sagikazarmark/locafero v0.4.0/go.mod h1:Pe1W6UlPYUk/+wc/6KFhbORCfqzgYEpgQ3O5fPuL3H4=
github.com/sagikazarmark/slog-shim v0.1.0 h1:diDBnUNK9N/354PgrxMywXnAwEr1QZcOr6gto+ugjYE=
github.com/sagikazarmark/slog-shim v0.1.0/go.mod h1:SrcSrq8aKtyuqEI1uvTDTK1arOWRIczQRv+GVI1AkeQ=
github.com/simonvetter/modbus v1.6.0 h1:RDHJevtc7LDIVoHAbhDun8fy+QwnGe+ZU+sLm9ZZzjc= github.com/simonvetter/modbus v1.6.0 h1:RDHJevtc7LDIVoHAbhDun8fy+QwnGe+ZU+sLm9ZZzjc=
github.com/simonvetter/modbus v1.6.0/go.mod h1:hh90ZaTaPLcK2REj6/fpTbiV0J6S7GWmd8q+GVRObPw= github.com/simonvetter/modbus v1.6.0/go.mod h1:hh90ZaTaPLcK2REj6/fpTbiV0J6S7GWmd8q+GVRObPw=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo=
github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0=
github.com/spf13/afero v1.11.0 h1:WJQKhtpdm3v2IzqG8VMqrr6Rf3UYpEF239Jy9wNepM8=
github.com/spf13/afero v1.11.0/go.mod h1:GH9Y3pIexgf1MTIWtNGyogA5MwRIDXGUr+hbWNoBjkY=
github.com/spf13/cast v1.6.0 h1:GEiTHELF+vaR5dhz3VqZfFSzZjYbgeKDpBxQVS4GYJ0=
github.com/spf13/cast v1.6.0/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/spf13/viper v1.18.1 h1:rmuU42rScKWlhhJDyXZRKJQHXFX02chSVW1IvkPGiVM=
github.com/spf13/viper v1.18.1/go.mod h1:EKmWIqdnk5lOcmR72yw6hS+8OPYcwD0jteitLMVB+yk=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8=
github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU=
go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE=
go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A=
go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4=
go.uber.org/multierr v1.9.0 h1:7fIwc/ZtS0q++VgcfqFDxSBZVv/Xo49/SYnDFupUwlI=
go.uber.org/multierr v1.9.0/go.mod h1:X2jQV1h+kxSjClGpnseKVIxpmcjrj7MNnI0bnlfKTVQ=
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 h1:GoHiUyI/Tp2nVkLI2mCxVkOjsbSXD66ic0XW0js0R9g=
golang.org/x/exp v0.0.0-20230905200255-921286631fa9/go.mod h1:S2oDrQGGwySpoQPVqRShND87VCbxmc6bL1Yd2oYrm6k=
golang.org/x/net v0.19.0 h1:zTwKpTd2XuCqf8huc7Fo2iSy+4RHPd10s4KzeTnVr1c=
golang.org/x/net v0.19.0/go.mod h1:CfAk/cbD4CthTvqiEl8NpboMuiuOYsAr/7NOjZJtv1U=
golang.org/x/sync v0.5.0 h1:60k92dhOjHxJkrqnwsfl8KuaHbn/5dl0lUPUklKo3qE=
golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc=
golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk= google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA=
gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

138
main.go
View File

@ -6,9 +6,12 @@ import (
"os" "os"
"time" "time"
"joylink.club/iot/service" "github.com/eclipse/paho.golang/autopaho"
"joylink.club/iot/service/model" "github.com/eclipse/paho.golang/paho"
"joylink.club/iot/service/proto" "google.golang.org/protobuf/proto"
"joylink.club/iot/config"
"joylink.club/iot/mqtt"
mproto "joylink.club/iot/mqtt/proto"
) )
func main() { func main() {
@ -16,50 +19,107 @@ func main() {
Level: slog.LevelDebug, Level: slog.LevelDebug,
AddSource: false, AddSource: false,
}))) })))
dc := model.NewDC(make([]byte, 2), make([]byte, 2)) config.LoadConfig()
mds, err := service.NewModbusQcService(&proto.ModbusConfig{ mqttcfg := config.Cfg.Mqtt
Url: "tcp://127.0.0.1:502", mqtt.BuildTopics(mqttcfg.Topic.App, mqttcfg.ClientId)
UnitId: 2, cmc := &mqtt.ClientManageConfig{
Timeout: 500, BrokerUrl: mqttcfg.Address,
Interval: 1000, ClientId: mqttcfg.ClientId,
Mapping: []*proto.ModbusDcMapping{ Username: mqttcfg.Username,
{ Password: mqttcfg.Password,
// Function: proto.Modbus_ReadHoldingRegister, KeepAlive: mqttcfg.KeepAlive,
Function: proto.Modbus_ReadCoil, ConnectRetryDelay: mqttcfg.ConnectRetryDelay,
Addr: 0, ConnectTimeout: mqttcfg.ConnectTimeout,
Quantity: 16, OnConnectionUp: func(*autopaho.ConnectionManager, *paho.Connack) {
Type: proto.DataType_CollectTable, slog.Info("MQTT连接成功")
Start: 0, // err := mqtt.SubIotServiceState(mqtt.GetIotServiceStateTopic())
}, // if err != nil {
{ // slog.Error("订阅IotServiceState失败", "error", err)
Function: proto.Modbus_WriteCoils, // os.Exit(1)
Addr: 16, // }
Quantity: 16,
Type: proto.DataType_DriveTable,
WriteStrategy: proto.Modbus_OnUpdate,
Start: 0,
},
}, },
}, dc) }
err := mqtt.Start(cmc)
if err != nil { if err != nil {
panic(err) slog.Error("启动MQTT客户端失败", "error", err)
os.Exit(1)
} }
time.Sleep(time.Second * 3)
err = mqtt.SubIotServiceState(mqtt.GetIotServiceStateTopic())
if err != nil {
slog.Error("订阅IotServiceState失败", "error", err)
os.Exit(1)
}
i := 0
mqtt.RegisterHandler(mqtt.GetIotServiceStateTopic(), func(m *paho.Publish) {
iss := &mproto.IotServiceState{}
err := proto.Unmarshal(m.Payload, iss)
if err != nil {
slog.Error("SubIotServiceState proto.Unmarshal异常", "error", err)
return
}
slog.Debug("收到IotServiceState发布消息", "state", iss)
i++
fmt.Printf("%v次处理IotServiceState: %v\n", i, iss)
})
go func() { go func() {
i := 0
for { for {
c := dc.GetCollect()
fmt.Printf("采集数据: %v\n", c)
i++
if i%3 == 0 {
idx := i % 8
dc.UpdateDriveByBytes(0, []byte{byte(1 << idx)})
fmt.Printf("设置驱动数据: %v\n", dc.GetDrive())
}
time.Sleep(time.Second) time.Sleep(time.Second)
mqtt.PubIotServiceState(&mproto.IotServiceState{
Code: config.Cfg.Mqtt.ClientId,
State: mproto.ServiceState_Normal,
})
} }
}() }()
time.Sleep(time.Minute * 2) time.Sleep(time.Minute)
mds.Stop() // dc := model.NewDC(make([]byte, 2), make([]byte, 2))
// mds, err := service.NewModbusQcService(&proto.ModbusConfig{
// Url: "tcp://127.0.0.1:502",
// UnitId: 2,
// Timeout: 500,
// Interval: 1000,
// Mapping: []*proto.ModbusDcMapping{
// {
// // Function: proto.Modbus_ReadHoldingRegister,
// Function: proto.Modbus_ReadCoil,
// Addr: 0,
// Quantity: 16,
// Type: proto.DataType_CollectTable,
// Start: 0,
// },
// {
// Function: proto.Modbus_WriteCoils,
// Addr: 16,
// Quantity: 16,
// Type: proto.DataType_DriveTable,
// WriteStrategy: proto.Modbus_OnUpdate,
// Start: 0,
// },
// },
// }, dc)
// if err != nil {
// panic(err)
// }
// go func() {
// i := 0
// for {
// c := dc.GetCollect()
// fmt.Printf("采集数据: %v\n", c)
// i++
// if i%3 == 0 {
// idx := i % 8
// dc.UpdateDriveByBytes(0, []byte{byte(1 << idx)})
// fmt.Printf("设置驱动数据: %v\n", dc.GetDrive())
// }
// time.Sleep(time.Second)
// }
// }()
// time.Sleep(time.Minute * 2)
// mds.Stop()
} }

28
mqtt/app_protocol.go Normal file
View File

@ -0,0 +1,28 @@
package mqtt
import "joylink.club/iot/mqtt/proto"
type IotServiceStateHandler func(state *proto.IotServiceState)
type IotService interface {
PubIotServiceState(state *proto.IotServiceState)
PubIotQdData(qd *proto.IotQd)
SubIotQd()
RegIotQd(h func(qd *proto.IotQd))
PubIotCjData(cj *proto.IotCj)
SubIotCj()
RegIotCj(h func(cj *proto.IotCj))
SubIotReq(cmd *proto.IotServiceReq)
}
type Service interface {
SubIotServiceState()
RegIotServiceState(h func(state *proto.IotServiceState))
PubIotQdData(qd *proto.IotQd)
SubIotQd()
RegIotQd(h func(qd *proto.IotQd))
PubIotCjData(cj *proto.IotCj)
SubIotCj()
RegIotCj(h func(cj *proto.IotCj))
ReqIotService(cmd *proto.IotServiceReq)
}

87
mqtt/client.go Normal file
View File

@ -0,0 +1,87 @@
package mqtt
import (
"context"
"log/slog"
"github.com/eclipse/paho.golang/autopaho"
"github.com/eclipse/paho.golang/paho"
"google.golang.org/protobuf/proto"
mproto "joylink.club/iot/mqtt/proto"
)
type Manager struct {
cc *autopaho.ClientConfig
cm *autopaho.ConnectionManager
}
var manager *Manager
func Start(cmc *ClientManageConfig) error {
cc, err := cmc.tryInto()
if err != nil {
return err
}
cm, err := autopaho.NewConnection(context.Background(), *cc)
if err != nil {
return err
}
manager = &Manager{
cc: cc,
cm: cm,
}
return nil
}
func Publish(ctx context.Context, publish *paho.Publish) (*paho.PublishResponse, error) {
return manager.cm.Publish(ctx, publish)
}
func PubIotServiceState(s *mproto.IotServiceState) error {
return manager.PubIotServiceState(s)
}
func SubIotServiceState(topic string) error {
return manager.SubIotServiceState(topic)
}
func (m *Manager) PubIotServiceState(s *mproto.IotServiceState) error {
if s == nil {
return nil
}
slog.Debug("PubIotServiceState", "topic", GetIotServiceStateTopic(), "state", s)
b, err := proto.Marshal(s)
if err != nil {
return err
}
_, err = m.cm.Publish(context.Background(), &paho.Publish{
Topic: GetIotServiceStateTopic(),
QoS: 0,
Payload: b,
})
return err
}
func RegisterHandler(topic string, h func(m *paho.Publish)) {
manager.cc.Router.RegisterHandler(topic, h)
}
func (m *Manager) RegisterHandler(topic string, h func(m *paho.Publish)) {
m.cc.Router.RegisterHandler(topic, h)
}
func (m *Manager) SubIotServiceState(topic string) error {
slog.Debug("订阅IotServiceState", "topic", topic)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
_, err := m.cm.Subscribe(ctx, &paho.Subscribe{
Subscriptions: []paho.SubscribeOptions{
{
Topic: topic,
QoS: 0,
// NoLocal: true,
},
},
})
return err
}

61
mqtt/config.go Normal file
View File

@ -0,0 +1,61 @@
package mqtt
import (
"fmt"
"log/slog"
"net/url"
"time"
"github.com/eclipse/paho.golang/autopaho"
"github.com/eclipse/paho.golang/paho"
)
type ClientManageConfig struct {
BrokerUrl string // Broker地址
ClientId string // 客户端ID
Username string // 用户名
Password string // 密码
KeepAlive uint16 // 保活时间间隔,单位s,默认为60
ConnectRetryDelay uint16 // 连接重试延时,单位s,默认为3
ConnectTimeout uint16 // 连接操作超时,单位s,默认为3
OnConnectionUp func(*autopaho.ConnectionManager, *paho.Connack)
}
func (c *ClientManageConfig) tryInto() (*autopaho.ClientConfig, error) {
addr, err := url.Parse(c.BrokerUrl)
if err != nil {
return nil, fmt.Errorf("Mqtt.Address格式错误, %s: %w", c.BrokerUrl, err)
}
if c.KeepAlive == 0 {
c.KeepAlive = 60
}
if c.ConnectRetryDelay == 0 {
c.ConnectRetryDelay = 3
}
if c.ConnectTimeout == 0 {
c.ConnectTimeout = 3
}
cc := &autopaho.ClientConfig{
BrokerUrls: []*url.URL{
addr,
},
KeepAlive: c.KeepAlive,
ConnectRetryDelay: time.Duration(c.ConnectRetryDelay) * time.Second,
ConnectTimeout: time.Duration(c.ConnectTimeout) * time.Second,
OnConnectionUp: c.OnConnectionUp,
OnConnectError: func(err error) {
slog.Error("MQTT连接失败", "error", err)
},
ClientConfig: paho.ClientConfig{
ClientID: c.ClientId,
Router: paho.NewStandardRouter(),
OnClientError: func(err error) { fmt.Printf("%s Mqtt客户端发生错误: %s\n", c.ClientId, err) },
OnServerDisconnect: func(d *paho.Disconnect) {
fmt.Printf("%s 连接断开; reason code: %d,properties: %v\n", c.ClientId, d.ReasonCode, d.Properties)
},
},
}
cc.SetUsernamePassword(c.Username, []byte(c.Password))
cc.SetWillMessage(GetIotServiceStateTopic(), []byte("离线"), 1, true)
return cc, nil
}

535
mqtt/proto/mqtt.pb.go Normal file
View File

@ -0,0 +1,535 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.31.0
// protoc v4.23.1
// source: mqtt/mqtt.proto
package proto
import (
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
)
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
type ServiceState int32
const (
ServiceState_Normal ServiceState = 0 // 正常
ServiceState_Offline ServiceState = 1 // 离线
)
// Enum value maps for ServiceState.
var (
ServiceState_name = map[int32]string{
0: "Normal",
1: "Offline",
}
ServiceState_value = map[string]int32{
"Normal": 0,
"Offline": 1,
}
)
func (x ServiceState) Enum() *ServiceState {
p := new(ServiceState)
*p = x
return p
}
func (x ServiceState) String() string {
return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x))
}
func (ServiceState) Descriptor() protoreflect.EnumDescriptor {
return file_mqtt_mqtt_proto_enumTypes[0].Descriptor()
}
func (ServiceState) Type() protoreflect.EnumType {
return &file_mqtt_mqtt_proto_enumTypes[0]
}
func (x ServiceState) Number() protoreflect.EnumNumber {
return protoreflect.EnumNumber(x)
}
// Deprecated: Use ServiceState.Descriptor instead.
func (ServiceState) EnumDescriptor() ([]byte, []int) {
return file_mqtt_mqtt_proto_rawDescGZIP(), []int{0}
}
type ServiceRequest int32
const (
ServiceRequest_Logs ServiceRequest = 0 // 日志
)
// Enum value maps for ServiceRequest.
var (
ServiceRequest_name = map[int32]string{
0: "Logs",
}
ServiceRequest_value = map[string]int32{
"Logs": 0,
}
)
func (x ServiceRequest) Enum() *ServiceRequest {
p := new(ServiceRequest)
*p = x
return p
}
func (x ServiceRequest) String() string {
return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x))
}
func (ServiceRequest) Descriptor() protoreflect.EnumDescriptor {
return file_mqtt_mqtt_proto_enumTypes[1].Descriptor()
}
func (ServiceRequest) Type() protoreflect.EnumType {
return &file_mqtt_mqtt_proto_enumTypes[1]
}
func (x ServiceRequest) Number() protoreflect.EnumNumber {
return protoreflect.EnumNumber(x)
}
// Deprecated: Use ServiceRequest.Descriptor instead.
func (ServiceRequest) EnumDescriptor() ([]byte, []int) {
return file_mqtt_mqtt_proto_rawDescGZIP(), []int{1}
}
// IOT服务状态
type IotServiceState struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Code string `protobuf:"bytes,1,opt,name=code,proto3" json:"code,omitempty"` // 服务编号
State ServiceState `protobuf:"varint,2,opt,name=state,proto3,enum=mqtt_api.ServiceState" json:"state,omitempty"` // 服务状态
}
func (x *IotServiceState) Reset() {
*x = IotServiceState{}
if protoimpl.UnsafeEnabled {
mi := &file_mqtt_mqtt_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *IotServiceState) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*IotServiceState) ProtoMessage() {}
func (x *IotServiceState) ProtoReflect() protoreflect.Message {
mi := &file_mqtt_mqtt_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use IotServiceState.ProtoReflect.Descriptor instead.
func (*IotServiceState) Descriptor() ([]byte, []int) {
return file_mqtt_mqtt_proto_rawDescGZIP(), []int{0}
}
func (x *IotServiceState) GetCode() string {
if x != nil {
return x.Code
}
return ""
}
func (x *IotServiceState) GetState() ServiceState {
if x != nil {
return x.State
}
return ServiceState_Normal
}
// IOT服务请求
type IotServiceReq struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Code string `protobuf:"bytes,1,opt,name=code,proto3" json:"code,omitempty"` // 服务编号
Req ServiceRequest `protobuf:"varint,2,opt,name=req,proto3,enum=mqtt_api.ServiceRequest" json:"req,omitempty"` // 服务请求
}
func (x *IotServiceReq) Reset() {
*x = IotServiceReq{}
if protoimpl.UnsafeEnabled {
mi := &file_mqtt_mqtt_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *IotServiceReq) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*IotServiceReq) ProtoMessage() {}
func (x *IotServiceReq) ProtoReflect() protoreflect.Message {
mi := &file_mqtt_mqtt_proto_msgTypes[1]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use IotServiceReq.ProtoReflect.Descriptor instead.
func (*IotServiceReq) Descriptor() ([]byte, []int) {
return file_mqtt_mqtt_proto_rawDescGZIP(), []int{1}
}
func (x *IotServiceReq) GetCode() string {
if x != nil {
return x.Code
}
return ""
}
func (x *IotServiceReq) GetReq() ServiceRequest {
if x != nil {
return x.Req
}
return ServiceRequest_Logs
}
// IOT服务响应
type IotServiceResp struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Code string `protobuf:"bytes,1,opt,name=code,proto3" json:"code,omitempty"` // 服务编号
}
func (x *IotServiceResp) Reset() {
*x = IotServiceResp{}
if protoimpl.UnsafeEnabled {
mi := &file_mqtt_mqtt_proto_msgTypes[2]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *IotServiceResp) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*IotServiceResp) ProtoMessage() {}
func (x *IotServiceResp) ProtoReflect() protoreflect.Message {
mi := &file_mqtt_mqtt_proto_msgTypes[2]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use IotServiceResp.ProtoReflect.Descriptor instead.
func (*IotServiceResp) Descriptor() ([]byte, []int) {
return file_mqtt_mqtt_proto_rawDescGZIP(), []int{2}
}
func (x *IotServiceResp) GetCode() string {
if x != nil {
return x.Code
}
return ""
}
// IOT驱动数据
type IotQd struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Code string `protobuf:"bytes,1,opt,name=code,proto3" json:"code,omitempty"` // 服务编号
Data []byte `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"` // 驱动数据
}
func (x *IotQd) Reset() {
*x = IotQd{}
if protoimpl.UnsafeEnabled {
mi := &file_mqtt_mqtt_proto_msgTypes[3]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *IotQd) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*IotQd) ProtoMessage() {}
func (x *IotQd) ProtoReflect() protoreflect.Message {
mi := &file_mqtt_mqtt_proto_msgTypes[3]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use IotQd.ProtoReflect.Descriptor instead.
func (*IotQd) Descriptor() ([]byte, []int) {
return file_mqtt_mqtt_proto_rawDescGZIP(), []int{3}
}
func (x *IotQd) GetCode() string {
if x != nil {
return x.Code
}
return ""
}
func (x *IotQd) GetData() []byte {
if x != nil {
return x.Data
}
return nil
}
// IOT采集数据
type IotCj struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Code string `protobuf:"bytes,1,opt,name=code,proto3" json:"code,omitempty"` // 服务编号
Data []byte `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"` // 采集数据
}
func (x *IotCj) Reset() {
*x = IotCj{}
if protoimpl.UnsafeEnabled {
mi := &file_mqtt_mqtt_proto_msgTypes[4]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *IotCj) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*IotCj) ProtoMessage() {}
func (x *IotCj) ProtoReflect() protoreflect.Message {
mi := &file_mqtt_mqtt_proto_msgTypes[4]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use IotCj.ProtoReflect.Descriptor instead.
func (*IotCj) Descriptor() ([]byte, []int) {
return file_mqtt_mqtt_proto_rawDescGZIP(), []int{4}
}
func (x *IotCj) GetCode() string {
if x != nil {
return x.Code
}
return ""
}
func (x *IotCj) GetData() []byte {
if x != nil {
return x.Data
}
return nil
}
var File_mqtt_mqtt_proto protoreflect.FileDescriptor
var file_mqtt_mqtt_proto_rawDesc = []byte{
0x0a, 0x0f, 0x6d, 0x71, 0x74, 0x74, 0x2f, 0x6d, 0x71, 0x74, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74,
0x6f, 0x12, 0x08, 0x6d, 0x71, 0x74, 0x74, 0x5f, 0x61, 0x70, 0x69, 0x22, 0x53, 0x0a, 0x0f, 0x49,
0x6f, 0x74, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x12,
0x0a, 0x04, 0x63, 0x6f, 0x64, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x63, 0x6f,
0x64, 0x65, 0x12, 0x2c, 0x0a, 0x05, 0x73, 0x74, 0x61, 0x74, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28,
0x0e, 0x32, 0x16, 0x2e, 0x6d, 0x71, 0x74, 0x74, 0x5f, 0x61, 0x70, 0x69, 0x2e, 0x53, 0x65, 0x72,
0x76, 0x69, 0x63, 0x65, 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, 0x05, 0x73, 0x74, 0x61, 0x74, 0x65,
0x22, 0x4f, 0x0a, 0x0d, 0x49, 0x6f, 0x74, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x52, 0x65,
0x71, 0x12, 0x12, 0x0a, 0x04, 0x63, 0x6f, 0x64, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52,
0x04, 0x63, 0x6f, 0x64, 0x65, 0x12, 0x2a, 0x0a, 0x03, 0x72, 0x65, 0x71, 0x18, 0x02, 0x20, 0x01,
0x28, 0x0e, 0x32, 0x18, 0x2e, 0x6d, 0x71, 0x74, 0x74, 0x5f, 0x61, 0x70, 0x69, 0x2e, 0x53, 0x65,
0x72, 0x76, 0x69, 0x63, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x52, 0x03, 0x72, 0x65,
0x71, 0x22, 0x24, 0x0a, 0x0e, 0x49, 0x6f, 0x74, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x52,
0x65, 0x73, 0x70, 0x12, 0x12, 0x0a, 0x04, 0x63, 0x6f, 0x64, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28,
0x09, 0x52, 0x04, 0x63, 0x6f, 0x64, 0x65, 0x22, 0x2f, 0x0a, 0x05, 0x49, 0x6f, 0x74, 0x51, 0x64,
0x12, 0x12, 0x0a, 0x04, 0x63, 0x6f, 0x64, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04,
0x63, 0x6f, 0x64, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x02, 0x20, 0x01,
0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x22, 0x2f, 0x0a, 0x05, 0x49, 0x6f, 0x74, 0x43,
0x6a, 0x12, 0x12, 0x0a, 0x04, 0x63, 0x6f, 0x64, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52,
0x04, 0x63, 0x6f, 0x64, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x02, 0x20,
0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x2a, 0x27, 0x0a, 0x0c, 0x53, 0x65, 0x72,
0x76, 0x69, 0x63, 0x65, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x0a, 0x0a, 0x06, 0x4e, 0x6f, 0x72,
0x6d, 0x61, 0x6c, 0x10, 0x00, 0x12, 0x0b, 0x0a, 0x07, 0x4f, 0x66, 0x66, 0x6c, 0x69, 0x6e, 0x65,
0x10, 0x01, 0x2a, 0x1a, 0x0a, 0x0e, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x52, 0x65, 0x71,
0x75, 0x65, 0x73, 0x74, 0x12, 0x08, 0x0a, 0x04, 0x4c, 0x6f, 0x67, 0x73, 0x10, 0x00, 0x42, 0x0e,
0x5a, 0x0c, 0x2e, 0x2f, 0x6d, 0x71, 0x74, 0x74, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06,
0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
file_mqtt_mqtt_proto_rawDescOnce sync.Once
file_mqtt_mqtt_proto_rawDescData = file_mqtt_mqtt_proto_rawDesc
)
func file_mqtt_mqtt_proto_rawDescGZIP() []byte {
file_mqtt_mqtt_proto_rawDescOnce.Do(func() {
file_mqtt_mqtt_proto_rawDescData = protoimpl.X.CompressGZIP(file_mqtt_mqtt_proto_rawDescData)
})
return file_mqtt_mqtt_proto_rawDescData
}
var file_mqtt_mqtt_proto_enumTypes = make([]protoimpl.EnumInfo, 2)
var file_mqtt_mqtt_proto_msgTypes = make([]protoimpl.MessageInfo, 5)
var file_mqtt_mqtt_proto_goTypes = []interface{}{
(ServiceState)(0), // 0: mqtt_api.ServiceState
(ServiceRequest)(0), // 1: mqtt_api.ServiceRequest
(*IotServiceState)(nil), // 2: mqtt_api.IotServiceState
(*IotServiceReq)(nil), // 3: mqtt_api.IotServiceReq
(*IotServiceResp)(nil), // 4: mqtt_api.IotServiceResp
(*IotQd)(nil), // 5: mqtt_api.IotQd
(*IotCj)(nil), // 6: mqtt_api.IotCj
}
var file_mqtt_mqtt_proto_depIdxs = []int32{
0, // 0: mqtt_api.IotServiceState.state:type_name -> mqtt_api.ServiceState
1, // 1: mqtt_api.IotServiceReq.req:type_name -> mqtt_api.ServiceRequest
2, // [2:2] is the sub-list for method output_type
2, // [2:2] is the sub-list for method input_type
2, // [2:2] is the sub-list for extension type_name
2, // [2:2] is the sub-list for extension extendee
0, // [0:2] is the sub-list for field type_name
}
func init() { file_mqtt_mqtt_proto_init() }
func file_mqtt_mqtt_proto_init() {
if File_mqtt_mqtt_proto != nil {
return
}
if !protoimpl.UnsafeEnabled {
file_mqtt_mqtt_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*IotServiceState); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_mqtt_mqtt_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*IotServiceReq); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_mqtt_mqtt_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*IotServiceResp); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_mqtt_mqtt_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*IotQd); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_mqtt_mqtt_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*IotCj); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_mqtt_mqtt_proto_rawDesc,
NumEnums: 2,
NumMessages: 5,
NumExtensions: 0,
NumServices: 0,
},
GoTypes: file_mqtt_mqtt_proto_goTypes,
DependencyIndexes: file_mqtt_mqtt_proto_depIdxs,
EnumInfos: file_mqtt_mqtt_proto_enumTypes,
MessageInfos: file_mqtt_mqtt_proto_msgTypes,
}.Build()
File_mqtt_mqtt_proto = out.File
file_mqtt_mqtt_proto_rawDesc = nil
file_mqtt_mqtt_proto_goTypes = nil
file_mqtt_mqtt_proto_depIdxs = nil
}

35
mqtt/topic.go Normal file
View File

@ -0,0 +1,35 @@
package mqtt
import "fmt"
const (
Topic_IotServiceState string = "/%s/%s/iotss"
Topic_IotCmd string = "/%s/%s/iotcmd"
Topic_IotQd string = "/%s/%s/iotqd"
Topic_IotCj string = "/%s/%s/iotcj"
)
var topicMap = make(map[string]string, 4)
func BuildTopics(sysCode, iotCode string) {
topicMap[Topic_IotServiceState] = fmt.Sprintf(Topic_IotServiceState, sysCode, iotCode)
topicMap[Topic_IotCmd] = fmt.Sprintf(Topic_IotCmd, sysCode, iotCode)
topicMap[Topic_IotQd] = fmt.Sprintf(Topic_IotQd, sysCode, iotCode)
topicMap[Topic_IotCj] = fmt.Sprintf(Topic_IotCj, sysCode, iotCode)
}
func GetIotServiceStateTopic() string {
return topicMap[Topic_IotServiceState]
}
func GetCmdTopic() string {
return topicMap[Topic_IotCmd]
}
func GetQdTopic() string {
return topicMap[Topic_IotQd]
}
func GetCjTopic() string {
return topicMap[Topic_IotCj]
}

52
proto/src/mqtt/mqtt.proto Normal file
View File

@ -0,0 +1,52 @@
syntax = "proto3";
package mqtt_api;
option go_package = "./mqtt/proto";
enum ServiceState {
Normal = 0; //
Offline = 1; // 线
}
enum ServiceRequest {
Logs = 0; //
}
// IOT服务状态
message IotServiceState {
string code = 1; //
ServiceState state = 2; //
}
// IOT服务请求
message IotServiceReq {
string code = 1; //
ServiceRequest req = 2; //
}
// IOT服务响应
message IotServiceResp {
string code = 1; //
int32 err = 2; //
string errmsg = 3; //
oneof data {
IotServiceLog logs = 4; //
}
}
message IotServiceLog {
repeated string logs = 1; //
}
// IOT驱动数据
message IotQd {
string code = 1; //
bytes data = 2; //
}
// IOT采集数据
message IotCj {
string code = 1; //
bytes data = 2; //
}

View File

@ -19,6 +19,7 @@ type modbusQcService struct {
cli modbus.MasterClient cli modbus.MasterClient
qc model.QC qc model.QC
cancel context.CancelFunc cancel context.CancelFunc
done chan struct{} // 服务协程退出信号
} }
func NewModbusQcService(config *sproto.ModbusConfig, dc model.QC) (IotService, error) { func NewModbusQcService(config *sproto.ModbusConfig, dc model.QC) (IotService, error) {
@ -43,6 +44,7 @@ func NewModbusQcService(config *sproto.ModbusConfig, dc model.QC) (IotService, e
config: config, config: config,
cli: cli, cli: cli,
qc: dc, qc: dc,
done: make(chan struct{}),
} }
s.initOnUpdateTask() s.initOnUpdateTask()
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
@ -95,11 +97,14 @@ func isWriteFunction(modbus_Function sproto.Modbus_Function) bool {
} }
func (m *modbusQcService) run(ctx context.Context) { func (m *modbusQcService) run(ctx context.Context) {
defer close(m.done)
mainLoop:
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
slog.Info("Modbus驱动采集服务退出", "url", m.config.Url) slog.Debug("Modbus驱采映射循环取消,关闭modbus客户端", "url", m.config.Url)
return modbus.DeleteClient(m.config.Url)
break mainLoop
default: default:
} }
m.mappingTaskExecute() m.mappingTaskExecute()
@ -266,7 +271,8 @@ func (m *modbusQcService) Start() error {
func (m *modbusQcService) Stop() error { func (m *modbusQcService) Stop() error {
m.cancel() m.cancel()
modbus.DeleteClient(m.config.Url) <-m.done
slog.Info("Modbus驱采映射服务线程退出", "url", m.config.Url)
return nil return nil
} }