1
0
Fork 0
mirror of https://github.com/anyproto/anytype-heart.git synced 2025-06-10 18:10:49 +09:00

GO-1340: Bytes usage events; some queue refactoring

This commit is contained in:
Sergey 2023-04-25 18:17:59 +02:00 committed by Mikhail Iudin
parent 1e7e85e7f5
commit 5bdd3a4f74
No known key found for this signature in database
GPG key ID: FAAAA8BAABDFF1C0
9 changed files with 1022 additions and 375 deletions

View file

@ -158,7 +158,7 @@ func Bootstrap(a *app.App, components ...app.Component) {
Register(rpcstore.New()).
Register(fileStore).
Register(fileservice.New()).
Register(filestorage.New()).
Register(filestorage.New(eventService.Send)).
Register(fileSyncService).
Register(localdiscovery.New()).
Register(spaceService).

View file

@ -5,6 +5,7 @@ import (
"fmt"
"os"
"path/filepath"
"time"
"github.com/anytypeio/any-sync/app"
"github.com/anytypeio/any-sync/app/logger"
@ -20,6 +21,7 @@ import (
"github.com/anytypeio/go-anytype-middleware/core/anytype/config"
"github.com/anytypeio/go-anytype-middleware/core/filestorage/rpcstore"
"github.com/anytypeio/go-anytype-middleware/core/wallet"
"github.com/anytypeio/go-anytype-middleware/pb"
"github.com/anytypeio/go-anytype-middleware/pkg/lib/datastore"
"github.com/anytypeio/go-anytype-middleware/space"
"github.com/anytypeio/go-anytype-middleware/space/storage"
@ -30,8 +32,10 @@ const FlatfsDirName = "flatfs"
var log = logger.NewNamed(CName)
func New() FileStorage {
return &fileStorage{}
func New(sendEvent func(event *pb.Event)) FileStorage {
return &fileStorage{
sendEvent: sendEvent,
}
}
type FileStorage interface {
@ -42,20 +46,19 @@ type FileStorage interface {
}
type fileStorage struct {
proxy *proxyStore
handler *rpcHandler
cfg *config.Config
flatfsPath string
fileblockstore.BlockStoreLocal
cfg *config.Config
flatfsPath string
provider datastore.Datastore
rpcStore rpcstore.Service
spaceService space.Service
handler *rpcHandler
spaceStorage storage.ClientStorage
sendEvent func(event *pb.Event)
localStore *flatStore
}
var _ fileblockstore.BlockStoreLocal = &fileStorage{}
func (f *fileStorage) Init(a *app.App) (err error) {
cfg := app.MustComponent[*config.Config](a)
f.cfg = cfg
@ -89,11 +92,12 @@ func (f *fileStorage) patchAccountIdCtx(ctx context.Context) context.Context {
}
func (f *fileStorage) Run(ctx context.Context) (err error) {
localStore, err := newFlatStore(f.flatfsPath)
localStore, err := newFlatStore(f.flatfsPath, f.sendEvent, 2*time.Second)
if err != nil {
return fmt.Errorf("flatstore: %w", err)
}
f.handler.store = localStore
f.localStore = localStore
oldStore, storeErr := f.initOldStore()
if storeErr != nil {
@ -104,7 +108,7 @@ func (f *fileStorage) Run(ctx context.Context) (err error) {
origin: f.rpcStore.NewStore(),
oldStore: oldStore,
}
f.proxy = ps
f.BlockStoreLocal = ps
return
}
@ -119,33 +123,33 @@ func (f *fileStorage) initOldStore() (*badger.DB, error) {
}
func (f *fileStorage) LocalDiskUsage(ctx context.Context) (uint64, error) {
return f.proxy.localStore.ds.DiskUsage(ctx)
return f.localStore.ds.DiskUsage(ctx)
}
func (f *fileStorage) Get(ctx context.Context, k cid.Cid) (b blocks.Block, err error) {
return f.proxy.Get(f.patchAccountIdCtx(ctx), k)
return f.BlockStoreLocal.Get(f.patchAccountIdCtx(ctx), k)
}
func (f *fileStorage) GetMany(ctx context.Context, ks []cid.Cid) <-chan blocks.Block {
return f.proxy.GetMany(f.patchAccountIdCtx(ctx), ks)
return f.BlockStoreLocal.GetMany(f.patchAccountIdCtx(ctx), ks)
}
func (f *fileStorage) Add(ctx context.Context, bs []blocks.Block) (err error) {
return f.proxy.Add(f.patchAccountIdCtx(ctx), bs)
return f.BlockStoreLocal.Add(f.patchAccountIdCtx(ctx), bs)
}
func (f *fileStorage) Delete(ctx context.Context, k cid.Cid) error {
return f.proxy.Delete(f.patchAccountIdCtx(ctx), k)
return f.BlockStoreLocal.Delete(f.patchAccountIdCtx(ctx), k)
}
func (f *fileStorage) ExistsCids(ctx context.Context, ks []cid.Cid) (exists []cid.Cid, err error) {
return f.proxy.ExistsCids(f.patchAccountIdCtx(ctx), ks)
return f.BlockStoreLocal.ExistsCids(f.patchAccountIdCtx(ctx), ks)
}
func (f *fileStorage) NotExistsBlocks(ctx context.Context, bs []blocks.Block) (notExists []blocks.Block, err error) {
return f.proxy.NotExistsBlocks(f.patchAccountIdCtx(ctx), bs)
return f.BlockStoreLocal.NotExistsBlocks(f.patchAccountIdCtx(ctx), bs)
}
func (f *fileStorage) Close(ctx context.Context) (err error) {
return f.proxy.Close()
return
}

View file

@ -3,6 +3,7 @@ package filesync
import (
"context"
"fmt"
"sync"
"time"
"github.com/anytypeio/any-sync/app"
@ -31,12 +32,6 @@ var loopTimeout = time.Minute
var errReachedLimit = fmt.Errorf("file upload limit has been reached")
func New(sendEvent func(event *pb.Event)) FileSync {
return &fileSync{
sendEvent: sendEvent,
}
}
//go:generate mockgen -package mock_filesync -destination ./mock_filesync/filesync_mock.go github.com/anytypeio/go-anytype-middleware/core/filestorage/filesync FileSync
type FileSync interface {
AddFile(spaceId, fileId string) (err error)
@ -66,6 +61,16 @@ type fileSync struct {
dagService ipld.DAGService
fileStore filestore.FileStore
sendEvent func(event *pb.Event)
spaceStatsLock sync.Mutex
spaceStats map[string]SpaceStat
}
func New(sendEvent func(event *pb.Event)) FileSync {
return &fileSync{
sendEvent: sendEvent,
spaceStats: map[string]SpaceStat{},
}
}
func (f *fileSync) Init(a *app.App) (err error) {
@ -159,7 +164,6 @@ func (f *fileSync) addOperation() {
}
if err != nil {
log.Warn("can't upload file", zap.String("fileID", fileID), zap.Error(err))
return
}
}
}
@ -205,9 +209,18 @@ func (f *fileSync) tryToUpload() (string, error) {
return fileId, err
}
log.Info("done upload", zap.String("fileID", fileId))
f.updateSpaceUsageInformation(spaceId)
return fileId, f.queue.DoneUpload(spaceId, fileId)
}
func (f *fileSync) updateSpaceUsageInformation(spaceId string) {
if _, err := f.SpaceStat(context.Background(), spaceId); err != nil {
log.Warn("can't get space usage information", zap.String("spaceId", spaceId), zap.Error(err))
}
}
func (f *fileSync) sendLimitReachedEvent(spaceID string, fileID string) {
f.sendEvent(&pb.Event{
Messages: []*pb.EventMessage{
@ -239,27 +252,42 @@ func (f *fileSync) removeLoop() {
case <-f.removePingCh:
case <-time.After(loopTimeout):
}
for {
spaceId, fileId, err := f.queue.GetRemove()
if err != nil {
if err != errQueueIsEmpty {
log.Warn("queue get remove task error", zap.Error(err))
}
break
}
if err = f.removeFile(f.loopCtx, spaceId, fileId); err != nil {
log.Warn("remove file error", zap.Error(err))
break
} else {
if err = f.queue.DoneRemove(spaceId, fileId); err != nil {
log.Warn("can't mark remove task as done", zap.Error(err))
break
}
}
f.removeOperation()
}
}
func (f *fileSync) removeOperation() {
for {
fileID, err := f.tryToRemove()
if err == errQueueIsEmpty {
return
}
if err != nil {
log.Warn("can't remove file", zap.String("fileID", fileID), zap.Error(err))
}
}
}
func (f *fileSync) tryToRemove() (string, error) {
spaceID, fileID, err := f.queue.GetRemove()
if err == errQueueIsEmpty {
return fileID, errQueueIsEmpty
}
if err != nil {
return fileID, fmt.Errorf("get remove task from queue: %w", err)
}
if err = f.removeFile(f.loopCtx, spaceID, fileID); err != nil {
return fileID, fmt.Errorf("remove file: %w", err)
}
if err = f.queue.DoneRemove(spaceID, fileID); err != nil {
return fileID, fmt.Errorf("mark remove task as done: %w", err)
}
f.updateSpaceUsageInformation(spaceID)
return fileID, nil
}
func (f *fileSync) uploadFile(ctx context.Context, spaceId, fileId string) (err error) {
log.Info("uploading file", zap.String("fileId", fileId))

View file

@ -106,6 +106,7 @@ func (s *fileSyncStore) GetRemove() (spaceId, fileId string, err error) {
return s.getOne(removeKeyPrefix)
}
// getOne returns the oldest key from the queue with given prefix
func (s *fileSyncStore) getOne(prefix []byte) (spaceId, fileId string, err error) {
err = s.db.View(func(txn *badger.Txn) error {
it := txn.NewIterator(badger.IteratorOptions{
@ -119,10 +120,6 @@ func (s *fileSyncStore) getOne(prefix []byte) (spaceId, fileId string, err error
for it.Rewind(); it.Valid(); it.Next() {
item := it.Item()
timestamp, err := getTimestamp(item)
{
fid, sid := extractFileAndSpaceID(item)
fmt.Printf("fileId: %s, spaceId: %s, timestamp: %d\n", fid, sid, timestamp)
}
if err != nil {
return fmt.Errorf("get timestamp: %w", err)
}

View file

@ -9,6 +9,7 @@ import (
ipld "github.com/ipfs/go-ipld-format"
"github.com/samber/lo"
"github.com/anytypeio/go-anytype-middleware/pb"
"github.com/anytypeio/go-anytype-middleware/util/conc"
)
@ -41,7 +42,7 @@ func (f *fileSync) SpaceStat(ctx context.Context, spaceId string) (ss SpaceStat,
if err != nil {
return
}
return SpaceStat{
newStats := SpaceStat{
SpaceId: spaceId,
FileCount: int(info.FilesCount),
CidsCount: int(info.CidsCount),
@ -49,7 +50,33 @@ func (f *fileSync) SpaceStat(ctx context.Context, spaceId string) (ss SpaceStat,
// TODO Remove after test
BytesLimit: 52_428_800,
// BytesLimit: int(info.LimitBytes),
}, nil
}
f.spaceStatsLock.Lock()
prevStats, ok := f.spaceStats[spaceId]
if prevStats != newStats {
f.spaceStats[spaceId] = newStats
// Do not send event if it is first time we get stats
if ok {
f.sendSpaceUsageEvent(uint64(newStats.BytesUsage))
}
}
f.spaceStatsLock.Unlock()
return newStats, nil
}
func (f *fileSync) sendSpaceUsageEvent(bytesUsage uint64) {
f.sendEvent(&pb.Event{
Messages: []*pb.EventMessage{
{
Value: &pb.EventMessageValueOfFileSpaceUsage{
FileSpaceUsage: &pb.EventFileSpaceUsage{
BytesUsage: bytesUsage,
},
},
},
},
})
}
func (f *fileSync) FileListStats(ctx context.Context, spaceID string, fileIDs []string) ([]FileStat, error) {

View file

@ -5,6 +5,8 @@ import (
"fmt"
"os"
"strings"
"sync"
"time"
"github.com/anytypeio/any-sync/commonfile/fileproto"
blocks "github.com/ipfs/go-block-format"
@ -13,13 +15,16 @@ import (
flatfs "github.com/ipfs/go-ds-flatfs"
format "github.com/ipfs/go-ipld-format"
"go.uber.org/zap"
"github.com/anytypeio/go-anytype-middleware/pb"
)
type flatStore struct {
ds *flatfs.Datastore
ds *flatfs.Datastore
localBytesUsageEventSender *localBytesUsageEventSender
}
func newFlatStore(path string) (*flatStore, error) {
func newFlatStore(path string, sendEvent func(event *pb.Event), sendEventBatchTimeout time.Duration) (*flatStore, error) {
if _, err := os.Stat(path); os.IsNotExist(err) {
if err := os.MkdirAll(path, 0755); err != nil {
return nil, fmt.Errorf("mkdir: %w", err)
@ -29,7 +34,15 @@ func newFlatStore(path string) (*flatStore, error) {
if err != nil {
return nil, err
}
return &flatStore{ds: ds}, nil
bytesUsage, err := ds.DiskUsage(context.Background())
if err != nil {
log.Error("can't get initial disk usage", zap.Error(err))
}
return &flatStore{
ds: ds,
localBytesUsageEventSender: newLocalBytesUsageEventSender(sendEvent, sendEventBatchTimeout, bytesUsage),
}, nil
}
func (f *flatStore) Get(ctx context.Context, k cid.Cid) (blocks.Block, error) {
@ -70,11 +83,24 @@ func (f *flatStore) Add(ctx context.Context, bs []blocks.Block) error {
return fmt.Errorf("put %s: %w", dskey(b.Cid()), err)
}
}
f.sendLocalBytesUsageEvent(ctx)
return nil
}
func (f *flatStore) Delete(ctx context.Context, c cid.Cid) error {
return f.ds.Delete(ctx, dskey(c))
err := f.ds.Delete(ctx, dskey(c))
if err != nil {
return err
}
f.sendLocalBytesUsageEvent(ctx)
return nil
}
func (f *flatStore) sendLocalBytesUsageEvent(ctx context.Context) {
du, err := f.ds.DiskUsage(ctx)
if err == nil {
f.localBytesUsageEventSender.sendLocalBytesUsageEvent(du)
}
}
func (f *flatStore) PartitionByExistence(ctx context.Context, ks []cid.Cid) (exist []cid.Cid, notExist []cid.Cid, err error) {
@ -126,3 +152,51 @@ func (f *flatStore) BlockAvailability(ctx context.Context, ks []cid.Cid) (availa
func (f *flatStore) Close() error {
return f.ds.Close()
}
type localBytesUsageEventSender struct {
sendEvent func(event *pb.Event)
batchPeriod time.Duration
sync.Mutex
timer *time.Timer
localBytesUsage uint64
}
func newLocalBytesUsageEventSender(sendEvent func(event *pb.Event), batchPeriod time.Duration, initialLocalBytesUsage uint64) *localBytesUsageEventSender {
d := &localBytesUsageEventSender{
sendEvent: sendEvent,
batchPeriod: batchPeriod,
localBytesUsage: initialLocalBytesUsage,
}
return d
}
func (d *localBytesUsageEventSender) sendLocalBytesUsageEvent(localBytesUsage uint64) {
d.Lock()
defer d.Unlock()
d.localBytesUsage = localBytesUsage
if d.timer == nil {
d.timer = time.AfterFunc(d.batchPeriod, func() {
d.Lock()
defer d.Unlock()
d.send(d.localBytesUsage)
d.timer = nil
})
}
}
func (d *localBytesUsageEventSender) send(usage uint64) {
d.sendEvent(&pb.Event{
Messages: []*pb.EventMessage{
{
Value: &pb.EventMessageValueOfFileLocalUsage{
FileLocalUsage: &pb.EventFileLocalUsage{
LocalBytesUsage: usage,
},
},
},
},
})
}

View file

@ -1234,6 +1234,8 @@
- [Event.Block.Set.Widget.Layout](#anytype-Event-Block-Set-Widget-Layout)
- [Event.File](#anytype-Event-File)
- [Event.File.LimitReached](#anytype-Event-File-LimitReached)
- [Event.File.LocalUsage](#anytype-Event-File-LocalUsage)
- [Event.File.SpaceUsage](#anytype-Event-File-SpaceUsage)
- [Event.Message](#anytype-Event-Message)
- [Event.Object](#anytype-Event-Object)
- [Event.Object.Details](#anytype-Event-Object-Details)
@ -19394,6 +19396,36 @@ Precondition: user A opened a block
<a name="anytype-Event-File-LocalUsage"></a>
### Event.File.LocalUsage
| Field | Type | Label | Description |
| ----- | ---- | ----- | ----------- |
| localBytesUsage | [uint64](#uint64) | | |
<a name="anytype-Event-File-SpaceUsage"></a>
### Event.File.SpaceUsage
| Field | Type | Label | Description |
| ----- | ---- | ----- | ----------- |
| bytesUsage | [uint64](#uint64) | | |
<a name="anytype-Event-Message"></a>
### Event.Message
@ -19460,6 +19492,8 @@ Precondition: user A opened a block
| processDone | [Event.Process.Done](#anytype-Event-Process-Done) | | |
| threadStatus | [Event.Status.Thread](#anytype-Event-Status-Thread) | | |
| fileLimitReached | [Event.File.LimitReached](#anytype-Event-File-LimitReached) | | |
| fileSpaceUsage | [Event.File.SpaceUsage](#anytype-Event-File-SpaceUsage) | | |
| fileLocalUsage | [Event.File.LocalUsage](#anytype-Event-File-LocalUsage) | | |

File diff suppressed because it is too large Load diff

View file

@ -92,6 +92,8 @@ message Event {
Status.Thread threadStatus = 110;
File.LimitReached fileLimitReached = 111;
File.SpaceUsage fileSpaceUsage = 112;
File.LocalUsage fileLocalUsage = 113;
}
}
@ -987,7 +989,10 @@ message Event {
message SpaceUsage {
uint64 bytesUsage = 1;
uint64 localBytesUsage = 2;
}
message LocalUsage {
uint64 localBytesUsage = 1;
}
}
}