1
0
Fork 1
mirror of https://github.com/0x2E/fusion.git synced 2025-06-08 05:27:15 +09:00
fusion/service/pull/handle.go
Yuan bc8109fe39
refactor: replace zap log with slog (#150)
* refactor: replace zap log with slog

* fix
2025-04-25 17:18:25 +08:00

82 lines
2.3 KiB
Go

package pull
import (
"context"
"fmt"
"log/slog"
"time"
"github.com/0x2e/fusion/model"
"github.com/0x2e/fusion/pkg/ptr"
"github.com/0x2e/fusion/service/pull/client"
)
// do pulls a single feed unless the update decision says it should be
// skipped.
//
// A feed whose skip reason is SkipReasonSuspended is never pulled, even when
// force is true. For any other feed, force bypasses the decision returned by
// DecideFeedUpdateAction. When the pull proceeds it runs under a 30-second
// timeout derived from ctx.
//
// It returns nil when the feed is skipped, an error when the decision is not
// a known FeedUpdateAction, and otherwise whatever the pull itself returns.
func (p *Puller) do(ctx context.Context, f *model.Feed, force bool) error {
	logger := slog.With("feed_id", f.ID, "feed_link", ptr.From(f.Link))

	updateAction, skipReason := DecideFeedUpdateAction(f, time.Now())

	// Pointer identity is intentional: DecideFeedUpdateAction returns the
	// address of the package-level SkipReasonSuspended variable.
	if skipReason == &SkipReasonSuspended {
		logger.Info(fmt.Sprintf("skip: %s", skipReason))
		return nil
	}

	if !force {
		switch updateAction {
		case ActionSkipUpdate:
			logger.Info(fmt.Sprintf("skip: %s", skipReason))
			return nil
		case ActionFetchUpdate:
			// Proceed to perform the fetch.
		default:
			// Report the broken invariant to the caller instead of
			// panicking, so one bad decision cannot take down the process.
			return fmt.Errorf("unexpected FeedUpdateAction: %d", updateAction)
		}
	}

	// Only start the timeout once we know a fetch will actually happen, so
	// skipped feeds do not allocate a timer.
	ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()

	repo := defaultSingleFeedRepo{
		feedID:   f.ID,
		feedRepo: p.feedRepo,
		itemRepo: p.itemRepo,
	}
	return NewSingleFeedPuller(client.NewFeedClient().FetchItems, &repo).Pull(ctx, f)
}
// FeedUpdateAction is the decision reached when considering whether a feed
// should be checked for updates.
type FeedUpdateAction uint8

const (
	// ActionFetchUpdate means the feed should be fetched.
	ActionFetchUpdate FeedUpdateAction = iota
	// ActionSkipUpdate means the feed should not be fetched.
	ActionSkipUpdate
)
// FeedSkipReason explains why a feed update was skipped.
type FeedSkipReason struct {
	// reason is the human-readable explanation returned by String.
	reason string
}

// String returns the human-readable explanation for the skip.
func (sr FeedSkipReason) String() string {
	return sr.reason
}
// Skip reasons returned by DecideFeedUpdateAction. Callers receive pointers
// to these variables, so they can be compared by address.
var (
	// SkipReasonSuspended is reported when the feed's IsSuspended method
	// returns true.
	SkipReasonSuspended = FeedSkipReason{reason: "user suspended feed updates"}
	// SkipReasonCoolingOff is reported when the feed has consecutive
	// failures and its backoff time has not yet elapsed.
	SkipReasonCoolingOff = FeedSkipReason{reason: "slowing down requests due to past failures to update feed"}
	// SkipReasonTooSoon is reported when less than the update interval has
	// passed since the feed was last updated.
	SkipReasonTooSoon = FeedSkipReason{reason: "feed was updated too recently"}
)
// DecideFeedUpdateAction determines whether feed f should be fetched at time
// now. It returns the action to take and, when that action is
// ActionSkipUpdate, a pointer to the package-level reason for skipping; the
// reason is nil when the action is ActionFetchUpdate.
//
// The checks are applied in order:
//  1. A suspended feed is skipped with SkipReasonSuspended.
//  2. A feed with consecutive failures is skipped with SkipReasonCoolingOff
//     until the backoff from CalculateBackoffTime has elapsed since
//     f.UpdatedAt; the next attempt time is logged. Once the backoff has
//     elapsed the feed is fetched without consulting the regular interval.
//  3. A feed updated less than interval ago is skipped with
//     SkipReasonTooSoon.
func DecideFeedUpdateAction(f *model.Feed, now time.Time) (FeedUpdateAction, *FeedSkipReason) {
	if f.IsSuspended() {
		return ActionSkipUpdate, &SkipReasonSuspended
	}

	if failures := f.ConsecutiveFailures; failures > 0 {
		backoff := CalculateBackoffTime(failures)
		elapsed := now.Sub(f.UpdatedAt)
		if elapsed >= backoff {
			// Backoff window is over; retry the feed now.
			return ActionFetchUpdate, nil
		}

		nextAttempt := f.UpdatedAt.Add(backoff).Format(time.RFC3339)
		msg := fmt.Sprintf("%d consecutive feed update failures, so next attempt is after %v", failures, nextAttempt)
		slog.Info(msg, "feed_id", f.ID, "feed_link", ptr.From(f.Link))
		return ActionSkipUpdate, &SkipReasonCoolingOff
	}

	if now.Sub(f.UpdatedAt) < interval {
		return ActionSkipUpdate, &SkipReasonTooSoon
	}
	return ActionFetchUpdate, nil
}