Skip to content

Commit 7f717ce

Browse files
authored
Merge pull request #2 from sendinblue/feature_bt-map
SRE-000: Feature bt map
2 parents 8944a77 + 47075f4 commit 7f717ce

File tree

9 files changed

+257
-8
lines changed

9 files changed

+257
-8
lines changed

.github/workflows/ci.yml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,4 +30,6 @@ jobs:
3030
go get github.com/securego/gosec/cmd/gosec
3131
gosec ./...
3232
- name: Run Golang Linter
33-
uses: golangci/golangci-lint-action@v2
33+
uses: golangci/golangci-lint-action@v3
34+
with:
35+
version: v1.45.2

go.mod

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@ go 1.17
44

55
require (
66
cloud.google.com/go/bigtable v1.13.0
7+
cloud.google.com/go/storage v1.10.0
8+
github.com/davecgh/go-spew v1.1.0
9+
github.com/pierrre/compare v1.0.2
10+
github.com/pkg/errors v0.9.1
711
google.golang.org/api v0.70.0
812
google.golang.org/grpc v1.44.0
913
)

go.sum

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ cloud.google.com/go/storage v1.0.0/go.mod h1:IhtSnM/ZTZV8YYJWCY8RULGVqBDmpoyjwiy
5252
cloud.google.com/go/storage v1.5.0/go.mod h1:tpKbwo567HUNpVclU5sGELwQWBDZ8gh0ZeosJ0Rtdos=
5353
cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohlUTyfDhBk=
5454
cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs=
55+
cloud.google.com/go/storage v1.10.0 h1:STgFzyU5/8miMl0//zKh2aQeTyeaUH3WN9bSUiJ09bA=
5556
cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
5657
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
5758
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
@@ -78,6 +79,7 @@ github.com/cncf/xds/go v0.0.0-20210805033703-aa0b78936158/go.mod h1:eXthEFrGJvWH
7879
github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
7980
github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1 h1:zH8ljVhhq7yC0MIeUL/IviMtY8hx2mK8cN9wEYb8ggw=
8081
github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
82+
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
8183
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
8284
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
8385
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
@@ -145,9 +147,11 @@ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
145147
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
146148
github.com/google/go-cmp v0.5.7 h1:81/ik6ipDQS2aGcBfIN5dHDB36BwrStyeAQquSYCV4o=
147149
github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE=
150+
github.com/google/martian v2.1.0+incompatible h1:/CP5g8u/VJHijgedC/Legn3BAbAaWPgecwXBIDzw5no=
148151
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
149152
github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0=
150153
github.com/google/martian/v3 v3.1.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0=
154+
github.com/google/martian/v3 v3.2.1 h1:d8MncMlErDFTwQGBK1xhv026j9kqhvw1Qv9IbWT1VLQ=
151155
github.com/google/martian/v3 v3.2.1/go.mod h1:oBOf6HBosgwRXnUGWUB05QECsc6uvmMiJ3+6W4l/CUk=
152156
github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
153157
github.com/google/pprof v0.0.0-20190515194954-54271f7e092f/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
@@ -182,6 +186,10 @@ github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+o
182186
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
183187
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
184188
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
189+
github.com/pierrre/compare v1.0.2 h1:k4IUsHgh+dbcAOIWCfxVa/7G6STjADH2qmhomv+1quc=
190+
github.com/pierrre/compare v1.0.2/go.mod h1:8UvyRHH+9HS8Pczdd2z5x/wvv67krDwVxoOndaIIDVU=
191+
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
192+
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
185193
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
186194
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
187195
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=

mapping/gcs_bucket.go

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
package mapping
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"fmt"
7+
"io"
8+
9+
"cloud.google.com/go/storage"
10+
"github.com/pkg/errors"
11+
"google.golang.org/api/option"
12+
)
13+
14+
type GcloudCreds struct {
15+
Type string `json:"type"`
16+
ProjectID string `json:"project_id"`
17+
PrivateKeyID string `json:"private_key_id"`
18+
PrivateKey string `json:"private_key"`
19+
ClientEmail string `json:"client_email"`
20+
ClientID string `json:"client_id"`
21+
AuthURI string `json:"auth_uri"`
22+
TokenURI string `json:"token_uri"`
23+
AuthProvider string `json:"auth_provider_x509_cert_url"`
24+
CertURL string `json:"client_x509_cert_url"`
25+
}
26+
27+
type gcsBucketGetter struct {
28+
objectGetter interface {
29+
Object(name string) *storage.ObjectHandle
30+
}
31+
}
32+
33+
func NewGCSBucketGetter(gcreds *GcloudCreds, bucketName string) (*gcsBucketGetter, *storage.Client, error) {
34+
credsB, err := json.Marshal(gcreds)
35+
if err != nil {
36+
return nil, nil, errors.Wrap(err, "json marshal")
37+
}
38+
39+
client, err := storage.NewClient(context.Background(), option.WithCredentialsJSON(credsB))
40+
if err != nil {
41+
return nil, nil, errors.Wrap(err, "gcs storage client")
42+
}
43+
return &gcsBucketGetter{objectGetter: client.Bucket(bucketName)}, client, nil
44+
}
45+
46+
func NewGCSBucketGetterFromEnvironment(bucketName string) (*gcsBucketGetter, *storage.Client, error) {
47+
client, err := storage.NewClient(context.Background())
48+
if err != nil {
49+
return nil, nil, errors.Wrap(err, "gcs storage client")
50+
}
51+
return &gcsBucketGetter{objectGetter: client.Bucket(bucketName)}, client, nil
52+
}
53+
54+
func NewGCSBucketGetterWithClient(client *storage.Client, bucketName string) (*gcsBucketGetter, error) {
55+
return &gcsBucketGetter{objectGetter: client.Bucket(bucketName)}, nil
56+
}
57+
58+
// GetStorageWriter returns the storage writer for google cloud storage.
59+
func (r *gcsBucketGetter) GetStorageWriter(ctx context.Context, fileName string) io.WriteCloser {
60+
return r.objectGetter.Object(fileName).NewWriter(ctx)
61+
}
62+
63+
// GetStorageReader returns the storage reader for google cloud storage.
64+
func (r *gcsBucketGetter) GetStorageReader(ctx context.Context, fileName string) (io.ReadCloser, error) {
65+
return r.objectGetter.Object(fileName).NewReader(ctx)
66+
}
67+
68+
func getMappingFilename(eventFamily string, version string, environment string) string {
69+
// event_family/v1.0.0.json
70+
return fmt.Sprintf("%s/%s/%s.json", eventFamily, environment, version)
71+
}

mapping/mapper_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -119,12 +119,12 @@ func TestMapper(t *testing.T) {
119119
t.Fatal("should not raise an error")
120120
}
121121
mapper := NewMapper(mapping)
122-
compare(t, mapper, "ui", "1233", "user_id", "1233")
123-
compare(t, mapper, "oi", "1", "is_opted_in", "true")
124-
compare(t, mapper, "3", "1", "order_status", "processing")
122+
compareMappedData(t, mapper, "ui", "1233", "user_id", "1233")
123+
compareMappedData(t, mapper, "oi", "1", "is_opted_in", "true")
124+
compareMappedData(t, mapper, "3", "1", "order_status", "processing")
125125
}
126126

127-
func compare(t *testing.T, m *Mapper, col string, val string, wantedCol string, wantedVal string) {
127+
func compareMappedData(t *testing.T, m *Mapper, col string, val string, wantedCol string, wantedVal string) {
128128
fCol, fVal := getMappedData(m.Mapping, m.rules.toEvent, col, val)
129129
if fCol != wantedCol {
130130
t.Fatalf("wrong column: wanted %s, got %s", wantedCol, fCol)

mapping/mapping.go

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
/*
22
Package mapping provides the API to convert data coming from Big Table into a data.Set.
3-
*/
3+
*/
44
/*
55
The mapping system is based on a set of rules described inside a JSON mapping file, here's an example:
66
{
@@ -48,13 +48,17 @@ The mapping system will map it to a new data.Event containing the following data
4848
"is_opted_in": "true",
4949
"order_status": "processing"
5050
}
51-
*/
51+
*/
5252
package mapping
5353

5454
import (
5555
"encoding/json"
56+
"fmt"
57+
"io"
5658
"os"
5759
"path/filepath"
60+
61+
"github.com/pkg/errors"
5862
)
5963

6064
// Mapping describes the mapping between data stored in Big Table and its human-readable representation.
@@ -84,6 +88,32 @@ func LoadMapping(c []byte) (*Mapping, error) {
8488
return m, nil
8589
}
8690

91+
// LoadMappingVersion loads a mapping from a slice of bytes and its version.
92+
// You can use this function if you prefer to open the mapping file yourself.
93+
func LoadMappingVersion(c []byte, version string) (*Mapping, error) {
94+
mv := map[string]*Mapping{}
95+
err := json.Unmarshal(c, &mv)
96+
if err != nil {
97+
return nil, err
98+
}
99+
m, ok := mv[version]
100+
if !ok {
101+
return nil, errors.New(fmt.Sprintf("no mapping found for version %s", version))
102+
}
103+
return m, nil
104+
}
105+
106+
// LoadMappingIO loads a mapping from a IO reader.
107+
func LoadMappingIO(reader io.ReadCloser) (*Mapping, error) {
108+
m := &Mapping{}
109+
decoder := json.NewDecoder(reader)
110+
err := decoder.Decode(&m)
111+
if err != nil {
112+
return nil, errors.Wrap(err, "decode mapping")
113+
}
114+
return m, nil
115+
}
116+
87117
// LoadMappingFromFile loads a mapping from a file.
88118
func LoadMappingFromFile(path string) (*Mapping, error) {
89119
c, err := os.ReadFile(filepath.Clean(path))

mapping/reader.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package mapping
2+
3+
import (
4+
"context"
5+
"io"
6+
"time"
7+
8+
"cloud.google.com/go/storage"
9+
)
10+
11+
type Reader struct {
12+
readerBucket func(ctx context.Context, fileName string) (io.ReadCloser, error)
13+
}
14+
15+
func NewReader(gcreds *GcloudCreds, bucketName string) (*Reader, *storage.Client, error) {
16+
gb, gbClient, err := NewGCSBucketGetter(gcreds, bucketName)
17+
if err != nil {
18+
return nil, nil, err
19+
}
20+
return &Reader{
21+
readerBucket: gb.GetStorageReader,
22+
}, gbClient, nil
23+
}
24+
25+
func newReaderFromGCSClient(gbSL func(ctx context.Context, fileName string) (io.ReadCloser, error)) *Reader {
26+
return &Reader{
27+
readerBucket: gbSL,
28+
}
29+
}
30+
31+
func (r *Reader) Load(ctx context.Context, eventFamily string, version string, environment string) (*Mapping, error) {
32+
ctx, cancel := context.WithTimeout(ctx, time.Second*50)
33+
defer cancel()
34+
filename := getMappingFilename(eventFamily, version, environment)
35+
reader, err := r.readerBucket(ctx, filename)
36+
if err != nil {
37+
return nil, err
38+
}
39+
m, err := LoadMappingIO(reader)
40+
if err != nil {
41+
return nil, err
42+
}
43+
err = reader.Close()
44+
if err != nil {
45+
return nil, err
46+
}
47+
return m, nil
48+
}

mapping/writer.go

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
package mapping
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"io"
7+
"time"
8+
9+
std_errors "errors"
10+
11+
"cloud.google.com/go/storage"
12+
"github.com/davecgh/go-spew/spew"
13+
"github.com/pierrre/compare"
14+
"github.com/pkg/errors"
15+
)
16+
17+
type Writer struct {
18+
writerBucket func(ctx context.Context, fileName string) io.WriteCloser
19+
readerLoad func(ctx context.Context, eventFamily string, version string, environment string) (*Mapping, error)
20+
}
21+
22+
func NewWriter(gcreds *GcloudCreds, bucketName string) (*Writer, *storage.Client, error) {
23+
gb, gbClient, err := NewGCSBucketGetter(gcreds, bucketName)
24+
if err != nil {
25+
return nil, nil, errors.Wrap(err, "new gcs bucket")
26+
}
27+
28+
return &Writer{
29+
writerBucket: gb.GetStorageWriter,
30+
readerLoad: newReaderFromGCSClient(gb.GetStorageReader).Load,
31+
}, gbClient, nil
32+
}
33+
34+
func NewWriterFromGCSClient(gbSW func(ctx context.Context, fileName string) io.WriteCloser, gbSL func(ctx context.Context, fileName string) (io.ReadCloser, error)) (*Writer, error) {
35+
return &Writer{
36+
writerBucket: gbSW,
37+
readerLoad: newReaderFromGCSClient(gbSL).Load,
38+
}, nil
39+
}
40+
41+
func (w *Writer) Upload(ctx context.Context, eventFamily, version string, environment string, writeMapping *Mapping, forceUpload bool) error {
42+
ctx, cancel := context.WithTimeout(ctx, time.Second*50)
43+
defer cancel()
44+
filename := getMappingFilename(eventFamily, version, environment)
45+
//if force upload is false, we check for already existing mapping and return without overwriting
46+
if !forceUpload {
47+
readMapping, err := w.readerLoad(ctx, eventFamily, version, environment)
48+
if err != nil && UnwrapAll(err) != storage.ErrObjectNotExist {
49+
return errors.Wrap(err, "get storage reader")
50+
}
51+
diff := compare.Compare(readMapping, writeMapping)
52+
if readMapping != nil {
53+
return errors.Errorf("mapping already exists:\nread:\n%s\nwrite:\n%s\ndiff:\n%+v", spew.Sdump(readMapping), spew.Sdump(writeMapping), diff)
54+
}
55+
}
56+
57+
// only if force upload is true or object does not exists
58+
writer := w.writerBucket(ctx, filename)
59+
encoder := json.NewEncoder(writer)
60+
err := encoder.Encode(writeMapping)
61+
if err != nil {
62+
return errors.Wrap(err, "encode mapping")
63+
}
64+
err = writer.Close()
65+
if err != nil {
66+
return errors.Wrap(err, "close uploaded gcp file")
67+
}
68+
return nil
69+
}
70+
71+
// Unwrap calls std_errors.Unwrap.
72+
func Unwrap(err error) error {
73+
return std_errors.Unwrap(err)
74+
}
75+
76+
// UnwrapAll unwraps all nested errors, and returns the last one.
77+
func UnwrapAll(err error) error {
78+
for {
79+
werr := Unwrap(err)
80+
if werr == nil {
81+
return err
82+
}
83+
err = werr
84+
}
85+
}

repository/repository_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,14 @@ import (
44
"context"
55
"embed"
66
"fmt"
7-
"google.golang.org/grpc/credentials/insecure"
87
"log"
98
"regexp"
109
"strconv"
1110
"testing"
1211
"time"
1312

13+
"google.golang.org/grpc/credentials/insecure"
14+
1415
"cloud.google.com/go/bigtable"
1516
"cloud.google.com/go/bigtable/bttest"
1617
"github.com/sendinblue/bigtable-access-layer/data"

0 commit comments

Comments
 (0)