Observe(rxgo.WithContext(ctx)) does not cancel observable?  #342

@torepaulsson

Hi,
I am not sure whether I'm using RxGo correctly or whether this use case is simply not supported. I have one producer and multiple consumers that subscribe to and unsubscribe from its messages. Consumers can come and go, so my idea was to use rxgo.WithContext to "cancel" a consumer's subscription, but the subscriptions don't get cancelled. Is this the intended behavior, or am I missing something? I've tried several different approaches without success.

The test I'm using is this:

package main

import (
	"context"
	"log"
	"testing"
	"time"

	"github.com/reactivex/rxgo/v2"
)

func TestCtxCancel(t *testing.T) {
	ch := make(chan rxgo.Item)
	ob := rxgo.FromEventSource(ch, rxgo.WithBackPressureStrategy(rxgo.Drop))

	producerCtx, cancelProducer := context.WithCancel(context.Background())
	cancel1 := observe(1, ob)
	cancel2 := observe(2, ob)

	// Producer
	go func() {
		defer func() {
			log.Print("producer finished")
			close(ch)
		}()
		ctr := 0
		ticker := time.NewTicker(time.Second) // named ticker so it does not shadow t *testing.T
		defer ticker.Stop()
		for {
			select {
			case <-producerCtx.Done():
				return
			case <-ticker.C:
				ctr++
				select {
				case ch <- rxgo.Of(ctr):
				default:
					log.Printf("Send channel blocked, msg %d dropped", ctr) // This happened when I tried some other methods
				}
			}
		}
	}()

	log.Printf("Sleep 3")
	time.Sleep(time.Second * 3)
	log.Printf("Cancel Observer 1")
	cancel1()

	log.Printf("Sleep 3")
	time.Sleep(time.Second * 3)
	log.Printf("Cancel Observer 2")
	cancel2()

	log.Printf("Sleep 3")
	time.Sleep(time.Second * 3)
	log.Printf("Cancel Producer")
	cancelProducer()

	log.Printf("Sleep 1")
	time.Sleep(time.Second * 1)
}

func observe(index int, ob rxgo.Observable) context.CancelFunc {
	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		for i := range ob.Observe(rxgo.WithContext(ctx)) {
			log.Printf("Observer[%d]: %v", index, i.V)
		}
		log.Printf("Observer[%d] stopped", index)
	}()
	return cancel
}

Produces the following output:

2022/01/31 08:43:59 Sleep 3
2022/01/31 08:44:00 Observer[2]: 1
2022/01/31 08:44:00 Observer[1]: 1
2022/01/31 08:44:01 Observer[2]: 2
2022/01/31 08:44:01 Observer[1]: 2
2022/01/31 08:44:02 Observer[2]: 3
2022/01/31 08:44:02 Observer[1]: 3
2022/01/31 08:44:02 Cancel Observer 1
2022/01/31 08:44:02 Sleep 3
2022/01/31 08:44:03 Observer[2]: 4
2022/01/31 08:44:03 Observer[1]: 4
2022/01/31 08:44:04 Observer[2]: 5
2022/01/31 08:44:04 Observer[1]: 5
2022/01/31 08:44:05 Observer[2]: 6
2022/01/31 08:44:05 Observer[1]: 6
2022/01/31 08:44:05 Cancel Observer 2
2022/01/31 08:44:05 Sleep 3
2022/01/31 08:44:06 Observer[2]: 7
2022/01/31 08:44:06 Observer[1]: 7
2022/01/31 08:44:07 Observer[2]: 8
2022/01/31 08:44:07 Observer[1]: 8
2022/01/31 08:44:08 Observer[2]: 9
2022/01/31 08:44:08 Observer[1]: 9
2022/01/31 08:44:08 Cancel Producer
2022/01/31 08:44:08 Sleep 1
2022/01/31 08:44:08 producer finished
2022/01/31 08:44:08 Observer[2] stopped
2022/01/31 08:44:08 Observer[1] stopped
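
In the output above, both observers keep receiving items after their contexts are cancelled and only stop once the producer closes the channel. One consumer-side way to stop earlier, independent of what the library does with the context, is to select on ctx.Done() alongside the item channel instead of ranging over it. The sketch below uses only the standard library and assumes Go 1.18+ for generics. `consumeUntilDone` and `demo` are hypothetical names, and a plain `chan int` stands in for the channel returned by `ob.Observe()`.

```go
package main

import (
	"context"
	"fmt"
)

// consumeUntilDone is a hypothetical helper (not part of RxGo). It calls
// handle for each item read from in and returns when either the context
// is cancelled (returning ctx.Err()) or the channel is closed (returning nil).
func consumeUntilDone[T any](ctx context.Context, in <-chan T, handle func(T)) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case v, ok := <-in:
			if !ok {
				return nil // source closed the channel
			}
			handle(v)
		}
	}
}

// demo buffers two items on a channel that is never closed, mimicking a
// source that keeps running, and cancels the context after the second item.
func demo() ([]int, error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	in := make(chan int, 2)
	in <- 1
	in <- 2

	var got []int
	err := consumeUntilDone(ctx, in, func(v int) {
		got = append(got, v)
		if len(got) == 2 {
			cancel() // the consumer decides to leave
		}
	})
	return got, err
}

func main() {
	got, err := demo()
	fmt.Println(got, err) // prints "[1 2] context canceled"
}
```

In the observe helper from the test, the same loop would read from `ob.Observe()` and log `i.V` in the handler. Note that this only stops the consumer goroutine; it does not by itself remove the channel from whatever the observable keeps internally.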

Have I just made a mistake somewhere that leads to the observable not being cancelled? Any help is much appreciated, and thanks for a great package.
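
For comparison, the use case described here (one producer, consumers that subscribe and unsubscribe at will, and items dropped for slow consumers) can be sketched without RxGo as a small fan-out hub. This is an illustration under stated assumptions, not RxGo's implementation: `Hub`, `NewHub`, `Subscribe`, and `Publish` are hypothetical names, only the standard library is used, and Go 1.18+ is assumed for generics.

```go
package main

import (
	"fmt"
	"sync"
)

// Hub is a hypothetical fan-out broadcaster; it is not part of RxGo.
type Hub[T any] struct {
	mu     sync.Mutex
	nextID int
	subs   map[int]chan T
}

// NewHub returns an empty hub with no subscribers.
func NewHub[T any]() *Hub[T] {
	return &Hub[T]{subs: make(map[int]chan T)}
}

// Subscribe registers a consumer with a buffered channel. It returns the
// channel and an unsubscribe function that is safe to call more than once.
func (h *Hub[T]) Subscribe(buffer int) (<-chan T, func()) {
	h.mu.Lock()
	defer h.mu.Unlock()

	id := h.nextID
	h.nextID++
	ch := make(chan T, buffer)
	h.subs[id] = ch

	return ch, func() {
		h.mu.Lock()
		defer h.mu.Unlock()
		if _, ok := h.subs[id]; ok {
			delete(h.subs, id)
			close(ch) // ends the consumer's range loop once it has drained
		}
	}
}

// Publish offers v to every current subscriber and drops it for any
// subscriber whose buffer is full, so a slow consumer never blocks the producer.
func (h *Hub[T]) Publish(v T) {
	h.mu.Lock()
	defer h.mu.Unlock()
	for _, ch := range h.subs {
		select {
		case ch <- v:
		default: // buffer full: drop the item for this subscriber
		}
	}
}

func main() {
	h := NewHub[int]()
	a, cancelA := h.Subscribe(4)
	b, cancelB := h.Subscribe(4)

	h.Publish(1)
	cancelA() // consumer A leaves; it will not see later items
	h.Publish(2)
	cancelB()

	for v := range a {
		fmt.Println("A:", v) // prints "A: 1"
	}
	for v := range b {
		fmt.Println("B:", v) // prints "B: 1" then "B: 2"
	}
}
```

Because sending and closing both happen under the same mutex, a channel is never sent to after it has been closed. To tie unsubscription to a context, a caller could start a goroutine that waits on ctx.Done() and then invokes the returned unsubscribe function.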

Labels: question (Question regarding how RxGo is working etc.)
