1
0
Fork 0
mirror of https://github.com/anyproto/any-sync.git synced 2025-06-08 14:07:02 +09:00
any-sync/coordinator/inboxclient/client.go
2025-04-22 16:52:52 +02:00

169 lines
4.7 KiB
Go

package inboxclient
import (
"context"
"errors"
"fmt"
"storj.io/drpc"
commonaccount "github.com/anyproto/any-sync/accountservice"
"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/app/logger"
"github.com/anyproto/any-sync/coordinator/coordinatorproto"
"github.com/anyproto/any-sync/coordinator/subscribeclient"
"github.com/anyproto/any-sync/net/peer"
"github.com/anyproto/any-sync/net/pool"
"github.com/anyproto/any-sync/net/rpc/rpcerr"
"github.com/anyproto/any-sync/nodeconf"
"github.com/anyproto/any-sync/util/crypto"
"go.uber.org/zap"
)
const CName = "common.inboxclient"
type MessageReceiver func(message *coordinatorproto.NotifySubscribeEvent)
var (
ErrPubKeyMissing = errors.New("peer pub key missing")
ErrNetworkMismatched = errors.New("network mismatched")
)
var log = logger.NewNamed(CName)
type inboxClient struct {
nodeconf nodeconf.Service
pool pool.Pool
account commonaccount.Service
subscribeClient subscribeclient.SubscribeClientService
running bool
messageReceiver MessageReceiver
}
func New() InboxClient {
return new(inboxClient)
}
type InboxClient interface {
InboxFetch(ctx context.Context, offset string) (messages []*coordinatorproto.InboxMessage, hasMore bool, err error)
InboxAddMessage(ctx context.Context, receiverPubKey crypto.PubKey, message *coordinatorproto.InboxMessage) (err error)
SetMessageReceiver(receiver MessageReceiver) (err error)
app.ComponentRunnable
}
func (c *inboxClient) Name() (name string) {
return CName
}
func (c *inboxClient) Init(a *app.App) (err error) {
c.pool = a.MustComponent(pool.CName).(pool.Pool)
c.nodeconf = a.MustComponent(nodeconf.CName).(nodeconf.Service)
c.account = a.MustComponent(commonaccount.CName).(commonaccount.Service)
c.subscribeClient = a.MustComponent(subscribeclient.CName).(subscribeclient.SubscribeClientService)
return nil
}
func (c *inboxClient) SetMessageReceiver(receiver MessageReceiver) (err error) {
if c.running {
return fmt.Errorf("set receiver must be called before Run")
}
c.messageReceiver = receiver
return
}
func (c *inboxClient) Run(ctx context.Context) (err error) {
c.running = true
if c.messageReceiver == nil {
err = fmt.Errorf("messageReceiver is nil: can't start streamWatcher()")
return err
}
c.subscribeClient.Subscribe(coordinatorproto.NotifyEventType_InboxNewMessageEvent, func(event *coordinatorproto.NotifySubscribeEvent) {
log.Debug("calling messagereceiver from subscribe()")
c.messageReceiver(event)
})
return nil
}
func (c *inboxClient) Close(_ context.Context) error {
return nil
}
// Fetches inbox messages for requster accountId.
// `offset` is `id` of the latest message fetched, i.e. response will
// contain messages with `timestamp` after this `id`, or all messages if empty.
//
// It is assumed that caller will save the last message id somewhere locally to reuse it
// in the next call.
func (c *inboxClient) InboxFetch(ctx context.Context, offset string) (messages []*coordinatorproto.InboxMessage, hasMore bool, err error) {
log.Debug("inbox fetch", zap.String("offset", offset))
err = c.doClient(ctx, func(cl coordinatorproto.DRPCCoordinatorClient) error {
resp, err := cl.InboxFetch(ctx, &coordinatorproto.InboxFetchRequest{
Offset: offset,
})
if err != nil {
return rpcerr.Unwrap(err)
}
messages = resp.Messages
hasMore = resp.HasMore
return nil
})
return
}
func (c *inboxClient) InboxAddMessage(ctx context.Context, receiverPubKey crypto.PubKey, message *coordinatorproto.InboxMessage) (err error) {
encrypted, err := receiverPubKey.Encrypt(message.Packet.Payload.Body)
if err != nil {
return
}
message.Packet.Payload.Body = encrypted
signature, err := c.account.Account().SignKey.Sign(message.Packet.Payload.Body)
if err != nil {
return fmt.Errorf("sign body: %w", err)
}
message.Packet.SenderSignature = signature
err = c.doClient(ctx, func(cl coordinatorproto.DRPCCoordinatorClient) error {
_, err := cl.InboxAddMessage(ctx, &coordinatorproto.InboxAddMessageRequest{
Message: message,
})
if err != nil {
return rpcerr.Unwrap(err)
}
return nil
})
return
}
func (c *inboxClient) getPeer(ctx context.Context) (peer.Peer, error) {
p, err := c.pool.GetOneOf(ctx, c.nodeconf.CoordinatorPeers())
if err != nil {
return nil, err
}
pubKey, err := peer.CtxPubKey(p.Context())
if err != nil {
return nil, ErrPubKeyMissing
}
if pubKey.Network() != c.nodeconf.Configuration().NetworkId {
return nil, ErrNetworkMismatched
}
return p, nil
}
func (c *inboxClient) doClient(ctx context.Context, f func(cl coordinatorproto.DRPCCoordinatorClient) error) error {
p, err := c.getPeer(ctx)
if err != nil {
return err
}
return p.DoDrpc(ctx, func(conn drpc.Conn) error {
return f(coordinatorproto.NewDRPCCoordinatorClient(conn))
})
}