Skip to content

Commit 27d2268

Browse files
committed
colexec/group: add spill
1 parent 171754d commit 27d2268

29 files changed

+2829
-48
lines changed

pkg/fileservice/file_service_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1032,6 +1032,9 @@ func testFileService(
10321032
}
10331033

10341034
w, err := rwFS.NewWriter(ctx, "foo")
1035+
if moerr.IsMoErrCode(err, moerr.ErrNotSupported) {
1036+
return
1037+
}
10351038
assert.Nil(t, err)
10361039
assert.NotNil(t, w)
10371040
_, err = w.Write([]byte("foobarbaz"))

pkg/fileservice/sub_path.go

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,20 +17,24 @@ package fileservice
1717
import (
1818
"context"
1919
"fmt"
20+
"io"
2021
"iter"
2122
"path"
2223
"strings"
24+
25+
"github.com/matrixorigin/matrixone/pkg/common/moerr"
2326
)
2427

2528
type subPathFS struct {
26-
upstream FileService
27-
path string
28-
name string
29+
upstream FileService
30+
rwUpstream ReaderWriterFileService
31+
path string
32+
name string
2933
}
3034

3135
// SubPath returns a FileService instance that operates at specified sub path of the upstream instance
3236
func SubPath(upstream FileService, path string) FileService {
33-
return &subPathFS{
37+
ret := &subPathFS{
3438
upstream: upstream,
3539
path: path,
3640
name: strings.Join([]string{
@@ -39,9 +43,13 @@ func SubPath(upstream FileService, path string) FileService {
3943
path,
4044
}, ","),
4145
}
46+
if rwfs, ok := upstream.(ReaderWriterFileService); ok {
47+
ret.rwUpstream = rwfs
48+
}
49+
return ret
4250
}
4351

44-
var _ FileService = new(subPathFS)
52+
var _ ReaderWriterFileService = new(subPathFS)
4553

4654
func (s *subPathFS) Name() string {
4755
return s.name
@@ -143,6 +151,28 @@ func (s *subPathFS) Cost() *CostAttr {
143151
return s.upstream.Cost()
144152
}
145153

154+
func (s *subPathFS) NewReader(ctx context.Context, filePath string) (io.ReadCloser, error) {
155+
p, err := s.toUpstreamPath(filePath)
156+
if err != nil {
157+
return nil, err
158+
}
159+
if s.rwUpstream != nil {
160+
return s.rwUpstream.NewReader(ctx, p)
161+
}
162+
return nil, moerr.NewNotSupportedNoCtx("not ReaderWriterFileService")
163+
}
164+
165+
func (s *subPathFS) NewWriter(ctx context.Context, filePath string) (io.WriteCloser, error) {
166+
p, err := s.toUpstreamPath(filePath)
167+
if err != nil {
168+
return nil, err
169+
}
170+
if s.rwUpstream != nil {
171+
return s.rwUpstream.NewWriter(ctx, p)
172+
}
173+
return nil, moerr.NewNotSupportedNoCtx("not ReaderWriterFileService")
174+
}
175+
146176
var _ MutableFileService = new(subPathFS)
147177

148178
func (s *subPathFS) NewMutator(ctx context.Context, filePath string) (Mutator, error) {

pkg/sql/colexec/aggexec/aggFrame_test.go

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -699,12 +699,12 @@ func TestGroupConcatExecMarshalUnmarshal(t *testing.T) {
699699
require.NoError(t, vector.AppendBytes(v1, []byte("test1"), false, m.Mp()))
700700
require.NoError(t, exec.Fill(0, 0, []*vector.Vector{v1}))
701701

702-
data, err := exec.marshal()
702+
data, err := exec.Marshal()
703703
require.NoError(t, err)
704704
require.NotNil(t, data)
705705

706706
newExec := newGroupConcatExec(m, info, ",")
707-
err = newExec.unmarshal(m.Mp(), nil, nil, [][]byte{[]byte(",")})
707+
err = newExec.Unmarshal(m.Mp(), nil, nil, [][]byte{[]byte(",")})
708708
require.NoError(t, err)
709709

710710
require.Equal(t, []byte(","), newExec.(*groupConcatExec).separator)
@@ -722,14 +722,14 @@ func TestGroupConcatExecMarshalUnmarshal(t *testing.T) {
722722
}
723723
exec := newGroupConcatExec(m, info, "|")
724724

725-
data, err := exec.marshal()
725+
data, err := exec.Marshal()
726726
require.NoError(t, err)
727727

728728
newExec := newGroupConcatExec(m, info, ",")
729729
encoded := &EncodedAgg{}
730730
require.NoError(t, encoded.Unmarshal(data))
731731

732-
err = newExec.unmarshal(m.Mp(), encoded.Result, encoded.Empties, encoded.Groups)
732+
err = newExec.Unmarshal(m.Mp(), encoded.Result, encoded.Empties, encoded.Groups)
733733
require.NoError(t, err)
734734

735735
require.Equal(t, []byte("|"), newExec.(*groupConcatExec).separator)
@@ -753,14 +753,14 @@ func TestGroupConcatExecMarshalUnmarshal(t *testing.T) {
753753
require.NoError(t, vector.AppendBytes(v1, []byte("distinct1"), false, m.Mp()))
754754
require.NoError(t, exec.Fill(0, 0, []*vector.Vector{v1}))
755755

756-
data, err := exec.marshal()
756+
data, err := exec.Marshal()
757757
require.NoError(t, err)
758758

759759
newExec := newGroupConcatExec(m, info, ",")
760760
encoded := &EncodedAgg{}
761761
require.NoError(t, encoded.Unmarshal(data))
762762

763-
err = newExec.unmarshal(m.Mp(), encoded.Result, encoded.Empties, encoded.Groups)
763+
err = newExec.Unmarshal(m.Mp(), encoded.Result, encoded.Empties, encoded.Groups)
764764
require.NoError(t, err)
765765

766766
exec.Free()
@@ -803,14 +803,14 @@ func TestCountColumnExecMarshalUnmarshal(t *testing.T) {
803803
require.NoError(t, vector.AppendFixedList(v1, []int64{1, 2, 3}, nil, m.Mp()))
804804
require.NoError(t, exec.BulkFill(0, []*vector.Vector{v1}))
805805

806-
data, err := exec.marshal()
806+
data, err := exec.Marshal()
807807
require.NoError(t, err)
808808

809809
newExec := newCountColumnExecExec(m, info)
810810
encoded := &EncodedAgg{}
811811
require.NoError(t, encoded.Unmarshal(data))
812812

813-
err = newExec.unmarshal(m.Mp(), encoded.Result, encoded.Empties, encoded.Groups)
813+
err = newExec.Unmarshal(m.Mp(), encoded.Result, encoded.Empties, encoded.Groups)
814814
require.NoError(t, err)
815815

816816
exec.Free()
@@ -833,14 +833,14 @@ func TestCountColumnExecMarshalUnmarshal(t *testing.T) {
833833
require.NoError(t, vector.AppendFixedList(v1, []int64{1, 2, 1, 3}, nil, m.Mp()))
834834
require.NoError(t, exec.BulkFill(0, []*vector.Vector{v1}))
835835

836-
data, err := exec.marshal()
836+
data, err := exec.Marshal()
837837
require.NoError(t, err)
838838

839839
newExec := newCountColumnExecExec(m, info)
840840
encoded := &EncodedAgg{}
841841
require.NoError(t, encoded.Unmarshal(data))
842842

843-
err = newExec.unmarshal(m.Mp(), encoded.Result, encoded.Empties, encoded.Groups)
843+
err = newExec.Unmarshal(m.Mp(), encoded.Result, encoded.Empties, encoded.Groups)
844844
require.NoError(t, err)
845845

846846
exec.Free()
@@ -858,14 +858,14 @@ func TestCountColumnExecMarshalUnmarshal(t *testing.T) {
858858
exec := newCountColumnExecExec(m, info)
859859
require.NoError(t, exec.GroupGrow(1))
860860

861-
data, err := exec.marshal()
861+
data, err := exec.Marshal()
862862
require.NoError(t, err)
863863

864864
newExec := newCountColumnExecExec(m, info)
865865
encoded := &EncodedAgg{}
866866
require.NoError(t, encoded.Unmarshal(data))
867867

868-
err = newExec.unmarshal(m.Mp(), encoded.Result, encoded.Empties, encoded.Groups)
868+
err = newExec.Unmarshal(m.Mp(), encoded.Result, encoded.Empties, encoded.Groups)
869869
require.NoError(t, err)
870870

871871
exec.Free()
@@ -882,7 +882,7 @@ func TestCountColumnExecMarshalUnmarshal(t *testing.T) {
882882
}
883883
exec := newCountColumnExecExec(m, info)
884884

885-
err := exec.unmarshal(m.Mp(), nil, nil, [][]byte{})
885+
err := exec.Unmarshal(m.Mp(), nil, nil, [][]byte{})
886886
require.NoError(t, err)
887887

888888
exec.Free()

pkg/sql/colexec/aggexec/approx_count.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ func (exec *approxCountFixedExec[T]) GetOptResult() SplitResult {
3535
return &exec.ret.optSplitResult
3636
}
3737

38-
func (exec *approxCountFixedExec[T]) marshal() ([]byte, error) {
38+
func (exec *approxCountFixedExec[T]) Marshal() ([]byte, error) {
3939
d := exec.singleAggInfo.getEncoded()
4040
r, em, err := exec.ret.marshalToBytes()
4141
if err != nil {
@@ -60,7 +60,7 @@ func (exec *approxCountFixedExec[T]) marshal() ([]byte, error) {
6060
return encoded.Marshal()
6161
}
6262

63-
func (exec *approxCountFixedExec[T]) unmarshal(_ *mpool.MPool, result, empties, groups [][]byte) error {
63+
func (exec *approxCountFixedExec[T]) Unmarshal(_ *mpool.MPool, result, empties, groups [][]byte) error {
6464
err := exec.ret.unmarshalFromBytes(result, empties)
6565
if err != nil {
6666
return err
@@ -90,7 +90,7 @@ func (exec *approxCountVarExec) GetOptResult() SplitResult {
9090
return &exec.ret.optSplitResult
9191
}
9292

93-
func (exec *approxCountVarExec) marshal() ([]byte, error) {
93+
func (exec *approxCountVarExec) Marshal() ([]byte, error) {
9494
d := exec.singleAggInfo.getEncoded()
9595
r, em, err := exec.ret.marshalToBytes()
9696
if err != nil {
@@ -115,7 +115,7 @@ func (exec *approxCountVarExec) marshal() ([]byte, error) {
115115
return encoded.Marshal()
116116
}
117117

118-
func (exec *approxCountVarExec) unmarshal(_ *mpool.MPool, result, empties, groups [][]byte) error {
118+
func (exec *approxCountVarExec) Unmarshal(_ *mpool.MPool, result, empties, groups [][]byte) error {
119119
err := exec.ret.unmarshalFromBytes(result, empties)
120120
if err != nil {
121121
return err

pkg/sql/colexec/aggexec/concat.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ func (exec *groupConcatExec) GetOptResult() SplitResult {
3737
return &exec.ret.optSplitResult
3838
}
3939

40-
func (exec *groupConcatExec) marshal() ([]byte, error) {
40+
func (exec *groupConcatExec) Marshal() ([]byte, error) {
4141
d := exec.multiAggInfo.getEncoded()
4242
r, em, err := exec.ret.marshalToBytes()
4343
if err != nil {
@@ -59,7 +59,7 @@ func (exec *groupConcatExec) marshal() ([]byte, error) {
5959
return encoded.Marshal()
6060
}
6161

62-
func (exec *groupConcatExec) unmarshal(_ *mpool.MPool, result, empties, groups [][]byte) error {
62+
func (exec *groupConcatExec) Unmarshal(_ *mpool.MPool, result, empties, groups [][]byte) error {
6363
if err := exec.SetExtraInformation(groups[0], 0); err != nil {
6464
return err
6565
}

pkg/sql/colexec/aggexec/count.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ func (exec *countColumnExec) GetOptResult() SplitResult {
3939
return &exec.ret.optSplitResult
4040
}
4141

42-
func (exec *countColumnExec) marshal() ([]byte, error) {
42+
func (exec *countColumnExec) Marshal() ([]byte, error) {
4343
d := exec.singleAggInfo.getEncoded()
4444
r, em, err := exec.ret.marshalToBytes()
4545
if err != nil {
@@ -63,7 +63,7 @@ func (exec *countColumnExec) marshal() ([]byte, error) {
6363
return encoded.Marshal()
6464
}
6565

66-
func (exec *countColumnExec) unmarshal(_ *mpool.MPool, result, empties, groups [][]byte) error {
66+
func (exec *countColumnExec) Unmarshal(_ *mpool.MPool, result, empties, groups [][]byte) error {
6767
if exec.IsDistinct() {
6868
if len(groups) > 0 {
6969
if err := exec.distinctHash.unmarshal(groups[0]); err != nil {
@@ -286,7 +286,7 @@ func (exec *countStarExec) GetOptResult() SplitResult {
286286
return &exec.ret.optSplitResult
287287
}
288288

289-
func (exec *countStarExec) marshal() ([]byte, error) {
289+
func (exec *countStarExec) Marshal() ([]byte, error) {
290290
d := exec.singleAggInfo.getEncoded()
291291
r, em, err := exec.ret.marshalToBytes()
292292
if err != nil {
@@ -301,7 +301,7 @@ func (exec *countStarExec) marshal() ([]byte, error) {
301301
return encoded.Marshal()
302302
}
303303

304-
func (exec *countStarExec) unmarshal(_ *mpool.MPool, result, empties, _ [][]byte) error {
304+
func (exec *countStarExec) Unmarshal(_ *mpool.MPool, result, empties, _ [][]byte) error {
305305
return exec.ret.unmarshalFromBytes(result, empties)
306306
}
307307

pkg/sql/colexec/aggexec/fromBytesRetBytes.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ func (exec *aggregatorFromBytesToBytes) GetOptResult() SplitResult {
100100
return &exec.ret.optSplitResult
101101
}
102102

103-
func (exec *aggregatorFromBytesToBytes) marshal() ([]byte, error) {
103+
func (exec *aggregatorFromBytesToBytes) Marshal() ([]byte, error) {
104104
d := exec.singleAggInfo.getEncoded()
105105
r, em, err := exec.ret.marshalToBytes()
106106
if err != nil {
@@ -115,7 +115,7 @@ func (exec *aggregatorFromBytesToBytes) marshal() ([]byte, error) {
115115
return encoded.Marshal()
116116
}
117117

118-
func (exec *aggregatorFromBytesToBytes) unmarshal(_ *mpool.MPool, result, empties, groups [][]byte) error {
118+
func (exec *aggregatorFromBytesToBytes) Unmarshal(_ *mpool.MPool, result, empties, groups [][]byte) error {
119119
exec.execContext.decodeGroupContexts(groups, exec.singleAggInfo.retType, exec.singleAggInfo.argType)
120120
return exec.ret.unmarshalFromBytes(result, empties)
121121
}

pkg/sql/colexec/aggexec/fromBytesRetFixed.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,7 @@ func (exec *aggregatorFromBytesToFixed[to]) GetOptResult() SplitResult {
198198
return &exec.ret.optSplitResult
199199
}
200200

201-
func (exec *aggregatorFromBytesToFixed[to]) marshal() ([]byte, error) {
201+
func (exec *aggregatorFromBytesToFixed[to]) Marshal() ([]byte, error) {
202202
d := exec.singleAggInfo.getEncoded()
203203
r, em, err := exec.ret.marshalToBytes()
204204
if err != nil {
@@ -213,7 +213,7 @@ func (exec *aggregatorFromBytesToFixed[to]) marshal() ([]byte, error) {
213213
return encoded.Marshal()
214214
}
215215

216-
func (exec *aggregatorFromBytesToFixed[to]) unmarshal(mp *mpool.MPool, result, empties, groups [][]byte) error {
216+
func (exec *aggregatorFromBytesToFixed[to]) Unmarshal(mp *mpool.MPool, result, empties, groups [][]byte) error {
217217
exec.execContext.decodeGroupContexts(groups, exec.singleAggInfo.retType, exec.singleAggInfo.argType)
218218
return exec.ret.unmarshalFromBytes(result, empties)
219219
}

pkg/sql/colexec/aggexec/fromFixedRetBytes.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,7 @@ func (exec *aggregatorFromFixedToBytes[from]) GetOptResult() SplitResult {
217217
return &exec.ret.optSplitResult
218218
}
219219

220-
func (exec *aggregatorFromFixedToBytes[from]) marshal() ([]byte, error) {
220+
func (exec *aggregatorFromFixedToBytes[from]) Marshal() ([]byte, error) {
221221
d := exec.singleAggInfo.getEncoded()
222222
r, em, err := exec.ret.marshalToBytes()
223223
if err != nil {
@@ -232,7 +232,7 @@ func (exec *aggregatorFromFixedToBytes[from]) marshal() ([]byte, error) {
232232
return encoded.Marshal()
233233
}
234234

235-
func (exec *aggregatorFromFixedToBytes[from]) unmarshal(_ *mpool.MPool, result, empties, groups [][]byte) error {
235+
func (exec *aggregatorFromFixedToBytes[from]) Unmarshal(_ *mpool.MPool, result, empties, groups [][]byte) error {
236236
exec.execContext.decodeGroupContexts(groups, exec.singleAggInfo.retType, exec.singleAggInfo.argType)
237237
return exec.ret.unmarshalFromBytes(result, empties)
238238
}

pkg/sql/colexec/aggexec/fromFixedRetFixed.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -271,7 +271,7 @@ func (exec *aggregatorFromFixedToFixed[from, to]) GetOptResult() SplitResult {
271271
return &exec.ret.optSplitResult
272272
}
273273

274-
func (exec *aggregatorFromFixedToFixed[from, to]) marshal() ([]byte, error) {
274+
func (exec *aggregatorFromFixedToFixed[from, to]) Marshal() ([]byte, error) {
275275
d := exec.singleAggInfo.getEncoded()
276276
r, em, err := exec.ret.marshalToBytes()
277277
if err != nil {
@@ -287,7 +287,7 @@ func (exec *aggregatorFromFixedToFixed[from, to]) marshal() ([]byte, error) {
287287
return encoded.Marshal()
288288
}
289289

290-
func (exec *aggregatorFromFixedToFixed[from, to]) unmarshal(_ *mpool.MPool, result, empties, groups [][]byte) error {
290+
func (exec *aggregatorFromFixedToFixed[from, to]) Unmarshal(_ *mpool.MPool, result, empties, groups [][]byte) error {
291291
exec.execContext.decodeGroupContexts(groups, exec.singleAggInfo.retType, exec.singleAggInfo.argType)
292292
return exec.ret.unmarshalFromBytes(result, empties)
293293
}

0 commit comments

Comments
 (0)