1
0
Fork 0
mirror of https://github.com/anyproto/any-sync.git synced 2025-06-08 05:57:03 +09:00

Wire up the stuff

This commit is contained in:
mcrakhman 2023-06-02 00:59:33 +02:00
parent 796b66478b
commit 815bc7927d
No known key found for this signature in database
GPG key ID: DED12CFEF5B8396B
18 changed files with 81 additions and 59 deletions

View file

@ -3,6 +3,7 @@ package credentialprovider
import (
"context"
"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
)
@ -13,12 +14,21 @@ func NewNoOp() CredentialProvider {
}
type CredentialProvider interface {
app.Component
GetCredential(ctx context.Context, spaceHeader *spacesyncproto.RawSpaceHeaderWithId) ([]byte, error)
}
type noOpProvider struct {
}
func (n noOpProvider) Init(a *app.App) (err error) {
return nil
}
func (n noOpProvider) Name() (name string) {
return CName
}
func (n noOpProvider) GetCredential(ctx context.Context, spaceHeader *spacesyncproto.RawSpaceHeaderWithId) ([]byte, error) {
return nil, nil
}

View file

@ -79,7 +79,7 @@ func TestSpaceDeleteIds(t *testing.T) {
// creating a tree
bytes := make([]byte, 32)
rand.Read(bytes)
doc, err := spc.CreateTree(ctx, objecttree.ObjectTreeCreatePayload{
doc, err := spc.TreeBuilder().CreateTree(ctx, objecttree.ObjectTreeCreatePayload{
PrivKey: acc.SignKey,
ChangeType: "some",
SpaceId: spc.Id(),
@ -88,7 +88,7 @@ func TestSpaceDeleteIds(t *testing.T) {
Timestamp: time.Now().Unix(),
})
require.NoError(t, err)
tr, err := spc.PutTree(ctx, doc, nil)
tr, err := spc.TreeBuilder().PutTree(ctx, doc, nil)
require.NoError(t, err)
ids = append(ids, tr.Id())
tr.Close()
@ -106,7 +106,7 @@ func TestSpaceDeleteIds(t *testing.T) {
func createTree(t *testing.T, ctx context.Context, spc Space, acc *accountdata.AccountKeys) string {
bytes := make([]byte, 32)
rand.Read(bytes)
doc, err := spc.CreateTree(ctx, objecttree.ObjectTreeCreatePayload{
doc, err := spc.TreeBuilder().CreateTree(ctx, objecttree.ObjectTreeCreatePayload{
PrivKey: acc.SignKey,
ChangeType: "some",
SpaceId: spc.Id(),
@ -115,7 +115,7 @@ func createTree(t *testing.T, ctx context.Context, spc Space, acc *accountdata.A
Timestamp: time.Now().Unix(),
})
require.NoError(t, err)
tr, err := spc.PutTree(ctx, doc, nil)
tr, err := spc.TreeBuilder().PutTree(ctx, doc, nil)
require.NoError(t, err)
tr.Close()
return tr.Id()
@ -149,7 +149,7 @@ func TestSpaceDeleteIdsIncorrectSnapshot(t *testing.T) {
err = spc.Init(ctx)
require.NoError(t, err)
settingsObject := spc.(*space).settingsObject
settingsObject := spc.(*space).app.MustComponent(settings.CName).(settings.Settings).SettingsObject()
var ids []string
for i := 0; i < totalObjs; i++ {
id := createTree(t, ctx, spc, acc)
@ -193,7 +193,7 @@ func TestSpaceDeleteIdsIncorrectSnapshot(t *testing.T) {
require.Equal(t, len(ids), len(fx.treeManager.deletedIds))
// checking that new snapshot will contain all the changes
settingsObject = spc.(*space).settingsObject
settingsObject = spc.(*space).app.MustComponent(settings.CName).(settings.Settings).SettingsObject()
settings.DoSnapshot = func(treeLen int) bool {
return true
}
@ -231,7 +231,7 @@ func TestSpaceDeleteIdsMarkDeleted(t *testing.T) {
err = spc.Init(ctx)
require.NoError(t, err)
settingsObject := spc.(*space).settingsObject
settingsObject := spc.(*space).app.MustComponent(settings.CName).(settings.Settings).SettingsObject()
var ids []string
for i := 0; i < totalObjs; i++ {
id := createTree(t, ctx, spc, acc)

View file

@ -4,7 +4,6 @@ package deletionstate
import (
"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/app/logger"
"github.com/anyproto/any-sync/commonspace/spacestate"
"github.com/anyproto/any-sync/commonspace/spacestorage"
"go.uber.org/zap"
"sync"
@ -36,7 +35,7 @@ type objectDeletionState struct {
}
func (st *objectDeletionState) Init(a *app.App) (err error) {
st.storage = a.MustComponent(spacestate.CName).(*spacestate.SpaceState).SpaceStorage
st.storage = a.MustComponent(spacestorage.StorageName).(spacestorage.SpaceStorage)
return nil
}

View file

@ -67,18 +67,18 @@ func New() HeadSync {
func (h *headSync) Init(a *app.App) (err error) {
shared := a.MustComponent(spacestate.CName).(*spacestate.SpaceState)
cfg := a.MustComponent("cfg").(config2.ConfigGetter)
cfg := a.MustComponent("config").(config2.ConfigGetter)
h.spaceId = shared.SpaceId
h.spaceIsDeleted = shared.SpaceIsDeleted
h.syncPeriod = cfg.GetSpace().SyncPeriod
h.configuration = a.MustComponent(nodeconf.CName).(nodeconf.NodeConf)
h.log = log.With(zap.String("spaceId", h.spaceId))
h.storage = a.MustComponent(spacestorage.CName).(spacestorage.SpaceStorage)
h.storage = a.MustComponent(spacestorage.StorageName).(spacestorage.SpaceStorage)
h.diff = ldiff.New(16, 16)
h.peerManager = a.MustComponent(peermanager.CName).(peermanager.PeerManager)
h.peerManager = a.MustComponent(peermanager.ManagerName).(peermanager.PeerManager)
h.credentialProvider = a.MustComponent(credentialprovider.CName).(credentialprovider.CredentialProvider)
h.syncStatus = a.MustComponent(syncstatus.CName).(syncstatus.StatusProvider)
h.treeManager = a.MustComponent(treemanager.CName).(treemanager.TreeManager)
h.treeManager = app.MustComponent[treemanager.TreeManager](a)
h.deletionState = a.MustComponent(deletionstate.CName).(deletionstate.ObjectDeletionState)
h.syncer = newDiffSyncer(h)
sync := func(ctx context.Context) (err error) {

View file

@ -24,7 +24,7 @@ func (s *SyncAcl) HandleMessage(ctx context.Context, senderId string, request *s
}
func (s *SyncAcl) Init(a *app.App) (err error) {
storage := a.MustComponent(spacestorage.CName).(spacestorage.SpaceStorage)
storage := a.MustComponent(spacestorage.StorageName).(spacestorage.SpaceStorage)
aclStorage, err := storage.AclStorage()
if err != nil {
return err

View file

@ -25,6 +25,7 @@ type syncTreeHandler struct {
const maxQueueSize = 5
// TODO: Make sync and async message handling
func newSyncTreeHandler(spaceId string, objTree objecttree.ObjectTree, syncClient syncclient.SyncClient, syncStatus syncstatus.StatusUpdater) synchandler.SyncHandler {
return &syncTreeHandler{
objTree: objTree,
@ -119,7 +120,7 @@ func (s *syncTreeHandler) handleHeadUpdate(
return
}
return s.syncClient.QueueRequest(ctx, senderId, treeId, fullRequest, replyId)
return s.syncClient.QueueRequest(senderId, treeId, fullRequest)
}
if s.alreadyHasHeads(objTree, update.Heads) {
@ -143,7 +144,7 @@ func (s *syncTreeHandler) handleHeadUpdate(
return
}
return s.syncClient.QueueRequest(ctx, senderId, treeId, fullRequest, replyId)
return s.syncClient.QueueRequest(senderId, treeId, fullRequest)
}
func (s *syncTreeHandler) handleFullSyncRequest(
@ -169,7 +170,7 @@ func (s *syncTreeHandler) handleFullSyncRequest(
defer func() {
if err != nil {
log.ErrorCtx(ctx, "full sync request finished with error", zap.Error(err))
s.syncClient.QueueRequest(ctx, senderId, treeId, treechangeproto.WrapError(treechangeproto.ErrFullSync, header), replyId)
s.syncClient.QueueRequest(senderId, treeId, treechangeproto.WrapError(treechangeproto.ErrFullSync, header))
return
} else if fullResponse != nil {
cnt := fullResponse.Content.GetFullSyncResponse()
@ -192,7 +193,7 @@ func (s *syncTreeHandler) handleFullSyncRequest(
return
}
return s.syncClient.QueueRequest(ctx, senderId, treeId, fullResponse, replyId)
return s.syncClient.QueueRequest(senderId, treeId, fullResponse)
}
func (s *syncTreeHandler) handleFullSyncResponse(

View file

@ -1,7 +1,8 @@
package commonspace
package objectmanager
import (
"context"
"errors"
"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/commonspace/object/acl/syncacl"
"github.com/anyproto/any-sync/commonspace/object/syncobjectgetter"
@ -12,6 +13,12 @@ import (
"sync/atomic"
)
var (
ErrSpaceClosed = errors.New("space is closed")
)
const CName = "common.commonspace.objectmanager"
type ObjectManager interface {
treemanager.TreeManager
AddObject(object syncobjectgetter.SyncObject)
@ -25,7 +32,7 @@ type objectManager struct {
spaceIsClosed *atomic.Bool
}
func NewObjectManager(manager treemanager.TreeManager) ObjectManager {
func New(manager treemanager.TreeManager) ObjectManager {
return &objectManager{
TreeManager: manager,
}
@ -54,6 +61,10 @@ func (o *objectManager) AddObject(object syncobjectgetter.SyncObject) {
o.reservedObjects = append(o.reservedObjects, object)
}
func (o *objectManager) Name() string {
return CName
}
func (o *objectManager) GetTree(ctx context.Context, spaceId, treeId string) (objecttree.ObjectTree, error) {
if o.spaceIsClosed.Load() {
return nil, ErrSpaceClosed

View file

@ -5,7 +5,6 @@ import (
"context"
"fmt"
"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/commonspace/object/treemanager"
"github.com/anyproto/any-sync/commonspace/objectsync/syncclient"
"github.com/anyproto/any-sync/commonspace/spacestate"
"github.com/anyproto/any-sync/metric"
@ -69,11 +68,14 @@ type objectSync struct {
func (s *objectSync) Init(a *app.App) (err error) {
s.syncClient = a.MustComponent(syncclient.CName).(syncclient.SyncClient)
s.spaceStorage = a.MustComponent(spacestorage.CName).(spacestorage.SpaceStorage)
s.objectGetter = a.MustComponent(treemanager.CName).(treemanager.TreeManager).(syncobjectgetter.SyncObjectGetter)
s.spaceStorage = a.MustComponent(spacestorage.StorageName).(spacestorage.SpaceStorage)
s.objectGetter = app.MustComponent[syncobjectgetter.SyncObjectGetter](a)
s.configuration = a.MustComponent(nodeconf.CName).(nodeconf.NodeConf)
sharedData := a.MustComponent(spacestate.CName).(*spacestate.SpaceState)
s.metric = a.MustComponent(metric.CName).(metric.Metric)
mc := a.Component(metric.CName)
if mc != nil {
s.metric = mc.(metric.Metric)
}
s.spaceIsDeleted = sharedData.SpaceIsDeleted
s.spaceId = sharedData.SpaceId
s.handleQueue = multiqueue.New[HandleMessage](s.processHandleMessage, 100)

View file

@ -81,11 +81,11 @@ func (t *treeBuilder) Init(a *app.App) (err error) {
t.treesUsed = state.TreesUsed
t.builder = state.TreeBuilderFunc
t.aclList = a.MustComponent(syncacl.CName).(*syncacl.SyncAcl)
t.spaceStorage = a.MustComponent(spacestorage.CName).(spacestorage.SpaceStorage)
t.spaceStorage = a.MustComponent(spacestorage.StorageName).(spacestorage.SpaceStorage)
t.configuration = a.MustComponent(nodeconf.CName).(nodeconf.NodeConf)
t.headsNotifiable = a.MustComponent(headsync.CName).(headsync.HeadSync)
t.syncStatus = a.MustComponent(syncstatus.CName).(syncstatus.StatusUpdater)
t.peerManager = a.MustComponent(peermanager.CName).(peermanager.PeerManager)
t.peerManager = a.MustComponent(peermanager.ManagerName).(peermanager.PeerManager)
t.objectSync = a.MustComponent(objectsync.CName).(objectsync.ObjectSync)
t.log = log.With(zap.String("spaceId", t.spaceId))
return nil

View file

@ -8,7 +8,10 @@ import (
"github.com/anyproto/any-sync/net/peer"
)
const CName = "common.commonspace.peermanager"
const (
ProviderName = "common.commonspace.peermanagerprovider"
ManagerName = "common.commonspace.peermanager"
)
type PeerManager interface {
app.Component

View file

@ -48,14 +48,14 @@ type settings struct {
func (s *settings) Init(a *app.App) (err error) {
s.account = a.MustComponent(accountservice.CName).(accountservice.Service)
s.treeManager = a.MustComponent(treemanager.CName).(treemanager.TreeManager)
s.treeManager = app.MustComponent[treemanager.TreeManager](a)
s.headsync = a.MustComponent(headsync.CName).(headsync.HeadSync)
s.configuration = a.MustComponent(nodeconf.CName).(nodeconf.NodeConf)
s.deletionState = a.MustComponent(deletionstate.CName).(deletionstate.ObjectDeletionState)
s.treeBuilder = a.MustComponent(objecttreebuilder.CName).(objecttreebuilder.TreeBuilderComponent)
sharedState := a.MustComponent(spacestate.CName).(*spacestate.SpaceState)
s.storage = a.MustComponent(spacestorage.CName).(spacestorage.SpaceStorage)
s.storage = a.MustComponent(spacestorage.StorageName).(spacestorage.SpaceStorage)
s.spaceIsDeleted = sharedState.SpaceIsDeleted
deps := Deps{

View file

@ -2,7 +2,6 @@ package commonspace
import (
"context"
"errors"
"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/commonspace/headsync"
"github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
@ -21,10 +20,6 @@ import (
"time"
)
var (
ErrSpaceClosed = errors.New("space is closed")
)
type SpaceCreatePayload struct {
// SigningKey is the signing key of the owner
SigningKey crypto.PrivKey
@ -126,7 +121,7 @@ func (s *space) Init(ctx context.Context) (err error) {
s.syncStatus = s.app.MustComponent(syncstatus.CName).(syncstatus.StatusProvider)
s.settings = s.app.MustComponent(settings.CName).(settings.Settings)
s.objectSync = s.app.MustComponent(objectsync.CName).(objectsync.ObjectSync)
s.storage = s.app.MustComponent(spacestorage.CName).(spacestorage.SpaceStorage)
s.storage = s.app.MustComponent(spacestorage.StorageName).(spacestorage.SpaceStorage)
return nil
}

View file

@ -14,6 +14,7 @@ import (
"github.com/anyproto/any-sync/commonspace/object/tree/objecttree"
"github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
"github.com/anyproto/any-sync/commonspace/object/treemanager"
"github.com/anyproto/any-sync/commonspace/objectmanager"
"github.com/anyproto/any-sync/commonspace/objectsync"
"github.com/anyproto/any-sync/commonspace/objectsync/syncclient"
"github.com/anyproto/any-sync/commonspace/objecttreebuilder"
@ -24,7 +25,6 @@ import (
"github.com/anyproto/any-sync/commonspace/spacestorage"
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
"github.com/anyproto/any-sync/commonspace/streamsender"
"github.com/anyproto/any-sync/commonspace/syncstatus"
"github.com/anyproto/any-sync/metric"
"github.com/anyproto/any-sync/net/peer"
"github.com/anyproto/any-sync/net/pool"
@ -69,16 +69,10 @@ type spaceService struct {
func (s *spaceService) Init(a *app.App) (err error) {
s.config = a.MustComponent("config").(config.ConfigGetter).GetSpace()
s.account = a.MustComponent(accountservice.CName).(accountservice.Service)
s.storageProvider = a.MustComponent(spacestorage.CName).(spacestorage.SpaceStorageProvider)
s.storageProvider = a.MustComponent(spacestorage.ProviderName).(spacestorage.SpaceStorageProvider)
s.configurationService = a.MustComponent(nodeconf.CName).(nodeconf.Service)
s.treeManager = a.MustComponent(treemanager.CName).(treemanager.TreeManager)
s.peermanagerProvider = a.MustComponent(peermanager.CName).(peermanager.PeerManagerProvider)
credProvider := a.Component(credentialprovider.CName)
if credProvider != nil {
s.credentialProvider = credProvider.(credentialprovider.CredentialProvider)
} else {
s.credentialProvider = credentialprovider.NewNoOp()
}
s.peermanagerProvider = a.MustComponent(peermanager.ProviderName).(peermanager.PeerManagerProvider)
s.pool = a.MustComponent(pool.CName).(pool.Pool)
s.metric, _ = a.Component(metric.CName).(metric.Metric)
s.app = a
@ -169,13 +163,6 @@ func (s *spaceService) NewSpace(ctx context.Context, id string) (Space, error) {
} else {
state.TreeBuilderFunc = objecttree.BuildEmptyDataObjectTree
}
var syncStatus syncstatus.StatusProvider
if !s.configurationService.IsResponsible(st.Id()) {
// TODO: move it to the client package and add possibility to inject StatusProvider from the client
syncStatus = syncstatus.NewSyncStatusProvider()
} else {
syncStatus = syncstatus.NewNoOpSyncStatus()
}
peerManager, err := s.peermanagerProvider.NewPeerManager(ctx, id)
if err != nil {
return nil, err
@ -185,12 +172,11 @@ func (s *spaceService) NewSpace(ctx context.Context, id string) (Space, error) {
Register(peerManager).
Register(newCommonStorage(st)).
Register(syncacl.New()).
Register(syncStatus).
Register(streamsender.New()).
Register(requestsender.New()).
Register(deletionstate.New()).
Register(settings.New()).
Register(NewObjectManager(s.treeManager)).
Register(objectmanager.New(s.treeManager)).
Register(syncclient.New()).
Register(objecttreebuilder.New()).
Register(objectsync.New()).

View file

@ -22,7 +22,7 @@ func (i *InMemorySpaceStorageProvider) Init(a *app.App) (err error) {
}
func (i *InMemorySpaceStorageProvider) Name() (name string) {
return CName
return ProviderName
}
func (i *InMemorySpaceStorageProvider) WaitSpaceStorage(ctx context.Context, id string) (SpaceStorage, error) {

View file

@ -27,7 +27,7 @@ func (i *InMemorySpaceStorage) Init(a *app.App) (err error) {
}
func (i *InMemorySpaceStorage) Name() (name string) {
return CName
return StorageName
}
func NewInMemorySpaceStorage(payload SpaceStorageCreatePayload) (SpaceStorage, error) {

View file

@ -12,7 +12,10 @@ import (
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
)
const CName = "common.commonspace.spacestorage"
const (
ProviderName = "common.commonspace.spacestorageprovider"
StorageName = "common.commonspace.spacestorage"
)
var (
ErrSpaceStorageExists = errors.New("space storage exists")

View file

@ -10,9 +10,11 @@ import (
"github.com/anyproto/any-sync/commonspace/credentialprovider"
"github.com/anyproto/any-sync/commonspace/object/tree/objecttree"
"github.com/anyproto/any-sync/commonspace/object/treemanager"
"github.com/anyproto/any-sync/commonspace/objecttreebuilder"
"github.com/anyproto/any-sync/commonspace/peermanager"
"github.com/anyproto/any-sync/commonspace/spacestorage"
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
"github.com/anyproto/any-sync/commonspace/syncstatus"
"github.com/anyproto/any-sync/net/peer"
"github.com/anyproto/any-sync/net/pool"
"github.com/anyproto/any-sync/nodeconf"
@ -129,6 +131,14 @@ func (m *mockConf) NodeTypes(nodeId string) []nodeconf.NodeType {
type mockPeerManager struct {
}
func (p *mockPeerManager) Init(a *app.App) (err error) {
return nil
}
func (p *mockPeerManager) Name() (name string) {
return peermanager.ManagerName
}
func (p *mockPeerManager) SendPeer(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (err error) {
return nil
}
@ -153,7 +163,7 @@ func (m *mockPeerManagerProvider) Init(a *app.App) (err error) {
}
func (m *mockPeerManagerProvider) Name() (name string) {
return peermanager.CName
return peermanager.ProviderName
}
func (m *mockPeerManagerProvider) NewPeerManager(ctx context.Context, spaceId string) (sm peermanager.PeerManager, err error) {
@ -250,7 +260,7 @@ func (t *mockTreeManager) MarkTreeDeleted(ctx context.Context, spaceId, treeId s
func (t *mockTreeManager) Init(a *app.App) (err error) {
t.cache = ocache.New(func(ctx context.Context, id string) (value ocache.Object, err error) {
return t.space.BuildTree(ctx, id, BuildTreeOpts{})
return t.space.TreeBuilder().BuildTree(ctx, id, objecttreebuilder.BuildTreeOpts{})
},
ocache.WithGCPeriod(time.Minute),
ocache.WithTTL(time.Duration(60)*time.Second))
@ -325,6 +335,8 @@ func newFixture(t *testing.T) *spaceFixture {
}
fx.app.Register(fx.account).
Register(fx.config).
Register(syncstatus.NewNoOpSyncStatus()).
Register(credentialprovider.NewNoOp()).
Register(fx.configurationService).
Register(fx.storageProvider).
Register(fx.peermanagerProvider).

View file

@ -104,7 +104,7 @@ func (s *syncStatusProvider) Init(a *app.App) (err error) {
s.updateTimeout = syncTimeout
s.spaceId = sharedState.SpaceId
s.configuration = a.MustComponent(nodeconf.CName).(nodeconf.NodeConf)
s.storage = a.MustComponent(spacestorage.CName).(spacestorage.SpaceStorage)
s.storage = a.MustComponent(spacestorage.StorageName).(spacestorage.SpaceStorage)
return
}