Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ lint: bin/golangci-lint
bin/golangci-lint run

bin/golangci-lint:
GOBIN=$(shell pwd)/bin go install github.com/golangci/golangci-lint/cmd/golangci-lint@v1.51.2
GOBIN=$(shell pwd)/bin go install github.com/golangci/golangci-lint/cmd/golangci-lint@v1.61.0

# an alternative to above `make lint` command
# use golangCi-lint docker to avoid local golang env issues
Expand Down
8 changes: 8 additions & 0 deletions pulsar/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,14 @@ type ClientOptions struct {
// Limit of client memory usage (in byte). The 64M default can guarantee a high producer throughput.
// Config less than 0 indicates off memory limit.
MemoryLimitBytes int64

// Set the properties used for topic lookup.
// When the broker performs topic lookup, these lookup properties will be taken into consideration in a customized
// load manager.
// Note: The lookup properties are only used in topic lookup when:
// The protocol is binary protocol, i.e. the service URL starts with "pulsar://" or "pulsar+ssl://"
// The `loadManagerClassName` config in broker is a class that implements the `ExtensibleLoadManager` interface
LookupProperties map[string]string
}

// Client represents a pulsar client
Expand Down
2 changes: 1 addition & 1 deletion pulsar/client_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func newClient(options ClientOptions) (Client, error) {
}

c.rpcClient = internal.NewRPCClient(url, c.cnxPool, operationTimeout, logger, metrics,
options.ListenerName, tlsConfig, authProvider)
options.ListenerName, tlsConfig, authProvider, toKeyValues(options.LookupProperties))

c.lookupService = c.rpcClient.LookupService("")

Expand Down
54 changes: 54 additions & 0 deletions pulsar/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4691,3 +4691,57 @@ func TestPartitionConsumerGetLastMessageIDs(t *testing.T) {
}

}

func TestLookupConsumer(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
LookupProperties: map[string]string{
"broker.id": "1",
},
})

assert.Nil(t, err)
defer client.Close()

topic := newTopicName()
ctx := context.Background()

// create consumer
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: "my-sub",
Type: Exclusive,
})
assert.Nil(t, err)
defer consumer.Close()

// create producer
producer, err := client.CreateProducer(ProducerOptions{
Topic: topic,
DisableBatching: false,
})
assert.Nil(t, err)
defer producer.Close()

// send 10 messages
for i := 0; i < 10; i++ {
if _, err := producer.Send(ctx, &ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", i)),
}); err != nil {
log.Fatal(err)
}
}

// receive 10 messages
for i := 0; i < 10; i++ {
msg, err := consumer.Receive(context.Background())
if err != nil {
log.Fatal(err)
}

expectMsg := fmt.Sprintf("hello-%d", i)
assert.Equal(t, []byte(expectMsg), msg.Payload())
// ack message
consumer.Ack(msg)
}
}
6 changes: 5 additions & 1 deletion pulsar/internal/lookup_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,18 +77,21 @@ type lookupService struct {
serviceNameResolver ServiceNameResolver
tlsEnabled bool
listenerName string
lookupProperties []*pb.KeyValue
log log.Logger
metrics *Metrics
}

// NewLookupService init a lookup service struct and return an object of LookupService.
func NewLookupService(rpcClient RPCClient, serviceURL *url.URL, serviceNameResolver ServiceNameResolver,
tlsEnabled bool, listenerName string, logger log.Logger, metrics *Metrics) LookupService {
tlsEnabled bool, listenerName string,
lookupProperties []*pb.KeyValue, logger log.Logger, metrics *Metrics) LookupService {
return &lookupService{
rpcClient: rpcClient,
serviceNameResolver: serviceNameResolver,
tlsEnabled: tlsEnabled,
log: logger.SubLogger(log.Fields{"serviceURL": serviceURL}),
lookupProperties: lookupProperties,
metrics: metrics,
listenerName: listenerName,
}
Expand Down Expand Up @@ -146,6 +149,7 @@ func (ls *lookupService) Lookup(topic string) (*LookupResult, error) {
Topic: &topic,
Authoritative: proto.Bool(false),
AdvertisedListenerName: proto.String(ls.listenerName),
Properties: ls.lookupProperties,
})
if err != nil {
return nil, err
Expand Down
46 changes: 29 additions & 17 deletions pulsar/internal/lookup_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func TestLookupSuccess(t *testing.T) {
url, err := url.Parse("pulsar://example:6650")
assert.NoError(t, err)
serviceNameResolver := NewPulsarServiceNameResolver(url)

kvs := make([]*pb.KeyValue, 0)
mockedClient := &mockedLookupRPCClient{
t: t,

Expand All @@ -138,6 +138,7 @@ func TestLookupSuccess(t *testing.T) {
Topic: proto.String("my-topic"),
Authoritative: proto.Bool(false),
AdvertisedListenerName: proto.String(""),
Properties: kvs,
},
},
mockedResponses: []pb.CommandLookupTopicResponse{
Expand All @@ -149,9 +150,8 @@ func TestLookupSuccess(t *testing.T) {
},
},
}

metricsProvider := NewMetricsProvider(4, map[string]string{}, prometheus.DefaultRegisterer)
ls := NewLookupService(mockedClient, url, serviceNameResolver, false, "", log.DefaultNopLogger(), metricsProvider)
ls := NewLookupService(mockedClient, url, serviceNameResolver, false, "", kvs, log.DefaultNopLogger(), metricsProvider)

lr, err := ls.Lookup("my-topic")
assert.NoError(t, err)
Expand All @@ -165,6 +165,7 @@ func TestTlsLookupSuccess(t *testing.T) {
url, err := url.Parse("pulsar+ssl://example:6651")
assert.NoError(t, err)
serviceNameResolver := NewPulsarServiceNameResolver(url)
kvs := make([]*pb.KeyValue, 0)
mockedClient := &mockedLookupRPCClient{
t: t,

Expand All @@ -174,6 +175,7 @@ func TestTlsLookupSuccess(t *testing.T) {
Topic: proto.String("my-topic"),
Authoritative: proto.Bool(false),
AdvertisedListenerName: proto.String(""),
Properties: kvs,
},
},
mockedResponses: []pb.CommandLookupTopicResponse{
Expand All @@ -187,7 +189,7 @@ func TestTlsLookupSuccess(t *testing.T) {
}
metricsProvider := NewMetricsProvider(4, map[string]string{}, prometheus.DefaultRegisterer)

ls := NewLookupService(mockedClient, url, serviceNameResolver, true, "", log.DefaultNopLogger(), metricsProvider)
ls := NewLookupService(mockedClient, url, serviceNameResolver, true, "", kvs, log.DefaultNopLogger(), metricsProvider)

lr, err := ls.Lookup("my-topic")
assert.NoError(t, err)
Expand All @@ -201,6 +203,7 @@ func TestLookupWithProxy(t *testing.T) {
url, err := url.Parse("pulsar://example:6650")
assert.NoError(t, err)
serviceNameResolver := NewPulsarServiceNameResolver(url)
kvs := make([]*pb.KeyValue, 0)
mockedClient := &mockedLookupRPCClient{
t: t,

Expand All @@ -210,6 +213,7 @@ func TestLookupWithProxy(t *testing.T) {
Topic: proto.String("my-topic"),
Authoritative: proto.Bool(false),
AdvertisedListenerName: proto.String(""),
Properties: kvs,
},
},
mockedResponses: []pb.CommandLookupTopicResponse{
Expand All @@ -223,7 +227,7 @@ func TestLookupWithProxy(t *testing.T) {
},
}
metricsProvider := NewMetricsProvider(4, map[string]string{}, prometheus.DefaultRegisterer)
ls := NewLookupService(mockedClient, url, serviceNameResolver, false, "", log.DefaultNopLogger(), metricsProvider)
ls := NewLookupService(mockedClient, url, serviceNameResolver, false, "", kvs, log.DefaultNopLogger(), metricsProvider)

lr, err := ls.Lookup("my-topic")
assert.NoError(t, err)
Expand All @@ -236,7 +240,7 @@ func TestLookupWithProxy(t *testing.T) {
func TestTlsLookupWithProxy(t *testing.T) {
url, err := url.Parse("pulsar+ssl://example:6651")
assert.NoError(t, err)

kvs := make([]*pb.KeyValue, 0)
mockedClient := &mockedLookupRPCClient{
t: t,

Expand All @@ -246,6 +250,7 @@ func TestTlsLookupWithProxy(t *testing.T) {
Topic: proto.String("my-topic"),
Authoritative: proto.Bool(false),
AdvertisedListenerName: proto.String(""),
Properties: kvs,
},
},
mockedResponses: []pb.CommandLookupTopicResponse{
Expand All @@ -260,7 +265,7 @@ func TestTlsLookupWithProxy(t *testing.T) {
}
resolver := NewPulsarServiceNameResolver(url)
metricsProvider := NewMetricsProvider(4, map[string]string{}, prometheus.DefaultRegisterer)
ls := NewLookupService(mockedClient, url, resolver, true, "", log.DefaultNopLogger(), metricsProvider)
ls := NewLookupService(mockedClient, url, resolver, true, "", kvs, log.DefaultNopLogger(), metricsProvider)

lr, err := ls.Lookup("my-topic")
assert.NoError(t, err)
Expand All @@ -273,7 +278,7 @@ func TestTlsLookupWithProxy(t *testing.T) {
func TestLookupWithRedirect(t *testing.T) {
url, err := url.Parse("pulsar://example:6650")
assert.NoError(t, err)

kvs := make([]*pb.KeyValue, 0)
mockedClient := &mockedLookupRPCClient{
t: t,
expectedURL: "pulsar://broker-2:6650",
Expand All @@ -284,6 +289,7 @@ func TestLookupWithRedirect(t *testing.T) {
Topic: proto.String("my-topic"),
Authoritative: proto.Bool(false),
AdvertisedListenerName: proto.String(""),
Properties: kvs,
},
{
RequestId: proto.Uint64(2),
Expand All @@ -309,7 +315,7 @@ func TestLookupWithRedirect(t *testing.T) {
}
resolver := NewPulsarServiceNameResolver(url)
metricsProvider := NewMetricsProvider(4, map[string]string{}, prometheus.DefaultRegisterer)
ls := NewLookupService(mockedClient, url, resolver, false, "", log.DefaultNopLogger(), metricsProvider)
ls := NewLookupService(mockedClient, url, resolver, false, "", kvs, log.DefaultNopLogger(), metricsProvider)

lr, err := ls.Lookup("my-topic")
assert.NoError(t, err)
Expand All @@ -322,7 +328,7 @@ func TestLookupWithRedirect(t *testing.T) {
func TestTlsLookupWithRedirect(t *testing.T) {
url, err := url.Parse("pulsar+ssl://example:6651")
assert.NoError(t, err)

kvs := make([]*pb.KeyValue, 0)
mockedClient := &mockedLookupRPCClient{
t: t,
expectedURL: "pulsar+ssl://broker-2:6651",
Expand All @@ -333,6 +339,7 @@ func TestTlsLookupWithRedirect(t *testing.T) {
Topic: proto.String("my-topic"),
Authoritative: proto.Bool(false),
AdvertisedListenerName: proto.String(""),
Properties: kvs,
},
{
RequestId: proto.Uint64(2),
Expand All @@ -359,7 +366,7 @@ func TestTlsLookupWithRedirect(t *testing.T) {

resolver := NewPulsarServiceNameResolver(url)
metricsProvider := NewMetricsProvider(4, map[string]string{}, prometheus.DefaultRegisterer)
ls := NewLookupService(mockedClient, url, resolver, true, "", log.DefaultNopLogger(), metricsProvider)
ls := NewLookupService(mockedClient, url, resolver, true, "", kvs, log.DefaultNopLogger(), metricsProvider)

lr, err := ls.Lookup("my-topic")
assert.NoError(t, err)
Expand All @@ -372,7 +379,7 @@ func TestTlsLookupWithRedirect(t *testing.T) {
func TestLookupWithInvalidUrlResponse(t *testing.T) {
url, err := url.Parse("pulsar://example:6650")
assert.NoError(t, err)

kvs := make([]*pb.KeyValue, 0)
mockedClient := &mockedLookupRPCClient{
t: t,

Expand All @@ -382,6 +389,7 @@ func TestLookupWithInvalidUrlResponse(t *testing.T) {
Topic: proto.String("my-topic"),
Authoritative: proto.Bool(false),
AdvertisedListenerName: proto.String(""),
Properties: kvs,
},
},
mockedResponses: []pb.CommandLookupTopicResponse{
Expand All @@ -396,7 +404,7 @@ func TestLookupWithInvalidUrlResponse(t *testing.T) {
}
resolver := NewPulsarServiceNameResolver(url)
metricsProvider := NewMetricsProvider(4, map[string]string{}, prometheus.DefaultRegisterer)
ls := NewLookupService(mockedClient, url, resolver, false, "", log.DefaultNopLogger(), metricsProvider)
ls := NewLookupService(mockedClient, url, resolver, false, "", kvs, log.DefaultNopLogger(), metricsProvider)

lr, err := ls.Lookup("my-topic")
assert.Error(t, err)
Expand All @@ -406,7 +414,7 @@ func TestLookupWithInvalidUrlResponse(t *testing.T) {
func TestLookupWithLookupFailure(t *testing.T) {
url, err := url.Parse("pulsar://example:6650")
assert.NoError(t, err)

kvs := make([]*pb.KeyValue, 0)
mockedClient := &mockedLookupRPCClient{
t: t,

Expand All @@ -416,6 +424,7 @@ func TestLookupWithLookupFailure(t *testing.T) {
Topic: proto.String("my-topic"),
Authoritative: proto.Bool(false),
AdvertisedListenerName: proto.String(""),
Properties: kvs,
},
},
mockedResponses: []pb.CommandLookupTopicResponse{
Expand All @@ -429,7 +438,7 @@ func TestLookupWithLookupFailure(t *testing.T) {

resolver := NewPulsarServiceNameResolver(url)
metricsProvider := NewMetricsProvider(4, map[string]string{}, prometheus.DefaultRegisterer)
ls := NewLookupService(mockedClient, url, resolver, false, "", log.DefaultNopLogger(), metricsProvider)
ls := NewLookupService(mockedClient, url, resolver, false, "", kvs, log.DefaultNopLogger(), metricsProvider)

lr, err := ls.Lookup("my-topic")
assert.Error(t, err)
Expand Down Expand Up @@ -509,6 +518,7 @@ func TestGetPartitionedTopicMetadataSuccess(t *testing.T) {
assert.NoError(t, err)
serviceNameResolver := NewPulsarServiceNameResolver(url)

kvs := make([]*pb.KeyValue, 0)
ls := NewLookupService(&mockedPartitionedTopicMetadataRPCClient{
t: t,

Expand All @@ -525,7 +535,7 @@ func TestGetPartitionedTopicMetadataSuccess(t *testing.T) {
Response: pb.CommandPartitionedTopicMetadataResponse_Success.Enum(),
},
},
}, url, serviceNameResolver, false, "", log.DefaultNopLogger(),
}, url, serviceNameResolver, false, "", kvs, log.DefaultNopLogger(),
NewMetricsProvider(4, map[string]string{}, prometheus.DefaultRegisterer))

metadata, err := ls.GetPartitionedTopicMetadata("my-topic")
Expand All @@ -539,6 +549,7 @@ func TestLookupSuccessWithMultipleHosts(t *testing.T) {
assert.NoError(t, err)
serviceNameResolver := NewPulsarServiceNameResolver(url)

kvs := make([]*pb.KeyValue, 0)
ls := NewLookupService(&mockedLookupRPCClient{
t: t,

Expand All @@ -548,6 +559,7 @@ func TestLookupSuccessWithMultipleHosts(t *testing.T) {
Topic: proto.String("my-topic"),
AdvertisedListenerName: proto.String(""),
Authoritative: proto.Bool(false),
Properties: kvs,
},
},
mockedResponses: []pb.CommandLookupTopicResponse{
Expand All @@ -558,7 +570,7 @@ func TestLookupSuccessWithMultipleHosts(t *testing.T) {
BrokerServiceUrl: proto.String("pulsar://broker-1:6650"),
},
},
}, url, serviceNameResolver, false, "", log.DefaultNopLogger(),
}, url, serviceNameResolver, false, "", kvs, log.DefaultNopLogger(),
NewMetricsProvider(4, map[string]string{}, prometheus.DefaultRegisterer))

lr, err := ls.Lookup("my-topic")
Expand Down
Loading
Loading