@@ -461,6 +461,8 @@ impl RoomEventCacheInner {
461
461
} )
462
462
. await ?;
463
463
464
+ // If there was a previous batch token, and there's at least one non-duplicated
465
+ // We must do this *after* the above call to `.with_events_mut`, so the new
464
466
{
465
467
// Fill the AllEventsCache.
466
468
let mut all_events = self . all_events . write ( ) . await ;
@@ -530,12 +532,12 @@ mod private {
530
532
use matrix_sdk_base:: {
531
533
deserialized_responses:: { TimelineEvent , TimelineEventKind } ,
532
534
event_cache:: { store:: EventCacheStoreLock , Event } ,
533
- linked_chunk:: { lazy_loader, ChunkContent , Update } ,
535
+ linked_chunk:: { lazy_loader, ChunkContent , ChunkIdentifierGenerator , Update } ,
534
536
} ;
535
537
use matrix_sdk_common:: executor:: spawn;
536
538
use once_cell:: sync:: OnceCell ;
537
539
use ruma:: { serde:: Raw , OwnedEventId , OwnedRoomId } ;
538
- use tracing:: { error, instrument, trace} ;
540
+ use tracing:: { debug , error, instrument, trace} ;
539
541
540
542
use super :: { events:: RoomEvents , LoadMoreEventsBackwardsOutcome } ;
541
543
use crate :: event_cache:: { deduplicator:: Deduplicator , EventCacheError } ;
@@ -740,6 +742,60 @@ mod private {
740
742
} )
741
743
}
742
744
745
+ /// If storage is enabled, unload all the chunks, then reloads only the
746
+ /// last one.
747
+ ///
748
+ /// Will return `Some` updates to be consumed by the caller, if and only
749
+ /// if storage is enabled. Otherwise, is a no-op.
750
+ #[ must_use = "Updates as `VectorDiff` must probably be propagated via `RoomEventCacheUpdate`" ]
751
+ pub ( super ) async fn shrink_to_last_chunk (
752
+ & mut self ,
753
+ ) -> Result < Option < Vec < VectorDiff < TimelineEvent > > > , EventCacheError > {
754
+ let Some ( store) = self . store . get ( ) else {
755
+ // No need to do anything if there's no storage; we'll already reset the
756
+ // timeline after a limited response.
757
+ // TODO: that might be a way to unify our code, though?
758
+ return Ok ( None ) ;
759
+ } ;
760
+
761
+ let store_lock = store. lock ( ) . await ?;
762
+
763
+ // Attempt to load the last chunk.
764
+ let ( last_chunk, chunk_identifier_generator) = match store_lock
765
+ . load_last_chunk ( & self . room )
766
+ . await
767
+ {
768
+ Ok ( pair) => pair,
769
+
770
+ Err ( err) => {
771
+ // If loading the last chunk failed, clear the entire linked chunk.
772
+ error ! ( "error when reloading a linked chunk from memory: {err}" ) ;
773
+
774
+ // Clear storage for this room.
775
+ store_lock. handle_linked_chunk_updates ( & self . room , vec ! [ Update :: Clear ] ) . await ?;
776
+
777
+ // Restart with an empty linked chunk.
778
+ ( None , ChunkIdentifierGenerator :: new_from_scratch ( ) )
779
+ }
780
+ } ;
781
+
782
+ debug ! ( "unloading the linked chunk, and resetting it to its last chunk" ) ;
783
+
784
+ // Remove all the chunks from the linked chunks, except for the last one, and
785
+ // updates the chunk identifier generator.
786
+ if let Err ( err) = self . events . replace_with ( last_chunk, chunk_identifier_generator) {
787
+ error ! ( "error when replacing the linked chunk: {err}" ) ;
788
+ return self . reset ( ) . await . map ( Some ) ;
789
+ }
790
+
791
+ // Don't propagate those updates to the store; this is only for the in-memory
792
+ // representation that we're doing this. Let's drain those store updates.
793
+ let _ = self . events . store_updates ( ) . take ( ) ;
794
+
795
+ // However, we want to get updates as `VectorDiff`s, for the external listeners.
796
+ Ok ( Some ( self . events . updates_as_vector_diffs ( ) ) )
797
+ }
798
+
743
799
/// Removes the bundled relations from an event, if they were present.
744
800
///
745
801
/// Only replaces the present if it contained bundled relations.
@@ -1767,4 +1823,122 @@ mod tests {
1767
1823
let related_event_id = related_events[ 0 ] . event_id ( ) . unwrap ( ) ;
1768
1824
assert_eq ! ( related_event_id, related_id) ;
1769
1825
}
1826
+
1827
+ #[ cfg( not( target_arch = "wasm32" ) ) ] // This uses the cross-process lock, so needs time support.
1828
+ #[ async_test]
1829
+ async fn test_shrink_to_last_chunk ( ) {
1830
+ use std:: ops:: Not as _;
1831
+
1832
+ use eyeball_im:: VectorDiff ;
1833
+
1834
+ use crate :: { assert_let_timeout, event_cache:: RoomEventCacheUpdate } ;
1835
+
1836
+ let room_id = room_id ! ( "!galette:saucisse.bzh" ) ;
1837
+
1838
+ let client = MockClientBuilder :: new ( "http://localhost" . to_owned ( ) ) . build ( ) . await ;
1839
+
1840
+ let f = EventFactory :: new ( ) . room ( room_id) ;
1841
+
1842
+ let evid1 = event_id ! ( "$1" ) ;
1843
+ let evid2 = event_id ! ( "$2" ) ;
1844
+
1845
+ let ev1 = f. text_msg ( "hello world" ) . sender ( * ALICE ) . event_id ( evid1) . into_event ( ) ;
1846
+ let ev2 = f. text_msg ( "howdy" ) . sender ( * BOB ) . event_id ( evid2) . into_event ( ) ;
1847
+
1848
+ // Fill the event cache store with an initial linked chunk with 2 events chunks.
1849
+ {
1850
+ let store = client. event_cache_store ( ) ;
1851
+ let store = store. lock ( ) . await . unwrap ( ) ;
1852
+ store
1853
+ . handle_linked_chunk_updates (
1854
+ room_id,
1855
+ vec ! [
1856
+ Update :: NewItemsChunk {
1857
+ previous: None ,
1858
+ new: ChunkIdentifier :: new( 0 ) ,
1859
+ next: None ,
1860
+ } ,
1861
+ Update :: PushItems {
1862
+ at: Position :: new( ChunkIdentifier :: new( 0 ) , 0 ) ,
1863
+ items: vec![ ev1] ,
1864
+ } ,
1865
+ Update :: NewItemsChunk {
1866
+ previous: Some ( ChunkIdentifier :: new( 0 ) ) ,
1867
+ new: ChunkIdentifier :: new( 1 ) ,
1868
+ next: None ,
1869
+ } ,
1870
+ Update :: PushItems {
1871
+ at: Position :: new( ChunkIdentifier :: new( 1 ) , 0 ) ,
1872
+ items: vec![ ev2] ,
1873
+ } ,
1874
+ ] ,
1875
+ )
1876
+ . await
1877
+ . unwrap ( ) ;
1878
+ }
1879
+
1880
+ let event_cache = client. event_cache ( ) ;
1881
+ event_cache. subscribe ( ) . unwrap ( ) ;
1882
+ event_cache. enable_storage ( ) . unwrap ( ) ;
1883
+
1884
+ client. base_client ( ) . get_or_create_room ( room_id, matrix_sdk_base:: RoomState :: Joined ) ;
1885
+ let room = client. get_room ( room_id) . unwrap ( ) ;
1886
+ let ( room_event_cache, _drop_handles) = room. event_cache ( ) . await . unwrap ( ) ;
1887
+
1888
+ // Sanity check: lazily loaded, so only includes one item at start.
1889
+ let ( events, mut stream) = room_event_cache. subscribe ( ) . await ;
1890
+ assert_eq ! ( events. len( ) , 1 ) ;
1891
+ assert_eq ! ( events[ 0 ] . event_id( ) . as_deref( ) , Some ( evid2) ) ;
1892
+ assert ! ( stream. is_empty( ) ) ;
1893
+
1894
+ // Force loading the full linked chunk by back-paginating.
1895
+ let outcome = room_event_cache. pagination ( ) . run_backwards_once ( 20 ) . await . unwrap ( ) ;
1896
+ assert_eq ! ( outcome. events. len( ) , 1 ) ;
1897
+ assert_eq ! ( outcome. events[ 0 ] . event_id( ) . as_deref( ) , Some ( evid1) ) ;
1898
+ assert ! ( outcome. reached_start. not( ) ) ;
1899
+
1900
+ // We also get an update about the loading from the store.
1901
+ assert_let_timeout ! (
1902
+ Ok ( RoomEventCacheUpdate :: UpdateTimelineEvents { diffs, .. } ) = stream. recv( )
1903
+ ) ;
1904
+ assert_eq ! ( diffs. len( ) , 1 ) ;
1905
+ assert_matches ! ( & diffs[ 0 ] , VectorDiff :: Insert { index: 0 , value } => {
1906
+ assert_eq!( value. event_id( ) . as_deref( ) , Some ( evid1) ) ;
1907
+ } ) ;
1908
+
1909
+ assert ! ( stream. is_empty( ) ) ;
1910
+
1911
+ // Shrink the linked chunk to the last chunk.
1912
+ let diffs = room_event_cache
1913
+ . inner
1914
+ . state
1915
+ . write ( )
1916
+ . await
1917
+ . shrink_to_last_chunk ( )
1918
+ . await
1919
+ . expect ( "shrinking should succeed" )
1920
+ . expect ( "there must be updates" ) ;
1921
+
1922
+ // We receive updates about the changes to the linked chunk.
1923
+ assert_eq ! ( diffs. len( ) , 2 ) ;
1924
+ assert_matches ! ( & diffs[ 0 ] , VectorDiff :: Clear ) ;
1925
+ assert_matches ! ( & diffs[ 1 ] , VectorDiff :: Append { values} => {
1926
+ assert_eq!( values. len( ) , 1 ) ;
1927
+ assert_eq!( values[ 0 ] . event_id( ) . as_deref( ) , Some ( evid2) ) ;
1928
+ } ) ;
1929
+
1930
+ assert ! ( stream. is_empty( ) ) ;
1931
+
1932
+ // When reading the events, we do get only the last one.
1933
+ let ( events, _) = room_event_cache. subscribe ( ) . await ;
1934
+ assert_eq ! ( events. len( ) , 1 ) ;
1935
+ assert_eq ! ( events[ 0 ] . event_id( ) . as_deref( ) , Some ( evid2) ) ;
1936
+
1937
+ // But if we back-paginate, we don't need access to network to find out about
1938
+ // the previous event.
1939
+ let outcome = room_event_cache. pagination ( ) . run_backwards_once ( 20 ) . await . unwrap ( ) ;
1940
+ assert_eq ! ( outcome. events. len( ) , 1 ) ;
1941
+ assert_eq ! ( outcome. events[ 0 ] . event_id( ) . as_deref( ) , Some ( evid1) ) ;
1942
+ assert ! ( outcome. reached_start. not( ) ) ;
1943
+ }
1770
1944
}
0 commit comments