refactor(sdk)!: Move retrying out of HttpSend trait

This commit is contained in:
Jonas Platte 2022-11-15 19:09:43 +01:00 committed by Jonas Platte
parent 98600e4c1e
commit 02165f7a05
2 changed files with 86 additions and 99 deletions

View File

@ -16,7 +16,6 @@
use std::io::Error as IoError;
use http::StatusCode;
#[cfg(feature = "qrcode")]
use matrix_sdk_base::crypto::ScanError;
#[cfg(feature = "e2e-encryption")]
@ -103,10 +102,6 @@ pub enum HttpError {
#[error(transparent)]
IntoHttp(#[from] IntoHttpError),
/// The server returned a status code that should be retried.
#[error("Server returned an error {0}")]
Server(StatusCode),
/// The given request can't be cloned and thus can't be retried.
#[error("The request cannot be cloned")]
UnableToCloneRequest,

View File

@ -48,14 +48,13 @@ pub trait HttpSend: AsyncTraitDeps {
/// * `request` - The http request that has been converted from a ruma
/// `Request`.
///
/// * `request_config` - The config used for this request.
///
/// * `timeout` - A timeout for the full request > response cycle.
/// # Examples
///
/// ```
/// use matrix_sdk::{
/// async_trait, bytes::Bytes, config::RequestConfig, HttpError, HttpSend,
/// };
/// use std::time::Duration;
///
/// use matrix_sdk::{async_trait, bytes::Bytes, HttpError, HttpSend};
///
/// #[derive(Debug)]
/// struct Client(reqwest::Client);
@ -75,7 +74,7 @@ pub trait HttpSend: AsyncTraitDeps {
/// async fn send_request(
/// &self,
/// request: http::Request<Bytes>,
/// config: RequestConfig,
/// timeout: Duration,
/// ) -> Result<http::Response<Bytes>, HttpError> {
/// Ok(self
/// .response_to_http_response(
@ -90,7 +89,7 @@ pub trait HttpSend: AsyncTraitDeps {
async fn send_request(
&self,
request: http::Request<Bytes>,
config: RequestConfig,
timeout: Duration,
) -> Result<http::Response<Bytes>, HttpError>;
}
@ -165,11 +164,63 @@ impl HttpClient {
let request = request.map(|body| body.freeze());
trace!("Sending request");
let response = self.inner.send_request(request, config).await?;
trace!("Got response: {:?}", response);
#[cfg(not(target_arch = "wasm32"))]
let response = {
use std::sync::atomic::{AtomicU64, Ordering};
use backoff::{future::retry, Error as RetryError, ExponentialBackoff};
use ruma::api::client::error::ErrorKind as ClientApiErrorKind;
let backoff =
ExponentialBackoff { max_elapsed_time: config.retry_timeout, ..Default::default() };
let retry_count = AtomicU64::new(1);
let send_request = || async {
let stop = if let Some(retry_limit) = config.retry_limit {
retry_count.fetch_add(1, Ordering::Relaxed) >= retry_limit
} else {
false
};
// Turn errors into permanent errors when the retry limit is reached
let error_type = if stop {
RetryError::Permanent
} else {
|err: HttpError| {
let retry_after = err.client_api_error_kind().and_then(|kind| match kind {
ClientApiErrorKind::LimitExceeded { retry_after_ms } => *retry_after_ms,
_ => None,
});
RetryError::Transient { err, retry_after }
}
};
let raw_response = self
.inner
.send_request(clone_request(&request), config.timeout)
.await
.map_err(error_type)?;
trace!("Got response: {raw_response:?}");
let response = Request::IncomingResponse::try_from_http_response(raw_response)
.map_err(|e| error_type(HttpError::from(e)))?;
Ok(response)
};
retry::<_, HttpError, _, _, _>(backoff, send_request).await?
};
#[cfg(target_arch = "wasm32")]
let response = {
let raw_response = self.inner.send_request(request, config.timeout).await?;
trace!("Got response: {raw_response:?}");
Request::IncomingResponse::try_from_http_response(raw_response)?
};
let response = Request::IncomingResponse::try_from_http_response(response)?;
Ok(response)
}
}
@ -228,6 +279,18 @@ impl HttpSettings {
}
}
// Clones all request parts except the extensions which can't be cloned.
// See also https://github.com/hyperium/http/issues/395
#[cfg(not(target_arch = "wasm32"))]
fn clone_request(request: &http::Request<Bytes>) -> http::Request<Bytes> {
let mut builder = http::Request::builder()
.version(request.version())
.method(request.method())
.uri(request.uri());
*builder.headers_mut().unwrap() = request.headers().clone();
builder.body(request.body().clone()).unwrap()
}
async fn response_to_http_response(
mut response: Response,
) -> Result<http::Response<Bytes>, reqwest::Error> {
@ -247,96 +310,25 @@ async fn response_to_http_response(
Ok(http_builder.body(body).expect("Can't construct a response using the given body"))
}
#[cfg(any(target_arch = "wasm32"))]
async fn send_request(
client: &reqwest::Client,
request: http::Request<Bytes>,
_: RequestConfig,
) -> Result<http::Response<Bytes>, HttpError> {
let request = reqwest::Request::try_from(request)?;
let response = client.execute(request).await?;
Ok(response_to_http_response(response).await?)
}
#[cfg(all(not(target_arch = "wasm32")))]
async fn send_request(
client: &reqwest::Client,
request: http::Request<Bytes>,
config: RequestConfig,
) -> Result<http::Response<Bytes>, HttpError> {
use std::sync::atomic::{AtomicU64, Ordering};
use backoff::{future::retry, Error as RetryError, ExponentialBackoff};
use http::StatusCode;
use ruma::api::client::error::ErrorKind as ClientApiErrorKind;
let mut backoff = ExponentialBackoff::default();
let mut request = reqwest::Request::try_from(request)?;
let retry_limit = config.retry_limit;
let retry_count = AtomicU64::new(1);
*request.timeout_mut() = Some(config.timeout);
backoff.max_elapsed_time = config.retry_timeout;
let request = &request;
let retry_count = &retry_count;
let request = || async move {
let stop = if let Some(retry_limit) = retry_limit {
retry_count.fetch_add(1, Ordering::Relaxed) >= retry_limit
} else {
false
};
// Turn errors into permanent errors when the retry limit is reached
let error_type = if stop {
RetryError::Permanent
} else {
|err: HttpError| {
let retry_after = err.client_api_error_kind().and_then(|kind| match kind {
ClientApiErrorKind::LimitExceeded { retry_after_ms } => *retry_after_ms,
_ => None,
});
RetryError::Transient { err, retry_after }
}
};
let request = request.try_clone().ok_or(HttpError::UnableToCloneRequest)?;
let response =
client.execute(request).await.map_err(|e| error_type(HttpError::Reqwest(e)))?;
let status_code = response.status();
// TODO TOO_MANY_REQUESTS will have a retry timeout which we should
// use.
if !stop
&& (status_code.is_server_error() || response.status() == StatusCode::TOO_MANY_REQUESTS)
{
return Err(error_type(HttpError::Server(status_code)));
}
let response = response_to_http_response(response)
.await
.map_err(|e| RetryError::Permanent(HttpError::Reqwest(e)))?;
Ok(response)
};
let response = retry(backoff, request).await?;
Ok(response)
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
impl HttpSend for reqwest::Client {
async fn send_request(
&self,
request: http::Request<Bytes>,
config: RequestConfig,
_timeout: Duration,
) -> Result<http::Response<Bytes>, HttpError> {
send_request(self, request, config).await
#[allow(unused_mut)]
let mut request = reqwest::Request::try_from(request)?;
// reqwest's timeout functionality is not available on WASM
#[cfg(not(target_arch = "wasm32"))]
{
*request.timeout_mut() = Some(_timeout);
}
let response = self.execute(request).await?;
Ok(response_to_http_response(response).await?)
}
}