@@ -92,6 +92,7 @@ struct Comms::PCap
92
92
int rx_pcap_selectable_fd = 0 ;
93
93
94
94
size_t _80211_header_length = 0 ;
95
+ size_t index = 0 ;
95
96
};
96
97
97
98
struct Comms ::TX
@@ -136,13 +137,14 @@ struct Comms::TX
136
137
137
138
struct Comms ::RX
138
139
{
139
- std::thread thread ;
140
+ std::vector<std:: thread> threads ;
140
141
141
142
fec_t * fec = nullptr ;
142
143
std::array<uint8_t const *, 16 > fec_src_packet_ptrs;
143
144
std::array<uint8_t *, 32 > fec_dst_packet_ptrs;
144
145
145
146
std::vector<PCap*> pcaps;
147
+ std::vector<uint32_t > pcal_last_block_index;
146
148
147
149
size_t transport_packet_size = 0 ;
148
150
size_t streaming_packet_size = 0 ;
@@ -248,8 +250,9 @@ Comms::~Comms()
248
250
249
251
m_impl->tx .packet_queue_cv .notify_all ();
250
252
251
- if (m_impl->rx .thread .joinable ())
252
- m_impl->rx .thread .join ();
253
+ for (auto & thread: m_impl->rx .threads )
254
+ if (thread.joinable ())
255
+ thread.join ();
253
256
254
257
if (m_impl->tx .thread .joinable ())
255
258
m_impl->tx .thread .join ();
@@ -490,6 +493,10 @@ bool Comms::process_rx_packet(PCap& pcap)
490
493
491
494
std::lock_guard<std::mutex> lg (rx.block_queue_mutex );
492
495
496
+ // keep track of what interface returned what index.
497
+ // this should allow us to skip stale blocks sooner
498
+ rx.pcal_last_block_index [pcap.index ] = block_index;
499
+
493
500
if (block_index < rx.next_block_index )
494
501
{
495
502
// LOGW("Old packet: {} < {}", block_index, rx.next_block_index);
@@ -783,21 +790,24 @@ bool Comms::init(RX_Descriptor const& rx_descriptor, TX_Descriptor const& tx_des
783
790
interfaces.insert (i);
784
791
interfaces.insert (m_tx_descriptor.interface );
785
792
793
+ m_impl->rx .pcal_last_block_index .resize (interfaces.size ());
786
794
m_impl->rx .pcaps .resize (interfaces.size ());
787
795
m_impl->pcaps .resize (interfaces.size ());
788
796
size_t index = 0 ;
789
- for (auto & i : interfaces)
797
+ for (auto & interf : interfaces)
790
798
{
791
799
m_impl->pcaps [index ] = std::make_unique<PCap>();
792
- if (!prepare_pcap (i , *m_impl->pcaps [index ]))
800
+ if (!prepare_pcap (interf , *m_impl->pcaps [index ]))
793
801
return false ;
794
802
795
- if (m_tx_descriptor.interface == i)
803
+ m_impl->pcaps [index ]->index = index ;
804
+
805
+ if (m_tx_descriptor.interface == interf)
796
806
m_impl->tx .pcap = m_impl->pcaps [index ].get ();
797
807
798
808
for (size_t j = 0 ; j < m_rx_descriptor.interfaces .size (); j++)
799
809
{
800
- if (m_rx_descriptor.interfaces [j] == i )
810
+ if (m_rx_descriptor.interfaces [j] == interf )
801
811
{
802
812
m_impl->rx .pcaps [j] = m_impl->pcaps [index ].get ();
803
813
break ;
@@ -807,7 +817,8 @@ bool Comms::init(RX_Descriptor const& rx_descriptor, TX_Descriptor const& tx_des
807
817
}
808
818
809
819
m_impl->tx .thread = std::thread ([this ]() { tx_thread_proc (); });
810
- m_impl->rx .thread = std::thread ([this ]() { rx_thread_proc (); });
820
+ for (size_t i = 0 ; i < m_rx_descriptor.interfaces .size (); i++)
821
+ m_impl->rx .threads .push_back (std::thread ([this , i]() { rx_thread_proc (i); }));
811
822
812
823
#if defined RASPBERRY_PI_XXX
813
824
{
@@ -829,9 +840,10 @@ bool Comms::init(RX_Descriptor const& rx_descriptor, TX_Descriptor const& tx_des
829
840
830
841
// //////////////////////////////////////////////////////////////////////////////////////////
831
842
832
- void Comms::rx_thread_proc ()
843
+ void Comms::rx_thread_proc (size_t index )
833
844
{
834
845
RX& rx = m_impl->rx ;
846
+ PCap& pcap = *rx.pcaps [index ];
835
847
836
848
while (!m_exit)
837
849
{
@@ -842,18 +854,11 @@ void Comms::rx_thread_proc()
842
854
to.tv_usec = 30000 ;
843
855
844
856
FD_ZERO (&readset);
845
- for (size_t i = 0 ; i < m_rx_descriptor.interfaces .size (); i++)
846
- FD_SET (rx.pcaps [i]->rx_pcap_selectable_fd , &readset);
857
+ FD_SET (pcap.rx_pcap_selectable_fd , &readset);
847
858
848
859
int n = select (30 , &readset, nullptr , nullptr , &to);
849
- if (n != 0 )
850
- {
851
- for (size_t i = 0 ; i < m_rx_descriptor.interfaces .size (); i++)
852
- {
853
- if (FD_ISSET (rx.pcaps [i]->rx_pcap_selectable_fd , &readset))
854
- process_rx_packet (*rx.pcaps [i]);
855
- }
856
- }
860
+ if (n != 0 && FD_ISSET (pcap.rx_pcap_selectable_fd , &readset))
861
+ process_rx_packet (pcap);
857
862
}
858
863
}
859
864
@@ -1093,7 +1098,7 @@ void Comms::process_rx_packets()
1093
1098
uint32_t coding_k = m_rx_descriptor.coding_k ;
1094
1099
uint32_t coding_n = m_rx_descriptor.coding_n ;
1095
1100
1096
- std::lock_guard <std::mutex> lg (rx.block_queue_mutex );
1101
+ std::unique_lock <std::mutex> lg (rx.block_queue_mutex );
1097
1102
1098
1103
if (Clock::now () - rx.last_packet_tp > m_rx_descriptor.reset_duration )
1099
1104
rx.next_block_index = 0 ;
@@ -1127,6 +1132,14 @@ void Comms::process_rx_packets()
1127
1132
// entire block received
1128
1133
if (block->packets .size () >= coding_k)
1129
1134
{
1135
+ // sanity check - this should not happen
1136
+ for (size_t i = 0 ; i < block->packets .size (); i++)
1137
+ {
1138
+ RX::Packet_ptr const & d = block->packets [i];
1139
+ if (!d->is_processed )
1140
+ LOGI (" Skipping {}!!!" , block->index * coding_k + d->index );
1141
+ }
1142
+
1130
1143
rx.last_block_tp = Clock::now ();
1131
1144
rx.next_block_index = block->index + 1 ;
1132
1145
rx.block_queue .pop_front ();
@@ -1170,7 +1183,9 @@ void Comms::process_rx_packets()
1170
1183
}
1171
1184
}
1172
1185
1186
+ lg.unlock (); // not need to hold the mutex locked - give the rx_proc a chance to get its data in
1173
1187
fec_decode (rx.fec , rx.fec_src_packet_ptrs .data (), rx.fec_dst_packet_ptrs .data (), indices.data (), rx.payload_size );
1188
+ lg.lock (); // relock the mutex
1174
1189
1175
1190
// now dispatch them
1176
1191
for (size_t i = 0 ; i < block->packets .size (); i++)
@@ -1197,9 +1212,21 @@ void Comms::process_rx_packets()
1197
1212
continue ; // next packet
1198
1213
}
1199
1214
1215
+ // calculate what is the earliest block index received
1216
+ uint32_t earliest_block_index = std::numeric_limits<uint32_t >::max ();
1217
+ for (uint32_t index : rx.pcal_last_block_index )
1218
+ earliest_block_index = std::min (earliest_block_index, index );
1219
+
1200
1220
// skip if too much buffering
1201
- if (rx.block_queue .size () > 3 )
1221
+ bool skipped_blocks = false ;
1222
+ while ((rx.block_queue .size () > 0 && rx.block_queue .front ()->index < earliest_block_index) || // if all interfaces received blocks bigger that the first in the queue
1223
+ rx.block_queue .size () > 3 ) // or if queueing too much
1202
1224
{
1225
+ // if (rx.block_queue.front()->index < earliest_block_index)ˇ
1226
+ // LOGI("Skipping stale packet: fast");
1227
+ // else
1228
+ // LOGI("Skipping stale packet: slow");
1229
+
1203
1230
for (size_t i = 0 ; i < block->packets .size (); i++)
1204
1231
{
1205
1232
RX::Packet_ptr const & d = block->packets [i];
@@ -1208,10 +1235,12 @@ void Comms::process_rx_packets()
1208
1235
}
1209
1236
rx.next_block_index = block->index + 1 ;
1210
1237
rx.block_queue .pop_front ();
1238
+ skipped_blocks = true ;
1211
1239
}
1212
1240
1213
- // crt packet is not complete, stop until we get more packets
1214
- break ;
1241
+ // nothing else to do - we cannot complete nor skip blocks - so wait for more data
1242
+ if (!skipped_blocks)
1243
+ break ;
1215
1244
}
1216
1245
}
1217
1246
0 commit comments