1
0
Fork 0
mirror of https://github.com/anyproto/any-sync.git synced 2025-06-11 10:18:08 +09:00

Changes and request factory

This commit is contained in:
mcrakhman 2024-06-12 20:39:17 +02:00
parent 51f740cb90
commit 5c2923b440
No known key found for this signature in database
GPG key ID: DED12CFEF5B8396B
18 changed files with 293 additions and 1439 deletions

View file

@ -0,0 +1,99 @@
package synctree
import (
"fmt"
"slices"
"github.com/gogo/protobuf/proto"
"github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
)
type BroadcastOptions struct {
EmptyPeers []string
}
type HeadUpdate struct {
peerId string
objectId string
spaceId string
heads []string
changes []*treechangeproto.RawTreeChangeWithId
snapshotPath []string
root *treechangeproto.RawTreeChangeWithId
opts BroadcastOptions
}
func (h *HeadUpdate) SetPeerId(peerId string) {
h.peerId = peerId
}
func (h *HeadUpdate) SetProtoMessage(message proto.Message) error {
var (
msg *spacesyncproto.ObjectSyncMessage
ok bool
)
if msg, ok = message.(*spacesyncproto.ObjectSyncMessage); !ok {
return fmt.Errorf("unexpected message type: %T", message)
}
treeMsg := &treechangeproto.TreeSyncMessage{}
err := proto.Unmarshal(msg.Payload, treeMsg)
if err != nil {
return err
}
h.root = treeMsg.RootChange
headMsg := treeMsg.GetContent().GetHeadUpdate()
if headMsg == nil {
return fmt.Errorf("unexpected message type: %T", treeMsg.GetContent())
}
h.heads = headMsg.Heads
h.changes = headMsg.Changes
h.snapshotPath = headMsg.SnapshotPath
h.spaceId = msg.SpaceId
h.objectId = msg.ObjectId
return nil
}
func (h *HeadUpdate) ProtoMessage() (proto.Message, error) {
if h.heads != nil {
return h.SyncMessage()
}
return &spacesyncproto.ObjectSyncMessage{}, nil
}
func (h *HeadUpdate) PeerId() string {
return h.peerId
}
func (h *HeadUpdate) ObjectId() string {
return h.objectId
}
func (h *HeadUpdate) ShallowCopy() *HeadUpdate {
return &HeadUpdate{
peerId: h.peerId,
objectId: h.objectId,
heads: h.heads,
changes: h.changes,
snapshotPath: h.snapshotPath,
root: h.root,
}
}
func (h *HeadUpdate) SyncMessage() (*spacesyncproto.ObjectSyncMessage, error) {
changes := h.changes
if slices.Contains(h.opts.EmptyPeers, h.peerId) {
changes = nil
}
treeMsg := treechangeproto.WrapHeadUpdate(&treechangeproto.TreeHeadUpdate{
Heads: h.heads,
SnapshotPath: h.snapshotPath,
Changes: changes,
}, h.root)
return spacesyncproto.MarshallSyncMessage(treeMsg, h.spaceId, h.objectId)
}
func (h *HeadUpdate) RemoveChanges() {
h.changes = nil
}

View file

@ -1,63 +0,0 @@
package synctree
import (
"sync"
"time"
)
type operationCounter struct {
value int32
lastOp time.Time
lastOpMu sync.Mutex
ch chan struct{}
limit time.Duration
}
func defaultOpCounter() *operationCounter {
return newOpCounter(50 * time.Millisecond)
}
func newOpCounter(limit time.Duration) *operationCounter {
ac := &operationCounter{
limit: limit,
ch: make(chan struct{}, 1),
}
return ac
}
func (ac *operationCounter) Increment() {
ac.lastOpMu.Lock()
ac.value++
ac.lastOp = time.Now()
ac.lastOpMu.Unlock()
select {
case ac.ch <- struct{}{}:
default:
}
}
func (ac *operationCounter) Decrement() {
ac.lastOpMu.Lock()
ac.value--
ac.lastOp = time.Now()
ac.lastOpMu.Unlock()
select {
case ac.ch <- struct{}{}:
default:
}
}
func (ac *operationCounter) WaitIdle() {
for {
select {
case <-ac.ch:
case <-time.After(ac.limit):
ac.lastOpMu.Lock()
if ac.value == 0 {
ac.lastOpMu.Unlock()
return
}
ac.lastOpMu.Unlock()
}
}
}

View file

@ -1,366 +0,0 @@
package synctree
import (
"context"
"math/rand"
"testing"
"time"
"github.com/anyproto/any-sync/commonspace/object/accountdata"
"github.com/anyproto/any-sync/commonspace/object/acl/list"
"github.com/anyproto/any-sync/commonspace/object/tree/objecttree"
"github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
"github.com/anyproto/any-sync/commonspace/object/tree/treestorage"
"github.com/anyproto/any-sync/util/slice"
"github.com/gogo/protobuf/proto"
"github.com/stretchr/testify/require"
)
func TestEmptyClientGetsFullHistory(t *testing.T) {
treeId := "treeId"
spaceId := "spaceId"
keys, err := accountdata.NewRandom()
require.NoError(t, err)
aclList, err := list.NewTestDerivedAcl(spaceId, keys)
require.NoError(t, err)
storage := createStorage(treeId, aclList)
changeCreator := objecttree.NewMockChangeCreator()
deps := fixtureDeps{
counter: defaultOpCounter(),
aclList: aclList,
initStorage: storage.(*treestorage.InMemoryTreeStorage),
connectionMap: map[string][]string{
"peer1": []string{"peer2"},
"peer2": []string{"peer1"},
},
emptyTrees: []string{"peer2"},
treeBuilder: objecttree.BuildTestableTree,
}
fx := newProtocolFixture(t, spaceId, deps)
fx.run(t)
fx.handlers["peer1"].sendRawChanges(context.Background(), objecttree.RawChangesPayload{
NewHeads: nil,
RawChanges: []*treechangeproto.RawTreeChangeWithId{
changeCreator.CreateRaw("1", aclList.Id(), treeId, true, treeId),
},
})
fx.counter.WaitIdle()
fx.stop()
firstHeads := fx.handlers["peer1"].tree().Heads()
secondHeads := fx.handlers["peer2"].tree().Heads()
require.True(t, slice.UnsortedEquals(firstHeads, secondHeads))
require.Equal(t, []string{"1"}, firstHeads)
logMsgs := fx.log.batcher.GetAll()
var fullResponseMsg msgDescription
for _, msg := range logMsgs {
descr := msg.description()
if descr.name == "FullSyncResponse" {
fullResponseMsg = descr
}
}
// that means that we got not only the last snapshot, but also the first one
require.Len(t, fullResponseMsg.changes, 2)
}
func TestTreeFuzzyMerge(t *testing.T) {
testTreeFuzzyMerge(t, true)
testTreeFuzzyMerge(t, false)
}
func testTreeFuzzyMerge(t *testing.T, withData bool) {
var (
rnd = rand.New(rand.NewSource(time.Now().Unix()))
levels = 20
perLevel = 20
rounds = 10
)
for i := 0; i < rounds; i++ {
testTreeMerge(t, levels, perLevel, withData, func() bool {
return true
})
testTreeMerge(t, levels, perLevel, withData, func() bool {
return false
})
testTreeMerge(t, levels, perLevel, withData, func() bool {
return rnd.Intn(10) > 8
})
levels += 2
}
}
func testTreeMerge(t *testing.T, levels, perLevel int, hasData bool, isSnapshot func() bool) {
treeId := "treeId"
spaceId := "spaceId"
keys, err := accountdata.NewRandom()
require.NoError(t, err)
aclList, err := list.NewTestDerivedAcl(spaceId, keys)
storage := createStorage(treeId, aclList)
changeCreator := objecttree.NewMockChangeCreator()
builder := objecttree.BuildTestableTree
if hasData {
builder = objecttree.BuildEmptyDataTestableTree
}
params := genParams{
prefix: "peer1",
aclId: aclList.Id(),
startIdx: 0,
levels: levels,
perLevel: perLevel,
snapshotId: treeId,
prevHeads: []string{treeId},
isSnapshot: isSnapshot,
hasData: hasData,
}
// generating initial tree
initialRes := genChanges(changeCreator, params)
err = storage.AddRawChangesSetHeads(initialRes.changes, initialRes.heads)
require.NoError(t, err)
deps := fixtureDeps{
counter: defaultOpCounter(),
aclList: aclList,
initStorage: storage.(*treestorage.InMemoryTreeStorage),
connectionMap: map[string][]string{
"peer1": []string{"node1"},
"peer2": []string{"node1"},
"node1": []string{"peer1", "peer2"},
},
emptyTrees: []string{"peer2", "node1"},
treeBuilder: builder,
}
fx := newProtocolFixture(t, spaceId, deps)
fx.run(t)
fx.handlers["peer1"].sendRawChanges(context.Background(), objecttree.RawChangesPayload{
NewHeads: initialRes.heads,
RawChanges: initialRes.changes,
})
fx.counter.WaitIdle()
firstHeads := fx.handlers["peer1"].tree().Heads()
secondHeads := fx.handlers["peer2"].tree().Heads()
require.True(t, slice.UnsortedEquals(firstHeads, secondHeads))
params = genParams{
prefix: "peer1",
aclId: aclList.Id(),
startIdx: levels,
levels: levels,
perLevel: perLevel,
snapshotId: initialRes.snapshotId,
prevHeads: initialRes.heads,
isSnapshot: isSnapshot,
hasData: hasData,
}
// generating different additions to the tree for different peers
peer1Res := genChanges(changeCreator, params)
params.prefix = "peer2"
peer2Res := genChanges(changeCreator, params)
fx.handlers["peer1"].sendRawChanges(context.Background(), objecttree.RawChangesPayload{
NewHeads: peer1Res.heads,
RawChanges: peer1Res.changes,
})
fx.handlers["peer2"].sendRawChanges(context.Background(), objecttree.RawChangesPayload{
NewHeads: peer2Res.heads,
RawChanges: peer2Res.changes,
})
fx.counter.WaitIdle()
fx.stop()
firstTree := fx.handlers["peer1"].tree()
secondTree := fx.handlers["peer2"].tree()
firstHeads = firstTree.Heads()
secondHeads = secondTree.Heads()
require.True(t, slice.UnsortedEquals(firstHeads, secondHeads))
require.True(t, slice.UnsortedEquals(firstHeads, append(peer1Res.heads, peer2Res.heads...)))
firstStorage := firstTree.Storage().(*treestorage.InMemoryTreeStorage)
secondStorage := secondTree.Storage().(*treestorage.InMemoryTreeStorage)
require.True(t, firstStorage.Equal(secondStorage))
if hasData {
for _, ch := range secondStorage.Changes {
if ch.Id == treeId {
continue
}
unmarshallRaw := &treechangeproto.RawTreeChange{}
proto.Unmarshal(ch.RawChange, unmarshallRaw)
treeCh := &treechangeproto.TreeChange{}
proto.Unmarshal(unmarshallRaw.Payload, treeCh)
require.Equal(t, ch.Id, string(treeCh.ChangesData))
}
}
}
func TestTreeStorageHasExtraChanges(t *testing.T) {
var (
rnd = rand.New(rand.NewSource(time.Now().Unix()))
levels = 20
perLevel = 40
)
// simulating cases where one peer has some extra changes saved in storage
// and checking that this will not break the sync
t.Run("tree storage has extra simple", func(t *testing.T) {
testTreeStorageHasExtra(t, levels, perLevel, false, func() bool {
return false
})
testTreeStorageHasExtra(t, levels, perLevel, false, func() bool {
return rnd.Intn(10) > 5
})
testTreeStorageHasExtra(t, levels, perLevel, true, func() bool {
return false
})
testTreeStorageHasExtra(t, levels, perLevel, true, func() bool {
return rnd.Intn(10) > 5
})
})
t.Run("tree storage has extra three parts", func(t *testing.T) {
testTreeStorageHasExtraThreeParts(t, levels, perLevel, false, func() bool {
return false
})
testTreeStorageHasExtraThreeParts(t, levels, perLevel, false, func() bool {
return rnd.Intn(10) > 5
})
testTreeStorageHasExtraThreeParts(t, levels, perLevel, true, func() bool {
return false
})
testTreeStorageHasExtraThreeParts(t, levels, perLevel, true, func() bool {
return rnd.Intn(10) > 5
})
})
}
func testTreeStorageHasExtra(t *testing.T, levels, perLevel int, hasData bool, isSnapshot func() bool) {
treeId := "treeId"
spaceId := "spaceId"
keys, err := accountdata.NewRandom()
require.NoError(t, err)
aclList, err := list.NewTestDerivedAcl(spaceId, keys)
storage := createStorage(treeId, aclList)
changeCreator := objecttree.NewMockChangeCreator()
builder := objecttree.BuildTestableTree
if hasData {
builder = objecttree.BuildEmptyDataTestableTree
}
params := genParams{
prefix: "peer1",
aclId: aclList.Id(),
startIdx: 0,
levels: levels,
perLevel: perLevel,
snapshotId: treeId,
prevHeads: []string{treeId},
isSnapshot: isSnapshot,
hasData: hasData,
}
deps := fixtureDeps{
counter: defaultOpCounter(),
aclList: aclList,
initStorage: storage.(*treestorage.InMemoryTreeStorage),
connectionMap: map[string][]string{
"peer1": []string{"peer2"},
"peer2": []string{"peer1"},
},
treeBuilder: builder,
}
fx := newProtocolFixture(t, spaceId, deps)
// generating initial tree
initialRes := genChanges(changeCreator, params)
fx.run(t)
// adding some changes to store, but without updating heads
store := fx.handlers["peer1"].tree().Storage().(*treestorage.InMemoryTreeStorage)
oldHeads, _ := store.Heads()
store.AddRawChangesSetHeads(initialRes.changes, oldHeads)
// sending those changes to other peer
fx.handlers["peer2"].sendRawChanges(context.Background(), objecttree.RawChangesPayload{
NewHeads: initialRes.heads,
RawChanges: initialRes.changes,
})
fx.counter.WaitIdle()
// here we want that the saved changes in storage should not affect the sync protocol
firstHeads := fx.handlers["peer1"].tree().Heads()
secondHeads := fx.handlers["peer2"].tree().Heads()
require.True(t, slice.UnsortedEquals(firstHeads, secondHeads))
require.True(t, slice.UnsortedEquals(initialRes.heads, firstHeads))
}
func testTreeStorageHasExtraThreeParts(t *testing.T, levels, perLevel int, hasData bool, isSnapshot func() bool) {
treeId := "treeId"
spaceId := "spaceId"
keys, err := accountdata.NewRandom()
require.NoError(t, err)
aclList, err := list.NewTestDerivedAcl(spaceId, keys)
storage := createStorage(treeId, aclList)
changeCreator := objecttree.NewMockChangeCreator()
builder := objecttree.BuildTestableTree
if hasData {
builder = objecttree.BuildEmptyDataTestableTree
}
params := genParams{
prefix: "peer1",
aclId: aclList.Id(),
startIdx: 0,
levels: levels,
perLevel: perLevel,
snapshotId: treeId,
prevHeads: []string{treeId},
isSnapshot: isSnapshot,
hasData: hasData,
}
deps := fixtureDeps{
counter: defaultOpCounter(),
aclList: aclList,
initStorage: storage.(*treestorage.InMemoryTreeStorage),
connectionMap: map[string][]string{
"peer1": []string{"peer2"},
"peer2": []string{"peer1"},
},
treeBuilder: builder,
}
fx := newProtocolFixture(t, spaceId, deps)
// generating parts
firstPart := genChanges(changeCreator, params)
params.startIdx = levels
params.snapshotId = firstPart.snapshotId
params.prevHeads = firstPart.heads
secondPart := genChanges(changeCreator, params)
params.startIdx = levels * 2
params.snapshotId = secondPart.snapshotId
params.prevHeads = secondPart.heads
thirdPart := genChanges(changeCreator, params)
// adding part1 to first peer and saving part2 and part3 in its storage
res, _ := fx.handlers["peer1"].tree().AddRawChanges(context.Background(), objecttree.RawChangesPayload{
NewHeads: firstPart.heads,
RawChanges: firstPart.changes,
})
require.True(t, slice.UnsortedEquals(res.Heads, firstPart.heads))
store := fx.handlers["peer1"].tree().Storage().(*treestorage.InMemoryTreeStorage)
oldHeads, _ := store.Heads()
store.AddRawChangesSetHeads(secondPart.changes, oldHeads)
store.AddRawChangesSetHeads(thirdPart.changes, oldHeads)
var peer2Initial []*treechangeproto.RawTreeChangeWithId
peer2Initial = append(peer2Initial, firstPart.changes...)
peer2Initial = append(peer2Initial, secondPart.changes...)
// adding part1 and part2 to second peer
res, _ = fx.handlers["peer2"].tree().AddRawChanges(context.Background(), objecttree.RawChangesPayload{
NewHeads: secondPart.heads,
RawChanges: peer2Initial,
})
require.True(t, slice.UnsortedEquals(res.Heads, secondPart.heads))
fx.run(t)
// sending part3 changes to other peer
fx.handlers["peer2"].sendRawChanges(context.Background(), objecttree.RawChangesPayload{
NewHeads: thirdPart.heads,
RawChanges: thirdPart.changes,
})
fx.counter.WaitIdle()
firstHeads := fx.handlers["peer1"].tree().Heads()
secondHeads := fx.handlers["peer2"].tree().Heads()
require.True(t, slice.UnsortedEquals(firstHeads, secondHeads))
require.True(t, slice.UnsortedEquals(thirdPart.heads, firstHeads))
}

View file

@ -1,74 +0,0 @@
package synctree
import (
"errors"
"github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
"sync"
)
type ReceiveQueue interface {
AddMessage(senderId string, msg *treechangeproto.TreeSyncMessage, replyId string) (queueFull bool)
GetMessage(senderId string) (msg *treechangeproto.TreeSyncMessage, replyId string, err error)
ClearQueue(senderId string)
}
type queueMsg struct {
replyId string
syncMessage *treechangeproto.TreeSyncMessage
}
type receiveQueue struct {
sync.Mutex
handlerMap map[string][]queueMsg
maxSize int
}
func newReceiveQueue(maxSize int) ReceiveQueue {
return &receiveQueue{
Mutex: sync.Mutex{},
handlerMap: map[string][]queueMsg{},
maxSize: maxSize,
}
}
var errEmptyQueue = errors.New("the queue is empty")
func (q *receiveQueue) AddMessage(senderId string, msg *treechangeproto.TreeSyncMessage, replyId string) (queueFull bool) {
q.Lock()
defer q.Unlock()
queue := q.handlerMap[senderId]
queueFull = len(queue) >= maxQueueSize
queue = append(queue, queueMsg{replyId, msg})
q.handlerMap[senderId] = queue
return
}
func (q *receiveQueue) GetMessage(senderId string) (msg *treechangeproto.TreeSyncMessage, replyId string, err error) {
q.Lock()
defer q.Unlock()
if len(q.handlerMap) == 0 {
err = errEmptyQueue
return
}
qMsg := q.handlerMap[senderId][0]
msg = qMsg.syncMessage
replyId = qMsg.replyId
return
}
func (q *receiveQueue) ClearQueue(senderId string) {
q.Lock()
defer q.Unlock()
queue := q.handlerMap[senderId]
excessLen := len(queue) - q.maxSize + 1
if excessLen <= 0 {
excessLen = 1
}
queue = queue[excessLen:]
q.handlerMap[senderId] = queue
}

View file

@ -0,0 +1,45 @@
package synctree
import (
"github.com/gogo/protobuf/proto"
"github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
)
type Request struct {
peerId string
spaceId string
objectId string
heads []string
snapshotPath []string
root *treechangeproto.RawTreeChangeWithId
}
func NewRequest(peerId, spaceId, objectId string, heads []string, snapshotPath []string, root *treechangeproto.RawTreeChangeWithId) Request {
return Request{
peerId: peerId,
spaceId: spaceId,
objectId: objectId,
heads: heads,
snapshotPath: snapshotPath,
root: root,
}
}
func (r Request) PeerId() string {
return r.peerId
}
func (r Request) ObjectId() string {
return r.root.Id
}
func (r Request) Proto() (proto.Message, error) {
msg := &treechangeproto.TreeFullSyncRequest{
Heads: r.heads,
SnapshotPath: r.snapshotPath,
}
req := treechangeproto.WrapFullRequest(msg, r.root)
return spacesyncproto.MarshallSyncMessage(req, r.spaceId, r.objectId)
}

View file

@ -1,72 +1,51 @@
package synctree package synctree
import ( import (
"fmt"
"github.com/anyproto/any-sync/commonspace/object/tree/objecttree" "github.com/anyproto/any-sync/commonspace/object/tree/objecttree"
"github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto" "github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
"github.com/anyproto/any-sync/util/slice"
) )
const batchSize = 1024 * 1024 * 10
type RequestFactory interface { type RequestFactory interface {
CreateHeadUpdate(t objecttree.ObjectTree, added []*treechangeproto.RawTreeChangeWithId) (msg *treechangeproto.TreeSyncMessage) CreateHeadUpdate(t objecttree.ObjectTree, ignoredPeer string, added []*treechangeproto.RawTreeChangeWithId) (headUpdate HeadUpdate)
CreateNewTreeRequest() (msg *treechangeproto.TreeSyncMessage) CreateNewTreeRequest(peerId, objectId string) Request
CreateFullSyncRequest(t objecttree.ObjectTree, theirHeads, theirSnapshotPath []string) (req *treechangeproto.TreeSyncMessage, err error) CreateFullSyncRequest(peerId string, t objecttree.ObjectTree) Request
CreateFullSyncResponse(t objecttree.ObjectTree, theirHeads, theirSnapshotPath []string) (*treechangeproto.TreeSyncMessage, error) CreateResponseProducer(t objecttree.ObjectTree, theirHeads, theirSnapshotPath []string) (ResponseProducer, error)
} }
func NewRequestFactory() RequestFactory { func NewRequestFactory(spaceId string) RequestFactory {
return &requestFactory{} return &requestFactory{spaceId}
} }
type requestFactory struct{} type requestFactory struct {
spaceId string
func (r *requestFactory) CreateHeadUpdate(t objecttree.ObjectTree, added []*treechangeproto.RawTreeChangeWithId) (msg *treechangeproto.TreeSyncMessage) {
return treechangeproto.WrapHeadUpdate(&treechangeproto.TreeHeadUpdate{
Heads: t.Heads(),
Changes: added,
SnapshotPath: t.SnapshotPath(),
}, t.Header())
} }
func (r *requestFactory) CreateNewTreeRequest() (msg *treechangeproto.TreeSyncMessage) { func (r *requestFactory) CreateHeadUpdate(t objecttree.ObjectTree, ignoredPeer string, added []*treechangeproto.RawTreeChangeWithId) (headUpdate HeadUpdate) {
return treechangeproto.WrapFullRequest(&treechangeproto.TreeFullSyncRequest{}, nil) broadcastOpts := BroadcastOptions{}
} if ignoredPeer != "" {
broadcastOpts.EmptyPeers = []string{ignoredPeer}
func (r *requestFactory) CreateFullSyncRequest(t objecttree.ObjectTree, theirHeads, theirSnapshotPath []string) (msg *treechangeproto.TreeSyncMessage, err error) {
req := &treechangeproto.TreeFullSyncRequest{}
if t == nil {
return nil, fmt.Errorf("tree should not be empty")
} }
return HeadUpdate{
req.Heads = t.Heads() objectId: t.Id(),
req.SnapshotPath = t.SnapshotPath() spaceId: r.spaceId,
heads: t.Heads(),
var changesAfterSnapshot []*treechangeproto.RawTreeChangeWithId changes: added,
changesAfterSnapshot, err = t.ChangesAfterCommonSnapshot(theirSnapshotPath, theirHeads) snapshotPath: t.SnapshotPath(),
if err != nil { root: t.Header(),
return opts: broadcastOpts,
} }
req.Changes = changesAfterSnapshot
msg = treechangeproto.WrapFullRequest(req, t.Header())
return
} }
func (r *requestFactory) CreateFullSyncResponse(t objecttree.ObjectTree, theirHeads, theirSnapshotPath []string) (msg *treechangeproto.TreeSyncMessage, err error) { func (r *requestFactory) CreateNewTreeRequest(peerId, objectId string) Request {
resp := &treechangeproto.TreeFullSyncResponse{ return NewRequest(peerId, r.spaceId, objectId, nil, nil, nil)
Heads: t.Heads(), }
SnapshotPath: t.SnapshotPath(),
} func (r *requestFactory) CreateFullSyncRequest(peerId string, t objecttree.ObjectTree) Request {
if slice.UnsortedEquals(theirHeads, t.Heads()) { return NewRequest(peerId, r.spaceId, t.Id(), t.Heads(), t.SnapshotPath(), t.Header())
msg = treechangeproto.WrapFullResponse(resp, t.Header()) }
return
} func (r *requestFactory) CreateResponseProducer(t objecttree.ObjectTree, theirHeads, theirSnapshotPath []string) (ResponseProducer, error) {
return NewResponseProducer(r.spaceId, t, theirHeads, theirSnapshotPath)
ourChanges, err := t.ChangesAfterCommonSnapshot(theirSnapshotPath, theirHeads)
if err != nil {
return
}
resp.Changes = ourChanges
msg = treechangeproto.WrapFullResponse(resp, t.Header())
return
} }

View file

@ -0,0 +1,55 @@
package synctree
import (
"fmt"
"github.com/gogo/protobuf/proto"
"github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
)
type Response struct {
spaceId string
objectId string
heads []string
snapshotPath []string
changes []*treechangeproto.RawTreeChangeWithId
root *treechangeproto.RawTreeChangeWithId
}
func (r *Response) ProtoMessage() (proto.Message, error) {
resp := &treechangeproto.TreeFullSyncResponse{
Heads: r.heads,
SnapshotPath: r.snapshotPath,
Changes: r.changes,
}
wrapped := treechangeproto.WrapFullResponse(resp, r.root)
return spacesyncproto.MarshallSyncMessage(wrapped, r.spaceId, r.objectId)
}
func (r *Response) SetProtoMessage(message proto.Message) error {
var (
msg *spacesyncproto.ObjectSyncMessage
ok bool
)
if msg, ok = message.(*spacesyncproto.ObjectSyncMessage); !ok {
return fmt.Errorf("unexpected message type: %T", message)
}
treeMsg := &treechangeproto.TreeSyncMessage{}
err := proto.Unmarshal(msg.Payload, treeMsg)
if err != nil {
return err
}
r.root = treeMsg.RootChange
headMsg := treeMsg.GetContent().GetFullSyncResponse()
if headMsg == nil {
return fmt.Errorf("unexpected message type: %T", treeMsg.GetContent())
}
r.heads = headMsg.Heads
r.changes = headMsg.Changes
r.snapshotPath = headMsg.SnapshotPath
r.spaceId = msg.SpaceId
r.objectId = msg.ObjectId
return nil
}

View file

@ -0,0 +1,40 @@
package synctree
import "github.com/anyproto/any-sync/commonspace/object/tree/objecttree"
type ResponseProducer interface {
NewResponse(batchSize int) (Response, error)
}
type responseProducer struct {
iterator objecttree.LoadIterator
spaceId string
objectId string
}
func NewResponseProducer(spaceId string, tree objecttree.ObjectTree, theirHeads, theirSnapshotPath []string) (ResponseProducer, error) {
res, err := tree.ChangesAfterCommonSnapshotLoader(theirSnapshotPath, theirHeads)
if err != nil {
return nil, err
}
return &responseProducer{
iterator: res,
spaceId: spaceId,
objectId: tree.Id(),
}, nil
}
func (r *responseProducer) NewResponse(batchSize int) (Response, error) {
res, err := r.iterator.NextBatch(batchSize)
if err != nil {
return Response{}, err
}
return Response{
heads: res.Heads,
snapshotPath: res.SnapshotPath,
changes: res.Batch,
root: res.Root,
spaceId: r.spaceId,
objectId: r.objectId,
}, nil
}

View file

@ -3,11 +3,12 @@ package synctree
import ( import (
"context" "context"
"go.uber.org/zap"
"github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto" "github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
"github.com/anyproto/any-sync/commonspace/peermanager" "github.com/anyproto/any-sync/commonspace/peermanager"
"github.com/anyproto/any-sync/commonspace/requestmanager" "github.com/anyproto/any-sync/commonspace/requestmanager"
"github.com/anyproto/any-sync/commonspace/spacesyncproto" "github.com/anyproto/any-sync/commonspace/spacesyncproto"
"go.uber.org/zap"
) )
type SyncClient interface { type SyncClient interface {
@ -20,9 +21,7 @@ type SyncClient interface {
type syncClient struct { type syncClient struct {
RequestFactory RequestFactory
spaceId string spaceId string
requestManager requestmanager.RequestManager
peerManager peermanager.PeerManager
} }
func NewSyncClient(spaceId string, requestManager requestmanager.RequestManager, peerManager peermanager.PeerManager) SyncClient { func NewSyncClient(spaceId string, requestManager requestmanager.RequestManager, peerManager peermanager.PeerManager) SyncClient {

View file

@ -1,143 +0,0 @@
package synctree
import (
"context"
"errors"
"sync"
"github.com/anyproto/any-sync/commonspace/object/tree/objecttree"
"github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
"github.com/anyproto/any-sync/commonspace/objectsync/synchandler"
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
"github.com/anyproto/any-sync/commonspace/syncstatus"
"github.com/anyproto/any-sync/util/slice"
"github.com/gogo/protobuf/proto"
)
var (
ErrMessageIsRequest = errors.New("message is request")
ErrMessageIsNotRequest = errors.New("message is not request")
ErrMoreThanOneRequest = errors.New("more than one request for same peer")
)
type syncTreeHandler struct {
objTree objecttree.ObjectTree
syncClient SyncClient
syncProtocol TreeSyncProtocol
syncStatus syncstatus.StatusUpdater
spaceId string
handlerLock sync.Mutex
pendingRequests map[string]struct{}
heads []string
}
const maxQueueSize = 5
func newSyncTreeHandler(spaceId string, objTree objecttree.ObjectTree, syncClient SyncClient, syncStatus syncstatus.StatusUpdater) synchandler.SyncHandler {
return &syncTreeHandler{
objTree: objTree,
syncProtocol: newTreeSyncProtocol(spaceId, objTree, syncClient),
syncClient: syncClient,
syncStatus: syncStatus,
spaceId: spaceId,
pendingRequests: make(map[string]struct{}),
}
}
func (s *syncTreeHandler) HandleRequest(ctx context.Context, senderId string, request *spacesyncproto.ObjectSyncMessage) (response *spacesyncproto.ObjectSyncMessage, err error) {
unmarshalled := &treechangeproto.TreeSyncMessage{}
err = proto.Unmarshal(request.Payload, unmarshalled)
if err != nil {
return
}
fullSyncRequest := unmarshalled.GetContent().GetFullSyncRequest()
if fullSyncRequest == nil {
return nil, ErrMessageIsNotRequest
}
// setting pending requests
s.handlerLock.Lock()
_, exists := s.pendingRequests[senderId]
if exists {
s.handlerLock.Unlock()
return nil, ErrMoreThanOneRequest
}
s.pendingRequests[senderId] = struct{}{}
s.handlerLock.Unlock()
response, err = s.handleRequest(ctx, senderId, fullSyncRequest)
// removing pending requests
s.handlerLock.Lock()
delete(s.pendingRequests, senderId)
s.handlerLock.Unlock()
return
}
func (s *syncTreeHandler) handleRequest(ctx context.Context, senderId string, fullSyncRequest *treechangeproto.TreeFullSyncRequest) (response *spacesyncproto.ObjectSyncMessage, err error) {
s.objTree.Lock()
defer s.objTree.Unlock()
treeResp, err := s.syncProtocol.FullSyncRequest(ctx, senderId, fullSyncRequest)
if err != nil {
return
}
response, err = spacesyncproto.MarshallSyncMessage(treeResp, s.spaceId, s.objTree.Id())
return
}
func (s *syncTreeHandler) HandleMessage(ctx context.Context, senderId string, msg *spacesyncproto.ObjectSyncMessage) (err error) {
unmarshalled := &treechangeproto.TreeSyncMessage{}
err = proto.Unmarshal(msg.Payload, unmarshalled)
if err != nil {
return
}
heads := treechangeproto.GetHeads(unmarshalled)
s.syncStatus.HeadsReceive(senderId, msg.ObjectId, heads)
s.handlerLock.Lock()
// if the update has same heads then returning not to hang on a lock
if unmarshalled.GetContent().GetHeadUpdate() != nil && slice.UnsortedEquals(heads, s.heads) {
s.handlerLock.Unlock()
return
}
s.handlerLock.Unlock()
return s.handleMessage(ctx, unmarshalled, senderId)
}
func (s *syncTreeHandler) handleMessage(ctx context.Context, msg *treechangeproto.TreeSyncMessage, senderId string) (err error) {
s.objTree.Lock()
defer s.objTree.Unlock()
var (
copyHeads = make([]string, 0, len(s.objTree.Heads()))
treeId = s.objTree.Id()
content = msg.GetContent()
)
// getting old heads
copyHeads = append(copyHeads, s.objTree.Heads()...)
defer func() {
// checking if something changed
if !slice.UnsortedEquals(copyHeads, s.objTree.Heads()) {
s.handlerLock.Lock()
defer s.handlerLock.Unlock()
s.heads = s.heads[:0]
for _, h := range s.objTree.Heads() {
s.heads = append(s.heads, h)
}
}
}()
switch {
case content.GetHeadUpdate() != nil:
var syncReq *treechangeproto.TreeSyncMessage
syncReq, err = s.syncProtocol.HeadUpdate(ctx, senderId, content.GetHeadUpdate())
if err != nil || syncReq == nil {
return
}
return s.syncClient.QueueRequest(senderId, treeId, syncReq)
case content.GetFullSyncRequest() != nil:
return ErrMessageIsRequest
case content.GetFullSyncResponse() != nil:
return s.syncProtocol.FullSyncResponse(ctx, senderId, content.GetFullSyncResponse())
}
return
}

View file

@ -1,240 +0,0 @@
package synctree
import (
"context"
"sync"
"testing"
"github.com/anyproto/any-sync/commonspace/object/tree/objecttree/mock_objecttree"
"github.com/anyproto/any-sync/commonspace/object/tree/synctree/mock_synctree"
"github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
"github.com/anyproto/any-sync/commonspace/syncstatus"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
)
type testObjTreeMock struct {
*mock_objecttree.MockObjectTree
m sync.RWMutex
}
func newTestObjMock(mockTree *mock_objecttree.MockObjectTree) *testObjTreeMock {
return &testObjTreeMock{
MockObjectTree: mockTree,
}
}
func (t *testObjTreeMock) Lock() {
t.m.Lock()
}
func (t *testObjTreeMock) RLock() {
t.m.RLock()
}
func (t *testObjTreeMock) Unlock() {
t.m.Unlock()
}
func (t *testObjTreeMock) RUnlock() {
t.m.RUnlock()
}
func (t *testObjTreeMock) TryLock() bool {
return t.m.TryLock()
}
func (t *testObjTreeMock) TryRLock() bool {
return t.m.TryRLock()
}
type syncHandlerFixture struct {
ctrl *gomock.Controller
syncClientMock *mock_synctree.MockSyncClient
objectTreeMock *testObjTreeMock
syncProtocolMock *mock_synctree.MockTreeSyncProtocol
spaceId string
senderId string
treeId string
syncHandler *syncTreeHandler
}
func newSyncHandlerFixture(t *testing.T) *syncHandlerFixture {
ctrl := gomock.NewController(t)
objectTreeMock := newTestObjMock(mock_objecttree.NewMockObjectTree(ctrl))
syncClientMock := mock_synctree.NewMockSyncClient(ctrl)
syncProtocolMock := mock_synctree.NewMockTreeSyncProtocol(ctrl)
spaceId := "spaceId"
syncHandler := &syncTreeHandler{
objTree: objectTreeMock,
syncClient: syncClientMock,
syncProtocol: syncProtocolMock,
spaceId: spaceId,
syncStatus: syncstatus.NewNoOpSyncStatus(),
pendingRequests: map[string]struct{}{},
}
return &syncHandlerFixture{
ctrl: ctrl,
objectTreeMock: objectTreeMock,
syncProtocolMock: syncProtocolMock,
syncClientMock: syncClientMock,
syncHandler: syncHandler,
spaceId: spaceId,
senderId: "senderId",
treeId: "treeId",
}
}
func (fx *syncHandlerFixture) stop() {
fx.ctrl.Finish()
}
func TestSyncTreeHandler_HandleMessage(t *testing.T) {
ctx := context.Background()
t.Run("handle head update message, heads not equal, request returned", func(t *testing.T) {
fx := newSyncHandlerFixture(t)
defer fx.stop()
treeId := "treeId"
chWithId := &treechangeproto.RawTreeChangeWithId{}
headUpdate := &treechangeproto.TreeHeadUpdate{
Heads: []string{"h3"},
}
treeMsg := treechangeproto.WrapHeadUpdate(headUpdate, chWithId)
objectMsg, _ := spacesyncproto.MarshallSyncMessage(treeMsg, "spaceId", treeId)
syncReq := &treechangeproto.TreeSyncMessage{}
fx.syncHandler.heads = []string{"h2"}
fx.objectTreeMock.EXPECT().Id().AnyTimes().Return(fx.treeId)
fx.objectTreeMock.EXPECT().Heads().Times(2).Return([]string{"h2"})
fx.objectTreeMock.EXPECT().Heads().Times(2).Return([]string{"h3"})
fx.syncProtocolMock.EXPECT().HeadUpdate(ctx, fx.senderId, gomock.Any()).Return(syncReq, nil)
fx.syncClientMock.EXPECT().QueueRequest(fx.senderId, fx.treeId, syncReq).Return(nil)
err := fx.syncHandler.HandleMessage(ctx, fx.senderId, objectMsg)
require.NoError(t, err)
require.Equal(t, []string{"h3"}, fx.syncHandler.heads)
})
t.Run("handle head update message, heads equal", func(t *testing.T) {
fx := newSyncHandlerFixture(t)
defer fx.stop()
treeId := "treeId"
chWithId := &treechangeproto.RawTreeChangeWithId{}
headUpdate := &treechangeproto.TreeHeadUpdate{
Heads: []string{"h1"},
}
treeMsg := treechangeproto.WrapHeadUpdate(headUpdate, chWithId)
objectMsg, _ := spacesyncproto.MarshallSyncMessage(treeMsg, "spaceId", treeId)
fx.syncHandler.heads = []string{"h1"}
fx.objectTreeMock.EXPECT().Id().AnyTimes().Return(fx.treeId)
err := fx.syncHandler.HandleMessage(ctx, fx.senderId, objectMsg)
require.NoError(t, err)
})
t.Run("handle head update message, no sync request returned", func(t *testing.T) {
fx := newSyncHandlerFixture(t)
defer fx.stop()
treeId := "treeId"
chWithId := &treechangeproto.RawTreeChangeWithId{}
headUpdate := &treechangeproto.TreeHeadUpdate{
Heads: []string{"h3"},
}
treeMsg := treechangeproto.WrapHeadUpdate(headUpdate, chWithId)
objectMsg, _ := spacesyncproto.MarshallSyncMessage(treeMsg, "spaceId", treeId)
fx.syncHandler.heads = []string{"h2"}
fx.objectTreeMock.EXPECT().Id().AnyTimes().Return(fx.treeId)
fx.objectTreeMock.EXPECT().Heads().Times(2).Return([]string{"h2"})
fx.objectTreeMock.EXPECT().Heads().Times(2).Return([]string{"h3"})
fx.syncProtocolMock.EXPECT().HeadUpdate(ctx, fx.senderId, gomock.Any()).Return(nil, nil)
err := fx.syncHandler.HandleMessage(ctx, fx.senderId, objectMsg)
require.NoError(t, err)
require.Equal(t, []string{"h3"}, fx.syncHandler.heads)
})
t.Run("handle full sync request returns error", func(t *testing.T) {
fx := newSyncHandlerFixture(t)
defer fx.stop()
treeId := "treeId"
chWithId := &treechangeproto.RawTreeChangeWithId{}
fullRequest := &treechangeproto.TreeFullSyncRequest{
Heads: []string{"h3"},
}
treeMsg := treechangeproto.WrapFullRequest(fullRequest, chWithId)
objectMsg, _ := spacesyncproto.MarshallSyncMessage(treeMsg, "spaceId", treeId)
fx.syncHandler.heads = []string{"h2"}
fx.objectTreeMock.EXPECT().Id().AnyTimes().Return(fx.treeId)
fx.objectTreeMock.EXPECT().Heads().Times(3).Return([]string{"h2"})
err := fx.syncHandler.HandleMessage(ctx, fx.senderId, objectMsg)
require.Equal(t, err, ErrMessageIsRequest)
})
t.Run("handle full sync response", func(t *testing.T) {
fx := newSyncHandlerFixture(t)
defer fx.stop()
treeId := "treeId"
chWithId := &treechangeproto.RawTreeChangeWithId{}
fullSyncResponse := &treechangeproto.TreeFullSyncResponse{
Heads: []string{"h3"},
}
treeMsg := treechangeproto.WrapFullResponse(fullSyncResponse, chWithId)
objectMsg, _ := spacesyncproto.MarshallSyncMessage(treeMsg, "spaceId", treeId)
fx.syncHandler.heads = []string{"h2"}
fx.objectTreeMock.EXPECT().Id().AnyTimes().Return(fx.treeId)
fx.objectTreeMock.EXPECT().Heads().Times(2).Return([]string{"h2"})
fx.objectTreeMock.EXPECT().Heads().Times(2).Return([]string{"h3"})
fx.syncProtocolMock.EXPECT().FullSyncResponse(ctx, fx.senderId, gomock.Any()).Return(nil)
err := fx.syncHandler.HandleMessage(ctx, fx.senderId, objectMsg)
require.NoError(t, err)
})
}
func TestSyncTreeHandler_HandleRequest(t *testing.T) {
ctx := context.Background()
t.Run("handle request", func(t *testing.T) {
fx := newSyncHandlerFixture(t)
defer fx.stop()
treeId := "treeId"
chWithId := &treechangeproto.RawTreeChangeWithId{}
fullRequest := &treechangeproto.TreeFullSyncRequest{}
treeMsg := treechangeproto.WrapFullRequest(fullRequest, chWithId)
objectMsg, _ := spacesyncproto.MarshallSyncMessage(treeMsg, "spaceId", treeId)
syncResp := &treechangeproto.TreeSyncMessage{}
fx.objectTreeMock.EXPECT().Id().AnyTimes().Return(fx.treeId)
fx.syncProtocolMock.EXPECT().FullSyncRequest(ctx, fx.senderId, gomock.Any()).Return(syncResp, nil)
res, err := fx.syncHandler.HandleRequest(ctx, fx.senderId, objectMsg)
require.NoError(t, err)
require.NotNil(t, res)
})
t.Run("handle other message", func(t *testing.T) {
fx := newSyncHandlerFixture(t)
defer fx.stop()
treeId := "treeId"
chWithId := &treechangeproto.RawTreeChangeWithId{}
fullResponse := &treechangeproto.TreeFullSyncResponse{}
responseMsg := treechangeproto.WrapFullResponse(fullResponse, chWithId)
headUpdate := &treechangeproto.TreeHeadUpdate{}
headUpdateMsg := treechangeproto.WrapHeadUpdate(headUpdate, chWithId)
for _, msg := range []*treechangeproto.TreeSyncMessage{responseMsg, headUpdateMsg} {
objectMsg, _ := spacesyncproto.MarshallSyncMessage(msg, "spaceId", treeId)
_, err := fx.syncHandler.HandleRequest(ctx, fx.senderId, objectMsg)
require.Equal(t, err, ErrMessageIsNotRequest)
}
})
}

View file

@ -1,486 +0,0 @@
package synctree
import (
"context"
"fmt"
"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/commonspace/object/acl/list"
"github.com/anyproto/any-sync/commonspace/object/tree/objecttree"
"github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
"github.com/anyproto/any-sync/commonspace/object/tree/treestorage"
"github.com/anyproto/any-sync/commonspace/objectsync/synchandler"
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
"github.com/anyproto/any-sync/commonspace/syncstatus"
"github.com/anyproto/any-sync/net/peer"
"github.com/cheggaaa/mb/v3"
"github.com/gogo/protobuf/proto"
"github.com/stretchr/testify/require"
"golang.org/x/exp/slices"
"math/rand"
"sync"
"testing"
"time"
)
// protocolMsg is a message used in sync protocol tests
type protocolMsg struct {
msg *spacesyncproto.ObjectSyncMessage
senderId string
receiverId string
userMsg *objecttree.RawChangesPayload
}
// msgDescription is a representation of message used for checking the results of the test
type msgDescription struct {
name string
from string
to string
heads []string
changes []*treechangeproto.RawTreeChangeWithId
}
func (p *protocolMsg) description() (descr msgDescription) {
unmarshalled := &treechangeproto.TreeSyncMessage{}
err := proto.Unmarshal(p.msg.Payload, unmarshalled)
if err != nil {
panic(err)
}
descr = msgDescription{
from: p.senderId,
to: p.receiverId,
}
switch {
case unmarshalled.GetContent().GetHeadUpdate() != nil:
cnt := unmarshalled.GetContent().GetHeadUpdate()
descr.name = "HeadUpdate"
descr.heads = cnt.Heads
descr.changes = unmarshalled.GetContent().GetHeadUpdate().Changes
case unmarshalled.GetContent().GetFullSyncRequest() != nil:
cnt := unmarshalled.GetContent().GetFullSyncRequest()
descr.name = "FullSyncRequest"
descr.heads = cnt.Heads
descr.changes = unmarshalled.GetContent().GetFullSyncRequest().Changes
case unmarshalled.GetContent().GetFullSyncResponse() != nil:
cnt := unmarshalled.GetContent().GetFullSyncResponse()
descr.name = "FullSyncResponse"
descr.heads = cnt.Heads
descr.changes = unmarshalled.GetContent().GetFullSyncResponse().Changes
}
return
}
// messageLog saves all messages that were sent during sync test
type messageLog struct {
batcher *mb.MB[protocolMsg]
}
func newMessageLog() *messageLog {
return &messageLog{batcher: mb.New[protocolMsg](0)}
}
func (m *messageLog) addMessage(msg protocolMsg) {
m.batcher.Add(context.Background(), msg)
}
type requestPeerManager struct {
peerId string
handlers map[string]*testSyncHandler
log *messageLog
}
func newRequestPeerManager(peerId string, log *messageLog) *requestPeerManager {
return &requestPeerManager{
peerId: peerId,
handlers: map[string]*testSyncHandler{},
log: log,
}
}
func (r *requestPeerManager) addHandler(peerId string, handler *testSyncHandler) {
r.handlers[peerId] = handler
}
func (r *requestPeerManager) Run(ctx context.Context) (err error) {
return nil
}
func (r *requestPeerManager) Close(ctx context.Context) (err error) {
return nil
}
func (r *requestPeerManager) SendRequest(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) {
panic("should not be called")
}
func (r *requestPeerManager) QueueRequest(peerId string, msg *spacesyncproto.ObjectSyncMessage) (err error) {
pMsg := protocolMsg{
msg: msg,
senderId: r.peerId,
receiverId: peerId,
}
r.log.addMessage(pMsg)
return r.handlers[peerId].send(context.Background(), pMsg)
}
func (r *requestPeerManager) Init(a *app.App) (err error) {
return
}
func (r *requestPeerManager) Name() (name string) {
return
}
func (r *requestPeerManager) SendPeer(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (err error) {
pMsg := protocolMsg{
msg: msg,
senderId: r.peerId,
receiverId: peerId,
}
r.log.addMessage(pMsg)
return r.handlers[peerId].send(context.Background(), pMsg)
}
func (r *requestPeerManager) Broadcast(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) (err error) {
for _, handler := range r.handlers {
pMsg := protocolMsg{
msg: msg,
senderId: r.peerId,
receiverId: handler.peerId,
}
r.log.addMessage(pMsg)
handler.send(context.Background(), pMsg)
}
return
}
func (r *requestPeerManager) GetResponsiblePeers(ctx context.Context) (peers []peer.Peer, err error) {
return nil, nil
}
func (r *requestPeerManager) GetNodePeers(ctx context.Context) (peers []peer.Peer, err error) {
return nil, nil
}
// testSyncHandler is the wrapper around individual tree to test sync protocol
type testSyncHandler struct {
synchandler.SyncHandler
batcher *mb.MB[protocolMsg]
peerId string
aclList list.AclList
log *messageLog
syncClient SyncClient
builder objecttree.BuildObjectTreeFunc
peerManager *requestPeerManager
counter *operationCounter
}
// createSyncHandler creates a sync handler when a tree is already created
func createSyncHandler(peerId, spaceId string, objTree objecttree.ObjectTree, log *messageLog) *testSyncHandler {
peerManager := newRequestPeerManager(peerId, log)
syncClient := NewSyncClient(spaceId, peerManager, peerManager)
netTree := &broadcastTree{
ObjectTree: objTree,
SyncClient: syncClient,
}
handler := newSyncTreeHandler(spaceId, netTree, syncClient, syncstatus.NewNoOpSyncStatus())
return &testSyncHandler{
SyncHandler: handler,
batcher: mb.New[protocolMsg](0),
peerId: peerId,
peerManager: peerManager,
}
}
// createEmptySyncHandler creates a sync handler when the tree will be provided later (this emulates the situation when we have no tree)
func createEmptySyncHandler(peerId, spaceId string, builder objecttree.BuildObjectTreeFunc, aclList list.AclList, log *messageLog) *testSyncHandler {
peerManager := newRequestPeerManager(peerId, log)
syncClient := NewSyncClient(spaceId, peerManager, peerManager)
batcher := mb.New[protocolMsg](0)
return &testSyncHandler{
batcher: batcher,
peerId: peerId,
aclList: aclList,
log: log,
syncClient: syncClient,
builder: builder,
peerManager: peerManager,
}
}
func (h *testSyncHandler) HandleMessage(ctx context.Context, senderId string, request *spacesyncproto.ObjectSyncMessage) (err error) {
if h.SyncHandler != nil {
return h.SyncHandler.HandleMessage(ctx, senderId, request)
}
unmarshalled := &treechangeproto.TreeSyncMessage{}
err = proto.Unmarshal(request.Payload, unmarshalled)
if err != nil {
return
}
if unmarshalled.Content.GetFullSyncResponse() == nil {
newTreeRequest := NewRequestFactory().CreateNewTreeRequest()
return h.syncClient.QueueRequest(senderId, request.ObjectId, newTreeRequest)
}
fullSyncResponse := unmarshalled.Content.GetFullSyncResponse()
treeStorage, _ := treestorage.NewInMemoryTreeStorage(unmarshalled.RootChange, []string{unmarshalled.RootChange.Id}, nil)
tree, err := h.builder(treeStorage, h.aclList)
if err != nil {
return
}
netTree := &broadcastTree{
ObjectTree: tree,
SyncClient: h.syncClient,
}
res, err := netTree.AddRawChanges(context.Background(), objecttree.RawChangesPayload{
NewHeads: fullSyncResponse.Heads,
RawChanges: fullSyncResponse.Changes,
})
if err != nil {
return
}
h.SyncHandler = newSyncTreeHandler(request.SpaceId, netTree, h.syncClient, syncstatus.NewNoOpSyncStatus())
headUpdate := NewRequestFactory().CreateHeadUpdate(netTree, res.Added)
h.syncClient.Broadcast(headUpdate)
return nil
}
func (h *testSyncHandler) manager() *requestPeerManager {
return h.peerManager
}
func (h *testSyncHandler) tree() *broadcastTree {
return h.SyncHandler.(*syncTreeHandler).objTree.(*broadcastTree)
}
func (h *testSyncHandler) send(ctx context.Context, msg protocolMsg) (err error) {
return h.batcher.Add(ctx, msg)
}
func (h *testSyncHandler) sendRawChanges(ctx context.Context, changes objecttree.RawChangesPayload) {
h.batcher.Add(ctx, protocolMsg{userMsg: &changes})
}
func (h *testSyncHandler) run(ctx context.Context, t *testing.T, wg *sync.WaitGroup) {
wg.Add(1)
go func() {
defer wg.Done()
for {
res, err := h.batcher.WaitOne(ctx)
if err != nil {
return
}
if res.userMsg != nil {
h.counter.Increment()
h.tree().Lock()
userRes, err := h.tree().AddRawChanges(ctx, *res.userMsg)
require.NoError(t, err)
fmt.Println("user add result", userRes.Heads)
h.tree().Unlock()
h.counter.Decrement()
continue
}
if res.description().name == "FullSyncRequest" {
h.counter.Increment()
resp, err := h.HandleRequest(ctx, res.senderId, res.msg)
h.counter.Decrement()
if err != nil {
fmt.Println("error handling request", err.Error())
continue
}
h.peerManager.SendPeer(ctx, res.senderId, resp)
} else {
h.counter.Increment()
err = h.HandleMessage(ctx, res.senderId, res.msg)
h.counter.Decrement()
if err != nil {
fmt.Println("error handling message", err.Error())
}
}
}
}()
}
// broadcastTree is the tree that broadcasts changes to everyone when changes are added
// it is a simplified version of SyncTree which is easier to use in the test environment
type broadcastTree struct {
objecttree.ObjectTree
SyncClient
}
func (b *broadcastTree) AddRawChanges(ctx context.Context, changes objecttree.RawChangesPayload) (objecttree.AddResult, error) {
res, err := b.ObjectTree.AddRawChanges(ctx, changes)
if err != nil {
return objecttree.AddResult{}, err
}
upd := b.SyncClient.CreateHeadUpdate(b.ObjectTree, res.Added)
b.SyncClient.Broadcast(upd)
return res, nil
}
func createStorage(treeId string, aclList list.AclList) treestorage.TreeStorage {
changeCreator := objecttree.NewMockChangeCreator()
st := changeCreator.CreateNewTreeStorage(treeId, aclList.Head().Id, false)
return st
}
func createTestTree(aclList list.AclList, storage treestorage.TreeStorage) (objecttree.ObjectTree, error) {
return objecttree.BuildEmptyDataTestableTree(storage, aclList)
}
type fixtureDeps struct {
counter *operationCounter
aclList list.AclList
initStorage *treestorage.InMemoryTreeStorage
connectionMap map[string][]string
emptyTrees []string
treeBuilder objecttree.BuildObjectTreeFunc
}
// protocolFixture is the test environment for sync protocol tests
type protocolFixture struct {
handlers map[string]*testSyncHandler
log *messageLog
wg *sync.WaitGroup
counter *operationCounter
ctx context.Context
cancel context.CancelFunc
}
func newProtocolFixture(t *testing.T, spaceId string, deps fixtureDeps) *protocolFixture {
var (
handlers = map[string]*testSyncHandler{}
log = newMessageLog()
wg = sync.WaitGroup{}
ctx, cancel = context.WithCancel(context.Background())
)
for peerId := range deps.connectionMap {
var handler *testSyncHandler
if slices.Contains(deps.emptyTrees, peerId) {
handler = createEmptySyncHandler(peerId, spaceId, deps.treeBuilder, deps.aclList, log)
} else {
stCopy := deps.initStorage.Copy()
testTree, err := deps.treeBuilder(stCopy, deps.aclList)
require.NoError(t, err)
handler = createSyncHandler(peerId, spaceId, testTree, log)
}
handler.counter = deps.counter
handlers[peerId] = handler
}
for peerId, connectionMap := range deps.connectionMap {
handler := handlers[peerId]
manager := handler.manager()
for _, connectionId := range connectionMap {
manager.addHandler(connectionId, handlers[connectionId])
}
}
return &protocolFixture{
counter: deps.counter,
handlers: handlers,
log: log,
wg: &wg,
ctx: ctx,
cancel: cancel,
}
}
func (p *protocolFixture) run(t *testing.T) {
for _, handler := range p.handlers {
handler.run(p.ctx, t, p.wg)
}
}
func (p *protocolFixture) stop() {
p.cancel()
p.wg.Wait()
}
// genParams is the parameters for genChanges
type genParams struct {
// prefix is the prefix which is added to change id
prefix string
aclId string
startIdx int
levels int
perLevel int
snapshotId string
prevHeads []string
isSnapshot func() bool
hasData bool
}
// genResult is the result of genChanges
type genResult struct {
changes []*treechangeproto.RawTreeChangeWithId
heads []string
snapshotId string
}
// genChanges generates several levels of tree changes where each level is connected only with previous one
func genChanges(creator *objecttree.MockChangeCreator, params genParams) (res genResult) {
src := rand.NewSource(time.Now().Unix())
rnd := rand.New(src)
var (
prevHeads []string
snapshotId = params.snapshotId
)
prevHeads = append(prevHeads, params.prevHeads...)
for i := 0; i < params.levels; i++ {
var (
newHeads []string
usedIds = map[string]struct{}{}
)
newChange := func(isSnapshot bool, idx int, prevIds []string) string {
newId := fmt.Sprintf("%s.%d.%d", params.prefix, params.startIdx+i, idx)
var data []byte
if params.hasData {
data = []byte(newId)
}
newCh := creator.CreateRawWithData(newId, params.aclId, snapshotId, isSnapshot, data, prevIds...)
res.changes = append(res.changes, newCh)
return newId
}
if params.isSnapshot() {
newId := newChange(true, 0, prevHeads)
prevHeads = []string{newId}
snapshotId = newId
continue
}
perLevel := rnd.Intn(params.perLevel)
if perLevel == 0 {
perLevel = 1
}
for j := 0; j < perLevel; j++ {
prevConns := rnd.Intn(len(prevHeads))
if prevConns == 0 {
prevConns = 1
}
rnd.Shuffle(len(prevHeads), func(i, j int) {
prevHeads[i], prevHeads[j] = prevHeads[j], prevHeads[i]
})
// if we didn't connect with all prev ones
if j == perLevel-1 && len(usedIds) != len(prevHeads) {
var unusedIds []string
for _, id := range prevHeads {
if _, exists := usedIds[id]; !exists {
unusedIds = append(unusedIds, id)
}
}
prevHeads = unusedIds
prevConns = len(prevHeads)
}
var prevIds []string
for k := 0; k < prevConns; k++ {
prevIds = append(prevIds, prevHeads[k])
usedIds[prevHeads[k]] = struct{}{}
}
newId := newChange(false, j, prevIds)
newHeads = append(newHeads, newId)
}
prevHeads = newHeads
}
res.heads = prevHeads
res.snapshotId = snapshotId
return
}

View file

@ -68,7 +68,7 @@ func (s *syncService) BroadcastMessage(ctx context.Context, msg drpc.Message) er
} }
func (s *syncService) handleOutgoingMessage(id string, msg drpc.Message, q *mb.MB[drpc.Message]) error { func (s *syncService) handleOutgoingMessage(id string, msg drpc.Message, q *mb.MB[drpc.Message]) error {
return s.handler.TryAddMessage(s.ctx, msg, q) return s.handler.TryAddMessage(s.ctx, id, msg, q)
} }
func (s *syncService) handleIncomingMessage(msg msgCtx) { func (s *syncService) handleIncomingMessage(msg msgCtx) {
@ -95,6 +95,7 @@ func (s *syncService) NewReadMessage() drpc.Message {
} }
func (s *syncService) HandleMessage(ctx context.Context, peerId string, msg drpc.Message) error { func (s *syncService) HandleMessage(ctx context.Context, peerId string, msg drpc.Message) error {
// TODO: make this queue per object and add closing of the individual queues
return s.receiveQueue.Add(ctx, peerId, msgCtx{ return s.receiveQueue.Add(ctx, peerId, msgCtx{
ctx: ctx, ctx: ctx,
Message: msg, Message: msg,

View file

@ -5,5 +5,5 @@ import "github.com/gogo/protobuf/proto"
type Request interface { type Request interface {
PeerId() string PeerId() string
ObjectId() string ObjectId() string
Proto() proto.Message Proto() (proto.Message, error)
} }

View file

@ -12,13 +12,17 @@ import (
const CName = "common.sync.syncdeps" const CName = "common.sync.syncdeps"
type ObjectSyncHandler interface {
HandleHeadUpdate(ctx context.Context, headUpdate drpc.Message) (Request, error)
HandleStreamRequest(ctx context.Context, rq Request, send func(resp proto.Message) error) (Request, error)
HandleResponse(ctx context.Context, peerId, objectId string, resp Response) error
}
type SyncHandler interface { type SyncHandler interface {
app.Component app.Component
HandleHeadUpdate(ctx context.Context, headUpdate drpc.Message) (Request, error) ObjectSyncHandler
TryAddMessage(ctx context.Context, msg drpc.Message, q *mb.MB[drpc.Message]) error TryAddMessage(ctx context.Context, peerId string, msg drpc.Message, q *mb.MB[drpc.Message]) error
HandleStreamRequest(ctx context.Context, rq Request, send func(resp proto.Message) error) (Request, error)
SendStreamRequest(ctx context.Context, rq Request, receive func(stream drpc.Stream) error) (err error) SendStreamRequest(ctx context.Context, rq Request, receive func(stream drpc.Stream) error) (err error)
HandleResponse(ctx context.Context, peerId, objectId string, resp Response) error
NewResponse() Response NewResponse() Response
NewMessage() drpc.Message NewMessage() drpc.Message
} }

View file

@ -11,8 +11,8 @@ type CounterRequest struct {
*synctestproto.CounterRequest *synctestproto.CounterRequest
} }
func (c CounterRequest) Proto() proto.Message { func (c CounterRequest) Proto() (proto.Message, error) {
return c.CounterRequest return c.CounterRequest, nil
} }
func NewCounterRequest(peerId, objectId string, counters []int32) CounterRequest { func NewCounterRequest(peerId, objectId string, counters []int32) CounterRequest {

View file

@ -23,7 +23,11 @@ func (c *CounterRequestSender) SendStreamRequest(ctx context.Context, rq syncdep
} }
return pr.DoDrpc(ctx, func(conn drpc.Conn) error { return pr.DoDrpc(ctx, func(conn drpc.Conn) error {
cl := synctestproto.NewDRPCCounterSyncClient(conn) cl := synctestproto.NewDRPCCounterSyncClient(conn)
stream, err := cl.CounterStreamRequest(ctx, rq.Proto().(*synctestproto.CounterRequest)) req, err := rq.Proto()
if err != nil {
return err
}
stream, err := cl.CounterStreamRequest(ctx, req.(*synctestproto.CounterRequest))
if err != nil { if err != nil {
return err return err
} }

View file

@ -26,7 +26,7 @@ func (c *CounterSyncHandler) HandleHeadUpdate(ctx context.Context, headUpdate dr
return c.updateHandler.HandleHeadUpdate(ctx, headUpdate) return c.updateHandler.HandleHeadUpdate(ctx, headUpdate)
} }
func (c *CounterSyncHandler) TryAddMessage(ctx context.Context, msg drpc.Message, q *mb.MB[drpc.Message]) error { func (c *CounterSyncHandler) TryAddMessage(ctx context.Context, id string, msg drpc.Message, q *mb.MB[drpc.Message]) error {
return q.TryAdd(msg) return q.TryAdd(msg)
} }