diff --git a/events.go b/events.go index 6ebed86..cf88f13 100644 --- a/events.go +++ b/events.go @@ -9,15 +9,16 @@ import ( "github.com/yohamta/donburi/features/events" ) -// 注意事件相关操作的最终执行者均为world协程 +// 事件订阅发布功能,底层使用donburi事件订阅发布 +// 注意:事件订阅处理中应避免再次发布事件 // 事件相关定义 type ( // 事件类型定义 EventType[T any] struct { - et *events.EventType[T] - subscriberMap map[string]events.Subscriber[T] - subscriberMapLock sync.Mutex // + et *events.EventType[T] + mu sync.Mutex // 锁 + subMap map[string]events.Subscriber[T] } // 事件订阅者定义 Subscriber[T any] func(w World, event T) @@ -31,9 +32,9 @@ func EventsDebugEnable() { // 创建事件类型的实例 func NewEventType[T any]() *EventType[T] { return &EventType[T]{ - et: events.NewEventType[T](), - subscriberMap: make(map[string]events.Subscriber[T]), - subscriberMapLock: sync.Mutex{}, + et: events.NewEventType[T](), + mu: sync.Mutex{}, + subMap: make(map[string]events.Subscriber[T], 0), } } @@ -42,52 +43,47 @@ func processAllEvents(w World) { events.ProcessAllEvents(w) } +// 执行事件 +func (me *EventType[T]) ProcessEvents(wd World) { + me.mu.Lock() + defer me.mu.Unlock() + me.et.ProcessEvents(wd) +} + // 发布该类型的事件 func (me *EventType[T]) Publish(wd World, event *T) { - wd.Execute(func() { - me.et.Publish(wd, *event) - }) -} - -// 内部发布事件(同步发布,并直接触发) -func (me *EventType[T]) internalPublish(wd World, event *T) { + me.mu.Lock() + defer me.mu.Unlock() me.et.Publish(wd, *event) - processAllEvents(wd) } -/////////////////////////////////////////////////////////////////////////////////// - // 订阅该类型的事件 func (me *EventType[T]) Subscribe(wd World, subscriber Subscriber[T]) { - me.subscriberMapLock.Lock() - defer me.subscriberMapLock.Unlock() - subscriberKey := subscriberKey[T](wd, subscriber) - if _, exist := me.subscriberMap[subscriberKey]; !exist { - fn := func(w donburi.World, event T) { - subscriber(wd, event) - } - me.subscriberMap[subscriberKey] = fn - wd.Execute(func() { - me.et.Subscribe(wd, fn) - }) + me.mu.Lock() + defer me.mu.Unlock() + sp := buildKey(wd, subscriber) + if _, ok := me.subMap[sp]; ok { // 已经订阅过 + return } + sub := func(w donburi.World, event T) { + subscriber(wd, event) + } + me.subMap[sp] = sub + me.et.Subscribe(wd, sub) } // 取消订阅该类型的事件 func (me *EventType[T]) Unsubscribe(wd World, subscriber Subscriber[T]) { - me.subscriberMapLock.Lock() - defer me.subscriberMapLock.Unlock() - subscriberKey := subscriberKey[T](wd, subscriber) - if sub, ok := me.subscriberMap[subscriberKey]; ok { - wd.Execute(func() { - me.et.Unsubscribe(wd, sub) - delete(me.subscriberMap, subscriberKey) - }) + me.mu.Lock() + defer me.mu.Unlock() + sp := buildKey(wd, subscriber) + if sub, ok := me.subMap[sp]; ok { // 存在, 取消订阅 + me.et.Unsubscribe(wd, sub) } } -func subscriberKey[T any](wd World, subscriber Subscriber[T]) string { - wdSubscriberPointer := reflect.ValueOf(subscriber).Pointer() - subscriberKey := fmt.Sprintf("%d-%d", wd.Id(), wdSubscriberPointer) - return subscriberKey +// 生成订阅键 +func buildKey[T any](wd World, subscriber Subscriber[T]) string { + sp := fmt.Sprintf("%v_%v", wd.Id(), reflect.ValueOf(subscriber).Pointer()) + return sp } diff --git a/examples/rtss-event/main.go b/examples/rtss-event/main.go index db621c5..6e1e3ff 100644 --- a/examples/rtss-event/main.go +++ b/examples/rtss-event/main.go @@ -1,6 +1,7 @@ package main import ( + "context" "fmt" "time" @@ -43,8 +44,25 @@ var ( func main() { go world1() go world2() - time.Sleep(4 * time.Second) - fireSwitchDcEventType.Publish(wd2, &FireSwitchDcEvent{Dc: false}) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { + for { + select { + case <-ctx.Done(): + return + default: + } + fireSwitchDcEventType.Publish(wd2, &FireSwitchDcEvent{Dc: false}) + time.Sleep(10 * time.Millisecond) + } + }() + time.Sleep(2 * time.Second) + + wd1.Close() + wd2.Close() + + time.Sleep(5 * time.Second) } func world1() { world := wd1 @@ -52,6 +70,9 @@ func world1() { // switchSystem := NewSwitchSystem() world.AddSystem(switchSystem) + ecs.WorldStateChangeEvent.Subscribe(wd1, func(w ecs.World, event ecs.WorldStateChange) { + fmt.Println("世界1状态变更", event) + }) // fireSwitchDcEventType.Subscribe(world, switchSystem.WhenFireSwitchDcEvent) // @@ -68,6 +89,10 @@ func world2() { switchSystem := NewSwitchSystem() world.AddSystem(switchSystem) fireSwitchDcEventType.Subscribe(world, switchSystem.WhenFireSwitchDcEvent) + + ecs.WorldStateChangeEvent.Subscribe(wd2, func(w ecs.World, event ecs.WorldStateChange) { + fmt.Println("世界2状态变更", event) + }) // time.Sleep(3 * time.Second) // diff --git a/world.go b/world.go index f631630..ef4366a 100644 --- a/world.go +++ b/world.go @@ -1,6 +1,7 @@ package ecs import ( + "context" "fmt" "log/slog" "math" @@ -16,12 +17,11 @@ type WorldState int type WorldId = donburi.WorldId const ( - WorldInit WorldState = iota - WorldRunning - WorldPause - WorldError - WorldClose - WorldClosed + WorldInit WorldState = 0 + WorldRunning WorldState = 1 + WorldPause WorldState = 2 + WorldError WorldState = 3 + WorldClosed WorldState = 4 ) type ( @@ -38,7 +38,7 @@ type ( Resume() // 关闭世界 Close() - // 设置时间运行倍速 + // 设置运行倍速 SetSpeed(speed float64) error // 添加系统 AddSystem(sys ...ISystem) @@ -72,6 +72,9 @@ type world struct { // 待执行函数 toBeExecuteds chan HandleFunc + + cancel context.CancelFunc + done chan struct{} // 服务协程退出信号 } // 新建一个组件类型 @@ -109,11 +112,10 @@ func NewWorld(tick int) World { speed: 1, times: 1, toBeExecuteds: make(chan HandleFunc, 32), + done: make(chan struct{}), } } -func (w *world) Running() bool { - return w.state == WorldRunning -} + func (w *world) Tick() int { return w.tick } @@ -147,10 +149,11 @@ func (w *world) updateState(state WorldState) { old := w.state slog.Debug("世界状态变更", "oldstate", old, "state", state) w.state = state - WorldStateChangeEvent.internalPublish(w, &WorldStateChange{ + WorldStateChangeEvent.Publish(w, &WorldStateChange{ OldState: old, NewState: state, }) + WorldStateChangeEvent.ProcessEvents(w) } } @@ -179,8 +182,11 @@ func (w *world) SetSpeed(speed float64) error { // 启动世界,世界逻辑开始执行且世界为运行状态 func (w *world) StartUp() { if w.state == WorldInit { // 避免重复运行 + slog.Debug("启动世界", "id", w.Id()) + ctx, cancle := context.WithCancel(context.Background()) + go w.run(ctx) + w.cancel = cancle w.updateState(WorldRunning) - go w.run() } } @@ -188,7 +194,7 @@ func (w *world) StartUp() { func (w *world) Execute(fn HandleFunc) error { if w.state == WorldError { return fmt.Errorf("世界运行异常,无法执行请求") - } else if w.state != WorldRunning && w.state != WorldPause { + } else if w.state == WorldClosed { return fmt.Errorf("世界已经关闭,无法执行请求") } w.toBeExecuteds <- fn @@ -197,13 +203,8 @@ func (w *world) Execute(fn HandleFunc) error { // 关闭世界 func (w *world) Close() { - if w.state == WorldRunning || w.state == WorldPause { - w.Execute(func() { - w.updateState(WorldClose) - }) - } else if w.state == WorldError { - w.updateState(WorldClosed) - } + w.cancel() + <-w.done } // 执行待处理方法 @@ -222,7 +223,7 @@ func (w *world) executeTodos() { } // 世界循环 -func (w *world) run() { +func (w *world) run(ctx context.Context) { defer func() { if err := recover(); err != nil { w.exception(err) @@ -230,13 +231,15 @@ func (w *world) run() { debug.PrintStack() } }() + defer close(w.done) for range w.ticker.C { - // slog.Debug("世界运行") - if w.state == WorldClose { - // 世界正常关闭 + select { + case <-ctx.Done(): w.close() return + default: } + // start := time.Now() if w.state != WorldRunning { // 世界非运行状态 continue @@ -272,6 +275,7 @@ func (w *world) exception(err any) { // 世界正常关闭逻辑 func (w *world) close() { + slog.Debug("关闭世界", "id", w.Id()) // 世界正常关闭 w.updateState(WorldClosed) // 关闭定时器