Skip to content

Commit d5cca1c

Browse files
committed
fix: DeletesForDataFile no longer hangs once pending (its poll returned Pending without registering a waker, so the task was never woken)
1 parent 274b0d5 commit d5cca1c

File tree

2 files changed: 32 additions & 34 deletions

crates/iceberg/src/delete_file_index.rs

Lines changed: 31 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use std::task::{Context, Poll};
2424

2525
use futures::channel::mpsc::{channel, Sender};
2626
use futures::StreamExt;
27+
use tokio::sync::Notify;
2728

2829
use crate::runtime::spawn;
2930
use crate::scan::{DeleteFileContext, FileScanTaskDeleteFile};
@@ -38,7 +39,7 @@ pub(crate) struct DeleteFileIndex {
3839

3940
#[derive(Debug)]
4041
enum DeleteFileIndexState {
41-
Populating,
42+
Populating(Arc<Notify>),
4243
Populated(PopulatedDeleteFileIndex),
4344
}
4445

@@ -59,7 +60,10 @@ impl DeleteFileIndex {
5960
pub(crate) fn new() -> (DeleteFileIndex, Sender<DeleteFileContext>) {
6061
// TODO: what should the channel limit be?
6162
let (tx, rx) = channel(10);
62-
let state = Arc::new(RwLock::new(DeleteFileIndexState::Populating));
63+
let notify = Arc::new(Notify::new());
64+
let state = Arc::new(RwLock::new(DeleteFileIndexState::Populating(
65+
notify.clone(),
66+
)));
6367
let delete_file_stream = rx.boxed();
6468

6569
spawn({
@@ -69,8 +73,11 @@ impl DeleteFileIndex {
6973

7074
let populated_delete_file_index = PopulatedDeleteFileIndex::new(delete_files);
7175

72-
let mut guard = state.write().unwrap();
73-
*guard = DeleteFileIndexState::Populated(populated_delete_file_index);
76+
{
77+
let mut guard = state.write().unwrap();
78+
*guard = DeleteFileIndexState::Populated(populated_delete_file_index);
79+
}
80+
notify.notify_waiters()
7481
}
7582
});
7683

@@ -80,15 +87,29 @@ impl DeleteFileIndex {
8087
/// Gets all the delete files that apply to the specified data file.
8188
///
8289
/// Waits until the index has been populated, then resolves to the
/// `Vec<FileScanTaskDeleteFile>` that applies to the data file.
83-
pub(crate) fn get_deletes_for_data_file<'a>(
90+
pub(crate) async fn get_deletes_for_data_file<'a>(
8491
&self,
8592
data_file: &'a DataFile,
8693
seq_num: Option<i64>,
87-
) -> DeletesForDataFile<'a> {
88-
DeletesForDataFile {
89-
state: self.state.clone(),
90-
data_file,
91-
seq_num,
94+
) -> Vec<FileScanTaskDeleteFile> {
95+
let notifier = {
96+
let guard = self.state.read().unwrap();
97+
match *guard {
98+
DeleteFileIndexState::Populating(ref notifier) => notifier.clone(),
99+
DeleteFileIndexState::Populated(ref index) => {
100+
return index.get_deletes_for_data_file(data_file, seq_num);
101+
}
102+
}
103+
};
104+
105+
notifier.notified().await;
106+
107+
let guard = self.state.read().unwrap();
108+
match guard.deref() {
109+
DeleteFileIndexState::Populated(index) => {
110+
index.get_deletes_for_data_file(data_file, seq_num)
111+
}
112+
_ => unreachable!("Cannot be any other state than loaded"),
92113
}
93114
}
94115
}
@@ -193,26 +214,3 @@ impl PopulatedDeleteFileIndex {
193214
results
194215
}
195216
}
196-
197-
/// Future for the `DeleteFileIndex::get_deletes_for_data_file` method
198-
pub(crate) struct DeletesForDataFile<'a> {
199-
state: Arc<RwLock<DeleteFileIndexState>>,
200-
data_file: &'a DataFile,
201-
seq_num: Option<i64>,
202-
}
203-
204-
impl Future for DeletesForDataFile<'_> {
205-
type Output = Result<Vec<FileScanTaskDeleteFile>>;
206-
207-
fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
208-
match self.state.try_read() {
209-
Ok(guard) => match guard.deref() {
210-
DeleteFileIndexState::Populated(idx) => Poll::Ready(Ok(
211-
idx.get_deletes_for_data_file(self.data_file, self.seq_num)
212-
)),
213-
_ => Poll::Pending,
214-
},
215-
Err(err) => Poll::Ready(Err(Error::new(ErrorKind::Unexpected, err.to_string()))),
216-
}
217-
}
218-
}

crates/iceberg/src/scan/context.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ impl ManifestEntryContext {
111111
self.manifest_entry.data_file(),
112112
self.manifest_entry.sequence_number(),
113113
)
114-
.await?
114+
.await
115115
} else {
116116
vec![]
117117
};

0 commit comments

Comments
 (0)