
feat: Initial Bytewax materialization engine #2974

Merged: 4 commits, Aug 15, 2022
5 changes: 5 additions & 0 deletions docs/reference/batch-materialization/README.md
@@ -0,0 +1,5 @@
# Batch materialization

Please see [Batch Materialization Engine](../../getting-started/architecture-and-components/batch-materialization-engine.md) for an explanation of batch materialization engines.

{% page-ref page="bytewax.md" %}
59 changes: 59 additions & 0 deletions docs/reference/batch-materialization/bytewax.md
@@ -0,0 +1,59 @@
# Bytewax

## Description

The [Bytewax](https://bytewax.io) batch materialization engine provides an execution
engine for batch materializing operations (`materialize` and `materialize-incremental`).

### Guide

To use the Bytewax materialization engine, you need a [Kubernetes](https://kubernetes.io/) cluster running version 1.22.10 or later.

#### Kubernetes Authentication

The Bytewax materialization engine loads authentication and cluster information from the [kubeconfig file](https://kubernetes.io/docs/concepts/configuration/organize-cluster-access-kubeconfig/). By default, `kubectl` looks for a file named `config` in the `$HOME/.kube` directory. You can specify other kubeconfig files by setting the `KUBECONFIG` environment variable.
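For example, the engine can be pointed at a non-default kubeconfig like this (the path below is hypothetical; substitute your own file):

``` shell
# Use a dedicated kubeconfig for the Feast materialization cluster
# (hypothetical path; replace with your own kubeconfig file).
export KUBECONFIG="$HOME/.kube/feast-cluster.yaml"
```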

#### Resource Authentication

Bytewax jobs can be configured to read [Kubernetes secrets](https://kubernetes.io/docs/concepts/configuration/secret/) as environment variables, providing credentials for the online and offline stores during job runs.

To configure secrets, first create them using `kubectl`:

``` shell
kubectl create secret generic -n bytewax aws-credentials --from-literal=aws-access-key-id='<access key id>' --from-literal=aws-secret-access-key='<secret access key>'
```

Then configure them in the `batch_engine` section of `feature_store.yaml`:

``` yaml
batch_engine:
type: bytewax
namespace: bytewax
env:
- name: AWS_ACCESS_KEY_ID
valueFrom:
secretKeyRef:
name: aws-credentials
key: aws-access-key-id
- name: AWS_SECRET_ACCESS_KEY
valueFrom:
secretKeyRef:
name: aws-credentials
key: aws-secret-access-key
```
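The `env` entries above follow the Kubernetes container `env` schema (`name` plus `valueFrom`/`secretKeyRef`), so the engine can pass them through to the job's pod spec unchanged. A minimal sketch of that pass-through (hypothetical helper, not Feast's actual code):

``` python
# Hypothetical sketch: splice `env` entries from feature_store.yaml into a container spec.
def build_container_spec(image: str, env: list) -> dict:
    return {
        "name": "bytewax-materialization",
        "image": image,
        # Entries already match the Kubernetes container `env` schema,
        # so they can be forwarded without translation.
        "env": env,
    }

env = [
    {
        "name": "AWS_ACCESS_KEY_ID",
        "valueFrom": {
            "secretKeyRef": {"name": "aws-credentials", "key": "aws-access-key-id"}
        },
    }
]
spec = build_container_spec("bytewax/bytewax-feast:latest", env)
```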

#### Configuration

The Bytewax materialization engine is configured through the `feature_store.yaml` configuration file:

``` yaml
batch_engine:
type: bytewax
namespace: bytewax
image: bytewax/bytewax-feast:latest
```

The `namespace` configuration directive specifies the Kubernetes [namespace](https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/) in which jobs, services, and ConfigMaps will be created.

The `image` parameter specifies which container image to use when running the materialization job. To create a custom image based on this container, please see the [GitHub repository](https://github.com/bytewax/bytewax-feast) for this image.
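A loader for this configuration section might validate the `type` key and fill in defaults roughly as follows (hypothetical sketch; the defaults shown are illustrative, not Feast's actual values):

``` python
# Hypothetical sketch of parsing the batch_engine section; not Feast's real parser.
DEFAULTS = {"namespace": "default", "image": "bytewax/bytewax-feast:latest"}

def load_batch_engine_config(section: dict) -> dict:
    if section.get("type") != "bytewax":
        raise ValueError("batch_engine.type must be 'bytewax' for this engine")
    # User-supplied keys override the defaults.
    return {**DEFAULTS, **section}

config = load_batch_engine_config({"type": "bytewax", "namespace": "bytewax"})
```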

10 changes: 9 additions & 1 deletion sdk/python/docs/index.rst
@@ -310,6 +310,14 @@ Local Engine
(Alpha) Lambda Based Engine
---------------------------

.. automodule:: feast.infra.materialization.lambda.lambda_engine
   :members:
   :noindex:


Bytewax Engine
---------------------------

.. automodule:: feast.infra.materialization.contrib.bytewax
   :members:
   :noindex:
@@ -0,0 +1,29 @@
feast.infra.materialization.contrib.bytewax package
=================================================================

Submodules
----------

feast.infra.materialization.contrib.bytewax.bytewax\_materialization\_engine
------------------------------------------------------------------------------

.. automodule:: feast.infra.materialization.contrib.bytewax.bytewax_materialization_engine
   :members:
   :undoc-members:
   :show-inheritance:

feast.infra.materialization.contrib.bytewax.bytewax\_materialization\_job
------------------------------------------------------------------------------

.. automodule:: feast.infra.materialization.contrib.bytewax.bytewax_materialization_job
   :members:
   :undoc-members:
   :show-inheritance:

Module contents
---------------

.. automodule:: feast.infra.materialization.contrib.bytewax
   :members:
   :undoc-members:
   :show-inheritance:
10 changes: 10 additions & 0 deletions sdk/python/docs/source/feast.infra.materialization.contrib.rst
@@ -0,0 +1,10 @@
feast.infra.materialization.contrib package
=============================================

Subpackages
-----------

.. toctree::
:maxdepth: 4

feast.infra.materialization.contrib.bytewax
8 changes: 8 additions & 0 deletions sdk/python/docs/source/index.rst
@@ -313,3 +313,11 @@ Local Engine
.. autoclass:: feast.infra.materialization.lambda.lambda_engine
   :members:
   :noindex:


Bytewax Engine
---------------------------

.. automodule:: feast.infra.materialization.contrib.bytewax
   :members:
   :noindex:
@@ -0,0 +1,15 @@
from .bytewax_materialization_dataflow import BytewaxMaterializationDataflow
from .bytewax_materialization_engine import (
    BytewaxMaterializationEngine,
    BytewaxMaterializationEngineConfig,
)
from .bytewax_materialization_job import BytewaxMaterializationJob
from .bytewax_materialization_task import BytewaxMaterializationTask

__all__ = [
    "BytewaxMaterializationTask",
    "BytewaxMaterializationJob",
    "BytewaxMaterializationDataflow",
    "BytewaxMaterializationEngine",
    "BytewaxMaterializationEngineConfig",
]
@@ -0,0 +1,89 @@
from typing import List

import pyarrow as pa
import pyarrow.parquet as pq
import s3fs
from bytewax import Dataflow, cluster_main  # type: ignore
from bytewax.inputs import AdvanceTo, Emit, ManualInputConfig, distribute
from bytewax.parse import proc_env
from tqdm import tqdm

from feast import FeatureStore, FeatureView, RepoConfig
from feast.utils import _convert_arrow_to_proto, _run_pyarrow_field_mapping


class BytewaxMaterializationDataflow:
    def __init__(
        self,
        config: RepoConfig,
        feature_view: FeatureView,
        paths: List[str],
    ):
        self.config = config
        self.feature_store = FeatureStore(config=config)

        self.feature_view = feature_view
        self.paths = paths

        self._run_dataflow()

    def process_path(self, path):
        # Read every fragment of the Parquet dataset at `path` into record batches.
        fs = s3fs.S3FileSystem()
        dataset = pq.ParquetDataset(path, filesystem=fs, use_legacy_dataset=False)
        batches = []
        for fragment in dataset.fragments:
            for batch in fragment.to_table().to_batches():
                batches.append(batch)

        return batches

    def input_builder(self, worker_index, worker_count, resume_epoch):
        # Shard the input paths across workers; each path is emitted in its own epoch.
        worker_paths = distribute(self.paths, worker_index, worker_count)
        epoch = 0
        for path in worker_paths:
            yield AdvanceTo(epoch)
            yield Emit(path)
            epoch += 1

        return

    def output_builder(self, worker_index, worker_count):
        def output_fn(epoch_batch):
            _, batch = epoch_batch

            table = pa.Table.from_batches([batch])

            if self.feature_view.batch_source.field_mapping is not None:
                table = _run_pyarrow_field_mapping(
                    table, self.feature_view.batch_source.field_mapping
                )

            join_key_to_value_type = {
                entity.name: entity.dtype.to_value_type()
                for entity in self.feature_view.entity_columns
            }

            # Convert Arrow rows to Feast protos and write them to the online store.
            rows_to_write = _convert_arrow_to_proto(
                table, self.feature_view, join_key_to_value_type
            )
            provider = self.feature_store._get_provider()
            with tqdm(total=len(rows_to_write)) as progress:
                provider.online_write_batch(
                    config=self.config,
                    table=self.feature_view,
                    data=rows_to_write,
                    progress=progress.update,
                )

        return output_fn

    def _run_dataflow(self):
        flow = Dataflow()
        flow.flat_map(self.process_path)
        flow.capture()
        cluster_main(
            flow,
            ManualInputConfig(self.input_builder),
            self.output_builder,
            **proc_env(),
        )
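`input_builder` relies on Bytewax's `distribute` helper to shard the list of Parquet paths across workers. Its behavior is a round-robin slice of the input; a pure-Python equivalent for illustration:

``` python
# Round-robin sharding: worker i receives items i, i + worker_count, i + 2*worker_count, ...
def distribute(items, worker_index, worker_count):
    for i, item in enumerate(items):
        if i % worker_count == worker_index:
            yield item

paths = ["part-0.parquet", "part-1.parquet", "part-2.parquet", "part-3.parquet"]
worker_0 = list(distribute(paths, 0, 2))  # ["part-0.parquet", "part-2.parquet"]
```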