1
0
Fork 0
mirror of https://github.com/anyproto/any-sync.git synced 2025-06-08 05:57:03 +09:00

stream keep alive

This commit is contained in:
Sergey Cherepanov 2024-11-15 17:45:35 +01:00
parent 91bf2659e9
commit 944c9cf10a
No known key found for this signature in database
GPG key ID: 87F8EDE8FBDF637C
5 changed files with 28 additions and 4 deletions

View file

@ -137,13 +137,13 @@ func (m *MockDeleter) EXPECT() *MockDeleterMockRecorder {
}
// Delete mocks base method.
func (m *MockDeleter) Delete(arg0 context.Context) {
func (m *MockDeleter) Delete(ctx context.Context) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "Delete", arg0)
m.ctrl.Call(m, "Delete", ctx)
}
// Delete indicates an expected call of Delete.
func (mr *MockDeleterMockRecorder) Delete(arg0 any) *gomock.Call {
func (mr *MockDeleterMockRecorder) Delete(ctx any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Delete", reflect.TypeOf((*MockDeleter)(nil).Delete), arg0)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Delete", reflect.TypeOf((*MockDeleter)(nil).Delete), ctx)
}

View file

@ -110,6 +110,8 @@ func (d *diffSyncer) Sync(ctx context.Context) error {
}
}
d.log.DebugCtx(ctx, "diff done", zap.String("spaceId", d.spaceId), zap.Duration("dur", time.Since(st)))
d.peerManager.KeepAlive(ctx)
return nil
}

View file

@ -86,6 +86,8 @@ func TestDiffSyncer(t *testing.T) {
fx.deletionStateMock.EXPECT().Filter([]string{"changed"}).Return([]string{"changed"}).Times(1)
fx.deletionStateMock.EXPECT().Filter(nil).Return(nil).Times(1)
fx.treeSyncerMock.EXPECT().SyncAll(gomock.Any(), mPeer, []string{"changed"}, []string{"new"}).Return(nil)
fx.peerManagerMock.EXPECT().KeepAlive(gomock.Any())
require.NoError(t, fx.diffSyncer.Sync(ctx))
})
@ -108,6 +110,8 @@ func TestDiffSyncer(t *testing.T) {
fx.deletionStateMock.EXPECT().Filter(nil).Return(nil).Times(1)
fx.treeSyncerMock.EXPECT().SyncAll(gomock.Any(), mPeer, []string{"changed"}, []string{"new"}).Return(nil)
fx.aclMock.EXPECT().SyncWithPeer(gomock.Any(), mPeer).Return(nil)
fx.peerManagerMock.EXPECT().KeepAlive(gomock.Any())
require.NoError(t, fx.diffSyncer.Sync(ctx))
})
@ -197,6 +201,8 @@ func TestDiffSyncer(t *testing.T) {
SpacePush(gomock.Any(), newPushSpaceRequestMatcher(fx.spaceState.SpaceId, aclRootId, settingsId, credential, spaceHeader)).
Return(nil, nil)
fx.peerManagerMock.EXPECT().SendMessage(gomock.Any(), "peerId", gomock.Any()).Return(nil)
fx.peerManagerMock.EXPECT().KeepAlive(gomock.Any())
require.NoError(t, fx.diffSyncer.Sync(ctx))
})
@ -213,6 +219,7 @@ func TestDiffSyncer(t *testing.T) {
fx.diffMock.EXPECT().
Diff(gomock.Any(), gomock.Eq(remDiff)).
Return(nil, nil, nil, spacesyncproto.ErrUnexpected)
fx.peerManagerMock.EXPECT().KeepAlive(gomock.Any())
require.NoError(t, fx.diffSyncer.Sync(ctx))
})
@ -231,6 +238,7 @@ func TestDiffSyncer(t *testing.T) {
fx.diffMock.EXPECT().
Diff(gomock.Any(), gomock.Eq(remDiff)).
Return(nil, nil, nil, spacesyncproto.ErrSpaceIsDeleted)
fx.peerManagerMock.EXPECT().KeepAlive(gomock.Any())
require.NoError(t, fx.diffSyncer.Sync(ctx))
})

View file

@ -101,6 +101,18 @@ func (mr *MockPeerManagerMockRecorder) Init(a any) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Init", reflect.TypeOf((*MockPeerManager)(nil).Init), a)
}
// KeepAlive mocks base method.
func (m *MockPeerManager) KeepAlive(ctx context.Context) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "KeepAlive", ctx)
}
// KeepAlive indicates an expected call of KeepAlive.
func (mr *MockPeerManagerMockRecorder) KeepAlive(ctx any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "KeepAlive", reflect.TypeOf((*MockPeerManager)(nil).KeepAlive), ctx)
}
// Name mocks base method.
func (m *MockPeerManager) Name() string {
m.ctrl.T.Helper()

View file

@ -24,6 +24,8 @@ type PeerManager interface {
BroadcastMessage(ctx context.Context, msg drpc.Message) error
// SendMessage sends message to peer
SendMessage(ctx context.Context, peerId string, msg drpc.Message) error
// KeepAlive sends keepAlive messages to needed peers
KeepAlive(ctx context.Context)
}
type PeerManagerProvider interface {