Skip to content

Commit 3b854d7

Browse files
authored
chore: bump opendal to 0.54.1 (#18970)
* chore: remove blocking IO
* chore: bump iceberg-rs's opendal
* chore: remove synchronous read methods and update opendal dependency
* chore: clean
* fix: unit test
* fix: tests
* chore: replace url
* chore: replace url
* fix: ensure max_files does not exceed available files
1 parent a36804f commit 3b854d7

File tree

32 files changed

+332
-1255
lines changed

32 files changed

+332
-1255
lines changed

Cargo.lock

Lines changed: 148 additions & 73 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -330,13 +330,13 @@ hyper-util = { version = "0.1.9", features = ["client", "client-legacy", "tokio"
330330
lru = "0.12"
331331

332332
## in branch dev
333-
iceberg = { version = "0.4.0", git = "https://github.com/databendlabs/iceberg-rust", rev = "d5cca1c15f240f3cb04e57569bce648933b1c79b", features = [
333+
iceberg = { version = "0.4.0", git = "https://github.com/databendlabs/iceberg-rust", rev = "1dace26ea25a9b9e2066367cbd3b7badc75dd7f9", features = [
334334
"storage-all",
335335
] }
336-
iceberg-catalog-glue = { version = "0.4.0", git = "https://github.com/databendlabs/iceberg-rust", rev = "d5cca1c15f240f3cb04e57569bce648933b1c79b" }
337-
iceberg-catalog-hms = { version = "0.4.0", git = "https://github.com/databendlabs/iceberg-rust", rev = "d5cca1c15f240f3cb04e57569bce648933b1c79b" }
338-
iceberg-catalog-rest = { version = "0.4.0", git = "https://github.com/databendlabs/iceberg-rust", rev = "d5cca1c15f240f3cb04e57569bce648933b1c79b" }
339-
iceberg-catalog-s3tables = { version = "0.4.0", git = "https://github.com/databendlabs/iceberg-rust", rev = "d5cca1c15f240f3cb04e57569bce648933b1c79b" }
336+
iceberg-catalog-glue = { version = "0.4.0", git = "https://github.com/databendlabs/iceberg-rust", rev = "1dace26ea25a9b9e2066367cbd3b7badc75dd7f9" }
337+
iceberg-catalog-hms = { version = "0.4.0", git = "https://github.com/databendlabs/iceberg-rust", rev = "1dace26ea25a9b9e2066367cbd3b7badc75dd7f9" }
338+
iceberg-catalog-rest = { version = "0.4.0", git = "https://github.com/databendlabs/iceberg-rust", rev = "1dace26ea25a9b9e2066367cbd3b7badc75dd7f9" }
339+
iceberg-catalog-s3tables = { version = "0.4.0", git = "https://github.com/databendlabs/iceberg-rust", rev = "1dace26ea25a9b9e2066367cbd3b7badc75dd7f9" }
340340

341341
# Explicitly specify compatible AWS SDK versions
342342
aws-config = "1.5.18"
@@ -385,13 +385,12 @@ num-derive = "0.4.2"
385385
num-traits = "0.2.19"
386386
num_cpus = "1.17"
387387
object = "0.36.5"
388-
object_store_opendal = { version = "0.52.0" }
388+
object_store_opendal = { version = "0.54.1" }
389389
once_cell = "1.15.0"
390-
opendal = { version = "0.53.2", features = [
390+
opendal = { version = "0.54.1", features = [
391391
"layers-fastrace",
392392
"layers-prometheus-client",
393393
"layers-async-backtrace",
394-
"layers-blocking",
395394
"services-s3",
396395
"services-fs",
397396
"services-gcs",

src/common/storage/src/metrics.rs

Lines changed: 0 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -164,13 +164,9 @@ pub struct StorageMetricsAccessor<A: Access> {
164164
impl<A: Access> LayeredAccess for StorageMetricsAccessor<A> {
165165
type Inner = A;
166166
type Reader = StorageMetricsWrapper<A::Reader>;
167-
type BlockingReader = StorageMetricsWrapper<A::BlockingReader>;
168167
type Writer = StorageMetricsWrapper<A::Writer>;
169-
type BlockingWriter = StorageMetricsWrapper<A::BlockingWriter>;
170168
type Lister = A::Lister;
171-
type BlockingLister = A::BlockingLister;
172169
type Deleter = A::Deleter;
173-
type BlockingDeleter = A::BlockingDeleter;
174170

175171
fn inner(&self) -> &Self::Inner {
176172
&self.inner
@@ -201,26 +197,6 @@ impl<A: Access> LayeredAccess for StorageMetricsAccessor<A> {
201197
async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
202198
self.inner.delete().await
203199
}
204-
205-
fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
206-
self.inner
207-
.blocking_read(path, args)
208-
.map(|(rp, r)| (rp, StorageMetricsWrapper::new(r, self.metrics.clone())))
209-
}
210-
211-
fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
212-
self.inner
213-
.blocking_write(path, args)
214-
.map(|(rp, r)| (rp, StorageMetricsWrapper::new(r, self.metrics.clone())))
215-
}
216-
217-
fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> {
218-
self.inner.blocking_list(path, args)
219-
}
220-
221-
fn blocking_delete(&self) -> Result<(RpDelete, Self::BlockingDeleter)> {
222-
self.inner.blocking_delete()
223-
}
224200
}
225201

226202
pub struct StorageMetricsWrapper<R> {
@@ -246,12 +222,6 @@ impl<R: oio::Read> oio::Read for StorageMetricsWrapper<R> {
246222
}
247223
}
248224

249-
impl<R: oio::BlockingRead> oio::BlockingRead for StorageMetricsWrapper<R> {
250-
fn read(&mut self) -> Result<Buffer> {
251-
self.inner.read()
252-
}
253-
}
254-
255225
impl<R: oio::Write> oio::Write for StorageMetricsWrapper<R> {
256226
async fn write(&mut self, bs: Buffer) -> Result<()> {
257227
let start = Instant::now();
@@ -272,20 +242,3 @@ impl<R: oio::Write> oio::Write for StorageMetricsWrapper<R> {
272242
self.inner.abort().await
273243
}
274244
}
275-
276-
impl<R: oio::BlockingWrite> oio::BlockingWrite for StorageMetricsWrapper<R> {
277-
fn write(&mut self, bs: Buffer) -> Result<()> {
278-
let start = Instant::now();
279-
let size = bs.len();
280-
281-
self.inner.write(bs).inspect(|_| {
282-
self.metrics.inc_write_bytes(size);
283-
self.metrics
284-
.inc_write_bytes_cost(start.elapsed().as_millis() as u64);
285-
})
286-
}
287-
288-
fn close(&mut self) -> Result<Metadata> {
289-
self.inner.close()
290-
}
291-
}

src/common/storage/src/metrics_layer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,7 @@ impl std::hash::Hash for OperationLabels {
263263

264264
impl EncodeLabelSet for OperationLabels {
265265
fn encode(&self, mut encoder: LabelSetEncoder) -> Result<(), fmt::Error> {
266-
(observe::LABEL_SCHEME, self.0.scheme.into_static()).encode(encoder.encode_label())?;
266+
(observe::LABEL_SCHEME, self.0.scheme).encode(encoder.encode_label())?;
267267
(observe::LABEL_NAMESPACE, self.0.namespace.as_ref()).encode(encoder.encode_label())?;
268268
(observe::LABEL_OPERATION, self.0.operation).encode(encoder.encode_label())?;
269269

src/common/storage/src/operator.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ use log::warn;
4949
use opendal::layers::AsyncBacktraceLayer;
5050
use opendal::layers::ConcurrentLimitLayer;
5151
use opendal::layers::FastraceLayer;
52+
use opendal::layers::HttpClientLayer;
5253
use opendal::layers::ImmutableIndexLayer;
5354
use opendal::layers::LoggingLayer;
5455
use opendal::layers::RetryInterceptor;
@@ -201,7 +202,7 @@ fn build_operator<B: Builder>(builder: B, cfg: Option<&StorageNetworkParams>) ->
201202
.finish();
202203

203204
// Make sure the http client has been updated.
204-
ob.update_http_client(|_| HttpClient::with(get_http_client(cfg)));
205+
let ob = ob.layer(HttpClientLayer::new(HttpClient::with(get_http_client(cfg))));
205206

206207
let mut op = ob
207208
// Add retry

src/common/storage/src/parquet.rs

Lines changed: 0 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -42,15 +42,6 @@ pub async fn read_parquet_schema_async_rs(
4242
infer_schema_with_extension(meta.file_metadata())
4343
}
4444

45-
pub fn read_parquet_schema_sync_rs(
46-
operator: &Operator,
47-
path: &str,
48-
file_size: Option<u64>,
49-
) -> Result<ArrowSchema> {
50-
let meta = read_metadata_sync(path, operator, file_size)?;
51-
infer_schema_with_extension(meta.file_metadata())
52-
}
53-
5445
pub fn infer_schema_with_extension(meta: &FileMetaData) -> Result<ArrowSchema> {
5546
let mut arrow_schema = parquet_to_arrow_schema(meta.schema_descr(), meta.key_value_metadata())?;
5647
// Convert data types to extension types using meta information.
@@ -146,54 +137,6 @@ pub async fn read_metadata_async(
146137
}
147138
}
148139

149-
pub fn read_metadata_sync(
150-
path: &str,
151-
operator: &Operator,
152-
file_size: Option<u64>,
153-
) -> Result<ParquetMetaData> {
154-
let blocking = operator.blocking();
155-
let file_size = match file_size {
156-
None => blocking.stat(path)?.content_length(),
157-
Some(n) => n,
158-
};
159-
160-
check_footer_size(file_size, path)?;
161-
162-
let map_err =
163-
|e: ParquetError| ErrorCode::BadBytes(format!("Invalid Parquet file '{path}': {e}",));
164-
// read and cache up to DEFAULT_FOOTER_READ_SIZE bytes from the end and process the footer
165-
let default_end_len = DEFAULT_FOOTER_READ_SIZE.min(file_size);
166-
let buffer = blocking
167-
.read_with(path)
168-
.range((file_size - default_end_len)..file_size)
169-
.call()?
170-
.to_vec();
171-
let buffer_len = buffer.len();
172-
let footer_tail = ParquetMetaDataReader::decode_footer_tail(
173-
&buffer[(buffer_len - FOOTER_SIZE as usize)..]
174-
.try_into()
175-
.unwrap(),
176-
)
177-
.map_err(map_err)?;
178-
let metadata_len = footer_tail.metadata_length() as u64;
179-
check_meta_size(file_size, metadata_len, path)?;
180-
181-
let footer_len = FOOTER_SIZE + metadata_len;
182-
if (footer_len as usize) <= buffer_len {
183-
// The whole metadata is in the bytes we already read
184-
let offset = buffer_len - footer_len as usize;
185-
Ok(ParquetMetaDataReader::decode_metadata(&buffer[offset..]).map_err(map_err)?)
186-
} else {
187-
let mut metadata = blocking
188-
.read_with(path)
189-
.range((file_size - footer_len)..(file_size - buffer_len as u64))
190-
.call()?
191-
.to_vec();
192-
metadata.extend(buffer);
193-
Ok(ParquetMetaDataReader::decode_metadata(&metadata).map_err(map_err)?)
194-
}
195-
}
196-
197140
/// check file is large enough to hold footer
198141
fn check_footer_size(file_size: u64, path: &str) -> Result<()> {
199142
if file_size < FOOTER_SIZE {

src/common/storage/src/runtime_layer.rs

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -91,13 +91,9 @@ impl<A> Debug for RuntimeAccessor<A> {
9191
impl<A: Access> LayeredAccess for RuntimeAccessor<A> {
9292
type Inner = A;
9393
type Reader = RuntimeIO<A::Reader>;
94-
type BlockingReader = A::BlockingReader;
9594
type Writer = RuntimeIO<A::Writer>;
96-
type BlockingWriter = A::BlockingWriter;
9795
type Lister = RuntimeIO<A::Lister>;
98-
type BlockingLister = A::BlockingLister;
9996
type Deleter = RuntimeIO<A::Deleter>;
100-
type BlockingDeleter = A::BlockingDeleter;
10197

10298
fn inner(&self) -> &Self::Inner {
10399
&self.inner
@@ -182,22 +178,6 @@ impl<A: Access> LayeredAccess for RuntimeAccessor<A> {
182178
.await
183179
.expect("join must success")
184180
}
185-
186-
fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
187-
self.inner.blocking_read(path, args)
188-
}
189-
190-
fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
191-
self.inner.blocking_write(path, args)
192-
}
193-
194-
fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> {
195-
self.inner.blocking_list(path, args)
196-
}
197-
198-
fn blocking_delete(&self) -> Result<(RpDelete, Self::BlockingDeleter)> {
199-
self.inner.blocking_delete()
200-
}
201181
}
202182

203183
pub struct RuntimeIO<R: 'static> {

src/common/storage/src/stage.rs

Lines changed: 1 addition & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ impl StageFilesInfo {
130130
mut files: &[String],
131131
) -> Result<Vec<StageFileInfo>> {
132132
if let Some(m) = max_files {
133-
files = &files[..m]
133+
files = &files[..m.min(files.len())]
134134
}
135135
let file_infos = self.stat_concurrent(operator, thread_num, files).await?;
136136
let mut res = Vec::with_capacity(file_infos.len());
@@ -207,39 +207,6 @@ impl StageFilesInfo {
207207
Ok(files.pop())
208208
}
209209

210-
pub fn blocking_list(
211-
&self,
212-
operator: &Operator,
213-
max_files: Option<usize>,
214-
) -> Result<Vec<StageFileInfo>> {
215-
let max_files = max_files.unwrap_or(usize::MAX);
216-
if let Some(files) = &self.files {
217-
let mut res = Vec::new();
218-
for file in files {
219-
let full_path = Path::new(&self.path)
220-
.join(file)
221-
.to_string_lossy()
222-
.trim_start_matches('/')
223-
.to_string();
224-
let meta = operator.blocking().stat(&full_path)?;
225-
if meta.mode().is_file() {
226-
res.push(StageFileInfo::new(full_path, &meta))
227-
} else {
228-
return Err(ErrorCode::BadArguments(format!(
229-
"{full_path} is not a file"
230-
)));
231-
}
232-
if res.len() == max_files {
233-
return Ok(res);
234-
}
235-
}
236-
Ok(res)
237-
} else {
238-
let pattern = self.get_pattern()?;
239-
blocking_list_files_with_pattern(operator, &self.path, pattern, max_files)
240-
}
241-
}
242-
243210
#[async_backtrace::framed]
244211
pub async fn list_files_with_pattern(
245212
operator: &Operator,
@@ -374,52 +341,6 @@ fn check_file(path: &str, mode: EntryMode, pattern: &Option<Regex>) -> bool {
374341
}
375342
}
376343

377-
fn blocking_list_files_with_pattern(
378-
operator: &Operator,
379-
path: &str,
380-
pattern: Option<Regex>,
381-
max_files: usize,
382-
) -> Result<Vec<StageFileInfo>> {
383-
if path == STDIN_FD {
384-
return Ok(vec![stdin_stage_info()]);
385-
}
386-
let operator = operator.blocking();
387-
let mut files = Vec::new();
388-
let prefix_meta = operator.stat(path);
389-
match prefix_meta {
390-
Ok(meta) if meta.is_file() => {
391-
files.push(StageFileInfo::new(path.to_string(), &meta));
392-
}
393-
Err(e) if e.kind() != opendal::ErrorKind::NotFound => {
394-
return Err(e.into());
395-
}
396-
_ => {}
397-
};
398-
let prefix_len = if path == "/" { 0 } else { path.len() };
399-
let list = operator.lister_with(path).recursive(true).call()?;
400-
if files.len() == max_files {
401-
return Ok(files);
402-
}
403-
for obj in list {
404-
let obj = obj?;
405-
let (path, mut meta) = obj.into_parts();
406-
if check_file(&path[prefix_len..], meta.mode(), &pattern) {
407-
if meta.etag().is_none() {
408-
meta = match operator.stat(&path) {
409-
Ok(meta) => meta,
410-
Err(err) => return Err(ErrorCode::from(err)),
411-
}
412-
}
413-
414-
files.push(StageFileInfo::new(path, &meta));
415-
if files.len() == max_files {
416-
return Ok(files);
417-
}
418-
}
419-
}
420-
Ok(files)
421-
}
422-
423344
pub const STDIN_FD: &str = "/dev/fd/0";
424345

425346
fn stdin_stage_info() -> StageFileInfo {

src/query/ee/src/storages/fuse/operations/vacuum_table_v2.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -535,12 +535,11 @@ async fn fs_list_until_prefix(
535535
gc_root_meta_ts: Option<DateTime<Utc>>,
536536
) -> Result<Vec<Entry>> {
537537
// Fetch ALL entries from the path and sort them by path in lexicographical order.
538-
let lister = dal.blocking().lister(path)?;
538+
let mut lister = dal.lister(path).await?;
539539
let mut entries = Vec::new();
540-
for item in lister {
541-
let entry = item?;
542-
if entry.metadata().is_file() {
543-
entries.push(entry);
540+
while let Some(item) = lister.try_next().await? {
541+
if item.metadata().is_file() {
542+
entries.push(item);
544543
}
545544
}
546545
entries.sort_by(|l, r| l.path().cmp(r.path()));

src/query/ee/tests/it/storages/fuse/operations/vacuum.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -313,13 +313,9 @@ mod test_accessor {
313313

314314
impl Access for AccessorFaultyDeletion {
315315
type Reader = ();
316-
type BlockingReader = ();
317316
type Writer = ();
318-
type BlockingWriter = ();
319317
type Lister = VecLister;
320-
type BlockingLister = ();
321318
type Deleter = MockDeleter;
322-
type BlockingDeleter = ();
323319

324320
fn info(&self) -> Arc<AccessorInfo> {
325321
let info = AccessorInfo::default();

0 commit comments

Comments (0)