Skip to content

Commit

Permalink
e2e tests WIP
Browse files Browse the repository at this point in the history
Signed-off-by: pyalex <[email protected]>
  • Loading branch information
pyalex committed Apr 29, 2022
1 parent b5ca9ac commit 5f22182
Show file tree
Hide file tree
Showing 11 changed files with 217 additions and 50 deletions.
42 changes: 32 additions & 10 deletions go/embedded/online_features.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,22 @@ import (
"os"
"os/signal"
"syscall"

"google.golang.org/grpc"

"github.com/feast-dev/feast/go/internal/feast/server"
"github.com/feast-dev/feast/go/internal/feast/server/logging"
"github.com/feast-dev/feast/go/protos/feast/serving"
"time"

"github.com/apache/arrow/go/v8/arrow"
"github.com/apache/arrow/go/v8/arrow/array"
"github.com/apache/arrow/go/v8/arrow/cdata"
"github.com/apache/arrow/go/v8/arrow/memory"
"google.golang.org/grpc"

"github.com/feast-dev/feast/go/internal/feast"
"github.com/feast-dev/feast/go/internal/feast/model"
"github.com/feast-dev/feast/go/internal/feast/onlineserving"
"github.com/feast-dev/feast/go/internal/feast/registry"
"github.com/feast-dev/feast/go/internal/feast/server"
"github.com/feast-dev/feast/go/internal/feast/server/logging"
"github.com/feast-dev/feast/go/internal/feast/transformation"
"github.com/feast-dev/feast/go/protos/feast/serving"
prototypes "github.com/feast-dev/feast/go/protos/feast/types"
"github.com/feast-dev/feast/go/types"
)
Expand All @@ -44,6 +43,15 @@ type DataTable struct {
SchemaPtr uintptr
}

// LoggingOptions is a public (embedded) copy of the logging.LoggingOptions struct.
// StartGprcServerWithLogging copies its fields one-to-one into a
// logging.LoggingOptions value when it creates the logging service.
// See logging.LoggingOptions for the authoritative description of each property.
type LoggingOptions struct {
// ChannelCapacity is forwarded unchanged to logging.LoggingOptions.ChannelCapacity.
// NOTE(review): presumably the capacity of the channel that carries emitted logs; confirm in the logging package.
ChannelCapacity int
// EmitTimeout is forwarded unchanged to logging.LoggingOptions.EmitTimeout.
// NOTE(review): presumably the maximum time to wait when emitting a log; confirm in the logging package.
EmitTimeout time.Duration
// WriteInterval is forwarded unchanged to logging.LoggingOptions.WriteInterval.
// NOTE(review): looks like the period between writes of buffered logs to the sink; confirm in the logging package.
WriteInterval time.Duration
// FlushInterval is forwarded unchanged to logging.LoggingOptions.FlushInterval.
// NOTE(review): looks like the period between sink flushes; confirm in the logging package.
FlushInterval time.Duration
}

func NewOnlineFeatureService(conf *OnlineFeatureServiceConfig, transformationCallback transformation.TransformationCallback) *OnlineFeatureService {
repoConfig, err := registry.NewRepoConfigFromJSON(conf.RepoPath, conf.RepoConfig)
if err != nil {
Expand Down Expand Up @@ -215,10 +223,20 @@ func (s *OnlineFeatureService) GetOnlineFeatures(
}

func (s *OnlineFeatureService) StartGprcServer(host string, port int) error {
return s.StartGprcServerWithLogging(host, port, nil)
return s.StartGprcServerWithLogging(host, port, nil, LoggingOptions{})
}

// StartGprcServerWithLoggingDefaultOpts delegates to StartGprcServerWithLogging,
// passing logging options whose fields are all copied from logging.DefaultOptions.
// It returns whatever error StartGprcServerWithLogging returns.
func (s *OnlineFeatureService) StartGprcServerWithLoggingDefaultOpts(host string, port int, writeLoggedFeaturesCallback logging.OfflineStoreWriteCallback) error {
	// Mirror the internal defaults into the public (embedded) options type.
	var opts LoggingOptions
	opts.ChannelCapacity = logging.DefaultOptions.ChannelCapacity
	opts.EmitTimeout = logging.DefaultOptions.EmitTimeout
	opts.WriteInterval = logging.DefaultOptions.WriteInterval
	opts.FlushInterval = logging.DefaultOptions.FlushInterval

	return s.StartGprcServerWithLogging(host, port, writeLoggedFeaturesCallback, opts)
}

func (s *OnlineFeatureService) StartGprcServerWithLogging(host string, port int, writeLoggedFeaturesCallback logging.OfflineStoreWriteCallback) error {
func (s *OnlineFeatureService) StartGprcServerWithLogging(host string, port int, writeLoggedFeaturesCallback logging.OfflineStoreWriteCallback, loggingOpts LoggingOptions) error {
var loggingService *logging.LoggingService = nil
var err error
if writeLoggedFeaturesCallback != nil {
Expand All @@ -227,7 +245,12 @@ func (s *OnlineFeatureService) StartGprcServerWithLogging(host string, port int,
return err
}

loggingService, err = logging.NewLoggingService(s.fs, sink)
loggingService, err = logging.NewLoggingService(s.fs, sink, logging.LoggingOptions{
ChannelCapacity: loggingOpts.ChannelCapacity,
EmitTimeout: loggingOpts.EmitTimeout,
WriteInterval: loggingOpts.WriteInterval,
FlushInterval: loggingOpts.FlushInterval,
})
if err != nil {
return err
}
Expand All @@ -238,7 +261,6 @@ func (s *OnlineFeatureService) StartGprcServerWithLogging(host string, port int,
if err != nil {
return err
}
log.Printf("Listening a gRPC server on host %s port %d\n", host, port)

grpcServer := grpc.NewServer()
serving.RegisterServingServiceServer(grpcServer, ser)
Expand Down
15 changes: 11 additions & 4 deletions go/internal/feast/server/logging/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type LogSink interface {
// Flush actually sends data to a sink.
// We want to control the amount of interaction with the sink, since it could be a costly operation.
// Also, some sinks like BigQuery might have quotas and physically limit the number of write requests per day.
Flush(featureSeviceName string) error
Flush(featureServiceName string) error
}

type Logger interface {
Expand Down Expand Up @@ -135,6 +135,10 @@ func (l *LoggerImpl) loggerLoop() (lErr error) {
lErr = errors.WithStack(rErr)
}
}()

writeTicker := time.NewTicker(l.config.WriteInterval)
flushTicker := time.NewTicker(l.config.FlushInterval)

for {
shouldStop := false

Expand All @@ -149,12 +153,12 @@ func (l *LoggerImpl) loggerLoop() (lErr error) {
log.Printf("Log flush failed: %+v", err)
}
shouldStop = true
case <-time.After(l.config.WriteInterval):
case <-writeTicker.C:
err := l.buffer.writeBatch(l.sink)
if err != nil {
log.Printf("Log write failed: %+v", err)
}
case <-time.After(l.config.FlushInterval):
case <-flushTicker.C:
err := l.sink.Flush(l.featureServiceName)
if err != nil {
log.Printf("Log flush failed: %+v", err)
Expand All @@ -171,6 +175,9 @@ func (l *LoggerImpl) loggerLoop() (lErr error) {
}
}

writeTicker.Stop()
flushTicker.Stop()

// Notify all waiters for graceful stop
l.cond.L.Lock()
l.isStopped = true
Expand Down Expand Up @@ -259,7 +266,7 @@ func (l *LoggerImpl) Log(joinKeyToEntityValues map[string][]*types.Value, featur
EventTimestamps: eventTimestamps,

RequestId: requestId,
LogTimestamp: time.Now(),
LogTimestamp: time.Now().UTC(),
}
err := l.EmitLog(&newLog)
if err != nil {
Expand Down
3 changes: 1 addition & 2 deletions go/internal/feast/server/logging/memorybuffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package logging

import (
"fmt"
"time"

"github.com/apache/arrow/go/v8/arrow"
"github.com/apache/arrow/go/v8/arrow/array"
Expand Down Expand Up @@ -143,7 +142,7 @@ func (b *MemoryBuffer) convertToArrowRecord() (arrow.Record, error) {
}

logTimestamp := arrow.Timestamp(logRow.LogTimestamp.UnixMicro())
logDate := arrow.Date32(logRow.LogTimestamp.Truncate(24 * time.Hour).Unix())
logDate := arrow.Date32FromTime(logRow.LogTimestamp)

builder.Field(fieldNameToIdx[LOG_TIMESTAMP_FIELD]).(*array.TimestampBuilder).UnsafeAppend(logTimestamp)
builder.Field(fieldNameToIdx[LOG_DATE_FIELD]).(*array.Date32Builder).UnsafeAppend(logDate)
Expand Down
2 changes: 1 addition & 1 deletion go/internal/feast/server/logging/memorybuffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func TestSerializeToArrowTable(t *testing.T) {
// log date
today := time.Now().Truncate(24 * time.Hour)
builder.Field(8).(*array.Date32Builder).AppendValues(
[]arrow.Date32{arrow.Date32(today.Unix()), arrow.Date32(today.Unix())}, []bool{true, true})
[]arrow.Date32{arrow.Date32FromTime(today), arrow.Date32FromTime(today)}, []bool{true, true})

// request id
builder.Field(9).(*array.StringBuilder).AppendValues(
Expand Down
11 changes: 6 additions & 5 deletions go/internal/feast/server/logging/offlinestoresink.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,16 @@ package logging
import (
"errors"
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"

"github.com/apache/arrow/go/v8/arrow"
"github.com/apache/arrow/go/v8/arrow/array"
"github.com/apache/arrow/go/v8/parquet"
"github.com/apache/arrow/go/v8/parquet/pqarrow"
"github.com/google/uuid"
"io"
"io/ioutil"
"os"
"path/filepath"
)

type OfflineStoreWriteCallback func(featureServiceName, datasetDir string) string
Expand Down Expand Up @@ -41,7 +42,7 @@ func (s *OfflineStoreSink) getOrCreateDatasetDir() (string, error) {
}

func (s *OfflineStoreSink) cleanCurrentDatasetDir() error {
if s.datasetDir != "" {
if s.datasetDir == "" {
return nil
}
datasetDir := s.datasetDir
Expand Down
4 changes: 2 additions & 2 deletions go/internal/test/go_integration_test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func SetupInitializedRepo(basePath string) error {
// var stderr bytes.Buffer
// var stdout bytes.Buffer
applyCommand.Dir = featureRepoPath
out, err := applyCommand.Output()
out, err := applyCommand.CombinedOutput()
if err != nil {
log.Println(string(out))
return err
Expand All @@ -152,7 +152,7 @@ func SetupInitializedRepo(basePath string) error {
materializeCommand := exec.Command("feast", "materialize-incremental", formattedTime)
materializeCommand.Env = os.Environ()
materializeCommand.Dir = featureRepoPath
out, err = materializeCommand.Output()
out, err = materializeCommand.CombinedOutput()
if err != nil {
log.Println(string(out))
return err
Expand Down
30 changes: 24 additions & 6 deletions sdk/python/feast/embedded_go/online_features_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,12 @@
from feast.types import from_value_type
from feast.value_type import ValueType

from .lib.embedded import DataTable, NewOnlineFeatureService, OnlineFeatureServiceConfig
from .lib.embedded import (
DataTable,
LoggingOptions,
NewOnlineFeatureService,
OnlineFeatureServiceConfig,
)
from .lib.go import Slice_string
from .type_map import FEAST_TYPE_TO_ARROW_TYPE, arrow_array_to_array_of_proto

Expand Down Expand Up @@ -134,9 +139,22 @@ def get_online_features(
resp = record_batch_to_online_response(record_batch)
return OnlineResponse(resp)

def start_grpc_server(self, host: str, port: int, enable_logging=True):
def start_grpc_server(
self,
host: str,
port: int,
enable_logging: bool = True,
logging_options: Optional[LoggingOptions] = None,
):
if enable_logging:
self._service.StartGprcServerWithLogging(host, port, self._logging_callback)
if logging_options:
self._service.StartGprcServerWithLogging(
host, port, self._logging_callback, logging_options
)
else:
self._service.StartGprcServerWithLoggingDefaultOpts(
host, port, self._logging_callback
)
else:
self._service.StartGprcServer(host, port)

Expand Down Expand Up @@ -189,14 +207,14 @@ def transformation_callback(

def logging_callback(
fs: "FeatureStore", feature_service_name: str, dataset_dir: str,
) -> str:
) -> bytes:
feature_service = fs.get_feature_service(feature_service_name, allow_cache=True)
try:
fs.write_logged_features(logs=Path(dataset_dir), source=feature_service)
except Exception as exc:
return repr(exc)
return repr(exc).encode()

return "" # no error
return "".encode() # no error


def allocate_schema_and_array():
Expand Down
6 changes: 3 additions & 3 deletions sdk/python/feast/feature_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,12 @@ class _DestinationRegistry(type):

def __new__(cls, name, bases, dct):
kls = type.__new__(cls, name, bases, dct)
if dct.get("_proto_attr_name"):
cls.classes_by_proto_attr_name[dct["_proto_attr_name"]] = kls
if dct.get("_proto_kind"):
cls.classes_by_proto_attr_name[dct["_proto_kind"]] = kls
return kls


class LoggingDestination:
class LoggingDestination(metaclass=_DestinationRegistry):
"""
Logging destination contains details about where exactly logs should be written inside an offline store.
It is implementation specific - each offline store must implement LoggingDestination subclass.
Expand Down
4 changes: 2 additions & 2 deletions sdk/python/feast/infra/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,8 @@ def write_feature_service_logs(
def retrieve_feature_service_logs(
self,
feature_service: FeatureService,
from_: datetime,
to: datetime,
start_date: datetime,
end_date: datetime,
config: RepoConfig,
registry: Registry,
) -> RetrievalJob:
Expand Down
4 changes: 2 additions & 2 deletions sdk/python/tests/foo_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ def write_feature_service_logs(
def retrieve_feature_service_logs(
self,
feature_service: FeatureService,
from_: datetime,
to: datetime,
start_date: datetime,
end_date: datetime,
config: RepoConfig,
registry: Registry,
) -> RetrievalJob:
Expand Down
Loading

0 comments on commit 5f22182

Please sign in to comment.