1
0
Fork 0
mirror of https://github.com/anyproto/anytype-heart.git synced 2025-06-10 18:10:49 +09:00

Merge pull request #714 from anyproto/go-2043-unify-logic-for-long-background-requests-from-client

GO-2043: make import process not blocking
This commit is contained in:
Anastasia Shemyakinskaya 2024-08-13 14:52:15 +02:00 committed by GitHub
commit a630361681
Signed by: github
GPG key ID: B5690EEEBB952194
16 changed files with 1426 additions and 674 deletions

View file

@ -16,6 +16,7 @@ import (
"github.com/anyproto/anytype-heart/metrics"
"github.com/anyproto/anytype-heart/pb"
"github.com/anyproto/anytype-heart/pkg/lib/logging"
"github.com/anyproto/anytype-heart/util/conc"
"github.com/anyproto/anytype-heart/util/vcs"
)
@ -33,7 +34,7 @@ func init() {
}
fmt.Printf("mw lib: %s\n", vcs.GetVCSInfo().Description())
PanicHandler = mw.OnPanic
PanicHandler = conc.OnPanic
metrics.Service.InitWithKeys(metrics.DefaultInHouseKey)
registerClientCommandsHandler(
&ClientCommandsHandlerProxy{

View file

@ -34,6 +34,7 @@ import (
"github.com/anyproto/anytype-heart/pb"
"github.com/anyproto/anytype-heart/pb/service"
"github.com/anyproto/anytype-heart/pkg/lib/logging"
"github.com/anyproto/anytype-heart/util/conc"
"github.com/anyproto/anytype-heart/util/vcs"
)
@ -267,7 +268,7 @@ func appendInterceptor(
}
func onDefaultError(mw *core.Middleware, r any, resp interface{}) interface{} {
mw.OnPanic(r)
conc.OnPanic(r)
resp = &pb.RpcGenericErrorResponse{
Error: &pb.RpcGenericErrorResponseError{
Code: pb.RpcGenericErrorResponseError_UNKNOWN_ERROR,

View file

@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"strings"
"sync"
"time"
"github.com/anyproto/any-sync/app"
@ -34,8 +33,10 @@ import (
"github.com/anyproto/anytype-heart/core/block/process"
"github.com/anyproto/anytype-heart/core/domain"
"github.com/anyproto/anytype-heart/core/domain/objectorigin"
"github.com/anyproto/anytype-heart/core/event"
"github.com/anyproto/anytype-heart/core/files/fileobject"
"github.com/anyproto/anytype-heart/core/filestorage/filesync"
"github.com/anyproto/anytype-heart/core/notifications"
"github.com/anyproto/anytype-heart/metrics"
"github.com/anyproto/anytype-heart/metrics/anymetry"
"github.com/anyproto/anytype-heart/pb"
@ -47,6 +48,7 @@ import (
"github.com/anyproto/anytype-heart/pkg/lib/logging"
"github.com/anyproto/anytype-heart/pkg/lib/pb/model"
"github.com/anyproto/anytype-heart/space"
"github.com/anyproto/anytype-heart/util/conc"
"github.com/anyproto/anytype-heart/util/pbtypes"
)
@ -57,14 +59,18 @@ const CName = "importer"
const workerPoolSize = 10
type Import struct {
converters map[string]common.Converter
s *block.Service
oc creator.Service
idProvider objectid.IdAndKeyProvider
tempDirProvider core.TempDirProvider
fileStore filestore.FileStore
fileSync filesync.FileSync
sync.Mutex
converters map[string]common.Converter
s *block.Service
oc creator.Service
idProvider objectid.IdAndKeyProvider
tempDirProvider core.TempDirProvider
fileStore filestore.FileStore
fileSync filesync.FileSync
notificationService notifications.Notifications
eventSender event.Sender
importCtx context.Context
importCtxCancel context.CancelFunc
}
func New() Importer {
@ -100,16 +106,40 @@ func (i *Import) Init(a *app.App) (err error) {
objectCreator := app.MustComponent[objectcreator.Service](a)
i.oc = creator.New(i.s, factory, store, relationSyncer, spaceService, objectCreator)
i.fileSync = app.MustComponent[filesync.FileSync](a)
i.notificationService = app.MustComponent[notifications.Notifications](a)
i.eventSender = app.MustComponent[event.Sender](a)
i.importCtx, i.importCtxCancel = context.WithCancel(context.Background())
return nil
}
func (i *Import) Run(ctx context.Context) (err error) {
return
}
func (i *Import) Close(ctx context.Context) (err error) {
if i.importCtxCancel != nil {
i.importCtxCancel()
}
return
}
// Import get snapshots from converter or external api and create smartblocks from them
func (i *Import) Import(ctx context.Context,
req *pb.RpcObjectImportRequest,
origin objectorigin.ObjectOrigin,
progress process.Progress,
) *ImportResponse {
if req.SpaceId == "" {
func (i *Import) Import(ctx context.Context, importRequest *ImportRequest) *ImportResponse {
if importRequest.IsSync {
return i.importObjects(ctx, importRequest)
}
conc.Go(func() {
res := i.importObjects(i.importCtx, importRequest)
if res.Err != nil {
log.Errorf("import from %s failed with error: %s", importRequest.Type.String(), res.Err)
}
})
return nil
}
func (i *Import) importObjects(ctx context.Context, importRequest *ImportRequest) *ImportResponse {
if importRequest.SpaceId == "" {
return &ImportResponse{
RootCollectionId: "",
ProcessId: "",
@ -117,43 +147,41 @@ func (i *Import) Import(ctx context.Context,
Err: fmt.Errorf("spaceId is empty"),
}
}
i.Lock()
defer i.Unlock()
isNewProgress := false
if progress == nil {
progress = i.setupProgressBar(req)
var (
res = &ImportResponse{}
importId = uuid.New().String()
isNewProgress = false
)
if importRequest.Progress == nil {
i.setupProgressBar(importRequest)
isNewProgress = true
}
var (
returnedErr error
importId = uuid.New().String()
)
defer func() {
i.finishImportProcess(returnedErr, progress)
i.sendFileEvents(returnedErr)
i.recordEvent(&metrics.ImportFinishedEvent{ID: importId, ImportType: req.Type.String()})
i.onImportFinish(res, importRequest, importId)
}()
if i.s != nil && !req.GetNoProgress() && isNewProgress {
i.s.ProcessAdd(progress)
if i.s != nil && !importRequest.GetNoProgress() && isNewProgress {
err := i.s.ProcessAdd(importRequest.Progress)
if err != nil {
return &ImportResponse{Err: fmt.Errorf("failed to add process")}
}
}
i.recordEvent(&metrics.ImportStartedEvent{ID: importId, ImportType: req.Type.String()})
var (
rootCollectionId string
objectsCount int64
)
returnedErr = fmt.Errorf("unknown import type %s", req.Type)
if c, ok := i.converters[req.Type.String()]; ok {
rootCollectionId, objectsCount, returnedErr = i.importFromBuiltinConverter(ctx, req, c, progress, origin)
i.recordEvent(&metrics.ImportStartedEvent{ID: importId, ImportType: importRequest.Type.String()})
res.Err = fmt.Errorf("unknown import type %s", importRequest.Type)
if c, ok := i.converters[importRequest.Type.String()]; ok {
res.RootCollectionId, res.ObjectsCount, res.Err = i.importFromBuiltinConverter(ctx, importRequest, c)
}
if req.Type == model.Import_External {
objectsCount, returnedErr = i.importFromExternalSource(ctx, req, progress)
}
return &ImportResponse{
RootCollectionId: rootCollectionId,
ProcessId: progress.Id(),
ObjectsCount: objectsCount,
Err: returnedErr,
if importRequest.Type == model.Import_External {
res.ObjectsCount, res.Err = i.importFromExternalSource(ctx, importRequest)
}
res.ProcessId = importRequest.Progress.Id()
return res
}
func (i *Import) onImportFinish(res *ImportResponse, req *ImportRequest, importId string) {
i.finishImportProcess(res.Err, req)
i.sendFileEvents(res.Err)
i.recordEvent(&metrics.ImportFinishedEvent{ID: importId, ImportType: req.Type.String()})
i.sendImportFinishEventToClient(res.RootCollectionId, req.IsSync, res.ObjectsCount, req.Type)
}
func (i *Import) sendFileEvents(returnedErr error) {
@ -163,17 +191,12 @@ func (i *Import) sendFileEvents(returnedErr error) {
i.fileSync.ClearImportEvents()
}
func (i *Import) importFromBuiltinConverter(ctx context.Context,
req *pb.RpcObjectImportRequest,
c common.Converter,
progress process.Progress,
origin objectorigin.ObjectOrigin,
) (string, int64, error) {
func (i *Import) importFromBuiltinConverter(ctx context.Context, req *ImportRequest, c common.Converter) (string, int64, error) {
allErrors := common.NewError(req.Mode)
res, err := c.GetSnapshots(ctx, req, progress)
res, err := c.GetSnapshots(ctx, req.RpcObjectImportRequest, req.Progress)
if !err.IsEmpty() {
resultErr := err.GetResultError(req.Type)
if shouldReturnError(resultErr, res, req) {
if shouldReturnError(resultErr, res, req.RpcObjectImportRequest) {
return "", 0, resultErr
}
allErrors.Merge(err)
@ -186,7 +209,7 @@ func (i *Import) importFromBuiltinConverter(ctx context.Context,
return "", 0, fmt.Errorf("source path doesn't contain %s resources to import", req.Type)
}
details, rootCollectionID := i.createObjects(ctx, res, progress, req, allErrors, origin)
details, rootCollectionID := i.createObjects(ctx, res, req.Progress, req.RpcObjectImportRequest, allErrors, req.Origin)
resultErr := allErrors.GetResultError(req.Type)
if resultErr != nil {
rootCollectionID = ""
@ -202,10 +225,7 @@ func (i *Import) getObjectCount(details map[string]*types.Struct, rootCollection
return objectsCount
}
func (i *Import) importFromExternalSource(ctx context.Context,
req *pb.RpcObjectImportRequest,
progress process.Progress,
) (int64, error) {
func (i *Import) importFromExternalSource(ctx context.Context, req *ImportRequest) (int64, error) {
allErrors := common.NewError(req.Mode)
if req.Snapshots != nil {
sn := make([]*common.Snapshot, len(req.Snapshots))
@ -220,7 +240,7 @@ func (i *Import) importFromExternalSource(ctx context.Context,
}
originImport := objectorigin.Import(model.Import_External)
details, _ := i.createObjects(ctx, res, progress, req, allErrors, originImport)
details, _ := i.createObjects(ctx, res, req.Progress, req.RpcObjectImportRequest, allErrors, originImport)
if !allErrors.IsEmpty() {
return 0, allErrors.GetResultError(req.Type)
}
@ -229,8 +249,26 @@ func (i *Import) importFromExternalSource(ctx context.Context,
return 0, common.ErrNoObjectsToImport
}
func (i *Import) finishImportProcess(returnedErr error, progress process.Progress) {
progress.Finish(returnedErr)
func (i *Import) finishImportProcess(returnedErr error, req *ImportRequest) {
if notificationProgress, ok := req.Progress.(process.Notificationable); ok {
notificationProgress.FinishWithNotification(i.provideNotification(returnedErr, req.Progress, req), returnedErr)
} else {
req.Progress.Finish(returnedErr)
}
}
func (i *Import) provideNotification(returnedErr error, progress process.Progress, req *ImportRequest) *model.Notification {
return &model.Notification{
Status: model.Notification_Created,
IsLocal: true,
Space: req.SpaceId,
Payload: &model.NotificationPayloadOfImport{Import: &model.NotificationImport{
ProcessId: progress.Id(),
ErrorCode: common.GetImportErrorCode(returnedErr),
ImportType: req.Type,
SpaceId: req.SpaceId,
}},
}
}
func shouldReturnError(e error, res *common.Response, req *pb.RpcObjectImportRequest) bool {
@ -240,7 +278,7 @@ func shouldReturnError(e error, res *common.Response, req *pb.RpcObjectImportReq
errors.Is(e, common.ErrCancel)
}
func (i *Import) setupProgressBar(req *pb.RpcObjectImportRequest) process.Progress {
func (i *Import) setupProgressBar(req *ImportRequest) {
progressBarType := pb.ModelProcess_Import
if req.IsMigration {
progressBarType = pb.ModelProcess_Migration
@ -250,8 +288,11 @@ func (i *Import) setupProgressBar(req *pb.RpcObjectImportRequest) process.Progre
progress = process.NewNoOp()
} else {
progress = process.NewProgress(progressBarType)
if req.SendNotification {
progress = process.NewNotificationProcess(progressBarType, i.notificationService)
}
}
return progress
req.Progress = progress
}
func (i *Import) Name() string {
@ -477,6 +518,25 @@ func (i *Import) recordEvent(event anymetry.Event) {
metrics.Service.Send(event)
}
func (i *Import) sendImportFinishEventToClient(rootCollectionID string, isSync bool, objectsCount int64, importType model.ImportType) {
if isSync {
return
}
i.eventSender.Broadcast(&pb.Event{
Messages: []*pb.EventMessage{
{
Value: &pb.EventMessageValueOfImportFinish{
ImportFinish: &pb.EventImportFinish{
RootCollectionID: rootCollectionID,
ObjectsCount: objectsCount,
ImportType: importType,
},
},
},
},
})
}
func convertType(cType string) pb.RpcObjectImportListImportResponseType {
return pb.RpcObjectImportListImportResponseType(pb.RpcObjectImportListImportResponseType_value[cType])
}

View file

@ -66,13 +66,18 @@ func Test_ImportSuccess(t *testing.T) {
fileSync.EXPECT().ClearImportEvents().Return().Times(1)
i.fileSync = fileSync
res := i.Import(context.Background(), &pb.RpcObjectImportRequest{
Params: &pb.RpcObjectImportRequestParamsOfPbParams{PbParams: &pb.RpcObjectImportRequestPbParams{Path: []string{"bafybbbbruo3kqubijrbhr24zonagbz3ksxbrutwjjoczf37axdsusu4a.pb"}}},
UpdateExistingObjects: false,
Type: 0,
Mode: 0,
SpaceId: "space1",
}, objectorigin.Import(model.Import_Notion), nil)
importRequest := &ImportRequest{
&pb.RpcObjectImportRequest{
Params: &pb.RpcObjectImportRequestParamsOfPbParams{PbParams: &pb.RpcObjectImportRequestPbParams{Path: []string{"bafybbbbruo3kqubijrbhr24zonagbz3ksxbrutwjjoczf37axdsusu4a.pb"}}},
UpdateExistingObjects: false,
Type: 0,
Mode: 0,
SpaceId: "space1",
}, objectorigin.Import(model.Import_Notion),
nil,
false,
true}
res := i.Import(context.Background(), importRequest)
assert.Nil(t, res.Err)
assert.Equal(t, int64(1), res.ObjectsCount)
@ -96,13 +101,20 @@ func Test_ImportErrorFromConverter(t *testing.T) {
fileSync.EXPECT().ClearImportEvents().Return().Times(1)
i.fileSync = fileSync
res := i.Import(context.Background(), &pb.RpcObjectImportRequest{
Params: &pb.RpcObjectImportRequestParamsOfPbParams{PbParams: &pb.RpcObjectImportRequestPbParams{Path: []string{"test"}}},
UpdateExistingObjects: false,
Type: 0,
Mode: 0,
SpaceId: "space1",
}, objectorigin.Import(model.Import_Notion), nil)
importRequest := &ImportRequest{
&pb.RpcObjectImportRequest{
Params: &pb.RpcObjectImportRequestParamsOfPbParams{PbParams: &pb.RpcObjectImportRequestPbParams{Path: []string{"test"}}},
UpdateExistingObjects: false,
Type: 0,
Mode: 0,
SpaceId: "space1",
},
objectorigin.Import(model.Import_Notion),
nil,
false,
true,
}
res := i.Import(context.Background(), importRequest)
assert.NotNil(t, res.Err)
assert.Contains(t, res.Err.Error(), "converter error")
@ -145,13 +157,19 @@ func Test_ImportErrorFromObjectCreator(t *testing.T) {
fileSync.EXPECT().ClearImportEvents().Return().Times(1)
i.fileSync = fileSync
res := i.Import(context.Background(), &pb.RpcObjectImportRequest{
Params: &pb.RpcObjectImportRequestParamsOfPbParams{PbParams: &pb.RpcObjectImportRequestPbParams{Path: []string{"test"}}},
UpdateExistingObjects: false,
Type: 0,
Mode: 0,
SpaceId: "space1",
}, objectorigin.Import(model.Import_Notion), nil)
request := &ImportRequest{
&pb.RpcObjectImportRequest{
Params: &pb.RpcObjectImportRequestParamsOfPbParams{PbParams: &pb.RpcObjectImportRequestPbParams{Path: []string{"test"}}},
UpdateExistingObjects: false,
Type: 0,
Mode: 0,
SpaceId: "space1",
}, objectorigin.Import(model.Import_Notion),
nil,
false,
true,
}
res := i.Import(context.Background(), request)
assert.NotNil(t, res.Err)
assert.Equal(t, int64(0), res.ObjectsCount)
@ -194,13 +212,20 @@ func Test_ImportIgnoreErrorMode(t *testing.T) {
fileSync.EXPECT().ClearImportEvents().Return().Times(1)
i.fileSync = fileSync
res := i.Import(context.Background(), &pb.RpcObjectImportRequest{
Params: &pb.RpcObjectImportRequestParamsOfPbParams{PbParams: &pb.RpcObjectImportRequestPbParams{Path: []string{"test"}}},
UpdateExistingObjects: false,
Type: 0,
Mode: 1,
SpaceId: "space1",
}, objectorigin.Import(model.Import_Notion), nil)
importRequest := &ImportRequest{
&pb.RpcObjectImportRequest{
Params: &pb.RpcObjectImportRequestParamsOfPbParams{PbParams: &pb.RpcObjectImportRequestPbParams{Path: []string{"test"}}},
UpdateExistingObjects: false,
Type: 0,
Mode: 1,
SpaceId: "space1",
},
objectorigin.Import(model.Import_Notion),
nil,
false,
true,
}
res := i.Import(context.Background(), importRequest)
assert.NotNil(t, res.Err)
assert.Equal(t, int64(1), res.ObjectsCount)
@ -245,13 +270,19 @@ func Test_ImportIgnoreErrorModeWithTwoErrorsPerFile(t *testing.T) {
fileSync.EXPECT().ClearImportEvents().Return().Times(1)
i.fileSync = fileSync
res := i.Import(context.Background(), &pb.RpcObjectImportRequest{
Params: &pb.RpcObjectImportRequestParamsOfPbParams{PbParams: &pb.RpcObjectImportRequestPbParams{Path: []string{"test"}}},
UpdateExistingObjects: false,
Type: 0,
Mode: 1,
SpaceId: "space1",
}, objectorigin.Import(model.Import_Notion), nil)
importRequest := &ImportRequest{
&pb.RpcObjectImportRequest{
Params: &pb.RpcObjectImportRequestParamsOfPbParams{PbParams: &pb.RpcObjectImportRequestPbParams{Path: []string{"test"}}},
UpdateExistingObjects: false,
Type: 0,
Mode: 1,
SpaceId: "space1",
}, objectorigin.Import(model.Import_Notion),
nil,
false,
true,
}
res := i.Import(context.Background(), importRequest)
assert.NotNil(t, res.Err)
assert.Contains(t, res.Err.Error(), "converter error")
@ -296,16 +327,22 @@ func Test_ImportExternalPlugin(t *testing.T) {
Collections: nil,
},
})
res := i.Import(context.Background(), &pb.RpcObjectImportRequest{
Params: nil,
Snapshots: snapshots,
UpdateExistingObjects: false,
Type: model.Import_External,
Mode: 2,
SpaceId: "space1",
}, objectorigin.Import(model.Import_Notion), nil)
importRequest := &ImportRequest{
&pb.RpcObjectImportRequest{
Params: nil,
Snapshots: snapshots,
UpdateExistingObjects: false,
Type: model.Import_External,
Mode: 2,
SpaceId: "space1",
}, objectorigin.Import(model.Import_Notion),
nil,
false,
true,
}
res := i.Import(context.Background(), importRequest)
assert.NotNil(t, res)
assert.Nil(t, res.Err)
assert.Equal(t, int64(1), res.ObjectsCount)
}
func Test_ImportExternalPluginError(t *testing.T) {
@ -322,17 +359,23 @@ func Test_ImportExternalPluginError(t *testing.T) {
fileSync.EXPECT().ClearImportEvents().Return().Times(1)
i.fileSync = fileSync
res := i.Import(context.Background(), &pb.RpcObjectImportRequest{
Params: nil,
Snapshots: nil,
UpdateExistingObjects: false,
Type: model.Import_External,
Mode: 2,
SpaceId: "space1",
}, objectorigin.Import(model.Import_Notion), nil)
assert.NotNil(t, res.Err)
importRequest := &ImportRequest{
&pb.RpcObjectImportRequest{
Params: nil,
Snapshots: nil,
UpdateExistingObjects: false,
Type: model.Import_External,
Mode: 2,
SpaceId: "space1",
},
objectorigin.Import(model.Import_Notion),
nil,
false,
true,
}
res := i.Import(context.Background(), importRequest)
assert.NotNil(t, res)
assert.Contains(t, res.Err.Error(), common.ErrNoObjectsToImport.Error())
assert.Equal(t, int64(0), res.ObjectsCount)
}
func Test_ListImports(t *testing.T) {
@ -492,13 +535,19 @@ func Test_ImportCancelError(t *testing.T) {
fileSync.EXPECT().ClearImportEvents().Return().Times(1)
i.fileSync = fileSync
res := i.Import(context.Background(), &pb.RpcObjectImportRequest{
Params: &pb.RpcObjectImportRequestParamsOfPbParams{PbParams: &pb.RpcObjectImportRequestPbParams{Path: []string{"test"}}},
UpdateExistingObjects: false,
Type: 0,
Mode: pb.RpcObjectImportRequest_IGNORE_ERRORS,
SpaceId: "space1",
}, objectorigin.Import(model.Import_Notion), nil)
res := i.Import(context.Background(), &ImportRequest{
&pb.RpcObjectImportRequest{
Params: &pb.RpcObjectImportRequestParamsOfPbParams{PbParams: &pb.RpcObjectImportRequestPbParams{Path: []string{"test"}}},
UpdateExistingObjects: false,
Type: 0,
Mode: pb.RpcObjectImportRequest_IGNORE_ERRORS,
SpaceId: "space1",
},
objectorigin.Import(model.Import_Notion),
nil,
false,
true,
})
assert.NotNil(t, res.Err)
assert.True(t, errors.Is(res.Err, common.ErrCancel))
@ -516,13 +565,20 @@ func Test_ImportNoObjectToImportError(t *testing.T) {
fileSync.EXPECT().ClearImportEvents().Return().Times(1)
i.fileSync = fileSync
res := i.Import(context.Background(), &pb.RpcObjectImportRequest{
Params: &pb.RpcObjectImportRequestParamsOfPbParams{PbParams: &pb.RpcObjectImportRequestPbParams{Path: []string{"test"}}},
UpdateExistingObjects: false,
Type: 0,
Mode: pb.RpcObjectImportRequest_IGNORE_ERRORS,
SpaceId: "space1",
}, objectorigin.Import(model.Import_Notion), nil)
importRequest := &ImportRequest{
&pb.RpcObjectImportRequest{
Params: &pb.RpcObjectImportRequestParamsOfPbParams{PbParams: &pb.RpcObjectImportRequestPbParams{Path: []string{"test"}}},
UpdateExistingObjects: false,
Type: 0,
Mode: pb.RpcObjectImportRequest_IGNORE_ERRORS,
SpaceId: "space1",
},
objectorigin.Import(model.Import_Notion),
nil,
false,
true,
}
res := i.Import(context.Background(), importRequest)
assert.NotNil(t, res.Err)
assert.True(t, errors.Is(res.Err, common.ErrNoObjectsToImport))
@ -555,13 +611,20 @@ func Test_ImportNoObjectToImportErrorModeAllOrNothing(t *testing.T) {
fileSync.EXPECT().ClearImportEvents().Return().Times(1)
i.fileSync = fileSync
res := i.Import(context.Background(), &pb.RpcObjectImportRequest{
Params: &pb.RpcObjectImportRequestParamsOfPbParams{PbParams: &pb.RpcObjectImportRequestPbParams{Path: []string{"test"}}},
UpdateExistingObjects: false,
Type: 0,
Mode: pb.RpcObjectImportRequest_ALL_OR_NOTHING,
SpaceId: "space1",
}, objectorigin.Import(model.Import_Notion), nil)
importRequest := &ImportRequest{
&pb.RpcObjectImportRequest{
Params: &pb.RpcObjectImportRequestParamsOfPbParams{PbParams: &pb.RpcObjectImportRequestPbParams{Path: []string{"test"}}},
UpdateExistingObjects: false,
Type: 0,
Mode: pb.RpcObjectImportRequest_ALL_OR_NOTHING,
SpaceId: "space1",
},
objectorigin.Import(model.Import_Notion),
nil,
false,
true,
}
res := i.Import(context.Background(), importRequest)
assert.NotNil(t, res.Err)
assert.True(t, errors.Is(res.Err, common.ErrNoObjectsToImport))
@ -603,13 +666,20 @@ func Test_ImportNoObjectToImportErrorIgnoreErrorsMode(t *testing.T) {
fileSync.EXPECT().ClearImportEvents().Return().Times(1)
i.fileSync = fileSync
res := i.Import(context.Background(), &pb.RpcObjectImportRequest{
Params: &pb.RpcObjectImportRequestParamsOfPbParams{PbParams: &pb.RpcObjectImportRequestPbParams{Path: []string{"test"}}},
UpdateExistingObjects: false,
Type: 0,
Mode: pb.RpcObjectImportRequest_IGNORE_ERRORS,
SpaceId: "space1",
}, objectorigin.Import(model.Import_Notion), nil)
importRequest := &ImportRequest{
&pb.RpcObjectImportRequest{
Params: &pb.RpcObjectImportRequestParamsOfPbParams{PbParams: &pb.RpcObjectImportRequestPbParams{Path: []string{"test"}}},
UpdateExistingObjects: false,
Type: 0,
Mode: pb.RpcObjectImportRequest_IGNORE_ERRORS,
SpaceId: "space1",
},
objectorigin.Import(model.Import_Notion),
nil,
false,
true,
}
res := i.Import(context.Background(), importRequest)
assert.NotNil(t, res.Err)
assert.True(t, errors.Is(res.Err, common.ErrNoObjectsToImport))
@ -643,13 +713,20 @@ func Test_ImportErrLimitExceeded(t *testing.T) {
fileSync.EXPECT().ClearImportEvents().Return().Times(1)
i.fileSync = fileSync
res := i.Import(context.Background(), &pb.RpcObjectImportRequest{
Params: &pb.RpcObjectImportRequestParamsOfPbParams{PbParams: &pb.RpcObjectImportRequestPbParams{Path: []string{"test"}}},
UpdateExistingObjects: false,
Type: 0,
Mode: pb.RpcObjectImportRequest_ALL_OR_NOTHING,
SpaceId: "space1",
}, objectorigin.Import(model.Import_Notion), nil)
importRequest := &ImportRequest{
&pb.RpcObjectImportRequest{
Params: &pb.RpcObjectImportRequestParamsOfPbParams{PbParams: &pb.RpcObjectImportRequestPbParams{Path: []string{"test"}}},
UpdateExistingObjects: false,
Type: 0,
Mode: pb.RpcObjectImportRequest_ALL_OR_NOTHING,
SpaceId: "space1",
},
objectorigin.Import(model.Import_Notion),
nil,
false,
true,
}
res := i.Import(context.Background(), importRequest)
assert.NotNil(t, res.Err)
assert.True(t, errors.Is(res.Err, common.ErrLimitExceeded))
@ -683,13 +760,20 @@ func Test_ImportErrLimitExceededIgnoreErrorMode(t *testing.T) {
fileSync.EXPECT().ClearImportEvents().Return().Times(1)
i.fileSync = fileSync
res := i.Import(context.Background(), &pb.RpcObjectImportRequest{
Params: &pb.RpcObjectImportRequestParamsOfPbParams{PbParams: &pb.RpcObjectImportRequestPbParams{Path: []string{"test"}}},
UpdateExistingObjects: false,
Type: 0,
Mode: pb.RpcObjectImportRequest_IGNORE_ERRORS,
SpaceId: "space1",
}, objectorigin.Import(model.Import_Notion), nil)
importRequest := &ImportRequest{
&pb.RpcObjectImportRequest{
Params: &pb.RpcObjectImportRequestParamsOfPbParams{PbParams: &pb.RpcObjectImportRequestPbParams{Path: []string{"test"}}},
UpdateExistingObjects: false,
Type: 0,
Mode: pb.RpcObjectImportRequest_IGNORE_ERRORS,
SpaceId: "space1",
},
objectorigin.Import(model.Import_Notion),
nil,
false,
true,
}
res := i.Import(context.Background(), importRequest)
assert.NotNil(t, res.Err)
assert.True(t, errors.Is(res.Err, common.ErrLimitExceeded))
@ -798,13 +882,20 @@ func Test_ImportRootCollectionInResponse(t *testing.T) {
i.fileSync = fileSync
// when
res := i.Import(context.Background(), &pb.RpcObjectImportRequest{
Params: &pb.RpcObjectImportRequestParamsOfPbParams{PbParams: &pb.RpcObjectImportRequestPbParams{Path: []string{"bafybbbbruo3kqubijrbhr24zonagbz3ksxbrutwjjoczf37axdsusu4a.pb"}}},
UpdateExistingObjects: false,
Type: 0,
Mode: 0,
SpaceId: "space1",
}, objectorigin.Import(model.Import_Notion), nil)
importRequest := &ImportRequest{
&pb.RpcObjectImportRequest{
Params: &pb.RpcObjectImportRequestParamsOfPbParams{PbParams: &pb.RpcObjectImportRequestPbParams{Path: []string{"bafybbbbruo3kqubijrbhr24zonagbz3ksxbrutwjjoczf37axdsusu4a.pb"}}},
UpdateExistingObjects: false,
Type: 0,
Mode: 0,
SpaceId: "space1",
},
objectorigin.Import(model.Import_Notion),
nil,
false,
true,
}
res := i.Import(context.Background(), importRequest)
// then
assert.Nil(t, res.Err)
@ -846,13 +937,20 @@ func Test_ImportRootCollectionInResponse(t *testing.T) {
i.fileSync = fileSync
// when
res := i.Import(context.Background(), &pb.RpcObjectImportRequest{
Params: &pb.RpcObjectImportRequestParamsOfPbParams{PbParams: &pb.RpcObjectImportRequestPbParams{Path: []string{"bafybbbbruo3kqubijrbhr24zonagbz3ksxbrutwjjoczf37axdsusu4a.pb"}}},
UpdateExistingObjects: false,
Type: 0,
Mode: 0,
SpaceId: "space1",
}, objectorigin.Import(model.Import_Notion), nil)
importRequest := &ImportRequest{
&pb.RpcObjectImportRequest{
Params: &pb.RpcObjectImportRequestParamsOfPbParams{PbParams: &pb.RpcObjectImportRequestPbParams{Path: []string{"bafybbbbruo3kqubijrbhr24zonagbz3ksxbrutwjjoczf37axdsusu4a.pb"}}},
UpdateExistingObjects: false,
Type: 0,
Mode: 0,
SpaceId: "space1",
},
objectorigin.Import(model.Import_Notion),
nil,
false,
true,
}
res := i.Import(context.Background(), importRequest)
// then
assert.NotNil(t, res.Err)
@ -884,13 +982,20 @@ func Test_ImportRootCollectionInResponse(t *testing.T) {
i.fileSync = fileSync
// when
res := i.Import(context.Background(), &pb.RpcObjectImportRequest{
Params: &pb.RpcObjectImportRequestParamsOfPbParams{PbParams: &pb.RpcObjectImportRequestPbParams{Path: []string{"bafybbbbruo3kqubijrbhr24zonagbz3ksxbrutwjjoczf37axdsusu4a.pb"}}},
UpdateExistingObjects: false,
Type: 0,
Mode: 0,
SpaceId: "space1",
}, objectorigin.Import(model.Import_Notion), nil)
importRequest := &ImportRequest{
&pb.RpcObjectImportRequest{
Params: &pb.RpcObjectImportRequestParamsOfPbParams{PbParams: &pb.RpcObjectImportRequestPbParams{Path: []string{"bafybbbbruo3kqubijrbhr24zonagbz3ksxbrutwjjoczf37axdsusu4a.pb"}}},
UpdateExistingObjects: false,
Type: 0,
Mode: 0,
SpaceId: "space1",
},
objectorigin.Import(model.Import_Notion),
nil,
false,
true,
}
res := i.Import(context.Background(), importRequest)
// then
assert.NotNil(t, res.Err)
@ -931,13 +1036,16 @@ func Test_ImportRootCollectionInResponse(t *testing.T) {
i.fileSync = fileSync
// when
res := i.Import(context.Background(), &pb.RpcObjectImportRequest{
Params: &pb.RpcObjectImportRequestParamsOfPbParams{PbParams: &pb.RpcObjectImportRequestPbParams{Path: []string{"bafybbbbruo3kqubijrbhr24zonagbz3ksxbrutwjjoczf37axdsusu4a.pb"}}},
UpdateExistingObjects: false,
Type: 0,
Mode: pb.RpcObjectImportRequest_IGNORE_ERRORS,
SpaceId: "space1",
}, objectorigin.Import(model.Import_Notion), nil)
importRequest := &ImportRequest{
&pb.RpcObjectImportRequest{
Params: &pb.RpcObjectImportRequestParamsOfPbParams{PbParams: &pb.RpcObjectImportRequestPbParams{Path: []string{"bafybbbbruo3kqubijrbhr24zonagbz3ksxbrutwjjoczf37axdsusu4a.pb"}}},
UpdateExistingObjects: false,
Type: 0,
Mode: pb.RpcObjectImportRequest_IGNORE_ERRORS,
SpaceId: "space1",
}, objectorigin.Import(model.Import_Notion), nil, false, true,
}
res := i.Import(context.Background(), importRequest)
// then
assert.NotNil(t, res.Err)

View file

@ -14,6 +14,14 @@ import (
"github.com/anyproto/anytype-heart/pb"
)
type ImportRequest struct {
*pb.RpcObjectImportRequest
Origin objectorigin.ObjectOrigin
Progress process.Progress
SendNotification bool
IsSync bool
}
type ImportResponse struct {
RootCollectionId string
ProcessId string
@ -24,11 +32,7 @@ type ImportResponse struct {
// Importer encapsulate logic with import
type Importer interface {
app.Component
Import(ctx context.Context,
req *pb.RpcObjectImportRequest,
origin objectorigin.ObjectOrigin,
progress process.Progress,
) *ImportResponse
Import(ctx context.Context, importRequest *ImportRequest) *ImportResponse
ListImports(req *pb.RpcObjectImportListRequest) ([]*pb.RpcObjectImportListImportResponse, error)
ImportWeb(ctx context.Context, req *pb.RpcObjectImportRequest) (string, *types.Struct, error)

View file

@ -0,0 +1,53 @@
package process
import (
"github.com/globalsign/mgo/bson"
"github.com/anyproto/anytype-heart/pb"
"github.com/anyproto/anytype-heart/pkg/lib/logging"
"github.com/anyproto/anytype-heart/pkg/lib/pb/model"
)
var log = logging.Logger("notification-process")
type NotificationService interface {
CreateAndSend(notification *model.Notification) error
}
type NotificationSender interface {
SendNotification()
}
type Notificationable interface {
Progress
FinishWithNotification(notification *model.Notification, err error)
}
type notificationProcess struct {
*progress
notification *model.Notification
notificationService NotificationService
}
func NewNotificationProcess(pbType pb.ModelProcessType, notificationService NotificationService) Notificationable {
return &notificationProcess{progress: &progress{
id: bson.NewObjectId().Hex(),
done: make(chan struct{}),
cancel: make(chan struct{}),
pType: pbType,
}, notificationService: notificationService}
}
func (n *notificationProcess) FinishWithNotification(notification *model.Notification, err error) {
n.notification = notification
n.Finish(err)
}
func (n *notificationProcess) SendNotification() {
if n.notification != nil {
notificationSendErr := n.notificationService.CreateAndSend(n.notification)
if notificationSendErr != nil {
log.Errorf("failed to send notification: %v", notificationSendErr)
}
}
}

View file

@ -117,6 +117,9 @@ func (s *service) monitor(p Process) {
},
},
})
if notificationSender, ok := p.(NotificationSender); ok {
notificationSender.SendNotification()
}
return
}
}

View file

@ -4,8 +4,6 @@ import (
"context"
"errors"
"fmt"
"os"
"runtime/debug"
"github.com/anyproto/any-sync/app"
@ -94,12 +92,6 @@ func (mw *Middleware) GetApp() *app.App {
return mw.applicationService.GetApp()
}
func (mw *Middleware) OnPanic(v interface{}) {
stack := debug.Stack()
os.Stderr.Write(stack)
log.With("stack", stack).Errorf("panic recovered: %v", v)
}
func (mw *Middleware) SetEventSender(sender event.Sender) {
mw.applicationService.SetEventSender(sender)
}

View file

@ -11,7 +11,6 @@ import (
"github.com/anyproto/go-naturaldate/v2"
"github.com/araddon/dateparse"
"github.com/gogo/protobuf/types"
"github.com/google/uuid"
"github.com/hashicorp/go-multierror"
"github.com/anyproto/anytype-heart/core/block"
@ -20,7 +19,6 @@ import (
"github.com/anyproto/anytype-heart/core/block/object/objectgraph"
"github.com/anyproto/anytype-heart/core/domain/objectorigin"
"github.com/anyproto/anytype-heart/core/indexer"
"github.com/anyproto/anytype-heart/core/notifications"
"github.com/anyproto/anytype-heart/core/subscription"
"github.com/anyproto/anytype-heart/pb"
"github.com/anyproto/anytype-heart/pkg/lib/bundle"
@ -876,41 +874,42 @@ func (mw *Middleware) ObjectSetInternalFlags(cctx context.Context, req *pb.RpcOb
}
func (mw *Middleware) ObjectImport(cctx context.Context, req *pb.RpcObjectImportRequest) *pb.RpcObjectImportResponse {
response := func(code pb.RpcObjectImportResponseErrorCode, res *importer.ImportResponse) *pb.RpcObjectImportResponse {
response := func(code pb.RpcObjectImportResponseErrorCode, err error) *pb.RpcObjectImportResponse {
m := &pb.RpcObjectImportResponse{
Error: &pb.RpcObjectImportResponseError{
Code: code,
},
CollectionId: res.RootCollectionId,
ObjectsCount: res.ObjectsCount,
}
if res.Err != nil {
m.Error.Description = getErrorDescription(res.Err)
if err != nil {
m.Error.Description = getErrorDescription(err)
}
return m
}
originImport := objectorigin.Import(req.Type)
res := getService[importer.Importer](mw).Import(cctx, req, originImport, nil)
spaceName := getService[objectstore.SpaceNameGetter](mw).GetSpaceName(req.SpaceId)
code := common.GetImportErrorCode(res.Err)
notificationSendErr := getService[notifications.Notifications](mw).CreateAndSend(&model.Notification{
Id: uuid.New().String(),
Status: model.Notification_Created,
IsLocal: true,
Space: req.SpaceId,
Payload: &model.NotificationPayloadOfImport{Import: &model.NotificationImport{
ProcessId: res.ProcessId,
ErrorCode: code,
ImportType: req.Type,
SpaceId: req.SpaceId,
SpaceName: spaceName,
}},
})
if notificationSendErr != nil {
log.Errorf("failed to send notification: %v", notificationSendErr)
importRequest := &importer.ImportRequest{
RpcObjectImportRequest: req,
Origin: objectorigin.Import(req.Type),
Progress: nil,
SendNotification: true,
IsSync: false,
}
res := getService[importer.Importer](mw).Import(cctx, importRequest)
if res == nil || res.Err == nil {
return response(pb.RpcObjectImportResponseError_NULL, nil)
}
switch {
case errors.Is(res.Err, common.ErrNoObjectsToImport):
return response(pb.RpcObjectImportResponseError_NO_OBJECTS_TO_IMPORT, res.Err)
case errors.Is(res.Err, common.ErrCancel):
return response(pb.RpcObjectImportResponseError_IMPORT_IS_CANCELED, res.Err)
case errors.Is(res.Err, common.ErrLimitExceeded):
return response(pb.RpcObjectImportResponseError_LIMIT_OF_ROWS_OR_RELATIONS_EXCEEDED, res.Err)
case errors.Is(res.Err, common.ErrFileLoad):
return response(pb.RpcObjectImportResponseError_FILE_LOAD_ERROR, res.Err)
default:
return response(pb.RpcObjectImportResponseError_INTERNAL_ERROR, res.Err)
}
return response(pb.RpcObjectImportResponseErrorCode(code), res)
}
func (mw *Middleware) ObjectImportList(cctx context.Context, req *pb.RpcObjectImportListRequest) *pb.RpcObjectImportListResponse {

View file

@ -1577,6 +1577,8 @@
- [Event.File.LimitUpdated](#anytype-Event-File-LimitUpdated)
- [Event.File.LocalUsage](#anytype-Event-File-LocalUsage)
- [Event.File.SpaceUsage](#anytype-Event-File-SpaceUsage)
- [Event.Import](#anytype-Event-Import)
- [Event.Import.Finish](#anytype-Event-Import-Finish)
- [Event.Membership](#anytype-Event-Membership)
- [Event.Membership.Update](#anytype-Event-Membership-Update)
- [Event.Message](#anytype-Event-Message)
@ -13952,9 +13954,9 @@ DEPRECATED, GO-1926 |
| Field | Type | Label | Description |
| ----- | ---- | ----- | ----------- |
| error | [Rpc.Object.Import.Response.Error](#anytype-Rpc-Object-Import-Response-Error) | | |
| collectionId | [string](#string) | | |
| objectsCount | [int64](#int64) | | |
| error | [Rpc.Object.Import.Response.Error](#anytype-Rpc-Object-Import-Response-Error) | | deprecated |
| collectionId | [string](#string) | | deprecated |
| objectsCount | [int64](#int64) | | deprecated |
@ -24885,6 +24887,33 @@ Precondition: user A opened a block
<a name="anytype-Event-Import"></a>
### Event.Import
<a name="anytype-Event-Import-Finish"></a>
### Event.Import.Finish
| Field | Type | Label | Description |
| ----- | ---- | ----- | ----------- |
| rootCollectionID | [string](#string) | | |
| objectsCount | [int64](#int64) | | |
| importType | [model.Import.Type](#anytype-model-Import-Type) | | |
<a name="anytype-Event-Membership"></a>
### Event.Membership
@ -24987,6 +25016,7 @@ Precondition: user A opened a block
| membershipUpdate | [Event.Membership.Update](#anytype-Event-Membership-Update) | | |
| spaceSyncStatusUpdate | [Event.Space.SyncStatus.Update](#anytype-Event-Space-SyncStatus-Update) | | |
| p2pStatusUpdate | [Event.P2PStatus.Update](#anytype-Event-P2PStatus-Update) | | |
| importFinish | [Event.Import.Finish](#anytype-Event-Import-Finish) | | |

File diff suppressed because it is too large Load diff

View file

@ -2559,9 +2559,9 @@ message Rpc {
}
message Response {
Error error = 1;
string collectionId = 2;
int64 objectsCount = 3;
Error error = 1; // deprecated
string collectionId = 2; // deprecated
int64 objectsCount = 3; // deprecated
message Error {
Code code = 1;

View file

@ -108,6 +108,8 @@ message Event {
Space.SyncStatus.Update spaceSyncStatusUpdate = 119;
P2PStatus.Update p2pStatusUpdate = 120;
Import.Finish importFinish = 121;
}
}
@ -1112,6 +1114,14 @@ message Event {
Connected = 2;
}
}
message Import {
message Finish {
string rootCollectionID = 1;
int64 objectsCount = 2;
model.Import.Type importType = 3;
}
}
}
message ResponseEvent {

View file

@ -32,16 +32,20 @@ func TestImportFileFromRelation(t *testing.T) {
})
importerService := getService[importer.Importer](app)
res := importerService.Import(ctx, &pb.RpcObjectImportRequest{
SpaceId: app.personalSpaceId(),
Mode: pb.RpcObjectImportRequest_IGNORE_ERRORS,
Type: model.Import_Pb,
Params: &pb.RpcObjectImportRequestParamsOfPbParams{
PbParams: &pb.RpcObjectImportRequestPbParams{
Path: []string{"./testdata/import/object with file relation/"},
res := importerService.Import(ctx, &importer.ImportRequest{
RpcObjectImportRequest: &pb.RpcObjectImportRequest{
SpaceId: app.personalSpaceId(),
Mode: pb.RpcObjectImportRequest_IGNORE_ERRORS,
Type: model.Import_Pb,
Params: &pb.RpcObjectImportRequestParamsOfPbParams{
PbParams: &pb.RpcObjectImportRequestPbParams{
Path: []string{"./testdata/import/object with file relation/"},
},
},
},
}, objectorigin.Import(model.Import_Pb), nil)
Origin: objectorigin.Import(model.Import_Pb),
IsSync: true,
})
require.NoError(t, res.Err)
app.waitEventMessage(t, func(msg *pb.EventMessage) bool {
@ -87,16 +91,20 @@ func testImportFileFromMarkdown(t *testing.T, path string) {
})
importerService := getService[importer.Importer](app)
res := importerService.Import(ctx, &pb.RpcObjectImportRequest{
SpaceId: app.personalSpaceId(),
Mode: pb.RpcObjectImportRequest_IGNORE_ERRORS,
Type: model.Import_Markdown,
Params: &pb.RpcObjectImportRequestParamsOfMarkdownParams{
MarkdownParams: &pb.RpcObjectImportRequestMarkdownParams{
Path: []string{path},
res := importerService.Import(ctx, &importer.ImportRequest{
RpcObjectImportRequest: &pb.RpcObjectImportRequest{
SpaceId: app.personalSpaceId(),
Mode: pb.RpcObjectImportRequest_IGNORE_ERRORS,
Type: model.Import_Markdown,
Params: &pb.RpcObjectImportRequestParamsOfMarkdownParams{
MarkdownParams: &pb.RpcObjectImportRequestMarkdownParams{
Path: []string{path},
},
},
},
}, objectorigin.Import(model.Import_Markdown), nil)
Origin: objectorigin.Import(model.Import_Markdown),
IsSync: true,
})
require.NoError(t, res.Err)
app.waitEventMessage(t, func(msg *pb.EventMessage) bool {
@ -124,16 +132,20 @@ func testImportObjectWithFileBlock(t *testing.T, path string) {
})
importerService := getService[importer.Importer](app)
res := importerService.Import(ctx, &pb.RpcObjectImportRequest{
SpaceId: app.personalSpaceId(),
Mode: pb.RpcObjectImportRequest_IGNORE_ERRORS,
Type: model.Import_Pb,
Params: &pb.RpcObjectImportRequestParamsOfPbParams{
PbParams: &pb.RpcObjectImportRequestPbParams{
Path: []string{path},
res := importerService.Import(ctx, &importer.ImportRequest{
RpcObjectImportRequest: &pb.RpcObjectImportRequest{
SpaceId: app.personalSpaceId(),
Mode: pb.RpcObjectImportRequest_IGNORE_ERRORS,
Type: model.Import_Pb,
Params: &pb.RpcObjectImportRequestParamsOfPbParams{
PbParams: &pb.RpcObjectImportRequestPbParams{
Path: []string{path},
},
},
},
}, objectorigin.Import(model.Import_Pb), nil)
Origin: objectorigin.Import(model.Import_Pb),
IsSync: true,
})
require.NoError(t, res.Err)
app.waitEventMessage(t, func(msg *pb.EventMessage) bool {

View file

@ -15,7 +15,6 @@ import (
"time"
"github.com/anyproto/any-sync/app"
"github.com/google/uuid"
"github.com/miolini/datacounter"
"github.com/anyproto/anytype-heart/core/block"
@ -213,27 +212,8 @@ func (b *builtinObjects) CreateObjectsForExperience(ctx context.Context, spaceID
}
var (
path string
removeFunc = func() {}
sendNotification = func(code model.ImportErrorCode) {
spaceName := b.store.GetSpaceName(spaceID)
nErr := b.notifications.CreateAndSend(&model.Notification{
Id: uuid.New().String(),
Status: model.Notification_Created,
IsLocal: true,
Space: spaceID,
Payload: &model.NotificationPayloadOfGalleryImport{GalleryImport: &model.NotificationGalleryImport{
ProcessId: progress.Id(),
ErrorCode: code,
SpaceId: spaceID,
Name: title,
SpaceName: spaceName,
}},
})
if nErr != nil {
log.Errorf("failed to send notification: %v", nErr)
}
}
path string
removeFunc = func() {}
)
if _, err = os.Stat(url); err == nil {
@ -243,7 +223,7 @@ func (b *builtinObjects) CreateObjectsForExperience(ctx context.Context, spaceID
if pErr := progress.Cancel(); pErr != nil {
log.Errorf("failed to cancel progress %s: %v", progress.Id(), pErr)
}
sendNotification(model.Import_INTERNAL_ERROR)
progress.FinishWithNotification(b.provideNotification(spaceID, progress, err, title), err)
if errors.Is(err, uri.ErrFilepathNotSupported) {
return fmt.Errorf("invalid path to file: '%s'", url)
}
@ -257,7 +237,11 @@ func (b *builtinObjects) CreateObjectsForExperience(ctx context.Context, spaceID
}
importErr := b.importArchive(ctx, spaceID, path, title, pb.RpcObjectImportRequestPbParams_EXPERIENCE, progress, isNewSpace)
sendNotification(common.GetImportErrorCode(importErr))
progress.FinishWithNotification(b.provideNotification(spaceID, progress, err, title), err)
if err != nil {
log.Errorf("failed to send notification: %v", err)
}
if isNewSpace {
// TODO: GO-2627 Home page handling should be moved to importer
@ -269,6 +253,22 @@ func (b *builtinObjects) CreateObjectsForExperience(ctx context.Context, spaceID
return importErr
}
func (b *builtinObjects) provideNotification(spaceID string, progress process.Progress, err error, title string) *model.Notification {
spaceName := b.store.GetSpaceName(spaceID)
return &model.Notification{
Status: model.Notification_Created,
IsLocal: true,
Space: spaceID,
Payload: &model.NotificationPayloadOfGalleryImport{GalleryImport: &model.NotificationGalleryImport{
ProcessId: progress.Id(),
ErrorCode: common.GetImportErrorCode(err),
SpaceId: spaceID,
Name: title,
SpaceName: spaceName,
}},
}
}
func (b *builtinObjects) InjectMigrationDashboard(spaceID string) error {
return b.inject(nil, spaceID, migrationUseCase, migrationDashboardZip)
}
@ -303,22 +303,28 @@ func (b *builtinObjects) importArchive(
isNewSpace bool,
) (err error) {
origin := objectorigin.Usecase()
res := b.importer.Import(ctx, &pb.RpcObjectImportRequest{
SpaceId: spaceID,
UpdateExistingObjects: false,
Type: model.Import_Pb,
Mode: pb.RpcObjectImportRequest_ALL_OR_NOTHING,
NoProgress: progress == nil,
IsMigration: false,
Params: &pb.RpcObjectImportRequestParamsOfPbParams{
PbParams: &pb.RpcObjectImportRequestPbParams{
Path: []string{path},
NoCollection: true,
CollectionTitle: title,
ImportType: importType,
}},
IsNewSpace: isNewSpace,
}, origin, progress)
importRequest := &importer.ImportRequest{
RpcObjectImportRequest: &pb.RpcObjectImportRequest{
SpaceId: spaceID,
UpdateExistingObjects: false,
Type: model.Import_Pb,
Mode: pb.RpcObjectImportRequest_ALL_OR_NOTHING,
NoProgress: progress == nil,
IsMigration: false,
Params: &pb.RpcObjectImportRequestParamsOfPbParams{
PbParams: &pb.RpcObjectImportRequestPbParams{
Path: []string{path},
NoCollection: true,
CollectionTitle: title,
ImportType: importType,
}},
IsNewSpace: isNewSpace,
},
Origin: origin,
Progress: progress,
IsSync: true,
}
res := b.importer.Import(ctx, importRequest)
return res.Err
}
@ -535,8 +541,8 @@ func (b *builtinObjects) downloadZipToFile(url string, progress process.Progress
return path, nil
}
func (b *builtinObjects) setupProgress() (process.Progress, error) {
progress := process.NewProgress(pb.ModelProcess_Import)
func (b *builtinObjects) setupProgress() (process.Notificationable, error) {
progress := process.NewNotificationProcess(pb.ModelProcess_Import, b.notifications)
if err := b.progress.Add(progress); err != nil {
return nil, fmt.Errorf("failed to add progress bar: %w", err)
}

View file

@ -1,12 +1,18 @@
package conc
import (
"os"
"runtime/debug"
"sync"
"github.com/hashicorp/go-multierror"
"github.com/samber/lo/parallel"
"github.com/anyproto/anytype-heart/pkg/lib/logging"
)
var log = logging.Logger("anytype-mw-panic")
func MapErr[T, R any](input []T, f func(T) (R, error)) ([]R, error) {
var (
allErrors error
@ -29,3 +35,22 @@ func MapErr[T, R any](input []T, f func(T) (R, error)) ([]R, error) {
return res, allErrors
}
func Go(fn func()) {
go func() {
defer func() {
if r := recover(); r != nil {
if rerr, ok := r.(error); ok {
OnPanic(rerr)
}
}
}()
fn()
}()
}
func OnPanic(v any) {
stack := debug.Stack()
os.Stderr.Write(stack)
log.With("stack", stack).Errorf("panic recovered: %v", v)
}