formatting and docs
DarioBalinzo committed Feb 2, 2021
1 parent 0605218 commit 09385ca
Showing 8 changed files with 184 additions and 127 deletions.
2 changes: 1 addition & 1 deletion .github/FUNDING.yml
custom: [ "https://www.paypal.me/coffeeDarioBalinzo" ]
149 changes: 89 additions & 60 deletions README.md
# Kafka-connect-elasticsearch-source

[![YourActionName Actions Status](https://github.com/DarioBalinzo/kafka-connect-elasticsearch-source/workflows/Java%20CI%20with%20Maven/badge.svg)](https://github.com/DarioBalinzo/kafka-connect-elasticsearch-source/actions)

Kafka Connect Elasticsearch Source: fetches data from Elasticsearch and sends it to Kafka. The connector fetches only new data using a strictly incremental / temporal field (such as a timestamp or an incrementing id). It supports dynamic schemas and nested objects/arrays.

## Requirements:

- Elasticsearch 6.x and 7.x
- Java >= 8
- Maven

## Output data serialization format:

The connector uses Kafka Connect schemas and structs, which are agnostic to the user's serialization method (e.g. Avro, JSON, etc.).
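
The wire format is therefore decided by the converters configured in Kafka Connect, not by this connector. A minimal sketch of per-connector converter overrides to merge into the connector ``config`` map (standard Connect properties; the Avro converter and schema-registry URL are illustrative assumptions, not requirements of this connector):

```json
{
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "http://localhost:8081"
}
```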

## Bugs or new Ideas?

- Issues tracker: https://github.com/DarioBalinzo/kafka-connect-elasticsearch-source/issues
- Feel free to open an issue to discuss new ideas (or propose new solutions with a PR).

## Installation:

Compile the project with:

```bash
mvn clean package -DskipTests
```

You can also compile and run both unit and integration tests (Docker is mandatory) with:

```bash
mvn clean package
```

Copy the jar with dependencies from the ``target`` folder into the Connect classpath (e.g. ``/usr/share/java/kafka-connect-elasticsearch``) or set the ``plugin.path`` parameter appropriately.
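
For instance, a hedged sketch of that copy step (the exact jar name depends on the project version; the destination path is illustrative):

```bash
# create the plugin directory and copy the assembled jar into it
mkdir -p /usr/share/java/kafka-connect-elasticsearch
cp target/*-jar-with-dependencies.jar /usr/share/java/kafka-connect-elasticsearch/
```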

## Example

Using Kafka Connect in distributed mode, here is a sample config file to fetch ``my_awesome_index*`` indices and produce output topics with an ``es_`` prefix (the config body below is a minimal sketch; the connector class name and field values are illustrative):

```json
{
  "name": "elastic-source",
  "config": {
    "connector.class": "com.github.dariobalinzo.ElasticSourceConnector",
    "tasks.max": "1",
    "es.host": "localhost",
    "es.port": "9200",
    "index.prefix": "my_awesome_index",
    "topic.prefix": "es_",
    "incrementing.field.name": "@timestamp",
    "poll.interval.ms": "5000"
  }
}
```

To start the connector with curl:

```bash
curl -X POST -H "Content-Type: application/json" --data @config.json http://localhost:8083/connectors | jq
```

To check the status:

```bash
curl localhost:8083/connectors/elastic-source/status | jq
```

To stop the connector:

```bash
curl -X DELETE localhost:8083/connectors/elastic-source | jq
```


## Documentation

### Elasticsearch Configuration

``es.host``
Elasticsearch host. Optionally it is possible to specify many hosts using ``;`` as separator (``host1;host2;host3``)

* Type: string
* Importance: high
* Dependents: ``index.prefix``

``es.port``
Elasticsearch port

* Type: string
* Importance: high
* Dependents: ``index.prefix``

``es.scheme``
Elasticsearch scheme (http/https)

* Type: string
* Default: ``http``
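
For an HTTPS cluster with basic authentication, a hedged fragment to merge into the connector ``config`` (credentials are illustrative; ``es.user`` and ``es.password`` are documented below):

```json
{
  "es.scheme": "https",
  "es.user": "elastic",
  "es.password": "changeme"
}
```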

``es.user``
Elasticsearch username

* Type: string
* Default: null
* Importance: high

``es.password``
Elasticsearch password

* Type: password
* Default: null
* Importance: high

``incrementing.field.name``
The name of the strictly incrementing field to use to detect new records.

* Type: any
* Importance: high

``incrementing.secondary.field.name``
If the main incrementing field may contain duplicate values, this secondary field is used as a secondary sort key to avoid data loss when paginating (available in versions >= 1.4); see the sketch below.

* Type: any
* Importance: low
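
Conceptually, each poll asks Elasticsearch for documents beyond the last committed offset, sorted by the incrementing field and, when configured, the secondary field. A hedged sketch of such a query (the field names ``ts`` and ``id`` and all values are illustrative, not the connector's literal request):

```json
{
  "size": 10000,
  "query": { "range": { "ts": { "gte": "2021-02-01T00:00:00Z" } } },
  "sort": [
    { "ts": "asc" },
    { "id": "asc" }
  ]
}
```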

``connection.attempts``
Maximum number of attempts to retrieve a valid Elasticsearch connection.

* Type: int
* Default: 3
* Importance: low

``connection.backoff.ms``
Backoff time in milliseconds between connection attempts.

* Type: long
* Default: 10000
* Importance: low
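
A hedged fragment showing the two retry settings together in a connector ``config`` (values are illustrative):

```json
{
  "connection.attempts": "5",
  "connection.backoff.ms": "30000"
}
```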

``index.prefix``
Indices prefix to include in copying.

* Type: string
* Default: ""
* Importance: medium

### Connector Configuration

``poll.interval.ms``
Frequency in ms to poll for new data in each index.

* Type: int
* Default: 5000
* Importance: high

``batch.max.rows``
Maximum number of documents to include in a single batch when polling for new data.

* Type: int
* Default: 10000
* Importance: low

``topic.prefix``
Prefix to prepend to index names to generate the name of the Kafka topic to publish data (e.g. with prefix ``es_``, documents from the index ``my_awesome_index`` are published to the topic ``es_my_awesome_index``).

* Type: string
* Importance: high

``filters.whitelist``
Whitelist filter for extracting a subset of fields from Elasticsearch JSON documents. The whitelist filter supports nested fields. To provide multiple fields use `;` as separator (e.g. `customer;order.qty;order.price`), as illustrated below.

* Type: string
* Importance: medium
* Default: null
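
As an illustration (the document shape and field names are assumptions, not taken from this repository), given the source document:

```json
{
  "customer": "c-1",
  "order": { "qty": 2, "price": 9.99 },
  "internal_notes": "not needed downstream"
}
```

a whitelist of `customer;order.qty` would forward only:

```json
{
  "customer": "c-1",
  "order": { "qty": 2 }
}
```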

``filters.json_cast``
This filter casts nested fields to a JSON string, avoiding recursive parsing into a Kafka Connect schema. The json-cast filter supports nested fields. To provide multiple fields use `;` as separator (e.g. `customer;order.qty;order.price`), as illustrated below.

* Type: string
* Importance: medium
* Default: null
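
As an illustration (the document shape is an assumption), with `filters.json_cast` set to `order`, the nested object is emitted as a single string field instead of a nested struct:

```json
{
  "customer": "c-1",
  "order": "{\"qty\": 2, \"price\": 9.99}"
}
```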

``fieldname_converter``
Configures which field name converter is used (allowed values: `avro` or `nop`). By default, the avro field name converter renames JSON fields that do not respect the Avro naming specification (https://avro.apache.org/docs/current/spec.html#names), so that they can be serialized correctly (Avro names must match ``[A-Za-z_][A-Za-z0-9_]*``). To disable field name conversion, set this parameter to `nop`.

* Type: string
* Importance: medium
* Default: avro