mirror of
https://github.com/0x2E/fusion.git
synced 2025-06-08 05:27:15 +09:00
Refactor Puller to create a separate SingleFeedPuller (#102)
This commit is contained in:
parent
8de93295b6
commit
c0eaea70de
3 changed files with 306 additions and 23 deletions
|
@ -5,7 +5,6 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/0x2e/fusion/model"
|
||||
"github.com/0x2e/fusion/pkg/ptr"
|
||||
"github.com/0x2e/fusion/service/pull/client"
|
||||
)
|
||||
|
||||
|
@ -31,29 +30,12 @@ func (p *Puller) do(ctx context.Context, f *model.Feed, force bool) error {
|
|||
}
|
||||
}
|
||||
|
||||
result, err := client.NewFeedClient().FetchItems(ctx, *f.Link, f.FeedRequestOptions)
|
||||
if err != nil {
|
||||
p.feedRepo.Update(f.ID, &model.Feed{Failure: ptr.To(err.Error())})
|
||||
return err
|
||||
repo := defaultSingleFeedRepo{
|
||||
feedID: f.ID,
|
||||
feedRepo: p.feedRepo,
|
||||
itemRepo: p.itemRepo,
|
||||
}
|
||||
isLatestBuild := f.LastBuild != nil && result.LastBuild != nil &&
|
||||
result.LastBuild.Equal(*f.LastBuild)
|
||||
if len(result.Items) != 0 && !isLatestBuild {
|
||||
|
||||
// Set the correct feed ID for all items.
|
||||
for _, item := range result.Items {
|
||||
item.FeedID = f.ID
|
||||
}
|
||||
|
||||
if err := p.itemRepo.Insert(result.Items); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
logger.Infof("fetched %d items", len(result.Items))
|
||||
return p.feedRepo.Update(f.ID, &model.Feed{
|
||||
LastBuild: result.LastBuild,
|
||||
Failure: ptr.To(""),
|
||||
})
|
||||
return NewSingleFeedPuller(client.NewFeedClient().FetchItems, &repo).Pull(ctx, f)
|
||||
}
|
||||
|
||||
// FeedUpdateAction represents the action to take when considering checking a
|
||||
|
|
98
service/pull/singlefeed.go
Normal file
98
service/pull/singlefeed.go
Normal file
|
@ -0,0 +1,98 @@
|
|||
package pull
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/0x2e/fusion/model"
|
||||
"github.com/0x2e/fusion/pkg/ptr"
|
||||
"github.com/0x2e/fusion/service/pull/client"
|
||||
)
|
||||
|
||||
// ReadFeedItemsFn is responsible for reading a feed from an HTTP server and
|
||||
// converting the result to fusion-native data types. The error return value
|
||||
// is for request errors (e.g. HTTP errors).
|
||||
type ReadFeedItemsFn func(ctx context.Context, feedURL string, options model.FeedRequestOptions) (client.FetchItemsResult, error)
|
||||
|
||||
// UpdateFeedInStoreFn is responsible for saving the result of a feed fetch to a data
|
||||
// store. If the fetch failed, it records that in the data store. If the fetch
|
||||
// succeeds, it stores the latest build time in the data store and adds any new
|
||||
// feed items to the datastore.
|
||||
type UpdateFeedInStoreFn func(feedID uint, items []*model.Item, lastBuild *time.Time, requestError error) error
|
||||
|
||||
// SingleFeedRepo represents a datastore for storing information about a feed.
|
||||
type SingleFeedRepo interface {
|
||||
InsertItems(items []*model.Item) error
|
||||
RecordSuccess(lastBuild *time.Time) error
|
||||
RecordFailure(readErr error) error
|
||||
}
|
||||
|
||||
type SingleFeedPuller struct {
|
||||
readFeed ReadFeedItemsFn
|
||||
repo SingleFeedRepo
|
||||
}
|
||||
|
||||
// NewSingleFeedPuller creates a new SingleFeedPuller with the given ReadFeedItemsFn and repository.
|
||||
func NewSingleFeedPuller(readFeed ReadFeedItemsFn, repo SingleFeedRepo) SingleFeedPuller {
|
||||
return SingleFeedPuller{
|
||||
readFeed: readFeed,
|
||||
repo: repo,
|
||||
}
|
||||
}
|
||||
|
||||
// defaultSingleFeedRepo is the default implementation of SingleFeedRepo
|
||||
type defaultSingleFeedRepo struct {
|
||||
feedID uint
|
||||
feedRepo FeedRepo
|
||||
itemRepo ItemRepo
|
||||
}
|
||||
|
||||
func (r *defaultSingleFeedRepo) InsertItems(items []*model.Item) error {
|
||||
// Set the correct feed ID for all items.
|
||||
for _, item := range items {
|
||||
item.FeedID = r.feedID
|
||||
}
|
||||
return r.itemRepo.Insert(items)
|
||||
}
|
||||
|
||||
func (r *defaultSingleFeedRepo) RecordSuccess(lastBuild *time.Time) error {
|
||||
return r.feedRepo.Update(r.feedID, &model.Feed{
|
||||
LastBuild: lastBuild,
|
||||
Failure: ptr.To(""),
|
||||
})
|
||||
}
|
||||
|
||||
func (r *defaultSingleFeedRepo) RecordFailure(readErr error) error {
|
||||
return r.feedRepo.Update(r.feedID, &model.Feed{
|
||||
Failure: ptr.To(readErr.Error()),
|
||||
})
|
||||
}
|
||||
|
||||
func (p SingleFeedPuller) Pull(ctx context.Context, feed *model.Feed) error {
|
||||
logger := pullLogger.With("feed_id", feed.ID, "feed_name", feed.Name)
|
||||
|
||||
// We don't exit on error, as we want to record any error in the data store.
|
||||
fetchResult, readErr := p.readFeed(ctx, *feed.Link, feed.FeedRequestOptions)
|
||||
if readErr == nil {
|
||||
logger.Infof("fetched %d items", len(fetchResult.Items))
|
||||
} else {
|
||||
logger.Infof("fetch failed: %v", readErr)
|
||||
}
|
||||
|
||||
return p.updateFeedInStore(feed.ID, fetchResult.Items, fetchResult.LastBuild, readErr)
|
||||
}
|
||||
|
||||
// updateFeedInStore saves the result of a feed fetch to the data store.
|
||||
// If the fetch failed, it records that in the data store.
|
||||
// If the fetch succeeds, it stores the latest build time and adds any new feed items.
|
||||
func (p SingleFeedPuller) updateFeedInStore(feedID uint, items []*model.Item, lastBuild *time.Time, requestError error) error {
|
||||
if requestError != nil {
|
||||
return p.repo.RecordFailure(requestError)
|
||||
}
|
||||
|
||||
if err := p.repo.InsertItems(items); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return p.repo.RecordSuccess(lastBuild)
|
||||
}
|
203
service/pull/singlefeed_test.go
Normal file
203
service/pull/singlefeed_test.go
Normal file
|
@ -0,0 +1,203 @@
|
|||
package pull_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/0x2e/fusion/model"
|
||||
"github.com/0x2e/fusion/pkg/ptr"
|
||||
"github.com/0x2e/fusion/service/pull"
|
||||
"github.com/0x2e/fusion/service/pull/client"
|
||||
)
|
||||
|
||||
// mockFeedReader is a mock implementation of ReadFeedItemsFn
|
||||
type mockFeedReader struct {
|
||||
result client.FetchItemsResult
|
||||
err error
|
||||
lastFeedURL string
|
||||
lastOptions model.FeedRequestOptions
|
||||
}
|
||||
|
||||
func (m *mockFeedReader) Read(ctx context.Context, feedURL string, options model.FeedRequestOptions) (client.FetchItemsResult, error) {
|
||||
m.lastFeedURL = feedURL
|
||||
m.lastOptions = options
|
||||
|
||||
return m.result, m.err
|
||||
}
|
||||
|
||||
// mockSingleFeedRepo is a mock implementation of the SingleFeedRepo interface
|
||||
type mockSingleFeedRepo struct {
|
||||
err error
|
||||
items []*model.Item
|
||||
lastBuild *time.Time
|
||||
requestError error
|
||||
}
|
||||
|
||||
func (m *mockSingleFeedRepo) InsertItems(items []*model.Item) error {
|
||||
if m.err != nil {
|
||||
return m.err
|
||||
}
|
||||
m.items = items
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *mockSingleFeedRepo) RecordSuccess(lastBuild *time.Time) error {
|
||||
if m.err != nil {
|
||||
return m.err
|
||||
}
|
||||
m.lastBuild = lastBuild
|
||||
m.requestError = nil
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *mockSingleFeedRepo) RecordFailure(readErr error) error {
|
||||
if m.err != nil {
|
||||
return m.err
|
||||
}
|
||||
m.requestError = readErr
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestSingleFeedPullerPull(t *testing.T) {
|
||||
for _, tt := range []struct {
|
||||
description string
|
||||
feed model.Feed
|
||||
mockFeedReader *mockFeedReader
|
||||
mockDbErr error
|
||||
expectedErrMsg string
|
||||
expectedStoredItems []*model.Item
|
||||
expectedStoredLastBuild *time.Time
|
||||
expectedStoredRequestError error
|
||||
}{
|
||||
{
|
||||
description: "successful pull with no errors",
|
||||
feed: model.Feed{
|
||||
ID: 42,
|
||||
Name: ptr.To("Test Feed"),
|
||||
Link: ptr.To("https://example.com/feed.xml"),
|
||||
FeedRequestOptions: model.FeedRequestOptions{
|
||||
ReqProxy: ptr.To("http://proxy.example.com"),
|
||||
},
|
||||
},
|
||||
mockFeedReader: &mockFeedReader{
|
||||
result: client.FetchItemsResult{
|
||||
LastBuild: mustParseTime("2025-01-01T12:00:00Z"),
|
||||
Items: []*model.Item{
|
||||
{
|
||||
Title: ptr.To("Test Item 1"),
|
||||
GUID: ptr.To("guid1"),
|
||||
Link: ptr.To("https://example.com/item1"),
|
||||
Content: ptr.To("Content 1"),
|
||||
FeedID: 42,
|
||||
},
|
||||
{
|
||||
Title: ptr.To("Test Item 2"),
|
||||
GUID: ptr.To("guid2"),
|
||||
Link: ptr.To("https://example.com/item2"),
|
||||
Content: ptr.To("Content 2"),
|
||||
FeedID: 42,
|
||||
},
|
||||
},
|
||||
},
|
||||
err: nil,
|
||||
},
|
||||
mockDbErr: nil,
|
||||
expectedStoredItems: []*model.Item{
|
||||
{
|
||||
Title: ptr.To("Test Item 1"),
|
||||
GUID: ptr.To("guid1"),
|
||||
Link: ptr.To("https://example.com/item1"),
|
||||
Content: ptr.To("Content 1"),
|
||||
FeedID: 42,
|
||||
},
|
||||
{
|
||||
Title: ptr.To("Test Item 2"),
|
||||
GUID: ptr.To("guid2"),
|
||||
Link: ptr.To("https://example.com/item2"),
|
||||
Content: ptr.To("Content 2"),
|
||||
FeedID: 42,
|
||||
},
|
||||
},
|
||||
expectedStoredLastBuild: mustParseTime("2025-01-01T12:00:00Z"),
|
||||
expectedStoredRequestError: nil,
|
||||
},
|
||||
{
|
||||
description: "readFeed returns error",
|
||||
feed: model.Feed{
|
||||
ID: 42,
|
||||
Name: ptr.To("Test Feed"),
|
||||
Link: ptr.To("https://example.com/feed.xml"),
|
||||
},
|
||||
mockFeedReader: &mockFeedReader{
|
||||
err: errors.New("dummy feed read error"),
|
||||
},
|
||||
expectedErrMsg: "",
|
||||
expectedStoredItems: nil,
|
||||
expectedStoredLastBuild: nil,
|
||||
expectedStoredRequestError: errors.New("dummy feed read error"),
|
||||
},
|
||||
{
|
||||
description: "readFeed succeeds but updateFeedInStore fails",
|
||||
feed: model.Feed{
|
||||
ID: 42,
|
||||
Name: ptr.To("Test Feed"),
|
||||
Link: ptr.To("https://example.com/feed.xml"),
|
||||
},
|
||||
mockFeedReader: &mockFeedReader{
|
||||
result: client.FetchItemsResult{
|
||||
LastBuild: mustParseTime("2025-01-01T12:00:00Z"),
|
||||
Items: []*model.Item{
|
||||
{
|
||||
Title: ptr.To("Test Item 1"),
|
||||
GUID: ptr.To("guid1"),
|
||||
Link: ptr.To("https://example.com/item1"),
|
||||
Content: ptr.To("Content 1"),
|
||||
FeedID: 42,
|
||||
},
|
||||
},
|
||||
},
|
||||
err: nil,
|
||||
},
|
||||
mockDbErr: errors.New("dummy database error"),
|
||||
expectedErrMsg: "dummy database error",
|
||||
expectedStoredItems: nil,
|
||||
expectedStoredLastBuild: nil,
|
||||
expectedStoredRequestError: nil,
|
||||
},
|
||||
} {
|
||||
t.Run(tt.description, func(t *testing.T) {
|
||||
mockRepo := &mockSingleFeedRepo{
|
||||
err: tt.mockDbErr,
|
||||
}
|
||||
|
||||
err := pull.NewSingleFeedPuller(tt.mockFeedReader.Read, mockRepo).Pull(context.Background(), &tt.feed)
|
||||
|
||||
if tt.expectedErrMsg != "" {
|
||||
require.Error(t, err)
|
||||
assert.Contains(t, err.Error(), tt.expectedErrMsg)
|
||||
} else {
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
assert.Equal(t, *tt.feed.Link, tt.mockFeedReader.lastFeedURL)
|
||||
assert.Equal(t, tt.feed.FeedRequestOptions, tt.mockFeedReader.lastOptions)
|
||||
|
||||
assert.Equal(t, tt.expectedStoredRequestError, mockRepo.requestError)
|
||||
assert.Equal(t, tt.expectedStoredItems, mockRepo.items)
|
||||
assert.Equal(t, tt.expectedStoredLastBuild, mockRepo.lastBuild)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func mustParseTime(iso8601 string) *time.Time {
|
||||
t, err := time.Parse(time.RFC3339, iso8601)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return &t
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue