Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ require (
helm.sh/helm/v3 v3.18.4
istio.io/api v1.26.3
istio.io/client-go v1.26.0-alpha.0.0.20250801111706-8b4229d430b7
istio.io/istio v0.0.0-20250803052519-90ec2958505f
istio.io/istio v0.0.0-20250804181122-c5fdc78b0741
k8s.io/api v0.33.2
k8s.io/apiextensions-apiserver v0.33.2
k8s.io/apimachinery v0.33.2
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -906,8 +906,8 @@ istio.io/api v1.26.3 h1:/TiA7bJi24yBQSgpLy5vHhFkobf4DWS1L+CuUxNk4os=
istio.io/api v1.26.3/go.mod h1:DTVGH6CLXj5W8FF9JUD3Tis78iRgT1WeuAnxfTz21Wg=
istio.io/client-go v1.26.0-alpha.0.0.20250801111706-8b4229d430b7 h1:UfQlmnTALAQJkGHzaQVBLqeWYLOFU2vhRQhEFBAXjdg=
istio.io/client-go v1.26.0-alpha.0.0.20250801111706-8b4229d430b7/go.mod h1:M7elDrU4/g2Rgr/K/u5E4VQP6l4lSMwBdNj5NeT21mg=
istio.io/istio v0.0.0-20250803052519-90ec2958505f h1:Yx4yf5/ikWf/X/tt6rBKZ6NcWpEROYW5JyDAr1MuAQQ=
istio.io/istio v0.0.0-20250803052519-90ec2958505f/go.mod h1:TY8IgDEUQKj+8eRtT8zxs8Mfsqxv1d/ecErZhfzF73U=
istio.io/istio v0.0.0-20250804181122-c5fdc78b0741 h1:dxfcWWEWYto5gpKo8rVyLh42iJHGaqRfmv4n91IxK6U=
istio.io/istio v0.0.0-20250804181122-c5fdc78b0741/go.mod h1:TY8IgDEUQKj+8eRtT8zxs8Mfsqxv1d/ecErZhfzF73U=
k8s.io/api v0.33.2 h1:YgwIS5jKfA+BZg//OQhkJNIfie/kmRsO0BmNaVSimvY=
k8s.io/api v0.33.2/go.mod h1:fhrbphQJSM2cXzCWgqU29xLDuks4mu7ti9vveEnpSXs=
k8s.io/apiextensions-apiserver v0.33.2 h1:6gnkIbngnaUflR3XwE1mCefN3YS8yTD631JXQhsU6M8=
Expand Down
2 changes: 2 additions & 0 deletions navigator/cmd/navi-discovery/app/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ func addFlags(c *cobra.Command) {
"Directory to watch for updates to config yaml files. If specified, the files will be used as the source of config, rather than a CRD client.")
c.PersistentFlags().StringVar(&serverArgs.RegistryOptions.KubeOptions.DomainSuffix, "domain", constants.DefaultClusterLocalDomain,
"DNS domain suffix")
c.PersistentFlags().StringVarP(&serverArgs.Namespace, "namespace", "n", bootstrap.PodNamespace,
"Select a namespace where the controller resides. If not set, uses ${POD_NAMESPACE} environment variable")
c.PersistentFlags().StringVar((*string)(&serverArgs.RegistryOptions.KubeOptions.ClusterID), "clusterID", features.ClusterName,
"The ID of the cluster that this Dubbod instance resides")
c.PersistentFlags().StringToStringVar(&serverArgs.RegistryOptions.KubeOptions.ClusterAliases, "clusterAliases", map[string]string{},
Expand Down
4 changes: 2 additions & 2 deletions navigator/pkg/bootstrap/mesh.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (s *Server) initMeshConfiguration(args *NaviArgs, fileWatcher filewatcher.F
}

func (s *Server) getMeshConfiguration(args *NaviArgs, fileWatcher filewatcher.FileWatcher) krt.Singleton[meshwatcher.MeshConfigResource] {
opts := krt.NewOptionsBuilder(s.internalStop, "")
opts := krt.NewOptionsBuilder(s.internalStop, "", args.KrtDebugger)
sources := s.getConfigurationSources(args, fileWatcher, args.MeshConfigFile, kubemesh.MeshConfigKey)
if len(sources) == 0 {
fmt.Printf("\nUsing default mesh - missing file %s and no k8s client\n", args.MeshConfigFile)
Expand All @@ -48,7 +48,7 @@ func (s *Server) getMeshConfiguration(args *NaviArgs, fileWatcher filewatcher.Fi
}

func (s *Server) getConfigurationSources(args *NaviArgs, fileWatcher filewatcher.FileWatcher, file string, cmKey string) []meshwatcher.MeshConfigSource {
opts := krt.NewOptionsBuilder(s.internalStop, "")
opts := krt.NewOptionsBuilder(s.internalStop, "", args.KrtDebugger)
var userMeshConfig *meshwatcher.MeshConfigSource
if features.SharedMeshConfig != "" && s.kubeClient != nil {
userMeshConfig = ptr.Of(kubemesh.NewConfigMapSource(s.kubeClient, args.Namespace, features.SharedMeshConfig, cmKey, opts))
Expand Down
2 changes: 2 additions & 0 deletions navigator/pkg/bootstrap/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/apache/dubbo-kubernetes/pkg/ctrlz"
"github.com/apache/dubbo-kubernetes/pkg/env"
"github.com/apache/dubbo-kubernetes/pkg/keepalive"
"github.com/apache/dubbo-kubernetes/pkg/kube/krt"
)

var (
Expand All @@ -46,6 +47,7 @@ type NaviArgs struct {
Namespace string
CtrlZOptions *ctrlz.Options
KeepaliveOptions *keepalive.Options
KrtDebugger *krt.DebugHandler `json:"-"`
}

type DiscoveryServerOptions struct {
Expand Down
33 changes: 28 additions & 5 deletions navigator/pkg/bootstrap/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,11 @@ import (
"github.com/apache/dubbo-kubernetes/pkg/filewatcher"
"github.com/apache/dubbo-kubernetes/pkg/h2c"
dubbokeepalive "github.com/apache/dubbo-kubernetes/pkg/keepalive"
kubelib "github.com/apache/dubbo-kubernetes/pkg/kube"
"github.com/apache/dubbo-kubernetes/pkg/mesh"
"github.com/apache/dubbo-kubernetes/pkg/network"
"golang.org/x/net/http2"
"google.golang.org/grpc"
cluster2 "istio.io/istio/pkg/cluster"
kubelib "istio.io/istio/pkg/kube"
"k8s.io/client-go/rest"
"net"
"net/http"
Expand Down Expand Up @@ -93,10 +92,13 @@ func NewServer(args *NaviArgs, initFuncs ...func(*Server)) (*Server, error) {
}

func (s *Server) Start(stop <-chan struct{}) error {
fmt.Printf("\nStarting Dubbod Server with primary cluster %s\n", s.clusterID)
if err := s.server.Start(stop); err != nil {
return err
}
// TODO waitForCacheSync
if !s.waitForCacheSync(stop) {
return fmt.Errorf("failed to sync cache")
}
// TODO XDSserver CacheSynced

if s.secureGrpcAddress != "" {
Expand Down Expand Up @@ -143,6 +145,7 @@ func (s *Server) Start(stop <-chan struct{}) error {

func (s *Server) initKubeClient(args *NaviArgs) error {
if s.kubeClient != nil {
// Already initialized by startup arguments
return nil
}
hasK8SConfigStore := false
Expand All @@ -168,16 +171,16 @@ func (s *Server) initKubeClient(args *NaviArgs) error {
}

if hasK8SConfigStore || hasKubeRegistry(args.RegistryOptions.Registries) {
// Used by validation
kubeRestConfig, err := kubelib.DefaultRestConfig(args.RegistryOptions.KubeConfig, "", func(config *rest.Config) {
config.QPS = args.RegistryOptions.KubeOptions.KubernetesAPIQPS
config.Burst = args.RegistryOptions.KubeOptions.KubernetesAPIBurst
})

if err != nil {
return fmt.Errorf("failed creating kube config: %v", err)
}

s.kubeClient, err = kubelib.NewClient(kubelib.NewClientConfigForRestConfig(kubeRestConfig), cluster2.ID(s.clusterID))
s.kubeClient, err = kubelib.NewClient(kubelib.NewClientConfigForRestConfig(kubeRestConfig), s.clusterID)
if err != nil {
return fmt.Errorf("failed creating kube client: %v", err)
}
Expand Down Expand Up @@ -248,3 +251,23 @@ func getClusterID(args *NaviArgs) cluster.ID {
}
return clusterID
}

func (s *Server) cachesSynced() bool {
// TODO multiclusterController HasSynced
// TODO ServiceController().HasSynced
// TODO configController.HasSynced
return true
}

func (s *Server) waitForCacheSync(stop <-chan struct{}) bool {
start := time.Now()
fmt.Println("\nWaiting for caches to be synced")
if !kubelib.WaitForCacheSync("server", stop, s.cachesSynced) {
fmt.Println("\nFailed waiting for cache sync")
return false
}
fmt.Printf("\nAll controller caches have been synced up in %v\n", time.Since(start))
// TODO XDSServer.InboundUpdates.Load
// TODO return kubelib.WaitForCacheSync("push context", stop, func() bool { return s.pushContextReady(expected) })
return false
}
2 changes: 2 additions & 0 deletions pkg/config/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,6 @@ const (
DefaultClusterLocalDomain = "cluster.local"
DefaultClusterName = "Kubernetes"
ServiceClusterName = "dubbo-proxy"
ConfigPathDir = "./etc/dubbo/proxy"
BinaryPathFilename = "/usr/local/bin/envoy"
)
164 changes: 164 additions & 0 deletions pkg/config/labels/instance.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package labels

import (
"fmt"
"regexp"
"strings"

"github.com/hashicorp/go-multierror"

"istio.io/istio/pkg/maps"
"istio.io/istio/pkg/slices"
)

const (
DNS1123LabelMaxLength = 63 // Public for testing only.
dns1123LabelFmt = "[a-zA-Z0-9](?:[-a-zA-Z0-9]*[a-zA-Z0-9])?"
// a wild-card prefix is an '*', a normal DNS1123 label with a leading '*' or '*-', or a normal DNS1123 label
wildcardPrefix = `(\*|(\*|\*-)?` + dns1123LabelFmt + `)`

// Using kubernetes requirement, a valid key must be a non-empty string consist
// of alphanumeric characters, '-', '_' or '.', and must start and end with an
// alphanumeric character (e.g. 'MyValue', or 'my_value', or '12345'
qualifiedNameFmt = "(?:[A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9]"

// In Kubernetes, label names can start with a DNS name followed by a '/':
dnsNamePrefixFmt = dns1123LabelFmt + `(?:\.` + dns1123LabelFmt + `)*/`
dnsNamePrefixMaxLength = 253
)

var (
tagRegexp = regexp.MustCompile("^(" + dnsNamePrefixFmt + ")?(" + qualifiedNameFmt + ")$") // label value can be an empty string
labelValueRegexp = regexp.MustCompile("^" + "(" + qualifiedNameFmt + ")?" + "$")
dns1123LabelRegexp = regexp.MustCompile("^" + dns1123LabelFmt + "$")
wildcardPrefixRegexp = regexp.MustCompile("^" + wildcardPrefix + "$")
)

// Instance is a non empty map of arbitrary strings. Each version of a service can
// be differentiated by a unique set of labels associated with the version. These
// labels are assigned to all instances of a particular service version. For
// example, lets say catalog.mystore.com has 2 versions v1 and v2. v1 instances
// could have labels gitCommit=aeiou234, region=us-east, while v2 instances could
// have labels name=kittyCat,region=us-east.
type Instance map[string]string

// SubsetOf is true if the label has same values for the keys
func (i Instance) SubsetOf(that Instance) bool {
if len(i) == 0 {
return true
}

if len(that) == 0 || len(that) < len(i) {
return false
}

for k, v1 := range i {
if v2, ok := that[k]; !ok || v1 != v2 {
return false
}
}
return true
}

// Match is true if the label has same values for the keys.
// if len(i) == 0, will return false. It is mainly used for service -> workload
func (i Instance) Match(that Instance) bool {
if len(i) == 0 {
return false
}

return i.SubsetOf(that)
}

// Equals returns true if the labels are equal.
func (i Instance) Equals(that Instance) bool {
return maps.Equal(i, that)
}

// Validate ensures tag is well-formed
func (i Instance) Validate() error {
if i == nil {
return nil
}
var errs error
for k, v := range i {
if err := validateTagKey(k); err != nil {
errs = multierror.Append(errs, err)
}
if !labelValueRegexp.MatchString(v) {
errs = multierror.Append(errs, fmt.Errorf("invalid tag value: %q", v))
}
}
return errs
}

// IsDNS1123Label tests for a string that conforms to the definition of a label in
// DNS (RFC 1123).
func IsDNS1123Label(value string) bool {
return len(value) <= DNS1123LabelMaxLength && dns1123LabelRegexp.MatchString(value)
}

// IsWildcardDNS1123Label tests for a string that conforms to the definition of a label in DNS (RFC 1123), but allows
// the wildcard label (`*`), and typical labels with a leading astrisk instead of alphabetic character (e.g. "*-foo")
func IsWildcardDNS1123Label(value string) bool {
return len(value) <= DNS1123LabelMaxLength && wildcardPrefixRegexp.MatchString(value)
}

// validateTagKey checks that a string is valid as a Kubernetes label name.
func validateTagKey(k string) error {
match := tagRegexp.FindStringSubmatch(k)
if match == nil {
return fmt.Errorf("invalid tag key: %q", k)
}

if len(match[1]) > 0 {
dnsPrefixLength := len(match[1]) - 1 // exclude the trailing / from the length
if dnsPrefixLength > dnsNamePrefixMaxLength {
return fmt.Errorf("invalid tag key: %q (DNS prefix is too long)", k)
}
}

if len(match[2]) > DNS1123LabelMaxLength {
return fmt.Errorf("invalid tag key: %q (name is too long)", k)
}

return nil
}

func (i Instance) String() string {
// Ensure stable ordering
keys := slices.Sort(maps.Keys(i))

var buffer strings.Builder
// Assume each kv pair is roughly 25 characters. We could be under or over, this is just a guess to optimize
buffer.Grow(len(keys) * 25)
first := true
for _, k := range keys {
v := i[k]
if !first {
buffer.WriteString(",")
} else {
first = false
}
if len(v) > 0 {
buffer.WriteString(k + "=" + v)
} else {
buffer.WriteString(k)
}
}
return buffer.String()
}
2 changes: 1 addition & 1 deletion pkg/config/mesh/kubemesh/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ package kubemesh

import (
"fmt"
"github.com/apache/dubbo-kubernetes/pkg/kube"
"github.com/apache/dubbo-kubernetes/pkg/kube/kclient"
"github.com/apache/dubbo-kubernetes/pkg/kube/krt"
"github.com/apache/dubbo-kubernetes/pkg/mesh/meshwatcher"
"github.com/apache/dubbo-kubernetes/pkg/ptr"
"istio.io/istio/pkg/kube"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
Expand Down
29 changes: 29 additions & 0 deletions pkg/config/mesh/meshwatcher/mesh.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package meshwatcher

import (
"google.golang.org/protobuf/proto"

meshconfig "istio.io/api/mesh/v1alpha1"
)

// MeshConfigResource holds the current MeshConfig state
type MeshConfigResource struct {
*meshconfig.MeshConfig
}

func (m MeshConfigResource) ResourceName() string { return "MeshConfigResource" }

func (m MeshConfigResource) Equals(other MeshConfigResource) bool {
return proto.Equal(m.MeshConfig, other.MeshConfig)
}

// MeshNetworksResource holds the current MeshNetworks state
type MeshNetworksResource struct {
*meshconfig.MeshNetworks
}

func (m MeshNetworksResource) ResourceName() string { return "MeshNetworksResource" }

func (m MeshNetworksResource) Equals(other MeshNetworksResource) bool {
return proto.Equal(m.MeshNetworks, other.MeshNetworks)
}
Loading
Loading