From d8edabf9cdf8160f5fef0e9911b9c3e6355d6888 Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Mon, 2 Dec 2024 21:16:52 +0100 Subject: [PATCH] Use anystore, rewrite inmemory storage --- .../object/acl/list/inmemorystorage.go | 14 ++++ commonspace/object/acl/list/list.go | 73 +++++++------------ commonspace/object/acl/list/listutils.go | 34 +-------- commonspace/object/acl/list/storage.go | 12 ++- 4 files changed, 55 insertions(+), 78 deletions(-) diff --git a/commonspace/object/acl/list/inmemorystorage.go b/commonspace/object/acl/list/inmemorystorage.go index fd054192..73c1c0b9 100644 --- a/commonspace/object/acl/list/inmemorystorage.go +++ b/commonspace/object/acl/list/inmemorystorage.go @@ -97,6 +97,20 @@ func (t *inMemoryStorage) GetAfterOrder(ctx context.Context, order int, iter Sto return nil } +func (t *inMemoryStorage) GetBeforeOrder(ctx context.Context, order int, iter StorageIterator) error { + t.RLock() + defer t.RUnlock() + if order > len(t.records) || order < 0 { + return nil + } + for i := 0; i <= order; i++ { + if shouldContinue, err := iter(ctx, t.records[i]); !shouldContinue || err != nil { + return err + } + } + return nil +} + func (t *inMemoryStorage) AddAll(ctx context.Context, records []StorageRecord) error { t.Lock() defer t.Unlock() diff --git a/commonspace/object/acl/list/list.go b/commonspace/object/acl/list/list.go index a3c3cd11..b77602af 100644 --- a/commonspace/object/acl/list/list.go +++ b/commonspace/object/acl/list/list.go @@ -8,7 +8,6 @@ import ( "sync" "github.com/anyproto/any-sync/commonspace/object/accountdata" - "github.com/anyproto/any-sync/commonspace/object/acl/liststorage" "github.com/anyproto/any-sync/consensus/consensusproto" "github.com/anyproto/any-sync/util/cidutil" "github.com/anyproto/any-sync/util/crypto" @@ -75,20 +74,20 @@ type aclList struct { recordBuilder AclRecordBuilder keyStorage crypto.KeyStorage aclState *AclState - storage liststorage.ListStorage + storage Storage sync.RWMutex } type internalDeps struct { - storage liststorage.ListStorage + storage Storage keyStorage crypto.KeyStorage stateBuilder *aclStateBuilder recordBuilder AclRecordBuilder acceptorVerifier AcceptorVerifier } -func BuildAclListWithIdentity(acc *accountdata.AccountKeys, storage liststorage.ListStorage, verifier AcceptorVerifier) (AclList, error) { +func BuildAclListWithIdentity(acc *accountdata.AccountKeys, storage Storage, verifier AcceptorVerifier) (AclList, error) { keyStorage := crypto.NewKeyStorage() deps := internalDeps{ storage: storage, @@ -100,48 +99,37 @@ func BuildAclListWithIdentity(acc *accountdata.AccountKeys, storage liststorage. return build(deps) } -func BuildAclList(storage liststorage.ListStorage, verifier AcceptorVerifier) (AclList, error) { - keyStorage := crypto.NewKeyStorage() - deps := internalDeps{ - storage: storage, - keyStorage: keyStorage, - stateBuilder: newAclStateBuilder(), - recordBuilder: NewAclRecordBuilder(storage.Id(), keyStorage, nil, verifier), - acceptorVerifier: verifier, - } - return build(deps) -} - func build(deps internalDeps) (list AclList, err error) { var ( + ctx = context.Background() storage = deps.storage id = deps.storage.Id() recBuilder = deps.recordBuilder stateBuilder = deps.stateBuilder ) - head, err := storage.Head() + head, err := storage.Head(ctx) if err != nil { return } - rawRecordWithId, err := storage.GetRawRecord(context.Background(), head) + rec, err := storage.Get(ctx, head) if err != nil { return } - record, err := recBuilder.UnmarshallWithId(rawRecordWithId) + record, err := recBuilder.UnmarshallWithId(rec.RawRecordWithId()) if err != nil { return } records := []*AclRecord{record} for record.PrevId != "" { - rawRecordWithId, err = storage.GetRawRecord(context.Background(), record.PrevId) + rec, err = storage.Get(ctx, record.PrevId) if err != nil { return } - record, err = recBuilder.UnmarshallWithId(rawRecordWithId) + record, err = recBuilder.UnmarshallWithId(rec.RawRecordWithId()) if err != nil { return } @@ -159,13 +147,13 @@ func build(deps internalDeps) (list AclList, err error) { indexes[records[len(records)/2].Id] = len(records) / 2 } // TODO: check if this is correct (raw model instead of unmarshalled) - rootWithId, err := storage.Root() + rootWithId, err := storage.Root(ctx) if err != nil { return } list = &aclList{ - root: rootWithId, + root: rootWithId.RawRecordWithId(), records: records, indexes: indexes, stateBuilder: stateBuilder, @@ -208,7 +196,7 @@ func (a *aclList) ValidateRawRecord(rawRec *consensusproto.RawRecord, afterValid func (a *aclList) AddRawRecords(rawRecords []*consensusproto.RawRecordWithId) error { for _, rec := range rawRecords { err := a.AddRawRecord(rec) - if err != nil && err != ErrRecordAlreadyExists { + if err != nil && !errors.Is(err, ErrRecordAlreadyExists) { return err } } @@ -230,13 +218,14 @@ func (a *aclList) AddRawRecord(rawRec *consensusproto.RawRecordWithId) (err erro a.setState(copyState) a.records = append(a.records, record) a.indexes[record.Id] = len(a.records) - 1 - if err = a.storage.AddRawRecord(context.Background(), rawRec); err != nil { - return + storageRec := StorageRecord{ + RawRecord: rawRec.Payload, + PrevId: record.PrevId, + Id: record.Id, + Order: len(a.records) + 1, + ChangeSize: len(rawRec.Payload), } - if err = a.storage.SetHead(rawRec.Id); err != nil { - return - } - return + return a.storage.AddAll(context.Background(), []StorageRecord{storageRec}) } func (a *aclList) setState(state *AclState) { @@ -309,7 +298,7 @@ func (a *aclList) Iterate(iterFunc IterFunc) { func (a *aclList) RecordsAfter(ctx context.Context, id string) (records []*consensusproto.RawRecordWithId, err error) { var recIdx int if id == "" { - recIdx = -1 + recIdx = 0 } else { var ok bool recIdx, ok = a.indexes[id] @@ -317,13 +306,10 @@ func (a *aclList) RecordsAfter(ctx context.Context, id string) (records []*conse return nil, ErrNoSuchRecord } } - for i := recIdx + 1; i < len(a.records); i++ { - rawRec, err := a.storage.GetRawRecord(ctx, a.records[i].Id) - if err != nil { - return nil, err - } - records = append(records, rawRec) - } + err = a.storage.GetAfterOrder(ctx, recIdx, func(ctx context.Context, record StorageRecord) (shouldContinue bool, err error) { + records = append(records, record.RawRecordWithId()) + return true, nil + }) return } @@ -335,13 +321,10 @@ func (a *aclList) RecordsBefore(ctx context.Context, headId string) (records []* if !ok { return nil, ErrNoSuchRecord } - for i := 0; i <= recIdx; i++ { - rawRec, err := a.storage.GetRawRecord(ctx, a.records[i].Id) - if err != nil { - return nil, err - } - records = append(records, rawRec) - } + err = a.storage.GetBeforeOrder(ctx, recIdx, func(ctx context.Context, record StorageRecord) (shouldContinue bool, err error) { + records = append(records, record.RawRecordWithId()) + return true, nil + }) return } diff --git a/commonspace/object/acl/list/listutils.go b/commonspace/object/acl/list/listutils.go index 89a926f5..876a476f 100644 --- a/commonspace/object/acl/list/listutils.go +++ b/commonspace/object/acl/list/listutils.go @@ -1,10 +1,7 @@ package list import ( - "fmt" - "github.com/anyproto/any-sync/commonspace/object/accountdata" - "github.com/anyproto/any-sync/commonspace/object/acl/liststorage" "github.com/anyproto/any-sync/consensus/consensusproto" "github.com/anyproto/any-sync/util/crypto" ) @@ -37,7 +34,7 @@ func newInMemoryDerivedAclMetadata(spaceId string, keys *accountdata.AccountKeys if err != nil { return nil, err } - st, err := liststorage.NewInMemoryAclListStorage(root.Id, []*consensusproto.RawRecordWithId{ + st, err := NewInMemoryStorage(root.Id, []*consensusproto.RawRecordWithId{ root, }) if err != nil { @@ -47,7 +44,7 @@ func newInMemoryDerivedAclMetadata(spaceId string, keys *accountdata.AccountKeys } func newInMemoryAclWithRoot(keys *accountdata.AccountKeys, root *consensusproto.RawRecordWithId) (AclList, error) { - st, err := liststorage.NewInMemoryAclListStorage(root.Id, []*consensusproto.RawRecordWithId{ + st, err := NewInMemoryStorage(root.Id, []*consensusproto.RawRecordWithId{ root, }) if err != nil { @@ -55,30 +52,3 @@ func newInMemoryAclWithRoot(keys *accountdata.AccountKeys, root *consensusproto. } return BuildAclListWithIdentity(keys, st, NoOpAcceptorVerifier{}) } - -func NewTestAclStateWithUsers(numWriters, numReaders, numInvites int) *AclState { - st := &AclState{ - keys: make(map[string]AclKeys), - accountStates: make(map[string]AccountState), - inviteKeys: make(map[string]crypto.PubKey), - requestRecords: make(map[string]RequestRecord), - pendingRequests: make(map[string]string), - keyStore: crypto.NewKeyStorage(), - } - for i := 0; i < numWriters; i++ { - st.accountStates[fmt.Sprint("w", i)] = AccountState{ - Permissions: AclPermissionsWriter, - Status: StatusActive, - } - } - for i := 0; i < numReaders; i++ { - st.accountStates[fmt.Sprint("r", i)] = AccountState{ - Permissions: AclPermissionsReader, - Status: StatusActive, - } - } - for i := 0; i < numInvites; i++ { - st.inviteKeys[fmt.Sprint("r", i)] = nil - } - return st -} diff --git a/commonspace/object/acl/list/storage.go b/commonspace/object/acl/list/storage.go index 00f54056..0b37f632 100644 --- a/commonspace/object/acl/list/storage.go +++ b/commonspace/object/acl/list/storage.go @@ -46,6 +46,7 @@ type Storage interface { Has(ctx context.Context, id string) (bool, error) Get(ctx context.Context, id string) (StorageRecord, error) GetAfterOrder(ctx context.Context, order int, iter StorageIterator) error + GetBeforeOrder(ctx context.Context, order int, iter StorageIterator) error AddAll(ctx context.Context, records []StorageRecord) error } @@ -163,7 +164,16 @@ func (s *storage) Has(ctx context.Context, id string) (bool, error) { func (s *storage) GetAfterOrder(ctx context.Context, order int, storageIter StorageIterator) error { qry := s.changesColl.Find(query.Key{Path: []string{orderKey}, Filter: query.NewComp(query.CompOpGte, order)}).Sort(orderKey) - iter, err := qry.Iter(ctx) + return s.getWithQuery(ctx, qry, storageIter) +} + +func (s *storage) GetBeforeOrder(ctx context.Context, order int, storageIter StorageIterator) error { + qry := s.changesColl.Find(query.Key{Path: []string{orderKey}, Filter: query.NewComp(query.CompOpLte, order)}).Sort(orderKey) + return s.getWithQuery(ctx, qry, storageIter) +} + +func (s *storage) getWithQuery(ctx context.Context, qry anystore.Query, storageIter StorageIterator) error { + iter, err := s.changesColl.Find(qry).Iter(ctx) if err != nil { return fmt.Errorf("find iter: %w", err) }