Skip to content

Commit 2d70b6f

Browse files
authored
Merge pull request #358 from pantianying/addRlockForDubboInvoker
Fix:deal the panic when invoker destroy
2 parents 2d4022d + 873c7d9 commit 2d70b6f

File tree

2 files changed

+58
-16
lines changed

2 files changed

+58
-16
lines changed

protocol/dubbo/dubbo_invoker.go

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ import (
2121
"context"
2222
"strconv"
2323
"sync"
24+
"sync/atomic"
25+
"time"
2426
)
2527

2628
import (
@@ -38,7 +40,8 @@ import (
3840

3941
var (
4042
// ErrNoReply ...
41-
ErrNoReply = perrors.New("request need @response")
43+
ErrNoReply = perrors.New("request need @response")
44+
ErrDestroyedInvoker = perrors.New("request Destroyed invoker")
4245
)
4346

4447
var (
@@ -50,13 +53,16 @@ type DubboInvoker struct {
5053
protocol.BaseInvoker
5154
client *Client
5255
quitOnce sync.Once
56+
// Used to record the number of requests. -1 represent this DubboInvoker is destroyed
57+
reqNum int64
5358
}
5459

5560
// NewDubboInvoker ...
5661
func NewDubboInvoker(url common.URL, client *Client) *DubboInvoker {
5762
return &DubboInvoker{
5863
BaseInvoker: *protocol.NewBaseInvoker(url),
5964
client: client,
65+
reqNum: 0,
6066
}
6167
}
6268

@@ -66,6 +72,15 @@ func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocati
6672
err error
6773
result protocol.RPCResult
6874
)
75+
if di.reqNum < 0 {
76+
// Generally, the case will not happen, because the invoker has been removed
77+
// from the invoker list before destroy,so no new request will enter the destroyed invoker
78+
logger.Warnf("this dubboInvoker is destroyed")
79+
result.Err = ErrDestroyedInvoker
80+
return &result
81+
}
82+
atomic.AddInt64(&(di.reqNum), 1)
83+
defer atomic.AddInt64(&(di.reqNum), -1)
6984

7085
inv := invocation.(*invocation_impl.RPCInvocation)
7186
for _, k := range attachmentKey {
@@ -110,11 +125,21 @@ func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocati
110125
// Destroy ...
111126
func (di *DubboInvoker) Destroy() {
112127
di.quitOnce.Do(func() {
113-
di.BaseInvoker.Destroy()
114-
115-
if di.client != nil {
116-
di.client.Close()
128+
for {
129+
if di.reqNum == 0 {
130+
di.reqNum = -1
131+
logger.Info("dubboInvoker is destroyed,url:{%s}", di.GetUrl().Key())
132+
di.BaseInvoker.Destroy()
133+
if di.client != nil {
134+
di.client.Close()
135+
di.client = nil
136+
}
137+
break
138+
}
139+
logger.Warnf("DubboInvoker is to be destroyed, wait {%v} req end,url:{%s}", di.reqNum, di.GetUrl().Key())
140+
time.Sleep(1 * time.Second)
117141
}
142+
118143
})
119144
}
120145

registry/directory/directory.go

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,10 @@ func (dir *registryDirectory) update(res *registry.ServiceEvent) {
109109
}
110110

111111
func (dir *registryDirectory) refreshInvokers(res *registry.ServiceEvent) {
112-
var url *common.URL
112+
var (
113+
url *common.URL
114+
oldInvoker protocol.Invoker = nil
115+
)
113116
//judge is override or others
114117
if res != nil {
115118
url = &res.Service
@@ -126,10 +129,10 @@ func (dir *registryDirectory) refreshInvokers(res *registry.ServiceEvent) {
126129
switch res.Action {
127130
case remoting.EventTypeAdd, remoting.EventTypeUpdate:
128131
//dir.cacheService.EventTypeAdd(res.Path, dir.serviceTTL)
129-
dir.cacheInvoker(url)
132+
oldInvoker = dir.cacheInvoker(url)
130133
case remoting.EventTypeDel:
131134
//dir.cacheService.EventTypeDel(res.Path, dir.serviceTTL)
132-
dir.uncacheInvoker(url)
135+
oldInvoker = dir.uncacheInvoker(url)
133136
logger.Infof("selector delete service url{%s}", res.Service)
134137
default:
135138
return
@@ -138,8 +141,14 @@ func (dir *registryDirectory) refreshInvokers(res *registry.ServiceEvent) {
138141

139142
newInvokers := dir.toGroupInvokers()
140143
dir.listenerLock.Lock()
141-
defer dir.listenerLock.Unlock()
142144
dir.cacheInvokers = newInvokers
145+
dir.listenerLock.Unlock()
146+
// After dir.cacheInvokers is updated,destroy the oldInvoker
147+
// Ensure that no request will enter the oldInvoker
148+
if oldInvoker != nil {
149+
oldInvoker.Destroy()
150+
}
151+
143152
}
144153

145154
func (dir *registryDirectory) toGroupInvokers() []protocol.Invoker {
@@ -177,12 +186,18 @@ func (dir *registryDirectory) toGroupInvokers() []protocol.Invoker {
177186
return groupInvokersList
178187
}
179188

180-
func (dir *registryDirectory) uncacheInvoker(url *common.URL) {
189+
// uncacheInvoker return abandoned Invoker,if no Invoker to be abandoned,return nil
190+
func (dir *registryDirectory) uncacheInvoker(url *common.URL) protocol.Invoker {
181191
logger.Debugf("service will be deleted in cache invokers: invokers key is %s!", url.Key())
182-
dir.cacheInvokersMap.Delete(url.Key())
192+
if cacheInvoker, ok := dir.cacheInvokersMap.Load(url.Key()); ok {
193+
dir.cacheInvokersMap.Delete(url.Key())
194+
return cacheInvoker.(protocol.Invoker)
195+
}
196+
return nil
183197
}
184198

185-
func (dir *registryDirectory) cacheInvoker(url *common.URL) {
199+
// cacheInvoker return abandoned Invoker,if no Invoker to be abandoned,return nil
200+
func (dir *registryDirectory) cacheInvoker(url *common.URL) protocol.Invoker {
186201
dir.overrideUrl(dir.GetDirectoryUrl())
187202
referenceUrl := dir.GetDirectoryUrl().SubURL
188203

@@ -193,7 +208,7 @@ func (dir *registryDirectory) cacheInvoker(url *common.URL) {
193208
}
194209
if url == nil {
195210
logger.Error("URL is nil ,pls check if service url is subscribe successfully!")
196-
return
211+
return nil
197212
}
198213
//check the url's protocol is equal to the protocol which is configured in reference config or referenceUrl is not care about protocol
199214
if url.Protocol == referenceUrl.Protocol || referenceUrl.Protocol == "" {
@@ -210,10 +225,11 @@ func (dir *registryDirectory) cacheInvoker(url *common.URL) {
210225
newInvoker := extension.GetProtocol(protocolwrapper.FILTER).Refer(*newUrl)
211226
if newInvoker != nil {
212227
dir.cacheInvokersMap.Store(newUrl.Key(), newInvoker)
213-
cacheInvoker.(protocol.Invoker).Destroy()
228+
return cacheInvoker.(protocol.Invoker)
214229
}
215230
}
216231
}
232+
return nil
217233
}
218234

219235
//select the protocol invokers from the directory
@@ -239,10 +255,11 @@ func (dir *registryDirectory) IsAvailable() bool {
239255
func (dir *registryDirectory) Destroy() {
240256
//TODO:unregister & unsubscribe
241257
dir.BaseDirectory.Destroy(func() {
242-
for _, ivk := range dir.cacheInvokers {
258+
invokers := dir.cacheInvokers
259+
dir.cacheInvokers = []protocol.Invoker{}
260+
for _, ivk := range invokers {
243261
ivk.Destroy()
244262
}
245-
dir.cacheInvokers = []protocol.Invoker{}
246263
})
247264
}
248265

0 commit comments

Comments
 (0)