From ab754b0dd4cec0023e35fc61e452c8c678df4449 Mon Sep 17 00:00:00 2001 From: Mikhail Rakhmanov Date: Wed, 5 Mar 2025 16:37:21 +0100 Subject: [PATCH] Remove logs --- commonspace/object/acl/syncacl/syncacl.go | 7 +++--- .../object/tree/synctree/synchandler.go | 13 +++++++++-- commonspace/sync/objectsync/synchandler.go | 22 +++---------------- commonspace/sync/requestmanager.go | 14 +++--------- commonspace/sync/sync.go | 2 -- 5 files changed, 21 insertions(+), 37 deletions(-) diff --git a/commonspace/object/acl/syncacl/syncacl.go b/commonspace/object/acl/syncacl/syncacl.go index 495cdc3a..800d540f 100644 --- a/commonspace/object/acl/syncacl/syncacl.go +++ b/commonspace/object/acl/syncacl/syncacl.go @@ -82,11 +82,13 @@ func (s *syncAcl) AddRawRecord(rawRec *consensusproto.RawRecordWithId) (err erro if s.isClosed { return ErrSyncAclClosed } - log.Debug("received update", zap.String("aclId", s.AclList.Id()), zap.String("prevHead", s.AclList.Head().Id), zap.String("newHead", rawRec.Id)) + prevHead := s.AclList.Head().Id + log := log.With(zap.String("aclId", s.AclList.Id()), zap.String("prevHead", prevHead)) err = s.AclList.AddRawRecord(rawRec) if err != nil { return } + log.Debug("acl record updated", zap.String("head", s.AclList.Head().Id), zap.Int("len(total)", len(s.AclList.Records()))) headUpdate, err := s.syncClient.CreateHeadUpdate(s, []*consensusproto.RawRecordWithId{rawRec}) if err != nil { return @@ -111,12 +113,11 @@ func (s *syncAcl) AddRawRecords(rawRecords []*consensusproto.RawRecordWithId) (e } prevHead := s.AclList.Head().Id log := log.With(zap.String("aclId", s.AclList.Id()), zap.String("prevHead", prevHead)) - log.Debug("received updates", zap.String("newHead", rawRecords[len(rawRecords)-1].Id)) err = s.AclList.AddRawRecords(rawRecords) if err != nil || s.AclList.Head().Id == prevHead { return } - log.Debug("records updated", zap.String("head", s.AclList.Head().Id), zap.Int("len(total)", len(s.AclList.Records()))) + log.Debug("acl records updated", zap.String("head", s.AclList.Head().Id), zap.Int("len(total)", len(s.AclList.Records()))) headUpdate, err := s.syncClient.CreateHeadUpdate(s, rawRecords) if err != nil { return diff --git a/commonspace/object/tree/synctree/synchandler.go b/commonspace/object/tree/synctree/synchandler.go index 1a52c932..5a6a22a2 100644 --- a/commonspace/object/tree/synctree/synchandler.go +++ b/commonspace/object/tree/synctree/synchandler.go @@ -9,7 +9,7 @@ import ( "storj.io/drpc" "github.com/anyproto/any-sync/commonspace/object/tree/objecttree" - response "github.com/anyproto/any-sync/commonspace/object/tree/synctree/response" + "github.com/anyproto/any-sync/commonspace/object/tree/synctree/response" "github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto" "github.com/anyproto/any-sync/commonspace/sync/objectsync/objectmessages" "github.com/anyproto/any-sync/commonspace/sync/syncdeps" @@ -62,6 +62,11 @@ func (s *syncHandler) HandleHeadUpdate(ctx context.Context, statusUpdater syncst statusUpdater.HeadsReceive(peerId, update.ObjectId(), contentUpdate.Heads) s.tree.Lock() defer s.tree.Unlock() + log.Debug("got head update", + zap.String("objectId", update.ObjectId()), + zap.String("peerId", peerId), + zap.Strings("theirHeads", contentUpdate.Heads), + zap.Strings("ourHeads", s.tree.Heads())) if len(contentUpdate.Changes) == 0 { if s.hasHeads(s.tree, contentUpdate.Heads) { statusUpdater.HeadsApply(peerId, update.ObjectId(), contentUpdate.Heads, true) @@ -101,7 +106,11 @@ func (s *syncHandler) HandleStreamRequest(ctx context.Context, rq syncdeps.Reque } s.tree.Lock() curHeads := s.tree.Heads() - log.Debug("got stream request", zap.String("objectId", req.ObjectId()), zap.String("peerId", rq.PeerId())) + log.Debug("got stream request", + zap.String("objectId", req.ObjectId()), + zap.String("peerId", rq.PeerId()), + zap.Strings("theirHeads", request.Heads), + zap.Strings("ourHeads", curHeads)) producer, err := createResponseProducer(s.spaceId, s.tree, request.Heads, request.SnapshotPath) if err != nil { s.tree.Unlock() diff --git a/commonspace/sync/objectsync/synchandler.go b/commonspace/sync/objectsync/synchandler.go index c2ae112e..382f355c 100644 --- a/commonspace/sync/objectsync/synchandler.go +++ b/commonspace/sync/objectsync/synchandler.go @@ -23,7 +23,6 @@ import ( "github.com/anyproto/any-sync/net/peer" "github.com/anyproto/any-sync/net/pool" "github.com/anyproto/any-sync/net/rpc/rpcerr" - "github.com/anyproto/any-sync/net/secureservice" ) var ErrUnexpectedHeadUpdateType = errors.New("unexpected head update type") @@ -62,36 +61,21 @@ func (o *objectSync) HandleHeadUpdate(ctx context.Context, headUpdate drpc.Messa if err != nil { return nil, err } - protoVersion, err := peer.CtxProtoVersion(ctx) - if err != nil { - return nil, err - } - log.Debug("handle head update", zap.String("spaceId", o.spaceId), zap.String("peerId", peerId), zap.String("objectId", update.Meta.ObjectId)) - isNewProto := protoVersion >= secureservice.ProtoVersion obj, err := o.manager.GetObject(context.Background(), update.Meta.ObjectId) if err != nil { - if isNewProto { - log.Debug("return request", zap.String("spaceId", o.spaceId), zap.String("peerId", peerId), zap.String("objectId", update.Meta.ObjectId)) - return synctree.NewRequest(peerId, update.Meta.SpaceId, update.Meta.ObjectId, nil, nil, nil), nil - } - return nil, err + return synctree.NewRequest(peerId, update.Meta.SpaceId, update.Meta.ObjectId, nil, nil, nil), nil } objHandler, ok := obj.(syncdeps.ObjectSyncHandler) if !ok { return nil, fmt.Errorf("object %s does not support sync", obj.Id()) } - req, err := objHandler.HandleHeadUpdate(ctx, o.status, update) - if isNewProto { - return req, err - } - return nil, err + return objHandler.HandleHeadUpdate(ctx, o.status, update) } func (o *objectSync) HandleStreamRequest(ctx context.Context, rq syncdeps.Request, updater syncdeps.QueueSizeUpdater, sendResponse func(resp proto.Message) error) (syncdeps.Request, error) { - log.Debug("handle stream request", zap.String("spaceId", o.spaceId), zap.String("peerId", rq.PeerId()), zap.String("objectId", rq.ObjectId())) obj, err := o.manager.GetObject(context.Background(), rq.ObjectId()) if err != nil { - log.Debug("object not found", zap.String("spaceId", o.spaceId), zap.String("peerId", rq.PeerId()), zap.String("objectId", rq.ObjectId())) + log.Debug("handle stream request no object", zap.String("spaceId", o.spaceId), zap.String("peerId", rq.PeerId()), zap.String("objectId", rq.ObjectId())) req, ok := rq.(*objectmessages.Request) if !ok { return nil, treechangeproto.ErrGetTree diff --git a/commonspace/sync/requestmanager.go b/commonspace/sync/requestmanager.go index e08958de..1832dd43 100644 --- a/commonspace/sync/requestmanager.go +++ b/commonspace/sync/requestmanager.go @@ -7,13 +7,12 @@ import ( "strings" "github.com/anyproto/protobuf/proto" - "go.uber.org/zap" "storj.io/drpc" "github.com/anyproto/any-sync/commonspace/spacesyncproto" "github.com/anyproto/any-sync/commonspace/sync/syncdeps" "github.com/anyproto/any-sync/net/streampool" - syncqueues "github.com/anyproto/any-sync/util/syncqueues" + "github.com/anyproto/any-sync/util/syncqueues" ) type RequestManager interface { @@ -47,7 +46,6 @@ func NewRequestManager(handler syncdeps.SyncHandler, metric syncdeps.QueueSizeUp } func (r *requestManager) SendRequest(ctx context.Context, rq syncdeps.Request, collector syncdeps.ResponseCollector) error { - log.Debug("send request", zap.String("peerId", rq.PeerId()), zap.String("objectId", rq.ObjectId())) return r.handler.SendStreamRequest(ctx, rq, func(stream drpc.Stream) error { calledOnce := false for { @@ -75,10 +73,7 @@ func (r *requestManager) QueueRequest(rq syncdeps.Request) error { size := rq.MsgSize() r.metric.UpdateQueueSize(size, syncdeps.MsgTypeOutgoingRequest, true) r.requestPool.Add(rq.PeerId(), rq.ObjectId(), func(ctx context.Context) { - err := r.handler.ApplyRequest(ctx, rq, r) - if err != nil { - log.Debug("failed to apply request", zap.Error(err), zap.String("peerId", rq.PeerId()), zap.String("objectId", rq.ObjectId())) - } + r.handler.ApplyRequest(ctx, rq, r) }, func() { r.metric.UpdateQueueSize(size, syncdeps.MsgTypeOutgoingRequest, false) }) @@ -102,10 +97,7 @@ func (r *requestManager) HandleStreamRequest(ctx context.Context, rq syncdeps.Re }) // here is a little bit non-standard decision, because we can return error but still can queue the request if newRq != nil { - rqErr := r.QueueRequest(newRq) - if rqErr != nil { - log.Debug("failed to queue request", zap.Error(err)) - } + r.QueueRequest(newRq) } if err != nil { return err diff --git a/commonspace/sync/sync.go b/commonspace/sync/sync.go index 91bb4bb0..b58d75db 100644 --- a/commonspace/sync/sync.go +++ b/commonspace/sync/sync.go @@ -106,7 +106,6 @@ func (s *syncService) handleIncomingMessage(msg msgCtx) { if req == nil { return } - log.Debug("queue request", zap.String("objectId", req.ObjectId()), zap.String("spaceId", s.spaceId)) err = s.manager.QueueRequest(req) if err != nil { log.Error("failed to queue request", zap.Error(err)) @@ -124,7 +123,6 @@ func (s *syncService) HandleMessage(ctx context.Context, msg drpc.Message) error Sizeable: idMsg, }) if errors.Is(err, mb.ErrOverflowed) { - log.Info("queue overflowed", zap.String("objectId", objectId)) return nil } return err