1616// under the License.
1717
1818use std:: cmp:: { max, min} ;
19- use std:: mem:: align_of;
20- use std:: mem:: size_of;
21- use std:: mem:: { replace, swap} ;
22- use std:: slice;
19+ use std:: mem:: { replace, size_of} ;
2320
2421use crate :: column:: { page:: PageReader , reader:: ColumnReaderImpl } ;
2522use crate :: data_type:: DataType ;
@@ -28,7 +25,6 @@ use crate::schema::types::ColumnDescPtr;
2825use arrow:: array:: BooleanBufferBuilder ;
2926use arrow:: bitmap:: Bitmap ;
3027use arrow:: buffer:: { Buffer , MutableBuffer } ;
31- use arrow:: memory;
3228
3329const MIN_BATCH_SIZE : usize = 1024 ;
3430
@@ -53,45 +49,6 @@ pub struct RecordReader<T: DataType> {
5349 in_middle_of_record : bool ,
5450}
5551
/// NOTE(review): every line of this block carries a `-` marker, i.e. this diff removes it.
/// `FatPtr` is a private wrapper around one mutable slice of `T`. It is built either
/// directly from a slice (`new`) or from a `MutableBuffer` plus an element offset
/// (`with_offset`, `with_offset_and_size`), and hands the slice back through `to_slice`
/// and `to_slice_mut`.
56- #[ derive( Debug ) ]
57- struct FatPtr < ' a , T > {
58- ptr : & ' a mut [ T ] ,
59- }
60-
61- impl < ' a , T > FatPtr < ' a , T > {
62- fn new ( ptr : & ' a mut [ T ] ) -> Self {
63- Self { ptr }
64- }
65-
66- fn with_offset ( buf : & ' a mut MutableBuffer , offset : usize ) -> Self {
67- FatPtr :: < T > :: with_offset_and_size ( buf, offset, size_of :: < T > ( ) )
68- }
69-
// Reinterprets the raw data pointer of `buf` as `*mut T`, advances it by `offset`
// elements and builds a slice of `buf.capacity() / type_size - offset` elements.
// The length comes from `capacity()`, not `len()` -- presumably so callers can write
// into reserved space that has not been filled yet; TODO confirm against callers.
// The assert below checks that the alignment of `T` does not exceed `memory::ALIGNMENT`.
70- fn with_offset_and_size (
71- buf : & ' a mut MutableBuffer ,
72- offset : usize ,
73- type_size : usize ,
74- ) -> Self {
75- assert ! ( align_of:: <T >( ) <= memory:: ALIGNMENT ) ;
76- // TODO Prevent this from being called with non primitive types (like `Box<A>`)
77- unsafe {
78- FatPtr :: new ( slice:: from_raw_parts_mut (
79- & mut * ( buf. raw_data ( ) as * mut T ) . add ( offset) ,
80- buf. capacity ( ) / type_size - offset,
81- ) )
82- }
83- }
84-
85- fn to_slice ( & self ) -> & [ T ] {
86- self . ptr
87- }
88-
89- #[ allow( clippy:: wrong_self_convention) ]
90- fn to_slice_mut ( & mut self ) -> & mut [ T ] {
91- self . ptr
92- }
93- }
94-
9552impl < T : DataType > RecordReader < T > {
9653 pub fn new ( column_schema : ColumnDescPtr ) -> Self {
9754 let ( def_levels, null_map) = if column_schema. max_def_level ( ) > 0 {
@@ -199,21 +156,19 @@ impl<T: DataType> RecordReader<T> {
199156 pub fn consume_def_levels ( & mut self ) -> Result < Option < Buffer > > {
200157 let new_buffer = if let Some ( ref mut def_levels_buf) = & mut self . def_levels {
201158 let num_left_values = self . values_written - self . num_values ;
202- let mut new_buffer = MutableBuffer :: new (
203- size_of :: < i16 > ( ) * max ( MIN_BATCH_SIZE , num_left_values) ,
204- ) ;
205- new_buffer. resize ( num_left_values * size_of :: < i16 > ( ) ) ;
159+ // create an empty buffer, as it will be resized below
160+ let mut new_buffer = MutableBuffer :: new ( 0 ) ;
161+ let num_bytes = num_left_values * size_of :: < i16 > ( ) ;
162+ let new_len = self . num_values * size_of :: < i16 > ( ) ;
163+
164+ new_buffer. resize ( num_bytes) ;
206165
207- let mut new_def_levels = FatPtr :: < i16 > :: with_offset ( & mut new_buffer, 0 ) ;
208- let new_def_levels = new_def_levels. to_slice_mut ( ) ;
209- let left_def_levels =
210- FatPtr :: < i16 > :: with_offset ( def_levels_buf, self . num_values ) ;
211- let left_def_levels = left_def_levels. to_slice ( ) ;
166+ let new_def_levels = new_buffer. data_mut ( ) ;
167+ let left_def_levels = & def_levels_buf. data_mut ( ) [ new_len..] ;
212168
213- new_def_levels[ 0 ..num_left_values]
214- . copy_from_slice ( & left_def_levels[ 0 ..num_left_values] ) ;
169+ new_def_levels[ 0 ..num_bytes] . copy_from_slice ( & left_def_levels[ 0 ..num_bytes] ) ;
215170
216- def_levels_buf. resize ( self . num_values * size_of :: < i16 > ( ) ) ;
171+ def_levels_buf. resize ( new_len ) ;
217172 Some ( new_buffer)
218173 } else {
219174 None
@@ -228,21 +183,19 @@ impl<T: DataType> RecordReader<T> {
228183 // TODO: Optimize to reduce the copy
229184 let new_buffer = if let Some ( ref mut rep_levels_buf) = & mut self . rep_levels {
230185 let num_left_values = self . values_written - self . num_values ;
231- let mut new_buffer = MutableBuffer :: new (
232- size_of :: < i16 > ( ) * max ( MIN_BATCH_SIZE , num_left_values ) ,
233- ) ;
234- new_buffer . resize ( num_left_values * size_of :: < i16 > ( ) ) ;
186+ // create an empty buffer, as it will be resized below
187+ let mut new_buffer = MutableBuffer :: new ( 0 ) ;
188+ let num_bytes = num_left_values * size_of :: < i16 > ( ) ;
189+ let new_len = self . num_values * size_of :: < i16 > ( ) ;
235190
236- let mut new_rep_levels = FatPtr :: < i16 > :: with_offset ( & mut new_buffer, 0 ) ;
237- let new_rep_levels = new_rep_levels. to_slice_mut ( ) ;
238- let left_rep_levels =
239- FatPtr :: < i16 > :: with_offset ( rep_levels_buf, self . num_values ) ;
240- let left_rep_levels = left_rep_levels. to_slice ( ) ;
191+ new_buffer. resize ( num_bytes) ;
241192
242- new_rep_levels[ 0 ..num_left_values ]
243- . copy_from_slice ( & left_rep_levels [ 0 ..num_left_values ] ) ;
193+ let new_rep_levels = new_buffer . data_mut ( ) ;
194+ let left_rep_levels = & rep_levels_buf . data_mut ( ) [ new_len.. ] ;
244195
245- rep_levels_buf. resize ( self . num_values * size_of :: < i16 > ( ) ) ;
196+ new_rep_levels[ 0 ..num_bytes] . copy_from_slice ( & left_rep_levels[ 0 ..num_bytes] ) ;
197+
198+ rep_levels_buf. resize ( new_len) ;
246199
247200 Some ( new_buffer)
248201 } else {
@@ -257,24 +210,19 @@ impl<T: DataType> RecordReader<T> {
/// Returns the first `num_values` values of `self.records` as a `Buffer` (via `freeze`).
/// The `values_written - num_values` values stored after them are copied into a fresh
/// buffer, which then becomes the new `self.records`.
///
/// NOTE(review): this block is shown in diff form. Lines marked `-` are the removed
/// `FatPtr`-based version (element-wise `swap`); lines marked `+` are the replacement,
/// which copies raw bytes with `copy_from_slice`.
257210 pub fn consume_record_data ( & mut self ) -> Result < Buffer > {
258211 // TODO: Optimize to reduce the copy
259212 let num_left_values = self . values_written - self . num_values ;
260- let mut new_buffer = MutableBuffer :: new ( max ( MIN_BATCH_SIZE , num_left_values) ) ;
261- new_buffer. resize ( num_left_values * T :: get_type_size ( ) ) ;
262-
263- let mut new_records =
264- FatPtr :: < T :: T > :: with_offset_and_size ( & mut new_buffer, 0 , T :: get_type_size ( ) ) ;
265- let new_records = new_records. to_slice_mut ( ) ;
266- let mut left_records = FatPtr :: < T :: T > :: with_offset_and_size (
267- & mut self . records ,
268- self . num_values ,
269- T :: get_type_size ( ) ,
270- ) ;
271- let left_records = left_records. to_slice_mut ( ) ;
213+ // create an empty buffer, as it will be resized below
214+ let mut new_buffer = MutableBuffer :: new ( 0 ) ;
// `num_bytes`: byte length of the leftover values that stay with the reader.
// `new_len`: byte length of the values being handed back to the caller.
215+ let num_bytes = num_left_values * T :: get_type_size ( ) ;
216+ let new_len = self . num_values * T :: get_type_size ( ) ;
272217
273- for idx in 0 ..num_left_values {
274- swap ( & mut new_records[ idx] , & mut left_records[ idx] ) ;
275- }
218+ new_buffer. resize ( num_bytes) ;
219+
220+ let new_records = new_buffer. data_mut ( ) ;
// The leftover values start right after the first `new_len` bytes of the current buffer.
221+ let left_records = & mut self . records . data_mut ( ) [ new_len..] ;
276222
277- self . records . resize ( self . num_values * T :: get_type_size ( ) ) ;
223+ new_records[ 0 ..num_bytes] . copy_from_slice ( & left_records[ 0 ..num_bytes] ) ;
224+
// Cut the current buffer down to the returned values before it is swapped out below.
225+ self . records . resize ( new_len) ;
278226
// `replace` installs the leftover buffer as `self.records` and yields the old buffer.
279227 Ok ( replace ( & mut self . records , new_buffer) . freeze ( ) )
280228 }
@@ -331,70 +279,71 @@ impl<T: DataType> RecordReader<T> {
331279 fn read_one_batch ( & mut self , batch_size : usize ) -> Result < usize > {
332280 // Reserve spaces
333281 self . records
334- . reserve ( self . records . len ( ) + batch_size * T :: get_type_size ( ) ) ;
282+ . resize ( self . records . len ( ) + batch_size * T :: get_type_size ( ) ) ;
335283 if let Some ( ref mut buf) = self . rep_levels {
336- buf. reserve ( buf. len ( ) + batch_size * size_of :: < i16 > ( ) ) ;
284+ buf. resize ( buf. len ( ) + batch_size * size_of :: < i16 > ( ) ) ;
337285 }
338286 if let Some ( ref mut buf) = self . def_levels {
339- buf. reserve ( buf. len ( ) + batch_size * size_of :: < i16 > ( ) ) ;
287+ buf. resize ( buf. len ( ) + batch_size * size_of :: < i16 > ( ) ) ;
340288 }
341289
342- // Convert mutable buffer spaces to mutable slices
343- let mut values_buf = FatPtr :: < T :: T > :: with_offset_and_size (
344- & mut self . records ,
345- self . values_written ,
346- T :: get_type_size ( ) ,
347- ) ;
348-
349290 let values_written = self . values_written ;
350- let mut def_levels_buf = self
351- . def_levels
352- . as_mut ( )
353- . map ( |buf| FatPtr :: < i16 > :: with_offset ( buf, values_written) ) ;
354291
355- let mut rep_levels_buf = self
356- . rep_levels
292+ // Convert mutable buffer spaces to mutable slices
293+ let ( prefix, values, suffix) =
294+ unsafe { self . records . data_mut ( ) . align_to_mut :: < T :: T > ( ) } ;
295+ assert ! ( prefix. is_empty( ) && suffix. is_empty( ) ) ;
296+ let values = & mut values[ values_written..] ;
297+
298+ let def_levels = self . def_levels . as_mut ( ) . map ( |buf| {
299+ let ( prefix, def_levels, suffix) =
300+ unsafe { buf. data_mut ( ) . align_to_mut :: < i16 > ( ) } ;
301+ assert ! ( prefix. is_empty( ) && suffix. is_empty( ) ) ;
302+ & mut def_levels[ values_written..]
303+ } ) ;
304+
305+ let rep_levels = self . rep_levels . as_mut ( ) . map ( |buf| {
306+ let ( prefix, rep_levels, suffix) =
307+ unsafe { buf. data_mut ( ) . align_to_mut :: < i16 > ( ) } ;
308+ assert ! ( prefix. is_empty( ) && suffix. is_empty( ) ) ;
309+ & mut rep_levels[ values_written..]
310+ } ) ;
311+
312+ let ( values_read, levels_read) = self
313+ . column_reader
357314 . as_mut ( )
358- . map ( |buf| FatPtr :: < i16 > :: with_offset ( buf, values_written) ) ;
315+ . unwrap ( )
316+ . read_batch ( batch_size, def_levels, rep_levels, values) ?;
359317
360- let ( values_read, levels_read) =
361- self . column_reader . as_mut ( ) . unwrap ( ) . read_batch (
362- batch_size,
363- def_levels_buf. as_mut ( ) . map ( |ptr| ptr. to_slice_mut ( ) ) ,
364- rep_levels_buf. as_mut ( ) . map ( |ptr| ptr. to_slice_mut ( ) ) ,
365- values_buf. to_slice_mut ( ) ,
366- ) ?;
318+ // get new references for the def levels.
319+ let def_levels = self . def_levels . as_ref ( ) . map ( |buf| {
320+ let ( prefix, def_levels, suffix) = unsafe { buf. data ( ) . align_to :: < i16 > ( ) } ;
321+ assert ! ( prefix. is_empty( ) && suffix. is_empty( ) ) ;
322+ & def_levels[ values_written..]
323+ } ) ;
367324
368325 let max_def_level = self . column_desc . max_def_level ( ) ;
369326
370327 if values_read < levels_read {
371- // This means that there are null values in column data
372- // TODO: Move this into ColumnReader
373-
374- let values_buf = values_buf. to_slice_mut ( ) ;
375-
376- let def_levels_buf = def_levels_buf
377- . as_mut ( )
378- . map ( |ptr| ptr. to_slice_mut ( ) )
379- . ok_or_else ( || {
380- general_err ! (
381- "Definition levels should exist when data is less than levels!"
382- )
383- } ) ?;
328+ let def_levels = def_levels. ok_or_else ( || {
329+ general_err ! (
330+ "Definition levels should exist when data is less than levels!"
331+ )
332+ } ) ?;
384333
385334 // Fill spaces in column data with default values
386335 let mut values_pos = values_read;
387336 let mut level_pos = levels_read;
388337
389338 while level_pos > values_pos {
390- if def_levels_buf [ level_pos - 1 ] == max_def_level {
339+ if def_levels [ level_pos - 1 ] == max_def_level {
391340 // This value is not empty
392341 // We use swap rather than assign here because T::T doesn't
393342 // implement Copy
394- values_buf . swap ( level_pos - 1 , values_pos - 1 ) ;
343+ values . swap ( level_pos - 1 , values_pos - 1 ) ;
395344 values_pos -= 1 ;
396345 } else {
397- values_buf [ level_pos - 1 ] = T :: T :: default ( ) ;
346+ values [ level_pos - 1 ] = T :: T :: default ( ) ;
398347 }
399348
400349 level_pos -= 1 ;
@@ -403,16 +352,13 @@ impl<T: DataType> RecordReader<T> {
403352
404353 // Fill in bitmap data
405354 if let Some ( null_buffer) = self . null_bitmap . as_mut ( ) {
406- let def_levels_buf = def_levels_buf
407- . as_mut ( )
408- . map ( |ptr| ptr. to_slice_mut ( ) )
409- . ok_or_else ( || {
410- general_err ! (
411- "Definition levels should exist when data is less than levels!"
412- )
413- } ) ?;
355+ let def_levels = def_levels. ok_or_else ( || {
356+ general_err ! (
357+ "Definition levels should exist when data is less than levels!"
358+ )
359+ } ) ?;
414360 ( 0 ..levels_read) . try_for_each ( |idx| {
415- null_buffer. append ( def_levels_buf [ idx] == max_def_level)
361+ null_buffer. append ( def_levels [ idx] == max_def_level)
416362 } ) ?;
417363 }
418364
@@ -424,13 +370,13 @@ impl<T: DataType> RecordReader<T> {
424370 /// Splits values into records according to their repetition levels and returns the number of
425371 /// records read.
426372 fn split_records ( & mut self , records_to_read : usize ) -> Result < usize > {
427- let rep_levels_buf = self
428- . rep_levels
429- . as_mut ( )
430- . map ( |buf| FatPtr :: < i16 > :: with_offset ( buf , 0 ) ) ;
431- let rep_levels_buf = rep_levels_buf . as_ref ( ) . map ( |x| x . to_slice ( ) ) ;
373+ let rep_levels = self . rep_levels . as_ref ( ) . map ( |buf| {
374+ let ( prefix , rep_levels, suffix ) = unsafe { buf . data ( ) . align_to :: < i16 > ( ) } ;
375+ assert ! ( prefix . is_empty ( ) && suffix . is_empty ( ) ) ;
376+ rep_levels
377+ } ) ;
432378
433- match rep_levels_buf {
379+ match rep_levels {
434380 Some ( buf) => {
435381 let mut records_read = 0 ;
436382
0 commit comments