From 02165f7a05d20b9b80e3b7a78a4c014499cb3c25 Mon Sep 17 00:00:00 2001 From: Jonas Platte Date: Tue, 15 Nov 2022 19:09:43 +0100 Subject: [PATCH] refactor(sdk)!: Move retrying out of HttpSend trait --- crates/matrix-sdk/src/error.rs | 5 - crates/matrix-sdk/src/http_client.rs | 180 +++++++++++++-------------- 2 files changed, 86 insertions(+), 99 deletions(-) diff --git a/crates/matrix-sdk/src/error.rs b/crates/matrix-sdk/src/error.rs index dea936e9e..19daa341d 100644 --- a/crates/matrix-sdk/src/error.rs +++ b/crates/matrix-sdk/src/error.rs @@ -16,7 +16,6 @@ use std::io::Error as IoError; -use http::StatusCode; #[cfg(feature = "qrcode")] use matrix_sdk_base::crypto::ScanError; #[cfg(feature = "e2e-encryption")] @@ -103,10 +102,6 @@ pub enum HttpError { #[error(transparent)] IntoHttp(#[from] IntoHttpError), - /// The server returned a status code that should be retried. - #[error("Server returned an error {0}")] - Server(StatusCode), - /// The given request can't be cloned and thus can't be retried. #[error("The request cannot be cloned")] UnableToCloneRequest, diff --git a/crates/matrix-sdk/src/http_client.rs b/crates/matrix-sdk/src/http_client.rs index 350ab3b47..4ea766db2 100644 --- a/crates/matrix-sdk/src/http_client.rs +++ b/crates/matrix-sdk/src/http_client.rs @@ -48,14 +48,13 @@ pub trait HttpSend: AsyncTraitDeps { /// * `request` - The http request that has been converted from a ruma /// `Request`. /// - /// * `request_config` - The config used for this request. - /// + /// * `timeout` - A timeout for the full request > response cycle. /// # Examples /// /// ``` - /// use matrix_sdk::{ - /// async_trait, bytes::Bytes, config::RequestConfig, HttpError, HttpSend, - /// }; + /// use std::time::Duration; + /// + /// use matrix_sdk::{async_trait, bytes::Bytes, HttpError, HttpSend}; /// /// #[derive(Debug)] /// struct Client(reqwest::Client); @@ -75,7 +74,7 @@ pub trait HttpSend: AsyncTraitDeps { /// async fn send_request( /// &self, /// request: http::Request, - /// config: RequestConfig, + /// timeout: Duration, /// ) -> Result, HttpError> { /// Ok(self /// .response_to_http_response( @@ -90,7 +89,7 @@ pub trait HttpSend: AsyncTraitDeps { async fn send_request( &self, request: http::Request, - config: RequestConfig, + timeout: Duration, ) -> Result, HttpError>; } @@ -165,11 +164,63 @@ impl HttpClient { let request = request.map(|body| body.freeze()); trace!("Sending request"); - let response = self.inner.send_request(request, config).await?; - trace!("Got response: {:?}", response); + #[cfg(not(target_arch = "wasm32"))] + let response = { + use std::sync::atomic::{AtomicU64, Ordering}; + + use backoff::{future::retry, Error as RetryError, ExponentialBackoff}; + use ruma::api::client::error::ErrorKind as ClientApiErrorKind; + + let backoff = + ExponentialBackoff { max_elapsed_time: config.retry_timeout, ..Default::default() }; + let retry_count = AtomicU64::new(1); + + let send_request = || async { + let stop = if let Some(retry_limit) = config.retry_limit { + retry_count.fetch_add(1, Ordering::Relaxed) >= retry_limit + } else { + false + }; + + // Turn errors into permanent errors when the retry limit is reached + let error_type = if stop { + RetryError::Permanent + } else { + |err: HttpError| { + let retry_after = err.client_api_error_kind().and_then(|kind| match kind { + ClientApiErrorKind::LimitExceeded { retry_after_ms } => *retry_after_ms, + _ => None, + }); + RetryError::Transient { err, retry_after } + } + }; + + let raw_response = self + .inner + .send_request(clone_request(&request), config.timeout) + .await + .map_err(error_type)?; + + trace!("Got response: {raw_response:?}"); + + let response = Request::IncomingResponse::try_from_http_response(raw_response) + .map_err(|e| error_type(HttpError::from(e)))?; + + Ok(response) + }; + + retry::<_, HttpError, _, _, _>(backoff, send_request).await? + }; + + #[cfg(target_arch = "wasm32")] + let response = { + let raw_response = self.inner.send_request(request, config.timeout).await?; + trace!("Got response: {raw_response:?}"); + + Request::IncomingResponse::try_from_http_response(raw_response)? + }; - let response = Request::IncomingResponse::try_from_http_response(response)?; Ok(response) } } @@ -228,6 +279,18 @@ impl HttpSettings { } } +// Clones all request parts except the extensions which can't be cloned. +// See also https://github.com/hyperium/http/issues/395 +#[cfg(not(target_arch = "wasm32"))] +fn clone_request(request: &http::Request) -> http::Request { + let mut builder = http::Request::builder() + .version(request.version()) + .method(request.method()) + .uri(request.uri()); + *builder.headers_mut().unwrap() = request.headers().clone(); + builder.body(request.body().clone()).unwrap() +} + async fn response_to_http_response( mut response: Response, ) -> Result, reqwest::Error> { @@ -247,96 +310,25 @@ async fn response_to_http_response( Ok(http_builder.body(body).expect("Can't construct a response using the given body")) } -#[cfg(any(target_arch = "wasm32"))] -async fn send_request( - client: &reqwest::Client, - request: http::Request, - _: RequestConfig, -) -> Result, HttpError> { - let request = reqwest::Request::try_from(request)?; - let response = client.execute(request).await?; - - Ok(response_to_http_response(response).await?) -} - -#[cfg(all(not(target_arch = "wasm32")))] -async fn send_request( - client: &reqwest::Client, - request: http::Request, - config: RequestConfig, -) -> Result, HttpError> { - use std::sync::atomic::{AtomicU64, Ordering}; - - use backoff::{future::retry, Error as RetryError, ExponentialBackoff}; - use http::StatusCode; - use ruma::api::client::error::ErrorKind as ClientApiErrorKind; - - let mut backoff = ExponentialBackoff::default(); - let mut request = reqwest::Request::try_from(request)?; - let retry_limit = config.retry_limit; - let retry_count = AtomicU64::new(1); - - *request.timeout_mut() = Some(config.timeout); - - backoff.max_elapsed_time = config.retry_timeout; - - let request = &request; - let retry_count = &retry_count; - - let request = || async move { - let stop = if let Some(retry_limit) = retry_limit { - retry_count.fetch_add(1, Ordering::Relaxed) >= retry_limit - } else { - false - }; - - // Turn errors into permanent errors when the retry limit is reached - let error_type = if stop { - RetryError::Permanent - } else { - |err: HttpError| { - let retry_after = err.client_api_error_kind().and_then(|kind| match kind { - ClientApiErrorKind::LimitExceeded { retry_after_ms } => *retry_after_ms, - _ => None, - }); - RetryError::Transient { err, retry_after } - } - }; - - let request = request.try_clone().ok_or(HttpError::UnableToCloneRequest)?; - - let response = - client.execute(request).await.map_err(|e| error_type(HttpError::Reqwest(e)))?; - - let status_code = response.status(); - // TODO TOO_MANY_REQUESTS will have a retry timeout which we should - // use. - if !stop - && (status_code.is_server_error() || response.status() == StatusCode::TOO_MANY_REQUESTS) - { - return Err(error_type(HttpError::Server(status_code))); - } - - let response = response_to_http_response(response) - .await - .map_err(|e| RetryError::Permanent(HttpError::Reqwest(e)))?; - - Ok(response) - }; - - let response = retry(backoff, request).await?; - - Ok(response) -} - #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] #[cfg_attr(not(target_arch = "wasm32"), async_trait)] impl HttpSend for reqwest::Client { async fn send_request( &self, request: http::Request, - config: RequestConfig, + _timeout: Duration, ) -> Result, HttpError> { - send_request(self, request, config).await + #[allow(unused_mut)] + let mut request = reqwest::Request::try_from(request)?; + + // reqwest's timeout functionality is not available on WASM + #[cfg(not(target_arch = "wasm32"))] + { + *request.timeout_mut() = Some(_timeout); + } + + let response = self.execute(request).await?; + + Ok(response_to_http_response(response).await?) } }