Skip to content

Commit 09ebc49

Browse files
committed
sync resource with the last resource version
1 parent 0a87be5 commit 09ebc49

File tree

4 files changed

+113
-24
lines changed

4 files changed

+113
-24
lines changed

pkg/synchromanager/clustersynchro/informer/named_controller.go

Lines changed: 39 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,24 +14,34 @@ type controller struct {
1414
config cache.Config
1515

1616
reflectorMutex sync.RWMutex
17-
reflector *cache.Reflector
18-
queue cache.Queue
17+
reflector *Reflector
18+
19+
lastResourceVersion string
1920
}
2021

21-
func NewNamedController(name string, config *cache.Config) cache.Controller {
22+
func NewNamedController(name string, config *cache.Config) *controller {
2223
return &controller{
2324
name: name,
2425
config: *config,
2526
}
2627
}
2728

29+
func (c *controller) SetLastResourceVersion(lastResourceVersion string) {
30+
c.reflectorMutex.Lock()
31+
defer c.reflectorMutex.Unlock()
32+
if c.reflector != nil {
33+
panic("controller is running, connot set last resource version")
34+
}
35+
c.lastResourceVersion = lastResourceVersion
36+
}
37+
2838
func (c *controller) Run(stopCh <-chan struct{}) {
2939
defer utilruntime.HandleCrash()
3040
go func() {
3141
<-stopCh
3242
c.config.Queue.Close()
3343
}()
34-
r := cache.NewNamedReflector(
44+
r := NewNamedReflector(
3545
c.name,
3646
c.config.ListerWatcher,
3747
c.config.ObjectType,
@@ -42,6 +52,9 @@ func (c *controller) Run(stopCh <-chan struct{}) {
4252
r.WatchListPageSize = c.config.WatchListPageSize
4353

4454
c.reflectorMutex.Lock()
55+
if c.lastResourceVersion != "" {
56+
r.lastSyncResourceVersion = c.lastResourceVersion
57+
}
4558
c.reflector = r
4659
c.reflectorMutex.Unlock()
4760

@@ -52,6 +65,27 @@ func (c *controller) Run(stopCh <-chan struct{}) {
5265
wg.Wait()
5366
}
5467

68+
/*
69+
func (c *controller) setLastResourceVersionForReflector(reflector *cache.Reflector) {
70+
if c.resourceVersionGetter == nil {
71+
return
72+
}
73+
74+
rv := c.resourceVersionGetter.LastResourceVersion()
75+
if rv == "" || rv == "0" {
76+
return
77+
}
78+
rvValue := reflect.ValueOf(rv)
79+
80+
field := reflect.ValueOf(reflector).Elem().FieldByName("lastSyncResourceVersion")
81+
value := reflect.NewAt(field.Type(), unsafe.Pointer(field.UnsafeAddr())).Elem()
82+
if value.Kind() != rvValue.Kind() {
83+
panic(fmt.Sprintf("reflector.lastSyncResourceVersion's value kind is %v", value.Kind()))
84+
}
85+
value.Set(rvValue)
86+
}
87+
*/
88+
5589
func (c *controller) processLoop() {
5690
for {
5791
obj, err := c.config.Queue.Pop(cache.PopProcessFunc(c.config.Process))
@@ -71,10 +105,7 @@ func (c *controller) HasSynced() bool {
71105
c.reflectorMutex.RLock()
72106
defer c.reflectorMutex.RUnlock()
73107

74-
if c.queue == nil {
75-
return false
76-
}
77-
return c.queue.HasSynced()
108+
return c.config.Queue.HasSynced()
78109
}
79110

80111
func (c *controller) LastSyncResourceVersion() string {

pkg/synchromanager/clustersynchro/informer/resourceversion_informer.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,15 @@ import (
77
)
88

99
type ResourceVersionInformer interface {
10-
Run(stopCh <-chan struct{})
10+
Run(withLastResourceVersion bool, stopCh <-chan struct{})
1111
HasSynced() bool
1212
}
1313

1414
type resourceVersionInformer struct {
1515
name string
1616
storage *ResourceVersionStorage
1717
handler ResourceEventHandler
18-
controller cache.Controller
18+
controller *controller
1919
listerWatcher cache.ListerWatcher
2020
}
2121

@@ -24,7 +24,6 @@ func NewResourceVersionInformer(name string, lw cache.ListerWatcher, storage *Re
2424
panic("name is required")
2525
}
2626

27-
// storage: NewResourceVersionStorage(cache.DeletionHandlingMetaNamespaceKeyFunc),
2827
informer := &resourceVersionInformer{
2928
name: name,
3029
listerWatcher: lw,
@@ -38,7 +37,7 @@ func NewResourceVersionInformer(name string, lw cache.ListerWatcher, storage *Re
3837
RetryOnError: false,
3938
Process: func(obj interface{}) error {
4039
deltas := obj.(cache.Deltas)
41-
return informer.HandleDeltas(deltas)
40+
return informer.handleDeltas(deltas)
4241
},
4342
Queue: cache.NewDeltaFIFOWithOptions(cache.DeltaFIFOOptions{
4443
KeyFunction: cache.DeletionHandlingMetaNamespaceKeyFunc,
@@ -54,11 +53,15 @@ func (informer *resourceVersionInformer) HasSynced() bool {
5453
return informer.controller.HasSynced()
5554
}
5655

57-
func (informer *resourceVersionInformer) Run(stopCh <-chan struct{}) {
56+
func (informer *resourceVersionInformer) Run(withLastResourceVersion bool, stopCh <-chan struct{}) {
57+
// TODO(iceber): It can only be run once and an error is reported if it is run a second time
58+
if withLastResourceVersion {
59+
informer.controller.SetLastResourceVersion(informer.storage.LastResourceVersion())
60+
}
5861
informer.controller.Run(stopCh)
5962
}
6063

61-
func (informer *resourceVersionInformer) HandleDeltas(deltas cache.Deltas) error {
64+
func (informer *resourceVersionInformer) handleDeltas(deltas cache.Deltas) error {
6265
for _, d := range deltas {
6366
switch d.Type {
6467
case cache.Replaced, cache.Added, cache.Updated:

pkg/synchromanager/clustersynchro/informer/resourceversion_storage.go

Lines changed: 54 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,55 @@
11
package informer
22

33
import (
4+
"strconv"
5+
"sync/atomic"
6+
47
"k8s.io/apimachinery/pkg/api/meta"
8+
"k8s.io/apiserver/pkg/storage/etcd3"
59
"k8s.io/client-go/tools/cache"
610
)
711

812
type ResourceVersionStorage struct {
913
keyFunc cache.KeyFunc
1014

11-
cacheStorage cache.ThreadSafeStore
15+
lastResourceVersion *uint64
16+
cacheStorage cache.ThreadSafeStore
1217
}
1318

1419
var _ cache.KeyListerGetter = &ResourceVersionStorage{}
1520

1621
func NewResourceVersionStorage(keyFunc cache.KeyFunc) *ResourceVersionStorage {
17-
return &ResourceVersionStorage{
18-
cacheStorage: cache.NewThreadSafeStore(cache.Indexers{}, cache.Indices{}),
19-
keyFunc: keyFunc,
22+
var lastResourceVersion uint64
23+
storage := &ResourceVersionStorage{
24+
keyFunc: keyFunc,
25+
lastResourceVersion: &lastResourceVersion,
26+
cacheStorage: cache.NewThreadSafeStore(cache.Indexers{}, cache.Indices{}),
2027
}
28+
return storage
29+
}
30+
31+
func (c *ResourceVersionStorage) LastResourceVersion() string {
32+
return strconv.FormatUint(atomic.LoadUint64(c.lastResourceVersion), 10)
2133
}
2234

2335
func (c *ResourceVersionStorage) Add(obj interface{}) error {
2436
key, err := c.keyFunc(obj)
2537
if err != nil {
2638
return cache.KeyError{Obj: obj, Err: err}
2739
}
40+
2841
accessor, err := meta.Accessor(obj)
2942
if err != nil {
3043
return err
3144
}
3245

33-
c.cacheStorage.Add(key, accessor.GetResourceVersion())
46+
resourceversion := accessor.GetResourceVersion()
47+
rv, err := etcd3.Versioner.ParseResourceVersion(resourceversion)
48+
if err != nil {
49+
return err
50+
}
51+
atomic.CompareAndSwapUint64(c.lastResourceVersion, *c.lastResourceVersion, rv)
52+
c.cacheStorage.Add(key, resourceversion)
3453
return nil
3554
}
3655

@@ -39,12 +58,19 @@ func (c *ResourceVersionStorage) Update(obj interface{}) error {
3958
if err != nil {
4059
return cache.KeyError{Obj: obj, Err: err}
4160
}
61+
4262
accessor, err := meta.Accessor(obj)
4363
if err != nil {
4464
return err
4565
}
4666

47-
c.cacheStorage.Update(key, accessor.GetResourceVersion())
67+
resourceversion := accessor.GetResourceVersion()
68+
rv, err := etcd3.Versioner.ParseResourceVersion(resourceversion)
69+
if err != nil {
70+
return err
71+
}
72+
atomic.CompareAndSwapUint64(c.lastResourceVersion, *c.lastResourceVersion, rv)
73+
c.cacheStorage.Update(key, resourceversion)
4874
return nil
4975
}
5076

@@ -54,6 +80,15 @@ func (c *ResourceVersionStorage) Delete(obj interface{}) error {
5480
return cache.KeyError{Obj: obj, Err: err}
5581
}
5682

83+
if accessor, err := meta.Accessor(obj); err == nil {
84+
resourceversion := accessor.GetResourceVersion()
85+
rv, err := etcd3.Versioner.ParseResourceVersion(resourceversion)
86+
if err != nil {
87+
return err
88+
}
89+
atomic.CompareAndSwapUint64(c.lastResourceVersion, *c.lastResourceVersion, rv)
90+
}
91+
5792
c.cacheStorage.Delete(key)
5893
return nil
5994
}
@@ -80,6 +115,19 @@ func (c *ResourceVersionStorage) GetByKey(key string) (item interface{}, exists
80115
}
81116

82117
func (c *ResourceVersionStorage) Replace(versions map[string]interface{}) error {
118+
var lastResourceVersion uint64
119+
for _, version := range versions {
120+
rv, err := etcd3.Versioner.ParseResourceVersion(version.(string))
121+
if err != nil {
122+
// TODO(iceber): handle err
123+
continue
124+
}
125+
126+
if rv > lastResourceVersion {
127+
lastResourceVersion = rv
128+
}
129+
}
130+
atomic.StoreUint64(c.lastResourceVersion, lastResourceVersion)
83131
c.cacheStorage.Replace(versions, "")
84132
return nil
85133
}

pkg/synchromanager/clustersynchro/resource_synchro.go

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,10 @@ type ResourceSynchro struct {
2727
cluster string
2828
storageResource schema.GroupResource
2929

30-
queue queue.EventQueue
31-
listerWatcher cache.ListerWatcher
32-
cache *informer.ResourceVersionStorage
30+
queue queue.EventQueue
31+
listerWatcher cache.ListerWatcher
32+
cache *informer.ResourceVersionStorage
33+
syncWithLastResourceVersion bool
3334

3435
memoryVersion schema.GroupVersion
3536
convertor runtime.ObjectConvertor
@@ -70,6 +71,9 @@ func newResourceSynchro(cluster string, lw cache.ListerWatcher, rvcache *informe
7071
}
7172
close(synchro.stoped)
7273

74+
// TODO(iceber): add feature gate
75+
synchro.syncWithLastResourceVersion = true
76+
7377
status := clustersv1alpha1.ClusterResourceSyncCondition{
7478
Status: clustersv1alpha1.SyncStatusPending,
7579
LastTransitionTime: metav1.Now(),
@@ -139,7 +143,10 @@ func (synchro *ResourceSynchro) Run(stopCh <-chan struct{}) {
139143
synchro.cache,
140144
&unstructured.Unstructured{},
141145
synchro,
142-
).Run(informerStopCh)
146+
).Run(synchro.syncWithLastResourceVersion, informerStopCh)
147+
148+
// next run informer with last resource version
149+
synchro.syncWithLastResourceVersion = true
143150

144151
status = clustersv1alpha1.ClusterResourceSyncCondition{
145152
Status: clustersv1alpha1.SyncStatusStop,

0 commit comments

Comments
 (0)