@@ -733,6 +733,7 @@ mod tests {
733
733
use crate :: execution_context:: Workload ;
734
734
use crate :: host:: module_host:: { DatabaseUpdate , EventStatus , ModuleEvent , ModuleFunctionCall } ;
735
735
use crate :: messages:: websocket as ws;
736
+ use crate :: sql:: execute:: run;
736
737
use crate :: subscription:: module_subscription_manager:: SubscriptionManager ;
737
738
use crate :: subscription:: query:: compile_read_only_query;
738
739
use crate :: subscription:: TableUpdateType ;
@@ -1529,6 +1530,51 @@ mod tests {
1529
1530
Ok ( ( ) )
1530
1531
}
1531
1532
1533
+ /// Test that we receive subscription updates for DML
1534
+ #[ tokio:: test]
1535
+ async fn test_updates_for_dml ( ) -> anyhow:: Result < ( ) > {
1536
+ // Establish a client connection
1537
+ let ( tx, mut rx) = client_connection ( client_id_from_u8 ( 1 ) ) ;
1538
+
1539
+ let db = relational_db ( ) ?;
1540
+ let subs = module_subscriptions ( db. clone ( ) ) ;
1541
+ let schema = [ ( "x" , AlgebraicType :: U8 ) , ( "y" , AlgebraicType :: U8 ) ] ;
1542
+ let t_id = db. create_table_for_test ( "t" , & schema, & [ ] ) ?;
1543
+
1544
+ // Subscribe to `t`
1545
+ subscribe_multi ( & subs, & [ "select * from t" ] , tx, & mut 0 ) ?;
1546
+
1547
+ // Wait to receive the initial subscription message
1548
+ assert ! ( matches!( rx. recv( ) . await , Some ( SerializableMessage :: Subscription ( _) ) ) ) ;
1549
+
1550
+ let schema = ProductType :: from ( [ AlgebraicType :: U8 , AlgebraicType :: U8 ] ) ;
1551
+
1552
+ // Only the owner can invoke DML commands
1553
+ let auth = AuthCtx :: new ( identity_from_u8 ( 0 ) , identity_from_u8 ( 0 ) ) ;
1554
+
1555
+ run (
1556
+ & db,
1557
+ "INSERT INTO t (x, y) VALUES (0, 1)" ,
1558
+ auth,
1559
+ Some ( & subs) ,
1560
+ & mut vec ! [ ] ,
1561
+ ) ?;
1562
+
1563
+ // Client should receive insert
1564
+ assert_tx_update_for_table ( & mut rx, t_id, & schema, [ product ! [ 0_u8 , 1_u8 ] ] , [ ] ) . await ;
1565
+
1566
+ run ( & db, "UPDATE t SET y=2 WHERE x=0" , auth, Some ( & subs) , & mut vec ! [ ] ) ?;
1567
+
1568
+ // Client should receive update
1569
+ assert_tx_update_for_table ( & mut rx, t_id, & schema, [ product ! [ 0_u8 , 2_u8 ] ] , [ product ! [ 0_u8 , 1_u8 ] ] ) . await ;
1570
+
1571
+ run ( & db, "DELETE FROM t WHERE x=0" , auth, Some ( & subs) , & mut vec ! [ ] ) ?;
1572
+
1573
+ // Client should receive delete
1574
+ assert_tx_update_for_table ( & mut rx, t_id, & schema, [ ] , [ product ! [ 0_u8 , 2_u8 ] ] ) . await ;
1575
+ Ok ( ( ) )
1576
+ }
1577
+
1532
1578
/// Test that we do not compress within a [TransactionUpdateMessage].
1533
1579
/// The message itself is compressed before being sent over the wire,
1534
1580
/// but we don't care about that for this test.
0 commit comments