1
0
Fork 0
mirror of https://github.com/anyproto/any-sync.git synced 2025-06-10 18:10:54 +09:00

Merge pull request #113 from anyproto/GO-2340-p2p-space-pull

GO-2340 p2p space pull
This commit is contained in:
Sergey Cherepanov 2023-11-13 16:31:18 +01:00 committed by GitHub
commit 2be108b0ae
Signed by: github
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 103 additions and 38 deletions

View file

@ -3,9 +3,18 @@ package commonspace
import (
"context"
"errors"
"sync/atomic"
"time"
"go.uber.org/zap"
"github.com/anyproto/any-sync/commonspace/deletionmanager"
"github.com/anyproto/any-sync/commonspace/object/treesyncer"
"sync/atomic"
"github.com/anyproto/any-sync/net"
"github.com/anyproto/any-sync/net/peer"
"storj.io/drpc"
"github.com/anyproto/any-sync/accountservice"
"github.com/anyproto/any-sync/app"
@ -30,11 +39,9 @@ import (
"github.com/anyproto/any-sync/commonspace/syncstatus"
"github.com/anyproto/any-sync/consensus/consensusproto"
"github.com/anyproto/any-sync/metric"
"github.com/anyproto/any-sync/net/peer"
"github.com/anyproto/any-sync/net/pool"
"github.com/anyproto/any-sync/net/rpc/rpcerr"
"github.com/anyproto/any-sync/nodeconf"
"storj.io/drpc"
)
const CName = "common.commonspace"
@ -217,19 +224,47 @@ func (s *spaceService) addSpaceStorage(ctx context.Context, spaceDescription Spa
}
func (s *spaceService) getSpaceStorageFromRemote(ctx context.Context, id string) (st spacestorage.SpaceStorage, err error) {
var p peer.Peer
lastConfiguration := s.configurationService
// we can't connect to client if it is a node
if lastConfiguration.IsResponsible(id) {
if s.configurationService.IsResponsible(id) {
err = spacesyncproto.ErrSpaceMissing
return
}
p, err = s.pool.GetOneOf(ctx, lastConfiguration.NodeIds(id))
sm, err := s.peerManagerProvider.NewPeerManager(ctx, id)
if err != nil {
return
return nil, err
}
var peers []peer.Peer
for {
peers, err = sm.GetResponsiblePeers(ctx)
if err != nil && !errors.Is(err, net.ErrUnableToConnect) {
return nil, err
}
if len(peers) == 0 {
select {
case <-time.After(time.Second):
case <-ctx.Done():
return nil, ctx.Err()
}
} else {
break
}
}
for i, p := range peers {
if st, err = s.spacePullWithPeer(ctx, p, id); err != nil {
if i+1 == len(peers) {
return
} else {
log.InfoCtx(ctx, "unable to pull space", zap.String("spaceId", id), zap.String("peerId", p.Id()))
}
} else {
return
}
}
return nil, net.ErrUnableToConnect
}
func (s *spaceService) spacePullWithPeer(ctx context.Context, p peer.Peer, id string) (st spacestorage.SpaceStorage, err error) {
var res *spacesyncproto.SpacePullResponse
err = p.DoDrpc(ctx, func(conn drpc.Conn) error {
cl := spacesyncproto.NewDRPCSpaceSyncClient(conn)
@ -241,7 +276,7 @@ func (s *spaceService) getSpaceStorageFromRemote(ctx context.Context, id string)
return
}
st, err = s.createSpaceStorage(spacestorage.SpaceStorageCreatePayload{
return s.createSpaceStorage(spacestorage.SpaceStorageCreatePayload{
AclWithId: &consensusproto.RawRecordWithId{
Payload: res.Payload.AclPayload,
Id: res.Payload.AclPayloadId,
@ -252,7 +287,6 @@ func (s *spaceService) getSpaceStorageFromRemote(ctx context.Context, id string)
},
SpaceHeaderWithId: res.Payload.SpaceHeader,
})
return
}
func (s *spaceService) createSpaceStorage(payload spacestorage.SpaceStorageCreatePayload) (spacestorage.SpaceStorage, error) {

2
go.mod
View file

@ -15,7 +15,7 @@ require (
github.com/google/uuid v1.4.0
github.com/hashicorp/yamux v0.1.1
github.com/huandu/skiplist v1.2.0
github.com/ipfs/boxo v0.15.0
github.com/ipfs/boxo v0.13.1
github.com/ipfs/go-block-format v0.2.0
github.com/ipfs/go-cid v0.4.1
github.com/ipfs/go-ipld-format v0.6.0

4
go.sum
View file

@ -79,8 +79,8 @@ github.com/huandu/skiplist v1.2.0/go.mod h1:7v3iFjLcSAzO4fN5B8dvebvo/qsfumiLiDXM
github.com/huin/goupnp v1.3.0 h1:UvLUlWDNpoUdYzb2TCn+MuTWtcjXKSza2n6CBdQ0xXc=
github.com/ipfs/bbloom v0.0.4 h1:Gi+8EGJ2y5qiD5FbsbpX/TMNcJw8gSqr7eyjHa4Fhvs=
github.com/ipfs/bbloom v0.0.4/go.mod h1:cS9YprKXpoZ9lT0n/Mw/a6/aFV6DTjTLYHeA+gyqMG0=
github.com/ipfs/boxo v0.15.0 h1:BriLydj2nlK1nKeJQHxcKSuG5ZXcoutzhBklOtxC5pk=
github.com/ipfs/boxo v0.15.0/go.mod h1:X5ulcbR5Nh7sm3Db8+08AApUo6FsGC5mb23QDKAoB/M=
github.com/ipfs/boxo v0.13.1 h1:nQ5oQzcMZR3oL41REJDcTbrvDvuZh3J9ckc9+ILeRQI=
github.com/ipfs/boxo v0.13.1/go.mod h1:btrtHy0lmO1ODMECbbEY1pxNtrLilvKSYLoGQt1yYCk=
github.com/ipfs/go-bitfield v1.1.0 h1:fh7FIo8bSwaJEh6DdTWbCeZ1eqOaOkKFI74SCnsWbGA=
github.com/ipfs/go-bitfield v1.1.0/go.mod h1:paqf1wjq/D2BBmzfTVFlJQ9IlFOZpg422HL0HqsGWHU=
github.com/ipfs/go-block-format v0.2.0 h1:ZqrkxBA2ICbDRbK8KJs/u0O3dlp6gmAuuXUJNiW1Ycs=

View file

@ -4,6 +4,12 @@ import (
"context"
"errors"
"fmt"
"strings"
"sync"
"time"
"go.uber.org/zap"
"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/app/logger"
"github.com/anyproto/any-sync/net/peer"
@ -13,9 +19,6 @@ import (
"github.com/anyproto/any-sync/net/transport/quic"
"github.com/anyproto/any-sync/net/transport/yamux"
"github.com/anyproto/any-sync/nodeconf"
"go.uber.org/zap"
"strings"
"sync"
)
const CName = "net.peerservice"
@ -40,14 +43,16 @@ type PeerService interface {
}
type peerService struct {
yamux transport.Transport
quic transport.Transport
nodeConf nodeconf.NodeConf
peerAddrs map[string][]string
pool pool.Pool
server server.DRPCServer
preferQuic bool
mu sync.RWMutex
yamux transport.Transport
quic transport.Transport
nodeConf nodeconf.NodeConf
peerAddrs map[string][]string
ignoreAddrs *sync.Map
ignoreTimeout time.Duration
pool pool.Pool
server server.DRPCServer
preferQuic bool
mu sync.RWMutex
}
func (p *peerService) Init(a *app.App) (err error) {
@ -57,6 +62,8 @@ func (p *peerService) Init(a *app.App) (err error) {
p.pool = a.MustComponent(pool.CName).(pool.Pool)
p.server = a.MustComponent(server.CName).(server.DRPCServer)
p.peerAddrs = map[string][]string{}
p.ignoreAddrs = &sync.Map{}
p.ignoreTimeout = time.Minute * 3
p.yamux.SetAccepter(p)
p.quic.SetAccepter(p)
return nil
@ -79,21 +86,21 @@ func (p *peerService) PreferQuic(prefer bool) {
func (p *peerService) Dial(ctx context.Context, peerId string) (pr peer.Peer, err error) {
p.mu.RLock()
defer p.mu.RUnlock()
addrs, err := p.getPeerAddrs(peerId)
if err != nil {
return
}
var mc transport.MultiConn
log.DebugCtx(ctx, "dial", zap.String("peerId", peerId), zap.Strings("addrs", addrs))
var schemes = yamuxPreferSchemes
if p.preferQuic {
schemes = quicPreferSchemes
}
addrs, err := p.getPeerAddrs(peerId)
if err != nil {
p.mu.RUnlock()
return
}
p.mu.RUnlock()
var mc transport.MultiConn
log.DebugCtx(ctx, "dial", zap.String("peerId", peerId), zap.Strings("addrs", addrs))
err = ErrAddrsNotFound
for _, sch := range schemes {
if mc, err = p.dialScheme(ctx, sch, addrs); err == nil {
@ -125,14 +132,20 @@ func (p *peerService) dialScheme(ctx context.Context, sch string, addrs []string
}
err = ErrAddrsNotFound
now := time.Now()
for _, addr := range addrs {
if scheme(addr) != sch {
continue
}
if tm, ok := p.ignoreAddrs.Load(addr); ok && tm.(time.Time).After(now) {
continue
}
if mc, err = tr.Dial(ctx, stripScheme(addr)); err == nil {
p.ignoreAddrs.Delete(addr)
return
} else {
log.InfoCtx(ctx, "can't connect to host", zap.String("addr", addr), zap.Error(err))
p.ignoreAddrs.Store(addr, time.Now().Add(p.ignoreTimeout))
}
}
return

View file

@ -3,6 +3,12 @@ package peerservice
import (
"context"
"fmt"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/net/peer"
"github.com/anyproto/any-sync/net/pool"
@ -12,10 +18,6 @@ import (
"github.com/anyproto/any-sync/net/transport/yamux"
"github.com/anyproto/any-sync/nodeconf"
"github.com/anyproto/any-sync/nodeconf/mock_nodeconf"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
"testing"
)
var ctx = context.Background()
@ -111,6 +113,22 @@ func TestPeerService_Dial(t *testing.T) {
require.NoError(t, err)
assert.NotNil(t, p)
})
t.Run("ignore", func(t *testing.T) {
fx := newFixture(t)
defer fx.finish(t)
fx.PreferQuic(false)
var peerId = "p1"
fx.nodeConf.EXPECT().PeerAddresses(peerId).Return([]string{"127.0.0.1:1111"}, true).AnyTimes()
fx.yamux.MockTransport.EXPECT().Dial(ctx, "127.0.0.1:1111").Return(nil, fmt.Errorf("error"))
_, err := fx.Dial(ctx, peerId)
require.Error(t, err)
_, err = fx.Dial(ctx, peerId)
require.EqualError(t, err, ErrAddrsNotFound.Error())
})
}
func TestPeerService_Accept(t *testing.T) {