Skip to content

Commit e10d606

Browse files
Dandandanjorgecarleitaoalambkounevi-me
committed
ARROW-11362:[Rust][DataFusion] Use iterator APIs in to_array_of_size to improve performance
This function `to_array_of_size` is about 8.3% of total instructions in the db-benchmark (aggregation) queries. This uses the PR #9293 The case of converting an int32 to an array improved by ~5x according to the microbenchmark: ``` to_array_of_size 100000 time: [55.501 us 55.627 us 55.809 us] change: [-82.457% -82.384% -82.299%] (p = 0.00 < 0.05) Performance has improved. Found 3 outliers among 100 measurements (3.00%) 2 (2.00%) high mild 1 (1.00%) high severe ``` And on TCPH query 1 (SF=1, 16 partitions). PR: ``` Query 1 iteration 0 took 90.8 ms Query 1 iteration 1 took 106.6 ms Query 1 iteration 2 took 101.1 ms Query 1 iteration 3 took 101.5 ms Query 1 iteration 4 took 96.9 ms Query 1 iteration 5 took 100.3 ms Query 1 iteration 6 took 99.6 ms Query 1 iteration 7 took 100.4 ms Query 1 iteration 8 took 104.2 ms Query 1 iteration 9 took 100.3 ms Query 1 avg time: 100.18 ms ``` Master: ``` Query 1 iteration 0 took 121.1 ms Query 1 iteration 1 took 123.4 ms Query 1 iteration 2 took 121.0 ms Query 1 iteration 3 took 121.0 ms Query 1 iteration 4 took 123.0 ms Query 1 iteration 5 took 121.7 ms Query 1 iteration 6 took 121.7 ms Query 1 iteration 7 took 120.2 ms Query 1 iteration 8 took 119.7 ms Query 1 iteration 9 took 121.4 ms Query 1 avg time: 121.43 ms ``` Closes #9305 from Dandandan/to_array_of_size_perf Lead-authored-by: Heres, Daniel <danielheres@gmail.com> Co-authored-by: Jorge C. Leitao <jorgecarleitao@gmail.com> Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org> Co-authored-by: Sutou Kouhei <kou@clear-code.com> Co-authored-by: Neville Dipale <nevilledips@gmail.com> Co-authored-by: Dmitry Patsura <zaets28rus@gmail.com> Co-authored-by: Yibo Cai <yibo.cai@arm.com> Co-authored-by: Daniël Heres <danielheres@gmail.com> Co-authored-by: Kenta Murata <mrkn@mrkn.jp> Co-authored-by: Mahmut Bulut <vertexclique@gmail.com> Co-authored-by: Yordan Pavlov <yordan.pavlov@outlook.com> Co-authored-by: Max Burke <max@urbanlogiq.com> Co-authored-by: Ryan Jennings <ryan@ryanj.net> Co-authored-by: Benjamin Kietzman <bengilgit@gmail.com> Co-authored-by: Jörn Horstmann <joern.horstmann@signavio.com> Co-authored-by: Krisztián Szűcs <szucs.krisztian@gmail.com> Co-authored-by: Joris Van den Bossche <jorisvandenbossche@gmail.com> Co-authored-by: Johannes Müller <JohannesMueller@fico.com> Co-authored-by: mqy <meng.qingyou@gmail.com> Co-authored-by: Maarten A. Breddels <maartenbreddels@gmail.com> Co-authored-by: Antoine Pitrou <antoine@python.org> Co-authored-by: Matt Brubeck <mbrubeck@limpet.net> Co-authored-by: Weston Pace <weston.pace@gmail.com> Signed-off-by: Jorge C. Leitao <jorgecarleitao@gmail.com>
1 parent 71dab27 commit e10d606

File tree

3 files changed

+139
-24
lines changed

3 files changed

+139
-24
lines changed

rust/datafusion/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,3 +88,7 @@ harness = false
8888
[[bench]]
8989
name = "filter_query_sql"
9090
harness = false
91+
92+
[[bench]]
93+
name = "scalar"
94+
harness = false

rust/datafusion/benches/scalar.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use criterion::{criterion_group, criterion_main, Criterion};
19+
use datafusion::scalar::ScalarValue;
20+
21+
fn criterion_benchmark(c: &mut Criterion) {
22+
c.bench_function("to_array_of_size 100000", |b| {
23+
let scalar = ScalarValue::Int32(Some(100));
24+
25+
b.iter(|| assert_eq!(scalar.to_array_of_size(100000).null_count(), 0))
26+
});
27+
}
28+
29+
criterion_group!(benches, criterion_benchmark);
30+
criterion_main!(benches);

rust/datafusion/src/scalar.rs

Lines changed: 105 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
//! This module provides ScalarValue, an enum that can be used for storage of single elements
1919
20-
use std::{convert::TryFrom, fmt, sync::Arc};
20+
use std::{convert::TryFrom, fmt, iter::repeat, sync::Arc};
2121

2222
use arrow::array::{
2323
Int16Builder, Int32Builder, Int64Builder, Int8Builder, ListBuilder,
@@ -205,28 +205,104 @@ impl ScalarValue {
205205
ScalarValue::Boolean(e) => {
206206
Arc::new(BooleanArray::from(vec![*e; size])) as ArrayRef
207207
}
208-
ScalarValue::Float64(e) => {
209-
Arc::new(Float64Array::from(vec![*e; size])) as ArrayRef
210-
}
211-
ScalarValue::Float32(e) => Arc::new(Float32Array::from(vec![*e; size])),
212-
ScalarValue::Int8(e) => Arc::new(Int8Array::from(vec![*e; size])),
213-
ScalarValue::Int16(e) => Arc::new(Int16Array::from(vec![*e; size])),
214-
ScalarValue::Int32(e) => Arc::new(Int32Array::from(vec![*e; size])),
215-
ScalarValue::Int64(e) => Arc::new(Int64Array::from(vec![*e; size])),
216-
ScalarValue::UInt8(e) => Arc::new(UInt8Array::from(vec![*e; size])),
217-
ScalarValue::UInt16(e) => Arc::new(UInt16Array::from(vec![*e; size])),
218-
ScalarValue::UInt32(e) => Arc::new(UInt32Array::from(vec![*e; size])),
219-
ScalarValue::UInt64(e) => Arc::new(UInt64Array::from(vec![*e; size])),
220-
ScalarValue::TimeMicrosecond(e) => {
221-
Arc::new(TimestampMicrosecondArray::from(vec![*e]))
222-
}
223-
ScalarValue::TimeNanosecond(e) => {
224-
Arc::new(TimestampNanosecondArray::from_opt_vec(vec![*e], None))
225-
}
226-
ScalarValue::Utf8(e) => Arc::new(StringArray::from(vec![e.as_deref(); size])),
227-
ScalarValue::LargeUtf8(e) => {
228-
Arc::new(LargeStringArray::from(vec![e.as_deref(); size]))
229-
}
208+
ScalarValue::Float64(e) => match e {
209+
Some(value) => {
210+
Arc::new(Float64Array::from_iter_values(repeat(*value).take(size)))
211+
}
212+
None => Arc::new(repeat(None).take(size).collect::<Float64Array>()),
213+
},
214+
ScalarValue::Float32(e) => match e {
215+
Some(value) => {
216+
Arc::new(Float32Array::from_iter_values(repeat(*value).take(size)))
217+
}
218+
None => Arc::new(repeat(None).take(size).collect::<Float32Array>()),
219+
},
220+
ScalarValue::Int8(e) => match e {
221+
Some(value) => {
222+
Arc::new(Int8Array::from_iter_values(repeat(*value).take(size)))
223+
}
224+
None => Arc::new(repeat(None).take(size).collect::<Int8Array>()),
225+
},
226+
ScalarValue::Int16(e) => match e {
227+
Some(value) => {
228+
Arc::new(Int16Array::from_iter_values(repeat(*value).take(size)))
229+
}
230+
None => Arc::new(repeat(None).take(size).collect::<Int16Array>()),
231+
},
232+
ScalarValue::Int32(e) => match e {
233+
Some(value) => {
234+
Arc::new(Int32Array::from_iter_values(repeat(*value).take(size)))
235+
}
236+
None => Arc::new(repeat(None).take(size).collect::<Int32Array>()),
237+
},
238+
ScalarValue::Int64(e) => match e {
239+
Some(value) => {
240+
Arc::new(Int64Array::from_iter_values(repeat(*value).take(size)))
241+
}
242+
None => Arc::new(repeat(None).take(size).collect::<Int64Array>()),
243+
},
244+
ScalarValue::UInt8(e) => match e {
245+
Some(value) => {
246+
Arc::new(UInt8Array::from_iter_values(repeat(*value).take(size)))
247+
}
248+
None => Arc::new(repeat(None).take(size).collect::<UInt8Array>()),
249+
},
250+
ScalarValue::UInt16(e) => match e {
251+
Some(value) => {
252+
Arc::new(UInt16Array::from_iter_values(repeat(*value).take(size)))
253+
}
254+
None => Arc::new(repeat(None).take(size).collect::<UInt16Array>()),
255+
},
256+
ScalarValue::UInt32(e) => match e {
257+
Some(value) => {
258+
Arc::new(UInt32Array::from_iter_values(repeat(*value).take(size)))
259+
}
260+
None => Arc::new(repeat(None).take(size).collect::<UInt32Array>()),
261+
},
262+
ScalarValue::UInt64(e) => match e {
263+
Some(value) => {
264+
Arc::new(UInt64Array::from_iter_values(repeat(*value).take(size)))
265+
}
266+
None => Arc::new(repeat(None).take(size).collect::<UInt64Array>()),
267+
},
268+
ScalarValue::TimeMicrosecond(e) => match e {
269+
Some(value) => Arc::new(TimestampMicrosecondArray::from_iter_values(
270+
repeat(*value).take(size),
271+
)),
272+
None => Arc::new(
273+
repeat(None)
274+
.take(size)
275+
.collect::<TimestampMicrosecondArray>(),
276+
),
277+
},
278+
ScalarValue::TimeNanosecond(e) => match e {
279+
Some(value) => Arc::new(TimestampNanosecondArray::from_iter_values(
280+
repeat(*value).take(size),
281+
)),
282+
None => Arc::new(
283+
repeat(None)
284+
.take(size)
285+
.collect::<TimestampNanosecondArray>(),
286+
),
287+
},
288+
ScalarValue::Utf8(e) => match e {
289+
Some(value) => {
290+
Arc::new(StringArray::from_iter_values(repeat(value).take(size)))
291+
}
292+
None => {
293+
Arc::new(repeat(None::<&str>).take(size).collect::<StringArray>())
294+
}
295+
},
296+
ScalarValue::LargeUtf8(e) => match e {
297+
Some(value) => {
298+
Arc::new(LargeStringArray::from_iter_values(repeat(value).take(size)))
299+
}
300+
None => Arc::new(
301+
repeat(None::<&str>)
302+
.take(size)
303+
.collect::<LargeStringArray>(),
304+
),
305+
},
230306
ScalarValue::List(values, data_type) => Arc::new(match data_type {
231307
DataType::Int8 => build_list!(Int8Builder, Int8, values, size),
232308
DataType::Int16 => build_list!(Int16Builder, Int16, values, size),
@@ -238,7 +314,12 @@ impl ScalarValue {
238314
DataType::UInt64 => build_list!(UInt64Builder, UInt64, values, size),
239315
_ => panic!("Unexpected DataType for list"),
240316
}),
241-
ScalarValue::Date32(e) => Arc::new(Date32Array::from(vec![*e; size])),
317+
ScalarValue::Date32(e) => match e {
318+
Some(value) => {
319+
Arc::new(Date32Array::from_iter_values(repeat(*value).take(size)))
320+
}
321+
None => Arc::new(repeat(None).take(size).collect::<Date32Array>()),
322+
},
242323
}
243324
}
244325

0 commit comments

Comments
 (0)