Skip to content

Commit 9059606

Browse files
geniusjoeninjazhou
authored andcommitted
[improve] Update topic admin interface comment, add topic admin test cases
1 parent 393f80b commit 9059606

File tree

3 files changed

+383
-25
lines changed

3 files changed

+383
-25
lines changed

integration-tests/conf/standalone.conf

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,9 @@ maxUnackedMessagesPerConsumer=50000
8383
# Set maxMessageSize to 1MB rather than the default value 5MB for testing
8484
maxMessageSize=1048576
8585

86+
# enable topic level policies to test topic admin functions
87+
topicLevelPoliciesEnabled=true
88+
8689
### --- Authentication --- ###
8790

8891
# Enable TLS

pulsaradmin/pkg/admin/topic.go

Lines changed: 154 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -26,16 +26,35 @@ import (
2626

2727
// Topics is admin interface for topics management
2828
type Topics interface {
29-
// Create a topic
30-
Create(utils.TopicName, int) error
31-
32-
// Delete a topic
33-
Delete(utils.TopicName, bool, bool) error
29+
// Create a partitioned or non-partitioned topic
30+
//
31+
// @param topic
32+
// topicName struct
33+
// @param partitions
34+
// number of topic partitions,
35+
// when setting to 0, it will create a non-partitioned topic
36+
Create(topic utils.TopicName, partitions int) error
37+
38+
// Delete a topic, this function can delete both partitioned or non-partitioned topic
39+
//
40+
// @param topic
41+
// topicName struct
42+
// @param force
43+
// delete topic forcefully
44+
// @param nonPartitioned
45+
// when set to true, topic will be treated as a non-partitioned topic
46+
// Otherwise it will be treated as a partitioned topic
47+
Delete(topic utils.TopicName, force bool, nonPartitioned bool) error
3448

3549
// Update number of partitions of a non-global partitioned topic
3650
// It requires partitioned-topic to be already exist and number of new partitions must be greater than existing
3751
// number of partitions. Decrementing number of partitions requires deletion of topic which is not supported.
38-
Update(utils.TopicName, int) error
52+
//
53+
// @param topic
54+
// topicName struct
55+
// @param partitions
56+
// number of new partitions of already exist partitioned-topic
57+
Update(topic utils.TopicName, partitions int) error
3958

4059
// GetMetadata returns metadata of a partitioned topic
4160
GetMetadata(utils.TopicName) (utils.PartitionedTopicMetadata, error)
@@ -52,12 +71,24 @@ type Topics interface {
5271
GetPermissions(utils.TopicName) (map[string][]utils.AuthAction, error)
5372

5473
// GrantPermission grants a new permission to a client role on a single topic
55-
GrantPermission(utils.TopicName, string, []utils.AuthAction) error
74+
//
75+
// @param topic
76+
// topicName struct
77+
// @param role
78+
// client role to which grant permission
79+
// @param action
80+
// auth actions (e.g. produce and consume)
81+
GrantPermission(topic utils.TopicName, role string, action []utils.AuthAction) error
5682

5783
// RevokePermission revokes permissions to a client role on a single topic. If the permission
5884
// was not set at the topic level, but rather at the namespace level, this operation will
5985
// return an error (HTTP status code 412).
60-
RevokePermission(utils.TopicName, string) error
86+
//
87+
// @param topic
88+
// topicName struct
89+
// @param role
90+
// client role to which remove permissions
91+
RevokePermission(topic utils.TopicName, role string) error
6192

6293
// Lookup a topic returns the broker URL that serves the topic
6394
Lookup(utils.TopicName) (utils.LookupData, error)
@@ -69,24 +100,56 @@ type Topics interface {
69100
GetLastMessageID(utils.TopicName) (utils.MessageID, error)
70101

71102
// GetMessageID returns the message Id by timestamp(ms) of a topic
72-
GetMessageID(utils.TopicName, int64) (utils.MessageID, error)
73-
74-
// GetStats returns the stats for the topic
75-
// All the rates are computed over a 1 minute window and are relative the last completed 1 minute period
103+
//
104+
// @param topic
105+
// topicName struct
106+
// @param timestamp
107+
// absolute timestamp (in ms)
108+
GetMessageID(topic utils.TopicName, timestamp int64) (utils.MessageID, error)
109+
110+
// GetStats returns the stats for the topic.
111+
//
112+
// All the rates are computed over a 1-minute window and are relative the last completed 1-minute period
76113
GetStats(utils.TopicName) (utils.TopicStats, error)
77114

78115
// GetStatsWithOption returns the stats for the topic
79-
GetStatsWithOption(utils.TopicName, utils.GetStatsOptions) (utils.TopicStats, error)
116+
//
117+
// All the rates are computed over a 1-minute window and are relative the last completed 1-minute period
118+
//
119+
// @param topic
120+
// topicName struct
121+
// @param option
122+
// request option, e.g. get_precise_backlog or subscription_backlog_size
123+
GetStatsWithOption(topic utils.TopicName, option utils.GetStatsOptions) (utils.TopicStats, error)
80124

81125
// GetInternalStats returns the internal stats for the topic.
82126
GetInternalStats(utils.TopicName) (utils.PersistentTopicInternalStats, error)
83127

84128
// GetPartitionedStats returns the stats for the partitioned topic
85-
// All the rates are computed over a 1 minute window and are relative the last completed 1 minute period
86-
GetPartitionedStats(utils.TopicName, bool) (utils.PartitionedTopicStats, error)
129+
//
130+
// All the rates are computed over a 1-minute window and are relative the last completed 1-minute period
131+
//
132+
// @param topic
133+
// topicName struct
134+
// @param perPartition
135+
// flag to get stats per partition
136+
GetPartitionedStats(topic utils.TopicName, perPartition bool) (utils.PartitionedTopicStats, error)
87137

88138
// GetPartitionedStatsWithOption returns the stats for the partitioned topic
89-
GetPartitionedStatsWithOption(utils.TopicName, bool, utils.GetStatsOptions) (utils.PartitionedTopicStats, error)
139+
//
140+
// All the rates are computed over a 1-minute window and are relative the last completed 1-minute period
141+
//
142+
// @param topic
143+
// topicName struct
144+
// @param perPartition
145+
// flag to get stats per partition
146+
// @param option
147+
// request option, e.g. get_precise_backlog or subscription_backlog_size
148+
GetPartitionedStatsWithOption(
149+
topic utils.TopicName,
150+
perPartition bool,
151+
option utils.GetStatsOptions,
152+
) (utils.PartitionedTopicStats, error)
90153

91154
// Terminate the topic and prevent any more messages being published on it
92155
Terminate(utils.TopicName) (utils.MessageID, error)
@@ -111,7 +174,12 @@ type Topics interface {
111174
GetMessageTTL(utils.TopicName) (int, error)
112175

113176
// SetMessageTTL Set the message TTL for a topic
114-
SetMessageTTL(utils.TopicName, int) error
177+
//
178+
// @param topic
179+
// topicName struct
180+
// @param messageTTL
181+
// Message TTL in second
182+
SetMessageTTL(topic utils.TopicName, messageTTL int) error
115183

116184
// RemoveMessageTTL Remove the message TTL for a topic
117185
RemoveMessageTTL(utils.TopicName) error
@@ -120,7 +188,12 @@ type Topics interface {
120188
GetMaxProducers(utils.TopicName) (int, error)
121189

122190
// SetMaxProducers Set max number of producers for a topic
123-
SetMaxProducers(utils.TopicName, int) error
191+
//
192+
// @param topic
193+
// topicName struct
194+
// @param maxProducers
195+
// max number of producer
196+
SetMaxProducers(topic utils.TopicName, maxProducers int) error
124197

125198
// RemoveMaxProducers Remove max number of producers for a topic
126199
RemoveMaxProducers(utils.TopicName) error
@@ -129,7 +202,12 @@ type Topics interface {
129202
GetMaxConsumers(utils.TopicName) (int, error)
130203

131204
// SetMaxConsumers Set max number of consumers for a topic
132-
SetMaxConsumers(utils.TopicName, int) error
205+
//
206+
// @param topic
207+
// topicName struct
208+
// @param maxConsumers
209+
// max number of consumer
210+
SetMaxConsumers(topic utils.TopicName, maxConsumers int) error
133211

134212
// RemoveMaxConsumers Remove max number of consumers for a topic
135213
RemoveMaxConsumers(utils.TopicName) error
@@ -138,7 +216,12 @@ type Topics interface {
138216
GetMaxUnackMessagesPerConsumer(utils.TopicName) (int, error)
139217

140218
// SetMaxUnackMessagesPerConsumer Set max unacked messages policy on consumer for a topic
141-
SetMaxUnackMessagesPerConsumer(utils.TopicName, int) error
219+
//
220+
// @param topic
221+
// topicName struct
222+
// @param maxUnackedNum
223+
// max unAcked messages on each consumer
224+
SetMaxUnackMessagesPerConsumer(topic utils.TopicName, maxUnackedNum int) error
142225

143226
// RemoveMaxUnackMessagesPerConsumer Remove max unacked messages policy on consumer for a topic
144227
RemoveMaxUnackMessagesPerConsumer(utils.TopicName) error
@@ -147,7 +230,12 @@ type Topics interface {
147230
GetMaxUnackMessagesPerSubscription(utils.TopicName) (int, error)
148231

149232
// SetMaxUnackMessagesPerSubscription Set max unacked messages policy on subscription for a topic
150-
SetMaxUnackMessagesPerSubscription(utils.TopicName, int) error
233+
//
234+
// @param topic
235+
// topicName struct
236+
// @param maxUnackedNum
237+
// max unAcked messages on subscription of a topic
238+
SetMaxUnackMessagesPerSubscription(topic utils.TopicName, maxUnackedNum int) error
151239

152240
// RemoveMaxUnackMessagesPerSubscription Remove max unacked messages policy on subscription for a topic
153241
RemoveMaxUnackMessagesPerSubscription(utils.TopicName) error
@@ -192,30 +280,60 @@ type Topics interface {
192280
GetDeduplicationStatus(utils.TopicName) (bool, error)
193281

194282
// SetDeduplicationStatus Set the deduplication policy for a topic
195-
SetDeduplicationStatus(utils.TopicName, bool) error
283+
//
284+
// @param topic
285+
// topicName struct
286+
// @param enabled
287+
// set enable or disable deduplication of the topic
288+
SetDeduplicationStatus(topic utils.TopicName, enabled bool) error
196289

197290
// RemoveDeduplicationStatus Remove the deduplication policy for a topic
198291
RemoveDeduplicationStatus(utils.TopicName) error
199292

200293
// GetRetention returns the retention configuration for a topic
201-
GetRetention(utils.TopicName, bool) (*utils.RetentionPolicies, error)
294+
//
295+
// @param topic
296+
// topicName struct
297+
// @param applied
298+
// when set to true, function will try to find policy applied to this topic
299+
// in namespace or broker level, if no policy set in topic level
300+
GetRetention(topic utils.TopicName, applied bool) (*utils.RetentionPolicies, error)
202301

203302
// RemoveRetention removes the retention configuration on a topic
204303
RemoveRetention(utils.TopicName) error
205304

206305
// SetRetention sets the retention policy for a topic
207306
SetRetention(utils.TopicName, utils.RetentionPolicies) error
208307

209-
// Get the compaction threshold for a topic
308+
// GetCompactionThreshold Get the compaction threshold for a topic.
309+
//
310+
// i.e. The maximum number of bytes can have before compaction is triggered.
311+
//
312+
// @param topic
313+
// topicName struct
314+
// @param applied
315+
// when set to true, function will try to find policy applied to this topic
316+
// in namespace or broker level, if no policy set in topic level
210317
GetCompactionThreshold(topic utils.TopicName, applied bool) (int64, error)
211318

212-
// Set the compaction threshold for a topic
319+
// SetCompactionThreshold Set the compaction threshold for a topic
320+
//
321+
// @param topic
322+
// topicName struct
323+
// @param threshold
324+
// maximum number of backlog bytes before compaction is triggered
213325
SetCompactionThreshold(topic utils.TopicName, threshold int64) error
214326

215327
// Remove compaction threshold for a topic
216328
RemoveCompactionThreshold(utils.TopicName) error
217329

218330
// GetBacklogQuotaMap returns backlog quota map for a topic
331+
//
332+
// @param topic
333+
// topicName struct
334+
// @param applied
335+
// when set to true, function will try to find policy applied to this topic
336+
// in namespace or broker level, if no policy set in topic level
219337
GetBacklogQuotaMap(topic utils.TopicName, applied bool) (map[utils.BacklogQuotaType]utils.BacklogQuota, error)
220338

221339
// SetBacklogQuota sets a backlog quota for a topic
@@ -225,6 +343,12 @@ type Topics interface {
225343
RemoveBacklogQuota(utils.TopicName, utils.BacklogQuotaType) error
226344

227345
// GetInactiveTopicPolicies gets the inactive topic policies on a topic
346+
//
347+
// @param topic
348+
// topicName struct
349+
// @param applied
350+
// when set to true, function will try to find policy applied to this topic
351+
// in namespace or broker level, if no policy set in topic level
228352
GetInactiveTopicPolicies(topic utils.TopicName, applied bool) (utils.InactiveTopicPolicies, error)
229353

230354
// RemoveInactiveTopicPolicies removes inactive topic policies from a topic
@@ -237,6 +361,11 @@ type Topics interface {
237361
GetReplicationClusters(topic utils.TopicName) ([]string, error)
238362

239363
// SetReplicationClusters sets the replication clusters on a topic
364+
//
365+
// @param topic
366+
// topicName struct
367+
// @param data
368+
// list of replication cluster id
240369
SetReplicationClusters(topic utils.TopicName, data []string) error
241370
}
242371

0 commit comments

Comments
 (0)