From 5492a43920389e299127d9307db45a2a7d88356f Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Tue, 3 Dec 2024 20:29:45 +0100 Subject: [PATCH] Headstorage --- .../headsync/headstorage/headstorage.go | 115 ++++++++++++++++++ .../object/tree/objecttree/objecttree.go | 4 +- commonspace/object/tree/objecttree/storage.go | 35 +++--- util/storeutil/store.go | 20 +++ 4 files changed, 151 insertions(+), 23 deletions(-) create mode 100644 commonspace/headsync/headstorage/headstorage.go create mode 100644 util/storeutil/store.go diff --git a/commonspace/headsync/headstorage/headstorage.go b/commonspace/headsync/headstorage/headstorage.go new file mode 100644 index 00000000..df478a86 --- /dev/null +++ b/commonspace/headsync/headstorage/headstorage.go @@ -0,0 +1,115 @@ +package headstorage + +import ( + "fmt" + + anystore "github.com/anyproto/any-store" + "github.com/anyproto/any-store/anyenc" + "github.com/anyproto/any-store/query" + "golang.org/x/net/context" + + "github.com/anyproto/any-sync/util/storeutil" +) + +const ( + headsKey = "h" + commonSnapshotKey = "s" + idKey = "id" + deletedStatusKey = "d" + headsCollectionName = "heads" +) + +type HeadsEntry struct { + Id string + Heads []string + CommonSnapshot string + DeletedStatus string +} + +type HeadsUpdate struct { + Id string + Heads []string + CommonSnapshot *string + DeletedStatus *string +} + +type EntryIterator func(entry HeadsEntry) (bool, error) + +type HeadStorage interface { + IterateEntries(ctx context.Context, iter EntryIterator) error + GetEntry(ctx context.Context, id string) (HeadsEntry, error) + UpdateEntryTx(txCtx context.Context, update HeadsUpdate) error +} + +type headStorage struct { + store anystore.DB + headsColl anystore.Collection + arena *anyenc.Arena +} + +func New(ctx context.Context, store anystore.DB) (HeadStorage, error) { + headsColl, err := store.Collection(ctx, headsCollectionName) + if err != nil { + return nil, err + } + st := &headStorage{ + store: store, + headsColl: headsColl, + arena: &anyenc.Arena{}, + } + return st, nil +} + +func (h *headStorage) IterateEntries(ctx context.Context, entryIter EntryIterator) error { + iter, err := h.headsColl.Find(nil).Sort(idKey).Iter(ctx) + if err != nil { + return fmt.Errorf("find iter: %w", err) + } + defer iter.Close() + + for iter.Next() { + doc, err := iter.Doc() + if err != nil { + return fmt.Errorf("doc not found: %w", err) + } + cont, err := entryIter(h.entryFromDoc(doc)) + if !cont { + return err + } + } + return nil +} + +func (h *headStorage) GetEntry(ctx context.Context, id string) (HeadsEntry, error) { + doc, err := h.headsColl.FindId(ctx, id) + if err != nil { + return HeadsEntry{}, err + } + return h.entryFromDoc(doc), nil +} + +func (h *headStorage) UpdateEntryTx(ctx context.Context, update HeadsUpdate) (err error) { + mod := query.ModifyFunc(func(a *anyenc.Arena, v *anyenc.Value) (result *anyenc.Value, modified bool, err error) { + if update.DeletedStatus != nil { + v.Set(deletedStatusKey, a.NewString(*update.DeletedStatus)) + } + if update.CommonSnapshot != nil { + v.Set(commonSnapshotKey, a.NewString(*update.CommonSnapshot)) + } + if update.Heads != nil { + v.Set(headsKey, storeutil.NewStringArrayValue(update.Heads, a)) + } + return v, true, nil + }) + _, err = h.headsColl.UpsertId(ctx, update.Id, mod) + return +} + +func (h *headStorage) entryFromDoc(doc anystore.Doc) HeadsEntry { + return HeadsEntry{ + Id: doc.Value().GetString(idKey), + Heads: storeutil.StringsFromArrayValue(doc.Value(), headsKey), + CommonSnapshot: doc.Value().GetString(commonSnapshotKey), + DeletedStatus: doc.Value().GetString(deletedStatusKey), + } +} diff --git a/commonspace/object/tree/objecttree/objecttree.go b/commonspace/object/tree/objecttree/objecttree.go index ba2494b2..d22f1e4e 100644 --- a/commonspace/object/tree/objecttree/objecttree.go +++ b/commonspace/object/tree/objecttree/objecttree.go @@ -724,7 +724,7 @@ func (ot *objectTree) TryClose(objectTTL time.Duration) (bool, error) { } func (ot *objectTree) Close() error { - return nil + return ot.storage.Close() } func (ot *objectTree) Delete() error { @@ -732,7 +732,7 @@ func (ot *objectTree) Delete() error { return nil } ot.isDeleted = true - return ot.storage.Delete() + return ot.storage.Delete(context.Background()) } func (ot *objectTree) SnapshotPath() []string { diff --git a/commonspace/object/tree/objecttree/storage.go b/commonspace/object/tree/objecttree/storage.go index e9f78856..4ee5e2c5 100644 --- a/commonspace/object/tree/objecttree/storage.go +++ b/commonspace/object/tree/objecttree/storage.go @@ -11,6 +11,7 @@ import ( "github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto" "github.com/anyproto/any-sync/util/crypto" + "github.com/anyproto/any-sync/util/storeutil" ) const ( @@ -54,7 +55,8 @@ type Storage interface { Get(ctx context.Context, id string) (StorageChange, error) GetAfterOrder(ctx context.Context, orderId string, iter StorageIterator) error AddAll(ctx context.Context, changes []StorageChange, heads []string, commonSnapshot string) error - Delete() error + Delete(ctx context.Context) error + Close() error } type storage struct { @@ -118,7 +120,7 @@ func CreateStorage(ctx context.Context, root *treechangeproto.RawTreeChangeWithI return nil, err } headsDoc := st.arena.NewObject() - headsDoc.Set(headsKey, newStringArrayValue([]string{root.Id}, st.arena)) + headsDoc.Set(headsKey, storeutil.NewStringArrayValue([]string{root.Id}, st.arena)) headsDoc.Set(commonSnapshotKey, st.arena.NewString(root.Id)) headsDoc.Set(idKey, st.arena.NewString(root.Id)) err = st.headsColl.Insert(tx.Context(), headsDoc) @@ -195,7 +197,7 @@ func (s *storage) GetAfterOrder(ctx context.Context, orderId string, storageIter if err != nil { return fmt.Errorf("doc not found: %w", err) } - parsed, err := s.changeFromDoc(doc.Value().GetString("id"), doc) + parsed, err := s.changeFromDoc(doc.Value().GetString(idKey), doc) if err != nil { return fmt.Errorf("failed to make change from doc: %w", err) } @@ -225,7 +227,7 @@ func (s *storage) AddAll(ctx context.Context, changes []StorageChange, heads []s return nil } mod := query.ModifyFunc(func(a *anyenc.Arena, v *anyenc.Value) (result *anyenc.Value, modified bool, err error) { - v.Set(headsKey, newStringArrayValue(heads, a)) + v.Set(headsKey, storeutil.NewStringArrayValue(heads, a)) v.Set(commonSnapshotKey, a.NewString(commonSnapshot)) return v, true, nil }) @@ -237,9 +239,8 @@ func (s *storage) AddAll(ctx context.Context, changes []StorageChange, heads []s return tx.Commit() } -func (s *storage) Delete() error { - // TODO: add context - tx, err := s.store.WriteTx(context.Background()) +func (s *storage) Delete(ctx context.Context) error { + tx, err := s.store.WriteTx(ctx) if err != nil { return fmt.Errorf("failed to create write tx: %w", err) } @@ -256,6 +257,10 @@ func (s *storage) Delete() error { return tx.Commit() } +func (s *storage) Close() error { + return s.changesColl.Close() +} + func (s *storage) Id() string { return s.id } @@ -293,23 +298,11 @@ func (s *storage) changeFromDoc(id string, doc anystore.Doc) (StorageChange, err OrderId: doc.Value().GetString(orderKey), ChangeSize: doc.Value().GetInt(changeSizeKey), SnapshotCounter: doc.Value().GetInt(snapshotCounterKey), - } - prevIds := doc.Value().GetArray(prevIdsKey) - change.PrevIds = make([]string, 0, len(prevIds)) - for _, item := range prevIds { - change.PrevIds = append(change.PrevIds, item.GetString()) + PrevIds: storeutil.StringsFromArrayValue(doc.Value(), prevIdsKey), } return change, nil } -func newStringArrayValue(strings []string, arena *anyenc.Arena) *anyenc.Value { - val := arena.NewArray() - for idx, str := range strings { - val.SetArrayItem(idx, arena.NewString(str)) - } - return val -} - func newStorageChangeValue(ch StorageChange, arena *anyenc.Arena) *anyenc.Value { newVal := arena.NewObject() newVal.Set(orderKey, arena.NewString(ch.OrderId)) @@ -319,7 +312,7 @@ func newStorageChangeValue(ch StorageChange, arena *anyenc.Arena) *anyenc.Value newVal.Set(changeSizeKey, arena.NewNumberInt(ch.ChangeSize)) newVal.Set(idKey, arena.NewString(ch.Id)) if len(ch.PrevIds) != 0 { - newVal.Set(prevIdsKey, newStringArrayValue(ch.PrevIds, arena)) + newVal.Set(prevIdsKey, storeutil.NewStringArrayValue(ch.PrevIds, arena)) } return newVal } diff --git a/util/storeutil/store.go b/util/storeutil/store.go new file mode 100644 index 00000000..714ed2e7 --- /dev/null +++ b/util/storeutil/store.go @@ -0,0 +1,20 @@ +package storeutil + +import "github.com/anyproto/any-store/anyenc" + +func NewStringArrayValue(strings []string, arena *anyenc.Arena) *anyenc.Value { + val := arena.NewArray() + for idx, str := range strings { + val.SetArrayItem(idx, arena.NewString(str)) + } + return val +} + +func StringsFromArrayValue(val *anyenc.Value, key string) (res []string) { + vals := val.GetArray(key) + res = make([]string, 0, len(vals)) + for _, item := range vals { + res = append(res, item.GetString()) + } + return res +}