mirror of
https://github.com/anyproto/any-sync.git
synced 2025-06-11 10:18:08 +09:00
WIP compatibility
This commit is contained in:
parent
da23e685f4
commit
e05b2146fa
5 changed files with 121 additions and 1 deletions
|
@ -5,9 +5,11 @@ import (
|
|||
"errors"
|
||||
|
||||
"github.com/gogo/protobuf/proto"
|
||||
"go.uber.org/zap"
|
||||
"storj.io/drpc"
|
||||
|
||||
"github.com/anyproto/any-sync/commonspace/object/acl/list"
|
||||
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
|
||||
"github.com/anyproto/any-sync/commonspace/sync/objectsync/objectmessages"
|
||||
"github.com/anyproto/any-sync/commonspace/sync/syncdeps"
|
||||
"github.com/anyproto/any-sync/commonspace/syncstatus"
|
||||
|
@ -75,6 +77,54 @@ func (s *syncAclHandler) HandleHeadUpdate(ctx context.Context, statusUpdater syn
|
|||
return s.syncClient.CreateFullSyncRequest(peerId, s.aclList), nil
|
||||
}
|
||||
|
||||
func (s *syncAclHandler) HandleDeprecatedRequest(ctx context.Context, req *spacesyncproto.ObjectSyncMessage) (resp *spacesyncproto.ObjectSyncMessage, err error) {
|
||||
syncMsg := &consensusproto.LogSyncMessage{}
|
||||
err = proto.Unmarshal(req.Payload, syncMsg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
request := syncMsg.GetContent().GetFullSyncRequest()
|
||||
if request == nil {
|
||||
return nil, ErrUnexpectedRequestType
|
||||
}
|
||||
s.aclList.Lock()
|
||||
root := s.aclList.Root()
|
||||
head := s.aclList.Head().Id
|
||||
prepareResponse := func(records []*consensusproto.RawRecordWithId) (*spacesyncproto.ObjectSyncMessage, error) {
|
||||
logResp := consensusproto.WrapFullResponse(&consensusproto.LogFullSyncResponse{
|
||||
Head: head,
|
||||
Records: records,
|
||||
}, root)
|
||||
marshalled, err := proto.Marshal(logResp)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &spacesyncproto.ObjectSyncMessage{
|
||||
Payload: marshalled,
|
||||
ObjectId: req.ObjectId,
|
||||
SpaceId: s.spaceId,
|
||||
}, nil
|
||||
}
|
||||
if !s.aclList.HasHead(request.Head) {
|
||||
if request.Records != nil {
|
||||
err = s.aclList.AddRawRecords(request.Records)
|
||||
if err != nil {
|
||||
log.Warn("failed to add records", zap.Error(err))
|
||||
}
|
||||
}
|
||||
s.aclList.Unlock()
|
||||
return prepareResponse(nil)
|
||||
}
|
||||
recs, err := s.aclList.RecordsAfter(ctx, request.Head)
|
||||
if err != nil {
|
||||
s.aclList.Unlock()
|
||||
return nil, err
|
||||
}
|
||||
head = s.aclList.Head().Id
|
||||
s.aclList.Unlock()
|
||||
return prepareResponse(recs)
|
||||
}
|
||||
|
||||
func (s *syncAclHandler) HandleStreamRequest(ctx context.Context, rq syncdeps.Request, updater syncdeps.QueueSizeUpdater, send func(resp proto.Message) error) (syncdeps.Request, error) {
|
||||
req, ok := rq.(*objectmessages.Request)
|
||||
if !ok {
|
||||
|
|
|
@ -5,10 +5,12 @@ import (
|
|||
"errors"
|
||||
|
||||
"github.com/gogo/protobuf/proto"
|
||||
"go.uber.org/zap"
|
||||
"storj.io/drpc"
|
||||
|
||||
"github.com/anyproto/any-sync/commonspace/object/tree/objecttree"
|
||||
"github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
|
||||
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
|
||||
"github.com/anyproto/any-sync/commonspace/sync/objectsync/objectmessages"
|
||||
"github.com/anyproto/any-sync/commonspace/sync/syncdeps"
|
||||
"github.com/anyproto/any-sync/commonspace/syncstatus"
|
||||
|
@ -77,6 +79,58 @@ func (s *syncHandler) HandleHeadUpdate(ctx context.Context, statusUpdater syncst
|
|||
return nil, nil
|
||||
}
|
||||
|
||||
func (s *syncHandler) HandleDeprecatedRequest(ctx context.Context, req *spacesyncproto.ObjectSyncMessage) (resp *spacesyncproto.ObjectSyncMessage, err error) {
|
||||
unmarshalled := &treechangeproto.TreeSyncMessage{}
|
||||
err = proto.Unmarshal(req.Payload, unmarshalled)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
cnt := unmarshalled.GetContent().GetFullSyncRequest()
|
||||
if cnt == nil {
|
||||
return nil, treechangeproto.ErrGetTree
|
||||
}
|
||||
peerId, err := peer.CtxPeerId(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
prepareResponse := func(chs []*treechangeproto.RawTreeChangeWithId) (*spacesyncproto.ObjectSyncMessage, error) {
|
||||
treeSyncMessage := treechangeproto.WrapFullResponse(&treechangeproto.TreeFullSyncResponse{
|
||||
Heads: s.tree.Heads(),
|
||||
SnapshotPath: s.tree.SnapshotPath(),
|
||||
Changes: chs,
|
||||
}, s.tree.Header())
|
||||
payload, err := proto.Marshal(treeSyncMessage)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &spacesyncproto.ObjectSyncMessage{
|
||||
SpaceId: req.SpaceId,
|
||||
Payload: payload,
|
||||
ObjectId: req.ObjectId,
|
||||
}, nil
|
||||
}
|
||||
s.tree.Lock()
|
||||
defer s.tree.Unlock()
|
||||
// in this case we are only adding data and returning empty response
|
||||
if cnt.Changes != nil {
|
||||
if !slice.UnsortedEquals(s.tree.Heads(), cnt.Heads) {
|
||||
_, err = s.tree.AddRawChangesFromPeer(ctx, peerId, objecttree.RawChangesPayload{
|
||||
NewHeads: cnt.Heads,
|
||||
RawChanges: cnt.Changes,
|
||||
})
|
||||
if err != nil {
|
||||
log.Warn("failed to add changes from peer", zap.Error(err), zap.String("peerId", peerId))
|
||||
}
|
||||
}
|
||||
return prepareResponse(nil)
|
||||
}
|
||||
chs, err := s.tree.ChangesAfterCommonSnapshot(cnt.SnapshotPath, cnt.Heads)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return prepareResponse(chs)
|
||||
}
|
||||
|
||||
func (s *syncHandler) HandleStreamRequest(ctx context.Context, rq syncdeps.Request, updater syncdeps.QueueSizeUpdater, send func(resp proto.Message) error) (syncdeps.Request, error) {
|
||||
req, ok := rq.(*objectmessages.Request)
|
||||
if !ok {
|
||||
|
|
|
@ -356,6 +356,11 @@ type mockTreeManager struct {
|
|||
waitLoad chan struct{}
|
||||
}
|
||||
|
||||
func (t *mockTreeManager) ValidateAndPutTree(ctx context.Context, spaceId string, payload treestorage.TreeStorageCreatePayload) error {
|
||||
//TODO implement me
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func newMockTreeManager(spaceId string) *mockTreeManager {
|
||||
return &mockTreeManager{
|
||||
spaceId: spaceId,
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
"storj.io/drpc"
|
||||
|
||||
"github.com/anyproto/any-sync/app"
|
||||
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
|
||||
"github.com/anyproto/any-sync/commonspace/sync/syncdeps"
|
||||
"github.com/anyproto/any-sync/util/multiqueue"
|
||||
)
|
||||
|
@ -19,6 +20,11 @@ type CounterSyncHandler struct {
|
|||
updateHandler *CounterUpdateHandler
|
||||
}
|
||||
|
||||
func (c *CounterSyncHandler) HandleDeprecatedObjectSync(ctx context.Context, req *spacesyncproto.ObjectSyncMessage) (resp *spacesyncproto.ObjectSyncMessage, err error) {
|
||||
//TODO implement me
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (c *CounterSyncHandler) ApplyRequest(ctx context.Context, rq syncdeps.Request, requestSender syncdeps.RequestSender) error {
|
||||
collector := NewCounterResponseCollector(c.counter)
|
||||
return requestSender.SendRequest(ctx, rq, collector)
|
||||
|
|
|
@ -2,11 +2,16 @@ package rpctest
|
|||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/anyproto/any-sync/net/peer"
|
||||
"github.com/anyproto/any-sync/net/rpc/rpctest/multiconntest"
|
||||
"github.com/anyproto/any-sync/net/secureservice"
|
||||
"github.com/anyproto/any-sync/net/transport"
|
||||
)
|
||||
|
||||
func MultiConnPair(peerIdServ, peerIdClient string) (serv, client transport.MultiConn) {
|
||||
return multiconntest.MultiConnPair(peer.CtxWithPeerId(context.Background(), peerIdServ), peer.CtxWithPeerId(context.Background(), peerIdClient))
|
||||
return multiconntest.MultiConnPair(
|
||||
peer.CtxWithProtoVersion(peer.CtxWithPeerId(context.Background(), peerIdServ), secureservice.ProtoVersion),
|
||||
peer.CtxWithProtoVersion(peer.CtxWithPeerId(context.Background(), peerIdClient), secureservice.ProtoVersion),
|
||||
)
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue