Skip to content

Commit e268a72

Browse files
committed
chore: remove synchronous read methods and update opendal dependency
1 parent 6c2414e commit e268a72

File tree

12 files changed

+56
-429
lines changed

12 files changed

+56
-429
lines changed

Cargo.lock

Lines changed: 28 additions & 55 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -385,7 +385,7 @@ num-derive = "0.4.2"
385385
num-traits = "0.2.19"
386386
num_cpus = "1.17"
387387
object = "0.36.5"
388-
object_store_opendal = { version = "0.52.0" }
388+
object_store_opendal = { version = "0.54.1" }
389389
once_cell = "1.15.0"
390390
opendal = { version = "0.54.1", features = [
391391
"layers-fastrace",

src/query/ee/src/storages/fuse/operations/vacuum_table_v2.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -535,12 +535,11 @@ async fn fs_list_until_prefix(
535535
gc_root_meta_ts: Option<DateTime<Utc>>,
536536
) -> Result<Vec<Entry>> {
537537
// Fetch ALL entries from the path and sort them by path in lexicographical order.
538-
let lister = dal.blocking().lister(path)?;
538+
let mut lister = dal.lister(path).await?;
539539
let mut entries = Vec::new();
540-
for item in lister {
541-
let entry = item?;
542-
if entry.metadata().is_file() {
543-
entries.push(entry);
540+
while let Some(item) = lister.try_next().await? {
541+
if item.metadata().is_file() {
542+
entries.push(item);
544543
}
545544
}
546545
entries.sort_by(|l, r| l.path().cmp(r.path()));

src/query/storages/common/io/src/merge_io_reader.rs

Lines changed: 0 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -113,62 +113,4 @@ impl MergeIOReader {
113113

114114
Ok(read_res)
115115
}
116-
117-
pub fn sync_merge_io_read(
118-
read_settings: &ReadSettings,
119-
op: Operator,
120-
location: &str,
121-
raw_ranges: &[(ColumnId, Range<u64>)],
122-
) -> Result<MergeIOReadResult> {
123-
let path = location.to_string();
124-
125-
// Build merged read ranges.
126-
let ranges = raw_ranges
127-
.iter()
128-
.map(|(_, r)| r.clone())
129-
.collect::<Vec<_>>();
130-
let range_merger = RangeMerger::from_iter(
131-
ranges,
132-
read_settings.max_gap_size,
133-
read_settings.max_range_size,
134-
Some(read_settings.parquet_fast_read_bytes),
135-
);
136-
let merged_ranges = range_merger.ranges();
137-
138-
// Read merged range data.
139-
let mut io_res = Vec::with_capacity(merged_ranges.len());
140-
for (idx, range) in merged_ranges.iter().enumerate() {
141-
let buf = op
142-
.blocking()
143-
.read_with(location)
144-
.range(range.clone())
145-
.call()?;
146-
io_res.push((idx, buf));
147-
}
148-
149-
let owner_memory = OwnerMemory::create(io_res);
150-
151-
let mut columns_chunk_offsets = HashMap::with_capacity(raw_ranges.len());
152-
for (raw_idx, raw_range) in raw_ranges {
153-
let column_id = *raw_idx as ColumnId;
154-
let column_range = raw_range.start..raw_range.end;
155-
156-
// Find the range index and Range from merged ranges.
157-
let (merged_range_idx, merged_range) = range_merger.get(column_range.clone()).ok_or_else(|| ErrorCode::Internal(format!(
158-
"It's a terrible bug, not found raw range:[{:?}], path:{} from merged ranges\n: {:?}",
159-
column_range, path, merged_ranges
160-
)))?;
161-
162-
// Fetch the raw data for the raw range.
163-
let start = (column_range.start - merged_range.start) as usize;
164-
let end = (column_range.end - merged_range.start) as usize;
165-
let range = start..end;
166-
columns_chunk_offsets.insert(column_id, (merged_range_idx, range));
167-
}
168-
169-
let read_res =
170-
MergeIOReadResult::create(owner_memory, columns_chunk_offsets, location.to_string());
171-
172-
Ok(read_res)
173-
}
174116
}

0 commit comments

Comments
 (0)