This repository provides a detailed walkthrough on how to create embeddings for your documents at scale using the powerful combination of MongoDB Atlas Stream Processing and Vector Search. By following this guide, you'll be able to efficiently process large volumes of data and generate high-quality document embeddings that can be easily searched and analyzed.
- Prerequisites
- Setting up the Environment
- Loading dataset into the cluster
- Configuring a Kafka Cluster in Confluent
- Creating Document Embeddings with MongoDB Atlas Stream Processing and Vector Search
- Configure the Stream Processing Connection Registry
- Configuring Atlas Stream Processing
- Launch the processor scripts
- Creating a Vector Search Index
- Searching and Analyzing Large-Scale Documents using Vector Search
- Conclusion
- Additional Resources
- Contributing
- Acknowledgments
- Python 3.8+
- MongoDB Atlas account
- Confluent Cloud Account
- Clone the Repository:
git clone https://github.com/dsdlt/mongodb-scalable-document-embeddings.git
cd mongodb-scalable-document-embeddings
- Set Up a Virtual Environment:
python3 -m venv venv
source venv/bin/activate
- Install Dependencies:
pip install -r requirements.txt
- Download the spaCy language models
python3 -m spacy download en_core_web_sm
python3 -m spacy download es_core_news_sm
We have included a script in this repository to download the dataset and load it into MongoDB.
- Create an account in MongoDB Atlas: if you do not have one, we recommend following this guide.
- Load the data into MongoDB: this script will ask for your MongoDB cluster information and load the data using mongoimport.
./dataset/data_load.sh
To create a Kafka cluster in Confluent, follow the instructions in the Confluent documentation.
Once you have created the cluster, go to cluster settings and copy the bootstrap URL.
Then, create an API key to connect to your cluster.
The next step is to configure the topics that will be used in this solution: SpanishInputTopic, EnglishInputTopic, and OutputTopic.
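You can create these topics from the Confluent Cloud UI, or programmatically. Below is a minimal sketch using the confluent-kafka Python package's AdminClient; the bootstrap URL, API key and secret, and partition/replication settings are placeholders and assumptions, not values from this repository.

```python
from confluent_kafka.admin import AdminClient, NewTopic

# Placeholders: use the bootstrap URL and API key/secret from your Confluent cluster.
admin = AdminClient({
    "bootstrap.servers": "<bootstrap-url>",
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms": "PLAIN",
    "sasl.username": "<api-key>",
    "sasl.password": "<api-secret>",
})

# Confluent Cloud typically requires a replication factor of 3.
topics = [NewTopic(name, num_partitions=1, replication_factor=3)
          for name in ("SpanishInputTopic", "EnglishInputTopic", "OutputTopic")]

for name, future in admin.create_topics(topics).items():
    try:
        future.result()  # raises if the topic could not be created
        print(f"Created topic {name}")
    except Exception as exc:
        print(f"Failed to create topic {name}: {exc}")
```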
To configure a new connection, click the Configure button on the Stream Processing Instance, go to Connection Registry, and add a new connection.
This connection will be created to connect the Atlas Stream Processing Instance (SPI) with the Kafka Cluster.
Once you have created your Kafka cluster, Confluent will provide you with the bootstrap server URL, username, and password to use in the Connection Registry.
Next, create a connection from the SPI to the MongoDB cluster.
First, connect to the SPI:
mongosh "mongodb://atlas-stream-sdjfo1nvi1owinf-q123wf.virginia-usa.a.query.mongodb.net/" --tls --authenticationDatabase admin --username username
After that, you will create a stream processor with a pipeline that filters inserted and updated songs and sends only the lyrics and the unique _id to the corresponding topic in the Kafka cluster.
- Stop and drop any previously created stream processors:
sp.lyrics_source_cluster.stop()
sp.lyrics_source_cluster.drop()
- Define the source stage that reads change events from the MongoDB cluster
lyrics_source_cluster={$source: { connectionName: "LyricsCluster", db: "streamingvectors", coll : "lyrics", config: {fullDocument : 'updateLookup'}}}
- Create the pipelines for songs in both languages
only_lyrics_in_english={$match: {operationType: {$in: ["insert", "update"]}, "fullDocument.language": "en", "fullDocument.updated": false}}
only_lyrics_in_spanish={$match: {operationType: {$in: ["insert", "update"]}, "fullDocument.language": "es", "fullDocument.updated": false}}
project_year={$project: {"fullDocument.lyrics": 1, "fullDocument._id": 1}}
send_to_kafka_english={$emit: {connectionName: "KafkaConfluent",topic: "EnglishInputTopic"}}
send_to_kafka_spanish={$emit: {connectionName: "KafkaConfluent",topic: "SpanishInputTopic"}}
- Create the stream processors using the pipelines
sp.createStreamProcessor("lyrics_source_cluster_english", [lyrics_source_cluster, only_lyrics_in_english, project_year, send_to_kafka_english])
sp.createStreamProcessor("lyrics_source_cluster_spanish", [lyrics_source_cluster, only_lyrics_in_spanish, project_year, send_to_kafka_spanish])
- Start processing documents
sp.lyrics_source_cluster_english.start()
sp.lyrics_source_cluster_spanish.start()
You can execute the following command to list available stream processors:
sp.listStreamProcessors()
You can filter stream processors by their name:
sp.listStreamProcessors({name : 'lyrics_source_cluster_english'})
Now we will create the connection between the output topic from the Kafka cluster and Atlas Stream Processing
- Stop and drop any previously created stream processors:
sp.lyrics_destination_cluster.stop()
sp.lyrics_destination_cluster.drop()
- Define the source stage that reads from the OutputTopic in the Kafka cluster
lyrics_output_topic={$source: { connectionName: "KafkaConfluent", topic: "OutputTopic"}}
- Create the pipeline to update the documents in MongoDB:
project_metadata = {$project : {last_updated : '$_ts', _stream_meta: 0}}
update_lyrics_embedding = {
$merge: {
into: {
connectionName: "LyricsCluster",
db: "streamingvectors",
coll: "lyrics"
},
on: "_id",
whenMatched: "merge",
whenNotMatched: "insert"
}
}
- Create the stream processors using the pipelines
sp.createStreamProcessor("lyrics_destination_cluster", [lyrics_output_topic, update_lyrics_embedding])
- Start processing documents
sp.lyrics_destination_cluster.start()
We can preview a stream processor with the following command:
sp.lyrics_destination_cluster.sample()
The metadata service is a Python script that subscribes to its corresponding Kafka topic, generates the embeddings using a language-specific embedding model, and builds a list of tags from the lyrics content using the spaCy Python library.
After processing a document, it sends the resulting event to the OutputTopic.
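For illustration, here is a minimal sketch of the consume-embed-produce loop that a service like server/metadataservice.py implements. The topic and field names follow this guide; the model name, consumer group, and tag-extraction logic are assumptions for the sketch, not necessarily what the repository's script uses.

```python
import json

import spacy
from confluent_kafka import Consumer, Producer
from sentence_transformers import SentenceTransformer

nlp = spacy.load("en_core_web_sm")                  # language-specific spaCy model
model = SentenceTransformer("all-MiniLM-L6-v2")     # 384-dim English model (assumption)

kafka_conf = {"bootstrap.servers": "<bootstrap-url>",
              "security.protocol": "SASL_SSL", "sasl.mechanisms": "PLAIN",
              "sasl.username": "<api-key>", "sasl.password": "<api-secret>"}
consumer = Consumer({**kafka_conf, "group.id": "metadata-service-en",
                     "auto.offset.reset": "earliest"})
producer = Producer(kafka_conf)
consumer.subscribe(["EnglishInputTopic"])

while True:
    msg = consumer.poll(1.0)
    if msg is None or msg.error():
        continue
    event = json.loads(msg.value())
    lyrics = event["fullDocument"]["lyrics"]
    out = {
        "_id": event["fullDocument"]["_id"],
        "lyrics_embeddings_en": model.encode(lyrics).tolist(),
        # Illustrative tagging: spaCy named entities found in the lyrics.
        "tags": sorted({ent.text for ent in nlp(lyrics).ents}),
        "updated": True,
    }
    producer.produce("OutputTopic", json.dumps(out))
    producer.flush()
```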
- Start consuming events from the Kafka topics
python3 server/metadataservice.py -l english
python3 server/metadataservice.py -l spanish
After executing the metadata service, the messages will start appearing in the Kafka cluster topics.
You have to create a vector search index on the embeddings fields, which is where the output stream processor records the embeddings.
We use a different model for each language to increase the accuracy of the semantic search. Because each model produces embeddings of a different length, the embeddings are written to separate fields: lyrics_embeddings_es and lyrics_embeddings_en.
Spanish Atlas Vector Search Index:
{
"fields": [
{
"type": "vector",
"path": "lyrics_embeddings_es",
"numDimensions": 768,
"similarity": "cosine"
}
]
}
English Atlas Vector Search Index:
{
"fields": [
{
"type": "vector",
"path": "lyrics_embeddings_en",
"numDimensions": 384,
"similarity": "cosine"
}
]
}
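You can create these indexes in the Atlas UI, or programmatically. The following is a sketch using pymongo, assuming a recent driver version that supports creating vectorSearch indexes via create_search_index; the connection string and index names are placeholders.

```python
from pymongo import MongoClient
from pymongo.operations import SearchIndexModel

# Placeholder connection string; use your Atlas cluster's URI.
coll = MongoClient("<your-atlas-connection-string>")["streamingvectors"]["lyrics"]

# Index names are illustrative; dimensions match the model used for each language.
for name, path, dims in [("lyrics_es_index", "lyrics_embeddings_es", 768),
                         ("lyrics_en_index", "lyrics_embeddings_en", 384)]:
    coll.create_search_index(
        SearchIndexModel(
            name=name,
            type="vectorSearch",
            definition={"fields": [{
                "type": "vector",
                "path": path,
                "numDimensions": dims,
                "similarity": "cosine",
            }]},
        )
    )
```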
We have used the cosine similarity function when creating the Atlas Vector Search indexes: the sentence-transformers models produce dense text embeddings where the orientation of the vectors (the concepts and themes in the lyrics) matters more than their magnitude, which is exactly what cosine similarity (based on the angle between vectors) measures.
The Euclidean similarity function is useful for dense data where the specific values matter (e.g. image similarity).
You could use the dot product (which takes both the angle and the magnitude of the vectors into account) when orientation and intensity both matter.
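For reference, given two embedding vectors a and b, the three functions compare them roughly as follows (a shorthand summary, not the exact score normalization Atlas applies):
- Euclidean: the distance ||a - b||, where smaller means more similar
- Cosine: (a · b) / (||a|| ||b||), the angle between the vectors regardless of magnitude
- Dot product: a · b, which reflects both angle and magnitude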
There is a Python script to run semantic queries using Atlas Vector Search in a chat interface.
python3 client/query_client.py
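If you want to see what such a query looks like, here is a minimal sketch of an Atlas Vector Search aggregation with pymongo. The embedding field and the database and collection names follow this guide; the index name, the English model, and the projected fields (title, artist) are assumptions for illustration.

```python
from pymongo import MongoClient
from sentence_transformers import SentenceTransformer

model = SentenceTransformer("all-MiniLM-L6-v2")     # 384-dim English model (assumption)
coll = MongoClient("<your-atlas-connection-string>")["streamingvectors"]["lyrics"]

# Embed the natural-language query with the same model used for the documents.
query_vector = model.encode("songs about summer love").tolist()

pipeline = [
    {"$vectorSearch": {
        "index": "lyrics_en_index",                 # illustrative index name
        "path": "lyrics_embeddings_en",
        "queryVector": query_vector,
        "numCandidates": 100,
        "limit": 5,
    }},
    {"$project": {"title": 1, "artist": 1,
                  "score": {"$meta": "vectorSearchScore"}}},
]

for doc in coll.aggregate(pipeline):
    print(doc)
```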
This event-driven architecture allows you to scale horizontally and vertically by adding more workers that consume events from the topics. Additionally, you can regulate the speed at which you write into MongoDB, or postpone the update until later in the day, by stopping the processor in Atlas Stream Processing:
sp.lyrics_destination_cluster.stop()
Another benefit of using Atlas Stream Processing and a Kafka cluster is that you can use different models for different types of documents that are more suitable according to the values in the documents (or calculated values from the documents in real-time).
You can find additional resources in the MongoDB Developer Center and the MongoDB Atlas Solutions Library
More information about Atlas Vector Search: MongoDB.local NYC 2023 - Vector Search: Powering the Next Generation of Applications
More information about Atlas Stream Processing in the Atlas Stream Processing documentation
More information about language-specific models and about extending existing sentence embedding models to other languages: Making Monolingual Sentence Embeddings Multilingual using Knowledge Distillation
Find the original dataset at Kaggle
- Fork the repository on GitHub.
- Clone the forked repo to your machine.
- Create a new branch in your local repo.
- Make your changes and commit them to your local branch.
- Push your local branch to your fork on GitHub.
- Create a pull request from your fork to the original repository.
Please ensure that your code adheres to the repo's coding standards and include tests where necessary.
Thanks to the Kaggle team and CARLOSGDCJ for sharing the Genius lyrics dataset.
Thanks to the MongoDB team for the incredible Atlas Stream Processing and Atlas Vector Search features.