1
0
Fork 0
mirror of https://github.com/anyproto/any-sync.git synced 2025-06-08 05:57:03 +09:00
any-sync/commonspace/spacerpc_test.go
2025-05-12 13:59:29 +02:00

278 lines
7.5 KiB
Go

package commonspace
import (
"context"
"math/rand"
"sync"
"time"
"go.uber.org/zap"
"github.com/anyproto/any-sync/accountservice"
"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/commonspace/object/acl/recordverifier"
"github.com/anyproto/any-sync/commonspace/object/tree/objecttree"
"github.com/anyproto/any-sync/commonspace/object/tree/synctree"
"github.com/anyproto/any-sync/commonspace/object/treemanager"
"github.com/anyproto/any-sync/commonspace/object/treesyncer"
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
"github.com/anyproto/any-sync/commonspace/syncstatus"
"github.com/anyproto/any-sync/net/peer"
"github.com/anyproto/any-sync/net/rpc/rpctest"
"github.com/anyproto/any-sync/net/rpc/server"
"github.com/anyproto/any-sync/net/streampool"
"github.com/anyproto/any-sync/util/periodicsync"
)
func NewTreeSyncer(spaceId string) treesyncer.TreeSyncer {
return &treeSyncer{spaceId: spaceId}
}
type treeSyncer struct {
spaceId string
treeManager treemanager.TreeManager
}
func (t *treeSyncer) Init(a *app.App) (err error) {
t.treeManager = a.MustComponent(treemanager.CName).(treemanager.TreeManager)
return
}
func (t *treeSyncer) Name() (name string) {
return treesyncer.CName
}
func (t *treeSyncer) Run(ctx context.Context) (err error) {
return nil
}
func (t *treeSyncer) Close(ctx context.Context) (err error) {
return nil
}
func (t *treeSyncer) StartSync() {
}
func (t *treeSyncer) StopSync() {
}
func (t *treeSyncer) ShouldSync(peerId string) bool {
return true
}
func (t *treeSyncer) SyncAll(ctx context.Context, p peer.Peer, existing, missing []string) (err error) {
syncTrees := func(ids []string) {
for _, id := range ids {
log := log.With(zap.String("treeId", id))
tr, err := t.treeManager.GetTree(ctx, t.spaceId, id)
if err != nil {
log.WarnCtx(ctx, "can't load existing tree", zap.Error(err))
return
}
syncTree, ok := tr.(synctree.SyncTree)
if !ok {
log.WarnCtx(ctx, "not a sync tree")
}
if err = syncTree.SyncWithPeer(ctx, p); err != nil {
log.WarnCtx(ctx, "synctree.SyncWithPeer error", zap.Error(err))
} else {
log.DebugCtx(ctx, "success *synctree.SyncWithPeer")
}
}
}
syncTrees(missing)
syncTrees(existing)
return
}
const RpcName = "rpcserver"
type RpcServer struct {
spaceService SpaceService
spaces map[string]Space
streamPool streampool.StreamPool
sync.Mutex
}
func (r *RpcServer) StoreDiff(ctx2 context.Context, request *spacesyncproto.StoreDiffRequest) (*spacesyncproto.StoreDiffResponse, error) {
//TODO implement me
panic("implement me")
}
func (r *RpcServer) StoreElements(stream spacesyncproto.DRPCSpaceSync_StoreElementsStream) error {
//TODO implement me
panic("implement me")
}
func NewRpcServer() *RpcServer {
return &RpcServer{
spaces: make(map[string]Space),
}
}
type failingStream struct {
spacesyncproto.DRPCSpaceSync_ObjectSyncStreamStream
tryFail bool
}
func (f *failingStream) Send(msg *spacesyncproto.ObjectSyncMessage) error {
if f.tryFail && rand.Uint32()%2 == 0 {
return nil
}
return f.DRPCSpaceSync_ObjectSyncStreamStream.Send(msg)
}
func (r *RpcServer) getSpace(ctx context.Context, spaceId string) (sp Space, err error) {
r.Lock()
defer r.Unlock()
sp, ok := r.spaces[spaceId]
if !ok {
sp, err = r.spaceService.NewSpace(ctx, spaceId, Deps{
TreeSyncer: NewTreeSyncer(spaceId),
SyncStatus: syncstatus.NewNoOpSyncStatus(),
recordVerifier: recordverifier.NewValidateFull(),
})
if err != nil {
return nil, err
}
err = sp.Init(ctx)
if err != nil {
return nil, err
}
r.spaces[spaceId] = sp
}
return sp, nil
}
func (r *RpcServer) GetSpace(ctx context.Context, spaceId string) (Space, error) {
return r.getSpace(ctx, spaceId)
}
func (r *RpcServer) HeadSync(ctx context.Context, request *spacesyncproto.HeadSyncRequest) (*spacesyncproto.HeadSyncResponse, error) {
sp, err := r.getSpace(ctx, request.SpaceId)
if err != nil {
return nil, err
}
resp, err := sp.HandleRangeRequest(ctx, request)
return resp, err
}
func (r *RpcServer) SpacePush(ctx context.Context, request *spacesyncproto.SpacePushRequest) (*spacesyncproto.SpacePushResponse, error) {
//TODO implement me
panic("implement me")
}
func (r *RpcServer) SpacePull(ctx context.Context, request *spacesyncproto.SpacePullRequest) (*spacesyncproto.SpacePullResponse, error) {
//TODO implement me
panic("implement me")
}
func (r *RpcServer) ObjectSyncStream(stream spacesyncproto.DRPCSpaceSync_ObjectSyncStreamStream) error {
return r.streamPool.ReadStream(&failingStream{stream, false}, 100)
}
func (r *RpcServer) ObjectSync(ctx context.Context, message *spacesyncproto.ObjectSyncMessage) (*spacesyncproto.ObjectSyncMessage, error) {
//TODO implement me
panic("implement me")
}
func (r *RpcServer) ObjectSyncRequestStream(message *spacesyncproto.ObjectSyncMessage, stream spacesyncproto.DRPCSpaceSync_ObjectSyncRequestStreamStream) error {
sp, err := r.getSpace(stream.Context(), message.SpaceId)
if err != nil {
return err
}
return sp.HandleStreamSyncRequest(stream.Context(), message, stream)
}
func (r *RpcServer) AclAddRecord(ctx context.Context, request *spacesyncproto.AclAddRecordRequest) (*spacesyncproto.AclAddRecordResponse, error) {
//TODO implement me
panic("implement me")
}
func (r *RpcServer) AclGetRecords(ctx context.Context, request *spacesyncproto.AclGetRecordsRequest) (*spacesyncproto.AclGetRecordsResponse, error) {
//TODO implement me
panic("implement me")
}
func (r *RpcServer) Init(a *app.App) (err error) {
serv := a.MustComponent(server.CName).(*rpctest.TestServer)
r.spaceService = a.MustComponent(CName).(SpaceService)
r.streamPool = a.MustComponent(streampool.CName).(streampool.StreamPool)
return spacesyncproto.DRPCRegisterSpaceSync(serv, r)
}
func (r *RpcServer) Name() (name string) {
return RpcName
}
const SpaceProcessName = "spaceprocess"
type spaceProcess struct {
spaceId string
spaceServer *RpcServer
accountService accountservice.Service
manager *testTreeManager
periodicCall periodicsync.PeriodicSync
}
func newSpaceProcess(spaceId string) *spaceProcess {
return &spaceProcess{spaceId: spaceId}
}
func (s *spaceProcess) Init(a *app.App) (err error) {
s.manager = a.MustComponent(treemanager.CName).(*testTreeManager)
s.spaceServer = a.MustComponent(RpcName).(*RpcServer)
s.accountService = a.MustComponent(accountservice.CName).(accountservice.Service)
s.periodicCall = periodicsync.NewPeriodicSyncDuration(50*time.Millisecond, 0, s.update, log)
return
}
func (s *spaceProcess) Name() (name string) {
return SpaceProcessName
}
func (s *spaceProcess) Run(ctx context.Context) (err error) {
s.periodicCall.Run()
return nil
}
func (s *spaceProcess) Close(ctx context.Context) (err error) {
s.periodicCall.Close()
return nil
}
func (s *spaceProcess) update(ctx context.Context) error {
sp, err := s.spaceServer.GetSpace(ctx, s.spaceId)
if err != nil {
return err
}
var tr objecttree.ObjectTree
newDoc := rand.Int()%20 == 0
snapshot := rand.Int()%10 == 0
allTrees := sp.StoredIds()
if newDoc || len(allTrees) == 0 {
tr, err = s.manager.CreateTree(ctx, s.spaceId)
if err != nil {
return err
}
} else {
rnd := rand.Int() % len(allTrees)
tr, err = s.manager.GetTree(ctx, s.spaceId, allTrees[rnd])
if err != nil {
return err
}
}
tr.Lock()
defer tr.Unlock()
bytes := make([]byte, 1024)
_, _ = rand.Read(bytes)
_, err = tr.AddContent(ctx, objecttree.SignableChangeContent{
Data: bytes,
Key: s.accountService.Account().SignKey,
IsSnapshot: snapshot,
IsEncrypted: true,
Timestamp: 0,
DataType: "",
})
return err
}