mirror of
https://github.com/anyproto/any-sync.git
synced 2025-06-08 05:57:03 +09:00
Merge pull request #419 from anyproto/GO-5295-fix-panic-sync-protocol
GO-5925: Add more node stats
This commit is contained in:
commit
81d69857af
7 changed files with 146 additions and 22 deletions
|
@ -41,6 +41,13 @@ func NewSyncHandler(tree SyncTree, syncClient SyncClient, spaceId string) syncde
|
|||
}
|
||||
|
||||
func (s *syncHandler) HandleHeadUpdate(ctx context.Context, statusUpdater syncstatus.StatusUpdater, headUpdate drpc.Message) (req syncdeps.Request, err error) {
|
||||
var objectRequest *objectmessages.Request
|
||||
defer func() {
|
||||
// we mitigate the problem of a nil value being wrapped in an interface
|
||||
if err == nil && objectRequest != nil {
|
||||
req = objectRequest
|
||||
}
|
||||
}()
|
||||
update, ok := headUpdate.(*objectmessages.HeadUpdate)
|
||||
if !ok {
|
||||
return nil, ErrUnexpectedResponseType
|
||||
|
@ -73,7 +80,8 @@ func (s *syncHandler) HandleHeadUpdate(ctx context.Context, statusUpdater syncst
|
|||
return nil, nil
|
||||
}
|
||||
statusUpdater.HeadsApply(peerId, update.ObjectId(), contentUpdate.Heads, false)
|
||||
return s.syncClient.CreateFullSyncRequest(peerId, s.tree)
|
||||
objectRequest, err = s.syncClient.CreateFullSyncRequest(peerId, s.tree)
|
||||
return
|
||||
}
|
||||
rawChangesPayload := objecttree.RawChangesPayload{
|
||||
NewHeads: contentUpdate.Heads,
|
||||
|
@ -85,7 +93,8 @@ func (s *syncHandler) HandleHeadUpdate(ctx context.Context, statusUpdater syncst
|
|||
return nil, err
|
||||
}
|
||||
if !slice.UnsortedEquals(res.Heads, contentUpdate.Heads) {
|
||||
return s.syncClient.CreateFullSyncRequest(peerId, s.tree)
|
||||
objectRequest, err = s.syncClient.CreateFullSyncRequest(peerId, s.tree)
|
||||
return
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
|
|
|
@ -56,9 +56,11 @@ type syncTree struct {
|
|||
syncClient SyncClient
|
||||
syncStatus syncstatus.StatusUpdater
|
||||
listener updatelistener.UpdateListener
|
||||
statsCollector *TreeStatsCollector
|
||||
onClose func(id string)
|
||||
isClosed bool
|
||||
isDeleted bool
|
||||
buildTime time.Duration
|
||||
}
|
||||
|
||||
var log = logger.NewNamed("common.commonspace.synctree")
|
||||
|
@ -81,6 +83,7 @@ type BuildDeps struct {
|
|||
PeerGetter ResponsiblePeersGetter
|
||||
BuildObjectTree objecttree.BuildObjectTreeFunc
|
||||
ValidateObjectTree objecttree.ValidatorFunc
|
||||
StatsCollector *TreeStatsCollector
|
||||
}
|
||||
|
||||
var newTreeGetter = func(deps BuildDeps, treeId string) treeGetter {
|
||||
|
@ -112,6 +115,7 @@ func PutSyncTree(ctx context.Context, payload treestorage.TreeStorageCreatePaylo
|
|||
}
|
||||
|
||||
func buildSyncTree(ctx context.Context, peerId string, deps BuildDeps) (t SyncTree, err error) {
|
||||
buildStart := time.Now()
|
||||
objTree, err := deps.BuildObjectTree(deps.TreeStorage, deps.AclList)
|
||||
if err != nil {
|
||||
return
|
||||
|
@ -123,6 +127,8 @@ func buildSyncTree(ctx context.Context, peerId string, deps BuildDeps) (t SyncTr
|
|||
onClose: deps.OnClose,
|
||||
listener: deps.Listener,
|
||||
syncStatus: deps.SyncStatus,
|
||||
statsCollector: deps.StatsCollector,
|
||||
buildTime: time.Since(buildStart),
|
||||
}
|
||||
syncHandler := NewSyncHandler(syncTree, syncClient, deps.SpaceId)
|
||||
syncTree.ObjectSyncHandler = syncHandler
|
||||
|
@ -146,6 +152,9 @@ func buildSyncTree(ctx context.Context, peerId string, deps BuildDeps) (t SyncTr
|
|||
deps.SyncStatus.ObjectReceive(peerId, syncTree.Id(), syncTree.Heads())
|
||||
}
|
||||
}
|
||||
if syncTree.statsCollector != nil {
|
||||
syncTree.statsCollector.Register(syncTree)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -285,6 +294,11 @@ func (s *syncTree) Close() (err error) {
|
|||
}
|
||||
|
||||
func (s *syncTree) close() (err error) {
|
||||
defer func() {
|
||||
if s.statsCollector != nil {
|
||||
s.statsCollector.Unregister(s)
|
||||
}
|
||||
}()
|
||||
defer s.Unlock()
|
||||
defer func() {
|
||||
log.Debug("closed sync tree", zap.Error(err), zap.String("id", s.Id()))
|
||||
|
|
58
commonspace/object/tree/synctree/treestats.go
Normal file
58
commonspace/object/tree/synctree/treestats.go
Normal file
|
@ -0,0 +1,58 @@
|
|||
package synctree
|
||||
|
||||
import (
|
||||
"sync"
|
||||
)
|
||||
|
||||
type TreeStatsCollector struct {
|
||||
trees map[string]*syncTree
|
||||
mutex sync.Mutex
|
||||
spaceId string
|
||||
}
|
||||
|
||||
func NewTreeStatsCollector(spaceId string) *TreeStatsCollector {
|
||||
return &TreeStatsCollector{
|
||||
trees: make(map[string]*syncTree),
|
||||
spaceId: spaceId,
|
||||
}
|
||||
}
|
||||
|
||||
func (t *TreeStatsCollector) Register(tree *syncTree) {
|
||||
t.mutex.Lock()
|
||||
defer t.mutex.Unlock()
|
||||
t.trees[tree.Id()] = tree
|
||||
}
|
||||
|
||||
func (t *TreeStatsCollector) Collect() []TreeStats {
|
||||
t.mutex.Lock()
|
||||
defer t.mutex.Unlock()
|
||||
stats := make([]TreeStats, 0, len(t.trees))
|
||||
for _, tree := range t.trees {
|
||||
tree.Lock()
|
||||
stats = append(stats, TreeStats{
|
||||
TreeLen: tree.Len(),
|
||||
SnapshotCounter: tree.Root().SnapshotCounter,
|
||||
Heads: tree.Heads(),
|
||||
ObjectId: tree.Id(),
|
||||
SpaceId: t.spaceId,
|
||||
BuildTimeMillis: int(tree.buildTime.Milliseconds()),
|
||||
})
|
||||
tree.Unlock()
|
||||
}
|
||||
return stats
|
||||
}
|
||||
|
||||
func (t *TreeStatsCollector) Unregister(tree SyncTree) {
|
||||
t.mutex.Lock()
|
||||
defer t.mutex.Unlock()
|
||||
delete(t.trees, tree.Id())
|
||||
}
|
||||
|
||||
type TreeStats struct {
|
||||
TreeLen int `json:"tree_len"`
|
||||
SnapshotCounter int `json:"snapshot_counter"`
|
||||
Heads []string `json:"heads"`
|
||||
ObjectId string `json:"object_id"`
|
||||
SpaceId string `json:"space_id"`
|
||||
BuildTimeMillis int `json:"build_time_millis"`
|
||||
}
|
8
commonspace/objecttreebuilder/stat.go
Normal file
8
commonspace/objecttreebuilder/stat.go
Normal file
|
@ -0,0 +1,8 @@
|
|||
package objecttreebuilder
|
||||
|
||||
import "github.com/anyproto/any-sync/commonspace/object/tree/synctree"
|
||||
|
||||
type debugStat struct {
|
||||
TreeStats []synctree.TreeStats `json:"tree_stats"`
|
||||
SpaceId string `json:"space_id"`
|
||||
}
|
|
@ -9,6 +9,7 @@ import (
|
|||
"go.uber.org/zap"
|
||||
|
||||
"github.com/anyproto/any-sync/app"
|
||||
"github.com/anyproto/any-sync/app/debugstat"
|
||||
"github.com/anyproto/any-sync/app/logger"
|
||||
"github.com/anyproto/any-sync/commonspace/object/acl/list"
|
||||
"github.com/anyproto/any-sync/commonspace/object/acl/syncacl"
|
||||
|
@ -69,14 +70,47 @@ type treeBuilder struct {
|
|||
|
||||
log logger.CtxLogger
|
||||
builder objecttree.BuildObjectTreeFunc
|
||||
treeStats *synctree.TreeStatsCollector
|
||||
debugStat debugstat.StatService
|
||||
spaceId string
|
||||
aclList list.AclList
|
||||
treesUsed *atomic.Int32
|
||||
isClosed *atomic.Bool
|
||||
}
|
||||
|
||||
func (t *treeBuilder) ProvideStat() any {
|
||||
return debugStat{
|
||||
TreeStats: t.treeStats.Collect(),
|
||||
SpaceId: t.spaceId,
|
||||
}
|
||||
}
|
||||
|
||||
func (t *treeBuilder) StatId() string {
|
||||
return t.spaceId
|
||||
}
|
||||
|
||||
func (t *treeBuilder) StatType() string {
|
||||
return CName
|
||||
}
|
||||
|
||||
func (t *treeBuilder) Run(ctx context.Context) (err error) {
|
||||
t.debugStat.AddProvider(t)
|
||||
return
|
||||
}
|
||||
|
||||
func (t *treeBuilder) Close(ctx context.Context) (err error) {
|
||||
t.debugStat.RemoveProvider(t)
|
||||
return
|
||||
}
|
||||
|
||||
func (t *treeBuilder) Init(a *app.App) (err error) {
|
||||
state := a.MustComponent(spacestate.CName).(*spacestate.SpaceState)
|
||||
comp, ok := a.Component(debugstat.CName).(debugstat.StatService)
|
||||
if !ok {
|
||||
comp = debugstat.NewNoOp()
|
||||
}
|
||||
t.treeStats = synctree.NewTreeStatsCollector(state.SpaceId)
|
||||
t.debugStat = comp
|
||||
t.spaceId = state.SpaceId
|
||||
t.isClosed = state.SpaceIsClosed
|
||||
t.treesUsed = state.TreesUsed
|
||||
|
@ -119,6 +153,7 @@ func (t *treeBuilder) BuildTree(ctx context.Context, id string, opts BuildTreeOp
|
|||
PeerGetter: t.peerManager,
|
||||
BuildObjectTree: treeBuilder,
|
||||
ValidateObjectTree: opts.TreeValidator,
|
||||
StatsCollector: t.treeStats,
|
||||
}
|
||||
t.treesUsed.Add(1)
|
||||
t.log.Debug("incrementing counter", zap.String("id", id), zap.Int32("trees", t.treesUsed.Load()))
|
||||
|
|
6
go.mod
6
go.mod
|
@ -6,7 +6,7 @@ toolchain go1.23.5
|
|||
|
||||
require (
|
||||
filippo.io/edwards25519 v1.1.0
|
||||
github.com/anyproto/any-store v0.1.10
|
||||
github.com/anyproto/any-store v0.1.11
|
||||
github.com/anyproto/go-chash v0.1.0
|
||||
github.com/anyproto/go-slip10 v1.0.0
|
||||
github.com/anyproto/go-slip21 v1.0.0
|
||||
|
@ -38,7 +38,7 @@ require (
|
|||
go.uber.org/mock v0.5.0
|
||||
go.uber.org/zap v1.27.0
|
||||
golang.org/x/crypto v0.36.0
|
||||
golang.org/x/exp v0.0.0-20250218142911-aa4b98e5adaa
|
||||
golang.org/x/exp v0.0.0-20250305212735-054e65f0b394
|
||||
golang.org/x/net v0.37.0
|
||||
golang.org/x/sys v0.31.0
|
||||
golang.org/x/time v0.11.0
|
||||
|
@ -116,6 +116,6 @@ require (
|
|||
modernc.org/libc v1.61.13 // indirect
|
||||
modernc.org/mathutil v1.7.1 // indirect
|
||||
modernc.org/memory v1.8.2 // indirect
|
||||
modernc.org/sqlite v1.36.0 // indirect
|
||||
modernc.org/sqlite v1.36.1 // indirect
|
||||
zombiezen.com/go/sqlite v1.4.0 // indirect
|
||||
)
|
||||
|
|
12
go.sum
12
go.sum
|
@ -6,8 +6,8 @@ github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAE
|
|||
github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII=
|
||||
github.com/alecthomas/units v0.0.0-20240927000941-0f3dac36c52b h1:mimo19zliBX/vSQ6PWWSL9lK8qwHozUj03+zLoEB8O0=
|
||||
github.com/alecthomas/units v0.0.0-20240927000941-0f3dac36c52b/go.mod h1:fvzegU4vN3H1qMT+8wDmzjAcDONcgo2/SZ/TyfdUOFs=
|
||||
github.com/anyproto/any-store v0.1.10 h1:X3vjmoFMrgXbF4jdqD3AoiwPa9ODcojeu99tW+Qusz0=
|
||||
github.com/anyproto/any-store v0.1.10/go.mod h1:GpnVhcGm5aUQtOwCnKeTt4jsWgVXZ773WbQVLFdeCFo=
|
||||
github.com/anyproto/any-store v0.1.11 h1:xoaDVF8FJEI6V37fMw/R3ptBCLHj0kYiImwWxC1Ryu8=
|
||||
github.com/anyproto/any-store v0.1.11/go.mod h1:X3UkQ2zLATYNED3gFhY2VcdfDOeJvpEQ0PmDO90A9Yo=
|
||||
github.com/anyproto/go-chash v0.1.0 h1:I9meTPjXFRfXZHRJzjOHC/XF7Q5vzysKkiT/grsogXY=
|
||||
github.com/anyproto/go-chash v0.1.0/go.mod h1:0UjNQi3PDazP0fINpFYu6VKhuna+W/V+1vpXHAfNgLY=
|
||||
github.com/anyproto/go-slip10 v1.0.0 h1:uAEtSuudR3jJBOfkOXf3bErxVoxbuKwdoJN55M1i6IA=
|
||||
|
@ -379,8 +379,8 @@ golang.org/x/crypto v0.0.0-20200602180216-279210d13fed/go.mod h1:LzIPMQfyMNhhGPh
|
|||
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
|
||||
golang.org/x/crypto v0.36.0 h1:AnAEvhDddvBdpY+uR+MyHmuZzzNqXSe/GvuDeob5L34=
|
||||
golang.org/x/crypto v0.36.0/go.mod h1:Y4J0ReaxCR1IMaabaSMugxJES1EpwhBHhv2bDHklZvc=
|
||||
golang.org/x/exp v0.0.0-20250218142911-aa4b98e5adaa h1:t2QcU6V556bFjYgu4L6C+6VrCPyJZ+eyRsABUPs1mz4=
|
||||
golang.org/x/exp v0.0.0-20250218142911-aa4b98e5adaa/go.mod h1:BHOTPb3L19zxehTsLoJXVaTktb06DFgmdW6Wb9s8jqk=
|
||||
golang.org/x/exp v0.0.0-20250305212735-054e65f0b394 h1:nDVHiLt8aIbd/VzvPWN6kSOPE7+F/fNFDSXLVYkE/Iw=
|
||||
golang.org/x/exp v0.0.0-20250305212735-054e65f0b394/go.mod h1:sIifuuw/Yco/y6yb6+bDNfyeQ/MdPUy/hKEMYQV17cM=
|
||||
golang.org/x/image v0.0.0-20191009234506-e7c1f5e7dbb8/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
|
||||
golang.org/x/image v0.21.0 h1:c5qV36ajHpdj4Qi0GnE0jUc/yuo33OLFaa0d+crTD5s=
|
||||
golang.org/x/image v0.21.0/go.mod h1:vUbsLavqK/W303ZroQQVKQ+Af3Yl6Uz1Ppu5J/cLz78=
|
||||
|
@ -475,8 +475,8 @@ modernc.org/opt v0.1.4 h1:2kNGMRiUjrp4LcaPuLY2PzUfqM/w9N23quVwhKt5Qm8=
|
|||
modernc.org/opt v0.1.4/go.mod h1:03fq9lsNfvkYSfxrfUhZCWPk1lm4cq4N+Bh//bEtgns=
|
||||
modernc.org/sortutil v1.2.1 h1:+xyoGf15mM3NMlPDnFqrteY07klSFxLElE2PVuWIJ7w=
|
||||
modernc.org/sortutil v1.2.1/go.mod h1:7ZI3a3REbai7gzCLcotuw9AC4VZVpYMjDzETGsSMqJE=
|
||||
modernc.org/sqlite v1.36.0 h1:EQXNRn4nIS+gfsKeUTymHIz1waxuv5BzU7558dHSfH8=
|
||||
modernc.org/sqlite v1.36.0/go.mod h1:7MPwH7Z6bREicF9ZVUR78P1IKuxfZ8mRIDHD0iD+8TU=
|
||||
modernc.org/sqlite v1.36.1 h1:bDa8BJUH4lg6EGkLbahKe/8QqoF8p9gArSc6fTqYhyQ=
|
||||
modernc.org/sqlite v1.36.1/go.mod h1:7MPwH7Z6bREicF9ZVUR78P1IKuxfZ8mRIDHD0iD+8TU=
|
||||
modernc.org/strutil v1.2.1 h1:UneZBkQA+DX2Rp35KcM69cSsNES9ly8mQWD71HKlOA0=
|
||||
modernc.org/strutil v1.2.1/go.mod h1:EHkiggD70koQxjVdSBM3JKM7k6L0FbGE5eymy9i3B9A=
|
||||
modernc.org/token v1.1.0 h1:Xl7Ap9dKaEs5kLoOQeQmPWevfnk/DM5qcLcYlA8ys6Y=
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue