Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
ecaf257
feat: add check box
YoWuwuuuw Jun 24, 2025
6d033b7
Merge remote-tracking branch 'origin/2.x' into 2.x
YoWuwuuuw Jul 8, 2025
4446570
feat: Data format for supporting metadata
YoWuwuuuw Jul 10, 2025
731b6bd
test: add test
YoWuwuuuw Jul 10, 2025
49c54db
feat: support server-metadata access to RegistryService
YoWuwuuuw Jul 10, 2025
e1ad806
opt: solve conflicts and opt discovery-namingserver code
YoWuwuuuw Jul 10, 2025
66702a9
fix: fix some bug and improve code readability
YoWuwuuuw Jul 10, 2025
3953c32
test: fix test for discovery-namingserver
YoWuwuuuw Jul 11, 2025
81465da
feat: enhance discovery-loadbalance to support metadata
YoWuwuuuw Jul 11, 2025
8d015b5
feat: enhance discovery-loadbalance to support metadata
YoWuwuuuw Jul 11, 2025
bab788c
feat: support WeightedRandomLoadBalance
YoWuwuuuw Jul 12, 2025
a3d50dc
test: fix test
YoWuwuuuw Jul 12, 2025
d1c4f12
test: fix test
YoWuwuuuw Jul 12, 2025
775c7b1
Merge branch '2.x' into gsoc-metadata-support
YoWuwuuuw Jul 12, 2025
9f4439d
doc: improve Java doc and improve readability
YoWuwuuuw Jul 12, 2025
8ae37ae
opt: apply spotless
YoWuwuuuw Jul 12, 2025
27d1572
opt: rabbit review
YoWuwuuuw Jul 13, 2025
24ba004
test: fix test
YoWuwuuuw Jul 13, 2025
e562774
add changes
YoWuwuuuw Jul 13, 2025
61b95e1
test: add test for ServiceInstance
YoWuwuuuw Jul 13, 2025
1c9385d
test: fix ci
YoWuwuuuw Jul 13, 2025
1b29d94
test: add test for discovery-namingserver
YoWuwuuuw Jul 13, 2025
7cfd97c
test: fix ci
YoWuwuuuw Jul 13, 2025
58806b8
test:refactor test
YoWuwuuuw Jul 16, 2025
1d727b2
fix: fix spi bug
YoWuwuuuw Aug 14, 2025
a6787cd
feat: meta-support for eureka
YoWuwuuuw Aug 24, 2025
21d67dd
Merge branch 'gsoc-2025-meta-registry' into gsoc-metadata-support-eureka
YoWuwuuuw Oct 29, 2025
eeb16cb
opt
YoWuwuuuw Oct 29, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ public ServiceInstance(InetSocketAddress address, Map<String, Object> metadata)
this.metadata = metadata;
}

public ServiceInstance(Instance instance) {
this.address = new InetSocketAddress(
instance.getTransaction().getHost(), instance.getTransaction().getPort());
this.metadata = instance.getMetadata();
}

public ServiceInstance(InetSocketAddress address) {
this.address = address;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,17 @@ public void testConstructorAndGetters() {

Instance instance = Instance.getInstance();
instance.setTransaction(new Node.Endpoint("127.0.0.1", 8093));

ServiceInstance instance2 = new ServiceInstance(Instance.getInstance());

assertEquals(
Instance.getInstance().getTransaction().getHost(),
instance2.getAddress().getAddress().getHostAddress());
assertEquals(
Instance.getInstance().getTransaction().getPort(),
instance2.getAddress().getPort());

instance.setTransaction(null); // clean up after test
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
import com.netflix.appinfo.MyDataCenterInstanceConfig;
import org.apache.seata.common.util.StringUtils;

import java.util.HashMap;
import java.util.Map;

/**
* override MyDataCenterInstanceConfig for set value,
* eg: instanceId \ipAddress \ applicationName...
Expand All @@ -29,6 +32,7 @@ public class CustomEurekaInstanceConfig extends MyDataCenterInstanceConfig imple
private String instanceId;
private String ipAddress;
private int port = -1;
private Map<String, Object> metadata = new HashMap<>();

@Override
public String getInstanceId() {
Expand Down Expand Up @@ -67,6 +71,27 @@ public String getHostName(boolean refresh) {
return this.getIpAddress();
}

public Map<String, Object> getMetadata() {
return metadata;
}

public void setMetadata(Map<String, Object> metadata) {
this.metadata = metadata;
}

@Override
public Map<String, String> getMetadataMap() {
Map<String, String> stringMap = new HashMap<>();
if (metadata != null) {
for (Map.Entry<String, Object> entry : metadata.entrySet()) {
if (entry.getValue() != null) {
stringMap.put(entry.getKey(), String.valueOf(entry.getValue()));
}
}
}
return stringMap;
}

public void setInstanceId(String instanceId) {
this.instanceId = instanceId;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
Expand Down Expand Up @@ -102,6 +104,8 @@ public void register(ServiceInstance instance) throws Exception {
instanceConfig.setPort(address.getPort());
instanceConfig.setApplicationName(getApplicationName());
instanceConfig.setInstanceId(getInstanceId());
instanceConfig.setMetadata(instance.getMetadata());

getEurekaClient(true);
applicationInfoManager.setInstanceStatus(InstanceInfo.InstanceStatus.UP);
}
Expand Down Expand Up @@ -168,14 +172,20 @@ private void refreshCluster(String clusterName) {
if (application == null || CollectionUtils.isEmpty(application.getInstances())) {
LOGGER.info("refresh cluster success,but cluster empty! cluster name:{}", clusterName);
} else {
List<ServiceInstance> onlineInstanceList =
ServiceInstance.convertToServiceInstanceList(application.getInstances().stream()
.filter(instance -> InstanceInfo.InstanceStatus.UP.equals(instance.getStatus())
&& instance.getIPAddr() != null
&& instance.getPort() > 0
&& instance.getPort() < 0xFFFF)
.map(instance -> new InetSocketAddress(instance.getIPAddr(), instance.getPort()))
.collect(Collectors.toList()));
List<ServiceInstance> onlineInstanceList = application.getInstances().stream()
.filter(instance -> InstanceInfo.InstanceStatus.UP.equals(instance.getStatus())
&& instance.getIPAddr() != null
&& instance.getPort() > 0
&& instance.getPort() < 0xFFFF)
.map(instance -> {
InetSocketAddress address = new InetSocketAddress(instance.getIPAddr(), instance.getPort());
Map<String, Object> metadata = new HashMap<>();
if (instance.getMetadata() != null) {
metadata.putAll(instance.getMetadata());
}
return new ServiceInstance(address, metadata);
})
.collect(Collectors.toList());
CLUSTER_INSTANCE_MAP.put(clusterName, onlineInstanceList);

removeOfflineAddressesIfNecessary(transactionServiceGroup, clusterName, onlineInstanceList);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertEquals;

Expand Down Expand Up @@ -97,6 +99,18 @@ void testGetHostNameWhenIpAddressNotSet() throws Exception {
assertEquals(ipAddress, config.getHostName(false));
}

@Test
void testSetMetadata() {
Map<String, Object> metadata = new HashMap<>();
metadata.put("key1", "value1");
metadata.put("key2", 123);

config.setMetadata(metadata);
assertEquals(2, config.getMetadata().size());
assertEquals("value1", config.getMetadata().get("key1"));
assertEquals(123, config.getMetadata().get("key2"));
}

private Object getConfigString(String method)
throws NoSuchMethodException, IllegalAccessException, InvocationTargetException {
Class<?> grandparentClass = config.getClass().getSuperclass().getSuperclass();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,241 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seata.discovery.registry.eureka;

import com.netflix.appinfo.ApplicationInfoManager;
import com.netflix.appinfo.InstanceInfo;
import com.netflix.discovery.EurekaClient;
import com.netflix.discovery.EurekaEventListener;
import com.netflix.discovery.shared.Application;
import org.apache.seata.common.metadata.ServiceInstance;
import org.apache.seata.config.Configuration;
import org.apache.seata.config.ConfigurationFactory;
import org.apache.seata.config.exception.ConfigNotFoundException;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.MockedStatic;

import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentMap;

import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class EurekaRegistryServiceImplMockTest {

private EurekaClient mockEurekaClient;
private ApplicationInfoManager mockAppInfoManager;
private Application mockApplication;
private InstanceInfo mockInstanceInfo;
private EurekaRegistryServiceImpl registryService;
EurekaEventListener mockEventListener;

@BeforeEach
public void setUp() throws Exception {

mockEurekaClient = mock(EurekaClient.class);
mockAppInfoManager = mock(ApplicationInfoManager.class);
mockApplication = mock(Application.class);
mockInstanceInfo = mock(InstanceInfo.class);
mockEventListener = mock(EurekaEventListener.class);

resetSingleton();
registryService = EurekaRegistryServiceImpl.getInstance();
setStaticField(EurekaRegistryServiceImpl.class, "eurekaClient", mockEurekaClient);
setStaticField(EurekaRegistryServiceImpl.class, "applicationInfoManager", mockAppInfoManager);
}

@AfterAll
public static void tearDown() throws Exception {
resetSingleton();
}

private static void resetSingleton() throws Exception {
// Reset singleton and static fields
setStaticField(EurekaRegistryServiceImpl.class, "instance", null);
setStaticField(EurekaRegistryServiceImpl.class, "applicationInfoManager", null);
setStaticField(EurekaRegistryServiceImpl.class, "eurekaClient", null);
setStaticField(EurekaRegistryServiceImpl.class, "instanceConfig", null);
clearStaticMap(EurekaRegistryServiceImpl.class, "LISTENER_SERVICE_MAP");
clearStaticMap(EurekaRegistryServiceImpl.class, "CLUSTER_INSTANCE_MAP");
clearStaticMap(EurekaRegistryServiceImpl.class, "CLUSTER_LOCK");
}

@Test
public void testGetInstance() {
EurekaRegistryServiceImpl instance1 = EurekaRegistryServiceImpl.getInstance();
EurekaRegistryServiceImpl instance2 = EurekaRegistryServiceImpl.getInstance();
Assertions.assertEquals(instance1, instance2);
}

@Test
public void testRegister() throws Exception {
InetSocketAddress address = new InetSocketAddress("127.0.0.1", 8091);
registryService.register(new ServiceInstance(address));
CustomEurekaInstanceConfig instanceConfig = getInstanceConfig();
Assertions.assertEquals("127.0.0.1", instanceConfig.getIpAddress());
Assertions.assertEquals("default", instanceConfig.getAppname());
verify(mockAppInfoManager).setInstanceStatus(InstanceInfo.InstanceStatus.UP);
}

@Test
void testRegisterWhenEurekaClientIsNull() throws Exception {
setStaticField(EurekaRegistryServiceImpl.class, "eurekaClient", null);
InetSocketAddress address = new InetSocketAddress("127.0.0.1", 8091);
registryService.register(new ServiceInstance(address));
verify(mockAppInfoManager, times(0)).setInstanceStatus(any());
}

@Test
void testSubscribe() throws Exception {
String testCluster = "TEST_CLUSTER";
registryService.subscribe(testCluster, mockEventListener);

// Verify that the listener is added to LISTENER_SERVICE_MAP
ConcurrentMap<String, List<EurekaEventListener>> listenerMap = getStaticListenerMap();
Assertions.assertTrue(listenerMap.containsKey(testCluster));
Assertions.assertTrue(listenerMap.get(testCluster).contains(mockEventListener));

// Verify that the EurekaClient has registered the listener
verify(mockEurekaClient, times(1)).registerEventListener(mockEventListener);
}

@Test
void testUnsubscribe() throws Exception {
String testCluster = "TEST_CLUSTER";
registryService.subscribe(testCluster, mockEventListener);
registryService.unsubscribe(testCluster, mockEventListener);

// Verify that the listener is removed from LISTENER_SERVICE_MAP
ConcurrentMap<String, List<EurekaEventListener>> listenerMap = getStaticListenerMap();
Assertions.assertFalse(
listenerMap.getOrDefault(testCluster, Collections.emptyList()).contains(mockEventListener));

// Verify that the EurekaClient has deregistered the listener
verify(mockEurekaClient, times(1)).unregisterEventListener(mockEventListener);
}

@Test
void testUnsubscribeWhenEurekaClientIsNull() throws Exception {
setStaticField(EurekaRegistryServiceImpl.class, "eurekaClient", null);
registryService.unsubscribe("TEST_CLUSTER", mockEventListener);
verify(mockEurekaClient, times(0)).unregisterEventListener(any());
}

@Test
void testUnsubscribeWithNoExistingListeners() throws Exception {
String testCluster = "NON_EXISTENT_CLUSTER";
registryService.unsubscribe(testCluster, mockEventListener);
verify(mockEurekaClient).unregisterEventListener(any());
}

@Test
public void testUnregister() throws Exception {
registryService.unregister(new ServiceInstance(new InetSocketAddress("127.0.0.1", 8091)));
verify(mockAppInfoManager).setInstanceStatus(InstanceInfo.InstanceStatus.DOWN);
}

@Test
public void testLookup() throws Exception {
Configuration mockConfig = mock(Configuration.class);
when(mockConfig.getConfig("service.vgroupMapping.test-group")).thenReturn("TEST-CLUSTER");

try (MockedStatic<ConfigurationFactory> mockedFactory = mockStatic(ConfigurationFactory.class)) {
mockedFactory.when(ConfigurationFactory::getInstance).thenReturn(mockConfig);

// Mock Eureka to return the application instance
when(mockEurekaClient.getApplication("TEST-CLUSTER")).thenReturn(mockApplication);
when(mockApplication.getInstances()).thenReturn(Collections.singletonList(mockInstanceInfo));
when(mockInstanceInfo.getStatus()).thenReturn(InstanceInfo.InstanceStatus.UP);
when(mockInstanceInfo.getIPAddr()).thenReturn("192.168.1.1");
when(mockInstanceInfo.getPort()).thenReturn(8091);

List<ServiceInstance> instances = registryService.lookup("test-group");

// Verify whether the transactionServiceGroup is set correctly
Field serviceGroupField = EurekaRegistryServiceImpl.class.getDeclaredField("transactionServiceGroup");
serviceGroupField.setAccessible(true);
String actualServiceGroup = (String) serviceGroupField.get(registryService);
Assertions.assertEquals("test-group", actualServiceGroup);
Assertions.assertNotNull(instances);
Assertions.assertEquals(1, instances.size());
Assertions.assertEquals(
new InetSocketAddress("192.168.1.1", 8091), instances.get(0).getAddress());
}
}

@Test
void testLookUpWithNoClusterName() {
Configuration mockConfig = mock(Configuration.class);
when(mockConfig.getConfig("service.vgroupMapping.test-group")).thenReturn(null);
try (MockedStatic<ConfigurationFactory> mockedFactory = mockStatic(ConfigurationFactory.class)) {
mockedFactory.when(ConfigurationFactory::getInstance).thenReturn(mockConfig);
Assertions.assertThrows(ConfigNotFoundException.class, () -> {
registryService.lookup("test-group");
});
}
}

@Test
public void testClose() throws Exception {
registryService.close();
verify(mockEurekaClient).shutdown();
Assertions.assertNull(getStaticField(EurekaRegistryServiceImpl.class, "eurekaClient"));
Assertions.assertNull(getStaticField(EurekaRegistryServiceImpl.class, "applicationInfoManager"));
}

// Helper method: Set static fields via reflection
private static void setStaticField(Class<?> clazz, String fieldName, Object value) throws Exception {
Field field = clazz.getDeclaredField(fieldName);
field.setAccessible(true);
field.set(null, value);
}

// Helper method: Get the value of a static field
@SuppressWarnings("unchecked")
private static <T> T getStaticField(Class<?> clazz, String fieldName) throws Exception {
Field field = clazz.getDeclaredField(fieldName);
field.setAccessible(true);
return (T) field.get(null);
}

private static void clearStaticMap(Class<?> clazz, String fieldName) throws Exception {
Field field = clazz.getDeclaredField(fieldName);
field.setAccessible(true);
((ConcurrentMap<?, ?>) field.get(null)).clear();
}

private CustomEurekaInstanceConfig getInstanceConfig() throws Exception {
return getStaticField(EurekaRegistryServiceImpl.class, "instanceConfig");
}

@SuppressWarnings("unchecked")
private static ConcurrentMap<String, List<EurekaEventListener>> getStaticListenerMap() throws Exception {
Field field = EurekaRegistryServiceImpl.class.getDeclaredField("LISTENER_SERVICE_MAP");
field.setAccessible(true);
return (ConcurrentMap<String, List<EurekaEventListener>>) field.get(null);
}
}
Loading