1
0
Fork 0
mirror of https://github.com/anyproto/anytype-heart.git synced 2025-06-10 18:10:49 +09:00

Merge pull request #1690 from anyproto/go-4171-fix-potential-race-in-subscription-cache

GO-4171: Fix potential race in subscription cache
This commit is contained in:
Anastasia Shemyakinskaya 2024-10-17 12:44:19 +02:00 committed by GitHub
commit 6a44819a88
Signed by: github
GPG key ID: B5690EEEBB952194
2 changed files with 123 additions and 7 deletions

View file

@ -31,6 +31,8 @@ type collectionObserver struct {
objectStore spaceindex.Store
collectionService CollectionService
recBatch *mb.MB
spaceSubscription *spaceSubscriptions
}
func (s *spaceSubscriptions) newCollectionObserver(spaceId string, collectionID string, subID string) (*collectionObserver, error) {
@ -53,6 +55,8 @@ func (s *spaceSubscriptions) newCollectionObserver(spaceId string, collectionID
collectionService: s.collectionService,
idsSet: map[string]struct{}{},
spaceSubscription: s,
}
obs.ids = initialObjectIDs
for _, id := range initialObjectIDs {
@ -63,7 +67,7 @@ func (s *spaceSubscriptions) newCollectionObserver(spaceId string, collectionID
for {
select {
case objectIDs := <-objectsCh:
obs.updateIDs(objectIDs)
obs.updateIds(objectIDs)
case <-obs.closeCh:
return
}
@ -88,13 +92,13 @@ func (c *collectionObserver) close() {
func (c *collectionObserver) listEntries() []*entry {
c.lock.RLock()
defer c.lock.RUnlock()
entries := c.fetchEntries(c.ids)
entries := c.spaceSubscription.fetchEntries(c.ids)
res := make([]*entry, len(entries))
copy(res, entries)
return res
}
func (c *collectionObserver) updateIDs(ids []string) {
func (c *collectionObserver) updateIds(ids []string) {
c.lock.Lock()
defer c.lock.Unlock()
@ -107,7 +111,7 @@ func (c *collectionObserver) updateIDs(ids []string) {
}
c.ids = ids
entries := c.fetchEntries(append(removed, added...))
entries := c.spaceSubscription.fetchEntriesLocked(append(removed, added...))
for _, e := range entries {
err := c.recBatch.Add(database.Record{
Details: e.data,
@ -215,11 +219,17 @@ func (s *spaceSubscriptions) newCollectionSub(id string, spaceId string, collect
return sub, nil
}
func (c *collectionObserver) fetchEntries(ids []string) []*entry {
func (s *spaceSubscriptions) fetchEntriesLocked(ids []string) []*entry {
s.m.Lock()
defer s.m.Unlock()
return s.fetchEntries(ids)
}
func (s *spaceSubscriptions) fetchEntries(ids []string) []*entry {
res := make([]*entry, 0, len(ids))
missingIDs := make([]string, 0, len(ids))
for _, id := range ids {
if e := c.cache.Get(id); e != nil {
if e := s.cache.Get(id); e != nil {
res = append(res, e)
continue
}
@ -229,7 +239,7 @@ func (c *collectionObserver) fetchEntries(ids []string) []*entry {
if len(missingIDs) == 0 {
return res
}
recs, err := c.objectStore.QueryByIds(missingIDs)
recs, err := s.objectStore.QueryByIds(missingIDs)
if err != nil {
log.Error("can't query by ids:", err)
}

View file

@ -0,0 +1,106 @@
package subscription
import (
"testing"
"github.com/cheggaaa/mb"
"github.com/gogo/protobuf/types"
"github.com/stretchr/testify/assert"
"github.com/anyproto/anytype-heart/pkg/lib/bundle"
"github.com/anyproto/anytype-heart/pkg/lib/database"
"github.com/anyproto/anytype-heart/pkg/lib/localstore/objectstore/spaceindex"
"github.com/anyproto/anytype-heart/util/pbtypes"
)
func Test_newCollectionObserver(t *testing.T) {
spaceId := "spaceId"
t.Run("fetch entries from cache", func(t *testing.T) {
// given
collectionService := NewMockCollectionService(t)
collectionID := "collectionId"
subId := "subId"
ch := make(chan []string)
collectionService.EXPECT().SubscribeForCollection(collectionID, subId).Return([]string{"id"}, ch, nil)
store := spaceindex.NewStoreFixture(t)
cache := newCache()
cache.Set(&entry{id: "id1", data: &types.Struct{Fields: map[string]*types.Value{
bundle.RelationKeyId.String(): pbtypes.String("id1"),
}}})
cache.Set(&entry{id: "id2", data: &types.Struct{Fields: map[string]*types.Value{
bundle.RelationKeyId.String(): pbtypes.String("id2"),
}}})
batcher := mb.New(0)
c := &spaceSubscriptions{
collectionService: collectionService,
objectStore: store,
recBatch: batcher,
cache: cache,
}
// when
observer, err := c.newCollectionObserver(spaceId, collectionID, subId)
// then
assert.NoError(t, err)
expectedIds := []string{"id1", "id2"}
ch <- expectedIds
close(observer.closeCh)
msgs := batcher.Wait()
var receivedIds []string
for _, msg := range msgs {
id := pbtypes.GetString(msg.(database.Record).Details, "id")
receivedIds = append(receivedIds, id)
}
assert.Equal(t, expectedIds, receivedIds)
err = batcher.Close()
assert.NoError(t, err)
})
t.Run("fetch entries from object store", func(t *testing.T) {
// given
collectionService := NewMockCollectionService(t)
collectionID := "collectionId"
subId := "subId"
ch := make(chan []string)
collectionService.EXPECT().SubscribeForCollection(collectionID, subId).Return([]string{"id"}, ch, nil)
store := spaceindex.NewStoreFixture(t)
store.AddObjects(t, []spaceindex.TestObject{
{
bundle.RelationKeyId: pbtypes.String("id1"),
bundle.RelationKeySpaceId: pbtypes.String(spaceId),
},
{
bundle.RelationKeyId: pbtypes.String("id2"),
bundle.RelationKeySpaceId: pbtypes.String(spaceId),
},
})
batcher := mb.New(0)
c := &spaceSubscriptions{
collectionService: collectionService,
objectStore: store,
recBatch: batcher,
cache: newCache(),
}
// when
observer, err := c.newCollectionObserver(spaceId, collectionID, subId)
// then
assert.NoError(t, err)
expectedIds := []string{"id1", "id2"}
ch <- expectedIds
close(observer.closeCh)
msgs := batcher.Wait()
var receivedIds []string
for _, msg := range msgs {
id := pbtypes.GetString(msg.(database.Record).Details, "id")
receivedIds = append(receivedIds, id)
}
assert.Equal(t, expectedIds, receivedIds)
err = batcher.Close()
assert.NoError(t, err)
})
}