Skip to content

Conversation

freeznet
Copy link
Contributor

@freeznet freeznet commented Jul 25, 2025

…te, max consumers, message size, subscriptions, schema validation, deduplication, replicator dispatch rate, offload policies, auto subscription creation, and schema compatibility strategy

(If this PR fixes a github issue, please add Fixes #<xyz>.)

Fixes #

(or if this PR is one task of a github issue, please add Master Issue: #<xyz> to link to the master issue.)

Master Issue: #

Motivation

The Pulsar Go client's admin APIs for topic-level policies were incomplete and not aligned with the Java client's RESTful APIs. This inconsistency made it difficult for users who work with both Java and Go clients to have a unified experience when managing topic-level policies programmatically.
Currently, many essential topic-level policy management features available in the Java admin client are missing in the Go client, limiting the ability to fully manage Pulsar topics through Go applications. This includes policies for:

  • Maximum consumers per topic
  • Maximum message size
  • Subscription management
  • Schema validation enforcement
  • Message deduplication
  • Replicator dispatch rate
  • Offload policies
  • Auto subscription creation
  • Schema compatibility strategies

By aligning these APIs with the Java implementation, we ensure feature parity and provide Go developers with the same administrative capabilities.

Modifications

This PR adds comprehensive topic-level policy management APIs to align with the Java admin client. The following APIs have been implemented:

New Topic Policy APIs Added:

  1. Max Consumers Policy

    • GetMaxConsumers(topic string) (int, error)
    • SetMaxConsumers(topic string, maxConsumers int) error
    • RemoveMaxConsumers(topic string) error
  2. Max Message Size Policy

    • GetMaxMessageSize(topic string) (int, error)
    • SetMaxMessageSize(topic string, maxMessageSize int) error
    • RemoveMaxMessageSize(topic string) error
  3. Subscription Management

    • GetSubscriptionDispatchRate(topic string) (*DispatchRate, error)
    • SetSubscriptionDispatchRate(topic string, dispatchRate DispatchRate) error
    • RemoveSubscriptionDispatchRate(topic string) error
  4. Schema Validation Policy

    • GetSchemaValidationEnforced(topic string) (bool, error)
    • SetSchemaValidationEnforced(topic string, enforced bool) error
  5. Deduplication Policy

    • GetDeduplicationStatus(topic string) (bool, error)
    • SetDeduplicationStatus(topic string, enabled bool) error
    • RemoveDeduplicationStatus(topic string) error
  6. Replicator Dispatch Rate

    • GetReplicatorDispatchRate(topic string) (*DispatchRate, error)
    • SetReplicatorDispatchRate(topic string, dispatchRate DispatchRate) error
    • RemoveReplicatorDispatchRate(topic string) error
  7. Offload Policies

    • GetOffloadPolicies(topic string) (*OffloadPolicies, error)
    • SetOffloadPolicies(topic string, policies OffloadPolicies) error
    • RemoveOffloadPolicies(topic string) error
  8. Auto Subscription Creation

    • GetAutoSubscriptionCreation(topic string) (*AutoSubscriptionCreationOverride, error)
    • SetAutoSubscriptionCreation(topic string, config AutoSubscriptionCreationOverride) error
    • RemoveAutoSubscriptionCreation(topic string) error
  9. Schema Compatibility Strategy

    • GetSchemaCompatibilityStrategy(topic string) (SchemaCompatibilityStrategy, error)
    • SetSchemaCompatibilityStrategy(topic string, strategy SchemaCompatibilityStrategy) error
    • RemoveSchemaCompatibilityStrategy(topic string) error

Verifying this change

  • Make sure that the change passes the CI checks.

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (10MB)
  • Extended integration test for recovery after broker failure

Does this pull request potentially affect one of the following parts:

If yes was chosen, please highlight the changes

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API: Yes - This adds new public admin APIs for topic policy management
  • The schema: (yes / no / don't know)
  • The default values of configurations: (yes / no)
  • The wire protocol: (yes / no)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / GoDocs / not documented)
  • If a feature is not applicable for documentation, explain why?
  • If a feature is not documented yet in this PR, please create a followup issue for adding the documentation

…te, max consumers, message size, subscriptions, schema validation, deduplication, replicator dispatch rate, offload policies, auto subscription creation, and schema compatibility strategy
@freeznet freeznet self-assigned this Jul 25, 2025
@crossoverJie crossoverJie added this to the v0.16.0 milestone Jul 25, 2025
@crossoverJie crossoverJie requested a review from Copilot July 25, 2025 07:22
Copy link
Contributor

@Copilot Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR adds comprehensive topic-level policy management APIs to the Pulsar Go admin client to align with the Java REST APIs. The implementation provides feature parity for managing topic policies programmatically through Go applications.

Key changes include:

  • Addition of 9 new topic policy API categories with get/set/remove operations
  • New data structures for offload policies and auto subscription creation configuration
  • Enhanced delayed delivery data structure with optional maximum delay limits

Reviewed Changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated 2 comments.

File Description
pulsaradmin/pkg/utils/offload_policies.go Defines OffloadPolicies struct with S3 and general offload configuration fields
pulsaradmin/pkg/utils/data.go Enhances DelayedDeliveryData with optional MaxDelayInMillis field and helper functions
pulsaradmin/pkg/utils/auto_subscription_creation.go Defines AutoSubscriptionCreationOverride struct for managing auto subscription settings
pulsaradmin/pkg/admin/topic.go Implements 27 new topic policy management methods across 9 API categories
Comments suppressed due to low confidence (1)

pulsaradmin/pkg/utils/offload_policies.go:30

  • Inconsistent field naming: The struct field is named 'S3ManagedLedgerOffloadCredentialID' but the JSON tag uses 'Id' instead of 'ID'. This creates inconsistency between Go naming conventions and the JSON representation.
	S3ManagedLedgerOffloadCredentialID                string            `json:"s3ManagedLedgerOffloadCredentialId,omitempty"`

@crossoverJie
Copy link
Member

Could you please add the relevant unit tests?

@freeznet
Copy link
Contributor Author

Could you please add the relevant unit tests?

addressed with latest commits

@freeznet freeznet merged commit 5a320e8 into apache:master Jul 28, 2025
7 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants