Skip to content

[SPARK-57512][SQL] Allow RuntimeReplaceable to opt out of eager replacement and survive into the physical plan#56575

Open
cloud-fan wants to merge 12 commits into
apache:masterfrom
cloud-fan:rr-selfeval-validation
Open

[SPARK-57512][SQL] Allow RuntimeReplaceable to opt out of eager replacement and survive into the physical plan#56575
cloud-fan wants to merge 12 commits into
apache:masterfrom
cloud-fan:rr-selfeval-validation

Conversation

@cloud-fan

@cloud-fan cloud-fan commented Jun 17, 2026

Copy link
Copy Markdown
Contributor

What changes were proposed in this pull request?

Today every RuntimeReplaceable is rewritten into its replacement by ReplaceExpressions in
the logical optimizer, so it never reaches the physical plan. This PR adds an opt-in mechanism
that lets a RuntimeReplaceable survive into the physical plan, so expressions can be migrated
incrementally:

  • New RuntimeReplaceable.eagerReplace: Boolean, defaulting to true. With the default, behavior
    is unchanged: the expression is rewritten eagerly and never reaches the physical plan. An
    expression overrides it to false to survive.
  • ReplaceExpressions rewrites eagerly when eagerReplace is true, or when the replacement
    cannot survive: it is non-deterministic (e.g. the Rand inside uniform) or contains an
    Unevaluable expression (e.g. With, which depends on the logical-phase RewriteWithExpression
    rule). RuntimeReplaceableAggregate is always rewritten. Everything else with
    eagerReplace = false survives.
  • A new physical-preparation rule MaterializeRuntimeReplaceable rewrites the survivors into their
    replacement after columnar/native conversion and before CollapseCodegenStages, so Spark
    whole-stage codegen never sees a RuntimeReplaceable and per-operator metrics stay intact.
  • Supporting framework changes so a surviving RuntimeReplaceable is correct: deterministic,
    foldable, eval, and doGenCode delegate to replacement; FoldablePropagation only
    propagates literals; NormalizePlan fully unfolds RuntimeReplaceable.
  • First consumer: MultiGetJsonObject (inserted by OptimizeCsvJsonExprs, which runs after
    ReplaceExpressions) becomes a RuntimeReplaceable with eagerReplace = false and an Invoke
    replacement, dropping its hand-written eval/doGenCode. This was historically impossible for an
    optimizer-inserted node and is exactly the use case the mechanism unlocks.

Note on the approach: an earlier revision of this PR made every evaluable RuntimeReplaceable
survive the optimizer whenever possible (rewriting early only for the non-deterministic /
unevaluable cases), and full CI was green. That proves the survival mechanism and the supporting
framework changes are sound end to end. This revision keeps the same machinery but gates survival
behind the eagerReplace flag (default true), so existing expressions can be migrated one at a
time instead of all at once.

Why are the changes needed?

Keeping the semantic expression (e.g. right(a, b)) in the plan lets optimizer rules introduce
RuntimeReplaceables freely, keeps the high-level expression visible in the optimized logical
plan, and lets a native engine match the high-level expression directly instead of
reverse-engineering its lowered form. The eagerReplace flag lets existing expressions migrate to
this behavior one at a time rather than all at once. MultiGetJsonObject is the first such
migration: an optimizer-inserted RuntimeReplaceable that previously could not exist.

Does this PR introduce any user-facing change?

No query results change. All existing expressions default to eagerReplace = true, so their plans,
EXPLAIN output, and results are unchanged. MultiGetJsonObject is only produced under the opt-in
spark.sql.optimizer.getJsonObjectSharedParsing.enabled path (off by default); there, the
optimized logical plan still shows the readable multi_get_json_object node, and only the physical
plan now shows its Invoke replacement instead of the node itself.

How was this patch tested?

New MaterializeRuntimeReplaceableSuite verifies that an optimizer rule can insert an
eagerReplace = false RuntimeReplaceable, that it survives the optimizer, is materialized away
before execution, and produces correct results. OptimizeJsonExprsSuite and JsonFunctionsSuite
(including the shared-get_json_object codegen and deeply-nested eval tests) cover the
MultiGetJsonObject migration end to end. Existing OptimizerSuite and Spark Connect explain
golden tests are unchanged. In addition, an earlier revision of this PR that made every evaluable
RuntimeReplaceable survive passed full CI, validating the survival and materialization machinery
across the whole test suite.

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code (Opus 4.8)

@cloud-fan cloud-fan changed the title [WIP][SPARK-57512][SQL] Allow evaluable RuntimeReplaceable to survive into the physical plan [SPARK-57512][SQL] Allow RuntimeReplaceable to opt out of eager replacement and survive into the physical plan Jun 20, 2026
@cloud-fan cloud-fan force-pushed the rr-selfeval-validation branch from 8436c7c to 10bf8bc Compare June 20, 2026 08:04
@cloud-fan cloud-fan requested a review from sunchao June 21, 2026 20:41
@cloud-fan

Copy link
Copy Markdown
Contributor Author

cc @sunchao

@HyukjinKwon HyukjinKwon left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1 blocking, 1 non-blocking, 0 nits. Solid, carefully-commented mechanism; the blocking item is that the materialization step isn't wired into the AQE path.

Design / architecture (1)

  • QueryExecution.scala:781: MaterializeRuntimeReplaceable is added only to QueryExecution.preparations, not to AQE's stage prep. Under AQE (default) a surviving RuntimeReplaceable reaches CollapseCodegenStages un-materialized -- see inline.

Suggestions (1)

  • NormalizePlan.scala:94: full-unfold recursion duplicated with MaterializeRuntimeReplaceable.replace; consider a shared helper -- see inline.

Verification

Traced the survive-then-materialize transformation for result-equivalence: eval/doGenCode delegate to replacement; non-deterministic (Rand) and Unevaluable (With) replacements are gated to the eager path; foldable-but-referencing survivors are handled by literal-only FoldablePropagation + same-batch ConstantFolding; NormalizePlan fully unfolds. All result-bearing dimensions are equivalent or gated -- query results stay correct. The one gap is the AQE wiring (inline): AdaptiveSparkPlanExec is a LeafExecNode so the outer rule can't reach the managed subtree, and AQE's reOptimize re-inserts the survivor via OptimizeCsvJsonExprs, so under AQE the survivor reaches codegen (correct via delegation, but the stated invariant/metrics don't hold).

// Materialize any RuntimeReplaceable that survived the optimizer into its replacement for
// the Spark execution path. After columnar/native conversion (so a native engine sees the
// original expression), before codegen (so Spark codegen never sees a RuntimeReplaceable).
MaterializeRuntimeReplaceable,

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MaterializeRuntimeReplaceable is added here, to QueryExecution.preparations, but I don't see it in any of AQE's stage-prep lists (queryStagePreparationRules, queryStageOptimizerRules, postStageCreationRules in AdaptiveSparkPlanExec). Under AQE (default-on) that looks like a gap:

  • InsertAdaptiveSparkPlan runs first in preparations, so the plan is wrapped in AdaptiveSparkPlanExec (a LeafExecNode) before this rule runs -- transformUpWithSubqueries can't reach the AQE-managed subtree.
  • AQE's reOptimize re-runs the full optimizer (optimizer.execute), which includes OptimizeCsvJsonExprs, so the surviving MultiGetJsonObject (eagerReplace = false) is re-inserted on the re-optimized stage.
  • AQE's postStageCreationRules then runs ApplyColumnarRulesAndInsertTransitions -> CollapseCodegenStages with no materialization in between.

So under AQE a RuntimeReplaceable reaches whole-stage codegen un-materialized. Results stay correct because eval/doGenCode now delegate to replacement, but the invariant this PR relies on ("Spark whole-stage codegen never sees a RuntimeReplaceable", and "only the physical plan shows its Invoke replacement") wouldn't hold under AQE -- which affects EXPLAIN and the per-operator-metrics rationale. The peer prep rules (CollapseCodegenStages, ApplyColumnarRulesAndInsertTransitions, EnsureRequirements, ...) are all present in both the non-AQE and AQE lists; this rule is the only one that isn't.

Would adding MaterializeRuntimeReplaceable to AQE's postStageCreationRules (just before collapseCodegenStagesRule) be the right fix? The two new tests use no-shuffle queries that aren't AQE-wrapped, so an AQE-path (shuffle) test that inspects the AQE final plan would cover this.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, this was a real gap. Fixed in b80438b by adding MaterializeRuntimeReplaceable to AQE's postStageCreationRules, right before collapseCodegenStagesRule, mirroring QueryExecution.preparations. I confirmed both newQueryStage and newResultQueryStage apply postStageCreationRules, so shuffle stages and the final result stage are both covered. Added an AQE-path test (repartition shuffle) that asserts no RuntimeReplaceable reaches the AQE finalPhysicalPlan and that results are correct.

One minor correction to the mechanism: AQEOptimizer doesn't actually run OptimizeCsvJsonExprs or spark.experimental.extraOptimizations, so the survivor isn't re-inserted during reOptimize. Instead it originates in the main optimizer and persists into the AQE-managed subtree, which the outer preparations rule can't reach (since AdaptiveSparkPlanExec is a LeafExecNode). So the conclusion stands -- AQE-side materialization is required -- just via a slightly different path.

}

private def replaceRuntimeReplaceable(e: Expression): Expression = e match {
case r: RuntimeReplaceable => replaceRuntimeReplaceable(r.replacement)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit / non-blocking: this full-unfold recursion (case r: RuntimeReplaceable => replaceRuntimeReplaceable(r.replacement) / case _ => e.mapChildren(...)) is identical to MaterializeRuntimeReplaceable.replace. Two copies of the same unfold can drift over time -- might be worth a small shared helper.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in b80438b. Extracted a shared RuntimeReplaceable.unfold helper (companion object in Expression.scala); both NormalizePlan.normalizeRuntimeReplaceable and MaterializeRuntimeReplaceable.apply now call it, removing the two private copies.

@sunchao sunchao left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One additional inline finding. The AQE materialization issue is already covered by the existing unresolved review thread, so I did not duplicate it.

// initialized through the `lazy val replacement`, which tree transforms may re-create.
// - A replacement that contains an `Unevaluable` expression (e.g. `With`) depends on a later
// logical rule (such as `RewriteWithExpression`) that can only run in the logical phase.
val cannotSurvive = !replaced.deterministic || replaced.exists(_.isInstanceOf[Unevaluable])

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P1] AttributeReference extends Unevaluable, so this condition is true for every replacement that reads a resolved input column. For example, an eagerReplace = false wrapper whose replacement is Add(attr, Literal(1L)) is still returned as replaced below and never reaches physical/native planning. In practice, the opt-out only works for constants or nodes inserted after ReplaceExpressions—which is why both MultiGetJsonObject and the new test avoid this path.

Could we exclude AttributeReference from this check (as ConvertToLocalRelation.hasUnevaluableExpr already does) and add a direct ReplaceExpressions regression test with a column-based wrapper?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, you're exactly right. Fixed in b80438b: the survival gate now reuses ConvertToLocalRelation.hasUnevaluableExpr(replaced), which already excludes AttributeReference, so a replacement that reads an input column (e.g. Add(attr, Literal(1))) can survive instead of being forced down the eager path. Added a direct ReplaceExpressions regression test in OptimizerSuite with a column-based wrapper (ColumnBasedRuntimeReplaceable) that fails without the fix and passes with it.

@sunchao sunchao left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One inline P2 follow-up on the AQE table-cache path.

// re-insert a surviving `RuntimeReplaceable`, e.g. via `OptimizeCsvJsonExprs`), and
// `AdaptiveSparkPlanExec` is a `LeafExecNode` that the outer `preparations` rule can't reach,
// so the materialization must also run here.
MaterializeRuntimeReplaceable,

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P2] This fixes AQE exchange and result stages, but newQueryStage's InMemoryTableScanLike branch still deliberately skips postStageCreationRules, so this materialization rule never reaches table-cache scans. InMemoryScans copies logical filters into InMemoryTableScanExec.predicates; after the scan is wrapped in TableCacheQueryStageExec (a leaf), later result-stage traversal cannot reach that hidden plan. A surviving predicate wrapper whose replacement is supported by CachedBatchSerializer.buildFilter (for example, EqualTo(attr, literal)) is therefore left unrecognized, disabling cached-batch pruning even though the separate FilterExec still produces correct rows.

Could we apply MaterializeRuntimeReplaceable explicitly to the optimized InMemoryTableScanLike before wrapping it, and add an AQE cache/filter test that inspects TableCacheQueryStageExec.plan? The current repartition test covers only the result-stage path, and finalPlan.exists does not traverse query-stage plans.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch — fixed in 50cc6a1, though at a slightly different layer than suggested.

I materialized at the pushdown consumer (InMemoryTableScanExec, unfolding RuntimeReplaceable in the predicates handed to buildFilter) rather than applying MaterializeRuntimeReplaceable to the optimized InMemoryTableScanLike before wrapping it. Reasoning:

  • The scan is a LeafExecNode that never reaches whole-stage codegen, so this isn't a codegen-prep concern. MaterializeRuntimeReplaceable sits between columnar conversion and CollapseCodegenStages specifically for codegen, which is exactly why the InMemoryTableScanLike branch skips postStageCreationRules — extending that rule into a non-codegen leaf would be misplaced.
  • predicates only feed buildFilter (a runtime closure that pattern-matches expression shape), so unfolding them at the consumer is the right layer. It also covers AQE and non-AQE uniformly (non-AQE was relying on QueryExecution.preparations reaching the predicates field; now both paths go through the same unfold) while keeping the readable node in the plan/EXPLAIN output.

Added an AQE regression test in MaterializeRuntimeReplaceableSuite using an adaptive cached plan, so the scan is wrapped in a TableCacheQueryStageExec leaf (the gap you flagged). A surviving predicate RuntimeReplaceable is pushed into the scan; the test asserts the wrapper survives into scan.predicates and that cached-batch pruning still kicks in — it fails without the fix (readBatches == 10, no pruning). I also added comments at all three sites recording the codegen-vs-consumer split.

One scoping note: this is forward-looking. The current opt-in survivor (multi_get_json_object) isn't a buildFilter-recognizable predicate shape, so no current query regresses; the fix matters for a future survivor whose replacement is a prunable comparison.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Follow-up: I generalized this beyond the cached-batch case.

First-principle we landed on: a surviving RuntimeReplaceable (eagerReplace = false) must be unfolded to its replacement wherever a predicate leaves Spark's own expression-evaluation engine and is handed to a consumer that interprets its structure or sends it elsewhere — because RuntimeReplaceable is a Spark-internal optimizer concept the consumer can't understand. Internal Spark evaluation (FilterExec/Project codegen + interpreted) needs no unfold: it's handled by the eval/doGenCode delegation plus MaterializeRuntimeReplaceable.

That boundary is realized at three places, all now unfolding:

  • InMemoryTableScanExec.buildFilter (cached-batch pruning — the case you flagged; CachedBatchSerializer is a @DeveloperApi).
  • DataSourceStrategy.translateLeafNodeFilter (V1 / file source filter pushdown).
  • V2ExpressionBuilder.generateExpression (V2) — and since all V2 pushdown (filters, aggregate functions, aggregate arguments, group-by, sort) funnels through generateExpression via PushableExpression/translateAggregation, this single fallback covers V2 filter and aggregate/group-by pushdown. It's a fallback (after the explicit cases) so native high-level pushdown like AES_ENCRYPT still wins.

Out of scope (different kind of consumer — column selection, not expression translation, and driven by references which for a survivor equals its children's references): nested-column/schema pruning and partition pruning.

Tests added for all three boundaries (V1/V2 filter translation, V2 aggregate/group-by, and an AQE cached-batch pruning test); each was verified to fail without the corresponding unfold.

Note: I squashed the branch into a single commit.

@cloud-fan cloud-fan force-pushed the rr-selfeval-validation branch from 6cdb81d to b80438b Compare June 22, 2026 16:59
…cement and survive into the physical plan

Add `RuntimeReplaceable.eagerReplace` (default `true`, preserving current behavior). An
expression can override it to `false` to survive the logical optimizer into the physical
plan, so a native engine can match the high-level expression directly and the optimized
plan stays readable. `ReplaceExpressions` still rewrites eagerly when the replacement
cannot survive (non-deterministic, or contains an `Unevaluable` other than
`AttributeReference`); `RuntimeReplaceableAggregate` is always rewritten.

Supporting changes:
- `eval`/`doGenCode`/`deterministic`/`foldable` delegate to `replacement`; a new
  physical-preparation rule `MaterializeRuntimeReplaceable` unfolds survivors before
  `CollapseCodegenStages` (in both `QueryExecution.preparations` and AQE
  `postStageCreationRules`). This is required for correctness: whole-stage codegen reasons
  about `references`/input materialization/CSE via `children`, while the emitted code comes
  from `replacement`, so an un-materialized survivor can produce invalid generated code.
- `FoldablePropagation` only propagates literals; `NormalizePlan` fully unfolds.
- Structure-interpreting consumers that receive a predicate leaving Spark's evaluation
  engine unfold survivors at their boundary: data source filter pushdown
  (`DataSourceStrategy.translateLeafNodeFilter` for V1, `V2ExpressionBuilder` for V2 which
  also covers aggregate/group-by) and cached-batch pruning
  (`InMemoryTableScanExec.buildFilter`).
- First adopter: `MultiGetJsonObject` (inserted by `OptimizeCsvJsonExprs` after
  `ReplaceExpressions`) becomes a surviving `RuntimeReplaceable` with an `Invoke`
  replacement.

Co-authored-by: Isaac
@cloud-fan cloud-fan force-pushed the rr-selfeval-validation branch from f459bc6 to 5251cd7 Compare June 22, 2026 20:06

@sunchao sunchao left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One inline P2 follow-up on compound V2 filter reconstruction.

// natively is a Spark-internal optimizer node the connector cannot understand. Fall back to its
// concrete `replacement` so the lowered form can still be pushed -- same boundary rationale as
// `DataSourceStrategy.translateLeafNodeFilter` (V1) and `CachedBatchSerializer.buildFilter`.
case r: RuntimeReplaceable => generateExpression(r.replacement, isPredicate)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P2] Preserve the mapping for compound replacement predicates

translateFilterV2WithMapping treats the surviving wrapper as its leaf/other case, but this fallback can return a structural V2And, V2Or, or V2Not. The map then contains only compoundPredicate -> originalWrapper. If a SupportsPushDownV2Filters source returns that predicate unchanged because it cannot fully push it (as the interface contract requires), rebuildExpressionFromFilter matches the compound node first and recursively visits children that were created inside V2ExpressionBuilder and have no map entries. Planning then throws Failed to rebuild Expression for filter instead of retaining a post-scan filter. For example, this occurs for an eagerReplace = false wrapper whose replacement is And(a > 1, b < 2) on a rejecting V2 source.

Could we either unfold before entering the mapping-aware recursion, or make rebuilding prefer an exact map entry before descending, and add a rejecting-source regression test? The new test exercises only a leaf GreaterThan translation, so it does not cover this reconstruction path.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, fixed in cafddb8 with the "prefer an exact map entry before descending" option.

rebuildExpressionFromFilter now checks translatedFilterToExpr.get(predicate) before matching V2And/V2Or/V2Not. For the surviving wrapper, the compound V2And is a map key (mapped to the wrapper), so it's restored directly and we never descend into the synthetic children that have no entries. The original readable wrapper is kept in the post-scan filter and materialized into its replacement before codegen by MaterializeRuntimeReplaceable.

This is granularity-correct at every level of descent, so nesting is covered too -- e.g. And(c = 5, wrapper(And(a > 1, b < 2))) rebuilds as And(c = 5, wrapper): the outer V2And isn't a map key so it descends, and the inner V2And(a>1, b<2) is a map key so it's restored before its synthetic children are visited. Normal compound filters are decomposed at translation (only leaves mapped), so the exact lookup misses and falls through to the existing descent -- behavior unchanged.

Added a regression test in DataSourceV2StrategySuite that translates a compound-replacement wrapper via translateFilterV2WithMapping, then feeds the result back through rebuildExpressionFromFilter (equivalent to a rejecting SupportsPushDownV2Filters source returning the predicate unchanged) and asserts the wrapper is restored; it throws Failed to rebuild Expression for filter without the fix.

One note on V1: the V1 path (DataSourceStrategy) does not have this crash. There the unfold sits inside translateLeafNodeFilter, which only recognizes leaf shapes, so a compound replacement simply isn't translated (returns None) -- no pushdown, no map entry, correct results. That's a milder missed-optimization version of the same root cause; I left V1 as-is since fixing it would be an enhancement (and would lose the readable wrapper in the post-scan filter).

…2 filter rebuild

A surviving RuntimeReplaceable (eagerReplace = false) is opaque to
translateFilterV2WithMapping (matched as a leaf), so when its unfolded
replacement is a compound predicate it is mapped as a single V2And/V2Or/V2Not
entry while the synthetic children have no entries. rebuildExpressionFromFilter
descended structurally first and threw "Failed to rebuild Expression for filter"
on those un-mapped children when a SupportsPushDownV2Filters source returned the
predicate unchanged.

Prefer an exact map entry before descending, restoring the readable wrapper
(materialized later by MaterializeRuntimeReplaceable). Normal compound filters
are decomposed at translation so only their leaves are mapped; the exact lookup
misses and falls through to the existing descent, keeping behavior unchanged.

Co-authored-by: Isaac
…eplacement is rewritten eagerly

The cannotSurvive gate in ReplaceExpressions forces eager rewrite of an
eagerReplace = false expression when its replacement is non-deterministic
or unevaluable. The non-deterministic branch was untested: Uniform uses the
default eagerReplace = true, so it never exercises cannotSurvive. Add a
regression test with an eagerReplace = false wrapper whose replacement is a
Rand, asserting ReplaceExpressions rewrites it (no survivor remains).

Co-authored-by: Isaac
…bject and keep cached-scan predicates readable on the non-AQE path

Two small consistency fixes for the survival path:

- MultiGetJsonObject lost its 'nullIntolerant = true' hint when it became a
  RuntimeReplaceable, because RuntimeReplaceable does not delegate
  nullIntolerant. Since the node now survives into the optimized logical
  plan, the hint must live on the node so NullPropagation and IsNotNull
  constraint inference still see it. Restore it on the node.

- MaterializeRuntimeReplaceable now skips InMemoryTableScanExec, so its
  pushed-down predicates keep the readable wrapper in the plan/EXPLAIN on the
  non-AQE path too (they are unfolded at their consumer, filteredCachedBatches).
  Previously the rule reached the scan only on the non-AQE path, making the
  in-plan representation differ between AQE and non-AQE. This matches the
  rule's own docstring and the AQE InMemoryTableScanLike skip.

Co-authored-by: Isaac

@sunchao sunchao left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One inline P2 follow-up on throwable metadata for surviving RuntimeReplaceable expressions.

// non-deterministic `Rand`. This matters once a `RuntimeReplaceable` may survive into the
// physical plan (see `eagerReplace`): the survival decision relies on an accurate determinism
// signal.
override lazy val deterministic: Boolean = replacement.deterministic

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P2] Delegate throwable to the surviving replacement

RuntimeReplaceable now delegates deterministic and foldable to replacement, but it still inherits Expression.throwable, which checks only the wrapper's original children. A surviving wrapper can therefore look non-throwing even when its replacement contains an explicitly throwable expression such as Sequence(..., step).

For a concrete example, consider a boolean wrapper whose replacement is size(sequence(start, stop, step)) > 0, and an inner join with these inputs:

  • left row (key=1, start=0, stop=1, step=1); right contains key=1
  • left row (key=2, start=0, stop=1, step=-1); right has no key=2

With the predicate above the join, the second row is removed by the join before sequence(0, 1, -1) is evaluated, so the query succeeds. With eagerReplace = false, the wrapper reports throwable = false; PushPredicateThroughJoin moves it onto the left input, and the second row now throws Illegal sequence boundaries before the join can discard it. Physical materialization happens too late to undo that relocation. Spark's existing SPARK-46707 test specifically keeps explicit-step Sequence predicates above joins for this reason.

Could we also delegate this metadata, e.g. override lazy val throwable: Boolean = replacement.throwable, and add the wrapped join case as a regression test?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, fixed in bd8f0ee.

RuntimeReplaceable now does override lazy val throwable: Boolean = replacement.throwable, alongside the existing deterministic/foldable delegations. You're right that the inherited children.exists(_.throwable) was wrong for a survivor: the children are the original arguments (non-throwing literal bounds) while the replacement is where the throwing Sequence lives, so PushPredicateThroughJoin (the SPARK-46707 guard) would relocate the predicate below the join and surface Illegal sequence boundaries on rows the join would otherwise discard.

Added a FilterPushdownSuite regression test mirroring the existing SPARK-46707 join test: a wrapper whose children (x.a, x.b) are non-throwing but whose replacement is an explicit-step Sequence; the predicate must stay above the join. It fails without the delegation (the wrapper is pushed onto the left input) and passes with it.

One implementation note for whoever reviews the test: the wrapper is deliberately not an InheritAnalysisRules expression. With InheritAnalysisRules the replacement becomes the wrapper's child, so the inherited children.exists(_.throwable) would already observe the throwing Sequence and the test would pass even without the fix. The test wrapper keeps its original args as children (the Uniform / MultiGetJsonObject shape) so it actually exercises the gap.

…ntimeReplaceable

A surviving `RuntimeReplaceable` (`eagerReplace = false`) still inherited
`Expression.throwable`, which inspects only the wrapper's original `children`.
When the children are non-throwing but the `replacement` contains an explicitly
throwing expression (e.g. a step-`Sequence`), the wrapper reported
`throwable = false`, so `PushPredicateThroughJoin` could relocate it below a
join and surface an error on rows the join would otherwise discard (the case
SPARK-46707 guards against).

Delegate `throwable` to `replacement`, mirroring the existing `deterministic`
and `foldable` delegations. Add a `FilterPushdownSuite` regression test using a
wrapper whose children are non-throwing but whose replacement is a throwing
explicit-step `Sequence`; the predicate must stay above the join.

Co-authored-by: Isaac
// which already excludes it.
val cannotSurvive =
!replaced.deterministic || ConvertToLocalRelation.hasUnevaluableExpr(replaced)
if (r.eagerReplace || cannotSurvive) replaced else r

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P2] Force eager replacement when later finish-analysis rules need a hidden expression

This gate also lets through replacements that require a later finish-analysis rewrite. For example, consider a non-InheritAnalysisRules wrapper with an input timestamp child, eagerReplace = false, and replacement = Greatest(Seq(CurrentTimestamp(), child)). The replacement is deterministic and evaluable, and the input attribute makes it non-foldable, so the wrapper survives here. However, the replacement is not one of the wrapper's children, so its tree-pattern bits do not expose CURRENT_LIKE. ComputeCurrentTime, which runs after ReplaceExpressions, prunes this subtree and never replaces CurrentTimestamp with the single query-start literal. MaterializeRuntimeReplaceable exposes it only during physical preparation, where CurrentTimestamp.eval calls Instant.now() as rows are evaluated. In a focused 20,000-row regression, this produced multiple distinct timestamps within one query.

Before this PR, ReplaceExpressions always exposed the replacement before ComputeCurrentTime. Could we force eager replacement at least when replaced.containsPattern(CURRENT_LIKE) (or otherwise run the remaining finish-analysis rewrites on the hidden replacement)? Please keep the regression attribute-dependent so constant folding cannot mask the problem.

// Materialize any RuntimeReplaceable that survived the optimizer into its replacement for
// the Spark execution path. After columnar/native conversion (so a native engine sees the
// original expression), before codegen (so Spark codegen never sees a RuntimeReplaceable).
MaterializeRuntimeReplaceable,

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P2] Expose structural equality before join-key extraction

Materializing at this point is too late for planner consumers. A supported eagerReplace = false binary predicate with replacement = EqualTo(leftKey, rightKey) survives the logical optimizer, but ExtractEquiJoinKeys only pattern-matches visible EqualTo / EqualNullSafe conjuncts and therefore extracts no keys from the wrapper. In a focused regression with broadcasting disabled, the batch plan contained CartesianProductExec and no SortMergeJoinExec (while still returning the correct rows), turning a normal equi-join into O(N*M) work that can OOM. A second control showed that an ordinary stream-stream EqualTo is accepted by StreamingJoinStrategy, while the identical equality hidden in this wrapper throws the 'stream-stream join without equality predicate' AnalysisException.

MaterializeRuntimeReplaceable unfolds the condition only after Spark strategies have selected the operator, so it cannot recover a hash/sort-merge/streaming equi-join. Before this PR, unconditional replacement exposed the equality before planning. Could join-key extraction inspect an unfolded condition (while preserving the original expression where needed), or should such structural predicates be forced to replace eagerly? Batch physical-plan and stream-stream planning regressions would cover both consequences.

…RRENT_LIKE expression

A surviving RuntimeReplaceable (eagerReplace = false) exposes only the
RUNTIME_REPLACEABLE pattern of its children, not the pattern bits of its hidden
replacement. The finish-analysis rules ComputeCurrentTime / ReplaceCurrentLike run
right after ReplaceExpressions and prune on CURRENT_LIKE, so a CurrentTimestamp /
CurrentDate / Now hidden in a survivor's replacement is never folded into the single
query-start literal and would be evaluated per row at execution, yielding multiple
distinct values within one query.

Extend the cannotSurvive gate so a replacement containing CURRENT_LIKE is rewritten
eagerly, mirroring the existing Unevaluable handling. Adds an attribute-dependent
OptimizerSuite regression so constant folding cannot mask the problem.

Co-authored-by: Isaac
…JoinKeys

A surviving equi-join predicate whose replacement is an EqualTo is hidden from
ExtractEquiJoinKeys, which only pattern-matches visible EqualTo / EqualNullSafe
conjuncts. The join then extracts no keys and degrades to a CartesianProductExec
(batch), and stream-stream join analysis fails with "without equality predicate"
since StreamingJoinStrategy shares this extractor.

Unfold a predicate's RuntimeReplaceable (gated on the tree pattern) before matching,
so key extraction sees the underlying equality. Only the extracted keys use the
unfolded form; non-key predicates stay in their original survivor form so
MaterializeRuntimeReplaceable still materializes them before codegen and EXPLAIN keeps
the readable node. Adds a batch join-planning regression asserting SortMergeJoinExec
rather than CartesianProductExec.

Co-authored-by: Isaac

@sunchao sunchao left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Follow-up review on 09e4f260: the exact CURRENT_LIKE and root-EqualTo cases are improved, but the broader correctness contracts are still incomplete in the cases below.

val cannotSurvive =
!replaced.deterministic ||
ConvertToLocalRelation.hasUnevaluableExpr(replaced) ||
replaced.containsPattern(CURRENT_LIKE)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P2] Cover every required finish-analysis rewrite

This only forces eager replacement for CURRENT_LIKE, but the same survivor opacity still bypasses other mandatory rules that run after ReplaceExpressions. For example, a deterministic, non-foldable wrapper with replacement = ArrayDistinct(arrayDoubleAttr) hides ARRAY_DISTINCT from both NormalizeFloatingNumbers passes; in a focused regression, [-0.0d, 0.0d] remained two elements instead of one canonical positive zero. Likewise, a wrapper whose replacement contains Cast(Literal("epoch"), DateType, ...) hides CAST from SpecialDatetimeValues; it returned NULL in legacy mode (and would throw in ANSI mode) instead of 1970-01-01.

MaterializeRuntimeReplaceable exposes these expressions only after logical optimization, so neither rewrite gets another chance. Could the survival gate cover every replacement requiring a mandatory post-ReplaceExpressions rewrite, or run those rewrites over the hidden replacement, with regressions for these two cases? A CURRENT_LIKE-only guard still permits wrong results and runtime errors.

@@ -190,30 +191,47 @@ object ExtractEquiJoinKeys extends Logging with PredicateHelper {
// Find equi-join predicates that can be evaluated before the join, and thus can be used
// as join keys.
val predicates = condition.map(splitConjunctivePredicates).getOrElse(Nil)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P2] Split the unfolded condition for compound replacements

The condition is split while the outer RuntimeReplaceable is still opaque. If a supported survivor has replacement = And(EqualTo(left.k, right.k), GreaterThan(left.v, 0)), splitConjunctivePredicates produces the wrapper as one predicate; unfoldKeyPredicate then yields an And, which none of the top-level equality cases match. A focused regression still planned this as CartesianProductExec, and the equivalent stream-stream join is still rejected as lacking an equality predicate. This compound replacement shape is already treated as supported by TestCompoundPredicateRuntimeReplaceable in the V2 tests.

Could we expose the replacement before splitting (while retaining the mapping to the original survivor where needed), and add batch plus streaming coverage for a compound replacement?

// keeps showing the readable node. The unfold is gated on the tree pattern so the common
// survivor-free join is unaffected.
def unfoldKeyPredicate(p: Expression): Expression =
if (p.containsPattern(RUNTIME_REPLACEABLE)) RuntimeReplaceable.unfold(p) else p

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P2] Preserve nested survivors when exposing the outer equality

This recursively unfolds every RuntimeReplaceable in the predicate, including a survivor that is already a visible join-key operand. For EqualTo(partitionKey, Survivor(dimensionKey)), the old extractor retained Survivor(dimensionKey) as the physical key; this code emits only its low-level replacement, so native planning can no longer match the high-level key.

It also makes DPP invalid: ExtractEquiJoinKeys returns the unfolded build key, but PartitionPruning inspects the original equality and calls joinKeys.indexOf(Survivor(dimensionKey)), producing -1. The resulting DynamicPruningSubquery is unresolved and fails default plan validation; with validation disabled, the negative index later reaches buildKeys(-1). In a focused planning regression, the visible survivor key was already reduced to (k + 0) before columnar/native conversion.

Could this unfold only the outer survivor needed to reveal an equality, preserving already-visible key operands, with a DPP regression for this shape?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants