Skip to content

[BUG] Getting duplicate values  #312

@animodak7

Description

@animodak7

I have a parquet file with the following schema:

    Field::new("target_status_code", DataType::Int32, false),
    Field::new("timestamp", DataType::Int64, false),
    Field::new("___row_id", DataType::Int32, false),

Code used to query it:

let file_source = Arc::new(ParquetSource::default());

    let file_scan_config =
        FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(),  schemaP1.clone(), file_source)
            .with_file(PartitionedFile::new(
                parquet_path.to_str().unwrap().to_string(),
                fs::metadata(parquet_path).unwrap().len()
            ))
            .build();

    let parquet_exec = DataSourceExec::from_data_source(file_scan_config);

    let filter_exec = match FilterExec::try_new(
        predicate.clone(),
        parquet_exec.clone(),
    ) {
        Ok(filter) => Arc::new(filter.with_projection(Some(vec![2])).unwrap()),
        Err(e) => {
            let err_ptr: Result<*mut SendableRecordBatchStream, DataFusionError> = Err(e);
            set_object_result(
                &mut env,
                callback,
                err_ptr
            );
            return;
        }
    };

    let phase1_limit: Arc<dyn ExecutionPlan> = if( Some(size as usize) > Some(0)) {
        Arc::new(GlobalLimitExec::new(filter_exec, 0, Some(size as usize)))
    } else {
        filter_exec
    };

    let runtime = unsafe { &mut *(runtime_ptr as *mut Runtime) };
    runtime.block_on(async {
        let task_ctx = Arc::new(TaskContext::default());
        let liquid_ctx = unsafe { &mut *(context_ptr as *mut SessionContext) };
        let mut optimized_plan: Arc<dyn ExecutionPlan> = phase1_limit.clone(); // Initialize with original plan

        if is_cache == 1 {
            for rule in liquid_ctx.state().physical_optimizers().clone() {
                print!("{}, ", rule.name());
                if rule.name() == "InProcessLiquidCacheOptimizer" {
                    // Apply the rule directly
                    optimized_plan = rule.optimize(
                        optimized_plan.clone(),
                        liquid_ctx.state().config_options(),
                    ).unwrap();
                }
            }
        }

        let stream_result = optimized_plan.execute(0, task_ctx).unwrap();
        let mut row_ids = Vec::new();

        let mut stream = stream_result;
        while let Some(batch) = stream.next().await {
            let batch = batch.unwrap();
            let row_id_array = batch.column(0)
                .as_any()
                .downcast_ref::<Int32Array>()
                .ok_or_else(|| DataFusionError::Internal("Expected Int64Array".to_string())).unwrap();

            row_ids.extend(row_id_array.iter().flatten());
        }
        println!("row_ids {}", row_ids.len());
        println!("duplicate row_ids {}", find_duplicates(row_ids.clone()).len());

        set_object_result_ok(&mut env, callback, Box::into_raw(Box::new(stream)))
    });

Liquid context used:

let liquid_ctx = match LiquidCacheInProcessBuilder::new()
        .with_max_cache_bytes(10* 1024 * 1024 * 1024) // 10GB
        .with_cache_dir(temp_dir.path().to_path_buf())
        .with_cache_mode(LiquidCacheMode::Liquid {
            transcode_in_background: true,
        })
        .with_cache_strategy(Box::new(DiscardPolicy::default()))
        .build(config.clone()) {
        Ok(ctx) => ctx,
        Err(e) => {
            eprintln!("Failed to build context: {}", e);
            return 0; // Return null pointer to Java
        }
    };

If is_cache is true, we get:
row_ids 10000
duplicate row_ids 4804

For the same query, if is_cache is false, we get:
row_ids 10000
duplicate row_ids 0

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions