@@ -2,13 +2,15 @@ use anyhow::{anyhow, Result};
22#[ cfg( feature = "json" ) ]
33use serde:: de:: DeserializeOwned ;
44use std:: collections:: HashMap ;
5- use wasi:: http:: types:: { IncomingResponse , StatusCode } ;
6- use wasi:: io:: streams:: StreamError ;
5+ use wasi:: http:: types:: { IncomingBody , IncomingResponse , StatusCode } ;
6+ use wasi:: io:: streams:: { InputStream , StreamError } ;
77
88pub struct Response {
99 status : StatusCode ,
1010 headers : HashMap < String , String > ,
11- body : Vec < u8 > ,
11+ // input-stream resource is a child: it must be dropped before the parent incoming-body is dropped
12+ input_stream : InputStream ,
13+ _incoming_body : IncomingBody ,
1214}
1315
1416impl Response {
@@ -22,42 +24,48 @@ impl Response {
2224 }
2325 drop ( headers_handle) ;
2426
25- let incoming_body = incoming_response
26- . consume ( )
27- . map_err ( |( ) | anyhow ! ( "incoming response has no body stream" ) ) ?;
27+ // The consume() method can only be called once
28+ let incoming_body = incoming_response. consume ( ) . unwrap ( ) ;
2829 drop ( incoming_response) ;
2930
31+ // The stream() method can only be called once
3032 let input_stream = incoming_body. stream ( ) . unwrap ( ) ;
31- let mut body = vec ! [ ] ;
32- loop {
33- let mut body_chunk = match input_stream. read ( 1024 * 1024 ) {
34- Ok ( c) => c,
35- Err ( StreamError :: Closed ) => break ,
36- Err ( e) => Err ( anyhow ! ( "input_stream read failed: {e:?}" ) ) ?,
37- } ;
38-
39- if !body_chunk. is_empty ( ) {
40- body. append ( & mut body_chunk) ;
41- }
42- }
43-
4433 Ok ( Self {
4534 status,
4635 headers,
47- body,
36+ input_stream,
37+ _incoming_body : incoming_body,
4838 } )
4939 }
5040
51- pub fn status ( & self ) -> & StatusCode {
52- & self . status
41+ pub fn status ( & self ) -> StatusCode {
42+ self . status
5343 }
5444
5545 pub fn headers ( & self ) -> & HashMap < String , String > {
5646 & self . headers
5747 }
5848
59- pub fn body ( & self ) -> & Vec < u8 > {
60- & self . body
49+ /// Get a chunk of the response body.
50+ ///
51+ /// It will block until at least one byte can be read or the stream is closed.
52+ pub fn chunk ( & self , len : u64 ) -> Result < Option < Vec < u8 > > > {
53+ match self . input_stream . blocking_read ( len) {
54+ Ok ( c) => Ok ( Some ( c) ) ,
55+ Err ( StreamError :: Closed ) => Ok ( None ) ,
56+ Err ( e) => Err ( anyhow ! ( "input_stream read failed: {e:?}" ) ) ?,
57+ }
58+ }
59+
60+ /// Get the full response body.
61+ ///
62+ /// It will block until the stream is closed.
63+ pub fn body ( self ) -> Result < Vec < u8 > > {
64+ let mut body = Vec :: new ( ) ;
65+ while let Some ( mut chunk) = self . chunk ( 1024 * 1024 ) ? {
66+ body. append ( & mut chunk) ;
67+ }
68+ Ok ( body)
6169 }
6270
6371 /// Deserialize the response body as JSON.
@@ -67,7 +75,7 @@ impl Response {
6775 /// This requires the `json` feature enabled.
6876 #[ cfg( feature = "json" ) ]
6977 #[ cfg_attr( docsrs, doc( cfg( feature = "json" ) ) ) ]
70- pub fn json < T : DeserializeOwned > ( & self ) -> Result < T > {
71- Ok ( serde_json:: from_slice ( & self . body ) ?)
78+ pub fn json < T : DeserializeOwned > ( self ) -> Result < T > {
79+ Ok ( serde_json:: from_slice ( self . body ( ) ? . as_ref ( ) ) ?)
7280 }
7381}
0 commit comments