Use Observable for sliding_sync_reset_broadcast_tx

This commit is contained in:
Jonas Platte 2023-03-29 13:03:54 +02:00 committed by Jonas Platte
parent 2ac192f307
commit 50cc356c7b
2 changed files with 8 additions and 14 deletions

View File

@ -1,6 +1,7 @@
use std::sync::{Arc, RwLock};
use anyhow::{anyhow, Context};
use eyeball::shared::Observable as SharedObservable;
use matrix_sdk::{
media::{MediaFileHandle as SdkMediaFileHandle, MediaFormat, MediaRequest, MediaThumbnailSize},
room::Room as SdkRoom,
@ -27,7 +28,7 @@ use matrix_sdk::{
};
use ruma::push::{HttpPusherData as RumaHttpPusherData, PushFormat as RumaPushFormat};
use serde_json::Value;
use tokio::sync::broadcast::{self, error::RecvError};
use tokio::sync::broadcast::error::RecvError;
use tracing::{debug, error, warn};
use super::{room::Room, session_verification::SessionVerificationController, RUNTIME};
@ -114,7 +115,7 @@ pub struct Client {
/// If this value is `Some`, it will be automatically added to the builder
/// when calling `sliding_sync()`.
pub(crate) sliding_sync_proxy: Arc<RwLock<Option<String>>>,
pub(crate) sliding_sync_reset_broadcast_tx: broadcast::Sender<()>,
pub(crate) sliding_sync_reset_broadcast_tx: Arc<SharedObservable<()>>,
}
impl Client {
@ -135,14 +136,12 @@ impl Client {
}
});
let (sliding_sync_reset_broadcast_tx, _) = broadcast::channel(1);
let client = Client {
client,
delegate: Arc::new(RwLock::new(None)),
session_verification_controller,
sliding_sync_proxy: Arc::new(RwLock::new(None)),
sliding_sync_reset_broadcast_tx,
sliding_sync_reset_broadcast_tx: Default::default(),
};
let mut unknown_token_error_receiver = client.subscribe_to_unknown_token_errors();
@ -571,7 +570,7 @@ impl Client {
let client_api_error_kind = sync_error.client_api_error_kind();
match client_api_error_kind {
Some(ErrorKind::UnknownPos) => {
let _ = self.sliding_sync_reset_broadcast_tx.send(());
self.sliding_sync_reset_broadcast_tx.set(());
LoopCtrl::Continue
}
_ => {

View File

@ -15,7 +15,7 @@ pub use matrix_sdk::{
Client as MatrixClient, LoopCtrl, RoomListEntry as MatrixRoomEntry,
SlidingSyncBuilder as MatrixSlidingSyncBuilder, SlidingSyncMode, SlidingSyncState,
};
use tokio::{sync::broadcast::error::RecvError, task::JoinHandle};
use tokio::task::JoinHandle;
use tracing::{debug, error, warn};
use url::Url;
@ -213,15 +213,10 @@ impl SlidingSyncRoom {
.await;
};
let mut reset_broadcast_rx = self.client.sliding_sync_reset_broadcast_tx.subscribe();
let reset_broadcast_rx = self.client.sliding_sync_reset_broadcast_tx.subscribe();
let timeline = timeline.to_owned();
let handle_sliding_sync_reset = async move {
loop {
match reset_broadcast_rx.recv().await {
Err(RecvError::Closed) => break,
Ok(_) | Err(RecvError::Lagged(_)) => timeline.clear().await,
}
}
reset_broadcast_rx.for_each(|_| timeline.clear()).await;
};
let items = timeline_items.into_iter().map(TimelineItem::from_arc).collect();