diff --git a/commonspace/object/keyvalue/keyvalue.go b/commonspace/object/keyvalue/keyvalue.go index 5357047f..68ab8d52 100644 --- a/commonspace/object/keyvalue/keyvalue.go +++ b/commonspace/object/keyvalue/keyvalue.go @@ -15,6 +15,7 @@ import ( "github.com/anyproto/any-sync/commonspace/object/acl/syncacl" "github.com/anyproto/any-sync/commonspace/object/keyvalue/keyvaluestorage" "github.com/anyproto/any-sync/commonspace/object/keyvalue/keyvaluestorage/syncstorage" + "github.com/anyproto/any-sync/commonspace/object/keyvalue/kvinterfaces" "github.com/anyproto/any-sync/commonspace/spacestate" "github.com/anyproto/any-sync/commonspace/spacestorage" "github.com/anyproto/any-sync/commonspace/spacesyncproto" @@ -26,15 +27,7 @@ import ( var ErrUnexpectedMessageType = errors.New("unexpected message type") -var log = logger.NewNamed(CName) - -const CName = "common.object.keyvalue" - -type KeyValueService interface { - app.ComponentRunnable - DefaultStore() keyvaluestorage.Storage - HandleMessage(ctx context.Context, msg objectmessages.HeadUpdate) (err error) -} +var log = logger.NewNamed(kvinterfaces.CName) type keyValueService struct { storageId string @@ -42,20 +35,32 @@ type keyValueService struct { ctx context.Context cancel context.CancelFunc + limiter *concurrentLimiter spaceStorage spacestorage.SpaceStorage defaultStore keyvaluestorage.Storage clientFactory spacesyncproto.ClientFactory syncService sync.SyncService } +func New() kvinterfaces.KeyValueService { + return &keyValueService{} +} + func (k *keyValueService) DefaultStore() keyvaluestorage.Storage { return k.defaultStore } -func (k *keyValueService) SyncWithPeer(ctx context.Context, p peer.Peer) (err error) { - if k.syncService == nil { - return nil - } +func (k *keyValueService) SyncWithPeer(p peer.Peer) (err error) { + k.limiter.ScheduleRequest(k.ctx, p.Id(), func() { + err = k.syncWithPeer(k.ctx, p) + if err != nil { + log.Error("failed to sync with peer", zap.String("peerId", p.ID()), zap.Error(err)) + } + }) + return nil +} + +func (k *keyValueService) syncWithPeer(ctx context.Context, p peer.Peer) (err error) { conn, err := p.AcquireDrpcConn(ctx) if err != nil { return @@ -180,6 +185,8 @@ func (k *keyValueService) Init(a *app.App) (err error) { k.ctx, k.cancel = context.WithCancel(context.Background()) spaceState := a.MustComponent(spacestate.CName).(*spacestate.SpaceState) k.spaceId = spaceState.SpaceId + k.clientFactory = spacesyncproto.ClientFactoryFunc(spacesyncproto.NewDRPCSpaceSyncClient) + k.limiter = newConcurrentLimiter() accountService := a.MustComponent(accountservice.CName).(accountservice.Service) aclList := a.MustComponent(syncacl.CName).(list.AclList) k.spaceStorage = a.MustComponent(spacestorage.CName).(spacestorage.SpaceStorage) @@ -199,7 +206,7 @@ func (k *keyValueService) Init(a *app.App) (err error) { } func (k *keyValueService) Name() (name string) { - return CName + return kvinterfaces.CName } func (k *keyValueService) Run(ctx context.Context) (err error) { @@ -208,6 +215,7 @@ func (k *keyValueService) Run(ctx context.Context) (err error) { func (k *keyValueService) Close(ctx context.Context) (err error) { k.cancel() + k.limiter.Close() return nil } diff --git a/commonspace/object/keyvalue/kvinterfaces/interfaces.go b/commonspace/object/keyvalue/kvinterfaces/interfaces.go new file mode 100644 index 00000000..b0416806 --- /dev/null +++ b/commonspace/object/keyvalue/kvinterfaces/interfaces.go @@ -0,0 +1,18 @@ +package kvinterfaces + +import ( + "context" + + "storj.io/drpc" + + "github.com/anyproto/any-sync/app" + "github.com/anyproto/any-sync/commonspace/object/keyvalue/keyvaluestorage" +) + +const CName = "common.object.keyvalue" + +type KeyValueService interface { + app.ComponentRunnable + DefaultStore() keyvaluestorage.Storage + HandleMessage(ctx context.Context, msg drpc.Message) (err error) +} \ No newline at end of file diff --git a/commonspace/object/keyvalue/limiter.go b/commonspace/object/keyvalue/limiter.go new file mode 100644 index 00000000..7a36bffd --- /dev/null +++ b/commonspace/object/keyvalue/limiter.go @@ -0,0 +1,52 @@ +package keyvalue + +import ( + "context" + "sync" +) + +type concurrentLimiter struct { + mu sync.Mutex + inProgress map[string]bool + wg sync.WaitGroup +} + +func newConcurrentLimiter() *concurrentLimiter { + return &concurrentLimiter{ + inProgress: make(map[string]bool), + } +} + +func (cl *concurrentLimiter) ScheduleRequest(ctx context.Context, id string, action func()) bool { + cl.mu.Lock() + if cl.inProgress[id] { + cl.mu.Unlock() + return false + } + + cl.inProgress[id] = true + cl.wg.Add(1) + cl.mu.Unlock() + + go func() { + defer func() { + cl.mu.Lock() + delete(cl.inProgress, id) + cl.mu.Unlock() + cl.wg.Done() + }() + + select { + case <-ctx.Done(): + return + default: + action() + } + }() + + return true +} + +func (cl *concurrentLimiter) Close() { + cl.wg.Wait() +} diff --git a/commonspace/spaceservice.go b/commonspace/spaceservice.go index b7e0f447..6b306659 100644 --- a/commonspace/spaceservice.go +++ b/commonspace/spaceservice.go @@ -13,6 +13,7 @@ import ( "github.com/anyproto/any-sync/commonspace/acl/aclclient" "github.com/anyproto/any-sync/commonspace/deletionmanager" + "github.com/anyproto/any-sync/commonspace/object/keyvalue" "github.com/anyproto/any-sync/commonspace/object/treesyncer" "github.com/anyproto/any-sync/commonspace/sync" "github.com/anyproto/any-sync/commonspace/sync/objectsync" @@ -184,6 +185,7 @@ func (s *spaceService) NewSpace(ctx context.Context, id string, deps Deps) (Spac Register(deps.SyncStatus). Register(peerManager). Register(st). + Register(keyvalue.New()). Register(objectsync.New()). Register(sync.NewSyncService()). Register(syncacl.New()). diff --git a/commonspace/sync/objectsync/synchandler.go b/commonspace/sync/objectsync/synchandler.go index 382f355c..2cc9d984 100644 --- a/commonspace/sync/objectsync/synchandler.go +++ b/commonspace/sync/objectsync/synchandler.go @@ -11,6 +11,7 @@ import ( "github.com/anyproto/any-sync/app" "github.com/anyproto/any-sync/app/logger" + "github.com/anyproto/any-sync/commonspace/object/keyvalue/kvinterfaces" "github.com/anyproto/any-sync/commonspace/object/tree/synctree" "github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto" "github.com/anyproto/any-sync/commonspace/object/treemanager" @@ -30,10 +31,11 @@ var ErrUnexpectedHeadUpdateType = errors.New("unexpected head update type") var log = logger.NewNamed(syncdeps.CName) type objectSync struct { - spaceId string - pool pool.Service - manager objectmanager.ObjectManager - status syncstatus.StatusUpdater + spaceId string + pool pool.Service + manager objectmanager.ObjectManager + status syncstatus.StatusUpdater + keyValue kvinterfaces.KeyValueService } func New() syncdeps.SyncHandler { @@ -43,6 +45,7 @@ func New() syncdeps.SyncHandler { func (o *objectSync) Init(a *app.App) (err error) { o.manager = a.MustComponent(treemanager.CName).(objectmanager.ObjectManager) o.pool = a.MustComponent(pool.CName).(pool.Service) + o.keyValue = a.MustComponent(kvinterfaces.CName).(kvinterfaces.KeyValueService) o.status = a.MustComponent(syncstatus.CName).(syncstatus.StatusUpdater) o.spaceId = a.MustComponent(spacestate.CName).(*spacestate.SpaceState).SpaceId return @@ -57,6 +60,9 @@ func (o *objectSync) HandleHeadUpdate(ctx context.Context, headUpdate drpc.Messa if !ok { return nil, ErrUnexpectedHeadUpdateType } + if update.ObjectType() == spacesyncproto.ObjectType_KeyValue { + return nil, o.keyValue.HandleMessage(ctx, update) + } peerId, err := peer.CtxPeerId(ctx) if err != nil { return nil, err