1
0
Fork 0
mirror of https://github.com/anyproto/anytype-heart.git synced 2025-06-08 05:47:07 +09:00

GO-3305 Fix mb3 (#2058)

This commit is contained in:
Mikhail 2025-01-31 14:59:50 +01:00 committed by GitHub
parent 3ef48ed92a
commit 1146b53c6e
Signed by: github
GPG key ID: B5690EEEBB952194
12 changed files with 87 additions and 130 deletions

View file

@ -83,7 +83,6 @@ import (
paymentscache "github.com/anyproto/anytype-heart/core/payments/cache"
"github.com/anyproto/anytype-heart/core/peerstatus"
"github.com/anyproto/anytype-heart/core/publish"
"github.com/anyproto/anytype-heart/core/recordsbatcher"
"github.com/anyproto/anytype-heart/core/session"
"github.com/anyproto/anytype-heart/core/spaceview"
"github.com/anyproto/anytype-heart/core/subscription"
@ -276,7 +275,6 @@ func Bootstrap(a *app.App, components ...app.Component) {
Register(acl.New()).
Register(builtintemplate.New()).
Register(converter.NewLayoutConverter()).
Register(recordsbatcher.New()).
Register(configfetcher.New()).
Register(process.New()).
Register(core.NewTempDirService()).

View file

@ -8,7 +8,7 @@ import (
"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/app/ocache"
"github.com/cheggaaa/mb"
"github.com/cheggaaa/mb/v3"
"github.com/samber/lo"
"github.com/anyproto/anytype-heart/core/block/editor/smartblock"
@ -56,10 +56,12 @@ type watcher struct {
resolver idresolver.Resolver
spaceService space.Service
infoBatch *mb.MB
infoBatch *mb.MB[spaceindex.LinksUpdateInfo]
lock sync.Mutex
accumulatedBacklinks map[string]*backLinksUpdate
aggregationInterval time.Duration
cancelCtx context.CancelFunc
ctx context.Context
}
func New() UpdateWatcher {
@ -75,22 +77,26 @@ func (w *watcher) Init(a *app.App) error {
w.store = app.MustComponent[objectstore.ObjectStore](a)
w.resolver = app.MustComponent[idresolver.Resolver](a)
w.spaceService = app.MustComponent[space.Service](a)
w.infoBatch = mb.New(0)
w.infoBatch = mb.New[spaceindex.LinksUpdateInfo](0)
w.accumulatedBacklinks = make(map[string]*backLinksUpdate)
w.aggregationInterval = defaultAggregationInterval
return nil
}
func (w *watcher) Close(context.Context) error {
if w.cancelCtx != nil {
w.cancelCtx()
}
if err := w.infoBatch.Close(); err != nil {
log.Errorf("failed to close message batch: %v", err)
}
return nil
}
func (w *watcher) Run(context.Context) error {
func (w *watcher) Run(ctx context.Context) error {
w.ctx, w.cancelCtx = context.WithCancel(context.Background())
w.updater.SubscribeLinksUpdate(func(info spaceindex.LinksUpdateInfo) {
if err := w.infoBatch.Add(info); err != nil {
if err := w.infoBatch.Add(w.ctx, info); err != nil {
log.With("objectId", info.LinksFromId).Errorf("failed to add backlinks update info to message batch: %v", err)
}
})
@ -165,17 +171,16 @@ func (w *watcher) backlinksUpdateHandler() {
}()
for {
msgs := w.infoBatch.Wait()
msgs, err := w.infoBatch.Wait(w.ctx)
if err != nil {
return
}
if len(msgs) == 0 {
return
}
w.lock.Lock()
for _, msg := range msgs {
info, ok := msg.(spaceindex.LinksUpdateInfo)
if !ok {
continue
}
for _, info := range msgs {
info = cleanSelfLinks(info)
applyUpdates(w.accumulatedBacklinks, info)
}
@ -210,7 +215,7 @@ func (w *watcher) updateBackLinksInObject(id string, backlinksUpdate *backLinksU
log.With("objectId", id).Errorf("failed to resolve space id for object: %v", err)
return
}
spc, err := w.spaceService.Get(context.Background(), spaceId)
spc, err := w.spaceService.Get(w.ctx, spaceId)
if err != nil {
log.With("objectId", id, "spaceId", spaceId).Errorf("failed to get space: %v", err)
return

View file

@ -5,7 +5,7 @@ import (
"time"
"github.com/anyproto/any-sync/app/ocache"
"github.com/cheggaaa/mb"
"github.com/cheggaaa/mb/v3"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
@ -56,7 +56,7 @@ func newFixture(t *testing.T, aggregationInterval time.Duration) *fixture {
spaceService: spaceSvc,
aggregationInterval: aggregationInterval,
infoBatch: mb.New(0),
infoBatch: mb.New[spaceindex.LinksUpdateInfo](0),
accumulatedBacklinks: make(map[string]*backLinksUpdate),
}

View file

@ -1,11 +1,12 @@
package process
import (
"context"
"errors"
"sync"
"sync/atomic"
"github.com/cheggaaa/mb"
"github.com/cheggaaa/mb/v3"
"github.com/globalsign/mgo/bson"
"github.com/anyproto/anytype-heart/pb"
@ -49,7 +50,7 @@ func (s *service) NewQueue(info pb.ModelProcess, workers int, noProgress bool, n
id: info.Id,
info: info,
state: pb.ModelProcess_None,
msgs: mb.New(0),
msgs: mb.New[Task](0),
done: make(chan struct{}),
cancel: make(chan struct{}),
s: s,
@ -66,7 +67,7 @@ type queue struct {
id string
info pb.ModelProcess
state pb.ModelProcessState
msgs *mb.MB
msgs *mb.MB[Task]
wg *sync.WaitGroup
done, cancel chan struct{}
pTotal, pDone int64
@ -106,7 +107,7 @@ func (p *queue) Add(ts ...Task) (err error) {
return
}
for _, t := range ts {
if err = p.msgs.Add(t); err != nil {
if err = p.msgs.Add(context.Background(), t); err != nil {
return ErrQueueDone
}
atomic.AddInt64(&p.pTotal, 1)
@ -123,7 +124,7 @@ func (p *queue) Wait(ts ...Task) (err error) {
p.m.Unlock()
var done = make(chan struct{}, len(ts))
for _, t := range ts {
if err = p.msgs.Add(taskFunction(t, done)); err != nil {
if err = p.msgs.Add(context.Background(), taskFunction(t, done)); err != nil {
return ErrQueueDone
}
atomic.AddInt64(&p.pTotal, 1)
@ -267,15 +268,15 @@ func (p *queue) checkRunning(checkStarted bool) (err error) {
func (p *queue) worker() {
defer p.wg.Done()
for {
msgs := p.msgs.WaitMax(1)
msgs, err := p.msgs.NewCond().WithMax(1).Wait(context.Background())
if err != nil {
log.Errorf("failed wait: %v", err)
return
}
if len(msgs) == 0 {
return
}
if f, ok := msgs[0].(func()); ok {
f()
} else if t, ok := msgs[0].(Task); ok {
t()
}
msgs[0]()
atomic.AddInt64(&p.pDone, 1)
}
}

View file

@ -1,60 +0,0 @@
package recordsbatcher
import (
"time"
"github.com/anyproto/any-sync/app"
"github.com/cheggaaa/mb"
)
const CName = "recordsbatcher"
type recordsBatcher struct {
batcher *mb.MB
packDelay time.Duration // delay for better packing of msgs
}
func (r *recordsBatcher) Init(a *app.App) (err error) {
r.batcher = mb.New(0)
r.packDelay = time.Millisecond * 100
return nil
}
func (r *recordsBatcher) Name() (name string) {
return CName
}
func (r *recordsBatcher) Add(msgs ...interface{}) error {
return r.batcher.Add(msgs...)
}
func (r *recordsBatcher) Read(buffer []interface{}) int {
defer func() {
time.Sleep(r.packDelay)
}()
msgs := r.batcher.WaitMax(len(buffer))
if len(msgs) == 0 {
return 0
}
for i, msg := range msgs {
buffer[i] = msg
}
return len(msgs)
}
func (r *recordsBatcher) Close() (err error) {
return r.batcher.Close()
}
func New() RecordsBatcher {
return &recordsBatcher{batcher: mb.New(0)}
}
type RecordsBatcher interface {
// Read reads a batch into the buffer, returns number of records that were read. 0 means no more data will be available
Read(buffer []interface{}) int
Add(msgs ...interface{}) error
app.Component
}

View file

@ -1,11 +1,12 @@
package subscription
import (
"context"
"fmt"
"sync"
"github.com/anyproto/any-store/query"
"github.com/cheggaaa/mb"
"github.com/cheggaaa/mb/v3"
"github.com/anyproto/anytype-heart/core/domain"
"github.com/anyproto/anytype-heart/pkg/lib/bundle"
@ -29,7 +30,7 @@ type collectionObserver struct {
cache *cache
objectStore spaceindex.Store
collectionService CollectionService
recBatch *mb.MB
recBatch *mb.MB[database.Record]
recBatchMutex sync.Mutex
spaceSubscription *spaceSubscriptions
@ -114,7 +115,7 @@ func (c *collectionObserver) updateIds(ids []string) {
c.lock.Unlock()
entries := c.spaceSubscription.fetchEntriesLocked(append(removed, added...))
for _, e := range entries {
err := c.recBatch.Add(database.Record{
err := c.recBatch.Add(context.Background(), database.Record{
Details: e.data,
})
if err != nil {

View file

@ -1,9 +1,10 @@
package subscription
import (
"context"
"testing"
"github.com/cheggaaa/mb"
"github.com/cheggaaa/mb/v3"
"github.com/stretchr/testify/assert"
"github.com/anyproto/anytype-heart/core/domain"
@ -30,7 +31,7 @@ func Test_newCollectionObserver(t *testing.T) {
cache.Set(&entry{id: "id2", data: domain.NewDetailsFromMap(map[domain.RelationKey]domain.Value{
bundle.RelationKeyId: domain.String("id2"),
})})
batcher := mb.New(0)
batcher := mb.New[database.Record](0)
c := &spaceSubscriptions{
collectionService: collectionService,
objectStore: store,
@ -46,11 +47,12 @@ func Test_newCollectionObserver(t *testing.T) {
expectedIds := []string{"id1", "id2"}
ch <- expectedIds
close(observer.closeCh)
msgs := batcher.Wait()
msgs, err := batcher.NewCond().WithMin(2).Wait(context.Background())
assert.NoError(t, err)
var receivedIds []string
for _, msg := range msgs {
id := msg.(database.Record).Details.GetString(bundle.RelationKeyId)
id := msg.Details.GetString(bundle.RelationKeyId)
receivedIds = append(receivedIds, id)
}
assert.Equal(t, expectedIds, receivedIds)
@ -76,7 +78,7 @@ func Test_newCollectionObserver(t *testing.T) {
bundle.RelationKeySpaceId: domain.String(spaceId),
},
})
batcher := mb.New(0)
batcher := mb.New[database.Record](0)
c := &spaceSubscriptions{
collectionService: collectionService,
objectStore: store,
@ -92,11 +94,12 @@ func Test_newCollectionObserver(t *testing.T) {
expectedIds := []string{"id1", "id2"}
ch <- expectedIds
close(observer.closeCh)
msgs := batcher.Wait()
msgs, err := batcher.NewCond().WithMin(2).Wait(context.Background())
assert.NoError(t, err)
var receivedIds []string
for _, msg := range msgs {
id := msg.(database.Record).Details.GetString(bundle.RelationKeyId)
id := msg.Details.GetString(bundle.RelationKeyId)
receivedIds = append(receivedIds, id)
}
assert.Equal(t, expectedIds, receivedIds)

View file

@ -6,7 +6,7 @@ import (
"testing"
"time"
mb2 "github.com/cheggaaa/mb/v3"
"github.com/cheggaaa/mb/v3"
"github.com/stretchr/testify/require"
"github.com/anyproto/anytype-heart/core/domain"
@ -109,7 +109,7 @@ func TestInternalSubscriptionSingle(t *testing.T) {
require.NoError(t, err)
err = resp.Output.Add(context.Background(), event.NewMessage("", nil))
require.True(t, errors.Is(err, mb2.ErrClosed))
require.True(t, errors.Is(err, mb.ErrClosed))
})
t.Run("try to add after close", func(t *testing.T) {
@ -280,7 +280,7 @@ func TestInternalSubCustomQueue(t *testing.T) {
subId := "test"
fx := newFixtureWithRealObjectStore(t)
queue := mb2.New[*pb.EventMessage](0)
queue := mb.New[*pb.EventMessage](0)
resp, err := fx.Search(SubscribeRequest{
SpaceId: testSpaceId,

View file

@ -9,8 +9,7 @@ import (
"github.com/anyproto/any-store/anyenc"
"github.com/anyproto/any-sync/app"
"github.com/cheggaaa/mb"
mb2 "github.com/cheggaaa/mb/v3"
"github.com/cheggaaa/mb/v3"
"github.com/globalsign/mgo/bson"
"github.com/gogo/protobuf/types"
"golang.org/x/exp/slices"
@ -61,7 +60,7 @@ type SubscribeRequest struct {
// Internal indicates that subscription will send events into message queue instead of global client's event system
Internal bool
// InternalQueue is used when Internal flag is set to true. If it's nil, new queue will be created
InternalQueue *mb2.MB[*pb.EventMessage]
InternalQueue *mb.MB[*pb.EventMessage]
AsyncInit bool
}
@ -72,7 +71,7 @@ type SubscribeResponse struct {
Counters *pb.EventObjectSubscriptionCounters
// Used when Internal flag is set to true
Output *mb2.MB[*pb.EventMessage]
Output *mb.MB[*pb.EventMessage]
}
type Service interface {
@ -117,13 +116,13 @@ type service struct {
type internalSubOutput struct {
externallyManaged bool
queue *mb2.MB[*pb.EventMessage]
queue *mb.MB[*pb.EventMessage]
}
func newInternalSubOutput(queue *mb2.MB[*pb.EventMessage]) *internalSubOutput {
func newInternalSubOutput(queue *mb.MB[*pb.EventMessage]) *internalSubOutput {
if queue == nil {
return &internalSubOutput{
queue: mb2.New[*pb.EventMessage](0),
queue: mb.New[*pb.EventMessage](0),
}
}
return &internalSubOutput{
@ -270,7 +269,7 @@ func (s *service) getSpaceSubscriptions(spaceId string) (*spaceSubscriptions, er
subscriptionKeys: make([]string, 0, 20),
subscriptions: make(map[string]subscription, 20),
customOutput: map[string]*internalSubOutput{},
recBatch: mb.New(0),
recBatch: mb.New[database.Record](0),
objectStore: s.objectStore.SpaceIndex(spaceId),
kanban: s.kanban,
collectionService: s.collectionService,
@ -280,7 +279,7 @@ func (s *service) getSpaceSubscriptions(spaceId string) (*spaceSubscriptions, er
}
spaceSubs.ds = newDependencyService(spaceSubs)
spaceSubs.initDebugger()
err := spaceSubs.Run(context.Background())
err := spaceSubs.Run()
if err != nil {
return nil, fmt.Errorf("run space subscriptions: %w", err)
}
@ -294,7 +293,7 @@ type spaceSubscriptions struct {
subscriptions map[string]subscription
customOutput map[string]*internalSubOutput
recBatch *mb.MB
recBatch *mb.MB[database.Record]
// Deps
objectStore spaceindex.Store
@ -309,12 +308,19 @@ type spaceSubscriptions struct {
subDebugger *subDebugger
arenaPool *anyenc.ArenaPool
ctx context.Context
cancelCtx context.CancelFunc
}
func (s *spaceSubscriptions) Run(context.Context) (err error) {
func (s *spaceSubscriptions) Run() (err error) {
s.ctx, s.cancelCtx = context.WithCancel(context.Background())
var batchErr error
s.objectStore.SubscribeForAll(func(rec database.Record) {
s.recBatch.Add(rec)
batchErr = s.recBatch.Add(s.ctx, rec)
})
if batchErr != nil {
return batchErr
}
go s.recordsHandler()
return
}
@ -431,7 +437,7 @@ func (s *spaceSubscriptions) subscribeForQuery(req SubscribeRequest, f *database
}
}
var outputQueue *mb2.MB[*pb.EventMessage]
var outputQueue *mb.MB[*pb.EventMessage]
if req.Internal {
output := newInternalSubOutput(req.InternalQueue)
outputQueue = output.queue
@ -521,7 +527,7 @@ func (s *spaceSubscriptions) subscribeForCollection(req SubscribeRequest, f *dat
depRecords = sub.sortedSub.depSub.getActiveRecords()
}
var outputQueue *mb2.MB[*pb.EventMessage]
var outputQueue *mb.MB[*pb.EventMessage]
if req.Internal {
output := newInternalSubOutput(req.InternalQueue)
outputQueue = output.queue
@ -772,15 +778,18 @@ func (s *spaceSubscriptions) recordsHandler() {
}
}
for {
records := s.recBatch.Wait()
records, err := s.recBatch.Wait(s.ctx)
if err != nil {
return
}
if len(records) == 0 {
return
}
for _, rec := range records {
id := rec.(database.Record).Details.GetString(bundle.RelationKeyId)
id := rec.Details.GetString(bundle.RelationKeyId)
// nil previous version
nilIfExists(id)
entries = append(entries, newEntry(id, rec.(database.Record).Details))
entries = append(entries, newEntry(id, rec.Details))
}
// filter nil entries
filtered := entries[:0]
@ -845,7 +854,7 @@ func (s *spaceSubscriptions) onChangeWithinContext(entries []*entry, proc func(c
s.eventSender.Broadcast(&pb.Event{Messages: msgs})
} else {
err := s.customOutput[id].add(msgs...)
if err != nil && !errors.Is(err, mb2.ErrClosed) {
if err != nil && !errors.Is(err, mb.ErrClosed) {
log.With("subId", id, "error", err).Errorf("push to output")
}
}
@ -917,6 +926,9 @@ func (s *spaceSubscriptions) depIdsFromFilter(spaceId string, filters []database
func (s *spaceSubscriptions) Close(ctx context.Context) (err error) {
s.m.Lock()
defer s.m.Unlock()
if s.cancelCtx != nil {
s.cancelCtx()
}
s.recBatch.Close()
for subId, sub := range s.subscriptions {
sub.close()

1
go.mod
View file

@ -19,7 +19,6 @@ require (
github.com/araddon/dateparse v0.0.0-20210429162001-6b43995a97de
github.com/avast/retry-go/v4 v4.6.0
github.com/chai2010/webp v1.1.2-0.20240612091223-aa1b379218b7
github.com/cheggaaa/mb v1.0.3
github.com/cheggaaa/mb/v3 v3.0.2
github.com/dave/jennifer v1.7.1
github.com/davecgh/go-spew v1.1.1

2
go.sum
View file

@ -184,8 +184,6 @@ github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UF
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/chai2010/webp v1.1.2-0.20240612091223-aa1b379218b7 h1:EcFyQu4Hz/YC2lc3xWqn678e2FNfG0cgTr/EOA4ByWs=
github.com/chai2010/webp v1.1.2-0.20240612091223-aa1b379218b7/go.mod h1:0XVwvZWdjjdxpUEIf7b9g9VkHFnInUSYujwqTLEuldU=
github.com/cheggaaa/mb v1.0.3 h1:03ksWum+6kHclB+kjwKMaBtgl5gtNYUwNpxsHQciKe8=
github.com/cheggaaa/mb v1.0.3/go.mod h1:NUl0GBtFLlfg2o6iZwxzcG7Lslc2wV/ADTFbLXtVPE4=
github.com/cheggaaa/mb/v3 v3.0.2 h1:jd1Xx0zzihZlXL6HmnRXVCI1BHuXz/kY+VzX9WbvNDU=
github.com/cheggaaa/mb/v3 v3.0.2/go.mod h1:zCt2QeYukhd/g0bIdNqF+b/kKz1hnLFNDkP49qN5kqI=
github.com/chigopher/pathlib v0.19.1 h1:RoLlUJc0CqBGwq239cilyhxPNLXTK+HXoASGyGznx5A=

View file

@ -1,6 +1,7 @@
package logging
import (
"context"
"errors"
"expvar"
"fmt"
@ -10,7 +11,7 @@ import (
"time"
"github.com/anyproto/any-sync/app/logger"
"github.com/cheggaaa/mb"
"github.com/cheggaaa/mb/v3"
"go.uber.org/zap"
"gopkg.in/Graylog2/go-gelf.v2/gelf"
)
@ -29,7 +30,7 @@ var (
)
func registerGelfSink(config *logger.Config) {
gelfSinkWrapper.batch = mb.New(1000)
gelfSinkWrapper.batch = mb.New[gelf.Message](1000)
tlsWriter, err := gelf.NewTLSWriter(graylogHost, nil)
if err != nil {
fmt.Printf("failed to init gelf tls: %s", err)
@ -49,7 +50,7 @@ func registerGelfSink(config *logger.Config) {
type gelfSink struct {
sync.RWMutex
batch *mb.MB
batch *mb.MB[gelf.Message]
gelfWriter gelf.Writer
version string
account string
@ -66,17 +67,16 @@ func (gs *gelfSink) Run() {
continue
}
msgs := gs.batch.WaitMax(1)
msgs, err := gs.batch.NewCond().WithMax(1).Wait(context.Background())
if err != nil {
return
}
if len(msgs) == 0 {
return
}
for _, msg := range msgs {
msgCasted, ok := msg.(gelf.Message)
if !ok {
continue
}
err := gs.gelfWriter.WriteMessage(&msgCasted)
err := gs.gelfWriter.WriteMessage(&msg)
if err != nil {
if gs.lastErrorAt.IsZero() || gs.lastErrorAt.Add(printErrorThreshold).Before(time.Now()) {
fmt.Fprintf(os.Stderr, "failed to write to gelf: %v\n", err)