@@ -393,181 +393,181 @@ impl EventCacheStore for SqliteEventCacheStore {
393
393
let this = self . clone ( ) ;
394
394
395
395
with_immediate_transaction ( self . acquire ( ) . await ?, move |txn| {
396
- for up in updates {
397
- match up {
398
- Update :: NewItemsChunk { previous, new, next } => {
399
- let previous = previous. as_ref ( ) . map ( ChunkIdentifier :: index) ;
400
- let new = new. index ( ) ;
401
- let next = next. as_ref ( ) . map ( ChunkIdentifier :: index) ;
402
-
403
- trace ! (
404
- %room_id,
405
- "new events chunk (prev={previous:?}, i={new}, next={next:?})" ,
406
- ) ;
407
-
408
- insert_chunk (
409
- txn,
410
- & hashed_room_id,
411
- previous,
412
- new,
413
- next,
414
- CHUNK_TYPE_EVENT_TYPE_STRING ,
415
- ) ?;
416
- }
417
-
418
- Update :: NewGapChunk { previous, new, next, gap } => {
419
- let serialized = serde_json:: to_vec ( & gap. prev_token ) ?;
420
- let prev_token = this. encode_value ( serialized) ?;
421
-
422
- let previous = previous. as_ref ( ) . map ( ChunkIdentifier :: index) ;
423
- let new = new. index ( ) ;
424
- let next = next. as_ref ( ) . map ( ChunkIdentifier :: index) ;
425
-
426
- trace ! (
427
- %room_id,
428
- "new gap chunk (prev={previous:?}, i={new}, next={next:?})" ,
429
- ) ;
430
-
431
- // Insert the chunk as a gap.
432
- insert_chunk (
433
- txn,
434
- & hashed_room_id,
435
- previous,
436
- new,
437
- next,
438
- CHUNK_TYPE_GAP_TYPE_STRING ,
439
- ) ?;
440
-
441
- // Insert the gap's value.
442
- txn. execute (
443
- r#"
444
- INSERT INTO gaps(chunk_id, room_id, prev_token)
445
- VALUES (?, ?, ?)
446
- "# ,
447
- ( new, & hashed_room_id, prev_token) ,
448
- ) ?;
449
- }
396
+ for up in updates {
397
+ match up {
398
+ Update :: NewItemsChunk { previous, new, next } => {
399
+ let previous = previous. as_ref ( ) . map ( ChunkIdentifier :: index) ;
400
+ let new = new. index ( ) ;
401
+ let next = next. as_ref ( ) . map ( ChunkIdentifier :: index) ;
402
+
403
+ trace ! (
404
+ %room_id,
405
+ "new events chunk (prev={previous:?}, i={new}, next={next:?})" ,
406
+ ) ;
407
+
408
+ insert_chunk (
409
+ txn,
410
+ & hashed_room_id,
411
+ previous,
412
+ new,
413
+ next,
414
+ CHUNK_TYPE_EVENT_TYPE_STRING ,
415
+ ) ?;
416
+ }
450
417
451
- Update :: RemoveChunk ( chunk_identifier) => {
452
- let chunk_id = chunk_identifier. index ( ) ;
418
+ Update :: NewGapChunk { previous, new, next, gap } => {
419
+ let serialized = serde_json:: to_vec ( & gap. prev_token ) ?;
420
+ let prev_token = this. encode_value ( serialized) ?;
421
+
422
+ let previous = previous. as_ref ( ) . map ( ChunkIdentifier :: index) ;
423
+ let new = new. index ( ) ;
424
+ let next = next. as_ref ( ) . map ( ChunkIdentifier :: index) ;
425
+
426
+ trace ! (
427
+ %room_id,
428
+ "new gap chunk (prev={previous:?}, i={new}, next={next:?})" ,
429
+ ) ;
430
+
431
+ // Insert the chunk as a gap.
432
+ insert_chunk (
433
+ txn,
434
+ & hashed_room_id,
435
+ previous,
436
+ new,
437
+ next,
438
+ CHUNK_TYPE_GAP_TYPE_STRING ,
439
+ ) ?;
453
440
454
- trace ! ( %room_id, "removing chunk @ {chunk_id}" ) ;
441
+ // Insert the gap's value.
442
+ txn. execute (
443
+ r#"
444
+ INSERT INTO gaps(chunk_id, room_id, prev_token)
445
+ VALUES (?, ?, ?)
446
+ "# ,
447
+ ( new, & hashed_room_id, prev_token) ,
448
+ ) ?;
449
+ }
455
450
456
- // Find chunk to delete.
457
- let ( previous, next) : ( Option < usize > , Option < usize > ) = txn. query_row (
458
- "SELECT previous, next FROM linked_chunks WHERE id = ? AND room_id = ?" ,
459
- ( chunk_id, & hashed_room_id) ,
460
- |row| Ok ( ( row. get ( 0 ) ?, row. get ( 1 ) ?) )
461
- ) ?;
451
+ Update :: RemoveChunk ( chunk_identifier) => {
452
+ let chunk_id = chunk_identifier. index ( ) ;
462
453
463
- // Replace its previous' next to its own next.
464
- if let Some ( previous) = previous {
465
- txn. execute ( "UPDATE linked_chunks SET next = ? WHERE id = ? AND room_id = ?" , ( next, previous, & hashed_room_id) ) ?;
466
- }
454
+ trace ! ( %room_id, "removing chunk @ {chunk_id}" ) ;
467
455
468
- // Replace its next' previous to its own previous.
469
- if let Some ( next) = next {
470
- txn. execute ( "UPDATE linked_chunks SET previous = ? WHERE id = ? AND room_id = ?" , ( previous, next, & hashed_room_id) ) ?;
471
- }
456
+ // Find chunk to delete.
457
+ let ( previous, next) : ( Option < usize > , Option < usize > ) = txn. query_row (
458
+ "SELECT previous, next FROM linked_chunks WHERE id = ? AND room_id = ?" ,
459
+ ( chunk_id, & hashed_room_id) ,
460
+ |row| Ok ( ( row. get ( 0 ) ?, row. get ( 1 ) ?) )
461
+ ) ?;
472
462
473
- // Now delete it, and let cascading delete corresponding entries in the
474
- // other data tables.
475
- txn. execute ( "DELETE FROM linked_chunks WHERE id = ? AND room_id = ?" , ( chunk_id , & hashed_room_id) ) ?;
463
+ // Replace its previous' next to its own next.
464
+ if let Some ( previous ) = previous {
465
+ txn. execute ( "UPDATE linked_chunks SET next = ? WHERE id = ? AND room_id = ?" , ( next , previous , & hashed_room_id) ) ?;
476
466
}
477
467
478
- Update :: PushItems { at, items } => {
479
- let chunk_id = at. chunk_identifier ( ) . index ( ) ;
480
-
481
- trace ! ( %room_id, "pushing {} items @ {chunk_id}" , items. len( ) ) ;
482
-
483
- for ( i, event) in items. into_iter ( ) . enumerate ( ) {
484
- let serialized = serde_json:: to_vec ( & event) ?;
485
- let content = this. encode_value ( serialized) ?;
486
-
487
- let event_id = event. event_id ( ) . map ( |event_id| event_id. to_string ( ) ) ;
488
- let index = at. index ( ) + i;
489
-
490
- txn. execute (
491
- r#"
492
- INSERT INTO events(chunk_id, room_id, event_id, content, position)
493
- VALUES (?, ?, ?, ?, ?)
494
- "# ,
495
- ( chunk_id, & hashed_room_id, event_id, content, index) ,
496
- ) ?;
497
- }
468
+ // Replace its next' previous to its own previous.
469
+ if let Some ( next) = next {
470
+ txn. execute ( "UPDATE linked_chunks SET previous = ? WHERE id = ? AND room_id = ?" , ( previous, next, & hashed_room_id) ) ?;
498
471
}
499
472
500
- Update :: ReplaceItem { at, item : event } => {
501
- let chunk_id = at. chunk_identifier ( ) . index ( ) ;
502
- let index = at. index ( ) ;
473
+ // Now delete it, and let cascading delete corresponding entries in the
474
+ // other data tables.
475
+ txn. execute ( "DELETE FROM linked_chunks WHERE id = ? AND room_id = ?" , ( chunk_id, & hashed_room_id) ) ?;
476
+ }
477
+
478
+ Update :: PushItems { at, items } => {
479
+ let chunk_id = at. chunk_identifier ( ) . index ( ) ;
503
480
504
- trace ! ( %room_id, "replacing item @ {chunk_id}:{index}" ) ;
481
+ trace ! ( %room_id, "pushing {} items @ {chunk_id}" , items . len ( ) ) ;
505
482
483
+ for ( i, event) in items. into_iter ( ) . enumerate ( ) {
506
484
let serialized = serde_json:: to_vec ( & event) ?;
507
485
let content = this. encode_value ( serialized) ?;
508
486
509
- // The event id should be the same, but just in case it changed…
510
487
let event_id = event. event_id ( ) . map ( |event_id| event_id. to_string ( ) ) ;
488
+ let index = at. index ( ) + i;
511
489
512
490
txn. execute (
513
491
r#"
514
- UPDATE events
515
- SET content = ?, event_id = ?
516
- WHERE room_id = ? AND chunk_id = ? AND position = ?
492
+ INSERT INTO events(chunk_id, room_id, event_id, content, position)
493
+ VALUES (?, ?, ?, ?, ?)
517
494
"# ,
518
- ( content , event_id , & hashed_room_id, chunk_id , index , )
495
+ ( chunk_id , & hashed_room_id, event_id , content , index ) ,
519
496
) ?;
520
497
}
498
+ }
521
499
522
- Update :: RemoveItem { at } => {
523
- let chunk_id = at. chunk_identifier ( ) . index ( ) ;
524
- let index = at. index ( ) ;
500
+ Update :: ReplaceItem { at, item : event } => {
501
+ let chunk_id = at. chunk_identifier ( ) . index ( ) ;
502
+ let index = at. index ( ) ;
525
503
526
- trace ! ( %room_id, "removing item @ {chunk_id}:{index}" ) ;
504
+ trace ! ( %room_id, "replacing item @ {chunk_id}:{index}" ) ;
527
505
528
- // Remove the entry.
529
- txn . execute ( "DELETE FROM events WHERE room_id = ? AND chunk_id = ? AND position = ?" , ( & hashed_room_id , chunk_id , index ) ) ?;
506
+ let serialized = serde_json :: to_vec ( & event ) ? ;
507
+ let content = this . encode_value ( serialized ) ?;
530
508
531
- // Decrement the index of each item after the one we're going to remove.
532
- txn. execute (
533
- r#"
534
- UPDATE events
535
- SET position = position - 1
536
- WHERE room_id = ? AND chunk_id = ? AND position > ?
537
- "# ,
538
- ( & hashed_room_id, chunk_id, index)
539
- ) ?;
509
+ // The event id should be the same, but just in case it changed…
510
+ let event_id = event. event_id ( ) . map ( |event_id| event_id. to_string ( ) ) ;
540
511
541
- }
512
+ txn. execute (
513
+ r#"
514
+ UPDATE events
515
+ SET content = ?, event_id = ?
516
+ WHERE room_id = ? AND chunk_id = ? AND position = ?
517
+ "# ,
518
+ ( content, event_id, & hashed_room_id, chunk_id, index, )
519
+ ) ?;
520
+ }
542
521
543
- Update :: DetachLastItems { at } => {
544
- let chunk_id = at. chunk_identifier ( ) . index ( ) ;
545
- let index = at. index ( ) ;
522
+ Update :: RemoveItem { at } => {
523
+ let chunk_id = at. chunk_identifier ( ) . index ( ) ;
524
+ let index = at. index ( ) ;
546
525
547
- trace ! ( %room_id, "truncating items >= {chunk_id}:{index}" ) ;
526
+ trace ! ( %room_id, "removing item @ {chunk_id}:{index}" ) ;
548
527
549
- // Remove these entries.
550
- txn. execute ( "DELETE FROM events WHERE room_id = ? AND chunk_id = ? AND position >= ?" , ( & hashed_room_id, chunk_id, index) ) ?;
551
- }
528
+ // Remove the entry.
529
+ txn. execute ( "DELETE FROM events WHERE room_id = ? AND chunk_id = ? AND position = ?" , ( & hashed_room_id, chunk_id, index) ) ?;
552
530
553
- Update :: Clear => {
554
- trace ! ( %room_id, "clearing items" ) ;
531
+ // Decrement the index of each item after the one we're going to remove.
532
+ txn. execute (
533
+ r#"
534
+ UPDATE events
535
+ SET position = position - 1
536
+ WHERE room_id = ? AND chunk_id = ? AND position > ?
537
+ "# ,
538
+ ( & hashed_room_id, chunk_id, index)
539
+ ) ?;
555
540
556
- // Remove chunks, and let cascading do its job.
557
- txn. execute (
558
- "DELETE FROM linked_chunks WHERE room_id = ?" ,
559
- ( & hashed_room_id, ) ,
560
- ) ?;
561
- }
541
+ }
562
542
563
- Update :: StartReattachItems | Update :: EndReattachItems => {
564
- // Nothing.
565
- }
543
+ Update :: DetachLastItems { at } => {
544
+ let chunk_id = at. chunk_identifier ( ) . index ( ) ;
545
+ let index = at. index ( ) ;
546
+
547
+ trace ! ( %room_id, "truncating items >= {chunk_id}:{index}" ) ;
548
+
549
+ // Remove these entries.
550
+ txn. execute ( "DELETE FROM events WHERE room_id = ? AND chunk_id = ? AND position >= ?" , ( & hashed_room_id, chunk_id, index) ) ?;
551
+ }
552
+
553
+ Update :: Clear => {
554
+ trace ! ( %room_id, "clearing items" ) ;
555
+
556
+ // Remove chunks, and let cascading do its job.
557
+ txn. execute (
558
+ "DELETE FROM linked_chunks WHERE room_id = ?" ,
559
+ ( & hashed_room_id, ) ,
560
+ ) ?;
561
+ }
562
+
563
+ Update :: StartReattachItems | Update :: EndReattachItems => {
564
+ // Nothing.
566
565
}
567
566
}
567
+ }
568
568
569
- Ok ( ( ) )
570
- } )
569
+ Ok ( ( ) )
570
+ } )
571
571
. await ?;
572
572
573
573
Ok ( ( ) )
0 commit comments