-
Notifications
You must be signed in to change notification settings - Fork 10
Switch from reqwest to bitreq HTTP client
#56
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
e3fda44
eed3592
a366503
c9d7764
ef17d83
ef39744
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,14 +1,13 @@ | ||
| use bitreq::Client; | ||
| use prost::Message; | ||
| use reqwest::header::CONTENT_TYPE; | ||
| use reqwest::Client; | ||
| use std::collections::HashMap; | ||
| use std::default::Default; | ||
| use std::sync::Arc; | ||
|
|
||
| use log::trace; | ||
|
|
||
| use crate::error::VssError; | ||
| use crate::headers::{get_headermap, FixedHeaders, VssHeaderProvider}; | ||
| use crate::headers::{FixedHeaders, VssHeaderProvider}; | ||
| use crate::types::{ | ||
| DeleteObjectRequest, DeleteObjectResponse, GetObjectRequest, GetObjectResponse, | ||
| ListKeyVersionsRequest, ListKeyVersionsResponse, PutObjectRequest, PutObjectResponse, | ||
|
|
@@ -17,7 +16,9 @@ use crate::util::retry::{retry, RetryPolicy}; | |
| use crate::util::KeyValueVecKeyPrinter; | ||
|
|
||
| const APPLICATION_OCTET_STREAM: &str = "application/octet-stream"; | ||
| const DEFAULT_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10); | ||
| const DEFAULT_TIMEOUT_SECS: u64 = 10; | ||
| const MAX_RESPONSE_BODY_SIZE: usize = 500 * 1024 * 1024; // 500 MiB | ||
| const DEFAULT_CLIENT_CAPACITY: usize = 10; | ||
|
|
||
| /// Thin-client to access a hosted instance of Versioned Storage Service (VSS). | ||
| /// The provided [`VssClient`] API is minimalistic and is congruent to the VSS server-side API. | ||
|
|
@@ -35,11 +36,11 @@ where | |
| impl<R: RetryPolicy<E = VssError>> VssClient<R> { | ||
| /// Constructs a [`VssClient`] using `base_url` as the VSS server endpoint. | ||
| pub fn new(base_url: String, retry_policy: R) -> Self { | ||
| let client = build_client(); | ||
| let client = Client::new(DEFAULT_CLIENT_CAPACITY); | ||
tnull marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| Self::from_client(base_url, client, retry_policy) | ||
| } | ||
|
|
||
| /// Constructs a [`VssClient`] from a given [`reqwest::Client`], using `base_url` as the VSS server endpoint. | ||
| /// Constructs a [`VssClient`] from a given [`bitreq::Client`], using `base_url` as the VSS server endpoint. | ||
| pub fn from_client(base_url: String, client: Client, retry_policy: R) -> Self { | ||
| Self { | ||
| base_url, | ||
|
|
@@ -49,7 +50,7 @@ impl<R: RetryPolicy<E = VssError>> VssClient<R> { | |
| } | ||
| } | ||
|
|
||
| /// Constructs a [`VssClient`] from a given [`reqwest::Client`], using `base_url` as the VSS server endpoint. | ||
| /// Constructs a [`VssClient`] from a given [`bitreq::Client`], using `base_url` as the VSS server endpoint. | ||
| /// | ||
| /// HTTP headers will be provided by the given `header_provider`. | ||
| pub fn from_client_and_headers( | ||
|
|
@@ -65,7 +66,7 @@ impl<R: RetryPolicy<E = VssError>> VssClient<R> { | |
| pub fn new_with_headers( | ||
| base_url: String, retry_policy: R, header_provider: Arc<dyn VssHeaderProvider>, | ||
| ) -> Self { | ||
| let client = build_client(); | ||
| let client = Client::new(DEFAULT_CLIENT_CAPACITY); | ||
| Self { base_url, client, retry_policy, header_provider } | ||
| } | ||
|
|
||
|
|
@@ -190,37 +191,30 @@ impl<R: RetryPolicy<E = VssError>> VssClient<R> { | |
| &self, request: &Rq, url: &str, | ||
| ) -> Result<Rs, VssError> { | ||
| let request_body = request.encode_to_vec(); | ||
| let headermap = self | ||
| let headers = self | ||
| .header_provider | ||
| .get_headers(&request_body) | ||
| .await | ||
| .and_then(|h| get_headermap(&h)) | ||
| .map_err(|e| VssError::AuthError(e.to_string()))?; | ||
| let response_raw = self | ||
| .client | ||
| .post(url) | ||
| .header(CONTENT_TYPE, APPLICATION_OCTET_STREAM) | ||
| .headers(headermap) | ||
| .body(request_body) | ||
| .send() | ||
| .await?; | ||
| let status = response_raw.status(); | ||
| let payload = response_raw.bytes().await?; | ||
|
|
||
| if status.is_success() { | ||
|
|
||
| let http_request = bitreq::post(url) | ||
| .with_header("content-type", APPLICATION_OCTET_STREAM) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: like you've done elsewhere, let's put this hard coded string in a constant ? Here and also in the lnurl auth module. |
||
| .with_headers(headers) | ||
| .with_body(request_body) | ||
| .with_timeout(DEFAULT_TIMEOUT_SECS) | ||
| .with_max_body_size(Some(MAX_RESPONSE_BODY_SIZE)) | ||
| .with_pipelining(); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm bitreq says "Note that because pipelined requests may be replayed in case of failure, you should only set this on idempotent requests", and our
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Why is it not idempotent? If the request fails, we also assume the HTTP client to retry? This is essentially the same, no? |
||
|
|
||
| let response = self.client.send_async(http_request).await?; | ||
|
|
||
| let status_code = response.status_code; | ||
| let payload = response.into_bytes(); | ||
|
|
||
| if (200..300).contains(&status_code) { | ||
| let response = Rs::decode(&payload[..])?; | ||
| Ok(response) | ||
| } else { | ||
| Err(VssError::new(status, payload)) | ||
| Err(VssError::new(status_code, payload)) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| fn build_client() -> Client { | ||
| Client::builder() | ||
| .timeout(DEFAULT_TIMEOUT) | ||
| .connect_timeout(DEFAULT_TIMEOUT) | ||
| .read_timeout(DEFAULT_TIMEOUT) | ||
| .build() | ||
| .unwrap() | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,7 +1,5 @@ | ||
| use crate::types::{ErrorCode, ErrorResponse}; | ||
| use prost::bytes::Bytes; | ||
| use prost::{DecodeError, Message}; | ||
| use reqwest::StatusCode; | ||
| use std::error::Error; | ||
| use std::fmt::{Display, Formatter}; | ||
|
|
||
|
|
@@ -32,13 +30,13 @@ pub enum VssError { | |
|
|
||
| impl VssError { | ||
| /// Create new instance of `VssError` | ||
| pub fn new(status: StatusCode, payload: Bytes) -> VssError { | ||
| pub fn new(status_code: i32, payload: Vec<u8>) -> VssError { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: we don't need ownership and can pass a
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm, but if we don't end up |
||
| match ErrorResponse::decode(&payload[..]) { | ||
| Ok(error_response) => VssError::from(error_response), | ||
| Err(e) => { | ||
| let message = format!( | ||
| "Unable to decode ErrorResponse from server, HttpStatusCode: {}, DecodeErr: {}", | ||
| status, e | ||
| status_code, e | ||
| ); | ||
| VssError::InternalError(message) | ||
| }, | ||
|
|
@@ -99,8 +97,8 @@ impl From<DecodeError> for VssError { | |
| } | ||
| } | ||
|
|
||
| impl From<reqwest::Error> for VssError { | ||
| fn from(err: reqwest::Error) -> Self { | ||
| impl From<bitreq::Error> for VssError { | ||
| fn from(err: bitreq::Error) -> Self { | ||
| VssError::InternalError(err.to_string()) | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,4 +1,4 @@ | ||
| use crate::headers::{get_headermap, VssHeaderProvider, VssHeaderProviderError}; | ||
| use crate::headers::{VssHeaderProvider, VssHeaderProviderError}; | ||
| use async_trait::async_trait; | ||
| use base64::engine::general_purpose::URL_SAFE_NO_PAD; | ||
| use base64::Engine; | ||
|
|
@@ -45,13 +45,14 @@ impl JwtToken { | |
| } | ||
| } | ||
|
|
||
| const DEFAULT_TIMEOUT_SECS: u64 = 10; | ||
|
|
||
| /// Provides a JWT token based on LNURL Auth. | ||
| pub struct LnurlAuthToJwtProvider { | ||
| engine: Secp256k1<SignOnly>, | ||
| parent_key: Xpriv, | ||
| url: String, | ||
| default_headers: HashMap<String, String>, | ||
| client: reqwest::Client, | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are we sure we want to delete the cached client here ? Thought we could cache it since the first get request in the lnurl auth flow is always to the same URL.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. AFAIU, we we'll only retrieve a new JWT token if the old one expired, which should be in the order of days or at least ours. It seems to me that keeping a client around (and potentially even keeping a connection open) is a bunch of unnecessary overhead if we just make single requests that rarely. |
||
| cached_jwt_token: RwLock<Option<JwtToken>>, | ||
| } | ||
|
|
||
|
|
@@ -70,47 +71,44 @@ impl LnurlAuthToJwtProvider { | |
| /// with the JWT authorization header for VSS requests. | ||
| pub fn new( | ||
| parent_key: Xpriv, url: String, default_headers: HashMap<String, String>, | ||
| ) -> Result<LnurlAuthToJwtProvider, VssHeaderProviderError> { | ||
| ) -> LnurlAuthToJwtProvider { | ||
| let engine = Secp256k1::signing_only(); | ||
| let default_headermap = get_headermap(&default_headers)?; | ||
| let client = reqwest::Client::builder() | ||
| .default_headers(default_headermap) | ||
| .build() | ||
| .map_err(VssHeaderProviderError::from)?; | ||
|
|
||
| Ok(LnurlAuthToJwtProvider { | ||
| LnurlAuthToJwtProvider { | ||
| engine, | ||
| parent_key, | ||
| url, | ||
| default_headers, | ||
| client, | ||
| cached_jwt_token: RwLock::new(None), | ||
| }) | ||
| } | ||
| } | ||
|
|
||
| async fn fetch_jwt_token(&self) -> Result<JwtToken, VssHeaderProviderError> { | ||
| // Fetch the LNURL. | ||
| let lnurl_str = self | ||
| .client | ||
| .get(&self.url) | ||
| .send() | ||
| .await | ||
| .map_err(VssHeaderProviderError::from)? | ||
| .text() | ||
| .await | ||
| .map_err(VssHeaderProviderError::from)?; | ||
| let lnurl_request = bitreq::get(&self.url) | ||
| .with_headers(self.default_headers.clone()) | ||
| .with_timeout(DEFAULT_TIMEOUT_SECS); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I know this is a trusted server, but do we want to set max body size here to prevent any possible OOM crashes ? same for the get request below.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. True, I overlooked that. Do you have a suggestion for a good limit here? |
||
| let lnurl_response = | ||
| lnurl_request.send_async().await.map_err(VssHeaderProviderError::from)?; | ||
| let lnurl_str = String::from_utf8(lnurl_response.into_bytes()).map_err(|e| { | ||
| VssHeaderProviderError::InvalidData { | ||
| error: format!("LNURL response is not valid UTF-8: {}", e), | ||
| } | ||
| })?; | ||
|
|
||
| // Sign the LNURL and perform the request. | ||
| let signed_lnurl = sign_lnurl(&self.engine, &self.parent_key, &lnurl_str)?; | ||
| let lnurl_auth_response: LnurlAuthResponse = self | ||
| .client | ||
| .get(&signed_lnurl) | ||
| .send() | ||
| .await | ||
| .map_err(VssHeaderProviderError::from)? | ||
| .json() | ||
| .await | ||
| .map_err(VssHeaderProviderError::from)?; | ||
| let auth_request = bitreq::get(&signed_lnurl) | ||
| .with_headers(self.default_headers.clone()) | ||
| .with_timeout(DEFAULT_TIMEOUT_SECS); | ||
| let auth_response = | ||
| auth_request.send_async().await.map_err(VssHeaderProviderError::from)?; | ||
| let lnurl_auth_response: LnurlAuthResponse = | ||
| serde_json::from_slice(&auth_response.into_bytes()).map_err(|e| { | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We could use
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. either is fine to me 🤷♂️
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @tnull sounds to me like this is something we can upstream to bitreq no ? ie pass the
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Yeah, I considered that too. Just haven't fully made my mind up whether not validating UTF8 is okay. Maybe it's here, but not necessarily in the general case? |
||
| VssHeaderProviderError::InvalidData { | ||
| error: format!("Failed to parse LNURL Auth response as JSON: {}", e), | ||
| } | ||
| })?; | ||
|
|
||
| let untrusted_token = match lnurl_auth_response { | ||
| LnurlAuthResponse { token: Some(token), .. } => token, | ||
|
|
@@ -128,13 +126,14 @@ impl LnurlAuthToJwtProvider { | |
| parse_jwt_token(untrusted_token) | ||
| } | ||
|
|
||
| async fn get_jwt_token(&self, force_refresh: bool) -> Result<String, VssHeaderProviderError> { | ||
| let cached_token_str = if force_refresh { | ||
| None | ||
| } else { | ||
| let jwt_token = self.cached_jwt_token.read().unwrap(); | ||
| jwt_token.as_ref().filter(|t| !t.is_expired()).map(|t| t.token_str.clone()) | ||
| }; | ||
| async fn get_jwt_token(&self) -> Result<String, VssHeaderProviderError> { | ||
| let cached_token_str = self | ||
| .cached_jwt_token | ||
| .read() | ||
| .unwrap() | ||
| .as_ref() | ||
| .filter(|t| !t.is_expired()) | ||
| .map(|t| t.token_str.clone()); | ||
| if let Some(token_str) = cached_token_str { | ||
| Ok(token_str) | ||
| } else { | ||
|
|
@@ -150,7 +149,7 @@ impl VssHeaderProvider for LnurlAuthToJwtProvider { | |
| async fn get_headers( | ||
| &self, _request: &[u8], | ||
| ) -> Result<HashMap<String, String>, VssHeaderProviderError> { | ||
| let jwt_token = self.get_jwt_token(false).await?; | ||
| let jwt_token = self.get_jwt_token().await?; | ||
| let mut headers = self.default_headers.clone(); | ||
| headers.insert(AUTHORIZATION.to_string(), format!("Bearer {}", jwt_token)); | ||
| Ok(headers) | ||
|
|
@@ -256,8 +255,8 @@ impl From<bitcoin::bip32::Error> for VssHeaderProviderError { | |
| } | ||
| } | ||
|
|
||
| impl From<reqwest::Error> for VssHeaderProviderError { | ||
| fn from(e: reqwest::Error) -> VssHeaderProviderError { | ||
| impl From<bitreq::Error> for VssHeaderProviderError { | ||
| fn from(e: bitreq::Error) -> VssHeaderProviderError { | ||
| VssHeaderProviderError::RequestError { error: e.to_string() } | ||
| } | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now bumped this considerably, as of course 10MB would be much too small. Let me know if you have an opinion on a better value here @TheBlueMatt @tankyleo
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mmm, yea, good question. On the server side we defaulted to 1G because its postgres' limit. Not that "just because its postgres' limit" is a good reason to do anything, though. 500 seems fine enough to me, honestly, though. As long as its documented I don't feel super strongly.