diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..5ff6309 --- /dev/null +++ b/.gitignore @@ -0,0 +1,38 @@ +target/ +!.mvn/wrapper/maven-wrapper.jar +!**/src/main/**/target/ +!**/src/test/**/target/ + +### IntelliJ IDEA ### +.idea/modules.xml +.idea/jarRepositories.xml +.idea/compiler.xml +.idea/libraries/ +*.iws +*.iml +*.ipr + +### Eclipse ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans +.sts4-cache + +### NetBeans ### +/nbproject/private/ +/nbbuild/ +/dist/ +/nbdist/ +/.nb-gradle/ +build/ +!**/src/main/**/build/ +!**/src/test/**/build/ + +### VS Code ### +.vscode/ + +### Mac OS ### +.DS_Store \ No newline at end of file diff --git a/CONFIGURATION.md b/CONFIGURATION.md new file mode 100644 index 0000000..948c546 --- /dev/null +++ b/CONFIGURATION.md @@ -0,0 +1,113 @@ +# Connection parameters + +### weaviate.connection.url +**documentation**: "Weaviate connection URL, should be following the format `://:` +**default**: http://localhost:8080 + +### weaviate.grpc.url +**documentation**: Weaviate GRPC connection URL +**default**: localhost:50051 + +### weaviate.grpc.secured +**documentation**: Weaviate GRPC TLS secured connection, set to True to enable TLS encryption +**default**: false + +### weaviate.auth.scheme +**documentation**: Authentication mechanism to use to connect to Weaviate, could be NONE, API_KEY or OIDC_CLIENT_CREDENTIALS +**default**: NONE +**valid values**: +- NONE +- API_KEY +- OIDC_CLIENT_CREDENTIALS + +### weaviate.headers +**documentation**: Headers to provide while building Weaviate client (e.g. X-OpenAI-Api-Key) + + + + +# Security parameters + +### weaviate.api.key +**documentation**: User API Key if API Key authentication mechanism is used + +### weaviate.oidc.client.secret +**documentation**: User OIDC client secret if OIDC authentication mechanism is used + +### weaviate.oidc.scopes +**documentation**: OIDC client scope if OIDC authentication mechanism is used + + + +# Collection & schema parameters + +### topics +**documentation**: List of topics to consume, separated by commas + +### topics.regex +**documentation**: Regular expression giving topics to consume. Under the hood, the regex is compiled to a `java.util.regex.Pattern`. Only one of topics or topics.regex should be specified. + +### collection.mapping +**documentation**: Mapping between Kafka topic and Weaviate collection +**default**: ${topic} + +### document.id.strategy +**documentation**: Java class returning the document ID for each record +**default**: io.weaviate.connector.idstrategy.NoIdStrategy +**valid values**: +- io.weaviate.connector.idstrategy.NoIdStrategy +- io.weaviate.connector.idstrategy.KafkaIdStrategy +- io.weaviate.connector.idstrategy.FieldIdStrategy + +### document.id.field.name +**documentation**: Field name containing the ID in Kafka +**default**: id + +### vector.strategy +**documentation**: Java class returning the document embedding for each record +**default**: io.weaviate.connector.vectorstrategy.NoVectorStrategy +**valid values**: +- io.weaviate.connector.vectorstrategy.NoVectorStrategy +- io.weaviate.connector.vectorstrategy.FieldVectorStrategy + +### vector.field.name +**documentation**: Field name containing the embedding (used only for FieldVectorStrategy) + +### delete.enabled +**documentation**: Whether to treat null record values as deletes +**default**: false + + + + +# Retry parameters + +### max.connection.retries +**documentation**: Maximum number of retries to perform in case of connection issues +**default**: 3 + +### max.timeout.retries +**documentation**: Maximum number of retries to perform in case of timeout +**default**: 3 + +### retry.interval +**documentation**: Interval between each retry +**default**: 2000 + + + + +# Performance parameters + +### batch.size +**documentation**: Number of records per batch +**default**: 100 + +### pool.size +**documentation**: Number of pool to process batch +**default**: 1 + +### await.termination.ms +**documentation**: Timeout for batch processing +**default**: 10000 + diff --git a/README.md b/README.md new file mode 100644 index 0000000..0846376 --- /dev/null +++ b/README.md @@ -0,0 +1,112 @@ +# Kafka Connect Weaviate Connector + +kafka-connect-weaviate is a [Kafka Sink Connector](http://kafka.apache.org/documentation.html#connect) +for loading data to any Weaviate cluster. + +> [!IMPORTANT] +> This is currently a Work In Progress connector, use it at your own risk. + +## Features + +* Support streaming INSERT, UPSERT and DELETE from a list of topics to multiple Weaviate collections +* Support all structured format in Kafka (Avro, JSON, Protobuf) +* Support Bring Your Own Vector if the embedding is generated outside of Weaviate +* Support multiple tasks for higher throughput +* Support at-least-once semantic +* Tested with both Weaviate Cloud and self-managed Weaviate instance + +## Limitations + +* Does not create Weaviate collections automatically +* Does not support multiple vectors in Bring Your Own Vectors + +## Upsert operation + +Upsert is done by specifying the relevant UUID for each document. +The UUID can be configured by specifying the `document.id.strategy` configuration. +By default, the `io.weaviate.connector.idstrategy.NoIdStrategy` is used, + +The availables `document.id.strategy` are: + +- `io.weaviate.connector.idstrategy.NoIdStrategy` - **default** - generates a new UUID for each record, thus always inserting a new record in Weaviate for each Kafka record. +- `io.weaviate.connector.idstrategy.KafkaIdStrategy` - generates a UUID based on the key of the Kafka message +- `io.weaviate.connector.idstrategy.FieldIdStrategy` - generates a UUID based on a field of the Kafka record payload, the field name can be specified by configuring `document.id.field.name` + +## Bring Your Own Vectors + +By default, the embedding of each record will be compute by Weaviate by levearing the collection vectorizers. +If the embedding is already available or computed outside of Weaviate, you can configure the `vector.strategy` to change this behavior. + +The availables `vector.strategy` are: + +- `io.weaviate.connector.vectorstrategy.NoVectorStrategy` - **default** - will rely on collection vectorizier to generate embedding in Weaviate +- `io.weaviate.connector.vectorstrategy.FieldVectorStrategy` - Embedding available in a field of the Kafka record, the field name can be specified by configuring `vector.field.name` + +## Example of configuration + +The definition of all parameters is available on [CONFIGURATION.md](./CONFIGURATION.md). + +**Simple configuration inserting a new object for each record in Kafka** +```json +{ + "connector.class": "io.weaviate.connector.WeaviateSinkConnector", + "topics": "test", + "weaviate.connection.url": "http://weaviate:8080", + "weaviate.grpc.url": "weaviate:50051", + "collection.mapping": "weaviate_${topic}", + "value.converter": "org.apache.kafka.connect.json.JsonConverter", + "value.converter.schemas.enable": false +} +``` + + +**Upserting object for each record in Kafka based on an ID field** +```json +{ + "connector.class": "io.weaviate.connector.WeaviateSinkConnector", + "topics": "test", + "weaviate.connection.url": "http://weaviate:8080", + "weaviate.grpc.url": "weaviate:50051", + "collection.mapping": "weaviate_${topic}", + "document.id.strategy": "io.weaviate.connector.idstrategy.FieldIdStrategy", + "document.id.field.name": "id", + "value.converter": "org.apache.kafka.connect.json.JsonConverter", + "value.converter.schemas.enable": false +} +``` + +**Connecting to Weaviate Cloud** +```json +{ + "connector.class": "io.weaviate.connector.WeaviateSinkConnector", + "topics": "test", + "weaviate.connection.url": "$WEAVIATE_REST_URL", + "weaviate.grpc.url": "$WEAVIATE_GRPC_URL", + "weaviate.grpc.secured": true, + "weaviate.auth.scheme": "API_KEY", + "weaviate.api.key": "$WEAVIATE_API_KEY", + "collection.mapping": "Weaviate_test", + "document.id.strategy": "io.weaviate.connector.idstrategy.FieldIdStrategy", + "document.id.field.name": "id", + "value.converter": "org.apache.kafka.connect.json.JsonConverter", + "value.converter.schemas.enable": false +} +``` + + +## How to build + +```bash +mvn clean package kafka-connect:kafka-connect -Drevision=0.1.0 +# Package should be generated on target/components/packages/ +``` + +## Quickstart + +```bash +docker-compose up -d +sleep 5 +python examples/create_collection.py weaviate_test +curl -i -X PUT localhost:8083/connectors/weaviate/config -H 'Content-Type:application/json' --data @examples/weaviate-upsert-sink.json +echo '{"string": "Hello World", "number": 1.23, "id": "test" }' | docker-compose exec -T kafka kafka-console-producer --bootstrap-server localhost:9092 --topic test +``` diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..c68dc7d --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,61 @@ +--- +services: + kafka: + image: confluentinc/confluent-local:7.8.0 + hostname: kafka + container_name: kafka + environment: + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT, PLAINTEXT_DOCKER:PLAINTEXT + KAFKA_LISTENERS: PLAINTEXT://localhost:29092,CONTROLLER://localhost:29093,PLAINTEXT_HOST://0.0.0.0:9092,PLAINTEXT_DOCKER://0.0.0.0:9093 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:29092,PLAINTEXT_HOST://localhost:9092,PLAINTEXT_DOCKER://kafka:9093 + ports: + - "8082:8082" + - "9092:9092" + - "9101:9101" + + connect: + image: confluentinc/cp-kafka-connect:7.8.0 + hostname: connect + container_name: connect + depends_on: + - kafka + ports: + - "8083:8083" + - "5005:5005" + environment: + CONNECT_BOOTSTRAP_SERVERS: 'kafka:9093' + CONNECT_REST_ADVERTISED_HOST_NAME: 'connect' + CONNECT_GROUP_ID: 'connect' + CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs + CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets + CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status + CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1 + CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1 + CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1 + CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter + CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter + CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter + CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter + CONNECT_PLUGIN_PATH: "/etc/kafka-connect/plugins" + KAFKA_OPTS: "-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5005" + volumes: + - ./target/components/packages/:/etc/kafka-connect/plugins + + weaviate: + command: + - --host + - 0.0.0.0 + - --port + - '8080' + - --scheme + - http + image: cr.weaviate.io/semitechnologies/weaviate:1.28.2 + ports: + - 8080:8080 + - 50051:50051 + restart: on-failure:0 + environment: + DEFAULT_VECTORIZER_MODULE: 'text2vec-ollama' + ENABLE_MODULES: 'text2vec-ollama,ref2vec-centroid,generative-ollama' + CLUSTER_HOSTNAME: 'node1' + PERSISTENCE_DATA_PATH: '/var/lib/weaviate' diff --git a/examples/create_collection.py b/examples/create_collection.py new file mode 100644 index 0000000..ec7ba38 --- /dev/null +++ b/examples/create_collection.py @@ -0,0 +1,23 @@ +import sys; +import weaviate +from weaviate.classes.config import Property +from weaviate.collections.classes.config import Configure, DataType + +if len(sys.argv) < 2: + print("Usage: python create_collection.py ") + sys.exit(1) + +collection_name = sys.argv[1] + +with weaviate.connect_to_local() as client: + if client.collections.exists(collection_name): + client.collections.delete(collection_name) + client.collections.create( + name=collection_name, + properties=[ + Property(name="string", data_type=DataType.TEXT), + Property(name="number", data_type=DataType.NUMBER), + ], + vectorizer_config=Configure.Vectorizer.text2vec_ollama(model="nomic-embed-text", api_endpoint="http://host.docker.internal:11434") + ) + diff --git a/examples/weaviate-simple-sink.json b/examples/weaviate-simple-sink.json new file mode 100644 index 0000000..1705477 --- /dev/null +++ b/examples/weaviate-simple-sink.json @@ -0,0 +1,9 @@ +{ + "connector.class": "io.weaviate.connector.WeaviateSinkConnector", + "topics": "test", + "weaviate.connection.url": "http://weaviate:8080", + "weaviate.grpc.url": "weaviate:50051", + "collection.mapping": "weaviate_${topic}", + "value.converter": "org.apache.kafka.connect.json.JsonConverter", + "value.converter.schemas.enable": false +} diff --git a/examples/weaviate-upsert-delete-sink.json b/examples/weaviate-upsert-delete-sink.json new file mode 100644 index 0000000..4f3fda8 --- /dev/null +++ b/examples/weaviate-upsert-delete-sink.json @@ -0,0 +1,12 @@ +{ + "connector.class": "io.weaviate.connector.WeaviateSinkConnector", + "topics": "test", + "weaviate.connection.url": "http://weaviate:8080", + "weaviate.grpc.url": "weaviate:50051", + "collection.mapping": "weaviate_${topic}", + "document.id.strategy": "io.weaviate.connector.idstrategy.KafkaIdStrategy", + "delete.enabled": true, + "key.converter": "org.apache.kafka.connect.storage.StringConverter", + "value.converter": "org.apache.kafka.connect.json.JsonConverter", + "value.converter.schemas.enable": false +} diff --git a/examples/weaviate-upsert-sink.json b/examples/weaviate-upsert-sink.json new file mode 100644 index 0000000..43fc2be --- /dev/null +++ b/examples/weaviate-upsert-sink.json @@ -0,0 +1,11 @@ +{ + "connector.class": "io.weaviate.connector.WeaviateSinkConnector", + "topics": "test", + "weaviate.connection.url": "http://weaviate:8080", + "weaviate.grpc.url": "weaviate:50051", + "collection.mapping": "weaviate_${topic}", + "document.id.strategy": "io.weaviate.connector.idstrategy.FieldIdStrategy", + "document.id.field.name": "id", + "value.converter": "org.apache.kafka.connect.json.JsonConverter", + "value.converter.schemas.enable": false +} diff --git a/examples/weaviate-wcd-sink.json b/examples/weaviate-wcd-sink.json new file mode 100644 index 0000000..24111c1 --- /dev/null +++ b/examples/weaviate-wcd-sink.json @@ -0,0 +1,14 @@ +{ + "connector.class": "io.weaviate.connector.WeaviateSinkConnector", + "topics": "test", + "weaviate.connection.url": "$WEAVIATE_REST_URL", + "weaviate.grpc.url": "$WEAVIATE_GRPC_URL", + "weaviate.grpc.secured": true, + "weaviate.auth.scheme": "API_KEY", + "weaviate.api.key": "$WEAVIATE_API_KEY", + "collection.mapping": "Weaviate_test", + "document.id.strategy": "io.weaviate.connector.idstrategy.FieldIdStrategy", + "document.id.field.name": "id", + "value.converter": "org.apache.kafka.connect.json.JsonConverter", + "value.converter.schemas.enable": false +} diff --git a/logos/weaviate.jpg b/logos/weaviate.jpg new file mode 100644 index 0000000..a31e466 Binary files /dev/null and b/logos/weaviate.jpg differ diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..28baa62 --- /dev/null +++ b/pom.xml @@ -0,0 +1,215 @@ + + + 4.0.0 + + io.weaviate + kafka-connect-weaviate + ${revision} + + + 1.0.0-SNAPSHOT + 11 + 11 + 11 + UTF-8 + 3.9.0 + 0.12.0 + 5.0.1 + 5.9.1 + 2.18.0 + 3.7.1 + com/mycila/maven/plugin/license/templates/APACHE-2-noemail.txt + 4.6 + 3.13.0 + 1.68.2 + + + + + org.apache.kafka + connect-runtime + ${kafka.version} + provided + + + io.weaviate + client + ${weaviate.version} + + + + + io.grpc + grpc-grpclb + ${grpc.version} + + + io.grpc + grpc-googleapis + ${grpc.version} + + + io.grpc + grpc-all + ${grpc.version} + + + org.junit.jupiter + junit-jupiter-engine + ${junit-jupiter-engine.version} + test + + + commons-io + commons-io + ${commons-io.version} + test + + + + + + + + + com.mycila + license-maven-plugin + ${plugin.version.license} + +
${license.header}
+ + Weaviate + 2025 + + + **/*.java + + + SLASHSTAR_STYLE + +
+ + + add-license + compile + + format + + + +
+ + + + org.apache.maven.plugins + maven-assembly-plugin + ${maven-assembly-plugin.version} + + + make-assembly + package + + single + + + + jar-with-dependencies + + ${project.artifactId}-${project.version}-uber + false + + + + + + + + io.confluent + ${kafka-connect-maven-plugin.version} + kafka-connect-maven-plugin + + Kafka Connect Weaviate + https://weaviate.io/developers/weaviate + + The Weaviate connector allows moving data from Kafka to Weaviate. It writes data from a + topic in Kafka to a collection in Weaviate. + Weaviate uses state-of-the-art machine learning (ML) models to turn your data - text, + images, and more - into a searchable vector database. + + logos/weaviate.jpg + + Weaviate + Community support + https://github.com/weaviate/kafka-connect-weaviate + logos/weaviate.jpg + Weaviate + organization + Weaviate + https://weaviate.io/ + logos/weaviate.jpg + + confluentinc + cp-kafka-connect + ${project.version} + + + sink + + + + Weaviate + vector + search + analytics + rag + ai + ml + + + + Weaviate 1.28.x or greater + + + + atLeastOnce + + + + + + kafka-connect + + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + ${maven-compiler-plugin.version} + true + + + -Xlint:all,-serial,-processing + -Werror + + + +
+ + + + src/main/resources + true + + +
+
\ No newline at end of file diff --git a/src/main/java/io/weaviate/connector/Version.java b/src/main/java/io/weaviate/connector/Version.java new file mode 100644 index 0000000..2ca0ed9 --- /dev/null +++ b/src/main/java/io/weaviate/connector/Version.java @@ -0,0 +1,45 @@ +/* + * Copyright © 2025 Weaviate + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.weaviate.connector; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.InputStream; +import java.util.Properties; + +public class Version { + private static final Logger log = LoggerFactory.getLogger(Version.class); + private static String version = "unknown"; + + private static final String VERSION_FILE = "/kafka-connect-weaviate-version.properties"; + + static { + try { + Properties props = new Properties(); + try (InputStream versionFileStream = Version.class.getResourceAsStream(VERSION_FILE)) { + props.load(versionFileStream); + version = props.getProperty("version", version).trim(); + } + } catch (Exception e) { + log.error("Error while loading version:", e); + } + } + + public static String getVersion() { + return version; + } +} diff --git a/src/main/java/io/weaviate/connector/WeaviateSinkConfig.java b/src/main/java/io/weaviate/connector/WeaviateSinkConfig.java new file mode 100644 index 0000000..bfaeca7 --- /dev/null +++ b/src/main/java/io/weaviate/connector/WeaviateSinkConfig.java @@ -0,0 +1,345 @@ +/* + * Copyright © 2025 Weaviate + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.weaviate.connector; + +import io.weaviate.connector.idstrategy.IDStrategy; +import io.weaviate.connector.idstrategy.KafkaIdStrategy; +import io.weaviate.connector.vectorstrategy.VectorStrategy; +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; + +import java.util.*; + +import static io.weaviate.client.v1.async.batch.api.ObjectsBatcher.AutoBatchConfig.BATCH_SIZE; +import static io.weaviate.client.v1.async.batch.api.ObjectsBatcher.BatchRetriesConfig.*; +import static io.weaviate.client.v1.batch.api.ObjectsBatcher.AutoBatchConfig.AWAIT_TERMINATION_MS; +import static io.weaviate.client.v1.batch.api.ObjectsBatcher.AutoBatchConfig.POOL_SIZE; + +public final class WeaviateSinkConfig extends AbstractConfig { + + private final String connectionUrl; + private final AuthMechanism authMechanism; + private final String apiKey; + private final String oidcClientSecret; + private final String collectionMapping; + private final List oidcScopes; + private final Boolean grpcSecured; + private final String grpcUrl; + private final Class documentIdStrategy; + private final Class vectorStrategy; + private final String vectorFieldName; + private final String documentIdFieldName; + private final List rawHeaders; + private final ConsistencyLevel consistencyLevel; + private final Integer awaitTerminationMs; + private final Integer maxConnectionRetries; + private final Integer maxTimeoutRetries; + private final Integer retryInterval; + private final Integer batchSize; + private final Integer poolSize; + private final Boolean deleteEnabled; + + public enum AuthMechanism { + NONE, + API_KEY, + OIDC_CLIENT_CREDENTIALS, + } + + public static final String CONNECTION_URL_CONFIG = "weaviate.connection.url"; + private static final String CONNECTION_URL_DOC = "Weaviate connection URL"; + private static final String CONNECTION_URL_DEFAULT = "http://localhost:8080"; + + public static final String GRPC_URL_CONFIG = "weaviate.grpc.url"; + private static final String GRPC_URL_DOC = "Weaviate GRPC connection URL"; + private static final String GRPC_URL_DEFAULT = "localhost:50051"; + + public static final String GRPC_SECURED_CONFIG = "weaviate.grpc.secured"; + private static final String GRPC_SECURED_DOC = "Weaviate GRPC TLS secured connection"; + private static final Boolean GRPC_SECURED_DEFAULT = false; + + public static final String AUTH_MECHANISM_CONFIG = "weaviate.auth.scheme"; + private static final String AUTH_MECHANISM_DOC = "Authentication mechanism to use to connect to Weaviate"; + private static final String AUTH_MECHANISM_DEFAULT = AuthMechanism.NONE.name(); + + public static final String API_KEY_CONFIG = "weaviate.api.key"; + private static final String API_KEY_DOC = "User API key"; + + public static final String OIDC_CLIENT_SECRET_CONFIG = "weaviate.oidc.client.secret"; + private static final String OIDC_CLIENT_SECRET_DOC = "User OIDC client secret"; + + public static final String OIDC_SCOPES_CONFIG = "weaviate.oidc.scopes"; + private static final String OIDC_SCOPES_DOC = "User OIDC client scopes"; + private static final String OIDC_SCOPES_DEFAULT = "openid"; + + public static final String COLLECTION_MAPPING_CONFIG = "collection.mapping"; + private static final String COLLECTION_MAPPING_DOC = "Mapping between Kafka topic and Weaviate collection"; + private static final String COLLECTION_MAPPING_DEFAULT = "${topic}"; + + public static final String HEADERS_CONFIG = "weaviate.headers"; + private static final String HEADERS_DOC = "Headers to provide while building Weaviate client (e.g. X-OpenAI-Api-Key)"; + private static final String HEADERS_DEFAULT = ""; + + public static final String DOCUMENT_ID_STRATEGY_CONFIG = "document.id.strategy"; + private static final String DOCUMENT_ID_STRATEGY_DOC = "Java class returning the document ID for each record"; + private static final Class DOCUMENT_ID_STRATEGY_DEFAULT = io.weaviate.connector.idstrategy.NoIdStrategy.class; + + public static final String DOCUMENT_ID_FIELD_CONFIG = "document.id.field.name"; + private static final String DOCUMENT_ID_FIELD_DOC = "Field name containing the ID in Kafka"; + private static final String DOCUMENT_ID_FIELD_DEFAULT = "id"; + + public static final String VECTOR_STRATEGY_CONFIG = "vector.strategy"; + private static final String VECTOR_STRATEGY_DOC = "Java class returning the document embedding for each record"; + private static final Class VECTOR_STRATEGY_DEFAULT = io.weaviate.connector.vectorstrategy.NoVectorStrategy.class; + + public static final String VECTOR_FIELD_CONFIG = "vector.field.name"; + private static final String VECTOR_FIELD_DOC = "Field name containing the embedding"; + private static final String VECTOR_FIELD_DEFAULT = "vector"; + + public static final String CONSISTENCY_LEVEL_CONFIG = "consistency.level"; + private static final String CONSISTENCY_LEVEL_DOC = "Consistency level to use while inserting objects"; + private static final String CONSISTENCY_LEVEL_DEFAULT = ConsistencyLevel.QUORUM.name(); + + public static final String MAX_TIMEOUT_RETRIES_CONFIG = "max.timeout.retries"; + private static final String MAX_TIMEOUT_RETRIES_DOC = "Maximum number of retries to perform in case of timeout"; + private static final int MAX_TIMEOUT_RETRIES_DEFAULT = MAX_TIMEOUT_RETRIES; + + public static final String MAX_CONNECTION_RETRIES_CONFIG = "max.connection.retries"; + private static final String MAX_CONNECTION_RETRIES_DOC = "Maximum number of retries to perform in case of connection issues"; + private static final int MAX_CONNECTION_RETRIES_DEFAULT = MAX_CONNECTION_RETRIES; + + public static final String RETRY_INTERVAL_CONFIG = "retry.interval"; + private static final String RETRY_INTERVAL_DOC = "Interval between each retry"; + private static final int RETRY_INTERVAL_DEFAULT = RETRIES_INTERVAL; + + public static final String BATCH_SIZE_CONFIG = "batch.size"; + private static final String BATCH_SIZE_DOC = "Number of records per batch"; + private static final int BATCH_SIZE_DEFAULT = BATCH_SIZE; + + public static final String POOL_SIZE_CONFIG = "pool.size"; + private static final String POOL_SIZE_DOC = "Number of pool to process batch"; + private static final int POOL_SIZE_DEFAULT = POOL_SIZE; + + public static final String AWAIT_TERMINATION_MS_CONFIG = "await.termination.ms"; + private static final String AWAIT_TERMINATION_MS_DOC = "Timeout for batch processing"; + private static final int AWAIT_TERMINATION_MS_DEFAULT = AWAIT_TERMINATION_MS; + + public static final String DELETE_ENABLED_CONFIG = "delete.enabled"; + private static final String DELETE_ENABLED_DOC = "Whether to treat null record values as deletes"; + private static final boolean DELETE_ENABLED_DEFAULT = false; + + public static ConfigDef CONFIG_DEF = new ConfigDef() + .define(CONNECTION_URL_CONFIG, ConfigDef.Type.STRING, CONNECTION_URL_DEFAULT, ConfigDef.Importance.HIGH, CONNECTION_URL_DOC) + .define(GRPC_URL_CONFIG, ConfigDef.Type.STRING, GRPC_URL_DEFAULT, ConfigDef.Importance.HIGH, GRPC_URL_DOC) + .define(GRPC_SECURED_CONFIG, ConfigDef.Type.BOOLEAN, GRPC_SECURED_DEFAULT, ConfigDef.Importance.HIGH, GRPC_SECURED_DOC) + .define(AUTH_MECHANISM_CONFIG, ConfigDef.Type.STRING, AUTH_MECHANISM_DEFAULT, EnumValidator.in(AuthMechanism.values()), ConfigDef.Importance.HIGH, AUTH_MECHANISM_DOC) + .define(API_KEY_CONFIG, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, API_KEY_DOC) + .define(OIDC_CLIENT_SECRET_CONFIG, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, OIDC_CLIENT_SECRET_DOC) + .define(OIDC_SCOPES_CONFIG, ConfigDef.Type.LIST, OIDC_SCOPES_DEFAULT, ConfigDef.Importance.HIGH, OIDC_SCOPES_DOC) + .define(COLLECTION_MAPPING_CONFIG, ConfigDef.Type.STRING, COLLECTION_MAPPING_DEFAULT, ConfigDef.Importance.HIGH, COLLECTION_MAPPING_DOC) + .define(HEADERS_CONFIG, ConfigDef.Type.LIST, HEADERS_DEFAULT, new HeaderValidator(), ConfigDef.Importance.MEDIUM, HEADERS_DOC) + .define(CONSISTENCY_LEVEL_CONFIG, ConfigDef.Type.STRING, CONSISTENCY_LEVEL_DEFAULT, EnumValidator.in(ConsistencyLevel.values()), ConfigDef.Importance.LOW, CONSISTENCY_LEVEL_DOC) + .define(DOCUMENT_ID_STRATEGY_CONFIG, ConfigDef.Type.CLASS, DOCUMENT_ID_STRATEGY_DEFAULT, ConfigDef.Importance.MEDIUM, DOCUMENT_ID_STRATEGY_DOC) + .define(DOCUMENT_ID_FIELD_CONFIG, ConfigDef.Type.STRING, DOCUMENT_ID_FIELD_DEFAULT, ConfigDef.Importance.MEDIUM, DOCUMENT_ID_FIELD_DOC) + .define(VECTOR_STRATEGY_CONFIG, ConfigDef.Type.CLASS, VECTOR_STRATEGY_DEFAULT, ConfigDef.Importance.MEDIUM, VECTOR_STRATEGY_DOC) + .define(VECTOR_FIELD_CONFIG, ConfigDef.Type.STRING, VECTOR_FIELD_DEFAULT, ConfigDef.Importance.MEDIUM, VECTOR_FIELD_DOC) + .define(MAX_CONNECTION_RETRIES_CONFIG, ConfigDef.Type.INT, MAX_CONNECTION_RETRIES_DEFAULT, ConfigDef.Importance.LOW, MAX_CONNECTION_RETRIES_DOC) + .define(MAX_TIMEOUT_RETRIES_CONFIG, ConfigDef.Type.INT, MAX_TIMEOUT_RETRIES_DEFAULT, ConfigDef.Importance.LOW, MAX_TIMEOUT_RETRIES_DOC) + .define(RETRY_INTERVAL_CONFIG, ConfigDef.Type.INT, RETRY_INTERVAL_DEFAULT, ConfigDef.Importance.LOW, RETRY_INTERVAL_DOC) + .define(BATCH_SIZE_CONFIG, ConfigDef.Type.INT, BATCH_SIZE_DEFAULT, ConfigDef.Importance.LOW, BATCH_SIZE_DOC) + .define(POOL_SIZE_CONFIG, ConfigDef.Type.INT, POOL_SIZE_DEFAULT, ConfigDef.Importance.LOW, POOL_SIZE_DOC) + .define(AWAIT_TERMINATION_MS_CONFIG, ConfigDef.Type.INT, AWAIT_TERMINATION_MS_DEFAULT, ConfigDef.Importance.LOW, AWAIT_TERMINATION_MS_DOC) + .define(DELETE_ENABLED_CONFIG, ConfigDef.Type.BOOLEAN, DELETE_ENABLED_DEFAULT, ConfigDef.Importance.LOW, DELETE_ENABLED_DOC); + + public WeaviateSinkConfig(ConfigDef definition, Map originals) { + super(CONFIG_DEF, originals); + connectionUrl = getString(CONNECTION_URL_CONFIG); + authMechanism = AuthMechanism.valueOf(getString(AUTH_MECHANISM_CONFIG)); + apiKey = getString(API_KEY_CONFIG); + oidcClientSecret = getString(OIDC_CLIENT_SECRET_CONFIG); + oidcScopes = getList(OIDC_SCOPES_CONFIG); + collectionMapping = getString(COLLECTION_MAPPING_CONFIG); + rawHeaders = getList(HEADERS_CONFIG); + grpcUrl = getString(GRPC_URL_CONFIG); + grpcSecured = getBoolean(GRPC_SECURED_CONFIG); + documentIdStrategy = getClass(DOCUMENT_ID_STRATEGY_CONFIG); + documentIdFieldName = getString(DOCUMENT_ID_FIELD_CONFIG); + vectorStrategy = getClass(VECTOR_STRATEGY_CONFIG); + vectorFieldName = getString(VECTOR_FIELD_CONFIG); + consistencyLevel = ConsistencyLevel.valueOf(getString(CONSISTENCY_LEVEL_CONFIG)); + maxConnectionRetries = getInt(MAX_CONNECTION_RETRIES_CONFIG); + maxTimeoutRetries = getInt(MAX_TIMEOUT_RETRIES_CONFIG); + retryInterval = getInt(RETRY_INTERVAL_CONFIG); + batchSize = getInt(BATCH_SIZE_CONFIG); + poolSize = getInt(POOL_SIZE_CONFIG); + awaitTerminationMs = getInt(AWAIT_TERMINATION_MS_CONFIG); + deleteEnabled = getBoolean(DELETE_ENABLED_CONFIG); + if (deleteEnabled && (!documentIdStrategy.equals(KafkaIdStrategy.class))) { + throw new IllegalArgumentException("If delete.enabled is true, document.id.strategy should be set to KafkaIdStrategy"); + } + } + + public String getConnectionUrl() { + return connectionUrl; + } + + public AuthMechanism getAuthMechanism() { + return authMechanism; + } + + public String getApiKey() { + return apiKey; + } + + public String getOidcClientSecret() { + return oidcClientSecret; + } + + public String getCollectionMapping() { + return collectionMapping; + } + + public List getOidcScopes() { + return oidcScopes; + } + + public Boolean getGrpcSecured() { + return grpcSecured; + } + + public String getGrpcUrl() { + return grpcUrl; + } + + public Class getDocumentIdStrategy() { + return documentIdStrategy; + } + + public String getDocumentIdFieldName() { + return documentIdFieldName; + } + + public Class getVectorStrategy() { + return vectorStrategy; + } + + public String getVectorFieldName() { + return vectorFieldName; + } + + public List getRawHeaders() { + return rawHeaders; + } + + public ConsistencyLevel getConsistencyLevel() { + return consistencyLevel; + } + + public Integer getAwaitTerminationMs() { + return awaitTerminationMs; + } + + public Integer getMaxConnectionRetries() { + return maxConnectionRetries; + } + + public Integer getMaxTimeoutRetries() { + return maxTimeoutRetries; + } + + public Integer getRetryInterval() { + return retryInterval; + } + + public Integer getBatchSize() { + return batchSize; + } + + public Integer getPoolSize() { + return poolSize; + } + + public Boolean getDeleteEnabled() { + return deleteEnabled; + } + + public Map getHeaders() { + HashMap headers = new HashMap<>(); + for (String header : rawHeaders) { + if (!header.contains("=")) { + throw new IllegalArgumentException("Invalid header: " + header); + } + headers.put(header.split("=")[0], header.split("=")[1]); + } + return headers; + } + + private static class HeaderValidator implements ConfigDef.Validator { + @SuppressWarnings("unchecked") + @Override + public void ensureValid(String name, Object value) { + if (!(value instanceof List)) { + throw new IllegalArgumentException("Invalid header value: " + value); + } + List headers = (List) value; + for (String header : headers) { + if (!header.contains("=")) { + throw new IllegalArgumentException("Invalid header value: " + header); + } + } + } + } + + private static class EnumValidator implements ConfigDef.Validator { + private final List canonicalValues; + private final Set validValues; + + private EnumValidator(List canonicalValues, Set validValues) { + this.canonicalValues = canonicalValues; + this.validValues = validValues; + } + + public static EnumValidator in(E[] enumerators) { + final List canonicalValues = new ArrayList<>(enumerators.length); + final Set validValues = new HashSet<>(enumerators.length * 2); + for (E e : enumerators) { + canonicalValues.add(e.toString().toLowerCase()); + validValues.add(e.toString().toUpperCase()); + validValues.add(e.toString().toLowerCase()); + } + return new EnumValidator(canonicalValues, validValues); + } + + @Override + public void ensureValid(String key, Object value) { + if (!validValues.contains(value)) { + throw new ConfigException(key, value, "Invalid enumerator"); + } + } + + @Override + public String toString() { + return canonicalValues.toString(); + } + } + + public enum ConsistencyLevel { + ALL, + ONE, + QUORUM, + } + +} diff --git a/src/main/java/io/weaviate/connector/WeaviateSinkConnector.java b/src/main/java/io/weaviate/connector/WeaviateSinkConnector.java new file mode 100644 index 0000000..ca222bb --- /dev/null +++ b/src/main/java/io/weaviate/connector/WeaviateSinkConnector.java @@ -0,0 +1,64 @@ +/* + * Copyright © 2025 Weaviate + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.weaviate.connector; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.connector.Task; +import org.apache.kafka.connect.sink.SinkConnector; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class WeaviateSinkConnector extends SinkConnector { + private Map configProps; + + @Override + public void start(Map map) { + configProps = map; + } + + @Override + public Class taskClass() { + return WeaviateSinkTask.class; + } + + @Override + public List> taskConfigs(int maxTask) { + ArrayList> tasks = new ArrayList<>(); + for (int i = 0; i < maxTask; i++) { + tasks.add(configProps); + } + return tasks; + } + + @Override + public void stop() { + + } + + @Override + public ConfigDef config() { + return WeaviateSinkConfig.CONFIG_DEF; + } + + @Override + public String version() { + return getClass().getPackage().getImplementationVersion(); + } + + +} diff --git a/src/main/java/io/weaviate/connector/WeaviateSinkTask.java b/src/main/java/io/weaviate/connector/WeaviateSinkTask.java new file mode 100644 index 0000000..98e30d3 --- /dev/null +++ b/src/main/java/io/weaviate/connector/WeaviateSinkTask.java @@ -0,0 +1,166 @@ +/* + * Copyright © 2025 Weaviate + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.weaviate.connector; + +import io.grpc.NameResolverRegistry; +import io.weaviate.client.Config; +import io.weaviate.client.WeaviateAuthClient; +import io.weaviate.client.WeaviateClient; +import io.weaviate.client.v1.auth.exception.AuthException; +import io.weaviate.client.v1.batch.api.ObjectsBatcher; +import io.weaviate.client.v1.data.model.WeaviateObject; +import io.weaviate.connector.converter.DataConverter; +import io.weaviate.connector.idstrategy.IDStrategy; +import io.weaviate.connector.vectorstrategy.VectorStrategy; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.connect.sink.SinkRecord; +import org.apache.kafka.connect.sink.SinkTask; + +import java.util.Collection; +import java.util.Map; + +public class WeaviateSinkTask extends SinkTask { + WeaviateClient client; + private String collectionMappingRule; + private IDStrategy documentIdStrategy; + private VectorStrategy vectorStrategy; + private ObjectsBatcher objectsBatcher; + private WeaviateSinkConfig config; + + @Override + public String version() { + return Version.getVersion(); + } + + @Override + public void start(Map map) { + this.config = new WeaviateSinkConfig(WeaviateSinkConfig.CONFIG_DEF, map); + this.collectionMappingRule = config.getCollectionMapping(); + buildWeaviateClient(config); + try { + this.documentIdStrategy = (IDStrategy) config.getDocumentIdStrategy().getDeclaredConstructor().newInstance(); + this.documentIdStrategy.configure(config); + } catch (Exception e) { + throw new RuntimeException("Can not instantiate DocumentIDStrategy class", e); + } + + try { + this.vectorStrategy = (VectorStrategy) config.getVectorStrategy().getDeclaredConstructor().newInstance(); + this.vectorStrategy.configure(config); + } catch (Exception e) { + throw new RuntimeException("Can not instantiate VectorStrategy class", e); + } + + // Getting GRPC default registry to trigger Classloader issues if there + // are missing GRPC packages + NameResolverRegistry defaultRegistry = NameResolverRegistry.getDefaultRegistry(); + defaultRegistry.getDefaultScheme(); + } + + private void buildWeaviateClient(WeaviateSinkConfig config) { + Config weaviateConfig = getConfig(config); + if (config.getAuthMechanism() == WeaviateSinkConfig.AuthMechanism.NONE) { + client = new WeaviateClient(weaviateConfig); + } else if (config.getAuthMechanism() == WeaviateSinkConfig.AuthMechanism.API_KEY) { + try { + client = WeaviateAuthClient.apiKey(weaviateConfig, config.getApiKey()); + } catch (AuthException e) { + throw new RuntimeException(e); + } + } else if (config.getAuthMechanism() == WeaviateSinkConfig.AuthMechanism.OIDC_CLIENT_CREDENTIALS) { + try { + client = WeaviateAuthClient.clientCredentials(weaviateConfig, config.getOidcClientSecret(), config.getOidcScopes()); + } catch (AuthException e) { + throw new RuntimeException(e); + } + } else { + throw new RuntimeException("Unknown authentication mechanism"); + } + } + + private static Config getConfig(WeaviateSinkConfig config) { + String scheme = config.getConnectionUrl().split("://")[0]; + String hostAndPort = config.getConnectionUrl().split("://")[1]; + Map headers = config.getHeaders(); + Config weaviateConfig = new Config(scheme, hostAndPort, headers); + if (config.getGrpcUrl() != null && !config.getGrpcUrl().isEmpty()) { + weaviateConfig.setGRPCHost(config.getGrpcUrl()); + weaviateConfig.setGRPCSecured(config.getGrpcSecured()); + } + return weaviateConfig; + } + + @Override + public void put(Collection collection) { + if (objectsBatcher == null) { + objectsBatcher = client.batch().objectsAutoBatcher( + ObjectsBatcher.BatchRetriesConfig.builder() + .maxConnectionRetries(config.getMaxConnectionRetries()) + .maxTimeoutRetries(config.getMaxTimeoutRetries()) + .retriesIntervalMs(config.getRetryInterval()) + .build(), + ObjectsBatcher.AutoBatchConfig.builder() + .batchSize(config.getBatchSize()) + .poolSize(config.getPoolSize()) + .awaitTerminationMs(config.getAwaitTerminationMs()) + .build() + ); + objectsBatcher.withConsistencyLevel(config.getConsistencyLevel().name()); + } + DataConverter dataConverter = new DataConverter(); + for (SinkRecord record : collection) { + if (record.value() == null) { + // Skipping tombstone if delete is not enabled + if (config.getDeleteEnabled()) { + client.data().deleter() + .withClassName(getCollectionName(record.topic())) + .withID(documentIdStrategy.getDocumentId(record, null)) + .withConsistencyLevel(config.getConsistencyLevel().name()) + .run(); + } + continue; + } + Map properties = dataConverter.convertToWeaviateProperties(record.valueSchema(), record.value()); + objectsBatcher.withObject(WeaviateObject.builder() + .className(getCollectionName((record.topic()))) + .properties(properties) + .id(documentIdStrategy.getDocumentId(record, properties)) + .vector(vectorStrategy.getDocumentVector(record, properties)) + .build()); + } + objectsBatcher.flush(); // Flushing to ease error handling + } + + public String getCollectionName(String topic) { + return collectionMappingRule.replace("${topic}", topic); + } + + @Override + public void flush(Map currentOffsets) { + super.flush(currentOffsets); + if (objectsBatcher != null) { + objectsBatcher.flush(); + } + } + + @Override + public void stop() { + if (objectsBatcher != null) { + objectsBatcher.close(); + } + } +} diff --git a/src/main/java/io/weaviate/connector/converter/DataConverter.java b/src/main/java/io/weaviate/connector/converter/DataConverter.java new file mode 100644 index 0000000..db7e908 --- /dev/null +++ b/src/main/java/io/weaviate/connector/converter/DataConverter.java @@ -0,0 +1,130 @@ +/* + * Copyright © 2025 Weaviate + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.weaviate.connector.converter; + +import org.apache.kafka.connect.data.ConnectSchema; +import org.apache.kafka.connect.data.Field; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.errors.DataException; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +public class DataConverter { + + @SuppressWarnings("unchecked") + public Map convertToWeaviateProperties(Schema schema, Object value) { + Object object = convertToJava(schema, value); + if (!(object instanceof Map)) { + throw new DataException(String.format("Cannot convert " + schema.name() + " to Java object, %s is not a Map", value)); + } + return (Map) object; + } + + private Object convertToJava(Schema schema, Object value) { + if (value == null) { + if (schema == null) + return null; + if (schema.defaultValue() != null) + return convertToJava(schema, schema.defaultValue()); + if (schema.isOptional()) + return null; + throw new DataException("Conversion error: null value for field that is required and has no default value"); + } + + try { + final Schema.Type schemaType; + if (schema == null) { + schemaType = ConnectSchema.schemaType(value.getClass()); + if (schemaType == null) + throw new DataException("Java class " + value.getClass() + " does not have corresponding schema type."); + } else { + schemaType = schema.type(); + } + switch (schemaType) { + case INT8: + ByteBuffer bb = ByteBuffer.wrap(new byte[]{0, 0, 0, 0, 0, 0, 0, (byte) value}); + return bb.getLong(); + case INT16: + return ((Short) value).longValue(); + case INT32: + return ((Integer) value).longValue(); + case INT64: + return (Long) value; + case FLOAT32: + return ((Float) value).doubleValue(); + case FLOAT64: + return (Double) value; + case BOOLEAN: + return (Boolean) value; + case STRING: + CharSequence charSeq = (CharSequence) value; + return charSeq.toString(); + case BYTES: + if (value instanceof byte[]) + return ((byte[]) value); + else if (value instanceof ByteBuffer) + return ((ByteBuffer) value).array(); + else + throw new DataException("Invalid type for bytes type: " + value.getClass()); + case ARRAY: { + Collection collection = (Collection) value; + ArrayList list = new ArrayList<>(collection.size()); + for (Object elem : collection) { + Schema valueSchema = schema == null ? null : schema.valueSchema(); + Object fieldValue = convertToJava(valueSchema, elem); + list.add(fieldValue); + } + return list; + } + case MAP: { + Map map = (Map) value; + HashMap object = new HashMap<>(map.size()); + for (Map.Entry entry : map.entrySet()) { + Schema keySchema = schema == null ? null : schema.keySchema(); + Schema valueSchema = schema == null ? null : schema.valueSchema(); + Object mapKey = convertToJava(keySchema, entry.getKey()); + Object mapValue = convertToJava(valueSchema, entry.getValue()); + + object.put(String.valueOf(mapKey), mapValue); + } + return object; + } + case STRUCT: { + Struct struct = (Struct) value; + if (!struct.schema().equals(schema)) + throw new DataException("Mismatching schema."); + + HashMap object = new HashMap<>(); + for (Field field : schema.fields()) { + object.put(field.name(), convertToJava(field.schema(), struct.get(field))); + } + return object; + } + } + + throw new DataException("Couldn't convert " + value + " to Java object."); + } catch (ClassCastException e) { + String schemaTypeStr = (schema != null) ? schema.type().toString() : "unknown schema"; + throw new DataException("Invalid type for " + schemaTypeStr + ": " + value.getClass()); + } + } + +} diff --git a/src/main/java/io/weaviate/connector/idstrategy/FieldIdStrategy.java b/src/main/java/io/weaviate/connector/idstrategy/FieldIdStrategy.java new file mode 100644 index 0000000..f92b0b1 --- /dev/null +++ b/src/main/java/io/weaviate/connector/idstrategy/FieldIdStrategy.java @@ -0,0 +1,46 @@ +/* + * Copyright © 2025 Weaviate + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.weaviate.connector.idstrategy; + +import io.weaviate.connector.WeaviateSinkConfig; +import org.apache.kafka.connect.sink.SinkRecord; + +import java.nio.charset.StandardCharsets; +import java.util.Map; +import java.util.UUID; + +public class FieldIdStrategy implements IDStrategy { + private String fieldName; + + public FieldIdStrategy() { + } + + @Override + public void configure(WeaviateSinkConfig config) { + fieldName = config.getDocumentIdFieldName(); + } + + @Override + public String getDocumentId(SinkRecord record, Map valueProperties) { + try { + String id = String.valueOf(valueProperties.get(fieldName)); + valueProperties.remove(fieldName); + return UUID.nameUUIDFromBytes(id.getBytes(StandardCharsets.UTF_8)).toString(); + } catch (Exception e) { + throw new RuntimeException("Cannot get document id from message", e); + } + } +} diff --git a/src/main/java/io/weaviate/connector/idstrategy/IDStrategy.java b/src/main/java/io/weaviate/connector/idstrategy/IDStrategy.java new file mode 100644 index 0000000..4f62e2d --- /dev/null +++ b/src/main/java/io/weaviate/connector/idstrategy/IDStrategy.java @@ -0,0 +1,28 @@ +/* + * Copyright © 2025 Weaviate + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.weaviate.connector.idstrategy; + +import io.weaviate.connector.WeaviateSinkConfig; +import org.apache.kafka.connect.sink.SinkRecord; + +import java.util.Map; + +public interface IDStrategy { + public String getDocumentId(SinkRecord record, Map valueProperties); + + public default void configure(WeaviateSinkConfig config) { + } +} diff --git a/src/main/java/io/weaviate/connector/idstrategy/KafkaIdStrategy.java b/src/main/java/io/weaviate/connector/idstrategy/KafkaIdStrategy.java new file mode 100644 index 0000000..cdba032 --- /dev/null +++ b/src/main/java/io/weaviate/connector/idstrategy/KafkaIdStrategy.java @@ -0,0 +1,36 @@ +/* + * Copyright © 2025 Weaviate + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.weaviate.connector.idstrategy; + +import org.apache.kafka.connect.sink.SinkRecord; + +import java.nio.charset.StandardCharsets; +import java.util.Map; +import java.util.UUID; + +public class KafkaIdStrategy implements IDStrategy { + public KafkaIdStrategy() { + } + + @Override + public String getDocumentId(SinkRecord record, Map valueProperties) { + if (record.key() instanceof String) { + return UUID.nameUUIDFromBytes(record.key().toString().getBytes(StandardCharsets.UTF_8)).toString(); + } + + return String.valueOf(record.key()); + } +} diff --git a/src/main/java/io/weaviate/connector/idstrategy/NoIdStrategy.java b/src/main/java/io/weaviate/connector/idstrategy/NoIdStrategy.java new file mode 100644 index 0000000..f1804ca --- /dev/null +++ b/src/main/java/io/weaviate/connector/idstrategy/NoIdStrategy.java @@ -0,0 +1,27 @@ +/* + * Copyright © 2025 Weaviate + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.weaviate.connector.idstrategy; + +import org.apache.kafka.connect.sink.SinkRecord; + +import java.util.Map; + +public class NoIdStrategy implements IDStrategy { + @Override + public String getDocumentId(SinkRecord record, Map valueProperties) { + return null; + } +} diff --git a/src/main/java/io/weaviate/connector/vectorstrategy/FieldVectorStrategy.java b/src/main/java/io/weaviate/connector/vectorstrategy/FieldVectorStrategy.java new file mode 100644 index 0000000..61aca60 --- /dev/null +++ b/src/main/java/io/weaviate/connector/vectorstrategy/FieldVectorStrategy.java @@ -0,0 +1,66 @@ +/* + * Copyright © 2025 Weaviate + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.weaviate.connector.vectorstrategy; + +import io.weaviate.connector.WeaviateSinkConfig; +import org.apache.kafka.connect.sink.SinkRecord; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class FieldVectorStrategy implements VectorStrategy { + private String fieldName; + + public FieldVectorStrategy() { + } + + @Override + public void configure(WeaviateSinkConfig config) { + fieldName = config.getVectorFieldName(); + } + + @Override + public Float[] getDocumentVector(SinkRecord record, Map valueProperties) { + if (valueProperties.get(fieldName) == null) { + return null; + } + + Object object = valueProperties.get(fieldName); + if (object instanceof Float[]) { + valueProperties.remove(fieldName); + return (Float[]) object; + } + if (object instanceof Iterable) { + List floatList = new ArrayList<>(); + for (Object o : (Iterable) object) { + if (o instanceof Float) { + Float f = (Float) o; + floatList.add(f); + } else if (o instanceof Double) { + Double d = (Double) o; + floatList.add(d.floatValue()); + } else { // trying to cast anyway + Float f = (Float) o; + floatList.add(f); + } + } + valueProperties.remove(fieldName); + return floatList.toArray(new Float[0]); + } + throw new UnsupportedOperationException("Can't convert " + object + " to Float[]"); + } +} diff --git a/src/main/java/io/weaviate/connector/vectorstrategy/NoVectorStrategy.java b/src/main/java/io/weaviate/connector/vectorstrategy/NoVectorStrategy.java new file mode 100644 index 0000000..f015d8b --- /dev/null +++ b/src/main/java/io/weaviate/connector/vectorstrategy/NoVectorStrategy.java @@ -0,0 +1,33 @@ +/* + * Copyright © 2025 Weaviate + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.weaviate.connector.vectorstrategy; + +import io.weaviate.connector.WeaviateSinkConfig; +import org.apache.kafka.connect.sink.SinkRecord; + +import java.util.Map; + +public class NoVectorStrategy implements VectorStrategy { + @Override + public void configure(WeaviateSinkConfig config) { + + } + + @Override + public Float[] getDocumentVector(SinkRecord record, Map valueProperties) { + return null; + } +} diff --git a/src/main/java/io/weaviate/connector/vectorstrategy/VectorStrategy.java b/src/main/java/io/weaviate/connector/vectorstrategy/VectorStrategy.java new file mode 100644 index 0000000..7f7b8e5 --- /dev/null +++ b/src/main/java/io/weaviate/connector/vectorstrategy/VectorStrategy.java @@ -0,0 +1,27 @@ +/* + * Copyright © 2025 Weaviate + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.weaviate.connector.vectorstrategy; + +import io.weaviate.connector.WeaviateSinkConfig; +import org.apache.kafka.connect.sink.SinkRecord; + +import java.util.Map; + +public interface VectorStrategy { + public void configure(WeaviateSinkConfig config); + + public Float[] getDocumentVector(SinkRecord record, Map valueProperties); +} diff --git a/src/main/resources/kafka-connect-weaviate-version.properties b/src/main/resources/kafka-connect-weaviate-version.properties new file mode 100644 index 0000000..e5683df --- /dev/null +++ b/src/main/resources/kafka-connect-weaviate-version.properties @@ -0,0 +1 @@ +version=${project.version} \ No newline at end of file diff --git a/src/test/java/io/weaviate/connector/WeaviateSinkConfigTest.java b/src/test/java/io/weaviate/connector/WeaviateSinkConfigTest.java new file mode 100644 index 0000000..86a15cc --- /dev/null +++ b/src/test/java/io/weaviate/connector/WeaviateSinkConfigTest.java @@ -0,0 +1,58 @@ +/* + * Copyright © 2025 Weaviate + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.weaviate.connector; + +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.*; + +class WeaviateSinkConfigTest { + @Test + void ensureHeaderParsing() { + HashMap originals = new HashMap<>() {{ + put(WeaviateSinkConfig.HEADERS_CONFIG, "X-OpenAI-Api-Key=XYZ"); + }}; + + WeaviateSinkConfig config = new WeaviateSinkConfig(WeaviateSinkConfig.CONFIG_DEF, originals); + Map headers = config.getHeaders(); + + assertTrue(headers.containsKey("X-OpenAI-Api-Key")); + assertEquals("XYZ", headers.get("X-OpenAI-Api-Key")); + } + + + @Test + void ensureInvaliHeaderAreThrowingException() { + HashMap originals = new HashMap<>() {{ + put(WeaviateSinkConfig.HEADERS_CONFIG, "X-OpenAI-Api-Key/XYZ"); // Invalid Header format + }}; + + assertThrows(IllegalArgumentException.class, () -> new WeaviateSinkConfig(WeaviateSinkConfig.CONFIG_DEF, originals)); + } + + @Test + void ensureDeleteEnabledEnforceKafkaId() { + HashMap originals = new HashMap<>() {{ + put(WeaviateSinkConfig.DELETE_ENABLED_CONFIG, "true"); + }}; + + assertThrows(IllegalArgumentException.class, () -> new WeaviateSinkConfig(WeaviateSinkConfig.CONFIG_DEF, originals)); + } + +} \ No newline at end of file diff --git a/src/test/java/io/weaviate/connector/converter/DataConverterTest.java b/src/test/java/io/weaviate/connector/converter/DataConverterTest.java new file mode 100644 index 0000000..fd36b9e --- /dev/null +++ b/src/test/java/io/weaviate/connector/converter/DataConverterTest.java @@ -0,0 +1,74 @@ +/* + * Copyright © 2025 Weaviate + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.weaviate.connector.converter; + +import org.apache.kafka.connect.data.SchemaAndValue; +import org.apache.kafka.connect.json.JsonConverter; +import org.apache.kafka.connect.json.JsonConverterConfig; +import org.codehaus.plexus.util.IOUtil; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +class DataConverterTest { + + @Test + void structNoSchemaConvertToWeaviateProperties() throws IOException { + DataConverter converter = new DataConverter(); + JsonConverter jsonConverter = new JsonConverter(); + jsonConverter.configure(new HashMap<>() {{ + put(JsonConverterConfig.TYPE_CONFIG, "value"); + put(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, false); + }}); + + String jsonContent = IOUtil.toString(converter.getClass().getResourceAsStream("/jsonData.json")); + SchemaAndValue schemaAndValue = jsonConverter.toConnectData("test", jsonContent.getBytes(Charset.defaultCharset())); + + Map properties = converter.convertToWeaviateProperties(schemaAndValue.schema(), schemaAndValue.value()); + + assertEquals("hello world", properties.get("text")); + assertEquals(123L, properties.get("int")); + assertEquals(1.23, properties.get("float")); + assertEquals(true, properties.get("boolean")); + assertEquals("[1.0, 2.0]", properties.get("vector").toString()); + } + + + @Test + void structWithSchemaConvertToWeaviateProperties() throws IOException { + DataConverter converter = new DataConverter(); + JsonConverter jsonConverter = new JsonConverter(); + jsonConverter.configure(new HashMap<>() {{ + put(JsonConverterConfig.TYPE_CONFIG, "value"); + put(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, true); + }}); + + String jsonContent = IOUtil.toString(converter.getClass().getResourceAsStream("/jsonDataSchema.json")); + SchemaAndValue schemaAndValue = jsonConverter.toConnectData("test", jsonContent.getBytes(Charset.defaultCharset())); + + Map properties = converter.convertToWeaviateProperties(schemaAndValue.schema(), schemaAndValue.value()); + + assertEquals("hello world", properties.get("text")); + assertEquals(123L, properties.get("int")); + assertEquals(1.23, properties.get("float")); + assertEquals(true, properties.get("boolean")); + } +} \ No newline at end of file diff --git a/src/test/java/io/weaviate/connector/idstrategy/FieldIdStrategyTest.java b/src/test/java/io/weaviate/connector/idstrategy/FieldIdStrategyTest.java new file mode 100644 index 0000000..13adae0 --- /dev/null +++ b/src/test/java/io/weaviate/connector/idstrategy/FieldIdStrategyTest.java @@ -0,0 +1,61 @@ +/* + * Copyright © 2025 Weaviate + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.weaviate.connector.idstrategy; + +import io.weaviate.connector.WeaviateSinkConfig; +import io.weaviate.connector.converter.DataConverter; +import org.apache.kafka.connect.data.SchemaAndValue; +import org.apache.kafka.connect.json.JsonConverter; +import org.apache.kafka.connect.json.JsonConverterConfig; +import org.codehaus.plexus.util.IOUtil; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +class FieldIdStrategyTest { + + @Test + void getFieldId() throws IOException { + DataConverter converter = new DataConverter(); + JsonConverter jsonConverter = new JsonConverter(); + jsonConverter.configure(new HashMap<>() {{ + put(JsonConverterConfig.TYPE_CONFIG, "value"); + put(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, false); + }}); + + String jsonContent = IOUtil.toString(converter.getClass().getResourceAsStream("/jsonData.json")); + SchemaAndValue schemaAndValue = jsonConverter.toConnectData("test", jsonContent.getBytes(Charset.defaultCharset())); + + WeaviateSinkConfig config = new WeaviateSinkConfig(WeaviateSinkConfig.CONFIG_DEF, new HashMap<>() {{ + put(WeaviateSinkConfig.DOCUMENT_ID_FIELD_CONFIG, "text"); + }}); + + Map properties = converter.convertToWeaviateProperties(schemaAndValue.schema(), schemaAndValue.value()); + + FieldIdStrategy fieldIdStrategy = new FieldIdStrategy(); + fieldIdStrategy.configure(config); + + String documentId = fieldIdStrategy.getDocumentId(null, properties); + + assertEquals("5eb63bbb-e01e-3ed0-93cb-22bb8f5acdc3", documentId); + } + +} \ No newline at end of file diff --git a/src/test/java/io/weaviate/connector/idstrategy/KafkaIdStrategyTest.java b/src/test/java/io/weaviate/connector/idstrategy/KafkaIdStrategyTest.java new file mode 100644 index 0000000..717ff59 --- /dev/null +++ b/src/test/java/io/weaviate/connector/idstrategy/KafkaIdStrategyTest.java @@ -0,0 +1,61 @@ +/* + * Copyright © 2025 Weaviate + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.weaviate.connector.idstrategy; + +import io.weaviate.connector.WeaviateSinkConfig; +import io.weaviate.connector.converter.DataConverter; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaAndValue; +import org.apache.kafka.connect.json.JsonConverter; +import org.apache.kafka.connect.json.JsonConverterConfig; +import org.apache.kafka.connect.sink.SinkRecord; +import org.codehaus.plexus.util.IOUtil; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +class KafkaIdStrategyTest { + + @Test + void getFieldId() throws IOException { + DataConverter converter = new DataConverter(); + JsonConverter jsonConverter = new JsonConverter(); + jsonConverter.configure(new HashMap<>() {{ + put(JsonConverterConfig.TYPE_CONFIG, "value"); + put(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, false); + }}); + + String jsonContent = IOUtil.toString(converter.getClass().getResourceAsStream("/jsonData.json")); + SchemaAndValue schemaAndValue = jsonConverter.toConnectData("test", jsonContent.getBytes(Charset.defaultCharset())); + + WeaviateSinkConfig config = new WeaviateSinkConfig(WeaviateSinkConfig.CONFIG_DEF, new HashMap<>() {{ + put(WeaviateSinkConfig.DOCUMENT_ID_FIELD_CONFIG, "text"); + }}); + + Map properties = converter.convertToWeaviateProperties(schemaAndValue.schema(), schemaAndValue.value()); + + KafkaIdStrategy fieldIdStrategy = new KafkaIdStrategy(); + fieldIdStrategy.configure(config); + + String documentId = fieldIdStrategy.getDocumentId(new SinkRecord("", 0, Schema.STRING_SCHEMA, "hello", null, null, 0L), properties); + assertEquals("5d41402a-bc4b-3a76-b971-9d911017c592", documentId); + } +} \ No newline at end of file diff --git a/src/test/java/io/weaviate/connector/vectorstrategy/FieldVectorStrategyTest.java b/src/test/java/io/weaviate/connector/vectorstrategy/FieldVectorStrategyTest.java new file mode 100644 index 0000000..aaa2503 --- /dev/null +++ b/src/test/java/io/weaviate/connector/vectorstrategy/FieldVectorStrategyTest.java @@ -0,0 +1,61 @@ +/* + * Copyright © 2025 Weaviate + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.weaviate.connector.vectorstrategy; + +import io.weaviate.connector.WeaviateSinkConfig; +import io.weaviate.connector.converter.DataConverter; +import org.apache.kafka.connect.data.SchemaAndValue; +import org.apache.kafka.connect.json.JsonConverter; +import org.apache.kafka.connect.json.JsonConverterConfig; +import org.codehaus.plexus.util.IOUtil; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +class FieldVectorStrategyTest { + + @Test + void getFieldVector() throws IOException { + DataConverter converter = new DataConverter(); + JsonConverter jsonConverter = new JsonConverter(); + jsonConverter.configure(new HashMap<>() {{ + put(JsonConverterConfig.TYPE_CONFIG, "value"); + put(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, false); + }}); + + String jsonContent = IOUtil.toString(converter.getClass().getResourceAsStream("/jsonData.json")); + SchemaAndValue schemaAndValue = jsonConverter.toConnectData("test", jsonContent.getBytes(Charset.defaultCharset())); + + WeaviateSinkConfig config = new WeaviateSinkConfig(WeaviateSinkConfig.CONFIG_DEF, new HashMap<>() {{ + put(WeaviateSinkConfig.VECTOR_FIELD_CONFIG, "vector"); + }}); + + Map properties = converter.convertToWeaviateProperties(schemaAndValue.schema(), schemaAndValue.value()); + + FieldVectorStrategy fieldIdStrategy = new FieldVectorStrategy(); + fieldIdStrategy.configure(config); + + Float[] documentId = fieldIdStrategy.getDocumentVector(null, properties); + + assertEquals(1.0f, documentId[0]); + assertEquals(2.0f, documentId[1]); + } +} \ No newline at end of file diff --git a/src/test/resources/jsonData.json b/src/test/resources/jsonData.json new file mode 100644 index 0000000..41959db --- /dev/null +++ b/src/test/resources/jsonData.json @@ -0,0 +1,12 @@ +{ + "text": "hello world", + "int": 123, + "float": 1.23, + "date": "1736352917", + "boolean": true, + "bytes": "SGVsbG8gV29ybGQK", + "vector": [ + 1.0, + 2.0 + ] +} \ No newline at end of file diff --git a/src/test/resources/jsonDataSchema.json b/src/test/resources/jsonDataSchema.json new file mode 100644 index 0000000..2c17aca --- /dev/null +++ b/src/test/resources/jsonDataSchema.json @@ -0,0 +1,35 @@ +{ + "schema": { + "type": "struct", + "fields": [ + { + "type": "string", + "optional": false, + "field": "text" + }, + { + "type": "int64", + "optional": false, + "field": "int" + }, + { + "type": "boolean", + "optional": false, + "field": "boolean" + }, + { + "type": "double", + "optional": false, + "field": "float" + } + ] + }, + "payload": { + "text": "hello world", + "int": 123, + "float": 1.23, + "date": "1736352917", + "boolean": true, + "bytes": "SGVsbG8gV29ybGQK" + } +} \ No newline at end of file