@@ -6,37 +6,33 @@ import (
6
6
"sync"
7
7
"time"
8
8
9
- "github.com/confluentinc/confluent-kafka-go/kafka"
9
+ kafkalib "github.com/confluentinc/confluent-kafka-go/kafka"
10
+ "github.com/netlify/netlify-commons/kafka"
10
11
"github.com/netlify/netlify-commons/util"
11
12
"github.com/sirupsen/logrus"
12
13
)
13
14
14
15
func KafkaPipe (log logrus.FieldLogger ) (* FakeKafkaConsumer , * FakeKafkaProducer ) {
15
- distri := make (chan * kafka .Message , 200 )
16
+ distri := make (chan * kafkalib .Message , 200 )
16
17
rdr := NewFakeKafkaConsumer (log , distri )
17
18
wtr := NewFakeKafkaProducer (distri )
18
19
wtr .commits = rdr .commits
19
20
return rdr , wtr
20
21
}
21
22
22
23
type FakeKafkaConsumer struct {
23
- messages []* kafka .Message
24
+ messages []* kafkalib .Message
24
25
msgMu sync.Mutex
25
26
offset int64
26
27
notify chan struct {}
27
- commits chan * kafka .Message
28
+ commits chan * kafkalib .Message
28
29
log logrus.FieldLogger
29
30
}
30
31
31
- func (f * FakeKafkaConsumer ) Close () error {
32
- close (f .commits )
33
- return nil
34
- }
35
-
36
32
type FakeKafkaProducer struct {
37
- distris []chan <- * kafka .Message
33
+ distris []chan <- * kafkalib .Message
38
34
distrisMu sync.Mutex
39
- commits <- chan * kafka .Message
35
+ commits <- chan * kafkalib .Message
40
36
closed util.AtomicBool
41
37
}
42
38
@@ -53,19 +49,19 @@ func (f *FakeKafkaProducer) Close() error {
53
49
return nil
54
50
}
55
51
56
- func NewFakeKafkaConsumer (log logrus.FieldLogger , distri <- chan * kafka .Message ) * FakeKafkaConsumer {
52
+ func NewFakeKafkaConsumer (log logrus.FieldLogger , distri <- chan * kafkalib .Message ) * FakeKafkaConsumer {
57
53
r := & FakeKafkaConsumer {
58
- messages : make ([]* kafka .Message , 0 ),
54
+ messages : make ([]* kafkalib .Message , 0 ),
59
55
offset : 0 ,
60
56
notify : make (chan struct {}),
61
57
log : log ,
62
- commits : make (chan * kafka .Message , 1000 ),
58
+ commits : make (chan * kafkalib .Message , 1000 ),
63
59
}
64
60
65
61
go func () {
66
62
for msg := range distri {
67
63
r .msgMu .Lock ()
68
- msg .TopicPartition .Offset = kafka .Offset (r .offset + 1 )
64
+ msg .TopicPartition .Offset = kafkalib .Offset (r .offset + 1 )
69
65
r .messages = append (r .messages , setMsgDefaults (msg ))
70
66
r .msgMu .Unlock ()
71
67
r .notify <- struct {}{}
@@ -75,7 +71,7 @@ func NewFakeKafkaConsumer(log logrus.FieldLogger, distri <-chan *kafka.Message)
75
71
return r
76
72
}
77
73
78
- func (f * FakeKafkaConsumer ) FetchMessage (ctx context.Context ) (* kafka .Message , error ) {
74
+ func (f * FakeKafkaConsumer ) FetchMessage (ctx context.Context ) (* kafkalib .Message , error ) {
79
75
for {
80
76
f .msgMu .Lock ()
81
77
if int64 (len (f .messages )) > f .offset {
@@ -89,13 +85,13 @@ func (f *FakeKafkaConsumer) FetchMessage(ctx context.Context) (*kafka.Message, e
89
85
90
86
select {
91
87
case <- ctx .Done ():
92
- return & kafka .Message {}, ctx .Err ()
88
+ return & kafkalib .Message {}, ctx .Err ()
93
89
case <- f .notify :
94
90
}
95
91
}
96
92
}
97
93
98
- func (f * FakeKafkaConsumer ) CommitMessage (msg * kafka .Message ) error {
94
+ func (f * FakeKafkaConsumer ) CommitMessage (msg * kafkalib .Message ) error {
99
95
f .msgMu .Lock ()
100
96
f .log .WithField ("offset" , msg .TopicPartition .Offset ).Trace ("commiting message..." )
101
97
if int64 (msg .TopicPartition .Offset ) > f .offset {
@@ -117,27 +113,52 @@ func (f *FakeKafkaConsumer) SetInitialOffset(offset int64) error {
117
113
return nil
118
114
}
119
115
120
- func (f * FakeKafkaConsumer ) Seek (offset int64 , _ time. Duration ) error {
116
+ func (f * FakeKafkaConsumer ) Seek (offset int64 ) error {
121
117
f .msgMu .Lock ()
122
118
f .offset = offset
123
119
f .msgMu .Unlock ()
124
120
return nil
125
121
}
126
122
127
- func NewFakeKafkaProducer (distris ... chan <- * kafka.Message ) * FakeKafkaProducer {
123
+ func (f * FakeKafkaConsumer ) AssignPartitionByKey (key string , algorithm kafka.PartitionerAlgorithm ) error {
124
+ return nil // noop
125
+ }
126
+
127
+ func (f * FakeKafkaConsumer ) AssignPartitionByID (id int32 ) error {
128
+ return nil // noop
129
+ }
130
+
131
+ func (f * FakeKafkaConsumer ) GetMetadata (allTopics bool ) (* kafkalib.Metadata , error ) {
132
+ return & kafkalib.Metadata {}, nil // noop
133
+ }
134
+
135
+ func (f * FakeKafkaConsumer ) GetPartitions () ([]int32 , error ) {
136
+ return []int32 {}, nil // noop
137
+ }
138
+
139
+ func (f * FakeKafkaConsumer ) SeekToTime (t time.Time ) error {
140
+ return nil // noop
141
+ }
142
+
143
+ func (f * FakeKafkaConsumer ) Close () error {
144
+ close (f .commits )
145
+ return nil
146
+ }
147
+
148
+ func NewFakeKafkaProducer (distris ... chan <- * kafkalib.Message ) * FakeKafkaProducer {
128
149
return & FakeKafkaProducer {
129
150
distris : distris ,
130
151
closed : util .NewAtomicBool (false ),
131
152
}
132
153
}
133
154
134
- func (f * FakeKafkaProducer ) AddDistri (d chan <- * kafka .Message ) {
155
+ func (f * FakeKafkaProducer ) AddDistri (d chan <- * kafkalib .Message ) {
135
156
f .distrisMu .Lock ()
136
157
f .distris = append (f .distris , d )
137
158
f .distrisMu .Unlock ()
138
159
}
139
160
140
- func (f * FakeKafkaProducer ) Produce (ctx context.Context , msgs ... * kafka .Message ) error {
161
+ func (f * FakeKafkaProducer ) Produce (ctx context.Context , msgs ... * kafkalib .Message ) error {
141
162
f .distrisMu .Lock ()
142
163
for _ , msg := range msgs {
143
164
for _ , d := range f .distris {
@@ -162,7 +183,7 @@ func (f *FakeKafkaProducer) WaitForKey(key []byte) (gotKey bool) {
162
183
return false
163
184
}
164
185
165
- func setMsgDefaults (msg * kafka .Message ) * kafka .Message {
186
+ func setMsgDefaults (msg * kafkalib .Message ) * kafkalib .Message {
166
187
if msg .TopicPartition .Topic == nil {
167
188
topicName := "local-test"
168
189
msg .TopicPartition .Topic = & topicName
0 commit comments