1
0
Fork 0
mirror of https://github.com/anyproto/any-sync.git synced 2025-06-08 05:57:03 +09:00
any-sync/commonspace/deletionstate/deletionstate.go
2024-12-09 08:45:35 +01:00

163 lines
3.9 KiB
Go

//go:generate mockgen -destination mock_deletionstate/mock_deletionstate.go github.com/anyproto/any-sync/commonspace/deletionstate ObjectDeletionState
package deletionstate
import (
"context"
"sync"
"time"
"go.uber.org/zap"
"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/app/logger"
"github.com/anyproto/any-sync/commonspace/headsync/headstorage"
"github.com/anyproto/any-sync/commonspace/spacestorage"
)
var log = logger.NewNamed(CName)
const CName = "common.commonspace.deletionstate"
type StateUpdateObserver func(ids []string)
type ObjectDeletionState interface {
app.Component
AddObserver(observer StateUpdateObserver)
Add(ids map[string]struct{})
GetQueued() (ids []string)
Delete(id string) (err error)
Exists(id string) bool
Filter(ids []string) (filtered []string)
}
const setTimeout = 5 * time.Second
type objectDeletionState struct {
sync.RWMutex
log logger.CtxLogger
queued map[string]struct{}
deleted map[string]struct{}
stateUpdateObservers []StateUpdateObserver
storage headstorage.HeadStorage
}
func (st *objectDeletionState) Run(ctx context.Context) (err error) {
return st.storage.IterateEntries(ctx, headstorage.IterOpts{Deleted: true}, func(entry headstorage.HeadsEntry) (bool, error) {
switch entry.DeletedStatus {
case headstorage.DeletedStatusQueued:
st.queued[entry.Id] = struct{}{}
case headstorage.DeletedStatusDeleted:
st.deleted[entry.Id] = struct{}{}
default:
}
return true, nil
})
}
func (st *objectDeletionState) Close(ctx context.Context) (err error) {
return nil
}
func (st *objectDeletionState) Init(a *app.App) (err error) {
st.storage = a.MustComponent(spacestorage.CName).(spacestorage.SpaceStorage).HeadStorage()
return nil
}
func (st *objectDeletionState) Name() (name string) {
return CName
}
func New() ObjectDeletionState {
return &objectDeletionState{
log: log,
queued: map[string]struct{}{},
deleted: map[string]struct{}{},
}
}
func (st *objectDeletionState) AddObserver(observer StateUpdateObserver) {
st.Lock()
defer st.Unlock()
st.stateUpdateObservers = append(st.stateUpdateObservers, observer)
}
func (st *objectDeletionState) Add(ids map[string]struct{}) {
var added []string
st.Lock()
defer func() {
st.Unlock()
for _, ob := range st.stateUpdateObservers {
ob(added)
}
}()
for id := range ids {
if _, exists := st.deleted[id]; exists {
continue
}
if _, exists := st.queued[id]; exists {
continue
}
err := st.updateStatus(id, headstorage.DeletedStatusQueued)
if err != nil {
st.log.Warn("failed to set deleted status", zap.String("treeId", id), zap.Error(err))
continue
}
st.queued[id] = struct{}{}
added = append(added, id)
}
}
func (st *objectDeletionState) GetQueued() (ids []string) {
st.RLock()
defer st.RUnlock()
ids = make([]string, 0, len(st.queued))
for id := range st.queued {
ids = append(ids, id)
}
return
}
func (st *objectDeletionState) updateStatus(id string, status headstorage.DeletedStatus) (err error) {
ctx, cancel := context.WithTimeout(context.Background(), setTimeout)
defer cancel()
return st.storage.UpdateEntry(ctx, headstorage.HeadsUpdate{
Id: id,
DeletedStatus: &status,
})
}
func (st *objectDeletionState) Delete(id string) (err error) {
st.Lock()
defer st.Unlock()
delete(st.queued, id)
st.deleted[id] = struct{}{}
return st.updateStatus(id, headstorage.DeletedStatusDeleted)
}
func (st *objectDeletionState) Exists(id string) bool {
st.RLock()
defer st.RUnlock()
return st.exists(id)
}
func (st *objectDeletionState) Filter(ids []string) (filtered []string) {
st.RLock()
defer st.RUnlock()
for _, id := range ids {
if !st.exists(id) {
filtered = append(filtered, id)
}
}
return
}
func (st *objectDeletionState) exists(id string) bool {
if _, exists := st.deleted[id]; exists {
return true
}
if _, exists := st.queued[id]; exists {
return true
}
return false
}