2121import reactor .core .publisher .Mono ;
2222
2323import java .io .Serializable ;
24+ import java .util .ArrayList ;
2425import java .util .Collection ;
2526import java .util .Collections ;
2627import java .util .List ;
4748import org .springframework .data .mongodb .repository .query .MongoEntityInformation ;
4849import org .springframework .data .repository .query .FluentQuery ;
4950import org .springframework .data .util .StreamUtils ;
50- import org .springframework .data .util .Streamable ;
5151import org .springframework .lang .Nullable ;
5252import org .springframework .util .Assert ;
5353
@@ -110,21 +110,17 @@ public <S extends T> Flux<S> saveAll(Iterable<S> entities) {
110110
111111 Assert .notNull (entities , "The given Iterable of entities must not be null" );
112112
113- Streamable <S > source = Streamable .of (entities );
114-
113+ List <S > source = toList (entities );
115114 return source .stream ().allMatch (entityInformation ::isNew ) ? //
116- insert (entities ) :
117- Flux .fromIterable (entities ).concatMap (this ::save );
115+ insert (source ) : concatMapSequentially (source , this ::save );
118116 }
119117
120118 @ Override
121- public <S extends T > Flux <S > saveAll (Publisher <S > entityStream ) {
119+ public <S extends T > Flux <S > saveAll (Publisher <S > publisher ) {
122120
123- Assert .notNull (entityStream , "The given Publisher of entities must not be null" );
121+ Assert .notNull (publisher , "The given Publisher of entities must not be null" );
124122
125- return Flux .from (entityStream ).concatMap (entity -> entityInformation .isNew (entity ) ? //
126- mongoOperations .insert (entity , entityInformation .getCollectionName ()) : //
127- mongoOperations .save (entity , entityInformation .getCollectionName ()));
123+ return concatMapSequentially (publisher , this ::save );
128124 }
129125
130126 @ Override
@@ -278,14 +274,10 @@ public Mono<Void> deleteAll(Iterable<? extends T> entities) {
278274
279275 Assert .notNull (entities , "The given Iterable of entities must not be null" );
280276
281- Collection <?> idCollection = StreamUtils .createStreamFromIterator (entities .iterator ()). map ( entityInformation :: getId )
282- .collect (Collectors .toList ());
277+ Collection <? extends ID > ids = StreamUtils .createStreamFromIterator (entities .iterator ())
278+ .map ( entityInformation :: getId ). collect (Collectors .toList ());
283279
284- Criteria idsInCriteria = where (entityInformation .getIdAttribute ()).in (idCollection );
285-
286- Query query = new Query (idsInCriteria );
287- getReadPreference ().ifPresent (query ::withReadPreference );
288- return mongoOperations .remove (query , entityInformation .getJavaType (), entityInformation .getCollectionName ()).then ();
280+ return deleteAllById (ids );
289281 }
290282
291283 @ Override
@@ -336,8 +328,11 @@ public <S extends T> Flux<S> insert(Iterable<S> entities) {
336328
337329 Assert .notNull (entities , "The given Iterable of entities must not be null" );
338330
339- Collection <S > source = toCollection (entities );
340- return source .isEmpty () ? Flux .empty () : mongoOperations .insert (source , entityInformation .getCollectionName ());
331+ return insert (toCollection (entities ));
332+ }
333+
334+ private <S extends T > Flux <S > insert (Collection <S > entities ) {
335+ return entities .isEmpty () ? Flux .empty () : mongoOperations .insert (entities , entityInformation .getCollectionName ());
341336 }
342337
343338 @ Override
@@ -440,6 +435,12 @@ void setRepositoryMethodMetadata(CrudMethodMetadata crudMethodMetadata) {
440435 this .crudMethodMetadata = crudMethodMetadata ;
441436 }
442437
438+ private Flux <T > findAll (Query query ) {
439+
440+ getReadPreference ().ifPresent (query ::withReadPreference );
441+ return mongoOperations .find (query , entityInformation .getJavaType (), entityInformation .getCollectionName ());
442+ }
443+
443444 private Optional <ReadPreference > getReadPreference () {
444445
445446 if (crudMethodMetadata == null ) {
@@ -461,15 +462,61 @@ private Query getIdQuery(Iterable<? extends ID> ids) {
461462 return new Query (where (entityInformation .getIdAttribute ()).in (toCollection (ids )));
462463 }
463464
464- private static <E > Collection <E > toCollection (Iterable <E > ids ) {
465- return ids instanceof Collection <E > collection ? collection
466- : StreamUtils .createStreamFromIterator (ids .iterator ()).collect (Collectors .toList ());
465+ /**
466+ * Transform the elements emitted by this Flux into Publishers, then flatten these inner publishers into a single
467+ * Flux. The operation does not allow interleave between performing the map operation for the first and second source
468+ * element guaranteeing the mapping operation completed before subscribing to its following inners, that will then be
469+ * subscribed to eagerly emitting elements in order of their source.
470+ *
471+ * <pre class="code">
472+ * Flux.just(first-element).flatMap(...)
473+ * .concatWith(Flux.fromIterable(remaining-elements).flatMapSequential(...))
474+ * </pre>
475+ *
476+ * @param source the collection of elements to transform.
477+ * @param mapper the transformation {@link Function}. Must not be {@literal null}.
478+ * @return never {@literal null}.
479+ * @param <T> source type
480+ */
481+ static <T > Flux <T > concatMapSequentially (List <T > source ,
482+ Function <? super T , ? extends Publisher <? extends T >> mapper ) {
483+
484+ if (source .isEmpty ()) {
485+ return Flux .empty ();
486+ }
487+ if (source .size () == 1 ) {
488+ return Flux .just (source .iterator ().next ()).flatMap (mapper );
489+ }
490+ if (source .size () == 2 ) {
491+ return Flux .fromIterable (source ).concatMap (mapper );
492+ }
493+
494+ Flux <T > first = Flux .just (source .get (0 )).flatMap (mapper );
495+ Flux <T > theRest = Flux .fromIterable (source .subList (1 , source .size ())).flatMapSequential (mapper );
496+ return first .concatWith (theRest );
467497 }
468498
469- private Flux <T > findAll (Query query ) {
499+ static <T > Flux <T > concatMapSequentially (Publisher <T > publisher ,
500+ Function <? super T , ? extends Publisher <? extends T >> mapper ) {
470501
471- getReadPreference ().ifPresent (query ::withReadPreference );
472- return mongoOperations .find (query , entityInformation .getJavaType (), entityInformation .getCollectionName ());
502+ return Flux .from (publisher ).switchOnFirst (((signal , source ) -> {
503+
504+ if (!signal .hasValue ()) {
505+ return source .concatMap (mapper );
506+ }
507+
508+ Mono <T > firstCall = Mono .from (mapper .apply (signal .get ()));
509+ return firstCall .concatWith (source .skip (1 ).flatMapSequential (mapper ));
510+ }));
511+ }
512+
513+ private static <E > List <E > toList (Iterable <E > source ) {
514+ return source instanceof List <E > list ? list : new ArrayList <>(toCollection (source ));
515+ }
516+
517+ private static <E > Collection <E > toCollection (Iterable <E > source ) {
518+ return source instanceof Collection <E > collection ? collection
519+ : StreamUtils .createStreamFromIterator (source .iterator ()).collect (Collectors .toList ());
473520 }
474521
475522 /**
0 commit comments