1
0
Fork 0
mirror of https://github.com/anyproto/any-sync.git synced 2025-06-08 05:57:03 +09:00
any-sync/commonspace/sync/sync.go
2025-04-07 23:56:53 +02:00

145 lines
4.1 KiB
Go

package sync
import (
"context"
"errors"
"github.com/cheggaaa/mb/v3"
"go.uber.org/zap"
"storj.io/drpc"
"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/app/logger"
"github.com/anyproto/any-sync/commonspace/peermanager"
"github.com/anyproto/any-sync/commonspace/spacestate"
"github.com/anyproto/any-sync/commonspace/sync/syncdeps"
"github.com/anyproto/any-sync/metric"
"github.com/anyproto/any-sync/nodeconf"
"github.com/anyproto/any-sync/util/multiqueue"
"github.com/anyproto/any-sync/util/syncqueues"
)
const CName = "common.commonspace.sync"
var log = logger.NewNamed("sync")
var ErrUnexpectedMessage = errors.New("unexpected message")
type SyncService interface {
app.Component
BroadcastMessage(ctx context.Context, msg drpc.Message) error
HandleStreamRequest(ctx context.Context, req syncdeps.Request, stream drpc.Stream) error
HandleMessage(ctx context.Context, msg drpc.Message) error
SendRequest(ctx context.Context, rq syncdeps.Request, collector syncdeps.ResponseCollector) error
QueueRequest(ctx context.Context, rq syncdeps.Request) error
CloseReceiveQueue(id string) error
}
type syncService struct {
receiveQueue multiqueue.MultiQueue[msgCtx]
manager RequestManager
peerManager peermanager.PeerManager
nodeConf nodeconf.NodeConf
handler syncdeps.SyncHandler
spaceId string
metric *syncMetric
commonMetric metric.Metric
ctx context.Context
cancel context.CancelFunc
}
type msgCtx struct {
ctx context.Context
multiqueue.Sizeable
}
func (m msgCtx) MsgSize() uint64 {
return m.Sizeable.MsgSize()
}
func NewSyncService() SyncService {
return &syncService{}
}
func (s *syncService) Name() (name string) {
return CName
}
func (s *syncService) Init(a *app.App) (err error) {
s.metric = &syncMetric{}
s.spaceId = a.MustComponent(spacestate.CName).(*spacestate.SpaceState).SpaceId
s.handler = a.MustComponent(syncdeps.CName).(syncdeps.SyncHandler)
s.receiveQueue = multiqueue.New[msgCtx](s.handleIncomingMessage, s.metric, syncdeps.MsgTypeIncoming, 100)
s.peerManager = a.MustComponent(peermanager.CName).(peermanager.PeerManager)
s.commonMetric, _ = a.Component(metric.CName).(metric.Metric)
syncQueues := a.MustComponent(syncqueues.CName).(syncqueues.SyncQueues)
s.manager = NewRequestManager(s.handler, s.metric, syncQueues.ActionPool(s.spaceId), syncQueues.Limit(s.spaceId))
s.ctx, s.cancel = context.WithCancel(context.Background())
return nil
}
func (s *syncService) Run(ctx context.Context) (err error) {
if s.commonMetric != nil {
s.commonMetric.RegisterSyncMetric(s.spaceId, s.metric)
}
return nil
}
func (s *syncService) Close(ctx context.Context) (err error) {
err = s.receiveQueue.Close()
if s.commonMetric != nil {
s.commonMetric.UnregisterSyncMetric(s.spaceId)
}
s.manager.Close()
return
}
func (s *syncService) BroadcastMessage(ctx context.Context, msg drpc.Message) error {
return s.peerManager.BroadcastMessage(ctx, msg)
}
func (s *syncService) handleIncomingMessage(msg msgCtx) {
req, err := s.handler.HandleHeadUpdate(msg.ctx, msg.Sizeable)
if err != nil {
log.Error("failed to handle head update", zap.Error(err))
}
if req == nil {
return
}
err = s.manager.QueueRequest(req)
if err != nil {
log.Error("failed to queue request", zap.Error(err))
}
}
func (s *syncService) HandleMessage(ctx context.Context, msg drpc.Message) error {
idMsg, ok := msg.(syncdeps.Message)
if !ok {
return ErrUnexpectedMessage
}
objectId := idMsg.ObjectId()
err := s.receiveQueue.Add(ctx, objectId, msgCtx{
ctx: ctx,
Sizeable: idMsg,
})
if errors.Is(err, mb.ErrOverflowed) {
return nil
}
return err
}
func (s *syncService) QueueRequest(ctx context.Context, rq syncdeps.Request) error {
return s.manager.QueueRequest(rq)
}
func (s *syncService) SendRequest(ctx context.Context, rq syncdeps.Request, collector syncdeps.ResponseCollector) error {
return s.manager.SendRequest(ctx, rq, collector)
}
func (s *syncService) HandleStreamRequest(ctx context.Context, req syncdeps.Request, stream drpc.Stream) error {
return s.manager.HandleStreamRequest(ctx, req, stream)
}
func (s *syncService) CloseReceiveQueue(id string) error {
return s.receiveQueue.CloseThread(id)
}