ff67c84f18
All checks were successful
local-test分支打包构建docker并发布运行 / Docker-Build (push) Successful in 4m7s
152 lines
3.2 KiB
Go
152 lines
3.2 KiB
Go
package tpapi
|
||
|
||
import (
|
||
"context"
|
||
"fmt"
|
||
"sync"
|
||
"time"
|
||
)
|
||
|
||
// ThirdPartyApiServiceState describes the running state of a third-party
// API service.
type ThirdPartyApiServiceState uint32

const (
	// ThirdPartyState_Closed means the service has not been started.
	ThirdPartyState_Closed ThirdPartyApiServiceState = iota
	// ThirdPartyState_Normal means the service is working normally.
	ThirdPartyState_Normal
	// ThirdPartyState_Broken means the service is in an abnormal state.
	ThirdPartyState_Broken
)

// ThirdPartyApiService is a service that integrates with a third-party
// interface.
type ThirdPartyApiService interface {
	Name() string
	// State reports the service state.
	State() ThirdPartyApiServiceState
	FindAppendApiService() []ThirdPartyApiService
	// TrueService reports whether this is a real service. When it returns
	// false, FindAppendApiService is called instead.
	TrueService() bool
	// ServiceDesc returns the service description.
	ServiceDesc() string
}
|
||
|
||
// func NewThirdPartyApiService() ThirdPartyApiService {
// 	return &thirdPartyApiService{
// 		state: ThirdPartyState_Closed,
// 	}
// }

// type thirdPartyApiService struct {
// 	state ThirdPartyApiServiceState
// }

// func (s *thirdPartyApiService) UpdateState(state ThirdPartyApiServiceState) {
// 	s.state = state
// }

// func (s *thirdPartyApiService) State() ThirdPartyApiServiceState {
// 	return s.state
// }
|
||
|
||
// UdpDelayRecorder records the gaps between received UDP messages and
// watches for prolonged silence.
//
// After Start, a background goroutine wakes up every 2*interval milliseconds
// and compares the time since the last RecordInterval call with the timeout
// computed by calUdpTimeout. On a transition into the timed-out state the
// handler is called with a non-nil error; on recovery (silence no longer than
// 3*interval) it is called with nil.
//
// The handler runs on the monitoring goroutine with no internal lock held, so
// it may call RecordInterval or GetIntervals. It must not call Start or Stop
// synchronously, because both wait for the monitoring goroutine to exit.
type UdpDelayRecorder struct {
	// runMu serializes Start and Stop and guards cancelFunc and done.
	runMu sync.Mutex
	// mu guards lastReceiedTime, intervals and err.
	mu sync.Mutex

	// interval is the expected gap between messages, in milliseconds.
	// It is set by the constructor and never modified afterwards.
	interval int
	// lastReceiedTime is the Unix time in milliseconds of the latest
	// recorded message; 0 means nothing has been recorded since Start.
	lastReceiedTime int64
	// intervals holds the most recent gaps between messages, in milliseconds.
	intervals []int64
	// err is non-nil while the recorder is in the timed-out state.
	err error
	// handler is the timeout/recovery callback; it may be nil.
	handler func(err error)

	cancelFunc context.CancelFunc
	done       chan struct{} // closed when the monitoring goroutine exits
}

// NewUdpDelayRecorder returns a recorder that expects a message every
// interval milliseconds and reports timeout/recovery transitions to handler.
// handler may be nil. Monitoring does not begin until Start is called.
func NewUdpDelayRecorder(interval int, handler func(err error)) *UdpDelayRecorder {
	return &UdpDelayRecorder{
		interval:        interval,
		lastReceiedTime: 0,
		handler:         handler,
	}
}

// Start resets the timeout state and launches the monitoring goroutine.
// If a monitor is already running it is stopped first, so repeated calls do
// not leak goroutines.
func (r *UdpDelayRecorder) Start() {
	r.runMu.Lock()
	defer r.runMu.Unlock()

	r.stopLocked()

	r.mu.Lock()
	r.lastReceiedTime = 0
	r.err = nil
	r.mu.Unlock()

	ctx, cancel := context.WithCancel(context.Background())
	r.cancelFunc = cancel
	r.done = make(chan struct{})
	go r.checkTimeout(ctx)
}

// checkTimeout is the monitoring loop. It re-evaluates the timeout state
// every 2*interval milliseconds until ctx is cancelled, then closes r.done.
func (r *UdpDelayRecorder) checkTimeout(ctx context.Context) {
	defer close(r.done)

	period := time.Duration(r.interval) * 2 * time.Millisecond
	if period <= 0 {
		// time.NewTicker panics on non-positive durations; a 1ms floor also
		// prevents a busy loop when interval is zero or negative.
		period = time.Millisecond
	}
	ticker := time.NewTicker(period)
	defer ticker.Stop()

	for {
		// Waiting on ctx.Done here (instead of sleeping) lets Stop return
		// immediately rather than after the remainder of the period.
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
		}

		changed, err := r.refreshState(time.Now().UnixMilli())
		if changed && r.handler != nil {
			// Called without holding mu so the handler may use the recorder.
			r.handler(err)
		}
	}
}

// refreshState updates the timed-out flag for the given current time (Unix
// milliseconds). It returns changed=true when the state flipped, together
// with the value to pass to the handler: the timeout error when entering the
// timed-out state, nil when recovering.
//
// When nothing has been recorded since Start (lastReceiedTime == 0) the
// elapsed time is measured from the Unix epoch, so the first check reports a
// timeout.
func (r *UdpDelayRecorder) refreshState(nowMs int64) (changed bool, err error) {
	r.mu.Lock()
	defer r.mu.Unlock()

	// Compare in int64: the millisecond difference does not fit in a 32-bit int.
	elapsed := nowMs - r.lastReceiedTime
	switch {
	case r.err == nil && elapsed > int64(calUdpTimeout(r.interval)):
		r.err = fmt.Errorf("UDP通信超时未收到消息,可能网络异常")
		return true, r.err
	case r.err != nil && elapsed <= int64(r.interval)*3:
		r.err = nil
		return true, nil
	}
	return false, nil
}

// Stop cancels the monitoring goroutine, waits for it to exit and resets the
// timeout state. It is a no-op when the recorder is not running.
func (r *UdpDelayRecorder) Stop() {
	r.runMu.Lock()
	defer r.runMu.Unlock()
	r.stopLocked()
}

// stopLocked implements Stop; the caller must hold runMu. mu is deliberately
// not held while waiting on done, because the monitoring goroutine needs mu
// to finish an in-flight check.
func (r *UdpDelayRecorder) stopLocked() {
	if r.cancelFunc == nil {
		return
	}
	r.cancelFunc()
	<-r.done
	r.cancelFunc = nil
	r.done = nil

	r.mu.Lock()
	r.lastReceiedTime = 0
	r.err = nil
	r.mu.Unlock()
}

// RecordInterval notes that a message has just been received. From the
// second message on, the gap to the previous message (in milliseconds) is
// appended to the history, which keeps only the 10 most recent gaps.
func (r *UdpDelayRecorder) RecordInterval() {
	const maxSamples = 10

	r.mu.Lock()
	defer r.mu.Unlock()

	now := time.Now().UnixMilli()
	if r.lastReceiedTime > 0 {
		r.intervals = append(r.intervals, now-r.lastReceiedTime)
		if n := len(r.intervals); n > maxSamples {
			r.intervals = r.intervals[n-maxSamples:]
		}
	}
	r.lastReceiedTime = now
}

// GetIntervals returns a snapshot of the recorded gaps in milliseconds,
// oldest first. The result is a copy, so callers may keep or modify it; it is
// nil when nothing has been recorded.
func (r *UdpDelayRecorder) GetIntervals() []int64 {
	r.mu.Lock()
	defer r.mu.Unlock()
	return append([]int64(nil), r.intervals...)
}

// calUdpTimeout returns the silence threshold in milliseconds for the given
// expected message interval: 10x the interval for intervals up to 50ms,
// 5x otherwise.
func calUdpTimeout(interval int) int {
	if interval <= 50 {
		return interval * 10
	}
	return interval * 5
}
|