
File sync: fix limit reached log message pollution

Key-value store: add handy JSON constructor
Sergey 2024-06-17 15:53:41 +02:00
parent 629af192c4
commit c38a48e290
4 changed files with 58 additions and 26 deletions

View file

@@ -2,7 +2,6 @@ package reconciler
 
 import (
     "context"
-    "encoding/json"
     "errors"
     "fmt"
     "sync"
@@ -84,13 +83,7 @@ func (r *reconciler) Init(a *app.App) error {
     	return fmt.Errorf("get badger: %w", err)
     }
-    r.isStartedStore = keyvaluestore.New(db, []byte("file_reconciler/is_started"), func(val bool) ([]byte, error) {
-    	return json.Marshal(val)
-    }, func(data []byte) (bool, error) {
-    	var val bool
-    	err := json.Unmarshal(data, &val)
-    	return val, err
-    })
+    r.isStartedStore = keyvaluestore.NewJson[bool](db, []byte("file_reconciler/is_started"))
     r.deletedFiles = keyvaluestore.New(db, []byte("file_reconciler/deleted_files"), func(_ struct{}) ([]byte, error) {
     	return []byte(""), nil
     }, func(data []byte) (struct{}, error) {

View file

@@ -2,7 +2,6 @@ package filesync
 
 import (
     "context"
-    "encoding/json"
    "errors"
     "io"
     "net/http"
@@ -78,11 +77,12 @@ type fileSync struct {
     onUploadStarted StatusCallback
     onLimited       StatusCallback
 
-    uploadingQueue          *persistentqueue.Queue[*QueueItem]
-    retryUploadingQueue     *persistentqueue.Queue[*QueueItem]
-    deletionQueue           *persistentqueue.Queue[*deletionQueueItem]
-    retryDeletionQueue      *persistentqueue.Queue[*deletionQueueItem]
-    blocksAvailabilityCache keyvaluestore.Store[*blocksAvailabilityResponse]
+    uploadingQueue            *persistentqueue.Queue[*QueueItem]
+    retryUploadingQueue       *persistentqueue.Queue[*QueueItem]
+    deletionQueue             *persistentqueue.Queue[*deletionQueueItem]
+    retryDeletionQueue        *persistentqueue.Queue[*deletionQueueItem]
+    blocksAvailabilityCache   keyvaluestore.Store[*blocksAvailabilityResponse]
+    isLimitReachedErrorLogged keyvaluestore.Store[bool]
 
     importEventsMutex sync.Mutex
     importEvents      []*pb.Event
@@ -103,13 +103,8 @@ func (s *fileSync) Init(a *app.App) (err error) {
     	return
     }
-    s.blocksAvailabilityCache = keyvaluestore.New(db, []byte(keyPrefix+"bytes_to_upload"), func(val *blocksAvailabilityResponse) ([]byte, error) {
-    	return json.Marshal(val)
-    }, func(data []byte) (*blocksAvailabilityResponse, error) {
-    	val := &blocksAvailabilityResponse{}
-    	err := json.Unmarshal(data, val)
-    	return val, err
-    })
+    s.blocksAvailabilityCache = keyvaluestore.NewJson[*blocksAvailabilityResponse](db, []byte(keyPrefix+"bytes_to_upload"))
+    s.isLimitReachedErrorLogged = keyvaluestore.NewJson[bool](db, []byte(keyPrefix+"limit_reached_error_logged"))
     s.uploadingQueue = persistentqueue.New(persistentqueue.NewBadgerStorage(db, uploadingKeyPrefix, makeQueueItem), log.Logger, s.uploadingHandler)
     s.retryUploadingQueue = persistentqueue.New(persistentqueue.NewBadgerStorage(db, retryUploadingKeyPrefix, makeQueueItem), log.Logger, s.retryingHandler, persistentqueue.WithRetryPause(loopTimeout))

View file

@@ -72,6 +72,11 @@ func (s *fileSync) handleLimitReachedError(err error, it *QueueItem) *errLimitRe
     }
     var limitReachedErr *errLimitReached
     if errors.As(err, &limitReachedErr) {
+        setErr := s.isLimitReachedErrorLogged.Set(it.ObjectId, true)
+        if setErr != nil {
+            log.Error("set limit reached error logged", zap.String("objectId", it.ObjectId), zap.Error(setErr))
+        }
         s.runOnLimitedHook(it.ObjectId, it.FullFileId())
         if it.AddedByUser && !it.Imported {
@@ -140,11 +145,22 @@ func (s *fileSync) retryingHandler(ctx context.Context, it *QueueItem) (persiste
     }
     err = s.uploadFile(ctx, spaceId, fileId)
     if err != nil {
-        log.Error("retry uploading file error",
-            zap.String("fileId", fileId.String()), zap.Error(err),
-            zap.String("objectId", it.ObjectId),
-        )
-        s.handleLimitReachedError(err, it)
+        limitErr := s.handleLimitReachedError(err, it)
+        var limitErrorIsLogged bool
+        if limitErr != nil {
+            var hasErr error
+            limitErrorIsLogged, hasErr = s.isLimitReachedErrorLogged.Has(it.ObjectId)
+            if hasErr != nil {
+                log.Error("check if limit reached error is logged", zap.String("objectId", it.ObjectId), zap.Error(hasErr))
+            }
+        }
+        if limitErr == nil || !limitErrorIsLogged {
+            log.Error("retry uploading file error",
+                zap.String("fileId", fileId.String()), zap.Error(err),
+                zap.String("objectId", it.ObjectId),
+            )
+        }
         return persistentqueue.ActionRetry, nil
     }
@@ -312,6 +328,10 @@ func (s *fileSync) uploadFile(ctx context.Context, spaceID string, fileId domain
     if err != nil {
         log.Warn("delete blocks availability cache entry", zap.String("fileId", fileId.String()), zap.Error(err))
     }
+    err = s.isLimitReachedErrorLogged.Delete(fileId.String())
+    if err != nil {
+        log.Warn("delete limit reached error logged", zap.String("fileId", fileId.String()), zap.Error(err))
+    }
     log.Warn("done upload", zap.String("fileId", fileId.String()), zap.Int("bytesToUpload", blocksAvailability.bytesToUpload), zap.Int("bytesUploaded", totalBytesUploaded))
     return nil
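
The three hunks above carry the "limit reached log message pollution" fix: the first limit-reached failure for an object sets a per-object flag, the retry handler suppresses the repeated "retry uploading file error" log while the flag is set, and a successful upload deletes the flag so future incidents are reported again. A standalone sketch of the same gating idea (illustrative only, not the repository's code; an in-memory map stands in for the Badger-backed keyvaluestore.Store[bool], and the ordering is simplified):

// Sketch of flag-gated log de-duplication (illustrative, not project code).
package main

import (
    "errors"
    "log"
)

var errLimitReached = errors.New("storage limit reached")

// limitLogGate remembers, per object id, whether a limit-reached error
// has already been reported.
type limitLogGate struct {
    logged map[string]bool
}

func (g *limitLogGate) onRetryFailed(objectId string, err error) {
    if errors.Is(err, errLimitReached) {
        if g.logged[objectId] {
            return // already reported for this object; keep the log quiet
        }
        g.logged[objectId] = true
    }
    log.Printf("retry uploading file error: objectId=%s err=%v", objectId, err)
}

func (g *limitLogGate) onUploadSucceeded(objectId string) {
    delete(g.logged, objectId) // a later limit error will be logged again
}

func main() {
    gate := &limitLogGate{logged: map[string]bool{}}
    gate.onRetryFailed("obj-1", errLimitReached) // logged
    gate.onRetryFailed("obj-1", errLimitReached) // suppressed
    gate.onUploadSucceeded("obj-1")
    gate.onRetryFailed("obj-1", errLimitReached) // logged once more
}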

View file

@@ -1,6 +1,7 @@
 package keyvaluestore
 
 import (
+    "encoding/json"
     "fmt"
 
     "github.com/dgraph-io/badger/v4"
@@ -40,6 +41,19 @@ func New[T any](
     }
 }
+
+// NewJson creates a new Store that marshals and unmarshals values as JSON
+func NewJson[T any](
+    db *badger.DB,
+    prefix []byte,
+) Store[T] {
+    return &store[T]{
+        prefix:       prefix,
+        db:           db,
+        marshaller:   JsonMarshal[T],
+        unmarshaller: JsonUnmarshal[T],
+    }
+}
 
 func (s *store[T]) Get(key string) (T, error) {
     val, err := badgerhelper.GetValue(s.db, s.makeKey(key), s.unmarshaller)
     if badgerhelper.IsNotFound(err) {
@@ -77,3 +91,13 @@ func (s *store[T]) Delete(key string) error {
 func (s *store[T]) makeKey(key string) []byte {
     return append(s.prefix, []byte(key)...)
 }
+
+func JsonMarshal[T any](val T) ([]byte, error) {
+    return json.Marshal(val)
+}
+
+func JsonUnmarshal[T any](data []byte) (T, error) {
+    var val T
+    err := json.Unmarshal(data, &val)
+    return val, err
+}