1
0
Fork 0
mirror of https://github.com/anyproto/anytype-heart.git synced 2025-06-11 18:20:33 +09:00

GO-1084: Improve adding to file sync queue; fix bug with binding

This commit is contained in:
Sergey 2023-04-25 18:17:59 +02:00 committed by Mikhail Iudin
parent 247554c376
commit ce9e6411b0
No known key found for this signature in database
GPG key ID: FAAAA8BAABDFF1C0
6 changed files with 40 additions and 43 deletions

View file

@ -24,7 +24,6 @@ import (
"github.com/anytypeio/go-anytype-middleware/core/block/import/workerpool"
"github.com/anytypeio/go-anytype-middleware/core/block/object/objectcreator"
"github.com/anytypeio/go-anytype-middleware/core/block/process"
"github.com/anytypeio/go-anytype-middleware/core/filestorage/filesync"
"github.com/anytypeio/go-anytype-middleware/core/session"
"github.com/anytypeio/go-anytype-middleware/pb"
"github.com/anytypeio/go-anytype-middleware/pkg/lib/bundle"
@ -33,7 +32,6 @@ import (
"github.com/anytypeio/go-anytype-middleware/pkg/lib/localstore/filestore"
"github.com/anytypeio/go-anytype-middleware/pkg/lib/localstore/objectstore"
"github.com/anytypeio/go-anytype-middleware/pkg/lib/logging"
"github.com/anytypeio/go-anytype-middleware/space"
"github.com/anytypeio/go-anytype-middleware/space/typeprovider"
"github.com/anytypeio/go-anytype-middleware/util/pbtypes"
)
@ -88,9 +86,7 @@ func (i *Import) Init(a *app.App) (err error) {
relationCreator := NewRelationCreator(i.s, objCreator, fs, coreService, store)
i.objectIDGetter = NewObjectIDGetter(store, coreService, i.s)
fileStore := app.MustComponent[filestore.FileStore](a)
spaceService := app.MustComponent[space.Service](a)
fileSyncService := app.MustComponent[filesync.FileSync](a)
i.oc = NewCreator(i.s, objCreator, coreService, factory, relationCreator, store, fileStore, fileSyncService, spaceService)
i.oc = NewCreator(i.s, objCreator, coreService, factory, relationCreator, store, fileStore)
return nil
}

View file

@ -61,8 +61,6 @@ func NewCreator(service *block.Service,
relationCreator RelationCreator,
objectStore objectstore.ObjectStore,
fileStore filestore.FileStore,
fileSync filesync.FileSync,
spaceService space.Service,
) Creator {
return &ObjectCreator{
service: service,
@ -72,8 +70,6 @@ func NewCreator(service *block.Service,
relationCreator: relationCreator,
objectStore: objectStore,
fileStore: fileStore,
fileSync: fileSync,
spaceService: spaceService,
}
}
@ -150,7 +146,7 @@ func (oc *ObjectCreator) Create(ctx *session.Context,
oc.setArchived(snapshot, newID)
// we do not change relation ids during the migration
//oc.relationCreator.ReplaceRelationBlock(ctx, oldRelationBlocksToNew, newID)
// oc.relationCreator.ReplaceRelationBlock(ctx, oldRelationBlocksToNew, newID)
syncErr := oc.syncFilesAndLinks(ctx, st, newID)
if syncErr != nil {
@ -210,13 +206,6 @@ func (oc *ObjectCreator) updateRootBlock(snapshot *model.SmartBlockSnapshotBase,
}
func (oc *ObjectCreator) syncFilesAndLinks(ctx *session.Context, st *state.State, newID string) error {
for _, fileID := range st.GetAllFileHashes(st.FileRelationKeys()) {
log.With(zap.String("fileID", fileID)).Info("sync file link")
if sErr := oc.fileSync.AddFile(oc.spaceService.AccountId(), fileID); sErr != nil {
log.With(zap.String("object id", newID)).Errorf("sync file link: %s", sErr)
}
}
return st.Iterate(func(bl simple.Block) (isContinue bool) {
s := oc.syncFactory.GetSyncer(bl)
if s != nil {

View file

@ -124,10 +124,6 @@ func (s *service) fileAdd(ctx context.Context, opts AddOptions) (string, *storag
return "", nil, err
}
if err = s.fileSync.AddFile(s.spaceService.AccountId(), nodeHash); err != nil {
return "", nil, err
}
if err = s.fileStore.AddFileKeys(filestore.FileKeys{
Hash: nodeHash,
Keys: keys.KeysByPath,
@ -369,20 +365,23 @@ func (s *service) fileIndexData(ctx context.Context, inode ipld.Node, data strin
}
// fileIndexNode walks a file node, indexing file links
func (s *service) fileIndexNode(ctx context.Context, inode ipld.Node, data string) error {
links := inode.Links()
if looksLikeFileNode(inode) {
return s.fileIndexLink(ctx, inode, data)
func (s *service) fileIndexNode(ctx context.Context, inode ipld.Node, fileID string) error {
if err := s.addToSyncQueue(fileID); err != nil {
return fmt.Errorf("add file %s to sync queue: %w", fileID, err)
}
if looksLikeFileNode(inode) {
return s.fileIndexLink(ctx, inode, fileID)
}
links := inode.Links()
for _, link := range links {
n, err := helpers.NodeAtLink(ctx, s.dagService, link)
if err != nil {
return err
}
err = s.fileIndexLink(ctx, n, data)
err = s.fileIndexLink(ctx, n, fileID)
if err != nil {
return err
}
@ -392,13 +391,12 @@ func (s *service) fileIndexNode(ctx context.Context, inode ipld.Node, data strin
}
// fileIndexLink indexes a file link
func (s *service) fileIndexLink(ctx context.Context, inode ipld.Node, data string) error {
func (s *service) fileIndexLink(ctx context.Context, inode ipld.Node, fileID string) error {
dlink := schema.LinkByName(inode.Links(), ValidContentLinkNames)
if dlink == nil {
return ErrMissingContentLink
}
return s.fileStore.AddTarget(dlink.Cid.String(), data)
return s.fileStore.AddTarget(dlink.Cid.String(), fileID)
}
func (s *service) fileInfoFromPath(target string, path string, key string) (*storage.FileInfo, error) {
@ -753,6 +751,10 @@ func (s *service) fileBuildDirectory(ctx context.Context, reader io.ReadSeeker,
}
func (s *service) fileIndexInfo(ctx context.Context, hash string, updateIfExists bool) ([]*storage.FileInfo, error) {
if err := s.addToSyncQueue(hash); err != nil {
return nil, fmt.Errorf("add file %s to sync queue: %w", hash, err)
}
links, err := helpers.LinksAtCid(ctx, s.dagService, hash)
if err != nil {
return nil, err
@ -806,6 +808,10 @@ func (s *service) fileIndexInfo(ctx context.Context, hash string, updateIfExists
return files, nil
}
func (s *service) addToSyncQueue(fileID string) error {
return s.fileSync.AddFile(s.spaceService.AccountId(), fileID)
}
// looksLikeFileNode returns whether a node appears to
// be a textile node. It doesn't inspect the actual data.
func looksLikeFileNode(node ipld.Node) bool {

View file

@ -98,10 +98,6 @@ func (s *service) imageAdd(ctx context.Context, opts AddOptions) (string, map[in
return "", nil, err
}
if err = s.fileSync.AddFile(s.spaceService.AccountId(), nodeHash); err != nil {
return "", nil, err
}
var variantsByWidth = make(map[int]*storage.FileInfo, len(dir.Files))
for _, f := range dir.Files {
if f.Mill != "/image/resize" {

View file

@ -229,7 +229,7 @@ func (f *fileSync) uploadFile(ctx context.Context, spaceId, fileId string) (err
return fmt.Errorf("collect file blocks: %w", err)
}
bytesToUpload, blocksToUpload, err := f.selectBlocksToUpload(ctx, spaceId, fileBlocks)
bytesToUpload, blocksToUpload, err := f.selectBlocksToUpload(ctx, spaceId, fileId, fileBlocks)
if err != nil {
return fmt.Errorf("select blocks to upload: %w", err)
}
@ -276,7 +276,7 @@ func (f *fileSync) uploadFile(ctx context.Context, spaceId, fileId string) (err
return <-dagErr
}
func (f *fileSync) selectBlocksToUpload(ctx context.Context, spaceId string, fileBlocks []blocks.Block) (int, []blocks.Block, error) {
func (f *fileSync) selectBlocksToUpload(ctx context.Context, spaceId string, fileId string, fileBlocks []blocks.Block) (int, []blocks.Block, error) {
fileCids := lo.Map(fileBlocks, func(b blocks.Block, _ int) cid.Cid {
return b.Cid()
})
@ -288,14 +288,15 @@ func (f *fileSync) selectBlocksToUpload(ctx context.Context, spaceId string, fil
var (
bytesToUpload int
blocksToUpload []blocks.Block
cidsToBind []cid.Cid
)
for _, availability := range availabilities {
if availability.Status == fileproto.AvailabilityStatus_NotExists {
blockCid, err := cid.Cast(availability.Cid)
if err != nil {
return 0, nil, fmt.Errorf("cast cid: %w", err)
}
blockCid, err := cid.Cast(availability.Cid)
if err != nil {
return 0, nil, fmt.Errorf("cast cid: %w", err)
}
if availability.Status == fileproto.AvailabilityStatus_NotExists {
b, ok := lo.Find(fileBlocks, func(b blocks.Block) bool {
return b.Cid() == blockCid
})
@ -305,8 +306,15 @@ func (f *fileSync) selectBlocksToUpload(ctx context.Context, spaceId string, fil
blocksToUpload = append(blocksToUpload, b)
bytesToUpload += len(b.RawData())
} else {
cidsToBind = append(cidsToBind, blockCid)
}
}
if bindErr := f.rpcStore.BindCids(ctx, spaceId, fileId, cidsToBind); bindErr != nil {
return 0, nil, fmt.Errorf("bind cids: %w", bindErr)
}
return bytesToUpload, blocksToUpload, nil
}

View file

@ -27,6 +27,8 @@ type RpcStore interface {
fileblockstore.BlockStore
CheckAvailability(ctx context.Context, spaceID string, cids []cid.Cid) (checkResult []*fileproto.BlockAvailability, err error)
BindCids(ctx context.Context, spaceID string, fileID string, cids []cid.Cid) (err error)
AddToFile(ctx context.Context, spaceId string, fileId string, bs []blocks.Block) (err error)
DeleteFiles(ctx context.Context, spaceId string, fileIds ...string) (err error)
SpaceInfo(ctx context.Context, spaceId string) (info *fileproto.SpaceInfoResponse, err error)
@ -175,7 +177,7 @@ func (s *store) AddToFile(ctx context.Context, spaceID string, fileID string, bs
if len(excludeCids) > 0 {
// bind existing ids
if err = s.bindCids(ctx, spaceID, fileID, excludeCids); err != nil {
if err = s.BindCids(ctx, spaceID, fileID, excludeCids); err != nil {
return err
}
@ -216,7 +218,7 @@ func (s *store) CheckAvailability(ctx context.Context, spaceID string, cids []ci
return
}
func (s *store) bindCids(ctx context.Context, spaceID string, fileID string, cids []cid.Cid) (err error) {
func (s *store) BindCids(ctx context.Context, spaceID string, fileID string, cids []cid.Cid) (err error) {
var ready = make(chan result, 1)
// check blocks availability
if err = s.cm.WriteOp(ctx, ready, func(c *client) (err error) {