Skip to content

Commit

Permalink
typesense: process document conversion in chunks to reduce memory usage
Browse files Browse the repository at this point in the history
Also delete all collections that are no longer active, before and after
a full sync, to reduce the memory usage of Typesense as well.
  • Loading branch information
lazka committed Aug 21, 2024
1 parent 138b7ec commit 40f16cf
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 43 deletions.
38 changes: 10 additions & 28 deletions src/TypesenseClient/SearchIndex.php
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public function purgeAll(): void
{
$newName = $this->createNewCollection();
$this->updateAlias($newName);
$this->expireOldCollections(0);
$this->deleteOldCollections();
}

protected function isAliasExists(string $aliasName): bool
Expand All @@ -130,28 +130,20 @@ public function ensureSetup(): void
}
}

public function expireOldCollections(int $keepLast = 3): bool
/**
* Delete all collections that are no longer actively used.
*/
public function deleteOldCollections(): void
{
$client = $this->getClient();
$collectionNameSkipList = [];

// Don't delete the currently linked collection in all cases
$alias = $client->aliases[$this->getAliasName()]->retrieve();
$collectionNameSkipList = [$alias['collection_name']];

// TODO: remove this once we are done testing
// We use these for testing as well, so skip for now
$collectionNameSkipList = array_merge(
$collectionNameSkipList,
['cabinet-students', 'cabinet-files']
);

// Fetch all collections
try {
$collections = $client->collections->retrieve();
} catch (Exception|TypesenseClientErrorAlias) {
return false;
}
$collectionNameSkipList[] = $alias['collection_name'];

// Collect all collections with the given prefix that are not in the skip list
$collections = $client->collections->retrieve();
$collectionNameList = [];
foreach ($collections as $collection) {
if (str_starts_with($collection['name'], $this->getCollectionPrefix())
Expand All @@ -160,21 +152,11 @@ public function expireOldCollections(int $keepLast = 3): bool
}
}

rsort($collectionNameList);
// Slice off $keepLast collections
$collectionNameList = array_slice($collectionNameList, $keepLast);

// Delete the remaining collections
foreach ($collectionNameList as $collectionName) {
$this->logger->info("Deleting old collection '$collectionName'");
try {
$client->collections[$collectionName]->delete();
} catch (Exception|TypesenseClientErrorAlias $e) {
$this->logger->error('Deleting collection failed', ['exception' => $e]);
}
$client->collections[$collectionName]->delete();
}

return true;
}

public function setLogger(LoggerInterface $logger): void
Expand Down
36 changes: 21 additions & 15 deletions src/TypesenseSync/TypesenseSync.php
Original file line number Diff line number Diff line change
Expand Up @@ -62,40 +62,46 @@ public function sync(bool $full = false)
}
$cursor = $this->getCursor();

// Process in chunks to reduce memory consumption
$chunkSize = 10000;

if ($cursor === null) {
$this->logger->info('Starting a full sync');
$schema = $this->translator->getSchema();

$this->searchIndex->setSchema($schema);
$this->searchIndex->ensureSetup();
$this->searchIndex->deleteOldCollections();
$collectionName = $this->searchIndex->createNewCollection();

$res = $this->personSync->getAllPersons();
$documents = [];
foreach ($res->getPersons() as $person) {
$documents[] = $this->personToDocument($person);
}

$this->searchIndex->addDocumentsToCollection($collectionName, $documents);
if ($documents !== []) {
$this->addDummyDocuments($collectionName, $documents);
foreach (array_chunk($res->getPersons(), $chunkSize) as $persons) {
$documents = [];
foreach ($persons as $person) {
$documents[] = $this->personToDocument($person);
}
$this->searchIndex->addDocumentsToCollection($collectionName, $documents);
if ($documents !== []) {
$this->addDummyDocuments($collectionName, $documents);
}
}
$this->searchIndex->ensureSetup();

$this->searchIndex->updateAlias($collectionName);
$this->searchIndex->expireOldCollections();
$this->searchIndex->deleteOldCollections();

$this->saveCursor($res->getCursor());
} else {
$this->logger->info('Starting a partial sync');
$res = $this->personSync->getAllPersons($cursor);
$collectionName = $this->searchIndex->getCollectionName();

$documents = [];
foreach ($res->getPersons() as $person) {
$documents[] = $this->personToDocument($person);
foreach (array_chunk($res->getPersons(), $chunkSize) as $persons) {
$documents = [];
foreach ($persons as $person) {
$documents[] = $this->personToDocument($person);
}
$this->searchIndex->addDocumentsToCollection($collectionName, $documents);
}
$collectionName = $this->searchIndex->getCollectionName();
$this->searchIndex->addDocumentsToCollection($collectionName, $documents);

$this->saveCursor($res->getCursor());
}
Expand Down

0 comments on commit 40f16cf

Please sign in to comment.