1
0
Fork 0
mirror of https://github.com/anyproto/any-sync.git synced 2025-06-10 01:51:11 +09:00

Headstorage

This commit is contained in:
mcrakhman 2024-12-03 20:29:45 +01:00
parent ad3d0c40d0
commit 5492a43920
No known key found for this signature in database
GPG key ID: DED12CFEF5B8396B
4 changed files with 151 additions and 23 deletions

View file

@ -0,0 +1,115 @@
package headstorage
import (
"fmt"
anystore "github.com/anyproto/any-store"
"github.com/anyproto/any-store/anyenc"
"github.com/anyproto/any-store/query"
"golang.org/x/net/context"
"github.com/anyproto/any-sync/util/storeutil"
)
const (
headsKey = "h"
commonSnapshotKey = "s"
idKey = "id"
deletedStatusKey = "d"
headsCollectionName = "heads"
)
type HeadsEntry struct {
Id string
Heads []string
CommonSnapshot string
DeletedStatus string
}
type HeadsUpdate struct {
Id string
Heads []string
CommonSnapshot *string
DeletedStatus *string
}
type EntryIterator func(entry HeadsEntry) (bool, error)
type HeadStorage interface {
IterateEntries(ctx context.Context, iter EntryIterator) error
GetEntry(ctx context.Context, id string) (HeadsEntry, error)
UpdateEntryTx(txCtx context.Context, update HeadsUpdate) error
}
type headStorage struct {
store anystore.DB
headsColl anystore.Collection
arena *anyenc.Arena
}
func New(ctx context.Context, store anystore.DB) (HeadStorage, error) {
headsColl, err := store.Collection(ctx, headsCollectionName)
if err != nil {
return nil, err
}
st := &headStorage{
store: store,
headsColl: headsColl,
arena: &anyenc.Arena{},
}
return st, nil
}
func (h *headStorage) IterateEntries(ctx context.Context, entryIter EntryIterator) error {
iter, err := h.headsColl.Find(nil).Sort(idKey).Iter(ctx)
if err != nil {
return fmt.Errorf("find iter: %w", err)
}
defer iter.Close()
for iter.Next() {
doc, err := iter.Doc()
if err != nil {
return fmt.Errorf("doc not found: %w", err)
}
cont, err := entryIter(h.entryFromDoc(doc))
if !cont {
return err
}
}
return nil
}
func (h *headStorage) GetEntry(ctx context.Context, id string) (HeadsEntry, error) {
doc, err := h.headsColl.FindId(ctx, id)
if err != nil {
return HeadsEntry{}, err
}
return h.entryFromDoc(doc), nil
}
func (h *headStorage) UpdateEntryTx(ctx context.Context, update HeadsUpdate) (err error) {
mod := query.ModifyFunc(func(a *anyenc.Arena, v *anyenc.Value) (result *anyenc.Value, modified bool, err error) {
if update.DeletedStatus != nil {
v.Set(deletedStatusKey, a.NewString(*update.DeletedStatus))
}
if update.CommonSnapshot != nil {
v.Set(commonSnapshotKey, a.NewString(*update.CommonSnapshot))
}
if update.Heads != nil {
v.Set(headsKey, storeutil.NewStringArrayValue(update.Heads, a))
}
return v, true, nil
})
_, err = h.headsColl.UpsertId(ctx, update.Id, mod)
return
}
func (h *headStorage) entryFromDoc(doc anystore.Doc) HeadsEntry {
return HeadsEntry{
Id: doc.Value().GetString(idKey),
Heads: storeutil.StringsFromArrayValue(doc.Value(), headsKey),
CommonSnapshot: doc.Value().GetString(commonSnapshotKey),
DeletedStatus: doc.Value().GetString(deletedStatusKey),
}
}

View file

@ -724,7 +724,7 @@ func (ot *objectTree) TryClose(objectTTL time.Duration) (bool, error) {
}
func (ot *objectTree) Close() error {
return nil
return ot.storage.Close()
}
func (ot *objectTree) Delete() error {
@ -732,7 +732,7 @@ func (ot *objectTree) Delete() error {
return nil
}
ot.isDeleted = true
return ot.storage.Delete()
return ot.storage.Delete(context.Background())
}
func (ot *objectTree) SnapshotPath() []string {

View file

@ -11,6 +11,7 @@ import (
"github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
"github.com/anyproto/any-sync/util/crypto"
"github.com/anyproto/any-sync/util/storeutil"
)
const (
@ -54,7 +55,8 @@ type Storage interface {
Get(ctx context.Context, id string) (StorageChange, error)
GetAfterOrder(ctx context.Context, orderId string, iter StorageIterator) error
AddAll(ctx context.Context, changes []StorageChange, heads []string, commonSnapshot string) error
Delete() error
Delete(ctx context.Context) error
Close() error
}
type storage struct {
@ -118,7 +120,7 @@ func CreateStorage(ctx context.Context, root *treechangeproto.RawTreeChangeWithI
return nil, err
}
headsDoc := st.arena.NewObject()
headsDoc.Set(headsKey, newStringArrayValue([]string{root.Id}, st.arena))
headsDoc.Set(headsKey, storeutil.NewStringArrayValue([]string{root.Id}, st.arena))
headsDoc.Set(commonSnapshotKey, st.arena.NewString(root.Id))
headsDoc.Set(idKey, st.arena.NewString(root.Id))
err = st.headsColl.Insert(tx.Context(), headsDoc)
@ -195,7 +197,7 @@ func (s *storage) GetAfterOrder(ctx context.Context, orderId string, storageIter
if err != nil {
return fmt.Errorf("doc not found: %w", err)
}
parsed, err := s.changeFromDoc(doc.Value().GetString("id"), doc)
parsed, err := s.changeFromDoc(doc.Value().GetString(idKey), doc)
if err != nil {
return fmt.Errorf("failed to make change from doc: %w", err)
}
@ -225,7 +227,7 @@ func (s *storage) AddAll(ctx context.Context, changes []StorageChange, heads []s
return nil
}
mod := query.ModifyFunc(func(a *anyenc.Arena, v *anyenc.Value) (result *anyenc.Value, modified bool, err error) {
v.Set(headsKey, newStringArrayValue(heads, a))
v.Set(headsKey, storeutil.NewStringArrayValue(heads, a))
v.Set(commonSnapshotKey, a.NewString(commonSnapshot))
return v, true, nil
})
@ -237,9 +239,8 @@ func (s *storage) AddAll(ctx context.Context, changes []StorageChange, heads []s
return tx.Commit()
}
func (s *storage) Delete() error {
// TODO: add context
tx, err := s.store.WriteTx(context.Background())
func (s *storage) Delete(ctx context.Context) error {
tx, err := s.store.WriteTx(ctx)
if err != nil {
return fmt.Errorf("failed to create write tx: %w", err)
}
@ -256,6 +257,10 @@ func (s *storage) Delete() error {
return tx.Commit()
}
func (s *storage) Close() error {
return s.changesColl.Close()
}
func (s *storage) Id() string {
return s.id
}
@ -293,23 +298,11 @@ func (s *storage) changeFromDoc(id string, doc anystore.Doc) (StorageChange, err
OrderId: doc.Value().GetString(orderKey),
ChangeSize: doc.Value().GetInt(changeSizeKey),
SnapshotCounter: doc.Value().GetInt(snapshotCounterKey),
}
prevIds := doc.Value().GetArray(prevIdsKey)
change.PrevIds = make([]string, 0, len(prevIds))
for _, item := range prevIds {
change.PrevIds = append(change.PrevIds, item.GetString())
PrevIds: storeutil.StringsFromArrayValue(doc.Value(), prevIdsKey),
}
return change, nil
}
func newStringArrayValue(strings []string, arena *anyenc.Arena) *anyenc.Value {
val := arena.NewArray()
for idx, str := range strings {
val.SetArrayItem(idx, arena.NewString(str))
}
return val
}
func newStorageChangeValue(ch StorageChange, arena *anyenc.Arena) *anyenc.Value {
newVal := arena.NewObject()
newVal.Set(orderKey, arena.NewString(ch.OrderId))
@ -319,7 +312,7 @@ func newStorageChangeValue(ch StorageChange, arena *anyenc.Arena) *anyenc.Value
newVal.Set(changeSizeKey, arena.NewNumberInt(ch.ChangeSize))
newVal.Set(idKey, arena.NewString(ch.Id))
if len(ch.PrevIds) != 0 {
newVal.Set(prevIdsKey, newStringArrayValue(ch.PrevIds, arena))
newVal.Set(prevIdsKey, storeutil.NewStringArrayValue(ch.PrevIds, arena))
}
return newVal
}

20
util/storeutil/store.go Normal file
View file

@ -0,0 +1,20 @@
package storeutil
import "github.com/anyproto/any-store/anyenc"
func NewStringArrayValue(strings []string, arena *anyenc.Arena) *anyenc.Value {
val := arena.NewArray()
for idx, str := range strings {
val.SetArrayItem(idx, arena.NewString(str))
}
return val
}
func StringsFromArrayValue(val *anyenc.Value, key string) (res []string) {
vals := val.GetArray(key)
res = make([]string, 0, len(vals))
for _, item := range vals {
res = append(res, item.GetString())
}
return res
}