@@ -85,23 +85,43 @@ async function runBenchmark(argv) {
8585 // Get the cluster slots mapping to determine which node serves which slots
8686 const slotsMapping = await cluster . cluster ( 'SLOTS' ) ;
8787
88+ // Build a map from "host:port" to the actual node client
89+ const nodeMap = new Map ( ) ;
90+ for ( const node of nodes ) {
91+ const key = `${ node . options . host } :${ node . options . port } ` ;
92+ nodeMap . set ( key , node ) ;
93+ clients . push ( node ) ;
94+ }
95+
8896 // slotsMapping format: [[startSlot, endSlot, [host, port, nodeId], ...], ...]
89- // For each slot range, create a direct connection to the master node
97+ // For each slot range, map slots to the corresponding node client
9098 for ( const slotRange of slotsMapping ) {
9199 const startSlot = slotRange [ 0 ] ;
92100 const endSlot = slotRange [ 1 ] ;
93101 const masterInfo = slotRange [ 2 ] ; // [host, port, nodeId]
94102 const host = masterInfo [ 0 ] ;
95103 const port = masterInfo [ 1 ] ;
96104
97- // Create a standalone Redis client for this node
98- const nodeClient = new Redis ( {
99- ...redisOptions ,
100- host,
101- port
102- } ) ;
105+ // Find the node client for this host:port
106+ const nodeKey = `${ host } :${ port } ` ;
107+ let nodeClient = nodeMap . get ( nodeKey ) ;
108+
109+ if ( ! nodeClient ) {
110+ // If not found by exact match, try to find by port only
111+ // (useful when cluster returns internal IPs but we connect via external IP)
112+ for ( const [ key , client ] of nodeMap . entries ( ) ) {
113+ if ( key . endsWith ( `:${ port } ` ) ) {
114+ nodeClient = client ;
115+ console . log ( `Matched node ${ nodeKey } to ${ key } by port` ) ;
116+ break ;
117+ }
118+ }
119+ }
103120
104- clients . push ( nodeClient ) ;
121+ if ( ! nodeClient ) {
122+ console . warn ( `Warning: No node client found for ${ nodeKey } , using first available node` ) ;
123+ nodeClient = nodes [ 0 ] ;
124+ }
105125
106126 // Map all slots in this range to this node's client
107127 for ( let slot = startSlot ; slot <= endSlot ; slot ++ ) {
@@ -112,7 +132,7 @@ async function runBenchmark(argv) {
112132 nodeAddresses = nodes . map ( node => `${ node . options . host } :${ node . options . port } ` ) ;
113133
114134 console . log ( `Cluster mode - using ${ nodeAddresses . length } unique nodes: ${ nodeAddresses . join ( ', ' ) } ` ) ;
115- console . log ( `Cluster mode - mapped ${ slotClientMap . size } slots to individual node clients` ) ;
135+ console . log ( `Cluster mode - mapped ${ slotClientMap . size } slots to node clients` ) ;
116136 } else {
117137 const client = new Redis ( redisOptions ) ;
118138 clients . push ( client ) ;
@@ -164,7 +184,17 @@ async function runBenchmark(argv) {
164184 const publisherName = `publisher#${ clientId } ` ;
165185 let client ;
166186
167- client = clients [ 0 ]
187+ // For sharded pub/sub in cluster mode, get the client for the first channel's slot
188+ if ( argv . mode . startsWith ( 's' ) && argv [ 'oss-cluster-api-distribute-subscribers' ] ) {
189+ const slot = clusterKeySlot ( channels [ 0 ] ) ;
190+ client = slotClientMap . get ( slot ) ;
191+ if ( ! client ) {
192+ console . error ( `No client found for slot ${ slot } (channel: ${ channels [ 0 ] } )` ) ;
193+ client = clients [ 0 ] ; // Fallback
194+ }
195+ } else {
196+ client = clients [ 0 ] ;
197+ }
168198
169199 if ( argv . verbose ) {
170200 console . log ( `Publisher ${ clientId } targeting channels ${ channels } ` ) ;
0 commit comments