Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-37005][table] Make StreamExecDeduplicate output insert only where possible #26051

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.flink.table.runtime.operators.deduplicate.ProcTimeMiniBatchDeduplicateKeepFirstRowFunction;
import org.apache.flink.table.runtime.operators.deduplicate.ProcTimeMiniBatchDeduplicateKeepLastRowFunction;
import org.apache.flink.table.runtime.operators.deduplicate.RowTimeDeduplicateFunction;
import org.apache.flink.table.runtime.operators.deduplicate.RowTimeDeduplicateKeepFirstRowFunction;
import org.apache.flink.table.runtime.operators.deduplicate.RowTimeMiniBatchDeduplicateFunction;
import org.apache.flink.table.runtime.operators.deduplicate.RowTimeMiniBatchLatestChangeDeduplicateFunction;
import org.apache.flink.table.runtime.operators.deduplicate.asyncprocessing.AsyncStateRowTimeDeduplicateFunction;
Expand All @@ -70,6 +71,7 @@
import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_DEDUPLICATE_MINIBATCH_COMPACT_CHANGES_ENABLED;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;

/**
* Stream {@link ExecNode} which deduplicate on keys and keeps only first row or last row. This node
Expand Down Expand Up @@ -98,6 +100,7 @@ public class StreamExecDeduplicate extends ExecNodeBase<RowData>
public static final String FIELD_NAME_KEEP_LAST_ROW = "keepLastRow";
public static final String FIELD_NAME_GENERATE_UPDATE_BEFORE = "generateUpdateBefore";
public static final String STATE_NAME = "deduplicateState";
public static final String FIELD_NAME_OUTPUT_INSERT_ONLY = "outputInsertOnly";

@JsonProperty(FIELD_NAME_UNIQUE_KEYS)
private final int[] uniqueKeys;
Expand All @@ -116,11 +119,23 @@ public class StreamExecDeduplicate extends ExecNodeBase<RowData>
@JsonInclude(JsonInclude.Include.NON_NULL)
private final List<StateMetadata> stateMetadataList;

/**
 * For backward compatibility: if the plan was generated without the insert-only requirement,
 * {@code outputInsertOnly} will be absent from the JSON (null) and is interpreted as false, so
 * that the old code path is used and a problematic state migration from {@link
 * RowTimeDeduplicateFunction} to {@link RowTimeDeduplicateKeepFirstRowFunction} is avoided.
 */
@Nullable
@JsonProperty(FIELD_NAME_OUTPUT_INSERT_ONLY)
@JsonInclude(JsonInclude.Include.NON_NULL)
private final Boolean outputInsertOnly;

public StreamExecDeduplicate(
ReadableConfig tableConfig,
int[] uniqueKeys,
boolean isRowtime,
boolean keepLastRow,
boolean outputInsertOnly,
boolean generateUpdateBefore,
InputProperty inputProperty,
RowType outputType,
Expand All @@ -132,6 +147,7 @@ public StreamExecDeduplicate(
uniqueKeys,
isRowtime,
keepLastRow,
outputInsertOnly,
generateUpdateBefore,
StateMetadata.getOneInputOperatorDefaultMeta(tableConfig, STATE_NAME),
Collections.singletonList(inputProperty),
Expand All @@ -147,6 +163,7 @@ public StreamExecDeduplicate(
@JsonProperty(FIELD_NAME_UNIQUE_KEYS) int[] uniqueKeys,
@JsonProperty(FIELD_NAME_IS_ROWTIME) boolean isRowtime,
@JsonProperty(FIELD_NAME_KEEP_LAST_ROW) boolean keepLastRow,
@Nullable @JsonProperty(FIELD_NAME_OUTPUT_INSERT_ONLY) Boolean outputInsertOnly,
@JsonProperty(FIELD_NAME_GENERATE_UPDATE_BEFORE) boolean generateUpdateBefore,
@Nullable @JsonProperty(FIELD_NAME_STATE) List<StateMetadata> stateMetadataList,
@JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty> inputProperties,
Expand All @@ -157,6 +174,7 @@ public StreamExecDeduplicate(
this.uniqueKeys = checkNotNull(uniqueKeys);
this.isRowtime = isRowtime;
this.keepLastRow = keepLastRow;
this.outputInsertOnly = outputInsertOnly == null ? false : outputInsertOnly;
this.generateUpdateBefore = generateUpdateBefore;
this.stateMetadataList = stateMetadataList;
}
Expand Down Expand Up @@ -187,6 +205,7 @@ protected Transformation<RowData> translateToPlanInternal(
rowSerializer,
inputRowType,
keepLastRow,
outputInsertOnly,
generateUpdateBefore,
stateRetentionTime)
.createDeduplicateOperator();
Expand Down Expand Up @@ -224,7 +243,7 @@ protected Transformation<RowData> translateToPlanInternal(

/** Base translator to create deduplicate operator. */
private abstract static class DeduplicateOperatorTranslator {
private final ReadableConfig config;
protected final ReadableConfig config;
protected final InternalTypeInfo<RowData> rowTypeInfo;
protected final TypeSerializer<RowData> typeSerializer;
protected final boolean keepLastRow;
Expand All @@ -251,7 +270,7 @@ protected boolean generateInsert() {
}

protected boolean isMiniBatchEnabled() {
return config.get(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED);
return StreamExecDeduplicate.isMiniBatchEnabled(config);
}

protected boolean isCompactChanges() {
Expand Down Expand Up @@ -283,13 +302,15 @@ private static class RowtimeDeduplicateOperatorTranslator
extends DeduplicateOperatorTranslator {

private final RowType inputRowType;
private final boolean outputInsertOnly;

protected RowtimeDeduplicateOperatorTranslator(
ReadableConfig config,
InternalTypeInfo<RowData> rowTypeInfo,
TypeSerializer<RowData> typeSerializer,
RowType inputRowType,
boolean keepLastRow,
boolean outputInsertOnly,
boolean generateUpdateBefore,
long stateRetentionTime) {
super(
Expand All @@ -299,6 +320,7 @@ protected RowtimeDeduplicateOperatorTranslator(
keepLastRow,
generateUpdateBefore,
stateRetentionTime);
this.outputInsertOnly = outputInsertOnly;
this.inputRowType = inputRowType;
}

Expand All @@ -313,6 +335,7 @@ OneInputStreamOperator<RowData, RowData> createDeduplicateOperator() {
}
checkArgument(rowtimeIndex >= 0);
if (isMiniBatchEnabled()) {
checkState(!canBeInsertOnly(config, keepLastRow));
CountBundleTrigger<RowData> trigger = new CountBundleTrigger<>(getMiniBatchSize());
if (isCompactChanges()) {
return new KeyedMapBundleOperator<>(
Expand All @@ -339,25 +362,39 @@ OneInputStreamOperator<RowData, RowData> createDeduplicateOperator() {
}
} else {
if (isAsyncStateEnabled()) {
AsyncStateRowTimeDeduplicateFunction processFunction =
new AsyncStateRowTimeDeduplicateFunction(
rowTypeInfo,
stateRetentionTime,
rowtimeIndex,
generateUpdateBefore,
generateInsert(),
keepLastRow);
return new AsyncKeyedProcessOperator<>(processFunction);
if (!keepLastRow && outputInsertOnly) {
checkState(canBeInsertOnly(config, keepLastRow));
// TODO: create async version
return new AsyncKeyedProcessOperator<>(
new RowTimeDeduplicateKeepFirstRowFunction(
rowTypeInfo, stateRetentionTime, rowtimeIndex));
Comment on lines +368 to +370
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should use the sync version of the operator, since the function is intended for sync state:

Suggested change
return new AsyncKeyedProcessOperator<>(
new RowTimeDeduplicateKeepFirstRowFunction(
rowTypeInfo, stateRetentionTime, rowtimeIndex));
return new KeyedProcessOperator<>(
new RowTimeDeduplicateKeepFirstRowFunction(
rowTypeInfo, stateRetentionTime, rowtimeIndex));

} else {
AsyncStateRowTimeDeduplicateFunction processFunction =
new AsyncStateRowTimeDeduplicateFunction(
rowTypeInfo,
stateRetentionTime,
rowtimeIndex,
generateUpdateBefore,
generateInsert(),
keepLastRow);
return new AsyncKeyedProcessOperator<>(processFunction);
}
} else {
RowTimeDeduplicateFunction processFunction =
new RowTimeDeduplicateFunction(
rowTypeInfo,
stateRetentionTime,
rowtimeIndex,
generateUpdateBefore,
generateInsert(),
keepLastRow);
return new KeyedProcessOperator<>(processFunction);
if (!keepLastRow && outputInsertOnly) {
checkState(canBeInsertOnly(config, keepLastRow));
return new KeyedProcessOperator<>(
new RowTimeDeduplicateKeepFirstRowFunction(
rowTypeInfo, stateRetentionTime, rowtimeIndex));
} else {
return new KeyedProcessOperator<>(
new RowTimeDeduplicateFunction(
rowTypeInfo,
stateRetentionTime,
rowtimeIndex,
generateUpdateBefore,
generateInsert(),
keepLastRow));
}
}
}
}
Expand Down Expand Up @@ -392,6 +429,7 @@ protected ProcTimeDeduplicateOperatorTranslator(
@Override
OneInputStreamOperator<RowData, RowData> createDeduplicateOperator() {
if (isMiniBatchEnabled()) {
checkState(!canBeInsertOnly(config, keepLastRow));
CountBundleTrigger<RowData> trigger = new CountBundleTrigger<>(getMiniBatchSize());
if (keepLastRow) {
ProcTimeMiniBatchDeduplicateKeepLastRowFunction processFunction =
Expand All @@ -412,6 +450,7 @@ OneInputStreamOperator<RowData, RowData> createDeduplicateOperator() {
}
} else {
if (keepLastRow) {
checkState(!canBeInsertOnly(config, keepLastRow));
ProcTimeDeduplicateKeepLastRowFunction processFunction =
new ProcTimeDeduplicateKeepLastRowFunction(
rowTypeInfo,
Expand All @@ -422,11 +461,27 @@ OneInputStreamOperator<RowData, RowData> createDeduplicateOperator() {
generatedEqualiser);
return new KeyedProcessOperator<>(processFunction);
} else {
checkState(canBeInsertOnly(config, keepLastRow));
ProcTimeDeduplicateKeepFirstRowFunction processFunction =
new ProcTimeDeduplicateKeepFirstRowFunction(stateRetentionTime);
return new KeyedProcessOperator<>(processFunction);
}
}
}
}

/**
 * Currently, append-only output is not supported in mini-batch mode, although it could be
 * supported in the future. (Proctime keep-first-row mini-batch operators are already
 * append-only.)
 *
 * <p>keepLastRow cannot support append-only output, because every more recent record retracts
 * the previously emitted one.
 */
public static boolean canBeInsertOnly(ReadableConfig config, boolean keepLastRow) {
return !keepLastRow && !isMiniBatchEnabled(config);
}

private static boolean isMiniBatchEnabled(ReadableConfig config) {
return config.get(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,28 +109,34 @@ class StreamPhysicalRank(
.item("select", getRowType.getFieldNames.mkString(", "))
}

private def getDeduplicateDescription(isRowtime: Boolean, isLastRow: Boolean): String = {
private def getDeduplicateDescription(
isRowtime: Boolean,
isLastRow: Boolean,
insertOnly: Boolean): String = {
val fieldNames = getRowType.getFieldNames
val orderString = if (isRowtime) "ROWTIME" else "PROCTIME"
val keep = if (isLastRow) "LastRow" else "FirstRow"
s"Deduplicate(keep=[$keep], key=[${partitionKey.toArray.map(fieldNames.get).mkString(", ")}], order=[$orderString])"
s"Deduplicate(keep=[$keep], key=[${partitionKey.toArray.map(fieldNames.get).mkString(", ")}], order=[$orderString], outputInsertOnly=[$insertOnly])"
}

override def translateToExecNode(): ExecNode[_] = {
val generateUpdateBefore = ChangelogPlanUtils.generateUpdateBefore(this)

if (RankUtil.canConvertToDeduplicate(this)) {
val keepLastRow = RankUtil.keepLastDeduplicateRow(orderKey)
val tableConfig = unwrapTableConfig(this)
val outputInsertOnly = StreamExecDeduplicate.canBeInsertOnly(tableConfig, keepLastRow)

new StreamExecDeduplicate(
unwrapTableConfig(this),
tableConfig,
partitionKey.toArray,
sortOnRowTime,
keepLastRow,
outputInsertOnly,
generateUpdateBefore,
InputProperty.DEFAULT,
FlinkTypeFactory.toLogicalRowType(getRowType),
getDeduplicateDescription(sortOnRowTime, keepLastRow))
getDeduplicateDescription(sortOnRowTime, keepLastRow, outputInsertOnly))
} else {
new StreamExecRank(
unwrapTableConfig(this),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import org.apache.flink.table.connector.ChangelogMode
import org.apache.flink.table.planner.plan.`trait`._
import org.apache.flink.table.planner.plan.`trait`.UpdateKindTrait.{beforeAfterOrNone, onlyAfterOrNone, BEFORE_AND_AFTER, ONLY_UPDATE_AFTER}
import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery
import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecDeduplicate
import org.apache.flink.table.planner.plan.nodes.physical.stream._
import org.apache.flink.table.planner.plan.utils._
import org.apache.flink.table.planner.plan.utils.RankProcessStrategy.{AppendFastStrategy, RetractStrategy, UpdateFastStrategy}
Expand Down Expand Up @@ -220,6 +221,38 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
val providedTrait = ModifyKindSetTrait.INSERT_ONLY
createNewNode(rel, children, providedTrait, requiredTrait, requester)

case rank: StreamPhysicalRank if RankUtil.isDeduplication(rank) =>
val children = visitChildren(rel, ModifyKindSetTrait.ALL_CHANGES)
val tableConfig = unwrapTableConfig(rank)

// if the rank is deduplication and can be executed as insert-only, forward that information
val insertOnly = children
.filterNot(
rel => {
rel.getTraitSet.contains(ModifyKindSetTrait.INSERT_ONLY)
})
.isEmpty

val providedTrait = {
if (
insertOnly && StreamExecDeduplicate.canBeInsertOnly(
tableConfig,
RankUtil.keepLastDeduplicateRow(rank.orderKey))
) {
// Deduplicate outputs append only if first row is kept and mini batching is disabled
ModifyKindSetTrait.INSERT_ONLY
} else {
ModifyKindSetTrait.ALL_CHANGES
}
}

createNewNode(rel, children, providedTrait, requiredTrait, requester)

case rank: StreamPhysicalRank if !RankUtil.isDeduplication(rank) =>
// Rank supports consuming all changes
val children = visitChildren(rel, ModifyKindSetTrait.ALL_CHANGES)
createNewNode(rel, children, ModifyKindSetTrait.ALL_CHANGES, requiredTrait, requester)

case limit: StreamPhysicalLimit =>
// limit support all changes in input
val children = visitChildren(limit, ModifyKindSetTrait.ALL_CHANGES)
Expand All @@ -230,8 +263,8 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
}
createNewNode(limit, children, providedTrait, requiredTrait, requester)

case _: StreamPhysicalRank | _: StreamPhysicalSortLimit =>
// Rank and SortLimit supports consuming all changes
case _: StreamPhysicalSortLimit =>
// SortLimit supports consuming all changes
val children = visitChildren(rel, ModifyKindSetTrait.ALL_CHANGES)
createNewNode(rel, children, ModifyKindSetTrait.ALL_CHANGES, requiredTrait, requester)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ Calc(select=[a, b, c])

== Optimized Execution Plan ==
Calc(select=[a, b, c])
+- Deduplicate(keep=[FirstRow], key=[a], order=[ROWTIME])
+- Deduplicate(keep=[FirstRow], key=[a], order=[ROWTIME], outputInsertOnly=[true])
+- Exchange(distribution=[hash[a]])
+- Calc(select=[a, b, c, rowtime])
+- WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 1000:INTERVAL SECOND)])
Expand Down Expand Up @@ -341,9 +341,9 @@ Calc(select=[a, b, c])
} ]
}, {
"id" : ,
"type" : "Deduplicate(keep=[FirstRow], key=[a], order=[ROWTIME])",
"type" : "Deduplicate(keep=[FirstRow], key=[a], order=[ROWTIME], outputInsertOnly=[true])",
"pact" : "Operator",
"contents" : "Deduplicate(keep=[FirstRow], key=[a], order=[ROWTIME])",
"contents" : "Deduplicate(keep=[FirstRow], key=[a], order=[ROWTIME], outputInsertOnly=[true])",
"parallelism" : 2,
"predecessors" : [ {
"id" : ,
Expand Down Expand Up @@ -386,7 +386,7 @@ Calc(select=[a, b, c])

== Optimized Execution Plan ==
Calc(select=[a, b, c])
+- Deduplicate(keep=[FirstRow], key=[a], order=[ROWTIME])
+- Deduplicate(keep=[FirstRow], key=[a], order=[ROWTIME], outputInsertOnly=[true])
+- Exchange(distribution=[hash[a]])
+- Calc(select=[a, b, c, rowtime])
+- WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 1000:INTERVAL SECOND)])
Expand Down Expand Up @@ -438,7 +438,7 @@ Calc(select=[a, b, c])
"id" : ,
"type" : "Deduplicate[]",
"pact" : "Operator",
"contents" : "[]:Deduplicate(keep=[FirstRow], key=[a], order=[ROWTIME])",
"contents" : "[]:Deduplicate(keep=[FirstRow], key=[a], order=[ROWTIME], outputInsertOnly=[true])",
"parallelism" : 2,
"predecessors" : [ {
"id" : ,
Expand Down
Loading