Skip to content
This repository was archived by the owner on Aug 2, 2021. It is now read-only.

Swap pricing #2054

Draft
wants to merge 67 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
67 commits
Select commit Hold shift + click to select a range
6050689
api: WIP api rns resolver
santicomp2014 Sep 27, 2019
a2f4fec
rns test
santicomp2014 Sep 30, 2019
b5fb1dc
swarm module added rns
santicomp2014 Oct 7, 2019
a7fcb59
Merge branch 'master' into swarm_rns
santicomp2014 Oct 7, 2019
71ca0c4
api: fixed test rns
santicomp2014 Oct 7, 2019
914d2ff
Merge pull request #1 from ethersphere/master
santicomp2014 Oct 29, 2019
17ca207
Merge branch 'master' into swarm_rns
santicomp2014 Oct 29, 2019
99d1211
added rds-swarm mod
santicomp2014 Oct 29, 2019
2bee967
api: changed resolution to ResolveDomainContent
santicomp2014 Nov 6, 2019
07a4f58
Merge remote-tracking branch 'upstream/master'
santicomp2014 Nov 7, 2019
5d68b10
Merge branch 'master' into swarm_rns
santicomp2014 Nov 7, 2019
1ab9295
api: refactor TestRNSResolver function
mortelli Nov 7, 2019
26239b3
api: refactor TestRNSResolver function
mortelli Nov 7, 2019
402c6ee
api: replace path.Ext call with publicsuffix.PublicSuffix call in Res…
mortelli Nov 7, 2019
2fc2b51
api: remove ToLower call for TLD in Resolve function
mortelli Nov 7, 2019
b8a1461
api: refactor TestRNSResolve function
mortelli Nov 7, 2019
dd9bc01
api,cmd: added rns flag
santicomp2014 Nov 7, 2019
2e5b687
api: iterate TestRNSResolve function
mortelli Nov 8, 2019
6a5b113
main: handle rnsAPI flag as single string instead of array
mortelli Nov 8, 2019
10f63f7
api: add expected error to TestRNSResolve function
mortelli Nov 8, 2019
bcc8900
vendor: re applied
santicomp2014 Nov 8, 2019
faf5652
Merge branch 'swarm_rns' of github.com:santicomp2014/swarm into swarm…
santicomp2014 Nov 8, 2019
b437f78
api: update rns library, add ErrNoContent to resolver test
mortelli Nov 8, 2019
0fb8d6b
vendor: rollback
santicomp2014 Nov 11, 2019
badce67
vendor: fix vendor
santicomp2014 Nov 11, 2019
de3ecbb
vendor: fix vendor
santicomp2014 Nov 11, 2019
8d5e639
vendor: make vendor
santicomp2014 Nov 11, 2019
dffe427
vendor,swarm,main: added parseRnsAPIAddress and updated vendor
santicomp2014 Nov 12, 2019
47cdaf5
Merge branch 'master' into rns-resolution
santicomp2014 Nov 12, 2019
9fc20e0
vendor,swarm: updated rns-resolution
santicomp2014 Nov 12, 2019
65134e0
restor vendor folder, go.mod, go.sum from master branch
mortelli Nov 12, 2019
ca42f0c
remove vendor attachment
mortelli Nov 12, 2019
11d2946
Merge remote-tracking branch 'origin/master' into rns-resolution
mortelli Nov 13, 2019
73dbbe6
vendor: add new dependencies
mortelli Nov 13, 2019
d8846ed
vendor: remove old version of rns library
mortelli Nov 13, 2019
ab730c3
vendor: replace old go.mod and go.sum
mortelli Nov 13, 2019
c3754c2
Merge remote-tracking branch 'origin/master' into rns-resolution
mortelli Nov 14, 2019
87f3ce7
swarm: fix old rns library reference
mortelli Nov 14, 2019
c75a741
swarm: fix old rns library reference
mortelli Nov 14, 2019
844b95e
swarm/api: fix old rns library reference
mortelli Nov 14, 2019
161a490
update go.mod and go.sum, make vendor
mortelli Nov 14, 2019
b88c4a9
update vendor for publicsuffix library
mortelli Nov 14, 2019
f098484
swarm,api,http,main,fuse,ping rns disbled by default
santicomp2014 Nov 15, 2019
fa11198
Merge remote-tracking branch 'origin/master' into rns-resolution
mortelli Nov 15, 2019
a9effab
swarm, api: formatting changes
mortelli Nov 15, 2019
e137a1e
Merge remote-tracking branch 'origin/master' into rns-resolution
mortelli Nov 15, 2019
03bd737
api,swarm: refactored rns to use api.Resolver interface,removed depen…
santicomp2014 Nov 18, 2019
ccc93e3
api: mocked rns resolver
santicomp2014 Nov 18, 2019
4cbb685
vendor: removed unused dependencies
santicomp2014 Nov 19, 2019
879c7a1
api: minor changes to RNS resolution tests
mortelli Nov 19, 2019
80f1778
api: add tld function
mortelli Nov 19, 2019
8d589ff
vendor: make vendor results
mortelli Nov 19, 2019
73211e2
Merge remote-tracking branch 'origin/master' into rns-resolution
mortelli Nov 20, 2019
9582393
Merge remote-tracking branch 'origin/master' into rns-resolution
mortelli Nov 21, 2019
ef34aa2
Merge remote-tracking branch 'origin/master' into rns-resolution
mortelli Nov 25, 2019
18b67dd
Merge remote-tracking branch 'origin/master' into rns-resolution
mortelli Nov 25, 2019
095063f
Merge remote-tracking branch 'origin/master' into rns-resolution
mortelli Nov 26, 2019
8beb6fb
Merge branch 'master' into rns-resolution
santicomp2014 Nov 27, 2019
6f4130d
network/retrieval: added price to RetrieveRequest and ChunkDelivery p…
Eknir Nov 28, 2019
8f5cac8
network/retrieval, accounting: added priceInformation to retrieval.pe…
Eknir Nov 28, 2019
2211814
retrieval, storage: make retrieveRequest priced and based on retrieva…
Eknir Nov 28, 2019
00586f8
retrieval: added handleNewPrice
Eknir Nov 28, 2019
8e28b87
swarm, retrieval: added margin
Eknir Nov 28, 2019
8796295
retrieval: add RUID to NewPrice
Eknir Nov 28, 2019
fb9ca4d
retrieval, storage: helper functions for getting price+setting price+…
Eknir Nov 29, 2019
b331ff3
all: make tests pass again
Eknir Nov 29, 2019
e729776
Merge branch 'master' into swap-pricing
Eknir Dec 6, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,6 @@ func (a *API) Resolve(ctx context.Context, address string) (storage.Address, err
return resolved[:], nil
}

//
func tld(address string) (tld string) {
splitAddress := strings.Split(address, ".")
if len(splitAddress) > 1 {
Expand Down
4 changes: 2 additions & 2 deletions api/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ type Config struct {
// Swap configs
SwapBackendURL string // Ethereum API endpoint
SwapEnabled bool // whether SWAP incentives are enabled
SwapPaymentThreshold uint64 // honey amount at which a payment is triggered
SwapDisconnectThreshold uint64 // honey amount at which a peer disconnects
SwapPaymentThreshold uint // honey amount at which a payment is triggered
SwapDisconnectThreshold uint // honey amount at which a peer disconnects
SwapInitialDeposit uint64 // initial deposit amount to the chequebook
SwapLogPath string // dir to swap related audit logs
Contract common.Address // address of the chequebook contract
Expand Down
2 changes: 1 addition & 1 deletion bzzeth/bzzeth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func newTestNetworkStore(t *testing.T) (prvkey *ecdsa.PrivateKey, netStore *stor
}

netStore = storage.NewNetStore(localStore, bzzAddr, enode.ID{})
r := retrieval.New(kad, netStore, bzzAddr, nil)
r := retrieval.New(kad, netStore, bzzAddr, nil, 0)
netStore.RemoteGet = r.RequestFromPeers

cleanup = func() {
Expand Down
4 changes: 2 additions & 2 deletions cmd/swarm/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,10 +213,10 @@ func flagsOverride(currentConfig *bzzapi.Config, ctx *cli.Context) *bzzapi.Confi
if initialDepo := ctx.GlobalUint64(SwarmSwapInitialDepositFlag.Name); initialDepo != 0 {
currentConfig.SwapInitialDeposit = initialDepo
}
if paymentThreshold := ctx.GlobalUint64(SwarmSwapPaymentThresholdFlag.Name); paymentThreshold != 0 {
if paymentThreshold := ctx.GlobalUint(SwarmSwapPaymentThresholdFlag.Name); paymentThreshold != 0 {
currentConfig.SwapPaymentThreshold = paymentThreshold
}
if disconnectThreshold := ctx.GlobalUint64(SwarmSwapDisconnectThresholdFlag.Name); disconnectThreshold != 0 {
if disconnectThreshold := ctx.GlobalUint(SwarmSwapDisconnectThresholdFlag.Name); disconnectThreshold != 0 {
currentConfig.SwapDisconnectThreshold = disconnectThreshold
}
if ctx.GlobalIsSet(SwarmNoSyncFlag.Name) {
Expand Down
4 changes: 2 additions & 2 deletions cmd/swarm/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,8 +238,8 @@ func TestConfigCmdLineOverrides(t *testing.T) {
fmt.Sprintf("--%s", utils.DataDirFlag.Name), dir,
fmt.Sprintf("--%s", utils.IPCPathFlag.Name), conf.IPCPath,
"--verbosity", fmt.Sprintf("%d", *testutil.Loglevel),
fmt.Sprintf("--%s", SwarmSwapPaymentThresholdFlag.Name), strconv.FormatUint(swap.DefaultPaymentThreshold+1, 10),
fmt.Sprintf("--%s", SwarmSwapDisconnectThresholdFlag.Name), strconv.FormatUint(swap.DefaultDisconnectThreshold+1, 10),
fmt.Sprintf("--%s", SwarmSwapPaymentThresholdFlag.Name), strconv.FormatUint(uint64(swap.DefaultPaymentThreshold)+1, 10),
fmt.Sprintf("--%s", SwarmSwapDisconnectThresholdFlag.Name), strconv.FormatUint(uint64(swap.DefaultDisconnectThreshold)+1, 10),
fmt.Sprintf("--%s", SwarmEnablePinningFlag.Name),
}

Expand Down
32 changes: 22 additions & 10 deletions network/retrieval/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,26 @@ import (
"github.com/ethersphere/swarm/storage"
)

// Peer wraps BzzPeer with a contextual logger and tracks open
var ErrCannotFindRUID = errors.New("cannot find ruid")
var ErrNoAddressMatch = errors.New("retrieve request found but address does not match")

// Peer wraps BzzPeer with a contextual logger and tracks open. TODO: update comment
// retrievals for that peer
type Peer struct {
*network.BzzPeer
logger log.Logger // logger with base and peer address
mtx sync.Mutex // synchronize retrievals
retrievals map[uint]chunk.Address // current ongoing retrievals
logger log.Logger // logger with base and peer address
mtx sync.Mutex // synchronize retrievals
retrievals map[uint]chunk.Address // current ongoing retrievals
priceInformation []uint // price per Proximity order
}

// NewPeer is the constructor for Peer
func NewPeer(peer *network.BzzPeer, baseKey []byte) *Peer {
return &Peer{
BzzPeer: peer,
logger: log.New("base", hex.EncodeToString(baseKey)[:16], "peer", peer.ID().String()[:16]),
retrievals: make(map[uint]chunk.Address),
BzzPeer: peer,
logger: log.New("base", hex.EncodeToString(baseKey)[:16], "peer", peer.ID().String()[:16]),
retrievals: make(map[uint]chunk.Address),
priceInformation: make([]uint, 10), //TODO: more intelligent way of creating this array. Load historical data from store
}
}

Expand All @@ -57,16 +62,23 @@ func (p *Peer) addRetrieval(ruid uint, addr storage.Address) {
// chunkReceived is called upon ChunkDelivery message reception
// it is meant to idenfify unsolicited chunk deliveries
func (p *Peer) checkRequest(ruid uint, addr storage.Address) error {
// 0 means synthetic chunk
if ruid == uint(0) {
return nil
}
p.mtx.Lock()
defer p.mtx.Unlock()
v, ok := p.retrievals[ruid]
if !ok {
return errors.New("cannot find ruid")
return ErrCannotFindRUID
}
delete(p.retrievals, ruid) // since we got the delivery we wanted - it is safe to delete the retrieve request
if !bytes.Equal(v, addr) {
return errors.New("retrieve request found but address does not match")
return ErrNoAddressMatch
}

return nil
}

func (p *Peer) deleteRequest(ruid uint) {
delete(p.retrievals, ruid)
}
110 changes: 88 additions & 22 deletions network/retrieval/retrieve.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import (
"github.com/ethersphere/swarm/p2p/protocols"
"github.com/ethersphere/swarm/spancontext"
"github.com/ethersphere/swarm/storage"
"github.com/ethersphere/swarm/swap"
opentracing "github.com/opentracing/opentracing-go"
olog "github.com/opentracing/opentracing-go/log"
)
Expand All @@ -63,30 +62,20 @@ var (
Messages: []interface{}{
ChunkDelivery{},
RetrieveRequest{},
NewPrice{},
},
}

ErrNoPeerFound = errors.New("no peer found")
)

// Price is the method through which a message type marks itself
// as implementing the protocols.Price protocol and thus
// as swap-enabled message
func (rr *RetrieveRequest) Price() *protocols.Price {
return &protocols.Price{
Value: swap.RetrieveRequestPrice,
PerByte: false,
Payer: protocols.Sender,
}
}

// Price is the method through which a message type marks itself
// as implementing the protocols.Price protocol and thus
// as swap-enabled message
func (cd *ChunkDelivery) Price() *protocols.Price {
return &protocols.Price{
Value: swap.ChunkDeliveryPrice,
PerByte: true,
Value: cd.price,
PerByte: false,
Payer: protocols.Receiver,
}
}
Expand All @@ -95,6 +84,7 @@ func (cd *ChunkDelivery) Price() *protocols.Price {
type Retrieval struct {
netStore *storage.NetStore
kad *network.Kademlia
margin uint
mtx sync.RWMutex // protect peer map
peers map[enode.ID]*Peer // compatible peers
spec *protocols.Spec // protocol spec
Expand All @@ -103,10 +93,11 @@ type Retrieval struct {
}

// New returns a new instance of the retrieval protocol handler
func New(kad *network.Kademlia, ns *storage.NetStore, baseKey []byte, balance protocols.Balance) *Retrieval {
func New(kad *network.Kademlia, ns *storage.NetStore, baseKey []byte, balance protocols.Balance, margin uint) *Retrieval {
r := &Retrieval{
netStore: ns,
kad: kad,
margin: margin,
peers: make(map[enode.ID]*Peer),
spec: spec,
logger: log.New("base", hex.EncodeToString(baseKey)[:16]),
Expand Down Expand Up @@ -140,6 +131,29 @@ func (r *Retrieval) getPeer(id enode.ID) *Peer {
return r.peers[id]
}

// get the pric for requesting a chunk from peer
func (r *Retrieval) getPeerPrice(peer enode.ID, chunkAddr chunk.Address) uint {
r.mtx.RLock()
defer r.mtx.RUnlock()
//TODO: does this return 0 if the price was never initialized?
return r.peers[peer].priceInformation[chunk.Proximity(chunkAddr, peer.Bytes())] //TODO: is peer.Bytes() the correct address to use?
}

func (r *Retrieval) setPeerPrice(peer enode.ID, chunkAddr chunk.Address, newPrice uint) {
r.mtx.Lock()
defer r.mtx.Unlock()
r.peers[peer].priceInformation[chunk.Proximity(chunkAddr, peer.Bytes())] = newPrice
}

// returns true if the price
func (r *Retrieval) verifyPrice(offeredPrice uint, wantedPrice uint) bool {
if offeredPrice >= wantedPrice {
return true
} else {
return false
}
}

// Run is being dispatched when 2 nodes connect
func (r *Retrieval) Run(bp *network.BzzPeer) error {
sp := NewPeer(bp, r.kad.BaseAddr())
Expand All @@ -158,6 +172,9 @@ func (r *Retrieval) handleMsg(p *Peer) func(context.Context, interface{}) error
go r.handleRetrieveRequest(ctx, p, msg)
case *ChunkDelivery:
go r.handleChunkDelivery(ctx, p, msg)

case *NewPrice:
go r.handleNewPrice(ctx, p, msg)
}
return nil
}
Expand Down Expand Up @@ -298,10 +315,31 @@ func (r *Retrieval) findPeer(ctx context.Context, req *storage.Request) (retPeer
return retPeer, nil
}

// handleNewPrice updates the priceInformation for the particular peer. If there are outstanding requests for the particular chunk, we handle them.
func (r *Retrieval) handleNewPrice(ctx context.Context, p *Peer, msg *NewPrice) {
// update price
r.setPeerPrice(p.ID(), msg.Addr, msg.Price)
err := r.peers[p.ID()].checkRequest(msg.Ruid, msg.Addr)
// outstanding request
if err == nil {
// TODO: compare newPrice with margin
// if newPrice + margin >= outstanding request price => issue new request
// else => send NewPrice message
}
// TODO: look up outstanding chunk requests, verify wether the newPrice + margin >= the price of the outstanding request. If so, resend with new price, if not, send newPrice message ourself to Origininator of request
}

// handleRetrieveRequest handles an incoming retrieve request from a certain Peer
// if the chunk is found in the localstore it is served immediately, otherwise
// it results in a new retrieve request to candidate peers in our kademlia
func (r *Retrieval) handleRetrieveRequest(ctx context.Context, p *Peer, msg *RetrieveRequest) {
// return directly with a NewPrice msg if msg.price is not bigger than margin
wantedPrice := r.margin
ok := r.verifyPrice(msg.Price, wantedPrice)
if !ok {
r.createAndSendNewPrice(ctx, p, msg.Ruid, msg.Addr, wantedPrice)
}

p.logger.Debug("retrieval.handleRetrieveRequest", "ref", msg.Addr)
handleRetrieveRequestMsgCount.Inc(1)

Expand All @@ -313,13 +351,15 @@ func (r *Retrieval) handleRetrieveRequest(ctx context.Context, p *Peer, msg *Ret

defer osp.Finish()

ctx, cancel := context.WithTimeout(ctx, timeouts.FetcherGlobalTimeout)
defer cancel()

req := &storage.Request{
Addr: msg.Addr,
Ruid: msg.Ruid,
Price: msg.Price,
Origin: p.ID(),
}
ctx, cancel := context.WithTimeout(ctx, timeouts.FetcherGlobalTimeout)
defer cancel()

chunk, err := r.netStore.Get(ctx, chunk.ModeGetRequest, req)
if err != nil {
retrieveChunkFail.Inc(1)
Expand All @@ -328,9 +368,9 @@ func (r *Retrieval) handleRetrieveRequest(ctx context.Context, p *Peer, msg *Ret
}

p.logger.Trace("retrieval.handleRetrieveRequest - delivery", "ref", msg.Addr)

deliveryMsg := &ChunkDelivery{
Ruid: msg.Ruid,
price: msg.Price,
Addr: chunk.Address(),
SData: chunk.Data(),
}
Expand All @@ -350,6 +390,9 @@ func (r *Retrieval) handleRetrieveRequest(ctx context.Context, p *Peer, msg *Ret
func (r *Retrieval) handleChunkDelivery(ctx context.Context, p *Peer, msg *ChunkDelivery) {
p.logger.Debug("retrieval.handleChunkDelivery", "ref", msg.Addr)
err := p.checkRequest(msg.Ruid, msg.Addr)
if err == ErrNoAddressMatch {
p.deleteRequest(msg.Ruid)
}
if err != nil {
unsolicitedChunkDelivery.Inc(1)
p.logger.Error("unsolicited chunk delivery from peer", "ruid", msg.Ruid, "addr", msg.Addr, "err", err)
Expand Down Expand Up @@ -397,7 +440,7 @@ func (r *Retrieval) RequestFromPeers(ctx context.Context, req *storage.Request,

const maxFindPeerRetries = 5
retries := 0

//TODO: make FINDPEER request based on r.Peer.priceInformation
FINDPEER:
sp, err := r.findPeer(ctx, req)
if err != nil {
Expand All @@ -418,9 +461,23 @@ FINDPEER:
goto FINDPEER
}

//check if the price is ok, if not ok send a NewPrice msg
wantedPrice := r.margin + r.getPeerPrice(sp.ID(), req.Addr)
ok := r.verifyPrice(req.Price, wantedPrice)
if !ok {
r.createAndSendNewPrice(ctx, protoPeer, req.Ruid, req.Addr, wantedPrice)
}

// create a non-zero ruid (zero specifices synthetic chunk)
ruid := uint(rand.Uint32())
for ruid == 0 {
ruid = uint(rand.Uint32())
}

ret := &RetrieveRequest{
Ruid: uint(rand.Uint32()),
Addr: req.Addr,
Ruid: ruid,
Price: r.getPeerPrice(sp.ID(), req.Addr),
Addr: req.Addr,
}
protoPeer.logger.Trace("sending retrieve request", "ref", ret.Addr, "origin", localID, "ruid", ret.Ruid)
protoPeer.addRetrieval(ret.Ruid, ret.Addr)
Expand All @@ -434,6 +491,15 @@ FINDPEER:
return &spID, nil
}

func (r *Retrieval) createAndSendNewPrice(ctx context.Context, destinationPeer *Peer, ruid uint, addr storage.Address, wantedPrice uint) error {
newPriceMsg := &NewPrice{
Ruid: ruid,
Price: wantedPrice,
Addr: addr,
}
return destinationPeer.Send(ctx, newPriceMsg)
}

func (r *Retrieval) Start(server *p2p.Server) error {
r.logger.Info("starting bzz-retrieve")
return nil
Expand Down
13 changes: 4 additions & 9 deletions network/retrieval/retrieve_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ func TestRequestFromPeers(t *testing.T) {

to.On(peer)

s := New(to, nil, to.BaseAddr(), nil)
s := New(to, nil, to.BaseAddr(), nil, 0)

req := storage.NewRequest(storage.Address(hash0[:]))
id, err := s.findPeer(context.Background(), req)
Expand All @@ -465,14 +465,9 @@ func TestRequestFromPeers(t *testing.T) {
//TestHasPriceImplementation is to check that Retrieval provides priced messages
func TestHasPriceImplementation(t *testing.T) {
price := (&ChunkDelivery{}).Price()
if price == nil || price.Value == 0 {
if price == nil {
t.Fatal("No prices set for chunk delivery msg")
}

price = (&RetrieveRequest{}).Price()
if price == nil || price.Value == 0 {
t.Fatal("No prices set for retrieve requests")
}
}

func newBzzRetrieveWithLocalstore(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
Expand Down Expand Up @@ -502,7 +497,7 @@ func newBzzRetrieveWithLocalstore(ctx *adapters.ServiceContext, bucket *sync.Map
return nil, nil, err
}

r := New(kad, netStore, kad.BaseAddr(), nil)
r := New(kad, netStore, kad.BaseAddr(), nil, 0)
netStore.RemoteGet = r.RequestFromPeers
bucket.Store(bucketKeyFileStore, fileStore)
bucket.Store(bucketKeyNetstore, netStore)
Expand Down Expand Up @@ -621,7 +616,7 @@ func newRetrievalTester(t *testing.T, prvkey *ecdsa.PrivateKey, netStore *storag
prvkey = key
}

r := New(kad, netStore, kad.BaseAddr(), nil)
r := New(kad, netStore, kad.BaseAddr(), nil, 0)
protocolTester := p2ptest.NewProtocolTester(prvkey, 1, r.runProtocol)

return protocolTester, r, protocolTester.Stop, nil
Expand Down
Loading