1
0
Fork 0
mirror of https://github.com/anyproto/any-sync.git synced 2025-06-11 18:20:28 +09:00

WIP new document logic

This commit is contained in:
mcrakhman 2022-07-16 14:24:27 +02:00 committed by Mikhail Iudin
parent 4d935a0a3b
commit 9d106fee5b
No known key found for this signature in database
GPG key ID: FAAAA8BAABDFF1C0
3 changed files with 65 additions and 20 deletions

View file

@ -46,6 +46,7 @@ var ErrNoCommonSnapshot = errors.New("trees doesn't have a common snapshot")
type ACLTree interface {
RWLocker
ID() string
ACLState() *ACLState
AddContent(ctx context.Context, f func(builder ChangeBuilder) error) (*Change, error)
AddRawChanges(ctx context.Context, changes ...*aclpb.RawChange) (AddResult, error)
@ -65,6 +66,7 @@ type aclTree struct {
accountData *account.AccountData
updateListener TreeUpdateListener
id string
fullTree *Tree
aclTreeFromStart *Tree // TODO: right now we don't use it, we can probably have only local var for now. This tree is built from start of the document
aclState *ACLState
@ -112,6 +114,10 @@ func BuildACLTree(
if err != nil {
return nil, err
}
aclTree.id, err = t.TreeID()
if err != nil {
return nil, err
}
listener.Rebuild(aclTree)
@ -214,6 +220,10 @@ func (a *aclTree) rebuildFromStorage(fromStart bool) error {
return nil
}
func (a *aclTree) ID() string {
return a.id
}
func (a *aclTree) ACLState() *ACLState {
return a.aclState
}
@ -400,13 +410,25 @@ func (a *aclTree) SnapshotPath() []string {
func (a *aclTree) ChangesAfterCommonSnapshot(theirPath []string) ([]*aclpb.RawChange, error) {
// TODO: think about when the clients will have their full acl tree and thus full snapshots
// but no changes after some of the snapshots
commonSnapshot, err := a.commonSnapshotForTwoPaths(a.SnapshotPath(), theirPath)
if err != nil {
return nil, err
var (
isNewDocument = len(theirPath) != 0
ourPath = a.SnapshotPath()
// by default returning everything we have
commonSnapshot = ourPath[len(ourPath)-1] // TODO: root snapshot, probably it is better to have a specific method in treestorage
err error
)
// if this is non-empty request
if !isNewDocument {
commonSnapshot, err = a.commonSnapshotForTwoPaths(ourPath, theirPath)
if err != nil {
return nil, err
}
}
// we presume that we have everything after the common snapshot, though this may not be the case in case of clients and only ACL tree changes
changes, err := a.treeBuilder.dfs(a.fullTree.Heads(), commonSnapshot, func(id string) (*Change, error) {
// using custom load function to skip verification step and save raw changes
var rawChanges []*aclpb.RawChange
// using custom load function to skip verification step and save raw changes
load := func(id string) (*Change, error) {
raw, err := a.treeStorage.GetChange(context.Background(), id)
if err != nil {
return nil, err
@ -418,16 +440,21 @@ func (a *aclTree) ChangesAfterCommonSnapshot(theirPath []string) ([]*aclpb.RawCh
}
ch := NewChange(id, aclChange)
ch.Raw = raw
rawChanges = append(rawChanges, raw)
return ch, nil
})
}
// we presume that we have everything after the common snapshot, though this may not be the case in case of clients and only ACL tree changes
_, err = a.treeBuilder.dfs(a.fullTree.Heads(), commonSnapshot, load)
if err != nil {
return nil, err
}
var rawChanges []*aclpb.RawChange
for _, ch := range changes {
rawChanges = append(rawChanges, ch.Raw)
if isNewDocument {
_, err = load(commonSnapshot)
if err != nil {
return nil, err
}
}
return rawChanges, nil
}

View file

@ -6,6 +6,7 @@ import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclchanges/aclpb"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/acltree"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treestorage"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/account"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/client"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/syncpb"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/treecache"
@ -15,6 +16,7 @@ import (
type requestHandler struct {
treeCache treecache.Service
client client.Client
account account.Service
}
func NewRequestHandler() app.Component {
@ -31,6 +33,8 @@ const CName = "SyncRequestHandler"
func (r *requestHandler) Init(ctx context.Context, a *app.App) (err error) {
r.treeCache = a.MustComponent(treecache.CName).(treecache.Service)
r.client = a.MustComponent(client.CName).(client.Client)
r.account = a.MustComponent(account.CName).(account.Service)
return nil
}
@ -102,9 +106,12 @@ func (r *requestHandler) HandleFullSyncRequest(ctx context.Context, senderId str
err = r.treeCache.Do(ctx, request.TreeId, func(tree acltree.ACLTree) error {
// TODO: check if we already have those changes
result, err = tree.AddRawChanges(ctx, request.Changes...)
if err != nil {
return err
// if we have non empty request
if len(request.Heads) != 0 {
result, err = tree.AddRawChanges(ctx, request.Changes...)
if err != nil {
return err
}
}
snapshotPath = tree.SnapshotPath()
fullResponse, err = r.prepareFullSyncResponse(request.TreeId, request.SnapshotPath, request.Changes, tree)
@ -132,23 +139,24 @@ func (r *requestHandler) HandleFullSyncRequest(ctx context.Context, senderId str
return r.client.NotifyHeadsChanged(newUpdate)
}
func (r *requestHandler) HandleFullSyncResponse(ctx context.Context, senderId string, request *syncpb.SyncFullRequest) (err error) {
func (r *requestHandler) HandleFullSyncResponse(ctx context.Context, senderId string, response *syncpb.SyncFullResponse) (err error) {
var (
snapshotPath []string
result acltree.AddResult
)
err = r.treeCache.Do(ctx, request.TreeId, func(tree acltree.ACLTree) error {
err = r.treeCache.Do(ctx, response.TreeId, func(tree acltree.ACLTree) error {
// TODO: check if we already have those changes
result, err = tree.AddRawChanges(ctx, request.Changes...)
result, err = tree.AddRawChanges(ctx, response.Changes...)
if err != nil {
return err
}
snapshotPath = tree.SnapshotPath()
return nil
})
if err != nil {
return err
if err == treestorage.ErrUnknownTreeId {
// TODO: probably sometimes we should notify about this (e.g. if client created new document)
return r.createTree(response)
}
// if error or nothing has changed
if err != nil || len(result.Added) == 0 {
@ -161,7 +169,7 @@ func (r *requestHandler) HandleFullSyncResponse(ctx context.Context, senderId st
Heads: result.Heads,
Changes: result.Added,
SnapshotPath: snapshotPath,
TreeId: request.TreeId,
TreeId: response.TreeId,
}
return r.client.NotifyHeadsChanged(newUpdate)
}
@ -205,3 +213,8 @@ func (r *requestHandler) prepareFullSyncResponse(treeId string, theirPath []stri
SnapshotPath: tree.SnapshotPath(),
}, nil
}
func (r *requestHandler) createTree(response *syncpb.SyncFullResponse) error {
// TODO: write create tree functionality
return nil
}

View file

@ -13,6 +13,7 @@ const CName = "treecache"
type Service interface {
Do(ctx context.Context, treeId string, f func(tree acltree.ACLTree) error) error
Add(ctx context.Context, treeId string, tree acltree.ACLTree) error
}
type service struct {
@ -37,6 +38,10 @@ func (s *service) Do(ctx context.Context, treeId string, f func(tree acltree.ACL
return f(tree.(acltree.ACLTree))
}
func (s *service) Add(ctx context.Context, treeId string, tree acltree.ACLTree) error {
return s.cache.Add(treeId, tree)
}
func (s *service) Init(ctx context.Context, a *app.App) (err error) {
s.cache = ocache.New(s.loadTree)
s.account = a.MustComponent(account.CName).(account.Service)