33import static com .google .common .collect .ImmutableList .copyOf ;
44import static com .google .common .collect .Iterables .concat ;
55import static org .hypertrace .core .graphql .atttributes .scopes .HypertraceCoreAttributeScopeString .SPAN ;
6+ import static org .hypertrace .core .graphql .span .joiner .MultipleSpanJoin .SPANS_KEY ;
67import static org .hypertrace .core .graphql .span .joiner .SpanJoin .SPAN_KEY ;
78
9+ import com .google .common .collect .ImmutableListMultimap ;
10+ import com .google .common .collect .ListMultimap ;
811import graphql .schema .DataFetchingFieldSelectionSet ;
912import graphql .schema .SelectedField ;
1013import io .reactivex .rxjava3 .core .Observable ;
4649public class DefaultSpanJoinerBuilder implements SpanJoinerBuilder {
4750
4851 private static final int ZERO_OFFSET = 0 ;
49-
5052 private final SpanDao spanDao ;
5153 private final GraphQlSelectionFinder selectionFinder ;
54+
5255 private final ResultSetRequestBuilder resultSetRequestBuilder ;
5356 private final FilterRequestBuilder filterRequestBuilder ;
5457
@@ -70,61 +73,100 @@ public Single<SpanJoiner> build(
7073 TimeRangeArgument timeRange ,
7174 DataFetchingFieldSelectionSet selectionSet ,
7275 List <String > pathToSpanJoin ) {
73- return Single .just (
74- new DefaultSpanJoiner (
75- context , timeRange , this .getSelections (selectionSet , pathToSpanJoin )));
76- }
77-
78- private List <SelectedField > getSelections (
79- DataFetchingFieldSelectionSet selectionSet , List <String > pathToSpanJoin ) {
80- List <String > fullPath = copyOf (concat (pathToSpanJoin , List .of (SPAN_KEY )));
81- return selectionFinder
82- .findSelections (selectionSet , SelectionQuery .builder ().selectionPath (fullPath ).build ())
83- .collect (Collectors .toUnmodifiableList ());
76+ return Single .just (new DefaultSpanJoiner (context , timeRange , selectionSet , pathToSpanJoin ));
8477 }
8578
8679 @ AllArgsConstructor
8780 private class DefaultSpanJoiner implements SpanJoiner {
8881
8982 private final GraphQlRequestContext context ;
9083 private final TimeRangeArgument timeRange ;
91- private final List <SelectedField > selectedFields ;
84+ private final DataFetchingFieldSelectionSet selectionSet ;
85+ private final List <String > pathToJoin ;
9286
9387 @ Override
94- public <T > Single <Map <T , Span >> joinSpans (
88+ public <T > Single <Map <T , Span >> joinSpan (
9589 Collection <T > joinSources , SpanIdGetter <T > spanIdGetter ) {
96- return this .buildSourceToIdMap (joinSources , spanIdGetter ).flatMap (this ::joinSpans );
90+ Function <T , Single <List <String >>> idsGetter =
91+ source -> spanIdGetter .getSpanId (source ).map (List ::of );
92+ return this .joinSpans (joinSources , idsGetter , SPAN_KEY ).map (this ::reduceMap );
9793 }
9894
99- private <T > Single <Map <T , Span >> joinSpans (Map <T , String > sourceToSpanIdMap ) {
100- return this .buildSpanRequest (sourceToSpanIdMap )
101- .flatMap (spanDao ::getSpans )
102- .map (this ::buildSpanIdToSpanMap )
103- .map (spanIdToSpanMap -> buildSourceToSpanMap (sourceToSpanIdMap , spanIdToSpanMap ));
95+ @ Override
96+ public <T > Single <ListMultimap <T , Span >> joinSpans (
97+ Collection <T > joinSources , MultipleSpanIdGetter <T > multipleSpanIdGetter ) {
98+ return this .joinSpans (joinSources , multipleSpanIdGetter ::getSpanIds , SPANS_KEY );
10499 }
105100
106- private <T > Map <T , Span > buildSourceToSpanMap (
107- Map <T , String > sourceToSpanIdMap , Map <String , Span > spanIdToSpanMap ) {
108- return sourceToSpanIdMap .entrySet ().stream ()
109- .filter (entry -> spanIdToSpanMap .containsKey (entry .getValue ()))
101+ private <T > Single <ListMultimap <T , Span >> joinSpans (
102+ Collection <T > joinSources ,
103+ Function <T , Single <List <String >>> idsGetter ,
104+ String joinSpanKey ) {
105+ return this .buildSourceToIdsMap (joinSources , idsGetter )
106+ .flatMap (
107+ sourceToSpanIdsMap ->
108+ this .buildSpanRequest (sourceToSpanIdsMap , joinSpanKey )
109+ .flatMap (spanDao ::getSpans )
110+ .map (this ::buildSpanIdToSpanMap )
111+ .map (
112+ spanIdToSpanMap ->
113+ this .buildSourceToSpanListMultiMap (
114+ sourceToSpanIdsMap , spanIdToSpanMap )));
115+ }
116+
117+ private <T > Map <T , Span > reduceMap (ListMultimap <T , Span > listMultimap ) {
118+ return listMultimap .entries ().stream ()
110119 .collect (
111120 Collectors .toUnmodifiableMap (
121+ Entry ::getKey , Entry ::getValue , (first , second ) -> first ));
122+ }
123+
124+ private <T > Single <ImmutableListMultimap <T , String >> buildSourceToIdsMap (
125+ Collection <T > joinSources , Function <T , Single <List <String >>> idsGetter ) {
126+ return Observable .fromIterable (joinSources )
127+ .flatMapSingle (source -> idsGetter .apply (source ).map (ids -> Map .entry (source , ids )))
128+ .collect (
129+ ImmutableListMultimap .flatteningToImmutableListMultimap (
130+ Entry ::getKey , entry -> entry .getValue ().stream ()));
131+ }
132+
133+ private <T > ImmutableListMultimap <T , Span > buildSourceToSpanListMultiMap (
134+ ListMultimap <T , String > sourceToSpanIdsMultimap , Map <String , Span > spanIdToSpanMap ) {
135+ return sourceToSpanIdsMultimap .entries ().stream ()
136+ .filter (entry -> spanIdToSpanMap .containsKey (entry .getValue ()))
137+ .collect (
138+ ImmutableListMultimap .toImmutableListMultimap (
112139 Entry ::getKey , entry -> spanIdToSpanMap .get (entry .getValue ())));
113140 }
114141
142+ private List <SelectedField > getSelections (String joinSpanKey ) {
143+ List <String > fullPath = copyOf (concat (pathToJoin , List .of (joinSpanKey )));
144+ return selectionFinder
145+ .findSelections (selectionSet , SelectionQuery .builder ().selectionPath (fullPath ).build ())
146+ .collect (Collectors .toUnmodifiableList ());
147+ }
148+
115149 private Map <String , Span > buildSpanIdToSpanMap (SpanResultSet resultSet ) {
116150 return resultSet .results ().stream ()
117151 .collect (Collectors .toUnmodifiableMap (Identifiable ::id , Function .identity ()));
118152 }
119153
120- private <T > Single <SpanRequest > buildSpanRequest (Map <T , String > sourceToSpanIdMap ) {
121- Collection <String > spanIds = sourceToSpanIdMap .values ();
122- return buildSpanIdsFilter (spanIds )
123- .flatMap (filterArguments -> buildSpanRequest (spanIds .size (), filterArguments ));
154+ private <T > Single <SpanRequest > buildSpanRequest (
155+ ListMultimap <T , String > sourceToSpanIdsMultimap , String joinSpanKey ) {
156+ Collection <String > spanIds =
157+ sourceToSpanIdsMultimap .values ().stream ()
158+ .distinct ()
159+ .collect (Collectors .toUnmodifiableList ());
160+ List <SelectedField > selectedFields = getSelections (joinSpanKey );
161+ return buildSpanIdsFilter (context , spanIds )
162+ .flatMap (
163+ filterArguments -> buildSpanRequest (spanIds .size (), filterArguments , selectedFields ));
124164 }
125165
126166 private Single <SpanRequest > buildSpanRequest (
127- int size , List <AttributeAssociation <FilterArgument >> filterArguments ) {
167+ int size ,
168+ List <AttributeAssociation <FilterArgument >> filterArguments ,
169+ List <SelectedField > selectedFields ) {
128170 return resultSetRequestBuilder
129171 .build (
130172 context ,
@@ -140,21 +182,9 @@ private Single<SpanRequest> buildSpanRequest(
140182 }
141183
142184 private Single <List <AttributeAssociation <FilterArgument >>> buildSpanIdsFilter (
143- Collection <String > spanIds ) {
185+ GraphQlRequestContext context , Collection <String > spanIds ) {
144186 return filterRequestBuilder .build (context , SPAN , Set .of (new SpanIdFilter (spanIds )));
145187 }
146-
147- private <T > Single <Map <T , String >> buildSourceToIdMap (
148- Collection <T > joinSources , SpanIdGetter <T > spanIdGetter ) {
149- return Observable .fromIterable (joinSources )
150- .flatMapSingle (source -> this .maybeBuildMapEntry (source , spanIdGetter ))
151- .collect (Collectors .toMap (Entry ::getKey , Entry ::getValue ));
152- }
153-
154- private <T > Single <Entry <T , String >> maybeBuildMapEntry (
155- T source , SpanIdGetter <T > spanIdGetter ) {
156- return spanIdGetter .getSpanId (source ).map (id -> Map .entry (source , id ));
157- }
158188 }
159189
160190 @ Value
0 commit comments