1
0
Fork 0
mirror of https://github.com/anyproto/any-sync.git synced 2025-06-11 18:20:28 +09:00

Few sync protocol fixes

This commit is contained in:
mcrakhman 2024-06-19 15:43:20 +02:00
parent 7c928b3b8f
commit ebd1f5de22
No known key found for this signature in database
GPG key ID: DED12CFEF5B8396B
5 changed files with 35 additions and 24 deletions

View file

@ -74,7 +74,7 @@ func (o *objectSync) HandleHeadUpdate(ctx context.Context, headUpdate drpc.Messa
func (o *objectSync) HandleStreamRequest(ctx context.Context, rq syncdeps.Request, sendResponse func(resp proto.Message) error) (syncdeps.Request, error) {
obj, err := o.manager.GetTree(context.Background(), o.spaceId, rq.ObjectId())
if err != nil {
return nil, treechangeproto.ErrGetTree
return synctree.NewRequest(rq.PeerId(), o.spaceId, rq.ObjectId(), nil, nil, nil), treechangeproto.ErrGetTree
}
objHandler, ok := obj.(syncdeps.ObjectSyncHandler)
if !ok {

View file

@ -78,12 +78,16 @@ func (r *requestManager) HandleStreamRequest(ctx context.Context, rq syncdeps.Re
newRq, err := r.handler.HandleStreamRequest(ctx, rq, func(resp proto.Message) error {
return stream.MsgSend(resp, streampool.EncodingProto)
})
// here is a little bit non-standard decision, because we can return error but still can queue the request
if newRq != nil {
rqErr := r.QueueRequest(newRq)
if rqErr != nil {
log.Debug("failed to queue request", zap.Error(err))
}
}
if err != nil {
return err
}
if newRq != nil {
return r.QueueRequest(newRq)
}
return nil
}

View file

@ -114,6 +114,10 @@ func (s *syncService) GetQueue(peerId string) *multiqueue.Queue[drpc.Message] {
return queue
}
func (s *syncService) RemoveQueue(peerId string) error {
return s.sendQueueProvider.RemoveQueue(peerId)
}
func (s *syncService) NewReadMessage() drpc.Message {
return s.handler.NewMessage()
}