mirror of
https://github.com/anyproto/anytype-heart.git
synced 2025-06-11 10:18:28 +09:00
update any-sync + priority for reply messages
This commit is contained in:
parent
27a18f4081
commit
acc8d6fc31
4 changed files with 72 additions and 44 deletions
7
go.mod
7
go.mod
|
@ -7,7 +7,7 @@ require (
|
|||
github.com/PuerkitoBio/goquery v1.8.1
|
||||
github.com/VividCortex/ewma v1.2.0
|
||||
github.com/adrium/goheif v0.0.0-20230113233934-ca402e77a786
|
||||
github.com/anytypeio/any-sync v0.0.45
|
||||
github.com/anytypeio/any-sync v0.0.46-0.20230515183222-a92dcffc93cd
|
||||
github.com/anytypeio/go-naturaldate/v2 v2.0.1
|
||||
github.com/araddon/dateparse v0.0.0-20210429162001-6b43995a97de
|
||||
github.com/blevesearch/bleve/v2 v2.3.6
|
||||
|
@ -56,7 +56,7 @@ require (
|
|||
github.com/joho/godotenv v1.5.1
|
||||
github.com/jsummers/gobmp v0.0.0-20151104160322-e2ba15ffa76e
|
||||
github.com/kelseyhightower/envconfig v1.4.0
|
||||
github.com/libp2p/go-libp2p v0.27.2
|
||||
github.com/libp2p/go-libp2p v0.27.3
|
||||
github.com/libp2p/zeroconf/v2 v2.2.0
|
||||
github.com/logrusorgru/aurora v2.0.3+incompatible
|
||||
github.com/magiconair/properties v1.8.4
|
||||
|
@ -89,7 +89,6 @@ require (
|
|||
go.uber.org/zap v1.24.0
|
||||
golang.org/x/exp v0.0.0-20230420155640-133eef4313cb
|
||||
golang.org/x/image v0.6.0
|
||||
golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028
|
||||
golang.org/x/net v0.10.0
|
||||
golang.org/x/oauth2 v0.5.0
|
||||
golang.org/x/text v0.9.0
|
||||
|
@ -245,7 +244,7 @@ require (
|
|||
go.opentelemetry.io/otel v1.11.2 // indirect
|
||||
go.opentelemetry.io/otel/trace v1.11.2 // indirect
|
||||
go.uber.org/atomic v1.10.0 // indirect
|
||||
golang.org/x/crypto v0.8.0 // indirect
|
||||
golang.org/x/crypto v0.9.0 // indirect
|
||||
golang.org/x/mod v0.10.0 // indirect
|
||||
golang.org/x/sync v0.1.0 // indirect
|
||||
golang.org/x/sys v0.8.0 // indirect
|
||||
|
|
13
go.sum
13
go.sum
|
@ -42,8 +42,8 @@ github.com/andybalholm/cascadia v1.2.0/go.mod h1:YCyR8vOZT9aZ1CHEd8ap0gMVm2aFgxB
|
|||
github.com/andybalholm/cascadia v1.3.1 h1:nhxRkql1kdYCc8Snf7D5/D3spOX+dBgjA6u8x004T2c=
|
||||
github.com/andybalholm/cascadia v1.3.1/go.mod h1:R4bJ1UQfqADjvDa4P6HZHLh/3OxWWEqc0Sk8XGwHqvA=
|
||||
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
|
||||
github.com/anytypeio/any-sync v0.0.45 h1:I6/i0SGCDnKwRVn0PpXtGsp5nRkODtFp6BXKUBodAPQ=
|
||||
github.com/anytypeio/any-sync v0.0.45/go.mod h1:LK/bCkCHLK3l8b0Bp3MpAD1sIUvJe0vwJNonBqiegsA=
|
||||
github.com/anytypeio/any-sync v0.0.46-0.20230515183222-a92dcffc93cd h1:SzXcSIXREFdTq8UQn1MJKEbrNqgMg25xMd7/8ZTqSWM=
|
||||
github.com/anytypeio/any-sync v0.0.46-0.20230515183222-a92dcffc93cd/go.mod h1:sHaXDZaufpyESUT+R5SYNwrU5zsdSFjFoOHo26gUQic=
|
||||
github.com/anytypeio/go-chash v0.1.0 h1:nYCMh13SEai/7cXRUoKfU27uASj7XEF6NhvY6hFMKY8=
|
||||
github.com/anytypeio/go-chash v0.1.0/go.mod h1:Q7XiggkMrThRFAwYSItzLOT9OPC8a497SLZtgmJtC/I=
|
||||
github.com/anytypeio/go-ds-badger3 v0.3.1-0.20221103102622-3233d4e13cb8 h1:LC9w0M0SbA5VuZeBtUdq+uR4mdjbJhxurNtovmRiOrU=
|
||||
|
@ -591,8 +591,8 @@ github.com/libp2p/go-buffer-pool v0.1.0/go.mod h1:N+vh8gMqimBzdKkSMVuydVDq+UV5QT
|
|||
github.com/libp2p/go-cidranger v1.1.0 h1:ewPN8EZ0dd1LSnrtuwd4709PXVcITVeuwbag38yPW7c=
|
||||
github.com/libp2p/go-flow-metrics v0.1.0 h1:0iPhMI8PskQwzh57jB9WxIuIOQ0r+15PChFGkx3Q3WM=
|
||||
github.com/libp2p/go-flow-metrics v0.1.0/go.mod h1:4Xi8MX8wj5aWNDAZttg6UPmc0ZrnFNsMtpsYUClFtro=
|
||||
github.com/libp2p/go-libp2p v0.27.2 h1:I1fxqxdm/O0TFoAZKje8wSMu9tfLlLdzTQvgT3HA6v0=
|
||||
github.com/libp2p/go-libp2p v0.27.2/go.mod h1:FAvvfQa/YOShUYdiSS03IR9OXzkcJXwcNA2FUCh9ImE=
|
||||
github.com/libp2p/go-libp2p v0.27.3 h1:tkV/zm3KCZ4R5er9Xcs2pt0YNB4JH0iBfGAtHJdLHRs=
|
||||
github.com/libp2p/go-libp2p v0.27.3/go.mod h1:FAvvfQa/YOShUYdiSS03IR9OXzkcJXwcNA2FUCh9ImE=
|
||||
github.com/libp2p/go-libp2p-asn-util v0.3.0 h1:gMDcMyYiZKkocGXDQ5nsUQyquC9+H+iLEQHwOCZ7s8s=
|
||||
github.com/libp2p/go-libp2p-record v0.2.0 h1:oiNUOCWno2BFuxt3my4i1frNrt7PerzB3queqa1NkQ0=
|
||||
github.com/libp2p/go-libp2p-testing v0.12.0 h1:EPvBb4kKMWO29qP4mZGyhVzUyR25dvfUIK5WDu6iPUA=
|
||||
|
@ -1007,8 +1007,8 @@ golang.org/x/crypto v0.0.0-20200602180216-279210d13fed/go.mod h1:LzIPMQfyMNhhGPh
|
|||
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
|
||||
golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I=
|
||||
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
|
||||
golang.org/x/crypto v0.8.0 h1:pd9TJtTueMTVQXzk8E2XESSMQDj/U7OUu0PqJqPXQjQ=
|
||||
golang.org/x/crypto v0.8.0/go.mod h1:mRqEX+O9/h5TFCrQhkgjo2yKi0yYA+9ecGkdQoHrywE=
|
||||
golang.org/x/crypto v0.9.0 h1:LF6fAI+IutBocDJ2OT0Q1g8plpYljMZ4+lty+dsqw3g=
|
||||
golang.org/x/crypto v0.9.0/go.mod h1:yrmDGqONDYtNj3tH8X9dzUun2m2lzPa9ngI6/RUPGR0=
|
||||
golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
|
||||
golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
|
||||
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
|
||||
|
@ -1029,7 +1029,6 @@ golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvx
|
|||
golang.org/x/lint v0.0.0-20190301231843-5614ed5bae6f/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
|
||||
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
|
||||
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
|
||||
golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028 h1:4+4C/Iv2U4fMZBiMCc98MG1In4gJY5YRhtpDNeDeHWs=
|
||||
golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028/go.mod h1:E/iHnbuqvinMTCcRqshq8CkpyQDoeVncDDYHnLhea+o=
|
||||
golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc=
|
||||
golang.org/x/mod v0.1.0/go.mod h1:0QHyrYULN0/3qlju5TqG8bIK38QM8yzMo5ekMj3DlcY=
|
||||
|
|
|
@ -2,73 +2,104 @@ package peermanager
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/anytypeio/any-sync/app/logger"
|
||||
"github.com/anytypeio/any-sync/commonspace/spacesyncproto"
|
||||
"github.com/anytypeio/any-sync/net/peer"
|
||||
"github.com/anytypeio/any-sync/net/streampool"
|
||||
"github.com/anytypeio/go-anytype-middleware/space/peerstore"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/exp/slices"
|
||||
"storj.io/drpc"
|
||||
"sync"
|
||||
)
|
||||
|
||||
type clientPeerManager struct {
|
||||
spaceId string
|
||||
responsiblePeerIds []string
|
||||
p *provider
|
||||
peerStore peerstore.PeerStore
|
||||
sync.Mutex
|
||||
}
|
||||
|
||||
func (n *clientPeerManager) init() {
|
||||
n.responsiblePeerIds = n.p.nodeconf.GetLast().NodeIds(n.spaceId)
|
||||
n.responsiblePeerIds = n.peerStore.ResponsibleNodeIds(n.spaceId)
|
||||
}
|
||||
|
||||
func (n *clientPeerManager) SendPeer(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (err error) {
|
||||
ctx = logger.CtxWithFields(context.Background(), logger.CtxGetFields(ctx)...)
|
||||
if n.isResponsible(peerId) {
|
||||
return n.p.streamPool.Send(ctx, msg, func(ctx context.Context) (peers []peer.Peer, err error) {
|
||||
return n.getStreamResponsiblePeers(ctx, peerId)
|
||||
})
|
||||
var drpcMsg drpc.Message
|
||||
drpcMsg = msg
|
||||
if msg.ReplyId != "" || msg.RequestId != "" {
|
||||
// prioritize messages with the request or reply by sending it to a separate queue
|
||||
drpcMsg = streampool.WithQueueId(msg, "replyQueue")
|
||||
}
|
||||
return n.p.streamPool.SendById(ctx, msg, peerId)
|
||||
}
|
||||
|
||||
func (n *clientPeerManager) SendResponsible(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) (err error) {
|
||||
ctx = logger.CtxWithFields(context.Background(), logger.CtxGetFields(ctx)...)
|
||||
return n.p.streamPool.Send(ctx, msg, func(ctx context.Context) (peers []peer.Peer, err error) {
|
||||
return n.getStreamResponsiblePeers(ctx, "")
|
||||
return n.p.streamPool.Send(ctx, drpcMsg, func(ctx context.Context) (peers []peer.Peer, err error) {
|
||||
return n.getExactPeer(ctx, peerId)
|
||||
})
|
||||
}
|
||||
|
||||
func (n *clientPeerManager) Broadcast(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) (err error) {
|
||||
ctx = logger.CtxWithFields(context.Background(), logger.CtxGetFields(ctx)...)
|
||||
if e := n.SendResponsible(ctx, msg); e != nil {
|
||||
log.Info("broadcast sendResponsible error", zap.Error(e))
|
||||
}
|
||||
return n.p.streamPool.Broadcast(ctx, msg, n.spaceId)
|
||||
return n.p.streamPool.Send(ctx, msg, func(ctx context.Context) (peers []peer.Peer, err error) {
|
||||
return n.getStreamResponsiblePeers(ctx)
|
||||
})
|
||||
}
|
||||
|
||||
func (n *clientPeerManager) GetResponsiblePeers(ctx context.Context) (peers []peer.Peer, err error) {
|
||||
p, err := n.p.commonPool.GetOneOf(ctx, n.responsiblePeerIds)
|
||||
if err != nil {
|
||||
return
|
||||
if err == nil {
|
||||
peers = []peer.Peer{p}
|
||||
}
|
||||
return []peer.Peer{p}, nil
|
||||
}
|
||||
|
||||
func (n *clientPeerManager) getStreamResponsiblePeers(ctx context.Context, exactId string) (peers []peer.Peer, err error) {
|
||||
if exactId == "" {
|
||||
// lookup in common pool for existing connection
|
||||
p, e := n.p.commonPool.GetOneOf(ctx, n.responsiblePeerIds)
|
||||
if e != nil {
|
||||
return nil, e
|
||||
log.Debug("local responsible peers are", zap.Strings("local peers", n.peerStore.LocalPeerIds(n.spaceId)))
|
||||
for _, peerId := range n.peerStore.LocalPeerIds(n.spaceId) {
|
||||
if slices.ContainsFunc(peers, func(p peer.Peer) bool { return p.Id() == peerId }) {
|
||||
continue
|
||||
}
|
||||
exactId = p.Id()
|
||||
clientPeer, err := n.p.commonPool.Get(ctx, peerId)
|
||||
if err != nil {
|
||||
log.Debug("removing peer", zap.String("peerId", peerId), zap.Error(err))
|
||||
n.peerStore.RemoveLocalPeer(peerId)
|
||||
continue
|
||||
}
|
||||
peers = append(peers, clientPeer)
|
||||
}
|
||||
if err != nil && len(peers) > 0 {
|
||||
err = nil
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
p, err := n.p.pool.Get(ctx, exactId)
|
||||
func (n *clientPeerManager) getExactPeer(ctx context.Context, peerId string) (peers []peer.Peer, err error) {
|
||||
p, err := n.p.pool.Get(ctx, peerId)
|
||||
if err != nil {
|
||||
return
|
||||
return nil, err
|
||||
}
|
||||
return []peer.Peer{p}, nil
|
||||
}
|
||||
|
||||
func (n *clientPeerManager) isResponsible(peerId string) bool {
|
||||
return slices.Contains(n.responsiblePeerIds, peerId)
|
||||
func (n *clientPeerManager) getStreamResponsiblePeers(ctx context.Context) (peers []peer.Peer, err error) {
|
||||
var peerIds []string
|
||||
// lookup in common pool for existing connection
|
||||
p, nodeErr := n.p.commonPool.GetOneOf(ctx, n.responsiblePeerIds)
|
||||
if nodeErr != nil {
|
||||
log.Warn("failed to get responsible peer from common pool", zap.Error(nodeErr))
|
||||
} else {
|
||||
peerIds = []string{p.Id()}
|
||||
}
|
||||
peerIds = append(peerIds, n.peerStore.LocalPeerIds(n.spaceId)...)
|
||||
for _, peerId := range peerIds {
|
||||
p, err := n.p.pool.Get(ctx, peerId)
|
||||
if err != nil {
|
||||
n.peerStore.RemoveLocalPeer(peerId)
|
||||
log.Warn("failed to get peer from stream pool", zap.String("peerId", peerId), zap.Error(err))
|
||||
continue
|
||||
}
|
||||
peers = append(peers, p)
|
||||
}
|
||||
// set node error if no local peers
|
||||
if len(peers) == 0 {
|
||||
err = fmt.Errorf("failed to get peers for stream")
|
||||
}
|
||||
return
|
||||
}
|
||||
|
|
|
@ -94,7 +94,6 @@ func (s *service) Init(a *app.App) (err error) {
|
|||
s.streamHandler = &streamHandler{s: s}
|
||||
|
||||
s.streamPool = a.MustComponent(streampool.CName).(streampool.Service).NewStreamPool(s.streamHandler, streampool.StreamConfig{
|
||||
SendQueueWorkers: 10,
|
||||
SendQueueSize: 300,
|
||||
DialQueueWorkers: 4,
|
||||
DialQueueSize: 300,
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue