1
0
Fork 0
mirror of https://github.com/anyproto/any-sync.git synced 2025-06-08 05:57:03 +09:00

move acl service from coordinator

This commit is contained in:
Sergey Cherepanov 2024-02-29 09:15:58 +01:00
parent d9830ae5cb
commit 92d3ba6237
No known key found for this signature in database
GPG key ID: 87F8EDE8FBDF637C
3 changed files with 334 additions and 0 deletions

104
acl/acl.go Normal file
View file

@ -0,0 +1,104 @@
package acl
import (
"context"
"time"
"github.com/prometheus/client_golang/prometheus"
commonaccount "github.com/anyproto/any-sync/accountservice"
"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/app/logger"
"github.com/anyproto/any-sync/app/ocache"
"github.com/anyproto/any-sync/commonspace/object/acl/list"
"github.com/anyproto/any-sync/consensus/consensusclient"
"github.com/anyproto/any-sync/consensus/consensusproto"
"github.com/anyproto/any-sync/metric"
)
const CName = "coordinator.acl"
var log = logger.NewNamed(CName)
func New() Acl {
return &aclService{}
}
type Acl interface {
AddRecord(ctx context.Context, spaceId string, rec *consensusproto.RawRecord) (result *consensusproto.RawRecordWithId, err error)
RecordsAfter(ctx context.Context, spaceId, aclHead string) (result []*consensusproto.RawRecordWithId, err error)
app.ComponentRunnable
}
type aclService struct {
consService consensusclient.Service
cache ocache.OCache
accountService commonaccount.Service
}
func (as *aclService) Init(a *app.App) (err error) {
as.consService = app.MustComponent[consensusclient.Service](a)
as.accountService = app.MustComponent[commonaccount.Service](a)
var metricReg *prometheus.Registry
if m := a.Component(metric.CName); m != nil {
metricReg = m.(metric.Metric).Registry()
}
as.cache = ocache.New(as.loadObject,
ocache.WithTTL(5*time.Minute),
ocache.WithLogger(log.Sugar()),
ocache.WithPrometheus(metricReg, "coordinator", "acl"),
)
return
}
func (as *aclService) Name() (name string) {
return CName
}
func (as *aclService) loadObject(ctx context.Context, id string) (ocache.Object, error) {
return as.newAclObject(ctx, id)
}
func (as *aclService) get(ctx context.Context, spaceId string) (list.AclList, error) {
obj, err := as.cache.Get(ctx, spaceId)
if err != nil {
return nil, err
}
aObj := obj.(*aclObject)
aObj.lastUsage.Store(time.Now())
return aObj.AclList, nil
}
func (as *aclService) AddRecord(ctx context.Context, spaceId string, rec *consensusproto.RawRecord) (result *consensusproto.RawRecordWithId, err error) {
acl, err := as.get(ctx, spaceId)
if err != nil {
return nil, err
}
acl.RLock()
defer acl.RUnlock()
err = acl.ValidateRawRecord(rec, nil)
if err != nil {
return
}
return as.consService.AddRecord(ctx, spaceId, rec)
}
func (as *aclService) RecordsAfter(ctx context.Context, spaceId, aclHead string) (result []*consensusproto.RawRecordWithId, err error) {
acl, err := as.get(ctx, spaceId)
if err != nil {
return nil, err
}
acl.RLock()
defer acl.RUnlock()
return acl.RecordsAfter(ctx, aclHead)
}
func (as *aclService) Run(ctx context.Context) (err error) {
return
}
func (as *aclService) Close(ctx context.Context) (err error) {
return as.cache.Close()
}

136
acl/acl_test.go Normal file
View file

@ -0,0 +1,136 @@
package acl
import (
"context"
"errors"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/commonspace/object/accountdata"
"github.com/anyproto/any-sync/commonspace/object/acl/list"
"github.com/anyproto/any-sync/consensus/consensusclient"
"github.com/anyproto/any-sync/consensus/consensusclient/mock_consensusclient"
"github.com/anyproto/any-sync/consensus/consensusproto"
"github.com/anyproto/any-sync/testutil/accounttest"
)
var ctx = context.Background()
func TestAclService_AddRecord(t *testing.T) {
ownerKeys, err := accountdata.NewRandom()
require.NoError(t, err)
spaceId := "spaceId"
ownerAcl, err := list.NewTestDerivedAcl(spaceId, ownerKeys)
require.NoError(t, err)
inv, err := ownerAcl.RecordBuilder().BuildInvite()
require.NoError(t, err)
t.Run("success", func(t *testing.T) {
fx := newFixture(t)
defer fx.finish(t)
expRes := list.WrapAclRecord(inv.InviteRec)
var watcherCh = make(chan consensusclient.Watcher)
fx.consCl.EXPECT().Watch(spaceId, gomock.Any()).DoAndReturn(func(spaceId string, w consensusclient.Watcher) error {
go func() {
w.AddConsensusRecords([]*consensusproto.RawRecordWithId{
ownerAcl.Root(),
})
watcherCh <- w
}()
return nil
})
fx.consCl.EXPECT().AddRecord(ctx, spaceId, inv.InviteRec).Return(expRes, nil)
fx.consCl.EXPECT().UnWatch(spaceId)
res, err := fx.AddRecord(ctx, spaceId, inv.InviteRec)
assert.Equal(t, expRes, res)
assert.NoError(t, err)
w := <-watcherCh
w.AddConsensusRecords([]*consensusproto.RawRecordWithId{
expRes,
})
})
t.Run("error", func(t *testing.T) {
fx := newFixture(t)
defer fx.finish(t)
var testErr = errors.New("test")
fx.consCl.EXPECT().Watch(spaceId, gomock.Any()).DoAndReturn(func(spaceId string, w consensusclient.Watcher) error {
go func() {
w.AddConsensusError(testErr)
}()
return nil
})
fx.consCl.EXPECT().UnWatch(spaceId)
res, err := fx.AddRecord(ctx, spaceId, inv.InviteRec)
assert.Nil(t, res)
assert.EqualError(t, err, testErr.Error())
})
}
func TestAclService_RecordsAfter(t *testing.T) {
ownerKeys, err := accountdata.NewRandom()
require.NoError(t, err)
spaceId := "spaceId"
ownerAcl, err := list.NewTestDerivedAcl(spaceId, ownerKeys)
require.NoError(t, err)
fx := newFixture(t)
defer fx.finish(t)
fx.consCl.EXPECT().Watch(spaceId, gomock.Any()).DoAndReturn(func(spaceId string, w consensusclient.Watcher) error {
go func() {
w.AddConsensusRecords([]*consensusproto.RawRecordWithId{
ownerAcl.Root(),
})
}()
return nil
})
fx.consCl.EXPECT().UnWatch(spaceId)
res, err := fx.RecordsAfter(ctx, spaceId, "")
require.NoError(t, err)
assert.Len(t, res, 1)
}
func newFixture(t *testing.T) *fixture {
ctrl := gomock.NewController(t)
fx := &fixture{
a: new(app.App),
ctrl: ctrl,
consCl: mock_consensusclient.NewMockService(ctrl),
Acl: New(),
}
fx.consCl.EXPECT().Name().Return(consensusclient.CName).AnyTimes()
fx.consCl.EXPECT().Init(gomock.Any()).AnyTimes()
fx.consCl.EXPECT().Run(gomock.Any()).AnyTimes()
fx.consCl.EXPECT().Close(gomock.Any()).AnyTimes()
fx.a.Register(fx.consCl).Register(fx.Acl).Register(&accounttest.AccountTestService{})
require.NoError(t, fx.a.Start(ctx))
return fx
}
type fixture struct {
a *app.App
ctrl *gomock.Controller
consCl *mock_consensusclient.MockService
Acl
}
func (fx *fixture) finish(t *testing.T) {
require.NoError(t, fx.a.Close(ctx))
fx.ctrl.Finish()
}

94
acl/object.go Normal file
View file

@ -0,0 +1,94 @@
package acl
import (
"context"
"slices"
"sync"
"time"
"github.com/anyproto/any-sync/commonspace/object/acl/list"
"github.com/anyproto/any-sync/commonspace/object/acl/liststorage"
"github.com/anyproto/any-sync/consensus/consensusproto"
"go.uber.org/atomic"
"go.uber.org/zap"
)
func (as *aclService) newAclObject(ctx context.Context, id string) (*aclObject, error) {
obj := &aclObject{
id: id,
aclService: as,
ready: make(chan struct{}),
}
if err := as.consService.Watch(id, obj); err != nil {
return nil, err
}
select {
case <-obj.ready:
if obj.consErr != nil {
_ = as.consService.UnWatch(id)
return nil, obj.consErr
}
return obj, nil
case <-ctx.Done():
_ = as.consService.UnWatch(id)
return nil, ctx.Err()
}
}
type aclObject struct {
id string
aclService *aclService
store liststorage.ListStorage
list.AclList
ready chan struct{}
consErr error
lastUsage atomic.Time
mu sync.Mutex
}
func (a *aclObject) AddConsensusRecords(recs []*consensusproto.RawRecordWithId) {
a.mu.Lock()
defer a.mu.Unlock()
slices.Reverse(recs)
if a.store == nil {
defer close(a.ready)
if a.store, a.consErr = liststorage.NewInMemoryAclListStorage(a.id, recs); a.consErr != nil {
return
}
if a.AclList, a.consErr = list.BuildAclListWithIdentity(a.aclService.accountService.Account(), a.store, list.NoOpAcceptorVerifier{}); a.consErr != nil {
return
}
} else {
a.Lock()
defer a.Unlock()
if err := a.AddRawRecords(recs); err != nil {
log.Warn("unable to add consensus records", zap.Error(err), zap.String("spaceId", a.id))
return
}
}
}
func (a *aclObject) AddConsensusError(err error) {
a.mu.Lock()
defer a.mu.Unlock()
if a.store == nil {
a.consErr = err
close(a.ready)
} else {
log.Warn("got consensus error", zap.Error(err))
}
}
func (a *aclObject) Close() (err error) {
return a.aclService.consService.UnWatch(a.id)
}
func (a *aclObject) TryClose(objectTTL time.Duration) (res bool, err error) {
if a.lastUsage.Load().Before(time.Now().Add(-objectTTL)) {
return true, a.Close()
}
return false, nil
}