Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ADD: add replace.null.with.default for JdbcSink #1433

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

SML0127
Copy link

@SML0127 SML0127 commented Sep 10, 2024

Problem

When used with Kafka connect, there are cases where null values ​in the record ​are changed to default values.
In kafka-connect library v3.5, replace.null.with.default option was added to prevent null values ​​from always being replaced with default values ​​when converting json.
But, the column constraints related to default values ​​still remain in the record, so null values ​​are entered as default values ​​when building insert/update queries in JdbcSink.

I don't think there is a problem with the current query build logic where null values ​​are replaced with default values ​​if there is a default value constraint. However, since this connector used with kafka-connect and kafka-connect supports replace.null.with.default, so I think it is necessary to support this option.

Solution

Add replace.null.with.default option to choose whether null values should be changed to default values or not.

Does this solution apply anywhere else?
  • yes
  • no
If yes, where?

Test Strategy

Testing done:
  • Unit tests
  • Integration tests
  • System tests
  • Manual tests

Release Plan

@SML0127 SML0127 requested a review from a team as a code owner September 10, 2024 04:37
Comment on lines 175 to +182
) throws SQLException {
for (final String fieldName : fieldsMetadata.nonKeyFieldNames) {
final Field field = record.valueSchema().field(fieldName);
bindField(index++, field.schema(), valueStruct.get(field), fieldName);
if (this.replaceNullWithDefault) {
bindField(index++, field.schema(), valueStruct.get(field), fieldName);
} else {
bindField(index++, field.schema(), valueStruct.getWithoutDefault(field.name()), fieldName);
}
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Major changes

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant