Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions integration-tests/blue-green/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ services:
- loadBalancerDebugModeEnabled=true
- brokerServiceCompactionThresholdInBytes=1000000
- PULSAR_PREFIX_defaultNumberOfNamespaceBundles=1
- PULSAR_PREFIX_topicLevelPoliciesEnabled=true
depends_on:
green-zookeeper:
condition: service_healthy
Expand Down Expand Up @@ -163,6 +164,7 @@ services:
- loadBalancerDebugModeEnabled=true
- brokerServiceCompactionThresholdInBytes=1000000
- PULSAR_PREFIX_defaultNumberOfNamespaceBundles=1
- PULSAR_PREFIX_topicLevelPoliciesEnabled=true
depends_on:
green-zookeeper:
condition: service_healthy
Expand Down
2 changes: 2 additions & 0 deletions integration-tests/clustered/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ services:
- advertisedListeners=internal:pulsar://broker-1:6650
- PULSAR_MEM=-Xms256m -Xmx256m -XX:MaxDirectMemorySize=56m
- PULSAR_PREFIX_defaultNumberOfNamespaceBundles=1
- PULSAR_PREFIX_topicLevelPoliciesEnabled=true
depends_on:
zookeeper:
condition: service_healthy
Expand Down Expand Up @@ -154,6 +155,7 @@ services:
- advertisedListeners=internal:pulsar://broker-2:6650
- PULSAR_MEM=-Xms256m -Xmx256m -XX:MaxDirectMemorySize=56m
- PULSAR_PREFIX_defaultNumberOfNamespaceBundles=1
- PULSAR_PREFIX_topicLevelPoliciesEnabled=true
depends_on:
zookeeper:
condition: service_healthy
Expand Down
2 changes: 2 additions & 0 deletions integration-tests/extensible-load-manager/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ services:
- clusterMigrationCheckDurationSeconds=1
- brokerServiceCompactionThresholdInBytes=1000000
- PULSAR_PREFIX_defaultNumberOfNamespaceBundles=1
- PULSAR_PREFIX_topicLevelPoliciesEnabled=true
depends_on:
zookeeper:
condition: service_healthy
Expand Down Expand Up @@ -168,6 +169,7 @@ services:
- clusterMigrationCheckDurationSeconds=1
- brokerServiceCompactionThresholdInBytes=1000000
- PULSAR_PREFIX_defaultNumberOfNamespaceBundles=1
- PULSAR_PREFIX_topicLevelPoliciesEnabled=true
depends_on:
zookeeper:
condition: service_healthy
Expand Down
290 changes: 290 additions & 0 deletions pulsaradmin/pkg/admin/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,107 @@ type Topics interface {
// @param data
// list of replication cluster id
SetReplicationClusters(topic utils.TopicName, data []string) error

// GetSubscribeRate Get subscribe rate configuration for a topic
GetSubscribeRate(utils.TopicName) (*utils.SubscribeRate, error)

// SetSubscribeRate Set subscribe rate configuration for a topic
SetSubscribeRate(utils.TopicName, utils.SubscribeRate) error

// RemoveSubscribeRate Remove subscribe rate configuration for a topic
RemoveSubscribeRate(utils.TopicName) error

// GetSubscriptionDispatchRate Get subscription dispatch rate for a topic
GetSubscriptionDispatchRate(utils.TopicName) (*utils.DispatchRateData, error)

// SetSubscriptionDispatchRate Set subscription dispatch rate for a topic
SetSubscriptionDispatchRate(utils.TopicName, utils.DispatchRateData) error

// RemoveSubscriptionDispatchRate Remove subscription dispatch rate for a topic
RemoveSubscriptionDispatchRate(utils.TopicName) error

// GetMaxConsumersPerSubscription Get max consumers per subscription for a topic
GetMaxConsumersPerSubscription(utils.TopicName) (int, error)

// SetMaxConsumersPerSubscription Set max consumers per subscription for a topic
SetMaxConsumersPerSubscription(utils.TopicName, int) error

// RemoveMaxConsumersPerSubscription Remove max consumers per subscription for a topic
RemoveMaxConsumersPerSubscription(utils.TopicName) error

// GetMaxMessageSize Get max message size for a topic
GetMaxMessageSize(utils.TopicName) (int, error)

// SetMaxMessageSize Set max message size for a topic
SetMaxMessageSize(utils.TopicName, int) error

// RemoveMaxMessageSize Remove max message size for a topic
RemoveMaxMessageSize(utils.TopicName) error

// GetMaxSubscriptionsPerTopic Get max subscriptions per topic
GetMaxSubscriptionsPerTopic(utils.TopicName) (int, error)

// SetMaxSubscriptionsPerTopic Set max subscriptions per topic
SetMaxSubscriptionsPerTopic(utils.TopicName, int) error

// RemoveMaxSubscriptionsPerTopic Remove max subscriptions per topic
RemoveMaxSubscriptionsPerTopic(utils.TopicName) error

// GetSchemaValidationEnforced Get schema validation enforced flag for a topic
GetSchemaValidationEnforced(utils.TopicName) (bool, error)

// SetSchemaValidationEnforced Set schema validation enforced flag for a topic
SetSchemaValidationEnforced(utils.TopicName, bool) error

// RemoveSchemaValidationEnforced Remove schema validation enforced flag for a topic
RemoveSchemaValidationEnforced(utils.TopicName) error

// GetDeduplicationSnapshotInterval Get deduplication snapshot interval for a topic
GetDeduplicationSnapshotInterval(utils.TopicName) (int, error)

// SetDeduplicationSnapshotInterval Set deduplication snapshot interval for a topic
SetDeduplicationSnapshotInterval(utils.TopicName, int) error

// RemoveDeduplicationSnapshotInterval Remove deduplication snapshot interval for a topic
RemoveDeduplicationSnapshotInterval(utils.TopicName) error

// GetReplicatorDispatchRate Get replicator dispatch rate for a topic
GetReplicatorDispatchRate(utils.TopicName) (*utils.DispatchRateData, error)

// SetReplicatorDispatchRate Set replicator dispatch rate for a topic
SetReplicatorDispatchRate(utils.TopicName, utils.DispatchRateData) error

// RemoveReplicatorDispatchRate Remove replicator dispatch rate for a topic
RemoveReplicatorDispatchRate(utils.TopicName) error

// GetOffloadPolicies Get offload policies for a topic
GetOffloadPolicies(utils.TopicName) (*utils.OffloadPolicies, error)

// SetOffloadPolicies Set offload policies for a topic
SetOffloadPolicies(utils.TopicName, utils.OffloadPolicies) error

// RemoveOffloadPolicies Remove offload policies for a topic
RemoveOffloadPolicies(utils.TopicName) error

// GetAutoSubscriptionCreation Get auto subscription creation override for a topic
GetAutoSubscriptionCreation(utils.TopicName) (*utils.AutoSubscriptionCreationOverride, error)

// SetAutoSubscriptionCreation Set auto subscription creation override for a topic
SetAutoSubscriptionCreation(utils.TopicName,
utils.AutoSubscriptionCreationOverride) error

// RemoveAutoSubscriptionCreation Remove auto subscription creation override for a topic
RemoveAutoSubscriptionCreation(utils.TopicName) error

// GetSchemaCompatibilityStrategy Get schema compatibility strategy for a topic
GetSchemaCompatibilityStrategy(utils.TopicName) (utils.SchemaCompatibilityStrategy, error)

// SetSchemaCompatibilityStrategy Set schema compatibility strategy for a topic
SetSchemaCompatibilityStrategy(utils.TopicName,
utils.SchemaCompatibilityStrategy) error

// RemoveSchemaCompatibilityStrategy Remove schema compatibility strategy for a topic
RemoveSchemaCompatibilityStrategy(utils.TopicName) error
}

type topics struct {
Expand Down Expand Up @@ -933,3 +1034,192 @@ func (t *topics) GetReplicationClusters(topic utils.TopicName) ([]string, error)
err := t.pulsar.Client.Get(endpoint, &data)
return data, err
}

func (t *topics) GetSubscribeRate(topic utils.TopicName) (*utils.SubscribeRate, error) {
var subscribeRate utils.SubscribeRate
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "subscribeRate")
err := t.pulsar.Client.Get(endpoint, &subscribeRate)
return &subscribeRate, err
}

func (t *topics) SetSubscribeRate(topic utils.TopicName, subscribeRate utils.SubscribeRate) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "subscribeRate")
return t.pulsar.Client.Post(endpoint, &subscribeRate)
}

func (t *topics) RemoveSubscribeRate(topic utils.TopicName) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "subscribeRate")
return t.pulsar.Client.Delete(endpoint)
}

func (t *topics) GetSubscriptionDispatchRate(topic utils.TopicName) (*utils.DispatchRateData, error) {
var dispatchRate utils.DispatchRateData
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "subscriptionDispatchRate")
err := t.pulsar.Client.Get(endpoint, &dispatchRate)
return &dispatchRate, err
}

func (t *topics) SetSubscriptionDispatchRate(topic utils.TopicName, dispatchRate utils.DispatchRateData) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "subscriptionDispatchRate")
return t.pulsar.Client.Post(endpoint, &dispatchRate)
}

func (t *topics) RemoveSubscriptionDispatchRate(topic utils.TopicName) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "subscriptionDispatchRate")
return t.pulsar.Client.Delete(endpoint)
}

func (t *topics) GetMaxConsumersPerSubscription(topic utils.TopicName) (int, error) {
var maxConsumers int
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "maxConsumersPerSubscription")
err := t.pulsar.Client.Get(endpoint, &maxConsumers)
return maxConsumers, err
}

func (t *topics) SetMaxConsumersPerSubscription(topic utils.TopicName, maxConsumers int) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "maxConsumersPerSubscription")
return t.pulsar.Client.Post(endpoint, &maxConsumers)
}

func (t *topics) RemoveMaxConsumersPerSubscription(topic utils.TopicName) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "maxConsumersPerSubscription")
return t.pulsar.Client.Delete(endpoint)
}

func (t *topics) GetMaxMessageSize(topic utils.TopicName) (int, error) {
var maxMessageSize int
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "maxMessageSize")
err := t.pulsar.Client.Get(endpoint, &maxMessageSize)
return maxMessageSize, err
}

func (t *topics) SetMaxMessageSize(topic utils.TopicName, maxMessageSize int) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "maxMessageSize")
return t.pulsar.Client.Post(endpoint, &maxMessageSize)
}

func (t *topics) RemoveMaxMessageSize(topic utils.TopicName) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "maxMessageSize")
return t.pulsar.Client.Delete(endpoint)
}

func (t *topics) GetMaxSubscriptionsPerTopic(topic utils.TopicName) (int, error) {
var maxSubscriptions int
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "maxSubscriptionsPerTopic")
err := t.pulsar.Client.Get(endpoint, &maxSubscriptions)
return maxSubscriptions, err
}

func (t *topics) SetMaxSubscriptionsPerTopic(topic utils.TopicName, maxSubscriptions int) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "maxSubscriptionsPerTopic")
return t.pulsar.Client.Post(endpoint, &maxSubscriptions)
}

func (t *topics) RemoveMaxSubscriptionsPerTopic(topic utils.TopicName) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "maxSubscriptionsPerTopic")
return t.pulsar.Client.Delete(endpoint)
}

func (t *topics) GetSchemaValidationEnforced(topic utils.TopicName) (bool, error) {
var enabled bool
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "schemaValidationEnforced")
err := t.pulsar.Client.Get(endpoint, &enabled)
return enabled, err
}

func (t *topics) SetSchemaValidationEnforced(topic utils.TopicName, enabled bool) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "schemaValidationEnforced")
return t.pulsar.Client.Post(endpoint, enabled)
}

func (t *topics) RemoveSchemaValidationEnforced(topic utils.TopicName) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "schemaValidationEnforced")
return t.pulsar.Client.Delete(endpoint)
}

func (t *topics) GetDeduplicationSnapshotInterval(topic utils.TopicName) (int, error) {
var interval int
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "deduplicationSnapshotInterval")
err := t.pulsar.Client.Get(endpoint, &interval)
return interval, err
}

func (t *topics) SetDeduplicationSnapshotInterval(topic utils.TopicName, interval int) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "deduplicationSnapshotInterval")
return t.pulsar.Client.Post(endpoint, &interval)
}

func (t *topics) RemoveDeduplicationSnapshotInterval(topic utils.TopicName) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "deduplicationSnapshotInterval")
return t.pulsar.Client.Delete(endpoint)
}

func (t *topics) GetReplicatorDispatchRate(topic utils.TopicName) (*utils.DispatchRateData, error) {
var dispatchRate utils.DispatchRateData
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "replicatorDispatchRate")
err := t.pulsar.Client.Get(endpoint, &dispatchRate)
return &dispatchRate, err
}

func (t *topics) SetReplicatorDispatchRate(topic utils.TopicName, dispatchRate utils.DispatchRateData) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "replicatorDispatchRate")
return t.pulsar.Client.Post(endpoint, &dispatchRate)
}

func (t *topics) RemoveReplicatorDispatchRate(topic utils.TopicName) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "replicatorDispatchRate")
return t.pulsar.Client.Delete(endpoint)
}

func (t *topics) GetAutoSubscriptionCreation(topic utils.TopicName) (*utils.AutoSubscriptionCreationOverride, error) {
var autoSubCreation utils.AutoSubscriptionCreationOverride
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "autoSubscriptionCreation")
err := t.pulsar.Client.Get(endpoint, &autoSubCreation)
return &autoSubCreation, err
}

func (t *topics) SetAutoSubscriptionCreation(topic utils.TopicName,
autoSubCreation utils.AutoSubscriptionCreationOverride) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "autoSubscriptionCreation")
return t.pulsar.Client.Post(endpoint, &autoSubCreation)
}

func (t *topics) RemoveAutoSubscriptionCreation(topic utils.TopicName) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "autoSubscriptionCreation")
return t.pulsar.Client.Delete(endpoint)
}

func (t *topics) GetSchemaCompatibilityStrategy(topic utils.TopicName) (utils.SchemaCompatibilityStrategy, error) {
var strategy utils.SchemaCompatibilityStrategy
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "schemaCompatibilityStrategy")
err := t.pulsar.Client.Get(endpoint, &strategy)
return strategy, err
}

func (t *topics) SetSchemaCompatibilityStrategy(topic utils.TopicName,
strategy utils.SchemaCompatibilityStrategy) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "schemaCompatibilityStrategy")
return t.pulsar.Client.Put(endpoint, strategy)
}

func (t *topics) RemoveSchemaCompatibilityStrategy(topic utils.TopicName) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "schemaCompatibilityStrategy")
return t.pulsar.Client.Delete(endpoint)
}

func (t *topics) GetOffloadPolicies(topic utils.TopicName) (*utils.OffloadPolicies, error) {
var offloadPolicies utils.OffloadPolicies
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "offloadPolicies")
err := t.pulsar.Client.Get(endpoint, &offloadPolicies)
return &offloadPolicies, err
}

func (t *topics) SetOffloadPolicies(topic utils.TopicName, offloadPolicies utils.OffloadPolicies) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "offloadPolicies")
return t.pulsar.Client.Post(endpoint, &offloadPolicies)
}

func (t *topics) RemoveOffloadPolicies(topic utils.TopicName) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "offloadPolicies")
return t.pulsar.Client.Delete(endpoint)
}
Loading
Loading