Skip to content

Commit 755c21d

Browse files
committed
Add proper callbacks for Clusters.Add/-OrReplace
1 parent 7d1092f commit 755c21d

File tree

3 files changed

+21
-10
lines changed

3 files changed

+21
-10
lines changed

pkg/clusters/clusters.go

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ import (
1212

1313
"sigs.k8s.io/controller-runtime/pkg/client"
1414
"sigs.k8s.io/controller-runtime/pkg/cluster"
15+
16+
"sigs.k8s.io/multicluster-runtime/pkg/multicluster"
1517
)
1618

1719
// Clusters is a conccurency-safe map of clusters to be used in
@@ -51,9 +53,13 @@ func (c *Clusters) Get(_ context.Context, name string) (cluster.Cluster, error)
5153
return cl, nil
5254
}
5355

56+
// HandleClusterErrorFunc is called when a cluster encounters an error
57+
// during its lifecycle.
58+
type HandleClusterErrorFunc func(string, error)
59+
5460
// Add adds a new cluster.
5561
// If a cluster with the given name already exists, it returns an error.
56-
func (c *Clusters) Add(ctx context.Context, name string, cl cluster.Cluster, callback func(context.Context, string, cluster.Cluster) error) error {
62+
func (c *Clusters) Add(ctx context.Context, name string, cl cluster.Cluster, callback multicluster.EngageFunc, handleError HandleClusterErrorFunc) error {
5763
c.Lock.Lock()
5864
defer c.Lock.Unlock()
5965

@@ -72,10 +78,7 @@ func (c *Clusters) Add(ctx context.Context, name string, cl cluster.Cluster, cal
7278
go func() {
7379
defer c.Remove(name)
7480
if err := cl.Start(ctx); err != nil {
75-
cancel()
76-
// TODO how to handle errors?
77-
// Another callback?
78-
// pass in a logger on creation?
81+
handleError(name, err)
7982
}
8083
}()
8184

@@ -98,11 +101,11 @@ func (c *Clusters) Remove(name string) {
98101
// If a cluster with the name already exists it compares the
99102
// configuration as returned by cluster.GetConfig() to compare
100103
// clusters.
101-
func (c *Clusters) AddOrReplace(ctx context.Context, name string, cl cluster.Cluster, callback func(context.Context, string, cluster.Cluster) error) error {
104+
func (c *Clusters) AddOrReplace(ctx context.Context, name string, cl cluster.Cluster, callback multicluster.EngageFunc, handleError HandleClusterErrorFunc) error {
102105
existing, err := c.Get(ctx, name)
103106
if err != nil {
104107
// Cluster does not exist, add it
105-
return c.Add(ctx, name, cl, callback)
108+
return c.Add(ctx, name, cl, callback, handleError)
106109
}
107110

108111
if cmp.Equal(existing.GetConfig(), cl.GetConfig()) {
@@ -112,7 +115,7 @@ func (c *Clusters) AddOrReplace(ctx context.Context, name string, cl cluster.Clu
112115

113116
// Cluster exists with a different config, replace it
114117
c.Remove(name)
115-
return c.Add(ctx, name, cl, callback)
118+
return c.Add(ctx, name, cl, callback, handleError)
116119
}
117120

118121
// IndexField indexes a field on all clusters.

pkg/multicluster/multicluster.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,10 @@ import (
2323
"sigs.k8s.io/controller-runtime/pkg/cluster"
2424
)
2525

26+
// EngageFunc is a function that starts operations for a given Cluster.
27+
// See the Aware interface for more details.
28+
type EngageFunc func(ctx context.Context, clusterName string, c cluster.Cluster) error
29+
2630
// Aware is an interface that can be implemented by components that
2731
// can engage and disengage when clusters are added or removed at runtime.
2832
type Aware interface {
@@ -39,7 +43,7 @@ type Aware interface {
3943
// __||____/ /_
4044
// |___ \
4145
// `--------'
42-
Engage(context.Context, string, cluster.Cluster) error
46+
Engage(ctx context.Context, clusterName string, c cluster.Cluster) error
4347
}
4448

4549
// Provider allows to retrieve clusters by name. The provider is responsible for discovering

providers/file/provider.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,10 @@ func (p *Provider) RunOnce(ctx context.Context, mgr mcmanager.Manager) error {
190190
return p.run(ctx, mgr)
191191
}
192192

193+
func (p *Provider) handleClusterErr(name string, err error) {
194+
p.log.Error(err, "error in cluster", "name", name)
195+
}
196+
193197
func (p *Provider) run(ctx context.Context, mgr mcmanager.Manager) error {
194198
loadedClusters, err := p.loadClusters()
195199
if err != nil {
@@ -200,7 +204,7 @@ func (p *Provider) run(ctx context.Context, mgr mcmanager.Manager) error {
200204
// add new clusters
201205
for name, cl := range loadedClusters {
202206
p.log.Info("adding or updating cluster", "name", name)
203-
if err := p.Clusters.AddOrReplace(ctx, name, cl, mgr.Engage); err != nil {
207+
if err := p.Clusters.AddOrReplace(ctx, name, cl, mgr.Engage, p.handleClusterErr); err != nil {
204208
p.log.Error(err, "failed to add or replace cluster", "name", name)
205209
continue
206210
}

0 commit comments

Comments
 (0)