Skip to content

Commit 6cac5ae

Browse files
committed
[navi] add discovery init mesh config v4
1 parent a0f884f commit 6cac5ae

File tree

32 files changed

+1524
-770
lines changed

32 files changed

+1524
-770
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ require (
5656
helm.sh/helm/v3 v3.18.4
5757
istio.io/api v1.26.3
5858
istio.io/client-go v1.26.0-alpha.0.0.20250801111706-8b4229d430b7
59-
istio.io/istio v0.0.0-20250803052519-90ec2958505f
59+
istio.io/istio v0.0.0-20250804181122-c5fdc78b0741
6060
k8s.io/api v0.33.2
6161
k8s.io/apiextensions-apiserver v0.33.2
6262
k8s.io/apimachinery v0.33.2

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -906,8 +906,8 @@ istio.io/api v1.26.3 h1:/TiA7bJi24yBQSgpLy5vHhFkobf4DWS1L+CuUxNk4os=
906906
istio.io/api v1.26.3/go.mod h1:DTVGH6CLXj5W8FF9JUD3Tis78iRgT1WeuAnxfTz21Wg=
907907
istio.io/client-go v1.26.0-alpha.0.0.20250801111706-8b4229d430b7 h1:UfQlmnTALAQJkGHzaQVBLqeWYLOFU2vhRQhEFBAXjdg=
908908
istio.io/client-go v1.26.0-alpha.0.0.20250801111706-8b4229d430b7/go.mod h1:M7elDrU4/g2Rgr/K/u5E4VQP6l4lSMwBdNj5NeT21mg=
909-
istio.io/istio v0.0.0-20250803052519-90ec2958505f h1:Yx4yf5/ikWf/X/tt6rBKZ6NcWpEROYW5JyDAr1MuAQQ=
910-
istio.io/istio v0.0.0-20250803052519-90ec2958505f/go.mod h1:TY8IgDEUQKj+8eRtT8zxs8Mfsqxv1d/ecErZhfzF73U=
909+
istio.io/istio v0.0.0-20250804181122-c5fdc78b0741 h1:dxfcWWEWYto5gpKo8rVyLh42iJHGaqRfmv4n91IxK6U=
910+
istio.io/istio v0.0.0-20250804181122-c5fdc78b0741/go.mod h1:TY8IgDEUQKj+8eRtT8zxs8Mfsqxv1d/ecErZhfzF73U=
911911
k8s.io/api v0.33.2 h1:YgwIS5jKfA+BZg//OQhkJNIfie/kmRsO0BmNaVSimvY=
912912
k8s.io/api v0.33.2/go.mod h1:fhrbphQJSM2cXzCWgqU29xLDuks4mu7ti9vveEnpSXs=
913913
k8s.io/apiextensions-apiserver v0.33.2 h1:6gnkIbngnaUflR3XwE1mCefN3YS8yTD631JXQhsU6M8=

navigator/cmd/navi-discovery/app/cmd.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,8 @@ func addFlags(c *cobra.Command) {
113113
"Directory to watch for updates to config yaml files. If specified, the files will be used as the source of config, rather than a CRD client.")
114114
c.PersistentFlags().StringVar(&serverArgs.RegistryOptions.KubeOptions.DomainSuffix, "domain", constants.DefaultClusterLocalDomain,
115115
"DNS domain suffix")
116+
c.PersistentFlags().StringVarP(&serverArgs.Namespace, "namespace", "n", bootstrap.PodNamespace,
117+
"Select a namespace where the controller resides. If not set, uses ${POD_NAMESPACE} environment variable")
116118
c.PersistentFlags().StringVar((*string)(&serverArgs.RegistryOptions.KubeOptions.ClusterID), "clusterID", features.ClusterName,
117119
"The ID of the cluster that this Dubbod instance resides")
118120
c.PersistentFlags().StringToStringVar(&serverArgs.RegistryOptions.KubeOptions.ClusterAliases, "clusterAliases", map[string]string{},

navigator/pkg/bootstrap/mesh.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ func (s *Server) initMeshConfiguration(args *NaviArgs, fileWatcher filewatcher.F
3939
}
4040

4141
func (s *Server) getMeshConfiguration(args *NaviArgs, fileWatcher filewatcher.FileWatcher) krt.Singleton[meshwatcher.MeshConfigResource] {
42-
opts := krt.NewOptionsBuilder(s.internalStop, "")
42+
opts := krt.NewOptionsBuilder(s.internalStop, "", args.KrtDebugger)
4343
sources := s.getConfigurationSources(args, fileWatcher, args.MeshConfigFile, kubemesh.MeshConfigKey)
4444
if len(sources) == 0 {
4545
fmt.Printf("\nUsing default mesh - missing file %s and no k8s client\n", args.MeshConfigFile)
@@ -48,7 +48,7 @@ func (s *Server) getMeshConfiguration(args *NaviArgs, fileWatcher filewatcher.Fi
4848
}
4949

5050
func (s *Server) getConfigurationSources(args *NaviArgs, fileWatcher filewatcher.FileWatcher, file string, cmKey string) []meshwatcher.MeshConfigSource {
51-
opts := krt.NewOptionsBuilder(s.internalStop, "")
51+
opts := krt.NewOptionsBuilder(s.internalStop, "", args.KrtDebugger)
5252
var userMeshConfig *meshwatcher.MeshConfigSource
5353
if features.SharedMeshConfig != "" && s.kubeClient != nil {
5454
userMeshConfig = ptr.Of(kubemesh.NewConfigMapSource(s.kubeClient, args.Namespace, features.SharedMeshConfig, cmKey, opts))

navigator/pkg/bootstrap/options.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"github.com/apache/dubbo-kubernetes/pkg/ctrlz"
2424
"github.com/apache/dubbo-kubernetes/pkg/env"
2525
"github.com/apache/dubbo-kubernetes/pkg/keepalive"
26+
"github.com/apache/dubbo-kubernetes/pkg/kube/krt"
2627
)
2728

2829
var (
@@ -46,6 +47,7 @@ type NaviArgs struct {
4647
Namespace string
4748
CtrlZOptions *ctrlz.Options
4849
KeepaliveOptions *keepalive.Options
50+
KrtDebugger *krt.DebugHandler `json:"-"`
4951
}
5052

5153
type DiscoveryServerOptions struct {

navigator/pkg/bootstrap/server.go

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,11 @@ import (
2828
"github.com/apache/dubbo-kubernetes/pkg/filewatcher"
2929
"github.com/apache/dubbo-kubernetes/pkg/h2c"
3030
dubbokeepalive "github.com/apache/dubbo-kubernetes/pkg/keepalive"
31+
kubelib "github.com/apache/dubbo-kubernetes/pkg/kube"
3132
"github.com/apache/dubbo-kubernetes/pkg/mesh"
3233
"github.com/apache/dubbo-kubernetes/pkg/network"
3334
"golang.org/x/net/http2"
3435
"google.golang.org/grpc"
35-
cluster2 "istio.io/istio/pkg/cluster"
36-
kubelib "istio.io/istio/pkg/kube"
3736
"k8s.io/client-go/rest"
3837
"net"
3938
"net/http"
@@ -93,10 +92,13 @@ func NewServer(args *NaviArgs, initFuncs ...func(*Server)) (*Server, error) {
9392
}
9493

9594
func (s *Server) Start(stop <-chan struct{}) error {
95+
fmt.Printf("\nStarting Dubbod Server with primary cluster %s\n", s.clusterID)
9696
if err := s.server.Start(stop); err != nil {
9797
return err
9898
}
99-
// TODO waitForCacheSync
99+
if !s.waitForCacheSync(stop) {
100+
return fmt.Errorf("failed to sync cache")
101+
}
100102
// TODO XDSserver CacheSynced
101103

102104
if s.secureGrpcAddress != "" {
@@ -143,6 +145,7 @@ func (s *Server) Start(stop <-chan struct{}) error {
143145

144146
func (s *Server) initKubeClient(args *NaviArgs) error {
145147
if s.kubeClient != nil {
148+
// Already initialized by startup arguments
146149
return nil
147150
}
148151
hasK8SConfigStore := false
@@ -168,16 +171,16 @@ func (s *Server) initKubeClient(args *NaviArgs) error {
168171
}
169172

170173
if hasK8SConfigStore || hasKubeRegistry(args.RegistryOptions.Registries) {
174+
// Used by validation
171175
kubeRestConfig, err := kubelib.DefaultRestConfig(args.RegistryOptions.KubeConfig, "", func(config *rest.Config) {
172176
config.QPS = args.RegistryOptions.KubeOptions.KubernetesAPIQPS
173177
config.Burst = args.RegistryOptions.KubeOptions.KubernetesAPIBurst
174178
})
175-
176179
if err != nil {
177180
return fmt.Errorf("failed creating kube config: %v", err)
178181
}
179182

180-
s.kubeClient, err = kubelib.NewClient(kubelib.NewClientConfigForRestConfig(kubeRestConfig), cluster2.ID(s.clusterID))
183+
s.kubeClient, err = kubelib.NewClient(kubelib.NewClientConfigForRestConfig(kubeRestConfig), s.clusterID)
181184
if err != nil {
182185
return fmt.Errorf("failed creating kube client: %v", err)
183186
}
@@ -248,3 +251,23 @@ func getClusterID(args *NaviArgs) cluster.ID {
248251
}
249252
return clusterID
250253
}
254+
255+
func (s *Server) cachesSynced() bool {
256+
// TODO multiclusterController HasSynced
257+
// TODO ServiceController().HasSynced
258+
// TODO configController.HasSynced
259+
return true
260+
}
261+
262+
func (s *Server) waitForCacheSync(stop <-chan struct{}) bool {
263+
start := time.Now()
264+
fmt.Println("\nWaiting for caches to be synced")
265+
if !kubelib.WaitForCacheSync("server", stop, s.cachesSynced) {
266+
fmt.Println("\nFailed waiting for cache sync")
267+
return false
268+
}
269+
fmt.Printf("\nAll controller caches have been synced up in %v\n", time.Since(start))
270+
// TODO XDSServer.InboundUpdates.Load
271+
// TODO return kubelib.WaitForCacheSync("push context", stop, func() bool { return s.pushContextReady(expected) })
272+
return false
273+
}

pkg/config/constants/constants.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,4 +22,6 @@ const (
2222
DefaultClusterLocalDomain = "cluster.local"
2323
DefaultClusterName = "Kubernetes"
2424
ServiceClusterName = "dubbo-proxy"
25+
ConfigPathDir = "./etc/dubbo/proxy"
26+
BinaryPathFilename = "/usr/local/bin/envoy"
2527
)

pkg/config/labels/instance.go

Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
// Copyright Istio Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package labels
16+
17+
import (
18+
"fmt"
19+
"regexp"
20+
"strings"
21+
22+
"github.com/hashicorp/go-multierror"
23+
24+
"istio.io/istio/pkg/maps"
25+
"istio.io/istio/pkg/slices"
26+
)
27+
28+
const (
29+
DNS1123LabelMaxLength = 63 // Public for testing only.
30+
dns1123LabelFmt = "[a-zA-Z0-9](?:[-a-zA-Z0-9]*[a-zA-Z0-9])?"
31+
// a wild-card prefix is an '*', a normal DNS1123 label with a leading '*' or '*-', or a normal DNS1123 label
32+
wildcardPrefix = `(\*|(\*|\*-)?` + dns1123LabelFmt + `)`
33+
34+
// Using kubernetes requirement, a valid key must be a non-empty string consist
35+
// of alphanumeric characters, '-', '_' or '.', and must start and end with an
36+
// alphanumeric character (e.g. 'MyValue', or 'my_value', or '12345'
37+
qualifiedNameFmt = "(?:[A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9]"
38+
39+
// In Kubernetes, label names can start with a DNS name followed by a '/':
40+
dnsNamePrefixFmt = dns1123LabelFmt + `(?:\.` + dns1123LabelFmt + `)*/`
41+
dnsNamePrefixMaxLength = 253
42+
)
43+
44+
var (
45+
tagRegexp = regexp.MustCompile("^(" + dnsNamePrefixFmt + ")?(" + qualifiedNameFmt + ")$") // label value can be an empty string
46+
labelValueRegexp = regexp.MustCompile("^" + "(" + qualifiedNameFmt + ")?" + "$")
47+
dns1123LabelRegexp = regexp.MustCompile("^" + dns1123LabelFmt + "$")
48+
wildcardPrefixRegexp = regexp.MustCompile("^" + wildcardPrefix + "$")
49+
)
50+
51+
// Instance is a non empty map of arbitrary strings. Each version of a service can
52+
// be differentiated by a unique set of labels associated with the version. These
53+
// labels are assigned to all instances of a particular service version. For
54+
// example, lets say catalog.mystore.com has 2 versions v1 and v2. v1 instances
55+
// could have labels gitCommit=aeiou234, region=us-east, while v2 instances could
56+
// have labels name=kittyCat,region=us-east.
57+
type Instance map[string]string
58+
59+
// SubsetOf is true if the label has same values for the keys
60+
func (i Instance) SubsetOf(that Instance) bool {
61+
if len(i) == 0 {
62+
return true
63+
}
64+
65+
if len(that) == 0 || len(that) < len(i) {
66+
return false
67+
}
68+
69+
for k, v1 := range i {
70+
if v2, ok := that[k]; !ok || v1 != v2 {
71+
return false
72+
}
73+
}
74+
return true
75+
}
76+
77+
// Match is true if the label has same values for the keys.
78+
// if len(i) == 0, will return false. It is mainly used for service -> workload
79+
func (i Instance) Match(that Instance) bool {
80+
if len(i) == 0 {
81+
return false
82+
}
83+
84+
return i.SubsetOf(that)
85+
}
86+
87+
// Equals returns true if the labels are equal.
88+
func (i Instance) Equals(that Instance) bool {
89+
return maps.Equal(i, that)
90+
}
91+
92+
// Validate ensures tag is well-formed
93+
func (i Instance) Validate() error {
94+
if i == nil {
95+
return nil
96+
}
97+
var errs error
98+
for k, v := range i {
99+
if err := validateTagKey(k); err != nil {
100+
errs = multierror.Append(errs, err)
101+
}
102+
if !labelValueRegexp.MatchString(v) {
103+
errs = multierror.Append(errs, fmt.Errorf("invalid tag value: %q", v))
104+
}
105+
}
106+
return errs
107+
}
108+
109+
// IsDNS1123Label tests for a string that conforms to the definition of a label in
110+
// DNS (RFC 1123).
111+
func IsDNS1123Label(value string) bool {
112+
return len(value) <= DNS1123LabelMaxLength && dns1123LabelRegexp.MatchString(value)
113+
}
114+
115+
// IsWildcardDNS1123Label tests for a string that conforms to the definition of a label in DNS (RFC 1123), but allows
116+
// the wildcard label (`*`), and typical labels with a leading astrisk instead of alphabetic character (e.g. "*-foo")
117+
func IsWildcardDNS1123Label(value string) bool {
118+
return len(value) <= DNS1123LabelMaxLength && wildcardPrefixRegexp.MatchString(value)
119+
}
120+
121+
// validateTagKey checks that a string is valid as a Kubernetes label name.
122+
func validateTagKey(k string) error {
123+
match := tagRegexp.FindStringSubmatch(k)
124+
if match == nil {
125+
return fmt.Errorf("invalid tag key: %q", k)
126+
}
127+
128+
if len(match[1]) > 0 {
129+
dnsPrefixLength := len(match[1]) - 1 // exclude the trailing / from the length
130+
if dnsPrefixLength > dnsNamePrefixMaxLength {
131+
return fmt.Errorf("invalid tag key: %q (DNS prefix is too long)", k)
132+
}
133+
}
134+
135+
if len(match[2]) > DNS1123LabelMaxLength {
136+
return fmt.Errorf("invalid tag key: %q (name is too long)", k)
137+
}
138+
139+
return nil
140+
}
141+
142+
func (i Instance) String() string {
143+
// Ensure stable ordering
144+
keys := slices.Sort(maps.Keys(i))
145+
146+
var buffer strings.Builder
147+
// Assume each kv pair is roughly 25 characters. We could be under or over, this is just a guess to optimize
148+
buffer.Grow(len(keys) * 25)
149+
first := true
150+
for _, k := range keys {
151+
v := i[k]
152+
if !first {
153+
buffer.WriteString(",")
154+
} else {
155+
first = false
156+
}
157+
if len(v) > 0 {
158+
buffer.WriteString(k + "=" + v)
159+
} else {
160+
buffer.WriteString(k)
161+
}
162+
}
163+
return buffer.String()
164+
}

pkg/config/mesh/kubemesh/watcher.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,11 @@ package kubemesh
1919

2020
import (
2121
"fmt"
22+
"github.com/apache/dubbo-kubernetes/pkg/kube"
2223
"github.com/apache/dubbo-kubernetes/pkg/kube/kclient"
2324
"github.com/apache/dubbo-kubernetes/pkg/kube/krt"
2425
"github.com/apache/dubbo-kubernetes/pkg/mesh/meshwatcher"
2526
"github.com/apache/dubbo-kubernetes/pkg/ptr"
26-
"istio.io/istio/pkg/kube"
2727
v1 "k8s.io/api/core/v1"
2828
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2929
"k8s.io/apimachinery/pkg/fields"

pkg/config/mesh/meshwatcher/mesh.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package meshwatcher
2+
3+
import (
4+
"google.golang.org/protobuf/proto"
5+
6+
meshconfig "istio.io/api/mesh/v1alpha1"
7+
)
8+
9+
// MeshConfigResource holds the current MeshConfig state
10+
type MeshConfigResource struct {
11+
*meshconfig.MeshConfig
12+
}
13+
14+
func (m MeshConfigResource) ResourceName() string { return "MeshConfigResource" }
15+
16+
func (m MeshConfigResource) Equals(other MeshConfigResource) bool {
17+
return proto.Equal(m.MeshConfig, other.MeshConfig)
18+
}
19+
20+
// MeshNetworksResource holds the current MeshNetworks state
21+
type MeshNetworksResource struct {
22+
*meshconfig.MeshNetworks
23+
}
24+
25+
func (m MeshNetworksResource) ResourceName() string { return "MeshNetworksResource" }
26+
27+
func (m MeshNetworksResource) Equals(other MeshNetworksResource) bool {
28+
return proto.Equal(m.MeshNetworks, other.MeshNetworks)
29+
}

0 commit comments

Comments
 (0)