Skip to content

Commit 83efd9e

Browse files
authored
Improve TPCH gen command (#230)
* Improve TPCH gen command
* Add comment about default repartition behavior
1 parent 6b66c22 commit 83efd9e

File tree

2 files changed

+11
-39
lines changed

2 files changed

+11
-39
lines changed

benchmarks/gen-tpch.sh

Lines changed: 5 additions & 30 deletions
Original file line number | Diff line number | Diff line change
@@ -2,18 +2,15 @@
22

33
set -e
44

5-
SCALE_FACTOR=1
5+
SCALE_FACTOR=${SCALE_FACTOR:-1}
6+
PARTITIONS=${PARTITIONS:-16}
7+
8+
echo "Generating TPCH dataset with SCALE_FACTOR=${SCALE_FACTOR} and PARTITIONS=${PARTITIONS}"
69

710
# https://stackoverflow.com/questions/59895/how-do-i-get-the-directory-where-a-bash-script-is-located-from-within-the-script
811
SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
912
DATA_DIR=${DATA_DIR:-$SCRIPT_DIR/data}
1013
CARGO_COMMAND=${CARGO_COMMAND:-"cargo run --release"}
11-
12-
if [ -z "$SCALE_FACTOR" ] ; then
13-
echo "Internal error: Scale factor not specified"
14-
exit 1
15-
fi
16-
1714
TPCH_DIR="${DATA_DIR}/tpch_sf${SCALE_FACTOR}"
1815
echo "Creating tpch dataset at Scale Factor ${SCALE_FACTOR} in ${TPCH_DIR}..."
1916

@@ -29,35 +26,13 @@ else
2926
docker run -v "${TPCH_DIR}":/data -it --rm ghcr.io/scalytics/tpch-docker:main -vf -s "${SCALE_FACTOR}"
3027
fi
3128

32-
# Copy expected answers into the ./data/answers directory if it does not already exist
33-
FILE="${TPCH_DIR}/answers/q1.out"
34-
if test -f "${FILE}"; then
35-
echo " Expected answers exist (${FILE} exists)."
36-
else
37-
echo " Copying answers to ${TPCH_DIR}/answers"
38-
mkdir -p "${TPCH_DIR}/answers"
39-
docker run -v "${TPCH_DIR}":/data -it --entrypoint /bin/bash --rm ghcr.io/scalytics/tpch-docker:main -c "cp -f /opt/tpch/2.18.0_rc2/dbgen/answers/* /data/answers/"
40-
fi
41-
4229
# Create 'parquet' files from tbl
4330
FILE="${TPCH_DIR}/supplier"
4431
if test -d "${FILE}"; then
4532
echo " parquet files exist ($FILE exists)."
4633
else
4734
echo " creating parquet files using benchmark binary ..."
4835
pushd "${SCRIPT_DIR}" > /dev/null
49-
$CARGO_COMMAND -- tpch-convert --input "${TPCH_DIR}" --output "${TPCH_DIR}" --format parquet
36+
$CARGO_COMMAND -- tpch-convert --input "${TPCH_DIR}" --output "${TPCH_DIR}" --format parquet --partitions "$PARTITIONS"
5037
popd > /dev/null
5138
fi
52-
53-
# Create 'csv' files from tbl
54-
FILE="${TPCH_DIR}/csv/supplier"
55-
if test -d "${FILE}"; then
56-
echo " csv files exist ($FILE exists)."
57-
else
58-
echo " creating csv files using benchmark binary ..."
59-
pushd "${SCRIPT_DIR}" > /dev/null
60-
$CARGO_COMMAND -- tpch-convert --input "${TPCH_DIR}" --output "${TPCH_DIR}/csv" --format csv
61-
popd > /dev/null
62-
fi
63-

benchmarks/src/tpch/convert.rs

Lines changed: 6 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -20,10 +20,9 @@ use datafusion::logical_expr::select_expr::SelectExpr;
2020
use std::fs;
2121
use std::path::{Path, PathBuf};
2222

23-
use datafusion::common::not_impl_err;
24-
2523
use super::TPCH_TABLES;
2624
use super::get_tbl_tpch_table_schema;
25+
use datafusion::common::not_impl_err;
2726
use datafusion::error::Result;
2827
use datafusion::prelude::*;
2928
use parquet::basic::Compression;
@@ -49,7 +48,7 @@ pub struct ConvertOpt {
4948
#[structopt(short = "c", long = "compression", default_value = "zstd")]
5049
compression: String,
5150

52-
/// Number of partitions to produce
51+
/// Number of partitions to produce. By default, uses only 1 partition.
5352
#[structopt(short = "n", long = "partitions", default_value = "1")]
5453
partitions: usize,
5554

@@ -88,7 +87,9 @@ impl ConvertOpt {
8887
options
8988
};
9089

91-
let config = SessionConfig::new().with_batch_size(self.batch_size);
90+
let config = SessionConfig::new()
91+
.with_target_partitions(self.partitions)
92+
.with_batch_size(self.batch_size);
9293
let ctx = SessionContext::new_with_config(config);
9394

9495
// build plan to read the TBL file
@@ -104,11 +105,7 @@ impl ConvertOpt {
104105
.collect::<Vec<_>>();
105106

106107
csv = csv.select(selection)?;
107-
// optionally, repartition the file
108-
let partitions = self.partitions;
109-
if partitions > 1 {
110-
csv = csv.repartition(Partitioning::RoundRobinBatch(partitions))?
111-
}
108+
csv = csv.repartition(Partitioning::RoundRobinBatch(self.partitions))?;
112109
let csv = if self.sort {
113110
csv.sort_by(vec![col(key_column_name)])?
114111
} else {

0 commit comments

Comments (0)