mirror of
https://github.com/anyproto/any-sync.git
synced 2025-06-08 05:57:03 +09:00
Add sync with peer
This commit is contained in:
parent
3fadc5f589
commit
bb63dca331
5 changed files with 104 additions and 18 deletions
|
@ -15,6 +15,7 @@ import (
|
|||
"github.com/anyproto/any-sync/commonspace/object/acl/syncacl"
|
||||
"github.com/anyproto/any-sync/commonspace/object/keyvalue/keyvaluestorage"
|
||||
"github.com/anyproto/any-sync/commonspace/object/keyvalue/keyvaluestorage/syncstorage"
|
||||
"github.com/anyproto/any-sync/commonspace/object/keyvalue/kvinterfaces"
|
||||
"github.com/anyproto/any-sync/commonspace/spacestate"
|
||||
"github.com/anyproto/any-sync/commonspace/spacestorage"
|
||||
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
|
||||
|
@ -26,15 +27,7 @@ import (
|
|||
|
||||
var ErrUnexpectedMessageType = errors.New("unexpected message type")
|
||||
|
||||
var log = logger.NewNamed(CName)
|
||||
|
||||
const CName = "common.object.keyvalue"
|
||||
|
||||
type KeyValueService interface {
|
||||
app.ComponentRunnable
|
||||
DefaultStore() keyvaluestorage.Storage
|
||||
HandleMessage(ctx context.Context, msg objectmessages.HeadUpdate) (err error)
|
||||
}
|
||||
var log = logger.NewNamed(kvinterfaces.CName)
|
||||
|
||||
type keyValueService struct {
|
||||
storageId string
|
||||
|
@ -42,20 +35,32 @@ type keyValueService struct {
|
|||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
|
||||
limiter *concurrentLimiter
|
||||
spaceStorage spacestorage.SpaceStorage
|
||||
defaultStore keyvaluestorage.Storage
|
||||
clientFactory spacesyncproto.ClientFactory
|
||||
syncService sync.SyncService
|
||||
}
|
||||
|
||||
func New() kvinterfaces.KeyValueService {
|
||||
return &keyValueService{}
|
||||
}
|
||||
|
||||
func (k *keyValueService) DefaultStore() keyvaluestorage.Storage {
|
||||
return k.defaultStore
|
||||
}
|
||||
|
||||
func (k *keyValueService) SyncWithPeer(ctx context.Context, p peer.Peer) (err error) {
|
||||
if k.syncService == nil {
|
||||
return nil
|
||||
}
|
||||
func (k *keyValueService) SyncWithPeer(p peer.Peer) (err error) {
|
||||
k.limiter.ScheduleRequest(k.ctx, p.Id(), func() {
|
||||
err = k.syncWithPeer(k.ctx, p)
|
||||
if err != nil {
|
||||
log.Error("failed to sync with peer", zap.String("peerId", p.ID()), zap.Error(err))
|
||||
}
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
||||
func (k *keyValueService) syncWithPeer(ctx context.Context, p peer.Peer) (err error) {
|
||||
conn, err := p.AcquireDrpcConn(ctx)
|
||||
if err != nil {
|
||||
return
|
||||
|
@ -180,6 +185,8 @@ func (k *keyValueService) Init(a *app.App) (err error) {
|
|||
k.ctx, k.cancel = context.WithCancel(context.Background())
|
||||
spaceState := a.MustComponent(spacestate.CName).(*spacestate.SpaceState)
|
||||
k.spaceId = spaceState.SpaceId
|
||||
k.clientFactory = spacesyncproto.ClientFactoryFunc(spacesyncproto.NewDRPCSpaceSyncClient)
|
||||
k.limiter = newConcurrentLimiter()
|
||||
accountService := a.MustComponent(accountservice.CName).(accountservice.Service)
|
||||
aclList := a.MustComponent(syncacl.CName).(list.AclList)
|
||||
k.spaceStorage = a.MustComponent(spacestorage.CName).(spacestorage.SpaceStorage)
|
||||
|
@ -199,7 +206,7 @@ func (k *keyValueService) Init(a *app.App) (err error) {
|
|||
}
|
||||
|
||||
func (k *keyValueService) Name() (name string) {
|
||||
return CName
|
||||
return kvinterfaces.CName
|
||||
}
|
||||
|
||||
func (k *keyValueService) Run(ctx context.Context) (err error) {
|
||||
|
@ -208,6 +215,7 @@ func (k *keyValueService) Run(ctx context.Context) (err error) {
|
|||
|
||||
func (k *keyValueService) Close(ctx context.Context) (err error) {
|
||||
k.cancel()
|
||||
k.limiter.Close()
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
18
commonspace/object/keyvalue/kvinterfaces/interfaces.go
Normal file
18
commonspace/object/keyvalue/kvinterfaces/interfaces.go
Normal file
|
@ -0,0 +1,18 @@
|
|||
package kvinterfaces
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"storj.io/drpc"
|
||||
|
||||
"github.com/anyproto/any-sync/app"
|
||||
"github.com/anyproto/any-sync/commonspace/object/keyvalue/keyvaluestorage"
|
||||
)
|
||||
|
||||
const CName = "common.object.keyvalue"
|
||||
|
||||
type KeyValueService interface {
|
||||
app.ComponentRunnable
|
||||
DefaultStore() keyvaluestorage.Storage
|
||||
HandleMessage(ctx context.Context, msg drpc.Message) (err error)
|
||||
}
|
52
commonspace/object/keyvalue/limiter.go
Normal file
52
commonspace/object/keyvalue/limiter.go
Normal file
|
@ -0,0 +1,52 @@
|
|||
package keyvalue
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
)
|
||||
|
||||
type concurrentLimiter struct {
|
||||
mu sync.Mutex
|
||||
inProgress map[string]bool
|
||||
wg sync.WaitGroup
|
||||
}
|
||||
|
||||
func newConcurrentLimiter() *concurrentLimiter {
|
||||
return &concurrentLimiter{
|
||||
inProgress: make(map[string]bool),
|
||||
}
|
||||
}
|
||||
|
||||
func (cl *concurrentLimiter) ScheduleRequest(ctx context.Context, id string, action func()) bool {
|
||||
cl.mu.Lock()
|
||||
if cl.inProgress[id] {
|
||||
cl.mu.Unlock()
|
||||
return false
|
||||
}
|
||||
|
||||
cl.inProgress[id] = true
|
||||
cl.wg.Add(1)
|
||||
cl.mu.Unlock()
|
||||
|
||||
go func() {
|
||||
defer func() {
|
||||
cl.mu.Lock()
|
||||
delete(cl.inProgress, id)
|
||||
cl.mu.Unlock()
|
||||
cl.wg.Done()
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
action()
|
||||
}
|
||||
}()
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func (cl *concurrentLimiter) Close() {
|
||||
cl.wg.Wait()
|
||||
}
|
|
@ -13,6 +13,7 @@ import (
|
|||
|
||||
"github.com/anyproto/any-sync/commonspace/acl/aclclient"
|
||||
"github.com/anyproto/any-sync/commonspace/deletionmanager"
|
||||
"github.com/anyproto/any-sync/commonspace/object/keyvalue"
|
||||
"github.com/anyproto/any-sync/commonspace/object/treesyncer"
|
||||
"github.com/anyproto/any-sync/commonspace/sync"
|
||||
"github.com/anyproto/any-sync/commonspace/sync/objectsync"
|
||||
|
@ -184,6 +185,7 @@ func (s *spaceService) NewSpace(ctx context.Context, id string, deps Deps) (Spac
|
|||
Register(deps.SyncStatus).
|
||||
Register(peerManager).
|
||||
Register(st).
|
||||
Register(keyvalue.New()).
|
||||
Register(objectsync.New()).
|
||||
Register(sync.NewSyncService()).
|
||||
Register(syncacl.New()).
|
||||
|
|
|
@ -11,6 +11,7 @@ import (
|
|||
|
||||
"github.com/anyproto/any-sync/app"
|
||||
"github.com/anyproto/any-sync/app/logger"
|
||||
"github.com/anyproto/any-sync/commonspace/object/keyvalue/kvinterfaces"
|
||||
"github.com/anyproto/any-sync/commonspace/object/tree/synctree"
|
||||
"github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
|
||||
"github.com/anyproto/any-sync/commonspace/object/treemanager"
|
||||
|
@ -30,10 +31,11 @@ var ErrUnexpectedHeadUpdateType = errors.New("unexpected head update type")
|
|||
var log = logger.NewNamed(syncdeps.CName)
|
||||
|
||||
type objectSync struct {
|
||||
spaceId string
|
||||
pool pool.Service
|
||||
manager objectmanager.ObjectManager
|
||||
status syncstatus.StatusUpdater
|
||||
spaceId string
|
||||
pool pool.Service
|
||||
manager objectmanager.ObjectManager
|
||||
status syncstatus.StatusUpdater
|
||||
keyValue kvinterfaces.KeyValueService
|
||||
}
|
||||
|
||||
func New() syncdeps.SyncHandler {
|
||||
|
@ -43,6 +45,7 @@ func New() syncdeps.SyncHandler {
|
|||
func (o *objectSync) Init(a *app.App) (err error) {
|
||||
o.manager = a.MustComponent(treemanager.CName).(objectmanager.ObjectManager)
|
||||
o.pool = a.MustComponent(pool.CName).(pool.Service)
|
||||
o.keyValue = a.MustComponent(kvinterfaces.CName).(kvinterfaces.KeyValueService)
|
||||
o.status = a.MustComponent(syncstatus.CName).(syncstatus.StatusUpdater)
|
||||
o.spaceId = a.MustComponent(spacestate.CName).(*spacestate.SpaceState).SpaceId
|
||||
return
|
||||
|
@ -57,6 +60,9 @@ func (o *objectSync) HandleHeadUpdate(ctx context.Context, headUpdate drpc.Messa
|
|||
if !ok {
|
||||
return nil, ErrUnexpectedHeadUpdateType
|
||||
}
|
||||
if update.ObjectType() == spacesyncproto.ObjectType_KeyValue {
|
||||
return nil, o.keyValue.HandleMessage(ctx, update)
|
||||
}
|
||||
peerId, err := peer.CtxPeerId(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue