Skip to content

Commit 812c937

Browse files
committed
implement scanning iterator AllRowsScanned
1 parent 548aace commit 812c937

File tree

2 files changed

+114
-0
lines changed

2 files changed

+114
-0
lines changed

rows.go

+25
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"errors"
66
"fmt"
7+
"iter"
78
"reflect"
89
"strings"
910
"sync"
@@ -666,6 +667,30 @@ func RowToAddrOfStructByNameLax[T any](row CollectableRow) (*T, error) {
666667
return &value, err
667668
}
668669

670+
// AllRowsScanned returns iterator that read and scans rows one-by-one. It closes
671+
// the rows automatically on return.
672+
//
673+
// In case rows.Err() returns non-nil error after all rows are read, it will
674+
// trigger extra yield with zero value and the error.
675+
func AllRowsScanned[T any](rows Rows, fn RowToFunc[T]) iter.Seq2[T, error] {
676+
return func(yield func(T, error) bool) {
677+
defer rows.Close()
678+
679+
for rows.Next() {
680+
if !yield(fn(rows)) {
681+
break
682+
}
683+
}
684+
685+
// we don't have another choice but to push one more time
686+
// in order to propagate the error to user
687+
if err := rows.Err(); err != nil {
688+
var zero T
689+
yield(zero, err)
690+
}
691+
}
692+
}
693+
669694
type namedStructRowScanner struct {
670695
ptrToStruct any
671696
lax bool

rows_test.go

+89
Original file line numberDiff line numberDiff line change
@@ -993,3 +993,92 @@ insert into products (name, price) values
993993
// Fries: $5
994994
// Soft Drink: $3
995995
}
996+
997+
func ExampleAllRowsScanned() {
998+
ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
999+
defer cancel()
1000+
1001+
conn, err := pgx.Connect(ctx, os.Getenv("PGX_TEST_DATABASE"))
1002+
if err != nil {
1003+
fmt.Printf("Unable to establish connection: %v", err)
1004+
return
1005+
}
1006+
1007+
if conn.PgConn().ParameterStatus("crdb_version") != "" {
1008+
// Skip test / example when running on CockroachDB. Since an example can't be skipped fake success instead.
1009+
fmt.Println(`Cheeseburger: $10
1010+
Fries: $5
1011+
Soft Drink: $3`)
1012+
return
1013+
}
1014+
1015+
// Setup example schema and data.
1016+
_, err = conn.Exec(ctx, `
1017+
create temporary table products (
1018+
id int primary key generated by default as identity,
1019+
name varchar(100) not null,
1020+
price int not null
1021+
);
1022+
1023+
insert into products (name, price) values
1024+
('Cheeseburger', 10),
1025+
('Double Cheeseburger', 14),
1026+
('Fries', 5),
1027+
('Soft Drink', 3);
1028+
`)
1029+
if err != nil {
1030+
fmt.Printf("Unable to setup example schema and data: %v", err)
1031+
return
1032+
}
1033+
1034+
type product struct {
1035+
ID int32
1036+
Name string
1037+
Type string
1038+
Price int32
1039+
}
1040+
1041+
result := make([]product, 0, 3)
1042+
1043+
rows, _ := conn.Query(ctx, "select * from products where price < $1 order by price desc", 12)
1044+
for row, err := range pgx.AllRowsScanned[product](rows, pgx.RowToStructByNameLax) {
1045+
if err != nil {
1046+
fmt.Printf("AllRowsScanned error: %v", err)
1047+
return
1048+
}
1049+
1050+
// our business logic here
1051+
result = append(result, row)
1052+
}
1053+
1054+
for _, p := range result {
1055+
fmt.Printf("%s: $%d\n", p.Name, p.Price)
1056+
}
1057+
1058+
// Output:
1059+
// Cheeseburger: $10
1060+
// Fries: $5
1061+
// Soft Drink: $3
1062+
}
1063+
1064+
func TestAllRowsScanned(t *testing.T) {
1065+
defaultConnTestRunner.RunTest(context.Background(), t, func(ctx context.Context, t testing.TB, conn *pgx.Conn) {
1066+
type resultRow struct {
1067+
N int32 `db:"n"`
1068+
}
1069+
1070+
rows, _ := conn.Query(ctx, `select n from generate_series(0, 99) n`)
1071+
1072+
results := make([]resultRow, 0, 100)
1073+
1074+
for row, err := range pgx.AllRowsScanned[resultRow](rows, pgx.RowToStructByName) {
1075+
require.NoError(t, err)
1076+
results = append(results, row)
1077+
}
1078+
1079+
assert.Len(t, results, 100)
1080+
for i := range results {
1081+
assert.Equal(t, int32(i), results[i].N)
1082+
}
1083+
})
1084+
}

0 commit comments

Comments
 (0)