Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 34 additions & 6 deletions protocol/dubbo/dubbo_invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ package dubbo
import (
"strconv"
"sync"
"sync/atomic"
"time"
)

import (
Expand All @@ -34,7 +36,11 @@ import (
invocation_impl "github.com/apache/dubbo-go/protocol/invocation"
)

var Err_No_Reply = perrors.New("request need @response")
var (
// ErrNoReply ...
ErrNoReply = perrors.New("request need @response")
ErrDestroyedInvoker = perrors.New("request Destroyed invoker")
)

var (
attachmentKey = []string{constant.INTERFACE_KEY, constant.GROUP_KEY, constant.TOKEN_KEY, constant.TIMEOUT_KEY}
Expand All @@ -44,12 +50,15 @@ type DubboInvoker struct {
protocol.BaseInvoker
client *Client
quitOnce sync.Once
// Used to record the number of requests. -1 represent this DubboInvoker is destroyed
reqNum int64
}

func NewDubboInvoker(url common.URL, client *Client) *DubboInvoker {
return &DubboInvoker{
BaseInvoker: *protocol.NewBaseInvoker(url),
client: client,
reqNum: 0,
}
}

Expand All @@ -59,6 +68,15 @@ func (di *DubboInvoker) Invoke(invocation protocol.Invocation) protocol.Result {
err error
result protocol.RPCResult
)
if di.reqNum < 0 {
// Generally, the case will not happen, because the invoker has been removed
// from the invoker list before destroy,so no new request will enter the destroyed invoker
logger.Warnf("this dubboInvoker is destroyed")
result.Err = ErrDestroyedInvoker
return &result
}
atomic.AddInt64(&(di.reqNum), 1)
defer atomic.AddInt64(&(di.reqNum), -1)

inv := invocation.(*invocation_impl.RPCInvocation)
for _, k := range attachmentKey {
Expand All @@ -82,7 +100,7 @@ func (di *DubboInvoker) Invoke(invocation protocol.Invocation) protocol.Result {
}
} else {
if inv.Reply() == nil {
result.Err = Err_No_Reply
result.Err = ErrNoReply
} else {
result.Err = di.client.Call(NewRequest(url.Location, url, inv.MethodName(), inv.Arguments(), inv.Attachments()), response)
}
Expand All @@ -98,10 +116,20 @@ func (di *DubboInvoker) Invoke(invocation protocol.Invocation) protocol.Result {

func (di *DubboInvoker) Destroy() {
di.quitOnce.Do(func() {
di.BaseInvoker.Destroy()

if di.client != nil {
di.client.Close()
for {
if di.reqNum == 0 {
di.reqNum = -1
logger.Info("dubboInvoker is destroyed,url:{%s}", di.GetUrl().Key())
di.BaseInvoker.Destroy()
if di.client != nil {
di.client.Close()
di.client = nil
}
break
}
logger.Warnf("DubboInvoker is to be destroyed, wait {%v} req end,url:{%s}", di.reqNum, di.GetUrl().Key())
time.Sleep(1 * time.Second)
}

})
}
39 changes: 28 additions & 11 deletions registry/directory/directory.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,10 @@ func (dir *registryDirectory) update(res *registry.ServiceEvent) {
}

func (dir *registryDirectory) refreshInvokers(res *registry.ServiceEvent) {
var url *common.URL
var (
url *common.URL
oldInvoker protocol.Invoker = nil
)
//judge is override or others
if res != nil {
url = &res.Service
Expand All @@ -123,10 +126,10 @@ func (dir *registryDirectory) refreshInvokers(res *registry.ServiceEvent) {
switch res.Action {
case remoting.EventTypeAdd, remoting.EventTypeUpdate:
//dir.cacheService.EventTypeAdd(res.Path, dir.serviceTTL)
dir.cacheInvoker(url)
oldInvoker = dir.cacheInvoker(url)
case remoting.EventTypeDel:
//dir.cacheService.EventTypeDel(res.Path, dir.serviceTTL)
dir.uncacheInvoker(url)
oldInvoker = dir.uncacheInvoker(url)
logger.Infof("selector delete service url{%s}", res.Service)
default:
return
Expand All @@ -135,8 +138,14 @@ func (dir *registryDirectory) refreshInvokers(res *registry.ServiceEvent) {

newInvokers := dir.toGroupInvokers()
dir.listenerLock.Lock()
defer dir.listenerLock.Unlock()
dir.cacheInvokers = newInvokers
dir.listenerLock.Unlock()
// After dir.cacheInvokers is updated,destroy the oldInvoker
// Ensure that no request will enter the oldInvoker
if oldInvoker != nil {
oldInvoker.Destroy()
}

}

func (dir *registryDirectory) toGroupInvokers() []protocol.Invoker {
Expand Down Expand Up @@ -174,12 +183,18 @@ func (dir *registryDirectory) toGroupInvokers() []protocol.Invoker {
return groupInvokersList
}

func (dir *registryDirectory) uncacheInvoker(url *common.URL) {
// uncacheInvoker return abandoned Invoker,if no Invoker to be abandoned,return nil
func (dir *registryDirectory) uncacheInvoker(url *common.URL) protocol.Invoker {
logger.Debugf("service will be deleted in cache invokers: invokers key is %s!", url.Key())
dir.cacheInvokersMap.Delete(url.Key())
if cacheInvoker, ok := dir.cacheInvokersMap.Load(url.Key()); ok {
dir.cacheInvokersMap.Delete(url.Key())
return cacheInvoker.(protocol.Invoker)
}
return nil
}

func (dir *registryDirectory) cacheInvoker(url *common.URL) {
// cacheInvoker return abandoned Invoker,if no Invoker to be abandoned,return nil
func (dir *registryDirectory) cacheInvoker(url *common.URL) protocol.Invoker {
dir.overrideUrl(dir.GetDirectoryUrl())
referenceUrl := dir.GetDirectoryUrl().SubURL

Expand All @@ -190,7 +205,7 @@ func (dir *registryDirectory) cacheInvoker(url *common.URL) {
}
if url == nil {
logger.Error("URL is nil ,pls check if service url is subscribe successfully!")
return
return nil
}
//check the url's protocol is equal to the protocol which is configured in reference config or referenceUrl is not care about protocol
if url.Protocol == referenceUrl.Protocol || referenceUrl.Protocol == "" {
Expand All @@ -207,10 +222,11 @@ func (dir *registryDirectory) cacheInvoker(url *common.URL) {
newInvoker := extension.GetProtocol(protocolwrapper.FILTER).Refer(*newUrl)
if newInvoker != nil {
dir.cacheInvokersMap.Store(newUrl.Key(), newInvoker)
cacheInvoker.(protocol.Invoker).Destroy()
return cacheInvoker.(protocol.Invoker)
}
}
}
return nil
}

//select the protocol invokers from the directory
Expand All @@ -235,10 +251,11 @@ func (dir *registryDirectory) IsAvailable() bool {
func (dir *registryDirectory) Destroy() {
//TODO:unregister & unsubscribe
dir.BaseDirectory.Destroy(func() {
for _, ivk := range dir.cacheInvokers {
invokers := dir.cacheInvokers
dir.cacheInvokers = []protocol.Invoker{}
for _, ivk := range invokers {
ivk.Destroy()
}
dir.cacheInvokers = []protocol.Invoker{}
})
}

Expand Down