1
0
Fork 0
mirror of https://github.com/anyproto/any-sync.git synced 2025-06-08 05:57:03 +09:00

Add requests and more stuff

This commit is contained in:
mcrakhman 2024-06-15 14:46:38 +02:00
parent ac08c01120
commit 4efac4a239
No known key found for this signature in database
GPG key ID: DED12CFEF5B8396B
34 changed files with 263 additions and 1058 deletions

View file

@ -240,22 +240,5 @@ func (d *diffSyncer) sendPushSpaceRequest(ctx context.Context, peerId string, cl
return
}
d.log.InfoCtx(ctx, "space push completed successfully")
if e := d.subscribe(ctx, peerId); e != nil {
d.log.WarnCtx(ctx, "error subscribing for space", zap.Error(e))
}
return
}
func (d *diffSyncer) subscribe(ctx context.Context, peerId string) (err error) {
var msg = &spacesyncproto.SpaceSubscription{
SpaceIds: []string{d.spaceId},
Action: spacesyncproto.SpaceSubscriptionAction_Subscribe,
}
payload, err := msg.Marshal()
if err != nil {
return
}
return d.peerManager.SendPeer(ctx, peerId, &spacesyncproto.ObjectSyncMessage{
Payload: payload,
})
}

View file

@ -247,8 +247,6 @@ func TestDiffSyncer(t *testing.T) {
fx.clientMock.EXPECT().
SpacePush(gomock.Any(), newPushSpaceRequestMatcher(fx.spaceState.SpaceId, aclRootId, settingsId, credential, spaceHeader)).
Return(nil, nil)
fx.peerManagerMock.EXPECT().SendPeer(gomock.Any(), "peerId", gomock.Any())
require.NoError(t, fx.diffSyncer.Sync(ctx))
})

View file

@ -5,6 +5,7 @@
//
// mockgen -destination mock_list/mock_list.go github.com/anyproto/any-sync/commonspace/object/acl/list AclList
//
// Package mock_list is a generated GoMock package.
package mock_list

View file

@ -1,42 +1,26 @@
package syncacl
import (
"github.com/gogo/protobuf/proto"
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
"github.com/anyproto/any-sync/commonspace/sync/objectsync"
"github.com/anyproto/any-sync/consensus/consensusproto"
)
type Request struct {
peerId string
objectId string
spaceId string
head string
root *consensusproto.RawRecordWithId
type InnerRequest struct {
head string
root *consensusproto.RawRecordWithId
}
func NewRequest(peerId, objectId, spaceId, head string, root *consensusproto.RawRecordWithId) *Request {
return &Request{
peerId: peerId,
objectId: objectId,
spaceId: spaceId,
head: head,
root: root,
}
func NewRequest(peerId, objectId, spaceId, head string, root *consensusproto.RawRecordWithId) *objectsync.Request {
return objectsync.NewRequest(peerId, spaceId, objectId, &InnerRequest{
head: head,
root: root,
})
}
func (r *Request) PeerId() string {
return r.peerId
}
func (r *Request) ObjectId() string {
return r.objectId
}
func (r *Request) Proto() (proto.Message, error) {
func (r *InnerRequest) Marshall() ([]byte, error) {
req := &consensusproto.LogFullSyncRequest{
Head: r.head,
}
fullSync := consensusproto.WrapFullRequest(req, r.root)
return spacesyncproto.MarshallSyncMessage(fullSync, r.spaceId, r.objectId)
return fullSync.Marshal()
}

View file

@ -10,7 +10,7 @@ import (
type RequestFactory interface {
CreateHeadUpdate(l list.AclList, added []*consensusproto.RawRecordWithId) (headUpdate *objectsync.HeadUpdate)
CreateFullSyncRequest(peerId string, l list.AclList) *Request
CreateFullSyncRequest(peerId string, l list.AclList) *objectsync.Request
CreateFullSyncResponse(l list.AclList, theirHead string) (resp *Response, err error)
}
@ -36,7 +36,7 @@ func (r *requestFactory) CreateHeadUpdate(l list.AclList, added []*consensusprot
}
}
func (r *requestFactory) CreateFullSyncRequest(peerId string, l list.AclList) *Request {
func (r *requestFactory) CreateFullSyncRequest(peerId string, l list.AclList) *objectsync.Request {
return NewRequest(peerId, l.Id(), r.spaceId, l.Head().Id, l.Root())
}

View file

@ -30,6 +30,7 @@ var (
type SyncAcl interface {
app.ComponentRunnable
list.AclList
syncdeps.ObjectSyncHandler
SetHeadUpdater(updater headupdater.HeadUpdater)
SyncWithPeer(ctx context.Context, peerId string) (err error)
SetAclUpdater(updater headupdater.AclUpdater)

View file

@ -73,16 +73,25 @@ func (s *syncAclHandler) HandleHeadUpdate(ctx context.Context, headUpdate drpc.M
}
func (s *syncAclHandler) HandleStreamRequest(ctx context.Context, rq syncdeps.Request, send func(resp proto.Message) error) (syncdeps.Request, error) {
req, ok := rq.(*Request)
req, ok := rq.(*objectsync.Request)
if !ok {
return nil, ErrUnexpectedRequestType
}
s.aclList.Lock()
if !s.aclList.HasHead(req.head) {
s.aclList.Unlock()
return s.syncClient.CreateFullSyncRequest(req.peerId, s.aclList), nil
syncMsg := &consensusproto.LogSyncMessage{}
err := proto.Unmarshal(req.Bytes, syncMsg)
if err != nil {
return nil, err
}
resp, err := s.syncClient.CreateFullSyncResponse(s.aclList, req.head)
request := syncMsg.GetContent().GetFullSyncRequest()
if request == nil {
return nil, ErrUnexpectedRequestType
}
s.aclList.Lock()
if !s.aclList.HasHead(request.Head) {
s.aclList.Unlock()
return s.syncClient.CreateFullSyncRequest(req.PeerId(), s.aclList), nil
}
resp, err := s.syncClient.CreateFullSyncResponse(s.aclList, request.Head)
if err != nil {
s.aclList.Unlock()
return nil, err

View file

@ -2,12 +2,13 @@ package syncobjectgetter
import (
"context"
"github.com/anyproto/any-sync/commonspace/objectsync/synchandler"
"github.com/anyproto/any-sync/commonspace/sync/syncdeps"
)
type SyncObject interface {
Id() string
synchandler.SyncHandler
syncdeps.ObjectSyncHandler
}
type SyncObjectGetter interface {

View file

@ -117,6 +117,21 @@ func (mr *MockObjectTreeMockRecorder) ChangesAfterCommonSnapshot(arg0, arg1 any)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ChangesAfterCommonSnapshot", reflect.TypeOf((*MockObjectTree)(nil).ChangesAfterCommonSnapshot), arg0, arg1)
}
// ChangesAfterCommonSnapshotLoader mocks base method.
func (m *MockObjectTree) ChangesAfterCommonSnapshotLoader(arg0, arg1 []string) (objecttree.LoadIterator, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "ChangesAfterCommonSnapshotLoader", arg0, arg1)
ret0, _ := ret[0].(objecttree.LoadIterator)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// ChangesAfterCommonSnapshotLoader indicates an expected call of ChangesAfterCommonSnapshotLoader.
func (mr *MockObjectTreeMockRecorder) ChangesAfterCommonSnapshotLoader(arg0, arg1 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ChangesAfterCommonSnapshotLoader", reflect.TypeOf((*MockObjectTree)(nil).ChangesAfterCommonSnapshotLoader), arg0, arg1)
}
// Close mocks base method.
func (m *MockObjectTree) Close() error {
m.ctrl.T.Helper()

View file

@ -1,45 +1,29 @@
package synctree
import (
"github.com/gogo/protobuf/proto"
"github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
"github.com/anyproto/any-sync/commonspace/sync/objectsync"
)
type Request struct {
peerId string
spaceId string
objectId string
type InnerRequest struct {
heads []string
snapshotPath []string
root *treechangeproto.RawTreeChangeWithId
}
func NewRequest(peerId, spaceId, objectId string, heads []string, snapshotPath []string, root *treechangeproto.RawTreeChangeWithId) *Request {
return &Request{
peerId: peerId,
spaceId: spaceId,
objectId: objectId,
func NewRequest(peerId, spaceId, objectId string, heads []string, snapshotPath []string, root *treechangeproto.RawTreeChangeWithId) *objectsync.Request {
return objectsync.NewRequest(peerId, spaceId, objectId, &InnerRequest{
heads: heads,
snapshotPath: snapshotPath,
root: root,
}
})
}
func (r *Request) PeerId() string {
return r.peerId
}
func (r *Request) ObjectId() string {
return r.root.Id
}
func (r *Request) Proto() (proto.Message, error) {
func (r *InnerRequest) Marshall() ([]byte, error) {
msg := &treechangeproto.TreeFullSyncRequest{
Heads: r.heads,
SnapshotPath: r.snapshotPath,
}
req := treechangeproto.WrapFullRequest(msg, r.root)
return spacesyncproto.MarshallSyncMessage(req, r.spaceId, r.objectId)
return req.Marshal()
}

View file

@ -10,8 +10,8 @@ const batchSize = 1024 * 1024 * 10
type RequestFactory interface {
CreateHeadUpdate(t objecttree.ObjectTree, ignoredPeer string, added []*treechangeproto.RawTreeChangeWithId) (headUpdate *objectsync.HeadUpdate)
CreateNewTreeRequest(peerId, objectId string) *Request
CreateFullSyncRequest(peerId string, t objecttree.ObjectTree) *Request
CreateNewTreeRequest(peerId, objectId string) *objectsync.Request
CreateFullSyncRequest(peerId string, t objecttree.ObjectTree) *objectsync.Request
CreateResponseProducer(t objecttree.ObjectTree, theirHeads, theirSnapshotPath []string) (ResponseProducer, error)
}
@ -43,11 +43,11 @@ func (r *requestFactory) CreateHeadUpdate(t objecttree.ObjectTree, ignoredPeer s
}
}
func (r *requestFactory) CreateNewTreeRequest(peerId, objectId string) *Request {
func (r *requestFactory) CreateNewTreeRequest(peerId, objectId string) *objectsync.Request {
return NewRequest(peerId, r.spaceId, objectId, nil, nil, nil)
}
func (r *requestFactory) CreateFullSyncRequest(peerId string, t objecttree.ObjectTree) *Request {
func (r *requestFactory) CreateFullSyncRequest(peerId string, t objecttree.ObjectTree) *objectsync.Request {
return NewRequest(peerId, r.spaceId, t.Id(), t.Heads(), t.SnapshotPath(), t.Header())
}

View file

@ -76,13 +76,22 @@ func (s *syncHandler) HandleHeadUpdate(ctx context.Context, headUpdate drpc.Mess
}
func (s *syncHandler) HandleStreamRequest(ctx context.Context, rq syncdeps.Request, send func(resp proto.Message) error) (syncdeps.Request, error) {
req, ok := rq.(*Request)
req, ok := rq.(*objectsync.Request)
if !ok {
return nil, ErrUnexpectedRequestType
}
treeSyncMsg := &treechangeproto.TreeSyncMessage{}
err := proto.Unmarshal(req.Bytes, treeSyncMsg)
if err != nil {
return nil, err
}
request := treeSyncMsg.GetContent().GetFullSyncRequest()
if request == nil {
return nil, ErrUnexpectedRequestType
}
s.tree.Lock()
curHeads := s.tree.Heads()
producer, err := newResponseProducer(s.spaceId, s.tree, req.heads, req.snapshotPath)
producer, err := newResponseProducer(s.spaceId, s.tree, request.Heads, request.SnapshotPath)
if err != nil {
s.tree.Unlock()
return nil, err
@ -106,7 +115,7 @@ func (s *syncHandler) HandleStreamRequest(ctx context.Context, rq syncdeps.Reque
}
s.tree.Lock()
}
if !slice.UnsortedEquals(curHeads, req.heads) {
if !slice.UnsortedEquals(curHeads, request.Heads) {
return s.syncClient.CreateFullSyncRequest(rq.PeerId(), s.tree), nil
}
return nil, nil

View file

@ -13,7 +13,6 @@ const CName = "common.object.treemanager"
type TreeManager interface {
app.ComponentRunnable
GetTree(ctx context.Context, spaceId, treeId string) (objecttree.ObjectTree, error)
PickTree(ctx context.Context, spaceId, treeId string) (objecttree.ObjectTree, error)
MarkTreeDeleted(ctx context.Context, spaceId, treeId string) error
DeleteTree(ctx context.Context, spaceId, treeId string) error
}

View file

@ -5,6 +5,7 @@
//
// mockgen -destination mock_treesyncer/mock_treesyncer.go github.com/anyproto/any-sync/commonspace/object/treesyncer TreeSyncer
//
// Package mock_treesyncer is a generated GoMock package.
package mock_treesyncer

View file

@ -1,157 +0,0 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: github.com/anyproto/any-sync/commonspace/objectsync (interfaces: ObjectSync)
//
// Generated by this command:
//
// mockgen -destination mock_objectsync/mock_objectsync.go github.com/anyproto/any-sync/commonspace/objectsync ObjectSync
//
// Package mock_objectsync is a generated GoMock package.
package mock_objectsync
import (
context "context"
reflect "reflect"
time "time"
app "github.com/anyproto/any-sync/app"
objectsync "github.com/anyproto/any-sync/commonspace/objectsync"
spacesyncproto "github.com/anyproto/any-sync/commonspace/spacesyncproto"
gomock "go.uber.org/mock/gomock"
)
// MockObjectSync is a mock of ObjectSync interface.
type MockObjectSync struct {
ctrl *gomock.Controller
recorder *MockObjectSyncMockRecorder
}
// MockObjectSyncMockRecorder is the mock recorder for MockObjectSync.
type MockObjectSyncMockRecorder struct {
mock *MockObjectSync
}
// NewMockObjectSync creates a new mock instance.
func NewMockObjectSync(ctrl *gomock.Controller) *MockObjectSync {
mock := &MockObjectSync{ctrl: ctrl}
mock.recorder = &MockObjectSyncMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockObjectSync) EXPECT() *MockObjectSyncMockRecorder {
return m.recorder
}
// Close mocks base method.
func (m *MockObjectSync) Close(arg0 context.Context) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Close", arg0)
ret0, _ := ret[0].(error)
return ret0
}
// Close indicates an expected call of Close.
func (mr *MockObjectSyncMockRecorder) Close(arg0 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockObjectSync)(nil).Close), arg0)
}
// CloseThread mocks base method.
func (m *MockObjectSync) CloseThread(arg0 string) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "CloseThread", arg0)
ret0, _ := ret[0].(error)
return ret0
}
// CloseThread indicates an expected call of CloseThread.
func (mr *MockObjectSyncMockRecorder) CloseThread(arg0 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CloseThread", reflect.TypeOf((*MockObjectSync)(nil).CloseThread), arg0)
}
// HandleMessage mocks base method.
func (m *MockObjectSync) HandleMessage(arg0 context.Context, arg1 objectsync.HandleMessage) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "HandleMessage", arg0, arg1)
ret0, _ := ret[0].(error)
return ret0
}
// HandleMessage indicates an expected call of HandleMessage.
func (mr *MockObjectSyncMockRecorder) HandleMessage(arg0, arg1 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HandleMessage", reflect.TypeOf((*MockObjectSync)(nil).HandleMessage), arg0, arg1)
}
// HandleRequest mocks base method.
func (m *MockObjectSync) HandleRequest(arg0 context.Context, arg1 *spacesyncproto.ObjectSyncMessage) (*spacesyncproto.ObjectSyncMessage, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "HandleRequest", arg0, arg1)
ret0, _ := ret[0].(*spacesyncproto.ObjectSyncMessage)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// HandleRequest indicates an expected call of HandleRequest.
func (mr *MockObjectSyncMockRecorder) HandleRequest(arg0, arg1 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HandleRequest", reflect.TypeOf((*MockObjectSync)(nil).HandleRequest), arg0, arg1)
}
// Init mocks base method.
func (m *MockObjectSync) Init(arg0 *app.App) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Init", arg0)
ret0, _ := ret[0].(error)
return ret0
}
// Init indicates an expected call of Init.
func (mr *MockObjectSyncMockRecorder) Init(arg0 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Init", reflect.TypeOf((*MockObjectSync)(nil).Init), arg0)
}
// LastUsage mocks base method.
func (m *MockObjectSync) LastUsage() time.Time {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "LastUsage")
ret0, _ := ret[0].(time.Time)
return ret0
}
// LastUsage indicates an expected call of LastUsage.
func (mr *MockObjectSyncMockRecorder) LastUsage() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LastUsage", reflect.TypeOf((*MockObjectSync)(nil).LastUsage))
}
// Name mocks base method.
func (m *MockObjectSync) Name() string {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Name")
ret0, _ := ret[0].(string)
return ret0
}
// Name indicates an expected call of Name.
func (mr *MockObjectSyncMockRecorder) Name() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Name", reflect.TypeOf((*MockObjectSync)(nil).Name))
}
// Run mocks base method.
func (m *MockObjectSync) Run(arg0 context.Context) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Run", arg0)
ret0, _ := ret[0].(error)
return ret0
}
// Run indicates an expected call of Run.
func (mr *MockObjectSyncMockRecorder) Run(arg0 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Run", reflect.TypeOf((*MockObjectSync)(nil).Run), arg0)
}

View file

@ -1,215 +0,0 @@
//go:generate mockgen -destination mock_objectsync/mock_objectsync.go github.com/anyproto/any-sync/commonspace/objectsync ObjectSync
package objectsync
import (
"context"
"fmt"
"time"
"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
"github.com/anyproto/any-sync/commonspace/object/treemanager"
"github.com/anyproto/any-sync/commonspace/spacestate"
"github.com/anyproto/any-sync/metric"
"github.com/anyproto/any-sync/net/peer"
"github.com/anyproto/any-sync/util/multiqueue"
"github.com/cheggaaa/mb/v3"
"github.com/gogo/protobuf/proto"
"github.com/anyproto/any-sync/app/logger"
"github.com/anyproto/any-sync/commonspace/object/syncobjectgetter"
"github.com/anyproto/any-sync/commonspace/spacestorage"
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
"github.com/anyproto/any-sync/nodeconf"
"go.uber.org/zap"
)
const CName = "common.commonspace.objectsync"
var log = logger.NewNamed(CName)
type ObjectSync interface {
LastUsage() time.Time
HandleMessage(ctx context.Context, hm HandleMessage) (err error)
HandleRequest(ctx context.Context, req *spacesyncproto.ObjectSyncMessage) (resp *spacesyncproto.ObjectSyncMessage, err error)
CloseThread(id string) (err error)
app.ComponentRunnable
}
type HandleMessage struct {
Id uint64
ReceiveTime time.Time
StartHandlingTime time.Time
Deadline time.Time
SenderId string
Message *spacesyncproto.ObjectSyncMessage
PeerCtx context.Context
}
func (m HandleMessage) LogFields(fields ...zap.Field) []zap.Field {
return append(fields,
metric.SpaceId(m.Message.SpaceId),
metric.ObjectId(m.Message.ObjectId),
metric.QueueDur(m.StartHandlingTime.Sub(m.ReceiveTime)),
metric.TotalDur(time.Since(m.ReceiveTime)),
)
}
type objectSync struct {
spaceId string
objectGetter syncobjectgetter.SyncObjectGetter
configuration nodeconf.NodeConf
spaceStorage spacestorage.SpaceStorage
metric metric.Metric
handleQueue multiqueue.MultiQueue[HandleMessage]
}
func (s *objectSync) Init(a *app.App) (err error) {
s.spaceStorage = a.MustComponent(spacestorage.CName).(spacestorage.SpaceStorage)
s.objectGetter = a.MustComponent(treemanager.CName).(syncobjectgetter.SyncObjectGetter)
s.configuration = a.MustComponent(nodeconf.CName).(nodeconf.NodeConf)
sharedData := a.MustComponent(spacestate.CName).(*spacestate.SpaceState)
mc := a.Component(metric.CName)
if mc != nil {
s.metric = mc.(metric.Metric)
}
s.spaceId = sharedData.SpaceId
s.handleQueue = multiqueue.New[HandleMessage](s.processHandleMessage, 30)
return nil
}
func (s *objectSync) Name() (name string) {
return CName
}
func (s *objectSync) Run(ctx context.Context) (err error) {
return nil
}
func (s *objectSync) Close(ctx context.Context) (err error) {
return s.handleQueue.Close()
}
func New() ObjectSync {
return &objectSync{}
}
func (s *objectSync) LastUsage() time.Time {
// TODO: add time
return time.Time{}
}
func (s *objectSync) HandleRequest(ctx context.Context, req *spacesyncproto.ObjectSyncMessage) (resp *spacesyncproto.ObjectSyncMessage, err error) {
peerId, err := peer.CtxPeerId(ctx)
if err != nil {
return nil, err
}
return s.handleRequest(ctx, peerId, req)
}
func (s *objectSync) HandleMessage(ctx context.Context, hm HandleMessage) (err error) {
threadId := hm.Message.ObjectId
hm.ReceiveTime = time.Now()
if hm.PeerCtx == nil {
hm.PeerCtx = ctx
}
err = s.handleQueue.Add(ctx, threadId, hm)
if err == mb.ErrOverflowed {
log.InfoCtx(ctx, "queue overflowed", zap.String("spaceId", s.spaceId), zap.String("objectId", threadId))
// skip overflowed error
return nil
}
return
}
func (s *objectSync) processHandleMessage(msg HandleMessage) {
var err error
msg.StartHandlingTime = time.Now()
ctx := peer.CtxWithPeerId(context.Background(), msg.SenderId)
ctx = logger.CtxWithFields(ctx, zap.Uint64("msgId", msg.Id), zap.String("senderId", msg.SenderId))
defer func() {
if s.metric == nil {
return
}
s.metric.RequestLog(msg.PeerCtx, "space.streamOp", msg.LogFields(
zap.Error(err),
)...)
}()
if !msg.Deadline.IsZero() {
now := time.Now()
if now.After(msg.Deadline) {
log.InfoCtx(ctx, "skip message: deadline exceed")
err = context.DeadlineExceeded
return
}
}
if err = s.handleMessage(ctx, msg.SenderId, msg.Message); err != nil {
if msg.Message.ObjectId != "" {
// cleanup thread on error
_ = s.handleQueue.CloseThread(msg.Message.ObjectId)
}
log.InfoCtx(ctx, "handleMessage error", zap.Error(err))
}
}
func (s *objectSync) handleRequest(ctx context.Context, senderId string, msg *spacesyncproto.ObjectSyncMessage) (response *spacesyncproto.ObjectSyncMessage, err error) {
log := log.With(zap.String("objectId", msg.ObjectId))
err = s.checkEmptyFullSync(log, msg)
if err != nil {
return nil, err
}
obj, err := s.objectGetter.GetObject(ctx, msg.ObjectId)
if err != nil {
return nil, treechangeproto.ErrGetTree
}
return obj.HandleRequest(ctx, senderId, msg)
}
func (s *objectSync) handleMessage(ctx context.Context, senderId string, msg *spacesyncproto.ObjectSyncMessage) (err error) {
log := log.With(zap.String("objectId", msg.ObjectId))
err = s.checkEmptyFullSync(log, msg)
if err != nil {
return err
}
obj, err := s.objectGetter.GetObject(ctx, msg.ObjectId)
if err != nil {
return fmt.Errorf("failed to get object from cache: %w", err)
}
err = obj.HandleMessage(ctx, senderId, msg)
if err != nil {
return fmt.Errorf("failed to handle message: %w", err)
}
return
}
func (s *objectSync) CloseThread(id string) (err error) {
return s.handleQueue.CloseThread(id)
}
func (s *objectSync) checkEmptyFullSync(log logger.CtxLogger, msg *spacesyncproto.ObjectSyncMessage) (err error) {
hasTree, err := s.spaceStorage.HasTree(msg.ObjectId)
if err != nil {
log.Warn("failed to execute get operation on storage has tree", zap.Error(err))
return spacesyncproto.ErrUnexpected
}
// in this case we will try to get it from remote, unless the sender also sent us the same request :-)
if !hasTree {
treeMsg := &treechangeproto.TreeSyncMessage{}
err = proto.Unmarshal(msg.Payload, treeMsg)
if err != nil {
return nil
}
// this means that we don't have the tree locally and therefore can't return it
if s.isEmptyFullSyncRequest(treeMsg) {
return treechangeproto.ErrGetTree
}
}
return
}
func (s *objectSync) isEmptyFullSyncRequest(msg *treechangeproto.TreeSyncMessage) bool {
return msg.GetContent().GetFullSyncRequest() != nil && len(msg.GetContent().GetFullSyncRequest().GetHeads()) == 0
}

View file

@ -1,11 +0,0 @@
package synchandler
import (
"context"
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
)
type SyncHandler interface {
HandleMessage(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error)
HandleRequest(ctx context.Context, senderId string, request *spacesyncproto.ObjectSyncMessage) (response *spacesyncproto.ObjectSyncMessage, err error)
}

View file

@ -14,9 +14,10 @@ import (
reflect "reflect"
app "github.com/anyproto/any-sync/app"
spacesyncproto "github.com/anyproto/any-sync/commonspace/spacesyncproto"
peer "github.com/anyproto/any-sync/net/peer"
streampool "github.com/anyproto/any-sync/net/streampool"
gomock "go.uber.org/mock/gomock"
drpc "storj.io/drpc"
)
// MockPeerManager is a mock of PeerManager interface.
@ -42,18 +43,18 @@ func (m *MockPeerManager) EXPECT() *MockPeerManagerMockRecorder {
return m.recorder
}
// Broadcast mocks base method.
func (m *MockPeerManager) Broadcast(arg0 context.Context, arg1 *spacesyncproto.ObjectSyncMessage) error {
// BroadcastMessage mocks base method.
func (m *MockPeerManager) BroadcastMessage(arg0 context.Context, arg1 drpc.Message, arg2 streampool.StreamPool) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Broadcast", arg0, arg1)
ret := m.ctrl.Call(m, "BroadcastMessage", arg0, arg1, arg2)
ret0, _ := ret[0].(error)
return ret0
}
// Broadcast indicates an expected call of Broadcast.
func (mr *MockPeerManagerMockRecorder) Broadcast(arg0, arg1 any) *gomock.Call {
// BroadcastMessage indicates an expected call of BroadcastMessage.
func (mr *MockPeerManagerMockRecorder) BroadcastMessage(arg0, arg1, arg2 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Broadcast", reflect.TypeOf((*MockPeerManager)(nil).Broadcast), arg0, arg1)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BroadcastMessage", reflect.TypeOf((*MockPeerManager)(nil).BroadcastMessage), arg0, arg1, arg2)
}
// GetNodePeers mocks base method.
@ -113,17 +114,3 @@ func (mr *MockPeerManagerMockRecorder) Name() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Name", reflect.TypeOf((*MockPeerManager)(nil).Name))
}
// SendPeer mocks base method.
func (m *MockPeerManager) SendPeer(arg0 context.Context, arg1 string, arg2 *spacesyncproto.ObjectSyncMessage) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "SendPeer", arg0, arg1, arg2)
ret0, _ := ret[0].(error)
return ret0
}
// SendPeer indicates an expected call of SendPeer.
func (mr *MockPeerManagerMockRecorder) SendPeer(arg0, arg1, arg2 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendPeer", reflect.TypeOf((*MockPeerManager)(nil).SendPeer), arg0, arg1, arg2)
}

View file

@ -1,170 +0,0 @@
package requestmanager
import (
"context"
"sync"
"go.uber.org/zap"
"storj.io/drpc"
"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/app/debugstat"
"github.com/anyproto/any-sync/app/logger"
"github.com/anyproto/any-sync/commonspace/objectsync"
"github.com/anyproto/any-sync/commonspace/spacestate"
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
"github.com/anyproto/any-sync/net/peer"
"github.com/anyproto/any-sync/net/pool"
"github.com/anyproto/any-sync/net/rpc/rpcerr"
)
const CName = "common.commonspace.requestmanager"
var log = logger.NewNamed(CName)
type RequestManager interface {
app.ComponentRunnable
SendRequest(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error)
QueueRequest(peerId string, msg *spacesyncproto.ObjectSyncMessage) (err error)
}
func New() RequestManager {
return &requestManager{
workers: 10,
queueSize: 300,
pools: map[string]*requestPool{},
}
}
type MessageHandler interface {
HandleMessage(ctx context.Context, hm objectsync.HandleMessage) (err error)
}
type requestManager struct {
sync.Mutex
pools map[string]*requestPool
peerPool pool.Pool
workers int
queueSize int
handler MessageHandler
ctx context.Context
cancel context.CancelFunc
clientFactory spacesyncproto.ClientFactory
statService debugstat.StatService
reqStat *requestStat
spaceId string
}
func (r *requestManager) AggregateStat(stats []debugstat.StatValue) any {
return r.reqStat.Aggregate(stats)
}
func (r *requestManager) ProvideStat() any {
return r.reqStat.QueueStat()
}
func (r *requestManager) StatId() string {
return r.spaceId
}
func (r *requestManager) StatType() string {
return CName
}
func (r *requestManager) Init(a *app.App) (err error) {
r.ctx, r.cancel = context.WithCancel(context.Background())
spaceState := a.MustComponent(spacestate.CName).(*spacestate.SpaceState)
r.statService, _ = a.Component(debugstat.CName).(debugstat.StatService)
if r.statService == nil {
r.statService = debugstat.NewNoOp()
}
r.reqStat = newRequestStat(spaceState.SpaceId)
r.spaceId = spaceState.SpaceId
r.handler = a.MustComponent(objectsync.CName).(MessageHandler)
r.peerPool = a.MustComponent(pool.CName).(pool.Pool)
r.clientFactory = spacesyncproto.ClientFactoryFunc(spacesyncproto.NewDRPCSpaceSyncClient)
return
}
func (r *requestManager) Name() (name string) {
return CName
}
func (r *requestManager) Run(ctx context.Context) (err error) {
r.statService.AddProvider(r)
return nil
}
func (r *requestManager) Close(ctx context.Context) (err error) {
r.Lock()
defer r.Unlock()
r.cancel()
r.statService.RemoveProvider(r)
for _, p := range r.pools {
_ = p.Close()
}
return nil
}
func (r *requestManager) SendRequest(ctx context.Context, peerId string, req *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) {
// TODO: limit concurrent sends?
r.reqStat.AddSyncRequest(peerId, req)
defer func() {
r.reqStat.RemoveSyncRequest(peerId, req)
}()
return r.doRequest(ctx, peerId, req)
}
func (r *requestManager) QueueRequest(peerId string, req *spacesyncproto.ObjectSyncMessage) (err error) {
r.Lock()
defer r.Unlock()
pl, exists := r.pools[peerId]
if !exists {
pl = newRequestPool(r.workers, r.queueSize)
r.pools[peerId] = pl
pl.Run()
}
r.reqStat.AddQueueRequest(peerId, req)
// TODO: for later think when many clients are there,
// we need to close pools for inactive clients
return pl.TryAdd(
req.ObjectId,
func() {
doRequestAndHandle(r, peerId, req)
},
func() {
r.reqStat.RemoveQueueRequest(peerId, req)
},
)
}
var doRequestAndHandle = (*requestManager).requestAndHandle
func (r *requestManager) requestAndHandle(peerId string, req *spacesyncproto.ObjectSyncMessage) {
ctx := r.ctx
resp, err := r.doRequest(ctx, peerId, req)
if err != nil {
log.Warn("failed to send request", zap.Error(err))
return
}
ctx = peer.CtxWithPeerId(ctx, peerId)
_ = r.handler.HandleMessage(ctx, objectsync.HandleMessage{
SenderId: peerId,
Message: resp,
PeerCtx: ctx,
})
}
func (r *requestManager) doRequest(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (resp *spacesyncproto.ObjectSyncMessage, err error) {
pr, err := r.peerPool.Get(ctx, peerId)
if err != nil {
return
}
err = pr.DoDrpc(ctx, func(conn drpc.Conn) error {
cl := r.clientFactory.Client(conn)
resp, err = cl.ObjectSync(ctx, msg)
return err
})
err = rpcerr.Unwrap(err)
return
}

View file

@ -1,195 +0,0 @@
package requestmanager
import (
"context"
"sync"
"testing"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
"storj.io/drpc"
"storj.io/drpc/drpcconn"
"github.com/anyproto/any-sync/app/debugstat/mock_debugstat"
"github.com/anyproto/any-sync/commonspace/objectsync"
"github.com/anyproto/any-sync/commonspace/objectsync/mock_objectsync"
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
"github.com/anyproto/any-sync/commonspace/spacesyncproto/mock_spacesyncproto"
"github.com/anyproto/any-sync/net/peer"
"github.com/anyproto/any-sync/net/peer/mock_peer"
"github.com/anyproto/any-sync/net/pool/mock_pool"
)
type fixture struct {
requestManager *requestManager
messageHandlerMock *mock_objectsync.MockObjectSync
peerPoolMock *mock_pool.MockPool
clientMock *mock_spacesyncproto.MockDRPCSpaceSyncClient
statMock *mock_debugstat.MockStatService
ctrl *gomock.Controller
}
func newFixture(t *testing.T) *fixture {
ctrl := gomock.NewController(t)
manager := New().(*requestManager)
peerPoolMock := mock_pool.NewMockPool(ctrl)
messageHandlerMock := mock_objectsync.NewMockObjectSync(ctrl)
clientMock := mock_spacesyncproto.NewMockDRPCSpaceSyncClient(ctrl)
statMock := mock_debugstat.NewMockStatService(ctrl)
manager.peerPool = peerPoolMock
manager.handler = messageHandlerMock
manager.clientFactory = spacesyncproto.ClientFactoryFunc(func(cc drpc.Conn) spacesyncproto.DRPCSpaceSyncClient {
return clientMock
})
manager.statService = statMock
manager.reqStat = newRequestStat("spaceId")
manager.ctx, manager.cancel = context.WithCancel(context.Background())
return &fixture{
requestManager: manager,
messageHandlerMock: messageHandlerMock,
peerPoolMock: peerPoolMock,
clientMock: clientMock,
statMock: statMock,
ctrl: ctrl,
}
}
func (fx *fixture) stop() {
fx.ctrl.Finish()
}
func TestRequestManager_SyncRequest(t *testing.T) {
ctx := context.Background()
t.Run("send request", func(t *testing.T) {
fx := newFixture(t)
defer fx.stop()
peerId := "PeerId"
peerMock := mock_peer.NewMockPeer(fx.ctrl)
conn := &drpcconn.Conn{}
msg := &spacesyncproto.ObjectSyncMessage{}
resp := &spacesyncproto.ObjectSyncMessage{}
fx.peerPoolMock.EXPECT().Get(ctx, peerId).Return(peerMock, nil)
fx.clientMock.EXPECT().ObjectSync(ctx, msg).Return(resp, nil)
peerMock.EXPECT().DoDrpc(ctx, gomock.Any()).DoAndReturn(func(ctx context.Context, drpcHandler func(conn drpc.Conn) error) {
drpcHandler(conn)
}).Return(nil)
res, err := fx.requestManager.SendRequest(ctx, peerId, msg)
require.NoError(t, err)
require.Equal(t, resp, res)
})
t.Run("request and handle", func(t *testing.T) {
fx := newFixture(t)
defer fx.stop()
ctx = fx.requestManager.ctx
peerId := "PeerId"
peerMock := mock_peer.NewMockPeer(fx.ctrl)
conn := &drpcconn.Conn{}
msg := &spacesyncproto.ObjectSyncMessage{}
resp := &spacesyncproto.ObjectSyncMessage{}
fx.peerPoolMock.EXPECT().Get(ctx, peerId).Return(peerMock, nil)
fx.clientMock.EXPECT().ObjectSync(ctx, msg).Return(resp, nil)
peerMock.EXPECT().DoDrpc(ctx, gomock.Any()).DoAndReturn(func(ctx context.Context, drpcHandler func(conn drpc.Conn) error) {
drpcHandler(conn)
}).Return(nil)
fx.messageHandlerMock.EXPECT().HandleMessage(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, msg objectsync.HandleMessage) {
require.Equal(t, peerId, msg.SenderId)
require.Equal(t, resp, msg.Message)
pId, _ := peer.CtxPeerId(msg.PeerCtx)
require.Equal(t, peerId, pId)
}).Return(nil)
fx.requestManager.requestAndHandle(peerId, msg)
})
}
func TestRequestManager_QueueRequest(t *testing.T) {
t.Run("max concurrent reqs for peer, independent reqs for other peer", func(t *testing.T) {
// testing 2 concurrent requests to one peer and simultaneous to another peer
fx := newFixture(t)
defer fx.stop()
fx.requestManager.workers = 2
msgRelease := make(chan struct{})
msgWait := make(chan struct{})
msgs := sync.Map{}
doRequestAndHandle = func(manager *requestManager, peerId string, req *spacesyncproto.ObjectSyncMessage) {
msgs.Store(req.ObjectId, struct{}{})
<-msgWait
<-msgRelease
}
otherPeer := "otherPeer"
msg1 := &spacesyncproto.ObjectSyncMessage{ObjectId: "id1"}
msg2 := &spacesyncproto.ObjectSyncMessage{ObjectId: "id2"}
msg3 := &spacesyncproto.ObjectSyncMessage{ObjectId: "id3"}
otherMsg1 := &spacesyncproto.ObjectSyncMessage{ObjectId: "otherId1"}
// sending requests to first peer
peerId := "PeerId"
err := fx.requestManager.QueueRequest(peerId, msg1)
require.NoError(t, err)
err = fx.requestManager.QueueRequest(peerId, msg2)
require.NoError(t, err)
err = fx.requestManager.QueueRequest(peerId, msg3)
require.NoError(t, err)
// waiting until all the messages are loaded
msgWait <- struct{}{}
msgWait <- struct{}{}
_, ok := msgs.Load("id1")
require.True(t, ok)
_, ok = msgs.Load("id2")
require.True(t, ok)
// third message should not be read
_, ok = msgs.Load("id3")
require.False(t, ok)
// request for other peer should pass
err = fx.requestManager.QueueRequest(otherPeer, otherMsg1)
require.NoError(t, err)
msgWait <- struct{}{}
_, ok = msgs.Load("otherId1")
require.True(t, ok)
close(msgRelease)
fx.statMock.EXPECT().RemoveProvider(gomock.Any())
fx.requestManager.Close(context.Background())
})
t.Run("no requests after close", func(t *testing.T) {
fx := newFixture(t)
defer fx.stop()
fx.requestManager.workers = 1
msgRelease := make(chan struct{})
msgWait := make(chan struct{})
msgs := sync.Map{}
doRequestAndHandle = func(manager *requestManager, peerId string, req *spacesyncproto.ObjectSyncMessage) {
msgs.Store(req.ObjectId, struct{}{})
<-msgWait
<-msgRelease
}
msg1 := &spacesyncproto.ObjectSyncMessage{ObjectId: "id1"}
msg2 := &spacesyncproto.ObjectSyncMessage{ObjectId: "id2"}
// sending requests to first peer
peerId := "PeerId"
err := fx.requestManager.QueueRequest(peerId, msg1)
require.NoError(t, err)
err = fx.requestManager.QueueRequest(peerId, msg2)
require.NoError(t, err)
// waiting until all the message is loaded
msgWait <- struct{}{}
_, ok := msgs.Load("id1")
require.True(t, ok)
_, ok = msgs.Load("id2")
require.False(t, ok)
fx.statMock.EXPECT().RemoveProvider(gomock.Any())
fx.requestManager.Close(context.Background())
close(msgRelease)
_, ok = msgs.Load("id2")
require.False(t, ok)
})
}

View file

@ -1,98 +0,0 @@
package requestmanager
import (
"context"
"sync"
"github.com/cheggaaa/mb/v3"
"go.uber.org/zap"
)
type entry struct {
call func()
onRemove func()
}
// newRequestPool creates new requestPool
// workers - how many processes will execute tasks
// maxSize - limit for queue size
func newRequestPool(workers, maxSize int) *requestPool {
ctx, cancel := context.WithCancel(context.Background())
ss := &requestPool{
ctx: ctx,
cancel: cancel,
workers: workers,
batch: mb.New[string](maxSize),
entries: map[string]entry{},
}
return ss
}
// requestPool needed for parallel execution of the incoming send tasks
type requestPool struct {
ctx context.Context
cancel context.CancelFunc
workers int
entries map[string]entry
batch *mb.MB[string]
mx sync.Mutex
}
func (rp *requestPool) TryAdd(id string, call, remove func()) (err error) {
rp.mx.Lock()
if prevEntry, ok := rp.entries[id]; ok {
rp.entries[id] = entry{
call: call,
onRemove: remove,
}
rp.mx.Unlock()
prevEntry.onRemove()
return
}
rp.entries[id] = entry{
call: call,
onRemove: remove,
}
rp.mx.Unlock()
err = rp.batch.TryAdd(id)
if err != nil {
rp.mx.Lock()
curEntry := rp.entries[id]
delete(rp.entries, id)
rp.mx.Unlock()
if curEntry.onRemove != nil {
curEntry.onRemove()
}
}
return
}
func (rp *requestPool) Run() {
for i := 0; i < rp.workers; i++ {
go rp.sendLoop()
}
}
func (rp *requestPool) sendLoop() {
for {
id, err := rp.batch.WaitOne(rp.ctx)
if err != nil {
log.Debug("close send loop", zap.Error(err))
return
}
rp.mx.Lock()
curEntry := rp.entries[id]
delete(rp.entries, id)
rp.mx.Unlock()
if curEntry.call != nil {
curEntry.call()
curEntry.onRemove()
}
}
}
func (rp *requestPool) Close() (err error) {
rp.cancel()
return rp.batch.Close()
}

View file

@ -1,80 +0,0 @@
package requestmanager
import (
"sync/atomic"
"testing"
"time"
"github.com/stretchr/testify/require"
)
func TestRequestPool_Add(t *testing.T) {
t.Run("add many same key", func(t *testing.T) {
removed := atomic.Int32{}
firstVal := atomic.Int32{}
secondVal := atomic.Int32{}
ch := make(chan struct{})
setBlock := func() {
<-ch
}
onRemove := func() {
removed.Add(1)
}
rp := newRequestPool(2, 2)
err := rp.TryAdd("1", setBlock, onRemove)
require.NoError(t, err)
err = rp.TryAdd("2", setBlock, onRemove)
require.NoError(t, err)
rp.Run()
// wait until setBlock is executed
time.Sleep(time.Millisecond * 100)
lenReq := 100
for i := 0; i < lenReq; i++ {
tmp := i
err := rp.TryAdd("1", func() {
firstVal.Store(int32(tmp))
}, onRemove)
require.NoError(t, err)
err = rp.TryAdd("2", func() {
secondVal.Store(int32(tmp))
}, onRemove)
require.NoError(t, err)
}
close(ch)
// wait until rp runs all tasks
time.Sleep(time.Millisecond * 100)
rp.Close()
require.Equal(t, int32(lenReq-1), firstVal.Load())
require.Equal(t, int32(lenReq-1), secondVal.Load())
require.Equal(t, int32(2*lenReq+2), removed.Load())
})
t.Run("error does not save func", func(t *testing.T) {
removed := atomic.Int32{}
firstVal := atomic.Int32{}
secondVal := atomic.Int32{}
thirdVal := atomic.Int32{}
onRemove := func() {
removed.Add(1)
}
rp := newRequestPool(2, 2)
err := rp.TryAdd("1", func() {
firstVal.Store(1)
}, onRemove)
require.NoError(t, err)
err = rp.TryAdd("2", func() {
secondVal.Store(2)
}, onRemove)
require.NoError(t, err)
err = rp.TryAdd("3", func() {
thirdVal.Store(3)
}, onRemove)
require.Error(t, err)
require.Empty(t, rp.entries["3"])
rp.Run()
time.Sleep(100 * time.Millisecond)
rp.Close()
require.Equal(t, int32(3), removed.Load())
require.Equal(t, int32(1), firstVal.Load())
require.Equal(t, int32(2), secondVal.Load())
})
}

View file

@ -8,6 +8,7 @@ import (
"time"
"go.uber.org/zap"
"storj.io/drpc"
"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/commonspace/acl/aclclient"
@ -15,13 +16,14 @@ import (
"github.com/anyproto/any-sync/commonspace/object/acl/list"
"github.com/anyproto/any-sync/commonspace/object/acl/syncacl"
"github.com/anyproto/any-sync/commonspace/object/treesyncer"
"github.com/anyproto/any-sync/commonspace/objectsync"
"github.com/anyproto/any-sync/commonspace/objecttreebuilder"
"github.com/anyproto/any-sync/commonspace/peermanager"
"github.com/anyproto/any-sync/commonspace/settings"
"github.com/anyproto/any-sync/commonspace/spacestate"
"github.com/anyproto/any-sync/commonspace/spacestorage"
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
syncservice "github.com/anyproto/any-sync/commonspace/sync"
"github.com/anyproto/any-sync/commonspace/sync/objectsync"
"github.com/anyproto/any-sync/commonspace/syncstatus"
"github.com/anyproto/any-sync/net/peer"
"github.com/anyproto/any-sync/util/crypto"
@ -83,8 +85,7 @@ type Space interface {
DeleteTree(ctx context.Context, id string) (err error)
GetNodePeers(ctx context.Context) (peer []peer.Peer, err error)
HandleMessage(ctx context.Context, msg objectsync.HandleMessage) (err error)
HandleSyncRequest(ctx context.Context, req *spacesyncproto.ObjectSyncMessage) (resp *spacesyncproto.ObjectSyncMessage, err error)
HandleStreamSyncRequest(ctx context.Context, req *spacesyncproto.ObjectSyncMessage, stream drpc.Stream) (err error)
HandleRangeRequest(ctx context.Context, req *spacesyncproto.HeadSyncRequest) (resp *spacesyncproto.HeadSyncResponse, err error)
TryClose(objectTTL time.Duration) (close bool, err error)
@ -102,7 +103,7 @@ type space struct {
treeSyncer treesyncer.TreeSyncer
peerManager peermanager.PeerManager
headSync headsync.HeadSync
objectSync objectsync.ObjectSync
syncService syncservice.SyncService
syncStatus syncstatus.StatusService
settings settings.Settings
storage spacestorage.SpaceStorage
@ -143,12 +144,13 @@ func (s *space) DeleteTree(ctx context.Context, id string) (err error) {
return s.settings.DeleteTree(ctx, id)
}
func (s *space) HandleMessage(ctx context.Context, msg objectsync.HandleMessage) (err error) {
return s.objectSync.HandleMessage(ctx, msg)
}
func (s *space) HandleSyncRequest(ctx context.Context, req *spacesyncproto.ObjectSyncMessage) (resp *spacesyncproto.ObjectSyncMessage, err error) {
return s.objectSync.HandleRequest(ctx, req)
func (s *space) HandleStreamSyncRequest(ctx context.Context, req *spacesyncproto.ObjectSyncMessage, stream drpc.Stream) (err error) {
peerId, err := peer.CtxPeerId(ctx)
if err != nil {
return
}
objSyncReq := objectsync.NewByteRequest(peerId, req.SpaceId, req.ObjectId, req.Payload)
return s.syncService.HandleStreamRequest(ctx, objSyncReq, stream)
}
func (s *space) HandleRangeRequest(ctx context.Context, req *spacesyncproto.HeadSyncRequest) (resp *spacesyncproto.HeadSyncResponse, err error) {
@ -188,7 +190,7 @@ func (s *space) Init(ctx context.Context) (err error) {
s.headSync = s.app.MustComponent(headsync.CName).(headsync.HeadSync)
s.syncStatus = s.app.MustComponent(syncstatus.CName).(syncstatus.StatusService)
s.settings = s.app.MustComponent(settings.CName).(settings.Settings)
s.objectSync = s.app.MustComponent(objectsync.CName).(objectsync.ObjectSync)
s.syncService = s.app.MustComponent(syncservice.CName).(syncservice.SyncService)
s.storage = s.app.MustComponent(spacestorage.CName).(spacestorage.SpaceStorage)
s.peerManager = s.app.MustComponent(peermanager.CName).(peermanager.PeerManager)
s.aclList = s.app.MustComponent(syncacl.CName).(list.AclList)
@ -220,9 +222,6 @@ func (s *space) Close() error {
}
func (s *space) TryClose(objectTTL time.Duration) (close bool, err error) {
if time.Now().Sub(s.objectSync.LastUsage()) < objectTTL {
return false, nil
}
locked := s.state.TreesUsed.Load() > 1
log.With(zap.Int32("trees used", s.state.TreesUsed.Load()), zap.Bool("locked", locked), zap.String("spaceId", s.state.SpaceId)).Debug("space lock status check")
if locked {

View file

@ -12,8 +12,10 @@ import (
"github.com/anyproto/any-sync/commonspace/acl/aclclient"
"github.com/anyproto/any-sync/commonspace/deletionmanager"
"github.com/anyproto/any-sync/commonspace/object/treesyncer"
"github.com/anyproto/any-sync/commonspace/sync"
"github.com/anyproto/any-sync/net"
"github.com/anyproto/any-sync/net/peer"
"github.com/anyproto/any-sync/net/streampool"
"storj.io/drpc"
@ -29,10 +31,8 @@ import (
"github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
"github.com/anyproto/any-sync/commonspace/object/treemanager"
"github.com/anyproto/any-sync/commonspace/objectmanager"
"github.com/anyproto/any-sync/commonspace/objectsync"
"github.com/anyproto/any-sync/commonspace/objecttreebuilder"
"github.com/anyproto/any-sync/commonspace/peermanager"
"github.com/anyproto/any-sync/commonspace/requestmanager"
"github.com/anyproto/any-sync/commonspace/settings"
"github.com/anyproto/any-sync/commonspace/spacestate"
"github.com/anyproto/any-sync/commonspace/spacestorage"
@ -180,17 +180,17 @@ func (s *spaceService) NewSpace(ctx context.Context, id string, deps Deps) (Spac
spaceApp := s.app.ChildApp()
spaceApp.Register(state).
Register(peerManager).
Register(streampool.NewStreamPool()).
Register(newCommonStorage(st)).
Register(statusService).
Register(syncacl.New()).
Register(requestmanager.New()).
Register(deletionstate.New()).
Register(deletionmanager.New()).
Register(settings.New()).
Register(objectmanager.New(s.treeManager)).
Register(deps.TreeSyncer).
Register(objecttreebuilder.New()).
Register(objectsync.New()).
Register(sync.NewSyncService()).
Register(aclclient.NewAclSpaceClient()).
Register(headsync.New())

View file

@ -115,6 +115,21 @@ func (mr *MockDRPCSpaceSyncClientMockRecorder) ObjectSync(arg0, arg1 any) *gomoc
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ObjectSync", reflect.TypeOf((*MockDRPCSpaceSyncClient)(nil).ObjectSync), arg0, arg1)
}
// ObjectSyncRequestStream mocks base method.
func (m *MockDRPCSpaceSyncClient) ObjectSyncRequestStream(arg0 context.Context, arg1 *spacesyncproto.ObjectSyncMessage) (spacesyncproto.DRPCSpaceSync_ObjectSyncRequestStreamClient, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "ObjectSyncRequestStream", arg0, arg1)
ret0, _ := ret[0].(spacesyncproto.DRPCSpaceSync_ObjectSyncRequestStreamClient)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// ObjectSyncRequestStream indicates an expected call of ObjectSyncRequestStream.
func (mr *MockDRPCSpaceSyncClientMockRecorder) ObjectSyncRequestStream(arg0, arg1 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ObjectSyncRequestStream", reflect.TypeOf((*MockDRPCSpaceSyncClient)(nil).ObjectSyncRequestStream), arg0, arg1)
}
// ObjectSyncStream mocks base method.
func (m *MockDRPCSpaceSyncClient) ObjectSyncStream(arg0 context.Context) (spacesyncproto.DRPCSpaceSync_ObjectSyncStreamClient, error) {
m.ctrl.T.Helper()

View file

@ -0,0 +1,30 @@
package sync
import (
"context"
"time"
"go.uber.org/zap"
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
"github.com/anyproto/any-sync/metric"
)
type HandleMessage struct {
Id uint64
ReceiveTime time.Time
StartHandlingTime time.Time
Deadline time.Time
SenderId string
Message *spacesyncproto.ObjectSyncMessage
PeerCtx context.Context
}
func (m HandleMessage) LogFields(fields ...zap.Field) []zap.Field {
return append(fields,
metric.SpaceId(m.Message.SpaceId),
metric.ObjectId(m.Message.ObjectId),
metric.QueueDur(m.StartHandlingTime.Sub(m.ReceiveTime)),
metric.TotalDur(time.Since(m.ReceiveTime)),
)
}

View file

@ -0,0 +1,57 @@
package objectsync
import (
"github.com/gogo/protobuf/proto"
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
)
type InnerRequest interface {
Marshall() ([]byte, error)
}
type Request struct {
peerId string
spaceId string
objectId string
Inner InnerRequest
Bytes []byte
}
func NewByteRequest(peerId, spaceId, objectId string, message []byte) *Request {
return &Request{
peerId: peerId,
spaceId: spaceId,
objectId: objectId,
Bytes: message,
}
}
func NewRequest(peerId, spaceId, objectId string, inner InnerRequest) *Request {
return &Request{
peerId: peerId,
spaceId: spaceId,
objectId: objectId,
Inner: inner,
}
}
func (r *Request) PeerId() string {
return r.peerId
}
func (r *Request) ObjectId() string {
return r.objectId
}
func (r *Request) Proto() (proto.Message, error) {
msg, err := r.Inner.Marshall()
if err != nil {
return nil, err
}
return &spacesyncproto.ObjectSyncMessage{
SpaceId: r.spaceId,
Payload: msg,
ObjectId: r.objectId,
}, nil
}

View file

@ -16,6 +16,7 @@ type RequestManager interface {
QueueRequest(rq syncdeps.Request) error
SendRequest(ctx context.Context, rq syncdeps.Request, collector syncdeps.ResponseCollector) error
HandleStreamRequest(ctx context.Context, rq syncdeps.Request, stream drpc.Stream) error
Close()
}
type StreamResponse struct {
@ -79,6 +80,10 @@ func (r *requestManager) HandleStreamRequest(ctx context.Context, rq syncdeps.Re
return nil
}
func (r *requestManager) Close() {
r.requestPool.Close()
}
func fullId(peerId, objectId string) string {
return strings.Join([]string{peerId, objectId}, "-")
}

View file

@ -9,6 +9,7 @@ type RequestPool interface {
TryTake(peerId, objectId string) bool
Release(peerId, objectId string)
QueueRequestAction(peerId, objectId string, action func(ctx context.Context)) (err error)
Close()
}
type requestPool struct {

View file

@ -1,4 +1,4 @@
package requestmanager
package sync
import (
"sync"

View file

@ -2,6 +2,7 @@ package sync
import (
"context"
"errors"
"github.com/cheggaaa/mb/v3"
"go.uber.org/zap"
@ -47,6 +48,10 @@ func NewSyncService() SyncService {
return &syncService{}
}
func (s *syncService) Name() (name string) {
return CName
}
func (s *syncService) Init(a *app.App) (err error) {
s.handler = a.MustComponent(syncdeps.CName).(syncdeps.SyncHandler)
s.sendQueueProvider = multiqueue.NewQueueProvider[drpc.Message](100, s.handleOutgoingMessage)
@ -59,8 +64,18 @@ func (s *syncService) Init(a *app.App) (err error) {
return nil
}
func (s *syncService) Name() (name string) {
return CName
func (s *syncService) Run(ctx context.Context) (err error) {
return nil
}
func (s *syncService) Close(ctx context.Context) (err error) {
receiveErr := s.receiveQueue.Close()
providerErr := s.sendQueueProvider.Close()
if receiveErr != nil || providerErr != nil {
err = errors.Join(receiveErr, providerErr)
}
s.manager.Close()
return
}
func (s *syncService) BroadcastMessage(ctx context.Context, msg drpc.Message) error {
@ -83,6 +98,12 @@ func (s *syncService) handleIncomingMessage(msg msgCtx) {
if err != nil {
log.Error("failed to queue request", zap.Error(err))
}
//msg.StartHandlingTime = time.Now()
//ctx := peer.CtxWithPeerId(context.Background(), msg.SenderId)
//ctx = logger.CtxWithFields(ctx, zap.Uint64("msgId", msg.Id), zap.String("senderId", msg.SenderId))
//s.metric.RequestLog(msg.PeerCtx, "space.streamOp", msg.LogFields(
// zap.Error(err),
//)...)
}
func (s *syncService) GetQueue(peerId string) *multiqueue.Queue[drpc.Message] {
@ -96,10 +117,15 @@ func (s *syncService) NewReadMessage() drpc.Message {
func (s *syncService) HandleMessage(ctx context.Context, peerId string, msg drpc.Message) error {
// TODO: make this queue per object and add closing of the individual queues
return s.receiveQueue.Add(ctx, peerId, msgCtx{
err := s.receiveQueue.Add(ctx, peerId, msgCtx{
ctx: ctx,
Message: msg,
})
if errors.Is(err, mb.ErrOverflowed) {
log.Info("queue overflowed", zap.String("peerId", peerId))
return nil
}
return err
}
func (s *syncService) QueueRequest(ctx context.Context, rq syncdeps.Request) error {

View file

@ -64,8 +64,8 @@ func (p *PeerGlobalPool) MakePeers() {
}
}
func (c *PeerGlobalPool) GetPeerIds() (peerIds []string) {
return c.peerIds
func (p *PeerGlobalPool) GetPeerIds() (peerIds []string) {
return p.peerIds
}
func (p *PeerGlobalPool) AddCtrl(peerId string, addCtrl connCtrl) {

View file

@ -203,6 +203,9 @@ func (s *streamPool) addStream(drpcStream drpc.Stream, tags ...string) (*stream,
stats: newStreamStat(peerId),
}
st.queue = s.syncDelegate.GetQueue(peerId)
if st.queue == nil {
return nil, fmt.Errorf("no queue for peer %s", peerId)
}
s.streams[streamId] = st
s.streamIdsByPeer[peerId] = append(s.streamIdsByPeer[peerId], streamId)
for _, tag := range tags {

View file

@ -1,17 +1,20 @@
package multiqueue
import (
"errors"
"sync"
)
type QueueProvider[T any] interface {
GetQueue(id string) *Queue[T]
RemoveQueue(id string) error
Close() error
}
type queueProvider[T any] struct {
queues map[string]*Queue[T]
mx sync.Mutex
closed bool
size int
handler QueueHandler[T]
}
@ -27,6 +30,9 @@ func NewQueueProvider[T any](size int, handler QueueHandler[T]) QueueProvider[T]
func (p *queueProvider[T]) GetQueue(id string) *Queue[T] {
p.mx.Lock()
defer p.mx.Unlock()
if p.closed {
return nil
}
q, ok := p.queues[id]
if !ok {
q = NewQueue(id, p.size, p.handler)
@ -38,6 +44,9 @@ func (p *queueProvider[T]) GetQueue(id string) *Queue[T] {
func (p *queueProvider[T]) RemoveQueue(id string) error {
p.mx.Lock()
defer p.mx.Unlock()
if p.closed {
return nil
}
q, ok := p.queues[id]
if !ok {
return nil
@ -45,3 +54,17 @@ func (p *queueProvider[T]) RemoveQueue(id string) error {
delete(p.queues, id)
return q.Close()
}
func (p *queueProvider[T]) Close() error {
p.mx.Lock()
defer p.mx.Unlock()
p.closed = true
var err error
for _, q := range p.queues {
qErr := q.Close()
if qErr != nil {
err = errors.Join(err, qErr)
}
}
return err
}