Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions rust/lance-core/src/datatypes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ impl LogicalType {
self.0 == "large_list" || self.0 == "large_list.struct"
}

fn is_fixed_size_list_struct(&self) -> bool {
self.0.starts_with("fixed_size_list:struct:")
}

fn is_struct(&self) -> bool {
self.0 == "struct"
}
Expand Down
13 changes: 13 additions & 0 deletions rust/lance-core/src/datatypes/field.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,16 @@ impl Field {
lt if lt.is_large_list() => {
DataType::LargeList(Arc::new(ArrowField::from(&self.children[0])))
}
lt if lt.is_fixed_size_list_struct() => {
// Parse size from "fixed_size_list:struct:N"
let size: i32 =
lt.0.split(':')
.next_back()
.expect("fixed_size_list:struct logical type missing size suffix")
.parse()
.expect("fixed_size_list:struct logical type has invalid size");
Comment on lines +170 to +175
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this mean we can panic if we read a dataset that has a corrupt schema?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the lance schema. I'm not entirely sure users are able to create their own lance schema. So I think the only way this could happen is if there was some kind of corrupt protobuf. Also, there is a significant panic potential down below at lt => DataType::try_from(lt).unwrap().

I suppose it is technically a valid concern but this method has many many callsites and changing it to result returning should probably be a PR on its own or else this one is going to get real confusing.

DataType::FixedSizeList(Arc::new(ArrowField::from(&self.children[0])), size)
}
lt if lt.is_struct() => {
DataType::Struct(self.children.iter().map(ArrowField::from).collect())
}
Expand Down Expand Up @@ -1076,6 +1086,9 @@ impl TryFrom<&ArrowField> for Field {
.collect::<Result<_>>()?,
DataType::List(item) => vec![Self::try_from(item.as_ref())?],
DataType::LargeList(item) => vec![Self::try_from(item.as_ref())?],
DataType::FixedSizeList(item, _) if matches!(item.data_type(), DataType::Struct(_)) => {
vec![Self::try_from(item.as_ref())?]
}
DataType::Map(entries, keys_sorted) => {
// TODO: We only support keys_sorted=false for now,
// because converting a rust arrow map field to the python arrow field will
Expand Down
96 changes: 95 additions & 1 deletion rust/lance-datagen/src/generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use arrow_array::{
make_array,
types::{ArrowDictionaryKeyType, BinaryType, ByteArrayType, Utf8Type},
Array, BinaryArray, FixedSizeBinaryArray, FixedSizeListArray, Float32Array, LargeListArray,
LargeStringArray, ListArray, NullArray, OffsetSizeTrait, PrimitiveArray, RecordBatch,
LargeStringArray, ListArray, MapArray, NullArray, OffsetSizeTrait, PrimitiveArray, RecordBatch,
RecordBatchOptions, RecordBatchReader, StringArray, StructArray,
};
use arrow_schema::{ArrowError, DataType, Field, Fields, IntervalUnit, Schema, SchemaRef};
Expand Down Expand Up @@ -1712,6 +1712,85 @@ impl ArrayGenerator for RandomListGenerator {
}
}

/// Generates random map arrays where each map has 0-4 entries.
#[derive(Debug)]
struct RandomMapGenerator {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you document this (or the rand_map method). Just a brief comment somewhere to describe that it will randomly generate maps with 0-4 items.

field: Arc<Field>,
entries_field: Arc<Field>,
keys_gen: Box<dyn ArrayGenerator>,
values_gen: Box<dyn ArrayGenerator>,
lengths_gen: Box<dyn ArrayGenerator>,
}

impl RandomMapGenerator {
fn new(keys_gen: Box<dyn ArrayGenerator>, values_gen: Box<dyn ArrayGenerator>) -> Self {
let entries_fields = Fields::from(vec![
Field::new("keys", keys_gen.data_type().clone(), false),
Field::new("values", values_gen.data_type().clone(), true),
]);
let entries_field = Arc::new(Field::new(
"entries",
DataType::Struct(entries_fields),
false,
));
let map_type = DataType::Map(entries_field.clone(), false);
let field = Arc::new(Field::new("", map_type, true));
let lengths_dist = Uniform::new_inclusive(0_i32, 4).unwrap();
let lengths_gen = rand_with_distribution::<Int32Type, Uniform<i32>>(lengths_dist);

Self {
field,
entries_field,
keys_gen,
values_gen,
lengths_gen,
}
}
}

impl ArrayGenerator for RandomMapGenerator {
fn generate(
&mut self,
length: RowCount,
rng: &mut rand_xoshiro::Xoshiro256PlusPlus,
) -> Result<Arc<dyn Array>, ArrowError> {
let lengths = self.lengths_gen.generate(length, rng)?;
let lengths = lengths.as_primitive::<Int32Type>();
let total_entries = lengths.values().iter().sum::<i32>() as u64;
let offsets = OffsetBuffer::from_lengths(lengths.values().iter().map(|v| *v as usize));

let keys = self.keys_gen.generate(RowCount::from(total_entries), rng)?;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not really relevant but I wonder if keys need to be unique within a map? I guess not.

let values = self
.values_gen
.generate(RowCount::from(total_entries), rng)?;

let entries = StructArray::new(
Fields::from(vec![
Field::new("keys", keys.data_type().clone(), false),
Field::new("values", values.data_type().clone(), true),
]),
vec![keys, values],
None,
);

Ok(Arc::new(MapArray::try_new(
self.entries_field.clone(),
offsets,
entries,
None,
false,
)?))
}

fn data_type(&self) -> &DataType {
self.field.data_type()
}

fn element_size_bytes(&self) -> Option<ByteCount> {
None
}
}

#[derive(Debug)]
struct NullArrayGenerator {}

Expand Down Expand Up @@ -2754,6 +2833,13 @@ pub mod array {
Box::new(RandomListGenerator::new(item_gen, is_large))
}

/// Generates random map arrays where each map has 0-4 entries.
pub fn rand_map(key_type: &DataType, value_type: &DataType) -> Box<dyn ArrayGenerator> {
let keys_gen = rand_type(key_type);
let values_gen = rand_type(value_type);
Box::new(RandomMapGenerator::new(keys_gen, values_gen))
}

pub fn rand_struct(fields: Fields) -> Box<dyn ArrayGenerator> {
let child_gens = fields
.iter()
Expand Down Expand Up @@ -2797,6 +2883,14 @@ pub mod array {
DataType::FixedSizeBinary(size) => rand_fsb(*size),
DataType::List(child) => rand_list(child.data_type(), false),
DataType::LargeList(child) => rand_list(child.data_type(), true),
DataType::Map(entries_field, _) => {
let DataType::Struct(fields) = entries_field.data_type() else {
panic!("Map entries field must be a struct");
};
let key_type = fields[0].data_type();
let value_type = fields[1].data_type();
rand_map(key_type, value_type)
}
DataType::Duration(unit) => match unit {
TimeUnit::Second => rand::<DurationSecondType>(),
TimeUnit::Millisecond => rand::<DurationMillisecondType>(),
Expand Down
23 changes: 15 additions & 8 deletions rust/lance-encoding/src/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,12 +232,14 @@ use snafu::location;
use tokio::sync::mpsc::error::SendError;
use tokio::sync::mpsc::{self, unbounded_channel};

use lance_core::error::LanceOptionExt;
use lance_core::{ArrowResult, Error, Result};
use tracing::instrument;

use crate::compression::{DecompressionStrategy, DefaultDecompressionStrategy};
use crate::data::DataBlock;
use crate::encoder::EncodedBatch;
use crate::encodings::logical::fixed_size_list::StructuralFixedSizeListScheduler;
use crate::encodings::logical::list::StructuralListScheduler;
use crate::encodings::logical::map::StructuralMapScheduler;
use crate::encodings::logical::primitive::StructuralPrimitiveFieldScheduler;
Expand Down Expand Up @@ -765,15 +767,23 @@ impl CoreFieldDecoderStrategy {
)
}
DataType::List(_) | DataType::LargeList(_) => {
let child = field
.children
.first()
.expect("List field must have a child");
let child = field.children.first().expect_ok()?;
let child_scheduler =
self.create_structural_field_scheduler(child, column_infos)?;
Ok(Box::new(StructuralListScheduler::new(child_scheduler))
as Box<dyn StructuralFieldScheduler>)
}
DataType::FixedSizeList(inner, dimension)
if matches!(inner.data_type(), DataType::Struct(_)) =>
{
let child = field.children.first().expect_ok()?;
let child_scheduler =
self.create_structural_field_scheduler(child, column_infos)?;
Ok(Box::new(StructuralFixedSizeListScheduler::new(
child_scheduler,
*dimension,
)) as Box<dyn StructuralFieldScheduler>)
}
DataType::Map(_, keys_sorted) => {
// TODO: We only support keys_sorted=false for now,
// because converting a rust arrow map field to the python arrow field will
Expand All @@ -784,10 +794,7 @@ impl CoreFieldDecoderStrategy {
location: location!(),
});
}
let entries_child = field
.children
.first()
.expect("Map field must have an entries child");
let entries_child = field.children.first().expect_ok()?;
let child_scheduler =
self.create_structural_field_scheduler(entries_child, column_infos)?;
Ok(Box::new(StructuralMapScheduler::new(child_scheduler))
Expand Down
95 changes: 63 additions & 32 deletions rust/lance-encoding/src/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use arrow_schema::DataType;
use bytes::{Bytes, BytesMut};
use futures::future::BoxFuture;
use lance_core::datatypes::{Field, Schema};
use lance_core::error::LanceOptionExt;
use lance_core::utils::bit::{is_pwr_two, pad_bytes_to};
use lance_core::{Error, Result};
use snafu::location;
Expand All @@ -29,6 +30,7 @@ use crate::compression::{CompressionStrategy, DefaultCompressionStrategy};
use crate::compression_config::CompressionParams;
use crate::decoder::PageEncoding;
use crate::encodings::logical::blob::{BlobStructuralEncoder, BlobV2StructuralEncoder};
use crate::encodings::logical::fixed_size_list::FixedSizeListStructuralEncoder;
use crate::encodings::logical::list::ListStructuralEncoder;
use crate::encodings::logical::map::MapStructuralEncoder;
use crate::encodings::logical::primitive::PrimitiveStructuralEncoder;
Expand Down Expand Up @@ -345,37 +347,39 @@ impl StructuralEncodingStrategy {
}

fn is_primitive_type(data_type: &DataType) -> bool {
matches!(
data_type,
DataType::Boolean
| DataType::Date32
| DataType::Date64
| DataType::Decimal128(_, _)
| DataType::Decimal256(_, _)
| DataType::Duration(_)
| DataType::Float16
| DataType::Float32
| DataType::Float64
| DataType::Int16
| DataType::Int32
| DataType::Int64
| DataType::Int8
| DataType::Interval(_)
| DataType::Null
| DataType::Time32(_)
| DataType::Time64(_)
| DataType::Timestamp(_, _)
| DataType::UInt16
| DataType::UInt32
| DataType::UInt64
| DataType::UInt8
| DataType::FixedSizeBinary(_)
| DataType::FixedSizeList(_, _)
| DataType::Binary
| DataType::LargeBinary
| DataType::Utf8
| DataType::LargeUtf8,
)
match data_type {
DataType::FixedSizeList(inner, _) => Self::is_primitive_type(inner.data_type()),
_ => matches!(
data_type,
DataType::Boolean
| DataType::Date32
| DataType::Date64
| DataType::Decimal128(_, _)
| DataType::Decimal256(_, _)
| DataType::Duration(_)
| DataType::Float16
| DataType::Float32
| DataType::Float64
| DataType::Int16
| DataType::Int32
| DataType::Int64
| DataType::Int8
| DataType::Interval(_)
| DataType::Null
| DataType::Time32(_)
| DataType::Time64(_)
| DataType::Timestamp(_, _)
| DataType::UInt16
| DataType::UInt32
| DataType::UInt64
| DataType::UInt8
| DataType::FixedSizeBinary(_)
| DataType::Binary
| DataType::LargeBinary
| DataType::Utf8
| DataType::LargeUtf8,
),
}
}

fn do_create_field_encoder(
Expand Down Expand Up @@ -437,7 +441,7 @@ impl StructuralEncodingStrategy {
} else {
match data_type {
DataType::List(_) | DataType::LargeList(_) => {
let child = field.children.first().expect("List should have a child");
let child = field.children.first().expect_ok()?;
let child_encoder = self.do_create_field_encoder(
_encoding_strategy_root,
child,
Expand All @@ -450,6 +454,33 @@ impl StructuralEncodingStrategy {
child_encoder,
)))
}
DataType::FixedSizeList(inner, _)
if matches!(inner.data_type(), DataType::Struct(_)) =>
{
if self.version < LanceFileVersion::V2_2 {
return Err(Error::NotSupported {
source: format!(
"FixedSizeList<Struct> is only supported in Lance file format 2.2+, current version: {}",
self.version
)
.into(),
location: location!(),
});
}
// Complex FixedSizeList needs structural encoding
let child = field.children.first().expect_ok()?;
let child_encoder = self.do_create_field_encoder(
_encoding_strategy_root,
child,
column_index,
options,
root_field_metadata,
)?;
Ok(Box::new(FixedSizeListStructuralEncoder::new(
options.keep_original_array,
child_encoder,
)))
}
DataType::Map(_, keys_sorted) => {
// TODO: We only support keys_sorted=false for now,
// because converting a rust arrow map field to the python arrow field will
Expand Down
1 change: 1 addition & 0 deletions rust/lance-encoding/src/encodings/logical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-FileCopyrightText: Copyright The Lance Authors

pub mod blob;
pub mod fixed_size_list;
pub mod list;
pub mod map;
pub mod primitive;
Expand Down
Loading