Skip to content

Commit 0895e43

Browse files
committed
fix: Update the serialization of EntityKey to version 3 in Go Feature server.
Signed-off-by: Shuchu Han <[email protected]>
1 parent 65b5eac commit 0895e43

File tree

3 files changed

+103
-34
lines changed

3 files changed

+103
-34
lines changed

go/internal/feast/onlinestore/onlinestore.go

Lines changed: 57 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -87,41 +87,63 @@ func serializeEntityKey(entityKey *types.EntityKey, entityKeySerializationVersio
8787
}
8888

8989
keys := make([]string, 0, len(m))
90-
for k := range entityKey.JoinKeys {
91-
keys = append(keys, entityKey.JoinKeys[k])
92-
}
93-
sort.Strings(keys)
90+
keys = append(keys, entityKey.JoinKeys...)
91+
sort.Strings(keys) // Sort the keysgiut
9492

9593
// Build the key
96-
length := 5 * len(keys)
94+
length := 7 * len(keys)
9795
bufferList := make([][]byte, length)
96+
offset := 0
97+
98+
// For entityKeySerializationVersion 3 and above, we add the number of join keys
99+
// as the first 4 bytes of the serialized key.
100+
if entityKeySerializationVersion >= 3 {
101+
byteBuffer := make([]byte, 4)
102+
binary.LittleEndian.PutUint32(byteBuffer, uint32(len(keys)))
103+
bufferList[offset] = byteBuffer // First buffer is always the length of the keys
104+
offset++
105+
}
98106

99107
for i := 0; i < len(keys); i++ {
100-
offset := i * 2
108+
// Add the key type STRING info
101109
byteBuffer := make([]byte, 4)
102110
binary.LittleEndian.PutUint32(byteBuffer, uint32(types.ValueType_Enum_value["STRING"]))
103111
bufferList[offset] = byteBuffer
104-
bufferList[offset+1] = []byte(keys[i])
112+
offset++
113+
114+
// Add the size of current "key" string
115+
keyLenByteBuffer := make([]byte, 4)
116+
binary.LittleEndian.PutUint32(keyLenByteBuffer, uint32(len(keys[i])))
117+
bufferList[offset] = keyLenByteBuffer
118+
offset++
119+
120+
// Add value
121+
bufferList[offset] = []byte(keys[i])
122+
offset++
105123
}
106124

107125
for i := 0; i < len(keys); i++ {
108-
offset := (2 * len(keys)) + (i * 3)
109126
value := m[keys[i]].GetVal()
110127

111128
valueBytes, valueTypeBytes, err := serializeValue(value, entityKeySerializationVersion)
112129
if err != nil {
113130
return valueBytes, err
114131
}
115132

133+
// Add value type info
116134
typeBuffer := make([]byte, 4)
117135
binary.LittleEndian.PutUint32(typeBuffer, uint32(valueTypeBytes))
136+
bufferList[offset] = typeBuffer
137+
offset++
118138

139+
// Add length info
119140
lenBuffer := make([]byte, 4)
120141
binary.LittleEndian.PutUint32(lenBuffer, uint32(len(*valueBytes)))
142+
bufferList[offset] = lenBuffer
143+
offset++
121144

122-
bufferList[offset+0] = typeBuffer
123-
bufferList[offset+1] = lenBuffer
124-
bufferList[offset+2] = *valueBytes
145+
bufferList[offset] = *valueBytes
146+
offset++
125147
}
126148

127149
// Convert from an array of byte arrays to a single byte array
@@ -132,3 +154,27 @@ func serializeEntityKey(entityKey *types.EntityKey, entityKeySerializationVersio
132154

133155
return &entityKeyBuffer, nil
134156
}
157+
158+
159+
func serializeValue(value interface{}, entityKeySerializationVersion int64) (*[]byte, types.ValueType_Enum, error) {
160+
// TODO: Implement support for other types (at least the major types like ints, strings, bytes)
161+
switch x := (value).(type) {
162+
case *types.Value_StringVal:
163+
valueString := []byte(x.StringVal)
164+
return &valueString, types.ValueType_STRING, nil
165+
case *types.Value_BytesVal:
166+
return &x.BytesVal, types.ValueType_BYTES, nil
167+
case *types.Value_Int32Val:
168+
valueBuffer := make([]byte, 4)
169+
binary.LittleEndian.PutUint32(valueBuffer, uint32(x.Int32Val))
170+
return &valueBuffer, types.ValueType_INT32, nil
171+
case *types.Value_Int64Val:
172+
valueBuffer := make([]byte, 8)
173+
binary.LittleEndian.PutUint64(valueBuffer, uint64(x.Int64Val))
174+
return &valueBuffer, types.ValueType_INT64, nil
175+
case nil:
176+
return nil, types.ValueType_INVALID, fmt.Errorf("could not detect type for %v", x)
177+
default:
178+
return nil, types.ValueType_INVALID, fmt.Errorf("could not detect type for %v", x)
179+
}
180+
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package onlinestore
2+
3+
import (
4+
"testing"
5+
"reflect"
6+
"github.com/stretchr/testify/assert"
7+
"github.com/feast-dev/feast/go/protos/feast/types"
8+
)
9+
10+
func Test_serializeEntityKey(t *testing.T) {
11+
expect_res := []byte{1, 0, 0, 0, 2, 0, 0, 0, 9, 0, 0, 0, 100, 114, 105, 118, 101, 114, 95, 105, 100, 4, 0, 0, 0, 8, 0, 0, 0, 233, 3, 0, 0, 0, 0, 0, 0}
12+
tests := []struct {
13+
name string // description of this test case
14+
// Named input parameters for target function.
15+
entityKey *types.EntityKey
16+
entityKeySerializationVersion int64
17+
want []byte
18+
wantErr bool
19+
}{
20+
{
21+
"test a specific key",
22+
&types.EntityKey{
23+
JoinKeys: []string{"driver_id"},
24+
EntityValues: []*types.Value{{Val: &types.Value_Int64Val{Int64Val: 1001}}},
25+
},
26+
3,
27+
expect_res,
28+
false,
29+
},
30+
}
31+
for _, tt := range tests {
32+
t.Run(tt.name, func(t *testing.T) {
33+
got, gotErr := serializeEntityKey(tt.entityKey, tt.entityKeySerializationVersion)
34+
if gotErr != nil {
35+
if !tt.wantErr {
36+
t.Errorf("serializeEntityKey() failed: %v", gotErr)
37+
}
38+
return
39+
}
40+
if tt.wantErr {
41+
t.Fatal("serializeEntityKey() succeeded unexpectedly")
42+
}
43+
assert.True(t, reflect.DeepEqual(*got, tt.want))
44+
})
45+
}
46+
}

go/internal/feast/onlinestore/redisonlinestore.go

Lines changed: 0 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -338,26 +338,3 @@ func buildRedisKey(project string, entityKey *types.EntityKey, entityKeySerializ
338338
fullKey := append(*serKey, []byte(project)...)
339339
return &fullKey, nil
340340
}
341-
342-
func serializeValue(value interface{}, entityKeySerializationVersion int64) (*[]byte, types.ValueType_Enum, error) {
343-
// TODO: Implement support for other types (at least the major types like ints, strings, bytes)
344-
switch x := (value).(type) {
345-
case *types.Value_StringVal:
346-
valueString := []byte(x.StringVal)
347-
return &valueString, types.ValueType_STRING, nil
348-
case *types.Value_BytesVal:
349-
return &x.BytesVal, types.ValueType_BYTES, nil
350-
case *types.Value_Int32Val:
351-
valueBuffer := make([]byte, 4)
352-
binary.LittleEndian.PutUint32(valueBuffer, uint32(x.Int32Val))
353-
return &valueBuffer, types.ValueType_INT32, nil
354-
case *types.Value_Int64Val:
355-
valueBuffer := make([]byte, 8)
356-
binary.LittleEndian.PutUint64(valueBuffer, uint64(x.Int64Val))
357-
return &valueBuffer, types.ValueType_INT64, nil
358-
case nil:
359-
return nil, types.ValueType_INVALID, fmt.Errorf("could not detect type for %v", x)
360-
default:
361-
return nil, types.ValueType_INVALID, fmt.Errorf("could not detect type for %v", x)
362-
}
363-
}

0 commit comments

Comments
 (0)