1
0
Fork 0
mirror of https://github.com/anyproto/any-sync.git synced 2025-06-09 09:35:03 +09:00

Simplify and improve request manager logic

This commit is contained in:
mcrakhman 2024-06-13 09:47:47 +02:00
parent 8d3fbd4415
commit 3ce1c9bc91
No known key found for this signature in database
GPG key ID: DED12CFEF5B8396B
6 changed files with 53 additions and 58 deletions

View file

@ -14,14 +14,10 @@ import (
type RequestManager interface {
QueueRequest(rq syncdeps.Request) error
SendRequest(ctx context.Context, rq syncdeps.Request, collector ResponseCollector) error
SendRequest(ctx context.Context, rq syncdeps.Request, collector syncdeps.ResponseCollector) error
HandleStreamRequest(ctx context.Context, rq syncdeps.Request, stream drpc.Stream) error
}
type ResponseCollector interface {
CollectResponse(ctx context.Context, peerId, objectId string, resp syncdeps.Response) error
}
type StreamResponse struct {
Stream drpc.Stream
Connection drpc.Conn
@ -41,7 +37,7 @@ func NewRequestManager(handler syncdeps.SyncHandler) RequestManager {
}
}
func (r *requestManager) SendRequest(ctx context.Context, rq syncdeps.Request, collector ResponseCollector) error {
func (r *requestManager) SendRequest(ctx context.Context, rq syncdeps.Request, collector syncdeps.ResponseCollector) error {
return r.handler.SendStreamRequest(ctx, rq, func(stream drpc.Stream) error {
for {
resp := r.handler.NewResponse()
@ -59,21 +55,9 @@ func (r *requestManager) SendRequest(ctx context.Context, rq syncdeps.Request, c
func (r *requestManager) QueueRequest(rq syncdeps.Request) error {
return r.requestPool.QueueRequestAction(rq.PeerId(), rq.ObjectId(), func(ctx context.Context) {
err := r.handler.SendStreamRequest(ctx, rq, func(stream drpc.Stream) error {
for {
resp := r.handler.NewResponse()
err := stream.MsgRecv(resp, streampool.EncodingProto)
if err != nil {
return err
}
err = r.handler.HandleResponse(ctx, rq.PeerId(), rq.ObjectId(), resp)
if err != nil {
return err
}
}
})
err := r.handler.ApplyRequest(ctx, rq, r)
if err != nil {
log.Warn("failed to receive request", zap.Error(err))
log.Error("failed to apply request", zap.Error(err))
}
})
}

View file

@ -23,7 +23,7 @@ type SyncService interface {
app.Component
BroadcastMessage(ctx context.Context, msg drpc.Message) error
HandleStreamRequest(ctx context.Context, req syncdeps.Request, stream drpc.Stream) error
SendRequest(ctx context.Context, rq syncdeps.Request, collector ResponseCollector) error
SendRequest(ctx context.Context, rq syncdeps.Request, collector syncdeps.ResponseCollector) error
QueueRequest(ctx context.Context, rq syncdeps.Request) error
}
@ -106,7 +106,7 @@ func (s *syncService) QueueRequest(ctx context.Context, rq syncdeps.Request) err
return s.manager.QueueRequest(rq)
}
func (s *syncService) SendRequest(ctx context.Context, rq syncdeps.Request, collector ResponseCollector) error {
func (s *syncService) SendRequest(ctx context.Context, rq syncdeps.Request, collector syncdeps.ResponseCollector) error {
return s.manager.SendRequest(ctx, rq, collector)
}

View file

@ -12,6 +12,14 @@ import (
const CName = "common.sync.syncdeps"
type ResponseCollector interface {
CollectResponse(ctx context.Context, peerId, objectId string, resp Response) error
}
type RequestSender interface {
SendRequest(ctx context.Context, rq Request, collector ResponseCollector) error
}
type ObjectSyncHandler interface {
HandleHeadUpdate(ctx context.Context, headUpdate drpc.Message) (Request, error)
HandleStreamRequest(ctx context.Context, rq Request, send func(resp proto.Message) error) (Request, error)
@ -20,7 +28,9 @@ type ObjectSyncHandler interface {
type SyncHandler interface {
app.Component
ObjectSyncHandler
HandleHeadUpdate(ctx context.Context, headUpdate drpc.Message) (Request, error)
HandleStreamRequest(ctx context.Context, rq Request, sendResponse func(resp proto.Message) error) (Request, error)
ApplyRequest(ctx context.Context, rq Request, requestSender RequestSender) error
TryAddMessage(ctx context.Context, peerId string, msg drpc.Message, q *mb.MB[drpc.Message]) error
SendStreamRequest(ctx context.Context, rq Request, receive func(stream drpc.Stream) error) (err error)
NewResponse() Response

View file

@ -0,0 +1,22 @@
package synctest
import (
"context"
"github.com/anyproto/any-sync/commonspace/sync/syncdeps"
"github.com/anyproto/any-sync/commonspace/sync/synctestproto"
)
type CounterResponseCollector struct {
counter *Counter
}
func NewCounterResponseCollector(counter *Counter) *CounterResponseCollector {
return &CounterResponseCollector{counter: counter}
}
func (c *CounterResponseCollector) CollectResponse(ctx context.Context, peerId, objectId string, resp syncdeps.Response) error {
counterResp := resp.(*synctestproto.CounterIncrease)
c.counter.Add(counterResp.Value)
return nil
}

View file

@ -1,22 +0,0 @@
package synctest
import (
"context"
"github.com/anyproto/any-sync/commonspace/sync/syncdeps"
"github.com/anyproto/any-sync/commonspace/sync/synctestproto"
)
type CounterResponseHandler struct {
counter *Counter
}
func (c *CounterResponseHandler) NewResponse() syncdeps.Response {
return &synctestproto.CounterIncrease{}
}
func (c *CounterResponseHandler) HandleResponse(ctx context.Context, peerId, objectId string, resp syncdeps.Response) error {
counterResp := resp.(*synctestproto.CounterIncrease)
c.counter.Add(counterResp.Value)
return nil
}

View file

@ -9,13 +9,19 @@ import (
"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/commonspace/sync/syncdeps"
"github.com/anyproto/any-sync/commonspace/sync/synctestproto"
)
type CounterSyncHandler struct {
requestHandler *CounterRequestHandler
requestSender *CounterRequestSender
responseHandler *CounterResponseHandler
updateHandler *CounterUpdateHandler
counter *Counter
requestHandler *CounterRequestHandler
requestSender *CounterRequestSender
updateHandler *CounterUpdateHandler
}
func (c *CounterSyncHandler) ApplyRequest(ctx context.Context, rq syncdeps.Request, requestSender syncdeps.RequestSender) error {
collector := NewCounterResponseCollector(c.counter)
return requestSender.SendRequest(ctx, rq, collector)
}
func NewCounterSyncHandler() syncdeps.SyncHandler {
@ -38,21 +44,16 @@ func (c *CounterSyncHandler) SendStreamRequest(ctx context.Context, rq syncdeps.
return c.requestSender.SendStreamRequest(ctx, rq, receive)
}
func (c *CounterSyncHandler) HandleResponse(ctx context.Context, peerId, objectId string, resp syncdeps.Response) error {
return c.responseHandler.HandleResponse(ctx, peerId, objectId, resp)
}
func (c *CounterSyncHandler) NewResponse() syncdeps.Response {
return c.responseHandler.NewResponse()
return &synctestproto.CounterIncrease{}
}
func (c *CounterSyncHandler) Init(a *app.App) (err error) {
counter := a.MustComponent(CounterName).(*Counter)
peerProvider := a.MustComponent(PeerName).(*PeerProvider)
c.requestHandler = &CounterRequestHandler{counter: counter}
c.counter = a.MustComponent(CounterName).(*Counter)
c.requestHandler = &CounterRequestHandler{counter: c.counter}
c.requestSender = &CounterRequestSender{peerProvider: a.MustComponent(PeerName).(*PeerProvider)}
c.responseHandler = &CounterResponseHandler{counter: counter}
c.updateHandler = &CounterUpdateHandler{counter: counter, peerProvider: peerProvider}
c.updateHandler = &CounterUpdateHandler{counter: c.counter, peerProvider: peerProvider}
return nil
}