Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pkg/fileservice/file_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1032,6 +1032,9 @@ func testFileService(
}

w, err := rwFS.NewWriter(ctx, "foo")
if moerr.IsMoErrCode(err, moerr.ErrNotSupported) {
return
}
assert.Nil(t, err)
assert.NotNil(t, w)
_, err = w.Write([]byte("foobarbaz"))
Expand Down
40 changes: 35 additions & 5 deletions pkg/fileservice/sub_path.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,24 @@ package fileservice
import (
"context"
"fmt"
"io"
"iter"
"path"
"strings"

"github.com/matrixorigin/matrixone/pkg/common/moerr"
)

type subPathFS struct {
upstream FileService
path string
name string
upstream FileService
rwUpstream ReaderWriterFileService
path string
name string
}

// SubPath returns a FileService instance that operates at specified sub path of the upstream instance
func SubPath(upstream FileService, path string) FileService {
return &subPathFS{
ret := &subPathFS{
upstream: upstream,
path: path,
name: strings.Join([]string{
Expand All @@ -39,9 +43,13 @@ func SubPath(upstream FileService, path string) FileService {
path,
}, ","),
}
if rwfs, ok := upstream.(ReaderWriterFileService); ok {
ret.rwUpstream = rwfs
}
return ret
}

var _ FileService = new(subPathFS)
var _ ReaderWriterFileService = new(subPathFS)

func (s *subPathFS) Name() string {
return s.name
Expand Down Expand Up @@ -143,6 +151,28 @@ func (s *subPathFS) Cost() *CostAttr {
return s.upstream.Cost()
}

func (s *subPathFS) NewReader(ctx context.Context, filePath string) (io.ReadCloser, error) {
p, err := s.toUpstreamPath(filePath)
if err != nil {
return nil, err
}
if s.rwUpstream != nil {
return s.rwUpstream.NewReader(ctx, p)
}
return nil, moerr.NewNotSupportedNoCtx("not ReaderWriterFileService")
}

func (s *subPathFS) NewWriter(ctx context.Context, filePath string) (io.WriteCloser, error) {
p, err := s.toUpstreamPath(filePath)
if err != nil {
return nil, err
}
if s.rwUpstream != nil {
return s.rwUpstream.NewWriter(ctx, p)
}
return nil, moerr.NewNotSupportedNoCtx("not ReaderWriterFileService")
}

var _ MutableFileService = new(subPathFS)

func (s *subPathFS) NewMutator(ctx context.Context, filePath string) (Mutator, error) {
Expand Down
26 changes: 13 additions & 13 deletions pkg/sql/colexec/aggexec/aggFrame_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -699,12 +699,12 @@ func TestGroupConcatExecMarshalUnmarshal(t *testing.T) {
require.NoError(t, vector.AppendBytes(v1, []byte("test1"), false, m.Mp()))
require.NoError(t, exec.Fill(0, 0, []*vector.Vector{v1}))

data, err := exec.marshal()
data, err := exec.Marshal()
require.NoError(t, err)
require.NotNil(t, data)

newExec := newGroupConcatExec(m, info, ",")
err = newExec.unmarshal(m.Mp(), nil, nil, [][]byte{[]byte(",")})
err = newExec.Unmarshal(m.Mp(), nil, nil, [][]byte{[]byte(",")})
require.NoError(t, err)

require.Equal(t, []byte(","), newExec.(*groupConcatExec).separator)
Expand All @@ -722,14 +722,14 @@ func TestGroupConcatExecMarshalUnmarshal(t *testing.T) {
}
exec := newGroupConcatExec(m, info, "|")

data, err := exec.marshal()
data, err := exec.Marshal()
require.NoError(t, err)

newExec := newGroupConcatExec(m, info, ",")
encoded := &EncodedAgg{}
require.NoError(t, encoded.Unmarshal(data))

err = newExec.unmarshal(m.Mp(), encoded.Result, encoded.Empties, encoded.Groups)
err = newExec.Unmarshal(m.Mp(), encoded.Result, encoded.Empties, encoded.Groups)
require.NoError(t, err)

require.Equal(t, []byte("|"), newExec.(*groupConcatExec).separator)
Expand All @@ -753,14 +753,14 @@ func TestGroupConcatExecMarshalUnmarshal(t *testing.T) {
require.NoError(t, vector.AppendBytes(v1, []byte("distinct1"), false, m.Mp()))
require.NoError(t, exec.Fill(0, 0, []*vector.Vector{v1}))

data, err := exec.marshal()
data, err := exec.Marshal()
require.NoError(t, err)

newExec := newGroupConcatExec(m, info, ",")
encoded := &EncodedAgg{}
require.NoError(t, encoded.Unmarshal(data))

err = newExec.unmarshal(m.Mp(), encoded.Result, encoded.Empties, encoded.Groups)
err = newExec.Unmarshal(m.Mp(), encoded.Result, encoded.Empties, encoded.Groups)
require.NoError(t, err)

exec.Free()
Expand Down Expand Up @@ -803,14 +803,14 @@ func TestCountColumnExecMarshalUnmarshal(t *testing.T) {
require.NoError(t, vector.AppendFixedList(v1, []int64{1, 2, 3}, nil, m.Mp()))
require.NoError(t, exec.BulkFill(0, []*vector.Vector{v1}))

data, err := exec.marshal()
data, err := exec.Marshal()
require.NoError(t, err)

newExec := newCountColumnExecExec(m, info)
encoded := &EncodedAgg{}
require.NoError(t, encoded.Unmarshal(data))

err = newExec.unmarshal(m.Mp(), encoded.Result, encoded.Empties, encoded.Groups)
err = newExec.Unmarshal(m.Mp(), encoded.Result, encoded.Empties, encoded.Groups)
require.NoError(t, err)

exec.Free()
Expand All @@ -833,14 +833,14 @@ func TestCountColumnExecMarshalUnmarshal(t *testing.T) {
require.NoError(t, vector.AppendFixedList(v1, []int64{1, 2, 1, 3}, nil, m.Mp()))
require.NoError(t, exec.BulkFill(0, []*vector.Vector{v1}))

data, err := exec.marshal()
data, err := exec.Marshal()
require.NoError(t, err)

newExec := newCountColumnExecExec(m, info)
encoded := &EncodedAgg{}
require.NoError(t, encoded.Unmarshal(data))

err = newExec.unmarshal(m.Mp(), encoded.Result, encoded.Empties, encoded.Groups)
err = newExec.Unmarshal(m.Mp(), encoded.Result, encoded.Empties, encoded.Groups)
require.NoError(t, err)

exec.Free()
Expand All @@ -858,14 +858,14 @@ func TestCountColumnExecMarshalUnmarshal(t *testing.T) {
exec := newCountColumnExecExec(m, info)
require.NoError(t, exec.GroupGrow(1))

data, err := exec.marshal()
data, err := exec.Marshal()
require.NoError(t, err)

newExec := newCountColumnExecExec(m, info)
encoded := &EncodedAgg{}
require.NoError(t, encoded.Unmarshal(data))

err = newExec.unmarshal(m.Mp(), encoded.Result, encoded.Empties, encoded.Groups)
err = newExec.Unmarshal(m.Mp(), encoded.Result, encoded.Empties, encoded.Groups)
require.NoError(t, err)

exec.Free()
Expand All @@ -882,7 +882,7 @@ func TestCountColumnExecMarshalUnmarshal(t *testing.T) {
}
exec := newCountColumnExecExec(m, info)

err := exec.unmarshal(m.Mp(), nil, nil, [][]byte{})
err := exec.Unmarshal(m.Mp(), nil, nil, [][]byte{})
require.NoError(t, err)

exec.Free()
Expand Down
8 changes: 4 additions & 4 deletions pkg/sql/colexec/aggexec/approx_count.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (exec *approxCountFixedExec[T]) GetOptResult() SplitResult {
return &exec.ret.optSplitResult
}

func (exec *approxCountFixedExec[T]) marshal() ([]byte, error) {
func (exec *approxCountFixedExec[T]) Marshal() ([]byte, error) {
d := exec.singleAggInfo.getEncoded()
r, em, err := exec.ret.marshalToBytes()
if err != nil {
Expand All @@ -60,7 +60,7 @@ func (exec *approxCountFixedExec[T]) marshal() ([]byte, error) {
return encoded.Marshal()
}

func (exec *approxCountFixedExec[T]) unmarshal(_ *mpool.MPool, result, empties, groups [][]byte) error {
func (exec *approxCountFixedExec[T]) Unmarshal(_ *mpool.MPool, result, empties, groups [][]byte) error {
err := exec.ret.unmarshalFromBytes(result, empties)
if err != nil {
return err
Expand Down Expand Up @@ -90,7 +90,7 @@ func (exec *approxCountVarExec) GetOptResult() SplitResult {
return &exec.ret.optSplitResult
}

func (exec *approxCountVarExec) marshal() ([]byte, error) {
func (exec *approxCountVarExec) Marshal() ([]byte, error) {
d := exec.singleAggInfo.getEncoded()
r, em, err := exec.ret.marshalToBytes()
if err != nil {
Expand All @@ -115,7 +115,7 @@ func (exec *approxCountVarExec) marshal() ([]byte, error) {
return encoded.Marshal()
}

func (exec *approxCountVarExec) unmarshal(_ *mpool.MPool, result, empties, groups [][]byte) error {
func (exec *approxCountVarExec) Unmarshal(_ *mpool.MPool, result, empties, groups [][]byte) error {
err := exec.ret.unmarshalFromBytes(result, empties)
if err != nil {
return err
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/colexec/aggexec/concat.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (exec *groupConcatExec) GetOptResult() SplitResult {
return &exec.ret.optSplitResult
}

func (exec *groupConcatExec) marshal() ([]byte, error) {
func (exec *groupConcatExec) Marshal() ([]byte, error) {
d := exec.multiAggInfo.getEncoded()
r, em, err := exec.ret.marshalToBytes()
if err != nil {
Expand All @@ -59,7 +59,7 @@ func (exec *groupConcatExec) marshal() ([]byte, error) {
return encoded.Marshal()
}

func (exec *groupConcatExec) unmarshal(_ *mpool.MPool, result, empties, groups [][]byte) error {
func (exec *groupConcatExec) Unmarshal(_ *mpool.MPool, result, empties, groups [][]byte) error {
if err := exec.SetExtraInformation(groups[0], 0); err != nil {
return err
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/sql/colexec/aggexec/count.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (exec *countColumnExec) GetOptResult() SplitResult {
return &exec.ret.optSplitResult
}

func (exec *countColumnExec) marshal() ([]byte, error) {
func (exec *countColumnExec) Marshal() ([]byte, error) {
d := exec.singleAggInfo.getEncoded()
r, em, err := exec.ret.marshalToBytes()
if err != nil {
Expand All @@ -63,7 +63,7 @@ func (exec *countColumnExec) marshal() ([]byte, error) {
return encoded.Marshal()
}

func (exec *countColumnExec) unmarshal(_ *mpool.MPool, result, empties, groups [][]byte) error {
func (exec *countColumnExec) Unmarshal(_ *mpool.MPool, result, empties, groups [][]byte) error {
if exec.IsDistinct() {
if len(groups) > 0 {
if err := exec.distinctHash.unmarshal(groups[0]); err != nil {
Expand Down Expand Up @@ -286,7 +286,7 @@ func (exec *countStarExec) GetOptResult() SplitResult {
return &exec.ret.optSplitResult
}

func (exec *countStarExec) marshal() ([]byte, error) {
func (exec *countStarExec) Marshal() ([]byte, error) {
d := exec.singleAggInfo.getEncoded()
r, em, err := exec.ret.marshalToBytes()
if err != nil {
Expand All @@ -301,7 +301,7 @@ func (exec *countStarExec) marshal() ([]byte, error) {
return encoded.Marshal()
}

func (exec *countStarExec) unmarshal(_ *mpool.MPool, result, empties, _ [][]byte) error {
func (exec *countStarExec) Unmarshal(_ *mpool.MPool, result, empties, _ [][]byte) error {
return exec.ret.unmarshalFromBytes(result, empties)
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/colexec/aggexec/fromBytesRetBytes.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (exec *aggregatorFromBytesToBytes) GetOptResult() SplitResult {
return &exec.ret.optSplitResult
}

func (exec *aggregatorFromBytesToBytes) marshal() ([]byte, error) {
func (exec *aggregatorFromBytesToBytes) Marshal() ([]byte, error) {
d := exec.singleAggInfo.getEncoded()
r, em, err := exec.ret.marshalToBytes()
if err != nil {
Expand All @@ -115,7 +115,7 @@ func (exec *aggregatorFromBytesToBytes) marshal() ([]byte, error) {
return encoded.Marshal()
}

func (exec *aggregatorFromBytesToBytes) unmarshal(_ *mpool.MPool, result, empties, groups [][]byte) error {
func (exec *aggregatorFromBytesToBytes) Unmarshal(_ *mpool.MPool, result, empties, groups [][]byte) error {
exec.execContext.decodeGroupContexts(groups, exec.singleAggInfo.retType, exec.singleAggInfo.argType)
return exec.ret.unmarshalFromBytes(result, empties)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/colexec/aggexec/fromBytesRetFixed.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func (exec *aggregatorFromBytesToFixed[to]) GetOptResult() SplitResult {
return &exec.ret.optSplitResult
}

func (exec *aggregatorFromBytesToFixed[to]) marshal() ([]byte, error) {
func (exec *aggregatorFromBytesToFixed[to]) Marshal() ([]byte, error) {
d := exec.singleAggInfo.getEncoded()
r, em, err := exec.ret.marshalToBytes()
if err != nil {
Expand All @@ -213,7 +213,7 @@ func (exec *aggregatorFromBytesToFixed[to]) marshal() ([]byte, error) {
return encoded.Marshal()
}

func (exec *aggregatorFromBytesToFixed[to]) unmarshal(mp *mpool.MPool, result, empties, groups [][]byte) error {
func (exec *aggregatorFromBytesToFixed[to]) Unmarshal(mp *mpool.MPool, result, empties, groups [][]byte) error {
exec.execContext.decodeGroupContexts(groups, exec.singleAggInfo.retType, exec.singleAggInfo.argType)
return exec.ret.unmarshalFromBytes(result, empties)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/colexec/aggexec/fromFixedRetBytes.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ func (exec *aggregatorFromFixedToBytes[from]) GetOptResult() SplitResult {
return &exec.ret.optSplitResult
}

func (exec *aggregatorFromFixedToBytes[from]) marshal() ([]byte, error) {
func (exec *aggregatorFromFixedToBytes[from]) Marshal() ([]byte, error) {
d := exec.singleAggInfo.getEncoded()
r, em, err := exec.ret.marshalToBytes()
if err != nil {
Expand All @@ -232,7 +232,7 @@ func (exec *aggregatorFromFixedToBytes[from]) marshal() ([]byte, error) {
return encoded.Marshal()
}

func (exec *aggregatorFromFixedToBytes[from]) unmarshal(_ *mpool.MPool, result, empties, groups [][]byte) error {
func (exec *aggregatorFromFixedToBytes[from]) Unmarshal(_ *mpool.MPool, result, empties, groups [][]byte) error {
exec.execContext.decodeGroupContexts(groups, exec.singleAggInfo.retType, exec.singleAggInfo.argType)
return exec.ret.unmarshalFromBytes(result, empties)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/colexec/aggexec/fromFixedRetFixed.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ func (exec *aggregatorFromFixedToFixed[from, to]) GetOptResult() SplitResult {
return &exec.ret.optSplitResult
}

func (exec *aggregatorFromFixedToFixed[from, to]) marshal() ([]byte, error) {
func (exec *aggregatorFromFixedToFixed[from, to]) Marshal() ([]byte, error) {
d := exec.singleAggInfo.getEncoded()
r, em, err := exec.ret.marshalToBytes()
if err != nil {
Expand All @@ -287,7 +287,7 @@ func (exec *aggregatorFromFixedToFixed[from, to]) marshal() ([]byte, error) {
return encoded.Marshal()
}

func (exec *aggregatorFromFixedToFixed[from, to]) unmarshal(_ *mpool.MPool, result, empties, groups [][]byte) error {
func (exec *aggregatorFromFixedToFixed[from, to]) Unmarshal(_ *mpool.MPool, result, empties, groups [][]byte) error {
exec.execContext.decodeGroupContexts(groups, exec.singleAggInfo.retType, exec.singleAggInfo.argType)
return exec.ret.unmarshalFromBytes(result, empties)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/colexec/aggexec/median.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (exec *medianColumnExecSelf[T, R]) GetOptResult() SplitResult {
return &exec.ret.optSplitResult
}

func (exec *medianColumnExecSelf[T, R]) marshal() ([]byte, error) {
func (exec *medianColumnExecSelf[T, R]) Marshal() ([]byte, error) {
d := exec.singleAggInfo.getEncoded()
r, em, err := exec.ret.marshalToBytes()
if err != nil {
Expand All @@ -78,7 +78,7 @@ func (exec *medianColumnExecSelf[T, R]) marshal() ([]byte, error) {
return encoded.Marshal()
}

func (exec *medianColumnExecSelf[T, R]) unmarshal(mp *mpool.MPool, result, empties, groups [][]byte) error {
func (exec *medianColumnExecSelf[T, R]) Unmarshal(mp *mpool.MPool, result, empties, groups [][]byte) error {
if len(groups) > 0 {
exec.groups = make([]*Vectors[T], len(groups))
for i := range exec.groups {
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/colexec/aggexec/serialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
)

func MarshalAggFuncExec(exec AggFuncExec) ([]byte, error) {
return exec.marshal()
return exec.Marshal()
}

func UnmarshalAggFuncExec(
Expand Down Expand Up @@ -48,7 +48,7 @@ func UnmarshalAggFuncExec(
mp = mg.Mp()
}

if err := exec.unmarshal(
if err := exec.Unmarshal(
mp, encoded.Result, encoded.Empties, encoded.Groups); err != nil {
exec.Free()
return nil, err
Expand Down
Loading
Loading