jl-iot/example/subsys_use/main.go

320 lines
9.0 KiB
Go
Raw Permalink Normal View History

package main
import (
"context"
"errors"
"fmt"
"log/slog"
"net/url"
"time"
"github.com/eclipse/paho.golang/autopaho"
"github.com/eclipse/paho.golang/autopaho/extensions/rpc"
"github.com/eclipse/paho.golang/paho"
"google.golang.org/protobuf/proto"
"joylink.club/iot/dto"
"joylink.club/iot/mqtt"
"joylink.club/iot/server"
"joylink.club/iot/service"
"joylink.club/iot/service/model"
)
// main demonstrates using the IoT service as a subsystem: it launches the
// server in a goroutine, creates the application MQTT client, and then runs
// four start / log-request / stop cycles before disconnecting.
func main() {
	go server.StartIotQcServer()
	time.Sleep(2 * time.Second) // wait for the MQTT topics to be initialized

	client := initAppClient()
	time.Sleep(3 * time.Second) // wait for the application MQTT connection

	for round := 0; round < 4; round++ {
		client.startIotQcService() // start the IoT drive/collect service

		logResp, logErr := client.iotLogReq(&dto.IotServiceLogReq{Count: 10})
		if logErr == nil {
			slog.Info("应用请求日志成功", "resp", logResp)
		} else {
			slog.Error("应用请求日志错误", "err", logErr)
		}

		time.Sleep(10 * time.Second)
		client.stopIotQcService() // stop the IoT drive/collect service
		time.Sleep(5 * time.Second)
	}

	time.Sleep(10 * time.Second)
	client.disconnect()
}
// AppClient is the application-side MQTT client used by this example.
type AppClient struct {
	// cfg is the autopaho client configuration; its Router is where topic
	// handlers are registered and unregistered.
	cfg *autopaho.ClientConfig
	// cm is the connection manager used for publishing, RPC requests and
	// disconnecting.
	cm *autopaho.ConnectionManager
	// task is the scheduled task recorded by startIotQcService and stopped
	// by stopIotQcService; it is nil when no task is recorded.
	task service.IScheduledTask
}
// stopIotQcService sends a stop request to the IoT service, then unregisters
// the collected-data and drive-data handlers and stops the recorded scheduled
// task, if there is one.
//
// It panics when the request fails or the response code is non-zero.
func (app *AppClient) stopIotQcService() {
	resp, err := app.iotStopReq(&dto.IotQcServiceStopReq{})
	switch {
	case err != nil:
		panic(fmt.Errorf("停止服务请求错误, err: %v", err))
	case resp.Code != 0:
		panic(fmt.Errorf("停止服务请求响应错误, code: %d, msg: %s", resp.Code, resp.Msg))
	}
	slog.Info("应用停止iot服务成功", "resp", resp)

	app.cfg.Router.UnregisterHandler(mqtt.GetCjTopic())
	app.cfg.Router.UnregisterHandler(mqtt.GetQdTopic())

	// Clear the field before stopping so the client no longer refers to the
	// task once Stop has been called.
	if running := app.task; running != nil {
		app.task = nil
		running.Stop()
	}
}
// startIotQcService sends a start request carrying a Modbus configuration to
// the IoT service, registers handlers that log incoming collected and drive
// data, and records a scheduled task that publishes a changing two-byte
// drive-data pattern.
//
// It panics when the request fails or the response code is non-zero.
func (app *AppClient) startIotQcService() {
	startReq := &dto.IotQcServiceStartReq{
		Config: &dto.ModbusConfig{
			Url:      "tcp://127.0.0.1:502",
			UnitId:   2,
			Timeout:  500,
			Interval: 1000,
			Qdl:      2, // number of drive-data bytes
			Cjl:      2, // number of collected-data bytes
			Mapping: []*dto.ModbusDcMapping{
				{
					// Function: proto.Modbus_ReadHoldingRegister,
					Function: dto.Modbus_ReadCoil,
					Addr:     0,
					Quantity: 16,
					Type:     dto.DataType_CJ,
					Start:    0,
				},
				{
					Function: dto.Modbus_RWCoils,
					Addr:     16,
					Quantity: 16,
					Type:     dto.DataType_QD,
					Start:    0,
				},
			},
		},
	}

	resp, err := app.iotServiceStartReq(startReq)
	switch {
	case err != nil:
		panic(fmt.Errorf("启动服务请求错误, err: %v", err))
	case resp.Code != 0:
		panic(fmt.Errorf("启动服务请求响应错误, code: %d, msg: %s", resp.Code, resp.Msg))
	}
	slog.Info("应用启动iot服务成功", "resp", resp)

	app.RegIotCjDataHandler(func(cj *dto.IotCj) {
		slog.Info("应用收到采集数据", "cj", model.BytesDebug(cj.Data))
	})
	app.RegIotQdDataHandler(func(qd *dto.IotQd) {
		slog.Info("应用收到驱动数据", "qd", model.BytesDebug(qd.Data))
	})

	// tick is owned by the task closure; each run shifts the pattern by
	// tick % 8 bits, with each byte truncated to its low eight bits.
	tick := 0
	app.task = service.NewScheduledTask(func() {
		tick++
		shift := tick % 8
		frame := &dto.IotQd{Data: []byte{byte(1 << shift), byte(3 << shift)}}
		if pubErr := app.PubIotQdData(frame); pubErr != nil {
			slog.Error("发布写入驱动数据错误", "error", pubErr)
		}
	}, time.Second)
}
// disconnect asks the connection manager to disconnect, bounded by a
// five-second timeout. A failure is logged rather than returned.
func (app *AppClient) disconnect() {
	slog.Info("断开应用MQTT客户端")

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	if err := app.cm.Disconnect(ctx); err != nil {
		slog.Error("断开MQTT客户端失败", "err", err)
	}
}
// newRpcHandler creates an RPC handler bound to the client's connection
// manager, router and client ID, passing respTopicFmt as the response topic
// format. Creation is bounded by a three-second timeout; on failure the error
// is logged and returned.
func (app *AppClient) newRpcHandler(respTopicFmt string) (*rpc.Handler, error) {
	opts := rpc.HandlerOpts{
		Conn:             app.cm,
		Router:           app.cfg.Router,
		ResponseTopicFmt: respTopicFmt,
		ClientID:         app.cfg.ClientID,
	}

	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	handler, err := rpc.NewHandler(ctx, opts)
	if err == nil {
		return handler, nil
	}
	slog.Error("创建RPC处理器失败", "err", err)
	return nil, err
}
// iotServiceStartReq sends an IoT service start request over MQTT RPC and
// returns the decoded response.
//
// A new RPC handler is created on every call through newRpcHandler, and the
// request is bounded by a three-second timeout. An error is returned when the
// handler cannot be created, the request cannot be serialized or sent, or the
// response payload cannot be decoded; in all of those cases the returned
// response is nil.
func (app *AppClient) iotServiceStartReq(req *dto.IotQcServiceStartReq) (*dto.IotQcServiceCommonResp, error) {
	h, err := app.newRpcHandler("%s/iotstartresp")
	if err != nil {
		return nil, err
	}

	ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
	defer cancel()

	b, err := proto.Marshal(req)
	if err != nil {
		slog.Error("序列化请求消息失败", "err", err)
		return nil, err
	}

	resp, err := h.Request(ctx, &paho.Publish{
		Topic:   mqtt.GetIotServiceStartTopic(),
		Payload: b,
	})
	if err != nil {
		return nil, errors.Join(fmt.Errorf("发送启动服务请求错误"), err)
	}

	// A payload that fails to decode must not be handed back: the zero-value
	// response has Code == 0, which callers interpret as success.
	result := &dto.IotQcServiceCommonResp{}
	if err := proto.Unmarshal(resp.Payload, result); err != nil {
		return nil, errors.Join(fmt.Errorf("解析启动服务响应错误"), err)
	}
	return result, nil
}
// iotStopReq sends an IoT service stop request over MQTT RPC and returns the
// decoded response.
//
// A new RPC handler is created on every call through newRpcHandler, and the
// request is bounded by a five-second timeout. An error is returned when the
// handler cannot be created, the request cannot be serialized or sent, or the
// response payload cannot be decoded; in all of those cases the returned
// response is nil.
func (app *AppClient) iotStopReq(req *dto.IotQcServiceStopReq) (*dto.IotQcServiceCommonResp, error) {
	h, err := app.newRpcHandler("%s/iotstopresp")
	if err != nil {
		return nil, err
	}

	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
	defer cancel()

	b, err := proto.Marshal(req)
	if err != nil {
		return nil, errors.Join(fmt.Errorf("序列化IOT服务停止请求消息失败"), err)
	}

	resp, err := h.Request(ctx, &paho.Publish{
		Topic:   mqtt.GetIotServiceStopTopic(),
		Payload: b,
	})
	if err != nil {
		return nil, errors.Join(fmt.Errorf("发送停止IOT服务请求错误"), err)
	}

	// A payload that fails to decode must not be handed back: the zero-value
	// response has Code == 0, which callers interpret as success.
	result := &dto.IotQcServiceCommonResp{}
	if err := proto.Unmarshal(resp.Payload, result); err != nil {
		return nil, errors.Join(fmt.Errorf("解析停止IOT服务响应错误"), err)
	}
	return result, nil
}
// iotLogReq sends an IoT service log request over MQTT RPC and returns the
// decoded response.
//
// A new RPC handler is created on every call through newRpcHandler, and the
// request is bounded by a five-second timeout. An error is returned when the
// handler cannot be created, the request cannot be serialized or sent, or the
// response payload cannot be decoded; in all of those cases the returned
// response is nil.
func (app *AppClient) iotLogReq(req *dto.IotServiceLogReq) (*dto.IotServiceLogResp, error) {
	h, err := app.newRpcHandler("%s/iotlogresp")
	if err != nil {
		return nil, err
	}

	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
	defer cancel()

	b, err := proto.Marshal(req)
	if err != nil {
		return nil, errors.Join(fmt.Errorf("序列化IOT服务日志请求消息失败"), err)
	}

	resp, err := h.Request(ctx, &paho.Publish{
		Topic:   mqtt.GetLogReqTopic(),
		Payload: b,
	})
	if err != nil {
		return nil, errors.Join(fmt.Errorf("发送IOT服务日志请求错误"), err)
	}

	// Report an undecodable payload instead of returning an empty response
	// that would look like a successful, log-free reply.
	result := &dto.IotServiceLogResp{}
	if err := proto.Unmarshal(resp.Payload, result); err != nil {
		return nil, errors.Join(fmt.Errorf("解析IOT服务日志响应错误"), err)
	}
	return result, nil
}
// RegIotCjDataHandler registers a router handler on the collected-data topic
// (mqtt.GetCjTopic). Each payload is decoded into a dto.IotCj and passed to h;
// payloads that fail to decode are logged and dropped.
func (app *AppClient) RegIotCjDataHandler(h func(*dto.IotCj)) {
	onPublish := func(p *paho.Publish) {
		msg := new(dto.IotCj)
		if err := proto.Unmarshal(p.Payload, msg); err != nil {
			slog.Error("采集数据proto.Unmarshal异常", "error", err)
			return
		}
		h(msg)
	}
	app.cfg.Router.RegisterHandler(mqtt.GetCjTopic(), onPublish)
}
// RegIotQdDataHandler registers a router handler on the drive-data topic
// (mqtt.GetQdTopic). Each payload is decoded into a dto.IotQd and passed to h;
// payloads that fail to decode are logged and dropped.
func (app *AppClient) RegIotQdDataHandler(h func(*dto.IotQd)) {
	onPublish := func(p *paho.Publish) {
		msg := new(dto.IotQd)
		if err := proto.Unmarshal(p.Payload, msg); err != nil {
			slog.Error("驱动数据proto.Unmarshal异常", "error", err)
			return
		}
		h(msg)
	}
	app.cfg.Router.RegisterHandler(mqtt.GetQdTopic(), onPublish)
}
// PubIotCjData publishes cj to the collected-data topic (mqtt.GetCjTopic) and
// returns any error from serializing or publishing it.
func (app *AppClient) PubIotCjData(cj *dto.IotCj) error {
	topic := mqtt.GetCjTopic()
	return app.pub(topic, cj)
}
// PubIotQdData logs the outgoing drive data at warn level and then publishes
// qd to the drive-data topic (mqtt.GetQdTopic), returning any error from
// serializing or publishing it.
func (app *AppClient) PubIotQdData(qd *dto.IotQd) error {
	slog.Warn("应用发布驱动数据",
		"topic", mqtt.GetQdTopic(),
		"data", model.BytesDebug(qd.Data),
	)

	topic := mqtt.GetQdTopic()
	return app.pub(topic, qd)
}
// pub serializes data with proto.Marshal and publishes the bytes to topic
// through the connection manager, bounded by a five-second timeout. It returns
// the serialization or publish error, or nil on success.
func (app *AppClient) pub(topic string, data proto.Message) error {
	payload, err := proto.Marshal(data)
	if err != nil {
		return err
	}

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	msg := &paho.Publish{Topic: topic, Payload: payload}
	if _, err := app.cm.Publish(ctx, msg); err != nil {
		return err
	}
	return nil
}
// initAppClient builds the client configuration for the fixed client ID
// "iotstartreq_test", subscribing to the service-state, collected-data and
// drive-data topics, and opens the MQTT connection manager.
//
// It panics if the connection manager cannot be created.
func initAppClient() *AppClient {
	const clientID = "iotstartreq_test"

	subTopics := []string{
		mqtt.GetIotServiceStateTopic(),
		mqtt.GetCjTopic(),
		mqtt.GetQdTopic(),
	}
	cfg := getCmConfig(clientID, subTopics)

	// context.Background() is passed on purpose: nothing in this function
	// cancels the context handed to the connection manager.
	cm, err := autopaho.NewConnection(context.Background(), cfg)
	if err != nil {
		panic(err)
	}
	return &AppClient{cfg: &cfg, cm: cm}
}
// getCmConfig builds the autopaho client configuration for clientId.
//
// Every time the connection comes up, each topic in subTopics is subscribed
// individually with QoS 0 and NoLocal set, under one shared five-second
// timeout; the first failing subscription aborts the remaining ones.
// Connection, client and server-disconnect events are printed to stdout.
//
// It panics if the broker URL cannot be parsed.
//
// NOTE(review): the broker address and the username/password are hard-coded
// in this example; confirm they should not come from configuration instead.
func getCmConfig(clientId string, subTopics []string) autopaho.ClientConfig {
	addr, err := url.Parse("tcp://192.168.3.233:1883")
	if err != nil {
		// Without a valid broker URL the configuration is unusable.
		panic(fmt.Errorf("parse mqtt broker url: %w", err))
	}

	cc := autopaho.ClientConfig{
		BrokerUrls: []*url.URL{addr},
		KeepAlive:  60,
		OnConnectionUp: func(cm *autopaho.ConnectionManager, connAck *paho.Connack) {
			fmt.Println("mqtt connection up")
			ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
			defer cancel()
			for _, v := range subTopics {
				if _, err := cm.Subscribe(ctx, &paho.Subscribe{
					Subscriptions: []paho.SubscribeOptions{
						{Topic: v, QoS: 0, NoLocal: true},
					},
				}); err != nil {
					fmt.Printf("failed to subscribe (%s). This is likely to mean no messages will be received.\n", err)
					return
				}
			}
			fmt.Println("mqtt subscription made")
		},
		OnConnectError: func(err error) { fmt.Printf("error whilst attempting connection: %s\n", err) },
		ClientConfig: paho.ClientConfig{
			ClientID:      clientId,
			Router:        paho.NewStandardRouter(),
			OnClientError: func(err error) { fmt.Printf("%s 客户端错误: %s\n", clientId, err) },
			OnServerDisconnect: func(d *paho.Disconnect) {
				// Print the whole packet only when it carries properties;
				// otherwise the reason code alone is reported.
				if d.Properties != nil {
					fmt.Printf("%s 服务断联: %v\n", clientId, d)
				} else {
					fmt.Printf("%s 服务断联; reason code: %d\n", clientId, d.ReasonCode)
				}
			},
		},
	}
	cc.SetUsernamePassword("rtsts_service", []byte("joylink@0503"))
	return cc
}