1
0
Fork 0
mirror of https://github.com/anyproto/any-sync.git synced 2025-06-09 09:35:03 +09:00

More syncclient and synctree changes

This commit is contained in:
mcrakhman 2024-06-12 21:33:23 +02:00
parent 5c2923b440
commit ed24b81555
No known key found for this signature in database
GPG key ID: DED12CFEF5B8396B
3 changed files with 76 additions and 51 deletions

View file

@ -0,0 +1,32 @@
package synctree
import (
"context"
"errors"
"github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
"github.com/anyproto/any-sync/commonspace/sync/syncdeps"
)
var ErrUnexpectedResponseType = errors.New("unexpected response type")
type responseCollector struct {
heads []string
root *treechangeproto.RawTreeChangeWithId
changes []*treechangeproto.RawTreeChangeWithId
}
func newResponseCollector() *responseCollector {
return &responseCollector{}
}
func (r *responseCollector) CollectResponse(ctx context.Context, peerId, objectId string, resp syncdeps.Response) error {
treeResp, ok := resp.(Response)
if !ok {
return ErrUnexpectedResponseType
}
r.heads = treeResp.heads
r.root = treeResp.root
r.changes = append(r.changes, treeResp.changes...)
return nil
}

View file

@ -3,67 +3,41 @@ package synctree
import (
"context"
"go.uber.org/zap"
"github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
"github.com/anyproto/any-sync/commonspace/peermanager"
"github.com/anyproto/any-sync/commonspace/requestmanager"
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
"github.com/anyproto/any-sync/commonspace/object/tree/objecttree"
"github.com/anyproto/any-sync/commonspace/sync"
)
type SyncClient interface {
RequestFactory
Broadcast(msg *treechangeproto.TreeSyncMessage)
SendUpdate(peerId, objectId string, msg *treechangeproto.TreeSyncMessage) (err error)
QueueRequest(peerId, objectId string, msg *treechangeproto.TreeSyncMessage) (err error)
SendRequest(ctx context.Context, peerId, objectId string, msg *treechangeproto.TreeSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error)
Broadcast(ctx context.Context, headUpdate HeadUpdate) error
SendNewTreeRequest(ctx context.Context, peerId, objectId string, collector *responseCollector) (err error)
QueueRequest(ctx context.Context, peerId string, tree objecttree.ObjectTree) (err error)
}
type syncClient struct {
RequestFactory
spaceId string
syncService sync.SyncService
spaceId string
}
func NewSyncClient(spaceId string, requestManager requestmanager.RequestManager, peerManager peermanager.PeerManager) SyncClient {
func NewSyncClient(spaceId string, syncService sync.SyncService) SyncClient {
return &syncClient{
RequestFactory: &requestFactory{},
spaceId: spaceId,
requestManager: requestManager,
peerManager: peerManager,
syncService: syncService,
}
}
func (s *syncClient) Broadcast(msg *treechangeproto.TreeSyncMessage) {
objMsg, err := spacesyncproto.MarshallSyncMessage(msg, s.spaceId, msg.RootChange.Id)
if err != nil {
return
}
err = s.peerManager.Broadcast(context.Background(), objMsg)
if err != nil {
log.Debug("broadcast error", zap.Error(err))
}
func (s *syncClient) Broadcast(ctx context.Context, headUpdate HeadUpdate) error {
return s.syncService.BroadcastMessage(ctx, headUpdate)
}
func (s *syncClient) SendUpdate(peerId, objectId string, msg *treechangeproto.TreeSyncMessage) (err error) {
objMsg, err := spacesyncproto.MarshallSyncMessage(msg, s.spaceId, objectId)
if err != nil {
return
}
return s.peerManager.SendPeer(context.Background(), peerId, objMsg)
func (s *syncClient) SendNewTreeRequest(ctx context.Context, peerId, objectId string, collector *responseCollector) (err error) {
req := s.CreateNewTreeRequest(peerId, objectId)
return s.syncService.SendRequest(ctx, req, collector)
}
func (s *syncClient) SendRequest(ctx context.Context, peerId, objectId string, msg *treechangeproto.TreeSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) {
objMsg, err := spacesyncproto.MarshallSyncMessage(msg, s.spaceId, objectId)
if err != nil {
return
}
return s.requestManager.SendRequest(ctx, peerId, objMsg)
}
func (s *syncClient) QueueRequest(peerId, objectId string, msg *treechangeproto.TreeSyncMessage) (err error) {
objMsg, err := spacesyncproto.MarshallSyncMessage(msg, s.spaceId, objectId)
if err != nil {
return
}
return s.requestManager.QueueRequest(peerId, objMsg)
func (s *syncClient) QueueRequest(ctx context.Context, peerId string, tree objecttree.ObjectTree) (err error) {
req := s.CreateFullSyncRequest(peerId, tree)
return s.syncService.QueueRequest(ctx, req)
}

View file

@ -118,9 +118,12 @@ func buildSyncTree(ctx context.Context, sendUpdate bool, deps BuildDeps) (t Sync
// don't send updates for empty derived trees, because they won't be accepted
if sendUpdate && !objecttree.IsEmptyDerivedTree(objTree) {
headUpdate := syncTree.syncClient.CreateHeadUpdate(t, nil)
headUpdate := syncTree.syncClient.CreateHeadUpdate(objTree, "", nil)
// send to everybody, because everybody should know that the node or client got new tree
syncTree.syncClient.Broadcast(headUpdate)
broadcastErr := syncTree.syncClient.Broadcast(ctx, headUpdate)
if broadcastErr != nil {
log.Warn("failed to broadcast head update", zap.Error(broadcastErr))
}
}
return
}
@ -156,8 +159,26 @@ func (s *syncTree) AddContent(ctx context.Context, content objecttree.SignableCh
s.notifiable.UpdateHeads(s.Id(), res.Heads)
}
s.syncStatus.HeadsChange(s.Id(), res.Heads)
headUpdate := s.syncClient.CreateHeadUpdate(s, res.Added)
s.syncClient.Broadcast(headUpdate)
headUpdate := s.syncClient.CreateHeadUpdate(s, "", res.Added)
broadcastErr := s.syncClient.Broadcast(ctx, headUpdate)
if broadcastErr != nil {
log.Warn("failed to broadcast head update", zap.Error(broadcastErr))
}
return
}
func (s *syncTree) AddRawChangesFromPeer(ctx context.Context, peerId string, changesPayload objecttree.RawChangesPayload) (res objecttree.AddResult, err error) {
res, err = s.AddRawChanges(ctx, changesPayload)
if err != nil {
return
}
if res.Mode != objecttree.Nothing {
headUpdate := s.syncClient.CreateHeadUpdate(s, peerId, res.Added)
broadcastErr := s.syncClient.Broadcast(ctx, headUpdate)
if broadcastErr != nil {
log.Warn("failed to broadcast head update", zap.Error(broadcastErr))
}
}
return
}
@ -183,8 +204,7 @@ func (s *syncTree) AddRawChanges(ctx context.Context, changesPayload objecttree.
if s.notifiable != nil {
s.notifiable.UpdateHeads(s.Id(), res.Heads)
}
headUpdate := s.syncClient.CreateHeadUpdate(s, res.Added)
s.syncClient.Broadcast(headUpdate)
// broadcast will happen on upper level, so we know which peer sent the changes
}
return
}
@ -251,8 +271,7 @@ func (s *syncTree) SyncWithPeer(ctx context.Context, peerId string) (err error)
if objecttree.IsEmptyDerivedTree(s) {
return nil
}
headUpdate := s.syncClient.CreateHeadUpdate(s, nil)
return s.syncClient.SendUpdate(peerId, headUpdate.RootChange.Id, headUpdate)
return s.syncClient.QueueRequest(ctx, peerId, s)
}
func (s *syncTree) afterBuild() {