1
0
Fork 0
mirror of https://github.com/anyproto/any-sync.git synced 2025-06-08 05:57:03 +09:00

Rewrite to old streampool

This commit is contained in:
mcrakhman 2024-08-11 17:34:24 +02:00
parent 4e32768c19
commit 17af238f03
No known key found for this signature in database
GPG key ID: DED12CFEF5B8396B
34 changed files with 394 additions and 875 deletions

View file

@ -10,6 +10,7 @@ import (
"github.com/anyproto/go-chash"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"storj.io/drpc"
accountService "github.com/anyproto/any-sync/accountservice"
@ -28,6 +29,7 @@ import (
"github.com/anyproto/any-sync/commonspace/peermanager"
"github.com/anyproto/any-sync/commonspace/spacestorage"
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
"github.com/anyproto/any-sync/commonspace/sync/objectsync/objectmessages"
"github.com/anyproto/any-sync/commonspace/sync/synctest"
"github.com/anyproto/any-sync/consensus/consensusproto"
"github.com/anyproto/any-sync/coordinator/coordinatorclient"
@ -37,7 +39,7 @@ import (
"github.com/anyproto/any-sync/net/pool"
"github.com/anyproto/any-sync/net/rpc/rpctest"
"github.com/anyproto/any-sync/net/streampool"
"github.com/anyproto/any-sync/net/streampool/streamopener"
"github.com/anyproto/any-sync/net/streampool/streamhandler"
"github.com/anyproto/any-sync/node/nodeclient"
"github.com/anyproto/any-sync/nodeconf"
"github.com/anyproto/any-sync/testutil/accounttest"
@ -219,7 +221,7 @@ func (m *mockPeerManagerProvider) Name() (name string) {
}
func (m *mockPeerManagerProvider) NewPeerManager(ctx context.Context, spaceId string) (sm peermanager.PeerManager, err error) {
return synctest.NewCounterPeerManager(), nil
return synctest.NewTestPeerManager(), nil
}
//
@ -545,23 +547,56 @@ func (m mockCoordinatorClient) Name() (name string) {
// Stream opener
//
func newStreamOpener(spaceId string) streamopener.StreamOpener {
func newStreamOpener(spaceId string) streamhandler.StreamHandler {
return &streamOpener{spaceId: spaceId}
}
type streamOpener struct {
spaceId string
spaceId string
spaceGetter *RpcServer
streamPool streampool.StreamPool
}
func (c *streamOpener) Init(a *app.App) (err error) {
func (s *streamOpener) HandleMessage(peerCtx context.Context, peerId string, msg drpc.Message) (err error) {
syncMsg, ok := msg.(*objectmessages.HeadUpdate)
if !ok {
err = fmt.Errorf("unexpected message")
return
}
if syncMsg.SpaceId() == "" {
var msg = &spacesyncproto.SpaceSubscription{}
if err = msg.Unmarshal(syncMsg.Bytes); err != nil {
return
}
log.InfoCtx(peerCtx, "got subscription message", zap.Strings("spaceIds", msg.SpaceIds))
if msg.Action == spacesyncproto.SpaceSubscriptionAction_Subscribe {
return s.streamPool.AddTagsCtx(peerCtx, msg.SpaceIds...)
} else {
return s.streamPool.RemoveTagsCtx(peerCtx, msg.SpaceIds...)
}
}
sp, err := s.spaceGetter.GetSpace(peerCtx, syncMsg.SpaceId())
if err != nil {
return
}
return sp.HandleMessage(peerCtx, syncMsg)
}
func (s *streamOpener) NewReadMessage() drpc.Message {
return &objectmessages.HeadUpdate{}
}
func (s *streamOpener) Init(a *app.App) (err error) {
s.spaceGetter = a.MustComponent(RpcName).(*RpcServer)
s.streamPool = a.MustComponent(streampool.CName).(streampool.StreamPool)
return nil
}
func (c *streamOpener) Name() (name string) {
return streamopener.CName
func (s *streamOpener) Name() (name string) {
return streamhandler.CName
}
func (c *streamOpener) OpenStream(ctx context.Context, p peer.Peer) (stream drpc.Stream, tags []string, err error) {
func (s *streamOpener) OpenStream(ctx context.Context, p peer.Peer) (stream drpc.Stream, tags []string, err error) {
conn, err := p.AcquireDrpcConn(ctx)
if err != nil {
return
@ -571,7 +606,7 @@ func (c *streamOpener) OpenStream(ctx context.Context, p peer.Peer) (stream drpc
return
}
var msg = &spacesyncproto.SpaceSubscription{
SpaceIds: []string{c.spaceId},
SpaceIds: []string{s.spaceId},
Action: spacesyncproto.SpaceSubscriptionAction_Subscribe,
}
payload, err := msg.Marshal()
@ -580,7 +615,6 @@ func (c *streamOpener) OpenStream(ctx context.Context, p peer.Peer) (stream drpc
}
if err = objectStream.Send(&spacesyncproto.ObjectSyncMessage{
Payload: payload,
SpaceId: c.spaceId,
}); err != nil {
return
}
@ -599,7 +633,7 @@ type spaceFixture struct {
configurationService nodeconf.Service
storageProvider spacestorage.SpaceStorageProvider
peerManagerProvider peermanager.PeerManagerProvider
streamOpener streamopener.StreamOpener
streamOpener streamhandler.StreamHandler
credentialProvider credentialprovider.CredentialProvider
treeManager *mockTreeManager
pool *mockPool
@ -665,6 +699,8 @@ func newFixtureWithData(t *testing.T, spaceId string, keys *accountdata.AccountK
Register(synctest.NewPeerProvider(keys.PeerId)).
Register(pool.New()).
Register(credentialprovider.NewNoOp()).
Register(streampool.New()).
Register(fx.streamOpener).
Register(mockCoordinatorClient{}).
Register(mockNodeClient{}).
Register(fx.configurationService).