# knowledge_store.py
from typing import Union, Dict, List, Callable

import marqo


def default_chunker(document: str) -> List[Dict[str, str]]:
    """Default chunker that returns the whole document as a single chunk.

    Args:
        document (str): The input document.

    Returns:
        List[Dict[str, str]]: A list containing a single chunk with the whole document.
    """
    return [{"text": document}]
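

# A minimal sketch of a custom chunker (illustrative, not part of the original
# module): it splits a document on blank lines so each non-empty paragraph
# becomes its own chunk, matching the List[Dict[str, str]] shape returned by
# default_chunker and expected by MarqoKnowledgeStore.
def paragraph_chunker(document: str) -> List[Dict[str, str]]:
    """Example chunker that emits one chunk per non-empty paragraph."""
    paragraphs = [p.strip() for p in document.split("\n\n") if p.strip()]
    # Fall back to the whole document if no paragraph breaks were found
    return [{"text": p} for p in paragraphs] or [{"text": document}]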
class MarqoKnowledgeStore:
    def __init__(
        self,
        client: marqo.Client,
        index_name: str,
        document_chunker: Callable[[str], List[Dict[str, str]]] = default_chunker,
        document_cleaner: Union[Callable[[str], str], None] = None,
    ) -> None:
        """Initialize the MarqoKnowledgeStore with a client, index name, and optional chunker and cleaner.

        Args:
            client (marqo.Client): The Marqo client.
            index_name (str): The name of the index.
            document_chunker (Callable[[str], List[Dict[str, str]]], optional): Function to chunk documents. Defaults to default_chunker.
            document_cleaner (Union[Callable[[str], str], None], optional): Function to clean documents. Defaults to None.
        """
        self._client = client
        self._index_name = index_name
        self._document_chunker = document_chunker
        self._document_cleaner = document_cleaner
        # Index settings: Marqo also splits text into two-sentence chunks at
        # index time via text_preprocessing
        self._index_settings = {
            "model": "hf/all_datasets_v4_MiniLM-L6",
            "text_preprocessing": {
                "split_length": 2,
                "split_overlap": 0,
                "split_method": "sentence",
            },
        }
        self.reset_index()
    def query_for_content(
        self, query: Union[str, Dict[str, float]], content_var: str, limit: int = 5
    ) -> List[str]:
        """Query the knowledge store for content based on a query.

        Args:
            query (Union[str, Dict[str, float]]): The query string or dictionary.
            content_var (str): The key to extract content from the response.
            limit (int, optional): The maximum number of results to return. Defaults to 5.

        Returns:
            List[str]: A list of content strings that match the query.
        """
        relevance_threshold = 0.61  # hits scoring at or below this are ignored
        resp = self._client.index(self._index_name).search(q=query, limit=limit)
        knowledge = []
        for res in resp["hits"]:
            if res["_score"] > relevance_threshold:
                # Print the hit details for inspection
                for key, value in res.items():
                    print(f"{key}: {value}")
                print("Score:", res["_score"])
                # Collect content from content_var, falling back to other fields;
                # "Description" and "Title" come from the document schema used here
                if content_var in res:
                    knowledge.append(res[content_var])
                elif "text" in res:
                    knowledge.append(res["text"])
                elif "Description" in res:
                    knowledge.append(res["Description"])
                elif "Title" in res:
                    knowledge.append(res["Title"])
        return knowledge
    def add_document(self, document: str) -> None:
        """Add a document to the knowledge store.

        Args:
            document (str): The document to add.
        """
        print("Adding documents")
        if self._document_cleaner is not None:
            document = self._document_cleaner(document)
        chunks = self._document_chunker(document)
        # tensor_fields is a parameter of add_documents, not a document field;
        # storing it inside each chunk would index it as ordinary data
        self._client.index(self._index_name).add_documents(chunks, tensor_fields=["text"])
    def add_documents_from_list_of_dicts(self, documents: List[Dict[str, str]]) -> None:
        """Add documents to the knowledge store."""
        print("Adding documents")
        if self._document_cleaner is not None:
            # Apply the document cleaner to each string field in each document
            for doc in documents:
                for key, value in doc.items():
                    if isinstance(value, str):  # Only clean string fields
                        doc[key] = self._document_cleaner(value)
        # Create one entry per Description chunk, copying all properties of the
        # document (including "_id" when present) instead of hardcoding fields
        chunks = []
        for doc in documents:
            description_chunks = self._document_chunker(doc["Description"])
            for _ in description_chunks:
                chunks.append(dict(doc))
        # Add documents to the knowledge store
        result = self._client.index(self._index_name).add_documents(
            chunks, tensor_fields=["Description", "Title"]
        )
        if "errors" in result and result["errors"]:
            print("Errors detected:")
            print(result)
        else:
            print("Documents added successfully.")
            print(f"Loaded docs into index '{self._index_name}'.")
    def reset_index(self) -> None:
        """Reset the index by deleting it if it exists and creating a new one.

        This method will attempt to delete the existing index. If the index is not
        found, it will print a message and create a new one. If an error occurs
        during deletion or creation, it will print the error message.
        """
        try:
            self._client.index(self._index_name).delete()
        except marqo.errors.MarqoWebError:
            print(f"Index '{self._index_name}' not found. Creating a new one.")
        except Exception as e:
            print(f"Error deleting index '{self._index_name}': {e}")
        try:
            self._client.create_index(index_name=self._index_name, **self._index_settings)
        except marqo.errors.IndexAlreadyExistsError:
            print(f"Index '{self._index_name}' already exists. Updating settings.")
            self._client.index(self._index_name).settings(**self._index_settings)
        except Exception as e:
            print(f"Error creating index '{self._index_name}': {e}")