1
0
Fork 0
mirror of https://github.com/anyproto/any-sync.git synced 2025-06-08 14:07:02 +09:00

Use anystore, rewrite inmemory storage

This commit is contained in:
mcrakhman 2024-12-02 21:16:52 +01:00
parent 76db272232
commit d8edabf9cd
No known key found for this signature in database
GPG key ID: DED12CFEF5B8396B
4 changed files with 55 additions and 78 deletions

View file

@ -97,6 +97,20 @@ func (t *inMemoryStorage) GetAfterOrder(ctx context.Context, order int, iter Sto
return nil
}
func (t *inMemoryStorage) GetBeforeOrder(ctx context.Context, order int, iter StorageIterator) error {
t.RLock()
defer t.RUnlock()
if order > len(t.records) || order < 0 {
return nil
}
for i := 0; i <= order; i++ {
if shouldContinue, err := iter(ctx, t.records[i]); !shouldContinue || err != nil {
return err
}
}
return nil
}
func (t *inMemoryStorage) AddAll(ctx context.Context, records []StorageRecord) error {
t.Lock()
defer t.Unlock()

View file

@ -8,7 +8,6 @@ import (
"sync"
"github.com/anyproto/any-sync/commonspace/object/accountdata"
"github.com/anyproto/any-sync/commonspace/object/acl/liststorage"
"github.com/anyproto/any-sync/consensus/consensusproto"
"github.com/anyproto/any-sync/util/cidutil"
"github.com/anyproto/any-sync/util/crypto"
@ -75,20 +74,20 @@ type aclList struct {
recordBuilder AclRecordBuilder
keyStorage crypto.KeyStorage
aclState *AclState
storage liststorage.ListStorage
storage Storage
sync.RWMutex
}
type internalDeps struct {
storage liststorage.ListStorage
storage Storage
keyStorage crypto.KeyStorage
stateBuilder *aclStateBuilder
recordBuilder AclRecordBuilder
acceptorVerifier AcceptorVerifier
}
func BuildAclListWithIdentity(acc *accountdata.AccountKeys, storage liststorage.ListStorage, verifier AcceptorVerifier) (AclList, error) {
func BuildAclListWithIdentity(acc *accountdata.AccountKeys, storage Storage, verifier AcceptorVerifier) (AclList, error) {
keyStorage := crypto.NewKeyStorage()
deps := internalDeps{
storage: storage,
@ -100,48 +99,37 @@ func BuildAclListWithIdentity(acc *accountdata.AccountKeys, storage liststorage.
return build(deps)
}
func BuildAclList(storage liststorage.ListStorage, verifier AcceptorVerifier) (AclList, error) {
keyStorage := crypto.NewKeyStorage()
deps := internalDeps{
storage: storage,
keyStorage: keyStorage,
stateBuilder: newAclStateBuilder(),
recordBuilder: NewAclRecordBuilder(storage.Id(), keyStorage, nil, verifier),
acceptorVerifier: verifier,
}
return build(deps)
}
func build(deps internalDeps) (list AclList, err error) {
var (
ctx = context.Background()
storage = deps.storage
id = deps.storage.Id()
recBuilder = deps.recordBuilder
stateBuilder = deps.stateBuilder
)
head, err := storage.Head()
head, err := storage.Head(ctx)
if err != nil {
return
}
rawRecordWithId, err := storage.GetRawRecord(context.Background(), head)
rec, err := storage.Get(ctx, head)
if err != nil {
return
}
record, err := recBuilder.UnmarshallWithId(rawRecordWithId)
record, err := recBuilder.UnmarshallWithId(rec.RawRecordWithId())
if err != nil {
return
}
records := []*AclRecord{record}
for record.PrevId != "" {
rawRecordWithId, err = storage.GetRawRecord(context.Background(), record.PrevId)
rec, err = storage.Get(ctx, record.PrevId)
if err != nil {
return
}
record, err = recBuilder.UnmarshallWithId(rawRecordWithId)
record, err = recBuilder.UnmarshallWithId(rec.RawRecordWithId())
if err != nil {
return
}
@ -159,13 +147,13 @@ func build(deps internalDeps) (list AclList, err error) {
indexes[records[len(records)/2].Id] = len(records) / 2
}
// TODO: check if this is correct (raw model instead of unmarshalled)
rootWithId, err := storage.Root()
rootWithId, err := storage.Root(ctx)
if err != nil {
return
}
list = &aclList{
root: rootWithId,
root: rootWithId.RawRecordWithId(),
records: records,
indexes: indexes,
stateBuilder: stateBuilder,
@ -208,7 +196,7 @@ func (a *aclList) ValidateRawRecord(rawRec *consensusproto.RawRecord, afterValid
func (a *aclList) AddRawRecords(rawRecords []*consensusproto.RawRecordWithId) error {
for _, rec := range rawRecords {
err := a.AddRawRecord(rec)
if err != nil && err != ErrRecordAlreadyExists {
if err != nil && !errors.Is(err, ErrRecordAlreadyExists) {
return err
}
}
@ -230,13 +218,14 @@ func (a *aclList) AddRawRecord(rawRec *consensusproto.RawRecordWithId) (err erro
a.setState(copyState)
a.records = append(a.records, record)
a.indexes[record.Id] = len(a.records) - 1
if err = a.storage.AddRawRecord(context.Background(), rawRec); err != nil {
return
storageRec := StorageRecord{
RawRecord: rawRec.Payload,
PrevId: record.PrevId,
Id: record.Id,
Order: len(a.records) + 1,
ChangeSize: len(rawRec.Payload),
}
if err = a.storage.SetHead(rawRec.Id); err != nil {
return
}
return
return a.storage.AddAll(context.Background(), []StorageRecord{storageRec})
}
func (a *aclList) setState(state *AclState) {
@ -309,7 +298,7 @@ func (a *aclList) Iterate(iterFunc IterFunc) {
func (a *aclList) RecordsAfter(ctx context.Context, id string) (records []*consensusproto.RawRecordWithId, err error) {
var recIdx int
if id == "" {
recIdx = -1
recIdx = 0
} else {
var ok bool
recIdx, ok = a.indexes[id]
@ -317,13 +306,10 @@ func (a *aclList) RecordsAfter(ctx context.Context, id string) (records []*conse
return nil, ErrNoSuchRecord
}
}
for i := recIdx + 1; i < len(a.records); i++ {
rawRec, err := a.storage.GetRawRecord(ctx, a.records[i].Id)
if err != nil {
return nil, err
}
records = append(records, rawRec)
}
err = a.storage.GetAfterOrder(ctx, recIdx, func(ctx context.Context, record StorageRecord) (shouldContinue bool, err error) {
records = append(records, record.RawRecordWithId())
return true, nil
})
return
}
@ -335,13 +321,10 @@ func (a *aclList) RecordsBefore(ctx context.Context, headId string) (records []*
if !ok {
return nil, ErrNoSuchRecord
}
for i := 0; i <= recIdx; i++ {
rawRec, err := a.storage.GetRawRecord(ctx, a.records[i].Id)
if err != nil {
return nil, err
}
records = append(records, rawRec)
}
err = a.storage.GetBeforeOrder(ctx, recIdx, func(ctx context.Context, record StorageRecord) (shouldContinue bool, err error) {
records = append(records, record.RawRecordWithId())
return true, nil
})
return
}

View file

@ -1,10 +1,7 @@
package list
import (
"fmt"
"github.com/anyproto/any-sync/commonspace/object/accountdata"
"github.com/anyproto/any-sync/commonspace/object/acl/liststorage"
"github.com/anyproto/any-sync/consensus/consensusproto"
"github.com/anyproto/any-sync/util/crypto"
)
@ -37,7 +34,7 @@ func newInMemoryDerivedAclMetadata(spaceId string, keys *accountdata.AccountKeys
if err != nil {
return nil, err
}
st, err := liststorage.NewInMemoryAclListStorage(root.Id, []*consensusproto.RawRecordWithId{
st, err := NewInMemoryStorage(root.Id, []*consensusproto.RawRecordWithId{
root,
})
if err != nil {
@ -47,7 +44,7 @@ func newInMemoryDerivedAclMetadata(spaceId string, keys *accountdata.AccountKeys
}
func newInMemoryAclWithRoot(keys *accountdata.AccountKeys, root *consensusproto.RawRecordWithId) (AclList, error) {
st, err := liststorage.NewInMemoryAclListStorage(root.Id, []*consensusproto.RawRecordWithId{
st, err := NewInMemoryStorage(root.Id, []*consensusproto.RawRecordWithId{
root,
})
if err != nil {
@ -55,30 +52,3 @@ func newInMemoryAclWithRoot(keys *accountdata.AccountKeys, root *consensusproto.
}
return BuildAclListWithIdentity(keys, st, NoOpAcceptorVerifier{})
}
func NewTestAclStateWithUsers(numWriters, numReaders, numInvites int) *AclState {
st := &AclState{
keys: make(map[string]AclKeys),
accountStates: make(map[string]AccountState),
inviteKeys: make(map[string]crypto.PubKey),
requestRecords: make(map[string]RequestRecord),
pendingRequests: make(map[string]string),
keyStore: crypto.NewKeyStorage(),
}
for i := 0; i < numWriters; i++ {
st.accountStates[fmt.Sprint("w", i)] = AccountState{
Permissions: AclPermissionsWriter,
Status: StatusActive,
}
}
for i := 0; i < numReaders; i++ {
st.accountStates[fmt.Sprint("r", i)] = AccountState{
Permissions: AclPermissionsReader,
Status: StatusActive,
}
}
for i := 0; i < numInvites; i++ {
st.inviteKeys[fmt.Sprint("r", i)] = nil
}
return st
}

View file

@ -46,6 +46,7 @@ type Storage interface {
Has(ctx context.Context, id string) (bool, error)
Get(ctx context.Context, id string) (StorageRecord, error)
GetAfterOrder(ctx context.Context, order int, iter StorageIterator) error
GetBeforeOrder(ctx context.Context, order int, iter StorageIterator) error
AddAll(ctx context.Context, records []StorageRecord) error
}
@ -163,7 +164,16 @@ func (s *storage) Has(ctx context.Context, id string) (bool, error) {
func (s *storage) GetAfterOrder(ctx context.Context, order int, storageIter StorageIterator) error {
qry := s.changesColl.Find(query.Key{Path: []string{orderKey}, Filter: query.NewComp(query.CompOpGte, order)}).Sort(orderKey)
iter, err := qry.Iter(ctx)
return s.getWithQuery(ctx, qry, storageIter)
}
func (s *storage) GetBeforeOrder(ctx context.Context, order int, storageIter StorageIterator) error {
qry := s.changesColl.Find(query.Key{Path: []string{orderKey}, Filter: query.NewComp(query.CompOpLte, order)}).Sort(orderKey)
return s.getWithQuery(ctx, qry, storageIter)
}
func (s *storage) getWithQuery(ctx context.Context, qry anystore.Query, storageIter StorageIterator) error {
iter, err := s.changesColl.Find(qry).Iter(ctx)
if err != nil {
return fmt.Errorf("find iter: %w", err)
}