Skip to content

Commit f8ce1c9

Browse files
committed
error types generalized.
1 parent 712f691 commit f8ce1c9

File tree

5 files changed

+149
-67
lines changed

5 files changed

+149
-67
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ For more information see https://en.wikipedia.org/wiki/External_sorting.
4141
let input_reader = io::BufReader::new(fs::File::open("input.txt").unwrap());
4242
let mut output_writer = io::BufWriter::new(fs::File::create("output.txt").unwrap());
4343

44-
let sorter: ExternalSorter<String, MemoryLimitedBufferBuilder> = ExternalSorterBuilder::new()
44+
let sorter: ExternalSorter<String, io::Error, MemoryLimitedBufferBuilder> = ExternalSorterBuilder::new()
4545
.with_tmp_dir(path::Path::new("tmp"))
4646
.with_buffer(MemoryLimitedBufferBuilder::new(50 * MB))
4747
.build()

examples/quickstart.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ fn main() {
1515
let input_reader = io::BufReader::new(fs::File::open("input.txt").unwrap());
1616
let mut output_writer = io::BufWriter::new(fs::File::create("output.txt").unwrap());
1717

18-
let sorter: ExternalSorter<String, MemoryLimitedBufferBuilder> = ExternalSorterBuilder::new()
18+
let sorter: ExternalSorter<String, io::Error, MemoryLimitedBufferBuilder> = ExternalSorterBuilder::new()
1919
.with_tmp_dir(path::Path::new("tmp"))
2020
.with_buffer(MemoryLimitedBufferBuilder::new(50 * MB))
2121
.build()

src/chunk.rs

Lines changed: 39 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,54 @@
11
use std::error::Error;
2+
use std::fmt::{self, Display};
23
use std::fs;
34
use std::io;
45
use std::io::prelude::*;
56
use std::marker::PhantomData;
67

78
use tempfile;
89

10+
/// External chunk error
11+
#[derive(Debug)]
12+
pub enum ExternalChunkError<S: Error> {
13+
/// Common I/O error.
14+
IO(io::Error),
15+
/// Data serialization error.
16+
SerializationError(S),
17+
}
18+
19+
impl<S: Error> Error for ExternalChunkError<S> {}
20+
21+
impl<S: Error> Display for ExternalChunkError<S> {
22+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
23+
write!(f, "{}", self)
24+
}
25+
}
26+
27+
impl<S: Error> From<io::Error> for ExternalChunkError<S> {
28+
fn from(err: io::Error) -> Self {
29+
ExternalChunkError::IO(err)
30+
}
31+
}
32+
933
/// External chunk interface. Provides methods for creating a chunk stored on file system and reading data from it.
10-
pub trait ExternalChunk<T>: Sized + Iterator<Item = Result<T, Box<dyn Error>>> {
34+
pub trait ExternalChunk<T>: Sized + Iterator<Item = Result<T, Self::DeserializationError>> {
35+
type SerializationError: Error;
36+
type DeserializationError: Error;
37+
1138
/// Builds an instance of an external chunk.
1239
fn build(
1340
dir: &tempfile::TempDir,
1441
items: impl IntoIterator<Item = T>,
1542
buf_size: Option<usize>,
16-
) -> Result<Self, Box<dyn Error>> {
43+
) -> Result<Self, ExternalChunkError<Self::SerializationError>> {
1744
let tmp_file = tempfile::tempfile_in(dir)?;
1845

1946
let mut chunk_writer = match buf_size {
2047
Some(buf_size) => io::BufWriter::with_capacity(buf_size, tmp_file.try_clone()?),
2148
None => io::BufWriter::new(tmp_file.try_clone()?),
2249
};
2350

24-
Self::dump(&mut chunk_writer, items)?;
51+
Self::dump(&mut chunk_writer, items).map_err(ExternalChunkError::SerializationError)?;
2552

2653
chunk_writer.flush()?;
2754

@@ -43,7 +70,7 @@ pub trait ExternalChunk<T>: Sized + Iterator<Item = Result<T, Box<dyn Error>>> {
4370
fn dump(
4471
chunk_writer: &mut io::BufWriter<fs::File>,
4572
items: impl IntoIterator<Item = T>,
46-
) -> Result<(), Box<dyn Error>>;
73+
) -> Result<(), Self::SerializationError>;
4774
}
4875

4976
/// RMP (Rust MessagePack) external chunk implementation.
@@ -59,6 +86,9 @@ impl<T> ExternalChunk<T> for RmpExternalChunk<T>
5986
where
6087
T: serde::ser::Serialize + serde::de::DeserializeOwned,
6188
{
89+
type SerializationError = rmp_serde::encode::Error;
90+
type DeserializationError = rmp_serde::decode::Error;
91+
6292
fn new(reader: io::Take<io::BufReader<fs::File>>) -> Self {
6393
RmpExternalChunk {
6494
reader,
@@ -69,7 +99,7 @@ where
6999
fn dump(
70100
mut chunk_writer: &mut io::BufWriter<fs::File>,
71101
items: impl IntoIterator<Item = T>,
72-
) -> Result<(), Box<dyn Error>> {
102+
) -> Result<(), Self::SerializationError> {
73103
for item in items.into_iter() {
74104
rmp_serde::encode::write(&mut chunk_writer, &item)?;
75105
}
@@ -82,24 +112,22 @@ impl<T> Iterator for RmpExternalChunk<T>
82112
where
83113
T: serde::ser::Serialize + serde::de::DeserializeOwned,
84114
{
85-
type Item = Result<T, Box<dyn Error>>;
115+
type Item = Result<T, <Self as ExternalChunk<T>>::DeserializationError>;
86116

87117
fn next(&mut self) -> Option<Self::Item> {
88118
if self.reader.limit() == 0 {
89119
None
90120
} else {
91121
match rmp_serde::decode::from_read(&mut self.reader) {
92122
Ok(result) => Some(Ok(result)),
93-
Err(err) => Some(Err(Box::new(err))),
123+
Err(err) => Some(Err(err)),
94124
}
95125
}
96126
}
97127
}
98128

99129
#[cfg(test)]
100130
mod test {
101-
use std::error::Error;
102-
103131
use rstest::*;
104132

105133
use super::{ExternalChunk, RmpExternalChunk};
@@ -113,9 +141,9 @@ mod test {
113141
fn test_rmp_chunk(tmp_dir: tempfile::TempDir) {
114142
let saved = Vec::from_iter(0..100);
115143

116-
let chunk: RmpExternalChunk<_> = ExternalChunk::build(&tmp_dir, saved.clone(), None).unwrap();
144+
let chunk: RmpExternalChunk<i32> = ExternalChunk::build(&tmp_dir, saved.clone(), None).unwrap();
117145

118-
let restored: Result<Vec<i32>, Box<dyn Error>> = chunk.collect();
146+
let restored: Result<Vec<i32>, _> = chunk.collect();
119147
let restored = restored.unwrap();
120148

121149
assert_eq!(restored, saved);

src/merger.rs

Lines changed: 19 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -7,21 +7,23 @@ use std::error::Error;
77
/// Merges multiple sorted inputs into a single sorted output.
88
/// Time complexity is *m* \* log(*n*) in worst case where *m* is the number of items,
99
/// *n* is the number of chunks (inputs).
10-
pub struct BinaryHeapMerger<T, C>
10+
pub struct BinaryHeapMerger<T, E, C>
1111
where
1212
T: Ord,
13-
C: IntoIterator<Item = Result<T, Box<dyn Error>>>,
13+
E: Error,
14+
C: IntoIterator<Item = Result<T, E>>,
1415
{
1516
// binary heap is max-heap by default so we reverse it to convert it to min-heap
1617
items: BinaryHeap<(std::cmp::Reverse<T>, usize)>,
1718
chunks: Vec<C::IntoIter>,
1819
initiated: bool,
1920
}
2021

21-
impl<T, C> BinaryHeapMerger<T, C>
22+
impl<T, E, C> BinaryHeapMerger<T, E, C>
2223
where
2324
T: Ord,
24-
C: IntoIterator<Item = Result<T, Box<dyn Error>>>,
25+
E: Error,
26+
C: IntoIterator<Item = Result<T, E>>,
2527
{
2628
/// Creates an instance of binary heap merger using chunks as inputs.
2729
/// Chunk items should be sorted in ascending order otherwise the result is undefined.
@@ -40,12 +42,13 @@ where
4042
}
4143
}
4244

43-
impl<T, C> Iterator for BinaryHeapMerger<T, C>
45+
impl<T, E, C> Iterator for BinaryHeapMerger<T, E, C>
4446
where
4547
T: Ord,
46-
C: IntoIterator<Item = Result<T, Box<dyn Error>>>,
48+
E: Error,
49+
C: IntoIterator<Item = Result<T, E>>,
4750
{
48-
type Item = Result<T, Box<dyn Error>>;
51+
type Item = Result<T, E>;
4952

5053
/// Returns the next item from inputs in ascending order.
5154
fn next(&mut self) -> Option<Self::Item> {
@@ -104,26 +107,26 @@ mod test {
104107
)]
105108
#[case(
106109
vec![
107-
vec![Result::<i32, Box<dyn Error>>::Err(Box::new(io::Error::new(ErrorKind::Other, "test error")))]
110+
vec![Result::Err(io::Error::new(ErrorKind::Other, "test error"))]
108111
],
109112
vec![
110-
Result::<i32, Box<dyn Error>>::Err(Box::new(io::Error::new(ErrorKind::Other, "test error")))
113+
Result::Err(io::Error::new(ErrorKind::Other, "test error"))
111114
],
112115
)]
113116
#[case(
114117
vec![
115-
vec![Ok(3), Result::<i32, Box<dyn Error>>::Err(Box::new(io::Error::new(ErrorKind::Other, "test error")))],
118+
vec![Ok(3), Result::Err(io::Error::new(ErrorKind::Other, "test error"))],
116119
vec![Ok(1), Ok(2)],
117120
],
118121
vec![
119122
Ok(1),
120123
Ok(2),
121-
Result::<i32, Box<dyn Error>>::Err(Box::new(io::Error::new(ErrorKind::Other, "test error"))),
124+
Result::Err(io::Error::new(ErrorKind::Other, "test error")),
122125
],
123126
)]
124127
fn test_merger(
125-
#[case] chunks: Vec<Vec<Result<i32, Box<dyn Error>>>>,
126-
#[case] expected_result: Vec<Result<i32, Box<dyn Error>>>,
128+
#[case] chunks: Vec<Vec<Result<i32, io::Error>>>,
129+
#[case] expected_result: Vec<Result<i32, io::Error>>,
127130
) {
128131
let merger = BinaryHeapMerger::new(chunks);
129132
let actual_result = merger.collect();
@@ -136,16 +139,16 @@ mod test {
136139
}
137140

138141
fn compare_vectors_of_result<T: PartialEq, E: Error + 'static>(
139-
actual: &Vec<Result<T, Box<dyn Error>>>,
140-
expected: &Vec<Result<T, Box<dyn Error>>>,
142+
actual: &Vec<Result<T, E>>,
143+
expected: &Vec<Result<T, E>>,
141144
) -> bool {
142145
actual
143146
.into_iter()
144147
.zip(expected)
145148
.all(
146149
|(actual_result, expected_result)| match (actual_result, expected_result) {
147150
(Ok(actual_result), Ok(expected_result)) if actual_result == expected_result => true,
148-
(Err(actual_err), Err(expected_err)) if actual_err.is::<E>() && expected_err.is::<E>() => true,
151+
(Err(actual_err), Err(expected_err)) => actual_err.to_string() == expected_err.to_string(),
149152
_ => false,
150153
},
151154
)

0 commit comments

Comments
 (0)