Skip to content

Commit ea0c45b

Browse files
authored
refactor: consolidate periodic tasks into a single method for improved management
1 parent 81e5dd2 commit ea0c45b

File tree

1 file changed

+125
-126
lines changed

1 file changed

+125
-126
lines changed

internal/master.go

Lines changed: 125 additions & 126 deletions
Original file line numberDiff line numberDiff line change
@@ -372,18 +372,6 @@ func NewMaster(parsedURL *url.URL, tlsCode string, tlsConfig *tls.Config, logger
372372
// 启动事件分发器
373373
go master.startEventDispatcher()
374374

375-
// 启动定期备份
376-
go master.startPeriodicBackup()
377-
378-
// 启动定期更新
379-
go master.startPeriodicUpdate()
380-
381-
// 启动定期清理
382-
go master.startPeriodicCleanup()
383-
384-
// 启动定期重启
385-
go master.startPeriodicErrorRestart()
386-
387375
return master, nil
388376
}
389377

@@ -501,6 +489,9 @@ func (m *Master) Run() {
501489
}
502490
}()
503491

492+
// 启动定期任务
493+
go m.startPeriodicTasks()
494+
504495
// 处理系统信号
505496
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
506497
<-ctx.Done()
@@ -559,6 +550,128 @@ func (m *Master) Shutdown(ctx context.Context) error {
559550
})
560551
}
561552

553+
// startPeriodicTasks 启动所有定期任务
554+
func (m *Master) startPeriodicTasks() {
555+
go m.startPeriodicBackup()
556+
go m.startPeriodicUpdate()
557+
go m.startPeriodicCleanup()
558+
go m.startPeriodicRestart()
559+
}
560+
561+
// startPeriodicBackup 启动定期备份
562+
func (m *Master) startPeriodicBackup() {
563+
for {
564+
select {
565+
case <-time.After(ReloadInterval):
566+
// 固定备份文件名
567+
backupPath := fmt.Sprintf("%s.backup", m.statePath)
568+
569+
if err := m.saveStateToPath(backupPath); err != nil {
570+
m.logger.Error("startPeriodicBackup: backup state failed: %v", err)
571+
} else {
572+
m.logger.Info("State backup saved: %v", backupPath)
573+
}
574+
case <-m.periodicDone:
575+
return
576+
}
577+
}
578+
}
579+
580+
// startPeriodicUpdate 启动定期更新
581+
func (m *Master) startPeriodicUpdate() {
582+
for {
583+
select {
584+
case <-time.After(reportInterval):
585+
// 遍历所有实例并更新标签
586+
m.instances.Range(func(key, value any) bool {
587+
instance := value.(*Instance)
588+
// 跳过API Key实例
589+
if instance.ID != apiKeyID {
590+
m.updateInstanceConfigTag(instance)
591+
}
592+
return true
593+
})
594+
case <-m.periodicDone:
595+
return
596+
}
597+
}
598+
}
599+
600+
// startPeriodicCleanup 启动定期清理重复ID的实例
601+
func (m *Master) startPeriodicCleanup() {
602+
for {
603+
select {
604+
case <-time.After(reportInterval):
605+
// 收集实例并按ID分组
606+
idInstances := make(map[string][]*Instance)
607+
m.instances.Range(func(key, value any) bool {
608+
if id := key.(string); id != apiKeyID {
609+
idInstances[id] = append(idInstances[id], value.(*Instance))
610+
}
611+
return true
612+
})
613+
614+
// 清理重复实例
615+
for _, instances := range idInstances {
616+
if len(instances) <= 1 {
617+
continue
618+
}
619+
620+
// 选择保留实例
621+
keepIdx := 0
622+
for i, inst := range instances {
623+
if inst.Status == "running" && instances[keepIdx].Status != "running" {
624+
keepIdx = i
625+
}
626+
}
627+
628+
// 清理多余实例
629+
for i, inst := range instances {
630+
if i == keepIdx {
631+
continue
632+
}
633+
inst.deleted = true
634+
if inst.Status != "stopped" {
635+
m.stopInstance(inst)
636+
}
637+
m.instances.Delete(inst.ID)
638+
}
639+
}
640+
case <-m.periodicDone:
641+
return
642+
}
643+
}
644+
}
645+
646+
// startPeriodicRestart 启动定期错误实例重启
647+
func (m *Master) startPeriodicRestart() {
648+
for {
649+
select {
650+
case <-time.After(reportInterval):
651+
// 收集所有error状态的实例
652+
var errorInstances []*Instance
653+
m.instances.Range(func(key, value any) bool {
654+
if id := key.(string); id != apiKeyID {
655+
instance := value.(*Instance)
656+
if instance.Status == "error" && !instance.deleted {
657+
errorInstances = append(errorInstances, instance)
658+
}
659+
}
660+
return true
661+
})
662+
663+
// 重启所有error状态的实例
664+
for _, instance := range errorInstances {
665+
m.stopInstance(instance)
666+
time.Sleep(baseDuration)
667+
m.startInstance(instance)
668+
}
669+
case <-m.periodicDone:
670+
return
671+
}
672+
}
673+
}
674+
562675
// saveState 保存实例状态到文件
563676
func (m *Master) saveState() error {
564677
return m.saveStateToPath(m.statePath)
@@ -632,25 +745,6 @@ func (m *Master) saveStateToPath(filePath string) error {
632745
return nil
633746
}
634747

635-
// startPeriodicBackup 启动定期备份
636-
func (m *Master) startPeriodicBackup() {
637-
for {
638-
select {
639-
case <-time.After(ReloadInterval):
640-
// 固定备份文件名
641-
backupPath := fmt.Sprintf("%s.backup", m.statePath)
642-
643-
if err := m.saveStateToPath(backupPath); err != nil {
644-
m.logger.Error("startPeriodicBackup: backup state failed: %v", err)
645-
} else {
646-
m.logger.Info("State backup saved: %v", backupPath)
647-
}
648-
case <-m.periodicDone:
649-
return
650-
}
651-
}
652-
}
653-
654748
// loadState 从文件加载实例状态
655749
func (m *Master) loadState() {
656750
// 清理旧的临时文件
@@ -1693,101 +1787,6 @@ func (m *Master) updateInstanceConfigTag(instance *Instance) {
16931787
m.instances.Store(instance.ID, instance)
16941788
}
16951789

1696-
// startPeriodicUpdate 启动定期更新
1697-
func (m *Master) startPeriodicUpdate() {
1698-
for {
1699-
select {
1700-
case <-time.After(reportInterval):
1701-
// 遍历所有实例并更新标签
1702-
m.instances.Range(func(key, value any) bool {
1703-
instance := value.(*Instance)
1704-
// 跳过API Key实例
1705-
if instance.ID != apiKeyID {
1706-
m.updateInstanceConfigTag(instance)
1707-
}
1708-
return true
1709-
})
1710-
case <-m.periodicDone:
1711-
return
1712-
}
1713-
}
1714-
}
1715-
1716-
// startPeriodicCleanup 启动定期清理重复ID的实例
1717-
func (m *Master) startPeriodicCleanup() {
1718-
for {
1719-
select {
1720-
case <-time.After(reportInterval):
1721-
// 收集实例并按ID分组
1722-
idInstances := make(map[string][]*Instance)
1723-
m.instances.Range(func(key, value any) bool {
1724-
if id := key.(string); id != apiKeyID {
1725-
idInstances[id] = append(idInstances[id], value.(*Instance))
1726-
}
1727-
return true
1728-
})
1729-
1730-
// 清理重复实例
1731-
for _, instances := range idInstances {
1732-
if len(instances) <= 1 {
1733-
continue
1734-
}
1735-
1736-
// 选择保留实例
1737-
keepIdx := 0
1738-
for i, inst := range instances {
1739-
if inst.Status == "running" && instances[keepIdx].Status != "running" {
1740-
keepIdx = i
1741-
}
1742-
}
1743-
1744-
// 清理多余实例
1745-
for i, inst := range instances {
1746-
if i == keepIdx {
1747-
continue
1748-
}
1749-
inst.deleted = true
1750-
if inst.Status != "stopped" {
1751-
m.stopInstance(inst)
1752-
}
1753-
m.instances.Delete(inst.ID)
1754-
}
1755-
}
1756-
case <-m.periodicDone:
1757-
return
1758-
}
1759-
}
1760-
}
1761-
1762-
// startPeriodicErrorRestart 启动定期错误实例重启
1763-
func (m *Master) startPeriodicErrorRestart() {
1764-
for {
1765-
select {
1766-
case <-time.After(reportInterval):
1767-
// 收集所有error状态的实例
1768-
var errorInstances []*Instance
1769-
m.instances.Range(func(key, value any) bool {
1770-
if id := key.(string); id != apiKeyID {
1771-
instance := value.(*Instance)
1772-
if instance.Status == "error" && !instance.deleted {
1773-
errorInstances = append(errorInstances, instance)
1774-
}
1775-
}
1776-
return true
1777-
})
1778-
1779-
// 重启所有error状态的实例
1780-
for _, instance := range errorInstances {
1781-
m.stopInstance(instance)
1782-
time.Sleep(baseDuration)
1783-
m.startInstance(instance)
1784-
}
1785-
case <-m.periodicDone:
1786-
return
1787-
}
1788-
}
1789-
}
1790-
17911790
// generateID 生成随机ID
17921791
func generateID() string {
17931792
bytes := make([]byte, 4)

0 commit comments

Comments
 (0)