29
29
#include " task.h"
30
30
#include " cache_aligned_allocator.h"
31
31
#include " tbb_exception.h"
32
+ #include " pipeline.h"
32
33
#include " internal/_template_helpers.h"
33
34
#include " internal/_aggregator_impl.h"
34
35
#include " tbb/internal/_allocator_traits.h"
@@ -920,12 +921,12 @@ class input_node : public graph_node, public sender< Output > {
920
921
template < typename Body >
921
922
__TBB_NOINLINE_SYM input_node ( graph &g, Body body )
922
923
: graph_node(g), my_active(false ),
923
- my_body( new internal::source_body_leaf < output_type, Body>(body) ),
924
- my_init_body( new internal::source_body_leaf < output_type, Body>(body) ),
924
+ my_body( new internal::input_body_leaf < output_type, Body>(body) ),
925
+ my_init_body( new internal::input_body_leaf < output_type, Body>(body) ),
925
926
my_reserved(false ), my_has_cached_item(false )
926
927
{
927
928
my_successors.set_owner (this );
928
- tbb::internal::fgt_node_with_body ( CODEPTR (), tbb::internal::FLOW_SOURCE_NODE, &this ->my_graph ,
929
+ tbb::internal::fgt_node_with_body ( CODEPTR (), tbb::internal::FLOW_SOURCE_NODE, &this ->my_graph ,
929
930
static_cast <sender<output_type> *>(this ), this ->my_body );
930
931
}
931
932
@@ -1066,8 +1067,8 @@ class input_node : public graph_node, public sender< Output > {
1066
1067
1067
1068
template <typename Body>
1068
1069
Body copy_function_object () {
1069
- internal::source_body <output_type> &body_ref = *this ->my_body ;
1070
- return dynamic_cast < internal::source_body_leaf <output_type, Body> & >(body_ref).get_body ();
1070
+ internal::input_body <output_type> &body_ref = *this ->my_body ;
1071
+ return dynamic_cast < internal::input_body_leaf <output_type, Body> & >(body_ref).get_body ();
1071
1072
}
1072
1073
1073
1074
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
@@ -1081,15 +1082,15 @@ class input_node : public graph_node, public sender< Output > {
1081
1082
1082
1083
protected:
1083
1084
1084
- // ! resets the source_node to its initial state
1085
+ // ! resets the input_node to its initial state
1085
1086
void reset_node ( reset_flags f) __TBB_override {
1086
1087
my_active = false ;
1087
1088
my_reserved = false ;
1088
1089
my_has_cached_item = false ;
1089
1090
1090
1091
if (f & rf_clear_edges) my_successors.clear ();
1091
1092
if (f & rf_reset_bodies) {
1092
- internal::source_body <output_type> *tmp = my_init_body->clone ();
1093
+ internal::input_body <output_type> *tmp = my_init_body->clone ();
1093
1094
delete my_body;
1094
1095
my_body = tmp;
1095
1096
}
@@ -1098,8 +1099,8 @@ class input_node : public graph_node, public sender< Output > {
1098
1099
private:
1099
1100
spin_mutex my_mutex;
1100
1101
bool my_active;
1101
- internal::source_body <output_type> *my_body;
1102
- internal::source_body <output_type> *my_init_body;
1102
+ internal::input_body <output_type> *my_body;
1103
+ internal::input_body <output_type> *my_init_body;
1103
1104
internal::broadcast_cache< output_type > my_successors;
1104
1105
bool my_reserved;
1105
1106
bool my_has_cached_item;
@@ -1113,11 +1114,18 @@ class input_node : public graph_node, public sender< Output > {
1113
1114
}
1114
1115
if ( !my_has_cached_item ) {
1115
1116
tbb::internal::fgt_begin_body ( my_body );
1117
+
1118
+ #if TBB_DEPRECATED_INPUT_NODE_BODY
1116
1119
bool r = (*my_body)(my_cached_item);
1117
- tbb::internal::fgt_end_body ( my_body );
1118
1120
if (r) {
1119
1121
my_has_cached_item = true ;
1120
1122
}
1123
+ #else
1124
+ flow_control control;
1125
+ my_cached_item = (*my_body)(control);
1126
+ my_has_cached_item = !control.is_pipeline_stopped ;
1127
+ #endif
1128
+ tbb::internal::fgt_end_body ( my_body );
1121
1129
}
1122
1130
if ( my_has_cached_item ) {
1123
1131
v = my_cached_item;
0 commit comments