mirror of
https://github.com/anyproto/any-sync.git
synced 2025-06-08 14:07:02 +09:00
Merge branch 'new-sync-protocol-tests' into new-sync-protocol
This commit is contained in:
commit
dad76def2f
15 changed files with 639 additions and 252 deletions
|
@ -73,6 +73,7 @@ func TestSpaceDeleteIds(t *testing.T) {
|
||||||
fx.treeManager.space = spc
|
fx.treeManager.space = spc
|
||||||
err = spc.Init(ctx)
|
err = spc.Init(ctx)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
close(fx.treeManager.waitLoad)
|
||||||
|
|
||||||
var ids []string
|
var ids []string
|
||||||
for i := 0; i < totalObjs; i++ {
|
for i := 0; i < totalObjs; i++ {
|
||||||
|
@ -147,6 +148,7 @@ func TestSpaceDeleteIdsIncorrectSnapshot(t *testing.T) {
|
||||||
// adding space to tree manager
|
// adding space to tree manager
|
||||||
fx.treeManager.space = spc
|
fx.treeManager.space = spc
|
||||||
err = spc.Init(ctx)
|
err = spc.Init(ctx)
|
||||||
|
close(fx.treeManager.waitLoad)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
settingsObject := spc.(*space).app.MustComponent(settings.CName).(settings.Settings).SettingsObject()
|
settingsObject := spc.(*space).app.MustComponent(settings.CName).(settings.Settings).SettingsObject()
|
||||||
|
@ -183,10 +185,12 @@ func TestSpaceDeleteIdsIncorrectSnapshot(t *testing.T) {
|
||||||
spc, err = fx.spaceService.NewSpace(ctx, sp)
|
spc, err = fx.spaceService.NewSpace(ctx, sp)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.NotNil(t, spc)
|
require.NotNil(t, spc)
|
||||||
|
fx.treeManager.waitLoad = make(chan struct{})
|
||||||
fx.treeManager.space = spc
|
fx.treeManager.space = spc
|
||||||
fx.treeManager.deletedIds = nil
|
fx.treeManager.deletedIds = nil
|
||||||
err = spc.Init(ctx)
|
err = spc.Init(ctx)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
close(fx.treeManager.waitLoad)
|
||||||
|
|
||||||
// waiting until everything is deleted
|
// waiting until everything is deleted
|
||||||
time.Sleep(3 * time.Second)
|
time.Sleep(3 * time.Second)
|
||||||
|
@ -230,6 +234,7 @@ func TestSpaceDeleteIdsMarkDeleted(t *testing.T) {
|
||||||
fx.treeManager.space = spc
|
fx.treeManager.space = spc
|
||||||
err = spc.Init(ctx)
|
err = spc.Init(ctx)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
close(fx.treeManager.waitLoad)
|
||||||
|
|
||||||
settingsObject := spc.(*space).app.MustComponent(settings.CName).(settings.Settings).SettingsObject()
|
settingsObject := spc.(*space).app.MustComponent(settings.CName).(settings.Settings).SettingsObject()
|
||||||
var ids []string
|
var ids []string
|
||||||
|
@ -259,10 +264,12 @@ func TestSpaceDeleteIdsMarkDeleted(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.NotNil(t, spc)
|
require.NotNil(t, spc)
|
||||||
fx.treeManager.space = spc
|
fx.treeManager.space = spc
|
||||||
|
fx.treeManager.waitLoad = make(chan struct{})
|
||||||
fx.treeManager.deletedIds = nil
|
fx.treeManager.deletedIds = nil
|
||||||
fx.treeManager.markedIds = nil
|
fx.treeManager.markedIds = nil
|
||||||
err = spc.Init(ctx)
|
err = spc.Init(ctx)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
close(fx.treeManager.waitLoad)
|
||||||
|
|
||||||
// waiting until everything is deleted
|
// waiting until everything is deleted
|
||||||
time.Sleep(3 * time.Second)
|
time.Sleep(3 * time.Second)
|
||||||
|
|
|
@ -1,7 +1,6 @@
|
||||||
package deletionstate
|
package deletionstate
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/anyproto/any-sync/app/logger"
|
|
||||||
"github.com/anyproto/any-sync/commonspace/spacestorage"
|
"github.com/anyproto/any-sync/commonspace/spacestorage"
|
||||||
"github.com/anyproto/any-sync/commonspace/spacestorage/mock_spacestorage"
|
"github.com/anyproto/any-sync/commonspace/spacestorage/mock_spacestorage"
|
||||||
"github.com/golang/mock/gomock"
|
"github.com/golang/mock/gomock"
|
||||||
|
@ -19,7 +18,8 @@ type fixture struct {
|
||||||
func newFixture(t *testing.T) *fixture {
|
func newFixture(t *testing.T) *fixture {
|
||||||
ctrl := gomock.NewController(t)
|
ctrl := gomock.NewController(t)
|
||||||
spaceStorage := mock_spacestorage.NewMockSpaceStorage(ctrl)
|
spaceStorage := mock_spacestorage.NewMockSpaceStorage(ctrl)
|
||||||
delState := New(logger.NewNamed("test"), spaceStorage).(*objectDeletionState)
|
delState := New().(*objectDeletionState)
|
||||||
|
delState.storage = spaceStorage
|
||||||
return &fixture{
|
return &fixture{
|
||||||
ctrl: ctrl,
|
ctrl: ctrl,
|
||||||
delState: delState,
|
delState: delState,
|
||||||
|
|
|
@ -1,7 +1,14 @@
|
||||||
package headsync
|
package headsync
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"github.com/anyproto/any-sync/app/ldiff"
|
||||||
|
"github.com/anyproto/any-sync/commonspace/object/acl/aclrecordproto"
|
||||||
|
"github.com/anyproto/any-sync/commonspace/object/acl/liststorage/mock_liststorage"
|
||||||
|
"github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
|
||||||
|
"github.com/anyproto/any-sync/commonspace/object/tree/treestorage/mock_treestorage"
|
||||||
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
|
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
|
||||||
"github.com/anyproto/any-sync/net/peer"
|
"github.com/anyproto/any-sync/net/peer"
|
||||||
"github.com/golang/mock/gomock"
|
"github.com/golang/mock/gomock"
|
||||||
|
@ -11,6 +18,42 @@ import (
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type pushSpaceRequestMatcher struct {
|
||||||
|
spaceId string
|
||||||
|
aclRootId string
|
||||||
|
settingsId string
|
||||||
|
credential []byte
|
||||||
|
spaceHeader *spacesyncproto.RawSpaceHeaderWithId
|
||||||
|
}
|
||||||
|
|
||||||
|
func newPushSpaceRequestMatcher(
|
||||||
|
spaceId string,
|
||||||
|
aclRootId string,
|
||||||
|
settingsId string,
|
||||||
|
credential []byte,
|
||||||
|
spaceHeader *spacesyncproto.RawSpaceHeaderWithId) *pushSpaceRequestMatcher {
|
||||||
|
return &pushSpaceRequestMatcher{
|
||||||
|
spaceId: spaceId,
|
||||||
|
aclRootId: aclRootId,
|
||||||
|
settingsId: settingsId,
|
||||||
|
credential: credential,
|
||||||
|
spaceHeader: spaceHeader,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p pushSpaceRequestMatcher) Matches(x interface{}) bool {
|
||||||
|
res, ok := x.(*spacesyncproto.SpacePushRequest)
|
||||||
|
if !ok {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
return res.Payload.AclPayloadId == p.aclRootId && res.Payload.SpaceHeader == p.spaceHeader && res.Payload.SpaceSettingsPayloadId == p.settingsId && bytes.Equal(p.credential, res.Credential)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p pushSpaceRequestMatcher) String() string {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
type mockPeer struct {
|
type mockPeer struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -58,12 +101,12 @@ func (fx *headSyncFixture) initDiffSyncer(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestDiffSyncer(t *testing.T) {
|
func TestDiffSyncer(t *testing.T) {
|
||||||
fx := newHeadSyncFixture(t)
|
|
||||||
fx.initDiffSyncer(t)
|
|
||||||
defer fx.stop()
|
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
|
||||||
t.Run("diff syncer sync", func(t *testing.T) {
|
t.Run("diff syncer sync", func(t *testing.T) {
|
||||||
|
fx := newHeadSyncFixture(t)
|
||||||
|
fx.initDiffSyncer(t)
|
||||||
|
defer fx.stop()
|
||||||
mPeer := mockPeer{}
|
mPeer := mockPeer{}
|
||||||
fx.peerManagerMock.EXPECT().
|
fx.peerManagerMock.EXPECT().
|
||||||
GetResponsiblePeers(gomock.Any()).
|
GetResponsiblePeers(gomock.Any()).
|
||||||
|
@ -77,220 +120,120 @@ func TestDiffSyncer(t *testing.T) {
|
||||||
fx.treeSyncerMock.EXPECT().SyncAll(gomock.Any(), mPeer.Id(), []string{"changed"}, []string{"new"}).Return(nil)
|
fx.treeSyncerMock.EXPECT().SyncAll(gomock.Any(), mPeer.Id(), []string{"changed"}, []string{"new"}).Return(nil)
|
||||||
require.NoError(t, fx.diffSyncer.Sync(ctx))
|
require.NoError(t, fx.diffSyncer.Sync(ctx))
|
||||||
})
|
})
|
||||||
}
|
|
||||||
|
|
||||||
//
|
t.Run("diff syncer sync conf error", func(t *testing.T) {
|
||||||
//type pushSpaceRequestMatcher struct {
|
fx := newHeadSyncFixture(t)
|
||||||
// spaceId string
|
fx.initDiffSyncer(t)
|
||||||
// aclRootId string
|
defer fx.stop()
|
||||||
// settingsId string
|
ctx := context.Background()
|
||||||
// credential []byte
|
fx.peerManagerMock.EXPECT().
|
||||||
// spaceHeader *spacesyncproto.RawSpaceHeaderWithId
|
GetResponsiblePeers(gomock.Any()).
|
||||||
//}
|
Return(nil, fmt.Errorf("some error"))
|
||||||
//
|
|
||||||
//func (p pushSpaceRequestMatcher) Matches(x interface{}) bool {
|
require.Error(t, fx.diffSyncer.Sync(ctx))
|
||||||
// res, ok := x.(*spacesyncproto.SpacePushRequest)
|
})
|
||||||
// if !ok {
|
|
||||||
// return false
|
t.Run("deletion state remove objects", func(t *testing.T) {
|
||||||
// }
|
fx := newHeadSyncFixture(t)
|
||||||
//
|
fx.initDiffSyncer(t)
|
||||||
// return res.Payload.AclPayloadId == p.aclRootId && res.Payload.SpaceHeader == p.spaceHeader && res.Payload.SpaceSettingsPayloadId == p.settingsId && bytes.Equal(p.credential, res.Credential)
|
defer fx.stop()
|
||||||
//}
|
deletedId := "id"
|
||||||
//
|
fx.deletionStateMock.EXPECT().Exists(deletedId).Return(true)
|
||||||
//func (p pushSpaceRequestMatcher) String() string {
|
|
||||||
// return ""
|
// this should not result in any mock being called
|
||||||
//}
|
fx.diffSyncer.UpdateHeads(deletedId, []string{"someHead"})
|
||||||
//
|
})
|
||||||
//type mockPeer struct{}
|
|
||||||
//
|
t.Run("update heads updates diff", func(t *testing.T) {
|
||||||
//func (m mockPeer) Addr() string {
|
fx := newHeadSyncFixture(t)
|
||||||
// return ""
|
fx.initDiffSyncer(t)
|
||||||
//}
|
defer fx.stop()
|
||||||
//
|
newId := "newId"
|
||||||
//func (m mockPeer) TryClose(objectTTL time.Duration) (res bool, err error) {
|
newHeads := []string{"h1", "h2"}
|
||||||
// return true, m.Close()
|
hash := "hash"
|
||||||
//}
|
fx.diffMock.EXPECT().Set(ldiff.Element{
|
||||||
//
|
Id: newId,
|
||||||
//func (m mockPeer) Id() string {
|
Head: concatStrings(newHeads),
|
||||||
// return "mockId"
|
})
|
||||||
//}
|
fx.diffMock.EXPECT().Hash().Return(hash)
|
||||||
//
|
fx.deletionStateMock.EXPECT().Exists(newId).Return(false)
|
||||||
//func (m mockPeer) LastUsage() time.Time {
|
fx.storageMock.EXPECT().WriteSpaceHash(hash)
|
||||||
// return time.Time{}
|
fx.diffSyncer.UpdateHeads(newId, newHeads)
|
||||||
//}
|
})
|
||||||
//
|
|
||||||
//func (m mockPeer) Secure() sec.SecureConn {
|
t.Run("diff syncer sync space missing", func(t *testing.T) {
|
||||||
// return nil
|
fx := newHeadSyncFixture(t)
|
||||||
//}
|
fx.initDiffSyncer(t)
|
||||||
//
|
defer fx.stop()
|
||||||
//func (m mockPeer) UpdateLastUsage() {
|
aclStorageMock := mock_liststorage.NewMockListStorage(fx.ctrl)
|
||||||
//}
|
settingsStorage := mock_treestorage.NewMockTreeStorage(fx.ctrl)
|
||||||
//
|
settingsId := "settingsId"
|
||||||
//func (m mockPeer) Close() error {
|
aclRootId := "aclRootId"
|
||||||
// return nil
|
aclRoot := &aclrecordproto.RawAclRecordWithId{
|
||||||
//}
|
Id: aclRootId,
|
||||||
//
|
}
|
||||||
//func (m mockPeer) Closed() <-chan struct{} {
|
settingsRoot := &treechangeproto.RawTreeChangeWithId{
|
||||||
// return make(chan struct{})
|
Id: settingsId,
|
||||||
//}
|
}
|
||||||
//
|
spaceHeader := &spacesyncproto.RawSpaceHeaderWithId{}
|
||||||
//func (m mockPeer) Invoke(ctx context.Context, rpc string, enc drpc.Encoding, in, out drpc.Message) error {
|
spaceSettingsId := "spaceSettingsId"
|
||||||
// return nil
|
credential := []byte("credential")
|
||||||
//}
|
|
||||||
//
|
fx.peerManagerMock.EXPECT().
|
||||||
//func (m mockPeer) NewStream(ctx context.Context, rpc string, enc drpc.Encoding) (drpc.Stream, error) {
|
GetResponsiblePeers(gomock.Any()).
|
||||||
// return nil, nil
|
Return([]peer.Peer{mockPeer{}}, nil)
|
||||||
//}
|
fx.diffMock.EXPECT().
|
||||||
//
|
Diff(gomock.Any(), gomock.Eq(NewRemoteDiff(fx.spaceState.SpaceId, fx.clientMock))).
|
||||||
//func newPushSpaceRequestMatcher(
|
Return(nil, nil, nil, spacesyncproto.ErrSpaceMissing)
|
||||||
// spaceId string,
|
|
||||||
// aclRootId string,
|
fx.storageMock.EXPECT().AclStorage().Return(aclStorageMock, nil)
|
||||||
// settingsId string,
|
fx.storageMock.EXPECT().SpaceHeader().Return(spaceHeader, nil)
|
||||||
// credential []byte,
|
fx.storageMock.EXPECT().SpaceSettingsId().Return(spaceSettingsId)
|
||||||
// spaceHeader *spacesyncproto.RawSpaceHeaderWithId) *pushSpaceRequestMatcher {
|
fx.storageMock.EXPECT().TreeStorage(spaceSettingsId).Return(settingsStorage, nil)
|
||||||
// return &pushSpaceRequestMatcher{
|
|
||||||
// spaceId: spaceId,
|
settingsStorage.EXPECT().Root().Return(settingsRoot, nil)
|
||||||
// aclRootId: aclRootId,
|
aclStorageMock.EXPECT().
|
||||||
// settingsId: settingsId,
|
Root().
|
||||||
// credential: credential,
|
Return(aclRoot, nil)
|
||||||
// spaceHeader: spaceHeader,
|
fx.credentialProviderMock.EXPECT().
|
||||||
// }
|
GetCredential(gomock.Any(), spaceHeader).
|
||||||
//}
|
Return(credential, nil)
|
||||||
//
|
fx.clientMock.EXPECT().
|
||||||
//func TestDiffSyncer_Sync(t *testing.T) {
|
SpacePush(gomock.Any(), newPushSpaceRequestMatcher(fx.spaceState.SpaceId, aclRootId, settingsId, credential, spaceHeader)).
|
||||||
// // setup
|
Return(nil, nil)
|
||||||
// fx := newHeadSyncFixture(t)
|
fx.peerManagerMock.EXPECT().SendPeer(gomock.Any(), "peerId", gomock.Any())
|
||||||
// fx.initDiffSyncer(t)
|
|
||||||
// defer fx.stop()
|
require.NoError(t, fx.diffSyncer.Sync(ctx))
|
||||||
//
|
})
|
||||||
// diffMock := mock_ldiff.NewMockDiff(ctrl)
|
|
||||||
// peerManagerMock := mock_peermanager.NewMockPeerManager(ctrl)
|
t.Run("diff syncer sync unexpected", func(t *testing.T) {
|
||||||
// cacheMock := mock_treemanager.NewMockTreeManager(ctrl)
|
fx := newHeadSyncFixture(t)
|
||||||
// stMock := mock_spacestorage.NewMockSpaceStorage(ctrl)
|
fx.initDiffSyncer(t)
|
||||||
// clientMock := mock_spacesyncproto.NewMockDRPCSpaceSyncClient(ctrl)
|
defer fx.stop()
|
||||||
// factory := spacesyncproto.ClientFactoryFunc(func(cc drpc.Conn) spacesyncproto.DRPCSpaceSyncClient {
|
fx.peerManagerMock.EXPECT().
|
||||||
// return clientMock
|
GetResponsiblePeers(gomock.Any()).
|
||||||
// })
|
Return([]peer.Peer{mockPeer{}}, nil)
|
||||||
// treeSyncerMock := mock_treemanager.NewMockTreeSyncer(ctrl)
|
fx.diffMock.EXPECT().
|
||||||
// credentialProvider := mock_credentialprovider.NewMockCredentialProvider(ctrl)
|
Diff(gomock.Any(), gomock.Eq(NewRemoteDiff(fx.spaceState.SpaceId, fx.clientMock))).
|
||||||
// delState := mock_settingsstate.NewMockObjectDeletionState(ctrl)
|
Return(nil, nil, nil, spacesyncproto.ErrUnexpected)
|
||||||
// spaceId := "spaceId"
|
|
||||||
// aclRootId := "aclRootId"
|
require.NoError(t, fx.diffSyncer.Sync(ctx))
|
||||||
// l := logger.NewNamed(spaceId)
|
})
|
||||||
// diffSyncer := newDiffSyncer(spaceId, diffMock, peerManagerMock, cacheMock, stMock, factory, syncstatus.NewNoOpSyncStatus(), credentialProvider, l)
|
|
||||||
// delState.EXPECT().AddObserver(gomock.Any())
|
t.Run("diff syncer sync space is deleted error", func(t *testing.T) {
|
||||||
// cacheMock.EXPECT().NewTreeSyncer(spaceId, gomock.Any()).Return(treeSyncerMock)
|
fx := newHeadSyncFixture(t)
|
||||||
// diffSyncer.Init(delState)
|
fx.initDiffSyncer(t)
|
||||||
//
|
defer fx.stop()
|
||||||
// t.Run("diff syncer sync", func(t *testing.T) {
|
mPeer := mockPeer{}
|
||||||
// mPeer := mockPeer{}
|
fx.peerManagerMock.EXPECT().
|
||||||
// peerManagerMock.EXPECT().
|
GetResponsiblePeers(gomock.Any()).
|
||||||
// GetResponsiblePeers(gomock.Any()).
|
Return([]peer.Peer{mPeer}, nil)
|
||||||
// Return([]peer.Peer{mPeer}, nil)
|
fx.diffMock.EXPECT().
|
||||||
// diffMock.EXPECT().
|
Diff(gomock.Any(), gomock.Eq(NewRemoteDiff(fx.spaceState.SpaceId, fx.clientMock))).
|
||||||
// Diff(gomock.Any(), gomock.Eq(NewRemoteDiff(spaceId, clientMock))).
|
Return(nil, nil, nil, spacesyncproto.ErrSpaceIsDeleted)
|
||||||
// Return([]string{"new"}, []string{"changed"}, nil, nil)
|
fx.storageMock.EXPECT().SpaceSettingsId().Return("settingsId")
|
||||||
// delState.EXPECT().Filter([]string{"new"}).Return([]string{"new"}).Times(1)
|
fx.treeSyncerMock.EXPECT().SyncAll(gomock.Any(), mPeer.Id(), []string{"settingsId"}, nil).Return(nil)
|
||||||
// delState.EXPECT().Filter([]string{"changed"}).Return([]string{"changed"}).Times(1)
|
|
||||||
// delState.EXPECT().Filter(nil).Return(nil).Times(1)
|
require.NoError(t, fx.diffSyncer.Sync(ctx))
|
||||||
// treeSyncerMock.EXPECT().SyncAll(gomock.Any(), mPeer.Id(), []string{"changed"}, []string{"new"}).Return(nil)
|
})
|
||||||
// require.NoError(t, diffSyncer.Sync(ctx))
|
}
|
||||||
// })
|
|
||||||
//}
|
|
||||||
//
|
|
||||||
// t.Run("diff syncer sync conf error", func(t *testing.T) {
|
|
||||||
// peerManagerMock.EXPECT().
|
|
||||||
// GetResponsiblePeers(gomock.Any()).
|
|
||||||
// Return(nil, fmt.Errorf("some error"))
|
|
||||||
//
|
|
||||||
// require.Error(t, diffSyncer.Sync(ctx))
|
|
||||||
// })
|
|
||||||
//
|
|
||||||
// t.Run("deletion state remove objects", func(t *testing.T) {
|
|
||||||
// deletedId := "id"
|
|
||||||
// delState.EXPECT().Exists(deletedId).Return(true)
|
|
||||||
//
|
|
||||||
// // this should not result in any mock being called
|
|
||||||
// diffSyncer.UpdateHeads(deletedId, []string{"someHead"})
|
|
||||||
// })
|
|
||||||
//
|
|
||||||
// t.Run("update heads updates diff", func(t *testing.T) {
|
|
||||||
// newId := "newId"
|
|
||||||
// newHeads := []string{"h1", "h2"}
|
|
||||||
// hash := "hash"
|
|
||||||
// diffMock.EXPECT().Set(ldiff.Element{
|
|
||||||
// Id: newId,
|
|
||||||
// Head: concatStrings(newHeads),
|
|
||||||
// })
|
|
||||||
// diffMock.EXPECT().Hash().Return(hash)
|
|
||||||
// delState.EXPECT().Exists(newId).Return(false)
|
|
||||||
// stMock.EXPECT().WriteSpaceHash(hash)
|
|
||||||
// diffSyncer.UpdateHeads(newId, newHeads)
|
|
||||||
// })
|
|
||||||
//
|
|
||||||
// t.Run("diff syncer sync space missing", func(t *testing.T) {
|
|
||||||
// aclStorageMock := mock_liststorage.NewMockListStorage(ctrl)
|
|
||||||
// settingsStorage := mock_treestorage.NewMockTreeStorage(ctrl)
|
|
||||||
// settingsId := "settingsId"
|
|
||||||
// aclRoot := &aclrecordproto.RawAclRecordWithId{
|
|
||||||
// Id: aclRootId,
|
|
||||||
// }
|
|
||||||
// settingsRoot := &treechangeproto.RawTreeChangeWithId{
|
|
||||||
// Id: settingsId,
|
|
||||||
// }
|
|
||||||
// spaceHeader := &spacesyncproto.RawSpaceHeaderWithId{}
|
|
||||||
// spaceSettingsId := "spaceSettingsId"
|
|
||||||
// credential := []byte("credential")
|
|
||||||
//
|
|
||||||
// peerManagerMock.EXPECT().
|
|
||||||
// GetResponsiblePeers(gomock.Any()).
|
|
||||||
// Return([]peer.Peer{mockPeer{}}, nil)
|
|
||||||
// diffMock.EXPECT().
|
|
||||||
// Diff(gomock.Any(), gomock.Eq(NewRemoteDiff(spaceId, clientMock))).
|
|
||||||
// Return(nil, nil, nil, spacesyncproto.ErrSpaceMissing)
|
|
||||||
//
|
|
||||||
// stMock.EXPECT().AclStorage().Return(aclStorageMock, nil)
|
|
||||||
// stMock.EXPECT().SpaceHeader().Return(spaceHeader, nil)
|
|
||||||
// stMock.EXPECT().SpaceSettingsId().Return(spaceSettingsId)
|
|
||||||
// stMock.EXPECT().TreeStorage(spaceSettingsId).Return(settingsStorage, nil)
|
|
||||||
//
|
|
||||||
// settingsStorage.EXPECT().Root().Return(settingsRoot, nil)
|
|
||||||
// aclStorageMock.EXPECT().
|
|
||||||
// Root().
|
|
||||||
// Return(aclRoot, nil)
|
|
||||||
// credentialProvider.EXPECT().
|
|
||||||
// GetCredential(gomock.Any(), spaceHeader).
|
|
||||||
// Return(credential, nil)
|
|
||||||
// clientMock.EXPECT().
|
|
||||||
// SpacePush(gomock.Any(), newPushSpaceRequestMatcher(spaceId, aclRootId, settingsId, credential, spaceHeader)).
|
|
||||||
// Return(nil, nil)
|
|
||||||
// peerManagerMock.EXPECT().SendPeer(gomock.Any(), "mockId", gomock.Any())
|
|
||||||
//
|
|
||||||
// require.NoError(t, diffSyncer.Sync(ctx))
|
|
||||||
// })
|
|
||||||
//
|
|
||||||
// t.Run("diff syncer sync unexpected", func(t *testing.T) {
|
|
||||||
// peerManagerMock.EXPECT().
|
|
||||||
// GetResponsiblePeers(gomock.Any()).
|
|
||||||
// Return([]peer.Peer{mockPeer{}}, nil)
|
|
||||||
// diffMock.EXPECT().
|
|
||||||
// Diff(gomock.Any(), gomock.Eq(NewRemoteDiff(spaceId, clientMock))).
|
|
||||||
// Return(nil, nil, nil, spacesyncproto.ErrUnexpected)
|
|
||||||
//
|
|
||||||
// require.NoError(t, diffSyncer.Sync(ctx))
|
|
||||||
// })
|
|
||||||
//
|
|
||||||
// t.Run("diff syncer sync space is deleted error", func(t *testing.T) {
|
|
||||||
// mPeer := mockPeer{}
|
|
||||||
// peerManagerMock.EXPECT().
|
|
||||||
// GetResponsiblePeers(gomock.Any()).
|
|
||||||
// Return([]peer.Peer{mPeer}, nil)
|
|
||||||
// diffMock.EXPECT().
|
|
||||||
// Diff(gomock.Any(), gomock.Eq(NewRemoteDiff(spaceId, clientMock))).
|
|
||||||
// Return(nil, nil, nil, spacesyncproto.ErrSpaceIsDeleted)
|
|
||||||
// stMock.EXPECT().SpaceSettingsId().Return("settingsId")
|
|
||||||
// treeSyncerMock.EXPECT().SyncAll(gomock.Any(), mPeer.Id(), []string{"settingsId"}, nil).Return(nil)
|
|
||||||
//
|
|
||||||
// require.NoError(t, diffSyncer.Sync(ctx))
|
|
||||||
// })
|
|
||||||
//}
|
|
||||||
|
|
|
@ -6,6 +6,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/anyproto/any-sync/app"
|
"github.com/anyproto/any-sync/app"
|
||||||
"github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
|
"github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
|
||||||
|
"github.com/anyproto/any-sync/commonspace/object/treemanager"
|
||||||
"github.com/anyproto/any-sync/commonspace/spacestate"
|
"github.com/anyproto/any-sync/commonspace/spacestate"
|
||||||
"github.com/anyproto/any-sync/metric"
|
"github.com/anyproto/any-sync/metric"
|
||||||
"github.com/anyproto/any-sync/net/peer"
|
"github.com/anyproto/any-sync/net/peer"
|
||||||
|
@ -69,7 +70,7 @@ type objectSync struct {
|
||||||
|
|
||||||
func (s *objectSync) Init(a *app.App) (err error) {
|
func (s *objectSync) Init(a *app.App) (err error) {
|
||||||
s.spaceStorage = a.MustComponent(spacestorage.CName).(spacestorage.SpaceStorage)
|
s.spaceStorage = a.MustComponent(spacestorage.CName).(spacestorage.SpaceStorage)
|
||||||
s.objectGetter = app.MustComponent[syncobjectgetter.SyncObjectGetter](a)
|
s.objectGetter = a.MustComponent(treemanager.CName).(syncobjectgetter.SyncObjectGetter)
|
||||||
s.configuration = a.MustComponent(nodeconf.CName).(nodeconf.NodeConf)
|
s.configuration = a.MustComponent(nodeconf.CName).(nodeconf.NodeConf)
|
||||||
sharedData := a.MustComponent(spacestate.CName).(*spacestate.SpaceState)
|
sharedData := a.MustComponent(spacestate.CName).(*spacestate.SpaceState)
|
||||||
mc := a.Component(metric.CName)
|
mc := a.Component(metric.CName)
|
||||||
|
|
|
@ -45,12 +45,14 @@ type requestManager struct {
|
||||||
handler MessageHandler
|
handler MessageHandler
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
cancel context.CancelFunc
|
cancel context.CancelFunc
|
||||||
|
clientFactory spacesyncproto.ClientFactory
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *requestManager) Init(a *app.App) (err error) {
|
func (r *requestManager) Init(a *app.App) (err error) {
|
||||||
r.ctx, r.cancel = context.WithCancel(context.Background())
|
r.ctx, r.cancel = context.WithCancel(context.Background())
|
||||||
r.handler = a.MustComponent(objectsync.CName).(MessageHandler)
|
r.handler = a.MustComponent(objectsync.CName).(MessageHandler)
|
||||||
r.peerPool = a.MustComponent(pool.CName).(pool.Pool)
|
r.peerPool = a.MustComponent(pool.CName).(pool.Pool)
|
||||||
|
r.clientFactory = spacesyncproto.ClientFactoryFunc(spacesyncproto.NewDRPCSpaceSyncClient)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -89,6 +91,13 @@ func (r *requestManager) QueueRequest(peerId string, req *spacesyncproto.ObjectS
|
||||||
// TODO: for later think when many clients are there,
|
// TODO: for later think when many clients are there,
|
||||||
// we need to close pools for inactive clients
|
// we need to close pools for inactive clients
|
||||||
return pl.TryAdd(func() {
|
return pl.TryAdd(func() {
|
||||||
|
doRequestAndHandle(r, peerId, req)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
var doRequestAndHandle = (*requestManager).requestAndHandle
|
||||||
|
|
||||||
|
func (r *requestManager) requestAndHandle(peerId string, req *spacesyncproto.ObjectSyncMessage) {
|
||||||
ctx := r.ctx
|
ctx := r.ctx
|
||||||
resp, err := r.doRequest(ctx, peerId, req)
|
resp, err := r.doRequest(ctx, peerId, req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -101,7 +110,6 @@ func (r *requestManager) QueueRequest(peerId string, req *spacesyncproto.ObjectS
|
||||||
Message: resp,
|
Message: resp,
|
||||||
PeerCtx: ctx,
|
PeerCtx: ctx,
|
||||||
})
|
})
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *requestManager) doRequest(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (resp *spacesyncproto.ObjectSyncMessage, err error) {
|
func (r *requestManager) doRequest(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (resp *spacesyncproto.ObjectSyncMessage, err error) {
|
||||||
|
@ -110,7 +118,7 @@ func (r *requestManager) doRequest(ctx context.Context, peerId string, msg *spac
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
err = pr.DoDrpc(ctx, func(conn drpc.Conn) error {
|
err = pr.DoDrpc(ctx, func(conn drpc.Conn) error {
|
||||||
cl := spacesyncproto.NewDRPCSpaceSyncClient(conn)
|
cl := r.clientFactory.Client(conn)
|
||||||
resp, err = cl.ObjectSync(ctx, msg)
|
resp, err = cl.ObjectSync(ctx, msg)
|
||||||
return err
|
return err
|
||||||
})
|
})
|
||||||
|
|
|
@ -1 +1,189 @@
|
||||||
package requestmanager
|
package requestmanager
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"github.com/anyproto/any-sync/commonspace/objectsync"
|
||||||
|
"github.com/anyproto/any-sync/commonspace/objectsync/mock_objectsync"
|
||||||
|
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
|
||||||
|
"github.com/anyproto/any-sync/commonspace/spacesyncproto/mock_spacesyncproto"
|
||||||
|
"github.com/anyproto/any-sync/net/peer"
|
||||||
|
"github.com/anyproto/any-sync/net/peer/mock_peer"
|
||||||
|
"github.com/anyproto/any-sync/net/pool/mock_pool"
|
||||||
|
"github.com/golang/mock/gomock"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
"storj.io/drpc"
|
||||||
|
"storj.io/drpc/drpcconn"
|
||||||
|
"sync"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
type fixture struct {
|
||||||
|
requestManager *requestManager
|
||||||
|
messageHandlerMock *mock_objectsync.MockObjectSync
|
||||||
|
peerPoolMock *mock_pool.MockPool
|
||||||
|
clientMock *mock_spacesyncproto.MockDRPCSpaceSyncClient
|
||||||
|
ctrl *gomock.Controller
|
||||||
|
}
|
||||||
|
|
||||||
|
func newFixture(t *testing.T) *fixture {
|
||||||
|
ctrl := gomock.NewController(t)
|
||||||
|
manager := New().(*requestManager)
|
||||||
|
peerPoolMock := mock_pool.NewMockPool(ctrl)
|
||||||
|
messageHandlerMock := mock_objectsync.NewMockObjectSync(ctrl)
|
||||||
|
clientMock := mock_spacesyncproto.NewMockDRPCSpaceSyncClient(ctrl)
|
||||||
|
manager.peerPool = peerPoolMock
|
||||||
|
manager.handler = messageHandlerMock
|
||||||
|
manager.clientFactory = spacesyncproto.ClientFactoryFunc(func(cc drpc.Conn) spacesyncproto.DRPCSpaceSyncClient {
|
||||||
|
return clientMock
|
||||||
|
})
|
||||||
|
manager.ctx, manager.cancel = context.WithCancel(context.Background())
|
||||||
|
return &fixture{
|
||||||
|
requestManager: manager,
|
||||||
|
messageHandlerMock: messageHandlerMock,
|
||||||
|
peerPoolMock: peerPoolMock,
|
||||||
|
clientMock: clientMock,
|
||||||
|
ctrl: ctrl,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fx *fixture) stop() {
|
||||||
|
fx.ctrl.Finish()
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRequestManager_SyncRequest(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
t.Run("send request", func(t *testing.T) {
|
||||||
|
fx := newFixture(t)
|
||||||
|
defer fx.stop()
|
||||||
|
|
||||||
|
peerId := "peerId"
|
||||||
|
peerMock := mock_peer.NewMockPeer(fx.ctrl)
|
||||||
|
conn := &drpcconn.Conn{}
|
||||||
|
msg := &spacesyncproto.ObjectSyncMessage{}
|
||||||
|
resp := &spacesyncproto.ObjectSyncMessage{}
|
||||||
|
fx.peerPoolMock.EXPECT().Get(ctx, peerId).Return(peerMock, nil)
|
||||||
|
fx.clientMock.EXPECT().ObjectSync(ctx, msg).Return(resp, nil)
|
||||||
|
peerMock.EXPECT().DoDrpc(ctx, gomock.Any()).DoAndReturn(func(ctx context.Context, drpcHandler func(conn drpc.Conn) error) {
|
||||||
|
drpcHandler(conn)
|
||||||
|
}).Return(nil)
|
||||||
|
res, err := fx.requestManager.SendRequest(ctx, peerId, msg)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, resp, res)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("request and handle", func(t *testing.T) {
|
||||||
|
fx := newFixture(t)
|
||||||
|
defer fx.stop()
|
||||||
|
ctx = fx.requestManager.ctx
|
||||||
|
|
||||||
|
peerId := "peerId"
|
||||||
|
peerMock := mock_peer.NewMockPeer(fx.ctrl)
|
||||||
|
conn := &drpcconn.Conn{}
|
||||||
|
msg := &spacesyncproto.ObjectSyncMessage{}
|
||||||
|
resp := &spacesyncproto.ObjectSyncMessage{}
|
||||||
|
fx.peerPoolMock.EXPECT().Get(ctx, peerId).Return(peerMock, nil)
|
||||||
|
fx.clientMock.EXPECT().ObjectSync(ctx, msg).Return(resp, nil)
|
||||||
|
peerMock.EXPECT().DoDrpc(ctx, gomock.Any()).DoAndReturn(func(ctx context.Context, drpcHandler func(conn drpc.Conn) error) {
|
||||||
|
drpcHandler(conn)
|
||||||
|
}).Return(nil)
|
||||||
|
fx.messageHandlerMock.EXPECT().HandleMessage(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, msg objectsync.HandleMessage) {
|
||||||
|
require.Equal(t, peerId, msg.SenderId)
|
||||||
|
require.Equal(t, resp, msg.Message)
|
||||||
|
pId, _ := peer.CtxPeerId(msg.PeerCtx)
|
||||||
|
require.Equal(t, peerId, pId)
|
||||||
|
}).Return(nil)
|
||||||
|
fx.requestManager.requestAndHandle(peerId, msg)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRequestManager_QueueRequest(t *testing.T) {
|
||||||
|
t.Run("max concurrent reqs for peer, independent reqs for other peer", func(t *testing.T) {
|
||||||
|
// testing 2 concurrent requests to one peer and simultaneous to another peer
|
||||||
|
fx := newFixture(t)
|
||||||
|
defer fx.stop()
|
||||||
|
fx.requestManager.workers = 2
|
||||||
|
msgRelease := make(chan struct{})
|
||||||
|
msgWait := make(chan struct{})
|
||||||
|
msgs := sync.Map{}
|
||||||
|
doRequestAndHandle = func(manager *requestManager, peerId string, req *spacesyncproto.ObjectSyncMessage) {
|
||||||
|
msgs.Store(req.ObjectId, struct{}{})
|
||||||
|
<-msgWait
|
||||||
|
<-msgRelease
|
||||||
|
}
|
||||||
|
otherPeer := "otherPeer"
|
||||||
|
msg1 := &spacesyncproto.ObjectSyncMessage{ObjectId: "id1"}
|
||||||
|
msg2 := &spacesyncproto.ObjectSyncMessage{ObjectId: "id2"}
|
||||||
|
msg3 := &spacesyncproto.ObjectSyncMessage{ObjectId: "id3"}
|
||||||
|
otherMsg1 := &spacesyncproto.ObjectSyncMessage{ObjectId: "otherId1"}
|
||||||
|
|
||||||
|
// sending requests to first peer
|
||||||
|
peerId := "peerId"
|
||||||
|
err := fx.requestManager.QueueRequest(peerId, msg1)
|
||||||
|
require.NoError(t, err)
|
||||||
|
err = fx.requestManager.QueueRequest(peerId, msg2)
|
||||||
|
require.NoError(t, err)
|
||||||
|
err = fx.requestManager.QueueRequest(peerId, msg3)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// waiting until all the messages are loaded
|
||||||
|
msgWait <- struct{}{}
|
||||||
|
msgWait <- struct{}{}
|
||||||
|
_, ok := msgs.Load("id1")
|
||||||
|
require.True(t, ok)
|
||||||
|
_, ok = msgs.Load("id2")
|
||||||
|
require.True(t, ok)
|
||||||
|
// third message should not be read
|
||||||
|
_, ok = msgs.Load("id3")
|
||||||
|
require.False(t, ok)
|
||||||
|
|
||||||
|
// request for other peer should pass
|
||||||
|
err = fx.requestManager.QueueRequest(otherPeer, otherMsg1)
|
||||||
|
require.NoError(t, err)
|
||||||
|
msgWait <- struct{}{}
|
||||||
|
|
||||||
|
_, ok = msgs.Load("otherId1")
|
||||||
|
require.True(t, ok)
|
||||||
|
close(msgRelease)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("no requests after close", func(t *testing.T) {
|
||||||
|
fx := newFixture(t)
|
||||||
|
defer fx.stop()
|
||||||
|
fx.requestManager.workers = 1
|
||||||
|
msgRelease := make(chan struct{})
|
||||||
|
msgWait := make(chan struct{})
|
||||||
|
msgs := sync.Map{}
|
||||||
|
doRequestAndHandle = func(manager *requestManager, peerId string, req *spacesyncproto.ObjectSyncMessage) {
|
||||||
|
msgs.Store(req.ObjectId, struct{}{})
|
||||||
|
<-msgWait
|
||||||
|
<-msgRelease
|
||||||
|
}
|
||||||
|
msg1 := &spacesyncproto.ObjectSyncMessage{ObjectId: "id1"}
|
||||||
|
msg2 := &spacesyncproto.ObjectSyncMessage{ObjectId: "id2"}
|
||||||
|
|
||||||
|
// sending requests to first peer
|
||||||
|
peerId := "peerId"
|
||||||
|
err := fx.requestManager.QueueRequest(peerId, msg1)
|
||||||
|
require.NoError(t, err)
|
||||||
|
err = fx.requestManager.QueueRequest(peerId, msg2)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// waiting until all the message is loaded
|
||||||
|
msgWait <- struct{}{}
|
||||||
|
_, ok := msgs.Load("id1")
|
||||||
|
require.True(t, ok)
|
||||||
|
_, ok = msgs.Load("id2")
|
||||||
|
require.False(t, ok)
|
||||||
|
|
||||||
|
fx.requestManager.Close(context.Background())
|
||||||
|
close(msgRelease)
|
||||||
|
// waiting to know if the second one is not taken
|
||||||
|
// because the manager is now closed
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
_, ok = msgs.Load("id2")
|
||||||
|
require.False(t, ok)
|
||||||
|
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
|
@ -2,9 +2,9 @@ package settings
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"github.com/anyproto/any-sync/commonspace/deletionstate/mock_deletionstate"
|
||||||
"github.com/anyproto/any-sync/commonspace/object/tree/treestorage"
|
"github.com/anyproto/any-sync/commonspace/object/tree/treestorage"
|
||||||
"github.com/anyproto/any-sync/commonspace/object/treemanager/mock_treemanager"
|
"github.com/anyproto/any-sync/commonspace/object/treemanager/mock_treemanager"
|
||||||
"github.com/anyproto/any-sync/commonspace/settings/settingsstate/mock_settingsstate"
|
|
||||||
"github.com/anyproto/any-sync/commonspace/spacestorage/mock_spacestorage"
|
"github.com/anyproto/any-sync/commonspace/spacestorage/mock_spacestorage"
|
||||||
"github.com/golang/mock/gomock"
|
"github.com/golang/mock/gomock"
|
||||||
"testing"
|
"testing"
|
||||||
|
@ -14,7 +14,7 @@ func TestDeleter_Delete(t *testing.T) {
|
||||||
ctrl := gomock.NewController(t)
|
ctrl := gomock.NewController(t)
|
||||||
treeManager := mock_treemanager.NewMockTreeManager(ctrl)
|
treeManager := mock_treemanager.NewMockTreeManager(ctrl)
|
||||||
st := mock_spacestorage.NewMockSpaceStorage(ctrl)
|
st := mock_spacestorage.NewMockSpaceStorage(ctrl)
|
||||||
delState := mock_settingsstate.NewMockObjectDeletionState(ctrl)
|
delState := mock_deletionstate.NewMockObjectDeletionState(ctrl)
|
||||||
|
|
||||||
deleter := newDeleter(st, delState, treeManager)
|
deleter := newDeleter(st, delState, treeManager)
|
||||||
|
|
||||||
|
|
|
@ -2,10 +2,10 @@ package settings
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"github.com/anyproto/any-sync/commonspace/deletionstate/mock_deletionstate"
|
||||||
"github.com/anyproto/any-sync/commonspace/object/treemanager/mock_treemanager"
|
"github.com/anyproto/any-sync/commonspace/object/treemanager/mock_treemanager"
|
||||||
"github.com/anyproto/any-sync/commonspace/settings/mock_settings"
|
"github.com/anyproto/any-sync/commonspace/settings/mock_settings"
|
||||||
"github.com/anyproto/any-sync/commonspace/settings/settingsstate"
|
"github.com/anyproto/any-sync/commonspace/settings/settingsstate"
|
||||||
"github.com/anyproto/any-sync/commonspace/settings/settingsstate/mock_settingsstate"
|
|
||||||
"github.com/golang/mock/gomock"
|
"github.com/golang/mock/gomock"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
"testing"
|
"testing"
|
||||||
|
@ -26,7 +26,7 @@ func TestDeletionManager_UpdateState_NotResponsible(t *testing.T) {
|
||||||
onDeleted := func() {
|
onDeleted := func() {
|
||||||
deleted = true
|
deleted = true
|
||||||
}
|
}
|
||||||
delState := mock_settingsstate.NewMockObjectDeletionState(ctrl)
|
delState := mock_deletionstate.NewMockObjectDeletionState(ctrl)
|
||||||
treeManager := mock_treemanager.NewMockTreeManager(ctrl)
|
treeManager := mock_treemanager.NewMockTreeManager(ctrl)
|
||||||
|
|
||||||
delState.EXPECT().Add(state.DeletedIds)
|
delState.EXPECT().Add(state.DeletedIds)
|
||||||
|
@ -58,7 +58,7 @@ func TestDeletionManager_UpdateState_Responsible(t *testing.T) {
|
||||||
onDeleted := func() {
|
onDeleted := func() {
|
||||||
deleted = true
|
deleted = true
|
||||||
}
|
}
|
||||||
delState := mock_settingsstate.NewMockObjectDeletionState(ctrl)
|
delState := mock_deletionstate.NewMockObjectDeletionState(ctrl)
|
||||||
treeManager := mock_treemanager.NewMockTreeManager(ctrl)
|
treeManager := mock_treemanager.NewMockTreeManager(ctrl)
|
||||||
provider := mock_settings.NewMockSpaceIdsProvider(ctrl)
|
provider := mock_settings.NewMockSpaceIdsProvider(ctrl)
|
||||||
|
|
||||||
|
|
|
@ -3,6 +3,7 @@ package settings
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"github.com/anyproto/any-sync/accountservice/mock_accountservice"
|
"github.com/anyproto/any-sync/accountservice/mock_accountservice"
|
||||||
|
"github.com/anyproto/any-sync/commonspace/deletionstate/mock_deletionstate"
|
||||||
"github.com/anyproto/any-sync/commonspace/object/accountdata"
|
"github.com/anyproto/any-sync/commonspace/object/accountdata"
|
||||||
"github.com/anyproto/any-sync/commonspace/object/tree/objecttree"
|
"github.com/anyproto/any-sync/commonspace/object/tree/objecttree"
|
||||||
"github.com/anyproto/any-sync/commonspace/object/tree/objecttree/mock_objecttree"
|
"github.com/anyproto/any-sync/commonspace/object/tree/objecttree/mock_objecttree"
|
||||||
|
@ -54,7 +55,7 @@ type settingsFixture struct {
|
||||||
deleter *mock_settings.MockDeleter
|
deleter *mock_settings.MockDeleter
|
||||||
syncTree *mock_synctree.MockSyncTree
|
syncTree *mock_synctree.MockSyncTree
|
||||||
historyTree *mock_objecttree.MockObjectTree
|
historyTree *mock_objecttree.MockObjectTree
|
||||||
delState *mock_settingsstate.MockObjectDeletionState
|
delState *mock_deletionstate.MockObjectDeletionState
|
||||||
account *mock_accountservice.MockService
|
account *mock_accountservice.MockService
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -66,7 +67,7 @@ func newSettingsFixture(t *testing.T) *settingsFixture {
|
||||||
acc := mock_accountservice.NewMockService(ctrl)
|
acc := mock_accountservice.NewMockService(ctrl)
|
||||||
treeManager := mock_treemanager.NewMockTreeManager(ctrl)
|
treeManager := mock_treemanager.NewMockTreeManager(ctrl)
|
||||||
st := mock_spacestorage.NewMockSpaceStorage(ctrl)
|
st := mock_spacestorage.NewMockSpaceStorage(ctrl)
|
||||||
delState := mock_settingsstate.NewMockObjectDeletionState(ctrl)
|
delState := mock_deletionstate.NewMockObjectDeletionState(ctrl)
|
||||||
delManager := mock_settings.NewMockDeletionManager(ctrl)
|
delManager := mock_settings.NewMockDeletionManager(ctrl)
|
||||||
stateBuilder := mock_settingsstate.NewMockStateBuilder(ctrl)
|
stateBuilder := mock_settingsstate.NewMockStateBuilder(ctrl)
|
||||||
changeFactory := mock_settingsstate.NewMockChangeFactory(ctrl)
|
changeFactory := mock_settingsstate.NewMockChangeFactory(ctrl)
|
||||||
|
|
|
@ -270,6 +270,7 @@ type mockTreeManager struct {
|
||||||
cache ocache.OCache
|
cache ocache.OCache
|
||||||
deletedIds []string
|
deletedIds []string
|
||||||
markedIds []string
|
markedIds []string
|
||||||
|
waitLoad chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *mockTreeManager) NewTreeSyncer(spaceId string, treeManager treemanager.TreeManager) treemanager.TreeSyncer {
|
func (t *mockTreeManager) NewTreeSyncer(spaceId string, treeManager treemanager.TreeManager) treemanager.TreeSyncer {
|
||||||
|
@ -283,6 +284,7 @@ func (t *mockTreeManager) MarkTreeDeleted(ctx context.Context, spaceId, treeId s
|
||||||
|
|
||||||
func (t *mockTreeManager) Init(a *app.App) (err error) {
|
func (t *mockTreeManager) Init(a *app.App) (err error) {
|
||||||
t.cache = ocache.New(func(ctx context.Context, id string) (value ocache.Object, err error) {
|
t.cache = ocache.New(func(ctx context.Context, id string) (value ocache.Object, err error) {
|
||||||
|
<-t.waitLoad
|
||||||
return t.space.TreeBuilder().BuildTree(ctx, id, objecttreebuilder.BuildTreeOpts{})
|
return t.space.TreeBuilder().BuildTree(ctx, id, objecttreebuilder.BuildTreeOpts{})
|
||||||
},
|
},
|
||||||
ocache.WithGCPeriod(time.Minute),
|
ocache.WithGCPeriod(time.Minute),
|
||||||
|
@ -352,7 +354,7 @@ func newFixture(t *testing.T) *spaceFixture {
|
||||||
configurationService: &mockConf{},
|
configurationService: &mockConf{},
|
||||||
storageProvider: spacestorage.NewInMemorySpaceStorageProvider(),
|
storageProvider: spacestorage.NewInMemorySpaceStorageProvider(),
|
||||||
peermanagerProvider: &mockPeerManagerProvider{},
|
peermanagerProvider: &mockPeerManagerProvider{},
|
||||||
treeManager: &mockTreeManager{},
|
treeManager: &mockTreeManager{waitLoad: make(chan struct{})},
|
||||||
pool: &mockPool{},
|
pool: &mockPool{},
|
||||||
spaceService: New(),
|
spaceService: New(),
|
||||||
}
|
}
|
||||||
|
|
149
net/peer/mock_peer/mock_peer.go
Normal file
149
net/peer/mock_peer/mock_peer.go
Normal file
|
@ -0,0 +1,149 @@
|
||||||
|
// Code generated by MockGen. DO NOT EDIT.
|
||||||
|
// Source: github.com/anyproto/any-sync/net/peer (interfaces: Peer)
|
||||||
|
|
||||||
|
// Package mock_peer is a generated GoMock package.
|
||||||
|
package mock_peer
|
||||||
|
|
||||||
|
import (
|
||||||
|
context "context"
|
||||||
|
reflect "reflect"
|
||||||
|
time "time"
|
||||||
|
|
||||||
|
gomock "github.com/golang/mock/gomock"
|
||||||
|
drpc "storj.io/drpc"
|
||||||
|
)
|
||||||
|
|
||||||
|
// MockPeer is a mock of Peer interface.
|
||||||
|
type MockPeer struct {
|
||||||
|
ctrl *gomock.Controller
|
||||||
|
recorder *MockPeerMockRecorder
|
||||||
|
}
|
||||||
|
|
||||||
|
// MockPeerMockRecorder is the mock recorder for MockPeer.
|
||||||
|
type MockPeerMockRecorder struct {
|
||||||
|
mock *MockPeer
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewMockPeer creates a new mock instance.
|
||||||
|
func NewMockPeer(ctrl *gomock.Controller) *MockPeer {
|
||||||
|
mock := &MockPeer{ctrl: ctrl}
|
||||||
|
mock.recorder = &MockPeerMockRecorder{mock}
|
||||||
|
return mock
|
||||||
|
}
|
||||||
|
|
||||||
|
// EXPECT returns an object that allows the caller to indicate expected use.
|
||||||
|
func (m *MockPeer) EXPECT() *MockPeerMockRecorder {
|
||||||
|
return m.recorder
|
||||||
|
}
|
||||||
|
|
||||||
|
// AcquireDrpcConn mocks base method.
|
||||||
|
func (m *MockPeer) AcquireDrpcConn(arg0 context.Context) (drpc.Conn, error) {
|
||||||
|
m.ctrl.T.Helper()
|
||||||
|
ret := m.ctrl.Call(m, "AcquireDrpcConn", arg0)
|
||||||
|
ret0, _ := ret[0].(drpc.Conn)
|
||||||
|
ret1, _ := ret[1].(error)
|
||||||
|
return ret0, ret1
|
||||||
|
}
|
||||||
|
|
||||||
|
// AcquireDrpcConn indicates an expected call of AcquireDrpcConn.
|
||||||
|
func (mr *MockPeerMockRecorder) AcquireDrpcConn(arg0 interface{}) *gomock.Call {
|
||||||
|
mr.mock.ctrl.T.Helper()
|
||||||
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AcquireDrpcConn", reflect.TypeOf((*MockPeer)(nil).AcquireDrpcConn), arg0)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close mocks base method.
|
||||||
|
func (m *MockPeer) Close() error {
|
||||||
|
m.ctrl.T.Helper()
|
||||||
|
ret := m.ctrl.Call(m, "Close")
|
||||||
|
ret0, _ := ret[0].(error)
|
||||||
|
return ret0
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close indicates an expected call of Close.
|
||||||
|
func (mr *MockPeerMockRecorder) Close() *gomock.Call {
|
||||||
|
mr.mock.ctrl.T.Helper()
|
||||||
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockPeer)(nil).Close))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Context mocks base method.
|
||||||
|
func (m *MockPeer) Context() context.Context {
|
||||||
|
m.ctrl.T.Helper()
|
||||||
|
ret := m.ctrl.Call(m, "Context")
|
||||||
|
ret0, _ := ret[0].(context.Context)
|
||||||
|
return ret0
|
||||||
|
}
|
||||||
|
|
||||||
|
// Context indicates an expected call of Context.
|
||||||
|
func (mr *MockPeerMockRecorder) Context() *gomock.Call {
|
||||||
|
mr.mock.ctrl.T.Helper()
|
||||||
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Context", reflect.TypeOf((*MockPeer)(nil).Context))
|
||||||
|
}
|
||||||
|
|
||||||
|
// DoDrpc mocks base method.
|
||||||
|
func (m *MockPeer) DoDrpc(arg0 context.Context, arg1 func(drpc.Conn) error) error {
|
||||||
|
m.ctrl.T.Helper()
|
||||||
|
ret := m.ctrl.Call(m, "DoDrpc", arg0, arg1)
|
||||||
|
ret0, _ := ret[0].(error)
|
||||||
|
return ret0
|
||||||
|
}
|
||||||
|
|
||||||
|
// DoDrpc indicates an expected call of DoDrpc.
|
||||||
|
func (mr *MockPeerMockRecorder) DoDrpc(arg0, arg1 interface{}) *gomock.Call {
|
||||||
|
mr.mock.ctrl.T.Helper()
|
||||||
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DoDrpc", reflect.TypeOf((*MockPeer)(nil).DoDrpc), arg0, arg1)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Id mocks base method.
|
||||||
|
func (m *MockPeer) Id() string {
|
||||||
|
m.ctrl.T.Helper()
|
||||||
|
ret := m.ctrl.Call(m, "Id")
|
||||||
|
ret0, _ := ret[0].(string)
|
||||||
|
return ret0
|
||||||
|
}
|
||||||
|
|
||||||
|
// Id indicates an expected call of Id.
|
||||||
|
func (mr *MockPeerMockRecorder) Id() *gomock.Call {
|
||||||
|
mr.mock.ctrl.T.Helper()
|
||||||
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Id", reflect.TypeOf((*MockPeer)(nil).Id))
|
||||||
|
}
|
||||||
|
|
||||||
|
// IsClosed mocks base method.
|
||||||
|
func (m *MockPeer) IsClosed() bool {
|
||||||
|
m.ctrl.T.Helper()
|
||||||
|
ret := m.ctrl.Call(m, "IsClosed")
|
||||||
|
ret0, _ := ret[0].(bool)
|
||||||
|
return ret0
|
||||||
|
}
|
||||||
|
|
||||||
|
// IsClosed indicates an expected call of IsClosed.
|
||||||
|
func (mr *MockPeerMockRecorder) IsClosed() *gomock.Call {
|
||||||
|
mr.mock.ctrl.T.Helper()
|
||||||
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsClosed", reflect.TypeOf((*MockPeer)(nil).IsClosed))
|
||||||
|
}
|
||||||
|
|
||||||
|
// ReleaseDrpcConn mocks base method.
|
||||||
|
func (m *MockPeer) ReleaseDrpcConn(arg0 drpc.Conn) {
|
||||||
|
m.ctrl.T.Helper()
|
||||||
|
m.ctrl.Call(m, "ReleaseDrpcConn", arg0)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ReleaseDrpcConn indicates an expected call of ReleaseDrpcConn.
|
||||||
|
func (mr *MockPeerMockRecorder) ReleaseDrpcConn(arg0 interface{}) *gomock.Call {
|
||||||
|
mr.mock.ctrl.T.Helper()
|
||||||
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReleaseDrpcConn", reflect.TypeOf((*MockPeer)(nil).ReleaseDrpcConn), arg0)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TryClose mocks base method.
|
||||||
|
func (m *MockPeer) TryClose(arg0 time.Duration) (bool, error) {
|
||||||
|
m.ctrl.T.Helper()
|
||||||
|
ret := m.ctrl.Call(m, "TryClose", arg0)
|
||||||
|
ret0, _ := ret[0].(bool)
|
||||||
|
ret1, _ := ret[1].(error)
|
||||||
|
return ret0, ret1
|
||||||
|
}
|
||||||
|
|
||||||
|
// TryClose indicates an expected call of TryClose.
|
||||||
|
func (mr *MockPeerMockRecorder) TryClose(arg0 interface{}) *gomock.Call {
|
||||||
|
mr.mock.ctrl.T.Helper()
|
||||||
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TryClose", reflect.TypeOf((*MockPeer)(nil).TryClose), arg0)
|
||||||
|
}
|
|
@ -1,3 +1,4 @@
|
||||||
|
//go:generate mockgen -destination mock_peer/mock_peer.go github.com/anyproto/any-sync/net/peer Peer
|
||||||
package peer
|
package peer
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
|
80
net/pool/mock_pool/mock_pool.go
Normal file
80
net/pool/mock_pool/mock_pool.go
Normal file
|
@ -0,0 +1,80 @@
|
||||||
|
// Code generated by MockGen. DO NOT EDIT.
|
||||||
|
// Source: github.com/anyproto/any-sync/net/pool (interfaces: Pool)
|
||||||
|
|
||||||
|
// Package mock_pool is a generated GoMock package.
|
||||||
|
package mock_pool
|
||||||
|
|
||||||
|
import (
|
||||||
|
context "context"
|
||||||
|
reflect "reflect"
|
||||||
|
|
||||||
|
peer "github.com/anyproto/any-sync/net/peer"
|
||||||
|
gomock "github.com/golang/mock/gomock"
|
||||||
|
)
|
||||||
|
|
||||||
|
// MockPool is a mock of Pool interface.
|
||||||
|
type MockPool struct {
|
||||||
|
ctrl *gomock.Controller
|
||||||
|
recorder *MockPoolMockRecorder
|
||||||
|
}
|
||||||
|
|
||||||
|
// MockPoolMockRecorder is the mock recorder for MockPool.
|
||||||
|
type MockPoolMockRecorder struct {
|
||||||
|
mock *MockPool
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewMockPool creates a new mock instance.
|
||||||
|
func NewMockPool(ctrl *gomock.Controller) *MockPool {
|
||||||
|
mock := &MockPool{ctrl: ctrl}
|
||||||
|
mock.recorder = &MockPoolMockRecorder{mock}
|
||||||
|
return mock
|
||||||
|
}
|
||||||
|
|
||||||
|
// EXPECT returns an object that allows the caller to indicate expected use.
|
||||||
|
func (m *MockPool) EXPECT() *MockPoolMockRecorder {
|
||||||
|
return m.recorder
|
||||||
|
}
|
||||||
|
|
||||||
|
// AddPeer mocks base method.
|
||||||
|
func (m *MockPool) AddPeer(arg0 context.Context, arg1 peer.Peer) error {
|
||||||
|
m.ctrl.T.Helper()
|
||||||
|
ret := m.ctrl.Call(m, "AddPeer", arg0, arg1)
|
||||||
|
ret0, _ := ret[0].(error)
|
||||||
|
return ret0
|
||||||
|
}
|
||||||
|
|
||||||
|
// AddPeer indicates an expected call of AddPeer.
|
||||||
|
func (mr *MockPoolMockRecorder) AddPeer(arg0, arg1 interface{}) *gomock.Call {
|
||||||
|
mr.mock.ctrl.T.Helper()
|
||||||
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddPeer", reflect.TypeOf((*MockPool)(nil).AddPeer), arg0, arg1)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get mocks base method.
|
||||||
|
func (m *MockPool) Get(arg0 context.Context, arg1 string) (peer.Peer, error) {
|
||||||
|
m.ctrl.T.Helper()
|
||||||
|
ret := m.ctrl.Call(m, "Get", arg0, arg1)
|
||||||
|
ret0, _ := ret[0].(peer.Peer)
|
||||||
|
ret1, _ := ret[1].(error)
|
||||||
|
return ret0, ret1
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get indicates an expected call of Get.
|
||||||
|
func (mr *MockPoolMockRecorder) Get(arg0, arg1 interface{}) *gomock.Call {
|
||||||
|
mr.mock.ctrl.T.Helper()
|
||||||
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Get", reflect.TypeOf((*MockPool)(nil).Get), arg0, arg1)
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetOneOf mocks base method.
|
||||||
|
func (m *MockPool) GetOneOf(arg0 context.Context, arg1 []string) (peer.Peer, error) {
|
||||||
|
m.ctrl.T.Helper()
|
||||||
|
ret := m.ctrl.Call(m, "GetOneOf", arg0, arg1)
|
||||||
|
ret0, _ := ret[0].(peer.Peer)
|
||||||
|
ret1, _ := ret[1].(error)
|
||||||
|
return ret0, ret1
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetOneOf indicates an expected call of GetOneOf.
|
||||||
|
func (mr *MockPoolMockRecorder) GetOneOf(arg0, arg1 interface{}) *gomock.Call {
|
||||||
|
mr.mock.ctrl.T.Helper()
|
||||||
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetOneOf", reflect.TypeOf((*MockPool)(nil).GetOneOf), arg0, arg1)
|
||||||
|
}
|
|
@ -1,3 +1,4 @@
|
||||||
|
//go:generate mockgen -destination mock_pool/mock_pool.go github.com/anyproto/any-sync/net/pool Pool
|
||||||
package pool
|
package pool
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
|
|
@ -10,7 +10,10 @@ import (
|
||||||
// workers - how many processes will execute tasks
|
// workers - how many processes will execute tasks
|
||||||
// maxSize - limit for queue size
|
// maxSize - limit for queue size
|
||||||
func NewExecPool(workers, maxSize int) *ExecPool {
|
func NewExecPool(workers, maxSize int) *ExecPool {
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
ss := &ExecPool{
|
ss := &ExecPool{
|
||||||
|
ctx: ctx,
|
||||||
|
cancel: cancel,
|
||||||
workers: workers,
|
workers: workers,
|
||||||
batch: mb.New[func()](maxSize),
|
batch: mb.New[func()](maxSize),
|
||||||
}
|
}
|
||||||
|
@ -19,6 +22,8 @@ func NewExecPool(workers, maxSize int) *ExecPool {
|
||||||
|
|
||||||
// ExecPool needed for parallel execution of the incoming send tasks
|
// ExecPool needed for parallel execution of the incoming send tasks
|
||||||
type ExecPool struct {
|
type ExecPool struct {
|
||||||
|
ctx context.Context
|
||||||
|
cancel context.CancelFunc
|
||||||
workers int
|
workers int
|
||||||
batch *mb.MB[func()]
|
batch *mb.MB[func()]
|
||||||
}
|
}
|
||||||
|
@ -39,7 +44,7 @@ func (ss *ExecPool) Run() {
|
||||||
|
|
||||||
func (ss *ExecPool) sendLoop() {
|
func (ss *ExecPool) sendLoop() {
|
||||||
for {
|
for {
|
||||||
f, err := ss.batch.WaitOne(context.Background())
|
f, err := ss.batch.WaitOne(ss.ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Debug("close send loop", zap.Error(err))
|
log.Debug("close send loop", zap.Error(err))
|
||||||
return
|
return
|
||||||
|
@ -49,5 +54,6 @@ func (ss *ExecPool) sendLoop() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ss *ExecPool) Close() (err error) {
|
func (ss *ExecPool) Close() (err error) {
|
||||||
|
ss.cancel()
|
||||||
return ss.batch.Close()
|
return ss.batch.Close()
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue