From 933d89eafda3488c838999f64277954727a59922 Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Sat, 12 Dec 2020 06:36:01 +0100 Subject: [PATCH 1/4] Bla --- rust/parquet/src/arrow/record_reader.rs | 164 ++++++++++-------------- 1 file changed, 66 insertions(+), 98 deletions(-) diff --git a/rust/parquet/src/arrow/record_reader.rs b/rust/parquet/src/arrow/record_reader.rs index 16b084698b8..53fcd86bcfb 100644 --- a/rust/parquet/src/arrow/record_reader.rs +++ b/rust/parquet/src/arrow/record_reader.rs @@ -16,10 +16,8 @@ // under the License. use std::cmp::{max, min}; -use std::mem::align_of; use std::mem::size_of; use std::mem::{replace, swap}; -use std::slice; use crate::column::{page::PageReader, reader::ColumnReaderImpl}; use crate::data_type::DataType; @@ -28,7 +26,6 @@ use crate::schema::types::ColumnDescPtr; use arrow::array::BooleanBufferBuilder; use arrow::bitmap::Bitmap; use arrow::buffer::{Buffer, MutableBuffer}; -use arrow::memory; const MIN_BATCH_SIZE: usize = 1024; @@ -58,40 +55,6 @@ struct FatPtr<'a, T> { ptr: &'a mut [T], } -impl<'a, T> FatPtr<'a, T> { - fn new(ptr: &'a mut [T]) -> Self { - Self { ptr } - } - - fn with_offset(buf: &'a mut MutableBuffer, offset: usize) -> Self { - FatPtr::::with_offset_and_size(buf, offset, size_of::()) - } - - fn with_offset_and_size( - buf: &'a mut MutableBuffer, - offset: usize, - type_size: usize, - ) -> Self { - assert!(align_of::() <= memory::ALIGNMENT); - // TODO Prevent this from being called with non primitive types (like `Box`) - unsafe { - FatPtr::new(slice::from_raw_parts_mut( - &mut *(buf.raw_data() as *mut T).add(offset), - buf.capacity() / type_size - offset, - )) - } - } - - fn to_slice(&self) -> &[T] { - self.ptr - } - - #[allow(clippy::wrong_self_convention)] - fn to_slice_mut(&mut self) -> &mut [T] { - self.ptr - } -} - impl RecordReader { pub fn new(column_schema: ColumnDescPtr) -> Self { let (def_levels, null_map) = if column_schema.max_def_level() > 0 { @@ -202,18 +165,17 @@ impl RecordReader { let mut new_buffer = MutableBuffer::new( size_of::() * max(MIN_BATCH_SIZE, num_left_values), ); - new_buffer.resize(num_left_values * size_of::()); + let num_bytes = num_left_values * size_of::(); + let new_len = self.num_values * size_of::(); - let mut new_def_levels = FatPtr::::with_offset(&mut new_buffer, 0); - let new_def_levels = new_def_levels.to_slice_mut(); - let left_def_levels = - FatPtr::::with_offset(def_levels_buf, self.num_values); - let left_def_levels = left_def_levels.to_slice(); + new_buffer.resize(num_bytes); - new_def_levels[0..num_left_values] - .copy_from_slice(&left_def_levels[0..num_left_values]); + let new_def_levels = new_buffer.data_mut(); + let left_def_levels = &def_levels_buf.data_mut()[new_len..]; - def_levels_buf.resize(self.num_values * size_of::()); + new_def_levels[0..num_bytes].copy_from_slice(&left_def_levels[0..num_bytes]); + + def_levels_buf.resize(new_len); Some(new_buffer) } else { None @@ -231,18 +193,18 @@ impl RecordReader { let mut new_buffer = MutableBuffer::new( size_of::() * max(MIN_BATCH_SIZE, num_left_values), ); - new_buffer.resize(num_left_values * size_of::()); + let num_bytes = num_left_values * size_of::(); + let new_len = self.num_values * size_of::(); + + new_buffer.resize(num_bytes); - let mut new_rep_levels = FatPtr::::with_offset(&mut new_buffer, 0); - let new_rep_levels = new_rep_levels.to_slice_mut(); - let left_rep_levels = - FatPtr::::with_offset(rep_levels_buf, self.num_values); - let left_rep_levels = left_rep_levels.to_slice(); + let new_rep_levels = new_buffer.data_mut(); + let left_rep_levels = &rep_levels_buf.data_mut()[new_len..]; new_rep_levels[0..num_left_values] .copy_from_slice(&left_rep_levels[0..num_left_values]); - rep_levels_buf.resize(self.num_values * size_of::()); + rep_levels_buf.resize(new_len); Some(new_buffer) } else { @@ -260,21 +222,16 @@ impl RecordReader { let mut new_buffer = MutableBuffer::new(max(MIN_BATCH_SIZE, num_left_values)); new_buffer.resize(num_left_values * T::get_type_size()); - let mut new_records = - FatPtr::::with_offset_and_size(&mut new_buffer, 0, T::get_type_size()); - let new_records = new_records.to_slice_mut(); - let mut left_records = FatPtr::::with_offset_and_size( - &mut self.records, - self.num_values, - T::get_type_size(), - ); - let left_records = left_records.to_slice_mut(); + let new_len = self.num_values * T::get_type_size(); + + let new_records = new_buffer.data_mut(); + let left_records = &mut self.records.data_mut()[new_len..]; for idx in 0..num_left_values { swap(&mut new_records[idx], &mut left_records[idx]); } - self.records.resize(self.num_values * T::get_type_size()); + self.records.resize(new_len); Ok(replace(&mut self.records, new_buffer).freeze()) } @@ -331,51 +288,61 @@ impl RecordReader { fn read_one_batch(&mut self, batch_size: usize) -> Result { // Reserve spaces self.records - .reserve(self.records.len() + batch_size * T::get_type_size()); + .resize(self.records.len() + batch_size * T::get_type_size()); if let Some(ref mut buf) = self.rep_levels { - buf.reserve(buf.len() + batch_size * size_of::()); + buf.resize(buf.len() + batch_size * size_of::()); } if let Some(ref mut buf) = self.def_levels { - buf.reserve(buf.len() + batch_size * size_of::()); + buf.resize(buf.len() + batch_size * size_of::()); } + let values_written = self.values_written; + // Convert mutable buffer spaces to mutable slices - let mut values_buf = FatPtr::::with_offset_and_size( - &mut self.records, - self.values_written, - T::get_type_size(), - ); + let (prefix, values, suffix) = unsafe { self.records.data_mut().align_to_mut::() }; + assert!(prefix.is_empty() && suffix.is_empty()); + let values = &mut values[values_written..]; - let values_written = self.values_written; - let mut def_levels_buf = self + let def_levels = self .def_levels .as_mut() - .map(|buf| FatPtr::::with_offset(buf, values_written)); + .map(|buf| { + let (prefix, def_levels, suffix) = unsafe { buf.data_mut().align_to_mut::() }; + assert!(prefix.is_empty() && suffix.is_empty()); + &mut def_levels[values_written..] + }); - let mut rep_levels_buf = self + let rep_levels = self .rep_levels .as_mut() - .map(|buf| FatPtr::::with_offset(buf, values_written)); + .map(|buf| { + let (prefix, rep_levels, suffix) = unsafe { buf.data_mut().align_to_mut::() }; + assert!(prefix.is_empty() && suffix.is_empty()); + &mut rep_levels[values_written..] + }); let (values_read, levels_read) = self.column_reader.as_mut().unwrap().read_batch( batch_size, - def_levels_buf.as_mut().map(|ptr| ptr.to_slice_mut()), - rep_levels_buf.as_mut().map(|ptr| ptr.to_slice_mut()), - values_buf.to_slice_mut(), + def_levels, + rep_levels, + values, )?; + // get new references for the def levels. + let def_levels = self + .def_levels + .as_ref() + .map(|buf| { + let (prefix, def_levels, suffix) = unsafe { buf.data().align_to::() }; + assert!(prefix.is_empty() && suffix.is_empty()); + &def_levels[values_written..] + }); + let max_def_level = self.column_desc.max_def_level(); if values_read < levels_read { - // This means that there are null values in column data - // TODO: Move this into ColumnReader - - let values_buf = values_buf.to_slice_mut(); - - let def_levels_buf = def_levels_buf - .as_mut() - .map(|ptr| ptr.to_slice_mut()) + let def_levels = def_levels .ok_or_else(|| { general_err!( "Definition levels should exist when data is less than levels!" @@ -387,14 +354,14 @@ impl RecordReader { let mut level_pos = levels_read; while level_pos > values_pos { - if def_levels_buf[level_pos - 1] == max_def_level { + if def_levels[level_pos - 1] == max_def_level { // This values is not empty // We use swap rather than assign here because T::T doesn't // implement Copy - values_buf.swap(level_pos - 1, values_pos - 1); + values.swap(level_pos - 1, values_pos - 1); values_pos -= 1; } else { - values_buf[level_pos - 1] = T::T::default(); + values[level_pos - 1] = T::T::default(); } level_pos -= 1; @@ -403,16 +370,14 @@ impl RecordReader { // Fill in bitmap data if let Some(null_buffer) = self.null_bitmap.as_mut() { - let def_levels_buf = def_levels_buf - .as_mut() - .map(|ptr| ptr.to_slice_mut()) + let def_levels = def_levels .ok_or_else(|| { general_err!( "Definition levels should exist when data is less than levels!" ) })?; (0..levels_read).try_for_each(|idx| { - null_buffer.append(def_levels_buf[idx] == max_def_level) + null_buffer.append(def_levels[idx] == max_def_level) })?; } @@ -424,13 +389,16 @@ impl RecordReader { /// Split values into records according repetition definition and returns number of /// records read. fn split_records(&mut self, records_to_read: usize) -> Result { - let rep_levels_buf = self + let rep_levels = self .rep_levels .as_mut() - .map(|buf| FatPtr::::with_offset(buf, 0)); - let rep_levels_buf = rep_levels_buf.as_ref().map(|x| x.to_slice()); + .map(|buf| { + let (prefix, rep_levels, suffix) = unsafe { buf.data_mut().align_to_mut::() }; + assert!(prefix.is_empty() && suffix.is_empty()); + rep_levels + }); - match rep_levels_buf { + match rep_levels { Some(buf) => { let mut records_read = 0; From 1bdabf614fb4bd6d5e88ad3bb7575ddb648fcb4e Mon Sep 17 00:00:00 2001 From: Neville Dipale Date: Sat, 12 Dec 2020 23:07:02 +0200 Subject: [PATCH 2/4] consume the right amount of data (bits vs bytes) --- rust/parquet/src/arrow/record_reader.rs | 28 +++++++++++-------------- 1 file changed, 12 insertions(+), 16 deletions(-) diff --git a/rust/parquet/src/arrow/record_reader.rs b/rust/parquet/src/arrow/record_reader.rs index 53fcd86bcfb..2fd29a2bbd4 100644 --- a/rust/parquet/src/arrow/record_reader.rs +++ b/rust/parquet/src/arrow/record_reader.rs @@ -16,8 +16,7 @@ // under the License. use std::cmp::{max, min}; -use std::mem::size_of; -use std::mem::{replace, swap}; +use std::mem::{replace, size_of}; use crate::column::{page::PageReader, reader::ColumnReaderImpl}; use crate::data_type::DataType; @@ -162,9 +161,8 @@ impl RecordReader { pub fn consume_def_levels(&mut self) -> Result> { let new_buffer = if let Some(ref mut def_levels_buf) = &mut self.def_levels { let num_left_values = self.values_written - self.num_values; - let mut new_buffer = MutableBuffer::new( - size_of::() * max(MIN_BATCH_SIZE, num_left_values), - ); + // create an empty buffer, as it will be resized below + let mut new_buffer = MutableBuffer::new(0); let num_bytes = num_left_values * size_of::(); let new_len = self.num_values * size_of::(); @@ -190,9 +188,8 @@ impl RecordReader { // TODO: Optimize to reduce the copy let new_buffer = if let Some(ref mut rep_levels_buf) = &mut self.rep_levels { let num_left_values = self.values_written - self.num_values; - let mut new_buffer = MutableBuffer::new( - size_of::() * max(MIN_BATCH_SIZE, num_left_values), - ); + // create an empty buffer, as it will be resized below + let mut new_buffer = MutableBuffer::new(0); let num_bytes = num_left_values * size_of::(); let new_len = self.num_values * size_of::(); @@ -201,8 +198,7 @@ impl RecordReader { let new_rep_levels = new_buffer.data_mut(); let left_rep_levels = &rep_levels_buf.data_mut()[new_len..]; - new_rep_levels[0..num_left_values] - .copy_from_slice(&left_rep_levels[0..num_left_values]); + new_rep_levels[0..num_bytes].copy_from_slice(&left_rep_levels[0..num_bytes]); rep_levels_buf.resize(new_len); @@ -219,17 +215,17 @@ impl RecordReader { pub fn consume_record_data(&mut self) -> Result { // TODO: Optimize to reduce the copy let num_left_values = self.values_written - self.num_values; - let mut new_buffer = MutableBuffer::new(max(MIN_BATCH_SIZE, num_left_values)); - new_buffer.resize(num_left_values * T::get_type_size()); - + // create an empty buffer, as it will be resized below + let mut new_buffer = MutableBuffer::new(0); + let num_bytes = num_left_values * T::get_type_size(); let new_len = self.num_values * T::get_type_size(); + new_buffer.resize(num_bytes); + let new_records = new_buffer.data_mut(); let left_records = &mut self.records.data_mut()[new_len..]; - for idx in 0..num_left_values { - swap(&mut new_records[idx], &mut left_records[idx]); - } + new_records[0..num_bytes].copy_from_slice(&left_records[0..num_bytes]); self.records.resize(new_len); From 5ca2fb1ea78133d4992c71e153b999390d46a38e Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Sat, 12 Dec 2020 22:07:46 +0000 Subject: [PATCH 3/4] Fmt. --- rust/parquet/src/arrow/record_reader.rs | 94 +++++++++++-------------- 1 file changed, 41 insertions(+), 53 deletions(-) diff --git a/rust/parquet/src/arrow/record_reader.rs b/rust/parquet/src/arrow/record_reader.rs index 2fd29a2bbd4..f8f7a4f01b4 100644 --- a/rust/parquet/src/arrow/record_reader.rs +++ b/rust/parquet/src/arrow/record_reader.rs @@ -295,55 +295,46 @@ impl RecordReader { let values_written = self.values_written; // Convert mutable buffer spaces to mutable slices - let (prefix, values, suffix) = unsafe { self.records.data_mut().align_to_mut::() }; + let (prefix, values, suffix) = + unsafe { self.records.data_mut().align_to_mut::() }; assert!(prefix.is_empty() && suffix.is_empty()); let values = &mut values[values_written..]; - let def_levels = self - .def_levels + let def_levels = self.def_levels.as_mut().map(|buf| { + let (prefix, def_levels, suffix) = + unsafe { buf.data_mut().align_to_mut::() }; + assert!(prefix.is_empty() && suffix.is_empty()); + &mut def_levels[values_written..] + }); + + let rep_levels = self.rep_levels.as_mut().map(|buf| { + let (prefix, rep_levels, suffix) = + unsafe { buf.data_mut().align_to_mut::() }; + assert!(prefix.is_empty() && suffix.is_empty()); + &mut rep_levels[values_written..] + }); + + let (values_read, levels_read) = self + .column_reader .as_mut() - .map(|buf| { - let (prefix, def_levels, suffix) = unsafe { buf.data_mut().align_to_mut::() }; - assert!(prefix.is_empty() && suffix.is_empty()); - &mut def_levels[values_written..] - }); - - let rep_levels = self - .rep_levels - .as_mut() - .map(|buf| { - let (prefix, rep_levels, suffix) = unsafe { buf.data_mut().align_to_mut::() }; - assert!(prefix.is_empty() && suffix.is_empty()); - &mut rep_levels[values_written..] - }); - - let (values_read, levels_read) = - self.column_reader.as_mut().unwrap().read_batch( - batch_size, - def_levels, - rep_levels, - values, - )?; + .unwrap() + .read_batch(batch_size, def_levels, rep_levels, values)?; // get new references for the def levels. - let def_levels = self - .def_levels - .as_ref() - .map(|buf| { - let (prefix, def_levels, suffix) = unsafe { buf.data().align_to::() }; - assert!(prefix.is_empty() && suffix.is_empty()); - &def_levels[values_written..] - }); + let def_levels = self.def_levels.as_ref().map(|buf| { + let (prefix, def_levels, suffix) = unsafe { buf.data().align_to::() }; + assert!(prefix.is_empty() && suffix.is_empty()); + &def_levels[values_written..] + }); let max_def_level = self.column_desc.max_def_level(); if values_read < levels_read { - let def_levels = def_levels - .ok_or_else(|| { - general_err!( - "Definition levels should exist when data is less than levels!" - ) - })?; + let def_levels = def_levels.ok_or_else(|| { + general_err!( + "Definition levels should exist when data is less than levels!" + ) + })?; // Fill spaces in column data with default values let mut values_pos = values_read; @@ -366,12 +357,11 @@ impl RecordReader { // Fill in bitmap data if let Some(null_buffer) = self.null_bitmap.as_mut() { - let def_levels = def_levels - .ok_or_else(|| { - general_err!( - "Definition levels should exist when data is less than levels!" - ) - })?; + let def_levels = def_levels.ok_or_else(|| { + general_err!( + "Definition levels should exist when data is less than levels!" + ) + })?; (0..levels_read).try_for_each(|idx| { null_buffer.append(def_levels[idx] == max_def_level) })?; @@ -385,14 +375,12 @@ impl RecordReader { /// Split values into records according repetition definition and returns number of /// records read. fn split_records(&mut self, records_to_read: usize) -> Result { - let rep_levels = self - .rep_levels - .as_mut() - .map(|buf| { - let (prefix, rep_levels, suffix) = unsafe { buf.data_mut().align_to_mut::() }; - assert!(prefix.is_empty() && suffix.is_empty()); - rep_levels - }); + let rep_levels = self.rep_levels.as_mut().map(|buf| { + let (prefix, rep_levels, suffix) = + unsafe { buf.data_mut().align_to_mut::() }; + assert!(prefix.is_empty() && suffix.is_empty()); + rep_levels + }); match rep_levels { Some(buf) => { From 55974f660c15e4386fdaed580cd9899fd1add337 Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Sat, 12 Dec 2020 22:11:34 +0000 Subject: [PATCH 4/4] Further simplifications. --- rust/parquet/src/arrow/record_reader.rs | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/rust/parquet/src/arrow/record_reader.rs b/rust/parquet/src/arrow/record_reader.rs index f8f7a4f01b4..14908de6d43 100644 --- a/rust/parquet/src/arrow/record_reader.rs +++ b/rust/parquet/src/arrow/record_reader.rs @@ -49,11 +49,6 @@ pub struct RecordReader { in_middle_of_record: bool, } -#[derive(Debug)] -struct FatPtr<'a, T> { - ptr: &'a mut [T], -} - impl RecordReader { pub fn new(column_schema: ColumnDescPtr) -> Self { let (def_levels, null_map) = if column_schema.max_def_level() > 0 { @@ -375,9 +370,8 @@ impl RecordReader { /// Split values into records according repetition definition and returns number of /// records read. fn split_records(&mut self, records_to_read: usize) -> Result { - let rep_levels = self.rep_levels.as_mut().map(|buf| { - let (prefix, rep_levels, suffix) = - unsafe { buf.data_mut().align_to_mut::() }; + let rep_levels = self.rep_levels.as_ref().map(|buf| { + let (prefix, rep_levels, suffix) = unsafe { buf.data().align_to::() }; assert!(prefix.is_empty() && suffix.is_empty()); rep_levels });