mirror of https://github.com/anyproto/anytype-heart.git synced 2025-06-09 17:44:59 +09:00

GO-4182 reindex prioritisation queue

This commit is contained in:
Roman Khafizianov 2024-10-04 17:18:41 +02:00
parent ee9ca0e6ab
commit 95da8f56cc
No known key found for this signature in database
GPG key ID: F07A7D55A2684852
15 changed files with 1419 additions and 196 deletions

View file

@@ -17,8 +17,6 @@ import (
session "github.com/anyproto/anytype-heart/core/session"
smartblock "github.com/anyproto/anytype-heart/core/block/editor/smartblock"
types "github.com/gogo/protobuf/types"
)
@@ -35,124 +33,6 @@ func (_m *MockService) EXPECT() *MockService_Expecter {
return &MockService_Expecter{mock: &_m.Mock}
}
// GetObject provides a mock function with given fields: ctx, objectID
func (_m *MockService) GetObject(ctx context.Context, objectID string) (smartblock.SmartBlock, error) {
ret := _m.Called(ctx, objectID)
if len(ret) == 0 {
panic("no return value specified for GetObject")
}
var r0 smartblock.SmartBlock
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, string) (smartblock.SmartBlock, error)); ok {
return rf(ctx, objectID)
}
if rf, ok := ret.Get(0).(func(context.Context, string) smartblock.SmartBlock); ok {
r0 = rf(ctx, objectID)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(smartblock.SmartBlock)
}
}
if rf, ok := ret.Get(1).(func(context.Context, string) error); ok {
r1 = rf(ctx, objectID)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockService_GetObject_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetObject'
type MockService_GetObject_Call struct {
*mock.Call
}
// GetObject is a helper method to define mock.On call
// - ctx context.Context
// - objectID string
func (_e *MockService_Expecter) GetObject(ctx interface{}, objectID interface{}) *MockService_GetObject_Call {
return &MockService_GetObject_Call{Call: _e.mock.On("GetObject", ctx, objectID)}
}
func (_c *MockService_GetObject_Call) Run(run func(ctx context.Context, objectID string)) *MockService_GetObject_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(string))
})
return _c
}
func (_c *MockService_GetObject_Call) Return(sb smartblock.SmartBlock, err error) *MockService_GetObject_Call {
_c.Call.Return(sb, err)
return _c
}
func (_c *MockService_GetObject_Call) RunAndReturn(run func(context.Context, string) (smartblock.SmartBlock, error)) *MockService_GetObject_Call {
_c.Call.Return(run)
return _c
}
// GetObjectByFullID provides a mock function with given fields: ctx, id
func (_m *MockService) GetObjectByFullID(ctx context.Context, id domain.FullID) (smartblock.SmartBlock, error) {
ret := _m.Called(ctx, id)
if len(ret) == 0 {
panic("no return value specified for GetObjectByFullID")
}
var r0 smartblock.SmartBlock
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, domain.FullID) (smartblock.SmartBlock, error)); ok {
return rf(ctx, id)
}
if rf, ok := ret.Get(0).(func(context.Context, domain.FullID) smartblock.SmartBlock); ok {
r0 = rf(ctx, id)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(smartblock.SmartBlock)
}
}
if rf, ok := ret.Get(1).(func(context.Context, domain.FullID) error); ok {
r1 = rf(ctx, id)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockService_GetObjectByFullID_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetObjectByFullID'
type MockService_GetObjectByFullID_Call struct {
*mock.Call
}
// GetObjectByFullID is a helper method to define mock.On call
// - ctx context.Context
// - id domain.FullID
func (_e *MockService_Expecter) GetObjectByFullID(ctx interface{}, id interface{}) *MockService_GetObjectByFullID_Call {
return &MockService_GetObjectByFullID_Call{Call: _e.mock.On("GetObjectByFullID", ctx, id)}
}
func (_c *MockService_GetObjectByFullID_Call) Run(run func(ctx context.Context, id domain.FullID)) *MockService_GetObjectByFullID_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(domain.FullID))
})
return _c
}
func (_c *MockService_GetObjectByFullID_Call) Return(sb smartblock.SmartBlock, err error) *MockService_GetObjectByFullID_Call {
_c.Call.Return(sb, err)
return _c
}
func (_c *MockService_GetObjectByFullID_Call) RunAndReturn(run func(context.Context, domain.FullID) (smartblock.SmartBlock, error)) *MockService_GetObjectByFullID_Call {
_c.Call.Return(run)
return _c
}
// Init provides a mock function with given fields: a
func (_m *MockService) Init(a *app.App) error {
ret := _m.Called(a)

View file

@@ -33,6 +33,7 @@ import (
"github.com/anyproto/anytype-heart/tests/blockbuilder"
"github.com/anyproto/anytype-heart/tests/testutil"
"github.com/anyproto/anytype-heart/util/pbtypes"
"github.com/anyproto/anytype-heart/util/taskmanager"
)
type IndexerFixture struct {
@@ -65,6 +66,7 @@ func NewIndexerFixture(t *testing.T) *IndexerFixture {
testApp.Register(objectStore.FTSearch())
indxr := &indexer{}
indxr.spaceReindexQueue = taskmanager.NewTasksManager(1, indxr.taskPrioritySorter)
indexerFx := &IndexerFixture{
indexer: indxr,

View file

@@ -32,6 +32,7 @@ import (
"github.com/anyproto/anytype-heart/space/clientspace"
"github.com/anyproto/anytype-heart/space/spacecore/storage"
"github.com/anyproto/anytype-heart/util/pbtypes"
"github.com/anyproto/anytype-heart/util/taskmanager"
)
const (
@@ -82,7 +83,9 @@ type indexer struct {
spacesPrioritySubscription *syncsubscriptions.ObjectSubscription[*types.Struct]
lock sync.Mutex
reindexLogFields []zap.Field
techSpaceId techSpaceIdGetter
spacesPriority []string
spaceReindexQueue *taskmanager.TasksManager
}
func (i *indexer) Init(a *app.App) (err error) {
@@ -97,6 +100,8 @@ func (i *indexer) Init(a *app.App) (err error) {
i.forceFt = make(chan struct{})
i.config = app.MustComponent[*config.Config](a)
i.subscriptionService = app.MustComponent[subscription.Service](a)
i.spaceReindexQueue = taskmanager.NewTasksManager(1, i.reindexTasksSorter)
i.techSpaceId = app.MustComponent[techSpaceIdGetter](a)
i.componentCtx, i.componentCancel = context.WithCancel(context.Background())
return
}
@@ -110,6 +115,7 @@ func (i *indexer) Run(context.Context) (err error) {
if err != nil {
return
}
go i.spaceReindexQueue.Run(i.componentCtx)
return i.StartFullTextIndex()
}
@@ -124,6 +130,7 @@ func (i *indexer) StartFullTextIndex() (err error) {
func (i *indexer) Close(ctx context.Context) (err error) {
i.componentCancel()
i.spacesPrioritySubscription.Close()
i.spaceReindexQueue.WaitAndClose()
// we need to wait for the ftQueue processing to finish gracefully, because we may be in the middle of a badger transaction
<-i.ftQueueFinished
return nil
@@ -297,9 +304,15 @@ func (i *indexer) subscribeToSpaces() error {
}
func (i *indexer) spacesPriorityUpdate(priority []string) {
techSpaceId := i.techSpaceId.TechSpaceId()
i.lock.Lock()
defer i.lock.Unlock()
i.spacesPriority = priority
i.spacesPriority = append([]string{techSpaceId}, slices.DeleteFunc(priority, func(s string) bool {
return s == techSpaceId
})...)
i.lock.Unlock()
// notify reindex queue to refresh priority
i.spaceReindexQueue.RefreshPriority()
}
func (i *indexer) spacesPriorityGet() []string {

View file

@ -14,7 +14,6 @@ import (
"go.uber.org/zap"
"github.com/anyproto/anytype-heart/core/block/editor/smartblock"
"github.com/anyproto/anytype-heart/core/block/object/objectcache"
"github.com/anyproto/anytype-heart/core/domain"
"github.com/anyproto/anytype-heart/core/syncstatus/detailsupdater/helper"
"github.com/anyproto/anytype-heart/metrics"
@@ -136,9 +135,7 @@ func (i *indexer) ReindexSpace(space clientspace.Space) (err error) {
return fmt.Errorf("remove old files: %w", err)
}
ctx := objectcache.CacheOptsWithRemoteLoadDisabled(context.Background())
// for all ids except home and archive setting cache timeout for reindexing
// ctx = context.WithValue(ctx, ocache.CacheTimeout, cacheTimeout)
if flags.objects {
types := []coresb.SmartBlockType{
// System types first
@@ -146,60 +143,40 @@ func (i *indexer) ReindexSpace(space clientspace.Space) (err error) {
coresb.SmartBlockTypeRelation,
coresb.SmartBlockTypeRelationOption,
coresb.SmartBlockTypeFileObject,
// todo: fix participants and other static sources reindex logic
coresb.SmartBlockTypePage,
coresb.SmartBlockTypeTemplate,
coresb.SmartBlockTypeArchive,
coresb.SmartBlockTypeHome,
coresb.SmartBlockTypeWorkspace,
coresb.SmartBlockTypeSpaceView,
coresb.SmartBlockTypeWidget,
coresb.SmartBlockTypeProfilePage,
}
ids, err := i.getIdsForTypes(space, types...)
if err != nil {
return err
}
start := time.Now()
successfullyReindexed := i.reindexIdsIgnoreErr(ctx, space, ids...)
i.logFinishedReindexStat(metrics.ReindexTypeThreads, len(ids), successfullyReindexed, time.Since(start))
l := log.With(zap.String("space", space.Id()), zap.Int("total", len(ids)), zap.Int("succeed", successfullyReindexed))
if successfullyReindexed != len(ids) {
l.Errorf("reindex partially failed")
} else {
l.Infof("reindex finished")
err = i.store.DeleteLastIndexedHeadHash(ids...)
if err != nil {
return fmt.Errorf("delete last indexed head hash: %w", err)
}
} else {
if flags.fileObjects {
err := i.reindexIDsForSmartblockTypes(ctx, space, metrics.ReindexTypeFiles, coresb.SmartBlockTypeFileObject)
ids, err := i.getIdsForTypes(space, coresb.SmartBlockTypeFileObject)
if err != nil {
return fmt.Errorf("reindex file objects: %w", err)
return err
}
err = i.store.DeleteLastIndexedHeadHash(ids...)
if err != nil {
return fmt.Errorf("delete last indexed head hash: %w", err)
}
}
// Index objects that were updated but not yet indexed
// we can have objects whose actual state is newer than the indexed one
// this may happen e.g. if the app was closed in the middle of processing object updates
// so here we run reindexOutdatedObjects, which compares the last indexed heads hash with the actual one
go func() {
start := time.Now()
total, success, err := i.reindexOutdatedObjects(ctx, space)
if err != nil {
log.Errorf("reindex outdated failed: %s", err)
}
l := log.With(zap.String("space", space.Id()), zap.Int("total", total), zap.Int("succeed", success), zap.Int("spentMs", int(time.Since(start).Milliseconds())))
if success != total {
l.Errorf("reindex outdated partially failed")
} else {
l.Debugf("reindex outdated finished")
}
if total > 0 {
i.logFinishedReindexStat(metrics.ReindexTypeOutdatedHeads, total, success, time.Since(start))
}
}()
}
// add a task that rechecks the indexed heads hash of every stored object and reindexes the outdated ones
i.reindexAddSpaceTask(space)
if flags.deletedObjects {
err = i.reindexDeletedObjects(space)
if err != nil {
@@ -476,43 +453,6 @@ func (i *indexer) reindexIDs(ctx context.Context, space smartblock.Space, reinde
return nil
}
func (i *indexer) reindexOutdatedObjects(ctx context.Context, space clientspace.Space) (toReindex, success int, err error) {
tids := space.StoredIds()
var idsToReindex []string
for _, tid := range tids {
logErr := func(err error) {
log.With("tree", tid).Errorf("reindexOutdatedObjects failed to get tree to reindex: %s", err)
}
lastHash, err := i.store.GetLastIndexedHeadsHash(tid)
if err != nil {
logErr(err)
continue
}
info, err := space.Storage().TreeStorage(tid)
if err != nil {
logErr(err)
continue
}
heads, err := info.Heads()
if err != nil {
logErr(err)
continue
}
hh := headsHash(heads)
if lastHash != hh {
if lastHash != "" {
log.With("tree", tid).Warnf("not equal indexed heads hash: %s!=%s (%d logs)", lastHash, hh, len(heads))
}
idsToReindex = append(idsToReindex, tid)
}
}
success = i.reindexIdsIgnoreErr(ctx, space, idsToReindex...)
return len(idsToReindex), success, nil
}
func (i *indexer) reindexDoc(ctx context.Context, space smartblock.Space, id string) error {
return space.Do(id, func(sb smartblock.SmartBlock) error {
return i.Index(ctx, sb.GetDocInfo())

View file

@@ -3,6 +3,7 @@ package indexer
import (
"context"
"testing"
"time"
"github.com/anyproto/any-sync/commonspace/spacestorage"
"github.com/anyproto/any-sync/commonspace/spacestorage/mock_spacestorage"
@@ -155,6 +156,8 @@ func TestReindexDeletedObjects(t *testing.T) {
)
fx := NewIndexerFixture(t)
go fx.spaceReindexQueue.Run(context.Background())
defer fx.spaceReindexQueue.WaitAndClose()
fx.objectStore.AddObjects(t, []objectstore.TestObject{
{
bundle.RelationKeyId: pbtypes.String("1"),
@@ -212,8 +215,10 @@
fx.sourceFx.EXPECT().IDsListerBySmartblockType(mock.Anything, mock.Anything).Return(idsLister{Ids: []string{}}, nil)
err = fx.ReindexSpace(space2)
fx.spaceReindexQueue.RefreshPriority()
require.NoError(t, err)
time.Sleep(4 * time.Second)
sums, err := fx.objectStore.GetChecksums(spaceId2)
require.NoError(t, err)

View file

@@ -0,0 +1,203 @@
package indexer
import (
"context"
"errors"
"fmt"
"slices"
"sort"
"strconv"
"strings"
"time"
"go.uber.org/zap"
coresb "github.com/anyproto/anytype-heart/pkg/lib/core/smartblock"
"github.com/anyproto/anytype-heart/pkg/lib/localstore/objectstore"
"github.com/anyproto/anytype-heart/space/clientspace"
"github.com/anyproto/anytype-heart/util/slice"
"github.com/anyproto/anytype-heart/util/taskmanager"
)
const (
reindexTimeoutFirstAttempt = time.Second * 10 // the timeout doubles on each subsequent attempt (10s, 20s, 40s, ...)
taskRetrySeparator = "#"
)
func (i *indexer) getSpacesPriority() []string {
i.lock.Lock()
defer i.lock.Unlock()
return i.spacesPriority
}
// reindexAddSpaceTask schedules a task that reindexes all objects in the space with outdated head hashes
func (i *indexer) reindexAddSpaceTask(space clientspace.Space) {
task := i.reindexNewTask(space, 0)
i.spaceReindexQueue.AddTask(task)
go i.reindexWatchTaskAndRetry(task)
}
func (i *indexer) reindexWatchTaskAndRetry(task *reindexTask) {
for {
result, err := task.WaitResult(i.componentCtx)
l := log.With("hadTimeouts", task.hadTimeouts).With("spaceId", task.space.Id()).With("tryNumber", task.tryNumber).With("total", task.total).With("invalidated", task.invalidated).With("succeed", task.success)
if err != nil {
l.Error("reindex failed", zap.Error(err))
break
} else {
l = l.With("spentWorkMs", int(result.WorkTime.Milliseconds())).With("spentTotalMs", int(result.FinishTime.Sub(result.StartTime).Milliseconds()))
if task.invalidated-task.success > 0 {
l.Warn("reindex finished not fully")
if task.hadTimeouts {
// reschedule the timed-out space task;
// it will be executed after all tasks with a lower tryNumber have finished
task = i.reindexNewTask(task.space, task.tryNumber+1)
i.spaceReindexQueue.AddTask(task)
} else {
break
}
} else {
if task.total > 0 {
l.Warn("reindex finished")
}
break
}
}
}
}
func (i *indexer) reindexNewTask(space clientspace.Space, tryNumber int) *reindexTask {
taskId := fmt.Sprintf("%s%s%d", space.Id(), taskRetrySeparator, tryNumber)
return &reindexTask{
TaskBase: taskmanager.NewTaskBase(taskId),
space: space,
store: i.store,
indexer: i,
tryNumber: tryNumber,
}
}
// reindexTasksSorter sorts taskIds
// - first by try number (0, 1, 2, ...)
// - then by space priority; spaces without a priority entry are put at the end
func (i *indexer) reindexTasksSorter(taskIds []string) []string {
priority := i.getSpacesPriority()
// Sort the task IDs by retry attempts and space priority
sort.Slice(taskIds, func(a, b int) bool {
spaceA, tryA := reindexTaskId(taskIds[a]).Parse()
spaceB, tryB := reindexTaskId(taskIds[b]).Parse()
// First, sort by retry attempts (lower retries have higher priority)
if tryA != tryB {
return tryA < tryB
}
// Then, sort by the index in spacesPriority (earlier spaces have higher priority)
indexA := slices.Index(priority, spaceA)
indexB := slices.Index(priority, spaceB)
if indexA == -1 && indexB == -1 {
// to make it stable
return spaceA < spaceB
}
if indexA == -1 {
return false
}
if indexB == -1 {
return true
}
return indexA < indexB
})
return taskIds
}
type reindexTask struct {
taskmanager.TaskBase
space clientspace.Space
store objectstore.ObjectStore
indexer *indexer
total int
invalidated int
success int
tryNumber int
hadTimeouts bool
}
func (t *reindexTask) Timeout() time.Duration {
return reindexTimeoutFirstAttempt * time.Duration(1<<t.tryNumber)
}
func (t *reindexTask) Run(ctx context.Context) error {
objectIds := t.space.StoredIds()
var err error
t.total = len(objectIds)
// prioritize indexing of system objects
priorityIds, err := t.indexer.getIdsForTypes(t.space, coresb.SmartBlockTypeObjectType, coresb.SmartBlockTypeRelation, coresb.SmartBlockTypeParticipant)
if err != nil {
log.Errorf("reindexOutdatedObjects failed to get priority ids: %s", err)
} else {
objectIds = append(priorityIds, slice.Difference(objectIds, priorityIds)...)
}
// todo: query lastIndexedHeadHashes for all tids
for _, objectId := range objectIds {
err = t.WaitIfPaused(ctx)
if err != nil {
return err
}
logErr := func(err error) {
log.With("objectId", objectId).Errorf("reindexOutdatedObjects failed to get tree to reindex: %s", err)
}
lastHash, err := t.store.GetLastIndexedHeadsHash(objectId)
if err != nil {
logErr(err)
continue
}
info, err := t.space.Storage().TreeStorage(objectId)
if err != nil {
logErr(err)
continue
}
heads, err := info.Heads()
if err != nil {
logErr(err)
continue
}
hh := headsHash(heads)
if lastHash == hh {
continue
}
if lastHash != "" {
log.With("objectId", objectId).Warnf("not equal indexed heads hash: %s!=%s (%d logs)", lastHash, hh, len(heads))
}
t.invalidated++
indexTimeout, cancel := context.WithTimeout(ctx, t.Timeout())
err = t.indexer.reindexDoc(indexTimeout, t.space, objectId)
cancel()
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
t.hadTimeouts = true
}
logErr(err)
continue
}
t.success++
}
return nil
}
type reindexTaskId string
func (t reindexTaskId) Parse() (spaceId string, try int) {
s := strings.Split(string(t), taskRetrySeparator)
if len(s) == 1 {
return s[0], 0
}
retry, _ := strconv.ParseInt(s[1], 10, 64)
return s[0], int(retry)
}

View file

@@ -0,0 +1,172 @@
package indexer
import "testing"
func slicesEqual(a, b []string) bool {
if len(a) != len(b) {
return false
}
for i := range a {
if a[i] != b[i] {
return false
}
}
return true
}
func TestReindexTaskIdParse(t *testing.T) {
testCases := []struct {
input string
expectedSpaceId string
expectedRetryNum int
}{
{"space1", "space1", 0},
{"space1#1", "space1", 1},
{"space2#2", "space2", 2},
{"space3#abc", "space3", 0}, // Invalid retry number
{"space4#", "space4", 0}, // Empty retry number
{"", "", 0},
}
for _, tc := range testCases {
spaceId, retryNum := reindexTaskId(tc.input).Parse()
if spaceId != tc.expectedSpaceId || retryNum != tc.expectedRetryNum {
t.Errorf("Parse(%q) = (%q, %d); want (%q, %d)", tc.input, spaceId, retryNum, tc.expectedSpaceId, tc.expectedRetryNum)
}
}
}
func TestTaskPrioritySorter(t *testing.T) {
i := &indexer{
spacesPriority: []string{"space1", "space2", "space3"},
}
testCases := []struct {
name string
taskIds []string
expected []string
}{
{
name: "Different retry attempts",
taskIds: []string{
"space1#2",
"space2#1",
"space3#3",
"space4#0",
"space5#1",
},
expected: []string{
"space4#0", // try=0, space not in priority list
"space2#1", // try=1, index=1
"space5#1", // try=1, index=-1 (space5 not in priority)
"space1#2", // try=2, index=0
"space3#3", // try=3, index=2
},
},
{
name: "Same retry attempts, different priorities",
taskIds: []string{
"space3#1",
"space1#1",
"space4#1",
"space2#1",
},
expected: []string{
"space1#1", // index=0
"space2#1", // index=1
"space3#1", // index=2
"space4#1", // index=-1
},
},
{
name: "Spaces not in priority list",
taskIds: []string{
"space4#0",
"space5#0",
"space6#0",
},
expected: []string{
"space4#0",
"space5#0",
"space6#0",
}, // Should be sorted alphabetically among themselves
},
{
name: "Mixed retry attempts and priorities",
taskIds: []string{
"space2#0",
"space4#0",
"space1#1",
"space5#1",
"space3#0",
},
expected: []string{
"space2#0", // try=0, index=1
"space3#0", // try=0, index=2
"space4#0", // try=0, index=-1
"space1#1", // try=1, index=0
"space5#1", // try=1, index=-1
},
},
{
name: "Tasks without retries",
taskIds: []string{
"space3",
"space2",
"space4",
"space1",
},
expected: []string{
"space1", // try=0, index=0
"space2", // try=0, index=1
"space3", // try=0, index=2
"space4", // try=0, index=-1
},
},
{
name: "Equal tries and no priority",
taskIds: []string{
"space4#1",
"space5#1",
"space6#1",
},
expected: []string{
"space4#1",
"space5#1",
"space6#1",
}, // Should be sorted alphabetically among themselves
},
{
name: "Complex mix",
taskIds: []string{
"space3#2",
"space1#0",
"space4#0",
"space2#2",
"space5#1",
"space2#0",
"space1#1",
},
expected: []string{
"space1#0", // try=0, index=0
"space2#0", // try=0, index=1
"space4#0", // try=0, index=-1
"space1#1", // try=1, index=0
"space5#1", // try=1, index=-1
"space2#2", // try=2, index=1
"space3#2", // try=2, index=2
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
taskIdsCopy := make([]string, len(tc.taskIds))
copy(taskIdsCopy, tc.taskIds)
i.reindexTasksSorter(taskIdsCopy)
if !slicesEqual(taskIdsCopy, tc.expected) {
t.Errorf("taskPrioritySorter(%v) = %v; want %v", tc.taskIds, taskIdsCopy, tc.expected)
}
})
}
}

View file

@@ -163,3 +163,19 @@ func (s *dsObjectStore) SaveLastIndexedHeadsHash(id string, headsHash string) er
}))
return err
}
func (s *dsObjectStore) DeleteLastIndexedHeadHash(ids ...string) error {
txn, err := s.headsState.WriteTx(s.componentCtx)
if err != nil {
return fmt.Errorf("start write tx: %w", err)
}
for _, id := range ids {
err = s.headsState.DeleteId(txn.Context(), id)
if err != nil && !errors.Is(err, anystore.ErrDocNotFound) {
return errors.Join(txn.Rollback(), fmt.Errorf("delete id: %w", err))
}
}
return txn.Commit()
}

View file

@@ -252,6 +252,65 @@ func (_c *MockObjectStore_DeleteDetails_Call) RunAndReturn(run func(...string) e
return _c
}
// DeleteLastIndexedHeadHash provides a mock function with given fields: ids
func (_m *MockObjectStore) DeleteLastIndexedHeadHash(ids ...string) error {
_va := make([]interface{}, len(ids))
for _i := range ids {
_va[_i] = ids[_i]
}
var _ca []interface{}
_ca = append(_ca, _va...)
ret := _m.Called(_ca...)
if len(ret) == 0 {
panic("no return value specified for DeleteLastIndexedHeadHash")
}
var r0 error
if rf, ok := ret.Get(0).(func(...string) error); ok {
r0 = rf(ids...)
} else {
r0 = ret.Error(0)
}
return r0
}
// MockObjectStore_DeleteLastIndexedHeadHash_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DeleteLastIndexedHeadHash'
type MockObjectStore_DeleteLastIndexedHeadHash_Call struct {
*mock.Call
}
// DeleteLastIndexedHeadHash is a helper method to define mock.On call
// - ids ...string
func (_e *MockObjectStore_Expecter) DeleteLastIndexedHeadHash(ids ...interface{}) *MockObjectStore_DeleteLastIndexedHeadHash_Call {
return &MockObjectStore_DeleteLastIndexedHeadHash_Call{Call: _e.mock.On("DeleteLastIndexedHeadHash",
append([]interface{}{}, ids...)...)}
}
func (_c *MockObjectStore_DeleteLastIndexedHeadHash_Call) Run(run func(ids ...string)) *MockObjectStore_DeleteLastIndexedHeadHash_Call {
_c.Call.Run(func(args mock.Arguments) {
variadicArgs := make([]string, len(args)-0)
for i, a := range args[0:] {
if a != nil {
variadicArgs[i] = a.(string)
}
}
run(variadicArgs...)
})
return _c
}
func (_c *MockObjectStore_DeleteLastIndexedHeadHash_Call) Return(err error) *MockObjectStore_DeleteLastIndexedHeadHash_Call {
_c.Call.Return(err)
return _c
}
func (_c *MockObjectStore_DeleteLastIndexedHeadHash_Call) RunAndReturn(run func(...string) error) *MockObjectStore_DeleteLastIndexedHeadHash_Call {
_c.Call.Return(run)
return _c
}
// DeleteLinks provides a mock function with given fields: id
func (_m *MockObjectStore) DeleteLinks(id ...string) error {
_va := make([]interface{}, len(id))

View file

@@ -114,6 +114,7 @@ type IndexerStore interface {
GetLastIndexedHeadsHash(id string) (headsHash string, err error)
SaveLastIndexedHeadsHash(id string, headsHash string) (err error)
DeleteLastIndexedHeadHash(ids ...string) (err error)
}
type AccountStore interface {

View file

@@ -28,6 +28,7 @@ import (
"github.com/anyproto/anytype-heart/space/spacecore"
"github.com/anyproto/anytype-heart/space/spacecore/peermanager"
"github.com/anyproto/anytype-heart/space/spacecore/storage"
"github.com/anyproto/anytype-heart/util/slice"
)
type Space interface {
@@ -291,6 +292,12 @@ func (s *space) TryLoadBundledObjects(ctx context.Context) error {
if err != nil {
return err
}
storedIds, err := s.Storage().StoredIds()
if err != nil {
return err
}
// only load objects that are not already stored
objectIds = slice.Difference(objectIds, storedIds)
s.LoadObjectsIgnoreErrs(ctx, objectIds)
return nil
}
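TryLoadBundledObjects now skips objects that are already in storage by taking slice.Difference(objectIds, storedIds). Judging from that usage, Difference is assumed to return the elements of the first slice that are absent from the second; a minimal standalone sketch of that assumed behaviour, with hypothetical object IDs (not the actual util/slice implementation), would be:

package main

import "fmt"

// difference mirrors the behaviour assumed for slice.Difference above:
// it returns the elements of a that are not present in b, preserving a's order.
func difference(a, b []string) []string {
	seen := make(map[string]struct{}, len(b))
	for _, id := range b {
		seen[id] = struct{}{}
	}
	out := make([]string, 0, len(a))
	for _, id := range a {
		if _, ok := seen[id]; !ok {
			out = append(out, id)
		}
	}
	return out
}

func main() {
	bundled := []string{"objectType1", "relation1", "template1"}
	stored := []string{"relation1"}
	fmt.Println(difference(bundled, stored)) // [objectType1 template1]
}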

209
util/taskmanager/manager.go Normal file
View file

@@ -0,0 +1,209 @@
/*
Package taskmanager implements a priority-based task queue that supports pausing, resuming, and dynamically adjusting task priorities.
This package provides a flexible and efficient way to manage tasks that need to be processed in a specific order based on their priority. Tasks can be added to the queue with an assigned priority, and higher-priority tasks are processed first. Additionally, tasks can be paused and resumed when their priority changes, ensuring that resources are allocated appropriately.
Features:
- Add tasks to the queue.
- Automatically reorder tasks based on their priority.
- Pause and resume tasks dynamically when priorities change.
*/
package taskmanager
import (
"context"
"sync"
"github.com/anyproto/any-sync/app/logger"
"go.uber.org/zap"
"golang.org/x/exp/maps"
)
var log = logger.NewNamed("taskmanager")
func PrioritySorterAsIs(ids []string) []string {
return ids
}
// TasksManager manages tasks based on a dynamic priority list and configurable concurrency.
type TasksManager struct {
tasks map[string]taskWrapper
prioritySorterFilter func(ids []string) []string
priorityList []string
priorityRefreshed chan struct{}
taskAddCh chan taskWrapper
mu sync.Mutex
currentTasks map[string]taskWrapper
maxConcurrent int
wg sync.WaitGroup
closed chan struct{}
started chan struct{}
taskFinishedCh chan string
}
func NewTasksManager(maxConcurrent int, prioritySorterFilter func(ids []string) []string) *TasksManager {
return &TasksManager{
tasks: make(map[string]taskWrapper),
prioritySorterFilter: prioritySorterFilter,
priorityList: []string{},
priorityRefreshed: make(chan struct{}),
taskAddCh: make(chan taskWrapper),
currentTasks: make(map[string]taskWrapper),
maxConcurrent: maxConcurrent,
taskFinishedCh: make(chan string),
closed: make(chan struct{}),
started: make(chan struct{}),
}
}
// AddTask adds a task to the manager.
// If the manager is closed, it panics.
func (qm *TasksManager) AddTask(task Task) {
taskWrapped := taskWrapper{Task: task, taskFinishedCh: qm.taskFinishedCh}
managerStateSelect:
select {
case <-qm.closed:
panic("cannot add task to closed TasksManager")
case <-qm.started:
default:
qm.mu.Lock()
// double check under the lock to protect from race conditions
select {
case <-qm.started:
qm.mu.Unlock()
break managerStateSelect
default:
}
qm.wg.Add(1)
qm.tasks[task.ID()] = taskWrapped
qm.priorityList = qm.prioritySorterFilter(maps.Keys(qm.tasks))
qm.mu.Unlock()
return
}
qm.wg.Add(1)
// by default, tasks are paused until the manager starts them
qm.taskAddCh <- taskWrapped
}
// RefreshPriority updates the manager's priority list.
// Use it when prioritySorter needs to be re-run for the existing tasks, e.g. when the priority function depends on external data
func (qm *TasksManager) RefreshPriority() {
managerStateSelect:
select {
case <-qm.closed:
panic("cannot update priority of closed TasksManager")
case <-qm.started:
break
default:
qm.mu.Lock()
// double-check under the lock to protect from race conditions
select {
case <-qm.started:
qm.mu.Unlock()
break managerStateSelect
default:
}
qm.priorityList = qm.prioritySorterFilter(maps.Keys(qm.tasks))
qm.mu.Unlock()
return
}
qm.priorityRefreshed <- struct{}{}
}
// WaitAndClose waits for all tasks to finish and then closes the manager.
// MUST be called exactly once; calling it again panics
func (qm *TasksManager) WaitAndClose() {
qm.wg.Wait()
close(qm.closed)
}
// Run starts the task manager's main queue loop.
// Should be called only once; subsequent calls are no-ops.
// Returns when the manager is closed (i.e. after WaitAndClose is called)
func (qm *TasksManager) Run(ctx context.Context) {
select {
case <-qm.closed:
panic("called Run on closed TasksManager")
case <-qm.started:
return
default:
}
qm.mu.Lock()
for _, task := range qm.tasks {
go task.Run(ctx)
}
close(qm.started)
qm.manageTasks()
qm.mu.Unlock()
for {
select {
case <-qm.priorityRefreshed:
qm.mu.Lock()
qm.priorityList = qm.prioritySorterFilter(maps.Keys(qm.tasks))
qm.manageTasks()
qm.mu.Unlock()
case task := <-qm.taskAddCh:
qm.mu.Lock()
qm.tasks[task.ID()] = task
qm.priorityList = qm.prioritySorterFilter(maps.Keys(qm.tasks))
go task.Run(ctx)
qm.manageTasks()
qm.mu.Unlock()
case finishedTaskID := <-qm.taskFinishedCh:
qm.mu.Lock()
delete(qm.currentTasks, finishedTaskID)
qm.manageTasks()
qm.mu.Unlock()
qm.wg.Done() // closed chan closed only after wg.Wait in WaitAndClose()
case <-qm.closed:
return
}
}
}
// manageTasks MUST be run under the lock
func (qm *TasksManager) manageTasks() {
desiredTasks := make(map[string]taskWrapper)
runningCount := 0
if len(qm.priorityList) != len(qm.tasks) {
var tasksWithMissingPriority []string
for taskId := range qm.tasks {
if _, exists := desiredTasks[taskId]; !exists {
tasksWithMissingPriority = append(tasksWithMissingPriority, taskId)
}
}
log.With(zap.Int("count", len(tasksWithMissingPriority))).With(zap.Strings("ids", tasksWithMissingPriority)).Warn("priority list inconsistency detected, some tasks will not be started")
}
// Determine which tasks should be running based on priority and max concurrency
for _, taskID := range qm.priorityList {
if runningCount >= qm.maxConcurrent {
break
}
task, exists := qm.tasks[taskID]
if exists && !task.isDone() {
desiredTasks[taskID] = task
runningCount++
}
}
// Pause tasks that are running but no longer desired
for taskID, task := range qm.currentTasks {
if _, shouldRun := desiredTasks[taskID]; !shouldRun {
task.pause()
delete(qm.currentTasks, taskID)
}
}
// Resume tasks that are desired but not currently running
for taskID, task := range desiredTasks {
if _, isRunning := qm.currentTasks[taskID]; !isRunning {
task.resume()
qm.currentTasks[taskID] = task
}
}
}
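The taskmanager package above is meant to be used by embedding TaskBase in a concrete task, calling WaitIfPaused inside its Run loop, and letting the manager pause or resume the task as the priority list changes. A minimal usage sketch under those assumptions follows; the printTask type, the task IDs and the work loop are hypothetical and not part of this commit:

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/anyproto/anytype-heart/util/taskmanager"
)

// printTask is a hypothetical Task: it embeds TaskBase and performs a few units
// of work, checking WaitIfPaused so the manager can pause and resume it.
type printTask struct {
	taskmanager.TaskBase
	units int
}

func (t *printTask) Run(ctx context.Context) error {
	for i := 0; i < t.units; i++ {
		// blocks while the task is de-prioritized; returns an error if ctx is cancelled
		if err := t.WaitIfPaused(ctx); err != nil {
			return err
		}
		fmt.Printf("%s: unit %d\n", t.ID(), i+1)
		time.Sleep(10 * time.Millisecond)
	}
	return nil
}

func main() {
	// the sorter re-reads this slice on every RefreshPriority call;
	// the indexer guards its shared priority state with a mutex, this sketch keeps it simple
	priority := []string{"b", "a"}
	qm := taskmanager.NewTasksManager(1, func(ids []string) []string { return priority })
	go qm.Run(context.Background())

	qm.AddTask(&printTask{TaskBase: taskmanager.NewTaskBase("a"), units: 5})
	qm.AddTask(&printTask{TaskBase: taskmanager.NewTaskBase("b"), units: 5})

	// promote "a": the in-flight "b" task is paused, "a" runs to completion, then "b" resumes
	priority = []string{"a", "b"}
	qm.RefreshPriority()

	qm.WaitAndClose() // waits for both tasks to finish, then closes the manager
}

RefreshPriority only re-runs the sorter; the pause and resume decisions happen in manageTasks, so promoting "a" pauses the in-flight "b" task rather than cancelling it.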

166
util/taskmanager/task.go Normal file
View file

@@ -0,0 +1,166 @@
package taskmanager
import (
"context"
"sync"
"time"
)
// Task interface allows for different task implementations without code duplication.
type Task interface {
TaskBase
Run(ctx context.Context) error
}
type TaskBase interface {
ID() string
WaitResult(ctx context.Context) (TaskResult, error)
GetResult() (TaskResult, bool)
WaitIfPaused(ctx context.Context) error
markDoneWithError(err error)
setStartTime(startTime time.Time)
resume()
pause()
isDone() bool
}
type TaskResultWithId struct {
Id string
TaskResult
}
type TaskResult struct {
Err error
StartTime time.Time
FinishTime time.Time
WorkTime time.Duration
}
// taskBase provides common functionality for tasks, such as the pause and resume mechanisms.
type taskBase struct {
id string
pauseChan chan struct{}
done chan struct{}
mu sync.RWMutex
result TaskResult
resultMu sync.Mutex
totalWorkTime time.Duration
lastResumed time.Time
startTime time.Time
}
func NewTaskBase(id string) TaskBase {
return &taskBase{
id: id,
done: make(chan struct{}),
pauseChan: make(chan struct{}), // by default, the task is paused
}
}
func (t *taskBase) ID() string {
return t.id
}
func (t *taskBase) GetResult() (TaskResult, bool) {
select {
case <-t.done:
t.resultMu.Lock()
defer t.resultMu.Unlock()
return t.result, true
default:
return TaskResult{}, false
}
}
func (t *taskBase) WaitResult(ctx context.Context) (TaskResult, error) {
select {
case <-t.done:
t.resultMu.Lock()
defer t.resultMu.Unlock()
return t.result, nil
case <-ctx.Done():
return TaskResult{}, ctx.Err()
}
}
func (t *taskBase) isDone() bool {
select {
case <-t.done:
return true
default:
return false
}
}
func (t *taskBase) pause() {
t.mu.Lock()
defer t.mu.Unlock()
if t.pauseChan == nil {
t.totalWorkTime += time.Since(t.lastResumed)
t.pauseChan = make(chan struct{})
}
}
func (t *taskBase) resume() {
t.mu.Lock()
defer t.mu.Unlock()
if t.pauseChan != nil {
close(t.pauseChan)
t.lastResumed = time.Now()
t.pauseChan = nil
}
}
func (t *taskBase) WaitIfPaused(ctx context.Context) error {
t.mu.RLock()
pauseChan := t.pauseChan
t.mu.RUnlock()
if pauseChan == nil {
return nil
}
select {
case <-pauseChan:
// Resumed
case <-ctx.Done():
// Context canceled
return ctx.Err()
}
return nil
}
func (t *taskBase) markDoneWithError(err error) {
t.resultMu.Lock()
defer t.resultMu.Unlock()
if !t.result.FinishTime.IsZero() {
return
}
close(t.done)
t.totalWorkTime += time.Since(t.lastResumed)
t.result = TaskResult{
Err: err,
StartTime: t.startTime,
FinishTime: time.Now(),
WorkTime: t.totalWorkTime,
}
}
func (t *taskBase) setStartTime(startTime time.Time) {
t.startTime = startTime
}
type taskWrapper struct {
Task
taskFinishedCh chan string
}
func (t *taskWrapper) Run(ctx context.Context) {
t.setStartTime(time.Now())
err := t.Task.Run(ctx)
t.markDoneWithError(err)
t.taskFinishedCh <- t.ID()
}

View file

@@ -0,0 +1,532 @@
package taskmanager
import (
"context"
"fmt"
"sort"
"sync"
"testing"
"time"
)
// TestTask is a concrete implementation of the Task interface for testing purposes.
type TestTask struct {
*taskBase
iterations int
}
// NewTestTask creates a new TestTask with the given ID and number of iterations.
func NewTestTask(id string, iterations int) *TestTask {
return &TestTask{
taskBase: NewTaskBase(id).(*taskBase),
iterations: iterations,
}
}
// Run executes the task's work, respecting pause and resume signals and context cancellation.
func (t *TestTask) Run(ctx context.Context) error {
for i := 0; i < t.iterations; i++ {
// Check if the task is paused.
if err := t.WaitIfPaused(ctx); err != nil {
t.markDoneWithError(err)
return err
}
// Check for context cancellation.
select {
case <-ctx.Done():
err := ctx.Err()
t.markDoneWithError(err)
return err
default:
}
// Simulate work.
time.Sleep(10 * time.Millisecond)
fmt.Printf("Task %s is working (iteration %d)\n", t.ID(), i+1)
}
return nil
}
// TestTasksManager_AddTaskAndRun tests that tasks can be added and run,
// and that they finish successfully.
func TestTasksManager_AddTaskAndRun(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var sortedIds = []string{"task1", "task2", "task3"}
sorter := func(_ []string) []string {
return sortedIds
}
qm := NewTasksManager(2, sorter) // Allow up to 2 concurrent tasks.
go qm.Run(ctx)
// Create tasks.
task1 := NewTestTask("task1", 5)
task2 := NewTestTask("task2", 5)
task3 := NewTestTask("task3", 5)
// Add tasks to the manager.
qm.AddTask(task1)
qm.AddTask(task2)
qm.AddTask(task3)
// Set initial priority list.
qm.RefreshPriority()
// Wait for tasks to finish and close the manager.
qm.WaitAndClose()
// Check that all tasks finished successfully.
for _, task := range []*TestTask{task1, task2, task3} {
result, done := task.GetResult()
if !done {
t.Errorf("Task %s did not finish", task.ID())
} else if result.Err != nil {
t.Errorf("Task %s finished with error: %v", task.ID(), result.Err)
}
}
}
// TestTasksManager_PriorityUpdate tests that tasks are paused and resumed
// according to priority changes.
func TestTasksManager_PriorityUpdate(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var sortedIds = []string{"task1", "task2"}
sorter := func(_ []string) []string {
return sortedIds
}
qm := NewTasksManager(1, sorter) // Only one task can run at a time.
go qm.Run(ctx)
// Create tasks.
task1 := NewTestTask("task1", 10)
task2 := NewTestTask("task2", 5)
// Add tasks.
qm.AddTask(task1)
qm.AddTask(task2)
// Set initial priority list.
qm.RefreshPriority()
// After a short delay, update the priority to give task2 higher priority.
time.Sleep(30 * time.Millisecond)
sortedIds = []string{"task2", "task1"}
qm.RefreshPriority()
// Wait for tasks to finish and close the manager.
qm.WaitAndClose()
// Get results.
result1, done1 := task1.GetResult()
result2, done2 := task2.GetResult()
// Check that both tasks finished.
if !done1 {
t.Errorf("Task %s did not finish", task1.ID())
}
if !done2 {
t.Errorf("Task %s did not finish", task2.ID())
}
// Verify that task2 finished before task1.
if result2.FinishTime.After(result1.FinishTime) {
t.Errorf("Expected task2 to finish before task1, but task1 finished at %v and task2 finished at %v", result1.FinishTime, result2.FinishTime)
}
}
// TestTasksManager_MaxConcurrency tests that the manager respects the
// maximum concurrency limit.
func TestTasksManager_MaxConcurrency(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
sorter := func(_ []string) []string {
return []string{"task1", "task2", "task3"}
}
qm := NewTasksManager(2, sorter) // Allow up to 2 concurrent tasks.
go qm.Run(ctx)
// Create tasks.
task1 := NewTestTask("task1", 5)
task2 := NewTestTask("task2", 5)
task3 := NewTestTask("task3", 5)
// Add tasks.
qm.AddTask(task1)
qm.AddTask(task2)
qm.AddTask(task3)
// Set priority list.
qm.RefreshPriority()
// Wait for tasks to finish and close the manager.
qm.WaitAndClose()
// Check that all tasks finished successfully.
for _, task := range []*TestTask{task1, task2, task3} {
result, done := task.GetResult()
if !done {
t.Errorf("Task %s did not finish", task.ID())
} else if result.Err != nil {
t.Errorf("Task %s finished with error: %v", task.ID(), result.Err)
}
}
}
// TestTasksManager_AddTaskDuringExecution tests adding a new task while
// the manager is already running tasks.
func TestTasksManager_AddTaskDuringExecution(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
priorityList := []string{"task1", "task2"}
qm := NewTasksManager(2, func(ids []string) []string {
return priorityList
})
go qm.Run(ctx)
// Create initial tasks.
task1 := NewTestTask("task1", 5)
task2 := NewTestTask("task2", 5)
// Add initial tasks.
qm.AddTask(task1)
qm.AddTask(task2)
// Set priority.
qm.RefreshPriority()
var task3 *TestTask
// After a short delay, add a new task with higher priority.
time.AfterFunc(20*time.Millisecond, func() {
task3 = NewTestTask("task3", 5)
qm.AddTask(task3)
priorityList = []string{"task3", "task1", "task2"}
qm.RefreshPriority()
})
// Wait for tasks to finish and close the manager.
qm.WaitAndClose()
// Check that all tasks finished successfully.
for _, taskID := range []string{"task1", "task2", "task3"} {
var task *TestTask
switch taskID {
case "task1":
task = task1
case "task2":
task = task2
case "task3":
task = task3
}
result, done := task.GetResult()
if !done {
t.Errorf("Task %s did not finish", taskID)
} else if result.Err != nil {
t.Errorf("Task %s finished with error: %v", taskID, result.Err)
}
}
}
// TestTasksManager_ContextCancellation tests that tasks stop promptly and
// report errors when the context is canceled.
func TestTasksManager_ContextCancellation(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
qm := NewTasksManager(2, func(ids []string) []string {
return []string{"task1", "task2"}
})
go qm.Run(ctx)
// Create tasks with long iteration counts.
task1 := NewTestTask("task1", 100)
task2 := NewTestTask("task2", 100)
// Add tasks.
qm.AddTask(task1)
qm.AddTask(task2)
// Cancel context after a short delay.
time.AfterFunc(50*time.Millisecond, func() {
cancel()
})
// Wait for tasks to finish and close the manager.
qm.WaitAndClose()
// Get results.
result1, done1 := task1.GetResult()
result2, done2 := task2.GetResult()
// Check that tasks have finished.
if !done1 {
t.Errorf("Task %s did not finish", task1.ID())
}
if !done2 {
t.Errorf("Task %s did not finish", task2.ID())
}
// Check that tasks were canceled.
if result1.Err != context.Canceled {
t.Errorf("Expected task %s error to be context.Canceled, but got %v", task1.ID(), result1.Err)
}
if result2.Err != context.Canceled {
t.Errorf("Expected task %s error to be context.Canceled, but got %v", task2.ID(), result2.Err)
}
}
// TestTasksManager_TaskCompletionOrder tests that tasks complete in the expected order
// based on their priorities and pause/resume behavior.
func TestTasksManager_TaskCompletionOrder(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
priorityList := []string{"task1", "task2", "task3"}
qm := NewTasksManager(1, func(ids []string) []string {
return priorityList
})
go qm.Run(ctx)
// Create tasks with different iteration counts.
task1 := NewTestTask("task1", 10)
task2 := NewTestTask("task2", 5)
task3 := NewTestTask("task3", 3)
// Add tasks.
qm.AddTask(task1)
qm.AddTask(task2)
qm.AddTask(task3)
// Update priority to move task3 to the top.
time.AfterFunc(30*time.Millisecond, func() {
priorityList = []string{"task3", "task1", "task2"}
qm.RefreshPriority()
})
// Wait for tasks to finish and close the manager.
qm.WaitAndClose()
// Collect results.
tasks := []*TestTask{task1, task2, task3}
type taskResult struct {
taskID string
finishTime time.Time
}
var results []taskResult
for _, task := range tasks {
result, done := task.GetResult()
if !done {
t.Errorf("Task %s did not finish", task.ID())
continue
}
if result.Err != nil {
t.Errorf("Task %s finished with error: %v", task.ID(), result.Err)
}
results = append(results, taskResult{
taskID: task.ID(),
finishTime: result.FinishTime,
})
}
// Sort the results by finish time.
sort.Slice(results, func(i, j int) bool {
return results[i].finishTime.Before(results[j].finishTime)
})
// Verify the completion order.
expectedOrder := []string{"task3", "task1", "task2"}
for i, taskID := range expectedOrder {
if results[i].taskID != taskID {
t.Errorf("Expected task %s to finish at position %d, but got %s", taskID, i, results[i].taskID)
}
}
}
// TestTasksManager_AddTasksBeforeRun tests that tasks added before the manager's Run method are handled correctly.
func TestTasksManager_AddTasksBeforeRun(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
qm := NewTasksManager(2, func(ids []string) []string {
return []string{"task1", "task2"}
}) // Allow up to 2 concurrent tasks.
// Create tasks.
task1 := NewTestTask("task1", 5)
task2 := NewTestTask("task2", 5)
// Add tasks to the manager before calling Run.
qm.AddTask(task1)
qm.AddTask(task2)
// Start the manager.
go qm.Run(ctx)
// Wait for tasks to finish and close the manager.
qm.WaitAndClose()
// Check that both tasks finished successfully.
for _, task := range []*TestTask{task1, task2} {
result, done := task.GetResult()
if !done {
t.Errorf("Task %s did not finish", task.ID())
} else if result.Err != nil {
t.Errorf("Task %s finished with error: %v", task.ID(), result.Err)
}
}
}
// TestTasksManager_PriorityChangeBeforeRun tests that priority changes before the manager's Run method are handled correctly.
func TestTasksManager_PriorityChangeBeforeRun(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var priorityList = []string{"task1", "task2"}
qm := NewTasksManager(1, func(ids []string) []string {
return priorityList
}) // Only one task can run at a time.
// Create tasks.
task1 := NewTestTask("task1", 5)
task2 := NewTestTask("task2", 5)
// Add tasks to the manager before calling Run.
qm.AddTask(task1)
qm.AddTask(task2)
priorityList = []string{"task2", "task1"}
// Set initial priority list before Run.
qm.RefreshPriority()
// Start the manager.
go qm.Run(ctx)
// Wait for tasks to finish and close the manager.
qm.WaitAndClose()
// Get results.
result1, done1 := task1.GetResult()
result2, done2 := task2.GetResult()
// Check that both tasks finished.
if !done1 {
t.Errorf("Task %s did not finish", task1.ID())
}
if !done2 {
t.Errorf("Task %s did not finish", task2.ID())
}
// Verify that task2 finished before task1.
if result2.FinishTime.After(result1.FinishTime) {
t.Errorf("Expected task2 to finish before task1, but task1 finished at %v and task2 finished at %v", result1.FinishTime, result2.FinishTime)
}
}
// CountingTestTask is a TestTask that also tracks how many tasks are running concurrently.
type CountingTestTask struct {
mu sync.Mutex
runningTasks, maxObservedConcurrent int
*TestTask
}
// Override the Run method.
func (t *CountingTestTask) Run(ctx context.Context) error {
defer t.markDoneWithError(nil)
for i := 0; i < t.iterations; i++ {
if err := t.WaitIfPaused(ctx); err != nil {
t.markDoneWithError(err)
return err
}
select {
case <-ctx.Done():
err := ctx.Err()
t.markDoneWithError(err)
return err
default:
}
// Increment runningTasks counter.
t.mu.Lock()
t.runningTasks++
if t.runningTasks > t.maxObservedConcurrent {
t.maxObservedConcurrent = t.runningTasks
}
t.mu.Unlock()
// Simulate work.
time.Sleep(10 * time.Millisecond)
// Decrement runningTasks counter.
t.mu.Lock()
t.runningTasks--
t.mu.Unlock()
}
return nil
}
// TestTasksManager_MaxConcurrentTasks verifies that the manager does not exceed the maximum number of concurrent tasks.
func TestTasksManager_MaxConcurrentTasks(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
maxConcurrentTasks := 2
qm := NewTasksManager(maxConcurrentTasks, func(ids []string) []string {
return []string{"task1", "task2", "task3"}
})
maxObservedConcurrent := 0
runningTasks := 0
mu := sync.Mutex{}
// Function to create a CountingTestTask.
newCountingTestTask := func(id string, iterations int) *CountingTestTask {
return &CountingTestTask{
TestTask: NewTestTask(id, iterations),
mu: mu,
runningTasks: runningTasks,
maxObservedConcurrent: maxObservedConcurrent,
}
}
// Create tasks.
task1 := newCountingTestTask("task1", 10)
task2 := newCountingTestTask("task2", 10)
task3 := newCountingTestTask("task3", 10)
// Add tasks to the manager.
qm.AddTask(task1)
qm.AddTask(task2)
qm.AddTask(task3)
// Start the manager.
go qm.Run(ctx)
// Wait for tasks to finish and close the manager.
qm.WaitAndClose()
// Check that all tasks finished successfully.
for _, task := range []*CountingTestTask{task1, task2, task3} {
result, done := task.GetResult()
if !done {
t.Errorf("Task %s did not finish", task.ID())
} else if result.Err != nil {
t.Errorf("Task %s finished with error: %v", task.ID(), result.Err)
}
}
// Verify that maxObservedConcurrent does not exceed maxConcurrentTasks.
if maxObservedConcurrent > maxConcurrentTasks {
t.Errorf("Expected maximum concurrent tasks to be %d, but observed %d", maxConcurrentTasks, maxObservedConcurrent)
} else {
t.Logf("Maximum concurrent tasks observed: %d", maxObservedConcurrent)
}
}

View file

@@ -112,6 +112,24 @@ func (mr *MockObjectStoreMockRecorder) DeleteDetails(arg0 ...any) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteDetails", reflect.TypeOf((*MockObjectStore)(nil).DeleteDetails), arg0...)
}
// DeleteLastIndexedHeadHash mocks base method.
func (m *MockObjectStore) DeleteLastIndexedHeadHash(arg0 ...string) error {
m.ctrl.T.Helper()
varargs := []any{}
for _, a := range arg0 {
varargs = append(varargs, a)
}
ret := m.ctrl.Call(m, "DeleteLastIndexedHeadHash", varargs...)
ret0, _ := ret[0].(error)
return ret0
}
// DeleteLastIndexedHeadHash indicates an expected call of DeleteLastIndexedHeadHash.
func (mr *MockObjectStoreMockRecorder) DeleteLastIndexedHeadHash(arg0 ...any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteLastIndexedHeadHash", reflect.TypeOf((*MockObjectStore)(nil).DeleteLastIndexedHeadHash), arg0...)
}
// DeleteLinks mocks base method.
func (m *MockObjectStore) DeleteLinks(arg0 ...string) error {
m.ctrl.T.Helper()