Skip to content

Commit b74b91b

Browse files
committed
refactor(ynabber): channels for fan out to writers
Use channels to batch transactions and fan out to writers. This is prep work for allowing multiple "pipelines" of readers and writers and potentially multiple instances of Ynabber running concurrently. commit-id:d118ecdf
1 parent 2146e86 commit b74b91b

File tree

2 files changed

+77
-43
lines changed

2 files changed

+77
-43
lines changed

cmd/ynabber/main.go

+6-43
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,9 @@
11
package main
22

33
import (
4-
"fmt"
54
"log"
65
"log/slog"
76
"os"
8-
"time"
97

108
"github.com/carlmjohnson/versioninfo"
119
"github.com/kelseyhightower/envconfig"
@@ -38,61 +36,26 @@ func main() {
3836
setupLogging(cfg.Debug)
3937
slog.Info("starting...", "version", versioninfo.Short())
4038

41-
ynabber := ynabber.Ynabber{}
39+
y := ynabber.NewYnabber(&cfg)
4240
for _, reader := range cfg.Readers {
4341
switch reader {
4442
case "nordigen":
45-
ynabber.Readers = append(ynabber.Readers, nordigen.NewReader(&cfg))
43+
y.Readers = append(y.Readers, nordigen.NewReader(&cfg))
4644
default:
4745
log.Fatalf("Unknown reader: %s", reader)
4846
}
4947
}
5048
for _, writer := range cfg.Writers {
5149
switch writer {
5250
case "ynab":
53-
ynabber.Writers = append(ynabber.Writers, ynab.NewWriter(&cfg))
51+
y.Writers = append(y.Writers, ynab.NewWriter(&cfg))
5452
case "json":
55-
ynabber.Writers = append(ynabber.Writers, json.Writer{})
53+
y.Writers = append(y.Writers, json.Writer{})
5654
default:
5755
log.Fatalf("Unknown writer: %s", writer)
5856
}
5957
}
6058

61-
for {
62-
start := time.Now()
63-
err = run(ynabber)
64-
if err != nil {
65-
panic(err)
66-
} else {
67-
slog.Info("run succeeded", "in", time.Since(start))
68-
if cfg.Interval > 0 {
69-
slog.Info("waiting for next run", "in", cfg.Interval)
70-
time.Sleep(cfg.Interval)
71-
} else {
72-
os.Exit(0)
73-
}
74-
}
75-
}
76-
}
77-
78-
func run(y ynabber.Ynabber) error {
79-
var transactions []ynabber.Transaction
80-
81-
// Read transactions from all readers
82-
for _, reader := range y.Readers {
83-
t, err := reader.Bulk()
84-
if err != nil {
85-
return fmt.Errorf("reading: %w", err)
86-
}
87-
transactions = append(transactions, t...)
88-
}
89-
90-
// Write transactions to all writers
91-
for _, writer := range y.Writers {
92-
err := writer.Bulk(transactions)
93-
if err != nil {
94-
return fmt.Errorf("writing: %w", err)
95-
}
96-
}
97-
return nil
59+
// Run Ynabber
60+
y.Run()
9861
}

ynabber.go

+71
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,25 @@
11
package ynabber
22

33
import (
4+
"log/slog"
45
"strconv"
56
"time"
67
)
78

89
type Ynabber struct {
910
Readers []Reader
1011
Writers []Writer
12+
13+
config *Config
14+
logger slog.Logger
15+
}
16+
17+
// NewYnabber creates a new Ynabber instance
18+
func NewYnabber(config *Config) *Ynabber {
19+
return &Ynabber{
20+
config: config,
21+
logger: *slog.Default(),
22+
}
1123
}
1224

1325
type Reader interface {
@@ -53,3 +65,62 @@ func (m Milliunits) String() string {
5365
func MilliunitsFromAmount(amount float64) Milliunits {
5466
return Milliunits(amount * 1000)
5567
}
68+
69+
// Run starts Ynabber by reading transactions from all readers into a channel to
70+
// fan out to all writers
71+
func (y *Ynabber) Run() {
72+
batches := make(chan []Transaction)
73+
74+
// Create a channel for each writer and fan out transactions to each one
75+
channels := make([]chan []Transaction, len(y.Writers))
76+
for c := range channels {
77+
channels[c] = make(chan []Transaction)
78+
}
79+
go func() {
80+
for batch := range batches {
81+
for _, c := range channels {
82+
c <- batch
83+
}
84+
}
85+
}()
86+
87+
for c, writer := range y.Writers {
88+
go func(writer Writer, batches <-chan []Transaction) {
89+
for batch := range batches {
90+
err := writer.Bulk(batch)
91+
if err != nil {
92+
y.logger.Error("writing", "error", err, "writer", writer)
93+
}
94+
}
95+
}(writer, channels[c])
96+
}
97+
98+
for _, r := range y.Readers {
99+
go func(reader Reader) {
100+
for {
101+
start := time.Now()
102+
batch, err := reader.Bulk()
103+
if err != nil {
104+
y.logger.Error("reading", "error", err, "reader", reader)
105+
continue
106+
}
107+
batches <- batch
108+
y.logger.Info("run succeeded", "in", time.Since(start))
109+
110+
// TODO(Martin): The interval should be controlled by the
111+
// reader. We are only pausing the entire reader goroutine
112+
// because thats how the config option is implemented now.
113+
// Eventually we should move this option into the reader
114+
// allowing for multiple readers with different intervals.
115+
if y.config.Interval > 0 {
116+
y.logger.Info("waiting for next run", "in", y.config.Interval)
117+
time.Sleep(y.config.Interval)
118+
} else {
119+
break
120+
}
121+
}
122+
}(r)
123+
}
124+
125+
select {}
126+
}

0 commit comments

Comments
 (0)