mirror of
https://github.com/anyproto/any-sync.git
synced 2025-06-08 14:07:02 +09:00
Simplify deps
This commit is contained in:
parent
43383853e1
commit
7ce669d85d
11 changed files with 93 additions and 133 deletions
|
@ -25,17 +25,13 @@ type StreamResponse struct {
|
|||
|
||||
type requestManager struct {
|
||||
requestPool RequestPool
|
||||
requestHandler syncdeps.RequestHandler
|
||||
responseHandler syncdeps.ResponseHandler
|
||||
requestSender syncdeps.RequestSender
|
||||
handler syncdeps.SyncHandler
|
||||
}
|
||||
|
||||
func NewRequestManager(deps syncdeps.SyncDeps) RequestManager {
|
||||
func NewRequestManager(handler syncdeps.SyncHandler) RequestManager {
|
||||
return &requestManager{
|
||||
requestPool: NewRequestPool(),
|
||||
requestHandler: deps.RequestHandler,
|
||||
responseHandler: deps.ResponseHandler,
|
||||
requestSender: deps.RequestSender,
|
||||
handler: handler,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -43,9 +39,9 @@ func (r *requestManager) QueueRequest(rq syncdeps.Request) error {
|
|||
return r.requestPool.QueueRequestAction(rq.PeerId(), rq.ObjectId(), func(ctx context.Context) {
|
||||
fmt.Println("starting stream request", rq.PeerId(), rq.ObjectId())
|
||||
defer fmt.Println("ending stream request", rq.PeerId(), rq.ObjectId())
|
||||
err := r.requestSender.SendStreamRequest(ctx, rq, func(stream drpc.Stream) error {
|
||||
err := r.handler.SendStreamRequest(ctx, rq, func(stream drpc.Stream) error {
|
||||
for {
|
||||
resp := r.responseHandler.NewResponse()
|
||||
resp := r.handler.NewResponse()
|
||||
fmt.Println("receiving message", rq.PeerId(), rq.ObjectId())
|
||||
err := stream.MsgRecv(resp, streampool.EncodingProto)
|
||||
fmt.Println("received message", rq.PeerId(), rq.ObjectId(), err)
|
||||
|
@ -53,7 +49,7 @@ func (r *requestManager) QueueRequest(rq syncdeps.Request) error {
|
|||
return err
|
||||
}
|
||||
fmt.Println("handling response", rq.PeerId(), rq.ObjectId())
|
||||
err = r.responseHandler.HandleResponse(ctx, rq.PeerId(), rq.ObjectId(), resp)
|
||||
err = r.handler.HandleResponse(ctx, rq.PeerId(), rq.ObjectId(), resp)
|
||||
fmt.Println("handled response", rq.PeerId(), rq.ObjectId(), err)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -72,7 +68,7 @@ func (r *requestManager) HandleStreamRequest(ctx context.Context, rq syncdeps.Re
|
|||
}
|
||||
fmt.Println("handling stream request", rq.PeerId(), rq.ObjectId())
|
||||
defer r.requestPool.Release(rq.PeerId(), rq.ObjectId())
|
||||
newRq, err := r.requestHandler.HandleStreamRequest(ctx, rq, func(resp proto.Message) error {
|
||||
newRq, err := r.handler.HandleStreamRequest(ctx, rq, func(resp proto.Message) error {
|
||||
fmt.Println("sending response", rq.PeerId(), rq.ObjectId())
|
||||
return stream.MsgSend(resp, streampool.EncodingProto)
|
||||
})
|
||||
|
|
|
@ -29,9 +29,7 @@ type syncService struct {
|
|||
sendQueueProvider multiqueue.QueueProvider[drpc.Message]
|
||||
receiveQueue multiqueue.MultiQueue[msgCtx]
|
||||
manager RequestManager
|
||||
handler syncdeps.HeadUpdateHandler
|
||||
mergeFilter syncdeps.MergeFilterFunc
|
||||
newMessage func() drpc.Message
|
||||
handler syncdeps.SyncHandler
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
}
|
||||
|
@ -42,14 +40,10 @@ type msgCtx struct {
|
|||
}
|
||||
|
||||
func (s *syncService) Init(a *app.App) (err error) {
|
||||
factory := a.MustComponent(syncdeps.CName).(syncdeps.SyncDepsFactory)
|
||||
s.handler = a.MustComponent(syncdeps.CName).(syncdeps.SyncHandler)
|
||||
s.sendQueueProvider = multiqueue.NewQueueProvider[drpc.Message](100, s.handleOutgoingMessage)
|
||||
s.receiveQueue = multiqueue.New[msgCtx](s.handleIncomingMessage, 100)
|
||||
deps := factory.SyncDeps()
|
||||
s.handler = deps.HeadUpdateHandler
|
||||
s.mergeFilter = deps.MergeFilter
|
||||
s.newMessage = deps.ReadMessageConstructor
|
||||
s.manager = NewRequestManager(deps)
|
||||
s.manager = NewRequestManager(s.handler)
|
||||
s.ctx, s.cancel = context.WithCancel(context.Background())
|
||||
return nil
|
||||
}
|
||||
|
@ -63,7 +57,7 @@ func NewSyncService() SyncService {
|
|||
}
|
||||
|
||||
func (s *syncService) handleOutgoingMessage(id string, msg drpc.Message, q *mb.MB[drpc.Message]) error {
|
||||
return s.mergeFilter(s.ctx, msg, q)
|
||||
return s.handler.TryAddMessage(s.ctx, msg, q)
|
||||
}
|
||||
|
||||
func (s *syncService) handleIncomingMessage(msg msgCtx) {
|
||||
|
@ -86,7 +80,7 @@ func (s *syncService) GetQueue(peerId string) *multiqueue.Queue[drpc.Message] {
|
|||
}
|
||||
|
||||
func (s *syncService) NewReadMessage() drpc.Message {
|
||||
return s.newMessage()
|
||||
return s.handler.NewMessage()
|
||||
}
|
||||
|
||||
func (s *syncService) HandleMessage(ctx context.Context, peerId string, msg drpc.Message) error {
|
||||
|
|
|
@ -48,7 +48,7 @@ func newFixture(t *testing.T, peerId string, params counterFixtureParams) *count
|
|||
Register(synctest.NewPeerProvider(peerId)).
|
||||
Register(synctest.NewCounter(params.start, params.delta)).
|
||||
Register(streampool.NewStreamPool()).
|
||||
Register(synctest.NewCounterSyncDepsFactory()).
|
||||
Register(synctest.NewCounterSyncHandler()).
|
||||
Register(NewSyncService()).
|
||||
Register(synctest.NewCounterGenerator()).
|
||||
Register(synctest.NewRpcServer())
|
||||
|
|
|
@ -1,11 +0,0 @@
|
|||
package syncdeps
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"storj.io/drpc"
|
||||
)
|
||||
|
||||
type HeadUpdateHandler interface {
|
||||
HandleHeadUpdate(ctx context.Context, headUpdate drpc.Message) (Request, error)
|
||||
}
|
|
@ -1,11 +0,0 @@
|
|||
package syncdeps
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/gogo/protobuf/proto"
|
||||
)
|
||||
|
||||
type RequestHandler interface {
|
||||
HandleStreamRequest(ctx context.Context, rq Request, send func(resp proto.Message) error) (Request, error)
|
||||
}
|
|
@ -1,11 +0,0 @@
|
|||
package syncdeps
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"storj.io/drpc"
|
||||
)
|
||||
|
||||
type RequestSender interface {
|
||||
SendStreamRequest(ctx context.Context, rq Request, receive func(stream drpc.Stream) error) (err error)
|
||||
}
|
|
@ -1,8 +0,0 @@
|
|||
package syncdeps
|
||||
|
||||
import "context"
|
||||
|
||||
type ResponseHandler interface {
|
||||
NewResponse() Response
|
||||
HandleResponse(ctx context.Context, peerId, objectId string, resp Response) error
|
||||
}
|
|
@ -4,16 +4,21 @@ import (
|
|||
"context"
|
||||
|
||||
"github.com/cheggaaa/mb/v3"
|
||||
"github.com/gogo/protobuf/proto"
|
||||
"storj.io/drpc"
|
||||
|
||||
"github.com/anyproto/any-sync/app"
|
||||
)
|
||||
|
||||
type MergeFilterFunc func(ctx context.Context, msg drpc.Message, q *mb.MB[drpc.Message]) error
|
||||
const CName = "common.sync.syncdeps"
|
||||
|
||||
type SyncDeps struct {
|
||||
HeadUpdateHandler HeadUpdateHandler
|
||||
ResponseHandler ResponseHandler
|
||||
RequestHandler RequestHandler
|
||||
RequestSender RequestSender
|
||||
MergeFilter MergeFilterFunc
|
||||
ReadMessageConstructor func() drpc.Message
|
||||
type SyncHandler interface {
|
||||
app.Component
|
||||
HandleHeadUpdate(ctx context.Context, headUpdate drpc.Message) (Request, error)
|
||||
TryAddMessage(ctx context.Context, msg drpc.Message, q *mb.MB[drpc.Message]) error
|
||||
HandleStreamRequest(ctx context.Context, rq Request, send func(resp proto.Message) error) (Request, error)
|
||||
SendStreamRequest(ctx context.Context, rq Request, receive func(stream drpc.Stream) error) (err error)
|
||||
HandleResponse(ctx context.Context, peerId, objectId string, resp Response) error
|
||||
NewResponse() Response
|
||||
NewMessage() drpc.Message
|
||||
}
|
||||
|
|
|
@ -1,10 +0,0 @@
|
|||
package syncdeps
|
||||
|
||||
import "github.com/anyproto/any-sync/app"
|
||||
|
||||
const CName = "common.sync.syncdeps"
|
||||
|
||||
type SyncDepsFactory interface {
|
||||
app.Component
|
||||
SyncDeps() SyncDeps
|
||||
}
|
65
commonspace/sync/synctest/countersynchandler.go
Normal file
65
commonspace/sync/synctest/countersynchandler.go
Normal file
|
@ -0,0 +1,65 @@
|
|||
package synctest
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/cheggaaa/mb/v3"
|
||||
"github.com/gogo/protobuf/proto"
|
||||
"storj.io/drpc"
|
||||
|
||||
"github.com/anyproto/any-sync/app"
|
||||
"github.com/anyproto/any-sync/commonspace/sync/syncdeps"
|
||||
)
|
||||
|
||||
type CounterSyncHandler struct {
|
||||
requestHandler *CounterRequestHandler
|
||||
requestSender *CounterRequestSender
|
||||
responseHandler *CounterResponseHandler
|
||||
updateHandler *CounterUpdateHandler
|
||||
}
|
||||
|
||||
func NewCounterSyncHandler() syncdeps.SyncHandler {
|
||||
return &CounterSyncHandler{}
|
||||
}
|
||||
|
||||
func (c *CounterSyncHandler) HandleHeadUpdate(ctx context.Context, headUpdate drpc.Message) (syncdeps.Request, error) {
|
||||
return c.updateHandler.HandleHeadUpdate(ctx, headUpdate)
|
||||
}
|
||||
|
||||
func (c *CounterSyncHandler) TryAddMessage(ctx context.Context, msg drpc.Message, q *mb.MB[drpc.Message]) error {
|
||||
return q.TryAdd(msg)
|
||||
}
|
||||
|
||||
func (c *CounterSyncHandler) HandleStreamRequest(ctx context.Context, rq syncdeps.Request, send func(resp proto.Message) error) (syncdeps.Request, error) {
|
||||
return c.requestHandler.HandleStreamRequest(ctx, rq, send)
|
||||
}
|
||||
|
||||
func (c *CounterSyncHandler) SendStreamRequest(ctx context.Context, rq syncdeps.Request, receive func(stream drpc.Stream) error) (err error) {
|
||||
return c.requestSender.SendStreamRequest(ctx, rq, receive)
|
||||
}
|
||||
|
||||
func (c *CounterSyncHandler) HandleResponse(ctx context.Context, peerId, objectId string, resp syncdeps.Response) error {
|
||||
return c.responseHandler.HandleResponse(ctx, peerId, objectId, resp)
|
||||
}
|
||||
|
||||
func (c *CounterSyncHandler) NewResponse() syncdeps.Response {
|
||||
return c.responseHandler.NewResponse()
|
||||
}
|
||||
|
||||
func (c *CounterSyncHandler) Init(a *app.App) (err error) {
|
||||
counter := a.MustComponent(CounterName).(*Counter)
|
||||
peerProvider := a.MustComponent(PeerName).(*PeerProvider)
|
||||
c.requestHandler = &CounterRequestHandler{counter: counter}
|
||||
c.requestSender = &CounterRequestSender{peerProvider: a.MustComponent(PeerName).(*PeerProvider)}
|
||||
c.responseHandler = &CounterResponseHandler{counter: counter}
|
||||
c.updateHandler = &CounterUpdateHandler{counter: counter, peerProvider: peerProvider}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *CounterSyncHandler) Name() (name string) {
|
||||
return syncdeps.CName
|
||||
}
|
||||
|
||||
func (c *CounterSyncHandler) NewMessage() drpc.Message {
|
||||
return &CounterUpdate{}
|
||||
}
|
|
@ -1,49 +0,0 @@
|
|||
package synctest
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/cheggaaa/mb/v3"
|
||||
"storj.io/drpc"
|
||||
|
||||
"github.com/anyproto/any-sync/app"
|
||||
"github.com/anyproto/any-sync/commonspace/sync/syncdeps"
|
||||
)
|
||||
|
||||
type CounterSyncDepsFactory struct {
|
||||
syncDeps syncdeps.SyncDeps
|
||||
}
|
||||
|
||||
func NewCounterSyncDepsFactory() syncdeps.SyncDepsFactory {
|
||||
return &CounterSyncDepsFactory{}
|
||||
}
|
||||
|
||||
func (c *CounterSyncDepsFactory) Init(a *app.App) (err error) {
|
||||
counter := a.MustComponent(CounterName).(*Counter)
|
||||
peerProvider := a.MustComponent(PeerName).(*PeerProvider)
|
||||
requestHandler := &CounterRequestHandler{counter: counter}
|
||||
requestSender := &CounterRequestSender{peerProvider: a.MustComponent(PeerName).(*PeerProvider)}
|
||||
responseHandler := &CounterResponseHandler{counter: counter}
|
||||
updateHandler := &CounterUpdateHandler{counter: counter, peerProvider: peerProvider}
|
||||
c.syncDeps = syncdeps.SyncDeps{
|
||||
HeadUpdateHandler: updateHandler,
|
||||
ResponseHandler: responseHandler,
|
||||
RequestHandler: requestHandler,
|
||||
RequestSender: requestSender,
|
||||
MergeFilter: func(ctx context.Context, msg drpc.Message, q *mb.MB[drpc.Message]) error {
|
||||
return q.TryAdd(msg)
|
||||
},
|
||||
ReadMessageConstructor: func() drpc.Message {
|
||||
return &CounterUpdate{}
|
||||
},
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *CounterSyncDepsFactory) Name() (name string) {
|
||||
return syncdeps.CName
|
||||
}
|
||||
|
||||
func (c *CounterSyncDepsFactory) SyncDeps() syncdeps.SyncDeps {
|
||||
return c.syncDeps
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue