From fb18c5470254d5cd157eb98ff8ac955f3f5ace19 Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Sun, 2 Jun 2024 14:55:11 +0200 Subject: [PATCH] WIP test sync protocol --- commonspace/sync/deps.go | 10 -- commonspace/sync/headupdate.go | 4 + commonspace/sync/headupdatesender.go | 12 -- commonspace/sync/requestmanager.go | 52 +++------ commonspace/sync/requestpool.go | 16 ++- commonspace/sync/sync.go | 52 ++++++--- commonspace/sync/sync_test.go | 27 +++++ .../sync/{ => syncdeps}/headupdatehandler.go | 2 +- commonspace/sync/syncdeps/request.go | 9 ++ commonspace/sync/syncdeps/requesthandler.go | 11 ++ commonspace/sync/syncdeps/requestsender.go | 11 ++ commonspace/sync/syncdeps/response.go | 7 ++ commonspace/sync/syncdeps/responsehandler.go | 8 ++ commonspace/sync/syncdeps/syncdeps.go | 19 ++++ commonspace/sync/syncdeps/syncdepsfactory.go | 10 ++ commonspace/sync/synctest/counter.go | 92 +++++++++++++++ commonspace/sync/synctest/countergenerator.go | 61 ++++++++++ commonspace/sync/synctest/counterrequest.go | 34 ++++++ .../sync/synctest/counterrequesthandler.go | 29 +++++ .../sync/synctest/counterrequestsender.go | 30 +++++ .../sync/synctest/counterresponsehandler.go | 22 ++++ commonspace/sync/synctest/counterupdate.go | 33 ++++++ .../sync/synctest/counterupdatehandler.go | 27 +++++ commonspace/sync/synctest/rpcserver.go | 22 +++- commonspace/sync/synctest/streamopener.go | 39 +++++++ commonspace/sync/synctest/syncdepsfactory.go | 48 ++++++++ .../sync/synctestproto/protos/synctest.proto | 2 +- commonspace/sync/synctestproto/synctest.pb.go | 14 +-- .../sync/synctestproto/synctest_drpc.pb.go | 35 ++++-- net/streampool/stream.go | 4 +- net/streampool/streampool.go | 79 +++++++++---- net/streampool/streampoolservice.go | 6 +- util/periodicsync/periodicsync.go | 47 ++++---- util/slice/slice.go | 22 ++++ util/slice/slice_test.go | 105 ++++++++++++++++++ 35 files changed, 849 insertions(+), 152 deletions(-) delete mode 100644 commonspace/sync/deps.go delete mode 100644 commonspace/sync/headupdatesender.go rename commonspace/sync/{ => syncdeps}/headupdatehandler.go (90%) create mode 100644 commonspace/sync/syncdeps/request.go create mode 100644 commonspace/sync/syncdeps/requesthandler.go create mode 100644 commonspace/sync/syncdeps/requestsender.go create mode 100644 commonspace/sync/syncdeps/response.go create mode 100644 commonspace/sync/syncdeps/responsehandler.go create mode 100644 commonspace/sync/syncdeps/syncdeps.go create mode 100644 commonspace/sync/syncdeps/syncdepsfactory.go create mode 100644 commonspace/sync/synctest/counter.go create mode 100644 commonspace/sync/synctest/countergenerator.go create mode 100644 commonspace/sync/synctest/counterrequest.go create mode 100644 commonspace/sync/synctest/counterrequesthandler.go create mode 100644 commonspace/sync/synctest/counterrequestsender.go create mode 100644 commonspace/sync/synctest/counterresponsehandler.go create mode 100644 commonspace/sync/synctest/counterupdate.go create mode 100644 commonspace/sync/synctest/counterupdatehandler.go create mode 100644 commonspace/sync/synctest/streamopener.go create mode 100644 commonspace/sync/synctest/syncdepsfactory.go create mode 100644 util/slice/slice_test.go diff --git a/commonspace/sync/deps.go b/commonspace/sync/deps.go deleted file mode 100644 index e6fa838b..00000000 --- a/commonspace/sync/deps.go +++ /dev/null @@ -1,10 +0,0 @@ -package sync - -type SyncDeps struct { - HeadUpdateHandler HeadUpdateHandler - HeadUpdateSender HeadUpdateSender - ResponseHandler ResponseHandler - RequestHandler RequestHandler - RequestSender RequestSender - MergeFilter MergeFilterFunc -} diff --git a/commonspace/sync/headupdate.go b/commonspace/sync/headupdate.go index d1bbdbe5..93362e34 100644 --- a/commonspace/sync/headupdate.go +++ b/commonspace/sync/headupdate.go @@ -10,6 +10,10 @@ import ( "github.com/anyproto/any-sync/commonspace/spacesyncproto" ) +type BroadcastOptions struct { + EmptyPeers []string +} + type HeadUpdate struct { peerId string objectId string diff --git a/commonspace/sync/headupdatesender.go b/commonspace/sync/headupdatesender.go deleted file mode 100644 index 252b463b..00000000 --- a/commonspace/sync/headupdatesender.go +++ /dev/null @@ -1,12 +0,0 @@ -package sync - -import "context" - -type BroadcastOptions struct { - EmptyPeers []string -} - -type HeadUpdateSender interface { - SendHeadUpdate(ctx context.Context, peerId string, headUpdate *HeadUpdate) error - BroadcastHeadUpdate(ctx context.Context, opts BroadcastOptions, headUpdate *HeadUpdate) error -} diff --git a/commonspace/sync/requestmanager.go b/commonspace/sync/requestmanager.go index 530f6899..aea87e9b 100644 --- a/commonspace/sync/requestmanager.go +++ b/commonspace/sync/requestmanager.go @@ -1,34 +1,20 @@ package sync import ( + "context" "strings" "github.com/gogo/protobuf/proto" "go.uber.org/zap" "storj.io/drpc" + "github.com/anyproto/any-sync/commonspace/sync/syncdeps" "github.com/anyproto/any-sync/net/streampool" ) -type Request interface { - PeerId() string - ObjectId() string -} - -type Response interface { - // heads []string - // changes []*treechangeproto.RawTreeChangeWithId - // root *treechangeproto.RawTreeChangeWithId -} - type RequestManager interface { - QueueRequest(rq Request) error - HandleStreamRequest(rq Request, stream drpc.Stream) error -} - -type RequestHandler interface { - HandleRequest(rq Request) (Request, error) - HandleStreamRequest(rq Request, send func(resp proto.Message) error) (Request, error) + QueueRequest(rq syncdeps.Request) error + HandleStreamRequest(ctx context.Context, rq syncdeps.Request, stream drpc.Stream) error } type StreamResponse struct { @@ -36,24 +22,14 @@ type StreamResponse struct { Connection drpc.Conn } -type RequestSender interface { - SendRequest(rq Request) (resp Response, err error) - SendStreamRequest(rq Request, receive func(stream drpc.Stream) error) (err error) -} - -type ResponseHandler interface { - NewResponse() Response - HandleResponse(peerId, objectId string, resp Response) error -} - type requestManager struct { requestPool RequestPool - requestHandler RequestHandler - responseHandler ResponseHandler - requestSender RequestSender + requestHandler syncdeps.RequestHandler + responseHandler syncdeps.ResponseHandler + requestSender syncdeps.RequestSender } -func NewRequestManager(deps SyncDeps) RequestManager { +func NewRequestManager(deps syncdeps.SyncDeps) RequestManager { return &requestManager{ requestPool: NewRequestPool(), requestHandler: deps.RequestHandler, @@ -62,16 +38,16 @@ func NewRequestManager(deps SyncDeps) RequestManager { } } -func (r *requestManager) QueueRequest(rq Request) error { - return r.requestPool.QueueRequestAction(rq.PeerId(), rq.ObjectId(), func() { - err := r.requestSender.SendStreamRequest(rq, func(stream drpc.Stream) error { +func (r *requestManager) QueueRequest(rq syncdeps.Request) error { + return r.requestPool.QueueRequestAction(rq.PeerId(), rq.ObjectId(), func(ctx context.Context) { + err := r.requestSender.SendStreamRequest(ctx, rq, func(stream drpc.Stream) error { for { resp := r.responseHandler.NewResponse() err := stream.MsgRecv(resp, streampool.EncodingProto) if err != nil { return err } - err = r.responseHandler.HandleResponse(rq.PeerId(), rq.ObjectId(), resp) + err = r.responseHandler.HandleResponse(ctx, rq.PeerId(), rq.ObjectId(), resp) if err != nil { return err } @@ -83,12 +59,12 @@ func (r *requestManager) QueueRequest(rq Request) error { }) } -func (r *requestManager) HandleStreamRequest(rq Request, stream drpc.Stream) error { +func (r *requestManager) HandleStreamRequest(ctx context.Context, rq syncdeps.Request, stream drpc.Stream) error { if !r.requestPool.TryTake(rq.PeerId(), rq.ObjectId()) { return nil } defer r.requestPool.Release(rq.PeerId(), rq.ObjectId()) - newRq, err := r.requestHandler.HandleStreamRequest(rq, func(resp proto.Message) error { + newRq, err := r.requestHandler.HandleStreamRequest(ctx, rq, func(resp proto.Message) error { return stream.MsgSend(resp, streampool.EncodingProto) }) if err != nil { diff --git a/commonspace/sync/requestpool.go b/commonspace/sync/requestpool.go index 70d0be05..466614e1 100644 --- a/commonspace/sync/requestpool.go +++ b/commonspace/sync/requestpool.go @@ -1,26 +1,32 @@ package sync import ( + "context" "sync" ) type RequestPool interface { TryTake(peerId, objectId string) bool Release(peerId, objectId string) - QueueRequestAction(peerId, objectId string, action func()) (err error) + QueueRequestAction(peerId, objectId string, action func(ctx context.Context)) (err error) } type requestPool struct { mu sync.Mutex taken map[string]struct{} pools map[string]*tryAddQueue + ctx context.Context + cancel context.CancelFunc isClosed bool } func NewRequestPool() RequestPool { + ctx, cancel := context.WithCancel(context.Background()) return &requestPool{ - taken: make(map[string]struct{}), - pools: make(map[string]*tryAddQueue), + ctx: ctx, + cancel: cancel, + taken: make(map[string]struct{}), + pools: make(map[string]*tryAddQueue), } } @@ -47,7 +53,7 @@ func (rp *requestPool) Release(peerId, objectId string) { delete(rp.taken, id) } -func (rp *requestPool) QueueRequestAction(peerId, objectId string, action func()) (err error) { +func (rp *requestPool) QueueRequestAction(peerId, objectId string, action func(ctx context.Context)) (err error) { rp.mu.Lock() if rp.isClosed { rp.mu.Unlock() @@ -70,7 +76,7 @@ func (rp *requestPool) QueueRequestAction(peerId, objectId string, action func() pool.TryAdd(objectId, wrappedAction, func() {}) return } - action() + action(rp.ctx) rp.Release(peerId, objectId) } pool.Replace(objectId, wrappedAction, func() {}) diff --git a/commonspace/sync/sync.go b/commonspace/sync/sync.go index d01b00fb..653fc405 100644 --- a/commonspace/sync/sync.go +++ b/commonspace/sync/sync.go @@ -7,7 +7,9 @@ import ( "go.uber.org/zap" "storj.io/drpc" + "github.com/anyproto/any-sync/app" "github.com/anyproto/any-sync/app/logger" + "github.com/anyproto/any-sync/commonspace/sync/syncdeps" "github.com/anyproto/any-sync/util/multiqueue" ) @@ -16,32 +18,43 @@ const CName = "common.commonspace.sync" var log = logger.NewNamed("sync") type SyncService interface { - GetQueueProvider() multiqueue.QueueProvider[drpc.Message] + app.Component + GetQueue(peerId string) *multiqueue.Queue[drpc.Message] + HandleMessage(ctx context.Context, peerId string, msg drpc.Message) error + HandleStreamRequest(ctx context.Context, req syncdeps.Request, stream drpc.Stream) error + QueueRequest(ctx context.Context, rq syncdeps.Request) error } -type MergeFilterFunc func(ctx context.Context, msg drpc.Message, q *mb.MB[drpc.Message]) error - type syncService struct { sendQueueProvider multiqueue.QueueProvider[drpc.Message] receiveQueue multiqueue.MultiQueue[drpc.Message] manager RequestManager - handler HeadUpdateHandler - sender HeadUpdateSender - mergeFilter MergeFilterFunc + handler syncdeps.HeadUpdateHandler + mergeFilter syncdeps.MergeFilterFunc + newMessage func() drpc.Message ctx context.Context cancel context.CancelFunc } -func NewSyncService(deps SyncDeps) SyncService { - s := &syncService{} - s.ctx, s.cancel = context.WithCancel(context.Background()) +func (s *syncService) Init(a *app.App) (err error) { + factory := a.MustComponent(syncdeps.CName).(syncdeps.SyncDepsFactory) s.sendQueueProvider = multiqueue.NewQueueProvider[drpc.Message](100, s.handleOutgoingMessage) s.receiveQueue = multiqueue.New[drpc.Message](s.handleIncomingMessage, 100) - s.sender = deps.HeadUpdateSender + deps := factory.SyncDeps() s.handler = deps.HeadUpdateHandler s.mergeFilter = deps.MergeFilter + s.newMessage = deps.ReadMessageConstructor s.manager = NewRequestManager(deps) - return s + s.ctx, s.cancel = context.WithCancel(context.Background()) + return nil +} + +func (s *syncService) Name() (name string) { + return CName +} + +func NewSyncService() SyncService { + return &syncService{} } func (s *syncService) handleOutgoingMessage(id string, msg drpc.Message, q *mb.MB[drpc.Message]) error { @@ -62,18 +75,23 @@ func (s *syncService) handleIncomingMessage(msg drpc.Message) { } } -func (s *syncService) GetQueueProvider() multiqueue.QueueProvider[drpc.Message] { - return s.sendQueueProvider +func (s *syncService) GetQueue(peerId string) *multiqueue.Queue[drpc.Message] { + queue := s.sendQueueProvider.GetQueue(peerId) + return queue +} + +func (s *syncService) NewReadMessage() drpc.Message { + return s.newMessage() } func (s *syncService) HandleMessage(ctx context.Context, peerId string, msg drpc.Message) error { return s.receiveQueue.Add(ctx, peerId, msg) } -func (s *syncService) HandleStreamRequest(ctx context.Context, req Request, stream drpc.Stream) error { - return s.manager.HandleStreamRequest(req, stream) +func (s *syncService) QueueRequest(ctx context.Context, rq syncdeps.Request) error { + return s.manager.QueueRequest(rq) } -func (s *syncService) NewReadMessage() drpc.Message { - return &HeadUpdate{} +func (s *syncService) HandleStreamRequest(ctx context.Context, req syncdeps.Request, stream drpc.Stream) error { + return s.manager.HandleStreamRequest(ctx, req, stream) } diff --git a/commonspace/sync/sync_test.go b/commonspace/sync/sync_test.go index 0d2aebe7..457135b4 100644 --- a/commonspace/sync/sync_test.go +++ b/commonspace/sync/sync_test.go @@ -11,6 +11,7 @@ import ( "github.com/anyproto/any-sync/commonspace/sync/synctest" "github.com/anyproto/any-sync/commonspace/sync/synctestproto" "github.com/anyproto/any-sync/net/rpc/rpctest" + "github.com/anyproto/any-sync/net/streampool" ) var ctx = context.Background() @@ -25,6 +26,7 @@ func TestNewSyncService(t *testing.T) { Register(rpctest.NewTestServer()). Register(synctest.NewRpcServer()). Register(synctest.NewPeerProvider("first")) + //Register(synctest.NewCounterStreamOpener()) secondApp.Register(connProvider). Register(rpctest.NewTestServer()). Register(synctest.NewRpcServer()). @@ -49,3 +51,28 @@ func TestNewSyncService(t *testing.T) { }) require.NoError(t, err) } + +type counterFixture struct { + a *app.App +} + +type counterFixtureParams struct { + connProvider *synctest.ConnProvider + start int32 + delta int32 +} + +func newFixture(t *testing.T, peerId string, params counterFixtureParams) *counterFixture { + a := &app.App{} + a.Register(params.connProvider). + Register(rpctest.NewTestServer()). + Register(synctest.NewCounterStreamOpener()). + Register(synctest.NewPeerProvider(peerId)). + Register(synctest.NewCounter(params.start, params.delta)). + Register(streampool.NewStreamPool()). + Register(synctest.NewCounterSyncDepsFactory()). + Register(NewSyncService()). + //Register(). + Register(synctest.NewRpcServer()) + return nil +} diff --git a/commonspace/sync/headupdatehandler.go b/commonspace/sync/syncdeps/headupdatehandler.go similarity index 90% rename from commonspace/sync/headupdatehandler.go rename to commonspace/sync/syncdeps/headupdatehandler.go index 9064cb36..6598139d 100644 --- a/commonspace/sync/headupdatehandler.go +++ b/commonspace/sync/syncdeps/headupdatehandler.go @@ -1,4 +1,4 @@ -package sync +package syncdeps import ( "context" diff --git a/commonspace/sync/syncdeps/request.go b/commonspace/sync/syncdeps/request.go new file mode 100644 index 00000000..19615dc7 --- /dev/null +++ b/commonspace/sync/syncdeps/request.go @@ -0,0 +1,9 @@ +package syncdeps + +import "github.com/gogo/protobuf/proto" + +type Request interface { + PeerId() string + ObjectId() string + Proto() proto.Message +} diff --git a/commonspace/sync/syncdeps/requesthandler.go b/commonspace/sync/syncdeps/requesthandler.go new file mode 100644 index 00000000..04137cfb --- /dev/null +++ b/commonspace/sync/syncdeps/requesthandler.go @@ -0,0 +1,11 @@ +package syncdeps + +import ( + "context" + + "github.com/gogo/protobuf/proto" +) + +type RequestHandler interface { + HandleStreamRequest(ctx context.Context, rq Request, send func(resp proto.Message) error) (Request, error) +} diff --git a/commonspace/sync/syncdeps/requestsender.go b/commonspace/sync/syncdeps/requestsender.go new file mode 100644 index 00000000..9acd19dd --- /dev/null +++ b/commonspace/sync/syncdeps/requestsender.go @@ -0,0 +1,11 @@ +package syncdeps + +import ( + "context" + + "storj.io/drpc" +) + +type RequestSender interface { + SendStreamRequest(ctx context.Context, rq Request, receive func(stream drpc.Stream) error) (err error) +} diff --git a/commonspace/sync/syncdeps/response.go b/commonspace/sync/syncdeps/response.go new file mode 100644 index 00000000..ec9f101d --- /dev/null +++ b/commonspace/sync/syncdeps/response.go @@ -0,0 +1,7 @@ +package syncdeps + +type Response interface { + // heads []string + // changes []*treechangeproto.RawTreeChangeWithId + // root *treechangeproto.RawTreeChangeWithId +} diff --git a/commonspace/sync/syncdeps/responsehandler.go b/commonspace/sync/syncdeps/responsehandler.go new file mode 100644 index 00000000..78400f85 --- /dev/null +++ b/commonspace/sync/syncdeps/responsehandler.go @@ -0,0 +1,8 @@ +package syncdeps + +import "context" + +type ResponseHandler interface { + NewResponse() Response + HandleResponse(ctx context.Context, peerId, objectId string, resp Response) error +} diff --git a/commonspace/sync/syncdeps/syncdeps.go b/commonspace/sync/syncdeps/syncdeps.go new file mode 100644 index 00000000..829d8490 --- /dev/null +++ b/commonspace/sync/syncdeps/syncdeps.go @@ -0,0 +1,19 @@ +package syncdeps + +import ( + "context" + + "github.com/cheggaaa/mb/v3" + "storj.io/drpc" +) + +type MergeFilterFunc func(ctx context.Context, msg drpc.Message, q *mb.MB[drpc.Message]) error + +type SyncDeps struct { + HeadUpdateHandler HeadUpdateHandler + ResponseHandler ResponseHandler + RequestHandler RequestHandler + RequestSender RequestSender + MergeFilter MergeFilterFunc + ReadMessageConstructor func() drpc.Message +} diff --git a/commonspace/sync/syncdeps/syncdepsfactory.go b/commonspace/sync/syncdeps/syncdepsfactory.go new file mode 100644 index 00000000..e10a70e5 --- /dev/null +++ b/commonspace/sync/syncdeps/syncdepsfactory.go @@ -0,0 +1,10 @@ +package syncdeps + +import "github.com/anyproto/any-sync/app" + +const CName = "common.sync.syncdeps" + +type SyncDepsFactory interface { + app.Component + SyncDeps() SyncDeps +} diff --git a/commonspace/sync/synctest/counter.go b/commonspace/sync/synctest/counter.go new file mode 100644 index 00000000..2a108b93 --- /dev/null +++ b/commonspace/sync/synctest/counter.go @@ -0,0 +1,92 @@ +package synctest + +import ( + "sync" + + "golang.org/x/exp/slices" + + "github.com/anyproto/any-sync/app" + "github.com/anyproto/any-sync/util/slice" +) + +const CounterName = "counter" + +type Counter struct { + sync.Mutex + counters map[int32]struct{} + next, delta int32 + maxVal int32 +} + +func (c *Counter) Init(a *app.App) (err error) { + return nil +} + +func (c *Counter) Name() (name string) { + return CounterName +} + +func (c *Counter) Generate() (ret int32) { + c.Lock() + defer c.Unlock() + ret = c.next + c.next += c.delta + c.counters[ret] = struct{}{} + if ret > c.maxVal { + c.maxVal = ret + } + return ret +} + +func (c *Counter) CheckComplete() bool { + c.Lock() + defer c.Unlock() + return c.maxVal <= int32(len(c.counters)) +} + +func (c *Counter) Add(val int32) { + c.Lock() + defer c.Unlock() + if val > c.maxVal { + c.maxVal = val + } + c.counters[val] = struct{}{} +} + +func (c *Counter) Dump() (ret []int32) { + c.Lock() + defer c.Unlock() + for val := range c.counters { + ret = append(ret, val) + } + slices.Sort(ret) + return +} + +func (c *Counter) DiffCurrentNew(vals []int32) (toSend, toAsk []int32) { + c.Lock() + defer c.Unlock() + m := make(map[int32]struct{}) + for _, val := range vals { + m[val] = struct{}{} + } + _, toSend, toAsk = slice.CompareMaps(m, c.counters) + return +} + +func (c *Counter) KnownCounters() (ret []int32) { + c.Lock() + defer c.Unlock() + for val := range c.counters { + ret = append(ret, val) + } + return +} + +func NewCounter(cur, delta int32) *Counter { + return &Counter{ + counters: make(map[int32]struct{}), + next: cur, + delta: delta, + } +} diff --git a/commonspace/sync/synctest/countergenerator.go b/commonspace/sync/synctest/countergenerator.go new file mode 100644 index 00000000..9a6e4533 --- /dev/null +++ b/commonspace/sync/synctest/countergenerator.go @@ -0,0 +1,61 @@ +package synctest + +import ( + "context" + "fmt" + "math/rand/v2" + "time" + + "github.com/anyproto/any-sync/app" + "github.com/anyproto/any-sync/app/logger" + "github.com/anyproto/any-sync/commonspace/sync/syncdeps" + "github.com/anyproto/any-sync/commonspace/sync/synctestproto" + "github.com/anyproto/any-sync/net/streampool" + "github.com/anyproto/any-sync/util/periodicsync" +) + +var log = logger.NewNamed(syncdeps.CName) + +const CounterGeneratorName = "countergenerator" + +type CounterGenerator struct { + counter *Counter + streamPool streampool.StreamPool + loop periodicsync.PeriodicSync + ownId string +} + +func (c *CounterGenerator) Init(a *app.App) (err error) { + c.counter = a.MustComponent(CounterName).(*Counter) + c.ownId = a.MustComponent(PeerName).(*PeerProvider).myPeer + c.streamPool = a.MustComponent(streampool.CName).(streampool.StreamPool) + c.loop = periodicsync.NewPeriodicSyncDuration(time.Millisecond*100, time.Millisecond*100, c.update, log) + return +} + +func (c *CounterGenerator) Name() (name string) { + return CounterGeneratorName +} + +func (c *CounterGenerator) update(ctx context.Context) error { + res := c.counter.Generate() + randChoice := rand.Int()%2 == 0 + if randChoice { + fmt.Println("Broadcast", res, "by", c.ownId) + return c.streamPool.Broadcast(ctx, &synctestproto.CounterIncrease{ + Value: res, + ObjectId: "counter", + }) + } + return nil +} + +func (c *CounterGenerator) Run(ctx context.Context) (err error) { + c.loop.Run() + return nil +} + +func (c *CounterGenerator) Close(ctx context.Context) (err error) { + c.loop.Close() + return nil +} diff --git a/commonspace/sync/synctest/counterrequest.go b/commonspace/sync/synctest/counterrequest.go new file mode 100644 index 00000000..2a8a747c --- /dev/null +++ b/commonspace/sync/synctest/counterrequest.go @@ -0,0 +1,34 @@ +package synctest + +import ( + "github.com/gogo/protobuf/proto" + + "github.com/anyproto/any-sync/commonspace/sync/synctestproto" +) + +type CounterRequest struct { + peerId string + *synctestproto.CounterRequest +} + +func (c CounterRequest) Proto() proto.Message { + return c.CounterRequest +} + +func NewCounterRequest(peerId, objectId string, counters []int32) CounterRequest { + return CounterRequest{ + peerId: peerId, + CounterRequest: &synctestproto.CounterRequest{ + ExistingValues: counters, + ObjectId: objectId, + }, + } +} + +func (c CounterRequest) PeerId() string { + return c.peerId +} + +func (c CounterRequest) ObjectId() string { + return c.CounterRequest.ObjectId +} diff --git a/commonspace/sync/synctest/counterrequesthandler.go b/commonspace/sync/synctest/counterrequesthandler.go new file mode 100644 index 00000000..46174122 --- /dev/null +++ b/commonspace/sync/synctest/counterrequesthandler.go @@ -0,0 +1,29 @@ +package synctest + +import ( + "context" + + "github.com/gogo/protobuf/proto" + + "github.com/anyproto/any-sync/commonspace/sync/syncdeps" + "github.com/anyproto/any-sync/commonspace/sync/synctestproto" +) + +type CounterRequestHandler struct { + counter *Counter +} + +func (c *CounterRequestHandler) HandleStreamRequest(ctx context.Context, rq syncdeps.Request, send func(resp proto.Message) error) (syncdeps.Request, error) { + counterRequest := rq.(*CounterRequest) + toSend, toAsk := c.counter.DiffCurrentNew(counterRequest.ExistingValues) + for _, value := range toSend { + _ = send(&synctestproto.CounterIncrease{ + Value: value, + ObjectId: counterRequest.ObjectId(), + }) + } + if len(toAsk) == 0 { + return nil, nil + } + return NewCounterRequest(counterRequest.PeerId(), counterRequest.ObjectId(), toAsk), nil +} diff --git a/commonspace/sync/synctest/counterrequestsender.go b/commonspace/sync/synctest/counterrequestsender.go new file mode 100644 index 00000000..ee96a185 --- /dev/null +++ b/commonspace/sync/synctest/counterrequestsender.go @@ -0,0 +1,30 @@ +package synctest + +import ( + "context" + + "storj.io/drpc" + + "github.com/anyproto/any-sync/commonspace/sync/syncdeps" + "github.com/anyproto/any-sync/commonspace/sync/synctestproto" +) + +type CounterRequestSender struct { + peerProvider *PeerProvider +} + +func (c *CounterRequestSender) SendStreamRequest(ctx context.Context, rq syncdeps.Request, receive func(stream drpc.Stream) error) (err error) { + peerId := rq.PeerId() + pr, err := c.peerProvider.GetPeer(peerId) + if err != nil { + return err + } + return pr.DoDrpc(ctx, func(conn drpc.Conn) error { + cl := synctestproto.NewDRPCCounterSyncClient(conn) + stream, err := cl.CounterStreamRequest(ctx, rq.Proto().(*synctestproto.CounterRequest)) + if err != nil { + return err + } + return receive(stream) + }) +} diff --git a/commonspace/sync/synctest/counterresponsehandler.go b/commonspace/sync/synctest/counterresponsehandler.go new file mode 100644 index 00000000..b89b19d2 --- /dev/null +++ b/commonspace/sync/synctest/counterresponsehandler.go @@ -0,0 +1,22 @@ +package synctest + +import ( + "context" + + "github.com/anyproto/any-sync/commonspace/sync/syncdeps" + "github.com/anyproto/any-sync/commonspace/sync/synctestproto" +) + +type CounterResponseHandler struct { + counter *Counter +} + +func (c *CounterResponseHandler) NewResponse() syncdeps.Response { + return &synctestproto.CounterIncrease{} +} + +func (c *CounterResponseHandler) HandleResponse(ctx context.Context, peerId, objectId string, resp syncdeps.Response) error { + counterResp := resp.(*synctestproto.CounterIncrease) + c.counter.Add(counterResp.Value) + return nil +} diff --git a/commonspace/sync/synctest/counterupdate.go b/commonspace/sync/synctest/counterupdate.go new file mode 100644 index 00000000..177177cb --- /dev/null +++ b/commonspace/sync/synctest/counterupdate.go @@ -0,0 +1,33 @@ +package synctest + +import ( + "github.com/gogo/protobuf/proto" + + "github.com/anyproto/any-sync/commonspace/sync/synctestproto" +) + +type CounterUpdate struct { + counter int32 + objectId string +} + +func (c *CounterUpdate) message() proto.Message { + return &synctestproto.CounterIncrease{ + Value: c.counter, + ObjectId: c.objectId, + } +} + +func (c *CounterUpdate) SetProtoMessage(message proto.Message) error { + msg := message.(*synctestproto.CounterIncrease) + c.counter = msg.Value + c.objectId = msg.ObjectId + return nil +} + +func (c *CounterUpdate) ProtoMessage() (proto.Message, error) { + if c.objectId == "" { + return &synctestproto.CounterIncrease{}, nil + } + return c.message(), nil +} diff --git a/commonspace/sync/synctest/counterupdatehandler.go b/commonspace/sync/synctest/counterupdatehandler.go new file mode 100644 index 00000000..f2de757e --- /dev/null +++ b/commonspace/sync/synctest/counterupdatehandler.go @@ -0,0 +1,27 @@ +package synctest + +import ( + "context" + + "storj.io/drpc" + + "github.com/anyproto/any-sync/commonspace/sync/syncdeps" + "github.com/anyproto/any-sync/net/peer" +) + +type CounterUpdateHandler struct { + counter *Counter +} + +func (c *CounterUpdateHandler) HandleHeadUpdate(ctx context.Context, headUpdate drpc.Message) (syncdeps.Request, error) { + update := headUpdate.(CounterUpdate) + c.counter.Add(update.counter) + if c.counter.CheckComplete() { + return nil, nil + } + peerId, err := peer.CtxPeerId(ctx) + if err != nil { + return nil, err + } + return NewCounterRequest(peerId, update.objectId, c.counter.KnownCounters()), nil +} diff --git a/commonspace/sync/synctest/rpcserver.go b/commonspace/sync/synctest/rpcserver.go index 24c0542c..c2d63e02 100644 --- a/commonspace/sync/synctest/rpcserver.go +++ b/commonspace/sync/synctest/rpcserver.go @@ -1,10 +1,17 @@ package synctest import ( + "context" "fmt" + "storj.io/drpc" + + "github.com/anyproto/any-sync/commonspace/sync/syncdeps" + "github.com/anyproto/any-sync/net/peer" "github.com/anyproto/any-sync/net/rpc/rpctest" "github.com/anyproto/any-sync/net/rpc/server" + "github.com/anyproto/any-sync/net/streampool" + "github.com/anyproto/any-sync/util/multiqueue" "github.com/anyproto/any-sync/app" "github.com/anyproto/any-sync/commonspace/sync/synctestproto" @@ -12,7 +19,17 @@ import ( const RpcName = "rpcserver" +type SyncService interface { + app.Component + GetQueue(peerId string) *multiqueue.Queue[drpc.Message] + HandleMessage(ctx context.Context, peerId string, msg drpc.Message) error + HandleStreamRequest(ctx context.Context, req syncdeps.Request, stream drpc.Stream) error + QueueRequest(ctx context.Context, rq syncdeps.Request) error +} + type RpcServer struct { + streamPool streampool.StreamPool + syncService SyncService } func NewRpcServer() *RpcServer { @@ -20,16 +37,17 @@ func NewRpcServer() *RpcServer { } func (r *RpcServer) CounterStreamRequest(request *synctestproto.CounterRequest, stream synctestproto.DRPCCounterSync_CounterStreamRequestStream) error { - fmt.Println(request.ObjectId) + fmt.Println(peer.CtxPeerId(stream.Context())) return nil } -func (r *RpcServer) CounterStream(request *synctestproto.CounterRequest, stream synctestproto.DRPCCounterSync_CounterStreamStream) error { +func (r *RpcServer) CounterStream(stream synctestproto.DRPCCounterSync_CounterStreamStream) error { return nil } func (r *RpcServer) Init(a *app.App) (err error) { serv := a.MustComponent(server.CName).(*rpctest.TestServer) + r.streamPool = a.MustComponent(streampool.CName).(streampool.StreamPool) return synctestproto.DRPCRegisterCounterSync(serv, r) } diff --git a/commonspace/sync/synctest/streamopener.go b/commonspace/sync/synctest/streamopener.go new file mode 100644 index 00000000..991e4722 --- /dev/null +++ b/commonspace/sync/synctest/streamopener.go @@ -0,0 +1,39 @@ +package synctest + +import ( + "context" + + "storj.io/drpc" + + "github.com/anyproto/any-sync/app" + "github.com/anyproto/any-sync/commonspace/spacesyncproto" + "github.com/anyproto/any-sync/net/peer" + "github.com/anyproto/any-sync/net/streampool" +) + +func NewCounterStreamOpener() streampool.StreamOpener { + return &CounterStreamOpener{} +} + +type CounterStreamOpener struct { +} + +func (c *CounterStreamOpener) Init(a *app.App) (err error) { + return nil +} + +func (c *CounterStreamOpener) Name() (name string) { + return streampool.StreamOpenerCName +} + +func (c *CounterStreamOpener) OpenStream(ctx context.Context, p peer.Peer) (stream drpc.Stream, tags []string, err error) { + conn, err := p.AcquireDrpcConn(ctx) + if err != nil { + return + } + objectStream, err := spacesyncproto.NewDRPCSpaceSyncClient(conn).ObjectSyncStream(ctx) + if err != nil { + return + } + return objectStream, nil, nil +} diff --git a/commonspace/sync/synctest/syncdepsfactory.go b/commonspace/sync/synctest/syncdepsfactory.go new file mode 100644 index 00000000..32aeac76 --- /dev/null +++ b/commonspace/sync/synctest/syncdepsfactory.go @@ -0,0 +1,48 @@ +package synctest + +import ( + "context" + + "github.com/cheggaaa/mb/v3" + "storj.io/drpc" + + "github.com/anyproto/any-sync/app" + "github.com/anyproto/any-sync/commonspace/sync/syncdeps" +) + +type CounterSyncDepsFactory struct { + syncDeps syncdeps.SyncDeps +} + +func NewCounterSyncDepsFactory() syncdeps.SyncDepsFactory { + return &CounterSyncDepsFactory{} +} + +func (c *CounterSyncDepsFactory) Init(a *app.App) (err error) { + counter := a.MustComponent(CounterName).(*Counter) + requestHandler := &CounterRequestHandler{counter: counter} + requestSender := &CounterRequestSender{peerProvider: a.MustComponent(PeerName).(*PeerProvider)} + responseHandler := &CounterResponseHandler{counter: counter} + updateHandler := &CounterUpdateHandler{counter: counter} + c.syncDeps = syncdeps.SyncDeps{ + HeadUpdateHandler: updateHandler, + ResponseHandler: responseHandler, + RequestHandler: requestHandler, + RequestSender: requestSender, + MergeFilter: func(ctx context.Context, msg drpc.Message, q *mb.MB[drpc.Message]) error { + return nil + }, + ReadMessageConstructor: func() drpc.Message { + return &CounterUpdate{} + }, + } + return nil +} + +func (c *CounterSyncDepsFactory) Name() (name string) { + return syncdeps.CName +} + +func (c *CounterSyncDepsFactory) SyncDeps() syncdeps.SyncDeps { + return c.syncDeps +} diff --git a/commonspace/sync/synctestproto/protos/synctest.proto b/commonspace/sync/synctestproto/protos/synctest.proto index 41e6d257..266c1f04 100644 --- a/commonspace/sync/synctestproto/protos/synctest.proto +++ b/commonspace/sync/synctestproto/protos/synctest.proto @@ -14,5 +14,5 @@ message CounterRequest { service CounterSync { rpc CounterStreamRequest(CounterRequest) returns (stream CounterIncrease); - rpc CounterStream(CounterRequest) returns (stream CounterIncrease); + rpc CounterStream(stream CounterIncrease) returns (stream CounterIncrease); } diff --git a/commonspace/sync/synctestproto/synctest.pb.go b/commonspace/sync/synctestproto/synctest.pb.go index 0935f910..03a48d84 100644 --- a/commonspace/sync/synctestproto/synctest.pb.go +++ b/commonspace/sync/synctestproto/synctest.pb.go @@ -136,7 +136,7 @@ func init() { } var fileDescriptor_dd5c22b15d7f69e4 = []byte{ - // 247 bytes of a gzipped FileDescriptorProto + // 254 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x32, 0x49, 0xce, 0xcf, 0xcd, 0xcd, 0xcf, 0x2b, 0x2e, 0x48, 0x4c, 0x4e, 0xd5, 0x2f, 0xae, 0xcc, 0x4b, 0x06, 0x13, 0x25, 0xa9, 0xc5, 0x25, 0x05, 0x45, 0xf9, 0x25, 0xf9, 0xfa, 0x60, 0xb2, 0x18, 0x2e, 0xa8, 0x07, 0xe6, 0x0b, @@ -146,13 +146,13 @@ var fileDescriptor_dd5c22b15d7f69e4 = []byte{ 0x72, 0x89, 0x67, 0x8a, 0x04, 0x93, 0x02, 0xa3, 0x06, 0x67, 0x10, 0x9c, 0xaf, 0x14, 0xc2, 0xc5, 0x07, 0x35, 0x24, 0x28, 0xb5, 0xb0, 0x34, 0xb5, 0xb8, 0x44, 0x48, 0x8d, 0x8b, 0x2f, 0xb5, 0x22, 0xb3, 0xb8, 0x24, 0x33, 0x2f, 0x3d, 0x0c, 0xa4, 0xbd, 0x58, 0x82, 0x51, 0x81, 0x59, 0x83, 0x35, - 0x08, 0x4d, 0x14, 0x9f, 0xa9, 0x46, 0x4b, 0x18, 0xb9, 0xb8, 0xa1, 0xc6, 0x06, 0x57, 0xe6, 0x25, + 0x08, 0x4d, 0x14, 0x9f, 0xa9, 0x46, 0xcb, 0x19, 0xb9, 0xb8, 0xa1, 0xc6, 0x06, 0x57, 0xe6, 0x25, 0x0b, 0xf9, 0x72, 0x89, 0xc0, 0xb8, 0x25, 0x45, 0xa9, 0x89, 0xb9, 0x30, 0xbb, 0x24, 0xf4, 0xe0, - 0xbe, 0x43, 0x75, 0x85, 0x94, 0x24, 0x86, 0x0c, 0xcc, 0x93, 0x06, 0x8c, 0x42, 0x6e, 0x5c, 0xbc, - 0x28, 0xc6, 0x91, 0x69, 0x8e, 0x93, 0xc5, 0x89, 0x47, 0x72, 0x8c, 0x17, 0x1e, 0xc9, 0x31, 0x3e, - 0x78, 0x24, 0xc7, 0x38, 0xe1, 0xb1, 0x1c, 0xc3, 0x85, 0xc7, 0x72, 0x0c, 0x37, 0x1e, 0xcb, 0x31, - 0x44, 0xc9, 0xe1, 0x8f, 0x9b, 0x24, 0x36, 0x30, 0x65, 0x0c, 0x08, 0x00, 0x00, 0xff, 0xff, 0x3b, - 0x05, 0x13, 0xdb, 0xc4, 0x01, 0x00, 0x00, + 0xbe, 0x43, 0x75, 0x85, 0x94, 0x24, 0x86, 0x0c, 0xcc, 0x93, 0x06, 0x8c, 0x42, 0x9e, 0x5c, 0xbc, + 0x28, 0xc6, 0x09, 0xe1, 0x56, 0x8d, 0xc7, 0x20, 0x0d, 0x46, 0x03, 0x46, 0x27, 0x8b, 0x13, 0x8f, + 0xe4, 0x18, 0x2f, 0x3c, 0x92, 0x63, 0x7c, 0xf0, 0x48, 0x8e, 0x71, 0xc2, 0x63, 0x39, 0x86, 0x0b, + 0x8f, 0xe5, 0x18, 0x6e, 0x3c, 0x96, 0x63, 0x88, 0x92, 0xc3, 0x1f, 0x3d, 0x49, 0x6c, 0x60, 0xca, + 0x18, 0x10, 0x00, 0x00, 0xff, 0xff, 0x5b, 0xf3, 0x0f, 0x8d, 0xc7, 0x01, 0x00, 0x00, } func (m *CounterIncrease) Marshal() (dAtA []byte, err error) { diff --git a/commonspace/sync/synctestproto/synctest_drpc.pb.go b/commonspace/sync/synctestproto/synctest_drpc.pb.go index bbdebb45..e0690986 100644 --- a/commonspace/sync/synctestproto/synctest_drpc.pb.go +++ b/commonspace/sync/synctestproto/synctest_drpc.pb.go @@ -41,7 +41,7 @@ type DRPCCounterSyncClient interface { DRPCConn() drpc.Conn CounterStreamRequest(ctx context.Context, in *CounterRequest) (DRPCCounterSync_CounterStreamRequestClient, error) - CounterStream(ctx context.Context, in *CounterRequest) (DRPCCounterSync_CounterStreamClient, error) + CounterStream(ctx context.Context) (DRPCCounterSync_CounterStreamClient, error) } type drpcCounterSyncClient struct { @@ -94,23 +94,18 @@ func (x *drpcCounterSync_CounterStreamRequestClient) RecvMsg(m *CounterIncrease) return x.MsgRecv(m, drpcEncoding_File_commonspace_sync_synctestproto_protos_synctest_proto{}) } -func (c *drpcCounterSyncClient) CounterStream(ctx context.Context, in *CounterRequest) (DRPCCounterSync_CounterStreamClient, error) { +func (c *drpcCounterSyncClient) CounterStream(ctx context.Context) (DRPCCounterSync_CounterStreamClient, error) { stream, err := c.cc.NewStream(ctx, "/synctest.CounterSync/CounterStream", drpcEncoding_File_commonspace_sync_synctestproto_protos_synctest_proto{}) if err != nil { return nil, err } x := &drpcCounterSync_CounterStreamClient{stream} - if err := x.MsgSend(in, drpcEncoding_File_commonspace_sync_synctestproto_protos_synctest_proto{}); err != nil { - return nil, err - } - if err := x.CloseSend(); err != nil { - return nil, err - } return x, nil } type DRPCCounterSync_CounterStreamClient interface { drpc.Stream + Send(*CounterIncrease) error Recv() (*CounterIncrease, error) } @@ -122,6 +117,10 @@ func (x *drpcCounterSync_CounterStreamClient) GetStream() drpc.Stream { return x.Stream } +func (x *drpcCounterSync_CounterStreamClient) Send(m *CounterIncrease) error { + return x.MsgSend(m, drpcEncoding_File_commonspace_sync_synctestproto_protos_synctest_proto{}) +} + func (x *drpcCounterSync_CounterStreamClient) Recv() (*CounterIncrease, error) { m := new(CounterIncrease) if err := x.MsgRecv(m, drpcEncoding_File_commonspace_sync_synctestproto_protos_synctest_proto{}); err != nil { @@ -136,7 +135,7 @@ func (x *drpcCounterSync_CounterStreamClient) RecvMsg(m *CounterIncrease) error type DRPCCounterSyncServer interface { CounterStreamRequest(*CounterRequest, DRPCCounterSync_CounterStreamRequestStream) error - CounterStream(*CounterRequest, DRPCCounterSync_CounterStreamStream) error + CounterStream(DRPCCounterSync_CounterStreamStream) error } type DRPCCounterSyncUnimplementedServer struct{} @@ -145,7 +144,7 @@ func (s *DRPCCounterSyncUnimplementedServer) CounterStreamRequest(*CounterReques return drpcerr.WithCode(errors.New("Unimplemented"), drpcerr.Unimplemented) } -func (s *DRPCCounterSyncUnimplementedServer) CounterStream(*CounterRequest, DRPCCounterSync_CounterStreamStream) error { +func (s *DRPCCounterSyncUnimplementedServer) CounterStream(DRPCCounterSync_CounterStreamStream) error { return drpcerr.WithCode(errors.New("Unimplemented"), drpcerr.Unimplemented) } @@ -169,8 +168,7 @@ func (DRPCCounterSyncDescription) Method(n int) (string, drpc.Encoding, drpc.Rec func(srv interface{}, ctx context.Context, in1, in2 interface{}) (drpc.Message, error) { return nil, srv.(DRPCCounterSyncServer). CounterStream( - in1.(*CounterRequest), - &drpcCounterSync_CounterStreamStream{in2.(drpc.Stream)}, + &drpcCounterSync_CounterStreamStream{in1.(drpc.Stream)}, ) }, DRPCCounterSyncServer.CounterStream, true default: @@ -198,6 +196,7 @@ func (x *drpcCounterSync_CounterStreamRequestStream) Send(m *CounterIncrease) er type DRPCCounterSync_CounterStreamStream interface { drpc.Stream Send(*CounterIncrease) error + Recv() (*CounterIncrease, error) } type drpcCounterSync_CounterStreamStream struct { @@ -207,3 +206,15 @@ type drpcCounterSync_CounterStreamStream struct { func (x *drpcCounterSync_CounterStreamStream) Send(m *CounterIncrease) error { return x.MsgSend(m, drpcEncoding_File_commonspace_sync_synctestproto_protos_synctest_proto{}) } + +func (x *drpcCounterSync_CounterStreamStream) Recv() (*CounterIncrease, error) { + m := new(CounterIncrease) + if err := x.MsgRecv(m, drpcEncoding_File_commonspace_sync_synctestproto_protos_synctest_proto{}); err != nil { + return nil, err + } + return m, nil +} + +func (x *drpcCounterSync_CounterStreamStream) RecvMsg(m *CounterIncrease) error { + return x.MsgRecv(m, drpcEncoding_File_commonspace_sync_synctestproto_protos_synctest_proto{}) +} diff --git a/net/streampool/stream.go b/net/streampool/stream.go index 8aae8161..59056158 100644 --- a/net/streampool/stream.go +++ b/net/streampool/stream.go @@ -40,14 +40,14 @@ func (sr *stream) readLoop() error { }() sr.l.Debug("stream read started") for { - msg := sr.pool.handler.NewReadMessage() + msg := sr.pool.syncDelegate.NewReadMessage() if err := sr.stream.MsgRecv(msg, EncodingProto); err != nil { sr.l.Info("msg receive error", zap.Error(err)) return err } ctx := streamCtx(sr.peerCtx, sr.streamId, sr.peerId) ctx = logger.CtxWithFields(ctx, zap.String("peerId", sr.peerId)) - if err := sr.pool.handler.HandleMessage(ctx, sr.peerId, msg); err != nil { + if err := sr.pool.syncDelegate.HandleMessage(ctx, sr.peerId, msg); err != nil { sr.l.Info("msg handle error", zap.Error(err)) return err } diff --git a/net/streampool/streampool.go b/net/streampool/streampool.go index 4cae48e0..1993d83e 100644 --- a/net/streampool/streampool.go +++ b/net/streampool/streampool.go @@ -9,22 +9,46 @@ import ( "golang.org/x/net/context" "storj.io/drpc" + "github.com/anyproto/any-sync/app" "github.com/anyproto/any-sync/app/debugstat" "github.com/anyproto/any-sync/net" "github.com/anyproto/any-sync/net/peer" "github.com/anyproto/any-sync/util/multiqueue" ) -// StreamHandler handles incoming messages from streams -type StreamHandler interface { - // OpenStream opens stream with given peer - OpenStream(ctx context.Context, p peer.Peer) (stream drpc.Stream, tags []string, err error) +type configGetter interface { + GetConfig() StreamConfig +} + +type StreamSyncDelegate interface { + app.Component // HandleMessage handles incoming message HandleMessage(ctx context.Context, peerId string, msg drpc.Message) (err error) // NewReadMessage creates new empty message for unmarshalling into it NewReadMessage() drpc.Message - // GetQueueProvider returns queue provider for outgoing messages - GetQueueProvider() multiqueue.QueueProvider[drpc.Message] + // GetQueue returns queue for outgoing messages + GetQueue(peerId string) *multiqueue.Queue[drpc.Message] +} + +const ( + StreamOpenerCName = "common.commonspace.streampool" + streamSyncDelegateCName = "common.commonspace.sync" +) + +func NewStreamPool() StreamPool { + return &streamPool{ + streamIdsByPeer: map[string][]uint32{}, + streamIdsByTag: map[string][]uint32{}, + streams: map[uint32]*stream{}, + opening: map[string]*openingProcess{}, + } +} + +// StreamOpener handles incoming messages from streams +type StreamOpener interface { + app.Component + // OpenStream opens stream with given peer + OpenStream(ctx context.Context, p peer.Peer) (stream drpc.Stream, tags []string, err error) } // PeerGetter should dial or return cached peers @@ -37,39 +61,54 @@ type MessageQueueId interface { // StreamPool keeps and read streams type StreamPool interface { + app.ComponentRunnable // AddStream adds new outgoing stream into the pool AddStream(stream drpc.Stream, tags ...string) (err error) // ReadStream adds new incoming stream and synchronously read it ReadStream(stream drpc.Stream, tags ...string) (err error) // Send sends a message to given peers. A stream will be opened if it is not cached before. Works async. Send(ctx context.Context, msg drpc.Message, target PeerGetter) (err error) - // SendById sends a message to given peerIds. Works only if stream exists - SendById(ctx context.Context, msg drpc.Message, peerIds ...string) (err error) // Broadcast sends a message to all peers with given tags. Works async. Broadcast(ctx context.Context, msg drpc.Message, tags ...string) (err error) - // AddTagsCtx adds tags to stream, stream will be extracted from ctx - AddTagsCtx(ctx context.Context, tags ...string) error - // RemoveTagsCtx removes tags from stream, stream will be extracted from ctx - RemoveTagsCtx(ctx context.Context, tags ...string) error - // Streams gets all streams for specific tags - Streams(tags ...string) (streams []drpc.Stream) - // Close closes all streams - Close() error } type streamPool struct { - handler StreamHandler + streamOpener StreamOpener + syncDelegate StreamSyncDelegate statService debugstat.StatService streamIdsByPeer map[string][]uint32 streamIdsByTag map[string][]uint32 streams map[uint32]*stream opening map[string]*openingProcess + streamConfig StreamConfig dial *ExecPool mu sync.Mutex writeQueueSize int lastStreamId uint32 } +func (s *streamPool) Init(a *app.App) (err error) { + s.streamOpener = a.MustComponent(StreamOpenerCName).(StreamOpener) + s.syncDelegate = a.MustComponent(streamSyncDelegateCName).(StreamSyncDelegate) + comp, ok := a.Component(debugstat.CName).(debugstat.StatService) + if !ok { + comp = debugstat.NewNoOp() + } + s.statService = comp + s.streamConfig = a.MustComponent("config").(configGetter).GetConfig() + s.statService.AddProvider(s) + return nil +} + +func (s *streamPool) Name() (name string) { + return CName +} + +func (s *streamPool) Run(ctx context.Context) (err error) { + s.dial = NewExecPool(s.streamConfig.DialQueueWorkers, s.streamConfig.DialQueueSize) + return nil +} + func (s *streamPool) ProvideStat() any { s.mu.Lock() var totalSize int64 @@ -166,7 +205,7 @@ func (s *streamPool) addStream(drpcStream drpc.Stream, tags ...string) (*stream, tags: tags, stats: newStreamStat(peerId), } - st.queue = s.handler.GetQueueProvider().GetQueue(peerId) + st.queue = s.syncDelegate.GetQueue(peerId) s.streams[streamId] = st s.streamIdsByPeer[peerId] = append(s.streamIdsByPeer[peerId], streamId) for _, tag := range tags { @@ -291,7 +330,7 @@ func (s *streamPool) openStream(ctx context.Context, p peer.Peer) *openingProces // in case there was no peerId in context ctx := peer.CtxWithPeerId(ctx, p.Id()) // open new stream and add to pool - st, tags, err := s.handler.OpenStream(ctx, p) + st, tags, err := s.streamOpener.OpenStream(ctx, p) if err != nil { op.err = err return @@ -393,7 +432,7 @@ func (s *streamPool) removeStream(streamId uint32) { st.l.Debug("stream removed", zap.Strings("tags", st.tags)) } -func (s *streamPool) Close() (err error) { +func (s *streamPool) Close(ctx context.Context) (err error) { s.statService.RemoveProvider(s) return s.dial.Close() } diff --git a/net/streampool/streampoolservice.go b/net/streampool/streampoolservice.go index f49d2d22..cbff7c2e 100644 --- a/net/streampool/streampoolservice.go +++ b/net/streampool/streampoolservice.go @@ -25,7 +25,7 @@ type StreamConfig struct { } type Service interface { - NewStreamPool(h StreamHandler, conf StreamConfig) StreamPool + NewStreamPool(h StreamOpener, conf StreamConfig) StreamPool app.Component } @@ -34,10 +34,10 @@ type service struct { debugStat debugstat.StatService } -func (s *service) NewStreamPool(h StreamHandler, conf StreamConfig) StreamPool { +func (s *service) NewStreamPool(h StreamOpener, conf StreamConfig) StreamPool { pl := NewExecPool(conf.DialQueueWorkers, conf.DialQueueSize) sp := &streamPool{ - handler: h, + streamOpener: h, writeQueueSize: conf.SendQueueSize, streamIdsByPeer: map[string][]uint32{}, streamIdsByTag: map[string][]uint32{}, diff --git a/util/periodicsync/periodicsync.go b/util/periodicsync/periodicsync.go index aab8b050..f5e9135e 100644 --- a/util/periodicsync/periodicsync.go +++ b/util/periodicsync/periodicsync.go @@ -3,10 +3,12 @@ package periodicsync import ( "context" - "github.com/anyproto/any-sync/app/logger" - "go.uber.org/zap" "sync/atomic" "time" + + "go.uber.org/zap" + + "github.com/anyproto/any-sync/app/logger" ) type PeriodicSync interface { @@ -17,39 +19,40 @@ type PeriodicSync interface { type SyncerFunc func(ctx context.Context) error func NewPeriodicSync(periodSeconds int, timeout time.Duration, caller SyncerFunc, l logger.CtxLogger) PeriodicSync { - // TODO: rename to PeriodicCall (including folders) and do PRs in all repos where we are using this - // https://linear.app/anytype/issue/GO-1241/change-periodicsync-component-to-periodiccall + return NewPeriodicSyncDuration(time.Duration(periodSeconds)*time.Second, timeout, caller, l) +} + +func NewPeriodicSyncDuration(periodicLoopInterval, timeout time.Duration, caller SyncerFunc, l logger.CtxLogger) PeriodicSync { ctx, cancel := context.WithCancel(context.Background()) ctx = logger.CtxWithFields(ctx, zap.String("rootOp", "periodicCall")) return &periodicCall{ - caller: caller, - log: l, - loopCtx: ctx, - loopCancel: cancel, - loopDone: make(chan struct{}), - periodSeconds: periodSeconds, - timeout: timeout, + caller: caller, + log: l, + loopCtx: ctx, + loopCancel: cancel, + loopDone: make(chan struct{}), + period: periodicLoopInterval, + timeout: timeout, } } type periodicCall struct { - log logger.CtxLogger - caller SyncerFunc - loopCtx context.Context - loopCancel context.CancelFunc - loopDone chan struct{} - periodSeconds int - timeout time.Duration - isRunning atomic.Bool + log logger.CtxLogger + caller SyncerFunc + loopCtx context.Context + loopCancel context.CancelFunc + loopDone chan struct{} + period time.Duration + timeout time.Duration + isRunning atomic.Bool } func (p *periodicCall) Run() { p.isRunning.Store(true) - go p.loop(p.periodSeconds) + go p.loop(p.period) } -func (p *periodicCall) loop(periodSeconds int) { - period := time.Duration(periodSeconds) * time.Second +func (p *periodicCall) loop(period time.Duration) { defer close(p.loopDone) doCall := func() { ctx := p.loopCtx diff --git a/util/slice/slice.go b/util/slice/slice.go index 816a01c0..f8fff6da 100644 --- a/util/slice/slice.go +++ b/util/slice/slice.go @@ -136,3 +136,25 @@ func DiscardFromSlice[T any](elements []T, isDiscarded func(T) bool) []T { elements = elements[:finishedIdx] return elements } + +func CompareMaps[T comparable](map1, map2 map[T]struct{}) (both, first, second []T) { + both = []T{} + first = []T{} + second = []T{} + + for key := range map1 { + if _, found := map2[key]; found { + both = append(both, key) + } else { + first = append(first, key) + } + } + + for key := range map2 { + if _, found := map1[key]; !found { + second = append(second, key) + } + } + + return both, first, second +} diff --git a/util/slice/slice_test.go b/util/slice/slice_test.go new file mode 100644 index 00000000..1e0684ae --- /dev/null +++ b/util/slice/slice_test.go @@ -0,0 +1,105 @@ +package slice + +import ( + "testing" + + "github.com/stretchr/testify/require" + "golang.org/x/exp/slices" +) + +func TestCompareMaps(t *testing.T) { + tests := []struct { + name string + map1, map2 map[string]struct{} + expectedBoth []string + expectedFirst []string + expectedSecond []string + }{ + { + name: "Both maps empty", + map1: map[string]struct{}{}, + map2: map[string]struct{}{}, + expectedBoth: []string{}, + expectedFirst: []string{}, + expectedSecond: []string{}, + }, + { + name: "Disjoint maps", + map1: map[string]struct{}{ + "a": {}, + "b": {}, + }, + map2: map[string]struct{}{ + "c": {}, + "d": {}, + }, + expectedBoth: []string{}, + expectedFirst: []string{"a", "b"}, + expectedSecond: []string{"c", "d"}, + }, + { + name: "Identical maps", + map1: map[string]struct{}{ + "a": {}, + "b": {}, + }, + map2: map[string]struct{}{ + "a": {}, + "b": {}, + }, + expectedBoth: []string{"a", "b"}, + expectedFirst: []string{}, + expectedSecond: []string{}, + }, + { + name: "Partial overlap", + map1: map[string]struct{}{ + "a": {}, + "b": {}, + "c": {}, + }, + map2: map[string]struct{}{ + "b": {}, + "c": {}, + "d": {}, + }, + expectedBoth: []string{"b", "c"}, + expectedFirst: []string{"a"}, + expectedSecond: []string{"d"}, + }, + { + name: "First map empty", + map1: map[string]struct{}{}, + map2: map[string]struct{}{ + "a": {}, + "b": {}, + }, + expectedBoth: []string{}, + expectedFirst: []string{}, + expectedSecond: []string{"a", "b"}, + }, + { + name: "Second map empty", + map1: map[string]struct{}{ + "a": {}, + "b": {}, + }, + map2: map[string]struct{}{}, + expectedBoth: []string{}, + expectedFirst: []string{"a", "b"}, + expectedSecond: []string{}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + both, onlyInFirst, onlyInSecond := CompareMaps(tt.map1, tt.map2) + slices.Sort(onlyInFirst) + slices.Sort(onlyInSecond) + slices.Sort(both) + require.Equal(t, tt.expectedBoth, both) + require.Equal(t, tt.expectedFirst, onlyInFirst) + require.Equal(t, tt.expectedSecond, onlyInSecond) + }) + } +}