Skip to content

Commit e8f7526

Browse files
authored
Merge pull request #400 from sxllwx/k8s_merge
Add: kubernetes registry and remote package unit test
2 parents 36fdb01 + e48b698 commit e8f7526

21 files changed

+2901
-34
lines changed

go.mod

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,10 @@ require (
5757
go.uber.org/zap v1.10.0
5858
google.golang.org/grpc v1.22.1
5959
gopkg.in/yaml.v2 v2.2.2
60+
k8s.io/api v0.0.0-20190325185214-7544f9db76f6
61+
k8s.io/apimachinery v0.0.0-20190223001710-c182ff3b9841
62+
k8s.io/client-go v8.0.0+incompatible
63+
k8s.io/kube-openapi v0.0.0-20191107075043-30be4d16710a // indirect
6064
)
6165

6266
go 1.13

go.sum

Lines changed: 39 additions & 9 deletions
Large diffs are not rendered by default.

registry/etcdv3/listener.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,9 @@ type dataListener struct {
3838
listener config_center.ConfigurationListener
3939
}
4040

41-
// NewRegistryDataListener ...
41+
// NewRegistryDataListener
4242
func NewRegistryDataListener(listener config_center.ConfigurationListener) *dataListener {
43-
return &dataListener{listener: listener, interestedURL: []*common.URL{}}
43+
return &dataListener{listener: listener}
4444
}
4545

4646
func (l *dataListener) AddInterestedURL(url *common.URL) {
@@ -49,7 +49,12 @@ func (l *dataListener) AddInterestedURL(url *common.URL) {
4949

5050
func (l *dataListener) DataChange(eventType remoting.Event) bool {
5151

52-
url := eventType.Path[strings.Index(eventType.Path, "/providers/")+len("/providers/"):]
52+
index := strings.Index(eventType.Path, "/providers/")
53+
if index == -1 {
54+
logger.Warnf("Listen with no url, event.path={%v}", eventType.Path)
55+
return false
56+
}
57+
url := eventType.Path[index+len("/providers/"):]
5358
serviceURL, err := common.NewURL(url)
5459
if err != nil {
5560
logger.Warnf("Listen NewURL(r{%s}) = error{%v}", eventType.Path, err)
@@ -68,7 +73,6 @@ func (l *dataListener) DataChange(eventType remoting.Event) bool {
6873
return true
6974
}
7075
}
71-
7276
return false
7377
}
7478

@@ -97,7 +101,7 @@ func (l *configurationListener) Next() (*registry.ServiceEvent, error) {
97101

98102
case e := <-l.events:
99103
logger.Infof("got etcd event %#v", e)
100-
if e.ConfigType == remoting.EventTypeDel {
104+
if e.ConfigType == remoting.EventTypeDel && l.registry.client.Valid() {
101105
select {
102106
case <-l.registry.Done():
103107
logger.Warnf("update @result{%s}. But its connection to registry is invalid", e.Value)

registry/etcdv3/listener_test.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,9 @@
1818
package etcdv3
1919

2020
import (
21+
"os"
2122
"testing"
2223
"time"
23-
24-
"github.com/apache/dubbo-go/config_center"
2524
)
2625

2726
import (
@@ -32,6 +31,7 @@ import (
3231

3332
import (
3433
"github.com/apache/dubbo-go/common"
34+
"github.com/apache/dubbo-go/config_center"
3535
"github.com/apache/dubbo-go/remoting"
3636
)
3737

@@ -40,13 +40,16 @@ type RegistryTestSuite struct {
4040
etcd *embed.Etcd
4141
}
4242

43+
const defaultEtcdV3WorkDir = "/tmp/default-dubbo-go-registry.etcd"
44+
4345
// start etcd server
4446
func (suite *RegistryTestSuite) SetupSuite() {
4547

4648
t := suite.T()
4749

4850
cfg := embed.NewConfig()
49-
cfg.Dir = "/tmp/default.etcd"
51+
// avoid conflict with default etcd work-dir
52+
cfg.Dir = defaultEtcdV3WorkDir
5053
e, err := embed.StartEtcd(cfg)
5154
if err != nil {
5255
t.Fatal(err)
@@ -66,6 +69,10 @@ func (suite *RegistryTestSuite) SetupSuite() {
6669
// stop etcd server
6770
func (suite *RegistryTestSuite) TearDownSuite() {
6871
suite.etcd.Close()
72+
// clean the etcd workdir
73+
if err := os.RemoveAll(defaultEtcdV3WorkDir); err != nil {
74+
suite.FailNow(err.Error())
75+
}
6976
}
7077

7178
func (suite *RegistryTestSuite) TestDataChange() {

registry/etcdv3/registry_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ func (suite *RegistryTestSuite) TestSubscribe() {
9898
assert.Regexp(t, ".*ServiceEvent{Action{add}.*", serviceEvent.String())
9999
}
100100

101-
func (suite *RegistryTestSuite) TestConsumerDestory() {
101+
func (suite *RegistryTestSuite) TestConsumerDestroy() {
102102

103103
t := suite.T()
104104
url, _ := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"}))
@@ -117,7 +117,7 @@ func (suite *RegistryTestSuite) TestConsumerDestory() {
117117

118118
}
119119

120-
func (suite *RegistryTestSuite) TestProviderDestory() {
120+
func (suite *RegistryTestSuite) TestProviderDestroy() {
121121

122122
t := suite.T()
123123
reg := initRegistry(t)

registry/kubernetes/listener.go

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package kubernetes
19+
20+
import (
21+
"strings"
22+
)
23+
24+
import (
25+
perrors "github.com/pkg/errors"
26+
)
27+
28+
import (
29+
"github.com/apache/dubbo-go/common"
30+
"github.com/apache/dubbo-go/common/logger"
31+
"github.com/apache/dubbo-go/config_center"
32+
"github.com/apache/dubbo-go/registry"
33+
"github.com/apache/dubbo-go/remoting"
34+
)
35+
36+
type dataListener struct {
37+
interestedURL []*common.URL
38+
listener config_center.ConfigurationListener
39+
}
40+
41+
// NewRegistryDataListener
42+
func NewRegistryDataListener(listener config_center.ConfigurationListener) *dataListener {
43+
return &dataListener{listener: listener}
44+
}
45+
46+
// AddInterestedURL
47+
func (l *dataListener) AddInterestedURL(url *common.URL) {
48+
l.interestedURL = append(l.interestedURL, url)
49+
}
50+
51+
// DataChange
52+
// notify listen, when interest event
53+
func (l *dataListener) DataChange(eventType remoting.Event) bool {
54+
55+
index := strings.Index(eventType.Path, "/providers/")
56+
if index == -1 {
57+
logger.Warnf("Listen with no url, event.path={%v}", eventType.Path)
58+
return false
59+
}
60+
url := eventType.Path[index+len("/providers/"):]
61+
serviceURL, err := common.NewURL(url)
62+
if err != nil {
63+
logger.Warnf("Listen NewURL(r{%s}) = error{%v}", eventType.Path, err)
64+
return false
65+
}
66+
67+
for _, v := range l.interestedURL {
68+
if serviceURL.URLEqual(*v) {
69+
l.listener.Process(
70+
&config_center.ConfigChangeEvent{
71+
Key: eventType.Path,
72+
Value: serviceURL,
73+
ConfigType: eventType.Action,
74+
},
75+
)
76+
return true
77+
}
78+
}
79+
return false
80+
}
81+
82+
type configurationListener struct {
83+
registry *kubernetesRegistry
84+
events chan *config_center.ConfigChangeEvent
85+
}
86+
87+
// NewConfigurationListener for listening the event of kubernetes.
88+
func NewConfigurationListener(reg *kubernetesRegistry) *configurationListener {
89+
// add a new waiter
90+
reg.WaitGroup().Add(1)
91+
return &configurationListener{registry: reg, events: make(chan *config_center.ConfigChangeEvent, 32)}
92+
}
93+
94+
func (l *configurationListener) Process(configType *config_center.ConfigChangeEvent) {
95+
l.events <- configType
96+
}
97+
98+
func (l *configurationListener) Next() (*registry.ServiceEvent, error) {
99+
for {
100+
select {
101+
case <-l.registry.Done():
102+
logger.Warnf("listener's kubernetes client connection is broken, so kubernetes event listener exits now.")
103+
return nil, perrors.New("listener stopped")
104+
105+
case e := <-l.events:
106+
logger.Infof("got kubernetes event %#v", e)
107+
if e.ConfigType == remoting.EventTypeDel && !l.registry.client.Valid() {
108+
select {
109+
case <-l.registry.Done():
110+
logger.Warnf("update @result{%s}. But its connection to registry is invalid", e.Value)
111+
default:
112+
}
113+
continue
114+
}
115+
return &registry.ServiceEvent{Action: e.ConfigType, Service: e.Value.(common.URL)}, nil
116+
}
117+
}
118+
}
119+
func (l *configurationListener) Close() {
120+
l.registry.WaitGroup().Done()
121+
}

0 commit comments

Comments
 (0)