@@ -16,62 +16,334 @@ package runtime
 
 import (
 	"bytes"
+	"context"
+	"encoding"
 	"encoding/csv"
 	"errors"
+	"fmt"
 	"io"
+	"reflect"
+
+	"golang.org/x/sync/errgroup"
 )
 
-// CSVConsumer creates a new CSV consumer
-func CSVConsumer() Consumer {
+// CSVConsumer creates a new CSV consumer.
+//
+// The consumer consumes CSV records from a provided reader into the data passed by reference.
+//
+// CSVOpts options may be specified to alter the default CSV behavior on the reader and the writer side (e.g. separator, skip header, ...).
+// The defaults are those of the standard library's csv.Reader and csv.Writer.
+//
+// Supported output underlying types and interfaces, prioritized in this order:
+// - *csv.Writer
+// - CSVWriter (writer options are ignored)
+// - io.Writer (as raw bytes)
+// - io.ReaderFrom (as raw bytes)
+// - encoding.BinaryUnmarshaler (as raw bytes)
+// - *[][]string (as a collection of records)
+// - *[]byte (as raw bytes)
+// - *string (as raw bytes)
+//
+// The consumer prioritizes situations where buffering the input is not required.
+func CSVConsumer(opts ...CSVOpt) Consumer {
+	o := csvOptsWithDefaults(opts)
+
 	return ConsumerFunc(func(reader io.Reader, data interface{}) error {
 		if reader == nil {
 			return errors.New("CSVConsumer requires a reader")
 		}
+		if data == nil {
+			return errors.New("nil destination for CSVConsumer")
+		}
 
 		csvReader := csv.NewReader(reader)
-		writer, ok := data.(io.Writer)
-		if !ok {
-			return errors.New("data type must be io.Writer")
+		o.applyToReader(csvReader)
+		closer := defaultCloser
+		if o.closeStream {
+			if cl, isReaderCloser := reader.(io.Closer); isReaderCloser {
+				closer = cl.Close
+			}
 		}
-		csvWriter := csv.NewWriter(writer)
-		records, err := csvReader.ReadAll()
-		if err != nil {
+		defer func() {
+			_ = closer()
+		}()
+
+		switch destination := data.(type) {
+		case *csv.Writer:
+			csvWriter := destination
+			o.applyToWriter(csvWriter)
+
+			return pipeCSV(csvWriter, csvReader, o)
+
+		case CSVWriter:
+			csvWriter := destination
+			// no writer options available
+
+			return pipeCSV(csvWriter, csvReader, o)
+
+		case io.Writer:
+			csvWriter := csv.NewWriter(destination)
+			o.applyToWriter(csvWriter)
+
+			return pipeCSV(csvWriter, csvReader, o)
+
+		case io.ReaderFrom:
+			var buf bytes.Buffer
+			csvWriter := csv.NewWriter(&buf)
+			o.applyToWriter(csvWriter)
+			if err := bufferedCSV(csvWriter, csvReader, o); err != nil {
+				return err
+			}
+			_, err := destination.ReadFrom(&buf)
+
 			return err
-		}
-		for _, r := range records {
-			if err := csvWriter.Write(r); err != nil {
+
+		case encoding.BinaryUnmarshaler:
+			var buf bytes.Buffer
+			csvWriter := csv.NewWriter(&buf)
+			o.applyToWriter(csvWriter)
+			if err := bufferedCSV(csvWriter, csvReader, o); err != nil {
 				return err
 			}
+
+			return destination.UnmarshalBinary(buf.Bytes())
+
+		default:
+			// support *[][]string, *[]byte, *string
+			if ptr := reflect.TypeOf(data); ptr.Kind() != reflect.Ptr {
+				return errors.New("destination must be a pointer")
+			}
+
+			v := reflect.Indirect(reflect.ValueOf(data))
+			t := v.Type()
+
+			switch {
+			case t.Kind() == reflect.Slice && t.Elem().Kind() == reflect.Slice && t.Elem().Elem().Kind() == reflect.String:
+				csvWriter := &csvRecordsWriter{}
+				// writer options are ignored
+				if err := pipeCSV(csvWriter, csvReader, o); err != nil {
+					return err
+				}
+				v.Grow(len(csvWriter.records))
+				v.SetCap(len(csvWriter.records)) // in case Grow was unnecessary, trim down the capacity
+				v.SetLen(len(csvWriter.records))
+				reflect.Copy(v, reflect.ValueOf(csvWriter.records))
+
+				return nil
+
+			case t.Kind() == reflect.Slice && t.Elem().Kind() == reflect.Uint8:
+				var buf bytes.Buffer
+				csvWriter := csv.NewWriter(&buf)
+				o.applyToWriter(csvWriter)
+				if err := bufferedCSV(csvWriter, csvReader, o); err != nil {
+					return err
+				}
+				v.SetBytes(buf.Bytes())
+
+				return nil
+
+			case t.Kind() == reflect.String:
+				var buf bytes.Buffer
+				csvWriter := csv.NewWriter(&buf)
+				o.applyToWriter(csvWriter)
+				if err := bufferedCSV(csvWriter, csvReader, o); err != nil {
+					return err
+				}
+				v.SetString(buf.String())
+
+				return nil
+
+			default:
+				return fmt.Errorf("%v (%T) is not supported by the CSVConsumer, %s",
+					data, data, "can be resolved by supporting CSVWriter/Writer/BinaryUnmarshaler interface",
+				)
+			}
 		}
-		csvWriter.Flush()
-		return nil
 	})
 }
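
For orientation, here is a minimal usage sketch of the new consumer taking the *[][]string path. It is not part of the change: the import path and the Consume(io.Reader, interface{}) error method on Consumer are assumptions based on the package shown in this diff.

package main

import (
	"fmt"
	"log"
	"strings"

	"github.com/go-openapi/runtime" // assumed import path for the package in this diff
)

func main() {
	input := strings.NewReader("name,qty\nbolt,3\nnut,7\n")

	// *[][]string destination: each CSV record becomes one []string.
	var records [][]string
	if err := runtime.CSVConsumer().Consume(input, &records); err != nil {
		log.Fatal(err)
	}

	// With default options nothing is skipped: header row plus two data rows.
	fmt.Println(len(records))
}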
 
-// CSVProducer creates a new CSV producer
-func CSVProducer() Producer {
+// CSVProducer creates a new CSV producer.
+//
+// The producer takes input data, then writes it as CSV to an output writer (essentially as a pipe).
+//
+// Supported input underlying types and interfaces, prioritized in this order:
+// - *csv.Reader
+// - CSVReader (reader options are ignored)
+// - io.Reader
+// - io.WriterTo
+// - encoding.BinaryMarshaler
+// - [][]string
+// - []byte
+// - string
+//
+// The producer prioritizes situations where buffering the input is not required.
+func CSVProducer(opts ...CSVOpt) Producer {
+	o := csvOptsWithDefaults(opts)
+
 	return ProducerFunc(func(writer io.Writer, data interface{}) error {
 		if writer == nil {
 			return errors.New("CSVProducer requires a writer")
 		}
+		if data == nil {
+			return errors.New("nil data for CSVProducer")
+		}
 
-		dataBytes, ok := data.([]byte)
-		if !ok {
-			return errors.New("data type must be byte array")
+		csvWriter := csv.NewWriter(writer)
+		o.applyToWriter(csvWriter)
+		closer := defaultCloser
+		if o.closeStream {
+			if cl, isWriterCloser := writer.(io.Closer); isWriterCloser {
+				closer = cl.Close
+			}
 		}
+		defer func() {
+			_ = closer()
+		}()
 
-		csvReader := csv.NewReader(bytes.NewBuffer(dataBytes))
-		records, err := csvReader.ReadAll()
-		if err != nil {
-			return err
+		if rc, isDataCloser := data.(io.ReadCloser); isDataCloser {
+			defer rc.Close()
 		}
-		csvWriter := csv.NewWriter(writer)
-		for _, r := range records {
-			if err := csvWriter.Write(r); err != nil {
+
+		switch origin := data.(type) {
+		case *csv.Reader:
+			csvReader := origin
+			o.applyToReader(csvReader)
+
+			return pipeCSV(csvWriter, csvReader, o)
+
+		case CSVReader:
+			csvReader := origin
+			// no reader options available
+
+			return pipeCSV(csvWriter, csvReader, o)
+
+		case io.Reader:
+			csvReader := csv.NewReader(origin)
+			o.applyToReader(csvReader)
+
+			return pipeCSV(csvWriter, csvReader, o)
+
+		case io.WriterTo:
+			// async piping of the writes performed by WriteTo
+			r, w := io.Pipe()
+			csvReader := csv.NewReader(r)
+			o.applyToReader(csvReader)
+
+			pipe, _ := errgroup.WithContext(context.Background())
+			pipe.Go(func() error {
+				_, err := origin.WriteTo(w)
+				_ = w.Close()
+				return err
+			})
+
+			pipe.Go(func() error {
+				defer func() {
+					_ = r.Close()
+				}()
+
+				return pipeCSV(csvWriter, csvReader, o)
+			})
+
+			return pipe.Wait()
+
+		case encoding.BinaryMarshaler:
+			buf, err := origin.MarshalBinary()
+			if err != nil {
 				return err
 			}
+			rdr := bytes.NewBuffer(buf)
+			csvReader := csv.NewReader(rdr)
+
+			return bufferedCSV(csvWriter, csvReader, o)
+
+		default:
+			// support [][]string, []byte, string (or pointers to those)
+			v := reflect.Indirect(reflect.ValueOf(data))
+			t := v.Type()
+
+			switch {
+			case t.Kind() == reflect.Slice && t.Elem().Kind() == reflect.Slice && t.Elem().Elem().Kind() == reflect.String:
+				csvReader := &csvRecordsWriter{
+					records: make([][]string, v.Len()),
+				}
+				reflect.Copy(reflect.ValueOf(csvReader.records), v)
+
+				return pipeCSV(csvWriter, csvReader, o)
+
+			case t.Kind() == reflect.Slice && t.Elem().Kind() == reflect.Uint8:
+				buf := bytes.NewBuffer(v.Bytes())
+				csvReader := csv.NewReader(buf)
+				o.applyToReader(csvReader)
+
+				return bufferedCSV(csvWriter, csvReader, o)
+
+			case t.Kind() == reflect.String:
+				buf := bytes.NewBufferString(v.String())
+				csvReader := csv.NewReader(buf)
+				o.applyToReader(csvReader)
+
+				return bufferedCSV(csvWriter, csvReader, o)
+
+			default:
+				return fmt.Errorf("%v (%T) is not supported by the CSVProducer, %s",
+					data, data, "can be resolved by supporting CSVReader/Reader/BinaryMarshaler interface",
+				)
+			}
 		}
-		csvWriter.Flush()
-		return nil
 	})
 }
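
A matching sketch for the producer, under the same assumptions (import path, and a Produce(io.Writer, interface{}) error method on Producer), feeding it a [][]string so that each record is written out as one CSV line:

package main

import (
	"log"
	"os"

	"github.com/go-openapi/runtime" // assumed import path for the package in this diff
)

func main() {
	records := [][]string{
		{"name", "qty"},
		{"bolt", "3"},
		{"nut", "7"},
	}

	// [][]string input: the producer writes each record to the writer as one CSV line.
	if err := runtime.CSVProducer().Produce(os.Stdout, records); err != nil {
		log.Fatal(err)
	}
}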
+
+// pipeCSV copies CSV records from a CSV reader to a CSV writer
+func pipeCSV(csvWriter CSVWriter, csvReader CSVReader, opts csvOpts) error {
+	for ; opts.skippedLines > 0; opts.skippedLines-- {
+		_, err := csvReader.Read()
+		if err != nil {
+			if errors.Is(err, io.EOF) {
+				return nil
+			}
+
+			return err
+		}
+	}
+
+	for {
+		record, err := csvReader.Read()
+		if err != nil {
+			if errors.Is(err, io.EOF) {
+				break
+			}
+
+			return err
+		}
+
+		if err := csvWriter.Write(record); err != nil {
+			return err
+		}
+	}
+
+	csvWriter.Flush()
+
+	return csvWriter.Error()
+}
+
+// bufferedCSV copies CSV records from a CSV reader to a CSV writer,
+// by first reading all records then writing them at once.
+func bufferedCSV(csvWriter *csv.Writer, csvReader *csv.Reader, opts csvOpts) error {
+	for ; opts.skippedLines > 0; opts.skippedLines-- {
+		_, err := csvReader.Read()
+		if err != nil {
+			if errors.Is(err, io.EOF) {
+				return nil
+			}
+
+			return err
+		}
+	}
+
+	records, err := csvReader.ReadAll()
+	if err != nil {
+		return err
+	}
+
+	return csvWriter.WriteAll(records)
+}