1
0
Fork 0
mirror of https://github.com/anyproto/any-sync.git synced 2025-06-08 05:57:03 +09:00

Fix rollbacks after commit

This commit is contained in:
Mikhail Rakhmanov 2025-04-22 15:50:17 +02:00
parent e94c4b9395
commit 68af5f1320
No known key found for this signature in database
GPG key ID: DED12CFEF5B8396B
4 changed files with 32 additions and 39 deletions

View file

@ -108,12 +108,8 @@ func Create(ctx context.Context, state State, store anystore.DB) (st StateStorag
return nil, err return nil, err
} }
storage, err := CreateTx(tx.Context(), state, store) storage, err := CreateTx(tx.Context(), state, store)
defer func() {
if err != nil {
tx.Rollback()
}
}()
if err != nil { if err != nil {
tx.Rollback()
return nil, err return nil, err
} }
return storage, tx.Commit() return storage, tx.Commit()

View file

@ -66,12 +66,8 @@ func CreateStorage(ctx context.Context, root *consensusproto.RawRecordWithId, he
return nil, err return nil, err
} }
storage, err := CreateStorageTx(tx.Context(), root, headStorage, store) storage, err := CreateStorageTx(tx.Context(), root, headStorage, store)
defer func() {
if err != nil {
tx.Rollback()
}
}()
if err != nil { if err != nil {
tx.Rollback()
return nil, err return nil, err
} }
return storage, tx.Commit() return storage, tx.Commit()
@ -210,6 +206,13 @@ func (s *storage) AddAll(ctx context.Context, records []StorageRecord) error {
if err != nil { if err != nil {
return fmt.Errorf("failed to create write tx: %w", err) return fmt.Errorf("failed to create write tx: %w", err)
} }
defer func() {
if err != nil {
_ = tx.Rollback()
} else {
err = tx.Commit()
}
}()
vals := make([]*anyenc.Value, 0, len(records)) vals := make([]*anyenc.Value, 0, len(records))
for _, ch := range records { for _, ch := range records {
newVal := newStorageRecordValue(ch, arena) newVal := newStorageRecordValue(ch, arena)
@ -217,20 +220,14 @@ func (s *storage) AddAll(ctx context.Context, records []StorageRecord) error {
} }
err = s.recordsColl.Insert(tx.Context(), vals...) err = s.recordsColl.Insert(tx.Context(), vals...)
if err != nil { if err != nil {
tx.Rollback() return err
return nil
} }
head := records[len(records)-1].Id head := records[len(records)-1].Id
update := headstorage.HeadsUpdate{ update := headstorage.HeadsUpdate{
Id: s.id, Id: s.id,
Heads: []string{head}, Heads: []string{head},
} }
err = s.headStorage.UpdateEntryTx(tx.Context(), update) return s.headStorage.UpdateEntryTx(tx.Context(), update)
if err != nil {
tx.Rollback()
return err
}
return tx.Commit()
} }
func (s *storage) Id() string { func (s *storage) Id() string {

View file

@ -82,12 +82,8 @@ func CreateStorage(ctx context.Context, root *treechangeproto.RawTreeChangeWithI
return nil, err return nil, err
} }
storage, err := CreateStorageTx(tx.Context(), root, headStorage, store) storage, err := CreateStorageTx(tx.Context(), root, headStorage, store)
defer func() {
if err != nil {
tx.Rollback()
}
}()
if err != nil { if err != nil {
tx.Rollback()
return nil, err return nil, err
} }
return storage, tx.Commit() return storage, tx.Commit()
@ -225,13 +221,19 @@ func (s *storage) AddAll(ctx context.Context, changes []StorageChange, heads []s
if err != nil { if err != nil {
return fmt.Errorf("failed to create write tx: %w", err) return fmt.Errorf("failed to create write tx: %w", err)
} }
defer func() {
if err != nil {
tx.Rollback()
} else {
err = tx.Commit()
}
}()
for _, ch := range changes { for _, ch := range changes {
ch.TreeId = s.id ch.TreeId = s.id
newVal := newStorageChangeValue(ch, arena) newVal := newStorageChangeValue(ch, arena)
err = s.changesColl.Insert(tx.Context(), newVal) err = s.changesColl.Insert(tx.Context(), newVal)
arena.Reset() arena.Reset()
if err != nil { if err != nil {
tx.Rollback()
return err return err
} }
} }
@ -240,12 +242,7 @@ func (s *storage) AddAll(ctx context.Context, changes []StorageChange, heads []s
Heads: heads, Heads: heads,
CommonSnapshot: &commonSnapshot, CommonSnapshot: &commonSnapshot,
} }
err = s.headStorage.UpdateEntryTx(tx.Context(), update) return s.headStorage.UpdateEntryTx(tx.Context(), update)
if err != nil {
tx.Rollback()
return err
}
return tx.Commit()
} }
func (s *storage) AddAllNoError(ctx context.Context, changes []StorageChange, heads []string, commonSnapshot string) error { func (s *storage) AddAllNoError(ctx context.Context, changes []StorageChange, heads []string, commonSnapshot string) error {
@ -255,13 +252,19 @@ func (s *storage) AddAllNoError(ctx context.Context, changes []StorageChange, he
if err != nil { if err != nil {
return fmt.Errorf("failed to create write tx: %w", err) return fmt.Errorf("failed to create write tx: %w", err)
} }
defer func() {
if err != nil {
tx.Rollback()
} else {
err = tx.Commit()
}
}()
for _, ch := range changes { for _, ch := range changes {
ch.TreeId = s.id ch.TreeId = s.id
newVal := newStorageChangeValue(ch, arena) newVal := newStorageChangeValue(ch, arena)
err = s.changesColl.Insert(tx.Context(), newVal) err = s.changesColl.Insert(tx.Context(), newVal)
arena.Reset() arena.Reset()
if err != nil && !errors.Is(err, anystore.ErrDocExists) { if err != nil && !errors.Is(err, anystore.ErrDocExists) {
tx.Rollback()
return err return err
} }
} }
@ -270,12 +273,7 @@ func (s *storage) AddAllNoError(ctx context.Context, changes []StorageChange, he
Heads: heads, Heads: heads,
CommonSnapshot: &commonSnapshot, CommonSnapshot: &commonSnapshot,
} }
err = s.headStorage.UpdateEntryTx(tx.Context(), update) return s.headStorage.UpdateEntryTx(tx.Context(), update)
if err != nil {
tx.Rollback()
return err
}
return tx.Commit()
} }
func (s *storage) Delete(ctx context.Context) error { func (s *storage) Delete(ctx context.Context) error {

View file

@ -66,7 +66,9 @@ func Create(ctx context.Context, store anystore.DB, payload SpaceStorageCreatePa
} }
defer func() { defer func() {
if err != nil { if err != nil {
tx.Rollback() _ = tx.Rollback()
} else {
err = tx.Commit()
} }
}() }()
changesColl, err := store.Collection(tx.Context(), objecttree.CollName) changesColl, err := store.Collection(tx.Context(), objecttree.CollName)
@ -110,7 +112,7 @@ func Create(ctx context.Context, store anystore.DB, payload SpaceStorageCreatePa
headStorage: headStorage, headStorage: headStorage,
stateStorage: stateStorage, stateStorage: stateStorage,
aclStorage: aclStorage, aclStorage: aclStorage,
}, tx.Commit() }, nil
} }
func New(ctx context.Context, spaceId string, store anystore.DB) (SpaceStorage, error) { func New(ctx context.Context, spaceId string, store anystore.DB) (SpaceStorage, error) {