diff --git a/commonspace/sync/requestmanager.go b/commonspace/sync/requestmanager.go index beae3b4c..3ce6ec6c 100644 --- a/commonspace/sync/requestmanager.go +++ b/commonspace/sync/requestmanager.go @@ -14,9 +14,14 @@ import ( type RequestManager interface { QueueRequest(rq syncdeps.Request) error + SendRequest(ctx context.Context, rq syncdeps.Request, collector ResponseCollector) error HandleStreamRequest(ctx context.Context, rq syncdeps.Request, stream drpc.Stream) error } +type ResponseCollector interface { + CollectResponse(ctx context.Context, peerId, objectId string, resp syncdeps.Response) error +} + type StreamResponse struct { Stream drpc.Stream Connection drpc.Conn @@ -36,6 +41,22 @@ func NewRequestManager(handler syncdeps.SyncHandler) RequestManager { } } +func (r *requestManager) SendRequest(ctx context.Context, rq syncdeps.Request, collector ResponseCollector) error { + return r.handler.SendStreamRequest(ctx, rq, func(stream drpc.Stream) error { + for { + resp := r.handler.NewResponse() + err := stream.MsgRecv(resp, streampool.EncodingProto) + if err != nil { + return err + } + err = collector.CollectResponse(ctx, rq.PeerId(), rq.ObjectId(), resp) + if err != nil { + return err + } + } + }) +} + func (r *requestManager) QueueRequest(rq syncdeps.Request) error { return r.requestPool.QueueRequestAction(rq.PeerId(), rq.ObjectId(), func(ctx context.Context) { err := r.handler.SendStreamRequest(ctx, rq, func(stream drpc.Stream) error { diff --git a/commonspace/sync/sync.go b/commonspace/sync/sync.go index ed7e8675..6ac9cecb 100644 --- a/commonspace/sync/sync.go +++ b/commonspace/sync/sync.go @@ -23,6 +23,7 @@ type SyncService interface { app.Component BroadcastMessage(ctx context.Context, msg drpc.Message) error HandleStreamRequest(ctx context.Context, req syncdeps.Request, stream drpc.Stream) error + SendRequest(ctx context.Context, rq syncdeps.Request, collector ResponseCollector) error QueueRequest(ctx context.Context, rq syncdeps.Request) error } @@ -104,6 +105,10 @@ func (s *syncService) QueueRequest(ctx context.Context, rq syncdeps.Request) err return s.manager.QueueRequest(rq) } +func (s *syncService) SendRequest(ctx context.Context, rq syncdeps.Request, collector ResponseCollector) error { + return s.manager.SendRequest(ctx, rq, collector) +} + func (s *syncService) HandleStreamRequest(ctx context.Context, req syncdeps.Request, stream drpc.Stream) error { return s.manager.HandleStreamRequest(ctx, req, stream) } diff --git a/commonspace/sync/synctest/counterrequestsender.go b/commonspace/sync/synctest/counterrequestsender.go index ee96a185..00c934d0 100644 --- a/commonspace/sync/synctest/counterrequestsender.go +++ b/commonspace/sync/synctest/counterrequestsender.go @@ -2,6 +2,8 @@ package synctest import ( "context" + "errors" + "io" "storj.io/drpc" @@ -25,6 +27,10 @@ func (c *CounterRequestSender) SendStreamRequest(ctx context.Context, rq syncdep if err != nil { return err } - return receive(stream) + err = receive(stream) + if errors.Is(err, io.EOF) { + return nil + } + return err }) }