Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 12 additions & 7 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -713,20 +713,25 @@ compile-protos-go: install-go-proto-dependencies ## Compile Go protobuf files
--go-grpc_out=$(ROOT_DIR)/go/protos \
--go-grpc_opt=module=github.com/feast-dev/feast/go/protos $(ROOT_DIR)/protos/feast/$(folder)/*.proto; ) true

#install-go-ci-dependencies:
# go install golang.org/x/tools/cmd/goimports
# python -m pip install "pybindgen==0.22.1" "grpcio-tools>=1.56.2,<2" "mypy-protobuf>=3.1"
install-go-ci-dependencies:
go install golang.org/x/tools/cmd/goimports
uv pip install "pybindgen==0.22.1" "grpcio-tools>=1.56.2,<2" "mypy-protobuf>=3.1"

.PHONY: build-go
build-go: compile-protos-go ## Build Go code
go build -o feast ./go/main.go

.PHONY: install-feast-ci-locally
install-feast-ci-locally: ## Install Feast CI dependencies locally
uv pip install -e ".[ci]"

## Assume the uv will create an .venv folder for itself.
## The unit test funcions will call the Python "feast" command to initialze a feast repo.
.PHONY: install-feast-locally
install-feast-locally: ## Install Feast locally
uv pip install -e "."
@export PATH=$(ROOT_DIR)/.venv/bin:$$PATH
@echo $$PATH

.PHONY: test-go
test-go: compile-protos-go install-feast-ci-locally compile-protos-python ## Run Go tests
test-go: compile-protos-python compile-protos-go install-go-ci-dependencies install-feast-locally ## Run Go tests
CGO_ENABLED=1 go test -coverprofile=coverage.out ./... && go tool cover -html=coverage.out -o coverage.html

.PHONY: format-go
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
module github.com/feast-dev/feast

go 1.22.0
go 1.23

toolchain go1.22.5
toolchain go1.23.12

require (
github.com/apache/arrow/go/v17 v17.0.0
Expand Down
2 changes: 1 addition & 1 deletion go/internal/feast/onlinestore/dynamodbonlinestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ func (d *DynamodbOnlineStore) OnlineRead(ctx context.Context, entityKeys []*type

// process null imputation for entity ids that don't exist in dynamodb
currentTime := timestamppb.Now() // TODO: should use a different timestamp?
for entityId, _ := range unprocessedEntityIdsFeatureView {
for entityId := range unprocessedEntityIdsFeatureView {
entityIndex := entityIndexMap[entityId]
for _, featureName := range featureNames {
featureIndex := featureNamesIndex[featureName]
Expand Down
67 changes: 56 additions & 11 deletions go/internal/feast/onlinestore/onlinestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,41 +87,63 @@ func serializeEntityKey(entityKey *types.EntityKey, entityKeySerializationVersio
}

keys := make([]string, 0, len(m))
for k := range entityKey.JoinKeys {
keys = append(keys, entityKey.JoinKeys[k])
}
sort.Strings(keys)
keys = append(keys, entityKey.JoinKeys...)
sort.Strings(keys) // Sort the keys

// Build the key
length := 5 * len(keys)
length := 7 * len(keys)
bufferList := make([][]byte, length)
offset := 0

// For entityKeySerializationVersion 3 and above, we add the number of join keys
// as the first 4 bytes of the serialized key.
if entityKeySerializationVersion >= 3 {
byteBuffer := make([]byte, 4)
binary.LittleEndian.PutUint32(byteBuffer, uint32(len(keys)))
bufferList[offset] = byteBuffer // First buffer is always the length of the keys
offset++
}

for i := 0; i < len(keys); i++ {
offset := i * 2
// Add the key type STRING info
byteBuffer := make([]byte, 4)
binary.LittleEndian.PutUint32(byteBuffer, uint32(types.ValueType_Enum_value["STRING"]))
bufferList[offset] = byteBuffer
bufferList[offset+1] = []byte(keys[i])
offset++

// Add the size of current "key" string
keyLenByteBuffer := make([]byte, 4)
binary.LittleEndian.PutUint32(keyLenByteBuffer, uint32(len(keys[i])))
bufferList[offset] = keyLenByteBuffer
offset++

// Add value
bufferList[offset] = []byte(keys[i])
offset++
}

for i := 0; i < len(keys); i++ {
offset := (2 * len(keys)) + (i * 3)
value := m[keys[i]].GetVal()

valueBytes, valueTypeBytes, err := serializeValue(value, entityKeySerializationVersion)
if err != nil {
return valueBytes, err
}

// Add value type info
typeBuffer := make([]byte, 4)
binary.LittleEndian.PutUint32(typeBuffer, uint32(valueTypeBytes))
bufferList[offset] = typeBuffer
offset++

// Add length info
lenBuffer := make([]byte, 4)
binary.LittleEndian.PutUint32(lenBuffer, uint32(len(*valueBytes)))
bufferList[offset] = lenBuffer
offset++

bufferList[offset+0] = typeBuffer
bufferList[offset+1] = lenBuffer
bufferList[offset+2] = *valueBytes
bufferList[offset] = *valueBytes
offset++
}

// Convert from an array of byte arrays to a single byte array
Expand All @@ -132,3 +154,26 @@ func serializeEntityKey(entityKey *types.EntityKey, entityKeySerializationVersio

return &entityKeyBuffer, nil
}

func serializeValue(value interface{}, entityKeySerializationVersion int64) (*[]byte, types.ValueType_Enum, error) {
// TODO: Implement support for other types (at least the major types like ints, strings, bytes)
switch x := (value).(type) {
case *types.Value_StringVal:
valueString := []byte(x.StringVal)
return &valueString, types.ValueType_STRING, nil
case *types.Value_BytesVal:
return &x.BytesVal, types.ValueType_BYTES, nil
case *types.Value_Int32Val:
valueBuffer := make([]byte, 4)
binary.LittleEndian.PutUint32(valueBuffer, uint32(x.Int32Val))
return &valueBuffer, types.ValueType_INT32, nil
case *types.Value_Int64Val:
valueBuffer := make([]byte, 8)
binary.LittleEndian.PutUint64(valueBuffer, uint64(x.Int64Val))
return &valueBuffer, types.ValueType_INT64, nil
case nil:
return nil, types.ValueType_INVALID, fmt.Errorf("could not detect type for %v", x)
default:
return nil, types.ValueType_INVALID, fmt.Errorf("could not detect type for %v", x)
}
}
46 changes: 46 additions & 0 deletions go/internal/feast/onlinestore/onlinestore_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package onlinestore

import (
"github.com/feast-dev/feast/go/protos/feast/types"
"github.com/stretchr/testify/assert"
"reflect"
"testing"
)

func Test_serializeEntityKey(t *testing.T) {
expect_res := []byte{1, 0, 0, 0, 2, 0, 0, 0, 9, 0, 0, 0, 100, 114, 105, 118, 101, 114, 95, 105, 100, 4, 0, 0, 0, 8, 0, 0, 0, 233, 3, 0, 0, 0, 0, 0, 0}
tests := []struct {
name string // description of this test case
// Named input parameters for target function.
entityKey *types.EntityKey
entityKeySerializationVersion int64
want []byte
wantErr bool
}{
{
"test a specific key",
&types.EntityKey{
JoinKeys: []string{"driver_id"},
EntityValues: []*types.Value{{Val: &types.Value_Int64Val{Int64Val: 1001}}},
},
3,
expect_res,
false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, gotErr := serializeEntityKey(tt.entityKey, tt.entityKeySerializationVersion)
if gotErr != nil {
if !tt.wantErr {
t.Errorf("serializeEntityKey() failed: %v", gotErr)
}
return
}
if tt.wantErr {
t.Fatal("serializeEntityKey() succeeded unexpectedly")
}
assert.True(t, reflect.DeepEqual(*got, tt.want))
})
}
}
23 changes: 0 additions & 23 deletions go/internal/feast/onlinestore/redisonlinestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,26 +338,3 @@ func buildRedisKey(project string, entityKey *types.EntityKey, entityKeySerializ
fullKey := append(*serKey, []byte(project)...)
return &fullKey, nil
}

func serializeValue(value interface{}, entityKeySerializationVersion int64) (*[]byte, types.ValueType_Enum, error) {
// TODO: Implement support for other types (at least the major types like ints, strings, bytes)
switch x := (value).(type) {
case *types.Value_StringVal:
valueString := []byte(x.StringVal)
return &valueString, types.ValueType_STRING, nil
case *types.Value_BytesVal:
return &x.BytesVal, types.ValueType_BYTES, nil
case *types.Value_Int32Val:
valueBuffer := make([]byte, 4)
binary.LittleEndian.PutUint32(valueBuffer, uint32(x.Int32Val))
return &valueBuffer, types.ValueType_INT32, nil
case *types.Value_Int64Val:
valueBuffer := make([]byte, 8)
binary.LittleEndian.PutUint64(valueBuffer, uint64(x.Int64Val))
return &valueBuffer, types.ValueType_INT64, nil
case nil:
return nil, types.ValueType_INVALID, fmt.Errorf("could not detect type for %v", x)
default:
return nil, types.ValueType_INVALID, fmt.Errorf("could not detect type for %v", x)
}
}
3 changes: 2 additions & 1 deletion go/internal/test/feature_repo/feature_store.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@ project: feature_repo
registry: data/registry.db
provider: local
online_store:
path: data/online_store.db
path: data/online_store.db
entity_key_serialization_version: 3
Loading