diff --git a/commonspace/headsync/headsync.go b/commonspace/headsync/headsync.go index aab03b18..9974559d 100644 --- a/commonspace/headsync/headsync.go +++ b/commonspace/headsync/headsync.go @@ -11,6 +11,7 @@ import ( "github.com/anytypeio/any-sync/commonspace/spacestorage" "github.com/anytypeio/any-sync/commonspace/spacesyncproto" "github.com/anytypeio/any-sync/commonspace/syncstatus" + "github.com/anytypeio/any-sync/nodeconf" "github.com/anytypeio/any-sync/util/periodicsync" "go.uber.org/zap" "strings" @@ -51,6 +52,7 @@ func NewHeadSync( spaceId string, spaceIsDeleted *atomic.Bool, syncPeriod int, + configuration nodeconf.Configuration, storage spacestorage.SpaceStorage, peerManager peermanager.PeerManager, cache treegetter.TreeGetter, @@ -62,7 +64,7 @@ func NewHeadSync( factory := spacesyncproto.ClientFactoryFunc(spacesyncproto.NewDRPCSpaceSyncClient) syncer := newDiffSyncer(spaceId, diff, peerManager, cache, storage, factory, syncStatus, l) sync := func(ctx context.Context) (err error) { - if spaceIsDeleted.Load() { + if spaceIsDeleted.Load() && !configuration.IsResponsible(spaceId) { return spacesyncproto.ErrSpaceIsDeleted } return syncer.Sync(ctx) @@ -88,10 +90,6 @@ func (d *headSync) Init(objectIds []string, deletionState settingsstate.ObjectDe } func (d *headSync) HandleRangeRequest(ctx context.Context, req *spacesyncproto.HeadSyncRequest) (resp *spacesyncproto.HeadSyncResponse, err error) { - if d.spaceIsDeleted.Load() { - err = spacesyncproto.ErrSpaceIsDeleted - return - } return HandleRangeRequest(ctx, d.diff, req) } diff --git a/commonspace/objectsync/objectsync.go b/commonspace/objectsync/objectsync.go index 04ff453d..a1f17b49 100644 --- a/commonspace/objectsync/objectsync.go +++ b/commonspace/objectsync/objectsync.go @@ -8,7 +8,9 @@ import ( "github.com/anytypeio/any-sync/commonspace/objectsync/synchandler" "github.com/anytypeio/any-sync/commonspace/peermanager" "github.com/anytypeio/any-sync/commonspace/spacesyncproto" + "github.com/anytypeio/any-sync/nodeconf" "go.uber.org/zap" + "golang.org/x/exp/slices" "sync/atomic" "time" ) @@ -27,8 +29,9 @@ type ObjectSync interface { type objectSync struct { spaceId string - messagePool MessagePool - objectGetter syncobjectgetter.SyncObjectGetter + messagePool MessagePool + objectGetter syncobjectgetter.SyncObjectGetter + configuration nodeconf.Configuration syncCtx context.Context cancelSync context.CancelFunc @@ -38,12 +41,14 @@ type objectSync struct { func NewObjectSync( spaceId string, spaceIsDeleted *atomic.Bool, + configuration nodeconf.Configuration, peerManager peermanager.PeerManager, objectGetter syncobjectgetter.SyncObjectGetter) ObjectSync { syncCtx, cancel := context.WithCancel(context.Background()) os := newObjectSync( spaceId, spaceIsDeleted, + configuration, objectGetter, syncCtx, cancel) @@ -55,6 +60,7 @@ func NewObjectSync( func newObjectSync( spaceId string, spaceIsDeleted *atomic.Bool, + configuration nodeconf.Configuration, objectGetter syncobjectgetter.SyncObjectGetter, syncCtx context.Context, cancel context.CancelFunc, @@ -65,6 +71,7 @@ func newObjectSync( syncCtx: syncCtx, cancelSync: cancel, spaceIsDeleted: spaceIsDeleted, + configuration: configuration, } } @@ -85,8 +92,13 @@ func (s *objectSync) HandleMessage(ctx context.Context, senderId string, message } func (s *objectSync) handleMessage(ctx context.Context, senderId string, msg *spacesyncproto.ObjectSyncMessage) (err error) { + log := log.With(zap.String("objectId", msg.ObjectId), zap.String("replyId", msg.ReplyId)) if s.spaceIsDeleted.Load() { - return spacesyncproto.ErrSpaceIsDeleted + log = log.With(zap.Bool("isDeleted", true)) + // preventing sync with other clients + if !slices.Contains(s.configuration.NodeIds(s.spaceId), senderId) { + return spacesyncproto.ErrSpaceIsDeleted + } } log.With(zap.String("objectId", msg.ObjectId), zap.String("replyId", msg.ReplyId)).DebugCtx(ctx, "handling message") obj, err := s.objectGetter.GetObject(ctx, msg.ObjectId) diff --git a/commonspace/settings/deletionmanager.go b/commonspace/settings/deletionmanager.go index ddad78af..36c45936 100644 --- a/commonspace/settings/deletionmanager.go +++ b/commonspace/settings/deletionmanager.go @@ -9,6 +9,7 @@ import ( type SpaceIdsProvider interface { AllIds() []string + RemoveObjects(ids []string) } type SpaceDeleter interface { @@ -60,13 +61,17 @@ func (d *deletionManager) UpdateState(state *settingsstate.State) (err error) { if ok { spaceDeleter.DeleteSpace(d.spaceId) } - d.onSpaceDelete() if d.isResponsible { - allIds := slice.DiscardFromSlice(d.provider.AllIds(), func(s string) bool { - return s == d.settingsId + allIds := slice.DiscardFromSlice(d.provider.AllIds(), func(id string) bool { + return id == d.settingsId }) err = d.deletionState.Add(allIds) + if err != nil { + return + } + d.provider.RemoveObjects(allIds) } + d.onSpaceDelete() } return } diff --git a/commonspace/spaceservice.go b/commonspace/spaceservice.go index 08c30308..8cf79947 100644 --- a/commonspace/spaceservice.go +++ b/commonspace/spaceservice.go @@ -139,8 +139,8 @@ func (s *spaceService) NewSpace(ctx context.Context, id string) (Space, error) { return nil, err } - headSync := headsync.NewHeadSync(id, spaceIsDeleted, s.config.SyncPeriod, st, peerManager, getter, syncStatus, log) - objectSync := objectsync.NewObjectSync(id, spaceIsDeleted, peerManager, getter) + headSync := headsync.NewHeadSync(id, spaceIsDeleted, s.config.SyncPeriod, lastConfiguration, st, peerManager, getter, syncStatus, log) + objectSync := objectsync.NewObjectSync(id, spaceIsDeleted, lastConfiguration, peerManager, getter) sp := &space{ id: id, objectSync: objectSync,