From f4f203956bc29390e8a186c51a7f9c0643e83106 Mon Sep 17 00:00:00 2001
From: Francisco Javier Arceo
Date: Fri, 10 Jan 2025 06:40:44 -0500
Subject: [PATCH 1/3] chore: Updating workflow to use custom version for get highest semver step

Signed-off-by: Francisco Javier Arceo
---
 examples/rag/feature_repo/feature_store.yaml | 15 +++
 examples/rag/feature_repo/test_workflow.py   | 93 +++++++++++++++++++
 .../milvus_online_store/milvus.py            | 24 +++--
 3 files changed, 125 insertions(+), 7 deletions(-)
 create mode 100644 examples/rag/feature_repo/feature_store.yaml
 create mode 100644 examples/rag/feature_repo/test_workflow.py

diff --git a/examples/rag/feature_repo/feature_store.yaml b/examples/rag/feature_repo/feature_store.yaml
new file mode 100644
index 0000000000..e2747773d0
--- /dev/null
+++ b/examples/rag/feature_repo/feature_store.yaml
@@ -0,0 +1,15 @@
+project: rag
+provider: local
+registry: data/registry.db
+online_store:
+  type: milvus
+  vector_enabled: true
+  embedding_dim: 384
+
+
+offline_store:
+  type: file
+entity_key_serialization_version: 3
+# By default, no_auth is used for authentication and authorization; other possible values are kubernetes and oidc. Refer to the documentation for more details.
+auth:
+  type: no_auth
diff --git a/examples/rag/feature_repo/test_workflow.py b/examples/rag/feature_repo/test_workflow.py
new file mode 100644
index 0000000000..278e9b8c6f
--- /dev/null
+++ b/examples/rag/feature_repo/test_workflow.py
@@ -0,0 +1,93 @@
+import pandas as pd
+import torch
+import torch.nn.functional as F
+from feast import FeatureStore
+from pymilvus import MilvusClient, DataType, FieldSchema
+from transformers import AutoTokenizer, AutoModel
+from example_repo import city_embeddings_feature_view, item
+TOKENIZER = "sentence-transformers/all-MiniLM-L6-v2"
+MODEL = "sentence-transformers/all-MiniLM-L6-v2"
+
+
+def mean_pooling(model_output, attention_mask):
+    token_embeddings = model_output[
+        0
+    ]  # First element of model_output contains all token embeddings
+    input_mask_expanded = (
+        attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float()
+    )
+    return torch.sum(token_embeddings * input_mask_expanded, 1) / torch.clamp(
+        input_mask_expanded.sum(1), min=1e-9
+    )
+
+
+def run_model(sentences, tokenizer, model):
+    encoded_input = tokenizer(
+        sentences, padding=True, truncation=True, return_tensors="pt"
+    )
+    # Compute token embeddings
+    with torch.no_grad():
+        model_output = model(**encoded_input)
+
+    sentence_embeddings = mean_pooling(model_output, encoded_input["attention_mask"])
+    sentence_embeddings = F.normalize(sentence_embeddings, p=2, dim=1)
+    return sentence_embeddings
+
+def run_demo():
+    store = FeatureStore(repo_path=".")
+    df = pd.read_parquet("./data/city_wikipedia_summaries_with_embeddings.parquet")
+    embedding_length = len(df['vector'][0])
+    print(f'embedding length = {embedding_length}')
+
+    print('\ndata=')
+    print(df.head().T)
+
+    store.apply([city_embeddings_feature_view, item])
+    store.write_to_online_store("city_embeddings", df)
+
+    client = MilvusClient(uri="http://localhost:19530", token="username:password")
+    fields = [
+        FieldSchema(name="id", dtype=DataType.INT64, is_primary=True),
+        FieldSchema(name='state', dtype=DataType.STRING, description="State"),
+        FieldSchema(name='wiki_summary', dtype=DataType.STRING, description="Wiki Summary"),
+        FieldSchema(name='sentence_chunks', dtype=DataType.STRING, description="Sentence Chunks"),
+        FieldSchema(name="item_id", dtype=DataType.INT64, default_value=0, description="Item"),
+        FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=embedding_length, description="vector")
+    ]
+    cols = [f.name for f in fields]
+    client.insert(
+        collection_name="demo_collection",
+        data=df[cols].to_dict(orient="records"),
+        schema=fields,
+    )
+    print('\n')
+    print('collections', client.list_collections())
+    print('query results =', client.query(
+        collection_name="rag_city_embeddings",
+        filter="item_id == 0",
+        # output_fields=['city_embeddings', 'item_id', 'city_name'],
+    ))
+    print('query results2 =', client.query(
+        collection_name="rag_city_embeddings",
+        filter="item_id >= 0",
+        output_fields=["count(*)"]
+        # output_fields=['city_embeddings', 'item_id', 'city_name'],
+    ))
+    question = "the most populous city in the U.S. state of Texas?"
+    tokenizer = AutoTokenizer.from_pretrained(TOKENIZER)
+    model = AutoModel.from_pretrained(MODEL)
+    query_embedding = run_model(question, tokenizer, model)
+    query = query_embedding.detach().cpu().numpy().tolist()[0]
+
+    # Retrieve top k documents
+    features = store.retrieve_online_documents(
+        feature=None,
+        features=["city_embeddings:vector", "city_embeddings:item_id", "city_embeddings:state"],
+        query=query,
+        top_k=3
+    )
+    print("features", features.to_df())
+
+
+if __name__ == "__main__":
+    run_demo()
diff --git a/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py b/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py
index 8d5405c428..03cac17338 100644
--- a/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py
+++ b/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py
@@ -108,12 +108,16 @@ class MilvusOnlineStore(OnlineStore):
 
     def _connect(self, config: RepoConfig) -> MilvusClient:
         if not self.client:
-            self.client = MilvusClient(
-                url=f"{config.online_store.host}:{config.online_store.port}",
-                token=f"{config.online_store.username}:{config.online_store.password}"
-                if config.online_store.username and config.online_store.password
-                else "",
-            )
+            if config.provider == "local":
+                print("Connecting to Milvus in local mode using ./milvus_demo.db")
+                self.client = MilvusClient("./milvus_demo.db")
+            else:
+                self.client = MilvusClient(
+                    url=f"{config.online_store.host}:{config.online_store.port}",
+                    token=f"{config.online_store.username}:{config.online_store.password}"
+                    if config.online_store.username and config.online_store.password
+                    else "",
+                )
         return self.client
 
     def _get_collection(self, config: RepoConfig, table: FeatureView) -> Dict[str, Any]:
@@ -192,7 +196,9 @@ def _get_collection(self, config: RepoConfig, table: FeatureView) -> Dict[str, A
                 collection_name=collection_name,
                 index_params=index_params,
             )
+            print(f"created collection {collection_name} in Milvus with config = {config.online_store}")
         else:
+            print(f"loaded collection {collection_name} in Milvus with config = {config.online_store}")
             self.client.load_collection(collection_name)
             self._collections[collection_name] = self.client.describe_collection(
                 collection_name
@@ -246,8 +252,10 @@ def online_write_batch(
                 if progress:
                     progress(1)
 
+        print(f"Inserting {len(entity_batch_to_insert)} records into Milvus")
         self.client.insert(
-            collection_name=collection["collection_name"], data=entity_batch_to_insert
+            collection_name=collection["collection_name"],
+            data=entity_batch_to_insert,
         )
 
     def online_read(
@@ -292,8 +300,10 @@ def teardown(
         self.client = self._connect(config)
         for table in tables:
             collection_name = _table_id(config.project, table)
+            print('deleting collection', collection_name)
             if self._collections.get(collection_name, None):
                 self.client.drop_collection(collection_name)
+                print('deleted collection', collection_name)
             self._collections.pop(collection_name, None)
 
     def retrieve_online_documents(

From 0f9ca12c202186568e9e75263e69a70ce2f404d1 Mon Sep 17 00:00:00 2001
From: Francisco Javier Arceo
Date: Fri, 10 Jan 2025 06:43:39 -0500
Subject: [PATCH 2/3] removing print statements

Signed-off-by: Francisco Javier Arceo
---
 .../feast/infra/online_stores/milvus_online_store/milvus.py | 5 -----
 1 file changed, 5 deletions(-)

diff --git a/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py b/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py
index 03cac17338..f2283387a0 100644
--- a/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py
+++ b/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py
@@ -196,9 +196,7 @@ def _get_collection(self, config: RepoConfig, table: FeatureView) -> Dict[str, A
                 collection_name=collection_name,
                 index_params=index_params,
             )
-            print(f"created collection {collection_name} in Milvus with config = {config.online_store}")
         else:
-            print(f"loaded collection {collection_name} in Milvus with config = {config.online_store}")
             self.client.load_collection(collection_name)
             self._collections[collection_name] = self.client.describe_collection(
                 collection_name
@@ -252,7 +250,6 @@ def online_write_batch(
                 if progress:
                     progress(1)
 
-        print(f"Inserting {len(entity_batch_to_insert)} records into Milvus")
         self.client.insert(
             collection_name=collection["collection_name"],
             data=entity_batch_to_insert,
@@ -300,10 +297,8 @@ def teardown(
         self.client = self._connect(config)
         for table in tables:
             collection_name = _table_id(config.project, table)
-            print('deleting collection', collection_name)
             if self._collections.get(collection_name, None):
                 self.client.drop_collection(collection_name)
-                print('deleted collection', collection_name)
             self._collections.pop(collection_name, None)
 
     def retrieve_online_documents(

From 91b2e68fa410fa3c2e442320143c6dbe59110742 Mon Sep 17 00:00:00 2001
From: Francisco Javier Arceo
Date: Fri, 10 Jan 2025 06:44:40 -0500
Subject: [PATCH 3/3] removing feature_store.yaml and test_workflow.py

Signed-off-by: Francisco Javier Arceo
---
 examples/rag/feature_repo/feature_store.yaml | 15 ----
 examples/rag/feature_repo/test_workflow.py   | 93 --------------------
 2 files changed, 108 deletions(-)
 delete mode 100644 examples/rag/feature_repo/feature_store.yaml
 delete mode 100644 examples/rag/feature_repo/test_workflow.py

diff --git a/examples/rag/feature_repo/feature_store.yaml b/examples/rag/feature_repo/feature_store.yaml
deleted file mode 100644
index e2747773d0..0000000000
--- a/examples/rag/feature_repo/feature_store.yaml
+++ /dev/null
@@ -1,15 +0,0 @@
-project: rag
-provider: local
-registry: data/registry.db
-online_store:
-  type: milvus
-  vector_enabled: true
-  embedding_dim: 384
-
-
-offline_store:
-  type: file
-entity_key_serialization_version: 3
-# By default, no_auth is used for authentication and authorization; other possible values are kubernetes and oidc. Refer to the documentation for more details.
-auth:
-  type: no_auth
diff --git a/examples/rag/feature_repo/test_workflow.py b/examples/rag/feature_repo/test_workflow.py
deleted file mode 100644
index 278e9b8c6f..0000000000
--- a/examples/rag/feature_repo/test_workflow.py
+++ /dev/null
@@ -1,93 +0,0 @@
-import pandas as pd
-import torch
-import torch.nn.functional as F
-from feast import FeatureStore
-from pymilvus import MilvusClient, DataType, FieldSchema
-from transformers import AutoTokenizer, AutoModel
-from example_repo import city_embeddings_feature_view, item
-TOKENIZER = "sentence-transformers/all-MiniLM-L6-v2"
-MODEL = "sentence-transformers/all-MiniLM-L6-v2"
-
-
-def mean_pooling(model_output, attention_mask):
-    token_embeddings = model_output[
-        0
-    ]  # First element of model_output contains all token embeddings
-    input_mask_expanded = (
-        attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float()
-    )
-    return torch.sum(token_embeddings * input_mask_expanded, 1) / torch.clamp(
-        input_mask_expanded.sum(1), min=1e-9
-    )
-
-
-def run_model(sentences, tokenizer, model):
-    encoded_input = tokenizer(
-        sentences, padding=True, truncation=True, return_tensors="pt"
-    )
-    # Compute token embeddings
-    with torch.no_grad():
-        model_output = model(**encoded_input)
-
-    sentence_embeddings = mean_pooling(model_output, encoded_input["attention_mask"])
-    sentence_embeddings = F.normalize(sentence_embeddings, p=2, dim=1)
-    return sentence_embeddings
-
-def run_demo():
-    store = FeatureStore(repo_path=".")
-    df = pd.read_parquet("./data/city_wikipedia_summaries_with_embeddings.parquet")
-    embedding_length = len(df['vector'][0])
-    print(f'embedding length = {embedding_length}')
-
-    print('\ndata=')
-    print(df.head().T)
-
-    store.apply([city_embeddings_feature_view, item])
-    store.write_to_online_store("city_embeddings", df)
-
-    client = MilvusClient(uri="http://localhost:19530", token="username:password")
-    fields = [
-        FieldSchema(name="id", dtype=DataType.INT64, is_primary=True),
-        FieldSchema(name='state', dtype=DataType.STRING, description="State"),
-        FieldSchema(name='wiki_summary', dtype=DataType.STRING, description="Wiki Summary"),
-        FieldSchema(name='sentence_chunks', dtype=DataType.STRING, description="Sentence Chunks"),
-        FieldSchema(name="item_id", dtype=DataType.INT64, default_value=0, description="Item"),
-        FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=embedding_length, description="vector")
-    ]
-    cols = [f.name for f in fields]
-    client.insert(
-        collection_name="demo_collection",
-        data=df[cols].to_dict(orient="records"),
-        schema=fields,
-    )
-    print('\n')
-    print('collections', client.list_collections())
-    print('query results =', client.query(
-        collection_name="rag_city_embeddings",
-        filter="item_id == 0",
-        # output_fields=['city_embeddings', 'item_id', 'city_name'],
-    ))
-    print('query results2 =', client.query(
-        collection_name="rag_city_embeddings",
-        filter="item_id >= 0",
-        output_fields=["count(*)"]
-        # output_fields=['city_embeddings', 'item_id', 'city_name'],
-    ))
-    question = "the most populous city in the U.S. state of Texas?"
-    tokenizer = AutoTokenizer.from_pretrained(TOKENIZER)
-    model = AutoModel.from_pretrained(MODEL)
-    query_embedding = run_model(question, tokenizer, model)
-    query = query_embedding.detach().cpu().numpy().tolist()[0]
-
-    # Retrieve top k documents
-    features = store.retrieve_online_documents(
-        feature=None,
-        features=["city_embeddings:vector", "city_embeddings:item_id", "city_embeddings:state"],
-        query=query,
-        top_k=3
-    )
-    print("features", features.to_df())
-
-
-if __name__ == "__main__":
-    run_demo()
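
The series above ends with the demo files removed, so the retrieval flow they exercised is summarized below as a minimal, corrected sketch for reference. It assumes the example_repo feature definitions (city_embeddings_feature_view, item) and the embeddings parquet from the deleted test_workflow.py; the feature references, data path, and top_k value are carried over from that file, and the block is illustrative rather than part of the patch series.

# Minimal sketch of the demo removed in PATCH 3/3 (illustrative, not part of the series).
# Assumes example_repo and ./data/city_wikipedia_summaries_with_embeddings.parquet exist,
# as in the deleted test_workflow.py.
import pandas as pd
import torch
import torch.nn.functional as F
from feast import FeatureStore
from transformers import AutoModel, AutoTokenizer

from example_repo import city_embeddings_feature_view, item

MODEL_ID = "sentence-transformers/all-MiniLM-L6-v2"


def embed(sentences, tokenizer, model):
    # Tokenize, encode, mean-pool token embeddings weighted by the attention
    # mask, then L2-normalize, mirroring mean_pooling/run_model above.
    encoded = tokenizer(sentences, padding=True, truncation=True, return_tensors="pt")
    with torch.no_grad():
        tokens = model(**encoded)[0]  # last hidden state, [batch, tokens, dim]
    mask = encoded["attention_mask"].unsqueeze(-1).expand(tokens.size()).float()
    pooled = (tokens * mask).sum(dim=1) / mask.sum(dim=1).clamp(min=1e-9)
    return F.normalize(pooled, p=2, dim=1)


def main():
    store = FeatureStore(repo_path=".")
    df = pd.read_parquet("./data/city_wikipedia_summaries_with_embeddings.parquet")

    # Register the feature view and load the precomputed embeddings into the
    # Milvus online store; with the local provider, PATCH 1/3 routes _connect
    # to Milvus Lite at ./milvus_demo.db.
    store.apply([city_embeddings_feature_view, item])
    store.write_to_online_store("city_embeddings", df)

    tokenizer = AutoTokenizer.from_pretrained(MODEL_ID)
    model = AutoModel.from_pretrained(MODEL_ID)
    question = "the most populous city in the U.S. state of Texas?"
    query_vector = embed([question], tokenizer, model)[0].tolist()

    # Vector similarity search against the online store for the top 3 documents.
    docs = store.retrieve_online_documents(
        feature=None,
        features=[
            "city_embeddings:vector",
            "city_embeddings:item_id",
            "city_embeddings:state",
        ],
        query=query_vector,
        top_k=3,
    )
    print(docs.to_df())


if __name__ == "__main__":
    main()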