Write CreateTime, offset and partition to dedicated columns to allow pruning #502

Open
anthu opened this issue Oct 15, 2022 · 2 comments

anthu commented Oct 15, 2022

Hey all,

After spending the first minutes (rather than hours) playing around with the Snowflake Kafka Connector (both SNOWPIPE and SNOWPIPE_STREAMING), I'm seeing constant full table scans when querying the landing area directly. While this may still be performant enough for my current use cases, I can imagine it becoming an issue over time.

Use Cases

  • What are the newest x records ingested into Snowflake?
  • What is the lag on the Snowflake side compared to the high-water mark on Kafka?
  • Work exploratively on raw data.
  • Efficiently prune the Kafka tables after a period of time (days or months).

Current Approach

I would query something like this:

    SELECT
        RECORD_METADATA:partition::varchar AS PARTITION,
        MAX(RECORD_METADATA:offset::number) AS MAX_OFFSET,
        MAX(RECORD_METADATA:CreateTime::varchar::timestamp) AS LATEST_RECORD
    FROM LANDING.TOPIC_1
    GROUP BY 1
    ORDER BY 1;

If my understanding is correct (and I could confirm this in my own experiments), this results in a full table scan every time.
The general idea would be to use Streams and Tasks to manually schematise the record as well as the metadata, as sketched below. But due to the fixed schedule of the task, working on the schematised result increases the access delay significantly.
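
For reference, here's a minimal sketch of that workaround; names like TOPIC_1_STREAM, TOPIC_1_FLAT, and MY_WH are placeholders:

    -- Stream that captures newly ingested rows on the raw landing table
    CREATE OR REPLACE STREAM LANDING.TOPIC_1_STREAM ON TABLE LANDING.TOPIC_1;

    -- Task that periodically flattens the metadata into relational columns;
    -- its SCHEDULE is exactly the fixed delay mentioned above
    CREATE OR REPLACE TASK LANDING.TOPIC_1_FLATTEN
        WAREHOUSE = MY_WH
        SCHEDULE = '5 MINUTE'
        WHEN SYSTEM$STREAM_HAS_DATA('LANDING.TOPIC_1_STREAM')
    AS
        INSERT INTO LANDING.TOPIC_1_FLAT
        SELECT
            RECORD_METADATA:partition::number                   AS KAFKA_PARTITION,
            RECORD_METADATA:offset::number                      AS KAFKA_OFFSET,
            TO_TIMESTAMP(RECORD_METADATA:CreateTime::number, 3) AS KAFKA_CREATETIME,
            RECORD_CONTENT
        FROM LANDING.TOPIC_1_STREAM;

(The task would still need an ALTER TASK ... RESUME before it starts running.)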

Further, in exploratory and monitoring use cases where you're working on raw data and limiting the analysis interval, it would be incredibly helpful to be able to prune on the CreateTime field.

Proposal

Based on these thoughts and observations, I propose writing out CreateTime, offset, and partition as dedicated columns and filtering on these relational columns.
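
To illustrate (the column names here are placeholders, not a naming proposal), the landing table could look like this, and a simple range filter on the dedicated column would then be able to prune micro-partitions:

    CREATE TABLE LANDING.TOPIC_1 (
        RECORD_METADATA  VARIANT,
        RECORD_CONTENT   VARIANT,
        KAFKA_CREATETIME TIMESTAMP_NTZ,  -- written out from CreateTime
        KAFKA_PARTITION  NUMBER,         -- written out from partition
        KAFKA_OFFSET     NUMBER          -- written out from offset
    );

    -- The predicate sits on a plain column instead of inside a VARIANT
    SELECT *
    FROM LANDING.TOPIC_1
    WHERE KAFKA_CREATETIME >= DATEADD(day, -1, CURRENT_TIMESTAMP());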

I know that schematization is on the roadmap, but according to the design document it will not include the RECORD_METADATA field.
I'm also happy to contribute this enhancement if you agree on these points.

vuphamcs commented Jun 27, 2023

**Edit:** I realize this only works if you have schematization enabled, which is currently in private preview.

You can achieve this with a combination of SMTs, so long as you aren't using one of the unsupported converters.

Below is an example of extracting CreateTime metadata into a column named KCREATETIME and formatting it as a timestamp.

    "transforms": "insertCreateTime, convertKCreateTime",
    "transforms.insertCreateTime.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.insertCreateTime.timestamp.field": "kcreatetime",

    "transforms.convertKCreateTime.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
    "transforms.convertKCreateTime.field": "kcreatetime",
    "transforms.convertKCreateTime.format": "yyyy-MM-dd HH:mm:ss:SSS",
    "transforms.convertKCreateTime.target.type": "string",

It's stored as a VARCHAR because setting "transforms.convertKCreateTime.target.type": "Timestamp" didn't seem to work, but partition pruning with this column still works.
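
Since the "yyyy-MM-dd HH:mm:ss:SSS" format sorts lexicographically in timestamp order, a plain string range filter on the extracted column is enough to benefit from pruning. A sketch, assuming the LANDING.TOPIC_1 table from the original post:

    SELECT *
    FROM LANDING.TOPIC_1
    WHERE KCREATETIME >= '2023-06-01 00:00:00:000'
      AND KCREATETIME <  '2023-06-02 00:00:00:000';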

hitesh-yadav-x commented Nov 19, 2023

Just wanted to share that I was able to get the offset, partition, and create time for JSON without using the schema registry. The options you need to set are below, and Kafka Connect will do the rest to populate the values into the given column names. A schema registry is required if you use any format other than JSON.

    "transforms": "insert",
    "transforms.insert.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.insert.timestamp.field": "kafka_create_time",
    "transforms.insert.partition.field": "kafka_partition",
    "transforms.insert.offset.field": "kafka_offset",
    "snowflake.enable.schematization": true

Setting snowflake.enable.schematization to true for JSON creates these values as separate columns in the target table and hence helps with partition pruning as well.
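
With these columns in place, the monitoring query from the original post can be rewritten against plain relational columns. A sketch, assuming schematization uppercases the configured field names:

    SELECT
        KAFKA_PARTITION,
        MAX(KAFKA_OFFSET)      AS MAX_OFFSET,
        MAX(KAFKA_CREATE_TIME) AS LATEST_RECORD
    FROM LANDING.TOPIC_1
    GROUP BY 1
    ORDER BY 1;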
