mirror of
https://github.com/anyproto/any-sync.git
synced 2025-06-08 05:57:03 +09:00
145 lines
4.1 KiB
Go
145 lines
4.1 KiB
Go
// Package sync implements the SyncService app component, which queues incoming
// sync messages keyed by object id and forwards requests to a RequestManager.
package sync
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
|
|
"github.com/cheggaaa/mb/v3"
|
|
"go.uber.org/zap"
|
|
"storj.io/drpc"
|
|
|
|
"github.com/anyproto/any-sync/app"
|
|
"github.com/anyproto/any-sync/app/logger"
|
|
"github.com/anyproto/any-sync/commonspace/peermanager"
|
|
"github.com/anyproto/any-sync/commonspace/spacestate"
|
|
"github.com/anyproto/any-sync/commonspace/sync/syncdeps"
|
|
"github.com/anyproto/any-sync/metric"
|
|
"github.com/anyproto/any-sync/nodeconf"
|
|
"github.com/anyproto/any-sync/util/multiqueue"
|
|
"github.com/anyproto/any-sync/util/syncqueues"
|
|
)
|
|
|
|
const CName = "common.commonspace.sync"
|
|
|
|
var log = logger.NewNamed("sync")
|
|
|
|
var ErrUnexpectedMessage = errors.New("unexpected message")
|
|
|
|
type SyncService interface {
|
|
app.Component
|
|
BroadcastMessage(ctx context.Context, msg drpc.Message) error
|
|
HandleStreamRequest(ctx context.Context, req syncdeps.Request, stream drpc.Stream) error
|
|
HandleMessage(ctx context.Context, msg drpc.Message) error
|
|
SendRequest(ctx context.Context, rq syncdeps.Request, collector syncdeps.ResponseCollector) error
|
|
QueueRequest(ctx context.Context, rq syncdeps.Request) error
|
|
CloseReceiveQueue(id string) error
|
|
}
|
|
|
|
type syncService struct {
|
|
receiveQueue multiqueue.MultiQueue[msgCtx]
|
|
manager RequestManager
|
|
peerManager peermanager.PeerManager
|
|
nodeConf nodeconf.NodeConf
|
|
handler syncdeps.SyncHandler
|
|
spaceId string
|
|
metric *syncMetric
|
|
commonMetric metric.Metric
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
}
|
|
|
|
type msgCtx struct {
|
|
ctx context.Context
|
|
multiqueue.Sizeable
|
|
}
|
|
|
|
func (m msgCtx) MsgSize() uint64 {
|
|
return m.Sizeable.MsgSize()
|
|
}
|
|
|
|
func NewSyncService() SyncService {
|
|
return &syncService{}
|
|
}
|
|
|
|
func (s *syncService) Name() (name string) {
|
|
return CName
|
|
}
|
|
|
|
func (s *syncService) Init(a *app.App) (err error) {
|
|
s.metric = &syncMetric{}
|
|
s.spaceId = a.MustComponent(spacestate.CName).(*spacestate.SpaceState).SpaceId
|
|
s.handler = a.MustComponent(syncdeps.CName).(syncdeps.SyncHandler)
|
|
s.receiveQueue = multiqueue.New[msgCtx](s.handleIncomingMessage, s.metric, syncdeps.MsgTypeIncoming, 100)
|
|
s.peerManager = a.MustComponent(peermanager.CName).(peermanager.PeerManager)
|
|
s.commonMetric, _ = a.Component(metric.CName).(metric.Metric)
|
|
syncQueues := a.MustComponent(syncqueues.CName).(syncqueues.SyncQueues)
|
|
s.manager = NewRequestManager(s.handler, s.metric, syncQueues.ActionPool(s.spaceId), syncQueues.Limit(s.spaceId))
|
|
s.ctx, s.cancel = context.WithCancel(context.Background())
|
|
return nil
|
|
}
|
|
|
|
func (s *syncService) Run(ctx context.Context) (err error) {
|
|
if s.commonMetric != nil {
|
|
s.commonMetric.RegisterSyncMetric(s.spaceId, s.metric)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *syncService) Close(ctx context.Context) (err error) {
|
|
err = s.receiveQueue.Close()
|
|
if s.commonMetric != nil {
|
|
s.commonMetric.UnregisterSyncMetric(s.spaceId)
|
|
}
|
|
s.manager.Close()
|
|
return
|
|
}
|
|
|
|
func (s *syncService) BroadcastMessage(ctx context.Context, msg drpc.Message) error {
|
|
return s.peerManager.BroadcastMessage(ctx, msg)
|
|
}
|
|
|
|
func (s *syncService) handleIncomingMessage(msg msgCtx) {
|
|
req, err := s.handler.HandleHeadUpdate(msg.ctx, msg.Sizeable)
|
|
if err != nil {
|
|
log.Error("failed to handle head update", zap.Error(err))
|
|
}
|
|
if req == nil {
|
|
return
|
|
}
|
|
err = s.manager.QueueRequest(req)
|
|
if err != nil {
|
|
log.Error("failed to queue request", zap.Error(err))
|
|
}
|
|
}
|
|
|
|
func (s *syncService) HandleMessage(ctx context.Context, msg drpc.Message) error {
|
|
idMsg, ok := msg.(syncdeps.Message)
|
|
if !ok {
|
|
return ErrUnexpectedMessage
|
|
}
|
|
objectId := idMsg.ObjectId()
|
|
err := s.receiveQueue.Add(ctx, objectId, msgCtx{
|
|
ctx: ctx,
|
|
Sizeable: idMsg,
|
|
})
|
|
if errors.Is(err, mb.ErrOverflowed) {
|
|
return nil
|
|
}
|
|
return err
|
|
}
|
|
|
|
func (s *syncService) QueueRequest(ctx context.Context, rq syncdeps.Request) error {
|
|
return s.manager.QueueRequest(rq)
|
|
}
|
|
|
|
func (s *syncService) SendRequest(ctx context.Context, rq syncdeps.Request, collector syncdeps.ResponseCollector) error {
|
|
return s.manager.SendRequest(ctx, rq, collector)
|
|
}
|
|
|
|
func (s *syncService) HandleStreamRequest(ctx context.Context, req syncdeps.Request, stream drpc.Stream) error {
|
|
return s.manager.HandleStreamRequest(ctx, req, stream)
|
|
}
|
|
|
|
func (s *syncService) CloseReceiveQueue(id string) error {
|
|
return s.receiveQueue.CloseThread(id)
|
|
}
|