1
0
Fork 0
mirror of https://github.com/anyproto/any-sync.git synced 2025-06-12 02:30:41 +09:00

Add sync handler

This commit is contained in:
mcrakhman 2022-09-13 22:26:51 +02:00 committed by Mikhail Iudin
parent 5a606b8854
commit 3f41c60f2b
No known key found for this signature in database
GPG key ID: FAAAA8BAABDFF1C0
3 changed files with 299 additions and 0 deletions

17
common/commonspace/cache/treecache.go vendored Normal file
View file

@ -0,0 +1,17 @@
package cache
import (
"context"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/storage"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/tree"
)
type TreeResult struct {
Release func()
Tree tree.ObjectTree
}
type TreeCache interface {
GetTree(ctx context.Context, id string) (TreeResult, error)
AddTree(ctx context.Context, payload storage.TreeStorageCreatePayload) error
}

View file

@ -0,0 +1,35 @@
package spacesyncproto
import "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclchanges/aclpb"
type SpaceStream = DRPCSpace_StreamStream
func WrapHeadUpdate(update *ObjectHeadUpdate, header *aclpb.TreeHeader, treeId string) *ObjectSyncMessage {
return &ObjectSyncMessage{
Content: &ObjectSyncContentValue{
Value: &ObjectSyncContentValue_HeadUpdate{HeadUpdate: update},
},
TreeHeader: header,
TreeId: treeId,
}
}
func WrapFullRequest(request *ObjectFullSyncRequest, header *aclpb.TreeHeader, treeId string) *ObjectSyncMessage {
return &ObjectSyncMessage{
Content: &ObjectSyncContentValue{
Value: &ObjectSyncContentValue_FullSyncRequest{FullSyncRequest: request},
},
TreeHeader: header,
TreeId: treeId,
}
}
func WrapFullResponse(response *ObjectFullSyncResponse, header *aclpb.TreeHeader, treeId string) *ObjectSyncMessage {
return &ObjectSyncMessage{
Content: &ObjectSyncContentValue{
Value: &ObjectSyncContentValue_FullSyncResponse{FullSyncResponse: response},
},
TreeHeader: header,
TreeId: treeId,
}
}

View file

@ -0,0 +1,247 @@
package syncservice
import (
"context"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/cache"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclchanges/aclpb"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/storage"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/tree"
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/slice"
)
type syncHandler struct {
treeCache cache.TreeCache
syncClient SyncClient
}
type SyncHandler interface {
HandleSyncMessage(ctx context.Context, senderId string, request *spacesyncproto.ObjectSyncMessage) (err error)
}
type SyncClient interface {
SendSyncMessage(peerId string, msg *spacesyncproto.ObjectSyncMessage) (err error)
}
func newSyncHandler(treeCache cache.TreeCache, syncClient SyncClient) *syncHandler {
return &syncHandler{
treeCache: treeCache,
syncClient: syncClient,
}
}
func (s *syncHandler) HandleSyncMessage(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) error {
msg := message.GetContent()
switch {
case msg.GetFullSyncRequest() != nil:
return s.HandleFullSyncRequest(ctx, senderId, msg.GetFullSyncRequest(), message.GetTreeHeader(), message.GetTreeId())
case msg.GetFullSyncResponse() != nil:
return s.HandleFullSyncResponse(ctx, senderId, msg.GetFullSyncResponse(), message.GetTreeHeader(), message.GetTreeId())
case msg.GetHeadUpdate() != nil:
return s.HandleHeadUpdate(ctx, senderId, msg.GetHeadUpdate(), message.GetTreeHeader(), message.GetTreeId())
}
return nil
}
func (s *syncHandler) HandleHeadUpdate(
ctx context.Context,
senderId string,
update *spacesyncproto.ObjectHeadUpdate,
header *aclpb.TreeHeader,
treeId string) (err error) {
var (
fullRequest *spacesyncproto.ObjectFullSyncRequest
snapshotPath []string
result tree.AddResult
// in case update changes are empty then we want to sync the whole tree
sendChangesOnFullSync = len(update.Changes) == 0
)
res, err := s.treeCache.GetTree(ctx, treeId)
if err != nil {
return
}
err = func() error {
objTree := res.Tree
objTree.Lock()
defer res.Release()
defer objTree.Unlock()
if slice.UnsortedEquals(update.Heads, objTree.Heads()) {
return nil
}
result, err = objTree.AddRawChanges(ctx, update.Changes...)
if err != nil {
return err
}
// if we couldn't add all the changes
shouldFullSync := len(update.Changes) != len(result.Added)
snapshotPath = objTree.SnapshotPath()
if shouldFullSync {
fullRequest, err = s.prepareFullSyncRequest(objTree, sendChangesOnFullSync)
if err != nil {
return err
}
}
return nil
}()
// if there are no such tree
if err == storage.ErrUnknownTreeId {
fullRequest = &spacesyncproto.ObjectFullSyncRequest{}
}
// if we have incompatible heads, or we haven't seen the tree at all
if fullRequest != nil {
return s.syncClient.SendSyncMessage(senderId, spacesyncproto.WrapFullRequest(fullRequest, header, treeId))
}
// if error or nothing has changed
if err != nil || len(result.Added) == 0 {
return err
}
// otherwise sending heads update message
newUpdate := &spacesyncproto.ObjectHeadUpdate{
Heads: result.Heads,
Changes: result.Added,
SnapshotPath: snapshotPath,
}
return s.syncClient.SendSyncMessage("", spacesyncproto.WrapHeadUpdate(newUpdate, header, treeId))
}
func (s *syncHandler) HandleFullSyncRequest(
ctx context.Context,
senderId string,
request *spacesyncproto.ObjectFullSyncRequest,
header *aclpb.TreeHeader,
treeId string) (err error) {
var fullResponse *spacesyncproto.ObjectFullSyncResponse
res, err := s.treeCache.GetTree(ctx, treeId)
if err != nil {
return
}
// TODO: check if sync request contains changes and add them (also do head update in this case)
err = func() error {
objTree := res.Tree
objTree.Lock()
defer res.Release()
defer objTree.Unlock()
fullResponse, err = s.prepareFullSyncResponse(treeId, request.SnapshotPath, request.Heads, objTree)
if err != nil {
return err
}
return nil
}()
if err != nil {
return err
}
return s.syncClient.SendSyncMessage(senderId, spacesyncproto.WrapFullResponse(fullResponse, header, treeId))
}
func (s *syncHandler) HandleFullSyncResponse(
ctx context.Context,
senderId string,
response *spacesyncproto.ObjectFullSyncResponse,
header *aclpb.TreeHeader,
treeId string) (err error) {
var (
snapshotPath []string
result tree.AddResult
)
res, err := s.treeCache.GetTree(ctx, treeId)
if err != nil {
return
}
err = func() error {
objTree := res.Tree
objTree.Lock()
defer res.Release()
defer objTree.Unlock()
// if we already have the heads for whatever reason
if slice.UnsortedEquals(response.Heads, objTree.Heads()) {
return nil
}
result, err = objTree.AddRawChanges(ctx, response.Changes...)
if err != nil {
return err
}
snapshotPath = objTree.SnapshotPath()
return nil
}()
// if error or nothing has changed
if (err != nil || len(result.Added) == 0) && err != storage.ErrUnknownTreeId {
return err
}
// if we have a new tree
if err == storage.ErrUnknownTreeId {
err = s.addTree(ctx, response, header, treeId)
if err != nil {
return err
}
result = tree.AddResult{
OldHeads: []string{},
Heads: response.Heads,
Added: response.Changes,
}
}
// sending heads update message
newUpdate := &spacesyncproto.ObjectHeadUpdate{
Heads: result.Heads,
Changes: result.Added,
SnapshotPath: snapshotPath,
}
return s.syncClient.SendSyncMessage("", spacesyncproto.WrapHeadUpdate(newUpdate, header, treeId))
}
func (s *syncHandler) prepareFullSyncRequest(t tree.ObjectTree, sendOwnChanges bool) (*spacesyncproto.ObjectFullSyncRequest, error) {
// TODO: add send own changes logic
return &spacesyncproto.ObjectFullSyncRequest{
Heads: t.Heads(),
SnapshotPath: t.SnapshotPath(),
}, nil
}
func (s *syncHandler) prepareFullSyncResponse(
treeId string,
theirPath, theirHeads []string,
t tree.ObjectTree) (*spacesyncproto.ObjectFullSyncResponse, error) {
ourChanges, err := t.ChangesAfterCommonSnapshot(theirPath, theirHeads)
if err != nil {
return nil, err
}
return &spacesyncproto.ObjectFullSyncResponse{
Heads: t.Heads(),
Changes: ourChanges,
SnapshotPath: t.SnapshotPath(),
}, nil
}
func (s *syncHandler) addTree(
ctx context.Context,
response *spacesyncproto.ObjectFullSyncResponse,
header *aclpb.TreeHeader,
treeId string) error {
return s.treeCache.AddTree(
ctx,
storage.TreeStorageCreatePayload{
TreeId: treeId,
Header: header,
Changes: response.Changes,
Heads: response.Heads,
})
}