mirror of
https://github.com/anyproto/any-sync.git
synced 2025-06-08 14:07:02 +09:00
Add tests for syncService
This commit is contained in:
parent
a0d22b72d5
commit
b32ea26747
2 changed files with 361 additions and 10 deletions
|
@ -15,7 +15,6 @@ import (
|
||||||
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
|
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
|
||||||
"github.com/anyproto/any-sync/commonspace/sync/syncdeps"
|
"github.com/anyproto/any-sync/commonspace/sync/syncdeps"
|
||||||
"github.com/anyproto/any-sync/metric"
|
"github.com/anyproto/any-sync/metric"
|
||||||
"github.com/anyproto/any-sync/net/streampool"
|
|
||||||
"github.com/anyproto/any-sync/nodeconf"
|
"github.com/anyproto/any-sync/nodeconf"
|
||||||
"github.com/anyproto/any-sync/util/multiqueue"
|
"github.com/anyproto/any-sync/util/multiqueue"
|
||||||
"github.com/anyproto/any-sync/util/syncqueues"
|
"github.com/anyproto/any-sync/util/syncqueues"
|
||||||
|
@ -41,7 +40,6 @@ type SyncService interface {
|
||||||
type syncService struct {
|
type syncService struct {
|
||||||
receiveQueue multiqueue.MultiQueue[msgCtx]
|
receiveQueue multiqueue.MultiQueue[msgCtx]
|
||||||
manager RequestManager
|
manager RequestManager
|
||||||
streamPool streampool.StreamPool
|
|
||||||
peerManager peermanager.PeerManager
|
peerManager peermanager.PeerManager
|
||||||
nodeConf nodeconf.NodeConf
|
nodeConf nodeconf.NodeConf
|
||||||
handler syncdeps.SyncHandler
|
handler syncdeps.SyncHandler
|
||||||
|
@ -75,7 +73,6 @@ func (s *syncService) Init(a *app.App) (err error) {
|
||||||
s.handler = a.MustComponent(syncdeps.CName).(syncdeps.SyncHandler)
|
s.handler = a.MustComponent(syncdeps.CName).(syncdeps.SyncHandler)
|
||||||
s.receiveQueue = multiqueue.New[msgCtx](s.handleIncomingMessage, s.metric, syncdeps.MsgTypeIncoming, 100)
|
s.receiveQueue = multiqueue.New[msgCtx](s.handleIncomingMessage, s.metric, syncdeps.MsgTypeIncoming, 100)
|
||||||
s.peerManager = a.MustComponent(peermanager.CName).(peermanager.PeerManager)
|
s.peerManager = a.MustComponent(peermanager.CName).(peermanager.PeerManager)
|
||||||
s.streamPool = a.MustComponent(streampool.CName).(streampool.StreamPool)
|
|
||||||
s.commonMetric, _ = a.Component(metric.CName).(metric.Metric)
|
s.commonMetric, _ = a.Component(metric.CName).(metric.Metric)
|
||||||
syncQueues := a.MustComponent(syncqueues.CName).(syncqueues.SyncQueues)
|
syncQueues := a.MustComponent(syncqueues.CName).(syncqueues.SyncQueues)
|
||||||
s.manager = NewRequestManager(s.handler, s.metric, syncQueues.ActionPool(s.spaceId), syncQueues.Limit(s.spaceId))
|
s.manager = NewRequestManager(s.handler, s.metric, syncQueues.ActionPool(s.spaceId), syncQueues.Limit(s.spaceId))
|
||||||
|
@ -113,14 +110,8 @@ func (s *syncService) handleIncomingMessage(msg msgCtx) {
|
||||||
}
|
}
|
||||||
err = s.manager.QueueRequest(req)
|
err = s.manager.QueueRequest(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("failed to queue request", zap.Error(err))
|
log.Error("failed to queue testRequest", zap.Error(err))
|
||||||
}
|
}
|
||||||
//msg.StartHandlingTime = time.Now()
|
|
||||||
//ctx := peer.CtxWithPeerId(context.Background(), msg.SenderId)
|
|
||||||
//ctx = logger.CtxWithFields(ctx, zap.Uint64("msgId", msg.Id), zap.String("senderId", msg.SenderId))
|
|
||||||
//s.metric.RequestLog(msg.PeerCtx, "space.streamOp", msg.LogFields(
|
|
||||||
// zap.Error(err),
|
|
||||||
//)...)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *syncService) HandleMessage(ctx context.Context, msg drpc.Message) error {
|
func (s *syncService) HandleMessage(ctx context.Context, msg drpc.Message) error {
|
||||||
|
|
360
commonspace/sync/sync_test.go
Normal file
360
commonspace/sync/sync_test.go
Normal file
|
@ -0,0 +1,360 @@
|
||||||
|
package sync
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"io"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/anyproto/protobuf/proto"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
"go.uber.org/mock/gomock"
|
||||||
|
"storj.io/drpc"
|
||||||
|
|
||||||
|
"github.com/anyproto/any-sync/app"
|
||||||
|
"github.com/anyproto/any-sync/commonspace/peermanager"
|
||||||
|
"github.com/anyproto/any-sync/commonspace/peermanager/mock_peermanager"
|
||||||
|
"github.com/anyproto/any-sync/commonspace/spacestate"
|
||||||
|
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
|
||||||
|
"github.com/anyproto/any-sync/commonspace/sync/syncdeps"
|
||||||
|
"github.com/anyproto/any-sync/nodeconf"
|
||||||
|
"github.com/anyproto/any-sync/nodeconf/mock_nodeconf"
|
||||||
|
"github.com/anyproto/any-sync/testutil/accounttest"
|
||||||
|
"github.com/anyproto/any-sync/testutil/anymock"
|
||||||
|
"github.com/anyproto/any-sync/util/syncqueues"
|
||||||
|
)
|
||||||
|
|
||||||
|
var ctx = context.Background()
|
||||||
|
|
||||||
|
func TestSyncService(t *testing.T) {
|
||||||
|
t.Run("send and receive", func(t *testing.T) {
|
||||||
|
f := newFixture(t)
|
||||||
|
defer f.Close(t)
|
||||||
|
f.syncHandler.toSendData = map[string][]*testResponse{
|
||||||
|
"objectId": {
|
||||||
|
{msg: "send-msg1"},
|
||||||
|
{msg: "send-msg2"},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
f.syncHandler.toReceiveData = map[string][]*testResponse{
|
||||||
|
"objectId": {
|
||||||
|
{msg: "receive-msg1"},
|
||||||
|
{msg: "receive-msg2"},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
rq := &testRequest{peerId: "peerId", objectId: "objectId"}
|
||||||
|
returnRq := &testRequest{peerId: "peerId1", objectId: "objectId"}
|
||||||
|
handleStream := f.syncHandler.newSendStream(ctx, "objectId")
|
||||||
|
f.syncHandler.returnRequest = returnRq
|
||||||
|
err := f.HandleStreamRequest(ctx, rq, handleStream)
|
||||||
|
require.NoError(t, err)
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
for i, resp := range handleStream.(*testStream).toReceive {
|
||||||
|
require.Equal(t, f.syncHandler.toSendData["objectId"][i], resp)
|
||||||
|
}
|
||||||
|
for i, resp := range f.syncHandler.toReceiveData["objectId"] {
|
||||||
|
require.Equal(t, f.syncHandler.collector.responses[i].resp, resp)
|
||||||
|
require.Equal(t, f.syncHandler.collector.responses[i].peerId, returnRq.peerId)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
t.Run("send", func(t *testing.T) {
|
||||||
|
f := newFixture(t)
|
||||||
|
defer f.Close(t)
|
||||||
|
f.syncHandler.toReceiveData = map[string][]*testResponse{
|
||||||
|
"objectId": {
|
||||||
|
{msg: "receive-msg1"},
|
||||||
|
{msg: "receive-msg2"},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
rq := &testRequest{peerId: "peerId", objectId: "objectId"}
|
||||||
|
err := f.SendRequest(ctx, rq, f.syncHandler.collector)
|
||||||
|
require.NoError(t, err)
|
||||||
|
for i, resp := range f.syncHandler.toReceiveData["objectId"] {
|
||||||
|
require.Equal(t, f.syncHandler.collector.responses[i].resp, resp)
|
||||||
|
require.Equal(t, f.syncHandler.collector.responses[i].peerId, rq.peerId)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
t.Run("queue", func(t *testing.T) {
|
||||||
|
f := newFixture(t)
|
||||||
|
defer f.Close(t)
|
||||||
|
f.syncHandler.toReceiveData = map[string][]*testResponse{
|
||||||
|
"objectId": {
|
||||||
|
{msg: "receive-msg1"},
|
||||||
|
{msg: "receive-msg2"},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
rq := &testRequest{peerId: "peerId", objectId: "objectId"}
|
||||||
|
err := f.QueueRequest(ctx, rq)
|
||||||
|
require.NoError(t, err)
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
for i, resp := range f.syncHandler.toReceiveData["objectId"] {
|
||||||
|
require.Equal(t, f.syncHandler.collector.responses[i].resp, resp)
|
||||||
|
require.Equal(t, f.syncHandler.collector.responses[i].peerId, rq.peerId)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
t.Run("handle message and queue", func(t *testing.T) {
|
||||||
|
f := newFixture(t)
|
||||||
|
defer f.Close(t)
|
||||||
|
f.syncHandler.toReceiveData = map[string][]*testResponse{
|
||||||
|
"objectId": {
|
||||||
|
{msg: "receive-msg1"},
|
||||||
|
{msg: "receive-msg2"},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
headUpdate := &testMessage{objectId: "objectId"}
|
||||||
|
rq := &testRequest{peerId: "peerId", objectId: "objectId"}
|
||||||
|
f.syncHandler.returnRequest = rq
|
||||||
|
err := f.HandleMessage(ctx, headUpdate)
|
||||||
|
require.NoError(t, err)
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
for i, resp := range f.syncHandler.toReceiveData["objectId"] {
|
||||||
|
require.Equal(t, f.syncHandler.collector.responses[i].resp, resp)
|
||||||
|
require.Equal(t, f.syncHandler.collector.responses[i].peerId, rq.peerId)
|
||||||
|
}
|
||||||
|
require.Equal(t, headUpdate, f.syncHandler.headUpdate)
|
||||||
|
})
|
||||||
|
t.Run("handle message", func(t *testing.T) {
|
||||||
|
f := newFixture(t)
|
||||||
|
defer f.Close(t)
|
||||||
|
f.syncHandler.toReceiveData = map[string][]*testResponse{
|
||||||
|
"objectId": {
|
||||||
|
{msg: "receive-msg1"},
|
||||||
|
{msg: "receive-msg2"},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
headUpdate := &testMessage{objectId: "objectId"}
|
||||||
|
err := f.HandleMessage(ctx, headUpdate)
|
||||||
|
require.NoError(t, err)
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
require.Equal(t, headUpdate, f.syncHandler.headUpdate)
|
||||||
|
require.Empty(t, f.syncHandler.collector.responses)
|
||||||
|
})
|
||||||
|
t.Run("send EOF", func(t *testing.T) {
|
||||||
|
f := newFixture(t)
|
||||||
|
defer f.Close(t)
|
||||||
|
f.syncHandler.toReceiveData = map[string][]*testResponse{
|
||||||
|
"objectId": {},
|
||||||
|
}
|
||||||
|
rq := &testRequest{peerId: "peerId", objectId: "objectId"}
|
||||||
|
err := f.SendRequest(ctx, rq, f.syncHandler.collector)
|
||||||
|
require.Equal(t, io.EOF, err)
|
||||||
|
})
|
||||||
|
t.Run("ignore handling same requests", func(t *testing.T) {
|
||||||
|
f := newFixture(t)
|
||||||
|
defer f.Close(t)
|
||||||
|
f.syncService.manager.(*requestManager).incomingGuard.TryTake(fullId("peerId", "objectId"))
|
||||||
|
rq := &testRequest{peerId: "peerId", objectId: "objectId"}
|
||||||
|
handleStream := f.syncHandler.newSendStream(ctx, "objectId")
|
||||||
|
err := f.HandleStreamRequest(ctx, rq, handleStream)
|
||||||
|
require.Equal(t, spacesyncproto.ErrDuplicateRequest, err)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
type fixture struct {
|
||||||
|
*syncService
|
||||||
|
a *app.App
|
||||||
|
nodeConf *mock_nodeconf.MockService
|
||||||
|
syncHandler *testSyncHandler
|
||||||
|
spaceState *spacestate.SpaceState
|
||||||
|
peerManager *mock_peermanager.MockPeerManager
|
||||||
|
syncQueues syncqueues.SyncQueues
|
||||||
|
ctrl *gomock.Controller
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fx *fixture) Close(t *testing.T) {
|
||||||
|
require.NoError(t, fx.a.Close(context.Background()))
|
||||||
|
fx.ctrl.Finish()
|
||||||
|
}
|
||||||
|
|
||||||
|
func newFixture(t *testing.T) *fixture {
|
||||||
|
f := &fixture{}
|
||||||
|
f.a = &app.App{}
|
||||||
|
f.ctrl = gomock.NewController(t)
|
||||||
|
f.nodeConf = mock_nodeconf.NewMockService(f.ctrl)
|
||||||
|
f.syncHandler = &testSyncHandler{
|
||||||
|
toReceiveData: map[string][]*testResponse{},
|
||||||
|
toSendData: map[string][]*testResponse{},
|
||||||
|
collector: &testResponseCollector{},
|
||||||
|
}
|
||||||
|
accService := &accounttest.AccountTestService{}
|
||||||
|
f.spaceState = &spacestate.SpaceState{SpaceId: "spaceId"}
|
||||||
|
f.peerManager = mock_peermanager.NewMockPeerManager(f.ctrl)
|
||||||
|
anymock.ExpectComp(f.peerManager.EXPECT(), peermanager.CName)
|
||||||
|
anymock.ExpectComp(f.nodeConf.EXPECT(), nodeconf.CName)
|
||||||
|
f.nodeConf.EXPECT().Configuration().Return(nodeconf.Configuration{})
|
||||||
|
f.syncQueues = syncqueues.New()
|
||||||
|
f.syncService = &syncService{}
|
||||||
|
f.a.Register(f.syncHandler).
|
||||||
|
Register(accService).
|
||||||
|
Register(f.peerManager).
|
||||||
|
Register(f.spaceState).
|
||||||
|
Register(f.nodeConf).
|
||||||
|
Register(f.syncQueues).
|
||||||
|
Register(f.syncService)
|
||||||
|
|
||||||
|
require.NoError(t, f.a.Start(context.Background()))
|
||||||
|
return f
|
||||||
|
}
|
||||||
|
|
||||||
|
type mockEncoding struct {
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m mockEncoding) Marshal(msg drpc.Message) ([]byte, error) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m mockEncoding) Unmarshal(buf []byte, msg drpc.Message) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type testSyncHandler struct {
|
||||||
|
toSendData map[string][]*testResponse
|
||||||
|
toReceiveData map[string][]*testResponse
|
||||||
|
collector *testResponseCollector
|
||||||
|
returnRequest syncdeps.Request
|
||||||
|
headUpdate *testMessage
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *testSyncHandler) newSendStream(ctx context.Context, objectId string) drpc.Stream {
|
||||||
|
return &testStream{ctx: ctx}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *testSyncHandler) newReceiveStream(ctx context.Context, objectId string) drpc.Stream {
|
||||||
|
return &testStream{ctx: ctx, toSend: m.toReceiveData[objectId]}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *testSyncHandler) Init(a *app.App) (err error) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *testSyncHandler) Name() (name string) {
|
||||||
|
return syncdeps.CName
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *testSyncHandler) HandleHeadUpdate(ctx context.Context, headUpdate drpc.Message) (syncdeps.Request, error) {
|
||||||
|
m.headUpdate = headUpdate.(*testMessage)
|
||||||
|
return m.returnRequest, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *testSyncHandler) HandleStreamRequest(ctx context.Context, rq syncdeps.Request, updater syncdeps.QueueSizeUpdater, sendResponse func(resp proto.Message) error) (syncdeps.Request, error) {
|
||||||
|
for _, resp := range m.toSendData[rq.ObjectId()] {
|
||||||
|
if err := sendResponse(resp); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return m.returnRequest, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *testSyncHandler) HandleDeprecatedObjectSync(ctx context.Context, req *spacesyncproto.ObjectSyncMessage) (resp *spacesyncproto.ObjectSyncMessage, err error) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *testSyncHandler) ApplyRequest(ctx context.Context, rq syncdeps.Request, requestSender syncdeps.RequestSender) error {
|
||||||
|
return requestSender.SendRequest(ctx, rq, m.collector)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *testSyncHandler) SendStreamRequest(ctx context.Context, rq syncdeps.Request, receive func(stream drpc.Stream) error) (err error) {
|
||||||
|
return receive(m.newReceiveStream(ctx, rq.ObjectId()))
|
||||||
|
}
|
||||||
|
|
||||||
|
type testStream struct {
|
||||||
|
ctx context.Context
|
||||||
|
toSend []*testResponse
|
||||||
|
toReceive []*testResponse
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *testStream) Context() context.Context {
|
||||||
|
return t.ctx
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *testStream) MsgSend(msg drpc.Message, enc drpc.Encoding) error {
|
||||||
|
t.toReceive = append(t.toReceive, msg.(*testResponse))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *testStream) MsgRecv(msg drpc.Message, enc drpc.Encoding) error {
|
||||||
|
if len(t.toSend) == 0 {
|
||||||
|
return io.EOF
|
||||||
|
}
|
||||||
|
msg.(*testResponse).msg = t.toSend[0].msg
|
||||||
|
t.toSend = t.toSend[1:]
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *testStream) CloseSend() error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *testStream) Close() error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type collectedResponse struct {
|
||||||
|
peerId string
|
||||||
|
objectId string
|
||||||
|
resp syncdeps.Response
|
||||||
|
}
|
||||||
|
|
||||||
|
type testResponseCollector struct {
|
||||||
|
responses []collectedResponse
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *testResponseCollector) NewResponse() syncdeps.Response {
|
||||||
|
return &testResponse{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *testResponseCollector) CollectResponse(ctx context.Context, peerId, objectId string, resp syncdeps.Response) error {
|
||||||
|
t.responses = append(t.responses, collectedResponse{peerId: peerId, objectId: objectId, resp: resp})
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type testResponse struct {
|
||||||
|
msg string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *testResponse) Reset() {
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *testResponse) String() string {
|
||||||
|
return t.msg
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *testResponse) ProtoMessage() {
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *testResponse) MsgSize() uint64 {
|
||||||
|
return uint64(len(t.msg))
|
||||||
|
}
|
||||||
|
|
||||||
|
type testRequest struct {
|
||||||
|
peerId string
|
||||||
|
objectId string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *testRequest) PeerId() string {
|
||||||
|
return r.peerId
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *testRequest) ObjectId() string {
|
||||||
|
return r.objectId
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *testRequest) Proto() (proto.Message, error) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *testRequest) MsgSize() uint64 {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
type testMessage struct {
|
||||||
|
objectId string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *testMessage) ObjectId() string {
|
||||||
|
return t.objectId
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *testMessage) MsgSize() uint64 {
|
||||||
|
return uint64(len(t.objectId))
|
||||||
|
}
|
Loading…
Add table
Add a link
Reference in a new issue