Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-37005][table] Make StreamExecDeduplicate ouput insert only where possible #26051

Open
wants to merge 3 commits into
base: master
Choose a base branch
from

Conversation

pnowojski
Copy link
Contributor

What is the purpose of the change

This PR:

  • implements the RowTimeDeduplicateKeepFirstRowFunction using watermarks to avoid retracting previous results
  • changes planner to take into account that both proc time and rowtime deduplicate in keep first row variants are append only

Thanks to that, planner can avoid costly operators like SinkUpsertMaterializer downstream the whole query can be append-only - removing a need for retracting/upserting results from the output table.

Currently there is no variant for the async state backend.

Verifying this change

This adds new tests and is also covered by existing ITCases.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

@pnowojski
Copy link
Contributor Author

@lincoln-lil @xuyangzhong can you take a look at this? You were recently involved in a couple of changes around here: FLINK-36837 and FLINK-34702.

I could try to implement the async append-only row-time version, but I would probably need a bit of guidance from your side.

For example, the function that I introduced RowTimeDeduplicateKeepFirstRowFunction doesn't output the data in one step and it also relies on firing timers. Are there any guarantees to the order of firing timers interleaved with pending async state access for the given key? In other words, if I implement async version of the processElement, is it guaranteed that it will finish before any timer for that key can be fired?

@flinkbot
Copy link
Collaborator

flinkbot commented Jan 22, 2025

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@xuyangzhong
Copy link
Contributor

@pnowojski IIUC, the async state operator currently processes watermarks only after all prior async state requests have been completed. That means when the event time timer is triggered, all preceding async state access operations have already been processed. Please correct me if there is any mistake @Zakelly .

@Zakelly
Copy link
Contributor

Zakelly commented Jan 23, 2025

@pnowojski Glad to see you here. @xuyangzhong is right, you have that guarantee. The same-key records and timers will happen in order of arrival. That is, when watermark advance, the timer will fire after all arrived records for that key finishes. And any records for that key arrive after the watermark advance will start processing after the timer finishes.

@pnowojski
Copy link
Contributor Author

Thanks for your answers @Zakelly and @xuyangzhong!

One more thing. At the moment I have only sync implementation of the append-only deduplicate operator. I guess there is no harm if this is merged as is, so that the synchronous version will be used despite isAsyncStateEnabled() being enabled, right? The code will remain correct? And I can provide async version in a separate PR/ticket? (relevant code is in the StreamExecDeduplicate).

Copy link
Contributor

@Zakelly Zakelly left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pnowojski Yes. And the async state version could be implemented in a separate PR. One suggestion if so:

Comment on lines +368 to +370
return new AsyncKeyedProcessOperator<>(
new RowTimeDeduplicateKeepFirstRowFunction(
rowTypeInfo, stateRetentionTime, rowtimeIndex));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should use the sync version of operator if the function is for sync state:

Suggested change
return new AsyncKeyedProcessOperator<>(
new RowTimeDeduplicateKeepFirstRowFunction(
rowTypeInfo, stateRetentionTime, rowtimeIndex));
return new KeyedProcessOperator<>(
new RowTimeDeduplicateKeepFirstRowFunction(
rowTypeInfo, stateRetentionTime, rowtimeIndex));

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants