@@ -390,181 +390,181 @@ impl EventCacheStore for SqliteEventCacheStore {

        let this = self.clone();

        with_immediate_transaction(self.acquire().await?, move |txn| {
            for up in updates {
                match up {
                    Update::NewItemsChunk { previous, new, next } => {
                        let previous = previous.as_ref().map(ChunkIdentifier::index);
                        let new = new.index();
                        let next = next.as_ref().map(ChunkIdentifier::index);

                        trace!(
                            %room_id,
                            "new events chunk (prev={previous:?}, i={new}, next={next:?})",
                        );

                        insert_chunk(
                            txn,
                            &hashed_room_id,
                            previous,
                            new,
                            next,
                            CHUNK_TYPE_EVENT_TYPE_STRING,
                        )?;
                    }

                    Update::NewGapChunk { previous, new, next, gap } => {
                        let serialized = serde_json::to_vec(&gap.prev_token)?;
                        let prev_token = this.encode_value(serialized)?;

                        let previous = previous.as_ref().map(ChunkIdentifier::index);
                        let new = new.index();
                        let next = next.as_ref().map(ChunkIdentifier::index);

                        trace!(
                            %room_id,
                            "new gap chunk (prev={previous:?}, i={new}, next={next:?})",
                        );

                        // Insert the chunk as a gap.
                        insert_chunk(
                            txn,
                            &hashed_room_id,
                            previous,
                            new,
                            next,
                            CHUNK_TYPE_GAP_TYPE_STRING,
                        )?;

                        // Insert the gap's value.
                        txn.execute(
                            r#"
                            INSERT INTO gaps(chunk_id, room_id, prev_token)
                            VALUES (?, ?, ?)
                            "#,
                            (new, &hashed_room_id, prev_token),
                        )?;
                    }

                    Update::RemoveChunk(chunk_identifier) => {
                        let chunk_id = chunk_identifier.index();

                        trace!(%room_id, "removing chunk @ {chunk_id}");

                        // Find chunk to delete.
                        let (previous, next): (Option<usize>, Option<usize>) = txn.query_row(
                            "SELECT previous, next FROM linked_chunks WHERE id = ? AND room_id = ?",
                            (chunk_id, &hashed_room_id),
                            |row| Ok((row.get(0)?, row.get(1)?))
                        )?;

                        // Replace its previous' next to its own next.
                        if let Some(previous) = previous {
                            txn.execute("UPDATE linked_chunks SET next = ? WHERE id = ? AND room_id = ?", (next, previous, &hashed_room_id))?;
                        }

                        // Replace its next' previous to its own previous.
                        if let Some(next) = next {
                            txn.execute("UPDATE linked_chunks SET previous = ? WHERE id = ? AND room_id = ?", (previous, next, &hashed_room_id))?;
                        }

                        // Now delete it, and let cascading delete corresponding entries in the
                        // other data tables.
                        txn.execute("DELETE FROM linked_chunks WHERE id = ? AND room_id = ?", (chunk_id, &hashed_room_id))?;
                    }

                    Update::PushItems { at, items } => {
                        let chunk_id = at.chunk_identifier().index();

                        trace!(%room_id, "pushing {} items @ {chunk_id}", items.len());

                        for (i, event) in items.into_iter().enumerate() {
                            let serialized = serde_json::to_vec(&event)?;
                            let content = this.encode_value(serialized)?;

                            let event_id = event.event_id().map(|event_id| event_id.to_string());
                            let index = at.index() + i;

                            txn.execute(
                                r#"
                                INSERT INTO events(chunk_id, room_id, event_id, content, position)
                                VALUES (?, ?, ?, ?, ?)
                                "#,
                                (chunk_id, &hashed_room_id, event_id, content, index),
                            )?;
                        }
                    }

                    Update::ReplaceItem { at, item: event } => {
                        let chunk_id = at.chunk_identifier().index();
                        let index = at.index();

                        trace!(%room_id, "replacing item @ {chunk_id}:{index}");

                        let serialized = serde_json::to_vec(&event)?;
                        let content = this.encode_value(serialized)?;

                        // The event id should be the same, but just in case it changed…
                        let event_id = event.event_id().map(|event_id| event_id.to_string());

                        txn.execute(
                            r#"
                            UPDATE events
                            SET content = ?, event_id = ?
                            WHERE room_id = ? AND chunk_id = ? AND position = ?
                            "#,
                            (content, event_id, &hashed_room_id, chunk_id, index),
                        )?;
                    }

                    Update::RemoveItem { at } => {
                        let chunk_id = at.chunk_identifier().index();
                        let index = at.index();

                        trace!(%room_id, "removing item @ {chunk_id}:{index}");

                        // Remove the entry.
                        txn.execute("DELETE FROM events WHERE room_id = ? AND chunk_id = ? AND position = ?", (&hashed_room_id, chunk_id, index))?;

                        // Decrement the index of each item after the one we're going to remove.
                        txn.execute(
                            r#"
                            UPDATE events
                            SET position = position - 1
                            WHERE room_id = ? AND chunk_id = ? AND position > ?
                            "#,
                            (&hashed_room_id, chunk_id, index),
                        )?;
                    }

                    Update::DetachLastItems { at } => {
                        let chunk_id = at.chunk_identifier().index();
                        let index = at.index();

                        trace!(%room_id, "truncating items >= {chunk_id}:{index}");

                        // Remove these entries.
                        txn.execute("DELETE FROM events WHERE room_id = ? AND chunk_id = ? AND position >= ?", (&hashed_room_id, chunk_id, index))?;
                    }

                    Update::Clear => {
                        trace!(%room_id, "clearing items");

                        // Remove chunks, and let cascading do its job.
                        txn.execute(
                            "DELETE FROM linked_chunks WHERE room_id = ?",
                            (&hashed_room_id,),
                        )?;
                    }

                    Update::StartReattachItems | Update::EndReattachItems => {
                        // Nothing.
                    }
                }
            }

            Ok(())
        })
        .await?;

        Ok(())
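
The statements in this hunk assume three tables that are created elsewhere, in the store's migration scripts. The sketch below is a hypothetical reconstruction of their shape, inferred only from the queries and comments above; exact column types, the chunk-type column name, and the constraint syntax are assumptions and may differ from the crate's actual schema.

    -- Hypothetical schema, inferred from the queries in this hunk; not the
    -- crate's actual migration. Types and constraints are assumptions.
    CREATE TABLE linked_chunks (
        id INTEGER NOT NULL,        -- ChunkIdentifier::index()
        room_id BLOB NOT NULL,      -- hashed room id
        previous INTEGER,           -- previous chunk in the linked list, if any
        next INTEGER,               -- next chunk in the linked list, if any
        type TEXT NOT NULL,         -- chunk type string; column name assumed, written via insert_chunk
        PRIMARY KEY (id, room_id)
    );

    CREATE TABLE gaps (
        chunk_id INTEGER NOT NULL,
        room_id BLOB NOT NULL,
        prev_token BLOB NOT NULL,   -- encoded pagination token
        FOREIGN KEY (chunk_id, room_id)
            REFERENCES linked_chunks(id, room_id) ON DELETE CASCADE
    );

    CREATE TABLE events (
        chunk_id INTEGER NOT NULL,
        room_id BLOB NOT NULL,
        event_id TEXT,              -- may be NULL if the event has no id
        content BLOB NOT NULL,      -- encoded, serialized event
        position INTEGER NOT NULL,  -- index of the event within its chunk
        FOREIGN KEY (chunk_id, room_id)
            REFERENCES linked_chunks(id, room_id) ON DELETE CASCADE
    );

The ON DELETE CASCADE clauses match the comments about letting "cascading" clean up: Update::RemoveChunk and Update::Clear delete rows only from linked_chunks and rely on the database to drop the matching gaps and events rows.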