diff --git a/app/ldiff/diff.go b/app/ldiff/diff.go index 388e796b..712dcefb 100644 --- a/app/ldiff/diff.go +++ b/app/ldiff/diff.go @@ -15,8 +15,6 @@ import ( "github.com/cespare/xxhash" "github.com/huandu/skiplist" "github.com/zeebo/blake3" - - "github.com/anyproto/any-sync/commonspace/spacesyncproto" ) // New creates precalculated Diff container @@ -33,18 +31,18 @@ import ( // normal value between 8 and 64 // // Less threshold and divideFactor - less traffic but more requests -func New(divideFactor, compareThreshold int) Diff { - return newDiff(divideFactor, compareThreshold) +func New(divideFactor, compareThreshold int) *Diff { + return NewDiff(divideFactor, compareThreshold) } -func newDiff(divideFactor, compareThreshold int) *diff { +func NewDiff(divideFactor, compareThreshold int) *Diff { if divideFactor < 2 { divideFactor = 2 } if compareThreshold < 1 { compareThreshold = 1 } - d := &diff{ + d := &Diff{ divideFactor: divideFactor, compareThreshold: compareThreshold, } @@ -88,36 +86,14 @@ type element struct { hash uint64 } -// Diff contains elements and can compare it with Remote diff -type Diff interface { - Remote - // Set adds or update elements in container - Set(elements ...Element) - // RemoveId removes element by id - RemoveId(id string) error - // Diff makes diff with remote container - Diff(ctx context.Context, dl Remote) (newIds, changedIds, removedIds []string, err error) - // Elements retrieves all elements in the Diff - Elements() []Element - // Element returns an element by id - Element(id string) (Element, error) - // Ids retrieves ids of all elements in the Diff - Ids() []string - // Hash returns hash of all elements in the diff - Hash() string - // Len returns count of elements in the diff - Len() int - // DiffType returns type of diff - DiffType() spacesyncproto.DiffType -} - // Remote interface for using in the Diff type Remote interface { // Ranges calculates given ranges and return results Ranges(ctx context.Context, ranges []Range, resBuf []RangeResult) (results []RangeResult, err error) } -type diff struct { +// Diff contains elements and can compare it with Remote diff +type Diff struct { sl *skiplist.SkipList divideFactor int compareThreshold int @@ -126,7 +102,7 @@ type diff struct { } // Compare implements skiplist interface -func (d *diff) Compare(lhs, rhs interface{}) int { +func (d *Diff) Compare(lhs, rhs interface{}) int { lhe := lhs.(*element) rhe := rhs.(*element) if lhe.Id == rhe.Id { @@ -145,12 +121,12 @@ func (d *diff) Compare(lhs, rhs interface{}) int { } // CalcScore implements skiplist interface -func (d *diff) CalcScore(key interface{}) float64 { +func (d *Diff) CalcScore(key interface{}) float64 { return 0 } // Set adds or update element in container -func (d *diff) Set(elements ...Element) { +func (d *Diff) Set(elements ...Element) { d.mu.Lock() defer d.mu.Unlock() for _, e := range elements { @@ -163,7 +139,7 @@ func (d *diff) Set(elements ...Element) { d.ranges.recalculateHashes() } -func (d *diff) Ids() (ids []string) { +func (d *Diff) Ids() (ids []string) { d.mu.RLock() defer d.mu.RUnlock() @@ -178,13 +154,13 @@ func (d *diff) Ids() (ids []string) { return } -func (d *diff) Len() int { +func (d *Diff) Len() int { d.mu.RLock() defer d.mu.RUnlock() return d.sl.Len() } -func (d *diff) Elements() (elements []Element) { +func (d *Diff) Elements() (elements []Element) { d.mu.RLock() defer d.mu.RUnlock() @@ -199,7 +175,7 @@ func (d *diff) Elements() (elements []Element) { return } -func (d *diff) Element(id string) (Element, error) { +func (d *Diff) Element(id string) (Element, error) { d.mu.RLock() defer d.mu.RUnlock() el := d.sl.Get(&element{Element: Element{Id: id}, hash: xxhash.Sum64([]byte(id))}) @@ -212,14 +188,14 @@ func (d *diff) Element(id string) (Element, error) { return Element{}, ErrElementNotFound } -func (d *diff) Hash() string { +func (d *Diff) Hash() string { d.mu.RLock() defer d.mu.RUnlock() return hex.EncodeToString(d.ranges.hash()) } // RemoveId removes element by id -func (d *diff) RemoveId(id string) error { +func (d *Diff) RemoveId(id string) error { d.mu.Lock() defer d.mu.Unlock() hash := xxhash.Sum64([]byte(id)) @@ -234,7 +210,7 @@ func (d *diff) RemoveId(id string) error { return nil } -func (d *diff) getRange(r Range) (rr RangeResult) { +func (d *Diff) getRange(r Range) (rr RangeResult) { rng := d.ranges.getRange(r.From, r.To) // if we have the division for this range if rng != nil { @@ -257,7 +233,7 @@ func (d *diff) getRange(r Range) (rr RangeResult) { } // Ranges calculates given ranges and return results -func (d *diff) Ranges(ctx context.Context, ranges []Range, resBuf []RangeResult) (results []RangeResult, err error) { +func (d *Diff) Ranges(ctx context.Context, ranges []Range, resBuf []RangeResult) (results []RangeResult, err error) { d.mu.RLock() defer d.mu.RUnlock() @@ -277,12 +253,8 @@ type diffCtx struct { var errMismatched = errors.New("query and results mismatched") -func (d *diff) DiffType() spacesyncproto.DiffType { - return spacesyncproto.DiffType_Precalculated -} - // Diff makes diff with remote container -func (d *diff) Diff(ctx context.Context, dl Remote) (newIds, changedIds, removedIds []string, err error) { +func (d *Diff) Diff(ctx context.Context, dl Remote) (newIds, changedIds, removedIds []string, err error) { dctx := &diffCtx{} dctx.toSend = append(dctx.toSend, Range{ From: 0, @@ -314,7 +286,7 @@ func (d *diff) Diff(ctx context.Context, dl Remote) (newIds, changedIds, removed return dctx.newIds, dctx.changedIds, dctx.removedIds, nil } -func (d *diff) compareResults(dctx *diffCtx, r Range, myRes, otherRes RangeResult) { +func (d *Diff) compareResults(dctx *diffCtx, r Range, myRes, otherRes RangeResult) { // both hash equals - do nothing if bytes.Equal(myRes.Hash, otherRes.Hash) { return @@ -343,7 +315,7 @@ func (d *diff) compareResults(dctx *diffCtx, r Range, myRes, otherRes RangeResul return } -func (d *diff) compareElements(dctx *diffCtx, my, other []Element) { +func (d *Diff) compareElements(dctx *diffCtx, my, other []Element) { find := func(list []Element, targetEl Element) (has, eq bool) { for _, el := range list { if el.Id == targetEl.Id { diff --git a/app/ldiff/diffcontainer.go b/app/ldiff/diffcontainer.go deleted file mode 100644 index d0f2742f..00000000 --- a/app/ldiff/diffcontainer.go +++ /dev/null @@ -1,41 +0,0 @@ -package ldiff - -import "context" - -type RemoteTypeChecker interface { - DiffTypeCheck(ctx context.Context, diffContainer DiffContainer) (needsSync bool, diff Diff, err error) -} - -type DiffContainer interface { - DiffTypeCheck(ctx context.Context, typeChecker RemoteTypeChecker) (needsSync bool, diff Diff, err error) - PrecalculatedDiff() Diff - Set(elements ...Element) - RemoveId(id string) error -} - -type diffContainer struct { - precalculated *diff -} - -func (d *diffContainer) PrecalculatedDiff() Diff { - return d.precalculated -} - -func (d *diffContainer) Set(elements ...Element) { - d.precalculated.Set(elements...) -} - -func (d *diffContainer) RemoveId(id string) error { - return d.precalculated.RemoveId(id) -} - -func (d *diffContainer) DiffTypeCheck(ctx context.Context, typeChecker RemoteTypeChecker) (needsSync bool, diff Diff, err error) { - return typeChecker.DiffTypeCheck(ctx, d) -} - -func NewDiffContainer(divideFactor, compareThreshold int) DiffContainer { - newDiff := newDiff(divideFactor, compareThreshold) - return &diffContainer{ - precalculated: newDiff, - } -} diff --git a/app/ldiff/mock_ldiff/mock_ldiff.go b/app/ldiff/mock_ldiff/mock_ldiff.go index 5ff19316..a9edaaf8 100644 --- a/app/ldiff/mock_ldiff/mock_ldiff.go +++ b/app/ldiff/mock_ldiff/mock_ldiff.go @@ -249,22 +249,6 @@ func (m *MockDiffContainer) EXPECT() *MockDiffContainerMockRecorder { return m.recorder } -// DiffTypeCheck mocks base method. -func (m *MockDiffContainer) DiffTypeCheck(arg0 context.Context, arg1 ldiff.RemoteTypeChecker) (bool, ldiff.Diff, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "DiffTypeCheck", arg0, arg1) - ret0, _ := ret[0].(bool) - ret1, _ := ret[1].(ldiff.Diff) - ret2, _ := ret[2].(error) - return ret0, ret1, ret2 -} - -// DiffTypeCheck indicates an expected call of DiffTypeCheck. -func (mr *MockDiffContainerMockRecorder) DiffTypeCheck(arg0, arg1 any) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DiffTypeCheck", reflect.TypeOf((*MockDiffContainer)(nil).DiffTypeCheck), arg0, arg1) -} - // PrecalculatedDiff mocks base method. func (m *MockDiffContainer) PrecalculatedDiff() ldiff.Diff { m.ctrl.T.Helper() diff --git a/commonspace/headsync/diffsyncer.go b/commonspace/headsync/diffsyncer.go index c1994d31..c6936983 100644 --- a/commonspace/headsync/diffsyncer.go +++ b/commonspace/headsync/diffsyncer.go @@ -18,7 +18,6 @@ import ( "github.com/anyproto/any-sync/commonspace/spacestorage" "github.com/anyproto/any-sync/commonspace/spacesyncproto" "github.com/anyproto/any-sync/net/peer" - "github.com/anyproto/any-sync/net/rpc/rpcerr" "github.com/anyproto/any-sync/util/slice" ) @@ -33,7 +32,7 @@ const logPeriodSecs = 200 func newDiffSyncer(hs *headSync) DiffSyncer { return &diffSyncer{ - diffContainer: hs.diffContainer, + diff: hs.diff, spaceId: hs.spaceId, storage: hs.storage, peerManager: hs.peerManager, @@ -48,7 +47,7 @@ func newDiffSyncer(hs *headSync) DiffSyncer { type diffSyncer struct { spaceId string - diffContainer ldiff.DiffContainer + diff ldiff.Diff peerManager peermanager.PeerManager treeManager treemanager.TreeManager treeSyncer treesyncer.TreeSyncer @@ -66,9 +65,9 @@ func (d *diffSyncer) Init() { func (d *diffSyncer) RemoveObjects(ids []string) { for _, id := range ids { - _ = d.diffContainer.RemoveId(id) + _ = d.diff.RemoveId(id) } - if err := d.storage.WriteSpaceHash(d.diffContainer.PrecalculatedDiff().Hash()); err != nil { + if err := d.storage.WriteSpaceHash(d.diff.Hash()); err != nil { d.log.Error("can't write space hash", zap.Error(err)) } } @@ -77,11 +76,11 @@ func (d *diffSyncer) UpdateHeads(id string, heads []string) { if d.deletionState.Exists(id) { return } - d.diffContainer.Set(ldiff.Element{ + d.diff.Set(ldiff.Element{ Id: id, Head: concatStrings(heads), }) - if err := d.storage.WriteSpaceHash(d.diffContainer.PrecalculatedDiff().Hash()); err != nil { + if err := d.storage.WriteSpaceHash(d.diff.Hash()); err != nil { d.log.Error("can't write space hash", zap.Error(err)) } } @@ -120,25 +119,9 @@ func (d *diffSyncer) syncWithPeer(ctx context.Context, p peer.Peer) (err error) defer p.ReleaseDrpcConn(conn) var ( - cl = d.clientFactory.Client(conn) - rdiff = NewRemoteDiff(d.spaceId, cl) syncAclId = d.syncAcl.Id() newIds, changedIds, removedIds []string ) - // getting correct diff and checking if we need to continue sync - // we do this through diffContainer for the sake of testing - needsSync, diff, err := d.diffContainer.DiffTypeCheck(ctx, rdiff) - err = rpcerr.Unwrap(err) - if err != nil { - return d.onDiffError(ctx, p, cl, err) - } - if needsSync { - newIds, changedIds, removedIds, err = diff.Diff(ctx, rdiff) - err = rpcerr.Unwrap(err) - if err != nil { - return d.onDiffError(ctx, p, cl, err) - } - } totalLen := len(newIds) + len(changedIds) + len(removedIds) // not syncing ids which were removed through settings document diff --git a/commonspace/headsync/headsync.go b/commonspace/headsync/headsync.go index 54de5682..ba34c246 100644 --- a/commonspace/headsync/headsync.go +++ b/commonspace/headsync/headsync.go @@ -44,13 +44,14 @@ type HeadSync interface { RemoveObjects(ids []string) } + type headSync struct { spaceId string syncPeriod int periodicSync periodicsync.PeriodicSync storage spacestorage.SpaceStorage - diffContainer ldiff.DiffContainer + diff ldiff.Diff log logger.CtxLogger syncer DiffSyncer configuration nodeconf.NodeConf @@ -76,7 +77,7 @@ func (h *headSync) Init(a *app.App) (err error) { h.configuration = a.MustComponent(nodeconf.CName).(nodeconf.NodeConf) h.log = log.With(zap.String("spaceId", h.spaceId)) h.storage = a.MustComponent(spacestorage.CName).(spacestorage.SpaceStorage) - h.diffContainer = ldiff.NewDiffContainer(32, 256) + h.diff = *ldiff.NewDiff(32, 256) h.peerManager = a.MustComponent(peermanager.CName).(peermanager.PeerManager) h.credentialProvider = a.MustComponent(credentialprovider.CName).(credentialprovider.CredentialProvider) h.treeSyncer = a.MustComponent(treesyncer.CName).(treesyncer.TreeSyncer) @@ -108,7 +109,7 @@ func (h *headSync) Run(ctx context.Context) (err error) { func (h *headSync) HandleRangeRequest(ctx context.Context, req *spacesyncproto.HeadSyncRequest) (resp *spacesyncproto.HeadSyncResponse, err error) { if req.DiffType == spacesyncproto.DiffType_Precalculated { - return HandleRangeRequest(ctx, h.diffContainer.PrecalculatedDiff(), req) + return HandleRangeRequest(ctx, h.diff, req) } else { err = fmt.Errorf("Unexpected DiffType: %s", req.DiffType.String()) return @@ -120,7 +121,7 @@ func (h *headSync) UpdateHeads(id string, heads []string) { } func (h *headSync) AllIds() []string { - return h.diffContainer.PrecalculatedDiff().Ids() + return h.diff.Ids() } func (h *headSync) ExternalIds() []string { @@ -132,7 +133,7 @@ func (h *headSync) ExternalIds() []string { } func (h *headSync) DebugAllHeads() (res []TreeHeads) { - els := h.diffContainer.PrecalculatedDiff().Elements() + els := h.diff.Elements() for _, el := range els { idHead := TreeHeads{ Id: el.Id, @@ -176,8 +177,8 @@ func (h *headSync) fillDiff(objectIds []string) { Head: h.syncAcl.Head().Id, }) log.Debug("setting acl", zap.String("aclId", h.syncAcl.Id()), zap.String("headId", h.syncAcl.Head().Id)) - h.diffContainer.Set(els...) - if err := h.storage.WriteSpaceHash(h.diffContainer.PrecalculatedDiff().Hash()); err != nil { + h.diff.Set(els...) + if err := h.storage.WriteSpaceHash(h.diff.Hash()); err != nil { h.log.Error("can't write space hash", zap.Error(err)) } } diff --git a/commonspace/headsync/remotediff.go b/commonspace/headsync/remotediff.go index 605b7387..4ef41afa 100644 --- a/commonspace/headsync/remotediff.go +++ b/commonspace/headsync/remotediff.go @@ -1,11 +1,7 @@ package headsync import ( - "bytes" "context" - "encoding/hex" - "fmt" - "math" "github.com/anyproto/any-sync/app/ldiff" "github.com/anyproto/any-sync/commonspace/spacesyncproto" @@ -16,7 +12,6 @@ type Client interface { } type RemoteDiff interface { - ldiff.RemoteTypeChecker ldiff.Remote } @@ -33,40 +28,6 @@ type remote struct { diffType spacesyncproto.DiffType } -// DiffTypeCheck checks which type of diff should we use -func (r *remote) DiffTypeCheck(ctx context.Context, diffContainer ldiff.DiffContainer) (needsSync bool, diff ldiff.Diff, err error) { - req := &spacesyncproto.HeadSyncRequest{ - SpaceId: r.spaceId, - DiffType: spacesyncproto.DiffType_Precalculated, - Ranges: []*spacesyncproto.HeadSyncRange{{From: 0, To: math.MaxUint64}}, - } - resp, err := r.client.HeadSync(ctx, req) - if err != nil { - return - } - needsSync = true - checkHash := func(diff ldiff.Diff) (bool, error) { - hashB, err := hex.DecodeString(diff.Hash()) - if err != nil { - return false, err - } - if len(resp.Results) != 0 && bytes.Equal(hashB, resp.Results[0].Hash) { - return false, nil - } - return true, nil - } - r.diffType = resp.DiffType - switch resp.DiffType { - case spacesyncproto.DiffType_Precalculated: - diff = diffContainer.PrecalculatedDiff() - needsSync, err = checkHash(diff) - case spacesyncproto.DiffType_Initial: - err = fmt.Errorf("Unexpected DiffType: %s", req.DiffType.String()) - return - } - return -} - func (r *remote) Ranges(ctx context.Context, ranges []ldiff.Range, resBuf []ldiff.RangeResult) (results []ldiff.RangeResult, err error) { results = resBuf[:0] pbRanges := make([]*spacesyncproto.HeadSyncRange, 0, len(ranges)) @@ -143,6 +104,8 @@ func HandleRangeRequest(ctx context.Context, d ldiff.Diff, req *spacesyncproto.H Count: uint32(rangeRes.Count), }) } - resp.DiffType = d.DiffType() + + // resp.DiffType = d.DiffType() + return }