mirror of
https://github.com/anyproto/any-sync.git
synced 2025-06-08 14:07:02 +09:00
Change deletion logic in headsync and objectsync
This commit is contained in:
parent
73b27c7dac
commit
a0aa02bb8a
4 changed files with 28 additions and 13 deletions
|
@ -11,6 +11,7 @@ import (
|
|||
"github.com/anytypeio/any-sync/commonspace/spacestorage"
|
||||
"github.com/anytypeio/any-sync/commonspace/spacesyncproto"
|
||||
"github.com/anytypeio/any-sync/commonspace/syncstatus"
|
||||
"github.com/anytypeio/any-sync/nodeconf"
|
||||
"github.com/anytypeio/any-sync/util/periodicsync"
|
||||
"go.uber.org/zap"
|
||||
"strings"
|
||||
|
@ -51,6 +52,7 @@ func NewHeadSync(
|
|||
spaceId string,
|
||||
spaceIsDeleted *atomic.Bool,
|
||||
syncPeriod int,
|
||||
configuration nodeconf.Configuration,
|
||||
storage spacestorage.SpaceStorage,
|
||||
peerManager peermanager.PeerManager,
|
||||
cache treegetter.TreeGetter,
|
||||
|
@ -62,7 +64,7 @@ func NewHeadSync(
|
|||
factory := spacesyncproto.ClientFactoryFunc(spacesyncproto.NewDRPCSpaceSyncClient)
|
||||
syncer := newDiffSyncer(spaceId, diff, peerManager, cache, storage, factory, syncStatus, l)
|
||||
sync := func(ctx context.Context) (err error) {
|
||||
if spaceIsDeleted.Load() {
|
||||
if spaceIsDeleted.Load() && !configuration.IsResponsible(spaceId) {
|
||||
return spacesyncproto.ErrSpaceIsDeleted
|
||||
}
|
||||
return syncer.Sync(ctx)
|
||||
|
@ -88,10 +90,6 @@ func (d *headSync) Init(objectIds []string, deletionState settingsstate.ObjectDe
|
|||
}
|
||||
|
||||
func (d *headSync) HandleRangeRequest(ctx context.Context, req *spacesyncproto.HeadSyncRequest) (resp *spacesyncproto.HeadSyncResponse, err error) {
|
||||
if d.spaceIsDeleted.Load() {
|
||||
err = spacesyncproto.ErrSpaceIsDeleted
|
||||
return
|
||||
}
|
||||
return HandleRangeRequest(ctx, d.diff, req)
|
||||
}
|
||||
|
||||
|
|
|
@ -8,7 +8,9 @@ import (
|
|||
"github.com/anytypeio/any-sync/commonspace/objectsync/synchandler"
|
||||
"github.com/anytypeio/any-sync/commonspace/peermanager"
|
||||
"github.com/anytypeio/any-sync/commonspace/spacesyncproto"
|
||||
"github.com/anytypeio/any-sync/nodeconf"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/exp/slices"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
@ -27,8 +29,9 @@ type ObjectSync interface {
|
|||
type objectSync struct {
|
||||
spaceId string
|
||||
|
||||
messagePool MessagePool
|
||||
objectGetter syncobjectgetter.SyncObjectGetter
|
||||
messagePool MessagePool
|
||||
objectGetter syncobjectgetter.SyncObjectGetter
|
||||
configuration nodeconf.Configuration
|
||||
|
||||
syncCtx context.Context
|
||||
cancelSync context.CancelFunc
|
||||
|
@ -38,12 +41,14 @@ type objectSync struct {
|
|||
func NewObjectSync(
|
||||
spaceId string,
|
||||
spaceIsDeleted *atomic.Bool,
|
||||
configuration nodeconf.Configuration,
|
||||
peerManager peermanager.PeerManager,
|
||||
objectGetter syncobjectgetter.SyncObjectGetter) ObjectSync {
|
||||
syncCtx, cancel := context.WithCancel(context.Background())
|
||||
os := newObjectSync(
|
||||
spaceId,
|
||||
spaceIsDeleted,
|
||||
configuration,
|
||||
objectGetter,
|
||||
syncCtx,
|
||||
cancel)
|
||||
|
@ -55,6 +60,7 @@ func NewObjectSync(
|
|||
func newObjectSync(
|
||||
spaceId string,
|
||||
spaceIsDeleted *atomic.Bool,
|
||||
configuration nodeconf.Configuration,
|
||||
objectGetter syncobjectgetter.SyncObjectGetter,
|
||||
syncCtx context.Context,
|
||||
cancel context.CancelFunc,
|
||||
|
@ -65,6 +71,7 @@ func newObjectSync(
|
|||
syncCtx: syncCtx,
|
||||
cancelSync: cancel,
|
||||
spaceIsDeleted: spaceIsDeleted,
|
||||
configuration: configuration,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -85,8 +92,13 @@ func (s *objectSync) HandleMessage(ctx context.Context, senderId string, message
|
|||
}
|
||||
|
||||
func (s *objectSync) handleMessage(ctx context.Context, senderId string, msg *spacesyncproto.ObjectSyncMessage) (err error) {
|
||||
log := log.With(zap.String("objectId", msg.ObjectId), zap.String("replyId", msg.ReplyId))
|
||||
if s.spaceIsDeleted.Load() {
|
||||
return spacesyncproto.ErrSpaceIsDeleted
|
||||
log = log.With(zap.Bool("isDeleted", true))
|
||||
// preventing sync with other clients
|
||||
if !slices.Contains(s.configuration.NodeIds(s.spaceId), senderId) {
|
||||
return spacesyncproto.ErrSpaceIsDeleted
|
||||
}
|
||||
}
|
||||
log.With(zap.String("objectId", msg.ObjectId), zap.String("replyId", msg.ReplyId)).DebugCtx(ctx, "handling message")
|
||||
obj, err := s.objectGetter.GetObject(ctx, msg.ObjectId)
|
||||
|
|
|
@ -9,6 +9,7 @@ import (
|
|||
|
||||
type SpaceIdsProvider interface {
|
||||
AllIds() []string
|
||||
RemoveObjects(ids []string)
|
||||
}
|
||||
|
||||
type SpaceDeleter interface {
|
||||
|
@ -60,13 +61,17 @@ func (d *deletionManager) UpdateState(state *settingsstate.State) (err error) {
|
|||
if ok {
|
||||
spaceDeleter.DeleteSpace(d.spaceId)
|
||||
}
|
||||
d.onSpaceDelete()
|
||||
if d.isResponsible {
|
||||
allIds := slice.DiscardFromSlice(d.provider.AllIds(), func(s string) bool {
|
||||
return s == d.settingsId
|
||||
allIds := slice.DiscardFromSlice(d.provider.AllIds(), func(id string) bool {
|
||||
return id == d.settingsId
|
||||
})
|
||||
err = d.deletionState.Add(allIds)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
d.provider.RemoveObjects(allIds)
|
||||
}
|
||||
d.onSpaceDelete()
|
||||
}
|
||||
return
|
||||
}
|
||||
|
|
|
@ -139,8 +139,8 @@ func (s *spaceService) NewSpace(ctx context.Context, id string) (Space, error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
headSync := headsync.NewHeadSync(id, spaceIsDeleted, s.config.SyncPeriod, st, peerManager, getter, syncStatus, log)
|
||||
objectSync := objectsync.NewObjectSync(id, spaceIsDeleted, peerManager, getter)
|
||||
headSync := headsync.NewHeadSync(id, spaceIsDeleted, s.config.SyncPeriod, lastConfiguration, st, peerManager, getter, syncStatus, log)
|
||||
objectSync := objectsync.NewObjectSync(id, spaceIsDeleted, lastConfiguration, peerManager, getter)
|
||||
sp := &space{
|
||||
id: id,
|
||||
objectSync: objectSync,
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue