Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Write logged features to Offline Store (Go - Python integration) #2621

Merged
merged 8 commits into from
May 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 54 additions & 9 deletions go/embedded/online_features.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,22 @@ import (
"os"
"os/signal"
"syscall"

"google.golang.org/grpc"

"github.com/feast-dev/feast/go/internal/feast/server"
"github.com/feast-dev/feast/go/internal/feast/server/logging"
"github.com/feast-dev/feast/go/protos/feast/serving"
"time"

"github.com/apache/arrow/go/v8/arrow"
"github.com/apache/arrow/go/v8/arrow/array"
"github.com/apache/arrow/go/v8/arrow/cdata"
"github.com/apache/arrow/go/v8/arrow/memory"
"google.golang.org/grpc"

"github.com/feast-dev/feast/go/internal/feast"
"github.com/feast-dev/feast/go/internal/feast/model"
"github.com/feast-dev/feast/go/internal/feast/onlineserving"
"github.com/feast-dev/feast/go/internal/feast/registry"
"github.com/feast-dev/feast/go/internal/feast/server"
"github.com/feast-dev/feast/go/internal/feast/server/logging"
"github.com/feast-dev/feast/go/internal/feast/transformation"
"github.com/feast-dev/feast/go/protos/feast/serving"
prototypes "github.com/feast-dev/feast/go/protos/feast/types"
"github.com/feast-dev/feast/go/types"
)
Expand All @@ -44,6 +43,15 @@ type DataTable struct {
SchemaPtr uintptr
}

// LoggingOptions is the embedded package's public counterpart of
// logging.LoggingOptions. Its fields are copied one-to-one into a
// logging.LoggingOptions value when the gRPC server is started with logging;
// see logging.LoggingOptions for the meaning of each property.
type LoggingOptions struct {
	ChannelCapacity int

	EmitTimeout, WriteInterval, FlushInterval time.Duration
}

func NewOnlineFeatureService(conf *OnlineFeatureServiceConfig, transformationCallback transformation.TransformationCallback) *OnlineFeatureService {
repoConfig, err := registry.NewRepoConfigFromJSON(conf.RepoPath, conf.RepoConfig)
if err != nil {
Expand Down Expand Up @@ -214,17 +222,50 @@ func (s *OnlineFeatureService) GetOnlineFeatures(
return nil
}

// StartGprcServer starts the gRPC server with feature logging disabled and
// blocks the calling thread. Use StartGprcServerWithLogging to serve with
// feature logging enabled.
func (s *OnlineFeatureService) StartGprcServer(host string, port int) error {
	// A nil write callback means no logging service is created, so the
	// logging options are simply left at their zero value.
	var noLoggingOpts LoggingOptions
	return s.StartGprcServerWithLogging(host, port, nil, noLoggingOpts)
}

// StartGprcServerWithLoggingDefaultOpts starts the gRPC server with feature
// logging enabled, using the values of logging.DefaultOptions as the logging
// configuration.
// The caller must supply the Python callback that flushes buffered logs.
func (s *OnlineFeatureService) StartGprcServerWithLoggingDefaultOpts(host string, port int, writeLoggedFeaturesCallback logging.OfflineStoreWriteCallback) error {
	defaults := logging.DefaultOptions
	return s.StartGprcServerWithLogging(host, port, writeLoggedFeaturesCallback, LoggingOptions{
		ChannelCapacity: defaults.ChannelCapacity,
		EmitTimeout:     defaults.EmitTimeout,
		WriteInterval:   defaults.WriteInterval,
		FlushInterval:   defaults.FlushInterval,
	})
}

// StartGprcServerWithLogging starts gRPC server with enabled feature logging
// Caller of this function must provide Python callback to flush buffered logs as well as logging configuration (loggingOpts)
func (s *OnlineFeatureService) StartGprcServerWithLogging(host string, port int, writeLoggedFeaturesCallback logging.OfflineStoreWriteCallback, loggingOpts LoggingOptions) error {
var loggingService *logging.LoggingService = nil
var err error
if writeLoggedFeaturesCallback != nil {
sink, err := logging.NewOfflineStoreSink(writeLoggedFeaturesCallback)
if err != nil {
return err
}

loggingService, err = logging.NewLoggingService(s.fs, sink, logging.LoggingOptions{
ChannelCapacity: loggingOpts.ChannelCapacity,
EmitTimeout: loggingOpts.EmitTimeout,
WriteInterval: loggingOpts.WriteInterval,
FlushInterval: loggingOpts.FlushInterval,
})
if err != nil {
return err
}
}
ser := server.NewGrpcServingServiceServer(s.fs, loggingService)
log.Printf("Starting a gRPC server on host %s port %d\n", host, port)
lis, err := net.Listen("tcp", fmt.Sprintf("%s:%d", host, port))
if err != nil {
return err
}
log.Printf("Listening a gRPC server on host %s port %d\n", host, port)

grpcServer := grpc.NewServer()
serving.RegisterServingServiceServer(grpcServer, ser)
Expand All @@ -234,6 +275,10 @@ func (s *OnlineFeatureService) StartGprcServer(host string, port int) error {
<-s.grpcStopCh
fmt.Println("Stopping the gRPC server...")
grpcServer.GracefulStop()
if loggingService != nil {
loggingService.Stop()
}
fmt.Println("gRPC server terminated")
Comment on lines 276 to +281
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should really move some of this stuff to use a proper logger.

}()

err = grpcServer.Serve(lis)
Expand Down
2 changes: 1 addition & 1 deletion go/internal/feast/server/logging/filelogsink.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (s *FileLogSink) Write(record arrow.Record) error {
return pqarrow.WriteTable(table, writer, 100, props, arrProps)
}

func (s *FileLogSink) Flush() error {
// Flush is a no-op for the file sink and always returns nil.
// featureServiceName is accepted for the Flush signature but is not used here.
func (s *FileLogSink) Flush(featureServiceName string) error {
// files are already flushed during Write
return nil
}
26 changes: 19 additions & 7 deletions go/internal/feast/server/logging/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"log"
"math/rand"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -37,7 +38,7 @@ type LogSink interface {
// Flush actually sends data to a sink.
// We want to control the amount of interaction with the sink, since it could be a costly operation.
// Also, some sinks like BigQuery might have quotas and physically limit the amount of write requests per day.
Flush() error
Flush(featureServiceName string) error
}

type Logger interface {
Expand Down Expand Up @@ -135,6 +136,10 @@ func (l *LoggerImpl) loggerLoop() (lErr error) {
lErr = errors.WithStack(rErr)
}
}()

writeTicker := time.NewTicker(l.config.WriteInterval)
flushTicker := time.NewTicker(l.config.FlushInterval)

for {
shouldStop := false

Expand All @@ -144,18 +149,18 @@ func (l *LoggerImpl) loggerLoop() (lErr error) {
if err != nil {
log.Printf("Log write failed: %+v", err)
}
err = l.sink.Flush()
err = l.sink.Flush(l.featureServiceName)
if err != nil {
log.Printf("Log flush failed: %+v", err)
}
shouldStop = true
case <-time.After(l.config.WriteInterval):
case <-writeTicker.C:
err := l.buffer.writeBatch(l.sink)
if err != nil {
log.Printf("Log write failed: %+v", err)
}
case <-time.After(l.config.FlushInterval):
err := l.sink.Flush()
case <-flushTicker.C:
err := l.sink.Flush(l.featureServiceName)
if err != nil {
log.Printf("Log flush failed: %+v", err)
}
Expand All @@ -171,6 +176,9 @@ func (l *LoggerImpl) loggerLoop() (lErr error) {
}
}

writeTicker.Stop()
flushTicker.Stop()

// Notify all waiters for graceful stop
l.cond.L.Lock()
l.isStopped = true
Expand Down Expand Up @@ -225,7 +233,11 @@ func (l *LoggerImpl) Log(joinKeyToEntityValues map[string][]*types.Value, featur
for idx, featureName := range l.schema.Features {
featureIdx, ok := featureNameToVectorIdx[featureName]
if !ok {
return errors.Errorf("Missing feature %s in log data", featureName)
featureNameParts := strings.Split(featureName, "__")
featureIdx, ok = featureNameToVectorIdx[featureNameParts[1]]
if !ok {
return errors.Errorf("Missing feature %s in log data", featureName)
}
}
featureValues[idx] = featureVectors[featureIdx].Values[rowIdx]
featureStatuses[idx] = featureVectors[featureIdx].Statuses[rowIdx]
Expand Down Expand Up @@ -259,7 +271,7 @@ func (l *LoggerImpl) Log(joinKeyToEntityValues map[string][]*types.Value, featur
EventTimestamps: eventTimestamps,

RequestId: requestId,
LogTimestamp: time.Now(),
LogTimestamp: time.Now().UTC(),
}
err := l.EmitLog(&newLog)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion go/internal/feast/server/logging/logger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (s *DummySink) Write(rec arrow.Record) error {
return nil
}

func (s *DummySink) Flush() error {
// Flush does nothing and always returns nil; featureServiceName is ignored.
func (s *DummySink) Flush(featureServiceName string) error {
return nil
}

Expand Down
3 changes: 1 addition & 2 deletions go/internal/feast/server/logging/memorybuffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package logging

import (
"fmt"
"time"

"github.com/apache/arrow/go/v8/arrow"
"github.com/apache/arrow/go/v8/arrow/array"
Expand Down Expand Up @@ -143,7 +142,7 @@ func (b *MemoryBuffer) convertToArrowRecord() (arrow.Record, error) {
}

logTimestamp := arrow.Timestamp(logRow.LogTimestamp.UnixMicro())
logDate := arrow.Date32(logRow.LogTimestamp.Truncate(24 * time.Hour).Unix())
logDate := arrow.Date32FromTime(logRow.LogTimestamp)

builder.Field(fieldNameToIdx[LOG_TIMESTAMP_FIELD]).(*array.TimestampBuilder).UnsafeAppend(logTimestamp)
builder.Field(fieldNameToIdx[LOG_DATE_FIELD]).(*array.Date32Builder).UnsafeAppend(logDate)
Expand Down
2 changes: 1 addition & 1 deletion go/internal/feast/server/logging/memorybuffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func TestSerializeToArrowTable(t *testing.T) {
// log date
today := time.Now().Truncate(24 * time.Hour)
builder.Field(8).(*array.Date32Builder).AppendValues(
[]arrow.Date32{arrow.Date32(today.Unix()), arrow.Date32(today.Unix())}, []bool{true, true})
[]arrow.Date32{arrow.Date32FromTime(today), arrow.Date32FromTime(today)}, []bool{true, true})

// request id
builder.Field(9).(*array.StringBuilder).AppendValues(
Expand Down
83 changes: 83 additions & 0 deletions go/internal/feast/server/logging/offlinestoresink.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package logging

import (
"errors"
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"

"github.com/apache/arrow/go/v8/arrow"
"github.com/apache/arrow/go/v8/arrow/array"
"github.com/apache/arrow/go/v8/parquet"
"github.com/apache/arrow/go/v8/parquet/pqarrow"
"github.com/google/uuid"
)

// OfflineStoreWriteCallback is the function OfflineStoreSink.Flush invokes to
// hand over logged features. It receives the feature service name and the
// directory holding the parquet files written by the sink, and returns an
// empty string on success or an error message otherwise.
type OfflineStoreWriteCallback func(featureServiceName string, datasetDir string) string
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably should specify name as string too?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's a short form


// OfflineStoreSink is a log sink that writes each record as a parquet file
// into a temporary dataset directory and passes that directory to
// writeCallback when Flush is called.
type OfflineStoreSink struct {
// datasetDir is the temporary directory currently collecting parquet files;
// the empty string means no directory has been created since the last
// successful flush.
datasetDir string
// writeCallback is invoked by Flush with the feature service name and datasetDir.
writeCallback OfflineStoreWriteCallback
}

// NewOfflineStoreSink creates a sink that delivers logged features through
// writeCallback. The sink starts without a dataset directory; one is created
// lazily on the first Write.
//
// It returns an error when writeCallback is nil, because Flush would otherwise
// panic when it tries to invoke the callback.
func NewOfflineStoreSink(writeCallback OfflineStoreWriteCallback) (*OfflineStoreSink, error) {
	if writeCallback == nil {
		return nil, errors.New("offline store sink requires a non-nil write callback")
	}
	return &OfflineStoreSink{
		datasetDir:    "",
		writeCallback: writeCallback,
	}, nil
}

// getOrCreateDatasetDir returns the sink's current dataset directory. When the
// sink has none, a new temporary directory is created, remembered on the sink
// and returned; a creation failure is reported with an empty path.
func (s *OfflineStoreSink) getOrCreateDatasetDir() (string, error) {
	if s.datasetDir == "" {
		tmpDir, err := ioutil.TempDir("", "*")
		if err != nil {
			return "", err
		}
		s.datasetDir = tmpDir
	}
	return s.datasetDir, nil
}

// cleanCurrentDatasetDir forgets the sink's current dataset directory and
// removes it, with everything inside, from disk. It does nothing when the sink
// has no dataset directory. The directory is forgotten before removal, so the
// sink starts a fresh directory on the next Write even if removal fails.
func (s *OfflineStoreSink) cleanCurrentDatasetDir() error {
	staleDir := s.datasetDir
	if staleDir == "" {
		return nil
	}
	s.datasetDir = ""
	return os.RemoveAll(staleDir)
}

// Write serializes record into a new parquet file named "<uuid>.parquet"
// inside the sink's dataset directory, creating that directory on first use.
//
// The file is written with dictionary encoding disabled by default and a
// chunk size of 1000. On any failure the partially written file is removed
// (best effort) so that a later Flush does not hand an incomplete file to the
// write callback.
//
// It returns an error if the file name cannot be generated, the directory or
// file cannot be created, or writing or closing the file fails.
func (s *OfflineStoreSink) Write(record arrow.Record) error {
	fileName, err := uuid.NewUUID()
	if err != nil {
		return err
	}
	datasetDir, err := s.getOrCreateDatasetDir()
	if err != nil {
		return err
	}

	filePath := filepath.Join(datasetDir, fmt.Sprintf("%s.parquet", fileName.String()))
	file, err := os.Create(filePath)
	if err != nil {
		return err
	}

	table := array.NewTableFromRecords(record.Schema(), []arrow.Record{record})
	// The table holds its own references to the record's columns; drop them
	// once the data has been written out.
	defer table.Release()

	props := parquet.NewWriterProperties(parquet.WithDictionaryDefault(false))
	arrProps := pqarrow.DefaultWriterProps()

	var writer io.Writer = file
	writeErr := pqarrow.WriteTable(table, writer, 1000, props, arrProps)

	// Always close the file so the descriptor is not leaked on error paths.
	// The parquet writer may already have closed it; that case is not an error.
	closeErr := file.Close()
	if errors.Is(closeErr, os.ErrClosed) {
		closeErr = nil
	}

	if writeErr != nil || closeErr != nil {
		// Best effort: the original failure is what gets reported.
		_ = os.Remove(filePath)
	}
	if writeErr != nil {
		return writeErr
	}
	return closeErr
}

// Flush passes the current dataset directory to the write callback together
// with featureServiceName. It is a no-op when no dataset directory exists.
// A non-empty string returned by the callback is turned into an error and the
// directory is kept; on success the directory is removed and forgotten.
func (s *OfflineStoreSink) Flush(featureServiceName string) error {
	pendingDir := s.datasetDir
	if pendingDir == "" {
		return nil
	}

	if errMsg := s.writeCallback(featureServiceName, pendingDir); errMsg != "" {
		return errors.New(errMsg)
	}

	return s.cleanCurrentDatasetDir()
}
7 changes: 7 additions & 0 deletions go/internal/feast/server/logging/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,3 +98,10 @@ func (s *LoggingService) GetOrCreateLogger(featureService *model.FeatureService)

return logger, nil
}

// Stop shuts down the loggers held by the service one at a time: each logger
// is asked to stop and the call blocks until that logger has stopped before
// moving on to the next one.
func (s *LoggingService) Stop() {
	for _, l := range s.loggers {
		l.Stop()
		l.WaitUntilStopped()
	}
}
4 changes: 2 additions & 2 deletions go/internal/test/go_integration_test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func SetupInitializedRepo(basePath string) error {
// var stderr bytes.Buffer
// var stdout bytes.Buffer
applyCommand.Dir = featureRepoPath
out, err := applyCommand.Output()
out, err := applyCommand.CombinedOutput()
if err != nil {
log.Println(string(out))
return err
Expand All @@ -152,7 +152,7 @@ func SetupInitializedRepo(basePath string) error {
materializeCommand := exec.Command("feast", "materialize-incremental", formattedTime)
materializeCommand.Env = os.Environ()
materializeCommand.Dir = featureRepoPath
out, err = materializeCommand.Output()
out, err = materializeCommand.CombinedOutput()
if err != nil {
log.Println(string(out))
return err
Expand Down
Loading