Add pd.DataFrame support, remove dataclass_wizard (#3)
* Add pd.DataFrame support, remove dataclass_wizard, add import examples

* Remove dataclass-wizard dependency

* Disable debug timing code
xthexder authored Jan 13, 2024
1 parent 771312e commit 2b7e086
Showing 9 changed files with 251 additions and 43 deletions.
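Based on the `turbopuffer/namespace.py` changes below, here is a minimal sketch of the new DataFrame upsert path (the namespace name and data are hypothetical, and `TURBOPUFFER_API_KEY` is assumed to be set in the environment):

```python
import numpy as np
import pandas as pd
import turbopuffer as tpuf

ns = tpuf.Namespace('dataframe-example')  # hypothetical namespace name

# The DataFrame must contain 'id' and 'vector' columns; any other columns are
# uploaded as attributes. Vector cells are numpy arrays (as pd.read_parquet
# produces), since the upsert path calls .tolist() on each cell.
df = pd.DataFrame({
    'id': [0, 1, 2],
    'vector': [np.array([0.1, 0.2]), np.array([0.3, 0.4]), np.array([0.5, 0.6])],
    'color': ['red', 'green', 'blue'],
})

ns.upsert(df)  # internally batched in chunks of tpuf.upsert_batch_size rows
```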
23 changes: 18 additions & 5 deletions RELEASE.md
@@ -2,8 +2,21 @@

To build and publish the PyPI package:

1. `python -m pip install --upgrade build`
1. `python -m pip install --upgrade twine`
1. `rm -rf dist/`
2. `python -m build`
3. `python -m twine upload dist/*`
1. Set up `~/.pypirc` with a PyPI API token (see the sketch after this list).
2. Install the `build` and `twine` Python packages
```sh
$ python -m pip install --upgrade build
$ python -m pip install --upgrade twine
```
3. Remove existing output
```sh
$ rm -rf dist/
```
4. Build new package
```sh
$ python -m build
```
5. Upload release
```sh
$ python -m twine upload dist/*
```
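For step 1, a typical `~/.pypirc` layout using an API token looks like this (the token value is a placeholder):

```
[pypi]
username = __token__
password = pypi-<your-api-token>
```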
57 changes: 57 additions & 0 deletions examples/import_huggingface.py
@@ -0,0 +1,57 @@
#!/usr/bin/python3

import sys
import time
from datasets import load_dataset

import turbopuffer as tpuf
import traceback
from typing import Iterable, Iterator


class DocumentMapper:
doc_source: Iterator
index: int

def __init__(self, doc_source: Iterable):
self.doc_source = iter(doc_source)
self.index = -1

def __iter__(self):
return self

    def __next__(self):
        value = next(self.doc_source)
        self.index += 1  # count only documents actually read from the source
if value:
vector = value.pop('emb')
return tpuf.VectorRow(
id=self.index,
vector=vector,
attributes=value,
)
return value


def main(dataset_name):
docs = load_dataset(dataset_name, split="train", streaming=True)

mapper = DocumentMapper(docs)
ns = tpuf.Namespace(dataset_name.replace('/', '-'))
start_time = time.monotonic()
try:
ns.upsert(mapper)
except Exception:
traceback.print_exc()
finally:
print('Upserted', mapper.index+1, 'documents')
print('Took:', (time.monotonic()-start_time), 'seconds')


if __name__ == "__main__":
if len(sys.argv) != 2:
print(f"Usage: {sys.argv[0]} <dataset_name>\n"
" Default TURBOPUFFER_API_KEY will be used from environment.")
sys.exit(1)

main(sys.argv[1])
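A hedged usage sketch for the Hugging Face import example above (the dataset name is illustrative; any streaming dataset whose rows include an `emb` embedding field should work):

```sh
$ export TURBOPUFFER_API_KEY=...
$ python3 examples/import_huggingface.py KShivendu/dbpedia-entities-openai-1M
```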
86 changes: 86 additions & 0 deletions examples/import_parquet.py
@@ -0,0 +1,86 @@
#!/usr/bin/python3

import glob
import os
import sys
import time
import pandas as pd
import turbopuffer as tpuf
import traceback
import threading
from queue import Queue

NUM_THREADS = 4


def read_docs_to_queue(queue, parquet_files, doc_count):
try:
file_offset = 0
for parquet_file in parquet_files:
df = pd.read_parquet(parquet_file).rename(columns={'emb': 'vector'})
if 'id' not in df.keys():
df['id'] = range(file_offset, file_offset+len(df))
queue.put(df)
            file_offset += len(df)
            doc_count[0] = file_offset  # track the total number of documents read
except Exception:
print('Failed to read batch:')
traceback.print_exc()
for _ in range(0, NUM_THREADS):
queue.put(None) # Signal the end of the documents


def upsert_docs_from_queue(input_queue, dataset_name):
ns = tpuf.Namespace(dataset_name)

batch = input_queue.get()
while batch is not None:
try:
ns.upsert(batch)
print(f"Completed {batch['id'][0]}..{batch['id'][batch.shape[0]-1]}")
except KeyboardInterrupt:
break
except Exception:
print(f"Failed to upsert batch: {batch['id'][0]}..{batch['id'][batch.shape[0]-1]}")
traceback.print_exc()
batch = input_queue.get()


def main(dataset_name, input_path):
input_glob = os.path.join(input_path, "*.parquet")
parquet_files = glob.glob(input_glob)

if len(parquet_files) == 0:
print(f"No .parquet files found in: {input_glob}")
sys.exit(1)

    doc_queue = Queue(NUM_THREADS)
    doc_count = [0]  # shared counter, written by the reader thread
    read_thread = threading.Thread(target=read_docs_to_queue, args=(doc_queue, parquet_files, doc_count))
upsert_threads = []

start_time = time.monotonic()

try:
read_thread.start()

for _ in range(NUM_THREADS):
upsert_thread = threading.Thread(target=upsert_docs_from_queue, args=(doc_queue, dataset_name))
upsert_threads.append(upsert_thread)
upsert_thread.start()

read_thread.join()

for upsert_thread in upsert_threads:
upsert_thread.join()

finally:
        print('Upserted', doc_count[0], 'documents')
print('Took:', (time.monotonic() - start_time), 'seconds')


if __name__ == "__main__":
if len(sys.argv) != 3:
print(f"Usage: {sys.argv[0]} <dataset_name> <input_folder>\n"
" Default TURBOPUFFER_API_KEY will be used from environment.")
sys.exit(1)

main(sys.argv[1], sys.argv[2])
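Similarly, a hedged usage sketch for the parquet import example (the dataset name and folder are illustrative; each `.parquet` file is expected to contain an `emb` column and, optionally, an `id` column):

```sh
$ export TURBOPUFFER_API_KEY=...
$ python3 examples/import_parquet.py my-dataset ./data/
```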
1 change: 0 additions & 1 deletion pyproject.toml
@@ -31,7 +31,6 @@ classifiers = [
[tool.poetry.dependencies]
python = "^3.9"
requests = "^2.31"
dataclass-wizard = "^0.22"
orjson = {version = "^3.9", optional = true}

[tool.poetry.extras]
1 change: 1 addition & 0 deletions turbopuffer/__init__.py
@@ -1,6 +1,7 @@
import os
api_key = os.environ.get('TURBOPUFFER_API_KEY')
api_base_url = os.environ.get('TURBOPUFFER_API_BASE_URL', 'https://api.turbopuffer.com/v1')
upsert_batch_size = 5_000

try:
import orjson # extras = ["fast"]
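The new module-level `upsert_batch_size` is read at upsert time, so it can be overridden before importing data; a small sketch:

```python
import turbopuffer as tpuf

# Assumption: larger batches mean fewer HTTP requests at the cost of memory.
# 5_000 is the default added in this commit.
tpuf.upsert_batch_size = 10_000
```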
16 changes: 5 additions & 11 deletions turbopuffer/backend.py
@@ -5,8 +5,7 @@
import turbopuffer as tpuf
import gzip
from turbopuffer.error import TurbopufferError, AuthenticationError, APIError
from typing import Optional, List, Union
from dataclass_wizard import JSONSerializable
from typing import Optional, List


def find_api_key(api_key: Optional[str] = None) -> str:
@@ -38,7 +37,7 @@ def make_api_request(self,
*args: List[str],
method: Optional[str] = None,
query: Optional[dict] = None,
payload: Optional[Union[JSONSerializable, dict]] = None) -> dict:
payload: Optional[dict] = None) -> dict:
start = time.monotonic()
if method is None and payload is not None:
method = 'POST'
@@ -48,19 +47,14 @@
request.params = query

if payload is not None:
if isinstance(payload, JSONSerializable):
# before = time.monotonic()
dict_payload = payload.to_dict()
# print('Dict time:', time.monotonic() - before)
# before = time.monotonic()
if isinstance(payload, dict):
# before = time.monotonic()
json_payload = tpuf.dump_json_bytes(dict_payload)
# print('Json time:', time.monotonic() - before)
elif isinstance(payload, dict):
json_payload = tpuf.dump_json_bytes(payload)
# print('Json time:', time.monotonic() - before)
else:
raise ValueError(f'Unsupported POST payload type: {type(payload)}')

# before = time.monotonic()
gzip_payload = gzip.compress(json_payload, compresslevel=1)
# json_mebibytes = len(json_payload) / 1024 / 1024
# gzip_mebibytes = len(gzip_payload) / 1024 / 1024
55 changes: 47 additions & 8 deletions turbopuffer/namespace.py
@@ -1,7 +1,8 @@
from turbopuffer.vectors import Cursor, VectorResult, VectorColumns, VectorRow, ITERATOR_BATCH_SIZE, batch_iter
from turbopuffer.vectors import Cursor, VectorResult, VectorColumns, VectorRow, batch_iter
from turbopuffer.backend import Backend
from turbopuffer.query import VectorQuery, FilterTuple
from typing import Dict, List, Optional, Iterable, Union, overload
import turbopuffer as tpuf


class Namespace:
@@ -77,7 +78,7 @@ def upsert(self, data=None, ids=None, vectors=None, attributes=None) -> None:
elif isinstance(data, VectorColumns):
if None in data.vectors:
raise ValueError('upsert() call would result in a vector deletion, use Namespace.delete([ids...]) instead.')
response = self.backend.make_api_request('vectors', self.name, payload=data)
response = self.backend.make_api_request('vectors', self.name, payload=data.__dict__)
elif isinstance(data, VectorRow):
raise ValueError('upsert() should be called on a list of vectors, got single vector.')
elif isinstance(data, list):
@@ -98,12 +99,50 @@
return self.upsert(VectorColumns.from_dict(data))
else:
raise ValueError('Provided dict is missing ids.')
elif isinstance(data, Iterable):
for batch in batch_iter(data, ITERATOR_BATCH_SIZE):
self.upsert(batch)
return
else:
raise ValueError(f'Unsupported data type: {type(data)}')
try:
import pandas as pd
if isinstance(data, pd.DataFrame):
if 'id' not in data.keys():
raise ValueError('Provided pd.DataFrame is missing an id column.')
if 'vector' not in data.keys():
raise ValueError('Provided pd.DataFrame is missing a vector column.')
# start = time.monotonic()
for i in range(0, len(data), tpuf.upsert_batch_size):
batch = data[i:i+tpuf.upsert_batch_size]
attributes = dict()
for key, values in batch.items():
if key != 'id' and key != 'vector':
attributes[key] = values.tolist()
columns = tpuf.VectorColumns(
ids=batch['id'].tolist(),
vectors=batch['vector'].transform(lambda x: x.tolist()).tolist(),
attributes=attributes
)
# time_diff = time.monotonic() - start
# print(f"Batch {columns.ids[0]}..{columns.ids[-1]} begin:", time_diff, '/', len(batch), '=', len(batch)/time_diff)
# before = time.monotonic()
# print(columns)
self.upsert(columns)
# time_diff = time.monotonic() - before
# print(f"Batch {columns.ids[0]}..{columns.ids[-1]} time:", time_diff, '/', len(batch), '=', len(batch)/time_diff)
# start = time.monotonic()
return
except ImportError:
pass
if isinstance(data, Iterable):
# start = time.monotonic()
for batch in batch_iter(data, tpuf.upsert_batch_size):
# time_diff = time.monotonic() - start
# print('Batch begin:', time_diff, '/', len(batch), '=', len(batch)/time_diff)
# before = time.monotonic()
self.upsert(batch)
# time_diff = time.monotonic() - before
# print('Batch time:', time_diff, '/', len(batch), '=', len(batch)/time_diff)
# start = time.monotonic()
return
else:
raise ValueError(f'Unsupported data type: {type(data)}')

assert response.get('status', '') == 'OK', f'Invalid upsert() response: {response}'

@@ -174,7 +213,7 @@ def query(self,
else:
raise ValueError(f'query() input type must be compatible with turbopuffer.VectorQuery: {type(query_data)}')

response = self.backend.make_api_request('vectors', self.name, 'query', payload=query_data)
response = self.backend.make_api_request('vectors', self.name, 'query', payload=query_data.__dict__)
return VectorResult(response, namespace=self)

def vectors(self, cursor: Optional[Cursor] = None) -> VectorResult:
23 changes: 18 additions & 5 deletions turbopuffer/query.py
@@ -1,6 +1,5 @@
from dataclasses import dataclass
from typing import Optional, List, Tuple, Union, Dict
from dataclass_wizard import JSONSerializable
from enum import Enum


@@ -16,14 +15,28 @@ class FilterMatch(Enum)


@dataclass
class VectorQuery(JSONSerializable):
class VectorQuery:
vector: Optional[List[float]] = None
distance_metric: Optional[str] = None
top_k: int = 10
include_vectors: bool = False
include_attributes: Optional[List[str]] = None
filters: Optional[Dict[str, List[FilterTuple]]] = None

class _(JSONSerializable.Meta):
skip_defaults = True
key_transform_with_dump = 'SNAKE'
def from_dict(source: dict) -> 'VectorQuery':
return VectorQuery(
vector=source.get('vector'),
distance_metric=source.get('distance_metric'),
top_k=source.get('top_k'),
include_vectors=source.get('include_vectors'),
include_attributes=source.get('include_attributes'),
filters=source.get('filters'),
)

def __post_init__(self):
if self.vector is not None and not isinstance(self.vector, list):
raise ValueError('VectorQuery.vector must be a list, got:', type(self.vector))
if self.include_attributes is not None and not isinstance(self.include_attributes, list):
raise ValueError('VectorQuery.include_attributes must be a list, got:', type(self.include_attributes))
if self.filters is not None and not isinstance(self.filters, dict):
raise ValueError('VectorQuery.filters must be a dict, got:', type(self.filters))
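To illustrate the plain dataclass that replaces the `JSONSerializable` base class, a minimal query sketch (the namespace name, vector, and `distance_metric` value are assumptions for illustration, and `query()` is assumed to accept a `VectorQuery` directly, as the type check in `namespace.py` suggests):

```python
import turbopuffer as tpuf
from turbopuffer.query import VectorQuery

ns = tpuf.Namespace('dataframe-example')  # hypothetical namespace name

results = ns.query(VectorQuery(
    vector=[0.1, 0.2],
    distance_metric='cosine_distance',  # assumed metric name for illustration
    top_k=5,
    include_attributes=['color'],
))
print(results)
```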