Skip to content

Commit

Permalink
rag evals: rag connector fix (#8063)
Browse files Browse the repository at this point in the history
GitOrigin-RevId: d6079878f42a31ebae5a507f5230fb6f2163d2fb
  • Loading branch information
berkecanrizai authored and Manul from Pathway committed Jan 22, 2025
1 parent 01e1047 commit b9efe06
Showing 1 changed file with 11 additions and 183 deletions.
194 changes: 11 additions & 183 deletions integration_tests/rag_evals/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,7 @@
import httpx
import requests

OPENAI_API_KEY = "sk-................................................" # placeholder, passes the rag app


def send_post_request(
    url: str, data: dict, headers: dict | None = None, timeout: int | None = None
) -> dict:
    """POST *data* as JSON to *url* and return the decoded JSON response.

    Args:
    - url: Target endpoint.
    - data: JSON-serializable request body.
    - headers: Optional extra HTTP headers. ``None`` means no extra headers.
    - timeout: Request timeout in seconds; ``None`` waits indefinitely.

    Raises:
        requests.HTTPError: If the server responds with a 4xx/5xx status.
    """
    # `headers or {}` preserves the original empty-headers behavior while
    # avoiding a shared mutable default argument.
    response = requests.post(url, json=data, headers=headers or {}, timeout=timeout)
    response.raise_for_status()
    return response.json()
from pathway.xpacks.llm.question_answering import RAGClient, send_post_request


async def a_send_post_request(
Expand All @@ -23,11 +15,6 @@ async def a_send_post_request(
return response.json()


def prep_payload(payload):
    """Attach the module-level OpenAI API key to *payload*.

    The dict is modified in place and also returned so the call can be
    chained directly into a request helper.
    """
    payload.update(openai_api_key=OPENAI_API_KEY)
    return payload


class VectorStoreClient:
"""
A client you can use to query VectorStoreServer.
Expand Down Expand Up @@ -163,14 +150,12 @@ def pw_ai_answer_question(
self,
prompt,
filter=None,
response_type="short",
model=None,
return_context_docs: bool | None = True,
) -> dict:
api_url = f"{self.base_url}/v1/pw_ai_answer"
payload = {
"prompt": prompt,
"response_type": response_type,
# "return_context_docs": return_context_docs, # TODO: later
}

if filter:
Expand All @@ -179,17 +164,12 @@ def pw_ai_answer_question(
if model:
payload["model"] = model

result: dict = {}

response = send_post_request(api_url, prep_payload(payload))

result["response"] = response
if return_context_docs is not None:
payload["return_context_docs"] = return_context_docs

context_docs = self.index_client.query(prompt, metadata_filter=filter, k=6)
response = send_post_request(api_url, payload)

result["context_docs"] = context_docs

return result
return response

def pw_list_documents(self, filter=None, keys=["path"]):
api_url = f"{self.base_url}/v1/pw_list_documents"
Expand All @@ -198,167 +178,11 @@ def pw_list_documents(self, filter=None, keys=["path"]):
if filter:
payload["metadata_filter"] = filter

response = send_post_request(api_url, prep_payload(payload))
response = send_post_request(api_url, payload)
result = [{k: v for k, v in dc.items() if k in keys} for dc in response]
return result


class RAGClient:
"""
Connector for interacting with the Pathway RAG applications.
Either (`host` and `port`) or `url` must be set.
Args:
- host: The host of the RAG service.
- port: The port of the RAG service.
- url: The URL of the RAG service.
- timeout: Timeout for requests in seconds. Defaults to 90.
- additional_headers: Additional headers for the requests.
"""

def __init__(
self,
host: str | None = None,
port: int | None = None,
url: str | None = None,
timeout: int | None = 90,
additional_headers: dict | None = None,
):
err = "Either (`host` and `port`) or `url` must be provided, but not both."
if url is not None:
if host is not None or port is not None:
raise ValueError(err)
self.url = url
else:
if host is None:
raise ValueError(err)
port = port or 80

protocol = "https" if port == 443 else "http"
self.url = f"{protocol}://{host}:{port}"

self.timeout = timeout
self.additional_headers = additional_headers or {}

self.index_client = VectorStoreClient(
url=self.url,
timeout=self.timeout,
additional_headers=self.additional_headers,
)

def retrieve(
self,
query: str,
k: int = 3,
metadata_filter: str | None = None,
filepath_globpattern: str | None = None,
):
"""
Retrieve closest documents from the vector store based on a query.
Args:
- query: The query string.
- k: The number of results to retrieve.
- metadata_filter: Optional metadata filter for the documents. Defaults to `None`, which
means there will be no filter.
- filepath_globpattern: Glob pattern for file paths.
"""
return self.index_client.query(
query=query,
k=k,
metadata_filter=metadata_filter,
filepath_globpattern=filepath_globpattern,
)

def statistics(
self,
):
"""
Retrieve stats from the vector store.
"""
return self.index_client.get_vectorstore_statistics()

def pw_ai_answer(
self,
prompt: str,
filters: str | None = None,
model: str | None = None,
):
"""
Return RAG answer based on a given prompt and optional filter.
Args:
- prompt: Question to be asked.
- filters: Optional metadata filter for the documents. Defaults to ``None``, which
means there will be no filter.
- model: Optional LLM model. If ``None``, app default will be used by the server.
"""
api_url = f"{self.url}/v1/pw_ai_answer"
payload = {
"prompt": prompt,
}

if filters:
payload["filters"] = filters

if model:
payload["model"] = model

response = send_post_request(api_url, payload, self.additional_headers)
return response

def pw_ai_summary(
self,
text_list: list[str],
model: str | None = None,
):
"""
Summarize a list of texts.
Args:
- text_list: List of texts to summarize.
- model: Optional LLM model. If ``None``, app default will be used by the server.
"""
api_url = f"{self.url}/v1/pw_ai_summary"
payload: dict = {
"text_list": text_list,
}

if model:
payload["model"] = model

response = send_post_request(api_url, payload, self.additional_headers)
return response

def pw_list_documents(self, filters: str | None = None, keys: list[str] = ["path"]):
"""
List indexed documents from the vector store with optional filtering.
Args:
- filters: Optional metadata filter for the documents.
- keys: List of metadata keys to be included in the response.
Defaults to ``["path"]``. Setting to ``None`` will retrieve all available metadata.
"""
api_url = f"{self.url}/v1/pw_list_documents"
payload = {}

if filters:
payload["metadata_filter"] = filters

response: list[dict] = send_post_request(
api_url, payload, self.additional_headers
)

if response:
if keys:
result = [{k: v for k, v in dc.items() if k in keys} for dc in response]
else:
result = response
else:
result = []
return result


# TODO: switch to this, replace asyncio.to_thread in evaluator
class ARAGClient(RAGClient):
async def a_pw_list_documents(
Expand Down Expand Up @@ -396,6 +220,7 @@ async def pw_ai_answer(
prompt: str,
filters: str | None = None,
model: str | None = None,
return_context_docs: bool | None = True,
):
"""
Return RAG answer based on a given prompt and optional filter.
Expand All @@ -417,5 +242,8 @@ async def pw_ai_answer(
if model:
payload["model"] = model

if return_context_docs is not None:
payload["return_context_docs"] = return_context_docs # type: ignore

response = await a_send_post_request(api_url, payload, self.additional_headers)
return response

0 comments on commit b9efe06

Please sign in to comment.