1
0
Fork 0
mirror of https://github.com/anyproto/any-sync.git synced 2025-06-11 18:20:28 +09:00

Fix sync logic, move acks after handling finished, made message service async

This commit is contained in:
mcrakhman 2022-08-06 10:22:09 +02:00 committed by Mikhail Iudin
parent 1a892b05cf
commit 84d1f0df06
No known key found for this signature in database
GPG key ID: FAAAA8BAABDFF1C0
6 changed files with 110 additions and 74 deletions

View file

@ -482,7 +482,7 @@ func (a *aclTree) ChangesAfterCommonSnapshot(theirPath []string) ([]*aclpb.RawCh
log.With(
zap.Int("len(changes)", len(rawChanges)),
zap.String("id", a.id)).
Debug("sending all changes after common snapshot")
Debug("returning all changes after common snapshot")
return rawChanges, nil
}

View file

@ -38,14 +38,26 @@ func (m *Message) Ack() (err error) {
}
rep := &syncproto.Message{
Header: &syncproto.Header{
TraceId: m.GetHeader().TraceId,
ReplyId: m.GetHeader().RequestId,
Type: syncproto.MessageType_MessageTypeSystem,
TraceId: m.GetHeader().TraceId,
ReplyId: m.GetHeader().RequestId,
Type: syncproto.MessageType_MessageTypeSystem,
DebugInfo: "Ack",
},
Data: data,
}
log.With(zap.String("header", rep.Header.String())).Info("sending ack to peer")
return m.peer.Send(rep)
err = m.peer.Send(rep)
if err != nil {
log.With(
zap.String("peerId", m.peer.Id()),
zap.String("header", rep.GetHeader().String())).
Error("failed sending ack to peer", zap.Error(err))
} else {
log.With(
zap.String("peerId", m.peer.Id()),
zap.String("header", rep.GetHeader().String())).
Debug("sent ack to peer")
}
return
}
func (m *Message) AckError(code syncproto.SystemErrorCode, description string) (err error) {
@ -63,11 +75,23 @@ func (m *Message) AckError(code syncproto.SystemErrorCode, description string) (
}
rep := &syncproto.Message{
Header: &syncproto.Header{
TraceId: []byte(bson.NewObjectId()),
ReplyId: m.GetHeader().RequestId,
Type: syncproto.MessageType_MessageTypeSystem,
TraceId: []byte(bson.NewObjectId()),
ReplyId: m.GetHeader().RequestId,
Type: syncproto.MessageType_MessageTypeSystem,
DebugInfo: "AckError",
},
Data: data,
}
if err != nil {
log.With(
zap.String("peerId", m.peer.Id()),
zap.String("header", rep.GetHeader().String())).
Error("failed sending ackError to peer", zap.Error(err))
} else {
log.With(
zap.String("peerId", m.peer.Id()),
zap.String("header", rep.GetHeader().String())).
Debug("sent ackError to peer")
}
return m.peer.Send(rep)
}

View file

@ -154,41 +154,59 @@ func (p *pool) RemovePeerIdFromGroup(peerId, groupId string) (err error) {
}
func (p *pool) SendAndWait(ctx context.Context, peerId string, msg *syncproto.Message) (err error) {
defer func() {
if err != nil {
log.With(
zap.String("peerId", peerId),
zap.String("header", msg.GetHeader().String())).
Error("failed sending message to peer", zap.Error(err))
} else {
log.With(
zap.String("peerId", peerId),
zap.String("header", msg.GetHeader().String())).
Debug("sent message to peer")
}
}()
p.mu.RLock()
peer := p.peersById[peerId]
p.mu.RUnlock()
if peer == nil {
return ErrPeerNotFound
err = ErrPeerNotFound
return
}
repId := p.waiters.NewReplyId()
msg.GetHeader().RequestId = repId
ch := make(chan Reply, 1)
log.With(zap.Uint64("reply id", repId)).Debug("adding waiter for reply id")
p.waiters.Add(repId, &waiter{ch: ch})
defer p.waiters.Remove(repId)
if err = peer.peer.Send(msg); err != nil {
return
}
select {
case rep := <-ch:
if rep.Error != nil {
return rep.Error
err = rep.Error
}
return nil
case <-ctx.Done():
log.Debug("context error happened in send and wait")
return ctx.Err()
log.Debug("context done in SendAndWait")
err = ctx.Err()
}
return
}
func (p *pool) Broadcast(ctx context.Context, groupId string, msg *syncproto.Message) (err error) {
//TODO implement me
panic("implement me")
}
func (p *pool) readPeerLoop(peer peer.Peer) (err error) {
defer p.wg.Done()
limiter := make(chan struct{}, maxSimultaneousOperationsPerStream)
for i := 0; i < maxSimultaneousOperationsPerStream; i++ {
limiter <- struct{}{}
@ -213,7 +231,7 @@ Loop:
}()
}
if err = p.removePeer(peer.Id()); err != nil {
log.Error("remove peer error", zap.String("peerId", peer.Id()))
log.Error("remove peer error", zap.String("peerId", peer.Id()), zap.Error(err))
}
return
}

View file

@ -100,7 +100,7 @@ func (s *service) UpdateDocument(ctx context.Context, id, text string) (err erro
zap.String("header", header.String())).
Debug("document updated in the database")
return s.messageService.SendToSpace(ctx, "", syncproto.WrapHeadUpdate(&syncproto.SyncHeadUpdate{
return s.messageService.SendToSpaceAsync("", syncproto.WrapHeadUpdate(&syncproto.SyncHeadUpdate{
Heads: heads,
Changes: []*aclpb.RawChange{ch},
TreeId: id,
@ -155,7 +155,7 @@ func (s *service) CreateDocument(ctx context.Context, text string) (id string, e
log.With(zap.String("id", id), zap.String("text", text)).
Debug("creating document")
err = s.messageService.SendToSpace(ctx, "", syncproto.WrapHeadUpdate(&syncproto.SyncHeadUpdate{
err = s.messageService.SendToSpaceAsync("", syncproto.WrapHeadUpdate(&syncproto.SyncHeadUpdate{
Heads: heads,
Changes: []*aclpb.RawChange{ch},
TreeId: id,

View file

@ -9,7 +9,6 @@ import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/requesthandler"
"github.com/anytypeio/go-anytype-infrastructure-experiments/syncproto"
"github.com/gogo/protobuf/proto"
"go.uber.org/zap"
"sync"
"time"
)
@ -30,8 +29,8 @@ func New() app.Component {
}
type Service interface {
SendMessage(ctx context.Context, peerId string, msg *syncproto.Sync) error
SendToSpace(ctx context.Context, spaceId string, msg *syncproto.Sync) error
SendMessageAsync(peerId string, msg *syncproto.Sync) error
SendToSpaceAsync(spaceId string, msg *syncproto.Sync) error
}
func (s *service) Init(ctx context.Context, a *app.App) (err error) {
@ -55,72 +54,60 @@ func (s *service) Close(ctx context.Context) (err error) {
}
func (s *service) HandleMessage(ctx context.Context, msg *pool.Message) (err error) {
log.With(
zap.String("peerId", msg.Peer().Id())).
Debug("handling message from peer")
defer func() {
if err != nil {
msg.AckError(syncproto.SystemError_UNKNOWN, err.Error())
} else {
msg.Ack()
}
}()
err = msg.Ack()
if err != nil {
log.With(zap.String("peerId", msg.Peer().Id()), zap.Error(err)).
Error("could not ack message")
} else {
log.With(zap.String("peerId", msg.Peer().Id()), zap.Int("type", int(msg.Header.Type))).
Debug("ack returned")
}
syncMsg := &syncproto.Sync{}
err = proto.Unmarshal(msg.Data, syncMsg)
if err != nil {
return err
return
}
timeoutCtx, cancel := context.WithTimeout(ctx, time.Second*30)
defer cancel()
return s.requestHandler.HandleSyncMessage(timeoutCtx, msg.Peer().Id(), syncMsg)
err = s.requestHandler.HandleSyncMessage(timeoutCtx, msg.Peer().Id(), syncMsg)
return
}
func (s *service) SendMessage(ctx context.Context, peerId string, msg *syncproto.Sync) error {
log.With(
zap.String("peerId", peerId),
zap.String("message", msgType(msg))).
Debug("sending message to peer")
err := s.pool.DialAndAddPeer(context.Background(), peerId)
func (s *service) SendMessageAsync(peerId string, msg *syncproto.Sync) (err error) {
err = s.pool.DialAndAddPeer(context.Background(), peerId)
if err != nil {
return err
return
}
marshalled, err := proto.Marshal(msg)
if err != nil {
return err
return
}
err = s.pool.SendAndWait(ctx, peerId, &syncproto.Message{
Header: &syncproto.Header{Type: syncproto.MessageType_MessageTypeSync},
Data: marshalled,
})
if err != nil {
log.With(
zap.String("peerId", peerId),
zap.String("message", msgType(msg)),
zap.Error(err)).
Debug("failed to send message to peer")
} else {
log.With(
zap.String("peerId", peerId),
zap.String("message", msgType(msg))).
Debug("message send to peer completed")
}
return err
go s.sendAsync(peerId, msgType(msg), marshalled)
return
}
func (s *service) SendToSpace(ctx context.Context, spaceId string, msg *syncproto.Sync) error {
func (s *service) SendToSpaceAsync(spaceId string, msg *syncproto.Sync) error {
for _, rp := range s.nodes {
s.SendMessage(ctx, rp.PeerId, msg)
s.SendMessageAsync(rp.PeerId, msg)
}
return nil
}
func (s *service) sendAsync(peerId string, msgTypeStr string, marshalled []byte) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()
return s.pool.SendAndWait(ctx, peerId, &syncproto.Message{
Header: &syncproto.Header{
Type: syncproto.MessageType_MessageTypeSync,
DebugInfo: msgTypeStr,
},
Data: marshalled,
})
}
func msgType(content *syncproto.Sync) string {
msg := content.GetMessage()
switch {

View file

@ -32,8 +32,8 @@ type RequestHandler interface {
}
type MessageSender interface {
SendMessage(ctx context.Context, peerId string, msg *syncproto.Sync) error
SendToSpace(ctx context.Context, spaceId string, msg *syncproto.Sync) error
SendMessageAsync(peerId string, msg *syncproto.Sync) error
SendToSpaceAsync(spaceId string, msg *syncproto.Sync) error
}
const CName = "SyncRequestHandler"
@ -77,7 +77,7 @@ func (r *requestHandler) HandleHeadUpdate(ctx context.Context, senderId string,
result acltree.AddResult
)
log.With(zap.String("peerId", senderId), zap.String("treeId", update.TreeId)).
Debug("received head update message")
Debug("processing head update")
err = r.treeCache.Do(ctx, update.TreeId, func(tree acltree.ACLTree) error {
// TODO: check if we already have those changes
@ -85,6 +85,8 @@ func (r *requestHandler) HandleHeadUpdate(ctx context.Context, senderId string,
if err != nil {
return err
}
log.With(zap.Strings("update heads", update.Heads), zap.Strings("tree heads", tree.Heads())).
Debug("comparing heads after head update")
shouldFullSync := !slice.UnsortedEquals(update.Heads, tree.Heads())
snapshotPath = tree.SnapshotPath()
if shouldFullSync {
@ -105,7 +107,7 @@ func (r *requestHandler) HandleHeadUpdate(ctx context.Context, senderId string,
}
// if we have incompatible heads, or we haven't seen the tree at all
if fullRequest != nil {
return r.messageService.SendMessage(ctx, senderId, syncproto.WrapFullRequest(fullRequest))
return r.messageService.SendMessageAsync(senderId, syncproto.WrapFullRequest(fullRequest))
}
// if error or nothing has changed
if err != nil || len(result.Added) == 0 {
@ -119,7 +121,7 @@ func (r *requestHandler) HandleHeadUpdate(ctx context.Context, senderId string,
TreeId: update.TreeId,
TreeHeader: update.TreeHeader,
}
return r.messageService.SendToSpace(ctx, "", syncproto.WrapHeadUpdate(newUpdate))
return r.messageService.SendToSpaceAsync("", syncproto.WrapHeadUpdate(newUpdate))
}
func (r *requestHandler) HandleFullSyncRequest(ctx context.Context, senderId string, request *syncproto.SyncFullRequest) (err error) {
@ -129,7 +131,7 @@ func (r *requestHandler) HandleFullSyncRequest(ctx context.Context, senderId str
result acltree.AddResult
)
log.With(zap.String("peerId", senderId), zap.String("treeId", request.TreeId)).
Debug("received full sync request message")
Debug("processing full sync request")
err = r.treeCache.Do(ctx, request.TreeId, func(tree acltree.ACLTree) error {
// TODO: check if we already have those changes
@ -150,7 +152,7 @@ func (r *requestHandler) HandleFullSyncRequest(ctx context.Context, senderId str
if err != nil {
return err
}
err = r.messageService.SendMessage(ctx, senderId, syncproto.WrapFullResponse(fullResponse))
err = r.messageService.SendMessageAsync(senderId, syncproto.WrapFullResponse(fullResponse))
// if error or nothing has changed
if err != nil || len(result.Added) == 0 {
return err
@ -164,7 +166,7 @@ func (r *requestHandler) HandleFullSyncRequest(ctx context.Context, senderId str
TreeId: request.TreeId,
TreeHeader: request.TreeHeader,
}
return r.messageService.SendToSpace(ctx, "def", syncproto.WrapHeadUpdate(newUpdate))
return r.messageService.SendToSpaceAsync("", syncproto.WrapHeadUpdate(newUpdate))
}
func (r *requestHandler) HandleFullSyncResponse(ctx context.Context, senderId string, response *syncproto.SyncFullResponse) (err error) {
@ -173,7 +175,7 @@ func (r *requestHandler) HandleFullSyncResponse(ctx context.Context, senderId st
result acltree.AddResult
)
log.With(zap.String("peerId", senderId), zap.String("treeId", response.TreeId)).
Debug("received full sync response message")
Debug("processing full sync response")
err = r.treeCache.Do(ctx, response.TreeId, func(tree acltree.ACLTree) error {
// TODO: check if we already have those changes
@ -194,6 +196,11 @@ func (r *requestHandler) HandleFullSyncResponse(ctx context.Context, senderId st
if err != nil {
return err
}
result = acltree.AddResult{
OldHeads: []string{},
Heads: response.Heads,
Added: response.Changes,
}
}
// sending heads update message
newUpdate := &syncproto.SyncHeadUpdate{
@ -202,7 +209,7 @@ func (r *requestHandler) HandleFullSyncResponse(ctx context.Context, senderId st
SnapshotPath: snapshotPath,
TreeId: response.TreeId,
}
return r.messageService.SendToSpace(ctx, "", syncproto.WrapHeadUpdate(newUpdate))
return r.messageService.SendToSpaceAsync("", syncproto.WrapHeadUpdate(newUpdate))
}
func (r *requestHandler) prepareFullSyncRequest(treeId string, header *treepb.TreeHeader, theirPath []string, tree acltree.ACLTree) (*syncproto.SyncFullRequest, error) {
@ -242,7 +249,7 @@ func (r *requestHandler) prepareFullSyncResponse(
}
}
log.With(zap.Int("len(changes)", len(final)), zap.String("id", treeId)).
Debug("sending changes for tree")
Debug("preparing changes for tree")
return &syncproto.SyncFullResponse{
Heads: tree.Heads(),