@@ -29,8 +29,7 @@ use rs_matter::dm::clusters::dev_att::DevAttDataFetcher;
2929use rs_matter:: dm:: clusters:: gen_diag:: NetifDiag ;
3030use rs_matter:: dm:: networks:: NetChangeNotif ;
3131use rs_matter:: dm:: subscriptions:: Subscriptions ;
32- use rs_matter:: dm:: IMBuffer ;
33- use rs_matter:: dm:: { AsyncHandler , AsyncMetadata } ;
32+ use rs_matter:: dm:: { AsyncHandler , AsyncMetadata , ClusterId , DataModel , EndptId , IMBuffer } ;
3433use rs_matter:: error:: { Error , ErrorCode } ;
3534use rs_matter:: respond:: { DefaultResponder , ExchangeHandler , Responder } ;
3635use rs_matter:: transport:: network:: { Address , ChainedNetwork , NetworkReceive , NetworkSend } ;
@@ -344,13 +343,18 @@ where
344343 /// Notifies the Matter instance that there is a change in the state
345344 /// of one of the clusters.
346345 ///
347- /// User is expected to call this method when user-provided clusters
348- /// change their state.
346+ /// User is expected to call this method when a user-provided cluster
347+ /// changes its state.
349348 ///
350349 /// This is necessary so as the Matter instance can notify clients
351350 /// that have active subscriptions to some of the changed clusters.
352- pub fn notify_changed ( & self ) {
353- self . subscriptions . notify_changed ( ) ;
351+ ///
352+ /// # Arguments
353+ /// - `endpoint_id` - the endpoint ID where the cluster is located
354+ /// - `cluster_id` - the ID of the cluster that has changed
355+ pub fn notify_cluster_changed ( & self , endpoint_id : EndptId , cluster_id : ClusterId ) {
356+ self . subscriptions
357+ . notify_cluster_changed ( endpoint_id, cluster_id) ;
354358 }
355359
356360 // /// User code hook to get the state of the netif passed to the
@@ -607,10 +611,12 @@ where
607611 // Reset the Matter transport buffers and all sessions first
608612 // self.matter().reset_transport()?;
609613
610- let mut responder = pin ! ( self . run_responder( & handler) ) ;
611- let mut handler = pin ! ( handler. run( ) ) ;
614+ let dm = DataModel :: new ( self . matter ( ) , & self . buffers , & self . subscriptions , handler) ;
615+
616+ let mut responder = pin ! ( self . run_responder( & dm) ) ;
617+ let mut dm_job = pin ! ( dm. run( ) ) ;
612618
613- select ( & mut responder, & mut handler ) . coalesce ( ) . await
619+ select ( & mut responder, & mut dm_job ) . coalesce ( ) . await
614620 }
615621
616622 async fn run_handler_with_bump < H > ( & self , handler : H ) -> Result < ( ) , Error >
@@ -621,10 +627,12 @@ where
621627 // Reset the Matter transport buffers and all sessions first
622628 // self.matter().reset_transport()?;
623629
624- let mut responder = pin_alloc ! ( self . bump, self . run_responder_with_bump( & handler) ) ;
625- let mut handler = pin_alloc ! ( self . bump, handler. run( ) ) ;
630+ let dm = DataModel :: new ( self . matter ( ) , & self . buffers , & self . subscriptions , handler) ;
626631
627- select ( & mut responder, & mut handler) . coalesce ( ) . await
632+ let mut responder = pin_alloc ! ( self . bump, self . run_responder_with_bump( & dm) ) ;
633+ let mut dm_job = pin ! ( dm. run( ) ) ;
634+
635+ select ( & mut responder, & mut dm_job) . coalesce ( ) . await
628636 }
629637
630638 fn run_psm < ' t , S , C > (
@@ -638,12 +646,14 @@ where
638646 persist. run ( )
639647 }
640648
641- async fn run_responder < H > ( & self , handler : H ) -> Result < ( ) , Error >
649+ async fn run_responder < const SN : usize , H > (
650+ & self ,
651+ dm : & DataModel < ' _ , SN , PooledBuffers < MAX_IM_BUFFERS , NoopRawMutex , IMBuffer > , H > ,
652+ ) -> Result < ( ) , Error >
642653 where
643654 H : AsyncHandler + AsyncMetadata ,
644655 {
645- let responder =
646- DefaultResponder :: new ( self . matter ( ) , & self . buffers , & self . subscriptions , handler) ;
656+ let responder = DefaultResponder :: new ( dm) ;
647657
648658 // Run the responder with up to MAX_RESPONDERS handlers (i.e. MAX_RESPONDERS exchanges can be handled simultenously)
649659 // Clients trying to open more exchanges than the ones currently running will get "I'm busy, please try again later"
@@ -652,12 +662,14 @@ where
652662 Ok ( ( ) )
653663 }
654664
655- async fn run_responder_with_bump < H > ( & self , handler : H ) -> Result < ( ) , Error >
665+ async fn run_responder_with_bump < const SN : usize , H > (
666+ & self ,
667+ dm : & DataModel < ' _ , SN , PooledBuffers < MAX_IM_BUFFERS , NoopRawMutex , IMBuffer > , H > ,
668+ ) -> Result < ( ) , Error >
656669 where
657670 H : AsyncHandler + AsyncMetadata ,
658671 {
659- let responder =
660- DefaultResponder :: new ( self . matter ( ) , & self . buffers , & self . subscriptions , handler) ;
672+ let responder = DefaultResponder :: new ( dm) ;
661673
662674 let mut actual = pin_alloc ! (
663675 self . bump,
0 commit comments