Skip to content

Commit 5d38363

Browse files
AWS storage. Create image thumbnails
1 parent fc3fb17 commit 5d38363

20 files changed

+525
-113
lines changed

.gitignore

+1-1
Original file line numberDiff line numberDiff line change
@@ -17,5 +17,5 @@ vendor/
1717
*.csv
1818
.env*
1919
*.db
20-
config.yaml
20+
dipdup.yml
2121
build/metadata

.vscode/launch.json

+3-2
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,10 @@
1010
"request": "launch",
1111
"mode": "debug",
1212
"program": "${workspaceFolder}/cmd/metadata/",
13+
"envFile": "${workspaceFolder}/.env",
1314
"args": [
14-
"-f",
15-
"${workspaceFolder}/config.yaml"
15+
"-c",
16+
"${workspaceFolder}/build/dipdup.yml"
1617
]
1718
}
1819
]

Makefile

+8-1
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
-include .env
2+
export $(shell sed 's/=.*//' .env)
3+
14
.PHONY: build
25

36
build:
@@ -8,4 +11,8 @@ debug: build
811
cd build && POSTGRES_HOST=localhost ./metadata
912

1013
run:
11-
docker-compose up -d --build
14+
docker-compose up -d --build
15+
16+
metadata:
17+
docker-compose up -d db
18+
cd cmd/metadata && go run .

build/dipdup.yml

+5
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,11 @@ metadata:
99
ipfs_timeout: 10
1010
http_timeout: 10
1111
max_retry_count_on_error: 3
12+
aws:
13+
bucket_name: dipdup-thumbnails
14+
region: eu-central-1
15+
access_key_id: ${AWS_ACCESS_KEY_ID}
16+
secret_access_key: ${AWS_SECRET_ACCESS_KEY}
1217
indexers:
1318
mainnet:
1419
datasources:

cmd/metadata/config/config.go

+9
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ type Settings struct {
113113
HTTPTimeout uint64 `yaml:"http_timeout"`
114114
MaxRetryCountOnError uint64 `yaml:"max_retry_count_on_error"`
115115
Index []string `yaml:"index"`
116+
AWS AWS `yaml:"aws"`
116117
}
117118

118119
// Validate -
@@ -142,3 +143,11 @@ func (cfg *Settings) Validate() error {
142143

143144
return nil
144145
}
146+
147+
// AWS -
148+
type AWS struct {
149+
BucketName string `yaml:"bucket_name"`
150+
Region string `yaml:"region"`
151+
AccessKey string `yaml:"access_key_id"`
152+
Secret string `yaml:"secret_access_key"`
153+
}

cmd/metadata/contract.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@ import (
1313
)
1414

1515
func (indexer *Indexer) processContractMetadata(update api.BigMapUpdate, tx *gorm.DB) error {
16+
if update.Content != nil {
17+
return nil
18+
}
1619
if update.Content.Hash != emptyHash {
1720
return indexer.ctx.Add(update, indexer.network)
1821
}
@@ -51,7 +54,7 @@ func (indexer *Indexer) resolveContractMetadata(cm *models.ContractMetadata) {
5154
switch {
5255
case errors.Is(err, resolver.ErrNoIPFSResponse) || errors.Is(err, resolver.ErrTezosStorageKeyNotFound):
5356
cm.RetryCount += 1
54-
if cm.RetryCount < int(indexer.maxRetryCount) {
57+
if cm.RetryCount < int(indexer.settings.MaxRetryCountOnError) {
5558
indexer.logContractMetadata(*cm, fmt.Sprintf("Retry: %s", err.Error()), "warn")
5659
} else {
5760
cm.Status = models.StatusFailed

cmd/metadata/helpers/ipfs.go

+46
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package helpers
2+
3+
import (
4+
"fmt"
5+
"regexp"
6+
"strings"
7+
8+
"github.com/ipfs/go-cid"
9+
)
10+
11+
const (
12+
prefixIpfs = "ipfs://"
13+
)
14+
15+
// IPFSHash - separate IPFS hash from link
16+
func IPFSHash(link string) (string, error) {
17+
hash := strings.TrimPrefix(link, prefixIpfs)
18+
if _, err := cid.Decode(hash); err != nil {
19+
return "", err
20+
}
21+
return hash, nil
22+
}
23+
24+
// IPFSLink - get gateway link
25+
func IPFSLink(gateway, hash string) string {
26+
return fmt.Sprintf("%s/ipfs/%s", gateway, hash)
27+
}
28+
29+
var ipfsURL = regexp.MustCompile(`ipfs:\/\/(?P<hash>Qm[123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz]{44})`)
30+
31+
// FindAllIPFSLinks -
32+
func FindAllIPFSLinks(data []byte) []string {
33+
matches := ipfsURL.FindAllSubmatch(data, -1)
34+
if len(matches) == 0 {
35+
return nil
36+
}
37+
38+
res := make([]string, 0)
39+
for i := range matches {
40+
if len(matches[i]) != 2 {
41+
continue
42+
}
43+
res = append(res, string(matches[i][1]))
44+
}
45+
return res
46+
}

cmd/metadata/helpers/string.go

+6
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package helpers
22

33
import (
4+
"bytes"
45
"encoding/hex"
56
"strings"
67
)
@@ -19,3 +20,8 @@ func IsJSON(val string) bool {
1920
func Decode(data []byte) ([]byte, error) {
2021
return hex.DecodeString(Trim(string(data)))
2122
}
23+
24+
// Escape -
25+
func Escape(data []byte) []byte {
26+
return bytes.ReplaceAll(data, []byte{0x5c, 0x75, 0x30, 0x30, 0x30, 0x30}, []byte("0x00")) // \u0000 => 0x00
27+
}

cmd/metadata/indexer.go

+35-21
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
package main
22

33
import (
4-
"errors"
54
"strings"
65
"sync"
76

7+
"github.com/pkg/errors"
88
log "github.com/sirupsen/logrus"
99
"gorm.io/gorm"
1010

@@ -14,21 +14,23 @@ import (
1414
"github.com/dipdup-net/metadata/cmd/metadata/context"
1515
"github.com/dipdup-net/metadata/cmd/metadata/models"
1616
"github.com/dipdup-net/metadata/cmd/metadata/resolver"
17+
"github.com/dipdup-net/metadata/cmd/metadata/storage"
1718
"github.com/dipdup-net/metadata/cmd/metadata/tzkt"
1819
)
1920

2021
// Indexer -
2122
type Indexer struct {
22-
network string
23-
indexName string
24-
state state.State
25-
resolver resolver.Receiver
26-
db *gorm.DB
27-
scanner *tzkt.Scanner
28-
ctx *context.Context
29-
contracts *Queue
30-
tokens *Queue
31-
maxRetryCount uint64
23+
network string
24+
indexName string
25+
state state.State
26+
resolver resolver.Receiver
27+
db *gorm.DB
28+
scanner *tzkt.Scanner
29+
ctx *context.Context
30+
contracts *Queue
31+
tokens *Queue
32+
thumbnailCreator *ThumbnailCreator
33+
settings config.Settings
3234

3335
stop chan struct{}
3436
wg sync.WaitGroup
@@ -50,16 +52,19 @@ func NewIndexer(network string, indexerConfig *config.Indexer, database generalC
5052
}
5153

5254
indexer := &Indexer{
53-
scanner: tzkt.New(indexerConfig.DataSource.Tzkt, filters.Accounts...),
54-
network: network,
55-
indexName: models.IndexName(network),
56-
resolver: rslvr,
57-
maxRetryCount: settings.MaxRetryCountOnError,
58-
ctx: ctx,
59-
db: db,
60-
stop: make(chan struct{}, 1),
55+
scanner: tzkt.New(indexerConfig.DataSource.Tzkt, filters.Accounts...),
56+
network: network,
57+
indexName: models.IndexName(network),
58+
resolver: rslvr,
59+
settings: settings,
60+
ctx: ctx,
61+
db: db,
62+
stop: make(chan struct{}, 1),
6163
}
6264

65+
if aws := storage.NewAWS(settings.AWS.AccessKey, settings.AWS.Secret, settings.AWS.Region, settings.AWS.BucketName); aws != nil {
66+
indexer.thumbnailCreator = NewThumbnailCreator(aws, db, settings.IPFSGateways)
67+
}
6368
indexer.contracts = NewQueue(db, 15, 60, indexer.onContractFlush, indexer.onContractTick)
6469
indexer.tokens = NewQueue(db, 15, 60, indexer.onTokenFlush, indexer.onTokenTick)
6570

@@ -76,6 +81,9 @@ func (indexer *Indexer) Start() error {
7681
return err
7782
}
7883

84+
if indexer.thumbnailCreator != nil {
85+
indexer.thumbnailCreator.Start()
86+
}
7987
indexer.contracts.Start()
8088
indexer.tokens.Start()
8189

@@ -92,6 +100,12 @@ func (indexer *Indexer) Close() error {
92100
indexer.stop <- struct{}{}
93101
indexer.wg.Wait()
94102

103+
if indexer.thumbnailCreator != nil {
104+
if err := indexer.thumbnailCreator.Close(); err != nil {
105+
return err
106+
}
107+
}
108+
95109
if err := indexer.scanner.Close(); err != nil {
96110
return err
97111
}
@@ -175,11 +189,11 @@ func (indexer *Indexer) handlerUpdate(msg tzkt.Message) error {
175189
switch path[len(path)-1] {
176190
case "token_metadata":
177191
if err := indexer.processTokenMetadata(msg.Body[i], tx); err != nil {
178-
return err
192+
return errors.Wrap(err, "token_metadata")
179193
}
180194
case "metadata":
181195
if err := indexer.processContractMetadata(msg.Body[i], tx); err != nil {
182-
return err
196+
return errors.Wrap(err, "contract_metadata")
183197
}
184198
}
185199
}

cmd/metadata/models/contract_metadata.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,12 @@ type ContractMetadata struct {
1717
}
1818

1919
// Status - metadata status
20-
type Status string
20+
type Status int
2121

2222
const (
23-
StatusNew Status = "new"
24-
StatusFailed Status = "failed"
25-
StatusApplied Status = "applied"
23+
StatusNew Status = iota + 1
24+
StatusFailed
25+
StatusApplied
2626
)
2727

2828
// GetContractMetadata -

cmd/metadata/models/token_metadata.go

+29-8
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,16 @@ import (
77

88
// TokenMetadata -
99
type TokenMetadata struct {
10-
Network string `gorm:"primaryKey"`
11-
Contract string `gorm:"primaryKey"`
12-
TokenID uint64 `gorm:"primaryKey"`
13-
Link string
14-
RetryCount int
15-
Status Status
16-
UpdatedAt int
17-
Metadata datatypes.JSON
10+
Network string `gorm:"primaryKey"`
11+
Contract string `gorm:"primaryKey"`
12+
TokenID uint64 `gorm:"primaryKey"`
13+
Link string
14+
RetryCount int `gorm:"default:0"`
15+
Status Status
16+
UpdatedAt int
17+
Metadata datatypes.JSON
18+
ImageProcessed bool
19+
ImageRetryCount int `gorm:"default:0"`
1820
}
1921

2022
// GetTokenMetadata -
@@ -29,3 +31,22 @@ func GetTokenMetadata(tx *gorm.DB, status Status, limit, offset int) (all []Toke
2931
err = query.Order("retry_count asc").Find(&all).Error
3032
return
3133
}
34+
35+
// GetTokenMetadataWithUnprocessedImages -
36+
func GetTokenMetadataWithUnprocessedImages(tx *gorm.DB) (all []TokenMetadata, err error) {
37+
err = tx.Model(&TokenMetadata{}).Where("status = 3 AND image_processed = false AND image_retry_count < 3").Limit(5).Order("image_retry_count desc").Find(&all).Error
38+
return
39+
}
40+
41+
// SetImageProcessed -
42+
func (tm *TokenMetadata) SetImageProcessed(tx *gorm.DB) error {
43+
tm.ImageRetryCount += 1
44+
updates := map[string]interface{}{
45+
"image_retry_count": tm.ImageRetryCount,
46+
}
47+
48+
if tm.ImageProcessed {
49+
updates["image_processed"] = true
50+
}
51+
return tx.Model(&tm).Updates(updates).Error
52+
}

cmd/metadata/resolver/http.go

+7-1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"strings"
99
"time"
1010

11+
"github.com/dipdup-net/metadata/cmd/metadata/helpers"
1112
"github.com/pkg/errors"
1213
)
1314

@@ -69,7 +70,12 @@ func (s Http) Resolve(network, address, link string) ([]byte, error) {
6970
return nil, errors.Errorf("Invalid status code: %s", resp.Status)
7071
}
7172

72-
return ioutil.ReadAll(io.LimitReader(resp.Body, 20971520)) // 20 MB limit for metadata
73+
data, err := ioutil.ReadAll(io.LimitReader(resp.Body, 20971520)) // 20 MB limit for metadata
74+
if err != nil {
75+
return nil, err
76+
}
77+
78+
return helpers.Escape(data), nil
7379
}
7480

7581
// Is -

0 commit comments

Comments
 (0)