@@ -936,6 +936,7 @@ mod tests {
936936 use crate :: error:: DBError ;
937937 use crate :: host:: module_host:: { DatabaseUpdate , EventStatus , ModuleEvent , ModuleFunctionCall } ;
938938 use crate :: messages:: websocket as ws;
939+ use crate :: sql:: execute:: run;
939940 use crate :: subscription:: module_subscription_manager:: { spawn_send_worker, SubscriptionManager } ;
940941 use crate :: subscription:: query:: compile_read_only_query;
941942 use crate :: subscription:: TableUpdateType ;
@@ -1817,6 +1818,51 @@ mod tests {
18171818 Ok ( ( ) )
18181819 }
18191820
1821+ /// Test that we receive subscription updates for DML
1822+ #[ tokio:: test]
1823+ async fn test_updates_for_dml ( ) -> anyhow:: Result < ( ) > {
1824+ // Establish a client connection
1825+ let ( tx, mut rx) = client_connection ( client_id_from_u8 ( 1 ) ) ;
1826+
1827+ let db = relational_db ( ) ?;
1828+ let subs = ModuleSubscriptions :: for_test_enclosing_runtime ( db. clone ( ) ) ;
1829+ let schema = [ ( "x" , AlgebraicType :: U8 ) , ( "y" , AlgebraicType :: U8 ) ] ;
1830+ let t_id = db. create_table_for_test ( "t" , & schema, & [ ] ) ?;
1831+
1832+ // Subscribe to `t`
1833+ subscribe_multi ( & subs, & [ "select * from t" ] , tx, & mut 0 ) ?;
1834+
1835+ // Wait to receive the initial subscription message
1836+ assert_matches ! ( rx. recv( ) . await , Some ( SerializableMessage :: Subscription ( _) ) ) ;
1837+
1838+ let schema = ProductType :: from ( [ AlgebraicType :: U8 , AlgebraicType :: U8 ] ) ;
1839+
1840+ // Only the owner can invoke DML commands
1841+ let auth = AuthCtx :: new ( identity_from_u8 ( 0 ) , identity_from_u8 ( 0 ) ) ;
1842+
1843+ run (
1844+ & db,
1845+ "INSERT INTO t (x, y) VALUES (0, 1)" ,
1846+ auth,
1847+ Some ( & subs) ,
1848+ & mut vec ! [ ] ,
1849+ ) ?;
1850+
1851+ // Client should receive insert
1852+ assert_tx_update_for_table ( & mut rx, t_id, & schema, [ product ! [ 0_u8 , 1_u8 ] ] , [ ] ) . await ;
1853+
1854+ run ( & db, "UPDATE t SET y=2 WHERE x=0" , auth, Some ( & subs) , & mut vec ! [ ] ) ?;
1855+
1856+ // Client should receive update
1857+ assert_tx_update_for_table ( & mut rx, t_id, & schema, [ product ! [ 0_u8 , 2_u8 ] ] , [ product ! [ 0_u8 , 1_u8 ] ] ) . await ;
1858+
1859+ run ( & db, "DELETE FROM t WHERE x=0" , auth, Some ( & subs) , & mut vec ! [ ] ) ?;
1860+
1861+ // Client should receive delete
1862+ assert_tx_update_for_table ( & mut rx, t_id, & schema, [ ] , [ product ! [ 0_u8 , 2_u8 ] ] ) . await ;
1863+ Ok ( ( ) )
1864+ }
1865+
18201866 /// Test that we do not compress within a [TransactionUpdateMessage].
18211867 /// The message itself is compressed before being sent over the wire,
18221868 /// but we don't care about that for this test.
0 commit comments