Skip to content

Commit eb81c81

Browse files
jorgecarleitaombrubeck
authored andcommitted
ARROW-11291: [Rust] Add extend to MutableBuffer (-20% for arithmetic, -97% for length)
# Rational Rust forbids safely accessing uninitialized memory because it is undefined behavior. However, when building `Buffer`s, it is important to be able to _write_ to uninitialized memory regions, thereby avoiding the need to write _something_ to it before using it. Currently, all our initializations are zeroed, which is expensive. #9076 modifies our allocator to allocate uninitialized regions. However, by itself, this is not useful if we do not offer any methods to write to those (uninitialized) regions. # This PR This PR is built on top of #9076 and introduces methods to extend a `MutableBuffer` from an iterator, thereby offering an API to efficiently grow `MutableBuffer` without having to initialize memory regions with zeros (i.e. without `with_bitset` and the like). This PR also introduces methods to create a `Buffer` from an iterator and a `trusted_len` iterator. The design is inspired in `Vec`, with the catch that we use stable Rust (i.e. no trait specialization, no `TrustedLen`), and thus have to expose a bit more methods than what `Vec` exposes. This means that we can't use that (nicer) API for trustedlen iterators based on `collect()`. Note that, as before, there are still `unsafe` uses of the `MutableBuffer` derived from the fact that it is not a generic over a type `T` (and thus people can mix types and grow the buffer in unsound ways). Special thanks to @mbrubeck for all the help on this, originally discussed [here](https://users.rust-lang.org/t/collect-for-exactsizediterator/54367/6). ```bash git checkout master cargo bench --bench arithmetic_kernels git checkout length_faster cargo bench --bench arithmetic_kernels git checkout 16bc7200f3baa6e526aea7135c60dcc949c9b592 cargo bench --bench length_kernel git checkout length_faster ``` ``` Switched to branch 'length_faster' (use "git pull" to merge the remote branch into yours) Compiling arrow v3.0.0-SNAPSHOT (/Users/jorgecarleitao/projects/arrow/rust/arrow) Finished bench [optimized] target(s) in 1m 02s Running /Users/jorgecarleitao/projects/arrow/rust/target/release/deps/arithmetic_kernels-ec2cc20ce07d9b83 Gnuplot not found, using plotters backend add 512 time: [522.24 ns 523.67 ns 525.26 ns] change: [-21.738% -20.960% -20.233%] (p = 0.00 < 0.05) Performance has improved. Found 12 outliers among 100 measurements (12.00%) 9 (9.00%) high mild 3 (3.00%) high severe subtract 512 time: [503.18 ns 504.93 ns 506.81 ns] change: [-21.741% -21.075% -20.308%] (p = 0.00 < 0.05) Performance has improved. Found 4 outliers among 100 measurements (4.00%) 2 (2.00%) high mild 2 (2.00%) high severe multiply 512 time: [508.25 ns 512.04 ns 516.06 ns] change: [-22.569% -21.946% -21.305%] (p = 0.00 < 0.05) Performance has improved. Found 1 outliers among 100 measurements (1.00%) 1 (1.00%) high mild divide 512 time: [1.4711 us 1.4753 us 1.4799 us] change: [-24.783% -23.305% -22.176%] (p = 0.00 < 0.05) Performance has improved. Found 5 outliers among 100 measurements (5.00%) 2 (2.00%) high mild 3 (3.00%) high severe limit 512, 512 time: [373.47 ns 377.76 ns 382.21 ns] change: [+3.3055% +4.4193% +5.5923%] (p = 0.00 < 0.05) Performance has regressed. add_nulls_512 time: [502.94 ns 504.51 ns 506.28 ns] change: [-24.876% -24.299% -23.709%] (p = 0.00 < 0.05) Performance has improved. Found 2 outliers among 100 measurements (2.00%) 2 (2.00%) high severe divide_nulls_512 time: [1.4843 us 1.4931 us 1.5053 us] change: [-22.968% -22.243% -21.420%] (p = 0.00 < 0.05) Performance has improved. Found 24 outliers among 100 measurements (24.00%) 15 (15.00%) low mild 1 (1.00%) high mild 8 (8.00%) high severe ``` Length (against the commit that fixes the bench, `16bc7200f3baa6e526aea7135c60dcc949c9b592`, not master): ``` length time: [1.5379 us 1.5408 us 1.5437 us] change: [-97.311% -97.295% -97.278%] (p = 0.00 < 0.05) Performance has improved. Found 12 outliers among 100 measurements (12.00%) 1 (1.00%) low severe 4 (4.00%) low mild 3 (3.00%) high mild 4 (4.00%) high severe ``` Closes #9235 from jorgecarleitao/length_faster Lead-authored-by: Jorge C. Leitao <jorgecarleitao@gmail.com> Co-authored-by: Matt Brubeck <mbrubeck@limpet.net> Signed-off-by: Andrew Lamb <andrew@nerdnetworks.org>
1 parent 6710ddd commit eb81c81

File tree

8 files changed

+298
-61
lines changed

8 files changed

+298
-61
lines changed

rust/arrow/benches/buffer_create.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,17 @@ fn mutable_buffer(data: &[Vec<u32>], capacity: usize) -> Buffer {
3939
})
4040
}
4141

42+
fn mutable_buffer_extend(data: &[Vec<u32>], capacity: usize) -> Buffer {
43+
criterion::black_box({
44+
let mut result = MutableBuffer::new(capacity);
45+
46+
data.iter()
47+
.for_each(|vec| result.extend(vec.iter().copied()));
48+
49+
result.into()
50+
})
51+
}
52+
4253
fn from_slice(data: &[Vec<u32>], capacity: usize) -> Buffer {
4354
criterion::black_box({
4455
let mut a = Vec::<u32>::with_capacity(capacity);
@@ -72,6 +83,10 @@ fn benchmark(c: &mut Criterion) {
7283

7384
c.bench_function("mutable", |b| b.iter(|| mutable_buffer(&data, 0)));
7485

86+
c.bench_function("mutable extend", |b| {
87+
b.iter(|| mutable_buffer_extend(&data, 0))
88+
});
89+
7590
c.bench_function("mutable prepared", |b| {
7691
b.iter(|| mutable_buffer(&data, byte_cap))
7792
});

rust/arrow/benches/length_kernel.rs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,25 +24,23 @@ extern crate arrow;
2424
use arrow::array::*;
2525
use arrow::compute::kernels::length::length;
2626

27-
fn bench_length() {
27+
fn bench_length(array: &StringArray) {
28+
criterion::black_box(length(array).unwrap());
29+
}
30+
31+
fn add_benchmark(c: &mut Criterion) {
2832
fn double_vec<T: Clone>(v: Vec<T>) -> Vec<T> {
2933
[&v[..], &v[..]].concat()
3034
}
3135

3236
// double ["hello", " ", "world", "!"] 10 times
3337
let mut values = vec!["one", "on", "o", ""];
34-
let mut expected = vec![3, 2, 1, 0];
3538
for _ in 0..10 {
3639
values = double_vec(values);
37-
expected = double_vec(expected);
3840
}
3941
let array = StringArray::from(values);
4042

41-
criterion::black_box(length(&array).unwrap());
42-
}
43-
44-
fn add_benchmark(c: &mut Criterion) {
45-
c.bench_function("length", |b| b.iter(bench_length));
43+
c.bench_function("length", |b| b.iter(|| bench_length(&array)));
4644
}
4745

4846
criterion_group!(benches, add_benchmark);

rust/arrow/src/array/transform/fixed_binary.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ pub(super) fn build_extend(array: &ArrayData) -> Extend {
4646
let bytes = &values[i * size..(i + 1) * size];
4747
values_buffer.extend_from_slice(bytes);
4848
} else {
49-
values_buffer.extend(size);
49+
values_buffer.extend_zeros(size);
5050
}
5151
})
5252
},
@@ -61,5 +61,5 @@ pub(super) fn extend_nulls(mutable: &mut _MutableArrayData, len: usize) {
6161
};
6262

6363
let values_buffer = &mut mutable.buffer1;
64-
values_buffer.extend(len * size);
64+
values_buffer.extend_zeros(len * size);
6565
}

rust/arrow/src/array/transform/primitive.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,5 +36,5 @@ pub(super) fn extend_nulls<T: ArrowNativeType>(
3636
mutable: &mut _MutableArrayData,
3737
len: usize,
3838
) {
39-
mutable.buffer1.extend(len * size_of::<T>());
39+
mutable.buffer1.extend_zeros(len * size_of::<T>());
4040
}

rust/arrow/src/buffer.rs

Lines changed: 219 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,19 +23,19 @@ use packed_simd::u8x64;
2323

2424
use crate::{
2525
bytes::{Bytes, Deallocation},
26-
datatypes::ToByteSlice,
26+
datatypes::{ArrowNativeType, ToByteSlice},
2727
ffi,
2828
};
2929

30-
use std::convert::AsRef;
3130
use std::fmt::Debug;
31+
use std::iter::FromIterator;
3232
use std::ops::{BitAnd, BitOr, Not};
3333
use std::ptr::NonNull;
3434
use std::sync::Arc;
35+
use std::{convert::AsRef, usize};
3536

3637
#[cfg(feature = "avx512")]
3738
use crate::arch::avx512::*;
38-
use crate::datatypes::ArrowNativeType;
3939
use crate::error::{ArrowError, Result};
4040
use crate::memory;
4141
use crate::util::bit_chunk_iterator::BitChunks;
@@ -697,6 +697,7 @@ unsafe impl Sync for Buffer {}
697697
unsafe impl Send for Buffer {}
698698

699699
impl From<MutableBuffer> for Buffer {
700+
#[inline]
700701
fn from(buffer: MutableBuffer) -> Self {
701702
buffer.into_buffer()
702703
}
@@ -727,13 +728,14 @@ pub struct MutableBuffer {
727728

728729
impl MutableBuffer {
729730
/// Allocate a new [MutableBuffer] with initial capacity to be at least `capacity`.
731+
#[inline]
730732
pub fn new(capacity: usize) -> Self {
731-
let new_capacity = bit_util::round_upto_multiple_of_64(capacity);
732-
let ptr = memory::allocate_aligned(new_capacity);
733+
let capacity = bit_util::round_upto_multiple_of_64(capacity);
734+
let ptr = memory::allocate_aligned(capacity);
733735
Self {
734736
data: ptr,
735737
len: 0,
736-
capacity: new_capacity,
738+
capacity,
737739
}
738740
}
739741

@@ -810,10 +812,14 @@ impl MutableBuffer {
810812
pub fn reserve(&mut self, additional: usize) {
811813
let required_cap = self.len + additional;
812814
if required_cap > self.capacity {
813-
let new_capacity = bit_util::round_upto_multiple_of_64(required_cap);
814-
let new_capacity = std::cmp::max(new_capacity, self.capacity * 2);
815-
self.data =
816-
unsafe { memory::reallocate(self.data, self.capacity, new_capacity) };
815+
// JUSTIFICATION
816+
// Benefit
817+
// necessity
818+
// Soundness
819+
// `self.data` is valid for `self.capacity`.
820+
let (ptr, new_capacity) =
821+
unsafe { reallocate(self.data, self.capacity, required_cap) };
822+
self.data = ptr;
817823
self.capacity = new_capacity;
818824
}
819825
}
@@ -899,6 +905,7 @@ impl MutableBuffer {
899905
self.into_buffer()
900906
}
901907

908+
#[inline]
902909
fn into_buffer(self) -> Buffer {
903910
let buffer_data = unsafe {
904911
Bytes::new(self.data, self.len, Deallocation::Native(self.capacity))
@@ -963,11 +970,167 @@ impl MutableBuffer {
963970

964971
/// Extends the buffer by `additional` bytes equal to `0u8`, incrementing its capacity if needed.
965972
#[inline]
966-
pub fn extend(&mut self, additional: usize) {
973+
pub fn extend_zeros(&mut self, additional: usize) {
967974
self.resize(self.len + additional, 0);
968975
}
969976
}
970977

978+
/// # Safety
979+
/// `ptr` must be allocated for `old_capacity`.
980+
#[inline]
981+
unsafe fn reallocate(
982+
ptr: NonNull<u8>,
983+
old_capacity: usize,
984+
new_capacity: usize,
985+
) -> (NonNull<u8>, usize) {
986+
let new_capacity = bit_util::round_upto_multiple_of_64(new_capacity);
987+
let new_capacity = std::cmp::max(new_capacity, old_capacity * 2);
988+
let ptr = memory::reallocate(ptr, old_capacity, new_capacity);
989+
(ptr, new_capacity)
990+
}
991+
992+
impl<A: ArrowNativeType> Extend<A> for MutableBuffer {
993+
#[inline]
994+
fn extend<T: IntoIterator<Item = A>>(&mut self, iter: T) {
995+
let iterator = iter.into_iter();
996+
self.extend_from_iter(iterator)
997+
}
998+
}
999+
1000+
impl MutableBuffer {
1001+
#[inline]
1002+
fn extend_from_iter<T: ArrowNativeType, I: Iterator<Item = T>>(
1003+
&mut self,
1004+
mut iterator: I,
1005+
) {
1006+
let size = std::mem::size_of::<T>();
1007+
let (lower, _) = iterator.size_hint();
1008+
let additional = lower * size;
1009+
self.reserve(additional);
1010+
1011+
// this is necessary because of https://github.com/rust-lang/rust/issues/32155
1012+
let mut len = SetLenOnDrop::new(&mut self.len);
1013+
let mut dst = unsafe { self.data.as_ptr().add(len.local_len) as *mut T };
1014+
let capacity = self.capacity;
1015+
1016+
while len.local_len + size <= capacity {
1017+
if let Some(item) = iterator.next() {
1018+
unsafe {
1019+
std::ptr::write(dst, item);
1020+
dst = dst.add(1);
1021+
}
1022+
len.local_len += size;
1023+
} else {
1024+
break;
1025+
}
1026+
}
1027+
drop(len);
1028+
1029+
iterator.for_each(|item| self.push(item));
1030+
}
1031+
}
1032+
1033+
impl Buffer {
1034+
/// Creates a [`Buffer`] from an [`Iterator`] with a trusted (upper) length.
1035+
/// Prefer this to `collect` whenever possible, as it is faster ~60% faster.
1036+
/// # Example
1037+
/// ```
1038+
/// # use arrow::buffer::Buffer;
1039+
/// let v = vec![1u32];
1040+
/// let iter = v.iter().map(|x| x * 2);
1041+
/// let buffer = unsafe { Buffer::from_trusted_len_iter(iter) };
1042+
/// assert_eq!(buffer.len(), 4) // u32 has 4 bytes
1043+
/// ```
1044+
/// # Safety
1045+
/// This method assumes that the iterator's size is correct and is undefined behavior
1046+
/// to use it on an iterator that reports an incorrect length.
1047+
// This implementation is required for two reasons:
1048+
// 1. there is no trait `TrustedLen` in stable rust and therefore
1049+
// we can't specialize `extend` for `TrustedLen` like `Vec` does.
1050+
// 2. `from_trusted_len_iter` is faster.
1051+
pub unsafe fn from_trusted_len_iter<T: ArrowNativeType, I: Iterator<Item = T>>(
1052+
iterator: I,
1053+
) -> Self {
1054+
let (_, upper) = iterator.size_hint();
1055+
let upper = upper.expect("from_trusted_len_iter requires an upper limit");
1056+
let len = upper * std::mem::size_of::<T>();
1057+
1058+
let mut buffer = MutableBuffer::new(len);
1059+
1060+
let mut dst = buffer.data.as_ptr() as *mut T;
1061+
for item in iterator {
1062+
// note how there is no reserve here (compared with `extend_from_iter`)
1063+
std::ptr::write(dst, item);
1064+
dst = dst.add(1);
1065+
}
1066+
assert_eq!(
1067+
dst.offset_from(buffer.data.as_ptr() as *mut T) as usize,
1068+
upper,
1069+
"Trusted iterator length was not accurately reported"
1070+
);
1071+
buffer.len = len;
1072+
buffer.into()
1073+
}
1074+
1075+
/// Creates a [`Buffer`] from an [`Iterator`] with a trusted (upper) length or errors
1076+
/// if any of the items of the iterator is an error.
1077+
/// Prefer this to `collect` whenever possible, as it is faster ~60% faster.
1078+
/// # Safety
1079+
/// This method assumes that the iterator's size is correct and is undefined behavior
1080+
/// to use it on an iterator that reports an incorrect length.
1081+
pub unsafe fn try_from_trusted_len_iter<
1082+
E,
1083+
T: ArrowNativeType,
1084+
I: Iterator<Item = std::result::Result<T, E>>,
1085+
>(
1086+
iterator: I,
1087+
) -> std::result::Result<Self, E> {
1088+
let (_, upper) = iterator.size_hint();
1089+
let upper = upper.expect("try_from_trusted_len_iter requires an upper limit");
1090+
let len = upper * std::mem::size_of::<T>();
1091+
1092+
let mut buffer = MutableBuffer::new(len);
1093+
1094+
let mut dst = buffer.data.as_ptr() as *mut T;
1095+
for item in iterator {
1096+
// note how there is no reserve here (compared with `extend_from_iter`)
1097+
std::ptr::write(dst, item?);
1098+
dst = dst.add(1);
1099+
}
1100+
assert_eq!(
1101+
dst.offset_from(buffer.data.as_ptr() as *mut T) as usize,
1102+
upper,
1103+
"Trusted iterator length was not accurately reported"
1104+
);
1105+
buffer.len = len;
1106+
Ok(buffer.into())
1107+
}
1108+
}
1109+
1110+
impl<T: ArrowNativeType> FromIterator<T> for Buffer {
1111+
fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
1112+
let mut iterator = iter.into_iter();
1113+
let size = std::mem::size_of::<T>();
1114+
1115+
// first iteration, which will likely reserve sufficient space for the buffer.
1116+
let mut buffer = match iterator.next() {
1117+
None => MutableBuffer::new(0),
1118+
Some(element) => {
1119+
let (lower, _) = iterator.size_hint();
1120+
let mut buffer = MutableBuffer::new(lower.saturating_add(1) * size);
1121+
unsafe {
1122+
std::ptr::write(buffer.as_mut_ptr() as *mut T, element);
1123+
buffer.len = size;
1124+
}
1125+
buffer
1126+
}
1127+
};
1128+
1129+
buffer.extend_from_iter(iterator);
1130+
buffer.into()
1131+
}
1132+
}
1133+
9711134
impl std::ops::Deref for MutableBuffer {
9721135
type Target = [u8];
9731136

@@ -1003,6 +1166,28 @@ impl PartialEq for MutableBuffer {
10031166
unsafe impl Sync for MutableBuffer {}
10041167
unsafe impl Send for MutableBuffer {}
10051168

1169+
struct SetLenOnDrop<'a> {
1170+
len: &'a mut usize,
1171+
local_len: usize,
1172+
}
1173+
1174+
impl<'a> SetLenOnDrop<'a> {
1175+
#[inline]
1176+
fn new(len: &'a mut usize) -> Self {
1177+
SetLenOnDrop {
1178+
local_len: *len,
1179+
len,
1180+
}
1181+
}
1182+
}
1183+
1184+
impl Drop for SetLenOnDrop<'_> {
1185+
#[inline]
1186+
fn drop(&mut self) {
1187+
*self.len = self.local_len;
1188+
}
1189+
}
1190+
10061191
#[cfg(test)]
10071192
mod tests {
10081193
use std::thread;
@@ -1172,6 +1357,29 @@ mod tests {
11721357
assert_eq!(b"hello arrow", buf.as_slice());
11731358
}
11741359

1360+
#[test]
1361+
fn mutable_extend_from_iter() {
1362+
let mut buf = MutableBuffer::new(0);
1363+
buf.extend(vec![1u32, 2]);
1364+
assert_eq!(8, buf.len());
1365+
assert_eq!(&[1u8, 0, 0, 0, 2, 0, 0, 0], buf.as_slice());
1366+
1367+
buf.extend(vec![3u32, 4]);
1368+
assert_eq!(16, buf.len());
1369+
assert_eq!(
1370+
&[1u8, 0, 0, 0, 2, 0, 0, 0, 3, 0, 0, 0, 4, 0, 0, 0],
1371+
buf.as_slice()
1372+
);
1373+
}
1374+
1375+
#[test]
1376+
fn test_from_trusted_len_iter() {
1377+
let iter = vec![1u32, 2].into_iter();
1378+
let buf = unsafe { Buffer::from_trusted_len_iter(iter) };
1379+
assert_eq!(8, buf.len());
1380+
assert_eq!(&[1u8, 0, 0, 0, 2, 0, 0, 0], buf.as_slice());
1381+
}
1382+
11751383
#[test]
11761384
fn test_mutable_reserve() {
11771385
let mut buf = MutableBuffer::new(1);

rust/arrow/src/bytes.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ impl Bytes {
7979
///
8080
/// This function is unsafe as there is no guarantee that the given pointer is valid for `len`
8181
/// bytes. If the `ptr` and `capacity` come from a `Buffer`, then this is guaranteed.
82+
#[inline]
8283
pub unsafe fn new(
8384
ptr: std::ptr::NonNull<u8>,
8485
len: usize,

0 commit comments

Comments
 (0)