1
0
Fork 0
mirror of https://github.com/anyproto/any-sync.git synced 2025-06-11 10:18:08 +09:00

Remove responsibility of objecttree to notify and move to synctree

This commit is contained in:
mcrakhman 2022-09-17 21:39:17 +02:00 committed by Mikhail Iudin
parent 4754c9704b
commit 212553d63d
No known key found for this signature in database
GPG key ID: FAAAA8BAABDFF1C0
8 changed files with 53 additions and 468 deletions

View file

@ -3,7 +3,6 @@ package cache
import (
"context"
"github.com/anytypeio/go-anytype-infrastructure-experiments/app"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/storage"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/tree"
)
@ -21,5 +20,4 @@ type TreeResult struct {
type TreeCache interface {
app.ComponentRunnable
GetTree(ctx context.Context, id string) (TreeResult, error)
AddTree(ctx context.Context, payload storage.TreeStorageCreatePayload) error
}

View file

@ -22,8 +22,8 @@ type Space interface {
SyncService() syncservice.SyncService
DiffService() diffservice.DiffService
CreateTree(ctx context.Context, payload tree.ObjectTreeCreatePayload, listener tree.ObjectTreeUpdateListener) (tree.ObjectTree, error)
BuildTree(ctx context.Context, id string, listener tree.ObjectTreeUpdateListener) (tree.ObjectTree, error)
CreateTree(ctx context.Context, payload tree.ObjectTreeCreatePayload, listener synctree.UpdateListener) (tree.ObjectTree, error)
BuildTree(ctx context.Context, id string, listener synctree.UpdateListener) (tree.ObjectTree, error)
Close() error
}
@ -65,11 +65,11 @@ func (s *space) DiffService() diffservice.DiffService {
return s.diffService
}
func (s *space) CreateTree(ctx context.Context, payload tree.ObjectTreeCreatePayload, listener tree.ObjectTreeUpdateListener) (tree.ObjectTree, error) {
return synctree.CreateSyncTree(payload, s.syncService, listener, nil, s.storage.CreateTreeStorage)
func (s *space) CreateTree(ctx context.Context, payload tree.ObjectTreeCreatePayload, listener synctree.UpdateListener) (tree.ObjectTree, error) {
return synctree.CreateSyncTree(ctx, payload, s.syncService, listener, nil, s.storage.CreateTreeStorage)
}
func (s *space) BuildTree(ctx context.Context, id string, listener tree.ObjectTreeUpdateListener) (t tree.ObjectTree, err error) {
func (s *space) BuildTree(ctx context.Context, id string, listener synctree.UpdateListener) (t tree.ObjectTree, err error) {
getTreeRemote := func() (*spacesyncproto.ObjectSyncMessage, error) {
// TODO: add empty context handling (when this is not happening due to head update)
peerId, err := syncservice.GetPeerIdFromStreamContext(ctx)
@ -115,7 +115,7 @@ func (s *space) BuildTree(ctx context.Context, id string, listener tree.ObjectTr
return
}
}
return synctree.BuildSyncTree(s.syncService, store.(treestorage.TreeStorage), listener, s.aclList)
return synctree.BuildSyncTree(ctx, s.syncService, store.(treestorage.TreeStorage), listener, s.aclList)
}
func (s *space) getObjectIds() []string {

View file

@ -120,6 +120,7 @@ Loop:
delete(s.peerStreams, id)
continue Loop
default:
break
}
streams = append(streams, stream)
}

View file

@ -10,24 +10,35 @@ import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/tree"
)
type UpdateListener interface {
Update(tree tree.ObjectTree)
Rebuild(tree tree.ObjectTree)
}
type SyncTree struct {
tree.ObjectTree
syncService syncservice.SyncService
listener UpdateListener
}
func CreateSyncTree(
ctx context.Context,
payload tree.ObjectTreeCreatePayload,
syncService syncservice.SyncService,
listener tree.ObjectTreeUpdateListener,
listener UpdateListener,
aclList list.ACLList,
createStorage storage.TreeStorageCreatorFunc) (t tree.ObjectTree, err error) {
t, err = tree.CreateObjectTree(payload, listener, aclList, createStorage)
t, err = tree.CreateObjectTree(payload, aclList, createStorage)
if err != nil {
return
}
t = &SyncTree{
ObjectTree: t,
syncService: syncService,
listener: listener,
}
// TODO: use context where it is needed
err = syncService.NotifyHeadUpdate(context.Background(), t.ID(), t.Header(), &spacesyncproto.ObjectHeadUpdate{
err = syncService.NotifyHeadUpdate(ctx, t.ID(), t.Header(), &spacesyncproto.ObjectHeadUpdate{
Heads: t.Heads(),
SnapshotPath: t.SnapshotPath(),
})
@ -35,25 +46,31 @@ func CreateSyncTree(
}
func BuildSyncTree(
ctx context.Context,
syncService syncservice.SyncService,
treeStorage storage.TreeStorage,
listener tree.ObjectTreeUpdateListener,
listener UpdateListener,
aclList list.ACLList) (t tree.ObjectTree, err error) {
return buildSyncTree(syncService, treeStorage, listener, aclList)
return buildSyncTree(ctx, syncService, treeStorage, listener, aclList)
}
func buildSyncTree(
ctx context.Context,
syncService syncservice.SyncService,
treeStorage storage.TreeStorage,
listener tree.ObjectTreeUpdateListener,
listener UpdateListener,
aclList list.ACLList) (t tree.ObjectTree, err error) {
t, err = tree.BuildObjectTree(treeStorage, listener, aclList)
t, err = tree.BuildObjectTree(treeStorage, aclList)
if err != nil {
return
}
t = &SyncTree{
ObjectTree: t,
syncService: syncService,
listener: listener,
}
// TODO: use context where it is needed
err = syncService.NotifyHeadUpdate(context.Background(), t.ID(), t.Header(), &spacesyncproto.ObjectHeadUpdate{
err = syncService.NotifyHeadUpdate(ctx, t.ID(), t.Header(), &spacesyncproto.ObjectHeadUpdate{
Heads: t.Heads(),
SnapshotPath: t.SnapshotPath(),
})
@ -75,9 +92,18 @@ func (s *SyncTree) AddContent(ctx context.Context, content tree.SignableChangeCo
func (s *SyncTree) AddRawChanges(ctx context.Context, changes ...*aclpb.RawTreeChangeWithId) (res tree.AddResult, err error) {
res, err = s.AddRawChanges(ctx, changes...)
if err != nil || res.Mode == tree.Nothing {
if err != nil {
return
}
switch res.Mode {
case tree.Nothing:
return
case tree.Append:
s.listener.Update(s)
case tree.Rebuild:
s.listener.Rebuild(s)
}
err = s.syncService.NotifyHeadUpdate(ctx, s.ID(), s.Header(), &spacesyncproto.ObjectHeadUpdate{
Heads: res.Heads,
Changes: res.Added,
@ -85,3 +111,7 @@ func (s *SyncTree) AddRawChanges(ctx context.Context, changes ...*aclpb.RawTreeC
})
return
}
func (s *SyncTree) Tree() tree.ObjectTree {
return s
}

View file

@ -11,11 +11,6 @@ import (
"sync"
)
type ObjectTreeUpdateListener interface {
Update(tree ObjectTree)
Rebuild(tree ObjectTree)
}
type RWLocker interface {
sync.Locker
RLock()
@ -71,7 +66,6 @@ type objectTree struct {
rawChangeLoader *rawChangeLoader
treeBuilder *treeBuilder
aclList list.ACLList
updateListener ObjectTreeUpdateListener
id string
header *aclpb.TreeHeader
@ -94,7 +88,6 @@ type objectTreeDeps struct {
changeBuilder ChangeBuilder
treeBuilder *treeBuilder
treeStorage storage.TreeStorage
updateListener ObjectTreeUpdateListener
validator ObjectTreeValidator
rawChangeLoader *rawChangeLoader
aclList list.ACLList
@ -102,7 +95,6 @@ type objectTreeDeps struct {
func defaultObjectTreeDeps(
treeStorage storage.TreeStorage,
listener ObjectTreeUpdateListener,
aclList list.ACLList) objectTreeDeps {
keychain := common.NewKeychain()
@ -112,7 +104,6 @@ func defaultObjectTreeDeps(
changeBuilder: changeBuilder,
treeBuilder: treeBuilder,
treeStorage: treeStorage,
updateListener: listener,
validator: newTreeValidator(),
rawChangeLoader: newRawChangeLoader(treeStorage, changeBuilder),
aclList: aclList,
@ -211,8 +202,7 @@ func (ot *objectTree) prepareBuilderContent(content SignableChangeContent) (cnt
}
func (ot *objectTree) AddRawChanges(ctx context.Context, rawChanges ...*aclpb.RawTreeChangeWithId) (addResult AddResult, err error) {
var mode Mode
mode, addResult, err = ot.addRawChanges(ctx, rawChanges...)
addResult, err = ot.addRawChanges(ctx, rawChanges...)
if err != nil {
return
}
@ -230,26 +220,10 @@ func (ot *objectTree) AddRawChanges(ctx context.Context, rawChanges ...*aclpb.Ra
// setting heads
err = ot.treeStorage.SetHeads(ot.tree.Heads())
if err != nil {
return
}
if ot.updateListener == nil {
return
}
switch mode {
case Append:
ot.updateListener.Update(ot)
case Rebuild:
ot.updateListener.Rebuild(ot)
default:
break
}
return
}
func (ot *objectTree) addRawChanges(ctx context.Context, rawChanges ...*aclpb.RawTreeChangeWithId) (mode Mode, addResult AddResult, err error) {
func (ot *objectTree) addRawChanges(ctx context.Context, rawChanges ...*aclpb.RawTreeChangeWithId) (addResult AddResult, err error) {
// resetting buffers
ot.tmpChangesBuf = ot.tmpChangesBuf[:0]
ot.notSeenIdxBuf = ot.notSeenIdxBuf[:0]

View file

@ -19,19 +19,18 @@ type ObjectTreeCreatePayload struct {
TreeType aclpb.TreeHeaderType
}
func BuildObjectTree(treeStorage storage.TreeStorage, listener ObjectTreeUpdateListener, aclList list.ACLList) (ObjectTree, error) {
deps := defaultObjectTreeDeps(treeStorage, listener, aclList)
func BuildObjectTree(treeStorage storage.TreeStorage, aclList list.ACLList) (ObjectTree, error) {
deps := defaultObjectTreeDeps(treeStorage, aclList)
return buildObjectTree(deps)
}
func CreateObjectTree(
payload ObjectTreeCreatePayload,
listener ObjectTreeUpdateListener,
aclList list.ACLList,
createStorage storage.TreeStorageCreatorFunc) (objTree ObjectTree, err error) {
aclList.RLock()
var (
deps = defaultObjectTreeDeps(nil, listener, aclList)
deps = defaultObjectTreeDeps(nil, aclList)
state = aclList.ACLState()
aclId = aclList.ID()
aclHeadId = aclList.Head().Id
@ -91,7 +90,6 @@ func CreateObjectTree(
func buildObjectTree(deps objectTreeDeps) (ObjectTree, error) {
objTree := &objectTree{
treeStorage: deps.treeStorage,
updateListener: deps.updateListener,
treeBuilder: deps.treeBuilder,
validator: deps.validator,
aclList: deps.aclList,

View file

@ -1,125 +0,0 @@
package message
import (
"context"
"fmt"
"github.com/anytypeio/go-anytype-infrastructure-experiments/app"
"github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger"
pool2 "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/pool"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/pool"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/node"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/requesthandler"
"github.com/anytypeio/go-anytype-infrastructure-experiments/syncproto"
"github.com/gogo/protobuf/proto"
"sync"
"time"
)
var log = logger.NewNamed("messageservice")
const CName = "MessageService"
type service struct {
nodes []*node.Node
requestHandler requesthandler.RequestHandler
pool pool2.Pool
sync.RWMutex
}
func New() app.Component {
return &service{}
}
type Service interface {
SendMessageAsync(peerId string, msg *syncproto.Sync) error
SendToSpaceAsync(spaceId string, msg *syncproto.Sync) error
}
func (s *service) Init(a *app.App) (err error) {
s.requestHandler = a.MustComponent(requesthandler.CName).(requesthandler.RequestHandler)
s.nodes = a.MustComponent(node.CName).(node.Service).Nodes()
s.pool = a.MustComponent(pool2.CName).(pool2.Pool)
s.pool.AddHandler(syncproto.MessageType_MessageTypeSync, s.HandleMessage)
return nil
}
func (s *service) Name() (name string) {
return CName
}
func (s *service) Run(ctx context.Context) (err error) {
return nil
}
func (s *service) Close(ctx context.Context) (err error) {
return nil
}
func (s *service) HandleMessage(ctx context.Context, msg *pool.Message) (err error) {
defer func() {
if err != nil {
msg.AckError(syncproto.System_Error_UNKNOWN, err.Error())
} else {
msg.Ack()
}
}()
syncMsg := &syncproto.Sync{}
err = proto.Unmarshal(msg.Data, syncMsg)
if err != nil {
return
}
timeoutCtx, cancel := context.WithTimeout(ctx, time.Second*30)
defer cancel()
err = s.requestHandler.HandleSyncMessage(timeoutCtx, msg.Peer().Id(), syncMsg)
return
}
func (s *service) SendMessageAsync(peerId string, msg *syncproto.Sync) (err error) {
_, err = s.pool.DialAndAddPeer(context.Background(), peerId)
if err != nil {
return
}
marshalled, err := proto.Marshal(msg)
if err != nil {
return
}
go s.sendAsync(peerId, msgInfo(msg), marshalled)
return
}
func (s *service) SendToSpaceAsync(spaceId string, msg *syncproto.Sync) error {
for _, rp := range s.nodes {
s.SendMessageAsync(rp.PeerId, msg)
}
return nil
}
func (s *service) sendAsync(peerId string, msgTypeStr string, marshalled []byte) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()
return s.pool.SendAndWait(ctx, peerId, &syncproto.Message{
Header: &syncproto.Header{
Type: syncproto.MessageType_MessageTypeSync,
DebugInfo: msgTypeStr,
},
Data: marshalled,
})
}
func msgInfo(content *syncproto.Sync) (syncMethod string) {
msg := content.GetMessage()
switch {
case msg.GetFullSyncRequest() != nil:
syncMethod = "FullSyncRequest"
case msg.GetFullSyncResponse() != nil:
syncMethod = "FullSyncResponse"
case msg.GetHeadUpdate() != nil:
syncMethod = "HeadUpdate"
}
syncMethod = fmt.Sprintf("method: %s, treeType: %s", syncMethod, content.TreeHeader.DocType.String())
return
}

View file

@ -1,291 +0,0 @@
package requesthandler
import (
"context"
"github.com/anytypeio/go-anytype-infrastructure-experiments/app"
"github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/account"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclchanges/aclpb"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/storage"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/tree"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/treecache"
"github.com/anytypeio/go-anytype-infrastructure-experiments/syncproto"
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/slice"
"go.uber.org/zap"
)
type requestHandler struct {
treeCache treecache.Service
account account.Service
messageService MessageSender
}
var log = logger.NewNamed("requesthandler")
func New() app.Component {
return &requestHandler{}
}
type RequestHandler interface {
HandleSyncMessage(ctx context.Context, senderId string, request *syncproto.Sync) (err error)
}
type MessageSender interface {
SendMessageAsync(peerId string, msg *syncproto.Sync) error
SendToSpaceAsync(spaceId string, msg *syncproto.Sync) error
}
const CName = "SyncRequestHandler"
func (r *requestHandler) Init(a *app.App) (err error) {
r.treeCache = a.MustComponent(treecache.CName).(treecache.Service)
r.account = a.MustComponent(account.CName).(account.Service)
r.messageService = a.MustComponent("MessageService").(MessageSender)
return nil
}
func (r *requestHandler) Name() (name string) {
return CName
}
func (r *requestHandler) Run(ctx context.Context) (err error) {
return nil
}
func (r *requestHandler) Close(ctx context.Context) (err error) {
return nil
}
func (r *requestHandler) HandleSyncMessage(ctx context.Context, senderId string, content *syncproto.Sync) error {
msg := content.GetMessage()
switch {
case msg.GetFullSyncRequest() != nil:
return r.HandleFullSyncRequest(ctx, senderId, msg.GetFullSyncRequest(), content.GetTreeHeader(), content.GetTreeId())
case msg.GetFullSyncResponse() != nil:
return r.HandleFullSyncResponse(ctx, senderId, msg.GetFullSyncResponse(), content.GetTreeHeader(), content.GetTreeId())
case msg.GetHeadUpdate() != nil:
return r.HandleHeadUpdate(ctx, senderId, msg.GetHeadUpdate(), content.GetTreeHeader(), content.GetTreeId())
}
return nil
}
func (r *requestHandler) HandleHeadUpdate(
ctx context.Context,
senderId string,
update *syncproto.SyncHeadUpdate,
header *aclpb.Header,
treeId string) (err error) {
var (
fullRequest *syncproto.SyncFullRequest
snapshotPath []string
result tree.AddResult
)
log.With(zap.String("peerId", senderId), zap.String("treeId", treeId)).
Debug("processing head update")
err = r.treeCache.Do(ctx, treeId, func(obj any) error {
objTree := obj.(tree.ObjectTree)
objTree.Lock()
defer objTree.Unlock()
if slice.UnsortedEquals(update.Heads, objTree.Heads()) {
return nil
}
result, err = objTree.AddRawChanges(ctx, update.Changes...)
if err != nil {
return err
}
// if we couldn't add all the changes
shouldFullSync := len(update.Changes) != len(result.Added)
snapshotPath = objTree.SnapshotPath()
if shouldFullSync {
fullRequest, err = r.prepareFullSyncRequest(objTree)
if err != nil {
return err
}
}
return nil
})
// if there are no such tree
if err == storage.ErrUnknownTreeId {
fullRequest = &syncproto.SyncFullRequest{}
}
// if we have incompatible heads, or we haven't seen the tree at all
if fullRequest != nil {
return r.messageService.SendMessageAsync(senderId, syncproto.WrapFullRequest(fullRequest, header, treeId))
}
// if error or nothing has changed
if err != nil || len(result.Added) == 0 {
return err
}
// otherwise sending heads update message
newUpdate := &syncproto.SyncHeadUpdate{
Heads: result.Heads,
Changes: result.Added,
SnapshotPath: snapshotPath,
}
return r.messageService.SendToSpaceAsync("", syncproto.WrapHeadUpdate(newUpdate, header, treeId))
}
func (r *requestHandler) HandleFullSyncRequest(
ctx context.Context,
senderId string,
request *syncproto.SyncFullRequest,
header *aclpb.Header,
treeId string) (err error) {
var fullResponse *syncproto.SyncFullResponse
err = r.treeCache.Do(ctx, treeId, func(obj any) error {
objTree := obj.(tree.ObjectTree)
objTree.Lock()
defer objTree.Unlock()
fullResponse, err = r.prepareFullSyncResponse(treeId, request.SnapshotPath, request.Heads, objTree)
if err != nil {
return err
}
return nil
})
if err != nil {
return err
}
return r.messageService.SendMessageAsync(senderId, syncproto.WrapFullResponse(fullResponse, header, treeId))
}
func (r *requestHandler) HandleFullSyncResponse(
ctx context.Context,
senderId string,
response *syncproto.SyncFullResponse,
header *aclpb.Header,
treeId string) (err error) {
var (
snapshotPath []string
result tree.AddResult
)
err = r.treeCache.Do(ctx, treeId, func(obj interface{}) error {
objTree := obj.(tree.ObjectTree)
objTree.Lock()
defer objTree.Unlock()
// if we already have the heads for whatever reason
if slice.UnsortedEquals(response.Heads, objTree.Heads()) {
return nil
}
result, err = objTree.AddRawChanges(ctx, response.Changes...)
if err != nil {
return err
}
snapshotPath = objTree.SnapshotPath()
return nil
})
// if error or nothing has changed
if (err != nil || len(result.Added) == 0) && err != storage.ErrUnknownTreeId {
return err
}
// if we have a new tree
if err == storage.ErrUnknownTreeId {
err = r.createTree(ctx, response, header, treeId)
if err != nil {
return err
}
result = tree.AddResult{
OldHeads: []string{},
Heads: response.Heads,
Added: response.Changes,
}
}
// sending heads update message
newUpdate := &syncproto.SyncHeadUpdate{
Heads: result.Heads,
Changes: result.Added,
SnapshotPath: snapshotPath,
}
return r.messageService.SendToSpaceAsync("", syncproto.WrapHeadUpdate(newUpdate, header, treeId))
}
func (r *requestHandler) HandleACLList(
ctx context.Context,
senderId string,
req *syncproto.SyncACLList,
header *aclpb.Header,
id string) (err error) {
err = r.treeCache.Do(ctx, id, func(obj interface{}) error {
return nil
})
// do nothing if already added
if err == nil {
return nil
}
// if not found then add to storage
if err == storage.ErrUnknownTreeId {
return r.createACLList(ctx, req, header, id)
}
return err
}
func (r *requestHandler) prepareFullSyncRequest(t tree.ObjectTree) (*syncproto.SyncFullRequest, error) {
return &syncproto.SyncFullRequest{
Heads: t.Heads(),
SnapshotPath: t.SnapshotPath(),
}, nil
}
func (r *requestHandler) prepareFullSyncResponse(
treeId string,
theirPath, theirHeads []string,
t tree.ObjectTree) (*syncproto.SyncFullResponse, error) {
ourChanges, err := t.ChangesAfterCommonSnapshot(theirPath, theirHeads)
if err != nil {
return nil, err
}
return &syncproto.SyncFullResponse{
Heads: t.Heads(),
Changes: ourChanges,
SnapshotPath: t.SnapshotPath(),
}, nil
}
func (r *requestHandler) createTree(
ctx context.Context,
response *syncproto.SyncFullResponse,
header *aclpb.Header,
treeId string) error {
return r.treeCache.Add(
ctx,
treeId,
storage.TreeStorageCreatePayload{
TreeId: treeId,
Header: header,
Changes: response.Changes,
Heads: response.Heads,
})
}
func (r *requestHandler) createACLList(
ctx context.Context,
req *syncproto.SyncACLList,
header *aclpb.Header,
treeId string) error {
return r.treeCache.Add(
ctx,
treeId,
storage.ACLListStorageCreatePayload{
ListId: treeId,
Header: header,
Records: req.Records,
})
}