Skip to content

Commit 4eaff7a

Browse files
murong00merlimat
authored andcommitted
[Issue 3475][pulsar-client] Add cluster checking before operating tenants with it (#3476)
* Add cluster checking before operating tenants with it. * Check on server side and add a unit test for this part of changes * Fix some unit tests to pass this change * Fix some unit tests to pass this change * Fix NPE when TenantInfo is null * Fix a unit test to pass this change
1 parent e32b45a commit 4eaff7a

14 files changed

+114
-31
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
package org.apache.pulsar.broker.admin.impl;
2020

2121
import java.util.List;
22+
import java.util.Set;
23+
import java.util.stream.Collectors;
2224

2325
import javax.ws.rs.DELETE;
2426
import javax.ws.rs.GET;
@@ -84,10 +86,12 @@ public TenantInfo getTenantAdmin(@PathParam("tenant") String tenant) {
8486
@ApiOperation(value = "Create a new tenant.", notes = "This operation requires Pulsar super-user privileges.")
8587
@ApiResponses(value = { @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
8688
@ApiResponse(code = 409, message = "Tenant already exists"),
87-
@ApiResponse(code = 412, message = "Tenant name is not valid") })
89+
@ApiResponse(code = 412, message = "Tenant name is not valid"),
90+
@ApiResponse(code = 412, message = "Clusters do not exist") })
8891
public void createTenant(@PathParam("tenant") String tenant, TenantInfo config) {
8992
validateSuperUserAccess();
9093
validatePoliciesReadOnlyAccess();
94+
validateClusters(config);
9195

9296
try {
9397
NamedEntity.checkName(tenant);
@@ -113,10 +117,12 @@ public void createTenant(@PathParam("tenant") String tenant, TenantInfo config)
113117
@ApiOperation(value = "Update the admins for a tenant.", notes = "This operation requires Pulsar super-user privileges.")
114118
@ApiResponses(value = { @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
115119
@ApiResponse(code = 404, message = "Tenant does not exist"),
116-
@ApiResponse(code = 409, message = "Tenant already exists") })
120+
@ApiResponse(code = 409, message = "Tenant already exists"),
121+
@ApiResponse(code = 412, message = "Clusters do not exist") })
117122
public void updateTenant(@PathParam("tenant") String tenant, TenantInfo newTenantAdmin) {
118123
validateSuperUserAccess();
119124
validatePoliciesReadOnlyAccess();
125+
validateClusters(newTenantAdmin);
120126

121127
Stat nodeStat = new Stat();
122128
try {
@@ -202,5 +208,26 @@ public void deleteTenant(@PathParam("tenant") String tenant) {
202208
}
203209
}
204210

211+
private void validateClusters(TenantInfo info) {
212+
List<String> nonexistentClusters;
213+
try {
214+
if (info == null) {
215+
info = new TenantInfo();
216+
}
217+
Set<String> availableClusters = clustersListCache().get();
218+
Set<String> allowedClusters = info.getAllowedClusters();
219+
nonexistentClusters = allowedClusters.stream()
220+
.filter(cluster -> !availableClusters.contains(cluster))
221+
.collect(Collectors.toList());
222+
} catch (Exception e) {
223+
log.error("[{}] Failed to get available clusters", clientAppId(), e);
224+
throw new RestException(e);
225+
}
226+
if (nonexistentClusters.size() > 0) {
227+
log.warn("[{}] Failed to validate due to clusters {} do not exist", clientAppId(), nonexistentClusters);
228+
throw new RestException(Status.PRECONDITION_FAILED, "Clusters do not exist");
229+
}
230+
}
231+
205232
private static final Logger log = LoggerFactory.getLogger(TenantsBase.class);
206233
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import java.util.Map;
3939
import java.util.Set;
4040
import java.util.concurrent.TimeUnit;
41+
import javax.ws.rs.core.Response.Status;
4142

4243
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
4344
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
@@ -817,14 +818,44 @@ public void testTenantNameWithInvalidCharacters() throws Exception {
817818
admin.tenants().createTenant("prop xyz", tenantInfo);
818819
fail("Should have failed");
819820
} catch (PulsarAdminException e) {
820-
// Expected
821+
assertEquals(e.getStatusCode(), Status.PRECONDITION_FAILED.getStatusCode());
821822
}
822823

823824
try {
824825
admin.tenants().createTenant("prop&xyz", tenantInfo);
825826
fail("Should have failed");
826827
} catch (PulsarAdminException e) {
827-
// Expected
828+
assertEquals(e.getStatusCode(), Status.PRECONDITION_FAILED.getStatusCode());
829+
}
830+
}
831+
832+
@Test
833+
public void testTenantWithNonexistentClusters() throws Exception {
834+
// Check non-existing cluster
835+
assertTrue(!admin.clusters().getClusters().contains("cluster-non-existing"));
836+
837+
Set<String> allowedClusters = Sets.newHashSet("cluster-non-existing");
838+
TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet("role1", "role2"), allowedClusters);
839+
840+
// If we try to create tenant with nonexistent clusters, it should fail immediately
841+
try {
842+
admin.tenants().createTenant("test-tenant", tenantInfo);
843+
fail("Should have failed");
844+
} catch (PulsarAdminException e) {
845+
assertEquals(e.getStatusCode(), Status.PRECONDITION_FAILED.getStatusCode());
846+
}
847+
848+
assertTrue(!admin.tenants().getTenants().contains("test-tenant"));
849+
850+
// Check existing tenant
851+
assertTrue(admin.tenants().getTenants().contains("prop-xyz"));
852+
853+
// If we try to update existing tenant with nonexistent clusters, it should fail immediately
854+
try {
855+
admin.tenants().updateTenant("prop-xyz", tenantInfo);
856+
fail("Should have failed");
857+
} catch (PulsarAdminException e) {
858+
assertEquals(e.getStatusCode(), Status.PRECONDITION_FAILED.getStatusCode());
828859
}
829860
}
830861

pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTlsAuthTest.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.pulsar.client.api.PulsarClient;
3030
import org.apache.pulsar.client.api.Schema;
3131
import org.apache.pulsar.common.policies.data.AuthAction;
32+
import org.apache.pulsar.common.policies.data.ClusterData;
3233
import org.apache.pulsar.common.policies.data.TenantInfo;
3334
import org.apache.pulsar.common.util.SecurityUtility;
3435
import org.glassfish.jersey.client.ClientConfig;
@@ -80,6 +81,10 @@ public void setup() throws Exception {
8081
conf.setBrokerClientTlsEnabled(true);
8182

8283
super.internalSetup();
84+
85+
PulsarAdmin admin = buildAdminClient("admin");
86+
admin.clusters().createCluster("test", new ClusterData(brokerUrl.toString()));
87+
admin.close();
8388
}
8489

8590
@AfterMethod

pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/BrokerAdminClientTlsAuthTest.java

Lines changed: 6 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -19,48 +19,26 @@
1919
package org.apache.pulsar.broker.admin;
2020

2121
import com.google.common.collect.ImmutableSet;
22-
23-
import java.util.List;
24-
25-
import static org.testng.Assert.fail;
26-
27-
import java.lang.reflect.Method;
28-
import java.security.cert.X509Certificate;
29-
import javax.net.ssl.SSLContext;
30-
import javax.ws.rs.NotAuthorizedException;
31-
import javax.ws.rs.client.Client;
32-
import javax.ws.rs.client.ClientBuilder;
33-
import javax.ws.rs.client.WebTarget;
34-
import javax.ws.rs.core.GenericType;
35-
import javax.ws.rs.core.MediaType;
36-
3722
import lombok.extern.slf4j.Slf4j;
38-
3923
import org.apache.bookkeeper.test.PortManager;
40-
import org.apache.http.conn.ssl.NoopHostnameVerifier;
4124
import org.apache.pulsar.broker.PulsarService;
4225
import org.apache.pulsar.broker.ServiceConfiguration;
4326
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
4427
import org.apache.pulsar.client.admin.PulsarAdmin;
4528
import org.apache.pulsar.client.admin.PulsarAdminException;
46-
import org.apache.pulsar.client.admin.internal.JacksonConfigurator;
4729
import org.apache.pulsar.common.policies.data.AuthAction;
48-
import org.apache.pulsar.common.policies.data.AuthPolicies;
4930
import org.apache.pulsar.common.policies.data.BundlesData;
31+
import org.apache.pulsar.common.policies.data.ClusterData;
5032
import org.apache.pulsar.common.policies.data.Policies;
5133
import org.apache.pulsar.common.policies.data.TenantInfo;
52-
import org.apache.pulsar.common.util.SecurityUtility;
53-
54-
import org.glassfish.jersey.client.ClientConfig;
55-
import org.glassfish.jersey.client.ClientProperties;
56-
import org.glassfish.jersey.jackson.JacksonFeature;
57-
import org.glassfish.jersey.media.multipart.MultiPartFeature;
58-
59-
import org.testng.Assert;
6034
import org.testng.annotations.AfterMethod;
6135
import org.testng.annotations.BeforeMethod;
6236
import org.testng.annotations.Test;
6337

38+
import java.lang.reflect.Method;
39+
40+
import static org.testng.Assert.fail;
41+
6442
@Slf4j
6543
public class BrokerAdminClientTlsAuthTest extends MockedPulsarServiceBaseTest {
6644
protected String methodName;
@@ -143,6 +121,7 @@ public void testPersistentList() throws Exception {
143121

144122
/***** Broker 2 Started *****/
145123
try (PulsarAdmin admin = buildAdminClient("superproxy")) {
124+
admin.clusters().createCluster("test", new ClusterData(brokerUrl.toString()));
146125
admin.tenants().createTenant("tenant",
147126
new TenantInfo(ImmutableSet.of("admin"),
148127
ImmutableSet.of("test")));

pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ protected void setup() throws Exception {
7979
doNothing().when(persistentTopics).validateAdminAccessForTenant(this.testTenant);
8080
doReturn(mock(AuthenticationDataHttps.class)).when(persistentTopics).clientAuthData();
8181
admin.clusters().createCluster("use", new ClusterData("http://broker-use.com:" + BROKER_WEBSERVICE_PORT));
82+
admin.clusters().createCluster("test", new ClusterData("http://broker-use.com:" + BROKER_WEBSERVICE_PORT));
8283
admin.tenants().createTenant(this.testTenant,
8384
new TenantInfo(Sets.newHashSet("role1", "role2"), Sets.newHashSet(testLocalCluster, "test")));
8485
admin.namespaces().createNamespace(testTenant + "/" + testNamespace, Sets.newHashSet(testLocalCluster, "test"));

pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,8 @@ public void testTlsSyncProducerAndConsumer(int batchMessageDelayMs) throws Excep
169169
authTls.configure(authParams);
170170
internalSetup(authTls);
171171

172+
admin.clusters().createCluster("test", new ClusterData(brokerUrl.toString()));
173+
172174
admin.tenants().createTenant("my-property",
173175
new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test")));
174176
admin.namespaces().createNamespace("my-property/my-ns", Sets.newHashSet("test"));
@@ -185,6 +187,8 @@ public void testBasicCryptSyncProducerAndConsumer(int batchMessageDelayMs) throw
185187
authPassword.configure("{\"userId\":\"superUser\",\"password\":\"supepass\"}");
186188
internalSetup(authPassword);
187189

190+
admin.clusters().createCluster("test", new ClusterData(brokerUrl.toString()));
191+
188192
admin.tenants().createTenant("my-property",
189193
new TenantInfo(Sets.newHashSet(), Sets.newHashSet("test")));
190194
admin.namespaces().createNamespace("my-property/my-ns", Sets.newHashSet("test"));
@@ -201,6 +205,8 @@ public void testBasicArp1SyncProducerAndConsumer(int batchMessageDelayMs) throws
201205
authPassword.configure("{\"userId\":\"superUser2\",\"password\":\"superpassword\"}");
202206
internalSetup(authPassword);
203207

208+
admin.clusters().createCluster("test", new ClusterData(brokerUrl.toString()));
209+
204210
admin.tenants().createTenant("my-property",
205211
new TenantInfo(Sets.newHashSet(), Sets.newHashSet("test")));
206212
admin.namespaces().createNamespace("my-property/my-ns", Sets.newHashSet("test"));

pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticationTlsHostnameVerificationTest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
3535
import org.apache.pulsar.client.admin.PulsarAdmin;
3636
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
37+
import org.apache.pulsar.common.policies.data.ClusterData;
3738
import org.apache.pulsar.common.policies.data.TenantInfo;
3839
import org.slf4j.Logger;
3940
import org.slf4j.LoggerFactory;
@@ -115,6 +116,8 @@ protected void setupClient() throws Exception {
115116
.authentication(authTls).enableTls(true).enableTlsHostnameVerification(hostnameVerificationEnabled)
116117
.build();
117118

119+
admin.clusters().createCluster("test", new ClusterData(brokerUrl.toString()));
120+
118121
admin.tenants().createTenant("my-property",
119122
new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test")));
120123
admin.namespaces().createNamespace("my-property/my-ns", Sets.newHashSet("test"));

pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import org.apache.pulsar.common.naming.NamespaceName;
4646
import org.apache.pulsar.common.naming.TopicName;
4747
import org.apache.pulsar.common.policies.data.AuthAction;
48+
import org.apache.pulsar.common.policies.data.ClusterData;
4849
import org.apache.pulsar.common.policies.data.TenantInfo;
4950
import org.slf4j.Logger;
5051
import org.slf4j.LoggerFactory;
@@ -117,6 +118,8 @@ public void testProducerAndConsumerAuthorization() throws Exception {
117118
PulsarClient pulsarClientInvalidRole = PulsarClient.builder().serviceUrl(lookupUrl)
118119
.authentication(authenticationInvalidRole).build();
119120

121+
admin.clusters().createCluster("test", new ClusterData(brokerUrl.toString()));
122+
120123
admin.tenants().createTenant("my-property",
121124
new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test")));
122125
admin.namespaces().createNamespace("my-property/my-ns", Sets.newHashSet("test"));
@@ -180,6 +183,8 @@ public void testSubscriberPermission() throws Exception {
180183

181184
Authentication authentication = new ClientAuthentication(subscriptionRole);
182185

186+
superAdmin.clusters().createCluster("test", new ClusterData(brokerUrl.toString()));
187+
183188
superAdmin.tenants().createTenant("my-property",
184189
new TenantInfo(Sets.newHashSet(tenantRole), Sets.newHashSet("test")));
185190
superAdmin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
@@ -261,6 +266,8 @@ public void testSubscriptionPrefixAuthorization() throws Exception {
261266

262267
pulsarClient = PulsarClient.builder().serviceUrl(lookupUrl).authentication(authentication).build();
263268

269+
admin.clusters().createCluster("test", new ClusterData(brokerUrl.toString()));
270+
264271
admin.tenants().createTenant("prop-prefix",
265272
new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test")));
266273
admin.namespaces().createNamespace("prop-prefix/ns", Sets.newHashSet("test"));

pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import org.apache.pulsar.client.api.ProducerConsumerBase;
5151
import org.apache.pulsar.client.api.PulsarClientException;
5252
import org.apache.pulsar.client.api.SubscriptionType;
53+
import org.apache.pulsar.common.policies.data.ClusterData;
5354
import org.apache.pulsar.common.policies.data.TenantInfo;
5455
import org.slf4j.Logger;
5556
import org.slf4j.LoggerFactory;
@@ -780,6 +781,8 @@ public void testDefaultBacklogTTL() throws Exception {
780781
final String topicName = "persistent://" + namespace + "/expiry";
781782
final String subName = "expiredSub";
782783

784+
admin.clusters().createCluster("use", new ClusterData(brokerUrl.toString()));
785+
783786
admin.tenants().createTenant("prop", new TenantInfo(null, Sets.newHashSet("use")));
784787
admin.namespaces().createNamespace(namespace);
785788

pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.apache.pulsar.client.admin.PulsarAdminException;
3333
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
3434
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
35+
import org.apache.pulsar.common.policies.data.ClusterData;
3536
import org.apache.pulsar.common.policies.data.TenantInfo;
3637
import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
3738
import org.apache.pulsar.policies.data.loadbalancer.LoadReport;
@@ -151,6 +152,8 @@ public void testAuthenticatedProxyAsNonAdmin() throws Exception {
151152
// expected
152153
}
153154

155+
brokerAdmin.clusters().createCluster(configClusterName, new ClusterData(brokerUrl.toString()));
156+
154157
brokerAdmin.tenants().createTenant("tenant1",
155158
new TenantInfo(ImmutableSet.of("user1"),
156159
ImmutableSet.of(configClusterName)));

0 commit comments

Comments
 (0)