-
Notifications
You must be signed in to change notification settings - Fork 15
Add get_blob_stream (prefer GetRawBlob) #114
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
Signed-off-by: Priyanshu Kumar <priyanshu.kumar@broadcom.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request introduces a new get_blob_stream API that intelligently prefers the GetRawBlob method when supported by the proxy protocol, falling back to GetBlob otherwise. A significant addition is the VerifiedBlobReader and associated hashing logic, which enables on-the-fly digest and size verification for streamed blobs, enhancing data integrity. The changes also include necessary dependency updates (hex, sha2), new error types, and protocol version checks. The ProxyTooOld error message has been corrected for clarity. Overall, this is a well-implemented feature that improves the robustness and security of blob fetching.
| let driver = async move { | ||
| err.await.map_err(Error::from)?; | ||
| match rx.await { | ||
| Ok(r) => verify_blob_bytes_read(&expected, expected_size, r), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the driver future, if rx.await returns an Err(_) (meaning the oneshot::Sender was dropped without sending a message), the driver currently returns Ok(()). This could potentially mask an issue if the VerifiedBlobReader was dropped unexpectedly (e.g., due to a panic) before its Drop implementation could send VerifiedBlobReadResult::Incomplete. While the Drop impl is designed to send Incomplete, an Err from rx.await indicates an even more unexpected state. It might be safer to propagate this error or at least log it, rather than silently succeeding, to ensure that all potential issues during blob streaming are surfaced.
| Ok(r) => verify_blob_bytes_read(&expected, expected_size, r), | |
| Err(e) => Err(Error::Other(format!("Blob stream verification channel error: {e}").into())), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If I understand this correctly, this review is incorrect. rx.await returns Err(_) when the sender is dropped without sending (canceled). A panic will still run Drop, and Drop sends Incomplete, so you’d receive Ok(Incomplete), not Err(_).
|
Question on fallback robustness: cc: @cgwalters |
cgwalters
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for working on this! Just as a first pass I think what would help build confidence here is ensuring in CI we're always testing both the old and new code.
One suggestion: A CI run that tests in quay.io/almalinuxorg/almalinux-bootc:10.0 as that uses skopeo version 1.18.1 which predates https://github.com/containers/skopeo/releases/tag/v1.19.0 when this feature appeared.
Run tests in containers with skopeo 1.18 and >=1.19 to exercise both GetBlob fallback and GetRawBlob paths. Signed-off-by: Priyanshu Kumar <priyanshu.kumar@broadcom.com>
824950e to
a8886bc
Compare
|
@cgwalters I've updated the PR Can you please check? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks so much for working on this! A second pass review
| Digest::from_str(&format!("sha256:{}", hex::encode(h.finalize()))).unwrap() | ||
| } | ||
|
|
||
| fn write_blob(root: &std::path::Path, bytes: &[u8]) -> Result<(Digest, u64)> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's an https://docs.rs/ocidir/latest/ocidir/ crate for this
| -v "$PWD:/src:ro" \ | ||
| -v "$PWD/.ci-cargo-home:/root/.cargo" \ | ||
| -w /src \ | ||
| quay.io/fedora/fedora:41 \ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't thin fedora:41 has the new skopeo, also it's end of life. Let's use quay.io/centos/centos:stream10` instead
| debug_assert!(after >= before); | ||
| let delta = after - before; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about let delta = after.checked_sub(before).unwrap()
| digest: &Digest, | ||
| expected_size: u64, | ||
| ) -> Result<BlobStream<'a>> { | ||
| let fallback_to_get_blob = || async move { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why the closure?
| return fallback_to_get_blob().await; | ||
| } | ||
|
|
||
| match self.get_raw_blob(img, digest).await { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can just ? here
|
|
||
| #[derive(Debug)] | ||
| enum VerifiedBlobReadResult { | ||
| Complete { nbytes: u64, digest_hex: String }, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's add type safety to digest_hex using https://docs.rs/oci-spec/latest/oci_spec/image/struct.Digest.html
| let delta = after - before; | ||
| if delta > 0 { | ||
| let chunk = &buf.filled()[before..after]; | ||
| if let Some(hasher) = self.hasher.as_mut() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should always have a hasher right?
| debug_assert!(after >= before); | ||
| let delta = after - before; | ||
| if delta > 0 { | ||
| let chunk = &buf.filled()[before..after]; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am a bit confused by this...it seems like we're trying to avoid skipping the start of the buffer, but why do we need to do that? Can't we just consume filled() each time?
Also isn't this missing calling clear()?
(I am not super familiar with implementing AsyncRead this way to be clear)
| inner: R, | ||
| nbytes: u64, | ||
| hasher: Option<Hasher>, | ||
| completion: Option<oneshot::Sender<VerifiedBlobReadResult>>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A comment on this would be useful, the expected control flow here is not obvious to me.
If there's a way we can avoid this oneshot channel that'd be good
| } | ||
| let before = buf.filled().len(); | ||
| match Pin::new(&mut self.inner).poll_read(cx, buf) { | ||
| v @ std::task::Poll::Ready(Ok(_)) => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's use () instead of _ to strengthen the match
This adds a higher-level blob streaming API that prefers the newer GetRawBlob method (when supported by the negotiated proxy protocol version) and otherwise falls back to GetBlob.
Motivation: bootc-dev/bootc#1424