From c38a48e29005d212f25472024dd9e5e4fd12c048 Mon Sep 17 00:00:00 2001 From: Sergey Date: Mon, 17 Jun 2024 15:53:41 +0200 Subject: [PATCH] File sync: fix limit reached log message pollution Key-value store: add handy JSON constructor --- core/files/reconciler/reconciler.go | 9 +------- core/filestorage/filesync/filesync.go | 21 +++++++------------ core/filestorage/filesync/upload.go | 30 ++++++++++++++++++++++----- util/keyvaluestore/store.go | 24 +++++++++++++++++++++ 4 files changed, 58 insertions(+), 26 deletions(-) diff --git a/core/files/reconciler/reconciler.go b/core/files/reconciler/reconciler.go index 25f70ba63..b4d4935f0 100644 --- a/core/files/reconciler/reconciler.go +++ b/core/files/reconciler/reconciler.go @@ -2,7 +2,6 @@ package reconciler import ( "context" - "encoding/json" "errors" "fmt" "sync" @@ -84,13 +83,7 @@ func (r *reconciler) Init(a *app.App) error { return fmt.Errorf("get badger: %w", err) } - r.isStartedStore = keyvaluestore.New(db, []byte("file_reconciler/is_started"), func(val bool) ([]byte, error) { - return json.Marshal(val) - }, func(data []byte) (bool, error) { - var val bool - err := json.Unmarshal(data, &val) - return val, err - }) + r.isStartedStore = keyvaluestore.NewJson[bool](db, []byte("file_reconciler/is_started")) r.deletedFiles = keyvaluestore.New(db, []byte("file_reconciler/deleted_files"), func(_ struct{}) ([]byte, error) { return []byte(""), nil }, func(data []byte) (struct{}, error) { diff --git a/core/filestorage/filesync/filesync.go b/core/filestorage/filesync/filesync.go index f77bd0c90..3a28badd5 100644 --- a/core/filestorage/filesync/filesync.go +++ b/core/filestorage/filesync/filesync.go @@ -2,7 +2,6 @@ package filesync import ( "context" - "encoding/json" "errors" "io" "net/http" @@ -78,11 +77,12 @@ type fileSync struct { onUploadStarted StatusCallback onLimited StatusCallback - uploadingQueue *persistentqueue.Queue[*QueueItem] - retryUploadingQueue *persistentqueue.Queue[*QueueItem] - deletionQueue *persistentqueue.Queue[*deletionQueueItem] - retryDeletionQueue *persistentqueue.Queue[*deletionQueueItem] - blocksAvailabilityCache keyvaluestore.Store[*blocksAvailabilityResponse] + uploadingQueue *persistentqueue.Queue[*QueueItem] + retryUploadingQueue *persistentqueue.Queue[*QueueItem] + deletionQueue *persistentqueue.Queue[*deletionQueueItem] + retryDeletionQueue *persistentqueue.Queue[*deletionQueueItem] + blocksAvailabilityCache keyvaluestore.Store[*blocksAvailabilityResponse] + isLimitReachedErrorLogged keyvaluestore.Store[bool] importEventsMutex sync.Mutex importEvents []*pb.Event @@ -103,13 +103,8 @@ func (s *fileSync) Init(a *app.App) (err error) { return } - s.blocksAvailabilityCache = keyvaluestore.New(db, []byte(keyPrefix+"bytes_to_upload"), func(val *blocksAvailabilityResponse) ([]byte, error) { - return json.Marshal(val) - }, func(data []byte) (*blocksAvailabilityResponse, error) { - val := &blocksAvailabilityResponse{} - err := json.Unmarshal(data, val) - return val, err - }) + s.blocksAvailabilityCache = keyvaluestore.NewJson[*blocksAvailabilityResponse](db, []byte(keyPrefix+"bytes_to_upload")) + s.isLimitReachedErrorLogged = keyvaluestore.NewJson[bool](db, []byte(keyPrefix+"limit_reached_error_logged")) s.uploadingQueue = persistentqueue.New(persistentqueue.NewBadgerStorage(db, uploadingKeyPrefix, makeQueueItem), log.Logger, s.uploadingHandler) s.retryUploadingQueue = persistentqueue.New(persistentqueue.NewBadgerStorage(db, retryUploadingKeyPrefix, makeQueueItem), log.Logger, s.retryingHandler, persistentqueue.WithRetryPause(loopTimeout)) diff --git a/core/filestorage/filesync/upload.go b/core/filestorage/filesync/upload.go index de7cdc356..adfd45e49 100644 --- a/core/filestorage/filesync/upload.go +++ b/core/filestorage/filesync/upload.go @@ -72,6 +72,11 @@ func (s *fileSync) handleLimitReachedError(err error, it *QueueItem) *errLimitRe } var limitReachedErr *errLimitReached if errors.As(err, &limitReachedErr) { + setErr := s.isLimitReachedErrorLogged.Set(it.ObjectId, true) + if setErr != nil { + log.Error("set limit reached error logged", zap.String("objectId", it.ObjectId), zap.Error(setErr)) + } + s.runOnLimitedHook(it.ObjectId, it.FullFileId()) if it.AddedByUser && !it.Imported { @@ -140,11 +145,22 @@ func (s *fileSync) retryingHandler(ctx context.Context, it *QueueItem) (persiste } err = s.uploadFile(ctx, spaceId, fileId) if err != nil { - log.Error("retry uploading file error", - zap.String("fileId", fileId.String()), zap.Error(err), - zap.String("objectId", it.ObjectId), - ) - s.handleLimitReachedError(err, it) + limitErr := s.handleLimitReachedError(err, it) + var limitErrorIsLogged bool + if limitErr != nil { + var hasErr error + limitErrorIsLogged, hasErr = s.isLimitReachedErrorLogged.Has(it.ObjectId) + if hasErr != nil { + log.Error("check if limit reached error is logged", zap.String("objectId", it.ObjectId), zap.Error(hasErr)) + } + } + if limitErr == nil || !limitErrorIsLogged { + log.Error("retry uploading file error", + zap.String("fileId", fileId.String()), zap.Error(err), + zap.String("objectId", it.ObjectId), + ) + } + return persistentqueue.ActionRetry, nil } @@ -312,6 +328,10 @@ func (s *fileSync) uploadFile(ctx context.Context, spaceID string, fileId domain if err != nil { log.Warn("delete blocks availability cache entry", zap.String("fileId", fileId.String()), zap.Error(err)) } + err = s.isLimitReachedErrorLogged.Delete(fileId.String()) + if err != nil { + log.Warn("delete limit reached error logged", zap.String("fileId", fileId.String()), zap.Error(err)) + } log.Warn("done upload", zap.String("fileId", fileId.String()), zap.Int("bytesToUpload", blocksAvailability.bytesToUpload), zap.Int("bytesUploaded", totalBytesUploaded)) return nil diff --git a/util/keyvaluestore/store.go b/util/keyvaluestore/store.go index d4fc611bc..16b61f10f 100644 --- a/util/keyvaluestore/store.go +++ b/util/keyvaluestore/store.go @@ -1,6 +1,7 @@ package keyvaluestore import ( + "encoding/json" "fmt" "github.com/dgraph-io/badger/v4" @@ -40,6 +41,19 @@ func New[T any]( } } +// NewJson creates a new Store that marshals and unmarshals values as JSON +func NewJson[T any]( + db *badger.DB, + prefix []byte, +) Store[T] { + return &store[T]{ + prefix: prefix, + db: db, + marshaller: JsonMarshal[T], + unmarshaller: JsonUnmarshal[T], + } +} + func (s *store[T]) Get(key string) (T, error) { val, err := badgerhelper.GetValue(s.db, s.makeKey(key), s.unmarshaller) if badgerhelper.IsNotFound(err) { @@ -77,3 +91,13 @@ func (s *store[T]) Delete(key string) error { func (s *store[T]) makeKey(key string) []byte { return append(s.prefix, []byte(key)...) } + +func JsonMarshal[T any](val T) ([]byte, error) { + return json.Marshal(val) +} + +func JsonUnmarshal[T any](data []byte) (T, error) { + var val T + err := json.Unmarshal(data, &val) + return val, err +}