@@ -461,6 +461,8 @@ impl RoomEventCacheInner {
                 })
                 .await?;
 
+            // If there was a previous batch token, and there's at least one non-duplicated
+            // We must do this *after* the above call to `.with_events_mut`, so the new
             {
                 // Fill the AllEventsCache.
                 let mut all_events = self.all_events.write().await;
@@ -530,12 +532,12 @@ mod private {
     use matrix_sdk_base::{
         deserialized_responses::{TimelineEvent, TimelineEventKind},
         event_cache::{store::EventCacheStoreLock, Event},
-        linked_chunk::{lazy_loader, ChunkContent, Update},
+        linked_chunk::{lazy_loader, ChunkContent, ChunkIdentifierGenerator, Update},
     };
     use matrix_sdk_common::executor::spawn;
     use once_cell::sync::OnceCell;
     use ruma::{serde::Raw, OwnedEventId, OwnedRoomId};
-    use tracing::{error, instrument, trace};
+    use tracing::{debug, error, instrument, trace};
 
     use super::{events::RoomEvents, LoadMoreEventsBackwardsOutcome};
     use crate::event_cache::{deduplicator::Deduplicator, EventCacheError};
@@ -740,6 +742,60 @@ mod private {
             })
         }
 
+        /// If storage is enabled, unload all the chunks, then reload only the
+        /// last one.
+        ///
+        /// Will return `Some` updates to be consumed by the caller, if and only
+        /// if storage is enabled. Otherwise, this is a no-op.
+        #[must_use = "Updates as `VectorDiff` must probably be propagated via `RoomEventCacheUpdate`"]
+        pub(super) async fn shrink_to_last_chunk(
+            &mut self,
+        ) -> Result<Option<Vec<VectorDiff<TimelineEvent>>>, EventCacheError> {
+            let Some(store) = self.store.get() else {
+                // No need to do anything if there's no storage; we'll already reset the
+                // timeline after a limited response.
+                // TODO: that might be a way to unify our code, though?
+                return Ok(None);
+            };
+
+            let store_lock = store.lock().await?;
+
+            // Attempt to load the last chunk.
+            let (last_chunk, chunk_identifier_generator) = match store_lock
+                .load_last_chunk(&self.room)
+                .await
+            {
+                Ok(pair) => pair,
+
+                Err(err) => {
+                    // If loading the last chunk failed, clear the entire linked chunk.
+                    error!("error when reloading a linked chunk from memory: {err}");
+
+                    // Clear storage for this room.
+                    store_lock.handle_linked_chunk_updates(&self.room, vec![Update::Clear]).await?;
+
+                    // Restart with an empty linked chunk.
+                    (None, ChunkIdentifierGenerator::new_from_scratch())
+                }
+            };
+
+            debug!("unloading the linked chunk, and resetting it to its last chunk");
+
+            // Remove all the chunks from the linked chunk, except for the last one, and
+            // update the chunk identifier generator.
+            if let Err(err) = self.events.replace_with(last_chunk, chunk_identifier_generator) {
+                error!("error when replacing the linked chunk: {err}");
+                return self.reset().await.map(Some);
+            }
+
+            // Don't propagate those updates to the store; this only affects the in-memory
+            // representation. Drain those store updates.
+            let _ = self.events.store_updates().take();
+
+            // However, we want to get the updates as `VectorDiff`s for the external listeners.
+            Ok(Some(self.events.updates_as_vector_diffs()))
+        }
+
         /// Removes the bundled relations from an event, if they were present.
         ///
         /// Only replaces the present event if it contained bundled relations.
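For context on the `#[must_use]` attribute above: the returned `VectorDiff`s are meant to be forwarded to timeline observers. A minimal caller sketch follows, not part of this diff; `send_timeline_diffs` is a hypothetical helper standing in for whatever channel emits `RoomEventCacheUpdate::UpdateTimelineEvents`, while the `cache.inner.state` access mirrors what the test below does.

// Sketch only: shrink the in-memory linked chunk, then notify listeners.
// `send_timeline_diffs` is a hypothetical helper, not an API from this diff.
async fn shrink_and_notify(cache: &RoomEventCache) -> Result<(), EventCacheError> {
    let mut state = cache.inner.state.write().await;

    // `shrink_to_last_chunk` returns `Some` diffs only when storage is enabled.
    if let Some(diffs) = state.shrink_to_last_chunk().await? {
        if !diffs.is_empty() {
            send_timeline_diffs(&cache.inner, diffs);
        }
    }

    Ok(())
}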
@@ -1764,4 +1820,122 @@ mod tests {
         let related_event_id = related_events[0].event_id().unwrap();
         assert_eq!(related_event_id, related_id);
     }
+
+    #[cfg(not(target_arch = "wasm32"))] // This uses the cross-process lock, so needs time support.
+    #[async_test]
+    async fn test_shrink_to_last_chunk() {
+        use std::ops::Not as _;
+
+        use eyeball_im::VectorDiff;
+
+        use crate::{assert_let_timeout, event_cache::RoomEventCacheUpdate};
+
+        let room_id = room_id!("!galette:saucisse.bzh");
+
+        let client = MockClientBuilder::new("http://localhost".to_owned()).build().await;
+
+        let f = EventFactory::new().room(room_id);
+
+        let evid1 = event_id!("$1");
+        let evid2 = event_id!("$2");
+
+        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(evid1).into_event();
+        let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();
+
+        // Fill the event cache store with an initial linked chunk with 2 event chunks.
+        {
+            let store = client.event_cache_store();
+            let store = store.lock().await.unwrap();
+            store
+                .handle_linked_chunk_updates(
+                    room_id,
+                    vec![
+                        Update::NewItemsChunk {
+                            previous: None,
+                            new: ChunkIdentifier::new(0),
+                            next: None,
+                        },
+                        Update::PushItems {
+                            at: Position::new(ChunkIdentifier::new(0), 0),
+                            items: vec![ev1],
+                        },
+                        Update::NewItemsChunk {
+                            previous: Some(ChunkIdentifier::new(0)),
+                            new: ChunkIdentifier::new(1),
+                            next: None,
+                        },
+                        Update::PushItems {
+                            at: Position::new(ChunkIdentifier::new(1), 0),
+                            items: vec![ev2],
+                        },
+                    ],
+                )
+                .await
+                .unwrap();
+        }
+
+        let event_cache = client.event_cache();
+        event_cache.subscribe().unwrap();
+        event_cache.enable_storage().unwrap();
+
+        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
+        let room = client.get_room(room_id).unwrap();
+        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
+
+        // Sanity check: lazily loaded, so only includes one item at start.
+        let (events, mut stream) = room_event_cache.subscribe().await;
+        assert_eq!(events.len(), 1);
+        assert_eq!(events[0].event_id().as_deref(), Some(evid2));
+        assert!(stream.is_empty());
+
+        // Force loading the full linked chunk by back-paginating.
+        let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
+        assert_eq!(outcome.events.len(), 1);
+        assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
+        assert!(outcome.reached_start.not());
+
+        // We also get an update about the loading from the store.
+        assert_let_timeout!(
+            Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
+        );
+        assert_eq!(diffs.len(), 1);
+        assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value } => {
+            assert_eq!(value.event_id().as_deref(), Some(evid1));
+        });
+
+        assert!(stream.is_empty());
+
+        // Shrink the linked chunk to the last chunk.
+        let diffs = room_event_cache
+            .inner
+            .state
+            .write()
+            .await
+            .shrink_to_last_chunk()
+            .await
+            .expect("shrinking should succeed")
+            .expect("there must be updates");
+
+        // We receive updates about the changes to the linked chunk.
+        assert_eq!(diffs.len(), 2);
+        assert_matches!(&diffs[0], VectorDiff::Clear);
+        assert_matches!(&diffs[1], VectorDiff::Append { values } => {
+            assert_eq!(values.len(), 1);
+            assert_eq!(values[0].event_id().as_deref(), Some(evid2));
+        });
+
+        assert!(stream.is_empty());
+
+        // When reading the events, we only get the last one.
+        let (events, _) = room_event_cache.subscribe().await;
+        assert_eq!(events.len(), 1);
+        assert_eq!(events[0].event_id().as_deref(), Some(evid2));
+
+        // But if we back-paginate, we don't need network access to find out about
+        // the previous event.
+        let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
+        assert_eq!(outcome.events.len(), 1);
+        assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
+        assert!(outcome.reached_start.not());
+    }
 }