diff --git a/bindings/matrix-sdk-ffi/src/sliding_sync.rs b/bindings/matrix-sdk-ffi/src/sliding_sync.rs index 572e6a4f2..af03e3b65 100644 --- a/bindings/matrix-sdk-ffi/src/sliding_sync.rs +++ b/bindings/matrix-sdk-ffi/src/sliding_sync.rs @@ -2,7 +2,7 @@ use std::sync::{Arc, RwLock}; use anyhow::Context; use eyeball_im::VectorDiff; -use futures_util::{future::join, pin_mut, StreamExt}; +use futures_util::{future::join3, pin_mut, StreamExt}; use matrix_sdk::ruma::{ api::client::sync::sync_events::{ v4::RoomSubscription as RumaRoomSubscription, @@ -213,15 +213,25 @@ impl SlidingSyncRoom { .await; }; - let reset_broadcast_rx = self.client.sliding_sync_reset_broadcast_tx.subscribe(); - let timeline = timeline.to_owned(); - let handle_sliding_sync_reset = async move { - reset_broadcast_rx.for_each(|_| timeline.clear()).await; + let handle_sliding_sync_reset = { + let reset_broadcast_rx = self.client.sliding_sync_reset_broadcast_tx.subscribe(); + let timeline = timeline.to_owned(); + async move { + reset_broadcast_rx.for_each(|_| timeline.clear()).await; + } + }; + + let handle_sync_gap = { + let gap_broadcast_rx = self.client.client.subscribe_sync_gap(self.inner.room_id()); + let timeline = timeline.to_owned(); + async move { + gap_broadcast_rx.for_each(|_| timeline.clear()).await; + } }; let items = timeline_items.into_iter().map(TimelineItem::from_arc).collect(); let task_handle = TaskHandle::new(RUNTIME.spawn(async move { - join(handle_events, handle_sliding_sync_reset).await; + join3(handle_events, handle_sliding_sync_reset, handle_sync_gap).await; })); Ok((items, task_handle)) diff --git a/crates/matrix-sdk/src/client/builder.rs b/crates/matrix-sdk/src/client/builder.rs index b275a0741..fc02573de 100644 --- a/crates/matrix-sdk/src/client/builder.rs +++ b/crates/matrix-sdk/src/client/builder.rs @@ -433,6 +433,7 @@ impl ClientBuilder { typing_notice_times: Default::default(), event_handlers: Default::default(), notification_handlers: Default::default(), + sync_gap_broadcast_txs: Default::default(), appservice_mode: self.appservice_mode, respect_login_well_known: self.respect_login_well_known, sync_beat: event_listener::Event::new(), diff --git a/crates/matrix-sdk/src/client/mod.rs b/crates/matrix-sdk/src/client/mod.rs index af618cce6..5259a64c6 100644 --- a/crates/matrix-sdk/src/client/mod.rs +++ b/crates/matrix-sdk/src/client/mod.rs @@ -19,10 +19,11 @@ use std::{ fmt::{self, Debug}, future::Future, pin::Pin, - sync::Arc, + sync::{Arc, Mutex as StdMutex}, }; use dashmap::DashMap; +use eyeball::{unique::Observable, Subscriber}; use futures_core::Stream; use futures_util::StreamExt; use matrix_sdk_base::{ @@ -162,6 +163,7 @@ pub(crate) struct ClientInner { pub(crate) event_handlers: EventHandlerStore, /// Notification handlers. See `register_notification_handler`. notification_handlers: RwLock>, + pub(crate) sync_gap_broadcast_txs: StdMutex>>, /// Whether the client should operate in application service style mode. /// This is low-level functionality. For an high-level API check the /// `matrix_sdk_appservice` crate. @@ -2528,6 +2530,16 @@ impl Client { let request = set_pusher::v3::Request::post(pusher); self.send(request, None).await } + + /// Subscribe to sync gaps for the given room. + /// + /// This method is meant to be removed in favor of making event handlers + /// more general in the future. + pub fn subscribe_sync_gap(&self, room_id: &RoomId) -> Subscriber<()> { + let mut lock = self.inner.sync_gap_broadcast_txs.lock().unwrap(); + let observable = lock.entry(room_id.to_owned()).or_default(); + Observable::subscribe(observable) + } } // The http mocking library is not supported for wasm32 diff --git a/crates/matrix-sdk/src/sync.rs b/crates/matrix-sdk/src/sync.rs index 67a99136f..f51c605fb 100644 --- a/crates/matrix-sdk/src/sync.rs +++ b/crates/matrix-sdk/src/sync.rs @@ -2,6 +2,7 @@ use std::{collections::BTreeMap, time::Duration}; +use eyeball::unique::Observable; pub use matrix_sdk_base::sync::*; use matrix_sdk_base::{ deserialized_responses::AmbiguityChanges, instant::Instant, @@ -14,7 +15,7 @@ use ruma::{ }, events::{AnyGlobalAccountDataEvent, AnyToDeviceEvent}, serde::Raw, - DeviceKeyAlgorithm, OwnedRoomId, + DeviceKeyAlgorithm, OwnedRoomId, RoomId, }; use tracing::{debug, error, warn}; @@ -105,6 +106,10 @@ impl Client { self.handle_sync_events(HandlerKind::ToDevice, &None, to_device_events).await?; for (room_id, room_info) in &rooms.join { + if room_info.timeline.limited { + self.notify_sync_gap(room_id); + } + let room = self.get_room(room_id); if room.is_none() { error!(?room_id, "Can't call event handler, room not found"); @@ -122,6 +127,10 @@ impl Client { } for (room_id, room_info) in &rooms.leave { + if room_info.timeline.limited { + self.notify_sync_gap(room_id); + } + let room = self.get_room(room_id); if room.is_none() { error!(?room_id, "Can't call event handler, room not found"); @@ -222,4 +231,11 @@ impl Client { *last_sync_time = Some(now); } + + fn notify_sync_gap(&self, room_id: &RoomId) { + let mut lock = self.inner.sync_gap_broadcast_txs.lock().unwrap(); + if let Some(tx) = lock.get_mut(room_id) { + Observable::set(tx, ()); + } + } }