1
0
Fork 0
mirror of https://github.com/anyproto/any-sync.git synced 2025-06-08 14:07:02 +09:00

Inject deps for test sync service

This commit is contained in:
mcrakhman 2024-05-24 16:52:43 +02:00
parent cdfc50eeb3
commit bcccf45831
No known key found for this signature in database
GPG key ID: DED12CFEF5B8396B
3 changed files with 31 additions and 22 deletions

10
commonspace/sync/deps.go Normal file
View file

@ -0,0 +1,10 @@
package sync
type SyncDeps struct {
HeadUpdateHandler HeadUpdateHandler
HeadUpdateSender HeadUpdateSender
ResponseHandler ResponseHandler
RequestHandler RequestHandler
RequestSender RequestSender
MergeFilter MergeFilterFunc
}

View file

@ -1,9 +1,7 @@
package sync
import (
"context"
"strings"
"sync"
"github.com/gogo/protobuf/proto"
"go.uber.org/zap"
@ -53,11 +51,15 @@ type requestManager struct {
requestHandler RequestHandler
responseHandler ResponseHandler
requestSender RequestSender
currentRequests map[string]struct{}
mx sync.Mutex
ctx context.Context
cancel context.CancelFunc
wait chan struct{}
}
func NewRequestManager(deps SyncDeps) RequestManager {
return &requestManager{
requestPool: NewRequestPool(),
requestHandler: deps.RequestHandler,
responseHandler: deps.ResponseHandler,
requestSender: deps.RequestSender,
}
}
func (r *requestManager) QueueRequest(rq Request) error {

View file

@ -22,28 +22,25 @@ type SyncService interface {
type MergeFilterFunc func(ctx context.Context, msg drpc.Message, q *mb.MB[drpc.Message]) error
type syncService struct {
// sendQueue is a multiqueue: peerId -> queue
// this queue exists for sending head updates
sendQueueProvider multiqueue.QueueProvider[drpc.Message]
// receiveQueue is a multiqueue: objectId -> queue
// this queue exists for receiving head updates
receiveQueue multiqueue.MultiQueue[drpc.Message]
// manager is a Request manager which works with both incoming and outgoing requests
manager RequestManager
// handler checks if head update is relevant and then queues Request intent if necessary
handler HeadUpdateHandler
// sender sends head updates to peers
sender HeadUpdateSender
mergeFilter MergeFilterFunc
ctx context.Context
cancel context.CancelFunc
}
func NewSyncService() SyncService {
func NewSyncService(deps SyncDeps) SyncService {
s := &syncService{}
s.ctx, s.cancel = context.WithCancel(context.Background())
s.sendQueueProvider = multiqueue.NewQueueProvider[drpc.Message](100, s.handleOutgoingMessage)
s.receiveQueue = multiqueue.New[drpc.Message](s.handleIncomingMessage, 100)
s.sender = deps.HeadUpdateSender
s.handler = deps.HeadUpdateHandler
s.mergeFilter = deps.MergeFilter
s.manager = NewRequestManager(deps)
return s
}