Skip to content
This repository was archived by the owner on Mar 14, 2023. It is now read-only.

Added support of hrw weighted sort for netmap #23

Merged
merged 9 commits into from
May 30, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file modified examples/demo
Binary file not shown.
Binary file modified examples/map2
Binary file not shown.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require (
github.com/golang/protobuf v1.2.0
github.com/mattn/go-colorable v0.0.9 // indirect
github.com/mattn/go-isatty v0.0.4 // indirect
github.com/nspcc-dev/hrw v1.0.5
github.com/nspcc-dev/hrw v1.0.7
github.com/onsi/gomega v1.4.3
github.com/pkg/errors v0.8.0
github.com/pmezard/go-difflib v1.0.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ github.com/mattn/go-isatty v0.0.4 h1:bnP0vzxcAdeI1zdubAl5PjU6zsERjGZb7raWodagDYs
github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
github.com/nspcc-dev/hrw v1.0.5 h1:XB4ESH3HvlvrFdE4uujWjoyJMbf9aPhYxtpNvDWufmM=
github.com/nspcc-dev/hrw v1.0.5/go.mod h1:IoUVXam2Z0cwnCtvClGFHQFe6fxeIeieMeDdhOYeyPM=
github.com/nspcc-dev/hrw v1.0.7 h1:+643HVfSVbfjDw2qb/z0LkyyIVX/GxKxRnlnIvrwMbQ=
github.com/nspcc-dev/hrw v1.0.7/go.mod h1:IoUVXam2Z0cwnCtvClGFHQFe6fxeIeieMeDdhOYeyPM=
github.com/onsi/ginkgo v1.6.0 h1:Ix8l273rp3QzYgXSR+c8d1fTG7UPgYkOSELPhiY/YGw=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v1.4.3 h1:RE1xgDvH7imwFD45h+u2SgIfERHlS2yNG4DObb5BSKU=
Expand Down
177 changes: 131 additions & 46 deletions policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,22 +31,104 @@ type (
Bucket struct {
Key string
Value string
nodes Uint32Slice
nodes Nodes
children []Bucket
}

// Uint32Slice is generic type for more convenient sorting.
Uint32Slice []uint32
// Node type represents single graph leaf with index N and weight W.
Node struct {
N uint32
W uint64
}

// Nodes represents slice of graph leafs.
Nodes []Node

// FilterFunc is generic type for filtering function on nodes.
FilterFunc func([]uint32) []uint32
FilterFunc func(Nodes) Nodes
)

func (p Uint32Slice) Len() int { return len(p) }
func (p Uint32Slice) Less(i, j int) bool { return p[i] < p[j] }
func (p Uint32Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
// Hash is a function from hrw.Hasher interface. It is implemented
// to support weighted hrw therefore sort function sorts nodes
// based on their `N` value.
func (n Node) Hash() uint64 {
return uint64(n.N)
}
func (n Node) Write(w io.Writer) error {
var err error
if err = binary.Write(w, binary.BigEndian, n.N); err != nil {
return err
}
if err = binary.Write(w, binary.BigEndian, n.W); err != nil {
return err
}
return nil
}
func (n *Node) Read(r io.Reader) error {
var err error
if err = binary.Read(r, binary.BigEndian, &n.N); err != nil {
return err
}
if err = binary.Read(r, binary.BigEndian, &n.W); err != nil {
return err
}
return nil
}

func (n Nodes) Len() int { return len(n) }
func (n Nodes) Less(i, j int) bool { return n[i].N < n[j].N }
func (n Nodes) Swap(i, j int) { n[i], n[j] = n[j], n[i] }
func (n Nodes) Write(w io.Writer) error {
var err error
if err = binary.Write(w, binary.BigEndian, int32(len(n))); err != nil {
return err
}
for i := range n {
if err = n[i].Write(w); err != nil {
return err
}
}
return nil
}
func (n *Nodes) Read(r io.Reader) error {
var (
err error
ln int32
)
if err = binary.Read(r, binary.BigEndian, &ln); err != nil {
return err
}
if ln > 0 {
nodes := make(Nodes, ln)
for i := range nodes {
if err = nodes[i].Read(r); err != nil {
return err
}
}
*n = nodes
}
return nil
}

// Nodes returns slice of nodes indexes N.
func (n Nodes) Nodes() []uint32 {
ns := make([]uint32, 0, len(n))
for i := range n {
ns = append(ns, n[i].N)
}
return ns
}

// Weights returns slice ow nodes weights W.
func (n Nodes) Weights() []uint64 {
w := make([]uint64, 0, len(n))
for i := range n {
w = append(w, n[i].W)
}
return w
}

// Hash uses murmur3 hash to return uint64
// Hash uses murmur3 hash to return uint64.
func (b Bucket) Hash() uint64 {
return hrw.Hash([]byte(b.Key + b.Value))
}
Expand All @@ -73,14 +155,14 @@ func (b *Bucket) findGraph(pivot []byte, s SFGroup) (c *Bucket) {
}

// FindNodes returns list of nodes, corresponding to specified placement rule.
func (b *Bucket) FindNodes(pivot []byte, ss ...SFGroup) (nodes []uint32) {
func (b *Bucket) FindNodes(pivot []byte, ss ...SFGroup) (nodes Nodes) {
for _, s := range ss {
nodes = merge(nodes, b.findNodes(pivot, s))
}
return
}

func (b *Bucket) findNodes(pivot []byte, s SFGroup) []uint32 {
func (b *Bucket) findNodes(pivot []byte, s SFGroup) Nodes {
var c *Bucket

if c = b.GetMaxSelection(s); c != nil {
Expand All @@ -98,7 +180,7 @@ func (b Bucket) Copy() Bucket {
Value: b.Value,
}
if b.nodes != nil {
bc.nodes = make(Uint32Slice, len(b.nodes))
bc.nodes = make(Nodes, len(b.nodes))
copy(bc.nodes, b.nodes)
}
if b.children != nil {
Expand All @@ -112,12 +194,12 @@ func (b Bucket) Copy() Bucket {
}

// IsValid checks if bucket is well-formed:
// - all nodes contained in sub-bucket must belong to this
// - there must be no nodes belonging to 2 buckets
// - all nodes contained in sub-bucket must belong to this;
// - there must be no nodes belonging to 2 buckets.
func (b Bucket) IsValid() bool {
var (
ns Uint32Slice
nodes = make(Uint32Slice, 0, len(b.nodes))
ns Nodes
nodes = make(Nodes, 0, len(b.nodes))
)

if len(b.children) == 0 {
Expand All @@ -136,7 +218,7 @@ func (b Bucket) IsValid() bool {
return len(nodes) == len(ns)
}

func (b Bucket) findForbidden(fs []Filter) (forbidden []uint32) {
func (b Bucket) findForbidden(fs []Filter) (forbidden Nodes) {
// if root does not satisfy any filter it must be forbidden
for _, f := range fs {
if b.Key == f.Key && !f.Check(b) {
Expand Down Expand Up @@ -243,13 +325,13 @@ func (b Bucket) GetMaxSelection(s SFGroup) (r *Bucket) {
)

for _, c := range forbidden {
excludes[c] = struct{}{}
excludes[c.N] = struct{}{}
}
for _, c := range s.Exclude {
excludes[c] = struct{}{}
}

r, _ = b.getMaxSelection(s.Selectors, func(nodes []uint32) []uint32 {
r, _ = b.getMaxSelection(s.Selectors, func(nodes Nodes) Nodes {
return diff(nodes, excludes)
})
return
Expand Down Expand Up @@ -281,10 +363,10 @@ func (b Bucket) GetSelection(ss []Select, pivot []byte) *Bucket {
return nil
}

nodes := make([]uint32, len(b.nodes))
nodes := make(Nodes, len(b.nodes))
copy(nodes, b.nodes)
if len(pivot) != 0 {
hrw.SortSliceByValue(nodes, pivotHash)
hrw.SortSliceByWeightValue(nodes, nodes.Weights(), pivotHash)
}
root.nodes = nodes[:count]
return &root
Expand Down Expand Up @@ -369,17 +451,17 @@ loop:

// UpdateIndices is auxiliary function used to update
// indices of all nodes according to tr.
func (b *Bucket) UpdateIndices(tr map[uint32]uint32) Bucket {
func (b *Bucket) UpdateIndices(tr map[uint32]Node) Bucket {
var (
children = make([]Bucket, 0, len(b.children))
nodes = make(Uint32Slice, 0, len(b.nodes))
nodes = make(Nodes, 0, len(b.nodes))
)

for i := range b.children {
children = append(children, b.children[i].UpdateIndices(tr))
}
for i := range b.nodes {
nodes = append(nodes, tr[b.nodes[i]])
nodes = append(nodes, tr[b.nodes[i].N])
}
sort.Sort(nodes)

Expand Down Expand Up @@ -417,10 +499,7 @@ func (b Bucket) Write(w io.Writer) error {
}

// writing nodes
if err = binary.Write(w, binary.BigEndian, int32(len(b.nodes))); err != nil {
return err
}
if err = binary.Write(w, binary.BigEndian, b.nodes); err != nil {
if err = b.nodes.Write(w); err != nil {
return err
}

Expand Down Expand Up @@ -456,15 +535,9 @@ func (b *Bucket) Read(r io.Reader) error {
b.Key, b.Value, _ = splitKV(string(name))

// reading node list
if err = binary.Read(r, binary.BigEndian, &ln); err != nil {
if err = b.nodes.Read(r); err != nil {
return err
}
if ln > 0 {
b.nodes = make([]uint32, ln)
if err = binary.Read(r, binary.BigEndian, &b.nodes); err != nil {
return err
}
}

if err = binary.Read(r, binary.BigEndian, &ln); err != nil {
return err
Expand Down Expand Up @@ -514,7 +587,7 @@ func (b *Bucket) fillNodes() {
}

// Nodelist returns slice of nodes belonging to b.
func (b Bucket) Nodelist() (r []uint32) {
func (b Bucket) Nodelist() (r Nodes) {
if b.nodes != nil || len(b.children) == 0 {
return b.nodes
}
Expand All @@ -532,8 +605,18 @@ func (b Bucket) Children() []Bucket {

// AddNode adds node n with options opts to b.
func (b *Bucket) AddNode(n uint32, opts ...string) error {
return b.addNode(Node{n, 0}, opts...)
}

// AddStrawNode adds straw node n with options opts to b.
// Straws are an analogy of weights in CRUSH algorithm paper.
func (b *Bucket) AddStrawNode(n Node, opts ...string) error {
return b.addNode(n, opts...)
}

func (b *Bucket) addNode(n Node, opts ...string) error {
for _, o := range opts {
if err := b.AddBucket(o, []uint32{n}); err != nil {
if err := b.AddBucket(o, Nodes{n}); err != nil {
return err
}
}
Expand All @@ -549,15 +632,15 @@ func splitKV(s string) (string, string, error) {
}

// GetNodesByOption returns list of nodes possessing specified options.
func (b Bucket) GetNodesByOption(opts ...string) []uint32 {
var nodes []uint32
func (b Bucket) GetNodesByOption(opts ...string) Nodes {
var nodes Nodes
for _, opt := range opts {
nodes = intersect(nodes, getNodes(b, splitProps(opt[1:])))
}
return nodes
}

func (b *Bucket) addNodes(bs []Bucket, n []uint32) error {
func (b *Bucket) addNodes(bs []Bucket, n Nodes) error {
b.nodes = merge(b.nodes, n)
if len(bs) == 0 {
return nil
Expand All @@ -573,11 +656,13 @@ func (b *Bucket) addNodes(bs []Bucket, n []uint32) error {
}

// AddBucket add bucket corresponding to option o with nodes n as subbucket to b.
func (b *Bucket) AddBucket(o string, n []uint32) error {
func (b *Bucket) AddBucket(o string, n Nodes) error {
if o != Separator && (!strings.HasPrefix(o, Separator) || strings.HasSuffix(o, Separator)) {
return errors.Errorf("must start and not end with '%s'", Separator)
}

if len(n) == 0 {
n = nil
}
return b.addNodes(splitProps(o[1:]), n)
}

Expand All @@ -597,13 +682,13 @@ func splitProps(o string) []Bucket {
return props
}

func merge(a, b []uint32) []uint32 {
func merge(a, b Nodes) Nodes {
if a == nil {
return b
}

la, lb := len(a), len(b)
c := make([]uint32, 0, la+lb)
c := make(Nodes, 0, la+lb)
loop:
for i, j := 0, 0; i < la || j < lb; {
switch true {
Expand All @@ -613,10 +698,10 @@ loop:
case j == lb:
c = append(c, a[i:]...)
break loop
case a[i] < b[j]:
case a[i].N < b[j].N:
c = append(c, a[i])
i++
case a[i] > b[j]:
case a[i].N > b[j].N:
c = append(c, b[j])
j++
default:
Expand All @@ -629,7 +714,7 @@ loop:
return c
}

func makeTreeProps(bs []Bucket, n []uint32) Bucket {
func makeTreeProps(bs []Bucket, n Nodes) Bucket {
bs[0].nodes = n
for i := len(bs) - 1; i > 0; i-- {
bs[i].nodes = n
Expand Down
6 changes: 3 additions & 3 deletions policy_dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ func (b Bucket) dumpTo(g Graph) error {
attrsN = map[string]string{"shape": "box"}
attrsE = map[string]string{"style": "dotted"}
for _, n := range b.nodes {
if err = g.AddNode(g.Name, strconv.Itoa(int(n)), attrsN); err != nil {
if err = g.AddNode(g.Name, strconv.Itoa(int(n.N)), attrsN); err != nil {
return err
}
if err = g.AddEdge(bname, strconv.Itoa(int(n)), true, attrsE); err != nil {
if err = g.AddEdge(bname, strconv.Itoa(int(n.N)), true, attrsE); err != nil {
return err
}
}
Expand Down Expand Up @@ -103,7 +103,7 @@ func selectBucket(g Graph, b Bucket) error {

if len(b.children) == 0 {
for _, n := range b.nodes {
applyAttrs(g, bname, strconv.Itoa(int(n)), attrs)
applyAttrs(g, bname, strconv.Itoa(int(n.N)), attrs)
}
return nil
}
Expand Down
Loading