mirror of
https://github.com/anyproto/anytype-heart.git
synced 2025-06-08 05:47:07 +09:00
GO-5296 Merge branch 'main' of github.com:anyproto/anytype-heart into go-5296-add-counters-for-unread-messages-to-subscription-revived
This commit is contained in:
commit
b58805e31d
13 changed files with 783 additions and 814 deletions
11
.github/workflows/perftests.yml
vendored
11
.github/workflows/perftests.yml
vendored
|
@ -110,6 +110,17 @@ jobs:
|
|||
name: traces-macos
|
||||
path: |
|
||||
*.log
|
||||
- name: Report Status when failure
|
||||
if: always()
|
||||
uses: ravsamhq/notify-slack-action@2.0.0
|
||||
with:
|
||||
status: ${{ job.status }}
|
||||
notify_when: "failure"
|
||||
notification_title: "{workflow}: {job} has {status_message}"
|
||||
footer: "Linked Repo <{repo_url}|{repo}>"
|
||||
message_format: "{emoji} *{job}* from *{workflow}* {status_message} in <{repo_url}|{repo}>"
|
||||
env:
|
||||
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_ALERT_WEBHOOK }}
|
||||
|
||||
perftests-windows:
|
||||
timeout-minutes: 60
|
||||
|
|
|
@ -16,10 +16,8 @@ import (
|
|||
"github.com/anyproto/any-sync/nodeconf"
|
||||
"github.com/anyproto/any-sync/util/crypto"
|
||||
"github.com/ipfs/go-cid"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/anyproto/anytype-heart/core/anytype/account"
|
||||
"github.com/anyproto/anytype-heart/core/block/chats/chatpush"
|
||||
"github.com/anyproto/anytype-heart/core/domain"
|
||||
"github.com/anyproto/anytype-heart/core/inviteservice"
|
||||
"github.com/anyproto/anytype-heart/pkg/lib/bundle"
|
||||
|
@ -40,11 +38,6 @@ type NodeConfGetter interface {
|
|||
GetNodeConf() (conf nodeconf.Configuration)
|
||||
}
|
||||
|
||||
type pushService interface {
|
||||
CreateSpace(ctx context.Context, spaceId string) (err error)
|
||||
SubscribeToTopics(ctx context.Context, spaceId string, topic []string)
|
||||
}
|
||||
|
||||
type AccountPermissions struct {
|
||||
Account crypto.PubKey
|
||||
Permissions model.ParticipantPermissions
|
||||
|
@ -89,7 +82,6 @@ type aclService struct {
|
|||
accountService account.Service
|
||||
coordClient coordinatorclient.CoordinatorClient
|
||||
identityRepo identityRepoClient
|
||||
pushService pushService
|
||||
}
|
||||
|
||||
func (a *aclService) Init(ap *app.App) (err error) {
|
||||
|
@ -100,7 +92,6 @@ func (a *aclService) Init(ap *app.App) (err error) {
|
|||
a.inviteService = app.MustComponent[inviteservice.InviteService](ap)
|
||||
a.coordClient = app.MustComponent[coordinatorclient.CoordinatorClient](ap)
|
||||
a.identityRepo = app.MustComponent[identityRepoClient](ap)
|
||||
a.pushService = app.MustComponent[pushService](ap)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -119,19 +110,9 @@ func (a *aclService) MakeShareable(ctx context.Context, spaceId string) error {
|
|||
if err != nil {
|
||||
return convertedOrInternalError("set local info", err)
|
||||
}
|
||||
a.subscribeToPushNotifications(err, spaceId)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *aclService) subscribeToPushNotifications(err error, spaceId string) {
|
||||
err = a.pushService.CreateSpace(context.Background(), spaceId)
|
||||
if err != nil {
|
||||
log.Error("create space for push message", zap.Error(err))
|
||||
} else {
|
||||
a.pushService.SubscribeToTopics(context.Background(), spaceId, []string{chatpush.ChatsTopicName})
|
||||
}
|
||||
}
|
||||
|
||||
func (a *aclService) pushGuest(ctx context.Context, privKey crypto.PrivKey) (metadata []byte, err error) {
|
||||
metadataModel, _, err := space.DeriveAccountMetadata(privKey)
|
||||
if err != nil {
|
||||
|
|
|
@ -63,24 +63,6 @@ func (c *mockConfig) GetNodeConf() (conf nodeconf.Configuration) {
|
|||
return c.Config
|
||||
}
|
||||
|
||||
type pushServiceDummy struct {
|
||||
}
|
||||
|
||||
func (p *pushServiceDummy) Init(a *app.App) (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *pushServiceDummy) Name() (name string) {
|
||||
return "pushServiceDummy"
|
||||
}
|
||||
|
||||
func (p *pushServiceDummy) CreateSpace(ctx context.Context, spaceId string) (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *pushServiceDummy) SubscribeToTopics(ctx context.Context, spaceId string, topic []string) {
|
||||
}
|
||||
|
||||
type fixture struct {
|
||||
*aclService
|
||||
a *app.App
|
||||
|
@ -124,8 +106,7 @@ func newFixture(t *testing.T) *fixture {
|
|||
Register(testutil.PrepareMock(ctx, fx.a, fx.mockInviteService)).
|
||||
Register(testutil.PrepareMock(ctx, fx.a, fx.mockCoordinatorClient)).
|
||||
Register(fx.mockConfig).
|
||||
Register(fx.aclService).
|
||||
Register(&pushServiceDummy{})
|
||||
Register(fx.aclService)
|
||||
require.NoError(t, fx.a.Start(ctx))
|
||||
return fx
|
||||
}
|
||||
|
|
|
@ -13,21 +13,10 @@ type Payload struct {
|
|||
NewMessagePayload *NewMessagePayload `json:"newMessage,omitempty"`
|
||||
}
|
||||
|
||||
func MakePushPayload(spaceId, accountId, chatId string, messageId string, messageText string) *Payload {
|
||||
return &Payload{
|
||||
SpaceId: spaceId,
|
||||
SenderId: accountId,
|
||||
Type: ChatMessage,
|
||||
NewMessagePayload: &NewMessagePayload{
|
||||
ChatId: chatId,
|
||||
MsgId: messageId,
|
||||
Text: messageText,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
type NewMessagePayload struct {
|
||||
ChatId string `json:"chatId"`
|
||||
MsgId string `json:"msgId"`
|
||||
Text string `json:"text"`
|
||||
ChatId string `json:"chatId"`
|
||||
MsgId string `json:"msgId"`
|
||||
SpaceName string `json:"spaceName"`
|
||||
SenderName string `json:"senderName"`
|
||||
Text string `json:"text"`
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@ import (
|
|||
"github.com/anyproto/anytype-heart/pb"
|
||||
"github.com/anyproto/anytype-heart/pkg/lib/bundle"
|
||||
"github.com/anyproto/anytype-heart/pkg/lib/database"
|
||||
"github.com/anyproto/anytype-heart/pkg/lib/localstore/objectstore"
|
||||
"github.com/anyproto/anytype-heart/pkg/lib/logging"
|
||||
"github.com/anyproto/anytype-heart/pkg/lib/pb/model"
|
||||
)
|
||||
|
@ -62,6 +63,7 @@ type service struct {
|
|||
crossSpaceSubService crossspacesub.Service
|
||||
pushService pushService
|
||||
accountService accountService
|
||||
objectStore objectstore.ObjectStore
|
||||
|
||||
componentCtx context.Context
|
||||
componentCtxCancel context.CancelFunc
|
||||
|
@ -94,6 +96,8 @@ func (s *service) Init(a *app.App) error {
|
|||
s.componentCtx, s.componentCtxCancel = context.WithCancel(context.Background())
|
||||
s.pushService = app.MustComponent[pushService](a)
|
||||
s.accountService = app.MustComponent[accountService](a)
|
||||
s.objectStore = app.MustComponent[objectstore.ObjectStore](a)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -304,21 +308,55 @@ func (s *service) AddMessage(ctx context.Context, sessionCtx session.Context, ch
|
|||
return err
|
||||
})
|
||||
if err == nil {
|
||||
go s.sendPushNotification(spaceId, chatObjectId, messageId, message.Message.Text)
|
||||
go func() {
|
||||
err := s.sendPushNotification(spaceId, chatObjectId, messageId, message.Message.Text)
|
||||
if err != nil {
|
||||
log.Error("sendPushNotification: ", zap.Error(err))
|
||||
}
|
||||
}()
|
||||
|
||||
}
|
||||
return messageId, err
|
||||
}
|
||||
|
||||
func (s *service) sendPushNotification(spaceId, chatObjectId string, messageId string, messageText string) {
|
||||
payload := chatpush.MakePushPayload(spaceId, s.accountService.AccountID(), chatObjectId, messageId, messageText)
|
||||
jsonPayload, err := json.Marshal(payload)
|
||||
func (s *service) sendPushNotification(spaceId, chatObjectId string, messageId string, messageText string) (err error) {
|
||||
accountId := s.accountService.AccountID()
|
||||
spaceName := s.objectStore.GetSpaceName(spaceId)
|
||||
details, err := s.objectStore.SpaceIndex(spaceId).GetDetails(domain.NewParticipantId(spaceId, accountId))
|
||||
var senderName string
|
||||
if err != nil {
|
||||
log.Error("marshal push payload", zap.Error(err))
|
||||
log.Warn("sendPushNotification: failed to get profile name, details are empty", zap.Error(err))
|
||||
} else {
|
||||
senderName = details.GetString(bundle.RelationKeyName)
|
||||
}
|
||||
|
||||
payload := &chatpush.Payload{
|
||||
SpaceId: spaceId,
|
||||
SenderId: accountId,
|
||||
Type: chatpush.ChatMessage,
|
||||
NewMessagePayload: &chatpush.NewMessagePayload{
|
||||
ChatId: chatObjectId,
|
||||
MsgId: messageId,
|
||||
SpaceName: spaceName,
|
||||
SenderName: senderName,
|
||||
Text: messageText,
|
||||
},
|
||||
}
|
||||
|
||||
jsonPayload, err := json.Marshal(payload)
|
||||
|
||||
if err != nil {
|
||||
err = fmt.Errorf("marshal push payload: %w", err)
|
||||
return
|
||||
}
|
||||
|
||||
err = s.pushService.Notify(s.componentCtx, spaceId, []string{chatpush.ChatsTopicName}, jsonPayload)
|
||||
if err != nil {
|
||||
log.Error("notify push message", zap.Error(err))
|
||||
err = fmt.Errorf("pushService.Notify: %w", err)
|
||||
return
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (s *service) EditMessage(ctx context.Context, chatObjectId string, messageId string, newMessage *chatobject.Message) error {
|
||||
|
|
|
@ -2,31 +2,21 @@ package pushnotification
|
|||
|
||||
import (
|
||||
"github.com/anyproto/any-sync/util/crypto"
|
||||
"github.com/anyproto/any-sync/util/strkey"
|
||||
|
||||
"github.com/anyproto/anytype-heart/util/privkey"
|
||||
)
|
||||
|
||||
const (
|
||||
spaceKeyPath = "m/99999'/1'"
|
||||
spaceVersion = 0xB5
|
||||
spacePath = "m/SLIP-0021/anytype/space/key"
|
||||
)
|
||||
|
||||
func deriveSpaceKey(firstMetadataKey crypto.PrivKey) (string, crypto.PrivKey, error) {
|
||||
func deriveSpaceKey(firstMetadataKey crypto.PrivKey) (crypto.PrivKey, error) {
|
||||
key, err := privkey.DeriveFromPrivKey(spaceKeyPath, firstMetadataKey)
|
||||
if err != nil {
|
||||
return "", nil, err
|
||||
return nil, err
|
||||
}
|
||||
rawKey, err := key.GetPublic().Raw()
|
||||
if err != nil {
|
||||
return "", nil, err
|
||||
}
|
||||
encodedKey, err := strkey.Encode(spaceVersion, rawKey)
|
||||
if err != nil {
|
||||
return "", nil, err
|
||||
}
|
||||
return encodedKey, key, nil
|
||||
return key, nil
|
||||
}
|
||||
|
||||
func deriveSymmetricKey(readKey crypto.SymKey) (crypto.SymKey, error) {
|
||||
|
|
|
@ -2,7 +2,9 @@ package pushnotification
|
|||
|
||||
import (
|
||||
"context"
|
||||
"crypto/sha256"
|
||||
"encoding/base64"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
|
@ -11,6 +13,7 @@ import (
|
|||
"github.com/anyproto/any-sync/util/crypto"
|
||||
"github.com/anyproto/anytype-push-server/pushclient/pushapi"
|
||||
"github.com/cheggaaa/mb/v3"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/anyproto/anytype-heart/core/anytype/config"
|
||||
"github.com/anyproto/anytype-heart/core/event"
|
||||
|
@ -23,11 +26,11 @@ import (
|
|||
|
||||
const CName = "core.pushnotification.service"
|
||||
|
||||
var log = logging.Logger(CName)
|
||||
var log = logging.Logger(CName).Desugar()
|
||||
|
||||
type newSubscription struct {
|
||||
SpaceId string
|
||||
Topics []string
|
||||
type requestSubscribe struct {
|
||||
spaceId string
|
||||
topics []string
|
||||
}
|
||||
|
||||
type Service interface {
|
||||
|
@ -39,7 +42,7 @@ type Service interface {
|
|||
}
|
||||
|
||||
func New() Service {
|
||||
return &service{activeSubscriptions: make(map[string]TopicSet)}
|
||||
return &service{topicSubscriptions: make(map[spaceKeyType]map[string]*pushapi.Topic)}
|
||||
}
|
||||
|
||||
type service struct {
|
||||
|
@ -51,16 +54,19 @@ type service struct {
|
|||
|
||||
started bool
|
||||
activeSubscriptionsLock sync.Mutex
|
||||
activeSubscriptions map[string]TopicSet
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
batcher *mb.MB[newSubscription]
|
||||
|
||||
topicSubscriptions map[spaceKeyType]map[string]*pushapi.Topic
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
requestsQueue *mb.MB[requestSubscribe]
|
||||
}
|
||||
|
||||
type spaceKeyType string
|
||||
|
||||
func (s *service) SubscribeToTopics(ctx context.Context, spaceId string, topics []string) {
|
||||
err := s.batcher.Add(ctx, newSubscription{spaceId, topics})
|
||||
err := s.requestsQueue.Add(ctx, requestSubscribe{spaceId: spaceId, topics: topics})
|
||||
if err != nil {
|
||||
log.Errorf("add topic err: %v", err)
|
||||
log.Error("add topic", zap.Error(err))
|
||||
}
|
||||
return
|
||||
}
|
||||
|
@ -82,14 +88,14 @@ func (s *service) Close(ctx context.Context) (err error) {
|
|||
if s.cancel != nil {
|
||||
s.cancel()
|
||||
}
|
||||
return s.batcher.Close()
|
||||
return s.requestsQueue.Close()
|
||||
}
|
||||
|
||||
func (s *service) Init(a *app.App) (err error) {
|
||||
s.cfg = app.MustComponent[*config.Config](a)
|
||||
s.pushClient = app.MustComponent[pushclient.Client](a)
|
||||
s.wallet = app.MustComponent[wallet.Wallet](a)
|
||||
s.batcher = mb.New[newSubscription](0)
|
||||
s.requestsQueue = mb.New[requestSubscribe](0)
|
||||
s.spaceService = app.MustComponent[space.Service](a)
|
||||
s.eventSender = app.MustComponent[event.Sender](a)
|
||||
return
|
||||
|
@ -117,104 +123,66 @@ func (s *service) subscribeAll(ctx context.Context) error {
|
|||
if !s.started {
|
||||
return nil
|
||||
}
|
||||
topics := s.buildPushTopics()
|
||||
|
||||
s.activeSubscriptionsLock.Lock()
|
||||
var allTopics []*pushapi.Topic
|
||||
for _, topics := range s.topicSubscriptions {
|
||||
for _, topic := range topics {
|
||||
allTopics = append(allTopics, topic)
|
||||
}
|
||||
}
|
||||
s.activeSubscriptionsLock.Unlock()
|
||||
|
||||
err := s.pushClient.SubscribeAll(ctx, &pushapi.SubscribeAllRequest{
|
||||
Topics: &pushapi.Topics{Topics: topics},
|
||||
Topics: &pushapi.Topics{Topics: allTopics},
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *service) buildPushTopics() []*pushapi.Topic {
|
||||
s.activeSubscriptionsLock.Lock()
|
||||
defer s.activeSubscriptionsLock.Unlock()
|
||||
|
||||
var allTopics []*pushapi.Topic
|
||||
for spaceId, topicsSet := range s.activeSubscriptions {
|
||||
spaceTopics, err := s.createTopicsForSpace(spaceId, topicsSet.Slice())
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
allTopics = append(allTopics, spaceTopics...)
|
||||
}
|
||||
return allTopics
|
||||
}
|
||||
|
||||
func (s *service) createTopicsForSpace(spaceId string, topicNames []string) ([]*pushapi.Topic, error) {
|
||||
space, err := s.spaceService.Get(s.ctx, spaceId)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
state := space.CommonSpace().Acl().AclState()
|
||||
firstMetadataKey, err := state.FirstMetadataKey()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
_, signKey, err := deriveSpaceKey(firstMetadataKey)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get key for space %s: %w", spaceId, err)
|
||||
}
|
||||
topics, err := s.makeTopics(signKey, topicNames)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to make topics for space %s: %w", spaceId, err)
|
||||
}
|
||||
return topics, nil
|
||||
}
|
||||
|
||||
func (s *service) CreateSpace(ctx context.Context, spaceId string) (err error) {
|
||||
if !s.started {
|
||||
return nil
|
||||
}
|
||||
space, err := s.spaceService.Get(s.ctx, spaceId)
|
||||
keys, err := s.getSpaceKeys(spaceId)
|
||||
if err != nil {
|
||||
return err
|
||||
return fmt.Errorf("get space keys: %w", err)
|
||||
}
|
||||
state := space.CommonSpace().Acl().AclState()
|
||||
firstMetadataKey, err := state.FirstMetadataKey()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, signKey, err := deriveSpaceKey(firstMetadataKey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
signature, err := signKey.Sign([]byte(s.wallet.GetAccountPrivkey().GetPublic().Account()))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
pubKey := signKey.GetPublic()
|
||||
rawKey, err := pubKey.Raw()
|
||||
|
||||
signature, err := keys.signKey.Sign([]byte(s.wallet.GetAccountPrivkey().GetPublic().Account()))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = s.pushClient.CreateSpace(ctx, &pushapi.CreateSpaceRequest{
|
||||
SpaceKey: rawKey,
|
||||
SpaceKey: []byte(keys.spaceKey),
|
||||
AccountSignature: signature,
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
||||
type spaceKeys struct {
|
||||
// spaceKey is a public part of signKey
|
||||
spaceKey spaceKeyType
|
||||
signKey crypto.PrivKey
|
||||
|
||||
// id of current encryption key
|
||||
encryptionKeyId string
|
||||
encryptionKey crypto.SymKey
|
||||
}
|
||||
|
||||
func (s *service) Notify(ctx context.Context, spaceId string, topic []string, payload []byte) (err error) {
|
||||
if !s.started {
|
||||
return nil
|
||||
}
|
||||
space, err := s.spaceService.Get(s.ctx, spaceId)
|
||||
|
||||
keys, err := s.getSpaceKeys(spaceId)
|
||||
if err != nil {
|
||||
return fmt.Errorf("get space keys: %w", err)
|
||||
}
|
||||
topics, err := s.makeTopics(keys.signKey, topic)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
state := space.CommonSpace().Acl().AclState()
|
||||
firstMetadataKey, err := state.FirstMetadataKey()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
spaceKeyId, signKey, err := deriveSpaceKey(firstMetadataKey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
topics, err := s.makeTopics(signKey, topic)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
encryptedJson, err := s.prepareEncryptedPayload(state, payload)
|
||||
encryptedJson, err := keys.encryptionKey.Encrypt(payload)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -223,7 +191,7 @@ func (s *service) Notify(ctx context.Context, spaceId string, topic []string, pa
|
|||
return err
|
||||
}
|
||||
p := &pushapi.Message{
|
||||
KeyId: spaceKeyId,
|
||||
KeyId: keys.encryptionKeyId,
|
||||
Payload: encryptedJson,
|
||||
Signature: signature,
|
||||
}
|
||||
|
@ -234,23 +202,7 @@ func (s *service) Notify(ctx context.Context, spaceId string, topic []string, pa
|
|||
return err
|
||||
}
|
||||
|
||||
func (s *service) prepareEncryptedPayload(state *list.AclState, payload []byte) ([]byte, error) {
|
||||
symKey, err := state.CurrentReadKey()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
encryptionKey, err := deriveSymmetricKey(symKey)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
encryptedJson, err := encryptionKey.Encrypt(payload)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return encryptedJson, nil
|
||||
}
|
||||
|
||||
func (s *service) fillSubscriptions(ctx context.Context) (err error) {
|
||||
func (s *service) loadSubscriptions(ctx context.Context) (err error) {
|
||||
if !s.started {
|
||||
return nil
|
||||
}
|
||||
|
@ -261,19 +213,57 @@ func (s *service) fillSubscriptions(ctx context.Context) (err error) {
|
|||
if subscriptions == nil || subscriptions.Topics == nil {
|
||||
return nil
|
||||
}
|
||||
s.activeSubscriptionsLock.Lock()
|
||||
for _, topic := range subscriptions.Topics.Topics {
|
||||
spaceKey := string(topic.SpaceKey)
|
||||
s.activeSubscriptionsLock.Lock()
|
||||
if _, ok := s.activeSubscriptions[spaceKey]; !ok {
|
||||
s.activeSubscriptions[spaceKey] = NewTopicSet()
|
||||
}
|
||||
topicSet := s.activeSubscriptions[spaceKey]
|
||||
topicSet.Add(topic.Topic)
|
||||
s.activeSubscriptionsLock.Unlock()
|
||||
s.putTopicSubscription(spaceKeyType(topic.SpaceKey), topic)
|
||||
}
|
||||
s.activeSubscriptionsLock.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *service) addPendingTopicSubscription(spaceKeys *spaceKeys, topic string) (bool, error) {
|
||||
s.activeSubscriptionsLock.Lock()
|
||||
defer s.activeSubscriptionsLock.Unlock()
|
||||
|
||||
if s.hasTopicSubscription(spaceKeys.spaceKey, topic) {
|
||||
return false, nil
|
||||
}
|
||||
signature, err := spaceKeys.signKey.Sign([]byte(topic))
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("sign topic: %w", err)
|
||||
}
|
||||
s.putTopicSubscription(spaceKeys.spaceKey, &pushapi.Topic{
|
||||
Topic: topic,
|
||||
SpaceKey: []byte(spaceKeys.spaceKey),
|
||||
Signature: signature,
|
||||
})
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func (s *service) hasTopicSubscription(spaceKey spaceKeyType, topic string) bool {
|
||||
topics, ok := s.topicSubscriptions[spaceKey]
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
if _, ok := topics[topic]; ok {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func (s *service) putTopicSubscription(spaceKey spaceKeyType, topic *pushapi.Topic) bool {
|
||||
topics, ok := s.topicSubscriptions[spaceKey]
|
||||
if !ok {
|
||||
topics = map[string]*pushapi.Topic{}
|
||||
s.topicSubscriptions[spaceKey] = topics
|
||||
}
|
||||
if _, ok := topics[topic.Topic]; ok {
|
||||
return false
|
||||
}
|
||||
topics[topic.Topic] = topic
|
||||
return true
|
||||
}
|
||||
|
||||
func (s *service) makeTopics(spaceKey crypto.PrivKey, topics []string) ([]*pushapi.Topic, error) {
|
||||
pushApiTopics := make([]*pushapi.Topic, 0, len(topics))
|
||||
pubKey := spaceKey.GetPublic()
|
||||
|
@ -296,97 +286,112 @@ func (s *service) makeTopics(spaceKey crypto.PrivKey, topics []string) ([]*pusha
|
|||
}
|
||||
|
||||
func (s *service) run() {
|
||||
select {
|
||||
case <-s.ctx.Done():
|
||||
return
|
||||
default:
|
||||
}
|
||||
err := s.fillSubscriptions(s.ctx)
|
||||
err := s.loadSubscriptions(s.ctx)
|
||||
if err != nil {
|
||||
log.Error("failed to fill subscriptions: ", err)
|
||||
return
|
||||
log.Error("failed to load subscriptions", zap.Error(err))
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-s.ctx.Done():
|
||||
return
|
||||
default:
|
||||
}
|
||||
msgs, err := s.batcher.Wait(s.ctx)
|
||||
requests, err := s.requestsQueue.Wait(s.ctx)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if len(msgs) == 0 {
|
||||
return
|
||||
}
|
||||
var shouldUpdate bool
|
||||
for _, sub := range msgs {
|
||||
shouldUpdate, err = s.addNewSubscription(sub)
|
||||
|
||||
var shouldUpdateSubscriptions bool
|
||||
for _, req := range requests {
|
||||
keys, err := s.getSpaceKeys(req.spaceId)
|
||||
if err != nil {
|
||||
log.Errorf("failed to get space key from keystore for space %s: %v", sub.SpaceId, err)
|
||||
continue
|
||||
log.Error("failed to get space keys", zap.Error(err))
|
||||
}
|
||||
|
||||
for _, topic := range req.topics {
|
||||
shouldUpdate, err := s.addPendingTopicSubscription(keys, topic)
|
||||
if err != nil {
|
||||
log.Error("failed to add pending topic subscription: ", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
if shouldUpdate {
|
||||
shouldUpdateSubscriptions = true
|
||||
}
|
||||
}
|
||||
}
|
||||
if !shouldUpdate {
|
||||
if !shouldUpdateSubscriptions {
|
||||
continue
|
||||
}
|
||||
err = s.subscribeAll(s.ctx)
|
||||
if err != nil {
|
||||
log.Errorf("failed to subscribe to topic: %v", err)
|
||||
log.Error("failed to subscribe to topic", zap.Error(err))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *service) addNewSubscription(sub newSubscription) (shouldUpdate bool, err error) {
|
||||
s.activeSubscriptionsLock.Lock()
|
||||
defer s.activeSubscriptionsLock.Unlock()
|
||||
activeTopics, ok := s.activeSubscriptions[sub.SpaceId]
|
||||
if !ok {
|
||||
activeTopics = NewTopicSet()
|
||||
func (s *service) BroadcastKeyUpdate(spaceId string, aclState *list.AclState) error {
|
||||
keys, err := s.getSpaceKeysFromAcl(aclState)
|
||||
if err != nil {
|
||||
return fmt.Errorf("get space keys: %w", err)
|
||||
}
|
||||
for _, topic := range sub.Topics {
|
||||
if activeTopics.Add(topic) {
|
||||
shouldUpdate = true
|
||||
}
|
||||
}
|
||||
s.activeSubscriptions[sub.SpaceId] = activeTopics
|
||||
return shouldUpdate, nil
|
||||
}
|
||||
|
||||
func (a *service) BroadcastKeyUpdate(spaceId string, aclState *list.AclState) error {
|
||||
firstMetadataKey, err := aclState.FirstMetadataKey()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
readKey, err := aclState.CurrentReadKey()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
spaceKeyId, _, err := deriveSpaceKey(firstMetadataKey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
encryptionKey, err := deriveSymmetricKey(readKey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
raw, err := encryptionKey.Raw()
|
||||
raw, err := keys.encryptionKey.Raw()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
encodedKey := base64.StdEncoding.EncodeToString(raw)
|
||||
a.eventSender.Broadcast(&pb.Event{
|
||||
s.eventSender.Broadcast(&pb.Event{
|
||||
Messages: []*pb.EventMessage{
|
||||
{
|
||||
SpaceId: spaceId,
|
||||
Value: &pb.EventMessageValueOfKeyUpdate{KeyUpdate: &pb.EventKeyUpdate{
|
||||
SpaceKeyId: spaceKeyId,
|
||||
EncryptionKeyId: aclState.CurrentReadKeyId(),
|
||||
EncryptionKey: encodedKey,
|
||||
}},
|
||||
Value: &pb.EventMessageValueOfPushEncryptionKeyUpdate{
|
||||
PushEncryptionKeyUpdate: &pb.EventPushEncryptionKeyUpdate{
|
||||
EncryptionKeyId: keys.encryptionKeyId,
|
||||
EncryptionKey: encodedKey,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *service) getSpaceKeys(spaceId string) (*spaceKeys, error) {
|
||||
space, err := s.spaceService.Get(s.ctx, spaceId)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("get space: %w", err)
|
||||
}
|
||||
state := space.CommonSpace().Acl().AclState()
|
||||
return s.getSpaceKeysFromAcl(state)
|
||||
}
|
||||
|
||||
func (s *service) getSpaceKeysFromAcl(state *list.AclState) (*spaceKeys, error) {
|
||||
firstMetadataKey, err := state.FirstMetadataKey()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("get first metadata key: %w", err)
|
||||
}
|
||||
signKey, err := deriveSpaceKey(firstMetadataKey)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("derive space key: %w", err)
|
||||
}
|
||||
symKey, err := state.CurrentReadKey()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("get current read key: %w", err)
|
||||
}
|
||||
|
||||
spaceKey, err := signKey.GetPublic().Raw()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("get raw space public key: %w", err)
|
||||
}
|
||||
|
||||
readKeyId := state.CurrentReadKeyId()
|
||||
hasher := sha256.New()
|
||||
encryptionKeyId := hex.EncodeToString(hasher.Sum([]byte(readKeyId)))
|
||||
|
||||
encryptionKey, err := deriveSymmetricKey(symKey)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &spaceKeys{
|
||||
spaceKey: spaceKeyType(spaceKey),
|
||||
encryptionKey: encryptionKey,
|
||||
encryptionKeyId: encryptionKeyId,
|
||||
signKey: signKey,
|
||||
}, nil
|
||||
}
|
||||
|
|
|
@ -1835,8 +1835,6 @@
|
|||
- [Event.File.SpaceUsage](#anytype-Event-File-SpaceUsage)
|
||||
- [Event.Import](#anytype-Event-Import)
|
||||
- [Event.Import.Finish](#anytype-Event-Import-Finish)
|
||||
- [Event.Key](#anytype-Event-Key)
|
||||
- [Event.Key.Update](#anytype-Event-Key-Update)
|
||||
- [Event.Membership](#anytype-Event-Membership)
|
||||
- [Event.Membership.Update](#anytype-Event-Membership-Update)
|
||||
- [Event.Message](#anytype-Event-Message)
|
||||
|
@ -1871,6 +1869,8 @@
|
|||
- [Event.Process.Done](#anytype-Event-Process-Done)
|
||||
- [Event.Process.New](#anytype-Event-Process-New)
|
||||
- [Event.Process.Update](#anytype-Event-Process-Update)
|
||||
- [Event.PushEncryptionKey](#anytype-Event-PushEncryptionKey)
|
||||
- [Event.PushEncryptionKey.Update](#anytype-Event-PushEncryptionKey-Update)
|
||||
- [Event.Space](#anytype-Event-Space)
|
||||
- [Event.Space.AutoWidgetAdded](#anytype-Event-Space-AutoWidgetAdded)
|
||||
- [Event.Space.SyncStatus](#anytype-Event-Space-SyncStatus)
|
||||
|
@ -28980,33 +28980,6 @@ Precondition: user A opened a block
|
|||
|
||||
|
||||
|
||||
<a name="anytype-Event-Key"></a>
|
||||
|
||||
### Event.Key
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
<a name="anytype-Event-Key-Update"></a>
|
||||
|
||||
### Event.Key.Update
|
||||
|
||||
|
||||
|
||||
| Field | Type | Label | Description |
|
||||
| ----- | ---- | ----- | ----------- |
|
||||
| spaceKeyId | [string](#string) | | |
|
||||
| encryptionKeyId | [string](#string) | | |
|
||||
| encryptionKey | [string](#string) | | |
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
<a name="anytype-Event-Membership"></a>
|
||||
|
||||
### Event.Membership
|
||||
|
@ -29120,7 +29093,7 @@ Precondition: user A opened a block
|
|||
| chatUpdateMentionReadStatus | [Event.Chat.UpdateMentionReadStatus](#anytype-Event-Chat-UpdateMentionReadStatus) | | received to update per-message mention read status (if needed to highlight the unread mentions in the UI) |
|
||||
| chatDelete | [Event.Chat.Delete](#anytype-Event-Chat-Delete) | | |
|
||||
| chatStateUpdate | [Event.Chat.UpdateState](#anytype-Event-Chat-UpdateState) | | in case new unread messages received or chat state changed (e.g. message read on another device) |
|
||||
| keyUpdate | [Event.Key.Update](#anytype-Event-Key-Update) | | |
|
||||
| pushEncryptionKeyUpdate | [Event.PushEncryptionKey.Update](#anytype-Event-PushEncryptionKey-Update) | | |
|
||||
|
||||
|
||||
|
||||
|
@ -29569,6 +29542,32 @@ Removes document from subscription
|
|||
|
||||
|
||||
|
||||
<a name="anytype-Event-PushEncryptionKey"></a>
|
||||
|
||||
### Event.PushEncryptionKey
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
<a name="anytype-Event-PushEncryptionKey-Update"></a>
|
||||
|
||||
### Event.PushEncryptionKey.Update
|
||||
|
||||
|
||||
|
||||
| Field | Type | Label | Description |
|
||||
| ----- | ---- | ----- | ----------- |
|
||||
| encryptionKeyId | [string](#string) | | |
|
||||
| encryptionKey | [string](#string) | | |
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
<a name="anytype-Event-Space"></a>
|
||||
|
||||
### Event.Space
|
||||
|
|
997
pb/events.pb.go
997
pb/events.pb.go
File diff suppressed because it is too large
Load diff
|
@ -122,7 +122,7 @@ message Event {
|
|||
|
||||
Chat.Delete chatDelete = 131;
|
||||
Chat.UpdateState chatStateUpdate = 133; // in case new unread messages received or chat state changed (e.g. message read on another device)
|
||||
Key.Update keyUpdate = 136;
|
||||
PushEncryptionKey.Update pushEncryptionKeyUpdate = 136;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1191,11 +1191,10 @@ message Event {
|
|||
}
|
||||
}
|
||||
|
||||
message Key {
|
||||
message PushEncryptionKey {
|
||||
message Update {
|
||||
string spaceKeyId = 1;
|
||||
string encryptionKeyId = 2;
|
||||
string encryptionKey = 3;
|
||||
string encryptionKeyId = 1;
|
||||
string encryptionKey = 2;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -42,6 +42,7 @@ func New(ownerMetadata []byte, guestKey crypto.PrivKey) AclObjectManager {
|
|||
}
|
||||
|
||||
type pushNotificationService interface {
|
||||
CreateSpace(ctx context.Context, spaceId string) (err error)
|
||||
SubscribeToTopics(ctx context.Context, spaceId string, topics []string)
|
||||
BroadcastKeyUpdate(spaceId string, aclState *list.AclState) error
|
||||
}
|
||||
|
@ -175,14 +176,6 @@ func (a *aclObjectManager) process() {
|
|||
log.Error("error processing acl", zap.Error(err))
|
||||
return
|
||||
}
|
||||
a.subscribeToPushNotifications(acl)
|
||||
}
|
||||
|
||||
func (a *aclObjectManager) subscribeToPushNotifications(acl syncacl.SyncAcl) {
|
||||
aclState := acl.AclState()
|
||||
if !aclState.Permissions(aclState.AccountKey().GetPublic()).IsOwner() {
|
||||
a.pushNotificationService.SubscribeToTopics(a.ctx, a.sp.Id(), []string{chatpush.ChatsTopicName})
|
||||
}
|
||||
}
|
||||
|
||||
func (a *aclObjectManager) processAcl() (err error) {
|
||||
|
@ -270,6 +263,36 @@ func (a *aclObjectManager) processAcl() (err error) {
|
|||
if err != nil {
|
||||
return fmt.Errorf("broadcast key update: %w", err)
|
||||
}
|
||||
|
||||
err = a.subscribeToPushNotifications(acl)
|
||||
if err != nil {
|
||||
log.Error("subscribe to push notifications", zap.Error(err))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *aclObjectManager) subscribeToPushNotifications(acl syncacl.SyncAcl) error {
|
||||
aclState := acl.AclState()
|
||||
currentIdentity := aclState.AccountKey().GetPublic()
|
||||
|
||||
var needToSubscribe bool
|
||||
if aclState.Permissions(currentIdentity).IsOwner() {
|
||||
// Only if user shared this space
|
||||
if len(aclState.Invites()) > 0 {
|
||||
err := a.pushNotificationService.CreateSpace(a.ctx, a.sp.Id())
|
||||
if err != nil {
|
||||
return fmt.Errorf("create space: %w", err)
|
||||
}
|
||||
needToSubscribe = true
|
||||
}
|
||||
} else {
|
||||
needToSubscribe = true
|
||||
}
|
||||
|
||||
if needToSubscribe {
|
||||
a.pushNotificationService.SubscribeToTopics(a.ctx, a.sp.Id(), []string{chatpush.ChatsTopicName})
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -48,7 +48,11 @@ func (s *pushNotificationServiceDummy) SubscribeToTopics(ctx context.Context, sp
|
|||
|
||||
}
|
||||
|
||||
func (s pushNotificationServiceDummy) BroadcastKeyUpdate(spaceId string, aclState *list.AclState) error {
|
||||
func (s *pushNotificationServiceDummy) CreateSpace(ctx context.Context, spaceId string) (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *pushNotificationServiceDummy) BroadcastKeyUpdate(spaceId string, aclState *list.AclState) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
Binary file not shown.
Loading…
Add table
Add a link
Reference in a new issue