Skip to content

Commit 12b966d

Browse files
authored
[improve] Support http lookup getSchema interface (#1368)
Master Issue: https://github.com/apache/pulsar/wiki/PIP-43%3A-producer-send-message-with-different-schema#changespart-1 Related pr #611 ### Motivation Currently pulsar go sdk has supported multi-version schema in above pr, but the pr does not support `getSchema()` method with http lookup service. So that we will encounter error when we call `msg.GetSchemaValue(v interface{}) error` function with http serviceUrl. Demo below: ``` func createClient() Client { // create client //lookupURL := "pulsar://localhost:6650" lookupURL := "http://localhost:8080" // change to http protocol serviceUrl client, err := NewClient(ClientOptions{ URL: lookupURL, }) if err != nil { log.Fatal(err) } return client } func TestBytesSchema(t *testing.T) { client := createClient() defer client.Close() topic := newTopicName() properties := make(map[string]string) properties["pulsar"] = "hello" producerSchemaBytes := NewBytesSchema(properties) producer, err := client.CreateProducer(ProducerOptions{ Topic: topic, Schema: producerSchemaBytes, }) assert.NoError(t, err) _, err = producer.Send(context.Background(), &ProducerMessage{ Value: []byte(`{"key": "value"}`), }) require.NoError(t, err) producer.Close() // Create consumer consumerSchemaBytes := NewBytesSchema(nil) assert.NotNil(t, consumerSchemaBytes) consumer, err := client.Subscribe(ConsumerOptions{ Topic: topic, SubscriptionName: "sub-1", Schema: consumerSchemaBytes, SubscriptionInitialPosition: SubscriptionPositionEarliest, }) assert.Nil(t, err) ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() // Receive first message var out1 []byte msg1, err := consumer.Receive(ctx) assert.NoError(t, err) err = msg1.GetSchemaValue(&out1) assert.NoError(t, err) assert.Equal(t, []byte(`{"key": "value"}`), out1) consumer.Ack(msg1) require.NoError(t, err) } ``` Test output: ``` /root/go/pkg/mod/golang.org/[email protected]/bin/go tool test2json -t /root/.cache/JetBrains/GoLand2024.1/tmp/GoLand/___TestBytesSchema_in_github_com_apache_pulsar_client_go_pulsar.test -test.v -test.paniconexit0 -test.run ^\QTestBytesSchema\E$ === RUN TestBytesSchema time="2025-05-14T16:32:45+08:00" level=info msg="Connecting to broker" remote_addr="pulsar://localhost:6650" time="2025-05-14T16:32:45+08:00" level=info msg="TCP connection established" local_addr="127.0.0.1:36638" remote_addr="pulsar://localhost:6650" time="2025-05-14T16:32:45+08:00" level=info msg="Connection is ready" local_addr="127.0.0.1:36638" remote_addr="pulsar://localhost:6650" time="2025-05-14T16:32:45+08:00" level=info msg="Connected producer" cnx="127.0.0.1:36638 -> 127.0.0.1:6650" epoch=0 topic="persistent://public/default/my-topic-147368803" time="2025-05-14T16:32:45+08:00" level=info msg="Created producer" cnx="127.0.0.1:36638 -> 127.0.0.1:6650" producerID=1 producer_name=standalone-42-1 topic="persistent://public/default/my-topic-147368803" time="2025-05-14T16:32:45+08:00" level=info msg="Closing producer" producerID=1 producer_name=standalone-42-1 topic="persistent://public/default/my-topic-147368803" time="2025-05-14T16:32:45+08:00" level=info msg="Closed producer" producerID=1 producer_name=standalone-42-1 topic="persistent://public/default/my-topic-147368803" time="2025-05-14T16:32:45+08:00" level=info msg="Connected consumer" consumerID=1 name=yzway subscription=sub-1 topic="persistent://public/default/my-topic-147368803" time="2025-05-14T16:32:45+08:00" level=info msg="Created consumer" consumerID=1 name=yzway subscription=sub-1 topic="persistent://public/default/my-topic-147368803" schema_test.go:101: Error Trace: /data/code/dev/pulsar-client-go/pulsar/schema_test.go:101 Error: Received unexpected error: GetSchema is not supported by httpLookupService Test: TestBytesSchema schema_test.go:102: Error Trace: /data/code/dev/pulsar-client-go/pulsar/schema_test.go:102 Error: Not equal: expected: []byte{0x7b, 0x22, 0x6b, 0x65, 0x79, 0x22, 0x3a, 0x20, 0x22, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x22, 0x7d} actual : []byte(nil) Diff: --- Expected +++ Actual @@ -1,4 +1,2 @@ -([]uint8) (len=16) { - 00000000 7b 22 6b 65 79 22 3a 20 22 76 61 6c 75 65 22 7d |{"key": "value"}| -} +([]uint8) <nil> Test: TestBytesSchema schema_test.go:104: Error Trace: /data/code/dev/pulsar-client-go/pulsar/schema_test.go:104 Error: Received unexpected error: GetSchema is not supported by httpLookupService Test: TestBytesSchema time="2025-05-14T16:32:45+08:00" level=info msg="Closing consumer=1" consumerID=1 name=yzway subscription=sub-1 topic="persistent://public/default/my-topic-147368803" time="2025-05-14T16:32:45+08:00" level=info msg="Closed consumer" consumerID=1 name=yzway subscription=sub-1 topic="persistent://public/default/my-topic-147368803" --- FAIL: TestBytesSchema (0.12s) Expected :[]byte{0x7b, 0x22, 0x6b, 0x65, 0x79, 0x22, 0x3a, 0x20, 0x22, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x22, 0x7d} Actual :[]byte(nil) ``` ### Modifications - Update LookupService interface return type from `GetSchema(topic string, schemaVersion []byte) (schema *pb.Schema, err error)` to `GetSchema(topic string, schemaVersion []byte) (*LookupSchema, error)` to support http lookup protocol in `lookup_service.go` - Support HTTPLookupService `GetSchema(topic string, schemaVersion []byte) (*LookupSchema, error)` function in `lookup_service.go` - Add http lookup `GetSchema()` related test cases in schema_test.go
1 parent aaadde5 commit 12b966d

File tree

5 files changed

+270
-39
lines changed

5 files changed

+270
-39
lines changed

pulsar/consumer_partition.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -318,22 +318,22 @@ func (s *schemaInfoCache) Get(schemaVersion []byte) (schema Schema, err error) {
318318
return schema, nil
319319
}
320320

321-
pbSchema, err := s.client.lookupService.GetSchema(s.topic, schemaVersion)
321+
// cache missed, try to use lookupService to find schema info
322+
lookupSchema, err := s.client.lookupService.GetSchema(s.topic, schemaVersion)
322323
if err != nil {
323324
return nil, err
324325
}
325-
326-
if pbSchema == nil {
327-
err = fmt.Errorf("schema not found for topic: [ %v ], schema version : [ %v ]", s.topic, schemaVersion)
328-
return nil, err
329-
}
330-
331-
var properties = internal.ConvertToStringMap(pbSchema.Properties)
332-
333-
schema, err = NewSchema(SchemaType(*pbSchema.Type), pbSchema.SchemaData, properties)
326+
schema, err = NewSchema(
327+
// lookupSchema.SchemaType is internal package SchemaType type,
328+
// we need to cast it to pulsar.SchemaType as soon as we use it in current pulsar package
329+
SchemaType(lookupSchema.SchemaType),
330+
lookupSchema.Data,
331+
lookupSchema.Properties,
332+
)
334333
if err != nil {
335334
return nil, err
336335
}
336+
337337
s.add(key, schema)
338338
return schema, nil
339339
}

pulsar/internal/lookup_service.go

Lines changed: 69 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,11 @@
1818
package internal
1919

2020
import (
21+
"encoding/binary"
2122
"errors"
2223
"fmt"
2324
"net/url"
25+
"strings"
2426

2527
"google.golang.org/protobuf/proto"
2628

@@ -34,6 +36,13 @@ type LookupResult struct {
3436
PhysicalAddr *url.URL
3537
}
3638

39+
// LookupSchema return lookup schema result
40+
type LookupSchema struct {
41+
SchemaType SchemaType
42+
Data []byte
43+
Properties map[string]string
44+
}
45+
3746
// GetTopicsOfNamespaceMode for CommandGetTopicsOfNamespace_Mode
3847
type GetTopicsOfNamespaceMode string
3948

@@ -62,7 +71,7 @@ type LookupService interface {
6271
GetTopicsOfNamespace(namespace string, mode GetTopicsOfNamespaceMode) ([]string, error)
6372

6473
// GetSchema returns schema for a given version.
65-
GetSchema(topic string, schemaVersion []byte) (schema *pb.Schema, err error)
74+
GetSchema(topic string, schemaVersion []byte) (*LookupSchema, error)
6675

6776
GetBrokerAddress(brokerServiceURL string, proxyThroughServiceURL bool) (*LookupResult, error)
6877

@@ -97,7 +106,7 @@ func NewLookupService(rpcClient RPCClient, serviceURL *url.URL, serviceNameResol
97106
}
98107
}
99108

100-
func (ls *lookupService) GetSchema(topic string, schemaVersion []byte) (schema *pb.Schema, err error) {
109+
func (ls *lookupService) GetSchema(topic string, schemaVersion []byte) (*LookupSchema, error) {
101110
id := ls.rpcClient.NewRequestID()
102111
req := &pb.CommandGetSchema{
103112
RequestId: proto.Uint64(id),
@@ -106,12 +115,23 @@ func (ls *lookupService) GetSchema(topic string, schemaVersion []byte) (schema *
106115
}
107116
res, err := ls.rpcClient.RequestToAnyBroker(id, pb.BaseCommand_GET_SCHEMA, req)
108117
if err != nil {
109-
return nil, err
118+
return &LookupSchema{}, err
110119
}
111120
if res.Response.Error != nil {
112-
return nil, errors.New(res.Response.GetError().String())
121+
return &LookupSchema{}, errors.New(res.Response.GetError().String())
122+
}
123+
124+
// deserialize pbSchema and convert it to LookupSchema struct
125+
pbSchema := res.Response.GetSchemaResponse.Schema
126+
if pbSchema == nil {
127+
err = fmt.Errorf("schema not found for topic: [ %v ], schema version : [ %v ]", topic, schemaVersion)
128+
return &LookupSchema{}, err
113129
}
114-
return res.Response.GetSchemaResponse.Schema, nil
130+
return &LookupSchema{
131+
SchemaType: SchemaType(int(*pbSchema.Type)),
132+
Data: pbSchema.SchemaData,
133+
Properties: ConvertToStringMap(pbSchema.Properties),
134+
}, nil
115135
}
116136

117137
func (ls *lookupService) GetBrokerAddress(brokerServiceURL string, proxyThroughServiceURL bool) (*LookupResult, error) {
@@ -273,6 +293,8 @@ const HTTPAdminServiceV1Format string = "/admin/%s/partitions"
273293
const HTTPAdminServiceV2Format string = "/admin/v2/%s/partitions"
274294
const HTTPTopicUnderNamespaceV1 string = "/admin/namespaces/%s/destinations?mode=%s"
275295
const HTTPTopicUnderNamespaceV2 string = "/admin/v2/namespaces/%s/topics?mode=%s"
296+
const HTTPSchemaV2 string = "/admin/v2/schemas/%s/schema"
297+
const HTTPSchemaWithVersionV2 string = "/admin/v2/schemas/%s/schema/%d"
276298

277299
type httpLookupData struct {
278300
BrokerURL string `json:"brokerUrl"`
@@ -289,6 +311,12 @@ type httpLookupService struct {
289311
metrics *Metrics
290312
}
291313

314+
type httpLookupSchema struct {
315+
HTTPSchemaType string `json:"type"`
316+
Data string `json:"data"`
317+
Properties map[string]string `json:"properties"`
318+
}
319+
292320
func (h *httpLookupService) GetBrokerAddress(brokerServiceURL string, _ bool) (*LookupResult, error) {
293321
logicalAddress, err := url.ParseRequestURI(brokerServiceURL)
294322
if err != nil {
@@ -371,8 +399,42 @@ func (h *httpLookupService) GetTopicsOfNamespace(namespace string, mode GetTopic
371399
return topics, nil
372400
}
373401

374-
func (h *httpLookupService) GetSchema(_ string, _ []byte) (schema *pb.Schema, err error) {
375-
return nil, errors.New("GetSchema is not supported by httpLookupService")
402+
func (h *httpLookupService) GetSchema(topic string, schemaVersion []byte) (*LookupSchema, error) {
403+
topicName, err := ParseTopicName(topic)
404+
if err != nil {
405+
return nil, err
406+
}
407+
topicRestPath := fmt.Sprintf("%s/%s", topicName.Namespace, topicName.Topic)
408+
var path string
409+
if schemaVersion != nil {
410+
path = fmt.Sprintf(HTTPSchemaWithVersionV2, topicRestPath, int64(binary.BigEndian.Uint64(schemaVersion)))
411+
} else {
412+
path = fmt.Sprintf(HTTPSchemaV2, topicRestPath)
413+
}
414+
lookupSchema := &httpLookupSchema{}
415+
if err := h.httpClient.Get(path, &lookupSchema, nil); err != nil {
416+
if strings.HasPrefix(err.Error(), "Code: 404") {
417+
err = fmt.Errorf("schema not found for topic: [ %v ], schema version : [ %v ]", topic, schemaVersion)
418+
}
419+
h.log.Errorf("schema [ %v ] request error, schema version : [ %v ]", topic, schemaVersion)
420+
return &LookupSchema{}, err
421+
}
422+
423+
// deserialize httpSchema and convert it to LookupSchema struct
424+
schemaType, exists := HTTPSchemaTypeMap[strings.ToUpper(lookupSchema.HTTPSchemaType)]
425+
if !exists {
426+
err = fmt.Errorf("unsupported schema type [%s] for topic: [ %v ], schema version : [ %v ]",
427+
lookupSchema.HTTPSchemaType,
428+
topic,
429+
schemaVersion,
430+
)
431+
return nil, err
432+
}
433+
return &LookupSchema{
434+
SchemaType: schemaType,
435+
Data: []byte(lookupSchema.Data),
436+
Properties: lookupSchema.Properties,
437+
}, nil
376438
}
377439

378440
func (h *httpLookupService) ServiceNameResolver() *ServiceNameResolver {

pulsar/internal/schema.go

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package internal
19+
20+
// SchemaType We need to define a SchemaType in this internal package, to avoid directly importing pulsar.SchemaType.
21+
// In case we might encounter importing cycle problem.
22+
type SchemaType int
23+
24+
const (
25+
NONE SchemaType = iota //No schema defined
26+
STRING //Simple String encoding with UTF-8
27+
JSON //JSON object encoding and validation
28+
PROTOBUF //Protobuf message encoding and decoding
29+
AVRO //Serialize and deserialize via Avro
30+
BOOLEAN //
31+
INT8 //A 8-byte integer.
32+
INT16 //A 16-byte integer.
33+
INT32 //A 32-byte integer.
34+
INT64 //A 64-byte integer.
35+
FLOAT //A float number.
36+
DOUBLE //A double number
37+
_ //
38+
_ //
39+
_ //
40+
KeyValue //A Schema that contains Key Schema and Value Schema.
41+
BYTES = 0 //A bytes array.
42+
AUTO = -2 //
43+
AutoConsume = -3 //Auto Consume Type.
44+
AutoPublish = -4 //Auto Publish Type.
45+
ProtoNative = 20 //Protobuf native message encoding and decoding
46+
)
47+
48+
var HTTPSchemaTypeMap = map[string]SchemaType{
49+
"NONE": BYTES,
50+
"STRING": STRING,
51+
"JSON": JSON,
52+
"PROTOBUF": PROTOBUF,
53+
"AVRO": AVRO,
54+
"BOOLEAN": BOOLEAN,
55+
"INT8": INT8,
56+
"INT16": INT16,
57+
"INT32": INT32,
58+
"INT64": INT64,
59+
"FLOAT": FLOAT,
60+
"DOUBLE": DOUBLE,
61+
"KEYVALUE": KeyValue,
62+
"BYTES": BYTES,
63+
"AUTO": AUTO,
64+
"AUTOCONSUME": AutoConsume,
65+
"AUTOPUBLISH": AutoPublish,
66+
"PROTOBUF_NATIVE": ProtoNative,
67+
}

pulsar/schema.go

Lines changed: 20 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"sync"
2727
"unsafe"
2828

29+
"github.com/apache/pulsar-client-go/pulsar/internal"
2930
log "github.com/sirupsen/logrus"
3031

3132
"github.com/hamba/avro/v2"
@@ -35,30 +36,27 @@ import (
3536
"google.golang.org/protobuf/types/descriptorpb"
3637
)
3738

38-
type SchemaType int
39+
type SchemaType internal.SchemaType
3940

4041
const (
41-
NONE SchemaType = iota //No schema defined
42-
STRING //Simple String encoding with UTF-8
43-
JSON //JSON object encoding and validation
44-
PROTOBUF //Protobuf message encoding and decoding
45-
AVRO //Serialize and deserialize via Avro
46-
BOOLEAN //
47-
INT8 //A 8-byte integer.
48-
INT16 //A 16-byte integer.
49-
INT32 //A 32-byte integer.
50-
INT64 //A 64-byte integer.
51-
FLOAT //A float number.
52-
DOUBLE //A double number
53-
_ //
54-
_ //
55-
_ //
56-
KeyValue //A Schema that contains Key Schema and Value Schema.
57-
BYTES = 0 //A bytes array.
58-
AUTO = -2 //
59-
AutoConsume = -3 //Auto Consume Type.
60-
AutoPublish = -4 // Auto Publish Type.
61-
ProtoNative = 20 //Protobuf native message encoding and decoding
42+
NONE = SchemaType(internal.NONE)
43+
STRING = SchemaType(internal.STRING)
44+
JSON = SchemaType(internal.JSON)
45+
PROTOBUF = SchemaType(internal.PROTOBUF)
46+
AVRO = SchemaType(internal.AVRO)
47+
BOOLEAN = SchemaType(internal.BOOLEAN)
48+
INT8 = SchemaType(internal.INT8)
49+
INT16 = SchemaType(internal.INT16)
50+
INT32 = SchemaType(internal.INT32)
51+
INT64 = SchemaType(internal.INT64)
52+
FLOAT = SchemaType(internal.FLOAT)
53+
DOUBLE = SchemaType(internal.DOUBLE)
54+
KeyValue = SchemaType(internal.KeyValue)
55+
BYTES = SchemaType(internal.BYTES)
56+
AUTO = SchemaType(internal.AUTO)
57+
AutoConsume = SchemaType(internal.AutoConsume)
58+
AutoPublish = SchemaType(internal.AutoPublish)
59+
ProtoNative = SchemaType(internal.ProtoNative)
6260
)
6361

6462
// Encapsulates data around the schema definition

0 commit comments

Comments
 (0)