The project is highly inspired by Kafka Connect but implemented in Go. It contains several connector classes designed for Change-Data-Capture (CDC) pipelines from a specific source to a sink. Some of them use internal storage based on the Badger embedded database to keep offsets, log positions and other important metadata, preventing data loss or duplication on connection issues. The internal storage lives in the "data" folder in the workspace of the running app.
Keep in mind not to mix connectors in a common workspace unless they have unique names (the "name" parameter in the YAML configuration). It's not recommended to recreate Kafka topics while keeping the same state of the connector's internal storage. If you have to, you might want to remove the "data" folder as well, but...
Docker image:

```bash
docker pull egsam98/kafka-pipe:{tag}
```

Available tags: https://hub.docker.com/r/egsam98/kafka-pipe/tags

Go module:

```bash
go get github.com/egsam98/kafka-pipe
```
Class | Source | Sink |
---|---|---|
pg.Source | PostgreSQL | Kafka |
pg.Snapshot | PostgreSQL | Kafka |
ch.Sink | Kafka | ClickHouse |
s3.Sink | Kafka | S3 |
s3.Backup | S3 | Kafka |
To start the application, pass a YAML configuration file as the argument:

```bash
./{kafka-pipe bin} config.yaml
```
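If you run the Docker image instead of a local binary, one possible invocation is sketched below; mounting the config at /config.yaml and passing its path as the argument are assumptions, not documented image behavior:

```bash
docker run --rm -v "$(pwd)/config.yaml:/config.yaml" egsam98/kafka-pipe:{tag} /config.yaml
```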
Common config parameters:

```yaml
name: "name of your connector"
class: "any class described in the \"Supported classes\" section"
log:
  pretty: true
  level: "info"
```

Additional parameters are specific to each class.
pg.Source transmits events from PostgreSQL to Kafka topics via logical replication and the "pgoutput" plugin. The last read Postgres LSN is stored on disk in the internal storage.

Config parameters:
Name | Type | Required | Description |
---|---|---|---|
pg:skip.delete | Bool | | Skip delete-events. Default is false |
pg:url | String | + | Postgres connection URL |
pg:publication | String | + | Publication name |
pg:slot | String | + | Replication slot name |
pg:tables | List(String) | + | Tables list to subscribe for replication |
pg:health.table | String | | Health-check table name. Default is public.pipe_health |
kafka:brokers | List(String) | + | List of Kafka brokers. Format is {hostname}:{port} |
kafka:topic:prefix | String | | Prefix for created topics. Format is {prefix}.{postgres table name} |
kafka:topic:replication.factor | UInt | | Topic replication factor. Default is 1 |
kafka:topic:partitions | UInt | | Number of partitions. Default is 1 |
kafka:topic:compression.type | String | | Compression type. Default is producer |
kafka:topic:cleanup.policy | String | | Cleanup policy. Default is delete |
kafka:topic:retention | String | | Retention duration. Default is 168h (7 days) |
kafka:topic:part_retention_size | String | | Partition retention size in bytes, kilobytes, megabytes, gigabytes or terabytes. Default: 10GB |
kafka:topic:routes | Dict | | Mapping of a Postgres relation regular expression to a Kafka topic. Example for partitioned tables: ^public.transaction(_\d+)?$: "public.transaction" |
kafka:batch:size | UInt | | Producer's batch size. Default is 10000 |
kafka:batch:timeout | String | | Producer's batch timeout. Default is 5s |
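For illustration, a minimal pg.Source configuration might look like the sketch below; the connection URL, publication and slot names, table list and topic prefix are placeholders, not values prescribed by the project:

```yaml
name: "orders-pg-source"           # unique connector name
class: "pg.Source"
log:
  pretty: true
  level: "info"

pg:
  url: "postgres://user:password@localhost:5432/shop"
  publication: "pipe_publication"  # placeholder publication name
  slot: "pipe_slot"                # placeholder replication slot name
  tables:
    - public.orders
  health.table: "public.pipe_health"

kafka:
  brokers:
    - localhost:9092
  topic:
    prefix: "shop"                 # topics are named {prefix}.{table}, e.g. shop.public.orders
    partitions: 1
    replication.factor: 1
  batch:
    size: 10000
    timeout: 5s
```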
pg.Snapshot selects all or certain rows from tables and supplies them to Kafka topics. The connector stops when all rows have been produced.

Config parameters:
Name | Type | Required | Description |
---|---|---|---|
pg:url | String | + | Postgres connection URL |
pg:tables | List(String) | + | Tables list to execute the snapshot for |
pg:condition | String | | SQL condition used to select rows from a table. Example: "WHERE id > 1". Default is an empty string |
kafka:brokers | List(String) | + | List of Kafka brokers. Format is {hostname}:{port} |
kafka:topic:prefix | String | | Prefix for created topics. Format is {prefix}.{postgres table name} |
kafka:topic:replication.factor | UInt | | Topic replication factor. Default is 1 |
kafka:topic:partitions | UInt | | Number of partitions. Default is 1 |
kafka:topic:compression.type | String | | Compression type. Default is producer |
kafka:topic:cleanup.policy | String | | Cleanup policy. Default is delete |
kafka:topic:retention | String | | Retention duration. Default is 168h (7 days) |
kafka:topic:part_retention_size | String | | Partition retention size in bytes, kilobytes, megabytes, gigabytes or terabytes. Default: 10GB |
kafka:topic:routes | Dict | | Mapping of a Postgres relation regular expression to a Kafka topic. Example for partitioned tables: ^public.transaction(_\d+)?$: "public.transaction" |
kafka:batch:size | UInt | | Producer's batch size. Default is 10000 |
kafka:batch:timeout | String | | Producer's batch timeout. Default is 5s |
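A comparable pg.Snapshot sketch, again with placeholder values:

```yaml
name: "orders-pg-snapshot"
class: "pg.Snapshot"

pg:
  url: "postgres://user:password@localhost:5432/shop"
  tables:
    - public.orders
  condition: "WHERE id > 1"        # optional row filter

kafka:
  brokers:
    - localhost:9092
  topic:
    prefix: "snapshot"             # topics are named snapshot.public.orders
```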
ch.Sink transmits events from Kafka topics to ClickHouse. The deduplication mechanism is designed similarly to https://github.com/ClickHouse/clickhouse-kafka-connect/blob/main/docs/DESIGN.md#state-machine. Kafka partition offsets are additionally stored on disk in the internal storage.

Config parameters:
Name | Type | Required | Description |
---|---|---|---|
kafka:group | String | + | Group ID |
kafka:brokers | List(String) | + | List of Kafka brokers. Format is {hostname}:{port} |
kafka:topics | List(String) | + | Topics to read from |
kafka:sasl:protocol | String | | SASL protocol; additional settings are described in the SASL section below |
kafka:rebalance_timeout | String | | How long all members are allowed to complete work and commit offsets, minus the time it took to detect the rebalance (from a heartbeat). Default is 1m |
kafka:workers_per_topic | UInt | | Number of workers (consumers) per topic. Default is 1 |
kafka:batch:size | UInt | | Consumer's batch size. Default is 10000 |
kafka:batch:timeout | String | | Consumer's batch timeout. Default is 5s |
kafka:fetch_max_bytes | UInt | | Maximum amount of bytes a broker will try to send during a fetch. Note that brokers may not obey this limit if they have records larger than this limit. Also note that this client sends a fetch to each broker concurrently, meaning the client will buffer up to brokers * max bytes worth of memory. This corresponds to the Java fetch.max.bytes setting. Default: 52428800 (50MB) |
kafka:fetch_max_partition_bytes | UInt | | Maximum amount of bytes that will be consumed for a single partition in a fetch request. Note that if a single batch is larger than this number, that batch will still be returned so the client can make progress. This corresponds to the Java max.partition.fetch.bytes setting. Default: 1048576 (1MB) |
click_house:addrs | List(String) | + | ClickHouse addresses to connect to |
click_house:database | String | + | Database name |
click_house:user | String | + | Username credential |
click_house:password | String | | Password credential |
serde:format | String | | Serde format for Kafka messages (key & value). Default is json. Additional settings for every Serde format are described in the Serde section below |
routes | Dict | | Mapping of a Kafka topic regular expression to a ClickHouse table. Example: public.transaction_\d+: transactions |
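An illustrative ch.Sink configuration, assuming a local Kafka and ClickHouse; the group, topics, addresses and route mapping are placeholders:

```yaml
name: "transactions-ch-sink"
class: "ch.Sink"

kafka:
  group: "ch-sink-transactions"
  brokers:
    - localhost:9092
  topics:
    - public.transaction
  batch:
    size: 10000
    timeout: 5s

click_house:
  addrs:
    - localhost:9000
  database: "analytics"
  user: "default"
  password: ""

serde:
  format: json

routes:
  public.transaction_\d+: transactions
```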
Non-YAML configuration (available if you use the connector as a Go module):

```go
func BeforeInsert(ctx context.Context, serde Serde, batch []*kgo.Record) ([]any, error)
```

A hook function that is called for every consumed batch before it is inserted into ClickHouse. Due to specifics of the ClickHouse implementation (see here and here), the result must consist of struct pointers, i.e.
```go
func BeforeInsert(ctx context.Context, serde Serde, batch []*kgo.Record) ([]any, error) {
    // Handle batch
    // ...
    return []any{&Data{Column1: 1}, &Data{Column1: 2}}, nil
}
```
s3.Sink transmits events from Kafka topics to S3-compatible storage. The connector polls Kafka records according to the batch settings (see kafka:batch:*), groups them by truncated datetime + partition and then uploads the collected data chunks into S3. The storage keys are built by the following schema:

`{topic}/{truncated datetime}/{partition}/{min_offset in chunk}-{max_offset in chunk}.gz`

If the previous data chunk doesn't exceed the 5MB limit, it is merged with the new chunk and then deleted. If the previous chunk's max offset >= the new chunk's max offset, the new chunk is ignored.
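A minimal sketch of how such a key could be derived under the rules above; the helper name and the RFC3339 datetime layout are assumptions for illustration, not the connector's actual implementation:

```go
package main

import (
    "fmt"
    "time"
)

// chunkKey is a hypothetical helper that builds an S3 object key for a chunk of
// records from a single topic/partition, following the schema described above:
// {topic}/{truncated datetime}/{partition}/{min_offset}-{max_offset}.gz
func chunkKey(topic string, partition int32, ts time.Time, groupInterval time.Duration, minOffset, maxOffset int64) string {
    // Round the record timestamp down to the configured group_time_interval.
    truncated := ts.UTC().Truncate(groupInterval)
    // The connector's actual datetime layout may differ; RFC3339 is assumed here.
    return fmt.Sprintf("%s/%s/%d/%d-%d.gz", topic, truncated.Format(time.RFC3339), partition, minOffset, maxOffset)
}

func main() {
    key := chunkKey("public.transaction", 0, time.Now(), time.Hour, 100, 199)
    fmt.Println(key) // e.g. public.transaction/2024-01-01T10:00:00Z/0/100-199.gz
}
```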
Config parameters:
Name | Type | Required | Description |
---|---|---|---|
kafka:group | String | + | Group ID |
kafka:brokers | List(String) | + | List of Kafka brokers. Format is {hostname}:{port} |
kafka:topics | List(String) | + | Topics to read from |
kafka:sasl:protocol | String | | SASL protocol; additional settings are described in the SASL section below |
kafka:rebalance_timeout | String | | How long all members are allowed to complete work and commit offsets, minus the time it took to detect the rebalance (from a heartbeat). Default is 1m |
kafka:workers_per_topic | UInt | | Number of workers (consumers) per topic. Default is 1 |
kafka:batch:size | UInt | | Consumer's batch size. Default is 10000 |
kafka:batch:timeout | String | | Consumer's batch timeout. Default is 5s |
kafka:fetch_max_bytes | UInt | | Maximum amount of bytes a broker will try to send during a fetch. Note that brokers may not obey this limit if they have records larger than this limit. Also note that this client sends a fetch to each broker concurrently, meaning the client will buffer up to brokers * max bytes worth of memory. This corresponds to the Java fetch.max.bytes setting. Default: 52428800 (50MB) |
kafka:fetch_max_partition_bytes | UInt | | Maximum amount of bytes that will be consumed for a single partition in a fetch request. Note that if a single batch is larger than this number, that batch will still be returned so the client can make progress. This corresponds to the Java max.partition.fetch.bytes setting. Default: 1048576 (1MB) |
s3:endpoint | String | + | Connection endpoint |
s3:bucket | String | + | Bucket name |
s3:id | String | + | Access key ID (v4) |
s3:secret | String | + | Secret access key (v4) |
s3:ssl | Bool | | Use SSL connection |
group_time_interval | String | | Describes how records' datetime is rounded down and collected into groups in S3. Default: 1h |
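An illustrative s3.Sink configuration sketch; the endpoint, bucket and credentials are placeholders (e.g. a local MinIO):

```yaml
name: "transactions-s3-sink"
class: "s3.Sink"

kafka:
  group: "s3-sink-transactions"
  brokers:
    - localhost:9092
  topics:
    - public.transaction

s3:
  endpoint: "localhost:9000"       # e.g. a MinIO endpoint
  bucket: "kafka-archive"
  id: "ACCESS_KEY_ID"
  secret: "SECRET_ACCESS_KEY"
  ssl: false

group_time_interval: 1h
```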
s3.Backup restores Kafka messages from S3 storage back to topics. The connector stops when all selected objects have been handled. Read offsets are saved to the internal storage and skipped on a later rescan.

Config parameters:
Name | Type | Required | Description |
---|---|---|---|
kafka:brokers | List(String) | + | List of Kafka brokers. Format is {hostname}:{port} |
kafka:batch:size | UInt | | Consumer's batch size. Default is 10000 |
kafka:batch:timeout | String | | Consumer's batch timeout. Default is 0, equivalent to the producer's linger |
s3:endpoint | String | + | Connection endpoint |
s3:bucket | String | + | Bucket name |
s3:id | String | + | Access key ID (v4) |
s3:secret | String | + | Secret access key (v4) |
s3:ssl | Bool | | Use SSL connection |
topics | List(String) | + | Topics to read from |
date_since | String | + | UTC date starting from which objects are selected. Format: "2006-01-02 15:04:05" |
date_to | String | + | UTC date up to which objects are selected. Format: "2006-01-02 15:04:05" |
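And a matching s3.Backup sketch that restores an archived range back into the topic (all values are placeholders):

```yaml
name: "transactions-s3-backup"
class: "s3.Backup"

kafka:
  brokers:
    - localhost:9092

s3:
  endpoint: "localhost:9000"
  bucket: "kafka-archive"
  id: "ACCESS_KEY_ID"
  secret: "SECRET_ACCESS_KEY"
  ssl: false

topics:
  - public.transaction
date_since: "2024-01-01 00:00:00"
date_to: "2024-02-01 00:00:00"
```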
Specific settings for every SASL protocol are listed below; all parameters are optional unless marked as required.

plain:

Name | Type | Required | Description |
---|---|---|---|
user | String | + | Username |
pass | String | + | Password |
zid | String | | Authorization ID to use in authenticating |
scram-256:

Name | Type | Required | Description |
---|---|---|---|
user | String | + | Username |
pass | String | + | Password |
zid | String | | Authorization ID to use in authenticating |
nonce | String | | If provided, the string to use for authentication. Otherwise, 20 bytes read with crypto/rand (Go) are used |
is_token | Bool | | Suffixes the "tokenauth=true" extra attribute to the initial authentication message |

scram-512: see scram-256.
oauthbearer:

Name | Type | Required | Description |
---|---|---|---|
token | String | + | OAuth bearer token to use for a single session's authentication |
zid | String | | Authorization ID |
aws-msk-iam:

Name | Type | Required | Description |
---|---|---|---|
access_key | String | + | AWS access key |
secret_key | String | + | AWS secret key |
session_token | String | | Session token, read more: https://docs.aws.amazon.com/STS/latest/APIReference/welcome.html |
user_agent | String | | Override for the default user agent "franz-go/<runtime.Version()>/", read more: https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_condition-keys.html#condition-keys-useragent |
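For illustration, a plain SASL block might be nested under kafka:sasl (as the kafka:sasl:protocol parameter suggests); the exact protocol value "plain" and the nesting shown here are assumptions:

```yaml
kafka:
  sasl:
    protocol: plain        # assumed protocol value; use the one that matches your broker setup
    user: "my-user"
    pass: "my-password"
```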
Serde, i.e. Serialization and Deserialization. Supported formats: json, avro. Depending on the selected format, different settings are provided.
json:

Name | Type | Required | Description |
---|---|---|---|
time_format | String | | Datetime formatter; possible values: rfc3339, timestamp, timestamp-milli. Default is timestamp-milli |
avro:

Name | Type | Required | Description |
---|---|---|---|
schemas | Dict | + | Mapping of Kafka topics to Avro schema source URLs. Supported URL schemes: file (local file) and http(s) (download via HTTP). Example URL: file://schema.avsc |
Examples of Serde configuration:

```yaml
serde:
  format: json
  time_format: rfc3339
```

```yaml
serde:
  format: avro
  schemas:
    topic_test: file://schema.avsc
```