Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.pulsar.broker.admin.impl;

import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
Expand Down Expand Up @@ -84,10 +86,12 @@ public TenantInfo getTenantAdmin(@PathParam("tenant") String tenant) {
@ApiOperation(value = "Create a new tenant.", notes = "This operation requires Pulsar super-user privileges.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
@ApiResponse(code = 409, message = "Tenant already exists"),
@ApiResponse(code = 412, message = "Tenant name is not valid") })
@ApiResponse(code = 412, message = "Tenant name is not valid"),
@ApiResponse(code = 412, message = "Clusters do not exist") })
public void createTenant(@PathParam("tenant") String tenant, TenantInfo config) {
validateSuperUserAccess();
validatePoliciesReadOnlyAccess();
validateClusters(config);

try {
NamedEntity.checkName(tenant);
Expand All @@ -113,10 +117,12 @@ public void createTenant(@PathParam("tenant") String tenant, TenantInfo config)
@ApiOperation(value = "Update the admins for a tenant.", notes = "This operation requires Pulsar super-user privileges.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
@ApiResponse(code = 404, message = "Tenant does not exist"),
@ApiResponse(code = 409, message = "Tenant already exists") })
@ApiResponse(code = 409, message = "Tenant already exists"),
@ApiResponse(code = 412, message = "Clusters do not exist") })
public void updateTenant(@PathParam("tenant") String tenant, TenantInfo newTenantAdmin) {
validateSuperUserAccess();
validatePoliciesReadOnlyAccess();
validateClusters(newTenantAdmin);

Stat nodeStat = new Stat();
try {
Expand Down Expand Up @@ -202,5 +208,26 @@ public void deleteTenant(@PathParam("tenant") String tenant) {
}
}

private void validateClusters(TenantInfo info) {
List<String> nonexistentClusters;
try {
if (info == null) {
info = new TenantInfo();
}
Set<String> availableClusters = clustersListCache().get();
Set<String> allowedClusters = info.getAllowedClusters();
nonexistentClusters = allowedClusters.stream()
.filter(cluster -> !availableClusters.contains(cluster))
.collect(Collectors.toList());
} catch (Exception e) {
log.error("[{}] Failed to get available clusters", clientAppId(), e);
throw new RestException(e);
}
if (nonexistentClusters.size() > 0) {
log.warn("[{}] Failed to validate due to clusters {} do not exist", clientAppId(), nonexistentClusters);
throw new RestException(Status.PRECONDITION_FAILED, "Clusters do not exist");
}
}

private static final Logger log = LoggerFactory.getLogger(TenantsBase.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import javax.ws.rs.core.Response.Status;

import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
Expand Down Expand Up @@ -817,14 +818,44 @@ public void testTenantNameWithInvalidCharacters() throws Exception {
admin.tenants().createTenant("prop xyz", tenantInfo);
fail("Should have failed");
} catch (PulsarAdminException e) {
// Expected
assertEquals(e.getStatusCode(), Status.PRECONDITION_FAILED.getStatusCode());
}

try {
admin.tenants().createTenant("prop&xyz", tenantInfo);
fail("Should have failed");
} catch (PulsarAdminException e) {
// Expected
assertEquals(e.getStatusCode(), Status.PRECONDITION_FAILED.getStatusCode());
}
}

@Test
public void testTenantWithNonexistentClusters() throws Exception {
// Check non-existing cluster
assertTrue(!admin.clusters().getClusters().contains("cluster-non-existing"));

Set<String> allowedClusters = Sets.newHashSet("cluster-non-existing");
TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet("role1", "role2"), allowedClusters);

// If we try to create tenant with nonexistent clusters, it should fail immediately
try {
admin.tenants().createTenant("test-tenant", tenantInfo);
fail("Should have failed");
} catch (PulsarAdminException e) {
assertEquals(e.getStatusCode(), Status.PRECONDITION_FAILED.getStatusCode());
}

assertTrue(!admin.tenants().getTenants().contains("test-tenant"));

// Check existing tenant
assertTrue(admin.tenants().getTenants().contains("prop-xyz"));

// If we try to update existing tenant with nonexistent clusters, it should fail immediately
try {
admin.tenants().updateTenant("prop-xyz", tenantInfo);
fail("Should have failed");
} catch (PulsarAdminException e) {
assertEquals(e.getStatusCode(), Status.PRECONDITION_FAILED.getStatusCode());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.util.SecurityUtility;
import org.glassfish.jersey.client.ClientConfig;
Expand Down Expand Up @@ -80,6 +81,10 @@ public void setup() throws Exception {
conf.setBrokerClientTlsEnabled(true);

super.internalSetup();

PulsarAdmin admin = buildAdminClient("admin");
admin.clusters().createCluster("test", new ClusterData(brokerUrl.toString()));
admin.close();
}

@AfterMethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,48 +19,26 @@
package org.apache.pulsar.broker.admin;

import com.google.common.collect.ImmutableSet;

import java.util.List;

import static org.testng.Assert.fail;

import java.lang.reflect.Method;
import java.security.cert.X509Certificate;
import javax.net.ssl.SSLContext;
import javax.ws.rs.NotAuthorizedException;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.GenericType;
import javax.ws.rs.core.MediaType;

import lombok.extern.slf4j.Slf4j;

import org.apache.bookkeeper.test.PortManager;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.internal.JacksonConfigurator;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.AuthPolicies;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.util.SecurityUtility;

import org.glassfish.jersey.client.ClientConfig;
import org.glassfish.jersey.client.ClientProperties;
import org.glassfish.jersey.jackson.JacksonFeature;
import org.glassfish.jersey.media.multipart.MultiPartFeature;

import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.lang.reflect.Method;

import static org.testng.Assert.fail;

@Slf4j
public class BrokerAdminClientTlsAuthTest extends MockedPulsarServiceBaseTest {
protected String methodName;
Expand Down Expand Up @@ -143,6 +121,7 @@ public void testPersistentList() throws Exception {

/***** Broker 2 Started *****/
try (PulsarAdmin admin = buildAdminClient("superproxy")) {
admin.clusters().createCluster("test", new ClusterData(brokerUrl.toString()));
admin.tenants().createTenant("tenant",
new TenantInfo(ImmutableSet.of("admin"),
ImmutableSet.of("test")));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ protected void setup() throws Exception {
doNothing().when(persistentTopics).validateAdminAccessForTenant(this.testTenant);
doReturn(mock(AuthenticationDataHttps.class)).when(persistentTopics).clientAuthData();
admin.clusters().createCluster("use", new ClusterData("http://broker-use.com:" + BROKER_WEBSERVICE_PORT));
admin.clusters().createCluster("test", new ClusterData("http://broker-use.com:" + BROKER_WEBSERVICE_PORT));
admin.tenants().createTenant(this.testTenant,
new TenantInfo(Sets.newHashSet("role1", "role2"), Sets.newHashSet(testLocalCluster, "test")));
admin.namespaces().createNamespace(testTenant + "/" + testNamespace, Sets.newHashSet(testLocalCluster, "test"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,8 @@ public void testTlsSyncProducerAndConsumer(int batchMessageDelayMs) throws Excep
authTls.configure(authParams);
internalSetup(authTls);

admin.clusters().createCluster("test", new ClusterData(brokerUrl.toString()));

admin.tenants().createTenant("my-property",
new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test")));
admin.namespaces().createNamespace("my-property/my-ns", Sets.newHashSet("test"));
Expand All @@ -185,6 +187,8 @@ public void testBasicCryptSyncProducerAndConsumer(int batchMessageDelayMs) throw
authPassword.configure("{\"userId\":\"superUser\",\"password\":\"supepass\"}");
internalSetup(authPassword);

admin.clusters().createCluster("test", new ClusterData(brokerUrl.toString()));

admin.tenants().createTenant("my-property",
new TenantInfo(Sets.newHashSet(), Sets.newHashSet("test")));
admin.namespaces().createNamespace("my-property/my-ns", Sets.newHashSet("test"));
Expand All @@ -201,6 +205,8 @@ public void testBasicArp1SyncProducerAndConsumer(int batchMessageDelayMs) throws
authPassword.configure("{\"userId\":\"superUser2\",\"password\":\"superpassword\"}");
internalSetup(authPassword);

admin.clusters().createCluster("test", new ClusterData(brokerUrl.toString()));

admin.tenants().createTenant("my-property",
new TenantInfo(Sets.newHashSet(), Sets.newHashSet("test")));
admin.namespaces().createNamespace("my-property/my-ns", Sets.newHashSet("test"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -115,6 +116,8 @@ protected void setupClient() throws Exception {
.authentication(authTls).enableTls(true).enableTlsHostnameVerification(hostnameVerificationEnabled)
.build();

admin.clusters().createCluster("test", new ClusterData(brokerUrl.toString()));

admin.tenants().createTenant("my-property",
new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test")));
admin.namespaces().createNamespace("my-property/my-ns", Sets.newHashSet("test"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -117,6 +118,8 @@ public void testProducerAndConsumerAuthorization() throws Exception {
PulsarClient pulsarClientInvalidRole = PulsarClient.builder().serviceUrl(lookupUrl)
.authentication(authenticationInvalidRole).build();

admin.clusters().createCluster("test", new ClusterData(brokerUrl.toString()));

admin.tenants().createTenant("my-property",
new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test")));
admin.namespaces().createNamespace("my-property/my-ns", Sets.newHashSet("test"));
Expand Down Expand Up @@ -180,6 +183,8 @@ public void testSubscriberPermission() throws Exception {

Authentication authentication = new ClientAuthentication(subscriptionRole);

superAdmin.clusters().createCluster("test", new ClusterData(brokerUrl.toString()));

superAdmin.tenants().createTenant("my-property",
new TenantInfo(Sets.newHashSet(tenantRole), Sets.newHashSet("test")));
superAdmin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
Expand Down Expand Up @@ -261,6 +266,8 @@ public void testSubscriptionPrefixAuthorization() throws Exception {

pulsarClient = PulsarClient.builder().serviceUrl(lookupUrl).authentication(authentication).build();

admin.clusters().createCluster("test", new ClusterData(brokerUrl.toString()));

admin.tenants().createTenant("prop-prefix",
new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test")));
admin.namespaces().createNamespace("prop-prefix/ns", Sets.newHashSet("test"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -780,6 +781,8 @@ public void testDefaultBacklogTTL() throws Exception {
final String topicName = "persistent://" + namespace + "/expiry";
final String subName = "expiredSub";

admin.clusters().createCluster("use", new ClusterData(brokerUrl.toString()));

admin.tenants().createTenant("prop", new TenantInfo(null, Sets.newHashSet("use")));
admin.namespaces().createNamespace(namespace);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
import org.apache.pulsar.policies.data.loadbalancer.LoadReport;
Expand Down Expand Up @@ -151,6 +152,8 @@ public void testAuthenticatedProxyAsNonAdmin() throws Exception {
// expected
}

brokerAdmin.clusters().createCluster(configClusterName, new ClusterData(brokerUrl.toString()));

brokerAdmin.tenants().createTenant("tenant1",
new TenantInfo(ImmutableSet.of("user1"),
ImmutableSet.of(configClusterName)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.mockito.Mockito;
import org.slf4j.Logger;
Expand Down Expand Up @@ -164,6 +165,8 @@ public void testProxyAuthorization() throws Exception {

String namespaceName = "my-property/proxy-authorization-neg/my-ns";

admin.clusters().createCluster("proxy-authorization-neg", new ClusterData(brokerUrl.toString()));

admin.tenants().createTenant("my-property",
new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("proxy-authorization-neg")));
admin.namespaces().createNamespace(namespaceName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.mockito.Mockito;
import org.slf4j.Logger;
Expand Down Expand Up @@ -229,6 +230,8 @@ public void testProxyAuthorization() throws Exception {

String namespaceName = "my-property/proxy-authorization/my-ns";

admin.clusters().createCluster("proxy-authorization", new ClusterData(brokerUrl.toString()));

admin.tenants().createTenant("my-property",
new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("proxy-authorization")));
admin.namespaces().createNamespace(namespaceName);
Expand Down Expand Up @@ -281,6 +284,8 @@ public void testTlsHostVerificationProxyToClient(boolean hostnameVerificationEna

String namespaceName = "my-property/proxy-authorization/my-ns";

admin.clusters().createCluster("proxy-authorization", new ClusterData(brokerUrl.toString()));

admin.tenants().createTenant("my-property",
new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("proxy-authorization")));
admin.namespaces().createNamespace(namespaceName);
Expand Down Expand Up @@ -331,6 +336,8 @@ public void testTlsHostVerificationProxyToBroker(boolean hostnameVerificationEna

String namespaceName = "my-property/proxy-authorization/my-ns";

admin.clusters().createCluster("proxy-authorization", new ClusterData(brokerUrl.toString()));

admin.tenants().createTenant("my-property",
new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("proxy-authorization")));
admin.namespaces().createNamespace(namespaceName);
Expand Down Expand Up @@ -366,6 +373,8 @@ public void tlsCiphersAndProtocols(Set<String> tlsCiphers, Set<String> tlsProtoc
String namespaceName = "my-property/proxy-authorization/my-ns";
createAdminClient();

admin.clusters().createCluster("proxy-authorization", new ClusterData(brokerUrl.toString()));

admin.tenants().createTenant("my-property",
new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("proxy-authorization")));
admin.namespaces().createNamespace(namespaceName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.mockito.Mockito;
import org.slf4j.Logger;
Expand Down Expand Up @@ -158,6 +159,8 @@ public void testDiscoveryService() throws Exception {
// create a client which connects to proxy over tls and pass authData
PulsarClient proxyClient = createPulsarClient(authTls, proxyServiceUrl);

admin.clusters().createCluster("without-service-discovery", new ClusterData(brokerUrl.toString()));

admin.tenants().createTenant("my-property", new TenantInfo(Sets.newHashSet("appid1", "appid2"),
Sets.newHashSet("without-service-discovery")));
admin.namespaces().createNamespace("my-property/without-service-discovery/my-ns");
Expand Down
Loading