@@ -219,15 +219,15 @@ public function aggregate(array $pipeline, array $options = [])
219
219
{
220
220
$ hasWriteStage = is_last_pipeline_operator_write ($ pipeline );
221
221
222
- if (! isset ($ options ['readPreference ' ])) {
222
+ if (! isset ($ options ['readPreference ' ]) && ! is_in_transaction ( $ options ) ) {
223
223
$ options ['readPreference ' ] = $ this ->readPreference ;
224
224
}
225
225
226
226
if ($ hasWriteStage ) {
227
227
$ options ['readPreference ' ] = new ReadPreference (ReadPreference::RP_PRIMARY );
228
228
}
229
229
230
- $ server = select_server ($ this ->manager , $ options[ ' readPreference ' ], extract_session_from_options ( $ options ) );
230
+ $ server = select_server ($ this ->manager , $ options );
231
231
232
232
/* MongoDB 4.2 and later supports a read concern when an $out stage is
233
233
* being used, but earlier versions do not.
@@ -276,7 +276,7 @@ public function bulkWrite(array $operations, array $options = [])
276
276
}
277
277
278
278
$ operation = new BulkWrite ($ this ->databaseName , $ this ->collectionName , $ operations , $ options );
279
- $ server = select_server ($ this ->manager , new ReadPreference (ReadPreference:: RP_PRIMARY ), extract_session_from_options ( $ options) );
279
+ $ server = select_server ($ this ->manager , $ options );
280
280
281
281
return $ operation ->execute ($ server );
282
282
}
@@ -297,11 +297,11 @@ public function bulkWrite(array $operations, array $options = [])
297
297
*/
298
298
public function count ($ filter = [], array $ options = [])
299
299
{
300
- if (! isset ($ options ['readPreference ' ])) {
300
+ if (! isset ($ options ['readPreference ' ]) && ! is_in_transaction ( $ options ) ) {
301
301
$ options ['readPreference ' ] = $ this ->readPreference ;
302
302
}
303
303
304
- $ server = select_server ($ this ->manager , $ options[ ' readPreference ' ], extract_session_from_options ( $ options ) );
304
+ $ server = select_server ($ this ->manager , $ options );
305
305
306
306
if (! isset ($ options ['readConcern ' ]) && server_supports_feature ($ server , self ::$ wireVersionForReadConcern ) && ! is_in_transaction ($ options )) {
307
307
$ options ['readConcern ' ] = $ this ->readConcern ;
@@ -326,11 +326,11 @@ public function count($filter = [], array $options = [])
326
326
*/
327
327
public function countDocuments ($ filter = [], array $ options = [])
328
328
{
329
- if (! isset ($ options ['readPreference ' ])) {
329
+ if (! isset ($ options ['readPreference ' ]) && ! is_in_transaction ( $ options ) ) {
330
330
$ options ['readPreference ' ] = $ this ->readPreference ;
331
331
}
332
332
333
- $ server = select_server ($ this ->manager , $ options[ ' readPreference ' ], extract_session_from_options ( $ options ) );
333
+ $ server = select_server ($ this ->manager , $ options );
334
334
335
335
if (! isset ($ options ['readConcern ' ]) && server_supports_feature ($ server , self ::$ wireVersionForReadConcern ) && ! is_in_transaction ($ options )) {
336
336
$ options ['readConcern ' ] = $ this ->readConcern ;
@@ -392,7 +392,7 @@ public function createIndex($key, array $options = [])
392
392
*/
393
393
public function createIndexes (array $ indexes , array $ options = [])
394
394
{
395
- $ server = select_server ($ this ->manager , new ReadPreference (ReadPreference:: RP_PRIMARY ), extract_session_from_options ( $ options) );
395
+ $ server = select_server ($ this ->manager , $ options );
396
396
397
397
if (! isset ($ options ['writeConcern ' ]) && server_supports_feature ($ server , self ::$ wireVersionForWritableCommandWriteConcern ) && ! is_in_transaction ($ options )) {
398
398
$ options ['writeConcern ' ] = $ this ->writeConcern ;
@@ -422,7 +422,7 @@ public function deleteMany($filter, array $options = [])
422
422
}
423
423
424
424
$ operation = new DeleteMany ($ this ->databaseName , $ this ->collectionName , $ filter , $ options );
425
- $ server = select_server ($ this ->manager , new ReadPreference (ReadPreference:: RP_PRIMARY ), extract_session_from_options ( $ options) );
425
+ $ server = select_server ($ this ->manager , $ options );
426
426
427
427
return $ operation ->execute ($ server );
428
428
}
@@ -446,7 +446,7 @@ public function deleteOne($filter, array $options = [])
446
446
}
447
447
448
448
$ operation = new DeleteOne ($ this ->databaseName , $ this ->collectionName , $ filter , $ options );
449
- $ server = select_server ($ this ->manager , new ReadPreference (ReadPreference:: RP_PRIMARY ), extract_session_from_options ( $ options) );
449
+ $ server = select_server ($ this ->manager , $ options );
450
450
451
451
return $ operation ->execute ($ server );
452
452
}
@@ -466,11 +466,11 @@ public function deleteOne($filter, array $options = [])
466
466
*/
467
467
public function distinct ($ fieldName , $ filter = [], array $ options = [])
468
468
{
469
- if (! isset ($ options ['readPreference ' ])) {
469
+ if (! isset ($ options ['readPreference ' ]) && ! is_in_transaction ( $ options ) ) {
470
470
$ options ['readPreference ' ] = $ this ->readPreference ;
471
471
}
472
472
473
- $ server = select_server ($ this ->manager , $ options[ ' readPreference ' ], extract_session_from_options ( $ options ) );
473
+ $ server = select_server ($ this ->manager , $ options );
474
474
475
475
if (! isset ($ options ['readConcern ' ]) && server_supports_feature ($ server , self ::$ wireVersionForReadConcern ) && ! is_in_transaction ($ options )) {
476
476
$ options ['readConcern ' ] = $ this ->readConcern ;
@@ -497,7 +497,7 @@ public function drop(array $options = [])
497
497
$ options ['typeMap ' ] = $ this ->typeMap ;
498
498
}
499
499
500
- $ server = select_server ($ this ->manager , new ReadPreference (ReadPreference:: RP_PRIMARY ), extract_session_from_options ( $ options) );
500
+ $ server = select_server ($ this ->manager , $ options );
501
501
502
502
if (! isset ($ options ['writeConcern ' ]) && server_supports_feature ($ server , self ::$ wireVersionForWritableCommandWriteConcern ) && ! is_in_transaction ($ options )) {
503
503
$ options ['writeConcern ' ] = $ this ->writeConcern ;
@@ -531,7 +531,7 @@ public function dropIndex($indexName, array $options = [])
531
531
$ options ['typeMap ' ] = $ this ->typeMap ;
532
532
}
533
533
534
- $ server = select_server ($ this ->manager , new ReadPreference (ReadPreference:: RP_PRIMARY ), extract_session_from_options ( $ options) );
534
+ $ server = select_server ($ this ->manager , $ options );
535
535
536
536
if (! isset ($ options ['writeConcern ' ]) && server_supports_feature ($ server , self ::$ wireVersionForWritableCommandWriteConcern ) && ! is_in_transaction ($ options )) {
537
537
$ options ['writeConcern ' ] = $ this ->writeConcern ;
@@ -558,7 +558,7 @@ public function dropIndexes(array $options = [])
558
558
$ options ['typeMap ' ] = $ this ->typeMap ;
559
559
}
560
560
561
- $ server = select_server ($ this ->manager , new ReadPreference (ReadPreference:: RP_PRIMARY ), extract_session_from_options ( $ options) );
561
+ $ server = select_server ($ this ->manager , $ options );
562
562
563
563
if (! isset ($ options ['writeConcern ' ]) && server_supports_feature ($ server , self ::$ wireVersionForWritableCommandWriteConcern ) && ! is_in_transaction ($ options )) {
564
564
$ options ['writeConcern ' ] = $ this ->writeConcern ;
@@ -582,11 +582,11 @@ public function dropIndexes(array $options = [])
582
582
*/
583
583
public function estimatedDocumentCount (array $ options = [])
584
584
{
585
- if (! isset ($ options ['readPreference ' ])) {
585
+ if (! isset ($ options ['readPreference ' ]) && ! is_in_transaction ( $ options ) ) {
586
586
$ options ['readPreference ' ] = $ this ->readPreference ;
587
587
}
588
588
589
- $ server = select_server ($ this ->manager , $ options[ ' readPreference ' ], extract_session_from_options ( $ options ) );
589
+ $ server = select_server ($ this ->manager , $ options );
590
590
591
591
if (! isset ($ options ['readConcern ' ]) && server_supports_feature ($ server , self ::$ wireVersionForReadConcern ) && ! is_in_transaction ($ options )) {
592
592
$ options ['readConcern ' ] = $ this ->readConcern ;
@@ -611,15 +611,15 @@ public function estimatedDocumentCount(array $options = [])
611
611
*/
612
612
public function explain (Explainable $ explainable , array $ options = [])
613
613
{
614
- if (! isset ($ options ['readPreference ' ])) {
614
+ if (! isset ($ options ['readPreference ' ]) && ! is_in_transaction ( $ options ) ) {
615
615
$ options ['readPreference ' ] = $ this ->readPreference ;
616
616
}
617
617
618
618
if (! isset ($ options ['typeMap ' ])) {
619
619
$ options ['typeMap ' ] = $ this ->typeMap ;
620
620
}
621
621
622
- $ server = select_server ($ this ->manager , $ options[ ' readPreference ' ], extract_session_from_options ( $ options ) );
622
+ $ server = select_server ($ this ->manager , $ options );
623
623
624
624
$ operation = new Explain ($ this ->databaseName , $ explainable , $ options );
625
625
@@ -640,11 +640,11 @@ public function explain(Explainable $explainable, array $options = [])
640
640
*/
641
641
public function find ($ filter = [], array $ options = [])
642
642
{
643
- if (! isset ($ options ['readPreference ' ])) {
643
+ if (! isset ($ options ['readPreference ' ]) && ! is_in_transaction ( $ options ) ) {
644
644
$ options ['readPreference ' ] = $ this ->readPreference ;
645
645
}
646
646
647
- $ server = select_server ($ this ->manager , $ options[ ' readPreference ' ], extract_session_from_options ( $ options ) );
647
+ $ server = select_server ($ this ->manager , $ options );
648
648
649
649
if (! isset ($ options ['readConcern ' ]) && server_supports_feature ($ server , self ::$ wireVersionForReadConcern ) && ! is_in_transaction ($ options )) {
650
650
$ options ['readConcern ' ] = $ this ->readConcern ;
@@ -673,11 +673,11 @@ public function find($filter = [], array $options = [])
673
673
*/
674
674
public function findOne ($ filter = [], array $ options = [])
675
675
{
676
- if (! isset ($ options ['readPreference ' ])) {
676
+ if (! isset ($ options ['readPreference ' ]) && ! is_in_transaction ( $ options ) ) {
677
677
$ options ['readPreference ' ] = $ this ->readPreference ;
678
678
}
679
679
680
- $ server = select_server ($ this ->manager , $ options[ ' readPreference ' ], extract_session_from_options ( $ options ) );
680
+ $ server = select_server ($ this ->manager , $ options );
681
681
682
682
if (! isset ($ options ['readConcern ' ]) && server_supports_feature ($ server , self ::$ wireVersionForReadConcern ) && ! is_in_transaction ($ options )) {
683
683
$ options ['readConcern ' ] = $ this ->readConcern ;
@@ -709,7 +709,7 @@ public function findOne($filter = [], array $options = [])
709
709
*/
710
710
public function findOneAndDelete ($ filter , array $ options = [])
711
711
{
712
- $ server = select_server ($ this ->manager , new ReadPreference (ReadPreference:: RP_PRIMARY ), extract_session_from_options ( $ options) );
712
+ $ server = select_server ($ this ->manager , $ options );
713
713
714
714
if (! isset ($ options ['writeConcern ' ]) && server_supports_feature ($ server , self ::$ wireVersionForFindAndModifyWriteConcern ) && ! is_in_transaction ($ options )) {
715
715
$ options ['writeConcern ' ] = $ this ->writeConcern ;
@@ -746,7 +746,7 @@ public function findOneAndDelete($filter, array $options = [])
746
746
*/
747
747
public function findOneAndReplace ($ filter , $ replacement , array $ options = [])
748
748
{
749
- $ server = select_server ($ this ->manager , new ReadPreference (ReadPreference:: RP_PRIMARY ), extract_session_from_options ( $ options) );
749
+ $ server = select_server ($ this ->manager , $ options );
750
750
751
751
if (! isset ($ options ['writeConcern ' ]) && server_supports_feature ($ server , self ::$ wireVersionForFindAndModifyWriteConcern ) && ! is_in_transaction ($ options )) {
752
752
$ options ['writeConcern ' ] = $ this ->writeConcern ;
@@ -783,7 +783,7 @@ public function findOneAndReplace($filter, $replacement, array $options = [])
783
783
*/
784
784
public function findOneAndUpdate ($ filter , $ update , array $ options = [])
785
785
{
786
- $ server = select_server ($ this ->manager , new ReadPreference (ReadPreference:: RP_PRIMARY ), extract_session_from_options ( $ options) );
786
+ $ server = select_server ($ this ->manager , $ options );
787
787
788
788
if (! isset ($ options ['writeConcern ' ]) && server_supports_feature ($ server , self ::$ wireVersionForFindAndModifyWriteConcern ) && ! is_in_transaction ($ options )) {
789
789
$ options ['writeConcern ' ] = $ this ->writeConcern ;
@@ -899,7 +899,7 @@ public function insertMany(array $documents, array $options = [])
899
899
}
900
900
901
901
$ operation = new InsertMany ($ this ->databaseName , $ this ->collectionName , $ documents , $ options );
902
- $ server = select_server ($ this ->manager , new ReadPreference (ReadPreference:: RP_PRIMARY ), extract_session_from_options ( $ options) );
902
+ $ server = select_server ($ this ->manager , $ options );
903
903
904
904
return $ operation ->execute ($ server );
905
905
}
@@ -922,7 +922,7 @@ public function insertOne($document, array $options = [])
922
922
}
923
923
924
924
$ operation = new InsertOne ($ this ->databaseName , $ this ->collectionName , $ document , $ options );
925
- $ server = select_server ($ this ->manager , new ReadPreference (ReadPreference:: RP_PRIMARY ), extract_session_from_options ( $ options) );
925
+ $ server = select_server ($ this ->manager , $ options );
926
926
927
927
return $ operation ->execute ($ server );
928
928
}
@@ -939,7 +939,7 @@ public function insertOne($document, array $options = [])
939
939
public function listIndexes (array $ options = [])
940
940
{
941
941
$ operation = new ListIndexes ($ this ->databaseName , $ this ->collectionName , $ options );
942
- $ server = select_server ($ this ->manager , new ReadPreference (ReadPreference:: RP_PRIMARY ), extract_session_from_options ( $ options) );
942
+ $ server = select_server ($ this ->manager , $ options );
943
943
944
944
return $ operation ->execute ($ server );
945
945
}
@@ -963,7 +963,7 @@ public function mapReduce(JavascriptInterface $map, JavascriptInterface $reduce,
963
963
{
964
964
$ hasOutputCollection = ! is_mapreduce_output_inline ($ out );
965
965
966
- if (! isset ($ options ['readPreference ' ])) {
966
+ if (! isset ($ options ['readPreference ' ]) && ! is_in_transaction ( $ options ) ) {
967
967
$ options ['readPreference ' ] = $ this ->readPreference ;
968
968
}
969
969
@@ -972,7 +972,7 @@ public function mapReduce(JavascriptInterface $map, JavascriptInterface $reduce,
972
972
$ options ['readPreference ' ] = new ReadPreference (ReadPreference::RP_PRIMARY );
973
973
}
974
974
975
- $ server = select_server ($ this ->manager , $ options[ ' readPreference ' ], extract_session_from_options ( $ options ) );
975
+ $ server = select_server ($ this ->manager , $ options );
976
976
977
977
/* A "majority" read concern is not compatible with inline output, so
978
978
* avoid providing the Collection's read concern if it would conflict.
@@ -1016,7 +1016,7 @@ public function replaceOne($filter, $replacement, array $options = [])
1016
1016
}
1017
1017
1018
1018
$ operation = new ReplaceOne ($ this ->databaseName , $ this ->collectionName , $ filter , $ replacement , $ options );
1019
- $ server = select_server ($ this ->manager , new ReadPreference (ReadPreference:: RP_PRIMARY ), extract_session_from_options ( $ options) );
1019
+ $ server = select_server ($ this ->manager , $ options );
1020
1020
1021
1021
return $ operation ->execute ($ server );
1022
1022
}
@@ -1041,7 +1041,7 @@ public function updateMany($filter, $update, array $options = [])
1041
1041
}
1042
1042
1043
1043
$ operation = new UpdateMany ($ this ->databaseName , $ this ->collectionName , $ filter , $ update , $ options );
1044
- $ server = select_server ($ this ->manager , new ReadPreference (ReadPreference:: RP_PRIMARY ), extract_session_from_options ( $ options) );
1044
+ $ server = select_server ($ this ->manager , $ options );
1045
1045
1046
1046
return $ operation ->execute ($ server );
1047
1047
}
@@ -1066,7 +1066,7 @@ public function updateOne($filter, $update, array $options = [])
1066
1066
}
1067
1067
1068
1068
$ operation = new UpdateOne ($ this ->databaseName , $ this ->collectionName , $ filter , $ update , $ options );
1069
- $ server = select_server ($ this ->manager , new ReadPreference (ReadPreference:: RP_PRIMARY ), extract_session_from_options ( $ options) );
1069
+ $ server = select_server ($ this ->manager , $ options );
1070
1070
1071
1071
return $ operation ->execute ($ server );
1072
1072
}
@@ -1082,11 +1082,11 @@ public function updateOne($filter, $update, array $options = [])
1082
1082
*/
1083
1083
public function watch (array $ pipeline = [], array $ options = [])
1084
1084
{
1085
- if (! isset ($ options ['readPreference ' ])) {
1085
+ if (! isset ($ options ['readPreference ' ]) && ! is_in_transaction ( $ options ) ) {
1086
1086
$ options ['readPreference ' ] = $ this ->readPreference ;
1087
1087
}
1088
1088
1089
- $ server = select_server ($ this ->manager , $ options[ ' readPreference ' ], extract_session_from_options ( $ options ) );
1089
+ $ server = select_server ($ this ->manager , $ options );
1090
1090
1091
1091
/* Although change streams require a newer version of the server than
1092
1092
* read concerns, perform the usual wire version check before inheriting
0 commit comments