From b2fd9d2211554943aa015520a6b95df0fcd2f35c Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Wed, 14 Sep 2022 18:54:10 +0200 Subject: [PATCH] WIP sync --- common/commonspace/rpchandler.go | 1 + common/commonspace/syncservice/streampool.go | 165 ++++++++++ common/commonspace/syncservice/synchandler.go | 27 +- common/commonspace/syncservice/syncservice.go | 293 +----------------- pkg/acl/account/accountdata.go | 2 +- service/sync/requesthandler/requesthandler.go | 257 ++++++++------- 6 files changed, 335 insertions(+), 410 deletions(-) create mode 100644 common/commonspace/syncservice/streampool.go diff --git a/common/commonspace/rpchandler.go b/common/commonspace/rpchandler.go index 3ce71e1a..9a2d6993 100644 --- a/common/commonspace/rpchandler.go +++ b/common/commonspace/rpchandler.go @@ -21,5 +21,6 @@ func (r *rpcHandler) HeadSync(ctx context.Context, req *spacesyncproto.HeadSyncR } func (r *rpcHandler) Stream(stream spacesyncproto.DRPCSpace_StreamStream) error { + return fmt.Errorf("not implemented") } diff --git a/common/commonspace/syncservice/streampool.go b/common/commonspace/syncservice/streampool.go new file mode 100644 index 00000000..03cc6dcd --- /dev/null +++ b/common/commonspace/syncservice/streampool.go @@ -0,0 +1,165 @@ +package syncservice + +import ( + "errors" + "fmt" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto" + "github.com/libp2p/go-libp2p-core/sec" + "storj.io/drpc" + "storj.io/drpc/drpcctx" + "sync" +) + +var ErrEmptyPeer = errors.New("don't have such a peer") +var ErrStreamClosed = errors.New("stream is already closed") + +const maxSimultaneousOperationsPerStream = 10 + +// StreamPool can be made generic to work with different streams +type StreamPool interface { + AddStream(stream spacesyncproto.SpaceStream) (err error) + SyncClient +} + +type SyncClient interface { + SendAsync(peerId string, message *spacesyncproto.ObjectSyncMessage) (err error) + BroadcastAsync(message *spacesyncproto.ObjectSyncMessage) (err error) +} + +type MessageHandler interface { + HandleMessage(peerId string, message *spacesyncproto.ObjectSyncMessage) +} + +type streamPool struct { + sync.Mutex + peerStreams map[string]spacesyncproto.SpaceStream + messageHandler MessageHandler +} + +func newStreamPool(messageHandler MessageHandler) StreamPool { + return &streamPool{ + peerStreams: make(map[string]spacesyncproto.SpaceStream), + messageHandler: messageHandler, + } +} + +func (s *streamPool) SendAsync(peerId string, message *spacesyncproto.ObjectSyncMessage) (err error) { + stream, err := s.getStream(peerId) + if err != nil { + return + } + + return stream.Send(message) +} + +func (s *streamPool) getStream(id string) (stream spacesyncproto.SpaceStream, err error) { + s.Lock() + defer s.Unlock() + stream, exists := s.peerStreams[id] + if !exists { + err = ErrEmptyPeer + return + } + + select { + case <-stream.Context().Done(): + delete(s.peerStreams, id) + err = ErrStreamClosed + default: + } + + return +} + +func (s *streamPool) getAllStreams() (streams []spacesyncproto.SpaceStream) { + s.Lock() + defer s.Unlock() +Loop: + for id, stream := range s.peerStreams { + select { + case <-stream.Context().Done(): + delete(s.peerStreams, id) + continue Loop + default: + } + streams = append(streams, stream) + } + + return +} + +func (s *streamPool) BroadcastAsync(message *spacesyncproto.ObjectSyncMessage) (err error) { + streams := s.getAllStreams() + for _, stream := range streams { + if err = stream.Send(message); err != nil { + // TODO: add logging + } + } + + return nil +} + +func (s *streamPool) AddStream(stream spacesyncproto.SpaceStream) (err error) { + s.Lock() + defer s.Unlock() + peerId, err := getPeerIdFromStream(stream) + if err != nil { + return + } + + s.peerStreams[peerId] = stream + go s.readPeerLoop(peerId, stream) + return +} + +func (s *streamPool) readPeerLoop(peerId string, stream spacesyncproto.SpaceStream) (err error) { + limiter := make(chan struct{}, maxSimultaneousOperationsPerStream) + for i := 0; i < maxSimultaneousOperationsPerStream; i++ { + limiter <- struct{}{} + } + +Loop: + for { + msg, err := stream.Recv() + if err != nil { + break + } + select { + case <-limiter: + case <-stream.Context().Done(): + break Loop + } + go func() { + defer func() { + limiter <- struct{}{} + }() + + s.messageHandler.HandleMessage(peerId, msg) + }() + } + if err = s.removePeer(peerId); err != nil { + // TODO: log something + } + return +} + +func (s *streamPool) removePeer(peerId string) (err error) { + s.Lock() + defer s.Unlock() + _, ok := s.peerStreams[peerId] + if !ok { + return ErrEmptyPeer + } + delete(s.peerStreams, peerId) + return +} + +func getPeerIdFromStream(stream drpc.Stream) (string, error) { + ctx := stream.Context() + conn, ok := ctx.Value(drpcctx.TransportKey{}).(sec.SecureConn) + if !ok { + return "", fmt.Errorf("incorrect connection type in stream") + } + + return conn.RemotePeer().String(), nil +} diff --git a/common/commonspace/syncservice/synchandler.go b/common/commonspace/syncservice/synchandler.go index fed8f8c9..2aa2ea9b 100644 --- a/common/commonspace/syncservice/synchandler.go +++ b/common/commonspace/syncservice/synchandler.go @@ -19,10 +19,6 @@ type SyncHandler interface { HandleSyncMessage(ctx context.Context, senderId string, request *spacesyncproto.ObjectSyncMessage) (err error) } -type SyncClient interface { - SendSyncMessage(peerId string, msg *spacesyncproto.ObjectSyncMessage) (err error) -} - func newSyncHandler(treeCache cache.TreeCache, syncClient SyncClient) *syncHandler { return &syncHandler{ treeCache: treeCache, @@ -96,7 +92,7 @@ func (s *syncHandler) HandleHeadUpdate( } // if we have incompatible heads, or we haven't seen the tree at all if fullRequest != nil { - return s.syncClient.SendSyncMessage(senderId, spacesyncproto.WrapFullRequest(fullRequest, header, treeId)) + return s.syncClient.SendAsync(senderId, spacesyncproto.WrapFullRequest(fullRequest, header, treeId)) } // if error or nothing has changed if err != nil || len(result.Added) == 0 { @@ -109,7 +105,7 @@ func (s *syncHandler) HandleHeadUpdate( Changes: result.Added, SnapshotPath: snapshotPath, } - return s.syncClient.SendSyncMessage("", spacesyncproto.WrapHeadUpdate(newUpdate, header, treeId)) + return s.syncClient.BroadcastAsync(spacesyncproto.WrapHeadUpdate(newUpdate, header, treeId)) } func (s *syncHandler) HandleFullSyncRequest( @@ -142,7 +138,7 @@ func (s *syncHandler) HandleFullSyncRequest( if err != nil { return err } - return s.syncClient.SendSyncMessage(senderId, spacesyncproto.WrapFullResponse(fullResponse, header, treeId)) + return s.syncClient.SendAsync(senderId, spacesyncproto.WrapFullResponse(fullResponse, header, treeId)) } func (s *syncHandler) HandleFullSyncResponse( @@ -163,21 +159,24 @@ func (s *syncHandler) HandleFullSyncResponse( } err = func() error { - objTree := res.Tree - objTree.Lock() + syncTree := res.Tree + syncTree.Lock() defer res.Release() - defer objTree.Unlock() + defer syncTree.Unlock() // if we already have the heads for whatever reason - if slice.UnsortedEquals(response.Heads, objTree.Heads()) { + if slice.UnsortedEquals(response.Heads, syncTree.Heads()) { return nil } - result, err = objTree.AddRawChanges(ctx, response.Changes...) + // syncTree -> syncService: HeadUpdate() + // AddRawChanges -> syncTree.addRawChanges(); syncService.HeadUpdate() + result, err = syncTree.AddRawChanges(ctx, response.Changes...) + if err != nil { return err } - snapshotPath = objTree.SnapshotPath() + snapshotPath = syncTree.SnapshotPath() return nil }() @@ -203,7 +202,7 @@ func (s *syncHandler) HandleFullSyncResponse( Changes: result.Added, SnapshotPath: snapshotPath, } - return s.syncClient.SendSyncMessage("", spacesyncproto.WrapHeadUpdate(newUpdate, header, treeId)) + return s.syncClient.BroadcastAsync(spacesyncproto.WrapHeadUpdate(newUpdate, header, treeId)) } func (s *syncHandler) prepareFullSyncRequest(t tree.ObjectTree, sendOwnChanges bool) (*spacesyncproto.ObjectFullSyncRequest, error) { diff --git a/common/commonspace/syncservice/syncservice.go b/common/commonspace/syncservice/syncservice.go index 0f133fd9..c9ce956d 100644 --- a/common/commonspace/syncservice/syncservice.go +++ b/common/commonspace/syncservice/syncservice.go @@ -1,291 +1,32 @@ package syncservice import ( - "github.com/anytypeio/go-anytype-infrastructure-experiments/app" - "github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger" - "github.com/anytypeio/go-anytype-infrastructure-experiments/common/account" - "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/storage" - "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/tree" - "github.com/anytypeio/go-anytype-infrastructure-experiments/service/treecache" - "github.com/anytypeio/go-anytype-infrastructure-experiments/syncproto" - "github.com/anytypeio/go-anytype-infrastructure-experiments/util/slice" - "go.uber.org/zap" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf" + "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclchanges/aclpb" ) -type syncService struct { - treeCache treecache.Service - account account.Service - messageService MessageSender -} - -var log = logger.NewNamed("requesthandler") - -func New() app.Component { - return &syncService{} -} - type SyncService interface { - HandleSyncMessage(ctx context.Context, senderId string, request *syncproto.Sync) (err error) + NotifyHeadUpdate(treeId string, header *aclpb.TreeHeader, update *spacesyncproto.ObjectHeadUpdate) (err error) + StreamPool() StreamPool } -type MessageSender interface { - SendMessageAsync(peerId string, msg *syncproto.Sync) error - SendToSpaceAsync(spaceId string, msg *syncproto.Sync) error +type syncService struct { + syncHandler SyncHandler + streamPool StreamPool + configuration nodeconf.Configuration } -const CName = "SyncRequestHandler" - -func (r *syncService) Init(a *app.App) (err error) { - r.treeCache = a.MustComponent(treecache.CName).(treecache.Service) - r.account = a.MustComponent(account.CName).(account.Service) - r.messageService = a.MustComponent("MessageService").(MessageSender) - return nil +func (s *syncService) NotifyHeadUpdate(treeId string, header *aclpb.TreeHeader, update *spacesyncproto.ObjectHeadUpdate) (err error) { + msg := spacesyncproto.WrapHeadUpdate(update, header, treeId) + all + return s.streamPool.BroadcastAsync(msg) } -func (r *syncService) Name() (name string) { - return CName +func (s *syncService) StreamPool() StreamPool { + return s.streamPool } -func (r *syncService) Run(ctx context.Context) (err error) { - return nil -} - -func (r *syncService) Close(ctx context.Context) (err error) { - return nil -} - -func (r *syncService) HandleSyncMessage(ctx context.Context, senderId string, content *syncproto.Sync) error { - msg := content.GetMessage() - switch { - case msg.GetFullSyncRequest() != nil: - return r.HandleFullSyncRequest(ctx, senderId, msg.GetFullSyncRequest(), content.GetTreeHeader(), content.GetTreeId()) - case msg.GetFullSyncResponse() != nil: - return r.HandleFullSyncResponse(ctx, senderId, msg.GetFullSyncResponse(), content.GetTreeHeader(), content.GetTreeId()) - case msg.GetHeadUpdate() != nil: - return r.HandleHeadUpdate(ctx, senderId, msg.GetHeadUpdate(), content.GetTreeHeader(), content.GetTreeId()) - case msg.GetAclList() != nil: - return r.HandleACLList(ctx, senderId, msg.GetAclList(), content.GetTreeHeader(), content.GetTreeId()) - } - return nil -} - -func (r *syncService) HandleHeadUpdate( - ctx context.Context, - senderId string, - update *syncproto.SyncHeadUpdate, - header *aclpb.Header, - treeId string) (err error) { - - var ( - fullRequest *syncproto.SyncFullRequest - snapshotPath []string - result tree.AddResult - ) - log.With(zap.String("peerId", senderId), zap.String("treeId", treeId)). - Debug("processing head update") - - err = r.treeCache.Do(ctx, treeId, func(obj any) error { - objTree := obj.(tree.ObjectTree) - objTree.Lock() - defer objTree.Unlock() - - if slice.UnsortedEquals(update.Heads, objTree.Heads()) { - return nil - } - - result, err = objTree.AddRawChanges(ctx, update.Changes...) - if err != nil { - return err - } - - // if we couldn't add all the changes - shouldFullSync := len(update.Changes) != len(result.Added) - snapshotPath = objTree.SnapshotPath() - if shouldFullSync { - fullRequest, err = r.prepareFullSyncRequest(objTree) - if err != nil { - return err - } - } - return nil - }) - - // if there are no such tree - if err == storage.ErrUnknownTreeId { - fullRequest = &syncproto.SyncFullRequest{} - } - // if we have incompatible heads, or we haven't seen the tree at all - if fullRequest != nil { - return r.messageService.SendMessageAsync(senderId, syncproto.WrapFullRequest(fullRequest, header, treeId)) - } - // if error or nothing has changed - if err != nil || len(result.Added) == 0 { - return err - } - - // otherwise sending heads update message - newUpdate := &syncproto.SyncHeadUpdate{ - Heads: result.Heads, - Changes: result.Added, - SnapshotPath: snapshotPath, - } - return r.messageService.SendToSpaceAsync("", syncproto.WrapHeadUpdate(newUpdate, header, treeId)) -} - -func (r *syncService) HandleFullSyncRequest( - ctx context.Context, - senderId string, - request *syncproto.SyncFullRequest, - header *aclpb.Header, - treeId string) (err error) { - - var fullResponse *syncproto.SyncFullResponse - err = r.treeCache.Do(ctx, treeId, func(obj any) error { - objTree := obj.(tree.ObjectTree) - objTree.Lock() - defer objTree.Unlock() - - fullResponse, err = r.prepareFullSyncResponse(treeId, request.SnapshotPath, request.Heads, objTree) - if err != nil { - return err - } - return nil - }) - - if err != nil { - return err - } - return r.messageService.SendMessageAsync(senderId, syncproto.WrapFullResponse(fullResponse, header, treeId)) -} - -func (r *syncService) HandleFullSyncResponse( - ctx context.Context, - senderId string, - response *syncproto.SyncFullResponse, - header *aclpb.Header, - treeId string) (err error) { - - var ( - snapshotPath []string - result tree.AddResult - ) - - err = r.treeCache.Do(ctx, treeId, func(obj interface{}) error { - objTree := obj.(tree.ObjectTree) - objTree.Lock() - defer objTree.Unlock() - - // if we already have the heads for whatever reason - if slice.UnsortedEquals(response.Heads, objTree.Heads()) { - return nil - } - - result, err = objTree.AddRawChanges(ctx, response.Changes...) - if err != nil { - return err - } - snapshotPath = objTree.SnapshotPath() - return nil - }) - - // if error or nothing has changed - if (err != nil || len(result.Added) == 0) && err != storage.ErrUnknownTreeId { - return err - } - // if we have a new tree - if err == storage.ErrUnknownTreeId { - err = r.createTree(ctx, response, header, treeId) - if err != nil { - return err - } - result = tree.AddResult{ - OldHeads: []string{}, - Heads: response.Heads, - Added: response.Changes, - } - } - // sending heads update message - newUpdate := &syncproto.SyncHeadUpdate{ - Heads: result.Heads, - Changes: result.Added, - SnapshotPath: snapshotPath, - } - return r.messageService.SendToSpaceAsync("", syncproto.WrapHeadUpdate(newUpdate, header, treeId)) -} - -func (r *syncService) HandleACLList( - ctx context.Context, - senderId string, - req *syncproto.SyncACLList, - header *aclpb.Header, - id string) (err error) { - - err = r.treeCache.Do(ctx, id, func(obj interface{}) error { - return nil - }) - // do nothing if already added - if err == nil { - return nil - } - // if not found then add to storage - if err == storage.ErrUnknownTreeId { - return r.createACLList(ctx, req, header, id) - } - return err -} - -func (r *syncService) prepareFullSyncRequest(t tree.ObjectTree) (*syncproto.SyncFullRequest, error) { - return &syncproto.SyncFullRequest{ - Heads: t.Heads(), - SnapshotPath: t.SnapshotPath(), - }, nil -} - -func (r *syncService) prepareFullSyncResponse( - treeId string, - theirPath, theirHeads []string, - t tree.ObjectTree) (*syncproto.SyncFullResponse, error) { - ourChanges, err := t.ChangesAfterCommonSnapshot(theirPath, theirHeads) - if err != nil { - return nil, err - } - - return &syncproto.SyncFullResponse{ - Heads: t.Heads(), - Changes: ourChanges, - SnapshotPath: t.SnapshotPath(), - }, nil -} - -func (r *syncService) createTree( - ctx context.Context, - response *syncproto.SyncFullResponse, - header *aclpb.Header, - treeId string) error { - - return r.treeCache.Add( - ctx, - treeId, - storage.TreeStorageCreatePayload{ - TreeId: treeId, - Header: header, - Changes: response.Changes, - Heads: response.Heads, - }) -} - -func (r *syncService) createACLList( - ctx context.Context, - req *syncproto.SyncACLList, - header *aclpb.Header, - treeId string) error { - - return r.treeCache.Add( - ctx, - treeId, - storage.ACLListStorageCreatePayload{ - ListId: treeId, - Header: header, - Records: req.Records, - }) +func newSyncService() { + } diff --git a/pkg/acl/account/accountdata.go b/pkg/acl/account/accountdata.go index bc775854..4368a26c 100644 --- a/pkg/acl/account/accountdata.go +++ b/pkg/acl/account/accountdata.go @@ -7,7 +7,7 @@ import ( ) type AccountData struct { // TODO: create a convenient constructor for this - Identity []byte // TODO: this is essentially the same as sign key + Identity []byte // public key SignKey signingkey.PrivKey EncKey encryptionkey.PrivKey Decoder keys.Decoder diff --git a/service/sync/requesthandler/requesthandler.go b/service/sync/requesthandler/requesthandler.go index f7d992e0..9c17171f 100644 --- a/service/sync/requesthandler/requesthandler.go +++ b/service/sync/requesthandler/requesthandler.go @@ -6,9 +6,8 @@ import ( "github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/account" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclchanges/aclpb" - "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/acltree" - "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treestorage" - "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treestorage/treepb" + "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/storage" + "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/tree" "github.com/anytypeio/go-anytype-infrastructure-experiments/service/treecache" "github.com/anytypeio/go-anytype-infrastructure-experiments/syncproto" "github.com/anytypeio/go-anytype-infrastructure-experiments/util/slice" @@ -61,212 +60,232 @@ func (r *requestHandler) HandleSyncMessage(ctx context.Context, senderId string, msg := content.GetMessage() switch { case msg.GetFullSyncRequest() != nil: - return r.HandleFullSyncRequest(ctx, senderId, msg.GetFullSyncRequest()) + return r.HandleFullSyncRequest(ctx, senderId, msg.GetFullSyncRequest(), content.GetTreeHeader(), content.GetTreeId()) case msg.GetFullSyncResponse() != nil: - return r.HandleFullSyncResponse(ctx, senderId, msg.GetFullSyncResponse()) + return r.HandleFullSyncResponse(ctx, senderId, msg.GetFullSyncResponse(), content.GetTreeHeader(), content.GetTreeId()) case msg.GetHeadUpdate() != nil: - return r.HandleHeadUpdate(ctx, senderId, msg.GetHeadUpdate()) + return r.HandleHeadUpdate(ctx, senderId, msg.GetHeadUpdate(), content.GetTreeHeader(), content.GetTreeId()) } return nil } -func (r *requestHandler) HandleHeadUpdate(ctx context.Context, senderId string, update *syncproto.Sync_HeadUpdate) (err error) { +func (r *requestHandler) HandleHeadUpdate( + ctx context.Context, + senderId string, + update *syncproto.SyncHeadUpdate, + header *aclpb.Header, + treeId string) (err error) { + var ( - fullRequest *syncproto.Sync_Full_Request + fullRequest *syncproto.SyncFullRequest snapshotPath []string - result acltree.AddResult + result tree.AddResult ) - log.With(zap.String("peerId", senderId), zap.String("treeId", update.TreeId)). + log.With(zap.String("peerId", senderId), zap.String("treeId", treeId)). Debug("processing head update") - err = r.treeCache.Do(ctx, update.TreeId, func(tree acltree.ACLTree) error { - // TODO: check if we already have those changes - result, err = tree.AddRawChanges(ctx, update.Changes...) + err = r.treeCache.Do(ctx, treeId, func(obj any) error { + objTree := obj.(tree.ObjectTree) + objTree.Lock() + defer objTree.Unlock() + + if slice.UnsortedEquals(update.Heads, objTree.Heads()) { + return nil + } + + result, err = objTree.AddRawChanges(ctx, update.Changes...) if err != nil { return err } - log.With(zap.Strings("update heads", update.Heads), zap.Strings("tree heads", tree.Heads())). - Debug("comparing heads after head update") - shouldFullSync := !slice.UnsortedEquals(update.Heads, tree.Heads()) - snapshotPath = tree.SnapshotPath() + + // if we couldn't add all the changes + shouldFullSync := len(update.Changes) != len(result.Added) + snapshotPath = objTree.SnapshotPath() if shouldFullSync { - fullRequest, err = r.prepareFullSyncRequest(update.TreeId, update.TreeHeader, update.SnapshotPath, tree) + fullRequest, err = r.prepareFullSyncRequest(objTree) if err != nil { return err } } return nil }) + // if there are no such tree - if err == treestorage.ErrUnknownTreeId { - // TODO: maybe we can optimize this by sending the header and stuff right away, so when the tree is created we are able to add it on first request - fullRequest = &syncproto.Sync_Full_Request{ - TreeId: update.TreeId, - TreeHeader: update.TreeHeader, - } + if err == storage.ErrUnknownTreeId { + fullRequest = &syncproto.SyncFullRequest{} } // if we have incompatible heads, or we haven't seen the tree at all if fullRequest != nil { - return r.messageService.SendMessageAsync(senderId, syncproto.WrapFullRequest(fullRequest)) + return r.messageService.SendMessageAsync(senderId, syncproto.WrapFullRequest(fullRequest, header, treeId)) } // if error or nothing has changed if err != nil || len(result.Added) == 0 { return err } + // otherwise sending heads update message - newUpdate := &syncproto.Sync_HeadUpdate{ + newUpdate := &syncproto.SyncHeadUpdate{ Heads: result.Heads, Changes: result.Added, SnapshotPath: snapshotPath, - TreeId: update.TreeId, - TreeHeader: update.TreeHeader, } - return r.messageService.SendToSpaceAsync("", syncproto.WrapHeadUpdate(newUpdate)) + return r.messageService.SendToSpaceAsync("", syncproto.WrapHeadUpdate(newUpdate, header, treeId)) } -func (r *requestHandler) HandleFullSyncRequest(ctx context.Context, senderId string, request *syncproto.Sync_Full_Request) (err error) { - var ( - fullResponse *syncproto.Sync_Full_Response - snapshotPath []string - result acltree.AddResult - ) - log.With(zap.String("peerId", senderId), zap.String("treeId", request.TreeId)). - Debug("processing full sync request") +func (r *requestHandler) HandleFullSyncRequest( + ctx context.Context, + senderId string, + request *syncproto.SyncFullRequest, + header *aclpb.Header, + treeId string) (err error) { - err = r.treeCache.Do(ctx, request.TreeId, func(tree acltree.ACLTree) error { - // TODO: check if we already have those changes - // if we have non-empty request - if len(request.Heads) != 0 { - result, err = tree.AddRawChanges(ctx, request.Changes...) - if err != nil { - return err - } - } - snapshotPath = tree.SnapshotPath() - fullResponse, err = r.prepareFullSyncResponse(request.TreeId, request.SnapshotPath, request.Changes, tree) + var fullResponse *syncproto.SyncFullResponse + err = r.treeCache.Do(ctx, treeId, func(obj any) error { + objTree := obj.(tree.ObjectTree) + objTree.Lock() + defer objTree.Unlock() + + fullResponse, err = r.prepareFullSyncResponse(treeId, request.SnapshotPath, request.Heads, objTree) if err != nil { return err } return nil }) + if err != nil { return err } - err = r.messageService.SendMessageAsync(senderId, syncproto.WrapFullResponse(fullResponse)) - // if error or nothing has changed - if err != nil || len(result.Added) == 0 { - return err - } - - // otherwise sending heads update message - newUpdate := &syncproto.Sync_HeadUpdate{ - Heads: result.Heads, - Changes: result.Added, - SnapshotPath: snapshotPath, - TreeId: request.TreeId, - TreeHeader: request.TreeHeader, - } - return r.messageService.SendToSpaceAsync("", syncproto.WrapHeadUpdate(newUpdate)) + return r.messageService.SendMessageAsync(senderId, syncproto.WrapFullResponse(fullResponse, header, treeId)) } -func (r *requestHandler) HandleFullSyncResponse(ctx context.Context, senderId string, response *syncproto.Sync_Full_Response) (err error) { +func (r *requestHandler) HandleFullSyncResponse( + ctx context.Context, + senderId string, + response *syncproto.SyncFullResponse, + header *aclpb.Header, + treeId string) (err error) { + var ( snapshotPath []string - result acltree.AddResult + result tree.AddResult ) - log.With(zap.String("peerId", senderId), zap.String("treeId", response.TreeId)). - Debug("processing full sync response") - err = r.treeCache.Do(ctx, response.TreeId, func(tree acltree.ACLTree) error { - // TODO: check if we already have those changes - result, err = tree.AddRawChanges(ctx, response.Changes...) + err = r.treeCache.Do(ctx, treeId, func(obj interface{}) error { + objTree := obj.(tree.ObjectTree) + objTree.Lock() + defer objTree.Unlock() + + // if we already have the heads for whatever reason + if slice.UnsortedEquals(response.Heads, objTree.Heads()) { + return nil + } + + result, err = objTree.AddRawChanges(ctx, response.Changes...) if err != nil { return err } - snapshotPath = tree.SnapshotPath() + snapshotPath = objTree.SnapshotPath() return nil }) + // if error or nothing has changed - if (err != nil || len(result.Added) == 0) && err != treestorage.ErrUnknownTreeId { + if (err != nil || len(result.Added) == 0) && err != storage.ErrUnknownTreeId { return err } // if we have a new tree - if err == treestorage.ErrUnknownTreeId { - err = r.createTree(ctx, response) + if err == storage.ErrUnknownTreeId { + err = r.createTree(ctx, response, header, treeId) if err != nil { return err } - result = acltree.AddResult{ + result = tree.AddResult{ OldHeads: []string{}, Heads: response.Heads, Added: response.Changes, } } // sending heads update message - newUpdate := &syncproto.Sync_HeadUpdate{ + newUpdate := &syncproto.SyncHeadUpdate{ Heads: result.Heads, Changes: result.Added, SnapshotPath: snapshotPath, - TreeId: response.TreeId, } - return r.messageService.SendToSpaceAsync("", syncproto.WrapHeadUpdate(newUpdate)) + return r.messageService.SendToSpaceAsync("", syncproto.WrapHeadUpdate(newUpdate, header, treeId)) } -func (r *requestHandler) prepareFullSyncRequest(treeId string, header *treepb.TreeHeader, theirPath []string, tree acltree.ACLTree) (*syncproto.Sync_Full_Request, error) { - ourChanges, err := tree.ChangesAfterCommonSnapshot(theirPath) - if err != nil { - return nil, err +func (r *requestHandler) HandleACLList( + ctx context.Context, + senderId string, + req *syncproto.SyncACLList, + header *aclpb.Header, + id string) (err error) { + + err = r.treeCache.Do(ctx, id, func(obj interface{}) error { + return nil + }) + // do nothing if already added + if err == nil { + return nil } - return &syncproto.Sync_Full_Request{ - Heads: tree.Heads(), - Changes: ourChanges, - TreeId: treeId, - SnapshotPath: tree.SnapshotPath(), - TreeHeader: header, + // if not found then add to storage + if err == storage.ErrUnknownTreeId { + return r.createACLList(ctx, req, header, id) + } + return err +} + +func (r *requestHandler) prepareFullSyncRequest(t tree.ObjectTree) (*syncproto.SyncFullRequest, error) { + return &syncproto.SyncFullRequest{ + Heads: t.Heads(), + SnapshotPath: t.SnapshotPath(), }, nil } func (r *requestHandler) prepareFullSyncResponse( treeId string, - theirPath []string, - theirChanges []*aclpb.RawChange, - tree acltree.ACLTree) (*syncproto.Sync_Full_Response, error) { - // TODO: we can probably use the common snapshot calculated on the request step from previous peer - ourChanges, err := tree.ChangesAfterCommonSnapshot(theirPath) + theirPath, theirHeads []string, + t tree.ObjectTree) (*syncproto.SyncFullResponse, error) { + ourChanges, err := t.ChangesAfterCommonSnapshot(theirPath, theirHeads) if err != nil { return nil, err } - theirMap := make(map[string]struct{}) - for _, ch := range theirChanges { - theirMap[ch.Id] = struct{}{} - } - // filtering our changes, so we will not send the same changes back - var final []*aclpb.RawChange - for _, ch := range ourChanges { - if _, exists := theirMap[ch.Id]; !exists { - final = append(final, ch) - } - } - log.With(zap.Int("len(changes)", len(final)), zap.String("id", treeId)). - Debug("preparing changes for tree") - - return &syncproto.Sync_Full_Response{ - Heads: tree.Heads(), - Changes: final, - TreeId: treeId, - SnapshotPath: tree.SnapshotPath(), - TreeHeader: tree.Header(), + return &syncproto.SyncFullResponse{ + Heads: t.Heads(), + Changes: ourChanges, + SnapshotPath: t.SnapshotPath(), }, nil } -func (r *requestHandler) createTree(ctx context.Context, response *syncproto.Sync_Full_Response) error { +func (r *requestHandler) createTree( + ctx context.Context, + response *syncproto.SyncFullResponse, + header *aclpb.Header, + treeId string) error { + return r.treeCache.Add( ctx, - response.TreeId, - response.TreeHeader, - response.Changes, - func(tree acltree.ACLTree) error { - return nil + treeId, + storage.TreeStorageCreatePayload{ + TreeId: treeId, + Header: header, + Changes: response.Changes, + Heads: response.Heads, + }) +} + +func (r *requestHandler) createACLList( + ctx context.Context, + req *syncproto.SyncACLList, + header *aclpb.Header, + treeId string) error { + + return r.treeCache.Add( + ctx, + treeId, + storage.ACLListStorageCreatePayload{ + ListId: treeId, + Header: header, + Records: req.Records, }) }