Reset visible timeline when a gappy sync arrives

This commit is contained in:
Jonas Platte 2023-03-28 19:57:42 +02:00 committed by Jonas Platte
parent 50cc356c7b
commit 6b5f0b8ec3
4 changed files with 47 additions and 8 deletions

View File

@ -2,7 +2,7 @@ use std::sync::{Arc, RwLock};
use anyhow::Context;
use eyeball_im::VectorDiff;
use futures_util::{future::join, pin_mut, StreamExt};
use futures_util::{future::join3, pin_mut, StreamExt};
use matrix_sdk::ruma::{
api::client::sync::sync_events::{
v4::RoomSubscription as RumaRoomSubscription,
@ -213,15 +213,25 @@ impl SlidingSyncRoom {
.await;
};
let reset_broadcast_rx = self.client.sliding_sync_reset_broadcast_tx.subscribe();
let timeline = timeline.to_owned();
let handle_sliding_sync_reset = async move {
reset_broadcast_rx.for_each(|_| timeline.clear()).await;
let handle_sliding_sync_reset = {
let reset_broadcast_rx = self.client.sliding_sync_reset_broadcast_tx.subscribe();
let timeline = timeline.to_owned();
async move {
reset_broadcast_rx.for_each(|_| timeline.clear()).await;
}
};
let handle_sync_gap = {
let gap_broadcast_rx = self.client.client.subscribe_sync_gap(self.inner.room_id());
let timeline = timeline.to_owned();
async move {
gap_broadcast_rx.for_each(|_| timeline.clear()).await;
}
};
let items = timeline_items.into_iter().map(TimelineItem::from_arc).collect();
let task_handle = TaskHandle::new(RUNTIME.spawn(async move {
join(handle_events, handle_sliding_sync_reset).await;
join3(handle_events, handle_sliding_sync_reset, handle_sync_gap).await;
}));
Ok((items, task_handle))

View File

@ -433,6 +433,7 @@ impl ClientBuilder {
typing_notice_times: Default::default(),
event_handlers: Default::default(),
notification_handlers: Default::default(),
sync_gap_broadcast_txs: Default::default(),
appservice_mode: self.appservice_mode,
respect_login_well_known: self.respect_login_well_known,
sync_beat: event_listener::Event::new(),

View File

@ -19,10 +19,11 @@ use std::{
fmt::{self, Debug},
future::Future,
pin::Pin,
sync::Arc,
sync::{Arc, Mutex as StdMutex},
};
use dashmap::DashMap;
use eyeball::{unique::Observable, Subscriber};
use futures_core::Stream;
use futures_util::StreamExt;
use matrix_sdk_base::{
@ -162,6 +163,7 @@ pub(crate) struct ClientInner {
pub(crate) event_handlers: EventHandlerStore,
/// Notification handlers. See `register_notification_handler`.
notification_handlers: RwLock<Vec<NotificationHandlerFn>>,
pub(crate) sync_gap_broadcast_txs: StdMutex<BTreeMap<OwnedRoomId, Observable<()>>>,
/// Whether the client should operate in application service style mode.
/// This is low-level functionality. For an high-level API check the
/// `matrix_sdk_appservice` crate.
@ -2528,6 +2530,16 @@ impl Client {
let request = set_pusher::v3::Request::post(pusher);
self.send(request, None).await
}
/// Subscribe to sync gaps for the given room.
///
/// This method is meant to be removed in favor of making event handlers
/// more general in the future.
pub fn subscribe_sync_gap(&self, room_id: &RoomId) -> Subscriber<()> {
let mut lock = self.inner.sync_gap_broadcast_txs.lock().unwrap();
let observable = lock.entry(room_id.to_owned()).or_default();
Observable::subscribe(observable)
}
}
// The http mocking library is not supported for wasm32

View File

@ -2,6 +2,7 @@
use std::{collections::BTreeMap, time::Duration};
use eyeball::unique::Observable;
pub use matrix_sdk_base::sync::*;
use matrix_sdk_base::{
deserialized_responses::AmbiguityChanges, instant::Instant,
@ -14,7 +15,7 @@ use ruma::{
},
events::{AnyGlobalAccountDataEvent, AnyToDeviceEvent},
serde::Raw,
DeviceKeyAlgorithm, OwnedRoomId,
DeviceKeyAlgorithm, OwnedRoomId, RoomId,
};
use tracing::{debug, error, warn};
@ -105,6 +106,10 @@ impl Client {
self.handle_sync_events(HandlerKind::ToDevice, &None, to_device_events).await?;
for (room_id, room_info) in &rooms.join {
if room_info.timeline.limited {
self.notify_sync_gap(room_id);
}
let room = self.get_room(room_id);
if room.is_none() {
error!(?room_id, "Can't call event handler, room not found");
@ -122,6 +127,10 @@ impl Client {
}
for (room_id, room_info) in &rooms.leave {
if room_info.timeline.limited {
self.notify_sync_gap(room_id);
}
let room = self.get_room(room_id);
if room.is_none() {
error!(?room_id, "Can't call event handler, room not found");
@ -222,4 +231,11 @@ impl Client {
*last_sync_time = Some(now);
}
fn notify_sync_gap(&self, room_id: &RoomId) {
let mut lock = self.inner.sync_gap_broadcast_txs.lock().unwrap();
if let Some(tx) = lock.get_mut(room_id) {
Observable::set(tx, ());
}
}
}