1
0
Fork 0
mirror of https://github.com/anyproto/anytype-heart.git synced 2025-06-07 21:37:04 +09:00

GO-3056: Gateway: add retry mechanism for file id resolving

This commit is contained in:
Sergey 2024-03-27 15:42:27 +01:00
parent 7a23998075
commit a2732b185b
No known key found for this signature in database
GPG key ID: 3B6BEF79160221C6
8 changed files with 383 additions and 44 deletions

View file

@ -94,3 +94,6 @@ packages:
github.com/anyproto/anytype-heart/core/payments/cache:
interfaces:
CacheService:
github.com/anyproto/anytype-heart/core/block/object/idresolver:
interfaces:
Resolver:

View file

@ -241,7 +241,7 @@ func Bootstrap(a *app.App, components ...app.Component) {
Register(space.New()).
Register(deletioncontroller.New()).
Register(invitestore.New()).
Register(fileobject.New()).
Register(fileobject.New(200*time.Millisecond, 500*time.Millisecond)).
Register(acl.New()).
Register(filesync.New()).
Register(builtintemplate.New()).

View file

@ -0,0 +1,183 @@
// Code generated by mockery. DO NOT EDIT.
package mock_idresolver
import (
app "github.com/anyproto/any-sync/app"
mock "github.com/stretchr/testify/mock"
)
// MockResolver is an autogenerated mock type for the Resolver type
type MockResolver struct {
mock.Mock
}
type MockResolver_Expecter struct {
mock *mock.Mock
}
func (_m *MockResolver) EXPECT() *MockResolver_Expecter {
return &MockResolver_Expecter{mock: &_m.Mock}
}
// Init provides a mock function with given fields: a
func (_m *MockResolver) Init(a *app.App) error {
ret := _m.Called(a)
if len(ret) == 0 {
panic("no return value specified for Init")
}
var r0 error
if rf, ok := ret.Get(0).(func(*app.App) error); ok {
r0 = rf(a)
} else {
r0 = ret.Error(0)
}
return r0
}
// MockResolver_Init_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Init'
type MockResolver_Init_Call struct {
*mock.Call
}
// Init is a helper method to define mock.On call
// - a *app.App
func (_e *MockResolver_Expecter) Init(a interface{}) *MockResolver_Init_Call {
return &MockResolver_Init_Call{Call: _e.mock.On("Init", a)}
}
func (_c *MockResolver_Init_Call) Run(run func(a *app.App)) *MockResolver_Init_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(*app.App))
})
return _c
}
func (_c *MockResolver_Init_Call) Return(err error) *MockResolver_Init_Call {
_c.Call.Return(err)
return _c
}
func (_c *MockResolver_Init_Call) RunAndReturn(run func(*app.App) error) *MockResolver_Init_Call {
_c.Call.Return(run)
return _c
}
// Name provides a mock function with given fields:
func (_m *MockResolver) Name() string {
ret := _m.Called()
if len(ret) == 0 {
panic("no return value specified for Name")
}
var r0 string
if rf, ok := ret.Get(0).(func() string); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(string)
}
return r0
}
// MockResolver_Name_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Name'
type MockResolver_Name_Call struct {
*mock.Call
}
// Name is a helper method to define mock.On call
func (_e *MockResolver_Expecter) Name() *MockResolver_Name_Call {
return &MockResolver_Name_Call{Call: _e.mock.On("Name")}
}
func (_c *MockResolver_Name_Call) Run(run func()) *MockResolver_Name_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockResolver_Name_Call) Return(name string) *MockResolver_Name_Call {
_c.Call.Return(name)
return _c
}
func (_c *MockResolver_Name_Call) RunAndReturn(run func() string) *MockResolver_Name_Call {
_c.Call.Return(run)
return _c
}
// ResolveSpaceID provides a mock function with given fields: objectID
func (_m *MockResolver) ResolveSpaceID(objectID string) (string, error) {
ret := _m.Called(objectID)
if len(ret) == 0 {
panic("no return value specified for ResolveSpaceID")
}
var r0 string
var r1 error
if rf, ok := ret.Get(0).(func(string) (string, error)); ok {
return rf(objectID)
}
if rf, ok := ret.Get(0).(func(string) string); ok {
r0 = rf(objectID)
} else {
r0 = ret.Get(0).(string)
}
if rf, ok := ret.Get(1).(func(string) error); ok {
r1 = rf(objectID)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockResolver_ResolveSpaceID_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ResolveSpaceID'
type MockResolver_ResolveSpaceID_Call struct {
*mock.Call
}
// ResolveSpaceID is a helper method to define mock.On call
// - objectID string
func (_e *MockResolver_Expecter) ResolveSpaceID(objectID interface{}) *MockResolver_ResolveSpaceID_Call {
return &MockResolver_ResolveSpaceID_Call{Call: _e.mock.On("ResolveSpaceID", objectID)}
}
func (_c *MockResolver_ResolveSpaceID_Call) Run(run func(objectID string)) *MockResolver_ResolveSpaceID_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(string))
})
return _c
}
func (_c *MockResolver_ResolveSpaceID_Call) Return(_a0 string, _a1 error) *MockResolver_ResolveSpaceID_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockResolver_ResolveSpaceID_Call) RunAndReturn(run func(string) (string, error)) *MockResolver_ResolveSpaceID_Call {
_c.Call.Return(run)
return _c
}
// NewMockResolver creates a new instance of MockResolver. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewMockResolver(t interface {
mock.TestingT
Cleanup(func())
}) *MockResolver {
mock := &MockResolver{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View file

@ -9,12 +9,14 @@ import (
"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/commonspace/object/tree/treestorage"
"github.com/anyproto/any-sync/commonspace/syncstatus"
"github.com/avast/retry-go/v4"
"github.com/gogo/protobuf/types"
"github.com/ipfs/go-cid"
"github.com/anyproto/anytype-heart/core/block/editor/smartblock"
"github.com/anyproto/anytype-heart/core/block/editor/state"
"github.com/anyproto/anytype-heart/core/block/editor/template"
"github.com/anyproto/anytype-heart/core/block/getblock"
"github.com/anyproto/anytype-heart/core/block/object/idresolver"
"github.com/anyproto/anytype-heart/core/block/object/objectcreator"
"github.com/anyproto/anytype-heart/core/block/object/payloadcreator"
"github.com/anyproto/anytype-heart/core/block/simple"
@ -39,7 +41,10 @@ import (
// TODO UNsugar
var log = logging.Logger("fileobject")
var ErrObjectNotFound = fmt.Errorf("file object not found")
var (
ErrObjectNotFound = fmt.Errorf("file object not found")
ErrEmptyFileId = fmt.Errorf("empty file id")
)
const CName = "fileobject"
@ -66,19 +71,28 @@ type objectCreatorService interface {
}
type service struct {
spaceService space.Service
objectCreator objectCreatorService
fileService files.Service
fileSync filesync.FileSync
fileStore filestore.FileStore
objectStore objectstore.ObjectStore
objectGetter getblock.ObjectGetter
spaceService space.Service
objectCreator objectCreatorService
fileService files.Service
fileSync filesync.FileSync
fileStore filestore.FileStore
objectStore objectstore.ObjectStore
spaceIdResolver idresolver.Resolver
indexer *indexer
resolverRetryStartDelay time.Duration
resolverRetryMaxDelay time.Duration
}
func New() Service {
return &service{}
func New(
resolverRetryStartDelay time.Duration,
resolverRetryMaxDelay time.Duration,
) Service {
return &service{
resolverRetryStartDelay: resolverRetryStartDelay,
resolverRetryMaxDelay: resolverRetryMaxDelay,
}
}
func (s *service) Name() string {
@ -92,7 +106,7 @@ func (s *service) Init(a *app.App) error {
s.fileSync = app.MustComponent[filesync.FileSync](a)
s.objectStore = app.MustComponent[objectstore.ObjectStore](a)
s.fileStore = app.MustComponent[filestore.FileStore](a)
s.objectGetter = app.MustComponent[getblock.ObjectGetter](a)
s.spaceIdResolver = app.MustComponent[idresolver.Resolver](a)
s.indexer = s.newIndexer()
return nil
}
@ -346,7 +360,7 @@ func (s *service) GetFileIdFromObject(objectId string) (domain.FullFileId, error
spaceId := pbtypes.GetString(details.Details, bundle.RelationKeySpaceId.String())
fileId := pbtypes.GetString(details.Details, bundle.RelationKeyFileId.String())
if fileId == "" {
return domain.FullFileId{}, fmt.Errorf("empty file hash")
return domain.FullFileId{}, ErrEmptyFileId
}
return domain.FullFileId{
SpaceId: spaceId,
@ -355,14 +369,23 @@ func (s *service) GetFileIdFromObject(objectId string) (domain.FullFileId, error
}
func (s *service) GetFileIdFromObjectWaitLoad(ctx context.Context, objectId string) (domain.FullFileId, error) {
var id domain.FullFileId
err := getblock.Do(s.objectGetter, objectId, func(sb smartblock.SmartBlock) error {
spaceId, err := s.resolveSpaceIdWithRetry(ctx, objectId)
if err != nil {
return domain.FullFileId{}, fmt.Errorf("resolve space id: %w", err)
}
spc, err := s.spaceService.Get(ctx, spaceId)
if err != nil {
return domain.FullFileId{}, fmt.Errorf("get space: %w", err)
}
id := domain.FullFileId{
SpaceId: spaceId,
}
err = spc.Do(objectId, func(sb smartblock.SmartBlock) error {
details := sb.Details()
id.FileId = domain.FileId(pbtypes.GetString(details, bundle.RelationKeyFileId.String()))
if id.FileId == "" {
return fmt.Errorf("empty file hash")
return ErrEmptyFileId
}
id.SpaceId = sb.SpaceID()
return nil
})
if err != nil {
@ -371,6 +394,28 @@ func (s *service) GetFileIdFromObjectWaitLoad(ctx context.Context, objectId stri
return id, nil
}
func (s *service) resolveSpaceIdWithRetry(ctx context.Context, objectId string) (string, error) {
_, err := cid.Decode(objectId)
if err != nil {
return "", fmt.Errorf("decode object id: %w", err)
}
if domain.IsFileId(objectId) {
return "", fmt.Errorf("object id is file cid")
}
spaceId, err := retry.DoWithData(func() (string, error) {
return s.spaceIdResolver.ResolveSpaceID(objectId)
},
retry.Context(ctx),
retry.Attempts(0),
retry.Delay(s.resolverRetryStartDelay),
retry.MaxDelay(s.resolverRetryMaxDelay),
retry.DelayType(retry.BackOffDelay),
retry.LastErrorOnly(true),
)
return spaceId, err
}
func (s *service) migrate(space clientspace.Space, objectId string, keys []*pb.ChangeFileKeys, fileId string, origin objectorigin.ObjectOrigin) string {
// Don't migrate empty or its own id
if fileId == "" || objectId == fileId {
@ -496,7 +541,7 @@ func (s *service) FileOffload(ctx context.Context, objectId string, includeNotPi
func (s *service) fileOffload(ctx context.Context, fileDetails *types.Struct, includeNotPinned bool) (uint64, error) {
fileId := pbtypes.GetString(fileDetails, bundle.RelationKeyFileId.String())
if fileId == "" {
return 0, fmt.Errorf("empty file hash")
return 0, ErrEmptyFileId
}
backupStatus := syncstatus.SyncStatus(pbtypes.GetInt64(fileDetails, bundle.RelationKeyFileBackupStatus.String()))
id := domain.FullFileId{

View file

@ -2,6 +2,7 @@ package fileobject
import (
"context"
"fmt"
"strings"
"testing"
"time"
@ -15,7 +16,9 @@ import (
"github.com/stretchr/testify/require"
"github.com/anyproto/anytype-heart/core/block/editor/smartblock"
"github.com/anyproto/anytype-heart/core/block/editor/smartblock/smarttest"
"github.com/anyproto/anytype-heart/core/block/editor/state"
"github.com/anyproto/anytype-heart/core/block/object/idresolver/mock_idresolver"
"github.com/anyproto/anytype-heart/core/block/object/objectcreator"
"github.com/anyproto/anytype-heart/core/domain"
"github.com/anyproto/anytype-heart/core/domain/objectorigin"
@ -35,31 +38,20 @@ import (
"github.com/anyproto/anytype-heart/space/mock_space"
bb "github.com/anyproto/anytype-heart/tests/blockbuilder"
"github.com/anyproto/anytype-heart/tests/testutil"
"github.com/anyproto/anytype-heart/util/mutex"
"github.com/anyproto/anytype-heart/util/pbtypes"
)
type fixture struct {
fileService files.Service
objectStore *objectstore.StoreFixture
objectCreator *objectCreatorStub
spaceService *mock_space.MockService
fileService files.Service
objectStore *objectstore.StoreFixture
objectCreator *objectCreatorStub
spaceService *mock_space.MockService
spaceIdResolver *mock_idresolver.MockResolver
*service
}
type dummyObjectGetter struct{}
func (g *dummyObjectGetter) Init(_ *app.App) error {
return nil
}
func (g *dummyObjectGetter) Name() string {
return "dummyObjectGetter"
}
func (g *dummyObjectGetter) GetObject(ctx context.Context, id string) (smartblock.SmartBlock, error) {
return nil, nil
}
const testResolveRetryDelay = 5 * time.Millisecond
func newFixture(t *testing.T) *fixture {
fileStore := filestore.New()
@ -74,9 +66,9 @@ func newFixture(t *testing.T) *fixture {
eventSender := mock_event.NewMockSender(t)
fileService := files.New()
spaceService := mock_space.NewMockService(t)
objectGetter := &dummyObjectGetter{}
spaceIdResolver := mock_idresolver.NewMockResolver(t)
svc := New()
svc := New(testResolveRetryDelay, testResolveRetryDelay)
ctx := context.Background()
a := new(app.App)
@ -92,7 +84,7 @@ func newFixture(t *testing.T) *fixture {
a.Register(fileService)
a.Register(objectCreator)
a.Register(svc)
a.Register(objectGetter)
a.Register(testutil.PrepareMock(ctx, a, spaceIdResolver))
err := a.Start(ctx)
require.NoError(t, err)
@ -102,10 +94,11 @@ func newFixture(t *testing.T) *fixture {
})
fx := &fixture{
fileService: fileService,
objectStore: objectStore,
objectCreator: objectCreator,
spaceService: spaceService,
fileService: fileService,
objectStore: objectStore,
objectCreator: objectCreator,
spaceService: spaceService,
spaceIdResolver: spaceIdResolver,
service: svc.(*service),
}
@ -359,3 +352,92 @@ func expectIndexerCalled(t *testing.T, fx *fixture, waitIndexerCh chan struct{},
fx.spaceService.EXPECT().Get(mock.Anything, mock.Anything).Return(space, nil)
}
const testFileObjectId = "bafyreiebxsn65332wl7qavcxxkfwnsroba5x5h2sshcn7f7cr66ztixb54"
func TestGetFileIdFromObjectWaitLoad(t *testing.T) {
t.Run("with invalid id expect error", func(t *testing.T) {
fx := newFixture(t)
_, err := fx.GetFileIdFromObjectWaitLoad(context.Background(), "invalid")
require.Error(t, err)
})
t.Run("with file id expect error", func(t *testing.T) {
fx := newFixture(t)
_, err := fx.GetFileIdFromObjectWaitLoad(context.Background(), testFileId.String())
require.Error(t, err)
})
t.Run("with not yet loaded object load object and when timed out expect return error", func(t *testing.T) {
fx := newFixture(t)
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond)
defer cancel()
fx.spaceIdResolver.EXPECT().ResolveSpaceID(testFileObjectId).Return("", fmt.Errorf("not yet resolved"))
_, err := fx.GetFileIdFromObjectWaitLoad(ctx, testFileObjectId)
require.Error(t, err)
require.ErrorIs(t, err, context.DeadlineExceeded)
})
t.Run("with not yet loaded object load object and return it's file id", func(t *testing.T) {
fx := newFixture(t)
ctx := context.Background()
spaceId := "spaceId"
resolvedSpace := mutex.NewValue("")
resolvedSpaceErr := mutex.NewValue(fmt.Errorf("not yet resolved"))
fx.spaceIdResolver.EXPECT().ResolveSpaceID(testFileObjectId).RunAndReturn(func(_ string) (string, error) {
return resolvedSpace.Get(), resolvedSpaceErr.Get()
})
go func() {
time.Sleep(3 * testResolveRetryDelay)
resolvedSpace.Set(spaceId)
resolvedSpaceErr.Set(nil)
}()
space := mock_clientspace.NewMockSpace(t)
space.EXPECT().Do(testFileObjectId, mock.Anything).RunAndReturn(func(_ string, apply func(smartblock.SmartBlock) error) error {
sb := smarttest.New(testFileObjectId)
st := sb.Doc.(*state.State)
st.SetDetailAndBundledRelation(bundle.RelationKeyFileId, pbtypes.String(testFileId.String()))
return apply(sb)
})
fx.spaceService.EXPECT().Get(ctx, spaceId).Return(space, nil)
id, err := fx.GetFileIdFromObjectWaitLoad(ctx, testFileObjectId)
require.NoError(t, err)
assert.Equal(t, domain.FullFileId{
SpaceId: spaceId,
FileId: testFileId,
}, id)
})
t.Run("with loaded object without file id expect error", func(t *testing.T) {
fx := newFixture(t)
ctx := context.Background()
spaceId := "spaceId"
fx.spaceIdResolver.EXPECT().ResolveSpaceID(testFileObjectId).Return(spaceId, nil)
space := mock_clientspace.NewMockSpace(t)
space.EXPECT().Do(testFileObjectId, mock.Anything).RunAndReturn(func(_ string, apply func(smartblock.SmartBlock) error) error {
sb := smarttest.New(testFileObjectId)
st := sb.Doc.(*state.State)
st.SetDetailAndBundledRelation(bundle.RelationKeyFileId, pbtypes.String(""))
return apply(sb)
})
fx.spaceService.EXPECT().Get(ctx, spaceId).Return(space, nil)
_, err := fx.GetFileIdFromObjectWaitLoad(ctx, testFileObjectId)
require.ErrorIs(t, err, ErrEmptyFileId)
})
}

1
go.mod
View file

@ -114,6 +114,7 @@ require (
github.com/anyproto/go-chash v0.1.0 // indirect
github.com/anyproto/go-slip10 v1.0.0 // indirect
github.com/anyproto/go-slip21 v1.0.0 // indirect
github.com/avast/retry-go/v4 v4.5.1 // indirect
github.com/aymerick/douceur v0.2.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bits-and-blooms/bitset v1.10.0 // indirect

4
go.sum
View file

@ -122,6 +122,10 @@ github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=
github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
github.com/aryann/difflib v0.0.0-20170710044230-e206f873d14a/go.mod h1:DAHtR1m6lCRdSC2Tm3DSWRPvIPr6xNKyeHdqDQSQT+A=
github.com/avast/retry-go v3.0.0+incompatible h1:4SOWQ7Qs+oroOTQOYnAHqelpCO0biHSxpiH9JdtuBj0=
github.com/avast/retry-go v3.0.0+incompatible/go.mod h1:XtSnn+n/sHqQIpZ10K1qAevBhOOCWBLXXy3hyiqqBrY=
github.com/avast/retry-go/v4 v4.5.1 h1:AxIx0HGi4VZ3I02jr78j5lZ3M6x1E0Ivxa6b0pUUh7o=
github.com/avast/retry-go/v4 v4.5.1/go.mod h1:/sipNsvNB3RRuT5iNcb6h73nw3IBmXJ/H3XrCQYSOpc=
github.com/aws/aws-lambda-go v1.13.3/go.mod h1:4UKl9IzQMoD+QF79YdCuzCwp8VbmG4VAQwij/eHl5CU=
github.com/aws/aws-sdk-go v1.27.0/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
github.com/aws/aws-sdk-go-v2 v0.18.0/go.mod h1:JWVYvqSMppoMJC0x5wdwiImzgXTI9FuZwxzkQq9wy+g=

View file

@ -7,3 +7,24 @@ func WithLock[T any](mutex sync.Locker, fun func() T) T {
defer mutex.Unlock()
return fun()
}
type Value[T any] struct {
lock sync.Mutex
value T
}
func NewValue[T any](value T) *Value[T] {
return &Value[T]{value: value}
}
func (v *Value[T]) Get() T {
v.lock.Lock()
defer v.lock.Unlock()
return v.value
}
func (v *Value[T]) Set(value T) {
v.lock.Lock()
defer v.lock.Unlock()
v.value = value
}