diff --git a/commonspace/sync/requestmanager.go b/commonspace/sync/requestmanager.go index 3ce6ec6c..4d179ff1 100644 --- a/commonspace/sync/requestmanager.go +++ b/commonspace/sync/requestmanager.go @@ -14,14 +14,10 @@ import ( type RequestManager interface { QueueRequest(rq syncdeps.Request) error - SendRequest(ctx context.Context, rq syncdeps.Request, collector ResponseCollector) error + SendRequest(ctx context.Context, rq syncdeps.Request, collector syncdeps.ResponseCollector) error HandleStreamRequest(ctx context.Context, rq syncdeps.Request, stream drpc.Stream) error } -type ResponseCollector interface { - CollectResponse(ctx context.Context, peerId, objectId string, resp syncdeps.Response) error -} - type StreamResponse struct { Stream drpc.Stream Connection drpc.Conn @@ -41,7 +37,7 @@ func NewRequestManager(handler syncdeps.SyncHandler) RequestManager { } } -func (r *requestManager) SendRequest(ctx context.Context, rq syncdeps.Request, collector ResponseCollector) error { +func (r *requestManager) SendRequest(ctx context.Context, rq syncdeps.Request, collector syncdeps.ResponseCollector) error { return r.handler.SendStreamRequest(ctx, rq, func(stream drpc.Stream) error { for { resp := r.handler.NewResponse() @@ -59,21 +55,9 @@ func (r *requestManager) SendRequest(ctx context.Context, rq syncdeps.Request, c func (r *requestManager) QueueRequest(rq syncdeps.Request) error { return r.requestPool.QueueRequestAction(rq.PeerId(), rq.ObjectId(), func(ctx context.Context) { - err := r.handler.SendStreamRequest(ctx, rq, func(stream drpc.Stream) error { - for { - resp := r.handler.NewResponse() - err := stream.MsgRecv(resp, streampool.EncodingProto) - if err != nil { - return err - } - err = r.handler.HandleResponse(ctx, rq.PeerId(), rq.ObjectId(), resp) - if err != nil { - return err - } - } - }) + err := r.handler.ApplyRequest(ctx, rq, r) if err != nil { - log.Warn("failed to receive request", zap.Error(err)) + log.Error("failed to apply request", zap.Error(err)) } }) } diff --git a/commonspace/sync/sync.go b/commonspace/sync/sync.go index ee572ec7..d1d32145 100644 --- a/commonspace/sync/sync.go +++ b/commonspace/sync/sync.go @@ -23,7 +23,7 @@ type SyncService interface { app.Component BroadcastMessage(ctx context.Context, msg drpc.Message) error HandleStreamRequest(ctx context.Context, req syncdeps.Request, stream drpc.Stream) error - SendRequest(ctx context.Context, rq syncdeps.Request, collector ResponseCollector) error + SendRequest(ctx context.Context, rq syncdeps.Request, collector syncdeps.ResponseCollector) error QueueRequest(ctx context.Context, rq syncdeps.Request) error } @@ -106,7 +106,7 @@ func (s *syncService) QueueRequest(ctx context.Context, rq syncdeps.Request) err return s.manager.QueueRequest(rq) } -func (s *syncService) SendRequest(ctx context.Context, rq syncdeps.Request, collector ResponseCollector) error { +func (s *syncService) SendRequest(ctx context.Context, rq syncdeps.Request, collector syncdeps.ResponseCollector) error { return s.manager.SendRequest(ctx, rq, collector) } diff --git a/commonspace/sync/syncdeps/syncdeps.go b/commonspace/sync/syncdeps/syncdeps.go index 0dedcb00..e1fde4a9 100644 --- a/commonspace/sync/syncdeps/syncdeps.go +++ b/commonspace/sync/syncdeps/syncdeps.go @@ -12,6 +12,14 @@ import ( const CName = "common.sync.syncdeps" +type ResponseCollector interface { + CollectResponse(ctx context.Context, peerId, objectId string, resp Response) error +} + +type RequestSender interface { + SendRequest(ctx context.Context, rq Request, collector ResponseCollector) error +} + type ObjectSyncHandler interface { HandleHeadUpdate(ctx context.Context, headUpdate drpc.Message) (Request, error) HandleStreamRequest(ctx context.Context, rq Request, send func(resp proto.Message) error) (Request, error) @@ -20,7 +28,9 @@ type ObjectSyncHandler interface { type SyncHandler interface { app.Component - ObjectSyncHandler + HandleHeadUpdate(ctx context.Context, headUpdate drpc.Message) (Request, error) + HandleStreamRequest(ctx context.Context, rq Request, sendResponse func(resp proto.Message) error) (Request, error) + ApplyRequest(ctx context.Context, rq Request, requestSender RequestSender) error TryAddMessage(ctx context.Context, peerId string, msg drpc.Message, q *mb.MB[drpc.Message]) error SendStreamRequest(ctx context.Context, rq Request, receive func(stream drpc.Stream) error) (err error) NewResponse() Response diff --git a/commonspace/sync/synctest/counterresponsecollector.go b/commonspace/sync/synctest/counterresponsecollector.go new file mode 100644 index 00000000..797a4c4c --- /dev/null +++ b/commonspace/sync/synctest/counterresponsecollector.go @@ -0,0 +1,22 @@ +package synctest + +import ( + "context" + + "github.com/anyproto/any-sync/commonspace/sync/syncdeps" + "github.com/anyproto/any-sync/commonspace/sync/synctestproto" +) + +type CounterResponseCollector struct { + counter *Counter +} + +func NewCounterResponseCollector(counter *Counter) *CounterResponseCollector { + return &CounterResponseCollector{counter: counter} +} + +func (c *CounterResponseCollector) CollectResponse(ctx context.Context, peerId, objectId string, resp syncdeps.Response) error { + counterResp := resp.(*synctestproto.CounterIncrease) + c.counter.Add(counterResp.Value) + return nil +} diff --git a/commonspace/sync/synctest/counterresponsehandler.go b/commonspace/sync/synctest/counterresponsehandler.go deleted file mode 100644 index b89b19d2..00000000 --- a/commonspace/sync/synctest/counterresponsehandler.go +++ /dev/null @@ -1,22 +0,0 @@ -package synctest - -import ( - "context" - - "github.com/anyproto/any-sync/commonspace/sync/syncdeps" - "github.com/anyproto/any-sync/commonspace/sync/synctestproto" -) - -type CounterResponseHandler struct { - counter *Counter -} - -func (c *CounterResponseHandler) NewResponse() syncdeps.Response { - return &synctestproto.CounterIncrease{} -} - -func (c *CounterResponseHandler) HandleResponse(ctx context.Context, peerId, objectId string, resp syncdeps.Response) error { - counterResp := resp.(*synctestproto.CounterIncrease) - c.counter.Add(counterResp.Value) - return nil -} diff --git a/commonspace/sync/synctest/countersynchandler.go b/commonspace/sync/synctest/countersynchandler.go index 89a01dfa..50dfc5e9 100644 --- a/commonspace/sync/synctest/countersynchandler.go +++ b/commonspace/sync/synctest/countersynchandler.go @@ -9,13 +9,19 @@ import ( "github.com/anyproto/any-sync/app" "github.com/anyproto/any-sync/commonspace/sync/syncdeps" + "github.com/anyproto/any-sync/commonspace/sync/synctestproto" ) type CounterSyncHandler struct { - requestHandler *CounterRequestHandler - requestSender *CounterRequestSender - responseHandler *CounterResponseHandler - updateHandler *CounterUpdateHandler + counter *Counter + requestHandler *CounterRequestHandler + requestSender *CounterRequestSender + updateHandler *CounterUpdateHandler +} + +func (c *CounterSyncHandler) ApplyRequest(ctx context.Context, rq syncdeps.Request, requestSender syncdeps.RequestSender) error { + collector := NewCounterResponseCollector(c.counter) + return requestSender.SendRequest(ctx, rq, collector) } func NewCounterSyncHandler() syncdeps.SyncHandler { @@ -38,21 +44,16 @@ func (c *CounterSyncHandler) SendStreamRequest(ctx context.Context, rq syncdeps. return c.requestSender.SendStreamRequest(ctx, rq, receive) } -func (c *CounterSyncHandler) HandleResponse(ctx context.Context, peerId, objectId string, resp syncdeps.Response) error { - return c.responseHandler.HandleResponse(ctx, peerId, objectId, resp) -} - func (c *CounterSyncHandler) NewResponse() syncdeps.Response { - return c.responseHandler.NewResponse() + return &synctestproto.CounterIncrease{} } func (c *CounterSyncHandler) Init(a *app.App) (err error) { - counter := a.MustComponent(CounterName).(*Counter) peerProvider := a.MustComponent(PeerName).(*PeerProvider) - c.requestHandler = &CounterRequestHandler{counter: counter} + c.counter = a.MustComponent(CounterName).(*Counter) + c.requestHandler = &CounterRequestHandler{counter: c.counter} c.requestSender = &CounterRequestSender{peerProvider: a.MustComponent(PeerName).(*PeerProvider)} - c.responseHandler = &CounterResponseHandler{counter: counter} - c.updateHandler = &CounterUpdateHandler{counter: counter, peerProvider: peerProvider} + c.updateHandler = &CounterUpdateHandler{counter: c.counter, peerProvider: peerProvider} return nil }