mirror of
https://github.com/anyproto/any-sync.git
synced 2025-06-08 05:57:03 +09:00
Prevent connections on space deletion
This commit is contained in:
parent
17a6f7cb55
commit
8b0bb93a2f
9 changed files with 30 additions and 108 deletions
|
@ -1,4 +1,4 @@
|
|||
//go:generate mockgen -destination mock_deletionmanager/mock_deletionmanager.go github.com/anyproto/any-sync/commonspace/deletionmanager DeletionManager,Deleter,SpaceIdsProvider
|
||||
//go:generate mockgen -destination mock_deletionmanager/mock_deletionmanager.go github.com/anyproto/any-sync/commonspace/deletionmanager DeletionManager,Deleter
|
||||
package deletionmanager
|
||||
|
||||
import (
|
||||
|
@ -13,11 +13,6 @@ import (
|
|||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type SpaceIdsProvider interface {
|
||||
app.Component
|
||||
AllIds() []string
|
||||
}
|
||||
|
||||
type DeletionManager interface {
|
||||
app.ComponentRunnable
|
||||
UpdateState(ctx context.Context, state *settingsstate.State) (err error)
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
// Code generated by MockGen. DO NOT EDIT.
|
||||
// Source: github.com/anyproto/any-sync/commonspace/deletionmanager (interfaces: DeletionManager,Deleter,SpaceIdsProvider)
|
||||
// Source: github.com/anyproto/any-sync/commonspace/deletionmanager (interfaces: DeletionManager,Deleter)
|
||||
|
||||
// Package mock_deletionmanager is a generated GoMock package.
|
||||
package mock_deletionmanager
|
||||
|
@ -140,68 +140,3 @@ func (mr *MockDeleterMockRecorder) Delete() *gomock.Call {
|
|||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Delete", reflect.TypeOf((*MockDeleter)(nil).Delete))
|
||||
}
|
||||
|
||||
// MockSpaceIdsProvider is a mock of SpaceIdsProvider interface.
|
||||
type MockSpaceIdsProvider struct {
|
||||
ctrl *gomock.Controller
|
||||
recorder *MockSpaceIdsProviderMockRecorder
|
||||
}
|
||||
|
||||
// MockSpaceIdsProviderMockRecorder is the mock recorder for MockSpaceIdsProvider.
|
||||
type MockSpaceIdsProviderMockRecorder struct {
|
||||
mock *MockSpaceIdsProvider
|
||||
}
|
||||
|
||||
// NewMockSpaceIdsProvider creates a new mock instance.
|
||||
func NewMockSpaceIdsProvider(ctrl *gomock.Controller) *MockSpaceIdsProvider {
|
||||
mock := &MockSpaceIdsProvider{ctrl: ctrl}
|
||||
mock.recorder = &MockSpaceIdsProviderMockRecorder{mock}
|
||||
return mock
|
||||
}
|
||||
|
||||
// EXPECT returns an object that allows the caller to indicate expected use.
|
||||
func (m *MockSpaceIdsProvider) EXPECT() *MockSpaceIdsProviderMockRecorder {
|
||||
return m.recorder
|
||||
}
|
||||
|
||||
// AllIds mocks base method.
|
||||
func (m *MockSpaceIdsProvider) AllIds() []string {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "AllIds")
|
||||
ret0, _ := ret[0].([]string)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// AllIds indicates an expected call of AllIds.
|
||||
func (mr *MockSpaceIdsProviderMockRecorder) AllIds() *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AllIds", reflect.TypeOf((*MockSpaceIdsProvider)(nil).AllIds))
|
||||
}
|
||||
|
||||
// Init mocks base method.
|
||||
func (m *MockSpaceIdsProvider) Init(arg0 *app.App) error {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "Init", arg0)
|
||||
ret0, _ := ret[0].(error)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// Init indicates an expected call of Init.
|
||||
func (mr *MockSpaceIdsProviderMockRecorder) Init(arg0 interface{}) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Init", reflect.TypeOf((*MockSpaceIdsProvider)(nil).Init), arg0)
|
||||
}
|
||||
|
||||
// Name mocks base method.
|
||||
func (m *MockSpaceIdsProvider) Name() string {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "Name")
|
||||
ret0, _ := ret[0].(string)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// Name indicates an expected call of Name.
|
||||
func (mr *MockSpaceIdsProviderMockRecorder) Name() *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Name", reflect.TypeOf((*MockSpaceIdsProvider)(nil).Name))
|
||||
}
|
||||
|
|
|
@ -130,13 +130,12 @@ func (d *diffSyncer) syncWithPeer(ctx context.Context, p peer.Peer) (err error)
|
|||
err = rpcerr.Unwrap(err)
|
||||
if err != nil && err != spacesyncproto.ErrSpaceMissing {
|
||||
if err == spacesyncproto.ErrSpaceIsDeleted {
|
||||
d.log.Debug("got space deleted while syncing")
|
||||
d.treeSyncer.SyncAll(ctx, p.Id(), []string{d.storage.SpaceSettingsId()}, nil)
|
||||
d.syncStatus.SetNodesOnline(p.Id(), syncstatus.RemovedFromNetwork)
|
||||
}
|
||||
d.syncStatus.SetNodesOnline(p.Id(), false)
|
||||
d.syncStatus.SetNodesOnline(p.Id(), syncstatus.ConnectionError)
|
||||
return fmt.Errorf("diff error: %v", err)
|
||||
}
|
||||
d.syncStatus.SetNodesOnline(p.Id(), true)
|
||||
d.syncStatus.SetNodesOnline(p.Id(), syncstatus.Online)
|
||||
|
||||
if err == spacesyncproto.ErrSpaceMissing {
|
||||
return d.sendPushSpaceRequest(ctx, p.Id(), cl)
|
||||
|
|
|
@ -262,8 +262,6 @@ func TestDiffSyncer(t *testing.T) {
|
|||
fx.diffMock.EXPECT().
|
||||
Diff(gomock.Any(), gomock.Eq(NewRemoteDiff(fx.spaceState.SpaceId, fx.clientMock))).
|
||||
Return(nil, nil, nil, spacesyncproto.ErrSpaceIsDeleted)
|
||||
fx.storageMock.EXPECT().SpaceSettingsId().Return("settingsId")
|
||||
fx.treeSyncerMock.EXPECT().SyncAll(gomock.Any(), mPeer.Id(), []string{"settingsId"}, nil).Return(nil)
|
||||
|
||||
require.NoError(t, fx.diffSyncer.Sync(ctx))
|
||||
})
|
||||
|
|
|
@ -19,12 +19,10 @@ import (
|
|||
"github.com/anyproto/any-sync/commonspace/spacestorage"
|
||||
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
|
||||
"github.com/anyproto/any-sync/commonspace/syncstatus"
|
||||
"github.com/anyproto/any-sync/net/peer"
|
||||
"github.com/anyproto/any-sync/nodeconf"
|
||||
"github.com/anyproto/any-sync/util/periodicsync"
|
||||
"github.com/anyproto/any-sync/util/slice"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/exp/slices"
|
||||
)
|
||||
|
||||
var log = logger.NewNamed(CName)
|
||||
|
@ -90,7 +88,7 @@ func (h *headSync) Init(a *app.App) (err error) {
|
|||
h.syncer = createDiffSyncer(h)
|
||||
sync := func(ctx context.Context) (err error) {
|
||||
// for clients cancelling the sync process
|
||||
if h.spaceIsDeleted.Load() && !h.configuration.IsResponsible(h.spaceId) {
|
||||
if h.spaceIsDeleted.Load() {
|
||||
return spacesyncproto.ErrSpaceIsDeleted
|
||||
}
|
||||
return h.syncer.Sync(ctx)
|
||||
|
@ -118,14 +116,7 @@ func (h *headSync) Run(ctx context.Context) (err error) {
|
|||
|
||||
func (h *headSync) HandleRangeRequest(ctx context.Context, req *spacesyncproto.HeadSyncRequest) (resp *spacesyncproto.HeadSyncResponse, err error) {
|
||||
if h.spaceIsDeleted.Load() {
|
||||
peerId, err := peer.CtxPeerId(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// stop receiving all request for sync from clients
|
||||
if !slices.Contains(h.configuration.NodeIds(h.spaceId), peerId) {
|
||||
return nil, spacesyncproto.ErrSpaceIsDeleted
|
||||
}
|
||||
return nil, spacesyncproto.ErrSpaceIsDeleted
|
||||
}
|
||||
return HandleRangeRequest(ctx, h.diff, req)
|
||||
}
|
||||
|
|
|
@ -23,7 +23,6 @@ import (
|
|||
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
|
||||
"github.com/anyproto/any-sync/nodeconf"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/exp/slices"
|
||||
)
|
||||
|
||||
const CName = "common.commonspace.objectsync"
|
||||
|
@ -162,11 +161,7 @@ func (s *objectSync) processHandleMessage(msg HandleMessage) {
|
|||
func (s *objectSync) handleRequest(ctx context.Context, senderId string, msg *spacesyncproto.ObjectSyncMessage) (response *spacesyncproto.ObjectSyncMessage, err error) {
|
||||
log := log.With(zap.String("objectId", msg.ObjectId))
|
||||
if s.spaceIsDeleted.Load() {
|
||||
log = log.With(zap.Bool("isDeleted", true))
|
||||
// preventing sync with other clients if they are not just syncing the settings tree
|
||||
if !slices.Contains(s.configuration.NodeIds(s.spaceId), senderId) && msg.ObjectId != s.spaceStorage.SpaceSettingsId() {
|
||||
return nil, spacesyncproto.ErrSpaceIsDeleted
|
||||
}
|
||||
return nil, spacesyncproto.ErrSpaceIsDeleted
|
||||
}
|
||||
err = s.checkEmptyFullSync(log, msg)
|
||||
if err != nil {
|
||||
|
@ -182,11 +177,7 @@ func (s *objectSync) handleRequest(ctx context.Context, senderId string, msg *sp
|
|||
func (s *objectSync) handleMessage(ctx context.Context, senderId string, msg *spacesyncproto.ObjectSyncMessage) (err error) {
|
||||
log := log.With(zap.String("objectId", msg.ObjectId))
|
||||
if s.spaceIsDeleted.Load() {
|
||||
log = log.With(zap.Bool("isDeleted", true))
|
||||
// preventing sync with other clients if they are not just syncing the settings tree
|
||||
if !slices.Contains(s.configuration.NodeIds(s.spaceId), senderId) && msg.ObjectId != s.spaceStorage.SpaceSettingsId() {
|
||||
return spacesyncproto.ErrSpaceIsDeleted
|
||||
}
|
||||
return spacesyncproto.ErrSpaceIsDeleted
|
||||
}
|
||||
err = s.checkEmptyFullSync(log, msg)
|
||||
if err != nil {
|
||||
|
|
|
@ -74,6 +74,7 @@ type Space interface {
|
|||
|
||||
DeleteTree(ctx context.Context, id string) (err error)
|
||||
GetNodePeers(ctx context.Context) (peer []peer.Peer, err error)
|
||||
SetDeleted()
|
||||
|
||||
HandleMessage(ctx context.Context, msg objectsync.HandleMessage) (err error)
|
||||
HandleSyncRequest(ctx context.Context, req *spacesyncproto.ObjectSyncMessage) (resp *spacesyncproto.ObjectSyncMessage, err error)
|
||||
|
@ -153,6 +154,10 @@ func (s *space) GetNodePeers(ctx context.Context) (peer []peer.Peer, err error)
|
|||
return s.peerManager.GetNodePeers(ctx)
|
||||
}
|
||||
|
||||
func (s *space) SetDeleted() {
|
||||
s.state.SpaceIsDeleted.Swap(true)
|
||||
}
|
||||
|
||||
func (s *space) Acl() syncacl.SyncAcl {
|
||||
return s.aclList.(syncacl.SyncAcl)
|
||||
}
|
||||
|
|
|
@ -35,7 +35,7 @@ func (n *noOpSyncStatus) HeadsChange(treeId string, heads []string) {
|
|||
func (n *noOpSyncStatus) HeadsReceive(senderId, treeId string, heads []string) {
|
||||
}
|
||||
|
||||
func (n *noOpSyncStatus) SetNodesOnline(senderId string, online bool) {
|
||||
func (n *noOpSyncStatus) SetNodesOnline(senderId string, status ConnectionStatus) {
|
||||
}
|
||||
|
||||
func (n *noOpSyncStatus) StateCounter() uint64 {
|
||||
|
|
|
@ -27,16 +27,24 @@ var log = logger.NewNamed(CName)
|
|||
|
||||
const CName = "common.commonspace.syncstatus"
|
||||
|
||||
type ConnectionStatus int
|
||||
|
||||
const (
|
||||
Online ConnectionStatus = iota
|
||||
ConnectionError
|
||||
RemovedFromNetwork
|
||||
)
|
||||
|
||||
type UpdateReceiver interface {
|
||||
UpdateTree(ctx context.Context, treeId string, status SyncStatus) (err error)
|
||||
UpdateNodeConnection(online bool)
|
||||
UpdateNodeStatus(status ConnectionStatus)
|
||||
}
|
||||
|
||||
type StatusUpdater interface {
|
||||
HeadsChange(treeId string, heads []string)
|
||||
HeadsReceive(senderId, treeId string, heads []string)
|
||||
|
||||
SetNodesOnline(senderId string, online bool)
|
||||
SetNodesOnline(senderId string, status ConnectionStatus)
|
||||
StateCounter() uint64
|
||||
RemoveAllExcept(senderId string, differentRemoteIds []string, stateCounter uint64)
|
||||
}
|
||||
|
@ -89,7 +97,7 @@ type syncStatusService struct {
|
|||
treeHeads map[string]treeHeadsEntry
|
||||
watchers map[string]struct{}
|
||||
stateCounter uint64
|
||||
nodesOnline bool
|
||||
nodeStatus ConnectionStatus
|
||||
|
||||
treeStatusBuf []treeStatus
|
||||
|
||||
|
@ -150,7 +158,7 @@ func (s *syncStatusService) HeadsChange(treeId string, heads []string) {
|
|||
s.stateCounter++
|
||||
}
|
||||
|
||||
func (s *syncStatusService) SetNodesOnline(senderId string, online bool) {
|
||||
func (s *syncStatusService) SetNodesOnline(senderId string, status ConnectionStatus) {
|
||||
if !s.isSenderResponsible(senderId) {
|
||||
return
|
||||
}
|
||||
|
@ -158,7 +166,7 @@ func (s *syncStatusService) SetNodesOnline(senderId string, online bool) {
|
|||
s.Lock()
|
||||
defer s.Unlock()
|
||||
|
||||
s.nodesOnline = online
|
||||
s.nodeStatus = status
|
||||
}
|
||||
|
||||
func (s *syncStatusService) update(ctx context.Context) (err error) {
|
||||
|
@ -179,9 +187,9 @@ func (s *syncStatusService) update(ctx context.Context) (err error) {
|
|||
}
|
||||
s.treeStatusBuf = append(s.treeStatusBuf, treeStatus{treeId, treeHeads.syncStatus, treeHeads.heads})
|
||||
}
|
||||
nodesOnline := s.nodesOnline
|
||||
nodeStatus := s.nodeStatus
|
||||
s.Unlock()
|
||||
s.updateReceiver.UpdateNodeConnection(nodesOnline)
|
||||
s.updateReceiver.UpdateNodeStatus(nodeStatus)
|
||||
for _, entry := range s.treeStatusBuf {
|
||||
err = s.updateReceiver.UpdateTree(ctx, entry.treeId, entry.status)
|
||||
if err != nil {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue