diff --git a/README.md b/README.md index 98663d9..e412ef6 100644 --- a/README.md +++ b/README.md @@ -7,7 +7,8 @@ This Kafka *sink* connector for Amazon EventBridge allows you to send events (records) from one or multiple Kafka topic(s) to the specified event bus, including useful features such as: -- configurable topic to event `detail-type` name mapping +- offloading large events to S3 (✨ new in `v1.3.0` ) +- configurable topic to event `detail-type` name mapping with option to provide a custom class to customize event `detail-type` naming (✨ new in `v1.3.0`) - custom IAM profiles per connector - IAM role-based authentication - support for [dead-letter @@ -84,20 +85,22 @@ mvn clean package -Drevision=$(git describe --tags --always) In addition to the common Kafka Connect [sink-related](https://kafka.apache.org/documentation.html#sinkconnectconfigs) configuration options, this connector defines the following configuration properties. -| Property | Required | Default | Description | -| --------------------------------------------- | -------- | -------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ | -| `aws.eventbridge.connector.id` | **Yes** | | The unique ID of this connector (used in the EventBridge event `source` field as a suffix on `kafka-connect.` to uniquely identify a connector). | -| `aws.eventbridge.region` | **Yes** | | The AWS region of the target event bus. | -| `aws.eventbridge.endpoint.uri` | No | | An optional [service endpoint](https://docs.aws.amazon.com/general/latest/gr/ev.html) URI used to connect to EventBridge. | -| `aws.eventbridge.eventbus.arn` | **Yes** | | The ARN of the target event bus. | -| `aws.eventbridge.eventbus.global.endpoint.id` | No | | An optional global endpoint ID of the target event bus specified using `abcde.xyz` syntax (see API [documentation](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-global-endpoints.html)). | -| `aws.eventbridge.eventbus.resources` | No | | Optional [`Resources`](https://docs.aws.amazon.com/eventbridge/latest/APIReference/API_PutEventsRequestEntry.html) (comma-seperated) to add to each EventBridge event. | -| `aws.eventbridge.detail.types` | No | `"kafka-connect-${topic}"` | The `detail-type` that will be used for the EventBridge events. Can be defined per topic e.g., `"topic1:MyDetailType, topic2:MyDetailType"`, as a single expression with a dynamic `${topic}` placeholder for all topics e.g., `"my-detail-type-${topic}"` or as a static value without additional topic information for all topics e.g, `"my-detail-type"`. | -| `aws.eventbridge.retries.max` | No | `2` | The maximum number of retry attempts when sending events to EventBridge. | -| `aws.eventbridge.retries.delay` | No | `200` | The retry delay in milliseconds between each retry attempt. | -| `aws.eventbridge.iam.profile.name` | No | | Use the specified IAM profile to resolve credentials See [Using different Configuration Profiles per Connector](#using-different-configuration-profiles-per-connector) for details | -| `aws.eventbridge.iam.role.arn` | No | | Uses [STS](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp.html) to assume the specified IAM role with periodic refresh. The connector ID is used as the session name. | -| `aws.eventbridge.iam.external.id` | No | | The IAM external id (optional) when role-based authentication is used. | +| Property | Required | Default | Description | +|------------------------------------------------|----------|----------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `aws.eventbridge.connector.id` | **Yes** | | The unique ID of this connector (used in the EventBridge event `source` field as a suffix on `kafka-connect.` to uniquely identify a connector). | +| `aws.eventbridge.region` | **Yes** | | The AWS region of the target event bus. | +| `aws.eventbridge.endpoint.uri` | No | | An optional [service endpoint](https://docs.aws.amazon.com/general/latest/gr/ev.html) URI used to connect to EventBridge. | +| `aws.eventbridge.eventbus.arn` | **Yes** | | The ARN of the target event bus. | +| `aws.eventbridge.eventbus.global.endpoint.id` | No | | An optional global endpoint ID of the target event bus specified using `abcde.xyz` syntax (see API [documentation](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-global-endpoints.html)). | +| `aws.eventbridge.eventbus.resources` | No | | Optional [`Resources`](https://docs.aws.amazon.com/eventbridge/latest/APIReference/API_PutEventsRequestEntry.html) (comma-seperated) to add to each EventBridge event. | +| `aws.eventbridge.detail.types` | No | `"kafka-connect-${topic}"` | The `detail-type` that will be used for the EventBridge events. Can be defined per topic e.g., `"topic1:MyDetailType, topic2:MyDetailType"`, as a single expression with a dynamic `${topic}` placeholder for all topics e.g., `"my-detail-type-${topic}"` or as a static value without additional topic information for all topics e.g, `"my-detail-type"`. | +| `aws.eventbridge.retries.max` | No | `2` | The maximum number of retry attempts when sending events to EventBridge. | +| `aws.eventbridge.retries.delay` | No | `200` | The retry delay in milliseconds between each retry attempt. | +| `aws.eventbridge.iam.profile.name` | No | | Use the specified IAM profile to resolve credentials See [Using different Configuration Profiles per Connector](#using-different-configuration-profiles-per-connector) for details | +| `aws.eventbridge.iam.role.arn` | No | | Uses [STS](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp.html) to assume the specified IAM role with periodic refresh. The connector ID is used as the session name. | +| `aws.eventbridge.iam.external.id` | No | | The IAM external id (optional) when role-based authentication is used. | +| `aws.eventbridge.offloading.default.s3.bucket` | No | | The S3 bucket to use to offload events to S3 (see [Offloading large events (payloads) to S3](#offloading-large-events-payloads-to-s3)) | +| `aws.eventbridge.offloading.default.fieldref` | No | `$.detail.value` | The part of the event (payload) to offload to S3 (only active when `aws.eventbridge.offloading.default.s3.bucket` is set) | > [!NOTE] > When using the default retry configuration (or retries > 0), the connector provides *at-least-once* delivery semantics @@ -224,6 +227,189 @@ If only the topic name should be used, a single expression with a dynamic `${top used e.g., `"my-detail-type-${topic}"` (using a hardcoded prefix), `"${topic}"` (only topic name), or as a static value without additional topic information `"my-detail-type"`. +### Offloading large events (payloads) to S3 + +The current `PutEvents` size [limit](https://docs.aws.amazon.com/eventbridge/latest/APIReference/API_PutEvents.html) in +EventBridge is 256KB. This can be problematic in cases where Kafka topics contain records exceeding this limit. By +default, the connector logs a warning when trying to send those events to EventBridge which can be ignored (dropped) or +sent to a Kafka dead-letter topic (see [Payloads exceeding PutEvents Limit](#payloads-exceeding-putevents-limit)). + +Alternatively, the connector can be configured to offload (parts of) the event to S3 before calling the `PutEvents` API. +This is also known as the claim-check pattern. When enabled (see [Configuration](#configure-offloading)), every record +received from the associated Kafka topics in the connector which matches the +[JSONPath](https://www.ietf.org/archive/id/draft-goessner-dispatch-jsonpath-00.html) expression defined in +`aws.eventbridge.offloading.default.fieldref`(default: `$.detail.value`) will be offloaded. + +#### Configure Offloading + +To enable offloading, specify an S3 bucket via `aws.eventbridge.offloading.default.s3.bucket`. + +> [!NOTE] +> The IAM credentials/role used in the connector needs +> [`PutObject`](https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html) permissions. + +Unless overwritten by `aws.eventbridge.offloading.default.fieldref`, the connector will offload the value in +`$.detail.value` to S3, delete that key from the event and add claim-check information to the event metadata (see +examples below). The JSONPath expression applies to the converted EventBridge event before calling `PutEvents` to EventBridge. + +The benefits of this approach over other offloading implementations is flexibility in which parts of the events should +be offloaded and retaining as much of the original event as possible to harness the powerful event filtering +capabilities in EventBridge. For example, some events in a topic might contain large blobs of binary/base64-encoded data +which most consumers are not interested. In those cases, offloading helps to trim down event (payload) size and giving +the consumer(s) interested in the full payload the option to fully reconstruct the event based on the offloaded S3 +object and metadata added to the event structure. + +> [!NOTE] +> Array and wildcard references are not allowed in the JSONPath expression defined in +> `aws.eventbridge.offloading.default.fieldref` and the JSONPath must always begin with `$.detail.value`. + +#### Examples + +Assuming offloading is enabled via the setting `aws.eventbridge.offloading.default.s3.bucket="my-offloading-bucket"` and +the following event structure which the S3 offloading logic in the connector operates on before making the final +`PutEvents` API call to EventBridge: + +```json +{ + "version": "0", + "id": "dbc1c73a-c51d-0c0e-ca61-ab9278974c57", + "account": "1234567890", + "time": "2023-05-23T11:38:46Z", + "region": "us-east-1", + "detail-type": "kafka-connect-json-values-topic", + "source": "kafka-connect.my-json-values-connector", + "resources": [], + "detail": { + "topic": "json-values-topic", + "partition": 0, + "offset": 0, + "timestamp": 1684841916831, + "timestampType": "CreateTime", + "headers": [], + "key": "order-1", + "value": { + "orderItems": [ + "item-1", + "item-2" + ], + "orderCreatedTime": "Tue May 23 13:38:46 CEST 2023", + "orderPreferences": null + } + } +} +``` + +If `aws.eventbridge.offloading.default.fieldref` is `$.detail.value` (the default), the resulting event sent to EventBridge would be: + +```json +{ + "version": "0", + "id": "dbc1c73a-c51d-0c0e-ca61-ab9278974c57", + "account": "1234567890", + "time": "2023-05-23T11:38:46Z", + "region": "us-east-1", + "detail-type": "kafka-connect-json-values-topic", + "source": "kafka-connect.my-json-values-connector", + "resources": [], + "detail": { + "topic": "json-values-topic", + "partition": 0, + "offset": 0, + "timestamp": 1684841916831, + "timestampType": "CreateTime", + "headers": [], + "key": "order-1", + "dataref": "arn:aws:s3:::my-offloading-bucket/2d10c6f6-31e9-43b4-8706-51b4cf5534d8", + "datarefJsonPath": "$.detail.value" + } +} +``` + +In the S3 bucket `my-offloading-bucket` there would be an object `2d10c6f6-31e9-43b4-8706-51b4cf5534d8` containing: + +```json +{ + "orderItems": [ + "item-1", + "item-2" + ], + "orderCreatedTime": "Tue May 23 13:38:46 CEST 2023", + "orderPreferences": null +} +``` + +Continuing the example, if `aws.eventbridge.offloading.default.fieldref` is `$.detail.value.non-existing-key`, +offloading would pass this event through without modification. The resulting event would be the same as the input event +without offloading information: + +```json +{ + "version": "0", + "id": "dbc1c73a-c51d-0c0e-ca61-ab9278974c57", + "account": "1234567890", + "time": "2023-05-23T11:38:46Z", + "region": "us-east-1", + "detail-type": "kafka-connect-json-values-topic", + "source": "kafka-connect.my-json-values-connector", + "resources": [], + "detail": { + "topic": "json-values-topic", + "partition": 0, + "offset": 0, + "timestamp": 1684841916831, + "timestampType": "CreateTime", + "headers": [], + "key": "order-1", + "value": { + "orderItems": [ + "item-1", + "item-2" + ], + "orderCreatedTime": "Tue May 23 13:38:46 CEST 2023", + "orderPreferences": null + } + } +} +``` + +If `aws.eventbridge.offloading.default.fieldref` is `$.detail.value.orderPreferences` and matches a key with a `null` +value, offloading is also skipped as there is nothing to offload. The resulting event would be the same as the input +event without offloading information: + +```json +{ + "version": "0", + "id": "dbc1c73a-c51d-0c0e-ca61-ab9278974c57", + "account": "1234567890", + "time": "2023-05-23T11:38:46Z", + "region": "us-east-1", + "detail-type": "kafka-connect-json-values-topic", + "source": "kafka-connect.my-json-values-connector", + "resources": [], + "detail": { + "topic": "json-values-topic", + "partition": 0, + "offset": 0, + "timestamp": 1684841916831, + "timestampType": "CreateTime", + "headers": [], + "key": "order-1", + "value": { + "orderItems": [ + "item-1", + "item-2" + ], + "orderCreatedTime": "Tue May 23 13:38:46 CEST 2023", + "orderPreferences": null + } + } +} +``` + +> [!NOTE] +> If offloading matches a key with an empty object `{}` or array `[]`, these values are considered a match and will be +> offloaded just as any other matched value. + ### Retry Behavior By default, the connector is configured to retry failed [`PutEvents` @@ -510,7 +696,8 @@ We recommend to verify your `PutEvents` account quota for the specific AWS EventBridge has a [limit](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-putevent-size.html) of 256KB on the request size used in `PutEvents`. When a Kafka record exceeds this threshold, the connector will log a warning and ignore (skip) over the record. Optionally, a dead-letter topic can be -[configured](#json-encoding-with-dead-letter-queue) where such records are sent to. +[configured](#json-encoding-with-dead-letter-queue) where such records are sent to or [offloading to +S3](#configure-offloading) can be enabled. ```console [2023-05-26 09:01:21,149] WARN [EventBridgeSink-Json|task-0] [@69c7029] Marking record as failed: code=413 message=EventBridge batch size limit exceeded topic=json-test partition=0 offset=0 (software.amazon.event.kafkaconnector.EventBridgeWriter:244) diff --git a/src/main/java/software/amazon/event/kafkaconnector/EventBridgeSinkConfig.java b/src/main/java/software/amazon/event/kafkaconnector/EventBridgeSinkConfig.java index 208e3cf..ca50110 100644 --- a/src/main/java/software/amazon/event/kafkaconnector/EventBridgeSinkConfig.java +++ b/src/main/java/software/amazon/event/kafkaconnector/EventBridgeSinkConfig.java @@ -131,7 +131,7 @@ public EventBridgeSinkConfig(final Map originalProps) { log.info( "EventBridge properties: connectorId={} eventBusArn={} eventBusRegion={} eventBusEndpointURI={} " + "eventBusMaxRetries={} eventBusRetriesDelay={} eventBusResources={} " - + "eventBusEndpointID={} roleArn={} roleSessionName={} roleExternalID={}" + + "eventBusEndpointID={} roleArn={} roleSessionName={} roleExternalID={} " + "offloadingDefaultS3Bucket={} offloadingDefaultFieldRef={}", connectorId, eventBusArn,