Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,21 @@ toolchain go1.22.5

require (
github.com/apache/arrow/go/v17 v17.0.0
github.com/aws/aws-sdk-go-v2 v1.36.4
github.com/aws/aws-sdk-go-v2/config v1.29.14
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.43.3
github.com/aws/aws-sdk-go-v2/service/s3 v1.79.3
github.com/ghodss/yaml v1.0.0
github.com/golang/protobuf v1.5.4
github.com/google/uuid v1.6.0
github.com/mattn/go-sqlite3 v1.14.23
github.com/pkg/errors v0.9.1
github.com/redis/go-redis/v9 v9.6.1
github.com/roberson-io/mmh3 v0.0.0-20190729202758-fdfce3ba6225
github.com/rs/zerolog v1.33.0
github.com/spaolacci/murmur3 v1.1.0
github.com/stretchr/testify v1.9.0
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1
google.golang.org/grpc v1.67.0
google.golang.org/protobuf v1.34.2
)
Expand All @@ -23,20 +29,18 @@ require (
github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c // indirect
github.com/andybalholm/brotli v1.1.0 // indirect
github.com/apache/thrift v0.21.0 // indirect
github.com/aws/aws-sdk-go-v2 v1.36.3 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.10 // indirect
github.com/aws/aws-sdk-go-v2/config v1.29.14 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.17.67 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.30 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.34 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.34 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.35 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.35 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.34 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.3 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.7.1 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.10.16 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.15 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.15 // indirect
github.com/aws/aws-sdk-go-v2/service/s3 v1.79.3 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.25.3 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.30.1 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.33.19 // indirect
Expand Down Expand Up @@ -66,7 +70,6 @@ require (
golang.org/x/text v0.18.0 // indirect
golang.org/x/tools v0.25.0 // indirect
golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
12 changes: 12 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ github.com/apache/thrift v0.21.0 h1:tdPmh/ptjE1IJnhbhrcl2++TauVjy242rkV/UzJChnE=
github.com/apache/thrift v0.21.0/go.mod h1:W1H8aR/QRtYNvrPeFXBtobyRkd0/YVhTc6i07XIAgDw=
github.com/aws/aws-sdk-go-v2 v1.36.3 h1:mJoei2CxPutQVxaATCzDUjcZEjVRdpsiiXi2o38yqWM=
github.com/aws/aws-sdk-go-v2 v1.36.3/go.mod h1:LLXuLpgzEbD766Z5ECcRmi8AzSwfZItDtmABVkRLGzg=
github.com/aws/aws-sdk-go-v2 v1.36.4 h1:GySzjhVvx0ERP6eyfAbAuAXLtAda5TEy19E5q5W8I9E=
github.com/aws/aws-sdk-go-v2 v1.36.4/go.mod h1:LLXuLpgzEbD766Z5ECcRmi8AzSwfZItDtmABVkRLGzg=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.10 h1:zAybnyUQXIZ5mok5Jqwlf58/TFE7uvd3IAsa1aF9cXs=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.10/go.mod h1:qqvMj6gHLR/EXWZw4ZbqlPbQUyenf4h82UQUlKc+l14=
github.com/aws/aws-sdk-go-v2/config v1.29.14 h1:f+eEi/2cKCg9pqKBoAIwRGzVb70MRKqWX4dg1BDcSJM=
Expand All @@ -18,16 +20,24 @@ github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.30 h1:x793wxmUWVDhshP8WW2mln
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.30/go.mod h1:Jpne2tDnYiFascUEs2AWHJL9Yp7A5ZVy3TNyxaAjD6M=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.34 h1:ZK5jHhnrioRkUNOc+hOgQKlUL5JeC3S6JgLxtQ+Rm0Q=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.34/go.mod h1:p4VfIceZokChbA9FzMbRGz5OV+lekcVtHlPKEO0gSZY=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.35 h1:o1v1VFfPcDVlK3ll1L5xHsaQAFdNtZ5GXnNR7SwueC4=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.35/go.mod h1:rZUQNYMNG+8uZxz9FOerQJ+FceCiodXvixpeRtdESrU=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.34 h1:SZwFm17ZUNNg5Np0ioo/gq8Mn6u9w19Mri8DnJ15Jf0=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.34/go.mod h1:dFZsC0BLo346mvKQLWmoJxT+Sjp+qcVR1tRVHQGOH9Q=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.35 h1:R5b82ubO2NntENm3SAm0ADME+H630HomNJdgv+yZ3xw=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.35/go.mod h1:FuA+nmgMRfkzVKYDNEqQadvEMxtxl9+RLT9ribCwEMs=
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3 h1:bIqFDwgGXXN1Kpp99pDOdKMTTb5d2KyU5X/BZxjOkRo=
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3/go.mod h1:H5O/EsxDWyU+LP/V8i5sm8cxoZgc2fdNR9bxlOFrQTo=
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.34 h1:ZNTqv4nIdE/DiBfUUfXcLZ/Spcuz+RjeziUtNJackkM=
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.34/go.mod h1:zf7Vcd1ViW7cPqYWEHLHJkS50X0JS2IKz9Cgaj6ugrs=
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.43.3 h1:2FCJAT5wyPs5JjAFoLgaEB0MIiWvXiJ0T6PZiKDkJoo=
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.43.3/go.mod h1:rUOhTo9+gtTYTMnGD+xiiks/2Z8vssPP+uSMNhJBbmI=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.3 h1:eAh2A4b5IzM/lum78bZ590jy36+d/aFLgKF/4Vd1xPE=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.3/go.mod h1:0yKJC/kb8sAnmlYa6Zs3QVYqaC8ug2AbnNChv5Ox3uA=
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.7.1 h1:4nm2G6A4pV9rdlWzGMPv4BNtQp22v1hg3yrtkYpeLl8=
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.7.1/go.mod h1:iu6FSzgt+M2/x3Dk8zhycdIcHjEFb36IS8HVUVFoMg0=
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.10.16 h1:TLsOzHW9zlJoMgjcKQI/7bolyv/DL0796y4NigWgaw8=
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.10.16/go.mod h1:mNoiR5qsO9TxXZ6psjjQ3M+Zz7hURFTumXHF+UKjyAU=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.15 h1:dM9/92u2F1JbDaGooxTq18wmmFzbJRfXfVfy96/1CXM=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.15/go.mod h1:SwFBy2vjtA0vZbjjaFtfN045boopadnoVPhu4Fv66vY=
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.15 h1:moLQUoVq91LiqT1nbvzDukyqAlCv89ZmwaHw/ZFlFZg=
Expand Down Expand Up @@ -94,6 +104,8 @@ github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRI
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/redis/go-redis/v9 v9.6.1 h1:HHDteefn6ZkTtY5fGUE8tj8uy85AHk6zP7CpzIAM0y4=
github.com/redis/go-redis/v9 v9.6.1/go.mod h1:0C0c6ycQsdpVNQpxb1njEQIqkx5UcsM8FJCQLgE9+RA=
github.com/roberson-io/mmh3 v0.0.0-20190729202758-fdfce3ba6225 h1:ZMsPCp7oYgjoIFt1c+sM2qojxZXotSYcMF8Ur9/LJlM=
github.com/roberson-io/mmh3 v0.0.0-20190729202758-fdfce3ba6225/go.mod h1:XEESr+X1SY8ZSuc3jqsTlb3clCkqQJ4DcF3Qxv1N3PM=
github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
github.com/rs/zerolog v1.33.0 h1:1cU2KZkvPxNyfgEmhHAz/1A9Bz+llsdYzklWFzgp0r8=
github.com/rs/zerolog v1.33.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss=
Expand Down
240 changes: 240 additions & 0 deletions go/internal/feast/onlinestore/dynamodbonlinestore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,240 @@
package onlinestore

import (
"context"
"encoding/hex"
"fmt"
awsConfig "github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
dtypes "github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
"github.com/feast-dev/feast/go/internal/feast/registry"
"github.com/feast-dev/feast/go/protos/feast/serving"
"github.com/feast-dev/feast/go/protos/feast/types"
"github.com/roberson-io/mmh3"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"
"runtime"
"sync"
"time"
)

type batchResult struct {
index int
response *dynamodb.BatchGetItemOutput
err error
}

type DynamodbOnlineStore struct {
// Feast project name
// TODO: Should we remove project as state that is tracked at the store level?
project string

client *dynamodb.Client

config *registry.RepoConfig

// dynamodb configuration
consistentRead *bool
batchSize *int
}

func NewDynamodbOnlineStore(project string, config *registry.RepoConfig, onlineStoreConfig map[string]interface{}) (*DynamodbOnlineStore, error) {
store := DynamodbOnlineStore{
project: project,
config: config,
}

// aws configuration
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
cfg, err := awsConfig.LoadDefaultConfig(ctx)
if err != nil {
panic(err)
}
store.client = dynamodb.NewFromConfig(cfg)

// dynamodb configuration
consistentRead, ok := onlineStoreConfig["consistent_reads"].(bool)
if !ok {
consistentRead = false
}
store.consistentRead = &consistentRead

var batchSize int
if batchSizeFloat, ok := onlineStoreConfig["batch_size"].(float64); ok {
batchSize = int(batchSizeFloat)
} else {
batchSize = 40
}
store.batchSize = &batchSize

return &store, nil
}

func (d *DynamodbOnlineStore) OnlineRead(ctx context.Context, entityKeys []*types.EntityKey, featureViewNames []string, featureNames []string) ([][]FeatureData, error) {
// prevent resource waste in case context is canceled earlier
if ctx.Err() != nil {
return nil, ctx.Err()
}

results := make([][]FeatureData, len(entityKeys))

// serialize entity key into entity hash id
entityIndexMap := make(map[string]int)
entityIds := make([]string, 0, len(entityKeys))
unprocessedEntityIds := make(map[string]bool)
for i, entityKey := range entityKeys {
serKey, err := serializeEntityKey(entityKey, d.config.EntityKeySerializationVersion)
if err != nil {
return nil, err
}
entityId := hex.EncodeToString(mmh3.Hashx64_128(*serKey, 0))
entityIds = append(entityIds, entityId)
entityIndexMap[entityId] = i
unprocessedEntityIds[entityId] = false
}

// metadata from feature views, feature names
featureMap, featureNamesIndex, err := makeFeatureMeta(featureViewNames, featureNames)
if err != nil {
return nil, err
}

// initialize `FeatureData` slice
featureCount := len(featureNamesIndex)
for i := 0; i < len(results); i++ {
results[i] = make([]FeatureData, featureCount)
}

// controls the maximum number of concurrent goroutines sending requests to DynamoDB using a semaphore
cpuCount := runtime.NumCPU()
sem := semaphore.NewWeighted(int64(cpuCount * 2))

var mu sync.Mutex
for featureViewName, featureNames := range featureMap {
tableName := fmt.Sprintf("%s.%s", d.project, featureViewName)

var batchGetItemInputs []*dynamodb.BatchGetItemInput
batchSize := *d.batchSize
for i := 0; i < len(entityIds); i += batchSize {
end := i + batchSize
if end > len(entityIds) {
end = len(entityIds)
}
batchEntityIds := entityIds[i:end]
entityIdBatch := make([]map[string]dtypes.AttributeValue, len(batchEntityIds))
for i, entityId := range batchEntityIds {
entityIdBatch[i] = map[string]dtypes.AttributeValue{
"entity_id": &dtypes.AttributeValueMemberS{Value: entityId},
}
}
batchGetItemInput := &dynamodb.BatchGetItemInput{
RequestItems: map[string]dtypes.KeysAndAttributes{
tableName: {
Keys: entityIdBatch,
ConsistentRead: d.consistentRead,
},
},
}
batchGetItemInputs = append(batchGetItemInputs, batchGetItemInput)
}

// goroutines sending requests to DynamoDB
errGroup, ctx := errgroup.WithContext(ctx)
for i, batchGetItemInput := range batchGetItemInputs {
_, batchGetItemInput := i, batchGetItemInput
errGroup.Go(func() error {
if err := sem.Acquire(ctx, 1); err != nil {
return err
}
defer sem.Release(1)

resp, err := d.client.BatchGetItem(ctx, batchGetItemInput)
if err != nil {
return err
}

// in case there is no entity id of a feature view in dynamodb
batchSize := len(resp.Responses[tableName])
if batchSize == 0 {
return nil
}

// process response from dynamodb
for j := 0; j < batchSize; j++ {
entityId := resp.Responses[tableName][j]["entity_id"].(*dtypes.AttributeValueMemberS).Value
timestampString := resp.Responses[tableName][j]["event_ts"].(*dtypes.AttributeValueMemberS).Value
t, err := time.Parse("2006-01-02 15:04:05-07:00", timestampString)
if err != nil {
return err
}
timeStamp := timestamppb.New(t)

featureValues := resp.Responses[tableName][j]["values"].(*dtypes.AttributeValueMemberM).Value
entityIndex := entityIndexMap[entityId]

for _, featureName := range featureNames {
featureValue := featureValues[featureName].(*dtypes.AttributeValueMemberB).Value
var value types.Value
if err := proto.Unmarshal(featureValue, &value); err != nil {
return err
}
featureIndex := featureNamesIndex[featureName]

mu.Lock()
results[entityIndex][featureIndex] = FeatureData{Reference: serving.FeatureReferenceV2{FeatureViewName: featureViewName, FeatureName: featureName},
Timestamp: timestamppb.Timestamp{Seconds: timeStamp.Seconds, Nanos: timeStamp.Nanos},
Value: types.Value{Val: value.Val},
}
mu.Unlock()
}

mu.Lock()
delete(unprocessedEntityIds, entityId)
mu.Unlock()
}
return nil
})
}
if err := errGroup.Wait(); err != nil {
return nil, err
}

// process null imputation for entity ids that don't exist in dynamodb
currentTime := timestamppb.Now() // TODO: should use a different timestamp?
for entityId, _ := range unprocessedEntityIds {
entityIndex := entityIndexMap[entityId]
for _, featureName := range featureNames {
featureIndex := featureNamesIndex[featureName]
results[entityIndex][featureIndex] = FeatureData{Reference: serving.FeatureReferenceV2{FeatureViewName: featureViewName, FeatureName: featureName},
Timestamp: timestamppb.Timestamp{Seconds: currentTime.Seconds, Nanos: currentTime.Nanos},
Value: types.Value{Val: &types.Value_NullVal{NullVal: types.Null_NULL}},
}
}
}
}

return results, nil
}

func (d *DynamodbOnlineStore) Destruct() {

}

func makeFeatureMeta(featureViewNames []string, featureNames []string) (map[string][]string, map[string]int, error) {
if len(featureViewNames) != len(featureNames) {
return nil, nil, fmt.Errorf("the lengths of featureViewNames and featureNames must be the same. got=%d, %d", len(featureViewNames), len(featureNames))
}
featureMap := make(map[string][]string)
featureNamesIndex := make(map[string]int)
for i := 0; i < len(featureViewNames); i++ {
featureViewName := featureViewNames[i]
featureName := featureNames[i]

featureMap[featureViewName] = append(featureMap[featureViewName], featureName)
featureNamesIndex[featureName] = i
}
return featureMap, featureNamesIndex, nil
}
23 changes: 23 additions & 0 deletions go/internal/feast/onlinestore/dynamodbonlinestore_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package onlinestore

import (
"testing"

"github.com/feast-dev/feast/go/internal/feast/registry"
"github.com/stretchr/testify/assert"
)

func TestNewDynamodbOnlineStore(t *testing.T) {
var config = map[string]interface{}{
"batch_size": 40,
"region": "us-east-1",
"max_pool_connections": 4,
"consistent_reads": "true",
}
rc := &registry.RepoConfig{
OnlineStore: config,
EntityKeySerializationVersion: 2,
}
_, err := NewDynamodbOnlineStore("test", rc, config)
assert.Nil(t, err)
}
Loading
Loading