Skip to content

Commit 08df705

Browse files
committed
ToPath resolve locally
1 parent 20d1e45 commit 08df705

File tree

1 file changed

+45
-10
lines changed

1 file changed

+45
-10
lines changed

pkg/tree/tree_schema_cache_client.go

Lines changed: 45 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,10 @@ import (
1010
sdcpb "github.com/sdcio/sdc-protos/sdcpb"
1111
)
1212

13+
const (
14+
PATHSEP = "/"
15+
)
16+
1317
type TreeSchemaCacheClient interface {
1418
// CACHE based Functions
1519
// ReadIntended retrieves the highes priority value from the intended store
@@ -47,30 +51,61 @@ func (c *TreeSchemaCacheClientImpl) Read(ctx context.Context, opts *cache.Opts,
4751
}
4852

4953
func (c *TreeSchemaCacheClientImpl) ToPath(ctx context.Context, path []string) (*sdcpb.Path, error) {
50-
return c.scb.ToPath(ctx, path)
54+
var err error
55+
56+
keylessPathSlice := []string{}
57+
58+
p := &sdcpb.Path{}
59+
for i := 0; i < len(path); i++ {
60+
p.Elem = append(p.Elem, &sdcpb.PathElem{Name: path[i]})
61+
keylessPathSlice = append(keylessPathSlice, path[i])
62+
schema, exists := c.schemaIndex[strings.Join(keylessPathSlice, PATHSEP)]
63+
if !exists {
64+
schema, err = c.retrieveSchema(ctx, p)
65+
if err != nil {
66+
return nil, err
67+
}
68+
}
69+
70+
if schemaKeys := schema.GetSchema().GetContainer().GetKeys(); schemaKeys != nil {
71+
i += len(schemaKeys)
72+
}
73+
}
74+
75+
return p, nil
76+
}
77+
78+
func (c *TreeSchemaCacheClientImpl) retrieveSchema(ctx context.Context, p *sdcpb.Path) (*sdcpb.GetSchemaResponse, error) {
79+
// if schema wasn't found in index, go and fetch it
80+
schemaRsp, err := c.scb.GetSchema(ctx, p)
81+
if err != nil {
82+
return nil, err
83+
}
84+
85+
// convert the path into a keyless path, for schema index lookups.
86+
keylessPathSlice := utils.ToStrings(p, false, true)
87+
keylessPath := strings.Join(keylessPathSlice, PATHSEP)
88+
89+
c.schemaIndex[keylessPath] = schemaRsp
90+
91+
return schemaRsp, nil
5192
}
5293

5394
func (c *TreeSchemaCacheClientImpl) GetSchema(ctx context.Context, path []string) (*sdcpb.GetSchemaResponse, error) {
5495
// convert the []string path into sdcpb.path for schema retrieval
55-
sdcpbPath, err := c.scb.ToPath(ctx, path)
96+
sdcpbPath, err := c.ToPath(ctx, path)
5697
if err != nil {
5798
return nil, err
5899
}
59100

60101
// convert the path into a keyless path, for schema index lookups.
61102
keylessPathSlice := utils.ToStrings(sdcpbPath, false, true)
62-
keylessPath := strings.Join(keylessPathSlice, "/")
103+
keylessPath := strings.Join(keylessPathSlice, PATHSEP)
63104

64105
// lookup schema in schemaindex, preventing consecutive gets from the schema server
65106
if v, exists := c.schemaIndex[keylessPath]; exists {
66107
return v, nil
67108
}
68109

69-
// if schema wasn't found in index, go and fetch it
70-
schemaRsp, err := c.scb.GetSchema(ctx, sdcpbPath)
71-
if err != nil {
72-
return nil, err
73-
}
74-
75-
return schemaRsp, nil
110+
return c.retrieveSchema(ctx, sdcpbPath)
76111
}

0 commit comments

Comments
 (0)