1
0
Fork 0
mirror of https://github.com/anyproto/any-sync.git synced 2025-06-08 05:57:03 +09:00

Add protocol version to context and add new sync logic

This commit is contained in:
mcrakhman 2024-07-02 11:38:03 +02:00
parent 23390e556e
commit bab5cb694f
No known key found for this signature in database
GPG key ID: DED12CFEF5B8396B
14 changed files with 121 additions and 37 deletions

View file

@ -152,13 +152,13 @@ func (d *diffSyncer) syncWithPeer(ctx context.Context, p peer.Peer) (err error)
// if we removed acl head from the list
if len(existingIds) < prevExistingLen {
if syncErr := d.syncAcl.SyncWithPeer(ctx, p.Id()); syncErr != nil {
if syncErr := d.syncAcl.SyncWithPeer(ctx, p); syncErr != nil {
log.Warn("failed to send acl sync message to peer", zap.String("aclId", syncAclId))
}
}
// treeSyncer should not get acl id, that's why we filter existing ids before
err = d.treeSyncer.SyncAll(ctx, p.Id(), existingIds, missingIds)
err = d.treeSyncer.SyncAll(ctx, p, existingIds, missingIds)
if err != nil {
return err
}

View file

@ -9,6 +9,7 @@ import (
type RequestFactory interface {
CreateHeadUpdate(l list.AclList, added []*consensusproto.RawRecordWithId) (msg *consensusproto.LogSyncMessage)
CreateEmptyFullSyncRequest(l list.AclList) (req *consensusproto.LogSyncMessage)
CreateFullSyncRequest(l list.AclList, theirHead string) (req *consensusproto.LogSyncMessage, err error)
CreateFullSyncResponse(l list.AclList, theirHead string) (*consensusproto.LogSyncMessage, error)
}
@ -26,6 +27,12 @@ func (r *requestFactory) CreateHeadUpdate(l list.AclList, added []*consensusprot
}, l.Root())
}
func (r *requestFactory) CreateEmptyFullSyncRequest(l list.AclList) (req *consensusproto.LogSyncMessage) {
return consensusproto.WrapFullRequest(&consensusproto.LogFullSyncRequest{
Head: l.Head().Id,
}, l.Root())
}
func (r *requestFactory) CreateFullSyncRequest(l list.AclList, theirHead string) (req *consensusproto.LogSyncMessage, err error) {
if !l.HasHead(theirHead) {
return consensusproto.WrapFullRequest(&consensusproto.LogFullSyncRequest{

View file

@ -8,6 +8,8 @@ import (
"github.com/anyproto/any-sync/commonspace/object/acl/syncacl/headupdater"
"github.com/anyproto/any-sync/commonspace/object/syncobjectgetter"
"github.com/anyproto/any-sync/net/peer"
"github.com/anyproto/any-sync/net/secureservice"
"github.com/anyproto/any-sync/accountservice"
"github.com/anyproto/any-sync/app"
@ -35,7 +37,7 @@ type SyncAcl interface {
list.AclList
syncobjectgetter.SyncObject
SetHeadUpdater(updater headupdater.HeadUpdater)
SyncWithPeer(ctx context.Context, peerId string) (err error)
SyncWithPeer(ctx context.Context, p peer.Peer) (err error)
SetAclUpdater(updater headupdater.AclUpdater)
}
@ -73,8 +75,8 @@ func (s *syncAcl) SetHeadUpdater(updater headupdater.HeadUpdater) {
s.headUpdater = updater
}
func (s *syncAcl) HandleMessage(ctx context.Context, senderId string, request *spacesyncproto.ObjectSyncMessage) (err error) {
return s.syncHandler.HandleMessage(ctx, senderId, request)
func (s *syncAcl) HandleMessage(ctx context.Context, senderId string, protoVersion uint32, request *spacesyncproto.ObjectSyncMessage) (err error) {
return s.syncHandler.HandleMessage(ctx, senderId, protoVersion, request)
}
func (s *syncAcl) Init(a *app.App) (err error) {
@ -136,11 +138,18 @@ func (s *syncAcl) AddRawRecords(rawRecords []*consensusproto.RawRecordWithId) (e
return
}
func (s *syncAcl) SyncWithPeer(ctx context.Context, peerId string) (err error) {
func (s *syncAcl) SyncWithPeer(ctx context.Context, p peer.Peer) (err error) {
s.Lock()
defer s.Unlock()
headUpdate := s.syncClient.CreateHeadUpdate(s, nil)
return s.syncClient.SendUpdate(peerId, headUpdate)
protoVersion, err := peer.CtxProtoVersion(ctx)
// this works with old protocol
if err != nil || protoVersion <= secureservice.ProtoVersion {
headUpdate := s.syncClient.CreateHeadUpdate(s, nil)
return s.syncClient.SendUpdate(p.Id(), headUpdate)
}
// for new protocol sending empty request
request := s.syncClient.CreateEmptyFullSyncRequest(s)
return s.syncClient.QueueRequest(p.Id(), request)
}
func (s *syncAcl) Close(ctx context.Context) (err error) {

View file

@ -4,12 +4,13 @@ import (
"context"
"errors"
"github.com/gogo/protobuf/proto"
"github.com/anyproto/any-sync/commonspace/object/acl/list"
"github.com/anyproto/any-sync/commonspace/objectsync/synchandler"
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
"github.com/anyproto/any-sync/commonspace/syncstatus"
"github.com/anyproto/any-sync/consensus/consensusproto"
"github.com/gogo/protobuf/proto"
)
var (
@ -35,7 +36,7 @@ func newSyncAclHandler(spaceId string, aclList list.AclList, syncClient SyncClie
}
}
func (s *syncAclHandler) HandleMessage(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error) {
func (s *syncAclHandler) HandleMessage(ctx context.Context, senderId string, protoVersion uint32, message *spacesyncproto.ObjectSyncMessage) (err error) {
unmarshalled := &consensusproto.LogSyncMessage{}
err = proto.Unmarshal(message.Payload, unmarshalled)
if err != nil {

View file

@ -2,6 +2,7 @@ package synctree
import (
"fmt"
"github.com/anyproto/any-sync/commonspace/object/tree/objecttree"
"github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
"github.com/anyproto/any-sync/util/slice"
@ -10,6 +11,7 @@ import (
type RequestFactory interface {
CreateHeadUpdate(t objecttree.ObjectTree, added []*treechangeproto.RawTreeChangeWithId) (msg *treechangeproto.TreeSyncMessage)
CreateNewTreeRequest() (msg *treechangeproto.TreeSyncMessage)
CreateEmptyFullSyncRequest(t objecttree.ObjectTree) (req *treechangeproto.TreeSyncMessage)
CreateFullSyncRequest(t objecttree.ObjectTree, theirHeads, theirSnapshotPath []string) (req *treechangeproto.TreeSyncMessage, err error)
CreateFullSyncResponse(t objecttree.ObjectTree, theirHeads, theirSnapshotPath []string) (*treechangeproto.TreeSyncMessage, error)
}
@ -32,6 +34,13 @@ func (r *requestFactory) CreateNewTreeRequest() (msg *treechangeproto.TreeSyncMe
return treechangeproto.WrapFullRequest(&treechangeproto.TreeFullSyncRequest{}, nil)
}
func (r *requestFactory) CreateEmptyFullSyncRequest(t objecttree.ObjectTree) (msg *treechangeproto.TreeSyncMessage) {
return treechangeproto.WrapFullRequest(&treechangeproto.TreeFullSyncRequest{
Heads: t.Heads(),
SnapshotPath: t.SnapshotPath(),
}, t.Header())
}
func (r *requestFactory) CreateFullSyncRequest(t objecttree.ObjectTree, theirHeads, theirSnapshotPath []string) (msg *treechangeproto.TreeSyncMessage, err error) {
req := &treechangeproto.TreeFullSyncRequest{}
if t == nil {

View file

@ -17,6 +17,7 @@ import (
"github.com/anyproto/any-sync/commonspace/spacestorage"
"github.com/anyproto/any-sync/commonspace/syncstatus"
"github.com/anyproto/any-sync/net/peer"
"github.com/anyproto/any-sync/net/secureservice"
"github.com/anyproto/any-sync/nodeconf"
)
@ -37,7 +38,7 @@ type SyncTree interface {
objecttree.ObjectTree
synchandler.SyncHandler
ListenerSetter
SyncWithPeer(ctx context.Context, peerId string) (err error)
SyncWithPeer(ctx context.Context, p peer.Peer) (err error)
}
// SyncTree sends head updates to sync service and also sends new changes to update listener
@ -245,14 +246,18 @@ func (s *syncTree) checkAlive() (err error) {
return
}
func (s *syncTree) SyncWithPeer(ctx context.Context, peerId string) (err error) {
func (s *syncTree) SyncWithPeer(ctx context.Context, p peer.Peer) (err error) {
s.Lock()
defer s.Unlock()
if objecttree.IsEmptyDerivedTree(s) {
return nil
protoVersion, err := peer.CtxProtoVersion(ctx)
// this works with old protocol
if err != nil || protoVersion <= secureservice.ProtoVersion {
headUpdate := s.syncClient.CreateHeadUpdate(s, nil)
return s.syncClient.SendUpdate(p.Id(), headUpdate.RootChange.Id, headUpdate)
}
headUpdate := s.syncClient.CreateHeadUpdate(s, nil)
return s.syncClient.SendUpdate(peerId, headUpdate.RootChange.Id, headUpdate)
// for new protocol sending empty request
request := s.syncClient.CreateEmptyFullSyncRequest(s)
return s.syncClient.QueueRequest(p.Id(), s.Id(), request)
}
func (s *syncTree) afterBuild() {

View file

@ -5,13 +5,15 @@ import (
"errors"
"sync"
"github.com/gogo/protobuf/proto"
"github.com/anyproto/any-sync/commonspace/object/tree/objecttree"
"github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
"github.com/anyproto/any-sync/commonspace/objectsync/synchandler"
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
"github.com/anyproto/any-sync/commonspace/syncstatus"
"github.com/anyproto/any-sync/net/secureservice"
"github.com/anyproto/any-sync/util/slice"
"github.com/gogo/protobuf/proto"
)
var (
@ -85,7 +87,7 @@ func (s *syncTreeHandler) handleRequest(ctx context.Context, senderId string, fu
return
}
func (s *syncTreeHandler) HandleMessage(ctx context.Context, senderId string, msg *spacesyncproto.ObjectSyncMessage) (err error) {
func (s *syncTreeHandler) HandleMessage(ctx context.Context, senderId string, protoVersion uint32, msg *spacesyncproto.ObjectSyncMessage) (err error) {
unmarshalled := &treechangeproto.TreeSyncMessage{}
err = proto.Unmarshal(msg.Payload, unmarshalled)
if err != nil {
@ -100,10 +102,10 @@ func (s *syncTreeHandler) HandleMessage(ctx context.Context, senderId string, ms
return
}
s.handlerLock.Unlock()
return s.handleMessage(ctx, unmarshalled, senderId)
return s.handleMessage(ctx, unmarshalled, protoVersion, senderId)
}
func (s *syncTreeHandler) handleMessage(ctx context.Context, msg *treechangeproto.TreeSyncMessage, senderId string) (err error) {
func (s *syncTreeHandler) handleMessage(ctx context.Context, msg *treechangeproto.TreeSyncMessage, protoVersion uint32, senderId string) (err error) {
s.objTree.Lock()
defer s.objTree.Unlock()
var (
@ -137,7 +139,22 @@ func (s *syncTreeHandler) handleMessage(ctx context.Context, msg *treechangeprot
case content.GetFullSyncRequest() != nil:
return ErrMessageIsRequest
case content.GetFullSyncResponse() != nil:
return s.syncProtocol.FullSyncResponse(ctx, senderId, content.GetFullSyncResponse())
err := s.syncProtocol.FullSyncResponse(ctx, senderId, content.GetFullSyncResponse())
if err != nil {
return err
}
if protoVersion <= secureservice.ProtoVersion {
return nil
}
cnt := content.GetFullSyncResponse()
if slice.UnsortedEquals(cnt.Heads, s.objTree.Heads()) {
return nil
}
req, err := s.syncClient.CreateFullSyncRequest(s.objTree, cnt.Heads, cnt.SnapshotPath)
if err != nil {
return err
}
return s.syncClient.QueueRequest(senderId, treeId, req)
}
return
}

View file

@ -5,6 +5,7 @@ import (
"context"
"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/net/peer"
)
const CName = "common.object.treesyncer"
@ -14,5 +15,5 @@ type TreeSyncer interface {
StartSync()
StopSync()
ShouldSync(peerId string) bool
SyncAll(ctx context.Context, peerId string, existing, missing []string) error
SyncAll(ctx context.Context, p peer.Peer, existing, missing []string) error
}

View file

@ -15,6 +15,7 @@ import (
"github.com/anyproto/any-sync/commonspace/spacestate"
"github.com/anyproto/any-sync/metric"
"github.com/anyproto/any-sync/net/peer"
"github.com/anyproto/any-sync/net/secureservice"
"github.com/anyproto/any-sync/util/multiqueue"
"go.uber.org/zap"
@ -128,6 +129,10 @@ func (s *objectSync) HandleMessage(ctx context.Context, hm HandleMessage) (err e
func (s *objectSync) processHandleMessage(msg HandleMessage) {
var err error
peerProtoVersion, err := peer.CtxProtoVersion(msg.PeerCtx)
if err != nil {
peerProtoVersion = secureservice.ProtoVersion
}
msg.StartHandlingTime = time.Now()
ctx := peer.CtxWithPeerId(context.Background(), msg.SenderId)
ctx = logger.CtxWithFields(ctx, zap.Uint64("msgId", msg.Id), zap.String("senderId", msg.SenderId))
@ -148,7 +153,7 @@ func (s *objectSync) processHandleMessage(msg HandleMessage) {
return
}
}
if err = s.handleMessage(ctx, msg.SenderId, msg.Message); err != nil {
if err = s.handleMessage(ctx, msg.SenderId, peerProtoVersion, msg.Message); err != nil {
if msg.Message.ObjectId != "" {
// cleanup thread on error
_ = s.handleQueue.CloseThread(msg.Message.ObjectId)
@ -170,7 +175,7 @@ func (s *objectSync) handleRequest(ctx context.Context, senderId string, msg *sp
return obj.HandleRequest(ctx, senderId, msg)
}
func (s *objectSync) handleMessage(ctx context.Context, senderId string, msg *spacesyncproto.ObjectSyncMessage) (err error) {
func (s *objectSync) handleMessage(ctx context.Context, senderId string, protoVersion uint32, msg *spacesyncproto.ObjectSyncMessage) (err error) {
log := log.With(zap.String("objectId", msg.ObjectId))
defer func() {
if p := recover(); p != nil {
@ -186,7 +191,7 @@ func (s *objectSync) handleMessage(ctx context.Context, senderId string, msg *sp
if err != nil {
return fmt.Errorf("failed to get object from cache: %w", err)
}
err = obj.HandleMessage(ctx, senderId, msg)
err = obj.HandleMessage(ctx, senderId, protoVersion, msg)
if err != nil {
return fmt.Errorf("failed to handle message: %w", err)
}

View file

@ -2,10 +2,11 @@ package synchandler
import (
"context"
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
)
type SyncHandler interface {
HandleMessage(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error)
HandleMessage(ctx context.Context, senderId string, protoVersion uint32, message *spacesyncproto.ObjectSyncMessage) (err error)
HandleRequest(ctx context.Context, senderId string, request *spacesyncproto.ObjectSyncMessage) (response *spacesyncproto.ObjectSyncMessage, err error)
}

View file

@ -13,7 +13,6 @@ import (
"github.com/anyproto/any-sync/commonspace/objectsync"
"github.com/anyproto/any-sync/commonspace/spacestate"
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
"github.com/anyproto/any-sync/net/peer"
"github.com/anyproto/any-sync/net/pool"
"github.com/anyproto/any-sync/net/rpc/rpcerr"
)
@ -112,7 +111,11 @@ func (r *requestManager) SendRequest(ctx context.Context, peerId string, req *sp
defer func() {
r.reqStat.RemoveSyncRequest(peerId, req)
}()
return r.doRequest(ctx, peerId, req)
res, err := r.doRequest(ctx, peerId, req)
if err != nil {
return nil, err
}
return res.resp, err
}
func (r *requestManager) QueueRequest(peerId string, req *spacesyncproto.ObjectSyncMessage) (err error) {
@ -142,27 +145,36 @@ var doRequestAndHandle = (*requestManager).requestAndHandle
func (r *requestManager) requestAndHandle(peerId string, req *spacesyncproto.ObjectSyncMessage) {
ctx := r.ctx
resp, err := r.doRequest(ctx, peerId, req)
res, err := r.doRequest(ctx, peerId, req)
if err != nil {
log.Warn("failed to send request", zap.Error(err))
return
}
ctx = peer.CtxWithPeerId(ctx, peerId)
_ = r.handler.HandleMessage(ctx, objectsync.HandleMessage{
SenderId: peerId,
Message: resp,
PeerCtx: ctx,
Message: res.resp,
PeerCtx: res.peerCtx,
})
}
func (r *requestManager) doRequest(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (resp *spacesyncproto.ObjectSyncMessage, err error) {
type result struct {
peerCtx context.Context
resp *spacesyncproto.ObjectSyncMessage
}
func (r *requestManager) doRequest(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (res result, err error) {
pr, err := r.peerPool.Get(ctx, peerId)
if err != nil {
return
}
res.peerCtx = pr.Context()
err = pr.DoDrpc(ctx, func(conn drpc.Conn) error {
cl := r.clientFactory.Client(conn)
resp, err = cl.ObjectSync(ctx, msg)
resp, err := cl.ObjectSync(ctx, msg)
if err != nil {
return err
}
res.resp = resp
return err
})
err = rpcerr.Unwrap(err)

View file

@ -298,7 +298,7 @@ func (m mockTreeSyncer) StartSync() {
func (m mockTreeSyncer) StopSync() {
}
func (m mockTreeSyncer) SyncAll(ctx context.Context, peerId string, existing, missing []string) error {
func (m mockTreeSyncer) SyncAll(ctx context.Context, p peer.Peer, existing, missing []string) error {
return nil
}

View file

@ -17,11 +17,13 @@ const (
contextKeyIdentity
contextKeyPeerAddr
contextKeyPeerClientVersion
contextKeyPeerProtoVersion
)
var (
ErrPeerIdNotFoundInContext = errors.New("peer id not found in context")
ErrIdentityNotFoundInContext = errors.New("identity not found in context")
ErrPeerIdNotFoundInContext = errors.New("peer id not found in context")
ErrProtoVersionNotFoundInContext = errors.New("proto version not found in context")
ErrIdentityNotFoundInContext = errors.New("identity not found in context")
)
const CtxResponsiblePeers = "*"
@ -42,6 +44,19 @@ func CtxWithPeerId(ctx context.Context, peerId string) context.Context {
return context.WithValue(ctx, contextKeyPeerId, peerId)
}
// CtxWithProtoVersion sets peer protocol version
func CtxWithProtoVersion(ctx context.Context, version uint32) context.Context {
return context.WithValue(ctx, contextKeyPeerProtoVersion, version)
}
// CtxProtoVersion returns peer protocol version
func CtxProtoVersion(ctx context.Context) (uint32, error) {
if protoVersion, ok := ctx.Value(contextKeyPeerProtoVersion).(uint32); ok {
return protoVersion, nil
}
return 0, ErrProtoVersionNotFoundInContext
}
// CtxPeerAddr returns peer address
func CtxPeerAddr(ctx context.Context) string {
if p, ok := ctx.Value(contextKeyPeerAddr).(string); ok {

View file

@ -119,6 +119,7 @@ func (s *secureService) HandshakeInbound(ctx context.Context, conn io.ReadWriteC
cctx = peer.CtxWithPeerId(cctx, peerId)
cctx = peer.CtxWithIdentity(cctx, res.Identity)
cctx = peer.CtxWithClientVersion(cctx, res.ClientVersion)
cctx = peer.CtxWithProtoVersion(cctx, res.ProtoVersion)
return
}
@ -146,6 +147,7 @@ func (s *secureService) HandshakeOutbound(ctx context.Context, conn io.ReadWrite
cctx = peer.CtxWithPeerId(cctx, peerId)
cctx = peer.CtxWithIdentity(cctx, res.Identity)
cctx = peer.CtxWithClientVersion(cctx, res.ClientVersion)
cctx = peer.CtxWithProtoVersion(cctx, res.ProtoVersion)
return cctx, nil
}