Project initialization & rebasing for 0.1.0 release
Dabz committed Jan 22, 2025
0 parents commit f2f5298
Showing 31 changed files with 1,984 additions and 0 deletions.
38 changes: 38 additions & 0 deletions .gitignore
@@ -0,0 +1,38 @@
target/
!.mvn/wrapper/maven-wrapper.jar
!**/src/main/**/target/
!**/src/test/**/target/

### IntelliJ IDEA ###
.idea/modules.xml
.idea/jarRepositories.xml
.idea/compiler.xml
.idea/libraries/
*.iws
*.iml
*.ipr

### Eclipse ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache

### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
build/
!**/src/main/**/build/
!**/src/test/**/build/

### VS Code ###
.vscode/

### Mac OS ###
.DS_Store
113 changes: 113 additions & 0 deletions CONFIGURATION.md
@@ -0,0 +1,113 @@
# Connection parameters

### weaviate.connection.url
**documentation**: Weaviate connection URL, in the format `<scheme>://<host>:<port>`
**default**: http://localhost:8080

### weaviate.grpc.url
**documentation**: Weaviate gRPC connection URL
**default**: localhost:50051

### weaviate.grpc.secured
**documentation**: Whether the Weaviate gRPC connection is secured with TLS; set to `true` to enable TLS encryption
**default**: false

### weaviate.auth.scheme
**documentation**: Authentication mechanism to use to connect to Weaviate, could be NONE, API_KEY or OIDC_CLIENT_CREDENTIALS
**default**: NONE
**valid values**:
- NONE
- API_KEY
- OIDC_CLIENT_CREDENTIALS

### weaviate.headers
**documentation**: Headers to provide while building Weaviate client (e.g. X-OpenAI-Api-Key)




# Security parameters

### weaviate.api.key
**documentation**: User API Key if API Key authentication mechanism is used

### weaviate.oidc.client.secret
**documentation**: User OIDC client secret if OIDC authentication mechanism is used

### weaviate.oidc.scopes
**documentation**: OIDC client scope if OIDC authentication mechanism is used



# Collection & schema parameters

### topics
**documentation**: List of topics to consume, separated by commas

### topics.regex
**documentation**: Regular expression giving topics to consume. Under the hood, the regex is compiled to a `java.util.regex.Pattern`. Only one of topics or topics.regex should be specified.

### collection.mapping
**documentation**: Mapping between Kafka topic and Weaviate collection
**default**: ${topic}
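
The `${topic}` placeholder is presumably replaced with the name of the topic each record was read from. The following is a minimal Python sketch of that substitution, not the connector's Java implementation (which is not shown in this document); `resolve_collection` is a hypothetical helper, and plain placeholder replacement is an assumption.

```python
from string import Template


def resolve_collection(mapping: str, topic: str) -> str:
    """Replace the ${topic} placeholder with the record's topic name.

    Hypothetical helper: string.Template happens to use the same
    ${name} syntax as the collection.mapping setting.
    """
    return Template(mapping).substitute(topic=topic)


default_name = resolve_collection("${topic}", "test")            # default mapping
prefixed_name = resolve_collection("weaviate_${topic}", "test")  # prefixed mapping
print(default_name, prefixed_name)
```

With the mapping `weaviate_${topic}` used in the example configurations, records from the topic `test` would target a collection named `weaviate_test` under this assumption.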

### document.id.strategy
**documentation**: Java class returning the document ID for each record
**default**: io.weaviate.connector.idstrategy.NoIdStrategy
**valid values**:
- io.weaviate.connector.idstrategy.NoIdStrategy
- io.weaviate.connector.idstrategy.KafkaIdStrategy
- io.weaviate.connector.idstrategy.FieldIdStrategy

### document.id.field.name
**documentation**: Field name containing the ID in Kafka
**default**: id

### vector.strategy
**documentation**: Java class returning the document embedding for each record
**default**: io.weaviate.connector.vectorstrategy.NoVectorStrategy
**valid values**:
- io.weaviate.connector.vectorstrategy.NoVectorStrategy
- io.weaviate.connector.vectorstrategy.FieldVectorStrategy

### vector.field.name
**documentation**: Field name containing the embedding (used only for FieldVectorStrategy)

### delete.enabled
**documentation**: Whether to treat null record values as deletes
**default**: false
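
As a rough illustration of this setting, the sketch below decides what to do with a single record. It is written in Python for brevity and is not the connector's code: `plan_operation` is a hypothetical helper, and the behavior for null values when deletes are disabled is not stated in this document, so `"skip"` is an assumption.

```python
from typing import Optional


def plan_operation(value: Optional[dict], delete_enabled: bool) -> str:
    """Return the action for one record (hypothetical helper)."""
    if value is None:
        # A null value (tombstone) triggers a delete only when enabled.
        # Skipping it otherwise is an assumption, not documented behavior.
        return "delete" if delete_enabled else "skip"
    # Any non-null value is written (inserted or upserted).
    return "write"


print(plan_operation(None, delete_enabled=True))
print(plan_operation({"string": "Hello World"}, delete_enabled=True))
```

A tombstone carries no payload, so a delete can only be matched to a document through an ID that does not depend on the value, which is presumably why the upsert-delete example pairs `delete.enabled` with `KafkaIdStrategy`.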




# Retry parameters

### max.connection.retries
**documentation**: Maximum number of retries to perform in case of connection issues
**default**: 3

### max.timeout.retries
**documentation**: Maximum number of retries to perform in case of timeout
**default**: 3

### retry.interval
**documentation**: Interval between each retry
**default**: 2000
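
The retry settings can be pictured as a bounded retry loop with a fixed pause between attempts. The Python sketch below covers only the connection case and is not the connector's implementation: `call_with_retries` and `flaky_write` are hypothetical names, and treating `retry.interval` as milliseconds is an assumption, since the unit is not stated here.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def call_with_retries(
    action: Callable[[], T],
    max_retries: int = 3,
    retry_interval_ms: int = 2000,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Run `action`, retrying on ConnectionError up to `max_retries` times."""
    retries = 0
    while True:
        try:
            return action()
        except ConnectionError:
            if retries >= max_retries:
                raise  # retries exhausted: let the caller see the error
            retries += 1
            sleep(retry_interval_ms / 1000)  # assumes the interval is in ms


# Usage: a hypothetical call that fails twice before succeeding.
attempts = []


def flaky_write() -> str:
    attempts.append(1)
    if len(attempts) < 3:
        raise ConnectionError("Weaviate unreachable")
    return "ok"


# The sleep function is replaced so the example does not actually wait.
result = call_with_retries(flaky_write, sleep=lambda seconds: None)
print(result, len(attempts))
```

The same loop would apply to `max.timeout.retries` by catching a timeout error instead of a connection error.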




# Performance parameters

### batch.size
**documentation**: Number of records per batch
**default**: 100

### pool.size
**documentation**: Size of the pool used to process batches
**default**: 1

### await.termination.ms
**documentation**: Timeout for batch processing
**default**: 10000
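
One way to read the three performance settings together: records are split into batches of `batch.size`, the batches are handed to a pool of `pool.size` workers, and the caller waits up to `await.termination.ms` for them to finish. The Python sketch below illustrates that reading only. Interpreting `pool.size` as a number of worker threads is an assumption, and `chunk`, `send_batch`, and `process` are hypothetical names.

```python
from concurrent.futures import ThreadPoolExecutor, wait
from typing import Iterator, List


def chunk(records: List[dict], batch_size: int) -> Iterator[List[dict]]:
    """Yield consecutive slices of at most `batch_size` records."""
    for start in range(0, len(records), batch_size):
        yield records[start:start + batch_size]


def send_batch(batch: List[dict]) -> int:
    """Placeholder for the real write to Weaviate; returns the batch length."""
    return len(batch)


def process(records: List[dict], batch_size: int = 100, pool_size: int = 1,
            await_termination_ms: int = 10000) -> int:
    """Send all records in batches and return how many were sent."""
    with ThreadPoolExecutor(max_workers=pool_size) as pool:
        futures = [pool.submit(send_batch, b) for b in chunk(records, batch_size)]
        # Wait for every batch, but no longer than the configured timeout.
        done, not_done = wait(futures, timeout=await_termination_ms / 1000)
        if not_done:
            raise TimeoutError(f"{len(not_done)} batch(es) did not finish in time")
        return sum(f.result() for f in done)


records = [{"id": i} for i in range(250)]
batch_lengths = [len(b) for b in chunk(records, 100)]
sent = process(records, batch_size=100, pool_size=2)
print(batch_lengths, sent)
```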

112 changes: 112 additions & 0 deletions README.md
@@ -0,0 +1,112 @@
# Kafka Connect Weaviate Connector

kafka-connect-weaviate is a [Kafka Sink Connector](http://kafka.apache.org/documentation.html#connect)
for loading data to any Weaviate cluster.

> [!IMPORTANT]
> This connector is currently a work in progress; use it at your own risk.
## Features

* Supports streaming INSERT, UPSERT and DELETE from a list of topics to multiple Weaviate collections
* Supports all structured formats in Kafka (Avro, JSON, Protobuf)
* Supports Bring Your Own Vector if the embedding is generated outside of Weaviate
* Supports multiple tasks for higher throughput
* Supports at-least-once delivery semantics
* Tested with both Weaviate Cloud and self-managed Weaviate instances

## Limitations

* Does not create Weaviate collections automatically
* Does not support multiple vectors in Bring Your Own Vectors

## Upsert operation

Upsert is done by specifying the relevant UUID for each document.
The UUID is controlled by the `document.id.strategy` configuration.
By default, `io.weaviate.connector.idstrategy.NoIdStrategy` is used.

The available `document.id.strategy` values are:

- `io.weaviate.connector.idstrategy.NoIdStrategy` - **default** - generates a new UUID for each record, thus always inserting a new record in Weaviate for each Kafka record.
- `io.weaviate.connector.idstrategy.KafkaIdStrategy` - generates a UUID based on the key of the Kafka message
- `io.weaviate.connector.idstrategy.FieldIdStrategy` - generates a UUID based on a field of the Kafka record payload, the field name can be specified by configuring `document.id.field.name`
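
The difference between the three strategies can be sketched in Python as follows. This is an illustration, not the connector's Java code: deriving the UUID with a name-based UUIDv5 and the chosen namespace are assumptions, and `document_id` is a hypothetical helper.

```python
import uuid
from typing import Optional

# Hypothetical namespace; the connector's real derivation is not shown here.
NAMESPACE = uuid.NAMESPACE_DNS


def document_id(strategy: str, key: Optional[str], value: dict,
                field_name: str = "id") -> uuid.UUID:
    """Return the UUID a record would be stored under for a given strategy."""
    if strategy == "NoIdStrategy":
        # Fresh random UUID: every record becomes a new object.
        return uuid.uuid4()
    if strategy == "KafkaIdStrategy":
        # Same Kafka key -> same UUID, so later records overwrite earlier ones.
        return uuid.uuid5(NAMESPACE, str(key))
    if strategy == "FieldIdStrategy":
        # Same field value -> same UUID.
        return uuid.uuid5(NAMESPACE, str(value[field_name]))
    raise ValueError(f"unknown strategy: {strategy}")


record = {"string": "Hello World", "number": 1.23, "id": "test"}
first = document_id("FieldIdStrategy", None, record)
second = document_id("FieldIdStrategy", None, record)
print(first == second)
```

Because a deterministic strategy maps the same key or field value to the same UUID, replaying a record updates the existing object instead of creating a duplicate.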

## Bring Your Own Vectors

By default, the embedding of each record is computed by Weaviate, leveraging the collection's vectorizers.
If the embedding is already available or computed outside of Weaviate, you can configure the `vector.strategy` to change this behavior.

The available `vector.strategy` values are:

- `io.weaviate.connector.vectorstrategy.NoVectorStrategy` - **default** - relies on the collection vectorizer to generate the embedding in Weaviate
- `io.weaviate.connector.vectorstrategy.FieldVectorStrategy` - Embedding available in a field of the Kafka record, the field name can be specified by configuring `vector.field.name`
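
As an illustration of the field-based strategy, the Python sketch below separates the embedding from the rest of a record payload. It is not the connector's implementation: `split_vector` is a hypothetical helper, the field name `embedding` is made up for the example, and removing the vector field from the stored properties is an assumption.

```python
from typing import List, Optional, Tuple


def split_vector(value: dict,
                 vector_field: Optional[str]) -> Tuple[dict, Optional[List[float]]]:
    """Split a record payload into (properties, vector)."""
    if vector_field is None:
        # No vector supplied: Weaviate's vectorizer is expected to compute it.
        return dict(value), None
    # Assumption: the vector field is not also stored as a property.
    properties = {k: v for k, v in value.items() if k != vector_field}
    return properties, [float(x) for x in value[vector_field]]


payload = {"string": "Hello World", "embedding": [0.1, 0.2, 0.3]}
properties, vector = split_vector(payload, "embedding")
print(properties, vector)
```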

## Example of configuration

All parameters are described in [CONFIGURATION.md](./CONFIGURATION.md).

**Simple configuration inserting a new object for each record in Kafka**
```json
{
  "connector.class": "io.weaviate.connector.WeaviateSinkConnector",
  "topics": "test",
  "weaviate.connection.url": "http://weaviate:8080",
  "weaviate.grpc.url": "weaviate:50051",
  "collection.mapping": "weaviate_${topic}",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": false
}
```


**Upserting object for each record in Kafka based on an ID field**
```json
{
  "connector.class": "io.weaviate.connector.WeaviateSinkConnector",
  "topics": "test",
  "weaviate.connection.url": "http://weaviate:8080",
  "weaviate.grpc.url": "weaviate:50051",
  "collection.mapping": "weaviate_${topic}",
  "document.id.strategy": "io.weaviate.connector.idstrategy.FieldIdStrategy",
  "document.id.field.name": "id",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": false
}
```

**Connecting to Weaviate Cloud**
```json
{
  "connector.class": "io.weaviate.connector.WeaviateSinkConnector",
  "topics": "test",
  "weaviate.connection.url": "$WEAVIATE_REST_URL",
  "weaviate.grpc.url": "$WEAVIATE_GRPC_URL",
  "weaviate.grpc.secured": true,
  "weaviate.auth.scheme": "API_KEY",
  "weaviate.api.key": "$WEAVIATE_API_KEY",
  "collection.mapping": "Weaviate_test",
  "document.id.strategy": "io.weaviate.connector.idstrategy.FieldIdStrategy",
  "document.id.field.name": "id",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": false
}
```
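
The `$WEAVIATE_...` values in the Weaviate Cloud example read as placeholders to fill in before submitting the configuration. One possible way to do that is sketched below in Python. `render_config` is a hypothetical helper, the URL and key are made-up sample values, and in practice the mapping would typically come from `os.environ`.

```python
import json
from string import Template


def render_config(template_text: str, env: dict) -> dict:
    """Fill $NAME placeholders from `env` and parse the result as JSON."""
    # safe_substitute leaves unknown placeholders such as ${topic} untouched.
    rendered = Template(template_text).safe_substitute(env)
    return json.loads(rendered)


template_text = """{
  "weaviate.connection.url": "$WEAVIATE_REST_URL",
  "weaviate.api.key": "$WEAVIATE_API_KEY",
  "collection.mapping": "weaviate_${topic}"
}"""

# Made-up sample values for illustration only.
sample_env = {
    "WEAVIATE_REST_URL": "https://example.invalid",
    "WEAVIATE_API_KEY": "not-a-real-key",
}
config = render_config(template_text, sample_env)
print(json.dumps(config, indent=2))
```

Values containing double quotes or backslashes would break the JSON in this simple approach, so a real script may prefer to load the JSON first and replace the values afterwards.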


## How to build

```bash
mvn clean package kafka-connect:kafka-connect -Drevision=0.1.0
# The package should be generated in target/components/packages/
```

## Quickstart

```bash
docker-compose up -d
sleep 5
python examples/create_collection.py weaviate_test
curl -i -X PUT localhost:8083/connectors/weaviate/config -H 'Content-Type:application/json' --data @examples/weaviate-upsert-sink.json
echo '{"string": "Hello World", "number": 1.23, "id": "test" }' | docker-compose exec -T kafka kafka-console-producer --bootstrap-server localhost:9092 --topic test
```
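
After the quickstart commands, the connector state can be checked through the Kafka Connect REST API, which exposes `GET /connectors/{name}/status`. The Python sketch below assumes the connector is named `weaviate` and that Connect listens on `localhost:8083`, as in the commands above; `is_healthy` and `fetch_status` are hypothetical helper names.

```python
import json
from urllib.request import urlopen


def is_healthy(status: dict) -> bool:
    """True when the connector and all of its tasks report RUNNING."""
    states = [status["connector"]["state"]]
    states += [task["state"] for task in status.get("tasks", [])]
    return all(state == "RUNNING" for state in states)


def fetch_status(base_url: str = "http://localhost:8083",
                 name: str = "weaviate") -> dict:
    """Read the connector status document from the Connect REST API."""
    with urlopen(f"{base_url}/connectors/{name}/status") as response:
        return json.load(response)


# With the quickstart stack running, the check would be:
#   is_healthy(fetch_status())
```

A task in the `FAILED` state usually carries a `trace` field in the same status document, which is a good first place to look when no objects appear in Weaviate.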
61 changes: 61 additions & 0 deletions docker-compose.yml
@@ -0,0 +1,61 @@
---
services:
  kafka:
    image: confluentinc/confluent-local:7.8.0
    hostname: kafka
    container_name: kafka
    environment:
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT, PLAINTEXT_DOCKER:PLAINTEXT
      KAFKA_LISTENERS: PLAINTEXT://localhost:29092,CONTROLLER://localhost:29093,PLAINTEXT_HOST://0.0.0.0:9092,PLAINTEXT_DOCKER://0.0.0.0:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:29092,PLAINTEXT_HOST://localhost:9092,PLAINTEXT_DOCKER://kafka:9093
    ports:
      - "8082:8082"
      - "9092:9092"
      - "9101:9101"

  connect:
    image: confluentinc/cp-kafka-connect:7.8.0
    hostname: connect
    container_name: connect
    depends_on:
      - kafka
    ports:
      - "8083:8083"
      - "5005:5005"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'kafka:9093'
      CONNECT_REST_ADVERTISED_HOST_NAME: 'connect'
      CONNECT_GROUP_ID: 'connect'
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_PLUGIN_PATH: "/etc/kafka-connect/plugins"
      KAFKA_OPTS: "-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5005"
    volumes:
      - ./target/components/packages/:/etc/kafka-connect/plugins

  weaviate:
    command:
      - --host
      - 0.0.0.0
      - --port
      - '8080'
      - --scheme
      - http
    image: cr.weaviate.io/semitechnologies/weaviate:1.28.2
    ports:
      - 8080:8080
      - 50051:50051
    restart: on-failure:0
    environment:
      DEFAULT_VECTORIZER_MODULE: 'text2vec-ollama'
      ENABLE_MODULES: 'text2vec-ollama,ref2vec-centroid,generative-ollama'
      CLUSTER_HOSTNAME: 'node1'
      PERSISTENCE_DATA_PATH: '/var/lib/weaviate'
23 changes: 23 additions & 0 deletions examples/create_collection.py
@@ -0,0 +1,23 @@
import sys
import weaviate
from weaviate.classes.config import Property
from weaviate.collections.classes.config import Configure, DataType

if len(sys.argv) < 2:
    print("Usage: python create_collection.py <collection_name>")
    sys.exit(1)

collection_name = sys.argv[1]

with weaviate.connect_to_local() as client:
    if client.collections.exists(collection_name):
        client.collections.delete(collection_name)
    client.collections.create(
        name=collection_name,
        properties=[
            Property(name="string", data_type=DataType.TEXT),
            Property(name="number", data_type=DataType.NUMBER),
        ],
        vectorizer_config=Configure.Vectorizer.text2vec_ollama(model="nomic-embed-text", api_endpoint="http://host.docker.internal:11434")
    )

9 changes: 9 additions & 0 deletions examples/weaviate-simple-sink.json
@@ -0,0 +1,9 @@
{
  "connector.class": "io.weaviate.connector.WeaviateSinkConnector",
  "topics": "test",
  "weaviate.connection.url": "http://weaviate:8080",
  "weaviate.grpc.url": "weaviate:50051",
  "collection.mapping": "weaviate_${topic}",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": false
}
12 changes: 12 additions & 0 deletions examples/weaviate-upsert-delete-sink.json
@@ -0,0 +1,12 @@
{
  "connector.class": "io.weaviate.connector.WeaviateSinkConnector",
  "topics": "test",
  "weaviate.connection.url": "http://weaviate:8080",
  "weaviate.grpc.url": "weaviate:50051",
  "collection.mapping": "weaviate_${topic}",
  "document.id.strategy": "io.weaviate.connector.idstrategy.KafkaIdStrategy",
  "delete.enabled": true,
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": false
}
11 changes: 11 additions & 0 deletions examples/weaviate-upsert-sink.json
@@ -0,0 +1,11 @@
{
  "connector.class": "io.weaviate.connector.WeaviateSinkConnector",
  "topics": "test",
  "weaviate.connection.url": "http://weaviate:8080",
  "weaviate.grpc.url": "weaviate:50051",
  "collection.mapping": "weaviate_${topic}",
  "document.id.strategy": "io.weaviate.connector.idstrategy.FieldIdStrategy",
  "document.id.field.name": "id",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": false
}