sliding sync: infer the storage key from the loop id and user id (#2008)

* sliding sync: infer the storage key from the loop id and user id
* chore: rename `sync_id` to `id`
* chore: check that the sliding sync id is less than 16 chars
* chore: rejigger the storage key creation logic

Now the prefix is visible only in the `format_storage_key_prefix` function, and other `format_storage_key` function will be based off that.

* chore: update sliding sync README with API updates and fix outdated information
* chore: clippy + fix test

Signed-off-by: Benjamin Bouvier <public@benj.me>
pull/2015/head
Benjamin Bouvier 2023-06-05 16:51:40 +02:00 committed by GitHub
parent c6dae678bd
commit 61c3a2a2c7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 141 additions and 85 deletions

View File

@ -819,10 +819,10 @@ impl SlidingSyncBuilder {
Ok(Arc::new(builder))
}
pub fn storage_key(self: Arc<Self>, name: Option<String>) -> Arc<Self> {
pub fn enable_caching(self: Arc<Self>) -> Result<Arc<Self>, ClientError> {
let mut builder = unwrap_or_clone_arc(self);
builder.inner = builder.inner.storage_key(name);
Arc::new(builder)
builder.inner = builder.inner.enable_caching()?;
Ok(Arc::new(builder))
}
pub fn add_list(self: Arc<Self>, list_builder: Arc<SlidingSyncListBuilder>) -> Arc<Self> {
@ -894,8 +894,11 @@ impl SlidingSyncBuilder {
#[uniffi::export]
impl Client {
pub fn sliding_sync(&self) -> Arc<SlidingSyncBuilder> {
let mut inner = self.inner.sliding_sync();
/// Creates a new Sliding Sync instance with the given identifier.
///
/// Note: the identifier must be less than 16 chars long.
pub fn sliding_sync(&self, id: String) -> Result<Arc<SlidingSyncBuilder>, ClientError> {
let mut inner = self.inner.sliding_sync(id)?;
if let Some(sliding_sync_proxy) = self
.sliding_sync_proxy
.read()
@ -905,6 +908,6 @@ impl Client {
{
inner = inner.homeserver(sliding_sync_proxy);
}
Arc::new(SlidingSyncBuilder { inner, client: self.clone() })
Ok(Arc::new(SlidingSyncBuilder { inner, client: self.clone() }))
}
}

View File

@ -151,7 +151,7 @@ macro_rules! assert_timeline_stream {
async fn new_sliding_sync(lists: Vec<SlidingSyncListBuilder>) -> Result<(MockServer, SlidingSync)> {
let (client, server) = logged_in_client().await;
let mut sliding_sync_builder = client.sliding_sync();
let mut sliding_sync_builder = client.sliding_sync("integration-test")?;
for list in lists {
sliding_sync_builder = sliding_sync_builder.add_list(list);

View File

@ -27,12 +27,16 @@ To create a new Sliding Sync session, one must query an existing
[`Client::sliding_sync`](`super::Client::sliding_sync`). The
[`SlidingSyncBuilder`] is the baseline configuration to create a
[`SlidingSync`] session by calling `.build()` once everything is ready.
Typically one configures the custom homeserver endpoint.
Typically one configures the custom homeserver endpoint, although it's
automatically detected using the `.well-known` endpoint, if configured.
At the time of writing, no Matrix server natively supports Sliding Sync;
a sidecar called the [Sliding Sync Proxy][proxy] is needed. As that
typically runs on a separate domain, it can be configured on the
[`SlidingSyncBuilder`]:
[`SlidingSyncBuilder`].
A unique identifier, less than 16 chars long, is required for each instance
of Sliding Sync, and must be provided when getting a builder:
```rust,no_run
# use matrix_sdk::Client;
@ -41,7 +45,7 @@ typically runs on a separate domain, it can be configured on the
# let homeserver = Url::parse("http://example.com")?;
# let client = Client::new(homeserver).await?;
let sliding_sync_builder = client
.sliding_sync()
.sliding_sync("main-sync")?
.homeserver(Url::parse("http://sliding-sync.example.org")?);
# anyhow::Ok(())
@ -268,7 +272,7 @@ In full, this typically looks like this:
# let homeserver = Url::parse("http://example.com")?;
# let client = Client::new(homeserver).await?;
let sliding_sync = client
.sliding_sync()
.sliding_sync("main-sync")?
// any lists you want are added here.
.build()
.await?;
@ -351,22 +355,25 @@ timeline events as well as all list `room_lists` and
out).
This is a purely in-memory cache layer though. If one wants Sliding Sync to
persist and load from cold (storage) cache, one needs to set its key with
[`storage_key(name)`][`SlidingSyncBuilder::storage_key`] and for each list
present at `.build()`[`SlidingSyncBuilder::build`] sliding sync will attempt
to load their latest cached version from storage, as well as some overall
information of Sliding Sync. If that succeeded the lists `state` has been
set to [`Preloaded`][SlidingSyncState::Preloaded]. Only room data of rooms
present in one of the lists is loaded from storage.
persist and load from cold (storage) cache, one needs to explicitly
[`enable_caching()`][`SlidingSyncBuilder::enable_caching`]. This will reload the
Sliding Sync state from the storage, namely since tokens.
Notice that lists added after Sliding Sync has been built **will not be
loaded from cache** regardless of their settings (as this could lead to
inconsistencies between lists). The same goes for any extension: some
extension data (like the to-device-message position) are stored to storage,
but only retrieved upon `build()` of the `SlidingSyncBuilder`. So if one
only adds them later, they will not be reading the data from storage (to
avoid inconsistencies) and might require more data to be sent in their first
request than if they were loaded form cold-cache.
Caching for lists can be enabled independently, using the
[`add_cached_list`][`SlidingSyncBuilder::add_cached_list`] method, assuming
caching has been enabled before. In this case, during
`.build()`[`SlidingSyncBuilder::build`] sliding sync will attempt to load their
latest cached version from storage, as well as some overall information of
Sliding Sync. If that succeeded the lists `state` has been set to
[`Preloaded`][SlidingSyncState::Preloaded]. Only room data of rooms present in
one of the lists is loaded from storage.
Any extension data will not be loaded from the cache, if added after Sliding
Sync has been built: some extension data (like the to-device-message position)
are stored to storage, but only retrieved upon `build()` of the
`SlidingSyncBuilder`. So if one only adds them later, they will not be reading
the data from storage (to avoid inconsistencies) and might require more data to
be sent in their first request than if they were loaded from a cold cache.
When loading from storage `room_list` entries found are set to
`Invalidated` — the initial setting here is communicated as a single
@ -410,10 +417,10 @@ use url::Url;
let full_sync_list_name = "full-sync".to_owned();
let active_list_name = "active-list".to_owned();
let sliding_sync_builder = client
.sliding_sync()
.sliding_sync("main-sync")?
.homeserver(Url::parse("http://sliding-sync.example.org")?) // our proxy server
.with_common_extensions() // we want the e2ee and to-device enabled, please
.storage_key(Some("example-cache".to_owned())); // we want these to be loaded from and stored into the persistent storage
.enable_caching()?; // we want these to be loaded from and stored into the persistent storage
let full_sync_list = SlidingSyncList::builder(&full_sync_list_name)
.sync_mode(SlidingSyncMode::Growing { batch_size: 50, maximum_number_of_rooms_to_fetch: Some(500) }) // sync up by growing the window

View File

@ -11,8 +11,9 @@ use tokio::sync::broadcast::channel;
use url::Url;
use super::{
cache::restore_sliding_sync_state, SlidingSync, SlidingSyncInner, SlidingSyncListBuilder,
SlidingSyncPositionMarkers, SlidingSyncRoom,
cache::{format_storage_key_prefix, restore_sliding_sync_state},
Error, SlidingSync, SlidingSyncInner, SlidingSyncListBuilder, SlidingSyncPositionMarkers,
SlidingSyncRoom,
};
use crate::{Client, Result};
@ -22,6 +23,7 @@ use crate::{Client, Result};
/// [`crate::SlidingSync::builder`].
#[derive(Debug, Clone)]
pub struct SlidingSyncBuilder {
id: String,
storage_key: Option<String>,
homeserver: Option<Url>,
client: Client,
@ -32,22 +34,34 @@ pub struct SlidingSyncBuilder {
}
impl SlidingSyncBuilder {
pub(super) fn new(client: Client) -> Self {
Self {
storage_key: None,
homeserver: None,
client,
lists: Vec::new(),
extensions: None,
subscriptions: BTreeMap::new(),
rooms: BTreeMap::new(),
pub(super) fn new(id: String, client: Client) -> Result<Self, Error> {
if id.len() > 16 {
Err(Error::InvalidSlidingSyncIdentifier)
} else {
Ok(Self {
id,
storage_key: None,
homeserver: None,
client,
lists: Vec::new(),
extensions: None,
subscriptions: BTreeMap::new(),
rooms: BTreeMap::new(),
})
}
}
/// Set the storage key to keep this cache at and load it from.
pub fn storage_key(mut self, value: Option<String>) -> Self {
self.storage_key = value;
self
/// Enable caching for the given sliding sync.
///
/// This will cause lists and the sliding sync tokens to be saved into and
/// restored from the cache.
pub fn enable_caching(mut self) -> Result<Self> {
// Compute the final storage key now.
self.storage_key = Some(format_storage_key_prefix(
&self.id,
self.client.user_id().ok_or(super::Error::UnauthenticatedUser)?,
));
Ok(self)
}
/// Set the homeserver for sliding sync only.
@ -67,13 +81,14 @@ impl SlidingSyncBuilder {
/// Enroll the list in caching, reloads it from the cache if possible, and
/// adds it to the list of lists.
///
/// This will raise an error if a [`storage_key()`][Self::storage_key] was
/// not set, or if there was a I/O error reading from the cache.
/// This will raise an error if caching wasn't enabled with
/// [`enable_caching`][Self::enable_caching], or if there was a I/O error
/// reading from the cache.
///
/// Replace any list with the same name.
pub async fn add_cached_list(mut self, mut list: SlidingSyncListBuilder) -> Result<Self> {
let Some(ref storage_key) = self.storage_key else {
return Err(super::error::Error::MissingStorageKeyForCaching.into());
return Err(super::error::Error::CacheDisabled.into());
};
let reloaded_rooms = list.set_cached_and_reload(&self.client, storage_key).await?;
@ -238,6 +253,7 @@ impl SlidingSyncBuilder {
let lists = StdRwLock::new(lists);
Ok(SlidingSync::new(SlidingSyncInner {
_id: Some(self.id),
homeserver: self.homeserver,
client,
storage_key: self.storage_key,

View File

@ -7,21 +7,28 @@
use std::collections::BTreeMap;
use matrix_sdk_base::{StateStore, StoreError};
use ruma::UserId;
use tracing::{trace, warn};
use super::{FrozenSlidingSync, FrozenSlidingSyncList, SlidingSync, SlidingSyncList};
use crate::{sliding_sync::SlidingSyncListCachePolicy, Client, Result};
/// Be careful: as this is used as a storage key; changing it requires migrating
/// data!
pub(super) fn format_storage_key_prefix(id: &str, user_id: &UserId) -> String {
format!("sliding_sync_store::{}::{}", id, user_id)
}
/// Be careful: as this is used as a storage key; changing it requires migrating
/// data!
fn format_storage_key_for_sliding_sync(storage_key: &str) -> String {
format!("sliding_sync_store::{storage_key}")
format!("{storage_key}::instance")
}
/// Be careful: as this is used as a storage key; changing it requires migrating
/// data!
fn format_storage_key_for_sliding_sync_list(storage_key: &str, list_name: &str) -> String {
format!("sliding_sync_store::{storage_key}::{list_name}")
format!("{storage_key}::list::{list_name}")
}
/// Invalidate a single [`SlidingSyncList`] cache entry by removing it from the
@ -202,26 +209,22 @@ mod tests {
use futures_executor::block_on;
use futures_util::StreamExt;
use url::Url;
use super::*;
use crate::{Client, Result};
use crate::{test_utils::logged_in_client, Result};
#[test]
fn test_cannot_cache_without_a_storage_key() -> Result<()> {
block_on(async {
let homeserver = Url::parse("https://foo.bar")?;
let client = Client::new(homeserver).await?;
let client = logged_in_client(Some("https://foo.bar".to_owned())).await;
let err = client
.sliding_sync()
.sliding_sync("test")?
.add_cached_list(SlidingSyncList::builder("list_foo"))
.await
.unwrap_err();
assert!(matches!(
err,
crate::Error::SlidingSync(
crate::sliding_sync::error::Error::MissingStorageKeyForCaching
)
crate::Error::SlidingSync(crate::sliding_sync::error::Error::CacheDisabled)
));
Ok(())
})
@ -231,8 +234,7 @@ mod tests {
#[test]
fn test_sliding_sync_can_be_stored_and_restored() -> Result<()> {
block_on(async {
let homeserver = Url::parse("https://foo.bar")?;
let client = Client::new(homeserver).await?;
let client = logged_in_client(Some("https://foo.bar".to_owned())).await;
let store = client.store();
@ -257,10 +259,12 @@ mod tests {
.is_none());
// Create a new `SlidingSync` instance, and store it.
{
let storage_key = {
let sync_id = "test-sync-id";
let storage_key = format_storage_key_prefix(sync_id, client.user_id().unwrap());
let sliding_sync = client
.sliding_sync()
.storage_key(Some("hello".to_owned()))
.sliding_sync(sync_id)?
.enable_caching()?
.add_cached_list(SlidingSyncList::builder("list_foo"))
.await?
.add_list(SlidingSyncList::builder("list_bar"))
@ -279,17 +283,18 @@ mod tests {
}
assert!(sliding_sync.cache_to_storage().await.is_ok());
}
storage_key
};
// Store entries now exist for the sliding sync object and list_foo.
assert!(store
.get_custom_value(format_storage_key_for_sliding_sync("hello").as_bytes())
.get_custom_value(format_storage_key_for_sliding_sync(&storage_key).as_bytes())
.await?
.is_some());
assert!(store
.get_custom_value(
format_storage_key_for_sliding_sync_list("hello", "list_foo").as_bytes()
format_storage_key_for_sliding_sync_list(&storage_key, "list_foo").as_bytes()
)
.await?
.is_some());
@ -297,18 +302,20 @@ mod tests {
// But not for list_bar.
assert!(store
.get_custom_value(
format_storage_key_for_sliding_sync_list("hello", "list_bar").as_bytes()
format_storage_key_for_sliding_sync_list(&storage_key, "list_bar").as_bytes()
)
.await?
.is_none());
// Create a new `SlidingSync`, and it should be read from the cache.
{
let storage_key = {
let sync_id = "test-sync-id";
let storage_key = format_storage_key_prefix(sync_id, client.user_id().unwrap());
let max_number_of_room_stream = Arc::new(RwLock::new(None));
let cloned_stream = max_number_of_room_stream.clone();
let sliding_sync = client
.sliding_sync()
.storage_key(Some("hello".to_owned()))
.sliding_sync(sync_id)?
.enable_caching()?
.add_cached_list(SlidingSyncList::builder("list_foo").once_built(move |list| {
// In the `once_built()` handler, nothing has been read from the cache yet.
assert_eq!(list.maximum_number_of_rooms(), None);
@ -349,25 +356,27 @@ mod tests {
}
// Clean the cache.
clean_storage(&client, "hello", &sliding_sync.inner.lists.read().unwrap()).await;
}
clean_storage(&client, &storage_key, &sliding_sync.inner.lists.read().unwrap())
.await;
storage_key
};
// Store entries don't exist.
assert!(store
.get_custom_value(format_storage_key_for_sliding_sync("hello").as_bytes())
.get_custom_value(format_storage_key_for_sliding_sync(&storage_key).as_bytes())
.await?
.is_none());
assert!(store
.get_custom_value(
format_storage_key_for_sliding_sync_list("hello", "list_foo").as_bytes()
format_storage_key_for_sliding_sync_list(&storage_key, "list_foo").as_bytes()
)
.await?
.is_none());
assert!(store
.get_custom_value(
format_storage_key_for_sliding_sync_list("hello", "list_bar").as_bytes()
format_storage_key_for_sliding_sync_list(&storage_key, "list_bar").as_bytes()
)
.await?
.is_none());

View File

@ -6,9 +6,12 @@ use super::{SlidingSync, SlidingSyncBuilder};
use crate::{Client, Result};
impl Client {
/// Create a [`SlidingSyncBuilder`] tied to this client.
pub fn sliding_sync(&self) -> SlidingSyncBuilder {
SlidingSync::builder(self.clone())
/// Create a [`SlidingSyncBuilder`] tied to this client, with the given
/// identifier.
///
/// Note: the identifier must not be more than 16 chars long!
pub fn sliding_sync(&self, id: impl Into<String>) -> Result<SlidingSyncBuilder> {
Ok(SlidingSync::builder(id.into(), self.clone())?)
}
#[instrument(skip(self, response))]

View File

@ -30,10 +30,20 @@ pub enum Error {
/// Missing storage key when asking to deserialize some sub-state of sliding
/// sync.
#[error("A caching request was made but a storage key is missing in sliding sync")]
MissingStorageKeyForCaching,
#[error(
"A caching request was made but caching was not enabled in this instance of sliding sync"
)]
CacheDisabled,
/// We tried to read the user id of a client but it was missing.
#[error("Unauthenticated user in sliding sync")]
UnauthenticatedUser,
/// The internal channel of `SlidingSync` seems to be broken.
#[error("SlidingSync's internal channel is broken")]
InternalChannelIsBroken,
/// The name of the Sliding Sync instance is too long.
#[error("The Sliding Sync instance's identifier must be less than 16 chars long")]
InvalidSlidingSyncIdentifier,
}

View File

@ -79,6 +79,11 @@ pub struct SlidingSync {
#[derive(Debug)]
pub(super) struct SlidingSyncInner {
/// A unique identifier for this instance of sliding sync.
///
/// Used to distinguish different connections to the sliding sync proxy.
_id: Option<String>,
/// Customize the homeserver for sliding sync only
homeserver: Option<Url>,
@ -126,8 +131,8 @@ impl SlidingSync {
}
/// Create a new [`SlidingSyncBuilder`].
pub fn builder(client: Client) -> SlidingSyncBuilder {
SlidingSyncBuilder::new(client)
pub fn builder(id: String, client: Client) -> Result<SlidingSyncBuilder, Error> {
SlidingSyncBuilder::new(id, client)
}
/// Subscribe to a given room.
@ -212,7 +217,7 @@ impl SlidingSync {
mut list_builder: SlidingSyncListBuilder,
) -> Result<Option<SlidingSyncList>> {
let Some(ref storage_key) = self.inner.storage_key else {
return Err(error::Error::MissingStorageKeyForCaching.into());
return Err(error::Error::CacheDisabled.into());
};
let reloaded_rooms =
@ -408,6 +413,7 @@ impl SlidingSync {
(
// Build the request itself.
assign!(v4::Request::new(), {
// conn_id: self.inner.id.clone(),
pos,
delta_token,
timeout: Some(timeout),
@ -714,7 +720,7 @@ mod tests {
let server = MockServer::start().await;
let client = logged_in_client(Some(server.uri())).await;
let sync = client.sliding_sync().build().await?;
let sync = client.sliding_sync("test-slidingsync")?.build().await?;
let extensions = sync.prepare_extension_config(None);
// If the user doesn't provide any extension config, we enable to-device and
@ -755,7 +761,7 @@ mod tests {
let server = MockServer::start().await;
let client = logged_in_client(Some(server.uri())).await;
let mut sliding_sync_builder = client.sliding_sync();
let mut sliding_sync_builder = client.sliding_sync("test-slidingsync")?;
for list in lists {
sliding_sync_builder = sliding_sync_builder.add_list(list);

View File

@ -12,8 +12,10 @@ async fn setup(
let sliding_sync_proxy_url =
option_env!("SLIDING_SYNC_PROXY_URL").unwrap_or("http://localhost:8338").to_owned();
let client = get_client_for_user(name, use_sqlite_store).await?;
let sliding_sync_builder =
client.sliding_sync().homeserver(sliding_sync_proxy_url.parse()?).with_common_extensions();
let sliding_sync_builder = client
.sliding_sync("test-slidingsync")?
.homeserver(sliding_sync_proxy_url.parse()?)
.with_common_extensions();
Ok((client, sliding_sync_builder))
}