Skip to content

Commit a09356a

Browse files
authored
Fix/duplicated connect (#117)
* chore: update version to 5.4.3-SNAPSHOT * fix: duplicated connectId * fix: duplicated connectId and format * fix: testcase * fix: testcase * fix: testcase * fix: testcase
1 parent bf50669 commit a09356a

File tree

21 files changed

+134
-41
lines changed

21 files changed

+134
-41
lines changed

server/common/model/src/main/java/com/alipay/sofa/registry/common/model/constants/ValueConstants.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,11 @@
2323
*/
2424
public class ValueConstants {
2525

26+
/**
27+
* connectId: sourceAddress_targetAddress
28+
*/
29+
public static final String CONNECT_ID_SPLIT = "_";
30+
2631
/**
2732
* The constant DEFAULT_GROUP.
2833
*/

server/common/model/src/main/java/com/alipay/sofa/registry/common/model/store/BaseInfo.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ public abstract class BaseInfo implements Serializable, StoreData<String> {
5151

5252
private URL sourceAddress;
5353

54+
private URL targetAddress;
55+
5456
private ClientVersion clientVersion;
5557

5658
private String group;
@@ -175,6 +177,24 @@ public void setSourceAddress(URL sourceAddress) {
175177
this.sourceAddress = sourceAddress;
176178
}
177179

180+
/**
181+
* Getter method for property <tt>targetAddress</tt>.
182+
*
183+
* @return property value of targetAddress
184+
*/
185+
public URL getTargetAddress() {
186+
return targetAddress;
187+
}
188+
189+
/**
190+
* Setter method for property <tt>targetAddress</tt>.
191+
*
192+
* @param targetAddress value to be assigned to property targetAddress
193+
*/
194+
public void setTargetAddress(URL targetAddress) {
195+
this.targetAddress = targetAddress;
196+
}
197+
178198
/**
179199
* Getter method for property <tt>attributes</tt>.
180200
*

server/server/data/src/main/java/com/alipay/sofa/registry/server/data/cache/LocalDatumStorage.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.Set;
2424
import java.util.concurrent.ConcurrentHashMap;
2525

26+
import com.alipay.sofa.registry.common.model.constants.ValueConstants;
2627
import org.springframework.beans.factory.annotation.Autowired;
2728

2829
import com.alipay.sofa.registry.common.model.dataserver.Datum;
@@ -409,7 +410,9 @@ private void addToIndex(Publisher publisher) {
409410
}
410411

411412
private String getConnectId(Publisher cachePub) {
412-
return WordCache.getInstance().getWordCache(cachePub.getSourceAddress().getAddressString());
413+
return WordCache.getInstance().getWordCache(
414+
cachePub.getSourceAddress().getAddressString() + ValueConstants.CONNECT_ID_SPLIT
415+
+ cachePub.getTargetAddress().getAddressString());
413416
}
414417

415418
/**

server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/sessionserver/handler/PublishDataHandler.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.util.concurrent.Executor;
2020
import java.util.concurrent.ThreadPoolExecutor;
2121

22+
import com.alipay.sofa.registry.common.model.constants.ValueConstants;
2223
import org.springframework.beans.factory.annotation.Autowired;
2324

2425
import com.alipay.sofa.registry.common.model.CommonResponse;
@@ -99,7 +100,8 @@ public Object doHandle(Channel channel, PublishDataRequest request) {
99100

100101
if (publisher.getPublishType() != PublishType.TEMPORARY) {
101102
String connectId = WordCache.getInstance().getWordCache(
102-
publisher.getSourceAddress().getAddressString());
103+
publisher.getSourceAddress().getAddressString() + ValueConstants.CONNECT_ID_SPLIT
104+
+ publisher.getTargetAddress().getAddressString());
103105
sessionServerConnectionFactory.registerConnectId(request.getSessionServerProcessId(),
104106
connectId);
105107
// record the renew timestamp

server/server/data/src/main/java/com/alipay/sofa/registry/server/data/remoting/sessionserver/handler/UnPublishDataHandler.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.concurrent.Executor;
2121
import java.util.concurrent.ThreadPoolExecutor;
2222

23+
import com.alipay.sofa.registry.common.model.constants.ValueConstants;
2324
import org.springframework.beans.factory.annotation.Autowired;
2425

2526
import com.alipay.sofa.registry.common.model.CommonResponse;
@@ -115,7 +116,9 @@ private String getConnectId(UnPublishDataRequest request) {
115116
if (pubMap != null) {
116117
Publisher publisher = pubMap.get(request.getRegisterId());
117118
if (publisher != null) {
118-
return publisher.getSourceAddress().getAddressString();
119+
return publisher.getSourceAddress().getAddressString()
120+
+ ValueConstants.CONNECT_ID_SPLIT
121+
+ publisher.getTargetAddress().getAddressString();
119122
}
120123
}
121124
}

server/server/data/src/main/java/com/alipay/sofa/registry/server/data/resource/DataDigestResource.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import javax.ws.rs.QueryParam;
3434
import javax.ws.rs.core.MediaType;
3535

36+
import com.alipay.sofa.registry.common.model.constants.ValueConstants;
3637
import org.springframework.beans.factory.annotation.Autowired;
3738

3839
import com.alipay.remoting.Connection;
@@ -105,8 +106,8 @@ public Map<String, Datum> getDatumByDataInfoId(@QueryParam("dataId") String data
105106
public Map<String, Map<String, Publisher>> getPublishersByConnectId(Map<String, String> map) {
106107
Map<String, Map<String, Publisher>> ret = new HashMap<>();
107108
if (map != null && !map.isEmpty()) {
108-
map.forEach((ip, port) -> {
109-
String connectId = NetUtil.genHost(ip, Integer.valueOf(port));
109+
map.forEach((clientConnectId, sessionConnectId) -> {
110+
String connectId = clientConnectId + ValueConstants.CONNECT_ID_SPLIT + sessionConnectId;
110111
if (!connectId.isEmpty()) {
111112
Map<String, Publisher> publisherMap = datumCache.getByConnectId(connectId);
112113
if (publisherMap != null && !publisherMap.isEmpty()) {

server/server/session/src/main/java/com/alipay/sofa/registry/server/session/provideData/processor/BlackListProvideDataProcessor.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,8 @@ public List<String> getIpConnects(Set<String> _ipList) {
119119
String key = NetUtil.toAddressString(channel.getRemoteAddress());
120120
String ip = key.substring(0, key.indexOf(":"));
121121
if (_ipList.contains(ip)) {
122-
connections.add(key);
122+
connections.add(key + ValueConstants.CONNECT_ID_SPLIT
123+
+ NetUtil.toAddressString(channel.getLocalAddress()));
123124
}
124125
}
125126
}

server/server/session/src/main/java/com/alipay/sofa/registry/server/session/registry/SessionRegistry.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,8 @@ public WriteDataRequestType getRequestType() {
163163

164164
@Override
165165
public String getConnectId() {
166-
return publisher.getSourceAddress().getAddressString();
166+
return publisher.getSourceAddress().getAddressString() + ValueConstants.CONNECT_ID_SPLIT
167+
+ publisher.getTargetAddress().getAddressString();
167168
}
168169

169170
@Override
@@ -234,7 +235,9 @@ public WriteDataRequestType getRequestType() {
234235

235236
@Override
236237
public String getConnectId() {
237-
return publisher.getSourceAddress().getAddressString();
238+
return publisher.getSourceAddress().getAddressString()
239+
+ ValueConstants.CONNECT_ID_SPLIT
240+
+ publisher.getTargetAddress().getAddressString();
238241
}
239242

240243
@Override

server/server/session/src/main/java/com/alipay/sofa/registry/server/session/remoting/handler/ClientNodeConnectionHandler.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,8 @@ private void fireCancelClient(Channel channel) {
124124
//avoid block connect ConnectionEventExecutor thread pool
125125
executorManager.getConnectClientExecutor().execute(() -> {
126126

127-
String connectId = NetUtil.toAddressString(channel.getRemoteAddress());
127+
String connectId = NetUtil.toAddressString(channel.getRemoteAddress()) + ValueConstants.CONNECT_ID_SPLIT +
128+
NetUtil.toAddressString(channel.getLocalAddress());
128129
if (checkCache(connectId)) {
129130
List<String> connectIds = new ArrayList<>();
130131
connectIds.add(connectId);
@@ -159,12 +160,13 @@ private boolean checkWatcher(String connectId) {
159160

160161
private void fireRenewDatum(Channel channel) {
161162
executorManager.getConnectClientExecutor().execute(() -> {
162-
String connectId = NetUtil.toAddressString(channel.getRemoteAddress());
163+
String connectId = NetUtil.toAddressString(channel.getRemoteAddress()) + ValueConstants.CONNECT_ID_SPLIT
164+
+ NetUtil.toAddressString(channel.getLocalAddress());
163165
RENEW_LOGGER.info("Renew task is started: {}", connectId);
164166
recycleAsyncHashedWheelTimer.newTimeout(timerOut -> sessionRegistry.renewDatum(connectId), randomDelay(),
165167
sessionServerConfig.getRenewDatumWheelTaskDelaySec(), TimeUnit.SECONDS, () -> {
166168
Server sessionServer = boltExchange.getServer(sessionServerConfig.getServerPort());
167-
Channel channelClient = sessionServer.getChannel(URL.valueOf(connectId));
169+
Channel channelClient = sessionServer.getChannel(URL.valueOf(connectId.split(ValueConstants.CONNECT_ID_SPLIT)[0]));
168170
boolean shouldContinue = channelClient != null && channel.isConnected();
169171
if (!shouldContinue) {
170172
RENEW_LOGGER.info("Renew task is stop: {}", connectId);

server/server/session/src/main/java/com/alipay/sofa/registry/server/session/scheduler/task/SessionRegisterDataTask.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.HashSet;
2121
import java.util.Set;
2222

23+
import com.alipay.sofa.registry.common.model.constants.ValueConstants;
2324
import com.alipay.sofa.registry.common.model.dataserver.SessionServerRegisterRequest;
2425
import com.alipay.sofa.registry.log.Logger;
2526
import com.alipay.sofa.registry.log.LoggerFactory;
@@ -82,7 +83,8 @@ public void setTaskEvent(TaskEvent taskEvent) {
8283

8384
Collection<Channel> chs = sessionServer.getChannels();
8485
Set<String> connectIds = new HashSet<>();
85-
chs.forEach(channel -> connectIds.add(NetUtil.toAddressString(channel.getRemoteAddress())));
86+
chs.forEach(channel -> connectIds.add(NetUtil.toAddressString(channel.getRemoteAddress())
87+
+ ValueConstants.CONNECT_ID_SPLIT + NetUtil.toAddressString(channel.getLocalAddress())));
8688

8789
sessionServerRegisterRequest = new SessionServerRegisterRequest(
8890
SessionProcessIdGenerator.getSessionProcessId(), connectIds);

0 commit comments

Comments
 (0)