This RFC describes a new `aws_s3` source for ingesting events stored as objects in S3, using SQS bucket notifications. This will permit, for example, ingesting CloudTrail events, VPC flow logs, and ELB access logs, all of which can be written to S3, as well as objects written by custom logging pipelines.
This RFC will cover the implementation of an `aws_s3` source that relies on SQS for bucket notifications.

It will not cover the proposed polling strategy, but will leave room for it to be added later in a backwards-compatible fashion.

It will also not cover parsing of the events within the objects. Vector's current approach is to delegate that to transforms. For example, we will likely want to add a transform to extract CloudTrail events stored as objects in S3.
AWS can write several types of logs to S3. For example:

- CloudTrail logs (can also go to CloudWatch Logs, but with a size limitation)
- AWS load balancer access logs (only S3)
- VPC flow logs (can also go to CloudWatch Logs)

Even where AWS service logs can go to CloudWatch Logs, I expect it will be common for operators to send them to S3 instead, since it is cheaper storage if they don't need to access the logs regularly.
A new `aws_s3` source will be added that polls SQS for bucket notifications and ingests new objects from S3. It will wait to delete the SQS message until the object has been fully processed, to guarantee at-least-once delivery to the pipeline (though events can still be lost after that point until we tackle end-to-end acks).

This approach will allow ingestion to be easily scaled horizontally by spinning up extra `vector` instances, relying on SQS to handle the fan-out.
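To make that flow concrete, here is a rough sketch of the poll/process/delete loop, assuming the rusoto crates Vector already uses for its other AWS integrations. The hard-coded region and the `process_notification` helper are placeholders for illustration, not the actual implementation.

```rust
use rusoto_core::Region;
use rusoto_sqs::{DeleteMessageRequest, ReceiveMessageRequest, Sqs, SqsClient};

// Hypothetical placeholder for parsing the notification, fetching the
// referenced S3 object(s), and forwarding the decoded events to the pipeline.
async fn process_notification(_body: Option<&str>) -> Result<(), ()> {
    Ok(())
}

async fn poll_loop(queue_url: String, visibility_timeout_secs: i64) {
    // In practice the region would come from configuration or the queue URL.
    let sqs = SqsClient::new(Region::UsEast1);

    loop {
        let received = sqs
            .receive_message(ReceiveMessageRequest {
                queue_url: queue_url.clone(),
                // Long poll so we don't hammer the API between notifications.
                wait_time_seconds: Some(20),
                visibility_timeout: Some(visibility_timeout_secs),
                ..Default::default()
            })
            .await;

        for message in received.ok().and_then(|r| r.messages).unwrap_or_default() {
            if process_notification(message.body.as_deref()).await.is_ok() {
                // Only delete the message once the object has been fully
                // processed, giving at-least-once delivery semantics.
                if let Some(receipt_handle) = message.receipt_handle {
                    let _ = sqs
                        .delete_message(DeleteMessageRequest {
                            queue_url: queue_url.clone(),
                            receipt_handle,
                        })
                        .await;
                }
            }
        }
    }
}
```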
Example SQS bucket notification message:
```json
{
  "Records": [
    {
      "eventVersion": "2.2",
      "eventSource": "aws:s3",
      "awsRegion": "us-west-2",
      "eventTime": "The time, in ISO-8601 format, for example, 1970-01-01T00:00:00.000Z, when Amazon S3 finished processing the request",
      "eventName": "event-type",
      "userIdentity": {
        "principalId": "Amazon-customer-ID-of-the-user-who-caused-the-event"
      },
      "requestParameters": {
        "sourceIPAddress": "ip-address-where-request-came-from"
      },
      "responseElements": {
        "x-amz-request-id": "Amazon S3 generated request ID",
        "x-amz-id-2": "Amazon S3 host that processed the request"
      },
      "s3": {
        "s3SchemaVersion": "1.0",
        "configurationId": "ID found in the bucket notification configuration",
        "bucket": {
          "name": "bucket-name",
          "ownerIdentity": {
            "principalId": "Amazon-customer-ID-of-the-bucket-owner"
          },
          "arn": "bucket-ARN"
        },
        "object": {
          "key": "object-key",
          "size": "object-size",
          "eTag": "object eTag",
          "versionId": "object version if bucket is versioning-enabled, otherwise null",
          "sequencer": "a string representation of a hexadecimal value used to determine event sequence, only used with PUTs and DELETEs"
        }
      },
      "glacierEventData": {
        "restoreEventData": {
          "lifecycleRestorationExpiryTime": "The time, in ISO-8601 format, for example, 1970-01-01T00:00:00.000Z, of Restore Expiry",
          "lifecycleRestoreStorageClass": "Source storage class for restore"
        }
      }
    }
  ]
}
```
An S3 client will be initialized when an SQS message is received, using the notification's `s3.bucket.arn`. Clients will be cached so we don't need to recreate them for messages referring to the same bucket.
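As an illustration, a minimal sketch of such a cache follows, assuming rusoto. Keying by the bucket ARN and constructing the client from the notification's `awsRegion` field are assumptions about how the cache might be organized, not a final design.

```rust
use std::collections::HashMap;
use std::str::FromStr;

use rusoto_core::Region;
use rusoto_s3::S3Client;

#[derive(Default)]
struct S3ClientCache {
    // Keyed by the bucket ARN from the notification.
    clients: HashMap<String, S3Client>,
}

impl S3ClientCache {
    /// Return a cached client for the bucket, creating one for the
    /// notification's region on first use.
    fn client_for(&mut self, bucket_arn: &str, region: &str) -> Option<&S3Client> {
        if !self.clients.contains_key(bucket_arn) {
            let region = Region::from_str(region).ok()?;
            self.clients
                .insert(bucket_arn.to_string(), S3Client::new(region));
        }
        self.clients.get(bucket_arn)
    }
}
```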
Events will be published for:
- When an SQS message is received
  - Number of records in the message
- When an SQS message is processed (success or failure)
  - Number of successful / failed / unprocessed records
- When an object is processed
  - Bytes
These may be revised when the source is actually implemented.
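For illustration only, these events might take shapes roughly like the following; the actual names and fields would follow Vector's internal instrumentation conventions and will likely change during implementation.

```rust
// Hypothetical event shapes mirroring the list above.
struct SqsMessageReceived {
    record_count: usize,
}

struct SqsMessageProcessed {
    succeeded: usize,
    failed: usize,
    unprocessed: usize,
}

struct S3ObjectProcessed {
    byte_size: usize,
}
```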
We will create a user guide, based on AWS's documentation, for configuring bucket notifications to publish object-created events to SQS.
The new source configuration will look like:
```toml
[sources.my_source_id]
# General
type = "aws_s3" # required
compression = "auto" # optional; compression format of the objects; one of ["auto", "none", "gzip", "lz4", "snappy", "zstd"]; default
strategy = "sqs" # optional, default; one of ["sqs"]
sqs.queue_url = "https://sqs.us-east-1.amazonaws.com/1234/test-s3-queue"
sqs.poll_secs = 30 # minimum poll interval; optional, default
sqs.visibility_timeout_secs = 300 # default visibility timeout for SQS messages; if vector does not process the message in this time, the SQS message will become available to be reprocessed
sqs.delete_message = true # whether to delete the message after processing; false is useful for debugging; default
```
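As a rough sketch of how this configuration could be modeled internally, assuming serde-based deserialization like Vector's other sources; the names and defaults simply mirror the TOML above and are not final.

```rust
use serde::Deserialize;

#[derive(Deserialize)]
struct AwsS3Config {
    #[serde(default = "default_compression")]
    compression: String, // "auto", "none", "gzip", "lz4", "snappy", "zstd"
    #[serde(default = "default_strategy")]
    strategy: String, // only "sqs" for now; leaves room for a polling strategy later
    sqs: Option<SqsConfig>,
}

#[derive(Deserialize)]
struct SqsConfig {
    queue_url: String,
    #[serde(default = "default_poll_secs")]
    poll_secs: u64,
    #[serde(default = "default_visibility_timeout_secs")]
    visibility_timeout_secs: u64,
    #[serde(default = "default_delete_message")]
    delete_message: bool,
}

fn default_compression() -> String { "auto".into() }
fn default_strategy() -> String { "sqs".into() }
fn default_poll_secs() -> u64 { 30 }
fn default_visibility_timeout_secs() -> u64 { 300 }
fn default_delete_message() -> bool { true }
```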
A comment mentions wanting the ability to filter objects by prefix, but I think this is better configured when setting up the bucket notifications, to avoid publishing notifications for those objects at all.
The object's `LastModified` value will be used as the `timestamp`.

The object key, bucket, and region will be associated with the log as the fields `key`, `bucket`, and `region`.
All custom S3 object metadata key/values will be set as fields on the log event.
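A simplified sketch of how an individual log event could be annotated is below. It uses a plain map instead of Vector's actual log event type, and the function is purely illustrative.

```rust
use std::collections::HashMap;

/// Turn one line of an S3 object into an annotated event, copying the fields
/// described above onto it.
fn annotate(
    line: &str,
    bucket: &str,
    key: &str,
    region: &str,
    last_modified: &str,
    s3_metadata: &HashMap<String, String>,
) -> HashMap<String, String> {
    let mut event = HashMap::new();
    event.insert("message".to_string(), line.to_string());
    // The object's LastModified value becomes the event timestamp.
    event.insert("timestamp".to_string(), last_modified.to_string());
    event.insert("bucket".to_string(), bucket.to_string());
    event.insert("key".to_string(), key.to_string());
    event.insert("region".to_string(), region.to_string());
    // Any custom object metadata is copied onto the event as fields.
    for (k, v) in s3_metadata {
        event.insert(k.clone(), v.clone());
    }
    event
}
```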
It is common to compress objects stored in S3 as part of the upload process. To facilitate this, we will allow the user to indicate the compression format in the config, as shown above, but we will also support an "auto" value that instructs Vector to automatically try to decompress the objects via:
- If the `Content-Type` of the object is set to a known compression MIME type, use it; otherwise:
- If the file extension matches a known compression file extension, use it; otherwise:
- Do no decompression and treat the object as if the compression were "none"
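A minimal sketch of that fallback order follows; the specific MIME types and file extensions listed are assumptions about which mappings we would support.

```rust
#[derive(Debug, PartialEq)]
enum Compression {
    None,
    Gzip,
    Lz4,
    Snappy,
    Zstd,
}

/// Resolve "auto": Content-Type first, then the key's file extension,
/// otherwise fall back to no decompression.
fn detect_compression(content_type: Option<&str>, key: &str) -> Compression {
    match content_type {
        Some("application/gzip") | Some("application/x-gzip") => return Compression::Gzip,
        Some("application/zstd") => return Compression::Zstd,
        _ => {}
    }

    match key.rsplit('.').next() {
        Some("gz") => Compression::Gzip,
        Some("lz4") => Compression::Lz4,
        Some("sz") => Compression::Snappy,
        Some("zst") => Compression::Zstd,
        _ => Compression::None,
    }
}
```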
Maintenance burden of an additional source.
There are a few alternate approaches (some also noted by James).
AWS S3 can be configured to also send push-based notifications via SNS. This would require running Vector in a way where AWS can send requests to it.
This may be a worthwhile approach in the future as it would result in faster object ingestion than polling SQS, but I think SQS's pull-based model is likely to be much easier to use out-of-the-box, as it does not require exposing a `vector` instance to incoming network requests, and SQS consumers are more easily parallelizable (SNS would require a load balancer).
I'd recommend tabling this one for now.
As mentioned in #1017, another approach is to long-poll the bucket: listing all of the objects and processing any new ones.

This approach is liable to take a long time to process a bucket (hours or days for large buckets) and so is not the most generally applicable approach. It can work for small buckets or when a tightly scoped object prefix is used.
This RFC will leave room for this, and possibly other, approaches to be added in the future.
One recommendation in a comment was to store object created messages in DynamoDB (via Lambda). This could certainly work, but is a more complicated pipeline and I'm unsure how common it would be.
I'd recommend tabling this one and seeing if anyone asks for it. I consider this approach representative of more complex S3 pipelines (e.g., you could imagine having the Lambda write the metadata to RDS instead).
It is possible to have AWS SQS directly call a Lambda, so a user could attempt to run Vector in a Lambda. We've generally tried to stay away from recommending running Vector in a Lambda because Lambdas are largely stateless and some Vector components are stateful.
- We could attempt to make the compression handling more robust by falling back to the "magic bytes" of the object to see if it appears to be compressed (a sketch of this check follows below). We may want to have the user opt into this behavior, though, as I could see it causing issues in edge cases where a file is not compressed but happens to match the magic bytes.
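A sketch of what that opt-in check might look like, using the well-known signature bytes for gzip, zstd, and the LZ4 frame format; the returned labels are illustrative.

```rust
/// Guess a compression format from the first bytes of the object, if any.
fn sniff_compression(prefix: &[u8]) -> Option<&'static str> {
    if prefix.starts_with(&[0x1f, 0x8b]) {
        Some("gzip")
    } else if prefix.starts_with(&[0x28, 0xb5, 0x2f, 0xfd]) {
        Some("zstd")
    } else if prefix.starts_with(&[0x04, 0x22, 0x4d, 0x18]) {
        Some("lz4")
    } else {
        None
    }
}
```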
- Submit a PR with the initial implementation of the source
- Allow deletion, or changing the storage class, of S3 objects after they've been processed
- Conditionally pull object tags as additional metadata (requires an extra AWS API request per object)
- Enhancing parallelization within vector; for example, it could process N messages at a time
- Update the message visibility timeout if an object is still actively being processed by a live `vector` process, to avoid it being reprocessed by another `vector` instance. Maybe this would be optional?
- Possibly adding rate limit parameters. I don't think they'd really be useful in this case, though, given the high number of requests/s S3 allows.
- Consider reusing the compression handling logic with the `file` source