forked from 0x2E/fusion
82 lines
2.3 KiB
Go
82 lines
2.3 KiB
Go
package pull
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log/slog"
|
|
"time"
|
|
|
|
"github.com/0x2e/fusion/model"
|
|
"github.com/0x2e/fusion/pkg/ptr"
|
|
"github.com/0x2e/fusion/service/pull/client"
|
|
)
|
|
|
|
func (p *Puller) do(ctx context.Context, f *model.Feed, force bool) error {
|
|
logger := slog.With("feed_id", f.ID, "feed_link", ptr.From(f.Link))
|
|
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
|
|
defer cancel()
|
|
|
|
updateAction, skipReason := DecideFeedUpdateAction(f, time.Now())
|
|
if skipReason == &SkipReasonSuspended {
|
|
logger.Info(fmt.Sprintf("skip: %s", skipReason))
|
|
return nil
|
|
}
|
|
if !force {
|
|
switch updateAction {
|
|
case ActionSkipUpdate:
|
|
logger.Info(fmt.Sprintf("skip: %s", skipReason))
|
|
return nil
|
|
case ActionFetchUpdate:
|
|
// Proceed to perform the fetch.
|
|
default:
|
|
panic("unexpected FeedUpdateAction")
|
|
}
|
|
}
|
|
|
|
repo := defaultSingleFeedRepo{
|
|
feedID: f.ID,
|
|
feedRepo: p.feedRepo,
|
|
itemRepo: p.itemRepo,
|
|
}
|
|
return NewSingleFeedPuller(client.NewFeedClient().FetchItems, &repo).Pull(ctx, f)
|
|
}
|
|
|
|
// FeedUpdateAction represents the action to take when considering checking a
|
|
// feed for updates.
|
|
type FeedUpdateAction uint8
|
|
|
|
const (
|
|
ActionFetchUpdate FeedUpdateAction = iota
|
|
ActionSkipUpdate
|
|
)
|
|
|
|
// FeedSkipReason represents a reason for skipping a feed update.
|
|
type FeedSkipReason struct {
|
|
reason string
|
|
}
|
|
|
|
func (r FeedSkipReason) String() string {
|
|
return r.reason
|
|
}
|
|
|
|
var (
|
|
SkipReasonSuspended = FeedSkipReason{"user suspended feed updates"}
|
|
SkipReasonCoolingOff = FeedSkipReason{"slowing down requests due to past failures to update feed"}
|
|
SkipReasonTooSoon = FeedSkipReason{"feed was updated too recently"}
|
|
)
|
|
|
|
func DecideFeedUpdateAction(f *model.Feed, now time.Time) (FeedUpdateAction, *FeedSkipReason) {
|
|
if f.IsSuspended() {
|
|
return ActionSkipUpdate, &SkipReasonSuspended
|
|
} else if f.ConsecutiveFailures > 0 {
|
|
backoffTime := CalculateBackoffTime(f.ConsecutiveFailures)
|
|
timeSinceUpdate := now.Sub(f.UpdatedAt)
|
|
if timeSinceUpdate < backoffTime {
|
|
slog.Info(fmt.Sprintf("%d consecutive feed update failures, so next attempt is after %v", f.ConsecutiveFailures, f.UpdatedAt.Add(backoffTime).Format(time.RFC3339)), "feed_id", f.ID, "feed_link", ptr.From(f.Link))
|
|
return ActionSkipUpdate, &SkipReasonCoolingOff
|
|
}
|
|
} else if now.Sub(f.UpdatedAt) < interval {
|
|
return ActionSkipUpdate, &SkipReasonTooSoon
|
|
}
|
|
return ActionFetchUpdate, nil
|
|
}
|