Skip to content

Commit d5d2d41

Browse files
authored
merge column: small refactors (#2579)
* merge column: small refactors
* make ord dependency more explicit
* add columnar merge crashtest proptest
* fix naming
1 parent 80f5f1e commit d5d2d41

File tree

10 files changed

+218
-118
lines changed

10 files changed

+218
-118
lines changed

columnar/src/column_index/merge/stacked.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ fn get_doc_ids_with_values<'a>(
5656
ColumnIndex::Full => Box::new(doc_range),
5757
ColumnIndex::Optional(optional_index) => Box::new(
5858
optional_index
59-
.iter_rows()
59+
.iter_docs()
6060
.map(move |row| row + doc_range.start),
6161
),
6262
ColumnIndex::Multivalued(multivalued_index) => match multivalued_index {
@@ -73,7 +73,7 @@ fn get_doc_ids_with_values<'a>(
7373
MultiValueIndex::MultiValueIndexV2(multivalued_index) => Box::new(
7474
multivalued_index
7575
.optional_index
76-
.iter_rows()
76+
.iter_docs()
7777
.map(move |row| row + doc_range.start),
7878
),
7979
},
@@ -177,7 +177,7 @@ impl<'a> Iterable<RowId> for StackedOptionalIndex<'a> {
177177
ColumnIndex::Full => Box::new(columnar_row_range),
178178
ColumnIndex::Optional(optional_index) => Box::new(
179179
optional_index
180-
.iter_rows()
180+
.iter_docs()
181181
.map(move |row_id: RowId| columnar_row_range.start + row_id),
182182
),
183183
ColumnIndex::Multivalued(_) => {

columnar/src/column_index/optional_index/mod.rs

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -80,23 +80,23 @@ impl BlockVariant {
8080
/// index is the block index. For each block `byte_start` and `offset` is computed.
8181
#[derive(Clone)]
8282
pub struct OptionalIndex {
83-
num_rows: RowId,
84-
num_non_null_rows: RowId,
83+
num_docs: RowId,
84+
num_non_null_docs: RowId,
8585
block_data: OwnedBytes,
8686
block_metas: Arc<[BlockMeta]>,
8787
}
8888

8989
impl Iterable<u32> for &OptionalIndex {
9090
fn boxed_iter(&self) -> Box<dyn Iterator<Item = u32> + '_> {
91-
Box::new(self.iter_rows())
91+
Box::new(self.iter_docs())
9292
}
9393
}
9494

9595
impl std::fmt::Debug for OptionalIndex {
9696
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
9797
f.debug_struct("OptionalIndex")
98-
.field("num_rows", &self.num_rows)
99-
.field("num_non_null_rows", &self.num_non_null_rows)
98+
.field("num_docs", &self.num_docs)
99+
.field("num_non_null_docs", &self.num_non_null_docs)
100100
.finish_non_exhaustive()
101101
}
102102
}
@@ -271,17 +271,17 @@ impl OptionalIndex {
271271
}
272272

273273
pub fn num_docs(&self) -> RowId {
274-
self.num_rows
274+
self.num_docs
275275
}
276276

277277
pub fn num_non_nulls(&self) -> RowId {
278-
self.num_non_null_rows
278+
self.num_non_null_docs
279279
}
280280

281-
pub fn iter_rows(&self) -> impl Iterator<Item = RowId> + '_ {
281+
pub fn iter_docs(&self) -> impl Iterator<Item = RowId> + '_ {
282282
// TODO optimize
283283
let mut select_batch = self.select_cursor();
284-
(0..self.num_non_null_rows).map(move |rank| select_batch.select(rank))
284+
(0..self.num_non_null_docs).map(move |rank| select_batch.select(rank))
285285
}
286286
pub fn select_batch(&self, ranks: &mut [RowId]) {
287287
let mut select_cursor = self.select_cursor();
@@ -519,15 +519,15 @@ pub fn open_optional_index(bytes: OwnedBytes) -> io::Result<OptionalIndex> {
519519
let (mut bytes, num_non_empty_blocks_bytes) = bytes.rsplit(2);
520520
let num_non_empty_block_bytes =
521521
u16::from_le_bytes(num_non_empty_blocks_bytes.as_slice().try_into().unwrap());
522-
let num_rows = VInt::deserialize_u64(&mut bytes)? as u32;
522+
let num_docs = VInt::deserialize_u64(&mut bytes)? as u32;
523523
let block_metas_num_bytes =
524524
num_non_empty_block_bytes as usize * SERIALIZED_BLOCK_META_NUM_BYTES;
525525
let (block_data, block_metas) = bytes.rsplit(block_metas_num_bytes);
526-
let (block_metas, num_non_null_rows) =
527-
deserialize_optional_index_block_metadatas(block_metas.as_slice(), num_rows);
526+
let (block_metas, num_non_null_docs) =
527+
deserialize_optional_index_block_metadatas(block_metas.as_slice(), num_docs);
528528
let optional_index = OptionalIndex {
529-
num_rows,
530-
num_non_null_rows,
529+
num_docs,
530+
num_non_null_docs,
531531
block_data,
532532
block_metas: block_metas.into(),
533533
};

columnar/src/column_index/optional_index/tests.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ fn test_optional_index_large() {
164164
fn test_optional_index_iter_aux(row_ids: &[RowId], num_rows: RowId) {
165165
let optional_index = OptionalIndex::for_test(num_rows, row_ids);
166166
assert_eq!(optional_index.num_docs(), num_rows);
167-
assert!(optional_index.iter_rows().eq(row_ids.iter().copied()));
167+
assert!(optional_index.iter_docs().eq(row_ids.iter().copied()));
168168
}
169169

170170
#[test]

columnar/src/columnar/merge/merge_dict_column.rs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::io::{self, Write};
33
use common::{BitSet, CountingWriter, ReadOnlyBitSet};
44
use sstable::{SSTable, Streamer, TermOrdinal, VoidSSTable};
55

6-
use super::term_merger::TermMerger;
6+
use super::term_merger::{TermMerger, TermsWithSegmentOrd};
77
use crate::column::serialize_column_mappable_to_u64;
88
use crate::column_index::SerializableColumnIndex;
99
use crate::iterable::Iterable;
@@ -126,14 +126,17 @@ fn serialize_merged_dict(
126126
let mut term_ord_mapping = TermOrdinalMapping::default();
127127

128128
let mut field_term_streams = Vec::new();
129-
for column_opt in bytes_columns.iter() {
129+
for (segment_ord, column_opt) in bytes_columns.iter().enumerate() {
130130
if let Some(column) = column_opt {
131131
term_ord_mapping.add_segment(column.dictionary.num_terms());
132132
let terms: Streamer<VoidSSTable> = column.dictionary.stream()?;
133-
field_term_streams.push(terms);
133+
field_term_streams.push(TermsWithSegmentOrd { terms, segment_ord });
134134
} else {
135135
term_ord_mapping.add_segment(0);
136-
field_term_streams.push(Streamer::empty());
136+
field_term_streams.push(TermsWithSegmentOrd {
137+
terms: Streamer::empty(),
138+
segment_ord,
139+
});
137140
}
138141
}
139142

@@ -191,6 +194,7 @@ fn serialize_merged_dict(
191194

192195
#[derive(Default, Debug)]
193196
struct TermOrdinalMapping {
197+
/// Contains the new term ordinals for each segment.
194198
per_segment_new_term_ordinals: Vec<Vec<TermOrdinal>>,
195199
}
196200

@@ -205,6 +209,6 @@ impl TermOrdinalMapping {
205209
}
206210

207211
fn get_segment(&self, segment_ord: u32) -> &[TermOrdinal] {
208-
&(self.per_segment_new_term_ordinals[segment_ord as usize])[..]
212+
&self.per_segment_new_term_ordinals[segment_ord as usize]
209213
}
210214
}

columnar/src/columnar/merge/merge_mapping.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ impl StackMergeOrder {
2626
let mut cumulated_row_ids: Vec<RowId> = Vec::with_capacity(columnars.len());
2727
let mut cumulated_row_id = 0;
2828
for columnar in columnars {
29-
cumulated_row_id += columnar.num_rows();
29+
cumulated_row_id += columnar.num_docs();
3030
cumulated_row_ids.push(cumulated_row_id);
3131
}
3232
StackMergeOrder { cumulated_row_ids }

columnar/src/columnar/merge/mod.rs

Lines changed: 24 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -80,29 +80,31 @@ pub fn merge_columnar(
8080
output: &mut impl io::Write,
8181
) -> io::Result<()> {
8282
let mut serializer = ColumnarSerializer::new(output);
83-
let num_rows_per_columnar = columnar_readers
83+
let num_docs_per_columnar = columnar_readers
8484
.iter()
85-
.map(|reader| reader.num_rows())
85+
.map(|reader| reader.num_docs())
8686
.collect::<Vec<u32>>();
8787

88-
let columns_to_merge =
89-
group_columns_for_merge(columnar_readers, required_columns, &merge_row_order)?;
88+
let columns_to_merge = group_columns_for_merge(columnar_readers, required_columns)?;
9089
for res in columns_to_merge {
9190
let ((column_name, _column_type_category), grouped_columns) = res;
9291
let grouped_columns = grouped_columns.open(&merge_row_order)?;
9392
if grouped_columns.is_empty() {
9493
continue;
9594
}
9695

97-
let column_type = grouped_columns.column_type_after_merge();
96+
let column_type_after_merge = grouped_columns.column_type_after_merge();
9897
let mut columns = grouped_columns.columns;
99-
coerce_columns(column_type, &mut columns)?;
98+
// Make sure the number of columns is the same as the number of columnar readers.
99+
// Or num_docs_per_columnar would be incorrect.
100+
assert_eq!(columns.len(), columnar_readers.len());
101+
coerce_columns(column_type_after_merge, &mut columns)?;
100102

101103
let mut column_serializer =
102-
serializer.start_serialize_column(column_name.as_bytes(), column_type);
104+
serializer.start_serialize_column(column_name.as_bytes(), column_type_after_merge);
103105
merge_column(
104-
column_type,
105-
&num_rows_per_columnar,
106+
column_type_after_merge,
107+
&num_docs_per_columnar,
106108
columns,
107109
&merge_row_order,
108110
&mut column_serializer,
@@ -128,7 +130,7 @@ fn dynamic_column_to_u64_monotonic(dynamic_column: DynamicColumn) -> Option<Colu
128130
fn merge_column(
129131
column_type: ColumnType,
130132
num_docs_per_column: &[u32],
131-
columns: Vec<Option<DynamicColumn>>,
133+
columns_to_merge: Vec<Option<DynamicColumn>>,
132134
merge_row_order: &MergeRowOrder,
133135
wrt: &mut impl io::Write,
134136
) -> io::Result<()> {
@@ -138,10 +140,10 @@ fn merge_column(
138140
| ColumnType::F64
139141
| ColumnType::DateTime
140142
| ColumnType::Bool => {
141-
let mut column_indexes: Vec<ColumnIndex> = Vec::with_capacity(columns.len());
143+
let mut column_indexes: Vec<ColumnIndex> = Vec::with_capacity(columns_to_merge.len());
142144
let mut column_values: Vec<Option<Arc<dyn ColumnValues>>> =
143-
Vec::with_capacity(columns.len());
144-
for (i, dynamic_column_opt) in columns.into_iter().enumerate() {
145+
Vec::with_capacity(columns_to_merge.len());
146+
for (i, dynamic_column_opt) in columns_to_merge.into_iter().enumerate() {
145147
if let Some(Column { index: idx, values }) =
146148
dynamic_column_opt.and_then(dynamic_column_to_u64_monotonic)
147149
{
@@ -164,10 +166,10 @@ fn merge_column(
164166
serialize_column_mappable_to_u64(merged_column_index, &merge_column_values, wrt)?;
165167
}
166168
ColumnType::IpAddr => {
167-
let mut column_indexes: Vec<ColumnIndex> = Vec::with_capacity(columns.len());
169+
let mut column_indexes: Vec<ColumnIndex> = Vec::with_capacity(columns_to_merge.len());
168170
let mut column_values: Vec<Option<Arc<dyn ColumnValues<Ipv6Addr>>>> =
169-
Vec::with_capacity(columns.len());
170-
for (i, dynamic_column_opt) in columns.into_iter().enumerate() {
171+
Vec::with_capacity(columns_to_merge.len());
172+
for (i, dynamic_column_opt) in columns_to_merge.into_iter().enumerate() {
171173
if let Some(DynamicColumn::IpAddr(Column { index: idx, values })) =
172174
dynamic_column_opt
173175
{
@@ -192,9 +194,10 @@ fn merge_column(
192194
serialize_column_mappable_to_u128(merged_column_index, &merge_column_values, wrt)?;
193195
}
194196
ColumnType::Bytes | ColumnType::Str => {
195-
let mut column_indexes: Vec<ColumnIndex> = Vec::with_capacity(columns.len());
196-
let mut bytes_columns: Vec<Option<BytesColumn>> = Vec::with_capacity(columns.len());
197-
for (i, dynamic_column_opt) in columns.into_iter().enumerate() {
197+
let mut column_indexes: Vec<ColumnIndex> = Vec::with_capacity(columns_to_merge.len());
198+
let mut bytes_columns: Vec<Option<BytesColumn>> =
199+
Vec::with_capacity(columns_to_merge.len());
200+
for (i, dynamic_column_opt) in columns_to_merge.into_iter().enumerate() {
198201
match dynamic_column_opt {
199202
Some(DynamicColumn::Str(str_column)) => {
200203
column_indexes.push(str_column.term_ord_column.index.clone());
@@ -248,7 +251,7 @@ impl GroupedColumns {
248251
if column_type.len() == 1 {
249252
return column_type.into_iter().next().unwrap();
250253
}
251-
// At the moment, only the numerical categorical column type has more than one possible
254+
// At the moment, only the numerical column type category has more than one possible
252255
// column type.
253256
assert!(self
254257
.columns
@@ -361,7 +364,7 @@ fn is_empty_after_merge(
361364
ColumnIndex::Empty { .. } => true,
362365
ColumnIndex::Full => alive_bitset.len() == 0,
363366
ColumnIndex::Optional(optional_index) => {
364-
for doc in optional_index.iter_rows() {
367+
for doc in optional_index.iter_docs() {
365368
if alive_bitset.contains(doc) {
366369
return false;
367370
}
@@ -391,7 +394,6 @@ fn is_empty_after_merge(
391394
fn group_columns_for_merge<'a>(
392395
columnar_readers: &'a [&'a ColumnarReader],
393396
required_columns: &'a [(String, ColumnType)],
394-
_merge_row_order: &'a MergeRowOrder,
395397
) -> io::Result<BTreeMap<(String, ColumnTypeCategory), GroupedColumnsHandle>> {
396398
let mut columns: BTreeMap<(String, ColumnTypeCategory), GroupedColumnsHandle> = BTreeMap::new();
397399

columnar/src/columnar/merge/term_merger.rs

Lines changed: 22 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -5,28 +5,29 @@ use sstable::TermOrdinal;
55

66
use crate::Streamer;
77

8-
pub struct HeapItem<'a> {
9-
pub streamer: Streamer<'a>,
8+
/// The terms of a column with the ordinal of the segment.
9+
pub struct TermsWithSegmentOrd<'a> {
10+
pub terms: Streamer<'a>,
1011
pub segment_ord: usize,
1112
}
1213

13-
impl PartialEq for HeapItem<'_> {
14+
impl PartialEq for TermsWithSegmentOrd<'_> {
1415
fn eq(&self, other: &Self) -> bool {
1516
self.segment_ord == other.segment_ord
1617
}
1718
}
1819

19-
impl Eq for HeapItem<'_> {}
20+
impl Eq for TermsWithSegmentOrd<'_> {}
2021

21-
impl<'a> PartialOrd for HeapItem<'a> {
22-
fn partial_cmp(&self, other: &HeapItem<'a>) -> Option<Ordering> {
22+
impl<'a> PartialOrd for TermsWithSegmentOrd<'a> {
23+
fn partial_cmp(&self, other: &TermsWithSegmentOrd<'a>) -> Option<Ordering> {
2324
Some(self.cmp(other))
2425
}
2526
}
2627

27-
impl<'a> Ord for HeapItem<'a> {
28-
fn cmp(&self, other: &HeapItem<'a>) -> Ordering {
29-
(&other.streamer.key(), &other.segment_ord).cmp(&(&self.streamer.key(), &self.segment_ord))
28+
impl<'a> Ord for TermsWithSegmentOrd<'a> {
29+
fn cmp(&self, other: &TermsWithSegmentOrd<'a>) -> Ordering {
30+
(&other.terms.key(), &other.segment_ord).cmp(&(&self.terms.key(), &self.segment_ord))
3031
}
3132
}
3233

@@ -37,39 +38,32 @@ impl<'a> Ord for HeapItem<'a> {
3738
/// - the term
3839
/// - a slice with the ordinal of the segments containing the terms.
3940
pub struct TermMerger<'a> {
40-
heap: BinaryHeap<HeapItem<'a>>,
41-
current_streamers: Vec<HeapItem<'a>>,
41+
heap: BinaryHeap<TermsWithSegmentOrd<'a>>,
42+
term_streams_with_segment: Vec<TermsWithSegmentOrd<'a>>,
4243
}
4344

4445
impl<'a> TermMerger<'a> {
4546
/// Stream of merged term dictionary
46-
pub fn new(streams: Vec<Streamer<'a>>) -> TermMerger<'a> {
47+
pub fn new(term_streams_with_segment: Vec<TermsWithSegmentOrd<'a>>) -> TermMerger<'a> {
4748
TermMerger {
4849
heap: BinaryHeap::new(),
49-
current_streamers: streams
50-
.into_iter()
51-
.enumerate()
52-
.map(|(ord, streamer)| HeapItem {
53-
streamer,
54-
segment_ord: ord,
55-
})
56-
.collect(),
50+
term_streams_with_segment,
5751
}
5852
}
5953

6054
pub(crate) fn matching_segments<'b: 'a>(
6155
&'b self,
6256
) -> impl 'b + Iterator<Item = (usize, TermOrdinal)> {
63-
self.current_streamers
57+
self.term_streams_with_segment
6458
.iter()
65-
.map(|heap_item| (heap_item.segment_ord, heap_item.streamer.term_ord()))
59+
.map(|heap_item| (heap_item.segment_ord, heap_item.terms.term_ord()))
6660
}
6761

6862
fn advance_segments(&mut self) {
69-
let streamers = &mut self.current_streamers;
63+
let streamers = &mut self.term_streams_with_segment;
7064
let heap = &mut self.heap;
7165
for mut heap_item in streamers.drain(..) {
72-
if heap_item.streamer.advance() {
66+
if heap_item.terms.advance() {
7367
heap.push(heap_item);
7468
}
7569
}
@@ -81,13 +75,13 @@ impl<'a> TermMerger<'a> {
8175
pub fn advance(&mut self) -> bool {
8276
self.advance_segments();
8377
if let Some(head) = self.heap.pop() {
84-
self.current_streamers.push(head);
78+
self.term_streams_with_segment.push(head);
8579
while let Some(next_streamer) = self.heap.peek() {
86-
if self.current_streamers[0].streamer.key() != next_streamer.streamer.key() {
80+
if self.term_streams_with_segment[0].terms.key() != next_streamer.terms.key() {
8781
break;
8882
}
8983
let next_heap_it = self.heap.pop().unwrap(); // safe : we peeked beforehand
90-
self.current_streamers.push(next_heap_it);
84+
self.term_streams_with_segment.push(next_heap_it);
9185
}
9286
true
9387
} else {
@@ -101,6 +95,6 @@ impl<'a> TermMerger<'a> {
10195
/// if and only if advance() has been called before
10296
/// and "true" was returned.
10397
pub fn key(&self) -> &[u8] {
104-
self.current_streamers[0].streamer.key()
98+
self.term_streams_with_segment[0].terms.key()
10599
}
106100
}

0 commit comments

Comments (0)