From e8a0edca754c3bf177bb6923cc090c96cbd3f10a Mon Sep 17 00:00:00 2001 From: si3nloong Date: Wed, 31 Aug 2022 17:24:57 +0800 Subject: [PATCH] fix: issue #341 --- item.go | 33 +++++++++++++++++++++++++++++++++ iterable_just.go | 2 +- single.go | 4 +++- 3 files changed, 37 insertions(+), 2 deletions(-) diff --git a/item.go b/item.go index 5a311761..b78a8b34 100644 --- a/item.go +++ b/item.go @@ -84,6 +84,39 @@ func send(ctx context.Context, ch chan<- Item, items ...interface{}) { } } +func sendSingleItem(ctx context.Context, ch chan<- Item, strategy CloseChannelStrategy, items ...interface{}) { + if strategy == CloseChannel { + defer close(ch) + } + for _, currentItem := range items { + switch item := currentItem.(type) { + default: + rt := reflect.TypeOf(item) + switch rt.Kind() { + default: + Of(item).SendContext(ctx, ch) + case reflect.Chan: + in := reflect.ValueOf(currentItem) + for { + v, ok := in.Recv() + if !ok { + return + } + currentItem := v.Interface() + switch item := currentItem.(type) { + default: + Of(item).SendContext(ctx, ch) + case error: + Error(item).SendContext(ctx, ch) + } + } + } + case error: + Error(item).SendContext(ctx, ch) + } + } +} + // Error checks if an item is an error. func (i Item) Error() bool { return i.E != nil diff --git a/iterable_just.go b/iterable_just.go index 0856a8ff..3b30d02a 100644 --- a/iterable_just.go +++ b/iterable_just.go @@ -18,6 +18,6 @@ func (i *justIterable) Observe(opts ...Option) <-chan Item { option := parseOptions(append(i.opts, opts...)...) next := option.buildChannel() - go SendItems(option.buildContext(emptyContext), next, CloseChannel, i.items) + go sendSingleItem(option.buildContext(emptyContext), next, CloseChannel, i.items...) return next } diff --git a/single.go b/single.go index e4d70d28..a1fa1ce7 100644 --- a/single.go +++ b/single.go @@ -1,6 +1,8 @@ package rxgo -import "context" +import ( + "context" +) // Single is a observable with a single element. type Single interface {