Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Use a daemon thread to monitor the go feature server exclusively #2391

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/linter.yml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,6 @@ jobs:
run: |
pip install --upgrade "pip>=21.3.1"
- name: Install dependencies
run: make install-go-ci-dependencies
run: make install-go-proto-dependencies
- name: Lint go
run: make lint-go
2 changes: 1 addition & 1 deletion .github/workflows/unit_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ jobs:
with:
go-version: 1.17.7
- name: Install dependencies
run: make install-go-ci-dependencies
run: make install-go-proto-dependencies
- name: Compile protos
run: make compile-protos-go
- name: Test
Expand Down
10 changes: 6 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ build: protos build-java build-docker build-html

# Python SDK

install-python-ci-dependencies: install-go-ci-dependencies
install-python-ci-dependencies: install-go-proto-dependencies
cd sdk/python && python -m piptools sync requirements/py$(PYTHON)-ci-requirements.txt
cd sdk/python && COMPILE_GO=true python setup.py develop

Expand Down Expand Up @@ -125,19 +125,21 @@ build-java-no-tests:

# Go SDK

install-go-ci-dependencies:
install-go-proto-dependencies:
go install google.golang.org/protobuf/cmd/[email protected]
go install google.golang.org/grpc/cmd/[email protected]

compile-protos-go: install-go-ci-dependencies
install-protoc-dependencies:
pip install grpcio-tools==1.34.0

compile-protos-go: install-go-proto-dependencies install-protoc-dependencies
python sdk/python/setup.py build_go_protos

compile-go-feature-server: compile-protos-go
go mod tidy
go build -o ${ROOT_DIR}/sdk/python/feast/binaries/goserver github.com/feast-dev/feast/go/cmd/goserver

test-go: install-go-ci-dependencies
test-go: compile-protos-go
go test ./...

format-go:
Expand Down
1 change: 0 additions & 1 deletion go/cmd/goserver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ type FeastEnvConfig struct {
}

// TODO: Add a proper logging library such as https://github.com/Sirupsen/logrus

func main() {

var feastEnvConfig FeastEnvConfig
Expand Down
Binary file not shown.
26 changes: 10 additions & 16 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,7 @@ class FeatureStore:

@log_exceptions
def __init__(
self,
repo_path: Optional[str] = None,
config: Optional[RepoConfig] = None,
go_server_use_thread: bool = False,
self, repo_path: Optional[str] = None, config: Optional[RepoConfig] = None,
):
"""
Creates a FeatureStore object.
Expand All @@ -135,7 +132,6 @@ def __init__(
self._registry._initialize_registry()
self._provider = get_provider(self.config, self.repo_path)
self._go_server = None
self._go_server_use_thread = go_server_use_thread

@log_exceptions
def version(self) -> str:
Expand Down Expand Up @@ -733,6 +729,10 @@ def apply(
service.name, project=self.project, commit=False
)

# If a go server is running, kill it so that it can be recreated in `update_infra` with
# the latest registry state.
self.kill_go_server()

self._get_provider().update_infra(
project=self.project,
tables_to_delete=views_to_delete if not partial else [],
Expand All @@ -754,6 +754,8 @@ def teardown(self):

entities = self.list_entities()

self.kill_go_server()

self._get_provider().teardown_infra(self.project, tables, entities)
self._registry.teardown()

Expand Down Expand Up @@ -1233,11 +1235,8 @@ def get_online_features(
if self.config.go_feature_server:
# Lazily start the go server on the first request
if self._go_server is None:
self._go_server = GoServer(
str(self.repo_path.absolute()),
self.config,
self._go_server_use_thread,
)
self._go_server = GoServer(str(self.repo_path.absolute()), self.config,)
self._go_server._shared_connection._check_grpc_connection()
return self._go_server.get_online_features(
features, columnar, full_feature_names
)
Expand Down Expand Up @@ -1860,12 +1859,7 @@ def serve_transformations(self, port: int) -> None:
def kill_go_server(self):
if self._go_server:
self._go_server.kill_go_server_explicitly()

def set_go_server_use_thread(self, use: bool):
if self._go_server:
self._go_server.set_use_thread(use)
else:
self._go_server_use_thread = use
self._go_server = None


def _validate_entity_values(join_key_values: Dict[str, List[Value]]):
Expand Down
Loading