mirror of
https://github.com/0x2E/fusion.git
synced 2025-06-08 13:37:11 +09:00
107 lines
3.4 KiB
Go
107 lines
3.4 KiB
Go
package pull
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log/slog"
|
|
"time"
|
|
|
|
"github.com/0x2e/fusion/model"
|
|
"github.com/0x2e/fusion/pkg/ptr"
|
|
"github.com/0x2e/fusion/service/pull/client"
|
|
)
|
|
|
|
// ReadFeedItemsFn is responsible for reading a feed from an HTTP server and
|
|
// converting the result to fusion-native data types. The error return value
|
|
// is for request errors (e.g. HTTP errors).
|
|
type ReadFeedItemsFn func(ctx context.Context, feedURL string, options model.FeedRequestOptions) (client.FetchItemsResult, error)
|
|
|
|
// UpdateFeedInStoreFn is responsible for saving the result of a feed fetch to a data
|
|
// store. If the fetch failed, it records that in the data store. If the fetch
|
|
// succeeds, it stores the latest build time in the data store and adds any new
|
|
// feed items to the datastore.
|
|
type UpdateFeedInStoreFn func(feedID uint, items []*model.Item, lastBuild *time.Time, requestError error) error
|
|
|
|
// SingleFeedRepo represents a datastore for storing information about a feed.
|
|
type SingleFeedRepo interface {
|
|
InsertItems(items []*model.Item) error
|
|
RecordSuccess(lastBuild *time.Time) error
|
|
RecordFailure(readErr error) error
|
|
}
|
|
|
|
type SingleFeedPuller struct {
|
|
readFeed ReadFeedItemsFn
|
|
repo SingleFeedRepo
|
|
}
|
|
|
|
// NewSingleFeedPuller creates a new SingleFeedPuller with the given ReadFeedItemsFn and repository.
|
|
func NewSingleFeedPuller(readFeed ReadFeedItemsFn, repo SingleFeedRepo) SingleFeedPuller {
|
|
return SingleFeedPuller{
|
|
readFeed: readFeed,
|
|
repo: repo,
|
|
}
|
|
}
|
|
|
|
// defaultSingleFeedRepo is the default implementation of SingleFeedRepo
|
|
type defaultSingleFeedRepo struct {
|
|
feedID uint
|
|
feedRepo FeedRepo
|
|
itemRepo ItemRepo
|
|
}
|
|
|
|
func (r *defaultSingleFeedRepo) InsertItems(items []*model.Item) error {
|
|
// Set the correct feed ID for all items.
|
|
for _, item := range items {
|
|
item.FeedID = r.feedID
|
|
}
|
|
return r.itemRepo.Insert(items)
|
|
}
|
|
|
|
func (r *defaultSingleFeedRepo) RecordSuccess(lastBuild *time.Time) error {
|
|
return r.feedRepo.Update(r.feedID, &model.Feed{
|
|
LastBuild: lastBuild,
|
|
Failure: ptr.To(""),
|
|
ConsecutiveFailures: 0,
|
|
})
|
|
}
|
|
|
|
func (r *defaultSingleFeedRepo) RecordFailure(readErr error) error {
|
|
feed, err := r.feedRepo.Get(r.feedID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return r.feedRepo.Update(r.feedID, &model.Feed{
|
|
Failure: ptr.To(readErr.Error()),
|
|
ConsecutiveFailures: feed.ConsecutiveFailures + 1,
|
|
})
|
|
}
|
|
|
|
func (p SingleFeedPuller) Pull(ctx context.Context, feed *model.Feed) error {
|
|
logger := slog.With("feed_id", feed.ID, "feed_link", ptr.From(feed.Link))
|
|
|
|
// We don't exit on error, as we want to record any error in the data store.
|
|
fetchResult, readErr := p.readFeed(ctx, *feed.Link, feed.FeedRequestOptions)
|
|
if readErr == nil {
|
|
logger.Info(fmt.Sprintf("fetched %d items", len(fetchResult.Items)))
|
|
} else {
|
|
logger.Warn("failed to fetch feed", "error", readErr)
|
|
}
|
|
|
|
return p.updateFeedInStore(feed.ID, fetchResult.Items, fetchResult.LastBuild, readErr)
|
|
}
|
|
|
|
// updateFeedInStore saves the result of a feed fetch to the data store.
|
|
// If the fetch failed, it records that in the data store.
|
|
// If the fetch succeeds, it stores the latest build time and adds any new feed items.
|
|
func (p SingleFeedPuller) updateFeedInStore(feedID uint, items []*model.Item, lastBuild *time.Time, requestError error) error {
|
|
if requestError != nil {
|
|
return p.repo.RecordFailure(requestError)
|
|
}
|
|
|
|
if err := p.repo.InsertItems(items); err != nil {
|
|
return err
|
|
}
|
|
|
|
return p.repo.RecordSuccess(lastBuild)
|
|
}
|