Skip to content

Commit 3781a86

Browse files
authored
refactor: streamline SSE connection shutdown process and enhance event handling
1 parent 6ea7748 commit 3781a86

File tree

1 file changed

+36
-45
lines changed

1 file changed

+36
-45
lines changed

internal/master.go

Lines changed: 36 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -467,49 +467,11 @@ func (m *Master) Run() {
467467
// Shutdown 关闭主控
468468
func (m *Master) Shutdown(ctx context.Context) error {
469469
return m.shutdown(ctx, func() {
470-
// 声明一个已关闭通道的集合,避免重复关闭
471-
var closedChannels sync.Map
472-
473-
var wg sync.WaitGroup
474-
475-
// 给所有订阅者一个关闭通知
476-
m.subscribers.Range(func(key, value any) bool {
477-
subscriberChan := value.(chan *InstanceEvent)
478-
wg.Add(1)
479-
go func(ch chan *InstanceEvent) {
480-
defer wg.Done()
481-
// 非阻塞的方式发送关闭事件
482-
select {
483-
case ch <- &InstanceEvent{
484-
Type: "shutdown",
485-
Time: time.Now(),
486-
}:
487-
default:
488-
// 不可用,忽略
489-
}
490-
}(subscriberChan)
491-
return true
492-
})
493-
494-
// 等待所有订阅者处理完关闭事件
495-
time.Sleep(baseDuration)
496-
497-
// 关闭所有订阅者通道
498-
m.subscribers.Range(func(key, value any) bool {
499-
subscriberChan := value.(chan *InstanceEvent)
500-
// 检查通道是否已关闭,如果没有则关闭它
501-
if _, loaded := closedChannels.LoadOrStore(subscriberChan, true); !loaded {
502-
wg.Add(1)
503-
go func(k any, ch chan *InstanceEvent) {
504-
defer wg.Done()
505-
close(ch)
506-
m.subscribers.Delete(k)
507-
}(key, subscriberChan)
508-
}
509-
return true
510-
})
470+
// 通知并关闭SSE连接
471+
m.shutdownSSEConnections()
511472

512473
// 停止所有运行中的实例
474+
var wg sync.WaitGroup
513475
m.instances.Range(func(key, value any) bool {
514476
instance := value.(*Instance)
515477
// 如果实例正在运行,则停止它
@@ -1103,8 +1065,9 @@ func (m *Master) handlePutInstance(w http.ResponseWriter, r *http.Request, id st
11031065
func (m *Master) regenerateAPIKey(instance *Instance) {
11041066
instance.URL = generateAPIKey()
11051067
m.instances.Store(apiKeyID, instance)
1106-
go m.saveState()
11071068
m.logger.Info("API Key regenerated: %v", instance.URL)
1069+
go m.saveState()
1070+
go m.shutdownSSEConnections()
11081071
}
11091072

11101073
// processInstanceAction 处理实例操作
@@ -1202,12 +1165,14 @@ func (m *Master) handleSSE(w http.ResponseWriter, r *http.Request) {
12021165
// 客户端连接关闭标志
12031166
connectionClosed := make(chan struct{})
12041167

1205-
// 监听客户端连接是否关闭,但不关闭通道,留给Shutdown处理
1168+
// 监听客户端连接是否关闭
12061169
go func() {
12071170
<-ctx.Done()
12081171
close(connectionClosed)
1209-
// 只从映射表中移除,但不关闭通道
1210-
m.subscribers.Delete(subscriberID)
1172+
// 从映射表中移除并关闭通道
1173+
if ch, exists := m.subscribers.LoadAndDelete(subscriberID); exists {
1174+
close(ch.(chan *InstanceEvent))
1175+
}
12111176
}()
12121177

12131178
// 持续发送事件到客户端
@@ -1255,6 +1220,32 @@ func (m *Master) sendSSEEvent(eventType string, instance *Instance, logs ...stri
12551220
}
12561221
}
12571222

1223+
// shutdownSSEConnections 通知并关闭SSE连接
1224+
func (m *Master) shutdownSSEConnections() {
1225+
var wg sync.WaitGroup
1226+
1227+
// 发送shutdown通知并关闭通道
1228+
m.subscribers.Range(func(key, value any) bool {
1229+
ch := value.(chan *InstanceEvent)
1230+
wg.Add(1)
1231+
go func(subscriberID any, eventChan chan *InstanceEvent) {
1232+
defer wg.Done()
1233+
// 发送shutdown通知
1234+
select {
1235+
case eventChan <- &InstanceEvent{Type: "shutdown", Time: time.Now()}:
1236+
default:
1237+
}
1238+
// 从映射表中移除并关闭通道
1239+
if _, exists := m.subscribers.LoadAndDelete(subscriberID); exists {
1240+
close(eventChan)
1241+
}
1242+
}(key, ch)
1243+
return true
1244+
})
1245+
1246+
wg.Wait()
1247+
}
1248+
12581249
// startEventDispatcher 启动事件分发器
12591250
func (m *Master) startEventDispatcher() {
12601251
for event := range m.notifyChannel {

0 commit comments

Comments
 (0)