1
0
Fork 0
mirror of https://github.com/anyproto/any-sync.git synced 2025-06-08 14:07:02 +09:00

Add some objecttree tests

This commit is contained in:
mcrakhman 2024-08-20 09:47:52 +02:00
parent 7701293b4b
commit f2c1571a33
No known key found for this signature in database
GPG key ID: DED12CFEF5B8396B
9 changed files with 125 additions and 96 deletions

View file

@ -14,9 +14,13 @@ import (
reflect "reflect"
time "time"
proto "github.com/anyproto/protobuf/proto"
gomock "go.uber.org/mock/gomock"
drpc "storj.io/drpc"
list "github.com/anyproto/any-sync/commonspace/object/acl/list"
objecttree "github.com/anyproto/any-sync/commonspace/object/tree/objecttree"
synctree "github.com/anyproto/any-sync/commonspace/object/tree/synctree"
synctree "github.com/anyproto/any-sync/commonspace/object/tree/synctree/response"
updatelistener "github.com/anyproto/any-sync/commonspace/object/tree/synctree/updatelistener"
treechangeproto "github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
treestorage "github.com/anyproto/any-sync/commonspace/object/tree/treestorage"
@ -25,9 +29,6 @@ import (
syncdeps "github.com/anyproto/any-sync/commonspace/sync/syncdeps"
syncstatus "github.com/anyproto/any-sync/commonspace/syncstatus"
peer "github.com/anyproto/any-sync/net/peer"
proto "github.com/anyproto/protobuf/proto"
gomock "go.uber.org/mock/gomock"
drpc "storj.io/drpc"
)
// MockSyncTree is a mock of SyncTree interface.

View file

@ -2,6 +2,7 @@ package synctree
import (
"github.com/anyproto/any-sync/commonspace/object/tree/objecttree"
"github.com/anyproto/any-sync/commonspace/object/tree/synctree/response"
"github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
"github.com/anyproto/any-sync/commonspace/sync/objectsync/objectmessages"
)
@ -12,7 +13,7 @@ type RequestFactory interface {
CreateHeadUpdate(t objecttree.ObjectTree, ignoredPeer string, added []*treechangeproto.RawTreeChangeWithId) (headUpdate *objectmessages.HeadUpdate, err error)
CreateNewTreeRequest(peerId, objectId string) *objectmessages.Request
CreateFullSyncRequest(peerId string, t objecttree.ObjectTree) *objectmessages.Request
CreateResponseProducer(t objecttree.ObjectTree, theirHeads, theirSnapshotPath []string) (ResponseProducer, error)
CreateResponseProducer(t objecttree.ObjectTree, theirHeads, theirSnapshotPath []string) (response.ResponseProducer, error)
}
func NewRequestFactory(spaceId string) RequestFactory {
@ -53,6 +54,6 @@ func (r *requestFactory) CreateFullSyncRequest(peerId string, t objecttree.Objec
return NewRequest(peerId, r.spaceId, t.Id(), t.Heads(), t.SnapshotPath(), t.Header())
}
func (r *requestFactory) CreateResponseProducer(t objecttree.ObjectTree, theirHeads, theirSnapshotPath []string) (ResponseProducer, error) {
return newResponseProducer(r.spaceId, t, theirHeads, theirSnapshotPath)
func (r *requestFactory) CreateResponseProducer(t objecttree.ObjectTree, theirHeads, theirSnapshotPath []string) (response.ResponseProducer, error) {
return response.NewResponseProducer(r.spaceId, t, theirHeads, theirSnapshotPath)
}

View file

@ -1,4 +1,4 @@
package synctree
package response
import (
"fmt"
@ -10,35 +10,35 @@ import (
)
type Response struct {
spaceId string
objectId string
heads []string
snapshotPath []string
changes []*treechangeproto.RawTreeChangeWithId
root *treechangeproto.RawTreeChangeWithId
SpaceId string
ObjectId string
Heads []string
SnapshotPath []string
Changes []*treechangeproto.RawTreeChangeWithId
Root *treechangeproto.RawTreeChangeWithId
}
func (r *Response) MsgSize() uint64 {
size := uint64(len(r.spaceId)+len(r.objectId)) * 59
size += uint64(len(r.snapshotPath)) * 59
for _, change := range r.changes {
size := uint64(len(r.SpaceId)+len(r.ObjectId)) * 59
size += uint64(len(r.SnapshotPath)) * 59
for _, change := range r.Changes {
size += uint64(len(change.Id))
size += uint64(len(change.RawChange))
}
return size + uint64(len(r.heads))*59
return size + uint64(len(r.Heads))*59
}
func (r *Response) ProtoMessage() (proto.Message, error) {
if r.objectId == "" {
if r.ObjectId == "" {
return &spacesyncproto.ObjectSyncMessage{}, nil
}
resp := &treechangeproto.TreeFullSyncResponse{
Heads: r.heads,
SnapshotPath: r.snapshotPath,
Changes: r.changes,
Heads: r.Heads,
SnapshotPath: r.SnapshotPath,
Changes: r.Changes,
}
wrapped := treechangeproto.WrapFullResponse(resp, r.root)
return spacesyncproto.MarshallSyncMessage(wrapped, r.spaceId, r.objectId)
wrapped := treechangeproto.WrapFullResponse(resp, r.Root)
return spacesyncproto.MarshallSyncMessage(wrapped, r.SpaceId, r.ObjectId)
}
func (r *Response) SetProtoMessage(message proto.Message) error {
@ -54,15 +54,15 @@ func (r *Response) SetProtoMessage(message proto.Message) error {
if err != nil {
return err
}
r.root = treeMsg.RootChange
r.Root = treeMsg.RootChange
headMsg := treeMsg.GetContent().GetFullSyncResponse()
if headMsg == nil {
return fmt.Errorf("unexpected message type: %T", treeMsg.GetContent())
}
r.heads = headMsg.Heads
r.changes = headMsg.Changes
r.snapshotPath = headMsg.SnapshotPath
r.spaceId = msg.SpaceId
r.objectId = msg.ObjectId
r.Heads = headMsg.Heads
r.Changes = headMsg.Changes
r.SnapshotPath = headMsg.SnapshotPath
r.SpaceId = msg.SpaceId
r.ObjectId = msg.ObjectId
return nil
}

View file

@ -1,4 +1,4 @@
package synctree
package response
import (
"github.com/anyproto/any-sync/commonspace/object/tree/objecttree"
@ -16,7 +16,7 @@ type responseProducer struct {
objectId string
}
func newResponseProducer(spaceId string, tree objecttree.ObjectTree, theirHeads, theirSnapshotPath []string) (ResponseProducer, error) {
func NewResponseProducer(spaceId string, tree objecttree.ObjectTree, theirHeads, theirSnapshotPath []string) (ResponseProducer, error) {
res, err := tree.ChangesAfterCommonSnapshotLoader(theirSnapshotPath, theirHeads)
if err != nil {
return nil, err
@ -35,12 +35,12 @@ func (r *responseProducer) NewResponse(batchSize int) (*Response, error) {
return &Response{}, err
}
return &Response{
heads: res.Heads,
snapshotPath: res.SnapshotPath,
changes: res.Batch,
root: res.Root,
spaceId: r.spaceId,
objectId: r.objectId,
Heads: res.Heads,
SnapshotPath: res.SnapshotPath,
Changes: res.Batch,
Root: res.Root,
SpaceId: r.spaceId,
ObjectId: r.objectId,
}, nil
}
@ -48,10 +48,10 @@ func (r *responseProducer) EmptyResponse() *Response {
headsCopy := make([]string, len(r.tree.Heads()))
copy(headsCopy, r.tree.Heads())
return &Response{
heads: headsCopy,
spaceId: r.spaceId,
objectId: r.objectId,
root: r.tree.Header(),
snapshotPath: r.tree.SnapshotPath(),
Heads: headsCopy,
SpaceId: r.spaceId,
ObjectId: r.objectId,
Root: r.tree.Header(),
SnapshotPath: r.tree.SnapshotPath(),
}
}

View file

@ -3,6 +3,7 @@ package synctree
import (
"context"
"github.com/anyproto/any-sync/commonspace/object/tree/synctree/response"
"github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
"github.com/anyproto/any-sync/commonspace/sync/syncdeps"
)
@ -18,18 +19,18 @@ func newFullResponseCollector() *fullResponseCollector {
}
func (r *fullResponseCollector) CollectResponse(ctx context.Context, peerId, objectId string, resp syncdeps.Response) error {
treeResp, ok := resp.(*Response)
treeResp, ok := resp.(*response.Response)
if !ok {
return ErrUnexpectedResponseType
}
r.heads = treeResp.heads
r.root = treeResp.root
r.changes = append(r.changes, treeResp.changes...)
r.heads = treeResp.Heads
r.root = treeResp.Root
r.changes = append(r.changes, treeResp.Changes...)
return nil
}
func (r *fullResponseCollector) NewResponse() syncdeps.Response {
return &Response{}
return &response.Response{}
}
type responseCollector struct {
@ -45,5 +46,5 @@ func (r *responseCollector) CollectResponse(ctx context.Context, peerId, objectI
}
func (r *responseCollector) NewResponse() syncdeps.Response {
return &Response{}
return &response.Response{}
}

View file

@ -9,6 +9,7 @@ import (
"storj.io/drpc"
"github.com/anyproto/any-sync/commonspace/object/tree/objecttree"
response "github.com/anyproto/any-sync/commonspace/object/tree/synctree/response"
"github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
"github.com/anyproto/any-sync/commonspace/sync/objectsync/objectmessages"
@ -151,7 +152,7 @@ func (s *syncHandler) HandleStreamRequest(ctx context.Context, rq syncdeps.Reque
s.tree.Lock()
curHeads := s.tree.Heads()
log.Debug("got stream request", zap.String("objectId", req.ObjectId()), zap.String("peerId", rq.PeerId()), zap.Int("len", s.tree.Len()))
producer, err := newResponseProducer(s.spaceId, s.tree, request.Heads, request.SnapshotPath)
producer, err := response.NewResponseProducer(s.spaceId, s.tree, request.Heads, request.SnapshotPath)
if err != nil {
s.tree.Unlock()
return nil, err
@ -179,7 +180,7 @@ func (s *syncHandler) HandleStreamRequest(ctx context.Context, rq syncdeps.Reque
if err != nil {
return nil, err
}
if len(batch.changes) == 0 {
if len(batch.Changes) == 0 {
break
}
size := batch.MsgSize()
@ -199,18 +200,18 @@ func (s *syncHandler) HandleStreamRequest(ctx context.Context, rq syncdeps.Reque
}
func (s *syncHandler) HandleResponse(ctx context.Context, peerId, objectId string, resp syncdeps.Response) error {
response, ok := resp.(*Response)
rsp, ok := resp.(*response.Response)
if !ok {
return ErrUnexpectedResponseType
}
if len(response.changes) == 0 {
if len(rsp.Changes) == 0 {
return nil
}
s.tree.Lock()
defer s.tree.Unlock()
rawChangesPayload := objecttree.RawChangesPayload{
NewHeads: response.heads,
RawChanges: response.changes,
NewHeads: rsp.Heads,
RawChanges: rsp.Changes,
}
_, err := s.tree.AddRawChangesFromPeer(ctx, peerId, rawChangesPayload)
return err

View file

@ -13,8 +13,8 @@ import (
"github.com/anyproto/any-sync/commonspace/object/tree/synctree/updatelistener"
"github.com/anyproto/any-sync/commonspace/object/tree/synctree/updatelistener/mock_updatelistener"
"github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
"github.com/anyproto/any-sync/commonspace/sync/objectsync/objectmessages"
"github.com/anyproto/any-sync/commonspace/syncstatus"
"github.com/anyproto/any-sync/nodeconf"
)
type syncTreeMatcher struct {
@ -31,14 +31,22 @@ func (s syncTreeMatcher) Matches(x interface{}) bool {
return s.objTree == t.ObjectTree && t.syncClient == s.client && t.listener == s.listener
}
func (s syncTreeMatcher) String() string {
return ""
type testObjMock struct {
*mock_objecttree.MockObjectTree
}
func syncClientFuncCreator(client SyncClient) func(spaceId string, factory RequestFactory, objectSync objectsync.ObjectSync, configuration nodeconf.NodeConf) SyncClient {
return func(spaceId string, factory RequestFactory, objectSync objectsync.ObjectSync, configuration nodeconf.NodeConf) SyncClient {
return client
}
func newTestObjMock(obj *mock_objecttree.MockObjectTree) *testObjMock {
return &testObjMock{MockObjectTree: obj}
}
func (t *testObjMock) Lock() {
}
func (t *testObjMock) Unlock() {
}
func (s syncTreeMatcher) String() string {
return ""
}
func Test_BuildSyncTree(t *testing.T) {
@ -51,14 +59,13 @@ func Test_BuildSyncTree(t *testing.T) {
objTreeMock := newTestObjMock(mock_objecttree.NewMockObjectTree(ctrl))
tr := &syncTree{
ObjectTree: objTreeMock,
SyncHandler: nil,
syncClient: syncClientMock,
listener: updateListenerMock,
isClosed: false,
syncStatus: syncstatus.NewNoOpSyncStatus(),
}
headUpdate := &treechangeproto.TreeSyncMessage{}
headUpdate := &objectmessages.HeadUpdate{}
t.Run("AddRawChangesFromPeer update", func(t *testing.T) {
changes := []*treechangeproto.RawTreeChangeWithId{{Id: "some"}}
payload := objecttree.RawChangesPayload{
@ -75,8 +82,8 @@ func Test_BuildSyncTree(t *testing.T) {
Return(expectedRes, nil)
updateListenerMock.EXPECT().Update(tr)
syncClientMock.EXPECT().CreateHeadUpdate(gomock.Eq(tr), gomock.Eq(changes)).Return(headUpdate)
syncClientMock.EXPECT().Broadcast(gomock.Eq(headUpdate))
syncClientMock.EXPECT().CreateHeadUpdate(gomock.Eq(tr), "peerId", gomock.Eq(changes)).Return(headUpdate)
syncClientMock.EXPECT().Broadcast(gomock.Any(), gomock.Eq(headUpdate))
res, err := tr.AddRawChangesFromPeer(ctx, "peerId", payload)
require.NoError(t, err)
require.Equal(t, expectedRes, res)
@ -98,8 +105,8 @@ func Test_BuildSyncTree(t *testing.T) {
Return(expectedRes, nil)
updateListenerMock.EXPECT().Rebuild(tr)
syncClientMock.EXPECT().CreateHeadUpdate(gomock.Eq(tr), gomock.Eq(changes)).Return(headUpdate)
syncClientMock.EXPECT().Broadcast(gomock.Eq(headUpdate))
syncClientMock.EXPECT().CreateHeadUpdate(gomock.Eq(tr), "peerId", gomock.Eq(changes)).Return(headUpdate)
syncClientMock.EXPECT().Broadcast(gomock.Any(), gomock.Eq(headUpdate))
res, err := tr.AddRawChangesFromPeer(ctx, "peerId", payload)
require.NoError(t, err)
require.Equal(t, expectedRes, res)
@ -137,8 +144,8 @@ func Test_BuildSyncTree(t *testing.T) {
objTreeMock.EXPECT().AddContent(gomock.Any(), gomock.Eq(content)).
Return(expectedRes, nil)
syncClientMock.EXPECT().CreateHeadUpdate(gomock.Eq(tr), gomock.Eq(changes)).Return(headUpdate)
syncClientMock.EXPECT().Broadcast(gomock.Eq(headUpdate))
syncClientMock.EXPECT().CreateHeadUpdate(gomock.Eq(tr), "", gomock.Eq(changes)).Return(headUpdate)
syncClientMock.EXPECT().Broadcast(gomock.Any(), gomock.Eq(headUpdate))
res, err := tr.AddContent(ctx, content)
require.NoError(t, err)
require.Equal(t, expectedRes, res)

View file

@ -21,6 +21,8 @@ type treeRemoteGetter struct {
treeId string
}
var createCollector = newFullResponseCollector
func (t treeRemoteGetter) getPeers(ctx context.Context) (peerIds []string, err error) {
peerId, err := peer.CtxPeerId(ctx)
if err != nil {
@ -47,7 +49,7 @@ func (t treeRemoteGetter) getPeers(ctx context.Context) (peerIds []string, err e
}
func (t treeRemoteGetter) treeRequest(ctx context.Context, peerId string) (collector *fullResponseCollector, err error) {
collector = newFullResponseCollector()
collector = createCollector()
req := t.deps.SyncClient.CreateNewTreeRequest(peerId, t.treeId)
err = t.deps.SyncClient.SendTreeRequest(ctx, req, collector)
if err != nil {

View file

@ -5,14 +5,12 @@ import (
"fmt"
"testing"
"github.com/anyproto/protobuf/proto"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
"github.com/anyproto/any-sync/commonspace/object/tree/synctree/mock_synctree"
"github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
"github.com/anyproto/any-sync/commonspace/peermanager/mock_peermanager"
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
"github.com/anyproto/any-sync/commonspace/sync/objectsync/objectmessages"
"github.com/anyproto/any-sync/net/peer"
"github.com/anyproto/any-sync/net/peer/mock_peer"
)
@ -51,39 +49,57 @@ func (fx *treeRemoteGetterFixture) stop() {
func TestTreeRemoteGetter(t *testing.T) {
ctx := context.Background()
peerId := "peerId"
treeRequest := &treechangeproto.TreeSyncMessage{}
treeResponse := &treechangeproto.TreeSyncMessage{
RootChange: &treechangeproto.RawTreeChangeWithId{Id: "id"},
}
marshalled, _ := proto.Marshal(treeResponse)
objectResponse := &spacesyncproto.ObjectSyncMessage{
Payload: marshalled,
}
treeRequest := &objectmessages.Request{}
t.Run("responsible peers", func(t *testing.T) {
fx := newTreeRemoteGetterFixture(t)
defer fx.stop()
mockPeer := mock_peer.NewMockPeer(fx.ctrl)
coll := newFullResponseCollector()
createCollector = func() *fullResponseCollector {
return coll
}
tCtx := peer.CtxWithPeerId(ctx, "*")
mockPeer.EXPECT().Id().AnyTimes().Return(peerId)
fx.peerGetterMock.EXPECT().GetResponsiblePeers(tCtx).Return([]peer.Peer{mockPeer}, nil)
fx.syncClientMock.EXPECT().CreateNewTreeRequest().Return(treeRequest)
fx.syncClientMock.EXPECT().SendRequest(tCtx, peerId, fx.treeGetter.treeId, treeRequest).Return(objectResponse, nil)
resp, _, err := fx.treeGetter.treeRequestLoop(tCtx)
fx.syncClientMock.EXPECT().CreateNewTreeRequest("peerId", "treeId").Return(treeRequest)
fx.syncClientMock.EXPECT().SendTreeRequest(tCtx, treeRequest, coll).Return(nil)
retColl, pId, err := fx.treeGetter.treeRequestLoop(tCtx)
require.NoError(t, err)
require.Equal(t, "id", resp.RootChange.Id)
require.Equal(t, "peerId", pId)
require.Equal(t, coll, retColl)
})
t.Run("request fails", func(t *testing.T) {
fx := newTreeRemoteGetterFixture(t)
defer fx.stop()
tCtx := peer.CtxWithPeerId(ctx, peerId)
treeRequest := &treechangeproto.TreeSyncMessage{}
mockPeer := mock_peer.NewMockPeer(fx.ctrl)
coll := newFullResponseCollector()
createCollector = func() *fullResponseCollector {
return coll
}
tCtx := peer.CtxWithPeerId(ctx, "*")
mockPeer.EXPECT().Id().AnyTimes().Return(peerId)
fx.syncClientMock.EXPECT().CreateNewTreeRequest().Return(treeRequest)
fx.syncClientMock.EXPECT().SendRequest(tCtx, peerId, fx.treeGetter.treeId, treeRequest).AnyTimes().Return(nil, fmt.Errorf("some"))
fx.peerGetterMock.EXPECT().GetResponsiblePeers(tCtx).Return([]peer.Peer{mockPeer}, nil)
fx.syncClientMock.EXPECT().CreateNewTreeRequest("peerId", "treeId").Return(treeRequest)
fx.syncClientMock.EXPECT().SendTreeRequest(tCtx, treeRequest, coll).Return(fmt.Errorf("error"))
_, _, err := fx.treeGetter.treeRequestLoop(tCtx)
require.Error(t, err)
})
t.Run("no responsible peers", func(t *testing.T) {
fx := newTreeRemoteGetterFixture(t)
defer fx.stop()
mockPeer := mock_peer.NewMockPeer(fx.ctrl)
coll := newFullResponseCollector()
createCollector = func() *fullResponseCollector {
return coll
}
tCtx := peer.CtxWithPeerId(ctx, "*")
mockPeer.EXPECT().Id().AnyTimes().Return(peerId)
fx.peerGetterMock.EXPECT().GetResponsiblePeers(tCtx).Return([]peer.Peer{mockPeer}, nil)
fx.syncClientMock.EXPECT().CreateNewTreeRequest("peerId", "treeId").Return(treeRequest)
fx.syncClientMock.EXPECT().SendTreeRequest(tCtx, treeRequest, coll).Return(fmt.Errorf("error"))
_, _, err := fx.treeGetter.treeRequestLoop(tCtx)
require.Error(t, err)
})