1
0
Fork 0
mirror of https://github.com/anyproto/anytype-heart.git synced 2025-06-09 09:35:00 +09:00

GO-2043: make process notificationable and add client event for rootCollectionID

Signed-off-by: AnastasiaShemyakinskaya <shem98a@mail.ru>
This commit is contained in:
AnastasiaShemyakinskaya 2023-12-13 15:35:35 +03:00
parent ed4b237b5e
commit 4dae1afd60
No known key found for this signature in database
GPG key ID: CCD60ED83B103281
9 changed files with 873 additions and 385 deletions

View file

@ -33,7 +33,9 @@ import (
"github.com/anyproto/anytype-heart/core/block/object/idresolver"
"github.com/anyproto/anytype-heart/core/block/object/objectcreator"
"github.com/anyproto/anytype-heart/core/block/process"
"github.com/anyproto/anytype-heart/core/event"
"github.com/anyproto/anytype-heart/core/filestorage/filesync"
"github.com/anyproto/anytype-heart/core/notifications"
"github.com/anyproto/anytype-heart/metrics"
"github.com/anyproto/anytype-heart/pb"
"github.com/anyproto/anytype-heart/pkg/lib/bundle"
@ -54,12 +56,14 @@ const CName = "importer"
const workerPoolSize = 10
type Import struct {
converters map[string]common.Converter
s *block.Service
oc creator.Service
idProvider objectid.IDProvider
tempDirProvider core.TempDirProvider
fileSync filesync.FileSync
converters map[string]common.Converter
s *block.Service
oc creator.Service
idProvider objectid.IDProvider
tempDirProvider core.TempDirProvider
fileSync filesync.FileSync
notificationService notifications.Notifications
eventSender event.Sender
sync.Mutex
}
@ -96,45 +100,57 @@ func (i *Import) Init(a *app.App) (err error) {
objectCreator := app.MustComponent[objectcreator.Service](a)
i.oc = creator.New(i.s, factory, store, relationSyncer, fileStore, spaceService, objectCreator)
i.fileSync = app.MustComponent[filesync.FileSync](a)
i.notificationService = app.MustComponent[notifications.Notifications](a)
i.eventSender = app.MustComponent[event.Sender](a)
return nil
}
// Import get snapshots from converter or external api and create smartblocks from them
func (i *Import) Import(ctx context.Context, req *pb.RpcObjectImportRequest, origin model.ObjectOrigin, progress process.Progress) (string, string, error) {
func (i *Import) Import(ctx context.Context,
req *pb.RpcObjectImportRequest,
origin model.ObjectOrigin,
progress process.Progress,
sendNotification bool,
) (string, error) {
if req.SpaceId == "" {
return "", "", fmt.Errorf("spaceId is empty")
return "", fmt.Errorf("spaceId is empty")
}
i.Lock()
defer i.Unlock()
isNewProgress := false
if progress == nil {
progress = i.setupProgressBar(req)
progress = i.setupProgressBar(req, sendNotification)
isNewProgress = true
}
var (
returnedErr error
importId = uuid.New().String()
returnedErr error
importId = uuid.New().String()
rootCollectionId string
)
defer func() {
i.finishImportProcess(returnedErr, progress)
i.sendFileEvents(returnedErr)
i.recordEvent(&metrics.ImportFinishedEvent{ID: importId, ImportType: req.Type.String()})
i.onImportFinish(returnedErr, progress, req, importId, rootCollectionId)
}()
if i.s != nil && !req.GetNoProgress() && isNewProgress {
i.s.ProcessAdd(progress)
}
i.recordEvent(&metrics.ImportStartedEvent{ID: importId, ImportType: req.Type.String()})
var rootCollectionId string
if c, ok := i.converters[req.Type.String()]; ok {
rootCollectionId, returnedErr = i.importFromBuiltinConverter(ctx, req, c, progress, origin)
return rootCollectionId, "", returnedErr
return rootCollectionId, returnedErr
}
if req.Type == model.Import_External {
returnedErr = i.importFromExternalSource(ctx, req, progress)
return rootCollectionId, "", returnedErr
return rootCollectionId, returnedErr
}
returnedErr = fmt.Errorf("unknown import type %s", req.Type)
return rootCollectionId, progress.Id(), returnedErr
return rootCollectionId, returnedErr
}
func (i *Import) onImportFinish(returnedErr error, progress process.Progress, req *pb.RpcObjectImportRequest, importId string, rootCollectionId string) {
i.finishImportProcess(returnedErr, progress, req)
i.sendFileEvents(returnedErr)
i.recordEvent(&metrics.ImportFinishedEvent{ID: importId, ImportType: req.Type.String()})
i.sendImportFinishEventToClient(rootCollectionId)
}
func (i *Import) sendFileEvents(returnedErr error) {
@ -200,10 +216,27 @@ func (i *Import) importFromExternalSource(ctx context.Context,
return common.ErrNoObjectsToImport
}
func (i *Import) finishImportProcess(returnedErr error, progress process.Progress) {
func (i *Import) finishImportProcess(returnedErr error, progress process.Progress, req *pb.RpcObjectImportRequest) {
if notificationProgress, ok := progress.(process.Notificationable); ok {
notificationProgress.SetNotification(i.provideNotification(returnedErr, progress, req))
}
progress.Finish(returnedErr)
}
func (i *Import) provideNotification(returnedErr error, progress process.Progress, req *pb.RpcObjectImportRequest) *model.Notification {
return &model.Notification{
Status: model.Notification_Created,
IsLocal: true,
Space: req.SpaceId,
Payload: &model.NotificationPayloadOfImport{Import: &model.NotificationImport{
ProcessId: progress.Id(),
ErrorCode: common.GetImportErrorCode(returnedErr),
ImportType: req.Type,
SpaceId: req.SpaceId,
}},
}
}
func shouldReturnError(e error, res *common.Response, req *pb.RpcObjectImportRequest) bool {
return (e != nil && req.Mode != pb.RpcObjectImportRequest_IGNORE_ERRORS) ||
errors.Is(e, common.ErrFailedToReceiveListOfObjects) || errors.Is(e, common.ErrLimitExceeded) ||
@ -211,7 +244,7 @@ func shouldReturnError(e error, res *common.Response, req *pb.RpcObjectImportReq
errors.Is(e, common.ErrCancel)
}
func (i *Import) setupProgressBar(req *pb.RpcObjectImportRequest) process.Progress {
func (i *Import) setupProgressBar(req *pb.RpcObjectImportRequest, sendNotification bool) process.Progress {
progressBarType := pb.ModelProcess_Import
if req.IsMigration {
progressBarType = pb.ModelProcess_Migration
@ -221,6 +254,9 @@ func (i *Import) setupProgressBar(req *pb.RpcObjectImportRequest) process.Progre
progress = process.NewNoOp()
} else {
progress = process.NewProgress(progressBarType)
if sendNotification {
progress = process.NewNotificationProcess(progressBarType, i.notificationService)
}
}
return progress
}
@ -418,6 +454,18 @@ func (i *Import) recordEvent(event metrics.EventRepresentable) {
metrics.SharedClient.RecordEvent(event)
}
func (i *Import) sendImportFinishEventToClient(rootCollectionID string) {
i.eventSender.Broadcast(&pb.Event{
Messages: []*pb.EventMessage{
{
Value: &pb.EventMessageValueOfImportFinish{
ImportFinish: &pb.EventImportFinish{RootCollectionID: rootCollectionID},
},
},
},
})
}
func convertType(cType string) pb.RpcObjectImportListImportResponseType {
return pb.RpcObjectImportListImportResponseType(pb.RpcObjectImportListImportResponseType_value[cType])
}

View file

@ -22,7 +22,8 @@ type Importer interface {
req *pb.RpcObjectImportRequest,
origin model.ObjectOrigin,
progress process.Progress,
) (rootCollectionId string, processId string, err error)
sendNotification bool,
) (rootCollectionId string, err error)
ListImports(req *pb.RpcObjectImportListRequest) ([]*pb.RpcObjectImportListImportResponse, error)
ImportWeb(ctx context.Context, req *pb.RpcObjectImportRequest) (string, *types.Struct, error)

View file

@ -0,0 +1,38 @@
package process
import (
"github.com/anyproto/anytype-heart/core/notifications"
"github.com/anyproto/anytype-heart/pb"
"github.com/anyproto/anytype-heart/pkg/lib/logging"
"github.com/anyproto/anytype-heart/pkg/lib/pb/model"
)
var log = logging.Logger("notification-process")
type Notificationable interface {
SendNotification()
SetNotification(notification *model.Notification)
}
type NotificationProcess struct {
Progress
notification *model.Notification
notificationService notifications.Notifications
}
func NewNotificationProcess(pbType pb.ModelProcessType, notificationService notifications.Notifications) *NotificationProcess {
return &NotificationProcess{Progress: NewProgress(pbType), notificationService: notificationService}
}
func (n *NotificationProcess) SetNotification(notification *model.Notification) {
n.notification = notification
}
func (n *NotificationProcess) SendNotification() {
if n.notification != nil {
notificationSendErr := n.notificationService.CreateAndSendLocal(n.notification)
if notificationSendErr != nil {
log.Errorf("failed to send notification: %v", notificationSendErr)
}
}
}

View file

@ -117,6 +117,9 @@ func (s *service) monitor(p Process) {
},
},
})
if notificationSender, ok := p.(Notificationable); ok {
notificationSender.SendNotification()
}
return
}
}

View file

@ -18,7 +18,6 @@ import (
"github.com/anyproto/anytype-heart/core/block/import/common"
"github.com/anyproto/anytype-heart/core/block/object/objectgraph"
"github.com/anyproto/anytype-heart/core/indexer"
"github.com/anyproto/anytype-heart/core/notifications"
"github.com/anyproto/anytype-heart/core/subscription"
"github.com/anyproto/anytype-heart/pb"
"github.com/anyproto/anytype-heart/pkg/lib/bundle"
@ -791,22 +790,7 @@ func (mw *Middleware) ObjectImport(cctx context.Context, req *pb.RpcObjectImport
return m
}
rootCollectionId, processID, err := getService[importer.Importer](mw).Import(cctx, req, model.ObjectOrigin_import, nil)
notificationSendErr := getService[notifications.Notifications](mw).CreateAndSendLocal(&model.Notification{
Status: model.Notification_Created,
IsLocal: true,
Space: req.SpaceId,
Payload: &model.NotificationPayloadOfImport{Import: &model.NotificationImport{
ProcessId: processID,
ErrorCode: common.GetImportErrorCode(err),
ImportType: req.Type,
SpaceId: req.SpaceId,
}},
})
if notificationSendErr != nil {
log.Errorf("failed to send notification: %v", notificationSendErr)
}
rootCollectionId, err := getService[importer.Importer](mw).Import(cctx, req, model.ObjectOrigin_import, nil, true)
if err == nil {
return response(pb.RpcObjectImportResponseError_NULL, rootCollectionId, nil)

View file

@ -1331,6 +1331,8 @@
- [Event.File.LimitReached](#anytype-Event-File-LimitReached)
- [Event.File.LocalUsage](#anytype-Event-File-LocalUsage)
- [Event.File.SpaceUsage](#anytype-Event-File-SpaceUsage)
- [Event.Import](#anytype-Event-Import)
- [Event.Import.Finish](#anytype-Event-Import-Finish)
- [Event.Message](#anytype-Event-Message)
- [Event.Notification](#anytype-Event-Notification)
- [Event.Notification.Send](#anytype-Event-Notification-Send)
@ -20928,6 +20930,31 @@ Precondition: user A opened a block
<a name="anytype-Event-Import"></a>
### Event.Import
<a name="anytype-Event-Import-Finish"></a>
### Event.Import.Finish
| Field | Type | Label | Description |
| ----- | ---- | ----- | ----------- |
| rootCollectionID | [string](#string) | | |
<a name="anytype-Event-Message"></a>
### Event.Message
@ -20998,6 +21025,7 @@ Precondition: user A opened a block
| fileLocalUsage | [Event.File.LocalUsage](#anytype-Event-File-LocalUsage) | | |
| notificationSend | [Event.Notification.Send](#anytype-Event-Notification-Send) | | |
| notificationUpdate | [Event.Notification.Update](#anytype-Event-Notification-Update) | | |
| importFinish | [Event.Import.Finish](#anytype-Event-Import-Finish) | | |

File diff suppressed because it is too large Load diff

View file

@ -97,6 +97,8 @@ message Event {
Notification.Send notificationSend = 114;
Notification.Update notificationUpdate = 115;
Import.Finish importFinish = 116;
}
}
@ -1025,6 +1027,12 @@ message Event {
anytype.model.Notification notification = 1;
}
}
message Import {
message Finish {
string rootCollectionID = 1;
}
}
}
message ResponseEvent {

View file

@ -220,7 +220,15 @@ func (b *builtinObjects) CreateObjectsForExperience(ctx context.Context, spaceID
err = b.importArchive(ctx, spaceID, path, title, pb.RpcObjectImportRequestPbParams_EXPERIENCE, progress, isNewSpace)
notifErr := b.notifications.CreateAndSendLocal(&model.Notification{
if notificationableProcess, ok := progress.(process.Notificationable); ok {
notificationableProcess.SetNotification(b.provideNotification(spaceID, progress, err, title))
}
return err
}
func (b *builtinObjects) provideNotification(spaceID string, progress process.Progress, err error, title string) *model.Notification {
return &model.Notification{
Status: model.Notification_Created,
IsLocal: true,
Space: spaceID,
@ -230,12 +238,7 @@ func (b *builtinObjects) CreateObjectsForExperience(ctx context.Context, spaceID
SpaceId: spaceID,
Name: title,
}},
})
if notifErr != nil {
log.Errorf("failed to send notification: %v", notifErr)
}
return err
}
func (b *builtinObjects) InjectMigrationDashboard(spaceID string) error {
@ -284,7 +287,7 @@ func (b *builtinObjects) inject(ctx session.Context, spaceID string, useCase pb.
}
func (b *builtinObjects) importArchive(ctx context.Context, spaceID, path, title string, importType pb.RpcObjectImportRequestPbParamsType, progress process.Progress, isNewSpace bool) (err error) {
_, _, err = b.importer.Import(ctx, &pb.RpcObjectImportRequest{
_, err = b.importer.Import(ctx, &pb.RpcObjectImportRequest{
SpaceId: spaceID,
UpdateExistingObjects: false,
Type: model.Import_Pb,
@ -299,7 +302,7 @@ func (b *builtinObjects) importArchive(ctx context.Context, spaceID, path, title
ImportType: importType,
}},
IsNewSpace: isNewSpace,
}, model.ObjectOrigin_usecase, progress)
}, model.ObjectOrigin_usecase, progress, false)
return err
}
@ -503,7 +506,7 @@ func (b *builtinObjects) downloadZipToFile(url string, progress process.Progress
}
func (b *builtinObjects) setupProgress() (process.Progress, error) {
progress := process.NewProgress(pb.ModelProcess_Import)
progress := process.NewNotificationProcess(pb.ModelProcess_Import, b.notifications)
if err := b.progress.Add(progress); err != nil {
return nil, fmt.Errorf("failed to add progress bar: %w", err)
}