Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 51 additions & 1 deletion pulsaradmin/pkg/admin/topic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,18 +123,21 @@ func TestPartitionState(t *testing.T) {

assert.Nil(t, err)
defer client.Close()
subName := "my-sub"
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: topic,
SubscriptionName: "my-sub",
SubscriptionName: subName,
Type: pulsar.Exclusive,
})
assert.Nil(t, err)
defer consumer.Close()

// create producer
producerName := "test-producer"
producer, err := client.CreateProducer(pulsar.ProducerOptions{
Topic: topic,
DisableBatching: false,
Name: producerName,
})
assert.Nil(t, err)
defer producer.Close()
Expand Down Expand Up @@ -173,6 +176,53 @@ func TestPartitionState(t *testing.T) {
assert.Equal(t, len(subscriptionStats.Consumers), 0)
}

partition, err := topicName.GetPartition(0)
assert.Nil(t, err)
topicState, err := admin.Topics().GetStats(*partition)
assert.Nil(t, err)
assert.Equal(t, len(topicState.Publishers), 1)
publisher := topicState.Publishers[0]
assert.Equal(t, publisher.AccessModel, utils.ProduceModeShared)
assert.Equal(t, publisher.IsSupportsPartialProducer, false)
assert.Equal(t, publisher.ProducerName, producerName)
assert.Contains(t, publisher.Address, "127.0.0.1")
assert.Contains(t, publisher.ClientVersion, "Pulsar Go version")

sub := topicState.Subscriptions[subName]
assert.Equal(t, sub.BytesOutCounter, int64(0))
assert.Equal(t, sub.MsgOutCounter, int64(0))
assert.Equal(t, sub.MessageAckRate, float64(0))
assert.Equal(t, sub.ChunkedMessageRate, float64(0))
assert.Equal(t, sub.BacklogSize, int64(0))
assert.Equal(t, sub.EarliestMsgPublishTimeInBacklog, int64(0))
assert.Equal(t, sub.LastExpireTimestamp, int64(0))
assert.Equal(t, sub.TotalMsgExpired, int64(0))
assert.Equal(t, sub.LastMarkDeleteAdvancedTimestamp, int64(0))
assert.Equal(t, sub.IsDurable, true)
assert.Equal(t, sub.AllowOutOfOrderDelivery, false)
assert.Equal(t, sub.ConsumersAfterMarkDeletePosition, map[string]string{})
assert.Equal(t, sub.NonContiguousDeletedMessagesRanges, 0)
assert.Equal(t, sub.NonContiguousDeletedMessagesRangesSrzSize, 0)
assert.Equal(t, sub.DelayedMessageIndexSizeInBytes, int64(0))
assert.Equal(t, sub.SubscriptionProperties, map[string]string{})
assert.Equal(t, sub.FilterProcessedMsgCount, int64(0))
assert.Equal(t, sub.FilterAcceptedMsgCount, int64(0))
assert.Equal(t, sub.FilterRejectedMsgCount, int64(0))
assert.Equal(t, sub.FilterRescheduledMsgCount, int64(0))

assert.Equal(t, len(sub.Consumers), 1)
consumerState := sub.Consumers[0]
assert.Equal(t, consumerState.BytesOutCounter, int64(0))
assert.Equal(t, consumerState.MsgOutCounter, int64(0))
assert.Equal(t, consumerState.MessageAckRate, float64(0))
assert.Equal(t, consumerState.ChunkedMessageRate, float64(0))
assert.Equal(t, consumerState.AvgMessagesPerEntry, int(0))
assert.Contains(t, consumerState.Address, "127.0.0.1")
assert.Contains(t, consumerState.ClientVersion, "Pulsar Go version")
assert.Equal(t, consumerState.LastAckedTimestamp, int64(0))
assert.Equal(t, consumerState.LastConsumedTimestamp, int64(0))
assert.True(t, consumerState.LastConsumedFlowTimestamp > 0)

}
func TestNonPartitionState(t *testing.T) {
randomName := newTopicName()
Expand Down
89 changes: 68 additions & 21 deletions pulsaradmin/pkg/utils/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,31 +244,67 @@ type TopicStats struct {
DeDuplicationStatus string `json:"deduplicationStatus"`
}

type ProducerAccessMode string

const (
ProduceModeShared ProducerAccessMode = "Shared"
ProduceModeExclusive = "Exclusive"
ProduceModeExclusiveWithFencing = "ExclusiveWithFencing"
ProduceModeWaitForExclusive = "WaitForExclusive"
)

type PublisherStats struct {
ProducerID int64 `json:"producerId"`
MsgRateIn float64 `json:"msgRateIn"`
MsgThroughputIn float64 `json:"msgThroughputIn"`
AverageMsgSize float64 `json:"averageMsgSize"`
Metadata map[string]string `json:"metadata"`
AccessModel ProducerAccessMode `json:"accessMode"`
ProducerID int64 `json:"producerId"`
MsgRateIn float64 `json:"msgRateIn"`
MsgThroughputIn float64 `json:"msgThroughputIn"`
AverageMsgSize float64 `json:"averageMsgSize"`
ChunkedMessageRate float64 `json:"chunkedMessageRate"`
IsSupportsPartialProducer bool `json:"supportsPartialProducer"`
ProducerName string `json:"producerName"`
Address string `json:"address"`
ConnectedSince string `json:"connectedSince"`
ClientVersion string `json:"clientVersion"`
Metadata map[string]string `json:"metadata"`
}

type SubscriptionStats struct {
BlockedSubscriptionOnUnackedMsgs bool `json:"blockedSubscriptionOnUnackedMsgs"`
IsReplicated bool `json:"isReplicated"`
LastConsumedFlowTimestamp int64 `json:"lastConsumedFlowTimestamp"`
LastConsumedTimestamp int64 `json:"lastConsumedTimestamp"`
LastAckedTimestamp int64 `json:"lastAckedTimestamp"`
MsgRateOut float64 `json:"msgRateOut"`
MsgThroughputOut float64 `json:"msgThroughputOut"`
MsgRateRedeliver float64 `json:"msgRateRedeliver"`
MsgRateExpired float64 `json:"msgRateExpired"`
MsgBacklog int64 `json:"msgBacklog"`
MsgBacklogNoDelayed int64 `json:"msgBacklogNoDelayed"`
MsgDelayed int64 `json:"msgDelayed"`
UnAckedMessages int64 `json:"unackedMessages"`
SubType string `json:"type"`
ActiveConsumerName string `json:"activeConsumerName"`
Consumers []ConsumerStats `json:"consumers"`
BlockedSubscriptionOnUnackedMsgs bool `json:"blockedSubscriptionOnUnackedMsgs"`
IsReplicated bool `json:"isReplicated"`
LastConsumedFlowTimestamp int64 `json:"lastConsumedFlowTimestamp"`
LastConsumedTimestamp int64 `json:"lastConsumedTimestamp"`
LastAckedTimestamp int64 `json:"lastAckedTimestamp"`
MsgRateOut float64 `json:"msgRateOut"`
MsgThroughputOut float64 `json:"msgThroughputOut"`
MsgRateRedeliver float64 `json:"msgRateRedeliver"`
MsgRateExpired float64 `json:"msgRateExpired"`
MsgBacklog int64 `json:"msgBacklog"`
MsgBacklogNoDelayed int64 `json:"msgBacklogNoDelayed"`
MsgDelayed int64 `json:"msgDelayed"`
UnAckedMessages int64 `json:"unackedMessages"`
SubType string `json:"type"`
ActiveConsumerName string `json:"activeConsumerName"`
BytesOutCounter int64 `json:"bytesOutCounter"`
MsgOutCounter int64 `json:"msgOutCounter"`
MessageAckRate float64 `json:"messageAckRate"`
ChunkedMessageRate float64 `json:"chunkedMessageRate"`
BacklogSize int64 `json:"backlogSize"`
EarliestMsgPublishTimeInBacklog int64 `json:"earliestMsgPublishTimeInBacklog"`
TotalMsgExpired int64 `json:"totalMsgExpired"`
LastExpireTimestamp int64 `json:"lastExpireTimestamp"`
LastMarkDeleteAdvancedTimestamp int64 `json:"lastMarkDeleteAdvancedTimestamp"`
Consumers []ConsumerStats `json:"consumers"`
IsDurable bool `json:"isDurable"`
AllowOutOfOrderDelivery bool `json:"allowOutOfOrderDelivery"`
ConsumersAfterMarkDeletePosition map[string]string `json:"consumersAfterMarkDeletePosition"`
NonContiguousDeletedMessagesRanges int `json:"nonContiguousDeletedMessagesRanges"`
NonContiguousDeletedMessagesRangesSrzSize int `json:"nonContiguousDeletedMessagesRangesSerializedSize"`
DelayedMessageIndexSizeInBytes int64 `json:"delayedMessageIndexSizeInBytes"`
SubscriptionProperties map[string]string `json:"subscriptionProperties"`
FilterProcessedMsgCount int64 `json:"filterProcessedMsgCount"`
FilterAcceptedMsgCount int64 `json:"filterAcceptedMsgCount"`
FilterRejectedMsgCount int64 `json:"filterRejectedMsgCount"`
FilterRescheduledMsgCount int64 `json:"filterRescheduledMsgCount"`
}

type ConsumerStats struct {
Expand All @@ -279,6 +315,17 @@ type ConsumerStats struct {
MsgThroughputOut float64 `json:"msgThroughputOut"`
MsgRateRedeliver float64 `json:"msgRateRedeliver"`
ConsumerName string `json:"consumerName"`
BytesOutCounter int64 `json:"bytesOutCounter"`
MsgOutCounter int64 `json:"msgOutCounter"`
MessageAckRate float64 `json:"messageAckRate"`
ChunkedMessageRate float64 `json:"chunkedMessageRate"`
AvgMessagesPerEntry int `json:"avgMessagesPerEntry"`
Address string `json:"address"`
ConnectedSince string `json:"connectedSince"`
ClientVersion string `json:"clientVersion"`
LastAckedTimestamp int64 `json:"lastAckedTimestamp"`
LastConsumedTimestamp int64 `json:"lastConsumedTimestamp"`
LastConsumedFlowTimestamp int64 `json:"lastConsumedFlowTimestamp"`
Metadata map[string]string `json:"metadata"`
}

Expand Down
Loading