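"""embedder.py: embed EPUB books into a remote Chroma collection for RAG retrieval."""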
import logging
import os
from typing import List

import chromadb
import psutil
import torch
from bs4 import BeautifulSoup
from ebooklib import epub
from langchain_community.vectorstores import Chroma
from langchain_core.documents import Document
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_ollama.llms import OllamaLLM
from langchain_text_splitters import RecursiveCharacterTextSplitter
from transformers import AutoModel, AutoTokenizer


class RAGPipeline:
    def __init__(self, model_name: str = "sentence-transformers/all-mpnet-base-v2", max_memory_gb: float = 3.0):
        self.setup_logging()
        self.check_system_memory(max_memory_gb)

        # Load the language model (LLM)
        self.llm = OllamaLLM(model="deepseek-r1:8b")

        # Initialize embeddings using a lightweight model
        self.embeddings = HuggingFaceEmbeddings(
            model_name=model_name,
            model_kwargs={'device': 'cpu'}  # Use CPU for efficiency
        )

        # Raw model/tokenizer for manually embedding whole EPUB files
        self.model = AutoModel.from_pretrained(model_name)
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model.eval()

        # Initialize Chroma client for the remote server
        self.chroma_client = chromadb.HttpClient(
            host="localhost",
            port=18000
        )

        # Initialize Chroma collection
        self.collection_name = "oreilly"
        # Delete the old collection first if its embedding size no longer matches:
        # self.chroma_client.delete_collection(self.collection_name)
        self.collection = self.chroma_client.get_or_create_collection(
            self.collection_name,
            # Informational only: Chroma infers dimensionality from the first
            # embeddings added; all-mpnet-base-v2 produces 768-dim vectors.
            metadata={"dimensionality": 768}
        )
    def setup_logging(self):
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)

    def check_system_memory(self, max_memory_gb: float):
        available_memory = psutil.virtual_memory().available / (1024 ** 3)
        self.logger.info(f"Available system memory: {available_memory:.1f} GB")
        if available_memory < max_memory_gb:
            self.logger.warning("Memory is below recommended threshold.")
    # Extract the plain text from an EPUB file
    def extract_text_from_epub(self, epub_path):
        book = epub.read_epub(epub_path)
        text_content = []
        for item in book.get_items():
            if isinstance(item, epub.EpubHtml):
                soup = BeautifulSoup(item.content, 'html.parser')
                text_content.append(soup.get_text())
        return " ".join(text_content)
    # Embed a text with the raw transformer model. all-mpnet-base-v2 was
    # trained with mean pooling, so average the token embeddings under the
    # attention mask rather than taking the CLS token. Note that
    # truncation=True means very long texts are embedded by their opening
    # tokens only.
    def embed_text(self, text):
        tokenized = self.tokenizer([text], padding=True, truncation=True, return_tensors="pt")
        with torch.no_grad():
            model_output = self.model(**tokenized)
        token_embeddings = model_output[0]  # (batch, seq_len, hidden)
        mask = tokenized["attention_mask"].unsqueeze(-1).float()
        embeddings = (token_embeddings * mask).sum(dim=1) / mask.sum(dim=1)
        return torch.nn.functional.normalize(embeddings, dim=1).squeeze().tolist()
    def load_and_split_documents(self, file_path: str) -> List[Document]:
        # Step 1 - load EPUB files from file_path, add a whole-book embedding
        # to the Chroma collection, and return chunked Documents for the
        # vector store (the original version returned nothing, which broke main()).
        splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=100)
        documents: List[Document] = []
        for filename in os.listdir(file_path):
            if filename.endswith(".epub"):
                epub_path = os.path.join(file_path, filename)
                print(f"Processing: {epub_path}")

                # Extract text and create a whole-book embedding
                text = self.extract_text_from_epub(epub_path)
                embedding = self.embed_text(text)

                # Generate a unique ID from the filename (without the .epub extension)
                doc_id = os.path.splitext(filename)[0]

                # Add to the Chroma collection with the unique ID
                self.collection.add(
                    ids=[doc_id],
                    embeddings=[embedding],
                    metadatas=[{"filename": filename}],
                    documents=[text],
                )

                # Split the book into chunks for the vector store
                for chunk in splitter.split_text(text):
                    documents.append(Document(page_content=chunk, metadata={"filename": filename}))
        return documents
    def create_vectorstore(self, documents: List[Document]) -> Chroma:
        """
        Create a vector store from the provided documents using Chroma.

        Args:
            documents: List of Document objects to be added to the vector store

        Returns:
            Chroma: Initialized vector store containing the document embeddings
        """
        # Create and initialize the Chroma vector store
        vectorstore = Chroma.from_documents(
            documents=documents,
            embedding=self.embeddings,
            collection_name=self.collection_name,
            client=self.chroma_client
        )
        self.logger.info(f"Created vector store with {len(documents)} documents")
        return vectorstore
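
    # A minimal retrieval sketch, not part of the original pipeline: fetch the
    # top-k chunks for a question and ask the Ollama LLM to answer from them.
    # The method name, prompt wording, and k=4 default are assumptions for
    # illustration, not fixed requirements.
    def query(self, vectorstore: Chroma, question: str, k: int = 4) -> str:
        docs = vectorstore.similarity_search(question, k=k)
        context = "\n\n".join(doc.page_content for doc in docs)
        prompt = (
            "Answer the question using only the context below.\n\n"
            f"Context:\n{context}\n\nQuestion: {question}"
        )
        return self.llm.invoke(prompt)
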
def main():
    # model_name selects the embedding model; the LLM ("deepseek-r1:8b") is
    # configured inside RAGPipeline, so don't pass it here.
    rag = RAGPipeline(max_memory_gb=3.0)
    # documents = rag.load_and_split_documents("data/knowledge.txt")
    documents = rag.load_and_split_documents("./epubs")
    vectorstore = rag.create_vectorstore(documents)
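    # Example usage of the hypothetical query() sketch above; the question
    # string is illustrative only.
    answer = rag.query(vectorstore, "What does this book say about embeddings?")
    print(answer)
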
if __name__ == "__main__":
    main()