diff --git a/commonspace/sync/requestmanager.go b/commonspace/sync/requestmanager.go index e0565075..bbc574dd 100644 --- a/commonspace/sync/requestmanager.go +++ b/commonspace/sync/requestmanager.go @@ -24,18 +24,14 @@ type StreamResponse struct { } type requestManager struct { - requestPool RequestPool - requestHandler syncdeps.RequestHandler - responseHandler syncdeps.ResponseHandler - requestSender syncdeps.RequestSender + requestPool RequestPool + handler syncdeps.SyncHandler } -func NewRequestManager(deps syncdeps.SyncDeps) RequestManager { +func NewRequestManager(handler syncdeps.SyncHandler) RequestManager { return &requestManager{ - requestPool: NewRequestPool(), - requestHandler: deps.RequestHandler, - responseHandler: deps.ResponseHandler, - requestSender: deps.RequestSender, + requestPool: NewRequestPool(), + handler: handler, } } @@ -43,9 +39,9 @@ func (r *requestManager) QueueRequest(rq syncdeps.Request) error { return r.requestPool.QueueRequestAction(rq.PeerId(), rq.ObjectId(), func(ctx context.Context) { fmt.Println("starting stream request", rq.PeerId(), rq.ObjectId()) defer fmt.Println("ending stream request", rq.PeerId(), rq.ObjectId()) - err := r.requestSender.SendStreamRequest(ctx, rq, func(stream drpc.Stream) error { + err := r.handler.SendStreamRequest(ctx, rq, func(stream drpc.Stream) error { for { - resp := r.responseHandler.NewResponse() + resp := r.handler.NewResponse() fmt.Println("receiving message", rq.PeerId(), rq.ObjectId()) err := stream.MsgRecv(resp, streampool.EncodingProto) fmt.Println("received message", rq.PeerId(), rq.ObjectId(), err) @@ -53,7 +49,7 @@ func (r *requestManager) QueueRequest(rq syncdeps.Request) error { return err } fmt.Println("handling response", rq.PeerId(), rq.ObjectId()) - err = r.responseHandler.HandleResponse(ctx, rq.PeerId(), rq.ObjectId(), resp) + err = r.handler.HandleResponse(ctx, rq.PeerId(), rq.ObjectId(), resp) fmt.Println("handled response", rq.PeerId(), rq.ObjectId(), err) if err != nil { return err @@ -72,7 +68,7 @@ func (r *requestManager) HandleStreamRequest(ctx context.Context, rq syncdeps.Re } fmt.Println("handling stream request", rq.PeerId(), rq.ObjectId()) defer r.requestPool.Release(rq.PeerId(), rq.ObjectId()) - newRq, err := r.requestHandler.HandleStreamRequest(ctx, rq, func(resp proto.Message) error { + newRq, err := r.handler.HandleStreamRequest(ctx, rq, func(resp proto.Message) error { fmt.Println("sending response", rq.PeerId(), rq.ObjectId()) return stream.MsgSend(resp, streampool.EncodingProto) }) diff --git a/commonspace/sync/sync.go b/commonspace/sync/sync.go index 175a7279..f6f1cb1c 100644 --- a/commonspace/sync/sync.go +++ b/commonspace/sync/sync.go @@ -29,9 +29,7 @@ type syncService struct { sendQueueProvider multiqueue.QueueProvider[drpc.Message] receiveQueue multiqueue.MultiQueue[msgCtx] manager RequestManager - handler syncdeps.HeadUpdateHandler - mergeFilter syncdeps.MergeFilterFunc - newMessage func() drpc.Message + handler syncdeps.SyncHandler ctx context.Context cancel context.CancelFunc } @@ -42,14 +40,10 @@ type msgCtx struct { } func (s *syncService) Init(a *app.App) (err error) { - factory := a.MustComponent(syncdeps.CName).(syncdeps.SyncDepsFactory) + s.handler = a.MustComponent(syncdeps.CName).(syncdeps.SyncHandler) s.sendQueueProvider = multiqueue.NewQueueProvider[drpc.Message](100, s.handleOutgoingMessage) s.receiveQueue = multiqueue.New[msgCtx](s.handleIncomingMessage, 100) - deps := factory.SyncDeps() - s.handler = deps.HeadUpdateHandler - s.mergeFilter = deps.MergeFilter - s.newMessage = deps.ReadMessageConstructor - s.manager = NewRequestManager(deps) + s.manager = NewRequestManager(s.handler) s.ctx, s.cancel = context.WithCancel(context.Background()) return nil } @@ -63,7 +57,7 @@ func NewSyncService() SyncService { } func (s *syncService) handleOutgoingMessage(id string, msg drpc.Message, q *mb.MB[drpc.Message]) error { - return s.mergeFilter(s.ctx, msg, q) + return s.handler.TryAddMessage(s.ctx, msg, q) } func (s *syncService) handleIncomingMessage(msg msgCtx) { @@ -86,7 +80,7 @@ func (s *syncService) GetQueue(peerId string) *multiqueue.Queue[drpc.Message] { } func (s *syncService) NewReadMessage() drpc.Message { - return s.newMessage() + return s.handler.NewMessage() } func (s *syncService) HandleMessage(ctx context.Context, peerId string, msg drpc.Message) error { diff --git a/commonspace/sync/sync_test.go b/commonspace/sync/sync_test.go index 08751ee4..aba43885 100644 --- a/commonspace/sync/sync_test.go +++ b/commonspace/sync/sync_test.go @@ -48,7 +48,7 @@ func newFixture(t *testing.T, peerId string, params counterFixtureParams) *count Register(synctest.NewPeerProvider(peerId)). Register(synctest.NewCounter(params.start, params.delta)). Register(streampool.NewStreamPool()). - Register(synctest.NewCounterSyncDepsFactory()). + Register(synctest.NewCounterSyncHandler()). Register(NewSyncService()). Register(synctest.NewCounterGenerator()). Register(synctest.NewRpcServer()) diff --git a/commonspace/sync/syncdeps/headupdatehandler.go b/commonspace/sync/syncdeps/headupdatehandler.go deleted file mode 100644 index 6598139d..00000000 --- a/commonspace/sync/syncdeps/headupdatehandler.go +++ /dev/null @@ -1,11 +0,0 @@ -package syncdeps - -import ( - "context" - - "storj.io/drpc" -) - -type HeadUpdateHandler interface { - HandleHeadUpdate(ctx context.Context, headUpdate drpc.Message) (Request, error) -} diff --git a/commonspace/sync/syncdeps/requesthandler.go b/commonspace/sync/syncdeps/requesthandler.go deleted file mode 100644 index 04137cfb..00000000 --- a/commonspace/sync/syncdeps/requesthandler.go +++ /dev/null @@ -1,11 +0,0 @@ -package syncdeps - -import ( - "context" - - "github.com/gogo/protobuf/proto" -) - -type RequestHandler interface { - HandleStreamRequest(ctx context.Context, rq Request, send func(resp proto.Message) error) (Request, error) -} diff --git a/commonspace/sync/syncdeps/requestsender.go b/commonspace/sync/syncdeps/requestsender.go deleted file mode 100644 index 9acd19dd..00000000 --- a/commonspace/sync/syncdeps/requestsender.go +++ /dev/null @@ -1,11 +0,0 @@ -package syncdeps - -import ( - "context" - - "storj.io/drpc" -) - -type RequestSender interface { - SendStreamRequest(ctx context.Context, rq Request, receive func(stream drpc.Stream) error) (err error) -} diff --git a/commonspace/sync/syncdeps/responsehandler.go b/commonspace/sync/syncdeps/responsehandler.go deleted file mode 100644 index 78400f85..00000000 --- a/commonspace/sync/syncdeps/responsehandler.go +++ /dev/null @@ -1,8 +0,0 @@ -package syncdeps - -import "context" - -type ResponseHandler interface { - NewResponse() Response - HandleResponse(ctx context.Context, peerId, objectId string, resp Response) error -} diff --git a/commonspace/sync/syncdeps/syncdeps.go b/commonspace/sync/syncdeps/syncdeps.go index 829d8490..7ec5e9db 100644 --- a/commonspace/sync/syncdeps/syncdeps.go +++ b/commonspace/sync/syncdeps/syncdeps.go @@ -4,16 +4,21 @@ import ( "context" "github.com/cheggaaa/mb/v3" + "github.com/gogo/protobuf/proto" "storj.io/drpc" + + "github.com/anyproto/any-sync/app" ) -type MergeFilterFunc func(ctx context.Context, msg drpc.Message, q *mb.MB[drpc.Message]) error +const CName = "common.sync.syncdeps" -type SyncDeps struct { - HeadUpdateHandler HeadUpdateHandler - ResponseHandler ResponseHandler - RequestHandler RequestHandler - RequestSender RequestSender - MergeFilter MergeFilterFunc - ReadMessageConstructor func() drpc.Message +type SyncHandler interface { + app.Component + HandleHeadUpdate(ctx context.Context, headUpdate drpc.Message) (Request, error) + TryAddMessage(ctx context.Context, msg drpc.Message, q *mb.MB[drpc.Message]) error + HandleStreamRequest(ctx context.Context, rq Request, send func(resp proto.Message) error) (Request, error) + SendStreamRequest(ctx context.Context, rq Request, receive func(stream drpc.Stream) error) (err error) + HandleResponse(ctx context.Context, peerId, objectId string, resp Response) error + NewResponse() Response + NewMessage() drpc.Message } diff --git a/commonspace/sync/syncdeps/syncdepsfactory.go b/commonspace/sync/syncdeps/syncdepsfactory.go deleted file mode 100644 index e10a70e5..00000000 --- a/commonspace/sync/syncdeps/syncdepsfactory.go +++ /dev/null @@ -1,10 +0,0 @@ -package syncdeps - -import "github.com/anyproto/any-sync/app" - -const CName = "common.sync.syncdeps" - -type SyncDepsFactory interface { - app.Component - SyncDeps() SyncDeps -} diff --git a/commonspace/sync/synctest/countersynchandler.go b/commonspace/sync/synctest/countersynchandler.go new file mode 100644 index 00000000..06182f36 --- /dev/null +++ b/commonspace/sync/synctest/countersynchandler.go @@ -0,0 +1,65 @@ +package synctest + +import ( + "context" + + "github.com/cheggaaa/mb/v3" + "github.com/gogo/protobuf/proto" + "storj.io/drpc" + + "github.com/anyproto/any-sync/app" + "github.com/anyproto/any-sync/commonspace/sync/syncdeps" +) + +type CounterSyncHandler struct { + requestHandler *CounterRequestHandler + requestSender *CounterRequestSender + responseHandler *CounterResponseHandler + updateHandler *CounterUpdateHandler +} + +func NewCounterSyncHandler() syncdeps.SyncHandler { + return &CounterSyncHandler{} +} + +func (c *CounterSyncHandler) HandleHeadUpdate(ctx context.Context, headUpdate drpc.Message) (syncdeps.Request, error) { + return c.updateHandler.HandleHeadUpdate(ctx, headUpdate) +} + +func (c *CounterSyncHandler) TryAddMessage(ctx context.Context, msg drpc.Message, q *mb.MB[drpc.Message]) error { + return q.TryAdd(msg) +} + +func (c *CounterSyncHandler) HandleStreamRequest(ctx context.Context, rq syncdeps.Request, send func(resp proto.Message) error) (syncdeps.Request, error) { + return c.requestHandler.HandleStreamRequest(ctx, rq, send) +} + +func (c *CounterSyncHandler) SendStreamRequest(ctx context.Context, rq syncdeps.Request, receive func(stream drpc.Stream) error) (err error) { + return c.requestSender.SendStreamRequest(ctx, rq, receive) +} + +func (c *CounterSyncHandler) HandleResponse(ctx context.Context, peerId, objectId string, resp syncdeps.Response) error { + return c.responseHandler.HandleResponse(ctx, peerId, objectId, resp) +} + +func (c *CounterSyncHandler) NewResponse() syncdeps.Response { + return c.responseHandler.NewResponse() +} + +func (c *CounterSyncHandler) Init(a *app.App) (err error) { + counter := a.MustComponent(CounterName).(*Counter) + peerProvider := a.MustComponent(PeerName).(*PeerProvider) + c.requestHandler = &CounterRequestHandler{counter: counter} + c.requestSender = &CounterRequestSender{peerProvider: a.MustComponent(PeerName).(*PeerProvider)} + c.responseHandler = &CounterResponseHandler{counter: counter} + c.updateHandler = &CounterUpdateHandler{counter: counter, peerProvider: peerProvider} + return nil +} + +func (c *CounterSyncHandler) Name() (name string) { + return syncdeps.CName +} + +func (c *CounterSyncHandler) NewMessage() drpc.Message { + return &CounterUpdate{} +} diff --git a/commonspace/sync/synctest/syncdepsfactory.go b/commonspace/sync/synctest/syncdepsfactory.go deleted file mode 100644 index 1e78ac00..00000000 --- a/commonspace/sync/synctest/syncdepsfactory.go +++ /dev/null @@ -1,49 +0,0 @@ -package synctest - -import ( - "context" - - "github.com/cheggaaa/mb/v3" - "storj.io/drpc" - - "github.com/anyproto/any-sync/app" - "github.com/anyproto/any-sync/commonspace/sync/syncdeps" -) - -type CounterSyncDepsFactory struct { - syncDeps syncdeps.SyncDeps -} - -func NewCounterSyncDepsFactory() syncdeps.SyncDepsFactory { - return &CounterSyncDepsFactory{} -} - -func (c *CounterSyncDepsFactory) Init(a *app.App) (err error) { - counter := a.MustComponent(CounterName).(*Counter) - peerProvider := a.MustComponent(PeerName).(*PeerProvider) - requestHandler := &CounterRequestHandler{counter: counter} - requestSender := &CounterRequestSender{peerProvider: a.MustComponent(PeerName).(*PeerProvider)} - responseHandler := &CounterResponseHandler{counter: counter} - updateHandler := &CounterUpdateHandler{counter: counter, peerProvider: peerProvider} - c.syncDeps = syncdeps.SyncDeps{ - HeadUpdateHandler: updateHandler, - ResponseHandler: responseHandler, - RequestHandler: requestHandler, - RequestSender: requestSender, - MergeFilter: func(ctx context.Context, msg drpc.Message, q *mb.MB[drpc.Message]) error { - return q.TryAdd(msg) - }, - ReadMessageConstructor: func() drpc.Message { - return &CounterUpdate{} - }, - } - return nil -} - -func (c *CounterSyncDepsFactory) Name() (name string) { - return syncdeps.CName -} - -func (c *CounterSyncDepsFactory) SyncDeps() syncdeps.SyncDeps { - return c.syncDeps -}