1
0
Fork 0
mirror of https://github.com/anyproto/any-sync.git synced 2025-06-08 05:57:03 +09:00

WIP key value architecture

This commit is contained in:
Mikhail Rakhmanov 2025-04-04 21:34:22 +02:00
parent bb63dca331
commit 38d822ec48
No known key found for this signature in database
GPG key ID: DED12CFEF5B8396B
6 changed files with 82 additions and 17 deletions

View file

@ -8,6 +8,7 @@ import (
"github.com/quic-go/quic-go"
"github.com/anyproto/any-sync/commonspace/headsync/headstorage"
"github.com/anyproto/any-sync/commonspace/object/keyvalue/kvinterfaces"
"github.com/anyproto/any-sync/commonspace/object/treesyncer"
"github.com/anyproto/any-sync/net/rpc/rpcerr"
@ -42,6 +43,7 @@ func newDiffSyncer(hs *headSync) DiffSyncer {
peerManager: hs.peerManager,
clientFactory: spacesyncproto.ClientFactoryFunc(spacesyncproto.NewDRPCSpaceSyncClient),
credentialProvider: hs.credentialProvider,
keyValue: hs.keyValue,
log: newSyncLogger(hs.log, logPeriodSecs),
deletionState: hs.deletionState,
syncAcl: hs.syncAcl,
@ -63,6 +65,7 @@ type diffSyncer struct {
cancel context.CancelFunc
deletionState deletionstate.ObjectDeletionState
credentialProvider credentialprovider.CredentialProvider
keyValue kvinterfaces.KeyValueService
syncAcl syncacl.SyncAcl
}
@ -90,10 +93,17 @@ func (d *diffSyncer) updateHeads(update headstorage.HeadsUpdate) {
if update.IsDerived != nil && *update.IsDerived && len(update.Heads) == 1 && update.Heads[0] == update.Id {
return
}
d.diffContainer.Set(ldiff.Element{
Id: update.Id,
Head: concatStrings(update.Heads),
})
if update.Id == d.keyValue.DefaultStore().Id() {
d.diffContainer.NewDiff().Set(ldiff.Element{
Id: update.Id,
Head: concatStrings(update.Heads),
})
} else {
d.diffContainer.Set(ldiff.Element{
Id: update.Id,
Head: concatStrings(update.Heads),
})
}
}
// probably we should somehow batch the updates
oldHash := d.diffContainer.OldDiff().Hash()
@ -126,7 +136,6 @@ func (d *diffSyncer) Sync(ctx context.Context) error {
}
}
d.log.DebugCtx(ctx, "diff done", zap.String("spaceId", d.spaceId), zap.Duration("dur", time.Since(st)))
d.peerManager.KeepAlive(ctx)
return nil
}
@ -153,6 +162,7 @@ func (d *diffSyncer) syncWithPeer(ctx context.Context, p peer.Peer) (err error)
syncAclId = d.syncAcl.Id()
newIds, changedIds, removedIds []string
)
storageId := d.keyValue.DefaultStore().Id()
needsSync, diff, err := d.diffContainer.DiffTypeCheck(ctx, rdiff)
err = rpcerr.Unwrap(err)
if err != nil {
@ -169,17 +179,32 @@ func (d *diffSyncer) syncWithPeer(ctx context.Context, p peer.Peer) (err error)
// not syncing ids which were removed through settings document
missingIds := d.deletionState.Filter(newIds)
existingIds := append(d.deletionState.Filter(removedIds), d.deletionState.Filter(changedIds)...)
prevExistingLen := len(existingIds)
var (
isStorage = false
isAcl = false
)
existingIds = slice.DiscardFromSlice(existingIds, func(s string) bool {
return s == syncAclId
if s == storageId {
isStorage = true
return true
}
if s == syncAclId {
isAcl = true
return true
}
return false
})
// if we removed acl head from the list
if len(existingIds) < prevExistingLen {
if isAcl {
if syncErr := d.syncAcl.SyncWithPeer(ctx, p); syncErr != nil {
log.Warn("failed to send acl sync message to peer", zap.String("aclId", syncAclId))
}
}
if isStorage {
if err = d.keyValue.SyncWithPeer(p); err != nil {
log.Warn("failed to send storage sync message to peer", zap.String("storageId", storageId))
}
}
// treeSyncer should not get acl id, that's why we filter existing ids before
err = d.treeSyncer.SyncAll(ctx, p, existingIds, missingIds)

View file

@ -16,6 +16,7 @@ import (
"github.com/anyproto/any-sync/commonspace/deletionstate"
"github.com/anyproto/any-sync/commonspace/headsync/headstorage"
"github.com/anyproto/any-sync/commonspace/object/acl/syncacl"
"github.com/anyproto/any-sync/commonspace/object/keyvalue/kvinterfaces"
"github.com/anyproto/any-sync/commonspace/object/treesyncer"
"github.com/anyproto/any-sync/commonspace/peermanager"
"github.com/anyproto/any-sync/commonspace/spacestate"
@ -58,6 +59,7 @@ type headSync struct {
credentialProvider credentialprovider.CredentialProvider
deletionState deletionstate.ObjectDeletionState
syncAcl syncacl.SyncAcl
keyValue kvinterfaces.KeyValueService
}
func New() HeadSync {
@ -80,6 +82,7 @@ func (h *headSync) Init(a *app.App) (err error) {
h.credentialProvider = a.MustComponent(credentialprovider.CName).(credentialprovider.CredentialProvider)
h.treeSyncer = a.MustComponent(treesyncer.CName).(treesyncer.TreeSyncer)
h.deletionState = a.MustComponent(deletionstate.CName).(deletionstate.ObjectDeletionState)
h.keyValue = a.MustComponent(kvinterfaces.CName).(kvinterfaces.KeyValueService)
h.syncer = createDiffSyncer(h)
sync := func(ctx context.Context) (err error) {
return h.syncer.Sync(ctx)
@ -130,14 +133,23 @@ func (h *headSync) Close(ctx context.Context) (err error) {
func (h *headSync) fillDiff(ctx context.Context) error {
var els = make([]ldiff.Element, 0, 100)
var aclOrStorage []ldiff.Element
err := h.storage.HeadStorage().IterateEntries(ctx, headstorage.IterOpts{}, func(entry headstorage.HeadsEntry) (bool, error) {
if entry.IsDerived && entry.Heads[0] == entry.Id {
return true, nil
}
els = append(els, ldiff.Element{
Id: entry.Id,
Head: concatStrings(entry.Heads),
})
if entry.CommonSnapshot != "" {
els = append(els, ldiff.Element{
Id: entry.Id,
Head: concatStrings(entry.Heads),
})
} else {
// this whole stuff is done to prevent storage hash from being set to old diff
aclOrStorage = append(aclOrStorage, ldiff.Element{
Id: entry.Id,
Head: concatStrings(entry.Heads),
})
}
return true, nil
})
if err != nil {
@ -149,6 +161,8 @@ func (h *headSync) fillDiff(ctx context.Context) error {
})
log.Debug("setting acl", zap.String("aclId", h.syncAcl.Id()), zap.String("headId", h.syncAcl.Head().Id))
h.diffContainer.Set(els...)
// acl will be set twice to the diff but it doesn't matter
h.diffContainer.NewDiff().Set(aclOrStorage...)
oldHash := h.diffContainer.OldDiff().Hash()
newHash := h.diffContainer.NewDiff().Hash()
if err := h.storage.StateStorage().SetHash(ctx, oldHash, newHash); err != nil {

View file

@ -54,7 +54,7 @@ func (k *keyValueService) SyncWithPeer(p peer.Peer) (err error) {
k.limiter.ScheduleRequest(k.ctx, p.Id(), func() {
err = k.syncWithPeer(k.ctx, p)
if err != nil {
log.Error("failed to sync with peer", zap.String("peerId", p.ID()), zap.Error(err))
log.Error("failed to sync with peer", zap.String("peerId", p.Id()), zap.Error(err))
}
})
return nil
@ -192,6 +192,10 @@ func (k *keyValueService) Init(a *app.App) (err error) {
k.spaceStorage = a.MustComponent(spacestorage.CName).(spacestorage.SpaceStorage)
k.syncService = a.MustComponent(sync.CName).(sync.SyncService)
k.storageId = storageIdFromSpace(k.spaceId)
indexer := a.Component(keyvaluestorage.IndexerCName).(keyvaluestorage.Indexer)
if indexer == nil {
indexer = keyvaluestorage.NoOpIndexer{}
}
syncClient := syncstorage.New(spaceState.SpaceId, k.syncService)
k.defaultStore, err = keyvaluestorage.New(
k.ctx,
@ -201,7 +205,7 @@ func (k *keyValueService) Init(a *app.App) (err error) {
accountService.Account(),
syncClient,
aclList,
keyvaluestorage.NoOpIndexer{})
indexer)
return
}

View file

@ -8,6 +8,7 @@ import (
anystore "github.com/anyproto/any-store"
"go.uber.org/zap"
"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/app/logger"
"github.com/anyproto/any-sync/commonspace/headsync/headstorage"
"github.com/anyproto/any-sync/commonspace/object/accountdata"
@ -19,12 +20,23 @@ import (
var log = logger.NewNamed("common.keyvalue.keyvaluestorage")
const IndexerCName = "common.keyvalue.indexer"
type Indexer interface {
app.Component
Index(keyValue ...innerstorage.KeyValue) error
}
type NoOpIndexer struct{}
func (n NoOpIndexer) Init(a *app.App) (err error) {
return nil
}
func (n NoOpIndexer) Name() (name string) {
return IndexerCName
}
func (n NoOpIndexer) Index(keyValue ...innerstorage.KeyValue) error {
return nil
}

View file

@ -7,6 +7,7 @@ import (
"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/commonspace/object/keyvalue/keyvaluestorage"
"github.com/anyproto/any-sync/net/peer"
)
const CName = "common.object.keyvalue"
@ -15,4 +16,5 @@ type KeyValueService interface {
app.ComponentRunnable
DefaultStore() keyvaluestorage.Storage
HandleMessage(ctx context.Context, msg drpc.Message) (err error)
}
SyncWithPeer(p peer.Peer) (err error)
}

View file

@ -16,6 +16,7 @@ import (
"github.com/anyproto/any-sync/commonspace/headsync/headstorage"
"github.com/anyproto/any-sync/commonspace/object/acl/list"
"github.com/anyproto/any-sync/commonspace/object/acl/syncacl"
"github.com/anyproto/any-sync/commonspace/object/keyvalue/kvinterfaces"
"github.com/anyproto/any-sync/commonspace/object/treesyncer"
"github.com/anyproto/any-sync/commonspace/objecttreebuilder"
"github.com/anyproto/any-sync/commonspace/peermanager"
@ -83,6 +84,7 @@ type Space interface {
AclClient() aclclient.AclSpaceClient
SyncStatus() syncstatus.StatusUpdater
Storage() spacestorage.SpaceStorage
KeyValue() kvinterfaces.KeyValueService
DeleteTree(ctx context.Context, id string) (err error)
GetNodePeers(ctx context.Context) (peer []peer.Peer, err error)
@ -110,6 +112,7 @@ type space struct {
settings settings.Settings
storage spacestorage.SpaceStorage
aclClient aclclient.AclSpaceClient
keyValue kvinterfaces.KeyValueService
aclList list.AclList
creationTime time.Time
}
@ -150,7 +153,7 @@ func (s *space) DebugAllHeads() (heads []headsync.TreeHeads) {
s.storage.HeadStorage().IterateEntries(context.Background(), headstorage.IterOpts{}, func(entry headstorage.HeadsEntry) (bool, error) {
if entry.CommonSnapshot != "" {
heads = append(heads, headsync.TreeHeads{
Id: entry.Id,
Id: entry.Id,
Heads: entry.Heads,
})
}
@ -221,6 +224,7 @@ func (s *space) Init(ctx context.Context) (err error) {
s.streamPool = s.app.MustComponent(streampool.CName).(streampool.StreamPool)
s.treeSyncer = s.app.MustComponent(treesyncer.CName).(treesyncer.TreeSyncer)
s.aclClient = s.app.MustComponent(aclclient.CName).(aclclient.AclSpaceClient)
s.keyValue = s.app.MustComponent(kvinterfaces.CName).(kvinterfaces.KeyValueService)
return
}
@ -228,6 +232,10 @@ func (s *space) SyncStatus() syncstatus.StatusUpdater {
return s.syncStatus
}
func (s *space) KeyValue() kvinterfaces.KeyValueService {
return s.keyValue
}
func (s *space) Storage() spacestorage.SpaceStorage {
return s.storage
}