[FLINK-37180][table] Support running stateless PTFs #26076

Open
wants to merge 1 commit into
base: master

Conversation

twalthr
Contributor

@twalthr twalthr commented Jan 24, 2025

What is the purpose of the change

This implements the first runnable version of stateless PTFs. Some parts of the code base are already prepared for future features such as state or multiple inputs, so that later work does not have to start from scratch.

Restore tests will be added once state is supported.

Brief change log

  • Implement StreamExecNodeProcessTableFunction
  • Implement ProcessTableOperator
  • Make uid optional for stateless PTFs
  • Do not allow pass through columns for updating inputs
  • Fix various shortcomings in the type inference and Calcite integration

Verifying this change

This change added tests and can be verified as follows:

  • ProcessTableFunctionSemanticTests for semantically correct behavior
  • ProcessTableFunctionTests for planner and error behavior

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): yes
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): yes
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? JavaDocs

@flinkbot
Collaborator

flinkbot commented Jan 24, 2025

CI report:

Bot commands

The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

/**
* Format of unique identifiers for {@link ProcessTableFunction}.
*
* <p>Leading digits are not allowed. This also prevents that a custom PTF uid can infer with
Contributor Author

Suggested change
* <p>Leading digits are not allowed. This also prevents that a custom PTF uid can infer with
* <p>Leading digits are not allowed. This also prevents that a custom PTF uid can interfere with
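The rule stated in this Javadoc (no leading digits) could be checked with a pattern along the following lines. This is only a minimal sketch: the class name `UidFormatSketch`, the method `isValidUid`, and the exact set of allowed characters are assumptions for illustration, not the format defined in this PR.

```java
import java.util.regex.Pattern;

// Hypothetical sketch; the pattern is an assumption, not the PR's actual uid format.
class UidFormatSketch {
    // First character: a letter or underscore, so a leading digit is rejected.
    // Remaining characters: letters, digits, underscores, or hyphens.
    private static final Pattern UID_FORMAT = Pattern.compile("^[a-zA-Z_][a-zA-Z0-9_-]*$");

    static boolean isValidUid(String uid) {
        return uid != null && UID_FORMAT.matcher(uid).matches();
    }

    public static void main(String[] args) {
        System.out.println(isValidUid("my-ptf")); // starts with a letter
        System.out.println(isValidUid("1ptf"));   // starts with a digit
    }
}
```

Rejecting leading digits keeps user-chosen uids in a separate namespace from any identifiers that begin with a number.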

@@ -177,6 +177,14 @@ public List<FunctionTestStep> getSetupFunctionTestSteps() {
.collect(Collectors.toList());
}

/** Convenience method to avoid casting. It assumes that the order of steps is not important. */
Contributor

I have usually seen adj + noun rather than noun + noun here, though I am not sure which one is correct.

Suggested change
/** Convenience method to avoid casting. It assumes that the order of steps is not important. */
/** Convenient method to avoid casting. It assumes that the order of steps is not important. */

@@ -135,6 +142,25 @@ private static void checkReservedArgs(List<StaticArgument> staticArgs) {
}
}

private static void checkMultipleTableArgs(List<StaticArgument> staticArgs) {
if (staticArgs.stream().filter(arg -> arg.is(StaticArgumentTrait.TABLE)).count() > 1) {
Contributor

Suggested change
if (staticArgs.stream().filter(arg -> arg.is(StaticArgumentTrait.TABLE)).count() > 1) {
if (staticArgs.stream().filter(arg -> arg.is(StaticArgumentTrait.TABLE)).limit(2).count() > 1) {

nit: would it make sense to add a limit, as in the suggestion above?
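
To illustrate the effect of the suggested `limit(2)`, here is a minimal sketch in which a plain list of strings stands in for `List<StaticArgument>`; the class `LimitCountSketch`, the method `hasMultipleTables`, and the `visited` counter are hypothetical. The result of the `> 1` check stays the same, but the stream can stop once a second match has been found.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch; strings stand in for the StaticArgument objects of the PR.
class LimitCountSketch {
    // Returns true if more than one element equals "TABLE".
    // The counter records how many elements the stream actually inspected.
    static boolean hasMultipleTables(List<String> traits, AtomicInteger visited) {
        return traits.stream()
                .peek(t -> visited.incrementAndGet())
                .filter("TABLE"::equals)
                .limit(2) // at most two matches are needed to decide "> 1"
                .count() > 1;
    }

    public static void main(String[] args) {
        AtomicInteger visited = new AtomicInteger();
        List<String> traits = List.of("TABLE", "SCALAR", "TABLE", "SCALAR", "SCALAR");
        System.out.println(hasMultipleTables(traits, visited));
        System.out.println("elements inspected: " + visited.get());
    }
}
```

For the short argument lists of a function signature the difference is likely negligible, which is presumably why the comment is marked as a nit.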

@@ -81,6 +85,7 @@ public static TypeInference of(FunctionKind functionKind, TypeInference origin)
return builder.build();
}

@SuppressWarnings("BooleanMethodIsAlwaysInverted")
public static boolean isValidUidForProcessTableFunction(String uid) {
Contributor

nit: maybe invert it then?

@@ -145,15 +144,15 @@ public Optional<DataType> getOutputDataType() {
// TableSemantics
// --------------------------------------------------------------------------------------------

private static class CallBindingTableSemantics implements TableSemantics {
private static class SqlBindingTableSemantics implements TableSemantics {
Contributor

nit: I usually don't have an opinion on names, so feel free to ignore this comment.

You have SqlCallContext, SqlCallBinding, shouldn't it be called SqlCallBindingTableSemantics then?

@@ -85,7 +107,7 @@ public boolean isArgumentLiteral(int pos) {

@Override
public boolean isArgumentNull(int pos) {
return binding.isOperandNull(pos, false);
return binding.isOperandNull(pos, false) || isDefault(pos);
Contributor

Could you clarify, or add a comment explaining, why we can treat a default value as null? Will the default always be null?

* An important part of {@link ProcessTableFunction} is the mandatory unique identifier. Even if
* the PTF has no state entries, state or timers might be added later. So a PTF should serve as
* an identifiable black box for the optimizer. UIDs ensure that.
* An important part of {@link ProcessTableFunction} is the mandatory unique identifier for PTFs
Contributor

out of curiosity: does this mean that a PTF without set semantics cannot have state or timers? I guess this is somewhat similar to non-keyed DataStream operators, which makes sense.

Do we check that in the validation stack? We could fail early.

Comment on lines +112 to +113
"+I[Bob, {+I[Bob, 12], 1}, 1]",
"+I[Bob, {+I[Bob, 12], 1}, 2]",
Contributor

I am not sure myself whether we need to change that, but I was slightly confused about why this is printed twice if it is grouped by the name. I understood once I checked the function implementation (every record is emitted twice), but it made me wonder at first.

import static org.assertj.core.api.Assertions.assertThat;

/**
* Base for tests that do not fall into the {@link RestoreTestBase} category, but use the {@link
Contributor

nit: not sure if we need to always mention RestoreTestBase. I think the TableTestProgramRunner was always meant for multiple purposes.

Comment on lines +92 to +105
@Override
public int[] orderByColumns() {
return new int[0];
}

@Override
public int timeColumn() {
return -1;
}

@Override
public List<String> coPartitionArgs() {
return List.of();
}
Contributor

How about we throw an exception in methods we don't support yet? We do that at other locations.
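
One way to read this suggestion, as a minimal sketch: the interface `TableSemanticsSketch` below is a hypothetical stand-in that mirrors only the three methods quoted above, and the exception type and message are assumptions rather than what the PR or Flink actually uses.

```java
import java.util.List;

// Hypothetical stand-in that mirrors only the three methods quoted in the comment.
interface TableSemanticsSketch {
    int[] orderByColumns();

    int timeColumn();

    List<String> coPartitionArgs();
}

// Fails fast instead of returning placeholder values for unsupported features.
class FailFastTableSemantics implements TableSemanticsSketch {
    // Assumed exception type and wording; a project-specific exception may fit better.
    private static UnsupportedOperationException unsupported(String feature) {
        return new UnsupportedOperationException(feature + " is not supported yet.");
    }

    @Override
    public int[] orderByColumns() {
        throw unsupported("Order by");
    }

    @Override
    public int timeColumn() {
        throw unsupported("Time column");
    }

    @Override
    public List<String> coPartitionArgs() {
        throw unsupported("Co-partitioning");
    }
}
```

The trade-off is that callers which currently rely on the empty defaults (`new int[0]`, `-1`, `List.of()`) would have to stop calling these methods until the features exist.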

4 participants