Skip to content

Conversation

@adriangb
Copy link
Contributor

@github-actions github-actions bot added the functions Changes to functions implementation label Jun 27, 2025
@adriangb adriangb marked this pull request as draft June 27, 2025 20:23
Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a good change -- thank you @adriangb

@alamb alamb changed the title move min_batc/max_batch to functions-aggregate-common move min_batch/max_batch to functions-aggregate-common Jun 27, 2025
@adriangb adriangb marked this pull request as ready for review June 27, 2025 21:11
Copy link
Contributor

@jonathanc-n jonathanc-n left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @adriangb! I really needed something like this for what I'm doing.

@adriangb
Copy link
Contributor Author

adriangb commented Jun 28, 2025

In case it's useful, we use something like this for roughly similar purposes internally:

Min/max accumulators
This is how you dropdown.

use arrow::array::RecordBatch;
use arrow::compute::cast;
use arrow_schema::{DataType, Field, Schema};
use datafusion::common::stats::Precision;
use datafusion::common::{ColumnStatistics, Statistics};
use datafusion::error::{DataFusionError, Result as DatafusionResult};
use datafusion::functions_aggregate::min_max::{MaxAccumulator, MinAccumulator};
use datafusion::functions_aggregate::sum::Sum;
use datafusion::logical_expr::function::AccumulatorArgs;
use datafusion::logical_expr::{Accumulator, AggregateUDFImpl};
use datafusion::scalar::ScalarValue;
use std::sync::Arc;

use crate::utils::unpack_dict_type;

/// Accumulates min, max and null-count statistics for a single column.
///
/// The `*_index` fields are positions of the corresponding statistics columns
/// in the record batches passed to `update_batch`; they are resolved once from
/// the stats schema in `try_new`.
pub struct ColumnStatisticsAccumulator {
    /// Position of the `{column}_min` column in the stats batch.
    min_index: usize,
    /// Running minimum over the `{column}_min` column.
    min: MinAccumulator,
    /// Position of the `{column}_max` column in the stats batch.
    max_index: usize,
    /// Running maximum over the `{column}_max` column.
    max: MaxAccumulator,
    /// Position of the `{column}_null_count` column in the stats batch.
    null_count_index: usize,
    // The null count accumulator is built for UInt64 input, so the null-count column is cast
    // to UInt64 before it is fed in whenever it has a different type.
    // `None` means a batch contained a null in the null-count column: at that point the total
    // can no longer be known, so the accumulator is dropped and the count is reported as absent.
    // (Summing would skip the null and make "unknown" indistinguishable from "no nulls".)
    null_count: Option<Box<dyn Accumulator>>,
}

/// Expands to the `AccumulatorArgs` literal used to construct a `Sum`
/// accumulator whose single input column ("value") has the given data type
/// and whose result field ("sum") is a nullable `UInt64`.
///
/// The `schema` field borrows a temporary `Schema`, so the expansion has to be
/// consumed within the statement that contains it (as the call sites in this
/// file do by passing it straight to `Sum::accumulator`).
macro_rules! sum_accumulator_args {
    ($input_type:expr) => {
        AccumulatorArgs {
            name: "sum",
            return_field: Arc::new(Field::new("sum", DataType::UInt64, true)),
            schema: &Schema::new(vec![Field::new("value", $input_type.clone(), true)]),
            exprs: &[],
            order_bys: &[],
            ignore_nulls: false,
            is_distinct: false,
            is_reversed: false,
        }
    };
}

impl ColumnStatisticsAccumulator {
    pub fn try_new(column_name: &str, stats_schema: &Schema) -> DatafusionResult<Self> {
        let min_column_name = format!("{column_name}_min");
        let max_column_name = format!("{column_name}_max");
        let null_count_column_name = format!("{column_name}_null_count");
        let min_index = stats_schema.index_of(&min_column_name)?;
        let max_index = stats_schema.index_of(&max_column_name)?;
        let null_count_index = stats_schema.index_of(&null_count_column_name)?;
        Ok(Self {
            min_index,
            min: MinAccumulator::try_new(&unpack_dict_type(stats_schema.field(min_index).data_type()))?,
            max_index,
            max: MaxAccumulator::try_new(&unpack_dict_type(stats_schema.field(max_index).data_type()))?,
            null_count_index,
            null_count: Some(Sum::new().accumulator(sum_accumulator_args!(DataType::UInt64))?),
        })
    }

    pub fn update_batch(&mut self, batch: &RecordBatch) -> DatafusionResult<()> {
        self.min.update_batch(&[Arc::clone(batch.column(self.min_index))])?;
        self.max.update_batch(&[Arc::clone(batch.column(self.max_index))])?;
        let null_counts_array = batch.column(self.null_count_index);
        if null_counts_array.null_count() > 0 {
            // We've encountered a null value, we can't track the null count anymore.
            self.null_count = None;
        } else {
            let null_counts_array = if null_counts_array.data_type() == &DataType::UInt64 {
                Arc::clone(null_counts_array)
            } else {
                cast(null_counts_array, &DataType::UInt64)?
            };
            if let Some(null_count) = &mut self.null_count {
                null_count.update_batch(&[null_counts_array])?;
            }
        }
        Ok(())
    }

    pub fn finish(mut self) -> DatafusionResult<ColumnStatistics> {
        let min_value = self.min.evaluate()?;
        let max_value = self.max.evaluate()?;
        let null_count = if let Some(mut null_count) = self.null_count {
            let null_count_value = null_count.evaluate()?;
            if null_count_value.is_null() {
                Precision::Absent
            } else {
                match null_count_value {
                    ScalarValue::UInt64(Some(v)) => {
                        Precision::Exact(usize::try_from(v).expect("null count should fit in usize"))
                    }
                    _ => {
                        return Err(DataFusionError::Internal(format!(
                            "Expected null count to be of type UInt64, got {null_count_value:?}"
                        )));
                    }
                }
            }
        } else {
            Precision::Absent
        };
        Ok(ColumnStatistics {
            min_value: if min_value.is_null() {
                Precision::Absent
            } else {
                Precision::Exact(min_value)
            },
            max_value: if max_value.is_null() {
                Precision::Absent
            } else {
                Precision::Exact(max_value)
            },
            null_count,
            ..Default::default()
        })
    }
}

/// Accumulates table-level statistics: per-column statistics plus a total row count.
pub struct StatisticsAccumulator {
    /// One entry per field of the data schema, in schema order; `None` for
    /// fields that were not requested for statistics.
    column_accumulators: Vec<Option<ColumnStatisticsAccumulator>>,
    /// Position of the `row_count` column in the stats batch.
    row_count_index: usize,
    /// Running sum over the `row_count` column (fed with UInt64 input).
    row_count: Box<dyn Accumulator>,
}

impl StatisticsAccumulator {
    /// Creates an accumulator covering every field of `schema`.
    ///
    /// Fields whose name appears in `columns_for_stats` get a
    /// `ColumnStatisticsAccumulator` resolved against `stats_schema`; all
    /// other fields get `None`. The row count is read from the `row_count`
    /// column of `stats_schema`.
    ///
    /// # Errors
    ///
    /// Returns an error if a requested column's statistics columns or the
    /// `row_count` column are missing from `stats_schema`, or if an
    /// underlying accumulator cannot be created.
    pub fn try_new(schema: &Schema, stats_schema: &Schema, columns_for_stats: &[String]) -> DatafusionResult<Self> {
        let column_accumulators = schema
            .fields()
            .iter()
            .map(|field| {
                columns_for_stats
                    .contains(field.name())
                    .then(|| ColumnStatisticsAccumulator::try_new(field.name(), stats_schema))
                    .transpose()
            })
            .collect::<DatafusionResult<Vec<_>>>()?;
        let row_count_index = stats_schema.index_of("row_count")?;
        let row_count = Sum::new().accumulator(sum_accumulator_args!(DataType::UInt64))?;
        Ok(Self {
            column_accumulators,
            row_count_index,
            row_count,
        })
    }

    /// Folds one batch of statistics into every column accumulator and into
    /// the running row count.
    ///
    /// # Errors
    ///
    /// Returns an error if a column accumulator fails, if the `row_count`
    /// column cannot be cast to `UInt64`, or if the sum accumulator rejects
    /// the input.
    pub fn update_batch(&mut self, batch: &RecordBatch) -> DatafusionResult<()> {
        for accumulator in self.column_accumulators.iter_mut().flatten() {
            accumulator.update_batch(batch)?;
        }

        // The sum accumulator was built for UInt64 input; cast only when needed.
        let raw_row_counts = batch.column(self.row_count_index);
        let row_counts = if raw_row_counts.data_type() == &DataType::UInt64 {
            Arc::clone(raw_row_counts)
        } else {
            cast(raw_row_counts, &DataType::UInt64)?
        };
        self.row_count.update_batch(&[row_counts])?;
        Ok(())
    }

    /// Consumes the accumulator and writes the collected column statistics
    /// and row count into `stats`.
    ///
    /// Columns without an accumulator receive `ColumnStatistics::new_unknown()`.
    /// `stats` is only modified once every value has been computed
    /// successfully, so it is left untouched when an error is returned.
    ///
    /// # Errors
    ///
    /// Returns an error if an accumulator fails to evaluate, if the row-count
    /// accumulator yields a non-`UInt64` value, or if the row count does not
    /// fit in `usize` on the current platform.
    pub fn update_statistics(mut self, stats: &mut Statistics) -> DatafusionResult<()> {
        let column_statistics = self
            .column_accumulators
            .into_iter()
            .map(|accumulator| match accumulator {
                Some(accumulator) => accumulator.finish(),
                None => Ok(ColumnStatistics::new_unknown()),
            })
            .collect::<DatafusionResult<Vec<_>>>()?;

        let num_rows = match self.row_count.evaluate()? {
            value if value.is_null() => Precision::Absent,
            ScalarValue::UInt64(Some(v)) => {
                // Report overflow as an error instead of panicking.
                let count = usize::try_from(v).map_err(|_| {
                    DataFusionError::Internal(format!("Row count {v} does not fit in usize"))
                })?;
                Precision::Exact(count)
            }
            other => {
                return Err(DataFusionError::Internal(format!(
                    "Expected row count to be of type UInt64, got {other:?}",
                )));
            }
        };

        // Commit only after everything succeeded.
        stats.column_statistics = column_statistics;
        stats.num_rows = num_rows;
        Ok(())
    }

    /// Convenience wrapper: builds an accumulator, feeds it all `batches` and
    /// returns `stats` with its column statistics and row count replaced.
    ///
    /// # Errors
    ///
    /// Propagates any error from `try_new`, `update_batch` or
    /// `update_statistics`.
    pub fn update_stats_from_batches(
        columns_for_stats: &[String],
        schema: &Schema,
        stats_schema: &Schema,
        mut stats: Statistics,
        batches: &[RecordBatch],
    ) -> DatafusionResult<Statistics> {
        let mut accumulator = StatisticsAccumulator::try_new(schema, stats_schema, columns_for_stats)?;
        for batch in batches {
            accumulator.update_batch(batch)?;
        }
        accumulator.update_statistics(&mut stats)?;
        Ok(stats)
    }
}

@adriangb adriangb merged commit 9f3cc7b into apache:main Jun 28, 2025
29 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

functions Changes to functions implementation

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants