Skip to content

Commit ccad832

Browse files
pyalex and Oleksii Moskalenko authored
feat: Write logged features to Offline Store (Go - Python integration) (#2621)
* write logged features by path Signed-off-by: pyalex <[email protected]> * fix dummy sink Signed-off-by: pyalex <[email protected]> * share snowflake cursor and stage name Signed-off-by: pyalex <[email protected]> * share snowflake cursor and stage name Signed-off-by: pyalex <[email protected]> * e2e tests WIP Signed-off-by: pyalex <[email protected]> * graceful stop (grpc & logging) Signed-off-by: pyalex <[email protected]> * revert accidental change Signed-off-by: pyalex <[email protected]> * add comments to go embedded methods Signed-off-by: Oleksii Moskalenko <[email protected]> Co-authored-by: Oleksii Moskalenko <[email protected]>
1 parent cff0133 commit ccad832

24 files changed

+612
-114
lines changed

go/embedded/online_features.go

Lines changed: 54 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,23 +8,22 @@ import (
88
"os"
99
"os/signal"
1010
"syscall"
11-
12-
"google.golang.org/grpc"
13-
14-
"github.com/feast-dev/feast/go/internal/feast/server"
15-
"github.com/feast-dev/feast/go/internal/feast/server/logging"
16-
"github.com/feast-dev/feast/go/protos/feast/serving"
11+
"time"
1712

1813
"github.com/apache/arrow/go/v8/arrow"
1914
"github.com/apache/arrow/go/v8/arrow/array"
2015
"github.com/apache/arrow/go/v8/arrow/cdata"
2116
"github.com/apache/arrow/go/v8/arrow/memory"
17+
"google.golang.org/grpc"
2218

2319
"github.com/feast-dev/feast/go/internal/feast"
2420
"github.com/feast-dev/feast/go/internal/feast/model"
2521
"github.com/feast-dev/feast/go/internal/feast/onlineserving"
2622
"github.com/feast-dev/feast/go/internal/feast/registry"
23+
"github.com/feast-dev/feast/go/internal/feast/server"
24+
"github.com/feast-dev/feast/go/internal/feast/server/logging"
2725
"github.com/feast-dev/feast/go/internal/feast/transformation"
26+
"github.com/feast-dev/feast/go/protos/feast/serving"
2827
prototypes "github.com/feast-dev/feast/go/protos/feast/types"
2928
"github.com/feast-dev/feast/go/types"
3029
)
@@ -44,6 +43,15 @@ type DataTable struct {
4443
SchemaPtr uintptr
4544
}
4645

46+
// LoggingOptions is a public (embedded) copy of logging.LoggingOptions struct.
47+
// See logging.LoggingOptions for properties description
48+
type LoggingOptions struct {
49+
ChannelCapacity int
50+
EmitTimeout time.Duration
51+
WriteInterval time.Duration
52+
FlushInterval time.Duration
53+
}
54+
4755
func NewOnlineFeatureService(conf *OnlineFeatureServiceConfig, transformationCallback transformation.TransformationCallback) *OnlineFeatureService {
4856
repoConfig, err := registry.NewRepoConfigFromJSON(conf.RepoPath, conf.RepoConfig)
4957
if err != nil {
@@ -214,17 +222,50 @@ func (s *OnlineFeatureService) GetOnlineFeatures(
214222
return nil
215223
}
216224

225+
// StartGprcServer starts gRPC server with disabled feature logging and blocks the thread
217226
func (s *OnlineFeatureService) StartGprcServer(host string, port int) error {
218-
// TODO(oleksii): enable logging
219-
// Disable logging for now
227+
return s.StartGprcServerWithLogging(host, port, nil, LoggingOptions{})
228+
}
229+
230+
// StartGprcServerWithLoggingDefaultOpts starts gRPC server with enabled feature logging but default configuration for logging
231+
// Caller of this function must provide Python callback to flush buffered logs
232+
func (s *OnlineFeatureService) StartGprcServerWithLoggingDefaultOpts(host string, port int, writeLoggedFeaturesCallback logging.OfflineStoreWriteCallback) error {
233+
defaultOpts := LoggingOptions{
234+
ChannelCapacity: logging.DefaultOptions.ChannelCapacity,
235+
EmitTimeout: logging.DefaultOptions.EmitTimeout,
236+
WriteInterval: logging.DefaultOptions.WriteInterval,
237+
FlushInterval: logging.DefaultOptions.FlushInterval,
238+
}
239+
return s.StartGprcServerWithLogging(host, port, writeLoggedFeaturesCallback, defaultOpts)
240+
}
241+
242+
// StartGprcServerWithLogging starts gRPC server with enabled feature logging
243+
// Caller of this function must provide Python callback to flush buffered logs as well as logging configuration (loggingOpts)
244+
func (s *OnlineFeatureService) StartGprcServerWithLogging(host string, port int, writeLoggedFeaturesCallback logging.OfflineStoreWriteCallback, loggingOpts LoggingOptions) error {
220245
var loggingService *logging.LoggingService = nil
246+
var err error
247+
if writeLoggedFeaturesCallback != nil {
248+
sink, err := logging.NewOfflineStoreSink(writeLoggedFeaturesCallback)
249+
if err != nil {
250+
return err
251+
}
252+
253+
loggingService, err = logging.NewLoggingService(s.fs, sink, logging.LoggingOptions{
254+
ChannelCapacity: loggingOpts.ChannelCapacity,
255+
EmitTimeout: loggingOpts.EmitTimeout,
256+
WriteInterval: loggingOpts.WriteInterval,
257+
FlushInterval: loggingOpts.FlushInterval,
258+
})
259+
if err != nil {
260+
return err
261+
}
262+
}
221263
ser := server.NewGrpcServingServiceServer(s.fs, loggingService)
222264
log.Printf("Starting a gRPC server on host %s port %d\n", host, port)
223265
lis, err := net.Listen("tcp", fmt.Sprintf("%s:%d", host, port))
224266
if err != nil {
225267
return err
226268
}
227-
log.Printf("Listening a gRPC server on host %s port %d\n", host, port)
228269

229270
grpcServer := grpc.NewServer()
230271
serving.RegisterServingServiceServer(grpcServer, ser)
@@ -234,6 +275,10 @@ func (s *OnlineFeatureService) StartGprcServer(host string, port int) error {
234275
<-s.grpcStopCh
235276
fmt.Println("Stopping the gRPC server...")
236277
grpcServer.GracefulStop()
278+
if loggingService != nil {
279+
loggingService.Stop()
280+
}
281+
fmt.Println("gRPC server terminated")
237282
}()
238283

239284
err = grpcServer.Serve(lis)

go/internal/feast/server/logging/filelogsink.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ func (s *FileLogSink) Write(record arrow.Record) error {
4949
return pqarrow.WriteTable(table, writer, 100, props, arrProps)
5050
}
5151

52-
func (s *FileLogSink) Flush() error {
52+
func (s *FileLogSink) Flush(featureServiceName string) error {
5353
// files are already flushed during Write
5454
return nil
5555
}

go/internal/feast/server/logging/logger.go

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"fmt"
55
"log"
66
"math/rand"
7+
"strings"
78
"sync"
89
"time"
910

@@ -37,7 +38,7 @@ type LogSink interface {
3738
// Flush actually send data to a sink.
3839
// We want to control the amount of interaction with the sink, since it could be a costly operation.
3940
// Also, some sinks like BigQuery might have quotas and physically limit the number of write requests per day.
40-
Flush() error
41+
Flush(featureServiceName string) error
4142
}
4243

4344
type Logger interface {
@@ -135,6 +136,10 @@ func (l *LoggerImpl) loggerLoop() (lErr error) {
135136
lErr = errors.WithStack(rErr)
136137
}
137138
}()
139+
140+
writeTicker := time.NewTicker(l.config.WriteInterval)
141+
flushTicker := time.NewTicker(l.config.FlushInterval)
142+
138143
for {
139144
shouldStop := false
140145

@@ -144,18 +149,18 @@ func (l *LoggerImpl) loggerLoop() (lErr error) {
144149
if err != nil {
145150
log.Printf("Log write failed: %+v", err)
146151
}
147-
err = l.sink.Flush()
152+
err = l.sink.Flush(l.featureServiceName)
148153
if err != nil {
149154
log.Printf("Log flush failed: %+v", err)
150155
}
151156
shouldStop = true
152-
case <-time.After(l.config.WriteInterval):
157+
case <-writeTicker.C:
153158
err := l.buffer.writeBatch(l.sink)
154159
if err != nil {
155160
log.Printf("Log write failed: %+v", err)
156161
}
157-
case <-time.After(l.config.FlushInterval):
158-
err := l.sink.Flush()
162+
case <-flushTicker.C:
163+
err := l.sink.Flush(l.featureServiceName)
159164
if err != nil {
160165
log.Printf("Log flush failed: %+v", err)
161166
}
@@ -171,6 +176,9 @@ func (l *LoggerImpl) loggerLoop() (lErr error) {
171176
}
172177
}
173178

179+
writeTicker.Stop()
180+
flushTicker.Stop()
181+
174182
// Notify all waiters for graceful stop
175183
l.cond.L.Lock()
176184
l.isStopped = true
@@ -225,7 +233,11 @@ func (l *LoggerImpl) Log(joinKeyToEntityValues map[string][]*types.Value, featur
225233
for idx, featureName := range l.schema.Features {
226234
featureIdx, ok := featureNameToVectorIdx[featureName]
227235
if !ok {
228-
return errors.Errorf("Missing feature %s in log data", featureName)
236+
featureNameParts := strings.Split(featureName, "__")
237+
featureIdx, ok = featureNameToVectorIdx[featureNameParts[1]]
238+
if !ok {
239+
return errors.Errorf("Missing feature %s in log data", featureName)
240+
}
229241
}
230242
featureValues[idx] = featureVectors[featureIdx].Values[rowIdx]
231243
featureStatuses[idx] = featureVectors[featureIdx].Statuses[rowIdx]
@@ -259,7 +271,7 @@ func (l *LoggerImpl) Log(joinKeyToEntityValues map[string][]*types.Value, featur
259271
EventTimestamps: eventTimestamps,
260272

261273
RequestId: requestId,
262-
LogTimestamp: time.Now(),
274+
LogTimestamp: time.Now().UTC(),
263275
}
264276
err := l.EmitLog(&newLog)
265277
if err != nil {

go/internal/feast/server/logging/logger_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ func (s *DummySink) Write(rec arrow.Record) error {
2828
return nil
2929
}
3030

31-
func (s *DummySink) Flush() error {
31+
func (s *DummySink) Flush(featureServiceName string) error {
3232
return nil
3333
}
3434

go/internal/feast/server/logging/memorybuffer.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package logging
22

33
import (
44
"fmt"
5-
"time"
65

76
"github.com/apache/arrow/go/v8/arrow"
87
"github.com/apache/arrow/go/v8/arrow/array"
@@ -143,7 +142,7 @@ func (b *MemoryBuffer) convertToArrowRecord() (arrow.Record, error) {
143142
}
144143

145144
logTimestamp := arrow.Timestamp(logRow.LogTimestamp.UnixMicro())
146-
logDate := arrow.Date32(logRow.LogTimestamp.Truncate(24 * time.Hour).Unix())
145+
logDate := arrow.Date32FromTime(logRow.LogTimestamp)
147146

148147
builder.Field(fieldNameToIdx[LOG_TIMESTAMP_FIELD]).(*array.TimestampBuilder).UnsafeAppend(logTimestamp)
149148
builder.Field(fieldNameToIdx[LOG_DATE_FIELD]).(*array.Date32Builder).UnsafeAppend(logDate)

go/internal/feast/server/logging/memorybuffer_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ func TestSerializeToArrowTable(t *testing.T) {
158158
// log date
159159
today := time.Now().Truncate(24 * time.Hour)
160160
builder.Field(8).(*array.Date32Builder).AppendValues(
161-
[]arrow.Date32{arrow.Date32(today.Unix()), arrow.Date32(today.Unix())}, []bool{true, true})
161+
[]arrow.Date32{arrow.Date32FromTime(today), arrow.Date32FromTime(today)}, []bool{true, true})
162162

163163
// request id
164164
builder.Field(9).(*array.StringBuilder).AppendValues(
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
package logging
2+
3+
import (
4+
"errors"
5+
"fmt"
6+
"io"
7+
"io/ioutil"
8+
"os"
9+
"path/filepath"
10+
11+
"github.com/apache/arrow/go/v8/arrow"
12+
"github.com/apache/arrow/go/v8/arrow/array"
13+
"github.com/apache/arrow/go/v8/parquet"
14+
"github.com/apache/arrow/go/v8/parquet/pqarrow"
15+
"github.com/google/uuid"
16+
)
17+
18+
type OfflineStoreWriteCallback func(featureServiceName, datasetDir string) string
19+
20+
type OfflineStoreSink struct {
21+
datasetDir string
22+
writeCallback OfflineStoreWriteCallback
23+
}
24+
25+
func NewOfflineStoreSink(writeCallback OfflineStoreWriteCallback) (*OfflineStoreSink, error) {
26+
return &OfflineStoreSink{
27+
datasetDir: "",
28+
writeCallback: writeCallback,
29+
}, nil
30+
}
31+
32+
func (s *OfflineStoreSink) getOrCreateDatasetDir() (string, error) {
33+
if s.datasetDir != "" {
34+
return s.datasetDir, nil
35+
}
36+
dir, err := ioutil.TempDir("", "*")
37+
if err != nil {
38+
return "", err
39+
}
40+
s.datasetDir = dir
41+
return s.datasetDir, nil
42+
}
43+
44+
func (s *OfflineStoreSink) cleanCurrentDatasetDir() error {
45+
if s.datasetDir == "" {
46+
return nil
47+
}
48+
datasetDir := s.datasetDir
49+
s.datasetDir = ""
50+
return os.RemoveAll(datasetDir)
51+
}
52+
53+
func (s *OfflineStoreSink) Write(record arrow.Record) error {
54+
fileName, _ := uuid.NewUUID()
55+
datasetDir, err := s.getOrCreateDatasetDir()
56+
if err != nil {
57+
return err
58+
}
59+
60+
var writer io.Writer
61+
writer, err = os.Create(filepath.Join(datasetDir, fmt.Sprintf("%s.parquet", fileName.String())))
62+
if err != nil {
63+
return err
64+
}
65+
table := array.NewTableFromRecords(record.Schema(), []arrow.Record{record})
66+
67+
props := parquet.NewWriterProperties(parquet.WithDictionaryDefault(false))
68+
arrProps := pqarrow.DefaultWriterProps()
69+
return pqarrow.WriteTable(table, writer, 1000, props, arrProps)
70+
}
71+
72+
func (s *OfflineStoreSink) Flush(featureServiceName string) error {
73+
if s.datasetDir == "" {
74+
return nil
75+
}
76+
77+
errMsg := s.writeCallback(featureServiceName, s.datasetDir)
78+
if errMsg != "" {
79+
return errors.New(errMsg)
80+
}
81+
82+
return s.cleanCurrentDatasetDir()
83+
}

go/internal/feast/server/logging/service.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,3 +98,10 @@ func (s *LoggingService) GetOrCreateLogger(featureService *model.FeatureService)
9898

9999
return logger, nil
100100
}
101+
102+
func (s *LoggingService) Stop() {
103+
for _, logger := range s.loggers {
104+
logger.Stop()
105+
logger.WaitUntilStopped()
106+
}
107+
}

go/internal/test/go_integration_test_utils.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ func SetupInitializedRepo(basePath string) error {
138138
// var stderr bytes.Buffer
139139
// var stdout bytes.Buffer
140140
applyCommand.Dir = featureRepoPath
141-
out, err := applyCommand.Output()
141+
out, err := applyCommand.CombinedOutput()
142142
if err != nil {
143143
log.Println(string(out))
144144
return err
@@ -152,7 +152,7 @@ func SetupInitializedRepo(basePath string) error {
152152
materializeCommand := exec.Command("feast", "materialize-incremental", formattedTime)
153153
materializeCommand.Env = os.Environ()
154154
materializeCommand.Dir = featureRepoPath
155-
out, err = materializeCommand.Output()
155+
out, err = materializeCommand.CombinedOutput()
156156
if err != nil {
157157
log.Println(string(out))
158158
return err

0 commit comments

Comments
 (0)