[SPARK-57512][SQL] Allow RuntimeReplaceable to opt out of eager replacement and survive into the physical plan#56575
[SPARK-57512][SQL] Allow RuntimeReplaceable to opt out of eager replacement and survive into the physical plan#56575cloud-fan wants to merge 12 commits into
Conversation
8436c7c to
10bf8bc
Compare
|
cc @sunchao |
HyukjinKwon
left a comment
There was a problem hiding this comment.
1 blocking, 1 non-blocking, 0 nits. Solid, carefully-commented mechanism; the blocking item is that the materialization step isn't wired into the AQE path.
Design / architecture (1)
QueryExecution.scala:781:MaterializeRuntimeReplaceableis added only toQueryExecution.preparations, not to AQE's stage prep. Under AQE (default) a survivingRuntimeReplaceablereachesCollapseCodegenStagesun-materialized -- see inline.
Suggestions (1)
NormalizePlan.scala:94: full-unfold recursion duplicated withMaterializeRuntimeReplaceable.replace; consider a shared helper -- see inline.
Verification
Traced the survive-then-materialize transformation for result-equivalence: eval/doGenCode delegate to replacement; non-deterministic (Rand) and Unevaluable (With) replacements are gated to the eager path; foldable-but-referencing survivors are handled by literal-only FoldablePropagation + same-batch ConstantFolding; NormalizePlan fully unfolds. All result-bearing dimensions are equivalent or gated -- query results stay correct. The one gap is the AQE wiring (inline): AdaptiveSparkPlanExec is a LeafExecNode so the outer rule can't reach the managed subtree, and AQE's reOptimize re-inserts the survivor via OptimizeCsvJsonExprs, so under AQE the survivor reaches codegen (correct via delegation, but the stated invariant/metrics don't hold).
| // Materialize any RuntimeReplaceable that survived the optimizer into its replacement for | ||
| // the Spark execution path. After columnar/native conversion (so a native engine sees the | ||
| // original expression), before codegen (so Spark codegen never sees a RuntimeReplaceable). | ||
| MaterializeRuntimeReplaceable, |
There was a problem hiding this comment.
MaterializeRuntimeReplaceable is added here, to QueryExecution.preparations, but I don't see it in any of AQE's stage-prep lists (queryStagePreparationRules, queryStageOptimizerRules, postStageCreationRules in AdaptiveSparkPlanExec). Under AQE (default-on) that looks like a gap:
InsertAdaptiveSparkPlanruns first inpreparations, so the plan is wrapped inAdaptiveSparkPlanExec(aLeafExecNode) before this rule runs --transformUpWithSubqueriescan't reach the AQE-managed subtree.- AQE's
reOptimizere-runs the full optimizer (optimizer.execute), which includesOptimizeCsvJsonExprs, so the survivingMultiGetJsonObject(eagerReplace = false) is re-inserted on the re-optimized stage. - AQE's
postStageCreationRulesthen runsApplyColumnarRulesAndInsertTransitions->CollapseCodegenStageswith no materialization in between.
So under AQE a RuntimeReplaceable reaches whole-stage codegen un-materialized. Results stay correct because eval/doGenCode now delegate to replacement, but the invariant this PR relies on ("Spark whole-stage codegen never sees a RuntimeReplaceable", and "only the physical plan shows its Invoke replacement") wouldn't hold under AQE -- which affects EXPLAIN and the per-operator-metrics rationale. The peer prep rules (CollapseCodegenStages, ApplyColumnarRulesAndInsertTransitions, EnsureRequirements, ...) are all present in both the non-AQE and AQE lists; this rule is the only one that isn't.
Would adding MaterializeRuntimeReplaceable to AQE's postStageCreationRules (just before collapseCodegenStagesRule) be the right fix? The two new tests use no-shuffle queries that aren't AQE-wrapped, so an AQE-path (shuffle) test that inspects the AQE final plan would cover this.
There was a problem hiding this comment.
Agreed, this was a real gap. Fixed in b80438b by adding MaterializeRuntimeReplaceable to AQE's postStageCreationRules, right before collapseCodegenStagesRule, mirroring QueryExecution.preparations. I confirmed both newQueryStage and newResultQueryStage apply postStageCreationRules, so shuffle stages and the final result stage are both covered. Added an AQE-path test (repartition shuffle) that asserts no RuntimeReplaceable reaches the AQE finalPhysicalPlan and that results are correct.
One minor correction to the mechanism: AQEOptimizer doesn't actually run OptimizeCsvJsonExprs or spark.experimental.extraOptimizations, so the survivor isn't re-inserted during reOptimize. Instead it originates in the main optimizer and persists into the AQE-managed subtree, which the outer preparations rule can't reach (since AdaptiveSparkPlanExec is a LeafExecNode). So the conclusion stands -- AQE-side materialization is required -- just via a slightly different path.
| } | ||
|
|
||
| private def replaceRuntimeReplaceable(e: Expression): Expression = e match { | ||
| case r: RuntimeReplaceable => replaceRuntimeReplaceable(r.replacement) |
There was a problem hiding this comment.
Nit / non-blocking: this full-unfold recursion (case r: RuntimeReplaceable => replaceRuntimeReplaceable(r.replacement) / case _ => e.mapChildren(...)) is identical to MaterializeRuntimeReplaceable.replace. Two copies of the same unfold can drift over time -- might be worth a small shared helper.
There was a problem hiding this comment.
Done in b80438b. Extracted a shared RuntimeReplaceable.unfold helper (companion object in Expression.scala); both NormalizePlan.normalizeRuntimeReplaceable and MaterializeRuntimeReplaceable.apply now call it, removing the two private copies.
sunchao
left a comment
There was a problem hiding this comment.
One additional inline finding. The AQE materialization issue is already covered by the existing unresolved review thread, so I did not duplicate it.
| // initialized through the `lazy val replacement`, which tree transforms may re-create. | ||
| // - A replacement that contains an `Unevaluable` expression (e.g. `With`) depends on a later | ||
| // logical rule (such as `RewriteWithExpression`) that can only run in the logical phase. | ||
| val cannotSurvive = !replaced.deterministic || replaced.exists(_.isInstanceOf[Unevaluable]) |
There was a problem hiding this comment.
[P1] AttributeReference extends Unevaluable, so this condition is true for every replacement that reads a resolved input column. For example, an eagerReplace = false wrapper whose replacement is Add(attr, Literal(1L)) is still returned as replaced below and never reaches physical/native planning. In practice, the opt-out only works for constants or nodes inserted after ReplaceExpressions—which is why both MultiGetJsonObject and the new test avoid this path.
Could we exclude AttributeReference from this check (as ConvertToLocalRelation.hasUnevaluableExpr already does) and add a direct ReplaceExpressions regression test with a column-based wrapper?
There was a problem hiding this comment.
Good catch, you're exactly right. Fixed in b80438b: the survival gate now reuses ConvertToLocalRelation.hasUnevaluableExpr(replaced), which already excludes AttributeReference, so a replacement that reads an input column (e.g. Add(attr, Literal(1))) can survive instead of being forced down the eager path. Added a direct ReplaceExpressions regression test in OptimizerSuite with a column-based wrapper (ColumnBasedRuntimeReplaceable) that fails without the fix and passes with it.
sunchao
left a comment
There was a problem hiding this comment.
One inline P2 follow-up on the AQE table-cache path.
| // re-insert a surviving `RuntimeReplaceable`, e.g. via `OptimizeCsvJsonExprs`), and | ||
| // `AdaptiveSparkPlanExec` is a `LeafExecNode` that the outer `preparations` rule can't reach, | ||
| // so the materialization must also run here. | ||
| MaterializeRuntimeReplaceable, |
There was a problem hiding this comment.
[P2] This fixes AQE exchange and result stages, but newQueryStage's InMemoryTableScanLike branch still deliberately skips postStageCreationRules, so this materialization rule never reaches table-cache scans. InMemoryScans copies logical filters into InMemoryTableScanExec.predicates; after the scan is wrapped in TableCacheQueryStageExec (a leaf), later result-stage traversal cannot reach that hidden plan. A surviving predicate wrapper whose replacement is supported by CachedBatchSerializer.buildFilter (for example, EqualTo(attr, literal)) is therefore left unrecognized, disabling cached-batch pruning even though the separate FilterExec still produces correct rows.
Could we apply MaterializeRuntimeReplaceable explicitly to the optimized InMemoryTableScanLike before wrapping it, and add an AQE cache/filter test that inspects TableCacheQueryStageExec.plan? The current repartition test covers only the result-stage path, and finalPlan.exists does not traverse query-stage plans.
There was a problem hiding this comment.
Good catch — fixed in 50cc6a1, though at a slightly different layer than suggested.
I materialized at the pushdown consumer (InMemoryTableScanExec, unfolding RuntimeReplaceable in the predicates handed to buildFilter) rather than applying MaterializeRuntimeReplaceable to the optimized InMemoryTableScanLike before wrapping it. Reasoning:
- The scan is a
LeafExecNodethat never reaches whole-stage codegen, so this isn't a codegen-prep concern.MaterializeRuntimeReplaceablesits between columnar conversion andCollapseCodegenStagesspecifically for codegen, which is exactly why theInMemoryTableScanLikebranch skipspostStageCreationRules— extending that rule into a non-codegen leaf would be misplaced. predicatesonly feedbuildFilter(a runtime closure that pattern-matches expression shape), so unfolding them at the consumer is the right layer. It also covers AQE and non-AQE uniformly (non-AQE was relying onQueryExecution.preparationsreaching thepredicatesfield; now both paths go through the same unfold) while keeping the readable node in the plan/EXPLAIN output.
Added an AQE regression test in MaterializeRuntimeReplaceableSuite using an adaptive cached plan, so the scan is wrapped in a TableCacheQueryStageExec leaf (the gap you flagged). A surviving predicate RuntimeReplaceable is pushed into the scan; the test asserts the wrapper survives into scan.predicates and that cached-batch pruning still kicks in — it fails without the fix (readBatches == 10, no pruning). I also added comments at all three sites recording the codegen-vs-consumer split.
One scoping note: this is forward-looking. The current opt-in survivor (multi_get_json_object) isn't a buildFilter-recognizable predicate shape, so no current query regresses; the fix matters for a future survivor whose replacement is a prunable comparison.
There was a problem hiding this comment.
Follow-up: I generalized this beyond the cached-batch case.
First-principle we landed on: a surviving RuntimeReplaceable (eagerReplace = false) must be unfolded to its replacement wherever a predicate leaves Spark's own expression-evaluation engine and is handed to a consumer that interprets its structure or sends it elsewhere — because RuntimeReplaceable is a Spark-internal optimizer concept the consumer can't understand. Internal Spark evaluation (FilterExec/Project codegen + interpreted) needs no unfold: it's handled by the eval/doGenCode delegation plus MaterializeRuntimeReplaceable.
That boundary is realized at three places, all now unfolding:
InMemoryTableScanExec.buildFilter(cached-batch pruning — the case you flagged;CachedBatchSerializeris a@DeveloperApi).DataSourceStrategy.translateLeafNodeFilter(V1 / file source filter pushdown).V2ExpressionBuilder.generateExpression(V2) — and since all V2 pushdown (filters, aggregate functions, aggregate arguments, group-by, sort) funnels throughgenerateExpressionviaPushableExpression/translateAggregation, this single fallback covers V2 filter and aggregate/group-by pushdown. It's a fallback (after the explicit cases) so native high-level pushdown likeAES_ENCRYPTstill wins.
Out of scope (different kind of consumer — column selection, not expression translation, and driven by references which for a survivor equals its children's references): nested-column/schema pruning and partition pruning.
Tests added for all three boundaries (V1/V2 filter translation, V2 aggregate/group-by, and an AQE cached-batch pruning test); each was verified to fail without the corresponding unfold.
Note: I squashed the branch into a single commit.
6cdb81d to
b80438b
Compare
…cement and survive into the physical plan Add `RuntimeReplaceable.eagerReplace` (default `true`, preserving current behavior). An expression can override it to `false` to survive the logical optimizer into the physical plan, so a native engine can match the high-level expression directly and the optimized plan stays readable. `ReplaceExpressions` still rewrites eagerly when the replacement cannot survive (non-deterministic, or contains an `Unevaluable` other than `AttributeReference`); `RuntimeReplaceableAggregate` is always rewritten. Supporting changes: - `eval`/`doGenCode`/`deterministic`/`foldable` delegate to `replacement`; a new physical-preparation rule `MaterializeRuntimeReplaceable` unfolds survivors before `CollapseCodegenStages` (in both `QueryExecution.preparations` and AQE `postStageCreationRules`). This is required for correctness: whole-stage codegen reasons about `references`/input materialization/CSE via `children`, while the emitted code comes from `replacement`, so an un-materialized survivor can produce invalid generated code. - `FoldablePropagation` only propagates literals; `NormalizePlan` fully unfolds. - Structure-interpreting consumers that receive a predicate leaving Spark's evaluation engine unfold survivors at their boundary: data source filter pushdown (`DataSourceStrategy.translateLeafNodeFilter` for V1, `V2ExpressionBuilder` for V2 which also covers aggregate/group-by) and cached-batch pruning (`InMemoryTableScanExec.buildFilter`). - First adopter: `MultiGetJsonObject` (inserted by `OptimizeCsvJsonExprs` after `ReplaceExpressions`) becomes a surviving `RuntimeReplaceable` with an `Invoke` replacement. Co-authored-by: Isaac
f459bc6 to
5251cd7
Compare
…essions/randomExpressions.scala
sunchao
left a comment
There was a problem hiding this comment.
One inline P2 follow-up on compound V2 filter reconstruction.
| // natively is a Spark-internal optimizer node the connector cannot understand. Fall back to its | ||
| // concrete `replacement` so the lowered form can still be pushed -- same boundary rationale as | ||
| // `DataSourceStrategy.translateLeafNodeFilter` (V1) and `CachedBatchSerializer.buildFilter`. | ||
| case r: RuntimeReplaceable => generateExpression(r.replacement, isPredicate) |
There was a problem hiding this comment.
[P2] Preserve the mapping for compound replacement predicates
translateFilterV2WithMapping treats the surviving wrapper as its leaf/other case, but this fallback can return a structural V2And, V2Or, or V2Not. The map then contains only compoundPredicate -> originalWrapper. If a SupportsPushDownV2Filters source returns that predicate unchanged because it cannot fully push it (as the interface contract requires), rebuildExpressionFromFilter matches the compound node first and recursively visits children that were created inside V2ExpressionBuilder and have no map entries. Planning then throws Failed to rebuild Expression for filter instead of retaining a post-scan filter. For example, this occurs for an eagerReplace = false wrapper whose replacement is And(a > 1, b < 2) on a rejecting V2 source.
Could we either unfold before entering the mapping-aware recursion, or make rebuilding prefer an exact map entry before descending, and add a rejecting-source regression test? The new test exercises only a leaf GreaterThan translation, so it does not cover this reconstruction path.
There was a problem hiding this comment.
Good catch, fixed in cafddb8 with the "prefer an exact map entry before descending" option.
rebuildExpressionFromFilter now checks translatedFilterToExpr.get(predicate) before matching V2And/V2Or/V2Not. For the surviving wrapper, the compound V2And is a map key (mapped to the wrapper), so it's restored directly and we never descend into the synthetic children that have no entries. The original readable wrapper is kept in the post-scan filter and materialized into its replacement before codegen by MaterializeRuntimeReplaceable.
This is granularity-correct at every level of descent, so nesting is covered too -- e.g. And(c = 5, wrapper(And(a > 1, b < 2))) rebuilds as And(c = 5, wrapper): the outer V2And isn't a map key so it descends, and the inner V2And(a>1, b<2) is a map key so it's restored before its synthetic children are visited. Normal compound filters are decomposed at translation (only leaves mapped), so the exact lookup misses and falls through to the existing descent -- behavior unchanged.
Added a regression test in DataSourceV2StrategySuite that translates a compound-replacement wrapper via translateFilterV2WithMapping, then feeds the result back through rebuildExpressionFromFilter (equivalent to a rejecting SupportsPushDownV2Filters source returning the predicate unchanged) and asserts the wrapper is restored; it throws Failed to rebuild Expression for filter without the fix.
One note on V1: the V1 path (DataSourceStrategy) does not have this crash. There the unfold sits inside translateLeafNodeFilter, which only recognizes leaf shapes, so a compound replacement simply isn't translated (returns None) -- no pushdown, no map entry, correct results. That's a milder missed-optimization version of the same root cause; I left V1 as-is since fixing it would be an enhancement (and would lose the readable wrapper in the post-scan filter).
…2 filter rebuild A surviving RuntimeReplaceable (eagerReplace = false) is opaque to translateFilterV2WithMapping (matched as a leaf), so when its unfolded replacement is a compound predicate it is mapped as a single V2And/V2Or/V2Not entry while the synthetic children have no entries. rebuildExpressionFromFilter descended structurally first and threw "Failed to rebuild Expression for filter" on those un-mapped children when a SupportsPushDownV2Filters source returned the predicate unchanged. Prefer an exact map entry before descending, restoring the readable wrapper (materialized later by MaterializeRuntimeReplaceable). Normal compound filters are decomposed at translation so only their leaves are mapped; the exact lookup misses and falls through to the existing descent, keeping behavior unchanged. Co-authored-by: Isaac
…eplacement is rewritten eagerly The cannotSurvive gate in ReplaceExpressions forces eager rewrite of an eagerReplace = false expression when its replacement is non-deterministic or unevaluable. The non-deterministic branch was untested: Uniform uses the default eagerReplace = true, so it never exercises cannotSurvive. Add a regression test with an eagerReplace = false wrapper whose replacement is a Rand, asserting ReplaceExpressions rewrites it (no survivor remains). Co-authored-by: Isaac
…bject and keep cached-scan predicates readable on the non-AQE path Two small consistency fixes for the survival path: - MultiGetJsonObject lost its 'nullIntolerant = true' hint when it became a RuntimeReplaceable, because RuntimeReplaceable does not delegate nullIntolerant. Since the node now survives into the optimized logical plan, the hint must live on the node so NullPropagation and IsNotNull constraint inference still see it. Restore it on the node. - MaterializeRuntimeReplaceable now skips InMemoryTableScanExec, so its pushed-down predicates keep the readable wrapper in the plan/EXPLAIN on the non-AQE path too (they are unfolded at their consumer, filteredCachedBatches). Previously the rule reached the scan only on the non-AQE path, making the in-plan representation differ between AQE and non-AQE. This matches the rule's own docstring and the AQE InMemoryTableScanLike skip. Co-authored-by: Isaac
…sions.scala Co-authored-by: Isaac
…uite.scala Co-authored-by: Isaac
sunchao
left a comment
There was a problem hiding this comment.
One inline P2 follow-up on throwable metadata for surviving RuntimeReplaceable expressions.
| // non-deterministic `Rand`. This matters once a `RuntimeReplaceable` may survive into the | ||
| // physical plan (see `eagerReplace`): the survival decision relies on an accurate determinism | ||
| // signal. | ||
| override lazy val deterministic: Boolean = replacement.deterministic |
There was a problem hiding this comment.
[P2] Delegate throwable to the surviving replacement
RuntimeReplaceable now delegates deterministic and foldable to replacement, but it still inherits Expression.throwable, which checks only the wrapper's original children. A surviving wrapper can therefore look non-throwing even when its replacement contains an explicitly throwable expression such as Sequence(..., step).
For a concrete example, consider a boolean wrapper whose replacement is size(sequence(start, stop, step)) > 0, and an inner join with these inputs:
- left row
(key=1, start=0, stop=1, step=1); right containskey=1 - left row
(key=2, start=0, stop=1, step=-1); right has nokey=2
With the predicate above the join, the second row is removed by the join before sequence(0, 1, -1) is evaluated, so the query succeeds. With eagerReplace = false, the wrapper reports throwable = false; PushPredicateThroughJoin moves it onto the left input, and the second row now throws Illegal sequence boundaries before the join can discard it. Physical materialization happens too late to undo that relocation. Spark's existing SPARK-46707 test specifically keeps explicit-step Sequence predicates above joins for this reason.
Could we also delegate this metadata, e.g. override lazy val throwable: Boolean = replacement.throwable, and add the wrapped join case as a regression test?
There was a problem hiding this comment.
Good catch, fixed in bd8f0ee.
RuntimeReplaceable now does override lazy val throwable: Boolean = replacement.throwable, alongside the existing deterministic/foldable delegations. You're right that the inherited children.exists(_.throwable) was wrong for a survivor: the children are the original arguments (non-throwing literal bounds) while the replacement is where the throwing Sequence lives, so PushPredicateThroughJoin (the SPARK-46707 guard) would relocate the predicate below the join and surface Illegal sequence boundaries on rows the join would otherwise discard.
Added a FilterPushdownSuite regression test mirroring the existing SPARK-46707 join test: a wrapper whose children (x.a, x.b) are non-throwing but whose replacement is an explicit-step Sequence; the predicate must stay above the join. It fails without the delegation (the wrapper is pushed onto the left input) and passes with it.
One implementation note for whoever reviews the test: the wrapper is deliberately not an InheritAnalysisRules expression. With InheritAnalysisRules the replacement becomes the wrapper's child, so the inherited children.exists(_.throwable) would already observe the throwing Sequence and the test would pass even without the fix. The test wrapper keeps its original args as children (the Uniform / MultiGetJsonObject shape) so it actually exercises the gap.
…ntimeReplaceable A surviving `RuntimeReplaceable` (`eagerReplace = false`) still inherited `Expression.throwable`, which inspects only the wrapper's original `children`. When the children are non-throwing but the `replacement` contains an explicitly throwing expression (e.g. a step-`Sequence`), the wrapper reported `throwable = false`, so `PushPredicateThroughJoin` could relocate it below a join and surface an error on rows the join would otherwise discard (the case SPARK-46707 guards against). Delegate `throwable` to `replacement`, mirroring the existing `deterministic` and `foldable` delegations. Add a `FilterPushdownSuite` regression test using a wrapper whose children are non-throwing but whose replacement is a throwing explicit-step `Sequence`; the predicate must stay above the join. Co-authored-by: Isaac
| // which already excludes it. | ||
| val cannotSurvive = | ||
| !replaced.deterministic || ConvertToLocalRelation.hasUnevaluableExpr(replaced) | ||
| if (r.eagerReplace || cannotSurvive) replaced else r |
There was a problem hiding this comment.
[P2] Force eager replacement when later finish-analysis rules need a hidden expression
This gate also lets through replacements that require a later finish-analysis rewrite. For example, consider a non-InheritAnalysisRules wrapper with an input timestamp child, eagerReplace = false, and replacement = Greatest(Seq(CurrentTimestamp(), child)). The replacement is deterministic and evaluable, and the input attribute makes it non-foldable, so the wrapper survives here. However, the replacement is not one of the wrapper's children, so its tree-pattern bits do not expose CURRENT_LIKE. ComputeCurrentTime, which runs after ReplaceExpressions, prunes this subtree and never replaces CurrentTimestamp with the single query-start literal. MaterializeRuntimeReplaceable exposes it only during physical preparation, where CurrentTimestamp.eval calls Instant.now() as rows are evaluated. In a focused 20,000-row regression, this produced multiple distinct timestamps within one query.
Before this PR, ReplaceExpressions always exposed the replacement before ComputeCurrentTime. Could we force eager replacement at least when replaced.containsPattern(CURRENT_LIKE) (or otherwise run the remaining finish-analysis rewrites on the hidden replacement)? Please keep the regression attribute-dependent so constant folding cannot mask the problem.
| // Materialize any RuntimeReplaceable that survived the optimizer into its replacement for | ||
| // the Spark execution path. After columnar/native conversion (so a native engine sees the | ||
| // original expression), before codegen (so Spark codegen never sees a RuntimeReplaceable). | ||
| MaterializeRuntimeReplaceable, |
There was a problem hiding this comment.
[P2] Expose structural equality before join-key extraction
Materializing at this point is too late for planner consumers. A supported eagerReplace = false binary predicate with replacement = EqualTo(leftKey, rightKey) survives the logical optimizer, but ExtractEquiJoinKeys only pattern-matches visible EqualTo / EqualNullSafe conjuncts and therefore extracts no keys from the wrapper. In a focused regression with broadcasting disabled, the batch plan contained CartesianProductExec and no SortMergeJoinExec (while still returning the correct rows), turning a normal equi-join into O(N*M) work that can OOM. A second control showed that an ordinary stream-stream EqualTo is accepted by StreamingJoinStrategy, while the identical equality hidden in this wrapper throws the 'stream-stream join without equality predicate' AnalysisException.
MaterializeRuntimeReplaceable unfolds the condition only after Spark strategies have selected the operator, so it cannot recover a hash/sort-merge/streaming equi-join. Before this PR, unconditional replacement exposed the equality before planning. Could join-key extraction inspect an unfolded condition (while preserving the original expression where needed), or should such structural predicates be forced to replace eagerly? Batch physical-plan and stream-stream planning regressions would cover both consequences.
…RRENT_LIKE expression A surviving RuntimeReplaceable (eagerReplace = false) exposes only the RUNTIME_REPLACEABLE pattern of its children, not the pattern bits of its hidden replacement. The finish-analysis rules ComputeCurrentTime / ReplaceCurrentLike run right after ReplaceExpressions and prune on CURRENT_LIKE, so a CurrentTimestamp / CurrentDate / Now hidden in a survivor's replacement is never folded into the single query-start literal and would be evaluated per row at execution, yielding multiple distinct values within one query. Extend the cannotSurvive gate so a replacement containing CURRENT_LIKE is rewritten eagerly, mirroring the existing Unevaluable handling. Adds an attribute-dependent OptimizerSuite regression so constant folding cannot mask the problem. Co-authored-by: Isaac
…JoinKeys A surviving equi-join predicate whose replacement is an EqualTo is hidden from ExtractEquiJoinKeys, which only pattern-matches visible EqualTo / EqualNullSafe conjuncts. The join then extracts no keys and degrades to a CartesianProductExec (batch), and stream-stream join analysis fails with "without equality predicate" since StreamingJoinStrategy shares this extractor. Unfold a predicate's RuntimeReplaceable (gated on the tree pattern) before matching, so key extraction sees the underlying equality. Only the extracted keys use the unfolded form; non-key predicates stay in their original survivor form so MaterializeRuntimeReplaceable still materializes them before codegen and EXPLAIN keeps the readable node. Adds a batch join-planning regression asserting SortMergeJoinExec rather than CartesianProductExec. Co-authored-by: Isaac
Co-authored-by: Isaac
Co-authored-by: Isaac
sunchao
left a comment
There was a problem hiding this comment.
Follow-up review on 09e4f260: the exact CURRENT_LIKE and root-EqualTo cases are improved, but the broader correctness contracts are still incomplete in the cases below.
| val cannotSurvive = | ||
| !replaced.deterministic || | ||
| ConvertToLocalRelation.hasUnevaluableExpr(replaced) || | ||
| replaced.containsPattern(CURRENT_LIKE) |
There was a problem hiding this comment.
[P2] Cover every required finish-analysis rewrite
This only forces eager replacement for CURRENT_LIKE, but the same survivor opacity still bypasses other mandatory rules that run after ReplaceExpressions. For example, a deterministic, non-foldable wrapper with replacement = ArrayDistinct(arrayDoubleAttr) hides ARRAY_DISTINCT from both NormalizeFloatingNumbers passes; in a focused regression, [-0.0d, 0.0d] remained two elements instead of one canonical positive zero. Likewise, a wrapper whose replacement contains Cast(Literal("epoch"), DateType, ...) hides CAST from SpecialDatetimeValues; it returned NULL in legacy mode (and would throw in ANSI mode) instead of 1970-01-01.
MaterializeRuntimeReplaceable exposes these expressions only after logical optimization, so neither rewrite gets another chance. Could the survival gate cover every replacement requiring a mandatory post-ReplaceExpressions rewrite, or run those rewrites over the hidden replacement, with regressions for these two cases? A CURRENT_LIKE-only guard still permits wrong results and runtime errors.
| @@ -190,30 +191,47 @@ object ExtractEquiJoinKeys extends Logging with PredicateHelper { | |||
| // Find equi-join predicates that can be evaluated before the join, and thus can be used | |||
| // as join keys. | |||
| val predicates = condition.map(splitConjunctivePredicates).getOrElse(Nil) | |||
There was a problem hiding this comment.
[P2] Split the unfolded condition for compound replacements
The condition is split while the outer RuntimeReplaceable is still opaque. If a supported survivor has replacement = And(EqualTo(left.k, right.k), GreaterThan(left.v, 0)), splitConjunctivePredicates produces the wrapper as one predicate; unfoldKeyPredicate then yields an And, which none of the top-level equality cases match. A focused regression still planned this as CartesianProductExec, and the equivalent stream-stream join is still rejected as lacking an equality predicate. This compound replacement shape is already treated as supported by TestCompoundPredicateRuntimeReplaceable in the V2 tests.
Could we expose the replacement before splitting (while retaining the mapping to the original survivor where needed), and add batch plus streaming coverage for a compound replacement?
| // keeps showing the readable node. The unfold is gated on the tree pattern so the common | ||
| // survivor-free join is unaffected. | ||
| def unfoldKeyPredicate(p: Expression): Expression = | ||
| if (p.containsPattern(RUNTIME_REPLACEABLE)) RuntimeReplaceable.unfold(p) else p |
There was a problem hiding this comment.
[P2] Preserve nested survivors when exposing the outer equality
This recursively unfolds every RuntimeReplaceable in the predicate, including a survivor that is already a visible join-key operand. For EqualTo(partitionKey, Survivor(dimensionKey)), the old extractor retained Survivor(dimensionKey) as the physical key; this code emits only its low-level replacement, so native planning can no longer match the high-level key.
It also makes DPP invalid: ExtractEquiJoinKeys returns the unfolded build key, but PartitionPruning inspects the original equality and calls joinKeys.indexOf(Survivor(dimensionKey)), producing -1. The resulting DynamicPruningSubquery is unresolved and fails default plan validation; with validation disabled, the negative index later reaches buildKeys(-1). In a focused planning regression, the visible survivor key was already reduced to (k + 0) before columnar/native conversion.
Could this unfold only the outer survivor needed to reveal an equality, preserving already-visible key operands, with a DPP regression for this shape?
What changes were proposed in this pull request?
Today every
RuntimeReplaceableis rewritten into itsreplacementbyReplaceExpressionsinthe logical optimizer, so it never reaches the physical plan. This PR adds an opt-in mechanism
that lets a
RuntimeReplaceablesurvive into the physical plan, so expressions can be migratedincrementally:
RuntimeReplaceable.eagerReplace: Boolean, defaulting totrue. With the default, behavioris unchanged: the expression is rewritten eagerly and never reaches the physical plan. An
expression overrides it to
falseto survive.ReplaceExpressionsrewrites eagerly wheneagerReplaceistrue, or when the replacementcannot survive: it is non-deterministic (e.g. the
Randinsideuniform) or contains anUnevaluableexpression (e.g.With, which depends on the logical-phaseRewriteWithExpressionrule).
RuntimeReplaceableAggregateis always rewritten. Everything else witheagerReplace = falsesurvives.MaterializeRuntimeReplaceablerewrites the survivors into theirreplacement after columnar/native conversion and before
CollapseCodegenStages, so Sparkwhole-stage codegen never sees a
RuntimeReplaceableand per-operator metrics stay intact.RuntimeReplaceableis correct:deterministic,foldable,eval, anddoGenCodedelegate toreplacement;FoldablePropagationonlypropagates literals;
NormalizePlanfully unfoldsRuntimeReplaceable.MultiGetJsonObject(inserted byOptimizeCsvJsonExprs, which runs afterReplaceExpressions) becomes aRuntimeReplaceablewitheagerReplace = falseand anInvokereplacement, dropping its hand-written
eval/doGenCode. This was historically impossible for anoptimizer-inserted node and is exactly the use case the mechanism unlocks.
Note on the approach: an earlier revision of this PR made every evaluable
RuntimeReplaceablesurvive the optimizer whenever possible (rewriting early only for the non-deterministic /
unevaluable cases), and full CI was green. That proves the survival mechanism and the supporting
framework changes are sound end to end. This revision keeps the same machinery but gates survival
behind the
eagerReplaceflag (defaulttrue), so existing expressions can be migrated one at atime instead of all at once.
Why are the changes needed?
Keeping the semantic expression (e.g.
right(a, b)) in the plan lets optimizer rules introduceRuntimeReplaceables freely, keeps the high-level expression visible in the optimized logicalplan, and lets a native engine match the high-level expression directly instead of
reverse-engineering its lowered form. The
eagerReplaceflag lets existing expressions migrate tothis behavior one at a time rather than all at once.
MultiGetJsonObjectis the first suchmigration: an optimizer-inserted
RuntimeReplaceablethat previously could not exist.Does this PR introduce any user-facing change?
No query results change. All existing expressions default to
eagerReplace = true, so their plans,EXPLAINoutput, and results are unchanged.MultiGetJsonObjectis only produced under the opt-inspark.sql.optimizer.getJsonObjectSharedParsing.enabledpath (off by default); there, theoptimized logical plan still shows the readable
multi_get_json_objectnode, and only the physicalplan now shows its
Invokereplacement instead of the node itself.How was this patch tested?
New
MaterializeRuntimeReplaceableSuiteverifies that an optimizer rule can insert aneagerReplace = falseRuntimeReplaceable, that it survives the optimizer, is materialized awaybefore execution, and produces correct results.
OptimizeJsonExprsSuiteandJsonFunctionsSuite(including the shared-
get_json_objectcodegen and deeply-nestedevaltests) cover theMultiGetJsonObjectmigration end to end. ExistingOptimizerSuiteand Spark Connect explaingolden tests are unchanged. In addition, an earlier revision of this PR that made every evaluable
RuntimeReplaceablesurvive passed full CI, validating the survival and materialization machineryacross the whole test suite.
Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code (Opus 4.8)