diff --git a/bindings/matrix-sdk-ffi/src/client.rs b/bindings/matrix-sdk-ffi/src/client.rs index 6cdcead91..359969053 100644 --- a/bindings/matrix-sdk-ffi/src/client.rs +++ b/bindings/matrix-sdk-ffi/src/client.rs @@ -1,6 +1,7 @@ use std::sync::{Arc, RwLock}; use anyhow::{anyhow, Context}; +use eyeball::shared::Observable as SharedObservable; use matrix_sdk::{ media::{MediaFileHandle as SdkMediaFileHandle, MediaFormat, MediaRequest, MediaThumbnailSize}, room::Room as SdkRoom, @@ -27,7 +28,7 @@ use matrix_sdk::{ }; use ruma::push::{HttpPusherData as RumaHttpPusherData, PushFormat as RumaPushFormat}; use serde_json::Value; -use tokio::sync::broadcast::{self, error::RecvError}; +use tokio::sync::broadcast::error::RecvError; use tracing::{debug, error, warn}; use super::{room::Room, session_verification::SessionVerificationController, RUNTIME}; @@ -114,7 +115,7 @@ pub struct Client { /// If this value is `Some`, it will be automatically added to the builder /// when calling `sliding_sync()`. pub(crate) sliding_sync_proxy: Arc>>, - pub(crate) sliding_sync_reset_broadcast_tx: broadcast::Sender<()>, + pub(crate) sliding_sync_reset_broadcast_tx: Arc>, } impl Client { @@ -135,14 +136,12 @@ impl Client { } }); - let (sliding_sync_reset_broadcast_tx, _) = broadcast::channel(1); - let client = Client { client, delegate: Arc::new(RwLock::new(None)), session_verification_controller, sliding_sync_proxy: Arc::new(RwLock::new(None)), - sliding_sync_reset_broadcast_tx, + sliding_sync_reset_broadcast_tx: Default::default(), }; let mut unknown_token_error_receiver = client.subscribe_to_unknown_token_errors(); @@ -571,7 +570,7 @@ impl Client { let client_api_error_kind = sync_error.client_api_error_kind(); match client_api_error_kind { Some(ErrorKind::UnknownPos) => { - let _ = self.sliding_sync_reset_broadcast_tx.send(()); + self.sliding_sync_reset_broadcast_tx.set(()); LoopCtrl::Continue } _ => { diff --git a/bindings/matrix-sdk-ffi/src/sliding_sync.rs b/bindings/matrix-sdk-ffi/src/sliding_sync.rs index 1e839f722..572e6a4f2 100644 --- a/bindings/matrix-sdk-ffi/src/sliding_sync.rs +++ b/bindings/matrix-sdk-ffi/src/sliding_sync.rs @@ -15,7 +15,7 @@ pub use matrix_sdk::{ Client as MatrixClient, LoopCtrl, RoomListEntry as MatrixRoomEntry, SlidingSyncBuilder as MatrixSlidingSyncBuilder, SlidingSyncMode, SlidingSyncState, }; -use tokio::{sync::broadcast::error::RecvError, task::JoinHandle}; +use tokio::task::JoinHandle; use tracing::{debug, error, warn}; use url::Url; @@ -213,15 +213,10 @@ impl SlidingSyncRoom { .await; }; - let mut reset_broadcast_rx = self.client.sliding_sync_reset_broadcast_tx.subscribe(); + let reset_broadcast_rx = self.client.sliding_sync_reset_broadcast_tx.subscribe(); let timeline = timeline.to_owned(); let handle_sliding_sync_reset = async move { - loop { - match reset_broadcast_rx.recv().await { - Err(RecvError::Closed) => break, - Ok(_) | Err(RecvError::Lagged(_)) => timeline.clear().await, - } - } + reset_broadcast_rx.for_each(|_| timeline.clear()).await; }; let items = timeline_items.into_iter().map(TimelineItem::from_arc).collect();