Skip to content

Commit dbc0649

Browse files
Add broker interceptor for Intercepting all Pulsar command and REST API requests. (#7143)
Add broker interceptor for Intercepting all Pulsar command and REST API requests
1 parent f8a697d commit dbc0649

21 files changed

+1109
-5
lines changed

pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -707,6 +707,18 @@ public class ServiceConfiguration implements PulsarConfiguration {
707707
)
708708
private int maxNumPartitionsPerPartitionedTopic = 0;
709709

710+
@FieldContext(
711+
category = CATEGORY_SERVER,
712+
doc = "The directory to locate broker interceptors"
713+
)
714+
private String brokerInterceptorsDirectory = "./interceptors";
715+
716+
@FieldContext(
717+
category = CATEGORY_SERVER,
718+
doc = "List of broker interceptor to load, which is a list of broker interceptor names"
719+
)
720+
private Set<String> brokerInterceptors = Sets.newTreeSet();
721+
710722
@FieldContext(
711723
doc = "There are two policies when zookeeper session expired happens, \"shutdown\" and \"reconnect\". \n\n"
712724
+ " If uses \"shutdown\" policy, shutdown the broker when zookeeper session expired happens.\n\n"

pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,8 @@
7777
import org.apache.pulsar.broker.authorization.AuthorizationService;
7878
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
7979
import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
80+
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
81+
import org.apache.pulsar.broker.intercept.BrokerInterceptors;
8082
import org.apache.pulsar.broker.loadbalance.LeaderElectionService;
8183
import org.apache.pulsar.broker.loadbalance.LeaderElectionService.LeaderListener;
8284
import org.apache.pulsar.broker.loadbalance.LoadManager;
@@ -202,6 +204,7 @@ public class PulsarService implements AutoCloseable {
202204

203205
private MetricsGenerator metricsGenerator;
204206
private TransactionMetadataStoreService transactionMetadataStoreService;
207+
private BrokerInterceptor brokerInterceptor;
205208

206209
public enum State {
207210
Init, Started, Closed
@@ -454,7 +457,9 @@ public void start() throws PulsarServerException {
454457

455458
this.defaultOffloader = createManagedLedgerOffloader(
456459
OffloadPolicies.create(this.getConfiguration().getProperties()));
457-
460+
this.brokerInterceptor = BrokerInterceptors.load(config);
461+
brokerService.setInterceptor(getBrokerInterceptor());
462+
this.brokerInterceptor.initialize(config);
458463
brokerService.start();
459464

460465
this.webService = new WebService(this);
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker.intercept;
20+
21+
import com.google.common.annotations.Beta;
22+
import org.apache.pulsar.broker.ServiceConfiguration;
23+
import org.apache.pulsar.broker.service.ServerCnx;
24+
import org.apache.pulsar.common.api.proto.PulsarApi.BaseCommand;
25+
26+
import javax.servlet.FilterChain;
27+
import javax.servlet.ServletException;
28+
import javax.servlet.ServletRequest;
29+
import javax.servlet.ServletResponse;
30+
import java.io.IOException;
31+
32+
/**
33+
* A plugin interface that allows you to intercept the
34+
* client requests to the Pulsar brokers.
35+
*
36+
* <p>BrokerInterceptor callbacks may be called from multiple threads. Interceptor
37+
* implementation must ensure thread-safety, if needed.
38+
*/
39+
@Beta
40+
public interface BrokerInterceptor extends AutoCloseable {
41+
42+
/**
43+
* Called by the broker while new command incoming.
44+
*/
45+
void onPulsarCommand(BaseCommand command, ServerCnx cnx) throws Exception;
46+
47+
/**
48+
* Called by the web service while new request incoming.
49+
*/
50+
void onWebServiceRequest(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException;
51+
52+
/**
53+
* Initialize the broker interceptor.
54+
*
55+
* @throws Exception when fail to initialize the broker interceptor.
56+
*/
57+
void initialize(ServiceConfiguration conf) throws Exception;
58+
59+
BrokerInterceptor DISABLED = new BrokerInterceptorDisabled();
60+
61+
/**
62+
* Broker interceptor disabled implementation.
63+
*/
64+
class BrokerInterceptorDisabled implements BrokerInterceptor {
65+
66+
@Override
67+
public void onPulsarCommand(BaseCommand command, ServerCnx cnx) throws Exception {
68+
//No-op
69+
}
70+
71+
@Override
72+
public void onWebServiceRequest(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {
73+
chain.doFilter(request, response);
74+
}
75+
76+
@Override
77+
public void initialize(ServiceConfiguration conf) throws Exception {
78+
//No-op
79+
}
80+
81+
@Override
82+
public void close() {
83+
//No-op
84+
}
85+
}
86+
87+
/**
88+
* Close this broker interceptor.
89+
*/
90+
@Override
91+
void close();
92+
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker.intercept;
20+
21+
import lombok.Data;
22+
import lombok.NoArgsConstructor;
23+
24+
/**
25+
* Metadata information about a broker interceptor.
26+
*/
27+
@Data
28+
@NoArgsConstructor
29+
public class BrokerInterceptorDefinition {
30+
31+
/**
32+
* The name of the broker interceptor.
33+
*/
34+
private String name;
35+
36+
/**
37+
* The description of the broker interceptor to be used for user help.
38+
*/
39+
private String description;
40+
41+
/**
42+
* The class name for the broker interceptor.
43+
*/
44+
private String interceptorClass;
45+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker.intercept;
20+
21+
import lombok.Data;
22+
import lombok.experimental.Accessors;
23+
import java.util.Map;
24+
import java.util.TreeMap;
25+
26+
/**
27+
* The collection of broker interceptor.
28+
*/
29+
@Data
30+
@Accessors(fluent = true)
31+
public class BrokerInterceptorDefinitions {
32+
33+
private final Map<String, BrokerInterceptorMetadata> interceptors = new TreeMap<>();
34+
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker.intercept;
20+
21+
import lombok.Data;
22+
import lombok.NoArgsConstructor;
23+
24+
import java.nio.file.Path;
25+
26+
/**
27+
* The metadata of broker interceptor
28+
*/
29+
@Data
30+
@NoArgsConstructor
31+
public class BrokerInterceptorMetadata {
32+
33+
/**
34+
* The definition of the broker interceptor.
35+
*/
36+
private BrokerInterceptorDefinition definition;
37+
38+
/**
39+
* The path to the handler package.
40+
*/
41+
private Path archivePath;
42+
}

0 commit comments

Comments
 (0)