Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add mysql as online store #3190

Merged
merged 17 commits into from
Sep 22, 2022
21 changes: 21 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,27 @@ test-python-universal-postgres-online:
not test_snowflake" \
sdk/python/tests

test-python-universal-mysql-online:
PYTHONPATH='.' \
FULL_REPO_CONFIGS_MODULE=sdk.python.feast.infra.online_stores.contrib.mysql_repo_configuration \
PYTEST_PLUGINS=sdk.python.tests.integration.feature_repos.universal.online_store.mysql \
FEAST_USAGE=False \
IS_TEST=True \
python -m pytest -n 8 --integration \
-k "not test_universal_cli and \
not test_go_feature_server and \
not test_feature_logging and \
not test_reorder_columns and \
not test_logged_features_validation and \
not test_lambda_materialization_consistency and \
not test_offline_write and \
not test_push_features_to_offline_store and \
not gcs_registry and \
not s3_registry and \
not test_universal_types and \
not test_snowflake" \
sdk/python/tests

test-python-universal-cassandra:
PYTHONPATH='.' \
FULL_REPO_CONFIGS_MODULE=sdk.python.feast.infra.online_stores.contrib.cassandra_repo_configuration \
Expand Down
201 changes: 201 additions & 0 deletions sdk/python/feast/infra/online_stores/contrib/mysql.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
from __future__ import absolute_import

from datetime import datetime
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple

import pymysql
import pytz
from pydantic import StrictStr
from pymysql.connections import Connection

from feast import Entity, FeatureView, RepoConfig
from feast.infra.key_encoding_utils import serialize_entity_key
from feast.infra.online_stores.online_store import OnlineStore
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.repo_config import FeastConfigBaseModel


class MySQLOnlineStoreConfig(FeastConfigBaseModel):
"""
Configuration for the MySQL online store.
NOTE: The class *must* end with the `OnlineStoreConfig` suffix.
"""

type = "mysql"

host: Optional[StrictStr] = None
user: Optional[StrictStr] = None
password: Optional[StrictStr] = None
database: Optional[StrictStr] = None
port: Optional[int] = None


class MySQLOnlineStore(OnlineStore):
"""
An online store implementation that uses MySQL.
NOTE: The class *must* end with the `OnlineStore` suffix.
"""

_conn: Optional[Connection] = None

def _get_conn(self, config: RepoConfig) -> Connection:

online_store_config = config.online_store
assert isinstance(online_store_config, MySQLOnlineStoreConfig)

if not self._conn:
self._conn = pymysql.connect(
host=online_store_config.host or "127.0.0.1",
user=online_store_config.user or "test",
password=online_store_config.password or "test",
database=online_store_config.database or "feast",
port=online_store_config.port or 3306,
autocommit=True,
)
return self._conn

def online_write_batch(
self,
config: RepoConfig,
table: FeatureView,
data: List[Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]],
progress: Optional[Callable[[int], Any]],
) -> None:

conn = self._get_conn(config)
cur = conn.cursor()

project = config.project

for entity_key, values, timestamp, created_ts in data:
entity_key_bin = serialize_entity_key(entity_key).hex()
felixwang9817 marked this conversation as resolved.
Show resolved Hide resolved
timestamp = _to_naive_utc(timestamp)
if created_ts is not None:
created_ts = _to_naive_utc(created_ts)

for feature_name, val in values.items():
self.write_to_table(created_ts, cur, entity_key_bin, feature_name, project, table, timestamp, val)
conn.commit()
if progress:
progress(1)

@staticmethod
def write_to_table(created_ts, cur, entity_key_bin, feature_name, project, table, timestamp, val) -> None:
cur.execute(
f"""
INSERT INTO {_table_id(project, table)}
(entity_key, feature_name, value, event_ts, created_ts)
values (%s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
value = %s,
event_ts = %s,
created_ts = %s;
""",
(
# Insert
entity_key_bin,
feature_name,
val.SerializeToString(),
timestamp,
created_ts,
# Update on duplicate key
val.SerializeToString(),
timestamp,
created_ts
),
)

def online_read(
self,
config: RepoConfig,
table: FeatureView,
entity_keys: List[EntityKeyProto],
requested_features: Optional[List[str]] = None,
) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
conn = self._get_conn(config)
cur = conn.cursor()

result: List[Tuple[Optional[datetime], Optional[Dict[str, Any]]]] = []

project = config.project
for entity_key in entity_keys:
entity_key_bin = serialize_entity_key(entity_key).hex()

cur.execute(
f"SELECT feature_name, value, event_ts FROM {_table_id(project, table)} WHERE entity_key = %s",
(entity_key_bin,),
)

res = {}
res_ts: Optional[datetime] = None
records = cur.fetchall()
if records:
for feature_name, val_bin, ts in records:
val = ValueProto()
val.ParseFromString(val_bin)
res[feature_name] = val
res_ts = ts

if not res:
result.append((None, None))
else:
result.append((res_ts, res))
return result

def update(
self,
config: RepoConfig,
tables_to_delete: Sequence[FeatureView],
tables_to_keep: Sequence[FeatureView],
entities_to_delete: Sequence[Entity],
entities_to_keep: Sequence[Entity],
partial: bool,
) -> None:
conn = self._get_conn(config)
cur = conn.cursor()
project = config.project

# We don't create any special state for the entities in this implementation.
for table in tables_to_keep:
cur.execute(
f"""CREATE TABLE IF NOT EXISTS {_table_id(project, table)} (entity_key VARCHAR(512),
feature_name VARCHAR(256),
value BLOB,
event_ts timestamp NULL DEFAULT NULL,
created_ts timestamp NULL DEFAULT NULL,
PRIMARY KEY(entity_key, feature_name))"""
)

cur.execute(
f"ALTER TABLE {_table_id(project, table)} ADD INDEX {_table_id(project, table)}_ek (entity_key);"
)

for table in tables_to_delete:
cur.execute(f"DROP INDEX {_table_id(project, table)}_ek ON {_table_id(project, table)};")
cur.execute(f"DROP TABLE IF EXISTS {_table_id(project, table)}")

def teardown(
self,
config: RepoConfig,
tables: Sequence[FeatureView],
entities: Sequence[Entity],
) -> None:
conn = self._get_conn(config)
cur = conn.cursor()
project = config.project

for table in tables:
cur.execute(f"DROP INDEX {_table_id(project, table)}_ek ON {_table_id(project, table)};")
cur.execute(f"DROP TABLE IF EXISTS {_table_id(project, table)}")


def _table_id(project: str, table: FeatureView) -> str:
return f"{project}_{table.name}"


def _to_naive_utc(ts: datetime) -> datetime:
if ts.tzinfo is None:
return ts
else:
return ts.astimezone(pytz.utc).replace(tzinfo=None)
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from tests.integration.feature_repos.integration_test_repo_config import (
IntegrationTestRepoConfig,
)
from tests.integration.feature_repos.universal.online_store.mysql import (
MySQLOnlineStoreCreator,
)

FULL_REPO_CONFIGS = [
IntegrationTestRepoConfig(online_store_creator=MySQLOnlineStoreCreator),
]
1 change: 1 addition & 0 deletions sdk/python/feast/repo_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
"postgres": "feast.infra.online_stores.contrib.postgres.PostgreSQLOnlineStore",
"hbase": "feast.infra.online_stores.contrib.hbase_online_store.hbase.HbaseOnlineStore",
"cassandra": "feast.infra.online_stores.contrib.cassandra_online_store.cassandra_online_store.CassandraOnlineStore",
"mysql": "feast.infra.online_stores.contrib.mysql.MySQLOnlineStore",
}

OFFLINE_STORE_CLASS_FOR_TYPE = {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from typing import Dict

from testcontainers.mysql import MySqlContainer

from tests.integration.feature_repos.universal.online_store_creator import (
OnlineStoreCreator,
)


class MySQLOnlineStoreCreator(OnlineStoreCreator):
def __init__(self, project_name: str, **kwargs):
super().__init__(project_name)
self.container = MySqlContainer('mysql:latest', platform='linux/amd64') \
.with_exposed_ports(3306) \
.with_env("MYSQL_USER", "root") \
.with_env("MYSQL_PASSWORD", "test") \
.with_env("MYSQL_DATABASE", "test")

def create_online_store(self) -> Dict[str, str]:
self.container.start()
exposed_port = self.container.get_exposed_port(3306)
return {"type": "mysql", "user": "root", "password": "test", "database": "test", "port": exposed_port}

def teardown(self):
self.container.stop()
2 changes: 2 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@

MYSQL_REQUIRED = [
"mysqlclient",
"pymysql",
"types-PyMySQL"
]

HBASE_REQUIRED = [
Expand Down