Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
268 changes: 244 additions & 24 deletions datafusion/physical-expr-common/src/physical_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@ use std::sync::Arc;

use crate::utils::scatter;

use arrow::array::{ArrayRef, BooleanArray};
use arrow::array::{new_empty_array, ArrayRef, BooleanArray};
use arrow::compute::filter_record_batch;
use arrow::datatypes::{DataType, Field, FieldRef, Schema};
use arrow::record_batch::RecordBatch;
use datafusion_common::tree_node::{
Transformed, TransformedResult, TreeNode, TreeNodeRecursion,
};
use datafusion_common::{internal_err, not_impl_err, Result, ScalarValue};
use datafusion_common::{exec_err, internal_err, not_impl_err, Result, ScalarValue};
use datafusion_expr_common::columnar_value::ColumnarValue;
use datafusion_expr_common::interval_arithmetic::Interval;
use datafusion_expr_common::sort_properties::ExprProperties;
Expand Down Expand Up @@ -90,36 +90,69 @@ pub trait PhysicalExpr: Any + Send + Sync + Display + Debug + DynEq + DynHash {
self.nullable(input_schema)?,
)))
}
/// Evaluate an expression against a RecordBatch after first applying a
/// validity array
/// Evaluate an expression against a RecordBatch after first applying a validity array
///
/// # Errors
///
/// Returns an `Err` if the expression could not be evaluated or if the length of the
/// `selection` validity array and the number of row in `batch` is not equal.
fn evaluate_selection(
&self,
batch: &RecordBatch,
selection: &BooleanArray,
) -> Result<ColumnarValue> {
let tmp_batch = filter_record_batch(batch, selection)?;

let tmp_result = self.evaluate(&tmp_batch)?;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a lot of new code which repeats logic of filter_record_batch.
What if we just changed this line only?

let tmp_result =  if tmp_batch.is_empty {
  // Do not call `evaluate` when the selection is empty.
            // When `evaluate_selection` is being used for conditional, lazy evaluation,
            // evaluating an expression for a false selection vector may end up unintentionally
            // evaluating a fallible expression.
            let datatype = self.data_type(batch.schema_ref().as_ref())?;
            ColumnarValue::Array(make_builder(&datatype, 0).finish())
} else {
  self.evaluate(&tmp_batch)?;
}

Note how this does not inspect selection / selection_count directly, leveraging the work done by filter_record_batch.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I might be missing it, but I don't see the overlap with filter_record_batch. AFAICT there are no checks to avoid creating a new record batch. What this code is doing is preparing an empty result value while filter_record_batch has optimised code to an empty record batch if the filter is all-false.


if batch.num_rows() == tmp_batch.num_rows() {
// All values from the `selection` filter are true.
Ok(tmp_result)
} else if let ColumnarValue::Array(a) = tmp_result {
scatter(selection, a.as_ref()).map(ColumnarValue::Array)
} else if let ColumnarValue::Scalar(ScalarValue::Boolean(value)) = &tmp_result {
// When the scalar is true or false, skip the scatter process
if let Some(v) = value {
if *v {
Ok(ColumnarValue::from(Arc::new(selection.clone()) as ArrayRef))
let row_count = batch.num_rows();
if row_count != selection.len() {
return exec_err!("Selection array length does not match batch row count: {} != {row_count}", selection.len());
}

let selection_count = selection.true_count();

// First, check if we can avoid filtering altogether.
if selection_count == row_count {
// All values from the `selection` filter are true and match the input batch.
// No need to perform any filtering.
return self.evaluate(batch);
}

// Next, prepare the result array for each 'true' row in the selection vector.
let filtered_result = if selection_count == 0 {
// Do not call `evaluate` when the selection is empty.
// `evaluate_selection` is used to conditionally evaluate expressions.
// When the expression in question is fallible, evaluating it with an empty
// record batch may trigger a runtime error (e.g. division by zero).
//
// Instead, create an empty array matching the expected return type.
let datatype = self.data_type(batch.schema_ref().as_ref())?;
ColumnarValue::Array(new_empty_array(&datatype))
} else {
// If we reach this point, there's no other option than to filter the batch.
// This is a fairly costly operation since it requires creating partial copies
// (worst case of length `row_count - 1`) of all the arrays in the record batch.
// The resulting `filtered_batch` will contain `selection_count` rows.
let filtered_batch = filter_record_batch(batch, selection)?;
self.evaluate(&filtered_batch)?
};

// Finally, scatter the filtered result array so that the indices match the input rows again.
match &filtered_result {
ColumnarValue::Array(a) => {
scatter(selection, a.as_ref()).map(ColumnarValue::Array)
}
ColumnarValue::Scalar(ScalarValue::Boolean(value)) => {
// When the scalar is true or false, skip the scatter process
if let Some(v) = value {
if *v {
Ok(ColumnarValue::from(Arc::new(selection.clone()) as ArrayRef))
} else {
Ok(filtered_result)
}
} else {
Ok(tmp_result)
let array = BooleanArray::from(vec![None; row_count]);
scatter(selection, &array).map(ColumnarValue::Array)
}
} else {
let array = BooleanArray::from(vec![None; batch.num_rows()]);
scatter(selection, &array).map(ColumnarValue::Array)
}
} else {
Ok(tmp_result)
ColumnarValue::Scalar(_) => Ok(filtered_result),
}
}

Expand Down Expand Up @@ -601,3 +634,190 @@ pub fn is_volatile(expr: &Arc<dyn PhysicalExpr>) -> bool {
.expect("infallible closure should not fail");
is_volatile
}

#[cfg(test)]
mod test {
use crate::physical_expr::PhysicalExpr;
use arrow::array::{Array, BooleanArray, Int64Array, RecordBatch};
use arrow::datatypes::{DataType, Schema};
use datafusion_expr_common::columnar_value::ColumnarValue;
use std::fmt::{Display, Formatter};
use std::sync::Arc;

#[derive(Debug, PartialEq, Eq, Hash)]
struct TestExpr {}

impl PhysicalExpr for TestExpr {
fn as_any(&self) -> &dyn std::any::Any {
self
}

fn data_type(&self, _schema: &Schema) -> datafusion_common::Result<DataType> {
Ok(DataType::Int64)
}

fn nullable(&self, _schema: &Schema) -> datafusion_common::Result<bool> {
Ok(false)
}

fn evaluate(
&self,
batch: &RecordBatch,
) -> datafusion_common::Result<ColumnarValue> {
let data = vec![1; batch.num_rows()];
Ok(ColumnarValue::Array(Arc::new(Int64Array::from(data))))
}

fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
vec![]
}

fn with_new_children(
self: Arc<Self>,
_children: Vec<Arc<dyn PhysicalExpr>>,
) -> datafusion_common::Result<Arc<dyn PhysicalExpr>> {
Ok(Arc::new(Self {}))
}

fn fmt_sql(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.write_str("TestExpr")
}
}

impl Display for TestExpr {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
self.fmt_sql(f)
}
}

macro_rules! assert_arrays_eq {
($EXPECTED: expr, $ACTUAL: expr, $MESSAGE: expr) => {
let expected = $EXPECTED.to_array(1).unwrap();
let actual = $ACTUAL;

let actual_array = actual.to_array(expected.len()).unwrap();
let actual_ref = actual_array.as_ref();
let expected_ref = expected.as_ref();
assert!(
actual_ref == expected_ref,
"{}: expected: {:?}, actual: {:?}",
$MESSAGE,
$EXPECTED,
actual_ref
);
};
}

fn test_evaluate_selection(
batch: &RecordBatch,
selection: &BooleanArray,
expected: &ColumnarValue,
) {
let expr = TestExpr {};

// First check that the `evaluate_selection` is the expected one
let selection_result = expr.evaluate_selection(batch, selection).unwrap();
assert_eq!(
expected.to_array(1).unwrap().len(),
selection_result.to_array(1).unwrap().len(),
"evaluate_selection should output row count should match input record batch"
);
assert_arrays_eq!(
expected,
&selection_result,
"evaluate_selection returned unexpected value"
);

// If we're selecting all rows, the result should be the same as calling `evaluate`
// with the full record batch.
if (0..batch.num_rows())
.all(|row_idx| row_idx < selection.len() && selection.value(row_idx))
{
let empty_result = expr.evaluate(batch).unwrap();

assert_arrays_eq!(
empty_result,
&selection_result,
"evaluate_selection does not match unfiltered evaluate result"
);
}
}

fn test_evaluate_selection_error(batch: &RecordBatch, selection: &BooleanArray) {
let expr = TestExpr {};

// First check that the `evaluate_selection` is the expected one
let selection_result = expr.evaluate_selection(batch, selection);
assert!(selection_result.is_err(), "evaluate_selection should fail");
}

#[test]
pub fn test_evaluate_selection_with_empty_record_batch() {
test_evaluate_selection(
&RecordBatch::new_empty(Arc::new(Schema::empty())),
&BooleanArray::from(vec![false; 0]),
&ColumnarValue::Array(Arc::new(Int64Array::new_null(0))),
);
}

#[test]
pub fn test_evaluate_selection_with_empty_record_batch_with_larger_false_selection() {
test_evaluate_selection_error(
&RecordBatch::new_empty(Arc::new(Schema::empty())),
&BooleanArray::from(vec![false; 10]),
);
}

#[test]
pub fn test_evaluate_selection_with_empty_record_batch_with_larger_true_selection() {
test_evaluate_selection_error(
&RecordBatch::new_empty(Arc::new(Schema::empty())),
&BooleanArray::from(vec![true; 10]),
);
}

#[test]
pub fn test_evaluate_selection_with_non_empty_record_batch() {
test_evaluate_selection(
unsafe { &RecordBatch::new_unchecked(Arc::new(Schema::empty()), vec![], 10) },
&BooleanArray::from(vec![true; 10]),
&ColumnarValue::Array(Arc::new(Int64Array::from(vec![1; 10]))),
);
}

#[test]
pub fn test_evaluate_selection_with_non_empty_record_batch_with_larger_false_selection(
) {
test_evaluate_selection_error(
unsafe { &RecordBatch::new_unchecked(Arc::new(Schema::empty()), vec![], 10) },
&BooleanArray::from(vec![false; 20]),
);
}

#[test]
pub fn test_evaluate_selection_with_non_empty_record_batch_with_larger_true_selection(
) {
test_evaluate_selection_error(
unsafe { &RecordBatch::new_unchecked(Arc::new(Schema::empty()), vec![], 10) },
&BooleanArray::from(vec![true; 20]),
);
}

#[test]
pub fn test_evaluate_selection_with_non_empty_record_batch_with_smaller_false_selection(
) {
test_evaluate_selection_error(
unsafe { &RecordBatch::new_unchecked(Arc::new(Schema::empty()), vec![], 10) },
&BooleanArray::from(vec![false; 5]),
);
}

#[test]
pub fn test_evaluate_selection_with_non_empty_record_batch_with_smaller_true_selection(
) {
test_evaluate_selection_error(
unsafe { &RecordBatch::new_unchecked(Arc::new(Schema::empty()), vec![], 10) },
&BooleanArray::from(vec![true; 5]),
);
}
}
15 changes: 11 additions & 4 deletions datafusion/physical-expr/src/expressions/case.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,10 +155,7 @@ impl CaseExpr {
&& else_expr.as_ref().unwrap().as_any().is::<Literal>()
{
EvalMethod::ScalarOrScalar
} else if when_then_expr.len() == 1
&& is_cheap_and_infallible(&(when_then_expr[0].1))
&& else_expr.as_ref().is_some_and(is_cheap_and_infallible)
{
} else if when_then_expr.len() == 1 && else_expr.is_some() {
EvalMethod::ExpressionOrExpression
} else {
EvalMethod::NoExpression
Expand Down Expand Up @@ -425,6 +422,16 @@ impl CaseExpr {
)
})?;

// For the true and false/null selection vectors, bypass `evaluate_selection` and merging
// results. This avoids materializing the array for the other branch which we will discard
// entirely anyway.
let true_count = when_value.true_count();
if true_count == batch.num_rows() {
return self.when_then_expr[0].1.evaluate(batch);
} else if true_count == 0 {
return self.else_expr.as_ref().unwrap().evaluate(batch);
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My reading of this code is that it will still evaluate the then expression as long as there is at least one true value in when -- if evaluate_selection is a problem, shouldn't we fix evaluate_selection?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My reading of this code is that it will still evaluate the then expression as long as there is at least one true value in when

Yes, that's correct. There's no way to avoid that.

This particular bit of code is both an optimisation and a correctness thing.

From a performance point of view, we already know the selection vector is redundant, so there's really no point in calling evaluate_selection.

For correctness, what's being avoid here is calling either then or else with a selection vector that will result in an empty record batch after filtering. We could add similar checks in evaluate_selection to prevent evaluating the downstream expression for empty record batches as well. Its current contract requires it to return an array with the same length as the unfiltered input batch though. You can't avoid having to create an all-nulls array then.

Copy link
Member

@findepi findepi Oct 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My reading of this code is that it will still evaluate the then expression as long as there is at least one true value in when

Yes, that's correct. There's no way to avoid that.

isn't this reintroducing the bug that was fixed in #15384, just in a big more complex wrapping?

Copy link
Contributor Author

@pepijnve pepijnve Oct 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isn't this reintroducing the bug that was fixed in #15384, just in a big more complex wrapping?

No, why do you think that's the case? If you write case foo is not null then foo else 1/0 end and foo happens to be NULL, what do you expect to happen?

The original issue was that in the example above for a single row with a non-null foo, the code was evaluating the then branch with [true] as selection vector and the else branch with [false]. The latter was passed to evaluate_selection which then filters the record batch down to an empty record batch and then calls the else expression with that record batch. For an expression like 1/0, you end up getting executing that division anyway even though the result would be discarded.

There are two ways to fix this:

  1. don't evaluate binary expressions and literals for empty batches (as you had suggested in a comment earlier I believe)
  2. don't call evaluate with empty input batches

The earlier fix had the effect of 2. as well, just in a less explicit way. The fix here does the same but adds the necessary checks in the explicit expr/expr code path. The code that's being seen as an optimisation is intended to prevent calling evaluate_selection with an all-false selection vector.

Copy link
Contributor Author

@pepijnve pepijnve Oct 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I woke up early this morning to the realisation that the actual bug was a subtlety in the implementation of evaluate_selection. There's a difference between calling it with the empty set vs a non-empty set and a false selection vector. The implementation was actually treating both cases identically which can cause a spurious row to get materialised. I've pushed a correction for this and tweaked the comments in the code a bit.

I believe this properly addresses the original evaluation problem. All SQL logic tests pass even when commenting out the optimisation for true and false selection vectors in expr_or_expr.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with the change in evaluate_selection
Now that evaluate_selection is changed, do we need those lines here?

(They look nice but my only concern is additional code complexity which is harder to cover with SLT tests. Now that we have branching here, we should have a bunch of SLT cases that clearly exercise all-true, all-false, some-true situations.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, more SLTs! I will add some for the various cases you mentioned.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that evaluate_selection is changed, do we need those lines here?

@findepi Yes I would prefer to keep these early outs since they're pretty trivial and I think they're appropriate for the expr_or_expr function since the knowledge that it's "then or else" is located here. evaluate_selection cannot be implemented with awareness of this particular usage pattern.

Calling evaluate_selection still has to produce a value –it can't return None– so you pay at least a non-zero cost for calling it unnecessarily. If it's obvious that it will not perform any useful work, it makes sense to avoid it IMO.

Just for context, the queries I'm working on are quite case heavy. Any work we can save in the inner loop of the queries seems worthwhile.

// Treat 'NULL' as false value
let when_value = match when_value.null_count() {
0 => Cow::Borrowed(when_value),
Expand Down
27 changes: 27 additions & 0 deletions datafusion/sqllogictest/test_files/case.slt
Original file line number Diff line number Diff line change
Expand Up @@ -467,19 +467,46 @@ FROM t;
----
[{foo: blarg}]

# mix of then and else
query II
SELECT v, CASE WHEN v != 0 THEN 10/v ELSE 42 END FROM (VALUES (0), (1), (2)) t(v)
----
0 42
1 10
2 5

# when expressions is always false, then branch should never be evaluated
query II
SELECT v, CASE WHEN v < 0 THEN 10/0 ELSE 1 END FROM (VALUES (1), (2)) t(v)
----
1 1
2 1

# when expressions is always true, else branch should never be evaluated
query II
SELECT v, CASE WHEN v > 0 THEN 1 ELSE 10/0 END FROM (VALUES (1), (2)) t(v)
----
1 1
2 1


# lazy evaluation of multiple when branches, else branch should never be evaluated
query II
SELECT v, CASE WHEN v == 1 THEN -1 WHEN v == 2 THEN -2 WHEN v == 3 THEN -3 ELSE 10/0 END FROM (VALUES (1), (2), (3)) t(v)
----
1 -1
2 -2
3 -3

# covers the InfallibleExprOrNull evaluation strategy
query II
SELECT v, CASE WHEN v THEN 1 END FROM (VALUES (1), (2), (3), (NULL)) t(v)
----
1 1
2 1
3 1
NULL NULL

statement ok
drop table t

Expand Down