1
0
Fork 0
mirror of https://github.com/anyproto/any-sync.git synced 2025-06-12 10:40:44 +09:00
This commit is contained in:
mcrakhman 2022-09-14 18:54:10 +02:00 committed by Mikhail Iudin
parent 3f41c60f2b
commit b2fd9d2211
No known key found for this signature in database
GPG key ID: FAAAA8BAABDFF1C0
6 changed files with 335 additions and 410 deletions

View file

@ -21,5 +21,6 @@ func (r *rpcHandler) HeadSync(ctx context.Context, req *spacesyncproto.HeadSyncR
} }
func (r *rpcHandler) Stream(stream spacesyncproto.DRPCSpace_StreamStream) error { func (r *rpcHandler) Stream(stream spacesyncproto.DRPCSpace_StreamStream) error {
return fmt.Errorf("not implemented") return fmt.Errorf("not implemented")
} }

View file

@ -0,0 +1,165 @@
package syncservice
import (
"errors"
"fmt"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"
"github.com/libp2p/go-libp2p-core/sec"
"storj.io/drpc"
"storj.io/drpc/drpcctx"
"sync"
)
var ErrEmptyPeer = errors.New("don't have such a peer")
var ErrStreamClosed = errors.New("stream is already closed")
const maxSimultaneousOperationsPerStream = 10
// StreamPool can be made generic to work with different streams
type StreamPool interface {
AddStream(stream spacesyncproto.SpaceStream) (err error)
SyncClient
}
type SyncClient interface {
SendAsync(peerId string, message *spacesyncproto.ObjectSyncMessage) (err error)
BroadcastAsync(message *spacesyncproto.ObjectSyncMessage) (err error)
}
type MessageHandler interface {
HandleMessage(peerId string, message *spacesyncproto.ObjectSyncMessage)
}
type streamPool struct {
sync.Mutex
peerStreams map[string]spacesyncproto.SpaceStream
messageHandler MessageHandler
}
func newStreamPool(messageHandler MessageHandler) StreamPool {
return &streamPool{
peerStreams: make(map[string]spacesyncproto.SpaceStream),
messageHandler: messageHandler,
}
}
func (s *streamPool) SendAsync(peerId string, message *spacesyncproto.ObjectSyncMessage) (err error) {
stream, err := s.getStream(peerId)
if err != nil {
return
}
return stream.Send(message)
}
func (s *streamPool) getStream(id string) (stream spacesyncproto.SpaceStream, err error) {
s.Lock()
defer s.Unlock()
stream, exists := s.peerStreams[id]
if !exists {
err = ErrEmptyPeer
return
}
select {
case <-stream.Context().Done():
delete(s.peerStreams, id)
err = ErrStreamClosed
default:
}
return
}
func (s *streamPool) getAllStreams() (streams []spacesyncproto.SpaceStream) {
s.Lock()
defer s.Unlock()
Loop:
for id, stream := range s.peerStreams {
select {
case <-stream.Context().Done():
delete(s.peerStreams, id)
continue Loop
default:
}
streams = append(streams, stream)
}
return
}
func (s *streamPool) BroadcastAsync(message *spacesyncproto.ObjectSyncMessage) (err error) {
streams := s.getAllStreams()
for _, stream := range streams {
if err = stream.Send(message); err != nil {
// TODO: add logging
}
}
return nil
}
func (s *streamPool) AddStream(stream spacesyncproto.SpaceStream) (err error) {
s.Lock()
defer s.Unlock()
peerId, err := getPeerIdFromStream(stream)
if err != nil {
return
}
s.peerStreams[peerId] = stream
go s.readPeerLoop(peerId, stream)
return
}
func (s *streamPool) readPeerLoop(peerId string, stream spacesyncproto.SpaceStream) (err error) {
limiter := make(chan struct{}, maxSimultaneousOperationsPerStream)
for i := 0; i < maxSimultaneousOperationsPerStream; i++ {
limiter <- struct{}{}
}
Loop:
for {
msg, err := stream.Recv()
if err != nil {
break
}
select {
case <-limiter:
case <-stream.Context().Done():
break Loop
}
go func() {
defer func() {
limiter <- struct{}{}
}()
s.messageHandler.HandleMessage(peerId, msg)
}()
}
if err = s.removePeer(peerId); err != nil {
// TODO: log something
}
return
}
func (s *streamPool) removePeer(peerId string) (err error) {
s.Lock()
defer s.Unlock()
_, ok := s.peerStreams[peerId]
if !ok {
return ErrEmptyPeer
}
delete(s.peerStreams, peerId)
return
}
func getPeerIdFromStream(stream drpc.Stream) (string, error) {
ctx := stream.Context()
conn, ok := ctx.Value(drpcctx.TransportKey{}).(sec.SecureConn)
if !ok {
return "", fmt.Errorf("incorrect connection type in stream")
}
return conn.RemotePeer().String(), nil
}

View file

@ -19,10 +19,6 @@ type SyncHandler interface {
HandleSyncMessage(ctx context.Context, senderId string, request *spacesyncproto.ObjectSyncMessage) (err error) HandleSyncMessage(ctx context.Context, senderId string, request *spacesyncproto.ObjectSyncMessage) (err error)
} }
type SyncClient interface {
SendSyncMessage(peerId string, msg *spacesyncproto.ObjectSyncMessage) (err error)
}
func newSyncHandler(treeCache cache.TreeCache, syncClient SyncClient) *syncHandler { func newSyncHandler(treeCache cache.TreeCache, syncClient SyncClient) *syncHandler {
return &syncHandler{ return &syncHandler{
treeCache: treeCache, treeCache: treeCache,
@ -96,7 +92,7 @@ func (s *syncHandler) HandleHeadUpdate(
} }
// if we have incompatible heads, or we haven't seen the tree at all // if we have incompatible heads, or we haven't seen the tree at all
if fullRequest != nil { if fullRequest != nil {
return s.syncClient.SendSyncMessage(senderId, spacesyncproto.WrapFullRequest(fullRequest, header, treeId)) return s.syncClient.SendAsync(senderId, spacesyncproto.WrapFullRequest(fullRequest, header, treeId))
} }
// if error or nothing has changed // if error or nothing has changed
if err != nil || len(result.Added) == 0 { if err != nil || len(result.Added) == 0 {
@ -109,7 +105,7 @@ func (s *syncHandler) HandleHeadUpdate(
Changes: result.Added, Changes: result.Added,
SnapshotPath: snapshotPath, SnapshotPath: snapshotPath,
} }
return s.syncClient.SendSyncMessage("", spacesyncproto.WrapHeadUpdate(newUpdate, header, treeId)) return s.syncClient.BroadcastAsync(spacesyncproto.WrapHeadUpdate(newUpdate, header, treeId))
} }
func (s *syncHandler) HandleFullSyncRequest( func (s *syncHandler) HandleFullSyncRequest(
@ -142,7 +138,7 @@ func (s *syncHandler) HandleFullSyncRequest(
if err != nil { if err != nil {
return err return err
} }
return s.syncClient.SendSyncMessage(senderId, spacesyncproto.WrapFullResponse(fullResponse, header, treeId)) return s.syncClient.SendAsync(senderId, spacesyncproto.WrapFullResponse(fullResponse, header, treeId))
} }
func (s *syncHandler) HandleFullSyncResponse( func (s *syncHandler) HandleFullSyncResponse(
@ -163,21 +159,24 @@ func (s *syncHandler) HandleFullSyncResponse(
} }
err = func() error { err = func() error {
objTree := res.Tree syncTree := res.Tree
objTree.Lock() syncTree.Lock()
defer res.Release() defer res.Release()
defer objTree.Unlock() defer syncTree.Unlock()
// if we already have the heads for whatever reason // if we already have the heads for whatever reason
if slice.UnsortedEquals(response.Heads, objTree.Heads()) { if slice.UnsortedEquals(response.Heads, syncTree.Heads()) {
return nil return nil
} }
result, err = objTree.AddRawChanges(ctx, response.Changes...) // syncTree -> syncService: HeadUpdate()
// AddRawChanges -> syncTree.addRawChanges(); syncService.HeadUpdate()
result, err = syncTree.AddRawChanges(ctx, response.Changes...)
if err != nil { if err != nil {
return err return err
} }
snapshotPath = objTree.SnapshotPath() snapshotPath = syncTree.SnapshotPath()
return nil return nil
}() }()
@ -203,7 +202,7 @@ func (s *syncHandler) HandleFullSyncResponse(
Changes: result.Added, Changes: result.Added,
SnapshotPath: snapshotPath, SnapshotPath: snapshotPath,
} }
return s.syncClient.SendSyncMessage("", spacesyncproto.WrapHeadUpdate(newUpdate, header, treeId)) return s.syncClient.BroadcastAsync(spacesyncproto.WrapHeadUpdate(newUpdate, header, treeId))
} }
func (s *syncHandler) prepareFullSyncRequest(t tree.ObjectTree, sendOwnChanges bool) (*spacesyncproto.ObjectFullSyncRequest, error) { func (s *syncHandler) prepareFullSyncRequest(t tree.ObjectTree, sendOwnChanges bool) (*spacesyncproto.ObjectFullSyncRequest, error) {

View file

@ -1,291 +1,32 @@
package syncservice package syncservice
import ( import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/app" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"
"github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/account" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclchanges/aclpb"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/storage"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/tree"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/treecache"
"github.com/anytypeio/go-anytype-infrastructure-experiments/syncproto"
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/slice"
"go.uber.org/zap"
) )
type syncService struct {
treeCache treecache.Service
account account.Service
messageService MessageSender
}
var log = logger.NewNamed("requesthandler")
func New() app.Component {
return &syncService{}
}
type SyncService interface { type SyncService interface {
HandleSyncMessage(ctx context.Context, senderId string, request *syncproto.Sync) (err error) NotifyHeadUpdate(treeId string, header *aclpb.TreeHeader, update *spacesyncproto.ObjectHeadUpdate) (err error)
StreamPool() StreamPool
} }
type MessageSender interface { type syncService struct {
SendMessageAsync(peerId string, msg *syncproto.Sync) error syncHandler SyncHandler
SendToSpaceAsync(spaceId string, msg *syncproto.Sync) error streamPool StreamPool
configuration nodeconf.Configuration
} }
const CName = "SyncRequestHandler" func (s *syncService) NotifyHeadUpdate(treeId string, header *aclpb.TreeHeader, update *spacesyncproto.ObjectHeadUpdate) (err error) {
msg := spacesyncproto.WrapHeadUpdate(update, header, treeId)
func (r *syncService) Init(a *app.App) (err error) { all
r.treeCache = a.MustComponent(treecache.CName).(treecache.Service) return s.streamPool.BroadcastAsync(msg)
r.account = a.MustComponent(account.CName).(account.Service)
r.messageService = a.MustComponent("MessageService").(MessageSender)
return nil
} }
func (r *syncService) Name() (name string) { func (s *syncService) StreamPool() StreamPool {
return CName return s.streamPool
} }
func (r *syncService) Run(ctx context.Context) (err error) { func newSyncService() {
return nil
}
func (r *syncService) Close(ctx context.Context) (err error) {
return nil
}
func (r *syncService) HandleSyncMessage(ctx context.Context, senderId string, content *syncproto.Sync) error {
msg := content.GetMessage()
switch {
case msg.GetFullSyncRequest() != nil:
return r.HandleFullSyncRequest(ctx, senderId, msg.GetFullSyncRequest(), content.GetTreeHeader(), content.GetTreeId())
case msg.GetFullSyncResponse() != nil:
return r.HandleFullSyncResponse(ctx, senderId, msg.GetFullSyncResponse(), content.GetTreeHeader(), content.GetTreeId())
case msg.GetHeadUpdate() != nil:
return r.HandleHeadUpdate(ctx, senderId, msg.GetHeadUpdate(), content.GetTreeHeader(), content.GetTreeId())
case msg.GetAclList() != nil:
return r.HandleACLList(ctx, senderId, msg.GetAclList(), content.GetTreeHeader(), content.GetTreeId())
}
return nil
}
func (r *syncService) HandleHeadUpdate(
ctx context.Context,
senderId string,
update *syncproto.SyncHeadUpdate,
header *aclpb.Header,
treeId string) (err error) {
var (
fullRequest *syncproto.SyncFullRequest
snapshotPath []string
result tree.AddResult
)
log.With(zap.String("peerId", senderId), zap.String("treeId", treeId)).
Debug("processing head update")
err = r.treeCache.Do(ctx, treeId, func(obj any) error {
objTree := obj.(tree.ObjectTree)
objTree.Lock()
defer objTree.Unlock()
if slice.UnsortedEquals(update.Heads, objTree.Heads()) {
return nil
}
result, err = objTree.AddRawChanges(ctx, update.Changes...)
if err != nil {
return err
}
// if we couldn't add all the changes
shouldFullSync := len(update.Changes) != len(result.Added)
snapshotPath = objTree.SnapshotPath()
if shouldFullSync {
fullRequest, err = r.prepareFullSyncRequest(objTree)
if err != nil {
return err
}
}
return nil
})
// if there are no such tree
if err == storage.ErrUnknownTreeId {
fullRequest = &syncproto.SyncFullRequest{}
}
// if we have incompatible heads, or we haven't seen the tree at all
if fullRequest != nil {
return r.messageService.SendMessageAsync(senderId, syncproto.WrapFullRequest(fullRequest, header, treeId))
}
// if error or nothing has changed
if err != nil || len(result.Added) == 0 {
return err
}
// otherwise sending heads update message
newUpdate := &syncproto.SyncHeadUpdate{
Heads: result.Heads,
Changes: result.Added,
SnapshotPath: snapshotPath,
}
return r.messageService.SendToSpaceAsync("", syncproto.WrapHeadUpdate(newUpdate, header, treeId))
}
func (r *syncService) HandleFullSyncRequest(
ctx context.Context,
senderId string,
request *syncproto.SyncFullRequest,
header *aclpb.Header,
treeId string) (err error) {
var fullResponse *syncproto.SyncFullResponse
err = r.treeCache.Do(ctx, treeId, func(obj any) error {
objTree := obj.(tree.ObjectTree)
objTree.Lock()
defer objTree.Unlock()
fullResponse, err = r.prepareFullSyncResponse(treeId, request.SnapshotPath, request.Heads, objTree)
if err != nil {
return err
}
return nil
})
if err != nil {
return err
}
return r.messageService.SendMessageAsync(senderId, syncproto.WrapFullResponse(fullResponse, header, treeId))
}
func (r *syncService) HandleFullSyncResponse(
ctx context.Context,
senderId string,
response *syncproto.SyncFullResponse,
header *aclpb.Header,
treeId string) (err error) {
var (
snapshotPath []string
result tree.AddResult
)
err = r.treeCache.Do(ctx, treeId, func(obj interface{}) error {
objTree := obj.(tree.ObjectTree)
objTree.Lock()
defer objTree.Unlock()
// if we already have the heads for whatever reason
if slice.UnsortedEquals(response.Heads, objTree.Heads()) {
return nil
}
result, err = objTree.AddRawChanges(ctx, response.Changes...)
if err != nil {
return err
}
snapshotPath = objTree.SnapshotPath()
return nil
})
// if error or nothing has changed
if (err != nil || len(result.Added) == 0) && err != storage.ErrUnknownTreeId {
return err
}
// if we have a new tree
if err == storage.ErrUnknownTreeId {
err = r.createTree(ctx, response, header, treeId)
if err != nil {
return err
}
result = tree.AddResult{
OldHeads: []string{},
Heads: response.Heads,
Added: response.Changes,
}
}
// sending heads update message
newUpdate := &syncproto.SyncHeadUpdate{
Heads: result.Heads,
Changes: result.Added,
SnapshotPath: snapshotPath,
}
return r.messageService.SendToSpaceAsync("", syncproto.WrapHeadUpdate(newUpdate, header, treeId))
}
func (r *syncService) HandleACLList(
ctx context.Context,
senderId string,
req *syncproto.SyncACLList,
header *aclpb.Header,
id string) (err error) {
err = r.treeCache.Do(ctx, id, func(obj interface{}) error {
return nil
})
// do nothing if already added
if err == nil {
return nil
}
// if not found then add to storage
if err == storage.ErrUnknownTreeId {
return r.createACLList(ctx, req, header, id)
}
return err
}
func (r *syncService) prepareFullSyncRequest(t tree.ObjectTree) (*syncproto.SyncFullRequest, error) {
return &syncproto.SyncFullRequest{
Heads: t.Heads(),
SnapshotPath: t.SnapshotPath(),
}, nil
}
func (r *syncService) prepareFullSyncResponse(
treeId string,
theirPath, theirHeads []string,
t tree.ObjectTree) (*syncproto.SyncFullResponse, error) {
ourChanges, err := t.ChangesAfterCommonSnapshot(theirPath, theirHeads)
if err != nil {
return nil, err
}
return &syncproto.SyncFullResponse{
Heads: t.Heads(),
Changes: ourChanges,
SnapshotPath: t.SnapshotPath(),
}, nil
}
func (r *syncService) createTree(
ctx context.Context,
response *syncproto.SyncFullResponse,
header *aclpb.Header,
treeId string) error {
return r.treeCache.Add(
ctx,
treeId,
storage.TreeStorageCreatePayload{
TreeId: treeId,
Header: header,
Changes: response.Changes,
Heads: response.Heads,
})
}
func (r *syncService) createACLList(
ctx context.Context,
req *syncproto.SyncACLList,
header *aclpb.Header,
treeId string) error {
return r.treeCache.Add(
ctx,
treeId,
storage.ACLListStorageCreatePayload{
ListId: treeId,
Header: header,
Records: req.Records,
})
} }

View file

@ -7,7 +7,7 @@ import (
) )
type AccountData struct { // TODO: create a convenient constructor for this type AccountData struct { // TODO: create a convenient constructor for this
Identity []byte // TODO: this is essentially the same as sign key Identity []byte // public key
SignKey signingkey.PrivKey SignKey signingkey.PrivKey
EncKey encryptionkey.PrivKey EncKey encryptionkey.PrivKey
Decoder keys.Decoder Decoder keys.Decoder

View file

@ -6,9 +6,8 @@ import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger" "github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/account" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/account"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclchanges/aclpb" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclchanges/aclpb"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/acltree" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/storage"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treestorage" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/tree"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treestorage/treepb"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/treecache" "github.com/anytypeio/go-anytype-infrastructure-experiments/service/treecache"
"github.com/anytypeio/go-anytype-infrastructure-experiments/syncproto" "github.com/anytypeio/go-anytype-infrastructure-experiments/syncproto"
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/slice" "github.com/anytypeio/go-anytype-infrastructure-experiments/util/slice"
@ -61,212 +60,232 @@ func (r *requestHandler) HandleSyncMessage(ctx context.Context, senderId string,
msg := content.GetMessage() msg := content.GetMessage()
switch { switch {
case msg.GetFullSyncRequest() != nil: case msg.GetFullSyncRequest() != nil:
return r.HandleFullSyncRequest(ctx, senderId, msg.GetFullSyncRequest()) return r.HandleFullSyncRequest(ctx, senderId, msg.GetFullSyncRequest(), content.GetTreeHeader(), content.GetTreeId())
case msg.GetFullSyncResponse() != nil: case msg.GetFullSyncResponse() != nil:
return r.HandleFullSyncResponse(ctx, senderId, msg.GetFullSyncResponse()) return r.HandleFullSyncResponse(ctx, senderId, msg.GetFullSyncResponse(), content.GetTreeHeader(), content.GetTreeId())
case msg.GetHeadUpdate() != nil: case msg.GetHeadUpdate() != nil:
return r.HandleHeadUpdate(ctx, senderId, msg.GetHeadUpdate()) return r.HandleHeadUpdate(ctx, senderId, msg.GetHeadUpdate(), content.GetTreeHeader(), content.GetTreeId())
} }
return nil return nil
} }
func (r *requestHandler) HandleHeadUpdate(ctx context.Context, senderId string, update *syncproto.Sync_HeadUpdate) (err error) { func (r *requestHandler) HandleHeadUpdate(
ctx context.Context,
senderId string,
update *syncproto.SyncHeadUpdate,
header *aclpb.Header,
treeId string) (err error) {
var ( var (
fullRequest *syncproto.Sync_Full_Request fullRequest *syncproto.SyncFullRequest
snapshotPath []string snapshotPath []string
result acltree.AddResult result tree.AddResult
) )
log.With(zap.String("peerId", senderId), zap.String("treeId", update.TreeId)). log.With(zap.String("peerId", senderId), zap.String("treeId", treeId)).
Debug("processing head update") Debug("processing head update")
err = r.treeCache.Do(ctx, update.TreeId, func(tree acltree.ACLTree) error { err = r.treeCache.Do(ctx, treeId, func(obj any) error {
// TODO: check if we already have those changes objTree := obj.(tree.ObjectTree)
result, err = tree.AddRawChanges(ctx, update.Changes...) objTree.Lock()
defer objTree.Unlock()
if slice.UnsortedEquals(update.Heads, objTree.Heads()) {
return nil
}
result, err = objTree.AddRawChanges(ctx, update.Changes...)
if err != nil { if err != nil {
return err return err
} }
log.With(zap.Strings("update heads", update.Heads), zap.Strings("tree heads", tree.Heads())).
Debug("comparing heads after head update") // if we couldn't add all the changes
shouldFullSync := !slice.UnsortedEquals(update.Heads, tree.Heads()) shouldFullSync := len(update.Changes) != len(result.Added)
snapshotPath = tree.SnapshotPath() snapshotPath = objTree.SnapshotPath()
if shouldFullSync { if shouldFullSync {
fullRequest, err = r.prepareFullSyncRequest(update.TreeId, update.TreeHeader, update.SnapshotPath, tree) fullRequest, err = r.prepareFullSyncRequest(objTree)
if err != nil { if err != nil {
return err return err
} }
} }
return nil return nil
}) })
// if there are no such tree // if there are no such tree
if err == treestorage.ErrUnknownTreeId { if err == storage.ErrUnknownTreeId {
// TODO: maybe we can optimize this by sending the header and stuff right away, so when the tree is created we are able to add it on first request fullRequest = &syncproto.SyncFullRequest{}
fullRequest = &syncproto.Sync_Full_Request{
TreeId: update.TreeId,
TreeHeader: update.TreeHeader,
}
} }
// if we have incompatible heads, or we haven't seen the tree at all // if we have incompatible heads, or we haven't seen the tree at all
if fullRequest != nil { if fullRequest != nil {
return r.messageService.SendMessageAsync(senderId, syncproto.WrapFullRequest(fullRequest)) return r.messageService.SendMessageAsync(senderId, syncproto.WrapFullRequest(fullRequest, header, treeId))
} }
// if error or nothing has changed // if error or nothing has changed
if err != nil || len(result.Added) == 0 { if err != nil || len(result.Added) == 0 {
return err return err
} }
// otherwise sending heads update message // otherwise sending heads update message
newUpdate := &syncproto.Sync_HeadUpdate{ newUpdate := &syncproto.SyncHeadUpdate{
Heads: result.Heads, Heads: result.Heads,
Changes: result.Added, Changes: result.Added,
SnapshotPath: snapshotPath, SnapshotPath: snapshotPath,
TreeId: update.TreeId,
TreeHeader: update.TreeHeader,
} }
return r.messageService.SendToSpaceAsync("", syncproto.WrapHeadUpdate(newUpdate)) return r.messageService.SendToSpaceAsync("", syncproto.WrapHeadUpdate(newUpdate, header, treeId))
} }
func (r *requestHandler) HandleFullSyncRequest(ctx context.Context, senderId string, request *syncproto.Sync_Full_Request) (err error) { func (r *requestHandler) HandleFullSyncRequest(
var ( ctx context.Context,
fullResponse *syncproto.Sync_Full_Response senderId string,
snapshotPath []string request *syncproto.SyncFullRequest,
result acltree.AddResult header *aclpb.Header,
) treeId string) (err error) {
log.With(zap.String("peerId", senderId), zap.String("treeId", request.TreeId)).
Debug("processing full sync request")
err = r.treeCache.Do(ctx, request.TreeId, func(tree acltree.ACLTree) error { var fullResponse *syncproto.SyncFullResponse
// TODO: check if we already have those changes err = r.treeCache.Do(ctx, treeId, func(obj any) error {
// if we have non-empty request objTree := obj.(tree.ObjectTree)
if len(request.Heads) != 0 { objTree.Lock()
result, err = tree.AddRawChanges(ctx, request.Changes...) defer objTree.Unlock()
if err != nil {
return err fullResponse, err = r.prepareFullSyncResponse(treeId, request.SnapshotPath, request.Heads, objTree)
}
}
snapshotPath = tree.SnapshotPath()
fullResponse, err = r.prepareFullSyncResponse(request.TreeId, request.SnapshotPath, request.Changes, tree)
if err != nil { if err != nil {
return err return err
} }
return nil return nil
}) })
if err != nil { if err != nil {
return err return err
} }
err = r.messageService.SendMessageAsync(senderId, syncproto.WrapFullResponse(fullResponse)) return r.messageService.SendMessageAsync(senderId, syncproto.WrapFullResponse(fullResponse, header, treeId))
// if error or nothing has changed
if err != nil || len(result.Added) == 0 {
return err
}
// otherwise sending heads update message
newUpdate := &syncproto.Sync_HeadUpdate{
Heads: result.Heads,
Changes: result.Added,
SnapshotPath: snapshotPath,
TreeId: request.TreeId,
TreeHeader: request.TreeHeader,
}
return r.messageService.SendToSpaceAsync("", syncproto.WrapHeadUpdate(newUpdate))
} }
func (r *requestHandler) HandleFullSyncResponse(ctx context.Context, senderId string, response *syncproto.Sync_Full_Response) (err error) { func (r *requestHandler) HandleFullSyncResponse(
ctx context.Context,
senderId string,
response *syncproto.SyncFullResponse,
header *aclpb.Header,
treeId string) (err error) {
var ( var (
snapshotPath []string snapshotPath []string
result acltree.AddResult result tree.AddResult
) )
log.With(zap.String("peerId", senderId), zap.String("treeId", response.TreeId)).
Debug("processing full sync response")
err = r.treeCache.Do(ctx, response.TreeId, func(tree acltree.ACLTree) error { err = r.treeCache.Do(ctx, treeId, func(obj interface{}) error {
// TODO: check if we already have those changes objTree := obj.(tree.ObjectTree)
result, err = tree.AddRawChanges(ctx, response.Changes...) objTree.Lock()
defer objTree.Unlock()
// if we already have the heads for whatever reason
if slice.UnsortedEquals(response.Heads, objTree.Heads()) {
return nil
}
result, err = objTree.AddRawChanges(ctx, response.Changes...)
if err != nil { if err != nil {
return err return err
} }
snapshotPath = tree.SnapshotPath() snapshotPath = objTree.SnapshotPath()
return nil return nil
}) })
// if error or nothing has changed // if error or nothing has changed
if (err != nil || len(result.Added) == 0) && err != treestorage.ErrUnknownTreeId { if (err != nil || len(result.Added) == 0) && err != storage.ErrUnknownTreeId {
return err return err
} }
// if we have a new tree // if we have a new tree
if err == treestorage.ErrUnknownTreeId { if err == storage.ErrUnknownTreeId {
err = r.createTree(ctx, response) err = r.createTree(ctx, response, header, treeId)
if err != nil { if err != nil {
return err return err
} }
result = acltree.AddResult{ result = tree.AddResult{
OldHeads: []string{}, OldHeads: []string{},
Heads: response.Heads, Heads: response.Heads,
Added: response.Changes, Added: response.Changes,
} }
} }
// sending heads update message // sending heads update message
newUpdate := &syncproto.Sync_HeadUpdate{ newUpdate := &syncproto.SyncHeadUpdate{
Heads: result.Heads, Heads: result.Heads,
Changes: result.Added, Changes: result.Added,
SnapshotPath: snapshotPath, SnapshotPath: snapshotPath,
TreeId: response.TreeId,
} }
return r.messageService.SendToSpaceAsync("", syncproto.WrapHeadUpdate(newUpdate)) return r.messageService.SendToSpaceAsync("", syncproto.WrapHeadUpdate(newUpdate, header, treeId))
} }
func (r *requestHandler) prepareFullSyncRequest(treeId string, header *treepb.TreeHeader, theirPath []string, tree acltree.ACLTree) (*syncproto.Sync_Full_Request, error) { func (r *requestHandler) HandleACLList(
ourChanges, err := tree.ChangesAfterCommonSnapshot(theirPath) ctx context.Context,
if err != nil { senderId string,
return nil, err req *syncproto.SyncACLList,
header *aclpb.Header,
id string) (err error) {
err = r.treeCache.Do(ctx, id, func(obj interface{}) error {
return nil
})
// do nothing if already added
if err == nil {
return nil
} }
return &syncproto.Sync_Full_Request{ // if not found then add to storage
Heads: tree.Heads(), if err == storage.ErrUnknownTreeId {
Changes: ourChanges, return r.createACLList(ctx, req, header, id)
TreeId: treeId, }
SnapshotPath: tree.SnapshotPath(), return err
TreeHeader: header, }
func (r *requestHandler) prepareFullSyncRequest(t tree.ObjectTree) (*syncproto.SyncFullRequest, error) {
return &syncproto.SyncFullRequest{
Heads: t.Heads(),
SnapshotPath: t.SnapshotPath(),
}, nil }, nil
} }
func (r *requestHandler) prepareFullSyncResponse( func (r *requestHandler) prepareFullSyncResponse(
treeId string, treeId string,
theirPath []string, theirPath, theirHeads []string,
theirChanges []*aclpb.RawChange, t tree.ObjectTree) (*syncproto.SyncFullResponse, error) {
tree acltree.ACLTree) (*syncproto.Sync_Full_Response, error) { ourChanges, err := t.ChangesAfterCommonSnapshot(theirPath, theirHeads)
// TODO: we can probably use the common snapshot calculated on the request step from previous peer
ourChanges, err := tree.ChangesAfterCommonSnapshot(theirPath)
if err != nil { if err != nil {
return nil, err return nil, err
} }
theirMap := make(map[string]struct{})
for _, ch := range theirChanges {
theirMap[ch.Id] = struct{}{}
}
// filtering our changes, so we will not send the same changes back return &syncproto.SyncFullResponse{
var final []*aclpb.RawChange Heads: t.Heads(),
for _, ch := range ourChanges { Changes: ourChanges,
if _, exists := theirMap[ch.Id]; !exists { SnapshotPath: t.SnapshotPath(),
final = append(final, ch)
}
}
log.With(zap.Int("len(changes)", len(final)), zap.String("id", treeId)).
Debug("preparing changes for tree")
return &syncproto.Sync_Full_Response{
Heads: tree.Heads(),
Changes: final,
TreeId: treeId,
SnapshotPath: tree.SnapshotPath(),
TreeHeader: tree.Header(),
}, nil }, nil
} }
func (r *requestHandler) createTree(ctx context.Context, response *syncproto.Sync_Full_Response) error { func (r *requestHandler) createTree(
ctx context.Context,
response *syncproto.SyncFullResponse,
header *aclpb.Header,
treeId string) error {
return r.treeCache.Add( return r.treeCache.Add(
ctx, ctx,
response.TreeId, treeId,
response.TreeHeader, storage.TreeStorageCreatePayload{
response.Changes, TreeId: treeId,
func(tree acltree.ACLTree) error { Header: header,
return nil Changes: response.Changes,
Heads: response.Heads,
})
}
func (r *requestHandler) createACLList(
ctx context.Context,
req *syncproto.SyncACLList,
header *aclpb.Header,
treeId string) error {
return r.treeCache.Add(
ctx,
treeId,
storage.ACLListStorageCreatePayload{
ListId: treeId,
Header: header,
Records: req.Records,
}) })
} }