Skip to content

Commit 96acaea

Browse files
committed
Enable to set dynamic watch from controller flags
1 parent 051d1de commit 96acaea

File tree

6 files changed

+89
-1
lines changed

6 files changed

+89
-1
lines changed

cmd/katib-controller/v1beta1/main.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
apis "github.com/kubeflow/katib/pkg/apis/controller"
3131
controller "github.com/kubeflow/katib/pkg/controller.v1beta1"
3232
"github.com/kubeflow/katib/pkg/controller.v1beta1/consts"
33+
trialutil "github.com/kubeflow/katib/pkg/controller.v1beta1/trial/util"
3334
webhook "github.com/kubeflow/katib/pkg/webhook/v1beta1"
3435
)
3536

@@ -44,6 +45,7 @@ func main() {
4445
var injectSecurityContext bool
4546
var serviceName string
4647
var enableGRPCProbeInSuggestion bool
48+
var trialResources trialutil.GvkListFlag
4749

4850
flag.StringVar(&experimentSuggestionName, "experiment-suggestion-name",
4951
"default", "The implementation of suggestion interface in experiment controller (default)")
@@ -53,6 +55,7 @@ func main() {
5355
flag.BoolVar(&injectSecurityContext, "webhook-inject-securitycontext", false, "Inject the securityContext of container[0] in the sidecar")
5456
flag.StringVar(&serviceName, "webhook-service-name", "katib-controller", "The service name which will be used in webhook")
5557
flag.BoolVar(&enableGRPCProbeInSuggestion, "enable-grpc-probe-in-suggestion", true, "enable grpc probe in suggestions")
58+
flag.Var(&trialResources, "trial-resources", "The list of resources that can be used as trial template, in the form: Kind.version.group (e.g. TFJob.v1.kubeflow.org)")
5659

5760
flag.Parse()
5861

@@ -61,6 +64,7 @@ func main() {
6164
viper.Set(consts.ConfigCertLocalFS, certLocalFS)
6265
viper.Set(consts.ConfigInjectSecurityContext, injectSecurityContext)
6366
viper.Set(consts.ConfigEnableGRPCProbeInSuggestion, enableGRPCProbeInSuggestion)
67+
viper.Set(consts.ConfigTrialResources, trialResources)
6468

6569
log.Info("Config:",
6670
consts.ConfigExperimentSuggestionName,
@@ -75,6 +79,8 @@ func main() {
7579
viper.GetBool(consts.ConfigInjectSecurityContext),
7680
consts.ConfigEnableGRPCProbeInSuggestion,
7781
viper.GetBool(consts.ConfigEnableGRPCProbeInSuggestion),
82+
"trial-resources",
83+
viper.Get(consts.ConfigTrialResources),
7884
)
7985

8086
// Get a config to talk to the apiserver

pkg/controller.v1beta1/consts/const.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@ const (
2222
// ConfigEnableGRPCProbeInSuggestion is the config name which indicates
2323
// if we should set GRPC probe in suggestion deployments.
2424
ConfigEnableGRPCProbeInSuggestion = "enable-grpc-probe-in-suggestion"
25+
// ConfigTrialResources is the config name which indicates
26+
// resources list which can be used as trial template
27+
ConfigTrialResources = "trial-resources"
2528

2629
// LabelExperimentName is the label of experiment name.
2730
LabelExperimentName = "experiment"

pkg/controller.v1beta1/trial/trial_controller.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,12 @@ import (
4242
"sigs.k8s.io/controller-runtime/pkg/source"
4343

4444
trialsv1beta1 "github.com/kubeflow/katib/pkg/apis/controller/trials/v1beta1"
45+
"github.com/kubeflow/katib/pkg/controller.v1beta1/consts"
4546
"github.com/kubeflow/katib/pkg/controller.v1beta1/trial/managerclient"
4647
trialutil "github.com/kubeflow/katib/pkg/controller.v1beta1/trial/util"
4748
"github.com/kubeflow/katib/pkg/controller.v1beta1/util"
4849
jobv1beta1 "github.com/kubeflow/katib/pkg/job/v1beta1"
50+
"github.com/spf13/viper"
4951
)
5052

5153
const (
@@ -124,6 +126,35 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
124126
log.Info("Job watch added successfully", "CRD Kind", gvk.Kind)
125127
}
126128
}
129+
130+
trialResources := viper.Get(consts.ConfigTrialResources)
131+
if trialResources != nil {
132+
// Cast interface to gvk slice object
133+
gvkList := trialResources.(trialutil.GvkListFlag)
134+
135+
// Watch for changes in custom resources
136+
for _, gvk := range gvkList {
137+
unstructuredJob := &unstructured.Unstructured{}
138+
unstructuredJob.SetGroupVersionKind(gvk)
139+
err = c.Watch(
140+
&source.Kind{Type: unstructuredJob},
141+
&handler.EnqueueRequestForOwner{
142+
IsController: true,
143+
OwnerType: &trialsv1beta1.Trial{},
144+
})
145+
if err != nil {
146+
if meta.IsNoMatchError(err) {
147+
log.Info("Job watch error. CRD might be missing. Please install CRD and restart katib-controller",
148+
"CRD Group", gvk.Group, "CRD Version", gvk.Version, "CRD Kind", gvk.Kind)
149+
continue
150+
}
151+
return err
152+
}
153+
log.Info("Job watch added successfully",
154+
"CRD Group", gvk.Group, "CRD Version", gvk.Version, "CRD Kind", gvk.Kind)
155+
}
156+
}
157+
127158
log.Info("Trial controller created")
128159
return nil
129160
}

pkg/controller.v1beta1/trial/trial_controller_test.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
tfv1 "github.com/kubeflow/tf-operator/pkg/apis/tensorflow/v1"
99
"github.com/onsi/gomega"
1010
"github.com/prometheus/client_golang/prometheus"
11+
"github.com/spf13/viper"
1112
"golang.org/x/net/context"
1213
corev1 "k8s.io/api/core/v1"
1314
v1 "k8s.io/api/core/v1"
@@ -22,6 +23,7 @@ import (
2223
commonv1beta1 "github.com/kubeflow/katib/pkg/apis/controller/common/v1beta1"
2324
trialsv1beta1 "github.com/kubeflow/katib/pkg/apis/controller/trials/v1beta1"
2425
api_pb "github.com/kubeflow/katib/pkg/apis/manager/v1beta1"
26+
"github.com/kubeflow/katib/pkg/controller.v1beta1/consts"
2527
trialutil "github.com/kubeflow/katib/pkg/controller.v1beta1/trial/util"
2628
util "github.com/kubeflow/katib/pkg/controller.v1beta1/util"
2729
managerclientmock "github.com/kubeflow/katib/pkg/mock/v1beta1/trial/managerclient"
@@ -50,6 +52,22 @@ func TestAdd(t *testing.T) {
5052
mgr, err := manager.New(cfg, manager.Options{})
5153
g.Expect(err).NotTo(gomega.HaveOccurred())
5254

55+
// Set fake trial resources
56+
trialResources := trialutil.GvkListFlag{
57+
{
58+
Group: "kubeflow.org",
59+
Version: "v1",
60+
Kind: "TFJob",
61+
},
62+
{
63+
Group: "kubeflow.org",
64+
Version: "v1",
65+
Kind: "MPIJob",
66+
},
67+
}
68+
69+
viper.Set(consts.ConfigTrialResources, trialResources)
70+
5371
// Test - Try to add Trial controller to the manager
5472
g.Expect(Add(mgr)).NotTo(gomega.HaveOccurred())
5573
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package util
2+
3+
import (
4+
"fmt"
5+
"strings"
6+
7+
"k8s.io/apimachinery/pkg/runtime/schema"
8+
)
9+
10+
// GvkListFlag is the custom flag to parse GroupVersionKind list for trial resources.
// Together with its String and Set methods it can be registered through flag.Var;
// every occurrence of the flag on the command line appends one entry to the list.
11+
type GvkListFlag []schema.GroupVersionKind
12+
13+
// Set is the method to convert gvk to string value
14+
func (flag *GvkListFlag) String() string {
15+
gvkStrings := []string{}
16+
for _, x := range []schema.GroupVersionKind(*flag) {
17+
gvkStrings = append(gvkStrings, x.String())
18+
}
19+
return strings.Join(gvkStrings, ",")
20+
}
21+
22+
// Set is the method to set gvk from string flag value
23+
func (flag *GvkListFlag) Set(value string) error {
24+
gvk, _ := schema.ParseKindArg(value)
25+
if gvk == nil {
26+
return fmt.Errorf("Invalid GroupVersionKind: %v", value)
27+
}
28+
*flag = append(*flag, *gvk)
29+
return nil
30+
}

pkg/controller.v1beta1/trial/util/prometheus_metrics.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
1414
limitations under the License.
1515
*/
1616

17-
package trial
17+
package util
1818

1919
import (
2020
"context"

0 commit comments

Comments
 (0)