@@ -241,10 +241,11 @@ int32_t DistributedEngineComm::Setup()
241
241
newLocks.clear ();
242
242
243
243
uint32_t newPmCount = fRm ->getPsCount ();
244
- throttleThreshold = fRm ->getDECThrottleThreshold ();
245
244
tbpsThreadCount = fRm ->getJlNumScanReceiveThreads ();
246
245
fDECConnectionsPerQuery = fRm ->getDECConnectionsPerQuery ();
247
246
unsigned numConnections = getNumConnections ();
247
+ flowControlEnableBytesThresh = fRm ->getDECEnableBytesThresh ();
248
+ flowControlDisableBytesThresh = fRm ->getDECDisableBytesThresh ();
248
249
oam::Oam oam;
249
250
ModuleTypeConfig moduletypeconfig;
250
251
@@ -282,6 +283,8 @@ int32_t DistributedEngineComm::Setup()
282
283
if (clientAtTheSameHost (cl))
283
284
{
284
285
cl->atTheSameHost (true );
286
+ assert (connectionId <= std::numeric_limits<uint32_t >::max ());
287
+ localConnectionId_ = connectionId;
285
288
}
286
289
std::shared_ptr<std::mutex> nl (new std::mutex ());
287
290
@@ -433,33 +436,6 @@ void DistributedEngineComm::Listen(boost::shared_ptr<MessageQueueClient> client,
433
436
os << " DEC: lost connection to " << client->addr2String ();
434
437
writeToLog (__FILE__, __LINE__, os.str (), LOG_TYPE_ERROR);
435
438
}
436
-
437
- /*
438
- // reset the pmconnection vector
439
- ClientList tempConns;
440
- boost::mutex::scoped_lock onErrLock(fOnErrMutex);
441
- string moduleName = client->moduleName();
442
- //cout << "moduleName=" << moduleName << endl;
443
- for ( uint32_t i = 0; i < fPmConnections.size(); i++)
444
- {
445
- if (moduleName != fPmConnections[i]->moduleName())
446
- tempConns.push_back(fPmConnections[i]);
447
- //else
448
- //cout << "DEC remove PM" << fPmConnections[i]->otherEnd() << " moduleName=" <<
449
- fPmConnections[i]->moduleName() << endl;
450
- }
451
-
452
- if (tempConns.size() == fPmConnections.size()) return;
453
-
454
- fPmConnections.swap(tempConns);
455
- pmCount = (pmCount == 0 ? 0 : pmCount - 1);
456
- //cout << "PMCOUNT=" << pmCount << endl;
457
-
458
- // log it
459
- ostringstream os;
460
- os << "DEC: lost connection to " << client->addr2String();
461
- writeToLog(__FILE__, __LINE__, os.str(), LOG_TYPE_CRITICAL);
462
- */
463
439
}
464
440
return ;
465
441
}
@@ -472,7 +448,7 @@ void DistributedEngineComm::addQueue(uint32_t key, bool sendACKs)
472
448
condition* cond = new condition ();
473
449
uint32_t firstPMInterleavedConnectionId =
474
450
key % (fPmConnections .size () / pmCount) * fDECConnectionsPerQuery * pmCount % fPmConnections .size ();
475
- boost::shared_ptr<MQE> mqe (new MQE (pmCount, firstPMInterleavedConnectionId));
451
+ boost::shared_ptr<MQE> mqe (new MQE (pmCount, firstPMInterleavedConnectionId, flowControlEnableBytesThresh ));
476
452
477
453
mqe->queue = StepMsgQueue (lock, cond);
478
454
mqe->sendACKs = sendACKs;
@@ -540,7 +516,7 @@ void DistributedEngineComm::read(uint32_t key, SBS& bs)
540
516
{
541
517
std::unique_lock lk (ackLock);
542
518
543
- if (mqe->throttled && !mqe->hasBigMsgs && queueSize.size <= disableThreshold )
519
+ if (mqe->throttled && !mqe->hasBigMsgs && queueSize.size <= flowControlDisableBytesThresh )
544
520
setFlowControl (false , key, mqe);
545
521
546
522
vector<SBS> v;
@@ -578,7 +554,7 @@ const ByteStream DistributedEngineComm::read(uint32_t key)
578
554
{
579
555
std::unique_lock lk (ackLock);
580
556
581
- if (mqe->throttled && !mqe->hasBigMsgs && queueSize.size <= disableThreshold )
557
+ if (mqe->throttled && !mqe->hasBigMsgs && queueSize.size <= flowControlDisableBytesThresh )
582
558
setFlowControl (false , key, mqe);
583
559
584
560
vector<SBS> v;
@@ -645,7 +621,7 @@ void DistributedEngineComm::read_some(uint32_t key, uint32_t divisor, vector<SBS
645
621
{
646
622
std::unique_lock lk (ackLock);
647
623
648
- if (mqe->throttled && !mqe->hasBigMsgs && queueSize.size <= disableThreshold )
624
+ if (mqe->throttled && !mqe->hasBigMsgs && queueSize.size <= flowControlDisableBytesThresh )
649
625
setFlowControl (false , key, mqe);
650
626
651
627
sendAcks (key, v, mqe, queueSize.size );
@@ -726,12 +702,6 @@ void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector<SBS>& msgs,
726
702
727
703
msg->advanceInputPtr (sizeof (ISMPacketHeader));
728
704
// There must be only one local connection here.
729
- uint32_t localConnectionId = std::numeric_limits<uint32_t >::max ();
730
- for (uint32_t i = 0 ; i < pmCount; ++i)
731
- {
732
- if (fPmConnections [i]->atTheSameHost () && fIsExeMgr )
733
- localConnectionId = i;
734
- }
735
705
bool sendToLocal = false ;
736
706
while (l_msgCount > 0 )
737
707
{
@@ -743,23 +713,23 @@ void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector<SBS>& msgs,
743
713
nextPMToACK (mqe, l_msgCount, &sockIndex, toAck);
744
714
idbassert (*toAck <= l_msgCount);
745
715
l_msgCount -= *toAck;
746
- if (sockIndex == localConnectionId )
716
+ if (sockIndex == localConnectionId_ && fIsExeMgr )
747
717
{
748
718
sendToLocal = true ;
749
719
continue ;
750
720
}
751
721
pmAcked[sockIndex] = true ;
752
722
writeToClient (sockIndex, msg);
753
723
}
754
- if (sendToLocal && localConnectionId < fPmConnections . size () )
724
+ if (sendToLocal)
755
725
{
756
- pmAcked[localConnectionId ] = true ;
757
- writeToClient (localConnectionId , msg);
726
+ pmAcked[localConnectionId_ ] = true ;
727
+ writeToClient (localConnectionId_ , msg);
758
728
}
759
729
760
730
// @bug4436, when no more unacked work, send an ack to all PMs that haven't been acked.
761
731
// This is apply to the big message case only. For small messages, the flow control is
762
- // disabled when the queue size is below the disableThreshold .
732
+ // disabled when the queue size is below the flowControlDisableBytesThresh .
763
733
if (mqe->hasBigMsgs )
764
734
{
765
735
uint64_t totalUnackedWork = 0 ;
@@ -775,16 +745,16 @@ void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector<SBS>& msgs,
775
745
{
776
746
if (!pmAcked[i])
777
747
{
778
- if (i == localConnectionId )
748
+ if (i == localConnectionId_ && fIsExeMgr )
779
749
{
780
750
continue ;
781
751
}
782
752
writeToClient (i, msg);
783
753
}
784
754
}
785
- if (!pmAcked[localConnectionId] )
755
+ if (!pmAcked[localConnectionId_] && fIsExeMgr )
786
756
{
787
- writeToClient (localConnectionId , msg);
757
+ writeToClient (localConnectionId_ , msg);
788
758
}
789
759
}
790
760
}
@@ -863,28 +833,18 @@ void DistributedEngineComm::setFlowControl(bool enabled, uint32_t uniqueID, boos
863
833
ism->Command = BATCH_PRIMITIVE_ACK;
864
834
ism->Size = (enabled ? 0 : -1 );
865
835
866
- #ifdef VALGRIND
867
- /* XXXPAT: For testing in valgrind, init the vars that don't get used */
868
- ism->Flags = 0 ;
869
- ism->Type = 0 ;
870
- ism->MsgCount = 0 ;
871
- ism->Status = 0 ;
872
- #endif
873
-
874
836
msg->advanceInputPtr (sizeof (ISMPacketHeader));
875
- uint32_t localConnectionId = std::numeric_limits<uint32_t >::max ();
876
837
877
838
for (uint32_t i = 0 ; i < mqe->pmCount ; ++i)
878
839
{
879
- if (fPmConnections [i]-> atTheSameHost () && fIsExeMgr )
840
+ if (i == localConnectionId_ && fIsExeMgr )
880
841
{
881
- localConnectionId = i;
882
842
continue ;
883
843
}
884
844
writeToClient (i, msg);
885
845
}
886
- if (localConnectionId < fPmConnections . size () )
887
- writeToClient (localConnectionId , msg);
846
+ if (fIsExeMgr )
847
+ writeToClient (localConnectionId_ , msg);
888
848
}
889
849
890
850
int32_t DistributedEngineComm::write (uint32_t senderID, const SBS& msg)
@@ -911,23 +871,23 @@ int32_t DistributedEngineComm::write(uint32_t senderID, const SBS& msg)
911
871
/* XXXPAT: This relies on the assumption that the first pmCount "PMS*"
912
872
entries in the config file point to unique PMs */
913
873
{
914
- uint32_t localConnectionId = std::numeric_limits<uint32_t >::max ();
915
874
int32_t rc = 0 ;
916
-
917
875
for (uint32_t i = 0 ; i < pmCount; ++i)
918
876
{
919
- if (fPmConnections [i]-> atTheSameHost () && fIsExeMgr )
877
+ if (i == localConnectionId_ && fIsExeMgr )
920
878
{
921
- localConnectionId = i;
922
879
continue ;
923
880
}
924
881
925
- rc =writeToClient (i, msg, senderID);
926
- if (rc)
882
+ if (( rc = writeToClient (i, msg, senderID)))
883
+ {
927
884
return rc;
885
+ }
886
+ }
887
+ if (fIsExeMgr )
888
+ {
889
+ return writeToClient (localConnectionId_, msg);
928
890
}
929
- if (localConnectionId < fPmConnections .size ())
930
- rc = writeToClient (localConnectionId, msg);
931
891
return rc;
932
892
}
933
893
@@ -985,56 +945,27 @@ void DistributedEngineComm::StartClientListener(boost::shared_ptr<MessageQueueCl
985
945
986
946
void DistributedEngineComm::addDataToOutput (SBS sbs)
987
947
{
988
- ISMPacketHeader* hdr = (ISMPacketHeader*)(sbs->buf ());
989
- PrimitiveHeader* p = (PrimitiveHeader*)(hdr + 1 );
990
- uint32_t uniqueId = p->UniqueID ;
991
- boost::shared_ptr<MQE> mqe;
992
-
993
- std::unique_lock lk (fMlock );
994
- MessageQueueMap::iterator map_tok = fSessionMessages .find (uniqueId);
995
-
996
- // The message for a session that doesn't exist.
997
- if (map_tok == fSessionMessages .end ())
998
- {
999
- // Here gets the dead session ByteStream that is already removed
1000
- // from DEC queue.
1001
- return ;
1002
- }
1003
-
1004
- mqe = map_tok->second ;
1005
- lk.unlock ();
1006
-
1007
- if (pmCount > 0 )
1008
- {
1009
- // I hardcoded the unacked Worker id here. ACK isn't important
1010
- // for the local exchange b/c there is no need to
1011
- // enable flowcontrol localy on PM.
1012
- (void )atomicops::atomicInc (&mqe->unackedWork [0 ]);
1013
- }
1014
-
1015
- [[maybe_unused]] TSQSize_t queueSize = mqe->queue .push (sbs);
1016
- // There will be no statistics about data transfered
1017
- // over the memory.
948
+ assert (localConnectionId_ < pmCount);
949
+ return addDataToOutput (sbs, localConnectionId_, nullptr );
1018
950
}
1019
951
1020
952
void DistributedEngineComm::addDataToOutput (SBS sbs, uint32_t connIndex, Stats* stats)
1021
953
{
1022
954
ISMPacketHeader* hdr = (ISMPacketHeader*)(sbs->buf ());
1023
955
PrimitiveHeader* p = (PrimitiveHeader*)(hdr + 1 );
1024
956
uint32_t uniqueId = p->UniqueID ;
1025
- boost::shared_ptr<MQE> mqe;
1026
957
std::unique_lock lk (fMlock );
1027
958
MessageQueueMap::iterator map_tok = fSessionMessages .find (uniqueId);
1028
959
960
+ // The message for a session that doesn't exist.
1029
961
if (map_tok == fSessionMessages .end ())
1030
962
{
1031
- // For debugging...
1032
- // cerr << "DistributedEngineComm::AddDataToOutput: tried to add a message to a dead session: " <<
1033
- // uniqueId << ", size " << sbs->length() << ", step id " << p->StepID << endl;
963
+ // Here gets the dead session ByteStream that is already removed
964
+ // from DEC queue.
1034
965
return ;
1035
966
}
1036
967
1037
- mqe = map_tok->second ;
968
+ auto mqe = map_tok->second ;
1038
969
lk.unlock ();
1039
970
1040
971
if (pmCount > 0 )
@@ -1049,9 +980,9 @@ void DistributedEngineComm::addDataToOutput(SBS sbs, uint32_t connIndex, Stats*
1049
980
std::lock_guard lk (ackLock);
1050
981
uint64_t msgSize = sbs->lengthWithHdrOverhead ();
1051
982
1052
- if (!mqe->throttled && msgSize > (targetRecvQueueSize / 2 ))
1053
- doHasBigMsgs (mqe, ( 300 * 1024 * 1024 > 3 * msgSize ? 300 * 1024 * 1024
1054
- : 3 * msgSize)); // buffer at least 3 big msgs
983
+ if (!mqe->throttled && msgSize > (flowControlEnableBytesThresh / 2 ))
984
+ doHasBigMsgs (
985
+ mqe, (bigMessageSize > 3 * msgSize ? bigMessageSize : 3 * msgSize)); // buffer at least 3 big msgs
1055
986
1056
987
if (!mqe->throttled && queueSize.size >= mqe->targetQueueSize )
1057
988
setFlowControl (true , uniqueId, mqe);
@@ -1271,8 +1202,9 @@ Stats DistributedEngineComm::getNetworkStats(uint32_t uniqueID)
1271
1202
return empty;
1272
1203
}
1273
1204
1274
- DistributedEngineComm::MQE::MQE (const uint32_t pCount, const uint32_t initialInterleaverValue)
1275
- : ackSocketIndex(0 ), pmCount(pCount), hasBigMsgs(false ), targetQueueSize(targetRecvQueueSize)
1205
+ DistributedEngineComm::MQE::MQE (const uint32_t pCount, const uint32_t initialInterleaverValue,
1206
+ const uint64_t flowControlEnableBytesThresh)
1207
+ : ackSocketIndex(0 ), pmCount(pCount), hasBigMsgs(false ), targetQueueSize(flowControlEnableBytesThresh)
1276
1208
{
1277
1209
unackedWork.reset (new volatile uint32_t [pmCount]);
1278
1210
interleaver.reset (new uint32_t [pmCount]);
0 commit comments