Skip to content

Commit b37aa4c

Browse files
jjeffcaiicaiweiwei.cww
and
caiweiwei.cww
authored
feat: add ants scheduler (#37)
Co-authored-by: caiweiwei.cww <[email protected]>
1 parent 3addfdc commit b37aa4c

File tree

4 files changed

+11
-13
lines changed

4 files changed

+11
-13
lines changed

README.md

-7
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
# reactor-go 🚀🚀🚀
22

3-
43
![GitHub Workflow Status](https://github.com/jjeffcaii/reactor-go/workflows/Go/badge.svg)
54
[![codecov](https://codecov.io/gh/jjeffcaii/reactor-go/branch/master/graph/badge.svg)](https://codecov.io/gh/jjeffcaii/reactor-go)
65
[![GoDoc](https://godoc.org/github.com/jjeffcaii/reactor-go?status.svg)](https://godoc.org/github.com/jjeffcaii/reactor-go)
@@ -9,8 +8,6 @@
98
[![GitHub Release](https://img.shields.io/github/release-pre/jjeffcaii/reactor-go.svg)](https://github.com/jjeffcaii/reactor-go/releases)
109

1110
> A golang implementation for [reactive-streams](https://www.reactive-streams.org/).
12-
<br>🚧🚧🚧 ***IT IS UNDER ACTIVE DEVELOPMENT!!!***
13-
<br>⚠️⚠️⚠️ ***DO NOT USE IN ANY PRODUCTION ENVIRONMENT!!!***
1411
1512
## Install
1613

@@ -20,10 +17,6 @@ go get -u github.com/jjeffcaii/reactor-go
2017

2118
## Example
2219

23-
> NOTICE:
24-
<br> We can only use `func(interface{})interface{}` for most operations because Golang has not Generics. 😭
25-
<br> If you have any better idea, please let me know. 😀
26-
2720
### Mono
2821
```go
2922
package mono_test

mono/schedule_on.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55

66
"github.com/jjeffcaii/reactor-go"
7+
"github.com/jjeffcaii/reactor-go/internal"
78
"github.com/jjeffcaii/reactor-go/scheduler"
89
)
910

@@ -16,7 +17,8 @@ func (m monoScheduleOn) SubscribeWith(ctx context.Context, s reactor.Subscriber)
1617
if err := m.sc.Worker().Do(func() {
1718
m.source.SubscribeWith(ctx, s)
1819
}); err != nil {
19-
panic(err)
20+
s.OnSubscribe(ctx, internal.EmptySubscription)
21+
s.OnError(err)
2022
}
2123
}
2224

scheduler/elastic.go

+6-3
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,12 @@ func (e *elasticScheduler) Worker() Worker {
4141
// NewElastic creates a new elastic scheduler.
4242
func NewElastic(size int) Scheduler {
4343
pool, _ := ants.NewPool(size)
44-
return &elasticScheduler{
45-
pool: pool,
46-
}
44+
return NewAnts(pool)
45+
}
46+
47+
// NewAnts creates a new scheduler over ants pool.
48+
func NewAnts(pool *ants.Pool) Scheduler {
49+
return &elasticScheduler{pool: pool}
4750
}
4851

4952
// Elastic is a dynamic alloc scheduler.

scheduler/elastic_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,9 @@ import (
66
"testing"
77
"time"
88

9-
"github.com/jjeffcaii/reactor-go/scheduler"
109
"github.com/stretchr/testify/assert"
10+
11+
"github.com/jjeffcaii/reactor-go/scheduler"
1112
)
1213

1314
func TestElastic(t *testing.T) {
@@ -54,5 +55,4 @@ func TestElasticBounded(t *testing.T) {
5455
}
5556
wg.Wait()
5657
})
57-
5858
}

0 commit comments

Comments
 (0)