From 944c9cf10addc48e3aed58399eab1a8042931079 Mon Sep 17 00:00:00 2001 From: Sergey Cherepanov Date: Fri, 15 Nov 2024 17:45:35 +0100 Subject: [PATCH] stream keep alive --- .../mock_deletionmanager/mock_deletionmanager.go | 8 ++++---- commonspace/headsync/diffsyncer.go | 2 ++ commonspace/headsync/diffsyncer_test.go | 8 ++++++++ .../peermanager/mock_peermanager/mock_peermanager.go | 12 ++++++++++++ commonspace/peermanager/peermanager.go | 2 ++ 5 files changed, 28 insertions(+), 4 deletions(-) diff --git a/commonspace/deletionmanager/mock_deletionmanager/mock_deletionmanager.go b/commonspace/deletionmanager/mock_deletionmanager/mock_deletionmanager.go index 005def77..3ac5b508 100644 --- a/commonspace/deletionmanager/mock_deletionmanager/mock_deletionmanager.go +++ b/commonspace/deletionmanager/mock_deletionmanager/mock_deletionmanager.go @@ -137,13 +137,13 @@ func (m *MockDeleter) EXPECT() *MockDeleterMockRecorder { } // Delete mocks base method. -func (m *MockDeleter) Delete(arg0 context.Context) { +func (m *MockDeleter) Delete(ctx context.Context) { m.ctrl.T.Helper() - m.ctrl.Call(m, "Delete", arg0) + m.ctrl.Call(m, "Delete", ctx) } // Delete indicates an expected call of Delete. -func (mr *MockDeleterMockRecorder) Delete(arg0 any) *gomock.Call { +func (mr *MockDeleterMockRecorder) Delete(ctx any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Delete", reflect.TypeOf((*MockDeleter)(nil).Delete), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Delete", reflect.TypeOf((*MockDeleter)(nil).Delete), ctx) } diff --git a/commonspace/headsync/diffsyncer.go b/commonspace/headsync/diffsyncer.go index 38daa0e9..5de8ca36 100644 --- a/commonspace/headsync/diffsyncer.go +++ b/commonspace/headsync/diffsyncer.go @@ -110,6 +110,8 @@ func (d *diffSyncer) Sync(ctx context.Context) error { } } d.log.DebugCtx(ctx, "diff done", zap.String("spaceId", d.spaceId), zap.Duration("dur", time.Since(st))) + + d.peerManager.KeepAlive(ctx) return nil } diff --git a/commonspace/headsync/diffsyncer_test.go b/commonspace/headsync/diffsyncer_test.go index 2989702c..a2a14827 100644 --- a/commonspace/headsync/diffsyncer_test.go +++ b/commonspace/headsync/diffsyncer_test.go @@ -86,6 +86,8 @@ func TestDiffSyncer(t *testing.T) { fx.deletionStateMock.EXPECT().Filter([]string{"changed"}).Return([]string{"changed"}).Times(1) fx.deletionStateMock.EXPECT().Filter(nil).Return(nil).Times(1) fx.treeSyncerMock.EXPECT().SyncAll(gomock.Any(), mPeer, []string{"changed"}, []string{"new"}).Return(nil) + fx.peerManagerMock.EXPECT().KeepAlive(gomock.Any()) + require.NoError(t, fx.diffSyncer.Sync(ctx)) }) @@ -108,6 +110,8 @@ func TestDiffSyncer(t *testing.T) { fx.deletionStateMock.EXPECT().Filter(nil).Return(nil).Times(1) fx.treeSyncerMock.EXPECT().SyncAll(gomock.Any(), mPeer, []string{"changed"}, []string{"new"}).Return(nil) fx.aclMock.EXPECT().SyncWithPeer(gomock.Any(), mPeer).Return(nil) + fx.peerManagerMock.EXPECT().KeepAlive(gomock.Any()) + require.NoError(t, fx.diffSyncer.Sync(ctx)) }) @@ -197,6 +201,8 @@ func TestDiffSyncer(t *testing.T) { SpacePush(gomock.Any(), newPushSpaceRequestMatcher(fx.spaceState.SpaceId, aclRootId, settingsId, credential, spaceHeader)). Return(nil, nil) fx.peerManagerMock.EXPECT().SendMessage(gomock.Any(), "peerId", gomock.Any()).Return(nil) + fx.peerManagerMock.EXPECT().KeepAlive(gomock.Any()) + require.NoError(t, fx.diffSyncer.Sync(ctx)) }) @@ -213,6 +219,7 @@ func TestDiffSyncer(t *testing.T) { fx.diffMock.EXPECT(). Diff(gomock.Any(), gomock.Eq(remDiff)). Return(nil, nil, nil, spacesyncproto.ErrUnexpected) + fx.peerManagerMock.EXPECT().KeepAlive(gomock.Any()) require.NoError(t, fx.diffSyncer.Sync(ctx)) }) @@ -231,6 +238,7 @@ func TestDiffSyncer(t *testing.T) { fx.diffMock.EXPECT(). Diff(gomock.Any(), gomock.Eq(remDiff)). Return(nil, nil, nil, spacesyncproto.ErrSpaceIsDeleted) + fx.peerManagerMock.EXPECT().KeepAlive(gomock.Any()) require.NoError(t, fx.diffSyncer.Sync(ctx)) }) diff --git a/commonspace/peermanager/mock_peermanager/mock_peermanager.go b/commonspace/peermanager/mock_peermanager/mock_peermanager.go index ed86bb27..4a6a9e90 100644 --- a/commonspace/peermanager/mock_peermanager/mock_peermanager.go +++ b/commonspace/peermanager/mock_peermanager/mock_peermanager.go @@ -101,6 +101,18 @@ func (mr *MockPeerManagerMockRecorder) Init(a any) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Init", reflect.TypeOf((*MockPeerManager)(nil).Init), a) } +// KeepAlive mocks base method. +func (m *MockPeerManager) KeepAlive(ctx context.Context) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "KeepAlive", ctx) +} + +// KeepAlive indicates an expected call of KeepAlive. +func (mr *MockPeerManagerMockRecorder) KeepAlive(ctx any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "KeepAlive", reflect.TypeOf((*MockPeerManager)(nil).KeepAlive), ctx) +} + // Name mocks base method. func (m *MockPeerManager) Name() string { m.ctrl.T.Helper() diff --git a/commonspace/peermanager/peermanager.go b/commonspace/peermanager/peermanager.go index aaca8c94..22ce523e 100644 --- a/commonspace/peermanager/peermanager.go +++ b/commonspace/peermanager/peermanager.go @@ -24,6 +24,8 @@ type PeerManager interface { BroadcastMessage(ctx context.Context, msg drpc.Message) error // SendMessage sends message to peer SendMessage(ctx context.Context, peerId string, msg drpc.Message) error + // KeepAlive sends keepAlive messages to needed peers + KeepAlive(ctx context.Context) } type PeerManagerProvider interface {