1
0
Fork 0
mirror of https://github.com/anyproto/anytype-heart.git synced 2025-06-07 21:37:04 +09:00
anytype-heart/space/service.go
2025-05-13 15:36:40 +02:00

561 lines
16 KiB
Go

package space
import (
"context"
"errors"
"fmt"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/anyproto/any-sync/accountservice"
"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/app/logger"
"github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
"github.com/anyproto/any-sync/util/crypto"
"github.com/ipfs/go-cid"
"go.uber.org/zap"
"github.com/anyproto/anytype-heart/core/anytype/config"
"github.com/anyproto/anytype-heart/core/domain"
"github.com/anyproto/anytype-heart/pkg/lib/localstore/addr"
"github.com/anyproto/anytype-heart/pkg/lib/localstore/objectstore"
"github.com/anyproto/anytype-heart/pkg/lib/pb/model"
"github.com/anyproto/anytype-heart/space/clientspace"
"github.com/anyproto/anytype-heart/space/internal/components/aclobjectmanager"
"github.com/anyproto/anytype-heart/space/internal/personalspace"
"github.com/anyproto/anytype-heart/space/internal/spacecontroller"
"github.com/anyproto/anytype-heart/space/spacecore"
"github.com/anyproto/anytype-heart/space/spacefactory"
"github.com/anyproto/anytype-heart/space/spaceinfo"
"github.com/anyproto/anytype-heart/space/techspace"
"github.com/anyproto/anytype-heart/util/encode"
"github.com/anyproto/anytype-heart/util/uri"
)
const CName = "client.space"
var log = logger.NewNamed(CName)
var (
waitSpaceDelay = 500 * time.Millisecond
loadTechSpaceDeadline = 15 * time.Second
)
var (
ErrIncorrectSpaceID = errors.New("incorrect space id")
ErrSpaceNotExists = errors.New("space not exists")
ErrSpaceStorageMissig = errors.New("space storage missing")
ErrSpaceDeleted = errors.New("space is deleted")
ErrSpaceIsClosing = errors.New("space is closing")
ErrFailedToLoad = errors.New("failed to load space")
)
func New() Service {
return &service{}
}
type Service interface {
Create(ctx context.Context) (space clientspace.Space, err error)
Join(ctx context.Context, id, aclHeadId string) error
InviteJoin(ctx context.Context, id, aclHeadId string) error
CancelLeave(ctx context.Context, id string) (err error)
Get(ctx context.Context, id string) (space clientspace.Space, err error)
Wait(ctx context.Context, spaceId string) (sp clientspace.Space, err error)
AddStreamable(ctx context.Context, id string, guestKey crypto.PrivKey) (err error)
Delete(ctx context.Context, id string) (err error)
TechSpaceId() string
PersonalSpaceId() string
FirstCreatedSpaceId() string
TechSpace() *clientspace.TechSpace
GetPersonalSpace(ctx context.Context) (space clientspace.Space, err error)
GetTechSpace(ctx context.Context) (space clientspace.Space, err error)
SpaceViewId(spaceId string) (spaceViewId string, err error)
AccountMetadataSymKey() crypto.SymKey
AccountMetadataPayload() []byte
app.ComponentRunnable
}
type coordinatorStatusUpdater interface {
app.Component
UpdateCoordinatorStatus()
}
type NotificationSender interface {
app.Component
CreateAndSend(notification *model.Notification) error
}
type AclJoiner interface {
Join(ctx context.Context, spaceId, networkId string, inviteCid cid.Cid, inviteFileKey crypto.SymKey) error
}
type service struct {
techSpace *clientspace.TechSpace
techSpaceReady chan struct{}
factory spacefactory.SpaceFactory
spaceCore spacecore.SpaceCoreService
aclJoiner AclJoiner
accountService accountservice.Service
config *config.Config
notificationService NotificationSender
updater coordinatorStatusUpdater
spaceNameGetter objectstore.SpaceNameGetter
personalSpaceId string
techSpaceId string
newAccount bool
autoJoinStreamSpace string
spaceControllers map[string]spacecontroller.SpaceController
waiting map[string]controllerWaiter
accountMetadataSymKey crypto.SymKey
accountMetadataPayload []byte
repKey uint64
spaceLoaderListener aclobjectmanager.SpaceLoaderListener
mu sync.Mutex
ctx context.Context // use ctx for the long operations within the lifecycle of the service, excluding Run
ctxCancel context.CancelFunc
isClosing atomic.Bool
firstCreatedSpaceId string
}
func (s *service) Delete(ctx context.Context, id string) (err error) {
if s.isClosing.Load() {
return ErrSpaceIsClosing
}
s.mu.Lock()
ctrl := s.spaceControllers[id]
s.mu.Unlock()
del, ok := ctrl.(spacecontroller.DeleteController)
if !ok {
return ErrSpaceNotExists
}
err = del.Delete(ctx)
if err != nil {
return fmt.Errorf("delete space: %w", err)
}
return nil
}
func (s *service) TechSpace() *clientspace.TechSpace {
return s.techSpace
}
func (s *service) Init(a *app.App) (err error) {
s.factory = app.MustComponent[spacefactory.SpaceFactory](a)
s.spaceCore = app.MustComponent[spacecore.SpaceCoreService](a)
s.accountService = app.MustComponent[accountservice.Service](a)
s.config = app.MustComponent[*config.Config](a)
s.aclJoiner = app.MustComponent[AclJoiner](a)
s.newAccount = s.config.IsNewAccount()
s.autoJoinStreamSpace = s.config.AutoJoinStream
s.spaceControllers = make(map[string]spacecontroller.SpaceController)
s.updater = app.MustComponent[coordinatorStatusUpdater](a)
s.notificationService = app.MustComponent[NotificationSender](a)
s.spaceNameGetter = app.MustComponent[objectstore.SpaceNameGetter](a)
s.spaceLoaderListener = app.MustComponent[aclobjectmanager.SpaceLoaderListener](a)
s.waiting = make(map[string]controllerWaiter)
s.techSpaceReady = make(chan struct{})
s.personalSpaceId, err = s.spaceCore.DeriveID(context.Background(), spacecore.SpaceType)
if err != nil {
return
}
s.techSpaceId, err = s.spaceCore.DeriveID(context.Background(), spacecore.TechSpaceType)
if err != nil {
return
}
accountMetadata, metadataSymKey, err := deriveMetadata(s.accountService.Account().SignKey)
if err != nil {
return
}
s.accountMetadataSymKey = metadataSymKey
s.accountMetadataPayload, err = accountMetadata.Marshal()
if err != nil {
return fmt.Errorf("marshal account metadata: %w", err)
}
s.repKey, err = getRepKey(s.personalSpaceId)
s.ctx, s.ctxCancel = context.WithCancel(context.Background())
return err
}
func (s *service) Name() (name string) {
return CName
}
func (s *service) Run(ctx context.Context) (err error) {
defer s.updater.UpdateCoordinatorStatus()
if s.newAccount {
return s.createAccount(ctx)
} else {
s.tryToJoinSpaceStream()
}
return s.initAccount(ctx)
}
func (s *service) createTechSpaceForOldAccounts(ctx context.Context) (err error) {
// check if we have a personal space
_, err = s.spaceCore.Get(ctx, s.personalSpaceId)
if err != nil {
// then we don't have a personal space, so we have nothing, sorry, there is no point in creating tech space
return fmt.Errorf("init tech space: %w", err)
}
// this is an old account
err = s.createTechSpace(ctx)
if err != nil {
return fmt.Errorf("init tech space: %w", err)
}
// skipping check for space view because we don't have it
ctx = context.WithValue(ctx, personalspace.SkipCheckSpaceViewKey, true)
_, err = s.startStatus(ctx, spaceinfo.NewSpacePersistentInfo(s.personalSpaceId))
if err != nil {
return fmt.Errorf("start personal space: %w", err)
}
return nil
}
func (s *service) initAccount(ctx context.Context) (err error) {
err = s.initMarketplaceSpace(ctx)
if err != nil {
return fmt.Errorf("init marketplace space: %w", err)
}
s.spaceLoaderListener.OnSpaceLoad(addr.AnytypeMarketplaceWorkspace)
timeoutCtx, cancel := context.WithTimeout(ctx, loadTechSpaceDeadline)
err = s.loadTechSpace(timeoutCtx)
cancel()
// this crazy logic is needed if the person is restoring the old account locally with no connection and no tech space
// nolint:nestif
if errors.Is(err, context.DeadlineExceeded) {
var personalExists bool
// checking if personal space exists locally
personalExists, err = s.spaceCore.StorageExistsLocally(ctx, s.personalSpaceId)
if err != nil {
return fmt.Errorf("check personal space: %w", err)
}
// ok no space locally, then we have to get a reply from server to know if we have a tech space
if !personalExists {
// trying again to load space with normal ctx
err = s.loadTechSpace(ctx)
} else {
// personal exists, then we have to create tech space
err = s.createTechSpaceForOldAccounts(ctx)
if err != nil {
return fmt.Errorf("create tech space for old accounts: %w", err)
}
}
}
if err != nil && !errors.Is(err, spacesyncproto.ErrSpaceMissing) {
return fmt.Errorf("init tech space: %w", err)
}
// nolint:nestif
if errors.Is(err, spacesyncproto.ErrSpaceMissing) {
// no tech space on nodes, this is our only chance
err = s.createTechSpaceForOldAccounts(ctx)
if err != nil {
return fmt.Errorf("create tech space for old accounts: %w", err)
}
}
s.techSpace.WakeUpViews()
// only persist networkId after successful space init
err = s.config.PersistAccountNetworkId()
if err != nil {
log.Error("persist network id to config", zap.Error(err))
}
return nil
}
func (s *service) createAccount(ctx context.Context) (err error) {
err = s.initMarketplaceSpace(ctx)
if err != nil {
return fmt.Errorf("init marketplace space: %w", err)
}
err = s.createTechSpace(ctx)
if err != nil {
return fmt.Errorf("init tech space: %w", err)
}
if s.autoJoinStreamSpace == "" {
firstSpace, err := s.create(ctx)
if err != nil {
if errors.Is(err, spacesyncproto.ErrSpaceMissing) || errors.Is(err, treechangeproto.ErrGetTree) {
err = ErrSpaceNotExists
}
// fix for the users that have wrong network id stored in the folder
err2 := s.config.ResetStoredNetworkId()
if err2 != nil {
log.Error("reset network id", zap.Error(err2))
}
return fmt.Errorf("init personal space: %w", err)
}
s.firstCreatedSpaceId = firstSpace.Id()
} else {
s.tryToJoinSpaceStream()
}
s.techSpace.WakeUpViews()
// only persist networkId after successful space init
err = s.config.PersistAccountNetworkId()
if err != nil {
log.Error("persist network id to config", zap.Error(err))
}
return nil
}
func (s *service) Create(ctx context.Context) (clientspace.Space, error) {
if s.isClosing.Load() {
return nil, ErrSpaceIsClosing
}
return s.create(ctx)
}
func (s *service) Wait(ctx context.Context, spaceId string) (sp clientspace.Space, err error) {
waiter := newSpaceWaiter(s, s.ctx, waitSpaceDelay)
return waiter.waitSpace(ctx, spaceId)
}
func (s *service) Get(ctx context.Context, spaceId string) (sp clientspace.Space, err error) {
if spaceId == s.techSpaceId {
return s.getTechSpace(ctx)
}
ctrl, err := s.getCtrl(ctx, spaceId)
if err != nil {
return nil, err
}
return s.waitLoad(ctx, ctrl)
}
func (s *service) UpdateSharedLimits(ctx context.Context, limits int) error {
return s.techSpace.DoAccountObject(ctx, func(accObj techspace.AccountObject) error {
return accObj.SetSharedSpacesLimit(limits)
})
}
func (s *service) GetPersonalSpace(ctx context.Context) (sp clientspace.Space, err error) {
return s.Get(ctx, s.personalSpaceId)
}
func (s *service) GetTechSpace(ctx context.Context) (sp clientspace.Space, err error) {
return s.Get(ctx, s.techSpaceId)
}
func (s *service) IsPersonal(id string) bool {
return s.personalSpaceId == id
}
func (s *service) OnViewUpdated(info spaceinfo.SpacePersistentInfo) {
go func() {
ctrl, err := s.startStatus(s.ctx, info)
if err != nil && !errors.Is(err, ErrSpaceDeleted) {
log.Warn("OnViewUpdated.startStatus error", zap.Error(err))
return
}
err = ctrl.Update()
if err != nil {
log.Warn("OnViewCreated.UpdateStatus error", zap.Error(err))
return
}
}()
}
func (s *service) OnWorkspaceChanged(spaceId string, details *domain.Details) {
go func() {
if err := s.techSpace.SpaceViewSetData(s.ctx, spaceId, details); err != nil {
log.Warn("OnWorkspaceChanged error", zap.Error(err))
}
}()
}
func (s *service) AccountMetadataSymKey() crypto.SymKey {
return s.accountMetadataSymKey
}
func (s *service) AccountMetadataPayload() []byte {
return s.accountMetadataPayload
}
func (s *service) UpdateRemoteStatus(ctx context.Context, status spaceinfo.SpaceRemoteStatusInfo) error {
spaceId := status.LocalInfo.SpaceId
s.mu.Lock()
ctrl := s.spaceControllers[spaceId]
s.mu.Unlock()
if ctrl == nil {
return fmt.Errorf("no such space: %s", spaceId)
}
err := ctrl.SetLocalInfo(ctx, status.LocalInfo)
if err != nil {
return fmt.Errorf("updateRemoteStatus: %w", err)
}
if !status.IsOwned && status.LocalInfo.GetRemoteStatus() == spaceinfo.RemoteStatusDeleted {
accountStatus := ctrl.GetStatus()
if accountStatus != spaceinfo.AccountStatusDeleted && accountStatus != spaceinfo.AccountStatusRemoving {
if ctrl.GetLocalStatus() == spaceinfo.LocalStatusOk {
s.sendNotification(spaceId)
}
info := spaceinfo.NewSpacePersistentInfo(spaceId)
info.SetAccountStatus(spaceinfo.AccountStatusRemoving)
return ctrl.SetPersistentInfo(ctx, info)
}
}
return nil
}
func (s *service) sendNotification(spaceId string) {
identity := s.accountService.Account().SignKey.GetPublic().Account()
notificationId := strings.Join([]string{spaceId, identity}, "_")
spaceName := s.spaceNameGetter.GetSpaceName(spaceId)
err := s.notificationService.CreateAndSend(&model.Notification{
Id: notificationId,
Payload: &model.NotificationPayloadOfParticipantRemove{
ParticipantRemove: &model.NotificationParticipantRemove{
SpaceId: spaceId,
SpaceName: spaceName,
},
},
Space: spaceId,
})
if err != nil {
log.Error("failed to send notification", zap.Error(err), zap.String("spaceId", spaceId))
}
}
func (s *service) SpaceViewId(spaceId string) (spaceViewId string, err error) {
return s.techSpace.SpaceViewId(spaceId)
}
func (s *service) Close(ctx context.Context) error {
if s.ctxCancel != nil {
s.ctxCancel()
}
s.isClosing.Store(true)
s.mu.Lock()
ctrls := make([]spacecontroller.SpaceController, 0, len(s.spaceControllers))
for _, ctrl := range s.spaceControllers {
ctrls = append(ctrls, ctrl)
}
s.mu.Unlock()
wg := sync.WaitGroup{}
for _, ctrl := range ctrls {
wg.Add(1)
go func(ctrl spacecontroller.SpaceController) {
defer wg.Done()
err := ctrl.Close(ctx)
if err != nil {
log.Error("close space", zap.String("spaceId", ctrl.SpaceId()), zap.Error(err))
}
}(ctrl)
}
wg.Wait()
err := s.techSpace.Close(ctx)
if err != nil {
log.Error("close tech space", zap.Error(err))
}
return nil
}
func (s *service) AllSpaceIds() (ids []string) {
s.mu.Lock()
defer s.mu.Unlock()
for id := range s.spaceControllers {
if id == addr.AnytypeMarketplaceWorkspace {
continue
}
ids = append(ids, id)
}
return
}
func (s *service) TechSpaceId() string {
return s.techSpaceId
}
func (s *service) PersonalSpaceId() string {
return s.personalSpaceId
}
func (s *service) FirstCreatedSpaceId() string {
return s.firstCreatedSpaceId
}
func (s *service) getTechSpace(ctx context.Context) (*clientspace.TechSpace, error) {
select {
case <-s.techSpaceReady:
return s.techSpace, nil
case <-ctx.Done():
return nil, ctx.Err()
}
}
// tryToJoinSpaceStream tries to join space stream if autoJoinStreamSpace is set
// it runs in a goroutine and retries with increasing delay
// stops when service is closed
func (s *service) tryToJoinSpaceStream() {
if s.autoJoinStreamSpace == "" {
return
}
// retry with increasing delay
go func() {
delay := time.Second
for {
err := joinSpaceStream(s.ctx, s, s.aclJoiner, s.autoJoinStreamSpace)
if err == nil {
return
}
if s.ctx.Err() != nil {
return
}
log.Warn("failed to join stream", zap.Error(err))
select {
case <-time.After(delay):
delay *= 2
case <-s.ctx.Done():
return
}
}
}()
}
func joinSpaceStream(ctx context.Context, spaceService *service, aclJoiner AclJoiner, inviteUrl string) error {
if inviteUrl == "" {
return nil
}
if aclJoiner == nil {
return fmt.Errorf("aclJoiner is nil")
}
inviteId, inviteKey, spaceId, networkId, err := uri.ParseInviteUrl(inviteUrl)
if err != nil {
return err
}
if spaceId == "" {
return fmt.Errorf("spaceId is empty")
}
inviteCid, err := cid.Parse(inviteId)
if err != nil {
return err
}
inviteSymKey, err := encode.DecodeKeyFromBase58(inviteKey)
if err != nil {
return err
}
techSpace, err := spaceService.getTechSpace(ctx)
if err != nil {
return fmt.Errorf("get tech space: %w", err)
}
if exists, err := techSpace.SpaceViewExists(ctx, spaceId); err != nil {
return err
} else if exists {
// do not try to join stream if space already joined or removed
return nil
}
return aclJoiner.Join(ctx, spaceId, networkId, inviteCid, inviteSymKey)
}