Skip to content

Commit dacfdff

Browse files
mattisonchaonikhil-ctds
authored andcommitted
[improve][broker] Add fine-grain authorization to retention admin API (apache#22163)
(cherry picked from commit 6ec473e) (cherry picked from commit 0256117)
1 parent 487000a commit dacfdff

File tree

2 files changed

+226
-3
lines changed

2 files changed

+226
-3
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2224,7 +2224,8 @@ public void getRetention(@Suspended final AsyncResponse asyncResponse,
22242224
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
22252225
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
22262226
validateTopicName(tenant, namespace, encodedTopic);
2227-
preValidation(authoritative)
2227+
validateTopicPolicyOperationAsync(topicName, PolicyName.RETENTION, PolicyOperation.READ)
2228+
.thenCompose(__ -> preValidation(authoritative))
22282229
.thenCompose(__ -> internalGetRetention(applied, isGlobal))
22292230
.thenAccept(asyncResponse::resume)
22302231
.exceptionally(ex -> {
@@ -2251,7 +2252,8 @@ public void setRetention(@Suspended final AsyncResponse asyncResponse,
22512252
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
22522253
@ApiParam(value = "Retention policies for the specified namespace") RetentionPolicies retention) {
22532254
validateTopicName(tenant, namespace, encodedTopic);
2254-
preValidation(authoritative)
2255+
validateTopicPolicyOperationAsync(topicName, PolicyName.RETENTION, PolicyOperation.WRITE)
2256+
.thenCompose(__ -> preValidation(authoritative))
22552257
.thenCompose(__ -> internalSetRetention(retention, isGlobal))
22562258
.thenRun(() -> {
22572259
try {
@@ -2287,7 +2289,8 @@ public void removeRetention(@Suspended final AsyncResponse asyncResponse,
22872289
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
22882290
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
22892291
validateTopicName(tenant, namespace, encodedTopic);
2290-
preValidation(authoritative)
2292+
validateTopicPolicyOperationAsync(topicName, PolicyName.RETENTION, PolicyOperation.WRITE)
2293+
.thenCompose(__ -> preValidation(authoritative))
22912294
.thenCompose(__ -> internalRemoveRetention(isGlobal))
22922295
.thenRun(() -> {
22932296
log.info("[{}] Successfully remove retention: namespace={}, topic={}",
Lines changed: 220 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,220 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker.admin;
20+
21+
import static org.awaitility.Awaitility.await;
22+
import io.jsonwebtoken.Jwts;
23+
import io.jsonwebtoken.SignatureAlgorithm;
24+
import java.util.HashMap;
25+
import java.util.Map;
26+
import java.util.Properties;
27+
import java.util.Set;
28+
import java.util.UUID;
29+
import javax.crypto.SecretKey;
30+
import lombok.Cleanup;
31+
import lombok.SneakyThrows;
32+
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
33+
import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
34+
import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
35+
import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
36+
import org.apache.pulsar.client.admin.PulsarAdmin;
37+
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
38+
import org.apache.pulsar.client.admin.PulsarAdminException;
39+
import org.apache.pulsar.client.impl.auth.AuthenticationToken;
40+
import org.apache.pulsar.common.policies.data.AuthAction;
41+
import org.apache.pulsar.common.policies.data.RetentionPolicies;
42+
import org.apache.pulsar.common.policies.data.TenantInfo;
43+
import org.apache.pulsar.common.util.ObjectMapperFactory;
44+
import org.testng.Assert;
45+
import org.testng.annotations.AfterClass;
46+
import org.testng.annotations.BeforeClass;
47+
import org.testng.annotations.Test;
48+
49+
50+
public final class TopicPoliciesAuthZTest extends MockedPulsarServiceBaseTest {
51+
52+
private PulsarAdmin superUserAdmin;
53+
54+
private PulsarAdmin tenantManagerAdmin;
55+
56+
private static final SecretKey SECRET_KEY = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
57+
private static final String TENANT_ADMIN_SUBJECT = UUID.randomUUID().toString();
58+
private static final String TENANT_ADMIN_TOKEN = Jwts.builder()
59+
.claim("sub", TENANT_ADMIN_SUBJECT).signWith(SECRET_KEY).compact();
60+
61+
62+
private static final String BROKER_INTERNAL_CLIENT_SUBJECT = "broker_internal";
63+
private static final String BROKER_INTERNAL_CLIENT_TOKEN = Jwts.builder()
64+
.claim("sub", BROKER_INTERNAL_CLIENT_SUBJECT).signWith(SECRET_KEY).compact();
65+
private static final String SUPER_USER_SUBJECT = "super-user";
66+
private static final String SUPER_USER_TOKEN = Jwts.builder()
67+
.claim("sub", SUPER_USER_SUBJECT).signWith(SECRET_KEY).compact();
68+
private static final String NOBODY_SUBJECT = "nobody";
69+
private static final String NOBODY_TOKEN = Jwts.builder()
70+
.claim("sub", NOBODY_SUBJECT).signWith(SECRET_KEY).compact();
71+
72+
73+
@BeforeClass
74+
@Override
75+
protected void setup() throws Exception {
76+
conf.setAuthorizationEnabled(true);
77+
conf.setAuthorizationProvider(PulsarAuthorizationProvider.class.getName());
78+
conf.setSuperUserRoles(Set.of(SUPER_USER_SUBJECT, BROKER_INTERNAL_CLIENT_SUBJECT));
79+
conf.setAuthenticationEnabled(true);
80+
conf.setAuthenticationProviders(Set.of(AuthenticationProviderToken.class.getName()));
81+
// internal client
82+
conf.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName());
83+
final Map<String, String> brokerClientAuthParams = new HashMap<>();
84+
brokerClientAuthParams.put("token", BROKER_INTERNAL_CLIENT_TOKEN);
85+
final String brokerClientAuthParamStr = ObjectMapperFactory.getThreadLocal()
86+
.writeValueAsString(brokerClientAuthParams);
87+
conf.setBrokerClientAuthenticationParameters(brokerClientAuthParamStr);
88+
89+
Properties properties = conf.getProperties();
90+
if (properties == null) {
91+
properties = new Properties();
92+
conf.setProperties(properties);
93+
}
94+
properties.put("tokenSecretKey", AuthTokenUtils.encodeKeyBase64(SECRET_KEY));
95+
96+
internalSetup();
97+
setupDefaultTenantAndNamespace();
98+
99+
this.superUserAdmin =PulsarAdmin.builder()
100+
.serviceHttpUrl(pulsar.getWebServiceAddress())
101+
.authentication(new AuthenticationToken(SUPER_USER_TOKEN))
102+
.build();
103+
final TenantInfo tenantInfo = superUserAdmin.tenants().getTenantInfo("public");
104+
tenantInfo.getAdminRoles().add(TENANT_ADMIN_SUBJECT);
105+
superUserAdmin.tenants().updateTenant("public", tenantInfo);
106+
this.tenantManagerAdmin = PulsarAdmin.builder()
107+
.serviceHttpUrl(pulsar.getWebServiceAddress())
108+
.authentication(new AuthenticationToken(TENANT_ADMIN_TOKEN))
109+
.build();
110+
}
111+
112+
@Override
113+
protected void customizeNewPulsarAdminBuilder(PulsarAdminBuilder pulsarAdminBuilder) {
114+
pulsarAdminBuilder.authentication(new AuthenticationToken(SUPER_USER_TOKEN));
115+
}
116+
117+
@AfterClass
118+
@Override
119+
protected void cleanup() throws Exception {
120+
internalCleanup();
121+
}
122+
123+
124+
@SneakyThrows
125+
@Test
126+
public void testRetention() {
127+
final String random = UUID.randomUUID().toString();
128+
final String topic = "persistent://public/default/" + random;
129+
final String subject = UUID.randomUUID().toString();
130+
final String token = Jwts.builder()
131+
.claim("sub", subject).signWith(SECRET_KEY).compact();
132+
superUserAdmin.topics().createNonPartitionedTopic(topic);
133+
134+
@Cleanup
135+
final PulsarAdmin subAdmin = PulsarAdmin.builder()
136+
.serviceHttpUrl(pulsar.getWebServiceAddress())
137+
.authentication(new AuthenticationToken(token))
138+
.build();
139+
final RetentionPolicies definedRetentionPolicy = new RetentionPolicies(1, 1);
140+
// test superuser
141+
superUserAdmin.topicPolicies().setRetention(topic, definedRetentionPolicy);
142+
143+
// because the topic policies is eventual consistency, we should wait here
144+
await().untilAsserted(() -> {
145+
final RetentionPolicies receivedRetentionPolicy = superUserAdmin.topicPolicies().getRetention(topic);
146+
Assert.assertEquals(receivedRetentionPolicy, definedRetentionPolicy);
147+
});
148+
superUserAdmin.topicPolicies().removeRetention(topic);
149+
150+
await().untilAsserted(() -> {
151+
final RetentionPolicies retention = superUserAdmin.topicPolicies().getRetention(topic);
152+
Assert.assertNull(retention);
153+
});
154+
155+
// test tenant manager
156+
157+
tenantManagerAdmin.topicPolicies().setRetention(topic, definedRetentionPolicy);
158+
await().untilAsserted(() -> {
159+
final RetentionPolicies receivedRetentionPolicy = tenantManagerAdmin.topicPolicies().getRetention(topic);
160+
Assert.assertEquals(receivedRetentionPolicy, definedRetentionPolicy);
161+
});
162+
tenantManagerAdmin.topicPolicies().removeRetention(topic);
163+
await().untilAsserted(() -> {
164+
final RetentionPolicies retention = tenantManagerAdmin.topicPolicies().getRetention(topic);
165+
Assert.assertNull(retention);
166+
});
167+
168+
// test nobody
169+
170+
try {
171+
subAdmin.topicPolicies().getRetention(topic);
172+
Assert.fail("unexpected behaviour");
173+
} catch (PulsarAdminException ex) {
174+
Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
175+
}
176+
177+
try {
178+
179+
subAdmin.topicPolicies().setRetention(topic, definedRetentionPolicy);
180+
Assert.fail("unexpected behaviour");
181+
} catch (PulsarAdminException ex) {
182+
Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
183+
}
184+
185+
try {
186+
subAdmin.topicPolicies().removeRetention(topic);
187+
Assert.fail("unexpected behaviour");
188+
} catch (PulsarAdminException ex) {
189+
Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
190+
}
191+
192+
// test sub user with permissions
193+
for (AuthAction action : AuthAction.values()) {
194+
superUserAdmin.namespaces().grantPermissionOnNamespace("public/default",
195+
subject, Set.of(action));
196+
try {
197+
subAdmin.topicPolicies().getRetention(topic);
198+
Assert.fail("unexpected behaviour");
199+
} catch (PulsarAdminException ex) {
200+
Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
201+
}
202+
203+
try {
204+
205+
subAdmin.topicPolicies().setRetention(topic, definedRetentionPolicy);
206+
Assert.fail("unexpected behaviour");
207+
} catch (PulsarAdminException ex) {
208+
Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
209+
}
210+
211+
try {
212+
subAdmin.topicPolicies().removeRetention(topic);
213+
Assert.fail("unexpected behaviour");
214+
} catch (PulsarAdminException ex) {
215+
Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
216+
}
217+
superUserAdmin.namespaces().revokePermissionsOnNamespace("public/default", subject);
218+
}
219+
}
220+
}

0 commit comments

Comments
 (0)