mirror of
https://github.com/anyproto/anytype-heart.git
synced 2025-06-10 01:51:07 +09:00
GO-5565 emailcollector: badger -> keyvaluestore
This commit is contained in:
parent
fb65a6960f
commit
bf2250eb2b
2 changed files with 43 additions and 65 deletions
|
@ -2,7 +2,6 @@ package emailcollector
|
|||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"time"
|
||||
|
||||
|
@ -11,7 +10,6 @@ import (
|
|||
ppclient "github.com/anyproto/any-sync/paymentservice/paymentserviceclient"
|
||||
proto "github.com/anyproto/any-sync/paymentservice/paymentserviceproto"
|
||||
"github.com/anyproto/any-sync/util/periodicsync"
|
||||
"github.com/dgraph-io/badger/v4"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/anyproto/anytype-heart/core/anytype/config"
|
||||
|
@ -19,17 +17,17 @@ import (
|
|||
"github.com/anyproto/anytype-heart/pb"
|
||||
"github.com/anyproto/anytype-heart/pkg/lib/datastore"
|
||||
"github.com/anyproto/anytype-heart/pkg/lib/logging"
|
||||
"github.com/anyproto/anytype-heart/util/keyvaluestore"
|
||||
)
|
||||
|
||||
const CName = "emailcollector"
|
||||
|
||||
var log = logging.Logger(CName)
|
||||
|
||||
var dbKey = "payments/emailcollector/email"
|
||||
|
||||
const (
|
||||
refreshIntervalSecs = 60
|
||||
timeout = 30 * time.Second
|
||||
keyPrefix = "payments/emailcollector/"
|
||||
)
|
||||
|
||||
// EmailCollector is a simple component that will save email to the DB
|
||||
|
@ -45,15 +43,11 @@ type EmailCollector interface {
|
|||
type emailcollector struct {
|
||||
cfg *config.Config
|
||||
dbProvider datastore.Datastore
|
||||
db *badger.DB
|
||||
store keyvaluestore.Store[pb.RpcMembershipGetVerificationEmailRequest]
|
||||
periodic periodicsync.PeriodicSync
|
||||
ppclient ppclient.AnyPpClientService
|
||||
wallet wallet.Wallet
|
||||
closing chan struct{}
|
||||
|
||||
// this is in-memory object that was read from the DB
|
||||
// if Email field empty - then no need to send it to the pp node
|
||||
req pb.RpcMembershipGetVerificationEmailRequest
|
||||
}
|
||||
|
||||
func New() EmailCollector {
|
||||
|
@ -61,7 +55,7 @@ func New() EmailCollector {
|
|||
}
|
||||
|
||||
func (e *emailcollector) Name() string {
|
||||
return "emailcollector"
|
||||
return CName
|
||||
}
|
||||
|
||||
func (e *emailcollector) Init(a *app.App) error {
|
||||
|
@ -75,30 +69,28 @@ func (e *emailcollector) Init(a *app.App) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
e.db = db
|
||||
|
||||
// Initialize keyvaluestore
|
||||
e.store = keyvaluestore.NewJson[pb.RpcMembershipGetVerificationEmailRequest](db, []byte(keyPrefix+"email"))
|
||||
|
||||
// run periodic cycle to send email to the payment service
|
||||
e.periodic = periodicsync.NewPeriodicSync(refreshIntervalSecs, timeout, e.periodicUpdateEmail, logger.CtxLogger{Logger: log.Desugar()})
|
||||
|
||||
// read: db -> req field
|
||||
err = e.get()
|
||||
if err != nil {
|
||||
log.Error("emailcollector: failed to get email", zap.Error(err))
|
||||
// not an error, just no email in the DB
|
||||
return nil
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *emailcollector) Run(ctx context.Context) (err error) {
|
||||
// skip running loop if we are on a custom network or in local-only mode
|
||||
if e.cfg.GetNetworkMode() != pb.RpcAccount_DefaultConfig {
|
||||
// do not trace to log to prevent spamming
|
||||
return nil
|
||||
}
|
||||
|
||||
e.periodic.Run()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *emailcollector) Close(_ context.Context) (err error) {
|
||||
close(e.closing)
|
||||
|
||||
e.periodic.Close()
|
||||
return nil
|
||||
}
|
||||
|
@ -108,9 +100,7 @@ func (e *emailcollector) Close(_ context.Context) (err error) {
|
|||
// Once email is set - this component will send it to the payment service
|
||||
// when it will be online
|
||||
func (e *emailcollector) SetRequest(req *pb.RpcMembershipGetVerificationEmailRequest) error {
|
||||
e.req = *req
|
||||
|
||||
log.Debug("emailcollector: setting email", zap.String("email", req.Email))
|
||||
log.Debug("emailcollector: setting email")
|
||||
|
||||
err := e.set(req)
|
||||
if err != nil {
|
||||
|
@ -128,8 +118,14 @@ func (e *emailcollector) periodicUpdateEmail(ctx context.Context) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
req, err := e.get()
|
||||
if err != nil {
|
||||
log.Error("emailcollector: failed to get email", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
// 1 - check if we have something to send
|
||||
if e.req.Email == "" {
|
||||
if req.Email == "" {
|
||||
// skip if email is not set or was already sent
|
||||
log.Debug("emailcollector: email is not set or was already sent")
|
||||
return nil
|
||||
|
@ -137,10 +133,10 @@ func (e *emailcollector) periodicUpdateEmail(ctx context.Context) error {
|
|||
|
||||
// send to pp node (do not check response)
|
||||
// this is the default request without SubscribeToNewsletter, InsiderTipsAndTutorials fields set
|
||||
log.Debug("emailcollector: sending email to pp node", zap.String("email", e.req.Email))
|
||||
_, err := e.SendRequest(
|
||||
log.Debug("emailcollector: sending email to pp node")
|
||||
_, err = e.SendRequest(
|
||||
ctx,
|
||||
&e.req,
|
||||
&req,
|
||||
)
|
||||
if err != nil {
|
||||
log.Debug("emailcollector: failed to send email to pp node", zap.Error(err))
|
||||
|
@ -148,9 +144,9 @@ func (e *emailcollector) periodicUpdateEmail(ctx context.Context) error {
|
|||
}
|
||||
|
||||
// save to db empty string (email sent)
|
||||
e.req.Email = ""
|
||||
req.Email = ""
|
||||
|
||||
err = e.set(&e.req)
|
||||
err = e.set(&req)
|
||||
if err != nil {
|
||||
log.Error("emailcollector: failed to set email", zap.Error(err))
|
||||
return err
|
||||
|
@ -202,39 +198,22 @@ func (e *emailcollector) SendRequest(ctx context.Context, req *pb.RpcMembershipG
|
|||
}
|
||||
|
||||
// will save data to e.email...
|
||||
func (e *emailcollector) get() (err error) {
|
||||
if e.db == nil {
|
||||
return errors.New("db not initialized")
|
||||
func (e *emailcollector) get() (pb.RpcMembershipGetVerificationEmailRequest, error) {
|
||||
if e.store == nil {
|
||||
return pb.RpcMembershipGetVerificationEmailRequest{}, errors.New("store not initialized")
|
||||
}
|
||||
|
||||
err = e.db.View(func(txn *badger.Txn) error {
|
||||
item, err := txn.Get([]byte(dbKey))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return item.Value(func(val []byte) error {
|
||||
// convert value to out
|
||||
return json.Unmarshal(val, &e.req)
|
||||
})
|
||||
})
|
||||
|
||||
return err
|
||||
req, err := e.store.Get("req")
|
||||
if err != nil {
|
||||
return pb.RpcMembershipGetVerificationEmailRequest{}, err
|
||||
}
|
||||
return req, nil
|
||||
}
|
||||
|
||||
func (e *emailcollector) set(in *pb.RpcMembershipGetVerificationEmailRequest) (err error) {
|
||||
if e.db == nil {
|
||||
return errors.New("db not initialized")
|
||||
func (e *emailcollector) set(req *pb.RpcMembershipGetVerificationEmailRequest) error {
|
||||
if e.store == nil {
|
||||
return errors.New("store not initialized")
|
||||
}
|
||||
|
||||
// save to db
|
||||
return e.db.Update(func(txn *badger.Txn) error {
|
||||
// convert
|
||||
bytes, err := json.Marshal(in)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return txn.Set([]byte(dbKey), bytes)
|
||||
})
|
||||
return e.store.Set("req", *req)
|
||||
}
|
||||
|
|
|
@ -702,12 +702,11 @@ func (s *service) GetVerificationEmail(ctx context.Context, req *pb.RpcMembershi
|
|||
}
|
||||
|
||||
// default OK response
|
||||
var out pb.RpcMembershipGetVerificationEmailResponse
|
||||
out.Error = &pb.RpcMembershipGetVerificationEmailResponseError{
|
||||
Code: pb.RpcMembershipGetVerificationEmailResponseError_NULL,
|
||||
}
|
||||
|
||||
return &out, nil
|
||||
return &pb.RpcMembershipGetVerificationEmailResponse{
|
||||
Error: &pb.RpcMembershipGetVerificationEmailResponseError{
|
||||
Code: pb.RpcMembershipGetVerificationEmailResponseError_NULL,
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
// send request to PP node directly
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue