1
0
Fork 0
mirror of https://github.com/anyproto/any-sync.git synced 2025-06-10 01:51:11 +09:00

Add HeadsApply

This commit is contained in:
mcrakhman 2024-07-16 14:44:27 +02:00
parent 23390e556e
commit 48d938bb0e
No known key found for this signature in database
GPG key ID: DED12CFEF5B8396B
8 changed files with 77 additions and 28 deletions

View file

@ -18,6 +18,7 @@ import (
"github.com/anyproto/any-sync/commonspace/syncstatus"
"github.com/anyproto/any-sync/net/peer"
"github.com/anyproto/any-sync/nodeconf"
"github.com/anyproto/any-sync/util/slice"
)
var (
@ -33,8 +34,13 @@ type ListenerSetter interface {
SetListener(listener updatelistener.UpdateListener)
}
type SyncTree interface {
type peerSendableObjectTree interface {
objecttree.ObjectTree
AddRawChangesFromPeer(ctx context.Context, peerId string, changesPayload objecttree.RawChangesPayload) (res objecttree.AddResult, err error)
}
type SyncTree interface {
peerSendableObjectTree
synchandler.SyncHandler
ListenerSetter
SyncWithPeer(ctx context.Context, peerId string) (err error)
@ -161,6 +167,19 @@ func (s *syncTree) AddContent(ctx context.Context, content objecttree.SignableCh
return
}
func (s *syncTree) AddRawChangesFromPeer(ctx context.Context, peerId string, changesPayload objecttree.RawChangesPayload) (res objecttree.AddResult, err error) {
prevHeads := s.Heads()
res, err = s.AddRawChanges(ctx, changesPayload)
if err != nil {
return
}
curHeads := s.Heads()
if !slice.UnsortedEquals(prevHeads, curHeads) {
s.syncStatus.HeadsApply(peerId, s.Id(), curHeads)
}
return
}
func (s *syncTree) AddRawChanges(ctx context.Context, changesPayload objecttree.RawChangesPayload) (res objecttree.AddResult, err error) {
if err = s.checkAlive(); err != nil {
return

View file

@ -2,6 +2,11 @@ package synctree
import (
"context"
"testing"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
"github.com/anyproto/any-sync/commonspace/object/tree/objecttree"
"github.com/anyproto/any-sync/commonspace/object/tree/objecttree/mock_objecttree"
"github.com/anyproto/any-sync/commonspace/object/tree/synctree/mock_synctree"
@ -11,9 +16,6 @@ import (
"github.com/anyproto/any-sync/commonspace/objectsync"
"github.com/anyproto/any-sync/commonspace/syncstatus"
"github.com/anyproto/any-sync/nodeconf"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
"testing"
)
type syncTreeMatcher struct {
@ -58,7 +60,7 @@ func Test_BuildSyncTree(t *testing.T) {
}
headUpdate := &treechangeproto.TreeSyncMessage{}
t.Run("AddRawChanges update", func(t *testing.T) {
t.Run("AddRawChangesFromPeer update", func(t *testing.T) {
changes := []*treechangeproto.RawTreeChangeWithId{{Id: "some"}}
payload := objecttree.RawChangesPayload{
NewHeads: nil,
@ -68,18 +70,19 @@ func Test_BuildSyncTree(t *testing.T) {
Added: changes,
Mode: objecttree.Append,
}
objTreeMock.EXPECT().Heads().AnyTimes().Return([]string{"headId"})
objTreeMock.EXPECT().AddRawChanges(gomock.Any(), gomock.Eq(payload)).
Return(expectedRes, nil)
updateListenerMock.EXPECT().Update(tr)
syncClientMock.EXPECT().CreateHeadUpdate(gomock.Eq(tr), gomock.Eq(changes)).Return(headUpdate)
syncClientMock.EXPECT().Broadcast(gomock.Eq(headUpdate))
res, err := tr.AddRawChanges(ctx, payload)
res, err := tr.AddRawChangesFromPeer(ctx, "peerId", payload)
require.NoError(t, err)
require.Equal(t, expectedRes, res)
})
t.Run("AddRawChanges rebuild", func(t *testing.T) {
t.Run("AddRawChangesFromPeer rebuild", func(t *testing.T) {
changes := []*treechangeproto.RawTreeChangeWithId{{Id: "some"}}
payload := objecttree.RawChangesPayload{
NewHeads: nil,
@ -90,18 +93,19 @@ func Test_BuildSyncTree(t *testing.T) {
Added: changes,
Mode: objecttree.Rebuild,
}
objTreeMock.EXPECT().Heads().AnyTimes().Return([]string{"headId"})
objTreeMock.EXPECT().AddRawChanges(gomock.Any(), gomock.Eq(payload)).
Return(expectedRes, nil)
updateListenerMock.EXPECT().Rebuild(tr)
syncClientMock.EXPECT().CreateHeadUpdate(gomock.Eq(tr), gomock.Eq(changes)).Return(headUpdate)
syncClientMock.EXPECT().Broadcast(gomock.Eq(headUpdate))
res, err := tr.AddRawChanges(ctx, payload)
res, err := tr.AddRawChangesFromPeer(ctx, "peerId", payload)
require.NoError(t, err)
require.Equal(t, expectedRes, res)
})
t.Run("AddRawChanges nothing", func(t *testing.T) {
t.Run("AddRawChangesFromPeer nothing", func(t *testing.T) {
changes := []*treechangeproto.RawTreeChangeWithId{{Id: "some"}}
payload := objecttree.RawChangesPayload{
NewHeads: nil,
@ -111,10 +115,11 @@ func Test_BuildSyncTree(t *testing.T) {
Added: changes,
Mode: objecttree.Nothing,
}
objTreeMock.EXPECT().Heads().AnyTimes().Return([]string{"headId"})
objTreeMock.EXPECT().AddRawChanges(gomock.Any(), gomock.Eq(payload)).
Return(expectedRes, nil)
res, err := tr.AddRawChanges(ctx, payload)
res, err := tr.AddRawChangesFromPeer(ctx, "peerId", payload)
require.NoError(t, err)
require.Equal(t, expectedRes, res)
})

View file

@ -5,13 +5,14 @@ import (
"errors"
"sync"
"github.com/gogo/protobuf/proto"
"github.com/anyproto/any-sync/commonspace/object/tree/objecttree"
"github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
"github.com/anyproto/any-sync/commonspace/objectsync/synchandler"
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
"github.com/anyproto/any-sync/commonspace/syncstatus"
"github.com/anyproto/any-sync/util/slice"
"github.com/gogo/protobuf/proto"
)
var (
@ -34,7 +35,7 @@ type syncTreeHandler struct {
const maxQueueSize = 5
func newSyncTreeHandler(spaceId string, objTree objecttree.ObjectTree, syncClient SyncClient, syncStatus syncstatus.StatusUpdater) synchandler.SyncHandler {
func newSyncTreeHandler(spaceId string, objTree peerSendableObjectTree, syncClient SyncClient, syncStatus syncstatus.StatusUpdater) synchandler.SyncHandler {
return &syncTreeHandler{
objTree: objTree,
syncProtocol: newTreeSyncProtocol(spaceId, objTree, syncClient),

View file

@ -5,13 +5,15 @@ import (
"sync"
"testing"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
"github.com/anyproto/any-sync/commonspace/object/tree/objecttree"
"github.com/anyproto/any-sync/commonspace/object/tree/objecttree/mock_objecttree"
"github.com/anyproto/any-sync/commonspace/object/tree/synctree/mock_synctree"
"github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
"github.com/anyproto/any-sync/commonspace/syncstatus"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
)
type testObjTreeMock struct {
@ -19,6 +21,10 @@ type testObjTreeMock struct {
m sync.RWMutex
}
func (t *testObjTreeMock) AddRawChangesFromPeer(ctx context.Context, peerId string, changesPayload objecttree.RawChangesPayload) (res objecttree.AddResult, err error) {
return t.MockObjectTree.AddRawChanges(ctx, changesPayload)
}
func newTestObjMock(mockTree *mock_objecttree.MockObjectTree) *testObjTreeMock {
return &testObjTreeMock{
MockObjectTree: mockTree,

View file

@ -3,11 +3,12 @@ package synctree
import (
"context"
"go.uber.org/zap"
"github.com/anyproto/any-sync/app/logger"
"github.com/anyproto/any-sync/commonspace/object/tree/objecttree"
"github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
"github.com/anyproto/any-sync/util/slice"
"go.uber.org/zap"
)
type TreeSyncProtocol interface {
@ -19,11 +20,11 @@ type TreeSyncProtocol interface {
type treeSyncProtocol struct {
log logger.CtxLogger
spaceId string
objTree objecttree.ObjectTree
objTree peerSendableObjectTree
reqFactory RequestFactory
}
func newTreeSyncProtocol(spaceId string, objTree objecttree.ObjectTree, reqFactory RequestFactory) *treeSyncProtocol {
func newTreeSyncProtocol(spaceId string, objTree peerSendableObjectTree, reqFactory RequestFactory) *treeSyncProtocol {
return &treeSyncProtocol{
log: log.With(zap.String("spaceId", spaceId), zap.String("treeId", objTree.Id())),
spaceId: spaceId,
@ -74,7 +75,7 @@ func (t *treeSyncProtocol) HeadUpdate(ctx context.Context, senderId string, upda
return
}
_, err = objTree.AddRawChanges(ctx, objecttree.RawChangesPayload{
_, err = objTree.AddRawChangesFromPeer(ctx, senderId, objecttree.RawChangesPayload{
NewHeads: update.Heads,
RawChanges: update.Changes,
})
@ -110,7 +111,7 @@ func (t *treeSyncProtocol) FullSyncRequest(ctx context.Context, senderId string,
}()
if len(request.Changes) != 0 && !t.hasHeads(objTree, request.Heads) {
_, err = objTree.AddRawChanges(ctx, objecttree.RawChangesPayload{
_, err = objTree.AddRawChangesFromPeer(ctx, senderId, objecttree.RawChangesPayload{
NewHeads: request.Heads,
RawChanges: request.Changes,
})
@ -141,7 +142,7 @@ func (t *treeSyncProtocol) FullSyncResponse(ctx context.Context, senderId string
return
}
_, err = objTree.AddRawChanges(ctx, objecttree.RawChangesPayload{
_, err = objTree.AddRawChangesFromPeer(ctx, senderId, objecttree.RawChangesPayload{
NewHeads: response.Heads,
RawChanges: response.Changes,
})

View file

@ -3,6 +3,16 @@ package synctree
import (
"context"
"fmt"
"math/rand"
"sync"
"testing"
"time"
"github.com/cheggaaa/mb/v3"
"github.com/gogo/protobuf/proto"
"github.com/stretchr/testify/require"
"golang.org/x/exp/slices"
"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/commonspace/object/acl/list"
"github.com/anyproto/any-sync/commonspace/object/tree/objecttree"
@ -12,14 +22,6 @@ import (
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
"github.com/anyproto/any-sync/commonspace/syncstatus"
"github.com/anyproto/any-sync/net/peer"
"github.com/cheggaaa/mb/v3"
"github.com/gogo/protobuf/proto"
"github.com/stretchr/testify/require"
"golang.org/x/exp/slices"
"math/rand"
"sync"
"testing"
"time"
)
// protocolMsg is a message used in sync protocol tests
@ -317,6 +319,16 @@ func (b *broadcastTree) AddRawChanges(ctx context.Context, changes objecttree.Ra
return res, nil
}
func (b *broadcastTree) AddRawChangesFromPeer(ctx context.Context, peerId string, changes objecttree.RawChangesPayload) (objecttree.AddResult, error) {
res, err := b.ObjectTree.AddRawChanges(ctx, changes)
if err != nil {
return objecttree.AddResult{}, err
}
upd := b.SyncClient.CreateHeadUpdate(b.ObjectTree, res.Added)
b.SyncClient.Broadcast(upd)
return res, nil
}
func createStorage(treeId string, aclList list.AclList) treestorage.TreeStorage {
changeCreator := objecttree.NewMockChangeCreator()
st := changeCreator.CreateNewTreeStorage(treeId, aclList.Head().Id, false)

View file

@ -2,6 +2,7 @@ package syncstatus
import (
"context"
"github.com/anyproto/any-sync/app"
)
@ -22,6 +23,9 @@ func (n *noOpSyncStatus) Name() (name string) {
func (n *noOpSyncStatus) HeadsChange(treeId string, heads []string) {
}
func (n *noOpSyncStatus) HeadsApply(senderId, treeId string, heads []string) {
}
func (n *noOpSyncStatus) HeadsReceive(senderId, treeId string, heads []string) {
}

View file

@ -11,4 +11,5 @@ type StatusUpdater interface {
HeadsChange(treeId string, heads []string)
HeadsReceive(senderId, treeId string, heads []string)
HeadsApply(senderId, treeId string, heads []string)
}