1
0
Fork 0
forked from 0x2E/fusion
fusion/service/pull/pull.go
Yuan bc8109fe39
refactor: replace zap log with slog (#150)
* refactor: replace zap log with slog

* fix
2025-04-25 17:18:25 +08:00

97 lines
1.7 KiB
Go

package pull
import (
"context"
"errors"
"log/slog"
"sync"
"time"
"github.com/0x2e/fusion/model"
"github.com/0x2e/fusion/pkg/ptr"
"github.com/0x2e/fusion/repo"
)
var (
interval = 30 * time.Minute
)
type FeedRepo interface {
List(filter *repo.FeedListFilter) ([]*model.Feed, error)
Get(id uint) (*model.Feed, error)
Update(id uint, feed *model.Feed) error
}
type ItemRepo interface {
Insert(items []*model.Item) error
}
type Puller struct {
feedRepo FeedRepo
itemRepo ItemRepo
}
// TODO: cache favicon
func NewPuller(feedRepo FeedRepo, itemRepo ItemRepo) *Puller {
return &Puller{
feedRepo: feedRepo,
itemRepo: itemRepo,
}
}
func (p *Puller) Run() {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
p.PullAll(context.Background(), false)
<-ticker.C
}
}
func (p *Puller) PullAll(ctx context.Context, force bool) error {
ctx, cancel := context.WithTimeout(ctx, interval/2)
defer cancel()
feeds, err := p.feedRepo.List(nil)
if err != nil {
if errors.Is(err, repo.ErrNotFound) {
err = nil
}
return err
}
if len(feeds) == 0 {
return nil
}
routinePool := make(chan struct{}, 10)
defer close(routinePool)
wg := sync.WaitGroup{}
for _, f := range feeds {
routinePool <- struct{}{}
wg.Add(1)
go func(f *model.Feed) {
defer func() {
wg.Done()
<-routinePool
}()
if err := p.do(ctx, f, force); err != nil {
slog.Error("failed to pull feed", "error", err, "feed_id", f.ID, "feed_link", ptr.From(f.Link))
}
}(f)
}
wg.Wait()
return nil
}
func (p *Puller) PullOne(ctx context.Context, id uint) error {
f, err := p.feedRepo.Get(id)
if err != nil {
return err
}
return p.do(ctx, f, true)
}