diff --git a/app/ocache/ocache.go b/app/ocache/ocache.go index 7132d43d..df19b220 100644 --- a/app/ocache/ocache.go +++ b/app/ocache/ocache.go @@ -178,6 +178,7 @@ Load: if closing { select { case <-ctx.Done(): + log.DebugCtx(ctx, "ctx done while waiting on object close", zap.String("id", id)) return nil, ctx.Err() case <-e.close: goto Load @@ -196,6 +197,7 @@ Load: } select { case <-ctx.Done(): + log.DebugCtx(ctx, "ctx done while waiting on object load", zap.String("id", id)) return nil, ctx.Err() case <-e.load: } diff --git a/commonspace/object/tree/objecttree/treebuilder.go b/commonspace/object/tree/objecttree/treebuilder.go index 5ec64b9a..113b8381 100644 --- a/commonspace/object/tree/objecttree/treebuilder.go +++ b/commonspace/object/tree/objecttree/treebuilder.go @@ -85,7 +85,7 @@ func (tb *treeBuilder) build(heads []string, theirHeads []string, newChanges []* } proposedHeads = append(proposedHeads, heads...) - log.With(zap.Strings("heads", proposedHeads)).Debug("building tree") + log.With(zap.Strings("heads", proposedHeads), zap.String("id", tb.treeStorage.Id())).Debug("building tree") if err = tb.buildTree(proposedHeads, breakpoint); err != nil { return nil, fmt.Errorf("buildTree error: %v", err) } diff --git a/commonspace/object/tree/synctree/synctree.go b/commonspace/object/tree/synctree/synctree.go index 08bffb01..d9084017 100644 --- a/commonspace/object/tree/synctree/synctree.go +++ b/commonspace/object/tree/synctree/synctree.go @@ -109,7 +109,7 @@ func buildSyncTree(ctx context.Context, isFirstBuild bool, deps BuildDeps) (t Sy listener: deps.Listener, syncStatus: deps.SyncStatus, } - syncHandler := newSyncTreeHandler(syncTree, syncClient, deps.SyncStatus) + syncHandler := newSyncTreeHandler(deps.SpaceId, syncTree, syncClient, deps.SyncStatus) syncTree.SyncHandler = syncHandler t = syncTree syncTree.Lock() @@ -191,7 +191,10 @@ func (s *syncTree) AddRawChanges(ctx context.Context, changesPayload objecttree. } func (s *syncTree) Delete() (err error) { - log.With(zap.String("id", s.Id())).Debug("deleting sync tree") + log.Debug("deleting sync tree", zap.String("id", s.Id())) + defer func() { + log.Debug("deleted sync tree", zap.Error(err), zap.String("id", s.Id())) + }() s.Lock() defer s.Unlock() if err = s.checkAlive(); err != nil { @@ -206,7 +209,10 @@ func (s *syncTree) Delete() (err error) { } func (s *syncTree) Close() (err error) { - log.With(zap.String("id", s.Id())).Debug("closing sync tree") + log.Debug("closing sync tree", zap.String("id", s.Id())) + defer func() { + log.Debug("closed sync tree", zap.Error(err), zap.String("id", s.Id())) + }() s.Lock() defer s.Unlock() if s.isClosed { diff --git a/commonspace/object/tree/synctree/synctreehandler.go b/commonspace/object/tree/synctree/synctreehandler.go index 2d3e4484..de79d0de 100644 --- a/commonspace/object/tree/synctree/synctreehandler.go +++ b/commonspace/object/tree/synctree/synctreehandler.go @@ -18,16 +18,18 @@ type syncTreeHandler struct { syncClient SyncClient syncStatus syncstatus.StatusUpdater handlerLock sync.Mutex + spaceId string queue ReceiveQueue } const maxQueueSize = 5 -func newSyncTreeHandler(objTree objecttree.ObjectTree, syncClient SyncClient, syncStatus syncstatus.StatusUpdater) synchandler.SyncHandler { +func newSyncTreeHandler(spaceId string, objTree objecttree.ObjectTree, syncClient SyncClient, syncStatus syncstatus.StatusUpdater) synchandler.SyncHandler { return &syncTreeHandler{ objTree: objTree, syncClient: syncClient, syncStatus: syncStatus, + spaceId: spaceId, queue: newReceiveQueue(maxQueueSize), } } @@ -38,7 +40,6 @@ func (s *syncTreeHandler) HandleMessage(ctx context.Context, senderId string, ms if err != nil { return } - s.syncStatus.HeadsReceive(senderId, msg.ObjectId, treechangeproto.GetHeads(unmarshalled)) queueFull := s.queue.AddMessage(senderId, unmarshalled, msg.RequestId) @@ -82,7 +83,7 @@ func (s *syncTreeHandler) handleHeadUpdate( objTree = s.objTree ) - log := log.With(zap.Strings("heads", objTree.Heads()), zap.String("treeId", objTree.Id())) + log := log.With(zap.Strings("heads", objTree.Heads()), zap.String("treeId", objTree.Id()), zap.String("spaceId", s.spaceId)) log.DebugCtx(ctx, "received head update message") defer func() { @@ -99,7 +100,6 @@ func (s *syncTreeHandler) handleHeadUpdate( // isEmptyUpdate is sent when the tree is brought up from cache if isEmptyUpdate { - headEquals := slice.UnsortedEquals(objTree.Heads(), update.Heads) log.DebugCtx(ctx, "is empty update", zap.String("treeId", objTree.Id()), zap.Bool("headEquals", headEquals)) if headEquals { @@ -150,7 +150,11 @@ func (s *syncTreeHandler) handleFullSyncRequest( objTree = s.objTree ) - log := log.With(zap.String("senderId", senderId), zap.Strings("heads", request.Heads), zap.String("treeId", s.objTree.Id()), zap.String("replyId", replyId)) + log := log.With(zap.String("senderId", senderId), + zap.Strings("heads", request.Heads), + zap.String("treeId", s.objTree.Id()), + zap.String("replyId", replyId), + zap.String("spaceId", s.spaceId)) log.DebugCtx(ctx, "received full sync request message") defer func() { @@ -188,7 +192,7 @@ func (s *syncTreeHandler) handleFullSyncResponse( var ( objTree = s.objTree ) - log := log.With(zap.Strings("heads", response.Heads), zap.String("treeId", s.objTree.Id())) + log := log.With(zap.Strings("heads", response.Heads), zap.String("treeId", s.objTree.Id()), zap.String("spaceId", s.spaceId)) log.DebugCtx(ctx, "received full sync response message") defer func() { diff --git a/commonspace/objectsync/msgpool.go b/commonspace/objectsync/msgpool.go index c47fc30d..3dd14a2e 100644 --- a/commonspace/objectsync/msgpool.go +++ b/commonspace/objectsync/msgpool.go @@ -73,7 +73,7 @@ func (s *messagePool) SendSync(ctx context.Context, peerId string, msg *spacesyn delete(s.waiters, msg.RequestId) s.waitersMx.Unlock() - log.With(zap.String("requestId", msg.RequestId)).WarnCtx(ctx, "time elapsed when waiting") + log.With(zap.String("requestId", msg.RequestId)).DebugCtx(ctx, "time elapsed when waiting") err = fmt.Errorf("sendSync context error: %v", ctx.Err()) case reply = <-waiter.ch: // success diff --git a/commonspace/space.go b/commonspace/space.go index c481a912..46e42aed 100644 --- a/commonspace/space.go +++ b/commonspace/space.go @@ -295,7 +295,7 @@ func (s *space) PutTree(ctx context.Context, payload treestorage.TreeStorageCrea Listener: listener, AclList: s.aclList, SpaceStorage: s.storage, - OnClose: func(id string) {}, + OnClose: s.onObjectClose, SyncStatus: s.syncStatus, PeerGetter: s.peerManager, } @@ -334,6 +334,7 @@ func (s *space) BuildTree(ctx context.Context, id string, opts BuildTreeOpts) (t if t, err = synctree.BuildSyncTreeOrGetRemote(ctx, id, deps); err != nil { return nil, err } + log.Debug("incrementing counter", zap.String("id", id), zap.String("spaceId", s.id)) s.treesUsed.Add(1) return } @@ -401,6 +402,7 @@ func (s *space) handleMessage(msg HandleMessage) { } func (s *space) onObjectClose(id string) { + log.Debug("decrementing counter", zap.String("id", id), zap.String("spaceId", s.id)) s.treesUsed.Add(-1) _ = s.handleQueue.CloseThread(id) } diff --git a/net/rpc/server/baseserver.go b/net/rpc/server/baseserver.go index 49136aaf..85d23bf3 100644 --- a/net/rpc/server/baseserver.go +++ b/net/rpc/server/baseserver.go @@ -32,6 +32,7 @@ type Params struct { ListenAddrs []string Wrapper DRPCHandlerWrapper TimeoutMillis int + Handshake func(conn net.Conn) (cCtx context.Context, sc sec.SecureConn, err error) } func NewBaseDrpcServer() *BaseDrpcServer { @@ -42,6 +43,7 @@ func (s *BaseDrpcServer) Run(ctx context.Context, params Params) (err error) { s.drpcServer = drpcserver.NewWithOptions(params.Wrapper(s.Mux), drpcserver.Options{Manager: drpcmanager.Options{ Reader: drpcwire.ReaderOptions{MaximumBufferSize: params.BufferSizeMb * (1 << 20)}, }}) + s.handshake = params.Handshake ctx, s.cancel = context.WithCancel(ctx) for _, addr := range params.ListenAddrs { list, err := net.Listen("tcp", addr) diff --git a/net/rpc/server/drpcserver.go b/net/rpc/server/drpcserver.go index 088eb777..4d9d54ac 100644 --- a/net/rpc/server/drpcserver.go +++ b/net/rpc/server/drpcserver.go @@ -70,11 +70,11 @@ func (s *drpcServer) Run(ctx context.Context) (err error) { SummaryVec: histVec, } }, - } - s.handshake = func(conn net.Conn) (cCtx context.Context, sc sec.SecureConn, err error) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) - defer cancel() - return s.transport.SecureInbound(ctx, conn) + Handshake: func(conn net.Conn) (cCtx context.Context, sc sec.SecureConn, err error) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + return s.transport.SecureInbound(ctx, conn) + }, } return s.BaseDrpcServer.Run(ctx, params) }