mirror of
https://github.com/anyproto/any-sync.git
synced 2025-06-08 05:57:03 +09:00
Change algorithm to precalculate hashes
This commit is contained in:
parent
84adb5f6fd
commit
5e67c668d2
3 changed files with 406 additions and 61 deletions
|
@ -9,11 +9,12 @@ import (
|
|||
"context"
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
"math"
|
||||
"sync"
|
||||
|
||||
"github.com/cespare/xxhash"
|
||||
"github.com/huandu/skiplist"
|
||||
"github.com/zeebo/blake3"
|
||||
"math"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// New creates new Diff container
|
||||
|
@ -42,6 +43,9 @@ func New(divideFactor, compareThreshold int) Diff {
|
|||
compareThreshold: compareThreshold,
|
||||
}
|
||||
d.sl = skiplist.New(d)
|
||||
d.ranges = newHashRanges(divideFactor, compareThreshold, d.sl)
|
||||
d.ranges.dirty[d.ranges.topRange] = struct{}{}
|
||||
d.ranges.recalculateHashes()
|
||||
return d
|
||||
}
|
||||
|
||||
|
@ -62,7 +66,7 @@ type Element struct {
|
|||
// Range request to get RangeResult
|
||||
type Range struct {
|
||||
From, To uint64
|
||||
Limit int
|
||||
Elements bool
|
||||
}
|
||||
|
||||
// RangeResult response for Range
|
||||
|
@ -108,6 +112,7 @@ type diff struct {
|
|||
sl *skiplist.SkipList
|
||||
divideFactor int
|
||||
compareThreshold int
|
||||
ranges *hashRanges
|
||||
mu sync.RWMutex
|
||||
}
|
||||
|
||||
|
@ -140,10 +145,13 @@ func (d *diff) Set(elements ...Element) {
|
|||
d.mu.Lock()
|
||||
defer d.mu.Unlock()
|
||||
for _, e := range elements {
|
||||
el := &element{Element: e, hash: xxhash.Sum64([]byte(e.Id))}
|
||||
hash := xxhash.Sum64([]byte(e.Id))
|
||||
el := &element{Element: e, hash: hash}
|
||||
d.sl.Remove(el)
|
||||
d.sl.Set(el, nil)
|
||||
d.ranges.addElement(hash)
|
||||
}
|
||||
d.ranges.recalculateHashes()
|
||||
}
|
||||
|
||||
func (d *diff) Ids() (ids []string) {
|
||||
|
@ -198,51 +206,42 @@ func (d *diff) Element(id string) (Element, error) {
|
|||
func (d *diff) Hash() string {
|
||||
d.mu.RLock()
|
||||
defer d.mu.RUnlock()
|
||||
res := d.getRange(Range{To: math.MaxUint64})
|
||||
return hex.EncodeToString(res.Hash)
|
||||
return hex.EncodeToString(d.ranges.hash())
|
||||
}
|
||||
|
||||
// RemoveId removes element by id
|
||||
func (d *diff) RemoveId(id string) error {
|
||||
d.mu.Lock()
|
||||
defer d.mu.Unlock()
|
||||
hash := xxhash.Sum64([]byte(id))
|
||||
el := &element{Element: Element{
|
||||
Id: id,
|
||||
}, hash: xxhash.Sum64([]byte(id))}
|
||||
}, hash: hash}
|
||||
if d.sl.Remove(el) == nil {
|
||||
return ErrElementNotFound
|
||||
}
|
||||
d.ranges.removeElement(hash)
|
||||
d.ranges.recalculateHashes()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *diff) getRange(r Range) (rr RangeResult) {
|
||||
hasher := hashersPool.Get().(*blake3.Hasher)
|
||||
defer hashersPool.Put(hasher)
|
||||
hasher.Reset()
|
||||
rng := d.ranges.getRange(r.From, r.To)
|
||||
rr.Count = rng.elements
|
||||
if rng != nil {
|
||||
rr.Hash = rng.hash
|
||||
if !r.Elements && rng.isDivided {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
el := d.sl.Find(&element{hash: r.From})
|
||||
rr.Elements = make([]Element, 0, r.Limit)
|
||||
var overfill bool
|
||||
rr.Elements = make([]Element, 0, d.divideFactor)
|
||||
for el != nil && el.Key().(*element).hash <= r.To {
|
||||
elem := el.Key().(*element).Element
|
||||
el = el.Next()
|
||||
|
||||
hasher.WriteString(elem.Id)
|
||||
hasher.WriteString(elem.Head)
|
||||
rr.Count++
|
||||
if !overfill {
|
||||
if len(rr.Elements) < r.Limit {
|
||||
rr.Elements = append(rr.Elements, elem)
|
||||
}
|
||||
if len(rr.Elements) == r.Limit && el != nil {
|
||||
overfill = true
|
||||
}
|
||||
}
|
||||
rr.Elements = append(rr.Elements, elem)
|
||||
}
|
||||
if overfill {
|
||||
rr.Elements = nil
|
||||
}
|
||||
rr.Hash = hasher.Sum(nil)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -271,9 +270,8 @@ var errMismatched = errors.New("query and results mismatched")
|
|||
func (d *diff) Diff(ctx context.Context, dl Remote) (newIds, changedIds, removedIds []string, err error) {
|
||||
dctx := &diffCtx{}
|
||||
dctx.toSend = append(dctx.toSend, Range{
|
||||
From: 0,
|
||||
To: math.MaxUint64,
|
||||
Limit: d.compareThreshold,
|
||||
From: 0,
|
||||
To: math.MaxUint64,
|
||||
})
|
||||
for len(dctx.toSend) > 0 {
|
||||
select {
|
||||
|
@ -307,26 +305,25 @@ func (d *diff) compareResults(dctx *diffCtx, r Range, myRes, otherRes RangeResul
|
|||
return
|
||||
}
|
||||
|
||||
// both has elements
|
||||
if len(myRes.Elements) == myRes.Count && len(otherRes.Elements) == otherRes.Count {
|
||||
d.compareElements(dctx, myRes.Elements, otherRes.Elements)
|
||||
// other has elements
|
||||
if len(otherRes.Elements) == otherRes.Count {
|
||||
if len(myRes.Elements) == myRes.Count {
|
||||
d.compareElements(dctx, myRes.Elements, otherRes.Elements)
|
||||
} else {
|
||||
r.Elements = true
|
||||
d.compareElements(dctx, d.getRange(r).Elements, otherRes.Elements)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// make more queries
|
||||
divideFactor := uint64(d.divideFactor)
|
||||
perRange := (r.To - r.From) / divideFactor
|
||||
align := ((r.To-r.From)%divideFactor + 1) % divideFactor
|
||||
if align == 0 {
|
||||
perRange += 1
|
||||
// request all elements from other, because we don't have enough
|
||||
if len(myRes.Elements) == myRes.Count {
|
||||
r.Elements = true
|
||||
dctx.prepare = append(dctx.prepare, r)
|
||||
return
|
||||
}
|
||||
var j = r.From
|
||||
for i := 0; i < d.divideFactor; i++ {
|
||||
if i == d.divideFactor-1 {
|
||||
perRange += align
|
||||
}
|
||||
dctx.prepare = append(dctx.prepare, Range{From: j, To: j + perRange - 1, Limit: r.Limit})
|
||||
j += perRange
|
||||
rangeTuples := genTupleRanges(r.From, r.To, d.divideFactor)
|
||||
for _, tuple := range rangeTuples {
|
||||
dctx.prepare = append(dctx.prepare, Range{From: tuple.from, To: tuple.to})
|
||||
}
|
||||
return
|
||||
}
|
||||
|
|
|
@ -3,12 +3,13 @@ package ldiff
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/google/uuid"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"math"
|
||||
"sort"
|
||||
"testing"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestDiff_fillRange(t *testing.T) {
|
||||
|
@ -23,17 +24,10 @@ func TestDiff_fillRange(t *testing.T) {
|
|||
t.Log(d.sl.Len())
|
||||
|
||||
t.Run("elements", func(t *testing.T) {
|
||||
r := Range{From: 0, To: math.MaxUint64, Limit: 10}
|
||||
r := Range{From: 0, To: math.MaxUint64}
|
||||
res := d.getRange(r)
|
||||
assert.NotNil(t, res.Hash)
|
||||
assert.Len(t, res.Elements, 10)
|
||||
})
|
||||
t.Run("hash", func(t *testing.T) {
|
||||
r := Range{From: 0, To: math.MaxUint64, Limit: 9}
|
||||
res := d.getRange(r)
|
||||
t.Log(len(res.Elements))
|
||||
assert.NotNil(t, res.Hash)
|
||||
assert.Nil(t, res.Elements)
|
||||
assert.Equal(t, res.Count, 10)
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -77,6 +71,73 @@ func TestDiff_Diff(t *testing.T) {
|
|||
assert.Len(t, changedIds, 1)
|
||||
assert.Len(t, removedIds, 1)
|
||||
})
|
||||
t.Run("complex", func(t *testing.T) {
|
||||
d1 := New(16, 128)
|
||||
d2 := New(16, 128)
|
||||
length := 10000
|
||||
for i := 0; i < length; i++ {
|
||||
id := fmt.Sprint(i)
|
||||
head := uuid.NewString()
|
||||
d1.Set(Element{
|
||||
Id: id,
|
||||
Head: head,
|
||||
})
|
||||
}
|
||||
|
||||
newIds, changedIds, removedIds, err := d1.Diff(ctx, d2)
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, newIds, 0)
|
||||
assert.Len(t, changedIds, 0)
|
||||
assert.Len(t, removedIds, length)
|
||||
|
||||
for i := 0; i < length; i++ {
|
||||
id := fmt.Sprint(i)
|
||||
head := uuid.NewString()
|
||||
d2.Set(Element{
|
||||
Id: id,
|
||||
Head: head,
|
||||
})
|
||||
}
|
||||
|
||||
newIds, changedIds, removedIds, err = d1.Diff(ctx, d2)
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, newIds, 0)
|
||||
assert.Len(t, changedIds, length)
|
||||
assert.Len(t, removedIds, 0)
|
||||
|
||||
for i := 0; i < length; i++ {
|
||||
id := fmt.Sprint(i)
|
||||
head := uuid.NewString()
|
||||
d2.Set(Element{
|
||||
Id: id,
|
||||
Head: head,
|
||||
})
|
||||
}
|
||||
|
||||
res, err := d1.Ranges(
|
||||
context.Background(),
|
||||
[]Range{{From: 0, To: math.MaxUint64, Elements: true}},
|
||||
nil)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, res, 1)
|
||||
for i, el := range res[0].Elements {
|
||||
if i < length/2 {
|
||||
continue
|
||||
}
|
||||
id := el.Id
|
||||
head := el.Head
|
||||
d2.Set(Element{
|
||||
Id: id,
|
||||
Head: head,
|
||||
})
|
||||
}
|
||||
|
||||
newIds, changedIds, removedIds, err = d1.Diff(ctx, d2)
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, newIds, 0)
|
||||
assert.Len(t, changedIds, length/2)
|
||||
assert.Len(t, removedIds, 0)
|
||||
})
|
||||
t.Run("empty", func(t *testing.T) {
|
||||
d1 := New(16, 16)
|
||||
d2 := New(16, 16)
|
||||
|
@ -133,7 +194,7 @@ func BenchmarkDiff_Ranges(b *testing.B) {
|
|||
b.ResetTimer()
|
||||
b.ReportAllocs()
|
||||
var resBuf []RangeResult
|
||||
var ranges = []Range{{From: 0, To: math.MaxUint64, Limit: 10}}
|
||||
var ranges = []Range{{From: 0, To: math.MaxUint64}}
|
||||
for i := 0; i < b.N; i++ {
|
||||
d.Ranges(ctx, ranges, resBuf)
|
||||
resBuf = resBuf[:0]
|
||||
|
@ -197,3 +258,69 @@ func TestDiff_Elements(t *testing.T) {
|
|||
})
|
||||
assert.Equal(t, els, gotEls)
|
||||
}
|
||||
|
||||
func TestRangesAddRemove(t *testing.T) {
|
||||
length := 10000
|
||||
divideFactor := 4
|
||||
compareThreshold := 4
|
||||
addTwice := func() string {
|
||||
d := New(divideFactor, compareThreshold)
|
||||
var els []Element
|
||||
for i := 0; i < length; i++ {
|
||||
if i < length/20 {
|
||||
continue
|
||||
}
|
||||
els = append(els, Element{
|
||||
Id: fmt.Sprint(i),
|
||||
Head: fmt.Sprint("h", i),
|
||||
})
|
||||
}
|
||||
d.Set(els...)
|
||||
els = els[:0]
|
||||
for i := 0; i < length/20; i++ {
|
||||
els = append(els, Element{
|
||||
Id: fmt.Sprint(i),
|
||||
Head: fmt.Sprint("h", i),
|
||||
})
|
||||
}
|
||||
d.Set(els...)
|
||||
return d.Hash()
|
||||
}
|
||||
addOnce := func() string {
|
||||
d := New(divideFactor, compareThreshold)
|
||||
var els []Element
|
||||
for i := 0; i < length; i++ {
|
||||
els = append(els, Element{
|
||||
Id: fmt.Sprint(i),
|
||||
Head: fmt.Sprint("h", i),
|
||||
})
|
||||
}
|
||||
d.Set(els...)
|
||||
return d.Hash()
|
||||
}
|
||||
addRemove := func() string {
|
||||
d := New(divideFactor, compareThreshold)
|
||||
var els []Element
|
||||
for i := 0; i < length; i++ {
|
||||
els = append(els, Element{
|
||||
Id: fmt.Sprint(i),
|
||||
Head: fmt.Sprint("h", i),
|
||||
})
|
||||
}
|
||||
d.Set(els...)
|
||||
for i := 0; i < length/20; i++ {
|
||||
err := d.RemoveId(fmt.Sprint(i))
|
||||
require.NoError(t, err)
|
||||
}
|
||||
els = els[:0]
|
||||
for i := 0; i < length/20; i++ {
|
||||
els = append(els, Element{
|
||||
Id: fmt.Sprint(i),
|
||||
Head: fmt.Sprint("h", i),
|
||||
})
|
||||
}
|
||||
d.Set(els...)
|
||||
return d.Hash()
|
||||
}
|
||||
require.Equal(t, addTwice(), addOnce(), addRemove())
|
||||
}
|
||||
|
|
221
app/ldiff/hashrange.go
Normal file
221
app/ldiff/hashrange.go
Normal file
|
@ -0,0 +1,221 @@
|
|||
package ldiff
|
||||
|
||||
import (
|
||||
"math"
|
||||
|
||||
"github.com/huandu/skiplist"
|
||||
"github.com/zeebo/blake3"
|
||||
"golang.org/x/exp/slices"
|
||||
)
|
||||
|
||||
type hashRange struct {
|
||||
from, to uint64
|
||||
parent *hashRange
|
||||
isDivided bool
|
||||
elements int
|
||||
level int
|
||||
hash []byte
|
||||
}
|
||||
|
||||
type rangeTuple struct {
|
||||
from, to uint64
|
||||
}
|
||||
|
||||
type hashRanges struct {
|
||||
ranges map[rangeTuple]*hashRange
|
||||
topRange *hashRange
|
||||
sl *skiplist.SkipList
|
||||
dirty map[*hashRange]struct{}
|
||||
divideFactor int
|
||||
compareThreshold int
|
||||
}
|
||||
|
||||
func newHashRanges(divideFactor, compareThreshold int, sl *skiplist.SkipList) *hashRanges {
|
||||
h := &hashRanges{
|
||||
ranges: make(map[rangeTuple]*hashRange),
|
||||
dirty: make(map[*hashRange]struct{}),
|
||||
divideFactor: divideFactor,
|
||||
compareThreshold: compareThreshold,
|
||||
sl: sl,
|
||||
}
|
||||
h.topRange = &hashRange{
|
||||
from: 0,
|
||||
to: math.MaxUint64,
|
||||
isDivided: true,
|
||||
level: 0,
|
||||
}
|
||||
h.ranges[rangeTuple{from: 0, to: math.MaxUint64}] = h.topRange
|
||||
h.makeBottomRanges(h.topRange)
|
||||
return h
|
||||
}
|
||||
|
||||
func (h *hashRanges) hash() []byte {
|
||||
return h.topRange.hash
|
||||
}
|
||||
|
||||
func (h *hashRanges) addElement(elHash uint64) {
|
||||
rng := h.topRange
|
||||
rng.elements++
|
||||
for rng.isDivided {
|
||||
rng = h.getBottomRange(rng, elHash)
|
||||
rng.elements++
|
||||
}
|
||||
h.dirty[rng] = struct{}{}
|
||||
if rng.elements > h.compareThreshold {
|
||||
rng.isDivided = true
|
||||
h.makeBottomRanges(rng)
|
||||
}
|
||||
if rng.parent != nil {
|
||||
if _, ok := h.dirty[rng.parent]; ok {
|
||||
delete(h.dirty, rng.parent)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (h *hashRanges) removeElement(elHash uint64) {
|
||||
rng := h.topRange
|
||||
rng.elements--
|
||||
for rng.isDivided {
|
||||
rng = h.getBottomRange(rng, elHash)
|
||||
rng.elements--
|
||||
}
|
||||
parent := rng.parent
|
||||
if parent.elements <= h.compareThreshold && parent != h.topRange {
|
||||
ranges := genTupleRanges(parent.from, parent.to, h.divideFactor)
|
||||
for _, tuple := range ranges {
|
||||
child := h.ranges[tuple]
|
||||
delete(h.ranges, tuple)
|
||||
delete(h.dirty, child)
|
||||
}
|
||||
parent.isDivided = false
|
||||
h.dirty[parent] = struct{}{}
|
||||
} else {
|
||||
h.dirty[rng] = struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
func (h *hashRanges) recalculateHashes() {
|
||||
for len(h.dirty) > 0 {
|
||||
var slDirty []*hashRange
|
||||
for rng := range h.dirty {
|
||||
slDirty = append(slDirty, rng)
|
||||
}
|
||||
slices.SortFunc(slDirty, func(a, b *hashRange) int {
|
||||
if a.level < b.level {
|
||||
return -1
|
||||
} else if a.level > b.level {
|
||||
return 1
|
||||
} else {
|
||||
return 0
|
||||
}
|
||||
})
|
||||
for _, rng := range slDirty {
|
||||
if rng.isDivided {
|
||||
rng.hash = h.calcDividedHash(rng)
|
||||
} else {
|
||||
rng.hash, rng.elements = h.calcElementsHash(rng.from, rng.to)
|
||||
}
|
||||
delete(h.dirty, rng)
|
||||
if rng.parent != nil {
|
||||
h.dirty[rng.parent] = struct{}{}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (h *hashRanges) getRange(from, to uint64) *hashRange {
|
||||
return h.ranges[rangeTuple{from: from, to: to}]
|
||||
}
|
||||
|
||||
func (h *hashRanges) getBottomRange(rng *hashRange, elHash uint64) *hashRange {
|
||||
df := uint64(h.divideFactor)
|
||||
perRange := (rng.to - rng.from) / df
|
||||
align := ((rng.to-rng.from)%df + 1) % df
|
||||
if align == 0 {
|
||||
perRange++
|
||||
}
|
||||
bucket := (elHash - rng.from) / perRange
|
||||
tuple := rangeTuple{from: rng.from + bucket*perRange, to: rng.from - 1 + (bucket+1)*perRange}
|
||||
if bucket == df-1 {
|
||||
tuple.to += align
|
||||
}
|
||||
return h.ranges[tuple]
|
||||
}
|
||||
|
||||
func (h *hashRanges) makeBottomRanges(rng *hashRange) {
|
||||
ranges := genTupleRanges(rng.from, rng.to, h.divideFactor)
|
||||
for _, tuple := range ranges {
|
||||
newRange := h.makeRange(tuple, rng)
|
||||
h.ranges[tuple] = newRange
|
||||
if newRange.elements > h.compareThreshold {
|
||||
if _, ok := h.dirty[rng]; ok {
|
||||
delete(h.dirty, rng)
|
||||
}
|
||||
h.dirty[newRange] = struct{}{}
|
||||
newRange.isDivided = true
|
||||
h.makeBottomRanges(newRange)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (h *hashRanges) makeRange(tuple rangeTuple, parent *hashRange) *hashRange {
|
||||
newRange := &hashRange{
|
||||
from: tuple.from,
|
||||
to: tuple.to,
|
||||
parent: parent,
|
||||
}
|
||||
hash, els := h.calcElementsHash(tuple.from, tuple.to)
|
||||
newRange.hash = hash
|
||||
newRange.level = parent.level + 1
|
||||
newRange.elements = els
|
||||
return newRange
|
||||
}
|
||||
|
||||
func (h *hashRanges) calcDividedHash(rng *hashRange) (hash []byte) {
|
||||
hasher := hashersPool.Get().(*blake3.Hasher)
|
||||
defer hashersPool.Put(hasher)
|
||||
hasher.Reset()
|
||||
ranges := genTupleRanges(rng.from, rng.to, h.divideFactor)
|
||||
for _, tuple := range ranges {
|
||||
child := h.ranges[tuple]
|
||||
hasher.Write(child.hash)
|
||||
}
|
||||
hash = hasher.Sum(nil)
|
||||
return
|
||||
}
|
||||
|
||||
func genTupleRanges(from, to uint64, divideFactor int) (prepare []rangeTuple) {
|
||||
df := uint64(divideFactor)
|
||||
perRange := (to - from) / df
|
||||
align := ((to-from)%df + 1) % df
|
||||
if align == 0 {
|
||||
perRange++
|
||||
}
|
||||
var j = from
|
||||
for i := 0; i < divideFactor; i++ {
|
||||
if i == divideFactor-1 {
|
||||
perRange += align
|
||||
}
|
||||
prepare = append(prepare, rangeTuple{from: j, to: j + perRange - 1})
|
||||
j += perRange
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (h *hashRanges) calcElementsHash(from, to uint64) (hash []byte, els int) {
|
||||
hasher := hashersPool.Get().(*blake3.Hasher)
|
||||
defer hashersPool.Put(hasher)
|
||||
hasher.Reset()
|
||||
|
||||
el := h.sl.Find(&element{hash: from})
|
||||
for el != nil && el.Key().(*element).hash <= to {
|
||||
elem := el.Key().(*element).Element
|
||||
el = el.Next()
|
||||
|
||||
hasher.WriteString(elem.Id)
|
||||
hasher.WriteString(elem.Head)
|
||||
els++
|
||||
}
|
||||
hash = hasher.Sum(nil)
|
||||
return
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue