mirror of
https://github.com/anyproto/any-sync.git
synced 2025-06-08 14:07:02 +09:00
215 lines
6.5 KiB
Go
215 lines
6.5 KiB
Go
//go:generate mockgen -destination mock_objectsync/mock_objectsync.go github.com/anyproto/any-sync/commonspace/objectsync ObjectSync
|
|
package objectsync
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/anyproto/any-sync/app"
|
|
"github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
|
|
"github.com/anyproto/any-sync/commonspace/object/treemanager"
|
|
"github.com/anyproto/any-sync/commonspace/spacestate"
|
|
"github.com/anyproto/any-sync/metric"
|
|
"github.com/anyproto/any-sync/net/peer"
|
|
"github.com/anyproto/any-sync/util/multiqueue"
|
|
"github.com/cheggaaa/mb/v3"
|
|
"github.com/gogo/protobuf/proto"
|
|
|
|
"github.com/anyproto/any-sync/app/logger"
|
|
"github.com/anyproto/any-sync/commonspace/object/syncobjectgetter"
|
|
"github.com/anyproto/any-sync/commonspace/spacestorage"
|
|
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
|
|
"github.com/anyproto/any-sync/nodeconf"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
const CName = "common.commonspace.objectsync"
|
|
|
|
var log = logger.NewNamed(CName)
|
|
|
|
type ObjectSync interface {
|
|
LastUsage() time.Time
|
|
HandleMessage(ctx context.Context, hm HandleMessage) (err error)
|
|
HandleRequest(ctx context.Context, req *spacesyncproto.ObjectSyncMessage) (resp *spacesyncproto.ObjectSyncMessage, err error)
|
|
CloseThread(id string) (err error)
|
|
app.ComponentRunnable
|
|
}
|
|
|
|
type HandleMessage struct {
|
|
Id uint64
|
|
ReceiveTime time.Time
|
|
StartHandlingTime time.Time
|
|
Deadline time.Time
|
|
SenderId string
|
|
Message *spacesyncproto.ObjectSyncMessage
|
|
PeerCtx context.Context
|
|
}
|
|
|
|
func (m HandleMessage) LogFields(fields ...zap.Field) []zap.Field {
|
|
return append(fields,
|
|
metric.SpaceId(m.Message.SpaceId),
|
|
metric.ObjectId(m.Message.ObjectId),
|
|
metric.QueueDur(m.StartHandlingTime.Sub(m.ReceiveTime)),
|
|
metric.TotalDur(time.Since(m.ReceiveTime)),
|
|
)
|
|
}
|
|
|
|
type objectSync struct {
|
|
spaceId string
|
|
|
|
objectGetter syncobjectgetter.SyncObjectGetter
|
|
configuration nodeconf.NodeConf
|
|
spaceStorage spacestorage.SpaceStorage
|
|
metric metric.Metric
|
|
|
|
handleQueue multiqueue.MultiQueue[HandleMessage]
|
|
}
|
|
|
|
func (s *objectSync) Init(a *app.App) (err error) {
|
|
s.spaceStorage = a.MustComponent(spacestorage.CName).(spacestorage.SpaceStorage)
|
|
s.objectGetter = a.MustComponent(treemanager.CName).(syncobjectgetter.SyncObjectGetter)
|
|
s.configuration = a.MustComponent(nodeconf.CName).(nodeconf.NodeConf)
|
|
sharedData := a.MustComponent(spacestate.CName).(*spacestate.SpaceState)
|
|
mc := a.Component(metric.CName)
|
|
if mc != nil {
|
|
s.metric = mc.(metric.Metric)
|
|
}
|
|
s.spaceId = sharedData.SpaceId
|
|
s.handleQueue = multiqueue.New[HandleMessage](s.processHandleMessage, 30)
|
|
return nil
|
|
}
|
|
|
|
func (s *objectSync) Name() (name string) {
|
|
return CName
|
|
}
|
|
|
|
func (s *objectSync) Run(ctx context.Context) (err error) {
|
|
return nil
|
|
}
|
|
|
|
func (s *objectSync) Close(ctx context.Context) (err error) {
|
|
return s.handleQueue.Close()
|
|
}
|
|
|
|
func New() ObjectSync {
|
|
return &objectSync{}
|
|
}
|
|
|
|
func (s *objectSync) LastUsage() time.Time {
|
|
// TODO: add time
|
|
return time.Time{}
|
|
}
|
|
|
|
func (s *objectSync) HandleRequest(ctx context.Context, req *spacesyncproto.ObjectSyncMessage) (resp *spacesyncproto.ObjectSyncMessage, err error) {
|
|
peerId, err := peer.CtxPeerId(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return s.handleRequest(ctx, peerId, req)
|
|
}
|
|
|
|
func (s *objectSync) HandleMessage(ctx context.Context, hm HandleMessage) (err error) {
|
|
threadId := hm.Message.ObjectId
|
|
hm.ReceiveTime = time.Now()
|
|
if hm.PeerCtx == nil {
|
|
hm.PeerCtx = ctx
|
|
}
|
|
err = s.handleQueue.Add(ctx, threadId, hm)
|
|
if err == mb.ErrOverflowed {
|
|
log.InfoCtx(ctx, "queue overflowed", zap.String("spaceId", s.spaceId), zap.String("objectId", threadId))
|
|
// skip overflowed error
|
|
return nil
|
|
}
|
|
return
|
|
}
|
|
|
|
func (s *objectSync) processHandleMessage(msg HandleMessage) {
|
|
var err error
|
|
msg.StartHandlingTime = time.Now()
|
|
ctx := peer.CtxWithPeerId(context.Background(), msg.SenderId)
|
|
ctx = logger.CtxWithFields(ctx, zap.Uint64("msgId", msg.Id), zap.String("senderId", msg.SenderId))
|
|
defer func() {
|
|
if s.metric == nil {
|
|
return
|
|
}
|
|
s.metric.RequestLog(msg.PeerCtx, "space.streamOp", msg.LogFields(
|
|
zap.Error(err),
|
|
)...)
|
|
}()
|
|
|
|
if !msg.Deadline.IsZero() {
|
|
now := time.Now()
|
|
if now.After(msg.Deadline) {
|
|
log.InfoCtx(ctx, "skip message: deadline exceed")
|
|
err = context.DeadlineExceeded
|
|
return
|
|
}
|
|
}
|
|
if err = s.handleMessage(ctx, msg.SenderId, msg.Message); err != nil {
|
|
if msg.Message.ObjectId != "" {
|
|
// cleanup thread on error
|
|
_ = s.handleQueue.CloseThread(msg.Message.ObjectId)
|
|
}
|
|
log.InfoCtx(ctx, "handleMessage error", zap.Error(err))
|
|
}
|
|
}
|
|
|
|
func (s *objectSync) handleRequest(ctx context.Context, senderId string, msg *spacesyncproto.ObjectSyncMessage) (response *spacesyncproto.ObjectSyncMessage, err error) {
|
|
log := log.With(zap.String("objectId", msg.ObjectId))
|
|
err = s.checkEmptyFullSync(log, msg)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
obj, err := s.objectGetter.GetObject(ctx, msg.ObjectId)
|
|
if err != nil {
|
|
return nil, treechangeproto.ErrGetTree
|
|
}
|
|
return obj.HandleRequest(ctx, senderId, msg)
|
|
}
|
|
|
|
func (s *objectSync) handleMessage(ctx context.Context, senderId string, msg *spacesyncproto.ObjectSyncMessage) (err error) {
|
|
log := log.With(zap.String("objectId", msg.ObjectId))
|
|
err = s.checkEmptyFullSync(log, msg)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
obj, err := s.objectGetter.GetObject(ctx, msg.ObjectId)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get object from cache: %w", err)
|
|
}
|
|
err = obj.HandleMessage(ctx, senderId, msg)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to handle message: %w", err)
|
|
}
|
|
return
|
|
}
|
|
|
|
func (s *objectSync) CloseThread(id string) (err error) {
|
|
return s.handleQueue.CloseThread(id)
|
|
}
|
|
|
|
func (s *objectSync) checkEmptyFullSync(log logger.CtxLogger, msg *spacesyncproto.ObjectSyncMessage) (err error) {
|
|
hasTree, err := s.spaceStorage.HasTree(msg.ObjectId)
|
|
if err != nil {
|
|
log.Warn("failed to execute get operation on storage has tree", zap.Error(err))
|
|
return spacesyncproto.ErrUnexpected
|
|
}
|
|
// in this case we will try to get it from remote, unless the sender also sent us the same request :-)
|
|
if !hasTree {
|
|
treeMsg := &treechangeproto.TreeSyncMessage{}
|
|
err = proto.Unmarshal(msg.Payload, treeMsg)
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
// this means that we don't have the tree locally and therefore can't return it
|
|
if s.isEmptyFullSyncRequest(treeMsg) {
|
|
return treechangeproto.ErrGetTree
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
func (s *objectSync) isEmptyFullSyncRequest(msg *treechangeproto.TreeSyncMessage) bool {
|
|
return msg.GetContent().GetFullSyncRequest() != nil && len(msg.GetContent().GetFullSyncRequest().GetHeads()) == 0
|
|
}
|