forked from 0x2E/fusion
Recover after feed fetch failure with exponential backoff (#108)
* Recover after feed fetch failure with exponential backoff The current implementation stops attempting to fetch a feed if fusion encounters any error fetching it. The only way to continue fetching the feed is if the user manually forces a refresh. This change allows fusion to recover from feed fetch errors by tracking the number of consecutive failures and slowing down requests for feeds that fail consistently. If a feed always fails, we eventually slow to checking it only once per week. Fixes #67 * Add comment
This commit is contained in:
parent
db029950d2
commit
df412f17d3
6 changed files with 187 additions and 38 deletions
|
@ -22,10 +22,13 @@ type Feed struct {
|
||||||
Link *string `gorm:"link;not null;uniqueIndex:idx_link"`
|
Link *string `gorm:"link;not null;uniqueIndex:idx_link"`
|
||||||
// LastBuild is the last time the content of the feed changed
|
// LastBuild is the last time the content of the feed changed
|
||||||
LastBuild *time.Time `gorm:"last_build"`
|
LastBuild *time.Time `gorm:"last_build"`
|
||||||
// Failure is the reason of failure. If it is not null or empty, the fetch processor
|
// Failure is the error message for the last fetch.
|
||||||
// should skip this feed
|
Failure *string `gorm:"failure;default:''"`
|
||||||
Failure *string `gorm:"failure;default:''"`
|
// ConsecutiveFailures is the number of consecutive times we've failed to
|
||||||
Suspended *bool `gorm:"suspended;default:false"`
|
// retrieve this feed.
|
||||||
|
ConsecutiveFailures uint `gorm:"consecutive_failures;default:0"`
|
||||||
|
|
||||||
|
Suspended *bool `gorm:"suspended;default:false"`
|
||||||
|
|
||||||
FeedRequestOptions
|
FeedRequestOptions
|
||||||
|
|
||||||
|
@ -33,10 +36,6 @@ type Feed struct {
|
||||||
Group Group
|
Group Group
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f Feed) IsFailed() bool {
|
|
||||||
return f.Failure != nil && *f.Failure != ""
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f Feed) IsSuspended() bool {
|
func (f Feed) IsSuspended() bool {
|
||||||
return f.Suspended != nil && *f.Suspended
|
return f.Suspended != nil && *f.Suspended
|
||||||
}
|
}
|
||||||
|
|
31
service/pull/backoff.go
Normal file
31
service/pull/backoff.go
Normal file
|
@ -0,0 +1,31 @@
|
||||||
|
package pull
|
||||||
|
|
||||||
|
import (
|
||||||
|
"math"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// maxBackoff is the maximum time to wait before checking a feed due to past
|
||||||
|
// errors.
|
||||||
|
const maxBackoff = 7 * 24 * time.Hour
|
||||||
|
|
||||||
|
// CalculateBackoffTime calculates the exponential backoff time based on the
|
||||||
|
// number of consecutive failures.
|
||||||
|
// The formula is: interval * (1.8 ^ consecutiveFailures), capped at maxBackoff.
|
||||||
|
func CalculateBackoffTime(consecutiveFailures uint) time.Duration {
|
||||||
|
// If no failures, no backoff needed
|
||||||
|
if consecutiveFailures == 0 {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
intervalMinutes := float64(interval.Minutes())
|
||||||
|
backoffMinutes := intervalMinutes * math.Pow(1.8, float64(consecutiveFailures))
|
||||||
|
|
||||||
|
// floats go to Inf if the number is too large to represent in a float type,
|
||||||
|
// so check that it's not +/- Inf.
|
||||||
|
if math.IsInf(backoffMinutes, 0) || backoffMinutes > maxBackoff.Minutes() {
|
||||||
|
return maxBackoff
|
||||||
|
}
|
||||||
|
|
||||||
|
return time.Duration(backoffMinutes) * time.Minute
|
||||||
|
}
|
55
service/pull/backoff_test.go
Normal file
55
service/pull/backoff_test.go
Normal file
|
@ -0,0 +1,55 @@
|
||||||
|
package pull_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"math"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
|
||||||
|
"github.com/0x2e/fusion/service/pull"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestCalculateBackoffTime(t *testing.T) {
|
||||||
|
for _, tt := range []struct {
|
||||||
|
name string
|
||||||
|
consecutiveFailures uint
|
||||||
|
expectedBackoff time.Duration
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "no failures",
|
||||||
|
consecutiveFailures: 0,
|
||||||
|
expectedBackoff: 0,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "one failure",
|
||||||
|
consecutiveFailures: 1,
|
||||||
|
expectedBackoff: 54 * time.Minute, // 30 * (1.8^1) = 54 minutes
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "two failures",
|
||||||
|
consecutiveFailures: 2,
|
||||||
|
expectedBackoff: 97 * time.Minute, // 30 * (1.8^2) = 97.2 minutes ≈ 97 minutes
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "three failures",
|
||||||
|
consecutiveFailures: 3,
|
||||||
|
expectedBackoff: 174 * time.Minute, // 30 * (1.8^3) = 174.96 minutes ≈ 174 minutes
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "many failures",
|
||||||
|
consecutiveFailures: 10000,
|
||||||
|
expectedBackoff: 7 * 24 * time.Hour, // Maximum backoff (7 days)
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "maximum failures",
|
||||||
|
consecutiveFailures: math.MaxUint,
|
||||||
|
expectedBackoff: 7 * 24 * time.Hour, // Maximum backoff (7 days)
|
||||||
|
},
|
||||||
|
} {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
backoff := pull.CalculateBackoffTime(tt.consecutiveFailures)
|
||||||
|
assert.Equal(t, tt.expectedBackoff, backoff)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
|
@ -57,16 +57,22 @@ func (r FeedSkipReason) String() string {
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
	// SkipReasonSuspended indicates the user manually suspended updates for the feed.
	SkipReasonSuspended = FeedSkipReason{"user suspended feed updates"}
	// SkipReasonCoolingOff indicates the feed is inside its exponential-backoff
	// window after one or more consecutive fetch failures.
	SkipReasonCoolingOff = FeedSkipReason{"slowing down requests due to past failures to update feed"}
	// SkipReasonTooSoon indicates the feed was fetched more recently than the
	// minimum update interval.
	SkipReasonTooSoon = FeedSkipReason{"feed was updated too recently"}
)
||||||
|
|
||||||
func DecideFeedUpdateAction(f *model.Feed, now time.Time) (FeedUpdateAction, *FeedSkipReason) {
|
func DecideFeedUpdateAction(f *model.Feed, now time.Time) (FeedUpdateAction, *FeedSkipReason) {
|
||||||
if f.IsSuspended() {
|
if f.IsSuspended() {
|
||||||
return ActionSkipUpdate, &SkipReasonSuspended
|
return ActionSkipUpdate, &SkipReasonSuspended
|
||||||
} else if f.IsFailed() {
|
} else if f.ConsecutiveFailures > 0 {
|
||||||
return ActionSkipUpdate, &SkipReasonLastUpdateFailed
|
backoffTime := CalculateBackoffTime(f.ConsecutiveFailures)
|
||||||
|
timeSinceUpdate := now.Sub(f.UpdatedAt)
|
||||||
|
if timeSinceUpdate < backoffTime {
|
||||||
|
logger := pullLogger.With("feed_id", f.ID, "feed_name", f.Name)
|
||||||
|
logger.Infof("%d consecutive feed update failures, so next attempt is after %v", f.ConsecutiveFailures, f.UpdatedAt.Add(backoffTime).Format(time.RFC3339))
|
||||||
|
return ActionSkipUpdate, &SkipReasonCoolingOff
|
||||||
|
}
|
||||||
} else if now.Sub(f.UpdatedAt) < interval {
|
} else if now.Sub(f.UpdatedAt) < interval {
|
||||||
return ActionSkipUpdate, &SkipReasonTooSoon
|
return ActionSkipUpdate, &SkipReasonTooSoon
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package pull_test
|
package pull_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"math"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -38,28 +39,6 @@ func TestDecideFeedUpdateAction(t *testing.T) {
|
||||||
expectedAction: pull.ActionSkipUpdate,
|
expectedAction: pull.ActionSkipUpdate,
|
||||||
expectedSkipReason: &pull.SkipReasonSuspended,
|
expectedSkipReason: &pull.SkipReasonSuspended,
|
||||||
},
|
},
|
||||||
{
|
|
||||||
description: "failed feed should skip update",
|
|
||||||
currentTime: parseTime("2025-01-01T12:00:00Z"),
|
|
||||||
feed: model.Feed{
|
|
||||||
Failure: ptr.To("dummy previous error"),
|
|
||||||
Suspended: ptr.To(false),
|
|
||||||
UpdatedAt: parseTime("2025-01-01T12:00:00Z"),
|
|
||||||
},
|
|
||||||
expectedAction: pull.ActionSkipUpdate,
|
|
||||||
expectedSkipReason: &pull.SkipReasonLastUpdateFailed,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
description: "recently updated feed should skip update",
|
|
||||||
currentTime: parseTime("2025-01-01T12:00:00Z"),
|
|
||||||
feed: model.Feed{
|
|
||||||
Failure: ptr.To(""),
|
|
||||||
Suspended: ptr.To(false),
|
|
||||||
UpdatedAt: parseTime("2025-01-01T11:45:00Z"), // 15 minutes before current time
|
|
||||||
},
|
|
||||||
expectedAction: pull.ActionSkipUpdate,
|
|
||||||
expectedSkipReason: &pull.SkipReasonTooSoon,
|
|
||||||
},
|
|
||||||
{
|
{
|
||||||
description: "feed should be updated when conditions are met",
|
description: "feed should be updated when conditions are met",
|
||||||
currentTime: parseTime("2025-01-01T12:00:00Z"),
|
currentTime: parseTime("2025-01-01T12:00:00Z"),
|
||||||
|
@ -93,6 +72,78 @@ func TestDecideFeedUpdateAction(t *testing.T) {
|
||||||
expectedAction: pull.ActionFetchUpdate,
|
expectedAction: pull.ActionFetchUpdate,
|
||||||
expectedSkipReason: nil,
|
expectedSkipReason: nil,
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
description: "failed feed with 1 consecutive failure should skip update before 54 minutes",
|
||||||
|
currentTime: parseTime("2025-01-01T12:00:00Z"),
|
||||||
|
feed: model.Feed{
|
||||||
|
Failure: ptr.To("dummy previous error"),
|
||||||
|
Suspended: ptr.To(false),
|
||||||
|
UpdatedAt: parseTime("2025-01-01T11:15:00Z"), // 45 minutes before current time
|
||||||
|
ConsecutiveFailures: 1,
|
||||||
|
},
|
||||||
|
expectedAction: pull.ActionSkipUpdate,
|
||||||
|
expectedSkipReason: &pull.SkipReasonCoolingOff,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
description: "failed feed with 1 consecutive failure should be updated after 54 minutes",
|
||||||
|
currentTime: parseTime("2025-01-01T12:00:00Z"),
|
||||||
|
feed: model.Feed{
|
||||||
|
Failure: ptr.To("dummy previous error"),
|
||||||
|
Suspended: ptr.To(false),
|
||||||
|
UpdatedAt: parseTime("2025-01-01T11:06:00Z"), // 54 minutes before current time
|
||||||
|
ConsecutiveFailures: 1,
|
||||||
|
},
|
||||||
|
expectedAction: pull.ActionFetchUpdate,
|
||||||
|
expectedSkipReason: nil,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
description: "failed feed with 3 consecutive failures should skip update for 174 minutes",
|
||||||
|
currentTime: parseTime("2025-01-01T12:00:00Z"),
|
||||||
|
feed: model.Feed{
|
||||||
|
Failure: ptr.To("dummy previous error"),
|
||||||
|
Suspended: ptr.To(false),
|
||||||
|
UpdatedAt: parseTime("2025-01-01T09:10:00Z"), // 170 minutes before current time
|
||||||
|
ConsecutiveFailures: 3,
|
||||||
|
},
|
||||||
|
expectedAction: pull.ActionSkipUpdate,
|
||||||
|
expectedSkipReason: &pull.SkipReasonCoolingOff,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
description: "failed feed with 3 consecutive failures should be updated after 174 minutes",
|
||||||
|
currentTime: parseTime("2025-01-01T12:00:00Z"),
|
||||||
|
feed: model.Feed{
|
||||||
|
Failure: ptr.To("dummy previous error"),
|
||||||
|
Suspended: ptr.To(false),
|
||||||
|
UpdatedAt: parseTime("2025-01-01T09:06:00Z"), // 174 minutes before current time
|
||||||
|
ConsecutiveFailures: 3,
|
||||||
|
},
|
||||||
|
expectedAction: pull.ActionFetchUpdate,
|
||||||
|
expectedSkipReason: nil,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
description: "failed feed with many consecutive failures should not exceed maximum wait time of 7 days",
|
||||||
|
currentTime: parseTime("2025-01-01T12:00:00Z"),
|
||||||
|
feed: model.Feed{
|
||||||
|
Failure: ptr.To("dummy previous error"),
|
||||||
|
Suspended: ptr.To(false),
|
||||||
|
UpdatedAt: parseTime("2024-12-30T12:00:00Z"), // 2 days before current time
|
||||||
|
ConsecutiveFailures: 10,
|
||||||
|
},
|
||||||
|
expectedAction: pull.ActionSkipUpdate,
|
||||||
|
expectedSkipReason: &pull.SkipReasonCoolingOff,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
description: "failed feed with many consecutive failures should be updated after maximum wait time of 7 days",
|
||||||
|
currentTime: parseTime("2025-01-01T12:00:00Z"),
|
||||||
|
feed: model.Feed{
|
||||||
|
Failure: ptr.To("dummy previous error"),
|
||||||
|
Suspended: ptr.To(false),
|
||||||
|
UpdatedAt: parseTime("2024-12-25T12:00:00Z"), // 7 days before current time
|
||||||
|
ConsecutiveFailures: math.MaxUint,
|
||||||
|
},
|
||||||
|
expectedAction: pull.ActionFetchUpdate,
|
||||||
|
expectedSkipReason: nil,
|
||||||
|
},
|
||||||
} {
|
} {
|
||||||
t.Run(tt.description, func(t *testing.T) {
|
t.Run(tt.description, func(t *testing.T) {
|
||||||
action, skipReason := pull.DecideFeedUpdateAction(&tt.feed, tt.currentTime)
|
action, skipReason := pull.DecideFeedUpdateAction(&tt.feed, tt.currentTime)
|
||||||
|
|
|
@ -57,14 +57,21 @@ func (r *defaultSingleFeedRepo) InsertItems(items []*model.Item) error {
|
||||||
|
|
||||||
// RecordSuccess persists a successful fetch: it stores the feed's new
// LastBuild time, clears the failure message, and resets the consecutive
// failure counter so backoff no longer applies.
func (r *defaultSingleFeedRepo) RecordSuccess(lastBuild *time.Time) error {
	// NOTE(review): if feedRepo.Update delegates to GORM's struct-based
	// Updates, zero values such as ConsecutiveFailures: 0 may be skipped —
	// confirm Update selects these columns explicitly.
	return r.feedRepo.Update(r.feedID, &model.Feed{
		LastBuild:           lastBuild,
		Failure:             ptr.To(""),
		ConsecutiveFailures: 0,
	})
}
|
||||||
|
|
||||||
func (r *defaultSingleFeedRepo) RecordFailure(readErr error) error {
|
func (r *defaultSingleFeedRepo) RecordFailure(readErr error) error {
|
||||||
|
feed, err := r.feedRepo.Get(r.feedID)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
return r.feedRepo.Update(r.feedID, &model.Feed{
|
return r.feedRepo.Update(r.feedID, &model.Feed{
|
||||||
Failure: ptr.To(readErr.Error()),
|
Failure: ptr.To(readErr.Error()),
|
||||||
|
ConsecutiveFailures: feed.ConsecutiveFailures + 1,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue