[flink] Fix scan parallelism propagating to downstream operators#7914
ArnavBalyan wants to merge 1 commit into
cc @JingsongLi thanks!
@ArnavBalyan Thanks for picking this up! The direction is right, but I think there are a few gaps worth addressing before merging:
The PR only propagates options.get(SCAN_PARALLELISM). However, Paimon's parallelism resolution has two sources — see FlinkTableSource#inferSourceParallelism:
This PR only covers case (1). For case (2) — which is the default behavior most users hit — the inferred value is still applied via DataStream#setParallelism() inside the
After this PR, parallelism is applied in two places:
The DataStream#setParallelism(int) overload sets isParallelismConfigured=false, while the planner path sets it to true. Depending on execution order, the planner's
ParallelismProvider#getParallelism() only fully solves the forward-edge issue from Flink 1.19+ (FLIP-367), where the planner explicitly calls setParallelism(p, true) on the
State compatibility / upgrade risk This fix changes the resulting job graph topology for any existing user who has scan.parallelism set or relies on inferred parallelism:
For users upgrading from an older Paimon version with an existing Flink savepoint/checkpoint, this is a breaking topology change:
This needs to be called out explicitly in the PR description / release notes as a breaking change for stateful streaming jobs.
JingsongLi
left a comment
Code Review
The direction is correct — overriding ParallelismProvider#getParallelism() is the right way to inform Flink's planner about source-specific parallelism so it can insert rebalance edges. However, I see a concrete correctness issue in the current implementation and a coverage gap.
1. Double-set conflict: getParallelism() vs produceDataStream will fight over isParallelismConfigured
In BaseDataTableSource.getScanRuntimeProvider(), the producer lambda still calls:
sourceBuilder.sourceParallelism(inferSourceParallelism(env)).env(env).build()
Inside FlinkSourceBuilder.build() (line 257-258):
if (parallelism != null) {
    dataStream.setParallelism(parallelism); // sets isParallelismConfigured = false
}
Meanwhile, when getParallelism() returns a non-empty Optional, the Flink planner calls Transformation#setParallelism(p, true), which sets isParallelismConfigured = true. The issue is that produceDataStream() is invoked after the planner configures the transformation, so FlinkSourceBuilder.build() will override isParallelismConfigured back to false, silently re-introducing the forward-edge problem this PR aims to fix.
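The ordering problem can be illustrated with a small toy model. This is only a sketch of the behavior as described in this review: `ToyTransformation` and `DoubleSetDemo` are hypothetical stand-ins written for illustration, not Flink's `Transformation` class, and the assumption that the single-argument setter leaves the flag at false is taken from the comment above rather than verified against a specific Flink version.

```java
/**
 * Simplified stand-in for the flag behavior described in this review.
 * NOT Flink's Transformation class; all names here are hypothetical.
 */
final class ToyTransformation {
    private int parallelism = -1;
    private boolean parallelismConfigured = false;

    /** Models the single-argument path that, per the review, leaves the flag false. */
    void setParallelism(int parallelism) {
        setParallelism(parallelism, false);
    }

    /** Models the two-argument path the planner is said to use. */
    void setParallelism(int parallelism, boolean configured) {
        if (parallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be positive");
        }
        this.parallelism = parallelism;
        this.parallelismConfigured = configured;
    }

    int getParallelism() {
        return parallelism;
    }

    boolean isParallelismConfigured() {
        return parallelismConfigured;
    }
}

final class DoubleSetDemo {
    /** Planner first, producer second: the order the review describes. */
    static ToyTransformation plannerThenProducer(int p) {
        ToyTransformation t = new ToyTransformation();
        t.setParallelism(p, true); // planner-level configuration
        t.setParallelism(p);       // producer-level call overwrites the flag
        return t;
    }

    /** Only the planner sets parallelism; the producer leaves it alone. */
    static ToyTransformation plannerOnly(int p) {
        ToyTransformation t = new ToyTransformation();
        t.setParallelism(p, true);
        return t;
    }

    public static void main(String[] args) {
        System.out.println(plannerThenProducer(4).isParallelismConfigured()); // false
        System.out.println(plannerOnly(4).isParallelismConfigured());         // true
    }
}
```

In this model the parallelism value is identical in both cases; only the flag differs, which is why the regression would be silent.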
Suggested fix: When getParallelism() returns a value, the producer lambda should not also set parallelism on the DataStream. One approach:
// In BaseDataTableSource.getScanRuntimeProvider():
Integer scanParallelism = options.get(FlinkConnectorOptions.SCAN_PARALLELISM);
return new PaimonDataStreamScanProvider(
        !unbounded,
        env -> sourceBuilder
                .sourceParallelism(scanParallelism != null ? null : inferSourceParallelism(env))
                .env(env)
                .build(),
        tableIdentifier.asSummaryString(),
        table,
        scanParallelism);
Or more cleanly, remove the setParallelism call from the producer when getParallelism() will handle it at the planner level.
2. Inferred parallelism is not propagated to getParallelism()
inferSourceParallelism(env) can compute a non-null parallelism even when SCAN_PARALLELISM is null (via bucket count or split count inference). However, the PR only passes the raw options.get(SCAN_PARALLELISM) to getParallelism(). This means the inferred-parallelism path — which is the default behavior for most users — still suffers from forward-edge propagation.
The challenge is that inference requires access to StreamExecutionEnvironment (which is only available inside produceDataStream), while getParallelism() is called by the planner before that. For a complete fix, you may need to eagerly compute the parallelism outside the lambda (or accept this as a known limitation and document it).
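One possible shape for the eager approach is sketched below. It assumes that an inference path can be evaluated without a StreamExecutionEnvironment, which may not hold for the current inferSourceParallelism(env). `ResolvedScanParallelism` and its methods are hypothetical names for illustration and do not exist in Paimon or Flink; the `Supplier` stands in for whatever env-free inference turns out to be feasible.

```java
import java.util.Optional;
import java.util.function.Supplier;

/** Hypothetical helper; not part of Paimon or Flink. */
final class ResolvedScanParallelism {
    // null means "no source-specific parallelism; let Flink decide"
    private final Integer value;

    private ResolvedScanParallelism(Integer value) {
        this.value = value;
    }

    /**
     * Resolves the parallelism once, up front: the explicit option wins,
     * otherwise the (assumed env-free) inference is evaluated eagerly.
     * The inference is not invoked when an explicit value is present.
     */
    static ResolvedScanParallelism resolve(Integer explicit, Supplier<Integer> inference) {
        Integer resolved = explicit != null ? explicit : inference.get();
        if (resolved != null && resolved <= 0) {
            throw new IllegalArgumentException("parallelism must be positive: " + resolved);
        }
        return new ResolvedScanParallelism(resolved);
    }

    /** The value a getParallelism()-style method could hand to the planner. */
    Optional<Integer> forPlanner() {
        return Optional.ofNullable(value);
    }
}
```

With a single resolved value exposed to the planner, the producer lambda would have no reason to set parallelism on the stream itself, which also avoids the conflict from point 1.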
3. Same issue exists in SystemTableSource
SystemTableSource (line 123) calls inferSourceParallelism(env) inside the lambda and sets it on the DataStreamSource directly:
if (parallelism != null) {
    dataStreamSource.setParallelism(parallelism);
}
This will also override isParallelismConfigured to false. Same conflict as point 1 applies here.
4. Test does not verify the actual fix
testScanProviderGetParallelism only tests Optional.ofNullable wrapping — it does not assert that isParallelismConfigured is set correctly on the resulting Transformation, nor does it verify the downstream operator parallelism behavior. A test that creates a mini pipeline and inspects transformation.isParallelismConfigured() would give confidence the fix actually works end-to-end.
Also, the test is placed in LineageUtilsTest but is unrelated to lineage. Consider placing it in a dedicated test class or in an existing scan-provider test.
Minor
- The new 5-arg constructor is fine for backward compatibility. Consider adding a @VisibleForTesting annotation on the 4-arg constructor if it's only kept for tests.
Thanks for the review @JingsongLi @hackergin. I'm working on the change but have not gotten to it yet; will raise it shortly!