Skip to content

Commit 59bc5b5

Browse files
authored
Merge pull request #294 from zouyx/feature/addRouter
Ftr: condition router
2 parents 4e67732 + 0c7af31 commit 59bc5b5

36 files changed

+1877
-242
lines changed

.travis.yml

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,7 @@ install: true
1010

1111
script:
1212
- go fmt ./... && [[ -z `git status -s` ]]
13-
- mkdir -p remoting/zookeeper/zookeeper-4unittest/contrib/fatjar config_center/zookeeper/zookeeper-4unittest/contrib/fatjar registry/zookeeper/zookeeper-4unittest/contrib/fatjar
14-
- wget -P "remoting/zookeeper/zookeeper-4unittest/contrib/fatjar" https://github.com/dubbogo/resources/raw/master/zookeeper-4unitest/contrib/fatjar/zookeeper-3.4.9-fatjar.jar
15-
- cp remoting/zookeeper/zookeeper-4unittest/contrib/fatjar/zookeeper-3.4.9-fatjar.jar config_center/zookeeper/zookeeper-4unittest/contrib/fatjar/
16-
- cp remoting/zookeeper/zookeeper-4unittest/contrib/fatjar/zookeeper-3.4.9-fatjar.jar registry/zookeeper/zookeeper-4unittest/contrib/fatjar/
13+
- chmod u+x before_ut.sh && ./before_ut.sh
1714
- go mod vendor && go test ./... -coverprofile=coverage.txt -covermode=atomic
1815

1916
after_success:

before_ut.bat

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,17 @@
1515
:: limitations under the License.
1616

1717
set zkJar=zookeeper-3.4.9-fatjar.jar
18-
md remoting\zookeeper\zookeeper-4unittest\contrib\fatjar config_center\zookeeper\zookeeper-4unittest\contrib\fatjar registry\zookeeper\zookeeper-4unittest\contrib\fatjar
18+
md remoting\zookeeper\zookeeper-4unittest\contrib\fatjar
1919
curl -L https://github.com/dubbogo/resources/raw/master/zookeeper-4unitest/contrib/fatjar/%zkJar% -o remoting/zookeeper/zookeeper-4unittest/contrib/fatjar/%zkJar%
20+
21+
md config_center\zookeeper\zookeeper-4unittest\contrib\fatjar
2022
xcopy /f "remoting/zookeeper/zookeeper-4unittest/contrib/fatjar/%zkJar%" "config_center/zookeeper/zookeeper-4unittest/contrib/fatjar/"
21-
xcopy /f "remoting/zookeeper/zookeeper-4unittest/contrib/fatjar/%zkJar%" "registry/zookeeper/zookeeper-4unittest/contrib/fatjar/"
23+
24+
md registry\zookeeper\zookeeper-4unittest\contrib\fatjar
25+
xcopy /f "remoting/zookeeper/zookeeper-4unittest/contrib/fatjar/%zkJar%" "registry/zookeeper/zookeeper-4unittest/contrib/fatjar/"
26+
27+
md cluster\router\chain\zookeeper-4unittest\contrib\fatjar
28+
xcopy /f "remoting/zookeeper/zookeeper-4unittest/contrib/fatjar/%zkJar%" "cluster/router/chain/zookeeper-4unittest/contrib/fatjar/"
29+
30+
md cluster\router\condition\zookeeper-4unittest\contrib\fatjar
31+
xcopy /f "remoting/zookeeper/zookeeper-4unittest/contrib/fatjar/%zkJar%" "cluster/router/condition/zookeeper-4unittest/contrib/fatjar/"

before_ut.sh

100644100755
Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,17 @@
1515
# limitations under the License.
1616

1717

18-
mkdir -p remoting/zookeeper/zookeeper-4unittest/contrib/fatjar config_center/zookeeper/zookeeper-4unittest/contrib/fatjar registry/zookeeper/zookeeper-4unittest/contrib/fatjar
18+
mkdir -p remoting/zookeeper/zookeeper-4unittest/contrib/fatjar
1919
wget -P "remoting/zookeeper/zookeeper-4unittest/contrib/fatjar" https://github.com/dubbogo/resources/raw/master/zookeeper-4unitest/contrib/fatjar/zookeeper-3.4.9-fatjar.jar
20+
21+
mkdir -p config_center/zookeeper/zookeeper-4unittest/contrib/fatjar
2022
cp remoting/zookeeper/zookeeper-4unittest/contrib/fatjar/zookeeper-3.4.9-fatjar.jar config_center/zookeeper/zookeeper-4unittest/contrib/fatjar/
21-
cp remoting/zookeeper/zookeeper-4unittest/contrib/fatjar/zookeeper-3.4.9-fatjar.jar registry/zookeeper/zookeeper-4unittest/contrib/fatjar/
23+
24+
mkdir -p registry/zookeeper/zookeeper-4unittest/contrib/fatjar
25+
cp remoting/zookeeper/zookeeper-4unittest/contrib/fatjar/zookeeper-3.4.9-fatjar.jar registry/zookeeper/zookeeper-4unittest/contrib/fatjar/
26+
27+
mkdir -p cluster/router/chain/zookeeper-4unittest/contrib/fatjar
28+
cp remoting/zookeeper/zookeeper-4unittest/contrib/fatjar/zookeeper-3.4.9-fatjar.jar cluster/router/chain/zookeeper-4unittest/contrib/fatjar
29+
30+
mkdir -p cluster/router/condition/zookeeper-4unittest/contrib/fatjar
31+
cp remoting/zookeeper/zookeeper-4unittest/contrib/fatjar/zookeeper-3.4.9-fatjar.jar cluster/router/condition/zookeeper-4unittest/contrib/fatjar

cluster/directory/base_directory.go

Lines changed: 75 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,39 +20,94 @@ package directory
2020
import (
2121
"sync"
2222
)
23+
2324
import (
25+
"github.com/dubbogo/gost/container/set"
2426
"go.uber.org/atomic"
2527
)
28+
2629
import (
30+
"github.com/apache/dubbo-go/cluster/router"
31+
"github.com/apache/dubbo-go/cluster/router/chain"
2732
"github.com/apache/dubbo-go/common"
33+
"github.com/apache/dubbo-go/common/constant"
34+
"github.com/apache/dubbo-go/common/extension"
35+
"github.com/apache/dubbo-go/common/logger"
2836
)
2937

30-
// BaseDirectory ...
38+
var routerURLSet = gxset.NewSet()
39+
40+
// BaseDirectory Abstract implementation of Directory: Invoker list returned from this Directory's list method have been filtered by Routers
3141
type BaseDirectory struct {
3242
url *common.URL
3343
destroyed *atomic.Bool
34-
mutex sync.Mutex
44+
// this mutex for change the properties in BaseDirectory, like routerChain , destroyed etc
45+
mutex sync.Mutex
46+
routerChain router.Chain
3547
}
3648

37-
// NewBaseDirectory ...
49+
// NewBaseDirectory Create BaseDirectory with URL
3850
func NewBaseDirectory(url *common.URL) BaseDirectory {
3951
return BaseDirectory{
40-
url: url,
41-
destroyed: atomic.NewBool(false),
52+
url: url,
53+
destroyed: atomic.NewBool(false),
54+
routerChain: &chain.RouterChain{},
4255
}
4356
}
4457

45-
// GetUrl ...
58+
// RouterChain Return router chain in directory
59+
func (dir *BaseDirectory) RouterChain() router.Chain {
60+
return dir.routerChain
61+
}
62+
63+
// SetRouterChain Set router chain in directory
64+
func (dir *BaseDirectory) SetRouterChain(routerChain router.Chain) {
65+
dir.mutex.Lock()
66+
defer dir.mutex.Unlock()
67+
dir.routerChain = routerChain
68+
}
69+
70+
// GetUrl Get URL
4671
func (dir *BaseDirectory) GetUrl() common.URL {
4772
return *dir.url
4873
}
4974

50-
// GetDirectoryUrl ...
75+
// GetDirectoryUrl Get URL instance
5176
func (dir *BaseDirectory) GetDirectoryUrl() *common.URL {
5277
return dir.url
5378
}
5479

55-
// Destroy ...
80+
// SetRouters Convert url to routers and add them into dir.routerChain
81+
func (dir *BaseDirectory) SetRouters(urls []*common.URL) {
82+
if len(urls) == 0 {
83+
return
84+
}
85+
86+
routers := make([]router.Router, 0, len(urls))
87+
88+
for _, url := range urls {
89+
routerKey := url.GetParam(constant.ROUTER_KEY, "")
90+
91+
if len(routerKey) > 0 {
92+
factory := extension.GetRouterFactory(url.Protocol)
93+
r, err := factory.NewRouter(url)
94+
if err != nil {
95+
logger.Errorf("Create router fail. router key: %s, error: %v", routerKey, url.Service(), err)
96+
return
97+
}
98+
routers = append(routers, r)
99+
}
100+
}
101+
102+
logger.Infof("Init file condition router success, size: %v", len(routers))
103+
dir.mutex.Lock()
104+
rc := dir.routerChain
105+
dir.mutex.Unlock()
106+
107+
rc.AddRouters(routers)
108+
}
109+
110+
// Destroy Destroy
56111
func (dir *BaseDirectory) Destroy(doDestroy func()) {
57112
if dir.destroyed.CAS(false, true) {
58113
dir.mutex.Lock()
@@ -61,7 +116,18 @@ func (dir *BaseDirectory) Destroy(doDestroy func()) {
61116
}
62117
}
63118

64-
// IsAvailable ...
119+
// IsAvailable Once directory init finish, it will change to true
65120
func (dir *BaseDirectory) IsAvailable() bool {
66121
return !dir.destroyed.Load()
67122
}
123+
124+
// GetRouterURLSet Return router URL
125+
func GetRouterURLSet() *gxset.HashSet {
126+
return routerURLSet
127+
}
128+
129+
// AddRouterURLSet Add router URL
130+
// Router URL will init in config/config_loader.go
131+
func AddRouterURLSet(url *common.URL) {
132+
routerURLSet.Add(url)
133+
}
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package directory
19+
20+
import (
21+
"encoding/base64"
22+
"fmt"
23+
"testing"
24+
)
25+
26+
import (
27+
gxnet "github.com/dubbogo/gost/net"
28+
"github.com/stretchr/testify/assert"
29+
)
30+
31+
import (
32+
_ "github.com/apache/dubbo-go/cluster/router/condition"
33+
"github.com/apache/dubbo-go/common"
34+
"github.com/apache/dubbo-go/common/constant"
35+
)
36+
37+
func TestNewBaseDirectory(t *testing.T) {
38+
url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider"))
39+
directory := NewBaseDirectory(&url)
40+
41+
assert.NotNil(t, directory)
42+
43+
assert.Equal(t, url, directory.GetUrl())
44+
assert.Equal(t, &url, directory.GetDirectoryUrl())
45+
46+
}
47+
48+
func TestBuildRouterChain(t *testing.T) {
49+
url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider"))
50+
directory := NewBaseDirectory(&url)
51+
52+
assert.NotNil(t, directory)
53+
54+
localIP, _ := gxnet.GetLocalIP()
55+
rule := base64.URLEncoding.EncodeToString([]byte("true => " + " host = " + localIP))
56+
routeURL := getRouteUrl(rule)
57+
routerURLs := make([]*common.URL, 0)
58+
routerURLs = append(routerURLs, routeURL)
59+
directory.SetRouters(routerURLs)
60+
chain := directory.RouterChain()
61+
62+
assert.NotNil(t, chain)
63+
}
64+
65+
func getRouteUrl(rule string) *common.URL {
66+
url, _ := common.NewURL("condition://0.0.0.0/com.foo.BarService")
67+
url.AddParam("rule", rule)
68+
url.AddParam("force", "true")
69+
url.AddParam(constant.ROUTER_KEY, "router")
70+
return &url
71+
}

cluster/directory/static_directory.go

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,11 @@
1818
package directory
1919

2020
import (
21+
perrors "github.com/pkg/errors"
22+
)
23+
24+
import (
25+
"github.com/apache/dubbo-go/cluster/router/chain"
2126
"github.com/apache/dubbo-go/common"
2227
"github.com/apache/dubbo-go/protocol"
2328
)
@@ -27,7 +32,7 @@ type staticDirectory struct {
2732
invokers []protocol.Invoker
2833
}
2934

30-
// NewStaticDirectory ...
35+
// NewStaticDirectory Create a new staticDirectory with invokers
3136
func NewStaticDirectory(invokers []protocol.Invoker) *staticDirectory {
3237
var url common.URL
3338

@@ -53,11 +58,21 @@ func (dir *staticDirectory) IsAvailable() bool {
5358
return true
5459
}
5560

61+
// List List invokers
5662
func (dir *staticDirectory) List(invocation protocol.Invocation) []protocol.Invoker {
57-
//TODO:Here should add router
58-
return dir.invokers
63+
l := len(dir.invokers)
64+
invokers := make([]protocol.Invoker, l, l)
65+
copy(invokers, dir.invokers)
66+
routerChain := dir.RouterChain()
67+
68+
if routerChain == nil {
69+
return invokers
70+
}
71+
dirUrl := dir.GetUrl()
72+
return routerChain.Route(invokers, &dirUrl, invocation)
5973
}
6074

75+
// Destroy Destroy
6176
func (dir *staticDirectory) Destroy() {
6277
dir.BaseDirectory.Destroy(func() {
6378
for _, ivk := range dir.invokers {
@@ -66,3 +81,17 @@ func (dir *staticDirectory) Destroy() {
6681
dir.invokers = []protocol.Invoker{}
6782
})
6883
}
84+
85+
// BuildRouterChain build router chain by invokers
86+
func (dir *staticDirectory) BuildRouterChain(invokers []protocol.Invoker) error {
87+
if len(invokers) == 0 {
88+
return perrors.Errorf("invokers == null")
89+
}
90+
url := invokers[0].GetUrl()
91+
routerChain, e := chain.NewRouterChain(&url)
92+
if e != nil {
93+
return e
94+
}
95+
dir.SetRouterChain(routerChain)
96+
return nil
97+
}

cluster/directory/static_directory_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,9 @@ func Test_StaticDirList(t *testing.T) {
4040
}
4141

4242
staticDir := NewStaticDirectory(invokers)
43-
assert.Len(t, staticDir.List(&invocation.RPCInvocation{}), 10)
43+
list := staticDir.List(&invocation.RPCInvocation{})
44+
45+
assert.Len(t, list, 10)
4446
}
4547

4648
func Test_StaticDirDestroy(t *testing.T) {

0 commit comments

Comments
 (0)