mirror of
https://github.com/anyproto/any-sync.git
synced 2025-06-10 18:10:54 +09:00
Merge branch 'main' into GO-3690-new-sync-protocol-intermediate
This commit is contained in:
commit
5302469a5c
14 changed files with 160 additions and 61 deletions
|
@ -90,6 +90,21 @@ func (mr *MockSyncTreeMockRecorder) AddRawChanges(arg0, arg1 any) *gomock.Call {
|
|||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddRawChanges", reflect.TypeOf((*MockSyncTree)(nil).AddRawChanges), arg0, arg1)
|
||||
}
|
||||
|
||||
// AddRawChangesFromPeer mocks base method.
|
||||
func (m *MockSyncTree) AddRawChangesFromPeer(arg0 context.Context, arg1 string, arg2 objecttree.RawChangesPayload) (objecttree.AddResult, error) {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "AddRawChangesFromPeer", arg0, arg1, arg2)
|
||||
ret0, _ := ret[0].(objecttree.AddResult)
|
||||
ret1, _ := ret[1].(error)
|
||||
return ret0, ret1
|
||||
}
|
||||
|
||||
// AddRawChangesFromPeer indicates an expected call of AddRawChangesFromPeer.
|
||||
func (mr *MockSyncTreeMockRecorder) AddRawChangesFromPeer(arg0, arg1, arg2 any) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddRawChangesFromPeer", reflect.TypeOf((*MockSyncTree)(nil).AddRawChangesFromPeer), arg0, arg1, arg2)
|
||||
}
|
||||
|
||||
// ChangeInfo mocks base method.
|
||||
func (m *MockSyncTree) ChangeInfo() *treechangeproto.TreeChangeInfo {
|
||||
m.ctrl.T.Helper()
|
||||
|
|
|
@ -4,6 +4,7 @@ package synctree
|
|||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"slices"
|
||||
"time"
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
@ -19,6 +20,7 @@ import (
|
|||
"github.com/anyproto/any-sync/net/peer"
|
||||
"github.com/anyproto/any-sync/net/secureservice"
|
||||
"github.com/anyproto/any-sync/nodeconf"
|
||||
"github.com/anyproto/any-sync/util/slice"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -34,8 +36,13 @@ type ListenerSetter interface {
|
|||
SetListener(listener updatelistener.UpdateListener)
|
||||
}
|
||||
|
||||
type SyncTree interface {
|
||||
type peerSendableObjectTree interface {
|
||||
objecttree.ObjectTree
|
||||
AddRawChangesFromPeer(ctx context.Context, peerId string, changesPayload objecttree.RawChangesPayload) (res objecttree.AddResult, err error)
|
||||
}
|
||||
|
||||
type SyncTree interface {
|
||||
peerSendableObjectTree
|
||||
synchandler.SyncHandler
|
||||
ListenerSetter
|
||||
SyncWithPeer(ctx context.Context, p peer.Peer) (err error)
|
||||
|
@ -79,13 +86,13 @@ type BuildDeps struct {
|
|||
func BuildSyncTreeOrGetRemote(ctx context.Context, id string, deps BuildDeps) (t SyncTree, err error) {
|
||||
var (
|
||||
remoteGetter = treeRemoteGetter{treeId: id, deps: deps}
|
||||
isRemote bool
|
||||
peerId string
|
||||
)
|
||||
deps.TreeStorage, isRemote, err = remoteGetter.getTree(ctx)
|
||||
deps.TreeStorage, peerId, err = remoteGetter.getTree(ctx)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
return buildSyncTree(ctx, isRemote, deps)
|
||||
return buildSyncTree(ctx, peerId, deps)
|
||||
}
|
||||
|
||||
func PutSyncTree(ctx context.Context, payload treestorage.TreeStorageCreatePayload, deps BuildDeps) (t SyncTree, err error) {
|
||||
|
@ -93,10 +100,10 @@ func PutSyncTree(ctx context.Context, payload treestorage.TreeStorageCreatePaylo
|
|||
if err != nil {
|
||||
return
|
||||
}
|
||||
return buildSyncTree(ctx, true, deps)
|
||||
return buildSyncTree(ctx, "", deps)
|
||||
}
|
||||
|
||||
func buildSyncTree(ctx context.Context, sendUpdate bool, deps BuildDeps) (t SyncTree, err error) {
|
||||
func buildSyncTree(ctx context.Context, peerId string, deps BuildDeps) (t SyncTree, err error) {
|
||||
objTree, err := deps.BuildObjectTree(deps.TreeStorage, deps.AclList)
|
||||
if err != nil {
|
||||
return
|
||||
|
@ -118,10 +125,11 @@ func buildSyncTree(ctx context.Context, sendUpdate bool, deps BuildDeps) (t Sync
|
|||
syncTree.Unlock()
|
||||
|
||||
// don't send updates for empty derived trees, because they won't be accepted
|
||||
if sendUpdate && !objecttree.IsEmptyDerivedTree(objTree) {
|
||||
if peerId != "" && !objecttree.IsEmptyDerivedTree(objTree) {
|
||||
headUpdate := syncTree.syncClient.CreateHeadUpdate(t, nil)
|
||||
// send to everybody, because everybody should know that the node or client got new tree
|
||||
syncTree.syncClient.Broadcast(headUpdate)
|
||||
deps.SyncStatus.ObjectReceive(peerId, syncTree.Id(), syncTree.Heads())
|
||||
}
|
||||
return
|
||||
}
|
||||
|
@ -162,6 +170,38 @@ func (s *syncTree) AddContent(ctx context.Context, content objecttree.SignableCh
|
|||
return
|
||||
}
|
||||
|
||||
func (s *syncTree) hasHeads(ot objecttree.ObjectTree, heads []string) bool {
|
||||
return slice.UnsortedEquals(ot.Heads(), heads) || ot.HasChanges(heads...)
|
||||
}
|
||||
|
||||
func (s *syncTree) AddRawChangesFromPeer(ctx context.Context, peerId string, changesPayload objecttree.RawChangesPayload) (res objecttree.AddResult, err error) {
|
||||
if s.hasHeads(s, changesPayload.NewHeads) {
|
||||
s.syncStatus.HeadsApply(peerId, s.Id(), s.Heads(), true)
|
||||
return objecttree.AddResult{
|
||||
OldHeads: changesPayload.NewHeads,
|
||||
Heads: changesPayload.NewHeads,
|
||||
Mode: objecttree.Nothing,
|
||||
}, nil
|
||||
}
|
||||
prevHeads := s.Heads()
|
||||
res, err = s.AddRawChanges(ctx, changesPayload)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
curHeads := s.Heads()
|
||||
allAdded := true
|
||||
for _, head := range changesPayload.NewHeads {
|
||||
if !slices.Contains(curHeads, head) {
|
||||
allAdded = false
|
||||
break
|
||||
}
|
||||
}
|
||||
if !slice.UnsortedEquals(prevHeads, curHeads) {
|
||||
s.syncStatus.HeadsApply(peerId, s.Id(), curHeads, allAdded)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (s *syncTree) AddRawChanges(ctx context.Context, changesPayload objecttree.RawChangesPayload) (res objecttree.AddResult, err error) {
|
||||
if err = s.checkAlive(); err != nil {
|
||||
return
|
||||
|
|
|
@ -2,6 +2,11 @@ package synctree
|
|||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/mock/gomock"
|
||||
|
||||
"github.com/anyproto/any-sync/commonspace/object/tree/objecttree"
|
||||
"github.com/anyproto/any-sync/commonspace/object/tree/objecttree/mock_objecttree"
|
||||
"github.com/anyproto/any-sync/commonspace/object/tree/synctree/mock_synctree"
|
||||
|
@ -11,9 +16,6 @@ import (
|
|||
"github.com/anyproto/any-sync/commonspace/objectsync"
|
||||
"github.com/anyproto/any-sync/commonspace/syncstatus"
|
||||
"github.com/anyproto/any-sync/nodeconf"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/mock/gomock"
|
||||
"testing"
|
||||
)
|
||||
|
||||
type syncTreeMatcher struct {
|
||||
|
@ -58,7 +60,7 @@ func Test_BuildSyncTree(t *testing.T) {
|
|||
}
|
||||
|
||||
headUpdate := &treechangeproto.TreeSyncMessage{}
|
||||
t.Run("AddRawChanges update", func(t *testing.T) {
|
||||
t.Run("AddRawChangesFromPeer update", func(t *testing.T) {
|
||||
changes := []*treechangeproto.RawTreeChangeWithId{{Id: "some"}}
|
||||
payload := objecttree.RawChangesPayload{
|
||||
NewHeads: nil,
|
||||
|
@ -68,18 +70,20 @@ func Test_BuildSyncTree(t *testing.T) {
|
|||
Added: changes,
|
||||
Mode: objecttree.Append,
|
||||
}
|
||||
objTreeMock.EXPECT().Heads().AnyTimes().Return([]string{"headId"})
|
||||
objTreeMock.EXPECT().HasChanges(gomock.Any()).AnyTimes().Return(false)
|
||||
objTreeMock.EXPECT().AddRawChanges(gomock.Any(), gomock.Eq(payload)).
|
||||
Return(expectedRes, nil)
|
||||
updateListenerMock.EXPECT().Update(tr)
|
||||
|
||||
syncClientMock.EXPECT().CreateHeadUpdate(gomock.Eq(tr), gomock.Eq(changes)).Return(headUpdate)
|
||||
syncClientMock.EXPECT().Broadcast(gomock.Eq(headUpdate))
|
||||
res, err := tr.AddRawChanges(ctx, payload)
|
||||
res, err := tr.AddRawChangesFromPeer(ctx, "peerId", payload)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, expectedRes, res)
|
||||
})
|
||||
|
||||
t.Run("AddRawChanges rebuild", func(t *testing.T) {
|
||||
t.Run("AddRawChangesFromPeer rebuild", func(t *testing.T) {
|
||||
changes := []*treechangeproto.RawTreeChangeWithId{{Id: "some"}}
|
||||
payload := objecttree.RawChangesPayload{
|
||||
NewHeads: nil,
|
||||
|
@ -90,18 +94,19 @@ func Test_BuildSyncTree(t *testing.T) {
|
|||
Added: changes,
|
||||
Mode: objecttree.Rebuild,
|
||||
}
|
||||
objTreeMock.EXPECT().Heads().AnyTimes().Return([]string{"headId"})
|
||||
objTreeMock.EXPECT().AddRawChanges(gomock.Any(), gomock.Eq(payload)).
|
||||
Return(expectedRes, nil)
|
||||
updateListenerMock.EXPECT().Rebuild(tr)
|
||||
|
||||
syncClientMock.EXPECT().CreateHeadUpdate(gomock.Eq(tr), gomock.Eq(changes)).Return(headUpdate)
|
||||
syncClientMock.EXPECT().Broadcast(gomock.Eq(headUpdate))
|
||||
res, err := tr.AddRawChanges(ctx, payload)
|
||||
res, err := tr.AddRawChangesFromPeer(ctx, "peerId", payload)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, expectedRes, res)
|
||||
})
|
||||
|
||||
t.Run("AddRawChanges nothing", func(t *testing.T) {
|
||||
t.Run("AddRawChangesFromPeer nothing", func(t *testing.T) {
|
||||
changes := []*treechangeproto.RawTreeChangeWithId{{Id: "some"}}
|
||||
payload := objecttree.RawChangesPayload{
|
||||
NewHeads: nil,
|
||||
|
@ -111,10 +116,11 @@ func Test_BuildSyncTree(t *testing.T) {
|
|||
Added: changes,
|
||||
Mode: objecttree.Nothing,
|
||||
}
|
||||
objTreeMock.EXPECT().Heads().AnyTimes().Return([]string{"headId"})
|
||||
objTreeMock.EXPECT().AddRawChanges(gomock.Any(), gomock.Eq(payload)).
|
||||
Return(expectedRes, nil)
|
||||
|
||||
res, err := tr.AddRawChanges(ctx, payload)
|
||||
res, err := tr.AddRawChangesFromPeer(ctx, "peerId", payload)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, expectedRes, res)
|
||||
})
|
||||
|
|
|
@ -36,7 +36,7 @@ type syncTreeHandler struct {
|
|||
|
||||
const maxQueueSize = 5
|
||||
|
||||
func newSyncTreeHandler(spaceId string, objTree objecttree.ObjectTree, syncClient SyncClient, syncStatus syncstatus.StatusUpdater) synchandler.SyncHandler {
|
||||
func newSyncTreeHandler(spaceId string, objTree peerSendableObjectTree, syncClient SyncClient, syncStatus syncstatus.StatusUpdater) synchandler.SyncHandler {
|
||||
return &syncTreeHandler{
|
||||
objTree: objTree,
|
||||
syncProtocol: newTreeSyncProtocol(spaceId, objTree, syncClient),
|
||||
|
|
|
@ -5,13 +5,15 @@ import (
|
|||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/mock/gomock"
|
||||
|
||||
"github.com/anyproto/any-sync/commonspace/object/tree/objecttree"
|
||||
"github.com/anyproto/any-sync/commonspace/object/tree/objecttree/mock_objecttree"
|
||||
"github.com/anyproto/any-sync/commonspace/object/tree/synctree/mock_synctree"
|
||||
"github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
|
||||
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
|
||||
"github.com/anyproto/any-sync/commonspace/syncstatus"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/mock/gomock"
|
||||
)
|
||||
|
||||
type testObjTreeMock struct {
|
||||
|
@ -19,6 +21,10 @@ type testObjTreeMock struct {
|
|||
m sync.RWMutex
|
||||
}
|
||||
|
||||
func (t *testObjTreeMock) AddRawChangesFromPeer(ctx context.Context, peerId string, changesPayload objecttree.RawChangesPayload) (res objecttree.AddResult, err error) {
|
||||
return t.MockObjectTree.AddRawChanges(ctx, changesPayload)
|
||||
}
|
||||
|
||||
func newTestObjMock(mockTree *mock_objecttree.MockObjectTree) *testObjTreeMock {
|
||||
return &testObjTreeMock{
|
||||
MockObjectTree: mockTree,
|
||||
|
|
|
@ -60,16 +60,20 @@ func (t treeRemoteGetter) treeRequest(ctx context.Context, peerId string) (msg *
|
|||
return
|
||||
}
|
||||
|
||||
func (t treeRemoteGetter) treeRequestLoop(ctx context.Context) (msg *treechangeproto.TreeSyncMessage, err error) {
|
||||
func (t treeRemoteGetter) treeRequestLoop(ctx context.Context) (msg *treechangeproto.TreeSyncMessage, peerId string, err error) {
|
||||
availablePeers, err := t.getPeers(ctx)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
// in future we will try to load from different peers
|
||||
return t.treeRequest(ctx, availablePeers[0])
|
||||
res, err := t.treeRequest(ctx, availablePeers[0])
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
return res, availablePeers[0], nil
|
||||
}
|
||||
|
||||
func (t treeRemoteGetter) getTree(ctx context.Context) (treeStorage treestorage.TreeStorage, isRemote bool, err error) {
|
||||
func (t treeRemoteGetter) getTree(ctx context.Context) (treeStorage treestorage.TreeStorage, peerId string, err error) {
|
||||
treeStorage, err = t.deps.SpaceStorage.TreeStorage(t.treeId)
|
||||
if err == nil {
|
||||
return
|
||||
|
@ -87,8 +91,7 @@ func (t treeRemoteGetter) getTree(ctx context.Context) (treeStorage treestorage.
|
|||
return
|
||||
}
|
||||
|
||||
isRemote = true
|
||||
resp, err := t.treeRequestLoop(ctx)
|
||||
resp, peerId, err := t.treeRequestLoop(ctx)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
|
|
@ -70,7 +70,7 @@ func TestTreeRemoteGetter(t *testing.T) {
|
|||
fx.peerGetterMock.EXPECT().GetResponsiblePeers(tCtx).Return([]peer.Peer{mockPeer}, nil)
|
||||
fx.syncClientMock.EXPECT().CreateNewTreeRequest().Return(treeRequest)
|
||||
fx.syncClientMock.EXPECT().SendRequest(tCtx, peerId, fx.treeGetter.treeId, treeRequest).Return(objectResponse, nil)
|
||||
resp, err := fx.treeGetter.treeRequestLoop(tCtx)
|
||||
resp, _, err := fx.treeGetter.treeRequestLoop(tCtx)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, "id", resp.RootChange.Id)
|
||||
})
|
||||
|
@ -84,7 +84,7 @@ func TestTreeRemoteGetter(t *testing.T) {
|
|||
mockPeer.EXPECT().Id().AnyTimes().Return(peerId)
|
||||
fx.syncClientMock.EXPECT().CreateNewTreeRequest().Return(treeRequest)
|
||||
fx.syncClientMock.EXPECT().SendRequest(tCtx, peerId, fx.treeGetter.treeId, treeRequest).AnyTimes().Return(nil, fmt.Errorf("some"))
|
||||
_, err := fx.treeGetter.treeRequestLoop(tCtx)
|
||||
_, _, err := fx.treeGetter.treeRequestLoop(tCtx)
|
||||
require.Error(t, err)
|
||||
})
|
||||
}
|
||||
|
|
|
@ -3,11 +3,12 @@ package synctree
|
|||
import (
|
||||
"context"
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/anyproto/any-sync/app/logger"
|
||||
"github.com/anyproto/any-sync/commonspace/object/tree/objecttree"
|
||||
"github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
|
||||
"github.com/anyproto/any-sync/util/slice"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type TreeSyncProtocol interface {
|
||||
|
@ -19,11 +20,11 @@ type TreeSyncProtocol interface {
|
|||
type treeSyncProtocol struct {
|
||||
log logger.CtxLogger
|
||||
spaceId string
|
||||
objTree objecttree.ObjectTree
|
||||
objTree peerSendableObjectTree
|
||||
reqFactory RequestFactory
|
||||
}
|
||||
|
||||
func newTreeSyncProtocol(spaceId string, objTree objecttree.ObjectTree, reqFactory RequestFactory) *treeSyncProtocol {
|
||||
func newTreeSyncProtocol(spaceId string, objTree peerSendableObjectTree, reqFactory RequestFactory) *treeSyncProtocol {
|
||||
return &treeSyncProtocol{
|
||||
log: log.With(zap.String("spaceId", spaceId), zap.String("treeId", objTree.Id())),
|
||||
spaceId: spaceId,
|
||||
|
@ -70,11 +71,7 @@ func (t *treeSyncProtocol) HeadUpdate(ctx context.Context, senderId string, upda
|
|||
return
|
||||
}
|
||||
|
||||
if t.hasHeads(objTree, update.Heads) {
|
||||
return
|
||||
}
|
||||
|
||||
_, err = objTree.AddRawChanges(ctx, objecttree.RawChangesPayload{
|
||||
_, err = objTree.AddRawChangesFromPeer(ctx, senderId, objecttree.RawChangesPayload{
|
||||
NewHeads: update.Heads,
|
||||
RawChanges: update.Changes,
|
||||
})
|
||||
|
@ -109,8 +106,8 @@ func (t *treeSyncProtocol) FullSyncRequest(ctx context.Context, senderId string,
|
|||
}
|
||||
}()
|
||||
|
||||
if len(request.Changes) != 0 && !t.hasHeads(objTree, request.Heads) {
|
||||
_, err = objTree.AddRawChanges(ctx, objecttree.RawChangesPayload{
|
||||
if len(request.Changes) != 0 {
|
||||
_, err = objTree.AddRawChangesFromPeer(ctx, senderId, objecttree.RawChangesPayload{
|
||||
NewHeads: request.Heads,
|
||||
RawChanges: request.Changes,
|
||||
})
|
||||
|
@ -137,11 +134,8 @@ func (t *treeSyncProtocol) FullSyncResponse(ctx context.Context, senderId string
|
|||
log.DebugCtx(ctx, "full sync response succeeded")
|
||||
}
|
||||
}()
|
||||
if t.hasHeads(objTree, response.Heads) {
|
||||
return
|
||||
}
|
||||
|
||||
_, err = objTree.AddRawChanges(ctx, objecttree.RawChangesPayload{
|
||||
_, err = objTree.AddRawChangesFromPeer(ctx, senderId, objecttree.RawChangesPayload{
|
||||
NewHeads: response.Heads,
|
||||
RawChanges: response.Changes,
|
||||
})
|
||||
|
|
|
@ -3,14 +3,16 @@ package synctree
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/mock/gomock"
|
||||
|
||||
"github.com/anyproto/any-sync/app/logger"
|
||||
"github.com/anyproto/any-sync/commonspace/object/tree/objecttree"
|
||||
"github.com/anyproto/any-sync/commonspace/object/tree/objecttree/mock_objecttree"
|
||||
"github.com/anyproto/any-sync/commonspace/object/tree/synctree/mock_synctree"
|
||||
"github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/mock/gomock"
|
||||
"testing"
|
||||
)
|
||||
|
||||
type treeSyncProtocolFixture struct {
|
||||
|
@ -69,8 +71,7 @@ func TestTreeSyncProtocol_HeadUpdate(t *testing.T) {
|
|||
SnapshotPath: []string{"h1"},
|
||||
}
|
||||
fx.objectTreeMock.EXPECT().Id().AnyTimes().Return(fx.treeId)
|
||||
fx.objectTreeMock.EXPECT().Heads().Return([]string{"h2"}).Times(2)
|
||||
fx.objectTreeMock.EXPECT().HasChanges(gomock.Eq([]string{"h1"})).Return(false)
|
||||
fx.objectTreeMock.EXPECT().Heads().Return([]string{"h2"}).AnyTimes()
|
||||
fx.objectTreeMock.EXPECT().
|
||||
AddRawChanges(gomock.Any(), gomock.Eq(objecttree.RawChangesPayload{
|
||||
NewHeads: []string{"h1"},
|
||||
|
@ -96,6 +97,12 @@ func TestTreeSyncProtocol_HeadUpdate(t *testing.T) {
|
|||
|
||||
fx.objectTreeMock.EXPECT().Id().AnyTimes().Return(fx.treeId)
|
||||
fx.objectTreeMock.EXPECT().Heads().Return([]string{"h1"}).AnyTimes()
|
||||
fx.objectTreeMock.EXPECT().
|
||||
AddRawChanges(gomock.Any(), gomock.Eq(objecttree.RawChangesPayload{
|
||||
NewHeads: []string{"h1"},
|
||||
RawChanges: []*treechangeproto.RawTreeChangeWithId{chWithId},
|
||||
})).
|
||||
Return(objecttree.AddResult{}, nil)
|
||||
|
||||
res, err := fx.syncProtocol.HeadUpdate(ctx, fx.senderId, headUpdate)
|
||||
require.NoError(t, err)
|
||||
|
@ -161,7 +168,7 @@ func TestTreeSyncProtocol_FullSyncRequest(t *testing.T) {
|
|||
}
|
||||
|
||||
fx.objectTreeMock.EXPECT().Heads().Return([]string{"h2"}).AnyTimes()
|
||||
fx.objectTreeMock.EXPECT().HasChanges(gomock.Eq([]string{"h1"})).Return(false)
|
||||
fx.objectTreeMock.EXPECT().HasChanges(gomock.Eq([]string{"h1"})).AnyTimes().Return(false)
|
||||
fx.objectTreeMock.EXPECT().
|
||||
AddRawChanges(gomock.Any(), gomock.Eq(objecttree.RawChangesPayload{
|
||||
NewHeads: []string{"h1"},
|
||||
|
@ -190,6 +197,12 @@ func TestTreeSyncProtocol_FullSyncRequest(t *testing.T) {
|
|||
fx.objectTreeMock.EXPECT().
|
||||
Heads().
|
||||
Return([]string{"h1"}).AnyTimes()
|
||||
fx.objectTreeMock.EXPECT().
|
||||
AddRawChanges(gomock.Any(), gomock.Eq(objecttree.RawChangesPayload{
|
||||
NewHeads: []string{"h1"},
|
||||
RawChanges: []*treechangeproto.RawTreeChangeWithId{chWithId},
|
||||
})).
|
||||
Return(objecttree.AddResult{}, nil)
|
||||
fx.reqFactory.EXPECT().
|
||||
CreateFullSyncResponse(gomock.Eq(fx.objectTreeMock), gomock.Eq([]string{"h1"}), gomock.Eq([]string{"h1"})).
|
||||
Return(fullResponse, nil)
|
||||
|
@ -228,7 +241,6 @@ func TestTreeSyncProtocol_FullSyncRequest(t *testing.T) {
|
|||
}
|
||||
|
||||
fx.objectTreeMock.EXPECT().Heads().Return([]string{"h2"}).AnyTimes()
|
||||
fx.objectTreeMock.EXPECT().HasChanges(gomock.Eq([]string{"h1"})).Return(false)
|
||||
fx.objectTreeMock.EXPECT().
|
||||
AddRawChanges(gomock.Any(), gomock.Eq(objecttree.RawChangesPayload{
|
||||
NewHeads: []string{"h1"},
|
||||
|
@ -258,9 +270,6 @@ func TestTreeSyncProtocol_FullSyncResponse(t *testing.T) {
|
|||
fx.objectTreeMock.EXPECT().
|
||||
Heads().
|
||||
Return([]string{"h2"}).AnyTimes()
|
||||
fx.objectTreeMock.EXPECT().
|
||||
HasChanges(gomock.Eq([]string{"h1"})).
|
||||
Return(false)
|
||||
fx.objectTreeMock.EXPECT().
|
||||
AddRawChanges(gomock.Any(), gomock.Eq(objecttree.RawChangesPayload{
|
||||
NewHeads: []string{"h1"},
|
||||
|
@ -286,6 +295,12 @@ func TestTreeSyncProtocol_FullSyncResponse(t *testing.T) {
|
|||
fx.objectTreeMock.EXPECT().
|
||||
Heads().
|
||||
Return([]string{"h1"}).AnyTimes()
|
||||
fx.objectTreeMock.EXPECT().
|
||||
AddRawChanges(gomock.Any(), gomock.Eq(objecttree.RawChangesPayload{
|
||||
NewHeads: []string{"h1"},
|
||||
RawChanges: []*treechangeproto.RawTreeChangeWithId{chWithId},
|
||||
})).
|
||||
Return(objecttree.AddResult{}, nil)
|
||||
|
||||
err := fx.syncProtocol.FullSyncResponse(ctx, fx.senderId, fullSyncResponse)
|
||||
require.NoError(t, err)
|
||||
|
|
|
@ -3,6 +3,16 @@ package synctree
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/cheggaaa/mb/v3"
|
||||
"github.com/gogo/protobuf/proto"
|
||||
"github.com/stretchr/testify/require"
|
||||
"golang.org/x/exp/slices"
|
||||
|
||||
"github.com/anyproto/any-sync/app"
|
||||
"github.com/anyproto/any-sync/commonspace/object/acl/list"
|
||||
"github.com/anyproto/any-sync/commonspace/object/tree/objecttree"
|
||||
|
@ -12,14 +22,6 @@ import (
|
|||
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
|
||||
"github.com/anyproto/any-sync/commonspace/syncstatus"
|
||||
"github.com/anyproto/any-sync/net/peer"
|
||||
"github.com/cheggaaa/mb/v3"
|
||||
"github.com/gogo/protobuf/proto"
|
||||
"github.com/stretchr/testify/require"
|
||||
"golang.org/x/exp/slices"
|
||||
"math/rand"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
// protocolMsg is a message used in sync protocol tests
|
||||
|
@ -317,6 +319,16 @@ func (b *broadcastTree) AddRawChanges(ctx context.Context, changes objecttree.Ra
|
|||
return res, nil
|
||||
}
|
||||
|
||||
func (b *broadcastTree) AddRawChangesFromPeer(ctx context.Context, peerId string, changes objecttree.RawChangesPayload) (objecttree.AddResult, error) {
|
||||
res, err := b.ObjectTree.AddRawChanges(ctx, changes)
|
||||
if err != nil {
|
||||
return objecttree.AddResult{}, err
|
||||
}
|
||||
upd := b.SyncClient.CreateHeadUpdate(b.ObjectTree, res.Added)
|
||||
b.SyncClient.Broadcast(upd)
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func createStorage(treeId string, aclList list.AclList) treestorage.TreeStorage {
|
||||
changeCreator := objecttree.NewMockChangeCreator()
|
||||
st := changeCreator.CreateNewTreeStorage(treeId, aclList.Head().Id, false)
|
||||
|
|
|
@ -2,6 +2,7 @@ package syncstatus
|
|||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/anyproto/any-sync/app"
|
||||
)
|
||||
|
||||
|
@ -22,6 +23,12 @@ func (n *noOpSyncStatus) Name() (name string) {
|
|||
func (n *noOpSyncStatus) HeadsChange(treeId string, heads []string) {
|
||||
}
|
||||
|
||||
func (n *noOpSyncStatus) ObjectReceive(senderId, treeId string, heads []string) {
|
||||
}
|
||||
|
||||
func (n *noOpSyncStatus) HeadsApply(senderId, treeId string, heads []string, allAdded bool) {
|
||||
}
|
||||
|
||||
func (n *noOpSyncStatus) HeadsReceive(senderId, treeId string, heads []string) {
|
||||
}
|
||||
|
||||
|
|
|
@ -11,4 +11,6 @@ type StatusUpdater interface {
|
|||
|
||||
HeadsChange(treeId string, heads []string)
|
||||
HeadsReceive(senderId, treeId string, heads []string)
|
||||
ObjectReceive(senderId, treeId string, heads []string)
|
||||
HeadsApply(senderId, treeId string, heads []string, allAdded bool)
|
||||
}
|
||||
|
|
|
@ -71,7 +71,7 @@ func New() AnyNsClientService {
|
|||
func (s *service) doClient(ctx context.Context, fn func(cl nsp.DRPCAnynsClient) error) error {
|
||||
if len(s.nodeconf.NamingNodePeers()) == 0 {
|
||||
log.Error("no namingNode peers configured")
|
||||
return errors.New("no namingNode peers configured")
|
||||
return errors.New("no namingNode peers configured. Node config ID: " + s.nodeconf.Id())
|
||||
}
|
||||
|
||||
// it will try to connect to the Naming Node
|
||||
|
|
|
@ -57,8 +57,7 @@ func New() AnyPpClientService {
|
|||
func (s *service) doClient(ctx context.Context, fn func(cl pp.DRPCAnyPaymentProcessingClient) error) error {
|
||||
if len(s.nodeconf.PaymentProcessingNodePeers()) == 0 {
|
||||
log.Error("no payment processing peers configured")
|
||||
|
||||
return errors.New("no paymentProcessingNode peers configured")
|
||||
return errors.New("no paymentProcessingNode peers configured. Node config ID: " + s.nodeconf.Id())
|
||||
}
|
||||
|
||||
// it will try to connect to the Payment Node
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue