1
0
Fork 0
mirror of https://github.com/anyproto/any-sync.git synced 2025-06-10 01:51:11 +09:00

Fix putTree close and handshake params, and add logs for debugging tree usage

This commit is contained in:
mcrakhman 2023-02-21 16:36:58 +01:00
parent 030a7d61ba
commit c37c7ba874
No known key found for this signature in database
GPG key ID: DED12CFEF5B8396B
8 changed files with 33 additions and 17 deletions

View file

@ -178,6 +178,7 @@ Load:
if closing {
select {
case <-ctx.Done():
log.DebugCtx(ctx, "ctx done while waiting on object close", zap.String("id", id))
return nil, ctx.Err()
case <-e.close:
goto Load
@ -196,6 +197,7 @@ Load:
}
select {
case <-ctx.Done():
log.DebugCtx(ctx, "ctx done while waiting on object load", zap.String("id", id))
return nil, ctx.Err()
case <-e.load:
}

View file

@ -85,7 +85,7 @@ func (tb *treeBuilder) build(heads []string, theirHeads []string, newChanges []*
}
proposedHeads = append(proposedHeads, heads...)
log.With(zap.Strings("heads", proposedHeads)).Debug("building tree")
log.With(zap.Strings("heads", proposedHeads), zap.String("id", tb.treeStorage.Id())).Debug("building tree")
if err = tb.buildTree(proposedHeads, breakpoint); err != nil {
return nil, fmt.Errorf("buildTree error: %v", err)
}

View file

@ -109,7 +109,7 @@ func buildSyncTree(ctx context.Context, isFirstBuild bool, deps BuildDeps) (t Sy
listener: deps.Listener,
syncStatus: deps.SyncStatus,
}
syncHandler := newSyncTreeHandler(syncTree, syncClient, deps.SyncStatus)
syncHandler := newSyncTreeHandler(deps.SpaceId, syncTree, syncClient, deps.SyncStatus)
syncTree.SyncHandler = syncHandler
t = syncTree
syncTree.Lock()
@ -191,7 +191,10 @@ func (s *syncTree) AddRawChanges(ctx context.Context, changesPayload objecttree.
}
func (s *syncTree) Delete() (err error) {
log.With(zap.String("id", s.Id())).Debug("deleting sync tree")
log.Debug("deleting sync tree", zap.String("id", s.Id()))
defer func() {
log.Debug("deleted sync tree", zap.Error(err), zap.String("id", s.Id()))
}()
s.Lock()
defer s.Unlock()
if err = s.checkAlive(); err != nil {
@ -206,7 +209,10 @@ func (s *syncTree) Delete() (err error) {
}
func (s *syncTree) Close() (err error) {
log.With(zap.String("id", s.Id())).Debug("closing sync tree")
log.Debug("closing sync tree", zap.String("id", s.Id()))
defer func() {
log.Debug("closed sync tree", zap.Error(err), zap.String("id", s.Id()))
}()
s.Lock()
defer s.Unlock()
if s.isClosed {

View file

@ -18,16 +18,18 @@ type syncTreeHandler struct {
syncClient SyncClient
syncStatus syncstatus.StatusUpdater
handlerLock sync.Mutex
spaceId string
queue ReceiveQueue
}
const maxQueueSize = 5
func newSyncTreeHandler(objTree objecttree.ObjectTree, syncClient SyncClient, syncStatus syncstatus.StatusUpdater) synchandler.SyncHandler {
// newSyncTreeHandler builds the sync message handler for a single object
// tree, wiring together the tree itself, the client used to send sync
// messages, the sync status updater, and the space id used for log
// correlation. Incoming messages are buffered through a bounded receive
// queue of maxQueueSize entries.
func newSyncTreeHandler(spaceId string, objTree objecttree.ObjectTree, syncClient SyncClient, syncStatus syncstatus.StatusUpdater) synchandler.SyncHandler {
	handler := &syncTreeHandler{
		spaceId:    spaceId,
		objTree:    objTree,
		syncClient: syncClient,
		syncStatus: syncStatus,
		queue:      newReceiveQueue(maxQueueSize),
	}
	return handler
}
@ -38,7 +40,6 @@ func (s *syncTreeHandler) HandleMessage(ctx context.Context, senderId string, ms
if err != nil {
return
}
s.syncStatus.HeadsReceive(senderId, msg.ObjectId, treechangeproto.GetHeads(unmarshalled))
queueFull := s.queue.AddMessage(senderId, unmarshalled, msg.RequestId)
@ -82,7 +83,7 @@ func (s *syncTreeHandler) handleHeadUpdate(
objTree = s.objTree
)
log := log.With(zap.Strings("heads", objTree.Heads()), zap.String("treeId", objTree.Id()))
log := log.With(zap.Strings("heads", objTree.Heads()), zap.String("treeId", objTree.Id()), zap.String("spaceId", s.spaceId))
log.DebugCtx(ctx, "received head update message")
defer func() {
@ -99,7 +100,6 @@ func (s *syncTreeHandler) handleHeadUpdate(
// isEmptyUpdate is sent when the tree is brought up from cache
if isEmptyUpdate {
headEquals := slice.UnsortedEquals(objTree.Heads(), update.Heads)
log.DebugCtx(ctx, "is empty update", zap.String("treeId", objTree.Id()), zap.Bool("headEquals", headEquals))
if headEquals {
@ -150,7 +150,11 @@ func (s *syncTreeHandler) handleFullSyncRequest(
objTree = s.objTree
)
log := log.With(zap.String("senderId", senderId), zap.Strings("heads", request.Heads), zap.String("treeId", s.objTree.Id()), zap.String("replyId", replyId))
log := log.With(zap.String("senderId", senderId),
zap.Strings("heads", request.Heads),
zap.String("treeId", s.objTree.Id()),
zap.String("replyId", replyId),
zap.String("spaceId", s.spaceId))
log.DebugCtx(ctx, "received full sync request message")
defer func() {
@ -188,7 +192,7 @@ func (s *syncTreeHandler) handleFullSyncResponse(
var (
objTree = s.objTree
)
log := log.With(zap.Strings("heads", response.Heads), zap.String("treeId", s.objTree.Id()))
log := log.With(zap.Strings("heads", response.Heads), zap.String("treeId", s.objTree.Id()), zap.String("spaceId", s.spaceId))
log.DebugCtx(ctx, "received full sync response message")
defer func() {

View file

@ -73,7 +73,7 @@ func (s *messagePool) SendSync(ctx context.Context, peerId string, msg *spacesyn
delete(s.waiters, msg.RequestId)
s.waitersMx.Unlock()
log.With(zap.String("requestId", msg.RequestId)).WarnCtx(ctx, "time elapsed when waiting")
log.With(zap.String("requestId", msg.RequestId)).DebugCtx(ctx, "time elapsed when waiting")
err = fmt.Errorf("sendSync context error: %v", ctx.Err())
case reply = <-waiter.ch:
// success

View file

@ -295,7 +295,7 @@ func (s *space) PutTree(ctx context.Context, payload treestorage.TreeStorageCrea
Listener: listener,
AclList: s.aclList,
SpaceStorage: s.storage,
OnClose: func(id string) {},
OnClose: s.onObjectClose,
SyncStatus: s.syncStatus,
PeerGetter: s.peerManager,
}
@ -334,6 +334,7 @@ func (s *space) BuildTree(ctx context.Context, id string, opts BuildTreeOpts) (t
if t, err = synctree.BuildSyncTreeOrGetRemote(ctx, id, deps); err != nil {
return nil, err
}
log.Debug("incrementing counter", zap.String("id", id), zap.String("spaceId", s.id))
s.treesUsed.Add(1)
return
}
@ -401,6 +402,7 @@ func (s *space) handleMessage(msg HandleMessage) {
}
// onObjectClose is the callback fired when a tree object is closed:
// it logs the event, decrements the space's in-use tree counter, and
// shuts down the handle queue's per-object thread. The CloseThread
// error is deliberately discarded — closing is best-effort here.
func (s *space) onObjectClose(id string) {
	log.Debug(
		"decrementing counter",
		zap.String("id", id),
		zap.String("spaceId", s.id),
	)
	s.treesUsed.Add(-1)
	_ = s.handleQueue.CloseThread(id)
}

View file

@ -32,6 +32,7 @@ type Params struct {
ListenAddrs []string
Wrapper DRPCHandlerWrapper
TimeoutMillis int
Handshake func(conn net.Conn) (cCtx context.Context, sc sec.SecureConn, err error)
}
func NewBaseDrpcServer() *BaseDrpcServer {
@ -42,6 +43,7 @@ func (s *BaseDrpcServer) Run(ctx context.Context, params Params) (err error) {
s.drpcServer = drpcserver.NewWithOptions(params.Wrapper(s.Mux), drpcserver.Options{Manager: drpcmanager.Options{
Reader: drpcwire.ReaderOptions{MaximumBufferSize: params.BufferSizeMb * (1 << 20)},
}})
s.handshake = params.Handshake
ctx, s.cancel = context.WithCancel(ctx)
for _, addr := range params.ListenAddrs {
list, err := net.Listen("tcp", addr)

View file

@ -70,11 +70,11 @@ func (s *drpcServer) Run(ctx context.Context) (err error) {
SummaryVec: histVec,
}
},
}
s.handshake = func(conn net.Conn) (cCtx context.Context, sc sec.SecureConn, err error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
return s.transport.SecureInbound(ctx, conn)
Handshake: func(conn net.Conn) (cCtx context.Context, sc sec.SecureConn, err error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
return s.transport.SecureInbound(ctx, conn)
},
}
return s.BaseDrpcServer.Run(ctx, params)
}