1
0
Fork 0
mirror of https://github.com/anyproto/anytype-heart.git synced 2025-06-08 05:47:07 +09:00

GO-683: Basic migration functionality

This commit is contained in:
Sergey 2023-04-25 18:17:59 +02:00 committed by Mikhail Iudin
parent f4c1dbe6be
commit ab0b112d57
No known key found for this signature in database
GPG key ID: FAAAA8BAABDFF1C0
9 changed files with 1361 additions and 1846 deletions

486
change/builder.go Normal file
View file

@ -0,0 +1,486 @@
package change
import (
"context"
"encoding/base64"
"errors"
"fmt"
"sort"
"strings"
"time"
"github.com/anytypeio/go-anytype-middleware/pb"
"github.com/anytypeio/go-anytype-middleware/pkg/lib/core"
"github.com/anytypeio/go-anytype-middleware/pkg/lib/logging"
"github.com/anytypeio/go-anytype-middleware/util/slice"
)
var (
ErrEmpty = errors.New("logs empty")
)
var log = logging.Logger("anytype-mw-change-builder")
const (
virtualChangeBasePrefix = "_virtual:"
virtualChangeBaseSeparator = "+"
)
func BuildTreeBefore(ctx context.Context, s core.SmartBlock, beforeLogId string, includeBeforeId bool) (t *Tree, err error) {
sb := &stateBuilder{beforeId: beforeLogId, includeBeforeId: includeBeforeId}
err = sb.Build(ctx, s)
return sb.tree, err
}
func BuildTree(ctx context.Context, s core.SmartBlock) (t *Tree, logHeads map[string]*Change, err error) {
sb := new(stateBuilder)
err = sb.Build(ctx, s)
return sb.tree, sb.logHeads, err
}
func BuildMetaTree(ctx context.Context, s core.SmartBlock) (t *Tree, logHeads map[string]*Change, err error) {
sb := &stateBuilder{onlyMeta: true}
err = sb.Build(ctx, s)
return sb.tree, sb.logHeads, err
}
type stateBuilder struct {
smartblockId string
cache map[string]*Change
logHeads map[string]*Change
tree *Tree
smartblock core.SmartBlock
qt time.Duration
qr int64
onlyMeta bool
beforeId string
includeBeforeId bool
}
func (sb *stateBuilder) Build(ctx context.Context, s core.SmartBlock) (err error) {
sb.smartblockId = s.ID()
st := time.Now()
sb.smartblock = s
logs, err := sb.getLogs(ctx)
if err != nil {
return err
}
heads, err := sb.getActualHeads(ctx, logs)
if err != nil {
return fmt.Errorf("getActualHeads error: %v", err)
}
breakpoint, err := sb.findBreakpoint(ctx, heads)
if err != nil {
return fmt.Errorf("findBreakpoint error: %v", err)
}
if err = sb.buildTree(ctx, heads, breakpoint); err != nil {
return fmt.Errorf("buildTree error: %v", err)
}
log.Infof("tree build: len: %d; scanned: %d; dur: %v (lib %v)", sb.tree.Len(), len(sb.cache), time.Since(st), sb.qt)
sb.cache = nil
return
}
func (sb *stateBuilder) getLogs(ctx context.Context) (logs []core.SmartblockLog, err error) {
sb.cache = make(map[string]*Change)
if sb.beforeId != "" {
before, e := sb.loadChange(ctx, sb.beforeId)
if e != nil {
return nil, e
}
if sb.includeBeforeId {
return []core.SmartblockLog{
{Head: sb.beforeId},
}, nil
}
for _, pid := range before.PreviousIds {
logs = append(logs, core.SmartblockLog{Head: pid})
}
return
}
logs, err = sb.smartblock.GetLogs()
if err != nil {
return nil, fmt.Errorf("GetLogs error: %w", err)
}
log.Debugf("build tree: logs: %v", logs)
sb.logHeads = make(map[string]*Change)
if len(logs) == 0 || len(logs) == 1 && len(logs[0].Head) <= 1 {
return nil, ErrEmpty
}
var nonEmptyLogs = logs[:0]
for _, l := range logs {
if len(l.Head) == 0 {
continue
}
if ch, err := sb.loadChange(ctx, l.Head); err != nil {
log.Errorf("loading head %s of the log %s failed: %v", l.Head, l.ID, err)
} else {
sb.logHeads[l.ID] = ch
}
nonEmptyLogs = append(nonEmptyLogs, l)
}
return nonEmptyLogs, nil
}
func (sb *stateBuilder) buildTree(ctx context.Context, heads []string, breakpoint string) (err error) {
ch, err := sb.loadChange(ctx, breakpoint)
if err != nil {
return
}
if sb.onlyMeta {
sb.tree = NewMetaTree()
} else {
sb.tree = NewTree()
}
sb.tree.AddFast(ch)
var changes = make([]*Change, 0, len(heads)*2)
var uniqMap = map[string]struct{}{breakpoint: {}}
for _, id := range heads {
changes, err = sb.loadChangesFor(ctx, id, uniqMap, changes)
if err != nil {
return
}
}
if sb.onlyMeta {
var filteredChanges = changes[:0]
for _, ch := range changes {
if ch.HasMeta() {
filteredChanges = append(filteredChanges, ch)
}
}
changes = filteredChanges
}
sb.tree.AddFast(changes...)
return
}
func (sb *stateBuilder) loadChangesFor(ctx context.Context, id string, uniqMap map[string]struct{}, buf []*Change) ([]*Change, error) {
if _, exists := uniqMap[id]; exists {
return buf, nil
}
ch, err := sb.loadChange(ctx, id)
if err != nil {
return nil, err
}
for _, prev := range ch.GetPreviousIds() {
if buf, err = sb.loadChangesFor(ctx, prev, uniqMap, buf); err != nil {
return nil, err
}
}
uniqMap[id] = struct{}{}
return append(buf, ch), nil
}
func (sb *stateBuilder) findBreakpoint(ctx context.Context, heads []string) (breakpoint string, err error) {
var (
ch *Change
snapshotIds []string
)
for _, head := range heads {
if ch, err = sb.loadChange(ctx, head); err != nil {
return
}
shId := ch.GetLastSnapshotId()
if slice.FindPos(snapshotIds, shId) == -1 {
snapshotIds = append(snapshotIds, shId)
}
}
return sb.findCommonSnapshot(ctx, snapshotIds)
}
func (sb *stateBuilder) findCommonSnapshot(ctx context.Context, snapshotIds []string) (snapshotId string, err error) {
// sb.smartblock can be nil in this func
if len(snapshotIds) == 1 {
return snapshotIds[0], nil
} else if len(snapshotIds) == 0 {
return "", fmt.Errorf("snapshots not found")
}
findCommon := func(s1, s2 string) (s string, err error) {
// fast cases
if s1 == s2 {
return s1, nil
}
ch1, err := sb.loadChange(ctx, s1)
if err != nil {
return "", err
}
if ch1.LastSnapshotId == s2 {
return s2, nil
}
ch2, err := sb.loadChange(ctx, s2)
if err != nil {
return "", err
}
if ch2.LastSnapshotId == s1 {
return s1, nil
}
if ch1.LastSnapshotId == ch2.LastSnapshotId && ch1.LastSnapshotId != "" {
return ch1.LastSnapshotId, nil
}
// traverse
var t1 = make([]string, 0, 5)
var t2 = make([]string, 0, 5)
t1 = append(t1, ch1.Id, ch1.LastSnapshotId)
t2 = append(t2, ch2.Id, ch2.LastSnapshotId)
for {
lid1 := t1[len(t1)-1]
if lid1 != "" {
l1, e := sb.loadChange(ctx, lid1)
if e != nil {
return "", e
}
if l1.LastSnapshotId != "" {
if slice.FindPos(t2, l1.LastSnapshotId) != -1 {
return l1.LastSnapshotId, nil
}
}
t1 = append(t1, l1.LastSnapshotId)
}
lid2 := t2[len(t2)-1]
if lid2 != "" {
l2, e := sb.loadChange(ctx, t2[len(t2)-1])
if e != nil {
return "", e
}
if l2.LastSnapshotId != "" {
if slice.FindPos(t1, l2.LastSnapshotId) != -1 {
return l2.LastSnapshotId, nil
}
}
t2 = append(t2, l2.LastSnapshotId)
}
if lid1 == "" && lid2 == "" {
break
}
}
log.Warnf("changes build tree: possible versions split")
// prefer not first snapshot
if len(ch1.PreviousIds) == 0 && len(ch2.PreviousIds) > 0 {
log.Warnf("changes build tree: prefer %s(%d prevIds) over %s(%d prevIds)", s2, len(ch2.PreviousIds), s1, len(ch1.PreviousIds))
return s2, nil
} else if len(ch1.PreviousIds) > 0 && len(ch2.PreviousIds) == 0 {
log.Warnf("changes build tree: prefer %s(%d prevIds) over %s(%d prevIds)", s1, len(ch1.PreviousIds), s2, len(ch2.PreviousIds))
return s1, nil
}
isEmptySnapshot := func(ch *Change) bool {
// todo: ignore root & header blocks
if ch.Snapshot == nil || ch.Snapshot.Data == nil || len(ch.Snapshot.Data.Blocks) <= 1 {
return true
}
return false
}
// prefer not empty snapshot
if isEmptySnapshot(ch1) && !isEmptySnapshot(ch2) {
log.Warnf("changes build tree: prefer %s(not empty) over %s(empty)", s2, s1)
return s2, nil
} else if isEmptySnapshot(ch2) && !isEmptySnapshot(ch1) {
log.Warnf("changes build tree: prefer %s(not empty) over %s(empty)", s1, s2)
return s1, nil
}
var p1, p2 string
// unexpected behavior - lets merge branches using the virtual change mechanism
if s1 < s2 {
p1, p2 = s1, s2
} else {
p1, p2 = s2, s1
}
log.With("thread", sb.smartblockId).Errorf("changes build tree: made base snapshot for logs %s and %s: conflicting snapshots %s+%s", ch1.Device, ch2.Device, p1, p2)
baseId := sb.makeVirtualSnapshotId(p1, p2)
if len(ch2.PreviousIds) != 0 || len(ch2.PreviousIds) != 0 {
if len(ch2.PreviousIds) == 1 && len(ch2.PreviousIds) == 1 && ch1.PreviousIds[0] == baseId && ch2.PreviousIds[0] == baseId {
// already patched
return baseId, nil
} else {
return "", fmt.Errorf("failed to create virtual base change: has invalid PreviousIds")
}
}
ch1.PreviousIds = []string{baseId}
ch2.PreviousIds = []string{baseId}
return baseId, nil
}
for len(snapshotIds) > 1 {
l := len(snapshotIds)
shId, e := findCommon(snapshotIds[l-2], snapshotIds[l-1])
if e != nil {
return "", e
}
snapshotIds[l-2] = shId
snapshotIds = snapshotIds[:l-1]
}
return snapshotIds[0], nil
}
func (sb *stateBuilder) getActualHeads(ctx context.Context, logs []core.SmartblockLog) (heads []string, err error) {
sort.Slice(logs, func(i, j int) bool {
return logs[i].ID < logs[j].ID
})
var knownHeads []string
var validLogs = logs[:0]
for _, l := range logs {
if slice.FindPos(knownHeads, l.Head) != -1 { // do not scan known heads
continue
}
sh, err := sb.getNearSnapshot(ctx, l.Head)
if err != nil {
log.Warnf("can't get near snapshot: %v; ignore", err)
continue
}
if sh.Snapshot.LogHeads != nil {
for _, headId := range sh.Snapshot.LogHeads {
knownHeads = append(knownHeads, headId)
}
}
validLogs = append(validLogs, l)
}
for _, l := range validLogs {
if slice.FindPos(knownHeads, l.Head) != -1 { // do not scan known heads
continue
} else {
heads = append(heads, l.Head)
}
}
if len(heads) == 0 {
return nil, fmt.Errorf("no usable logs in head")
}
return
}
func (sb *stateBuilder) getNearSnapshot(ctx context.Context, id string) (sh *Change, err error) {
ch, err := sb.loadChange(ctx, id)
if err != nil {
return
}
if ch.Snapshot != nil {
return ch, nil
}
sch, err := sb.loadChange(ctx, ch.LastSnapshotId)
if err != nil {
return
}
if sch.Snapshot == nil {
return nil, fmt.Errorf("snapshot %s is empty", ch.LastSnapshotId)
}
return sch, nil
}
func (sb *stateBuilder) makeVirtualSnapshotId(s1, s2 string) string {
return virtualChangeBasePrefix + base64.RawStdEncoding.EncodeToString([]byte(s1+virtualChangeBaseSeparator+s2))
}
func (sb *stateBuilder) makeChangeFromVirtualId(ctx context.Context, id string) (*Change, error) {
dataB, err := base64.RawStdEncoding.DecodeString(id[len(virtualChangeBasePrefix):])
if err != nil {
return nil, fmt.Errorf("invalid virtual id format: %s", err.Error())
}
ids := strings.Split(string(dataB), virtualChangeBaseSeparator)
if len(ids) != 2 {
return nil, fmt.Errorf("invalid virtual id format: %v", id)
}
ch1, err := sb.loadChange(context.Background(), ids[0])
if err != nil {
return nil, err
}
ch2, err := sb.loadChange(ctx, ids[1])
if err != nil {
return nil, err
}
return &Change{
Id: id,
Account: ch1.Account,
Device: ch1.Device,
Next: []*Change{ch1, ch2},
Change: &pb.Change{Snapshot: ch1.Snapshot},
}, nil
}
func (sb *stateBuilder) loadChange(ctx context.Context, id string) (ch *Change, err error) {
if ch, ok := sb.cache[id]; ok {
return ch, nil
}
if strings.HasPrefix(id, virtualChangeBasePrefix) {
ch, err = sb.makeChangeFromVirtualId(ctx, id)
if err != nil {
return nil, err
}
sb.cache[id] = ch
return
}
if sb.smartblock == nil {
return nil, fmt.Errorf("no smarblock in builder")
}
st := time.Now()
sr, err := sb.smartblock.GetRecord(ctx, id)
s := time.Since(st)
if err != nil {
log.With("thread", sb.smartblock.ID()).
Errorf("failed to loadChange %s after %.2fs. Total %.2f(%d records were loaded)", id, s.Seconds(), sb.qt.Seconds(), sb.qr)
return
}
sb.qt += s
sb.qr++
if s.Seconds() > 0.1 {
// this means we got this record through bitswap, so lets log some details
lgs, _ := sb.smartblock.GetLogs()
var sbLog *core.SmartblockLog
for _, lg := range lgs {
if lg.ID == sr.LogID {
sbLog = &lg
break
}
}
var (
logHead string
logCounter int64
)
if sbLog != nil {
logHead = sbLog.Head
logCounter = sbLog.HeadCounter
}
log.With("thread", sb.smartblock.ID()).
With("logid", sr.LogID).
With("logHead", logHead).
With("logCounter", logCounter).
Errorf("long loadChange %.2fs for %s. Total %.2f(%d records)", s.Seconds(), id, sb.qt.Seconds(), sb.qr)
}
chp := new(pb.Change)
if err3 := sr.Unmarshal(chp); err3 != nil {
// skip this error for the future compatibility
log.With("thread", sb.smartblock.ID()).
With("logid", sr.LogID).
With("change", id).Errorf("failed to unmarshal change: %s; continue", err3.Error())
if chp == nil || chp.PreviousIds == nil {
// no way we can continue when we don't have some minimal information
return nil, err3
}
}
fmt.Println("VERISON", id, chp.Version)
ch = &Change{
Id: id,
Account: sr.AccountID,
Device: sr.LogID,
Change: chp,
}
if sb.onlyMeta {
ch.PreviousIds = ch.PreviousMetaIds
}
sb.cache[id] = ch
return
}

View file

@ -1,7 +1,96 @@
package change
import (
"errors"
"fmt"
"time"
"github.com/anytypeio/go-anytype-middleware/core/block/editor/state"
)
var ErrEmpty = errors.New("first change doesn't exist")
func NewStateCache() *stateCache {
return &stateCache{
states: make(map[string]struct {
refs int
state *state.State
}),
}
}
type stateCache struct {
states map[string]struct {
refs int
state *state.State
}
}
func (sc *stateCache) Set(id string, s *state.State, refs int) {
sc.states[id] = struct {
refs int
state *state.State
}{refs: refs, state: s}
}
func (sc *stateCache) Get(id string) *state.State {
item := sc.states[id]
item.refs--
if item.refs == 0 {
delete(sc.states, id)
} else {
sc.states[id] = item
}
return item.state
}
// Simple implementation hopes for CRDT and ignores errors. No merge
func BuildStateSimpleCRDT(root *state.State, t *Tree) (s *state.State, changesApplied int, err error) {
var (
startId string
applyRoot bool
st = time.Now()
lastChange *Change
)
if startId = root.ChangeId(); startId == "" {
startId = t.RootId()
applyRoot = true
}
var lastMigrationVersion uint32
t.Iterate(startId, func(c *Change) (isContinue bool) {
if c.Version > lastMigrationVersion {
lastMigrationVersion = c.Version
}
changesApplied++
lastChange = c
if startId == c.Id {
s = root.NewState()
if applyRoot {
s.ApplyChangeIgnoreErr(c.Change.Content...)
s.SetChangeId(c.Id)
s.AddFileKeys(c.FileKeys...)
}
return true
}
ns := s.NewState()
ns.ApplyChangeIgnoreErr(c.Change.Content...)
ns.SetChangeId(c.Id)
ns.AddFileKeys(c.FileKeys...)
_, _, err = state.ApplyStateFastOne(ns)
if err != nil {
return false
}
return true
})
if err != nil {
return nil, changesApplied, err
}
if lastChange != nil {
s.SetLastModified(lastChange.Timestamp, lastChange.Account)
}
fmt.Println("SET MIG VERSION", s.RootId(), lastMigrationVersion)
s.SetMigrationVersion(lastMigrationVersion)
log.Infof("build state (crdt): changes: %d; dur: %v;", changesApplied, time.Since(st))
return s, changesApplied, err
}

View file

@ -8,15 +8,13 @@ import (
"sync"
"time"
"github.com/anytypeio/any-sync/app"
"github.com/anytypeio/any-sync/app/ocache"
"github.com/anytypeio/any-sync/commonspace"
// nolint:misspell
"github.com/anytypeio/any-sync/commonspace/object/tree/objecttree"
"github.com/gogo/protobuf/types"
"github.com/anytypeio/go-anytype-middleware/app"
"github.com/anytypeio/go-anytype-middleware/core/block/doc"
"github.com/anytypeio/go-anytype-middleware/core/block/editor/state"
"github.com/anytypeio/go-anytype-middleware/core/block/editor/template"
"github.com/anytypeio/go-anytype-middleware/core/block/migration"
"github.com/anytypeio/go-anytype-middleware/core/block/restriction"
"github.com/anytypeio/go-anytype-middleware/core/block/simple"
"github.com/anytypeio/go-anytype-middleware/core/block/source"
@ -30,12 +28,12 @@ import (
"github.com/anytypeio/go-anytype-middleware/pkg/lib/core"
"github.com/anytypeio/go-anytype-middleware/pkg/lib/database"
"github.com/anytypeio/go-anytype-middleware/pkg/lib/files"
"github.com/anytypeio/go-anytype-middleware/pkg/lib/localstore/addr"
"github.com/anytypeio/go-anytype-middleware/pkg/lib/localstore/objectstore"
"github.com/anytypeio/go-anytype-middleware/pkg/lib/logging"
"github.com/anytypeio/go-anytype-middleware/pkg/lib/pb/model"
"github.com/anytypeio/go-anytype-middleware/pkg/lib/util"
"github.com/anytypeio/go-anytype-middleware/util/internalflag"
"github.com/anytypeio/go-anytype-middleware/util/mutex"
"github.com/anytypeio/go-anytype-middleware/util/ocache"
"github.com/anytypeio/go-anytype-middleware/util/pbtypes"
"github.com/anytypeio/go-anytype-middleware/util/slice"
)
@ -48,8 +46,6 @@ var (
ErrIsDeleted = errors.New("smartblock is deleted")
)
const CollectionStoreKey = "objects"
const (
NoHistory ApplyFlag = iota
NoEvent
@ -84,7 +80,7 @@ const CallerKey key = 0
var log = logging.Logger("anytype-mw-smartblock")
func New() SmartBlock {
s := &smartBlock{hooks: map[Hook][]HookCallback{}, Locker: &sync.Mutex{}}
s := &smartBlock{hooks: map[Hook][]HookCallback{}, Locker: mutex.NewLocker()}
return s
}
@ -122,27 +118,17 @@ type SmartBlock interface {
EnabledRelationAsDependentObjects()
AddHook(f HookCallback, events ...Hook)
CheckSubscriptions() (changed bool)
GetDocInfo() DocInfo
GetDocInfo() (doc.DocInfo, error)
Restrictions() restriction.Restrictions
SetRestrictions(r restriction.Restrictions)
ObjectClose()
FileRelationKeys(s *state.State) []string
Inner() SmartBlock
ocache.Object
ocache.ObjectLocker
state.Doc
sync.Locker
}
type DocInfo struct {
Id string
Links []string
FileHashes []string
Heads []string
Creator string
State *state.State
}
type InitContext struct {
Source source.Source
ObjectTypeUrls []string
@ -150,9 +136,8 @@ type InitContext struct {
State *state.State
Relations []*model.Relation
Restriction restriction.Service
Doc doc.Service
ObjectStore objectstore.ObjectStore
SpaceID string
BuildTreeOpts commonspace.BuildTreeOpts
Ctx context.Context
App *app.App
}
@ -162,24 +147,14 @@ type linkSource interface {
HasSmartIds() bool
}
type Locker interface {
TryLock() bool
sync.Locker
}
type Indexer interface {
Index(ctx context.Context, info DocInfo) error
}
type smartBlock struct {
state.Doc
objecttree.ObjectTree
Locker
sync.Locker
depIds []string // slice must be sorted
sendEvent func(e *pb.Event)
undo undo.History
source source.Source
indexer Indexer
doc doc.Service
metaData *core.SmartBlockMeta
lastDepDetails map[string]*pb.EventObjectDetailsSet
restrictions restriction.Restrictions
@ -197,22 +172,6 @@ type smartBlock struct {
closeRecordsSub func()
}
type LockerSetter interface {
SetLocker(locker Locker)
}
func (sb *smartBlock) SetLocker(locker Locker) {
sb.Locker = locker
}
func (sb *smartBlock) Tree() objecttree.ObjectTree {
return sb.ObjectTree
}
func (sb *smartBlock) Inner() SmartBlock {
return sb
}
func (sb *smartBlock) FileRelationKeys(s *state.State) (fileKeys []string) {
for _, rel := range s.GetRelationLinks() {
// coverId can contains both hash or predefined cover id
@ -270,13 +229,10 @@ func (sb *smartBlock) Init(ctx *InitContext) (err error) {
}
sb.source = ctx.Source
if provider, ok := sb.source.(source.ObjectTreeProvider); ok {
sb.ObjectTree = provider.Tree()
}
sb.undo = undo.NewHistory(0)
sb.restrictions = ctx.App.MustComponent(restriction.CName).(restriction.Service).RestrictionsByObj(sb)
sb.relationService = ctx.App.MustComponent(relation2.CName).(relation2.Service)
sb.indexer = app.MustComponent[Indexer](ctx.App)
sb.doc = ctx.App.MustComponent(doc.CName).(doc.Service)
sb.objectStore = ctx.App.MustComponent(objectstore.CName).(objectstore.ObjectStore)
sb.lastDepDetails = map[string]*pb.EventObjectDetailsSet{}
if ctx.State != nil {
@ -317,6 +273,9 @@ func (sb *smartBlock) Init(ctx *InitContext) (err error) {
if err = sb.injectLocalDetails(ctx.State); err != nil {
return
}
if err := migration.ApplyMigrations(ctx.State, sb); err != nil {
return fmt.Errorf("apply migrations: %w", err)
}
return
}
@ -389,10 +348,8 @@ func (sb *smartBlock) fetchMeta() (details []*model.ObjectViewDetailsSet, object
}
recordsCh := make(chan *types.Struct, 10)
sb.recordsSub = database.NewSubscription(nil, recordsCh)
depIDs := sb.dependentSmartIds(sb.includeRelationObjectsAsDependents, true, true, true)
sb.setDependentIDs(depIDs)
sb.depIds = sb.dependentSmartIds(sb.includeRelationObjectsAsDependents, true, true, true)
sort.Strings(sb.depIds)
var records []database.Record
if records, sb.closeRecordsSub, err = sb.objectStore.QueryByIdAndSubscribeForChanges(sb.depIds, sb.recordsSub); err != nil {
// datastore unavailable, cancel the subscription
@ -434,14 +391,6 @@ func (sb *smartBlock) fetchMeta() (details []*model.ObjectViewDetailsSet, object
return
}
func (sb *smartBlock) Lock() {
sb.Locker.Lock()
}
func (sb *smartBlock) Unlock() {
sb.Locker.Unlock()
}
func (sb *smartBlock) metaListener(ch chan *types.Struct) {
for {
rec, ok := <-ch
@ -455,48 +404,46 @@ func (sb *smartBlock) metaListener(ch chan *types.Struct) {
}
func (sb *smartBlock) onMetaChange(details *types.Struct) {
if sb.sendEvent == nil {
return
}
if details == nil {
return
}
id := pbtypes.GetString(details, bundle.RelationKeyId.String())
msgs := []*pb.EventMessage{}
if v, exists := sb.lastDepDetails[id]; exists {
diff := pbtypes.StructDiff(v.Details, details)
if id == sb.Id() {
// if we've got update for ourselves, we are only interested in local-only details, because the rest details changes will be appended when applying records in the current sb
diff = pbtypes.StructFilterKeys(diff, bundle.LocalRelationsKeys)
if len(diff.GetFields()) > 0 {
log.With("thread", sb.Id()).Debugf("onMetaChange current object: %s", pbtypes.Sprint(diff))
if sb.sendEvent != nil {
id := pbtypes.GetString(details, bundle.RelationKeyId.String())
msgs := []*pb.EventMessage{}
if details != nil {
if v, exists := sb.lastDepDetails[id]; exists {
diff := pbtypes.StructDiff(v.Details, details)
if id == sb.Id() {
// if we've got update for ourselves, we are only interested in local-only details, because the rest details changes will be appended when applying records in the current sb
diff = pbtypes.StructFilterKeys(diff, bundle.LocalRelationsKeys)
if len(diff.GetFields()) > 0 {
log.With("thread", sb.Id()).Debugf("onMetaChange current object: %s", pbtypes.Sprint(diff))
}
}
msgs = append(msgs, state.StructDiffIntoEvents(id, diff)...)
} else {
msgs = append(msgs, &pb.EventMessage{
Value: &pb.EventMessageValueOfObjectDetailsSet{
ObjectDetailsSet: &pb.EventObjectDetailsSet{
Id: id,
Details: details,
},
},
})
}
sb.lastDepDetails[id] = &pb.EventObjectDetailsSet{
Id: id,
Details: details,
}
}
msgs = append(msgs, state.StructDiffIntoEvents(id, diff)...)
} else {
msgs = append(msgs, &pb.EventMessage{
Value: &pb.EventMessageValueOfObjectDetailsSet{
ObjectDetailsSet: &pb.EventObjectDetailsSet{
Id: id,
Details: details,
},
},
if len(msgs) == 0 {
return
}
sb.sendEvent(&pb.Event{
Messages: msgs,
ContextId: sb.Id(),
})
}
sb.lastDepDetails[id] = &pb.EventObjectDetailsSet{
Id: id,
Details: details,
}
if len(msgs) == 0 {
return
}
sb.sendEvent(&pb.Event{
Messages: msgs,
ContextId: sb.Id(),
})
}
// dependentSmartIds returns list of dependent objects in this order: Simple blocks(Link, mentions in Text), Relations. Both of them are returned in the order of original blocks/relations
@ -509,82 +456,16 @@ func (sb *smartBlock) dependentSmartIds(includeRelations, includeObjTypes, inclu
return sb.Doc.(*state.State).DepSmartIds(true, true, includeRelations, includeObjTypes, includeCreatorModifier)
}
func (sb *smartBlock) navigationalLinks() []string {
includeDetails := sb.Type() != model.SmartBlockType_Breadcrumbs
includeRelations := sb.includeRelationObjectsAsDependents
s := sb.Doc.(*state.State)
// Objects from collection
ids := pbtypes.GetStringList(s.Store(), CollectionStoreKey)
err := s.Iterate(func(b simple.Block) (isContinue bool) {
if f := b.Model().GetFile(); f != nil {
if f.Hash != "" {
ids = append(ids, f.Hash)
}
return true
}
// Include only link to target object
if dv := b.Model().GetDataview(); dv != nil {
if dv.TargetObjectId != "" {
ids = append(ids, dv.TargetObjectId)
}
return true
}
if ls, ok := b.(linkSource); ok {
ids = ls.FillSmartIds(ids)
}
return true
})
if err != nil {
log.With("thread", s.RootId()).Errorf("failed to iterate over simple blocks: %s", err)
}
var det *types.Struct
if includeDetails {
det = s.CombinedDetails()
}
for _, rel := range s.GetRelationLinks() {
if includeRelations {
ids = append(ids, addr.RelationKeyToIdPrefix+rel.Key)
}
if !includeDetails {
continue
}
if rel.Format != model.RelationFormat_object {
continue
}
if bundle.RelationKey(rel.Key).IsSystem() {
continue
}
// Do not include hidden relations. Only bundled relations can be hidden, so we don't need
// to request relations from object store.
if r, err := bundle.GetRelation(bundle.RelationKey(rel.Key)); err == nil && r.Hidden {
continue
}
// Add all object relation values as dependents
for _, targetID := range pbtypes.GetStringList(det, rel.Key) {
if targetID != "" {
ids = append(ids, targetID)
}
}
}
return util.UniqueStrings(ids)
}
func (sb *smartBlock) SetEventFunc(f func(e *pb.Event)) {
sb.sendEvent = f
}
func (sb *smartBlock) Locked() bool {
sb.Lock()
defer sb.Unlock()
return sb.IsLocked()
}
func (sb *smartBlock) IsLocked() bool {
return sb.sendEvent != nil
}
@ -602,14 +483,7 @@ func (sb *smartBlock) Apply(s *state.State, flags ...ApplyFlag) (err error) {
if sb.IsDeleted() {
return ErrIsDeleted
}
var (
sendEvent = true
addHistory = true
doSnapshot = false
checkRestrictions = true
hooks = true
skipIfNoChanges = false
)
var sendEvent, addHistory, doSnapshot, checkRestrictions, hooks = true, true, false, true, true
for _, f := range flags {
switch f {
case NoEvent:
@ -622,8 +496,6 @@ func (sb *smartBlock) Apply(s *state.State, flags ...ApplyFlag) (err error) {
checkRestrictions = false
case NoHooks:
hooks = false
case SkipIfNoChanges:
skipIfNoChanges = true
}
}
if sb.source.ReadOnly() && addHistory {
@ -645,7 +517,7 @@ func (sb *smartBlock) Apply(s *state.State, flags ...ApplyFlag) (err error) {
}
if sb.Anytype() != nil {
// this one will be reverted in case we don't have any actual change being made
s.SetLastModified(time.Now().Unix(), sb.Anytype().PredefinedBlocks().Profile)
s.SetLastModified(time.Now().Unix(), sb.Anytype().Account())
}
beforeApplyStateTime := time.Now()
msgs, act, err := state.ApplyState(s, !sb.disableLayouts)
@ -656,12 +528,6 @@ func (sb *smartBlock) Apply(s *state.State, flags ...ApplyFlag) (err error) {
st := sb.Doc.(*state.State)
changes := st.GetChanges()
sb.runIndexer(st)
if skipIfNoChanges && len(changes) == 0 {
return nil
}
pushChange := func() {
fileDetailsKeys := sb.FileRelationKeys(st)
fileDetailsKeysFiltered := fileDetailsKeys[:0]
@ -711,6 +577,8 @@ func (sb *smartBlock) Apply(s *state.State, flags ...ApplyFlag) (err error) {
}
}
sb.reportChange(st)
if hasDepIds(sb.GetRelationLinks(), &act) {
sb.CheckSubscriptions()
}
@ -736,7 +604,6 @@ func (sb *smartBlock) Apply(s *state.State, flags ...ApplyFlag) (err error) {
func (sb *smartBlock) ResetToVersion(s *state.State) (err error) {
s.SetParent(sb.Doc.(*state.State))
sb.storeFileKeys(s)
if err = sb.Apply(s, NoHistory, DoSnapshot, NoRestrictions); err != nil {
return
}
@ -747,35 +614,23 @@ func (sb *smartBlock) ResetToVersion(s *state.State) (err error) {
}
func (sb *smartBlock) CheckSubscriptions() (changed bool) {
depIDs := sb.dependentSmartIds(sb.includeRelationObjectsAsDependents, true, true, true)
changed = sb.setDependentIDs(depIDs)
if sb.recordsSub == nil {
depIds := sb.dependentSmartIds(sb.includeRelationObjectsAsDependents, true, true, true)
sort.Strings(depIds)
if !slice.SortedEquals(sb.depIds, depIds) {
sb.depIds = depIds
if sb.recordsSub != nil {
newIds := sb.recordsSub.Subscribe(sb.depIds)
records, err := sb.objectStore.QueryById(newIds)
if err != nil {
log.Errorf("queryById error: %v", err)
}
for _, rec := range records {
sb.onMetaChange(rec.Details)
}
}
return true
}
newIDs := sb.recordsSub.Subscribe(sb.depIds)
records, err := sb.objectStore.QueryById(newIDs)
if err != nil {
log.Errorf("queryById error: %v", err)
}
for _, rec := range records {
sb.onMetaChange(rec.Details)
}
return true
}
func (sb *smartBlock) setDependentIDs(depIDs []string) (changed bool) {
sort.Strings(depIDs)
if slice.SortedEquals(sb.depIds, depIDs) {
return false
}
// TODO Use algo for sorted strings
removed, _ := slice.DifferenceRemovedAdded(sb.depIds, depIDs)
for _, id := range removed {
delete(sb.lastDepDetails, id)
}
sb.depIds = depIDs
return true
return false
}
func (sb *smartBlock) NewState() *state.State {
@ -813,9 +668,6 @@ func (sb *smartBlock) SetDetails(ctx *session.Context, details []*pb.RpcObjectSe
for _, detail := range details {
if detail.Value != nil {
if err := pbtypes.ValidateValue(detail.Value); err != nil {
return fmt.Errorf("detail %s validation error: %s", detail.Key, err.Error())
}
if detail.Key == bundle.RelationKeyType.String() {
// special case when client sets the type's detail directly instead of using setObjectType command
err = sb.SetObjectTypes(ctx, pbtypes.GetStringListValue(detail.Value))
@ -836,9 +688,11 @@ func (sb *smartBlock) SetDetails(ctx *session.Context, details []*pb.RpcObjectSe
// TODO: add relation2.WithWorkspaceId(workspaceId) filter
rel, err := sb.RelationService().FetchKey(detail.Key)
if err != nil || rel == nil {
log.Errorf("failed to get relation: %s", err)
continue
if err != nil {
return fmt.Errorf("fetch relation by key %s: %w", detail.Key, err)
}
if rel == nil {
return fmt.Errorf("relation %s is not found", detail.Key)
}
s.AddRelationLinks(&model.RelationLink{
Format: rel.Format,
@ -847,8 +701,7 @@ func (sb *smartBlock) SetDetails(ctx *session.Context, details []*pb.RpcObjectSe
err = sb.RelationService().ValidateFormat(detail.Key, detail.Value)
if err != nil {
log.Errorf("failed to validate relation: %s", err)
continue
return fmt.Errorf("relation %s validation failed: %s", detail.Key, err.Error())
}
// special case for type relation that we are storing in a separate object's field
@ -925,17 +778,16 @@ func (sb *smartBlock) injectLocalDetails(s *state.State) error {
return err
}
// Consume pending details
err = sb.objectStore.UpdatePendingLocalDetails(sb.Id(), func(pending *types.Struct) (*types.Struct, error) {
storedDetails.Details = pbtypes.StructMerge(storedDetails.GetDetails(), pending, false)
return nil, nil
})
if err != nil {
log.With("thread", sb.Id()).
With("sbType", sb.Type()).
Errorf("failed to update pending details: %v", err)
pendingDetails, err := sb.objectStore.GetPendingLocalDetails(sb.Id())
if err == nil {
storedDetails.Details = pbtypes.StructMerge(storedDetails.GetDetails(), pendingDetails.GetDetails(), false)
err = sb.objectStore.UpdatePendingLocalDetails(sb.Id(), nil)
if err != nil {
log.With("thread", sb.Id()).
With("sbType", sb.Type()).
Errorf("failed to update pending details: %v", err)
}
}
// inject also derived keys, because it may be a good idea to have created date and creator cached,
// so we don't need to traverse changes every time
keys := append(bundle.LocalRelationsKeys, bundle.DerivedRelationsKeys...)
@ -1006,16 +858,16 @@ func (sb *smartBlock) SetObjectTypes(ctx *session.Context, objectTypes []string)
return err
}
if textBlock != nil {
s.SetDetail(bundle.RelationKeyName.String(), pbtypes.String(textBlock.Model().GetText().GetText()))
for _, id := range textBlock.Model().ChildrenIds {
s.Unlink(id)
s.SetDetail(bundle.RelationKeyName.String(), pbtypes.String(textBlock.Text.Text))
if err := s.Iterate(func(b simple.Block) (isContinue bool) {
if b.Model().Content == textBlock {
s.Unlink(b.Model().Id)
return false
}
return true
}); err != nil {
return err
}
err = s.InsertTo(textBlock.Model().Id, model.Block_Bottom, textBlock.Model().ChildrenIds...)
if err != nil {
return fmt.Errorf("insert children: %w", err)
}
s.Unlink(textBlock.Model().Id)
}
}
}
@ -1137,11 +989,11 @@ func (sb *smartBlock) RemoveExtraRelations(ctx *session.Context, relationIds []s
return sb.Apply(st)
}
func (sb *smartBlock) StateAppend(f func(d state.Doc) (s *state.State, changes []*pb.ChangeContent, err error)) error {
func (sb *smartBlock) StateAppend(f func(d state.Doc) (s *state.State, err error), changes []*pb.ChangeContent) error {
if sb.IsDeleted() {
return ErrIsDeleted
}
s, changes, err := f(sb.Doc)
s, err := f(sb.Doc)
if err != nil {
return err
}
@ -1162,7 +1014,7 @@ func (sb *smartBlock) StateAppend(f func(d state.Doc) (s *state.State, changes [
if hasDepIds(sb.GetRelationLinks(), &act) {
sb.CheckSubscriptions()
}
sb.runIndexer(s)
sb.reportChange(s)
sb.execHooks(HookAfterApply, ApplyInfo{State: s, Events: msgs, Changes: changes})
return nil
@ -1192,33 +1044,22 @@ func (sb *smartBlock) StateRebuild(d state.Doc) (err error) {
}
sb.storeFileKeys(d)
sb.CheckSubscriptions()
sb.runIndexer(sb.Doc.(*state.State))
sb.reportChange(sb.Doc.(*state.State))
sb.execHooks(HookAfterApply, ApplyInfo{State: sb.Doc.(*state.State), Events: msgs, Changes: d.(*state.State).GetChanges()})
return nil
}
func (sb *smartBlock) DocService() doc.Service {
return sb.doc
}
func (sb *smartBlock) ObjectClose() {
sb.execHooks(HookOnBlockClose, ApplyInfo{State: sb.Doc.(*state.State)})
sb.SetEventFunc(nil)
}
func (sb *smartBlock) TryClose(objectTTL time.Duration) (res bool, err error) {
if !sb.Locker.TryLock() {
return false, nil
}
if sb.IsLocked() {
sb.Unlock()
return false, nil
}
return true, sb.closeLocked()
}
func (sb *smartBlock) Close() (err error) {
sb.Lock()
return sb.closeLocked()
}
func (sb *smartBlock) closeLocked() (err error) {
sb.execHooks(HookOnClose, ApplyInfo{State: sb.Doc.(*state.State)})
if sb.closeRecordsSub != nil {
sb.closeRecordsSub()
@ -1358,11 +1199,11 @@ func (sb *smartBlock) execHooks(event Hook, info ApplyInfo) (err error) {
return
}
func (sb *smartBlock) GetDocInfo() DocInfo {
return sb.getDocInfo(sb.NewState())
func (sb *smartBlock) GetDocInfo() (doc.DocInfo, error) {
return sb.getDocInfo(sb.NewState()), nil
}
func (sb *smartBlock) getDocInfo(st *state.State) DocInfo {
func (sb *smartBlock) getDocInfo(st *state.State) doc.DocInfo {
fileHashes := st.GetAllFileHashes(sb.FileRelationKeys(st))
creator := pbtypes.GetString(st.Details(), bundle.RelationKeyCreator.String())
if creator == "" {
@ -1370,26 +1211,28 @@ func (sb *smartBlock) getDocInfo(st *state.State) DocInfo {
}
// we don't want any hidden or internal relations here. We want to capture the meaningful outgoing links only
links := sb.navigationalLinks()
links := sb.dependentSmartIds(sb.includeRelationObjectsAsDependents, false, false, false)
links = slice.Remove(links, sb.Id())
// so links will have this order
// 1. Simple blocks: links, mentions in the text
// 2. Relations(format==Object)
return DocInfo{
return doc.DocInfo{
Id: sb.Id(),
Links: links,
Heads: sb.source.Heads(),
LogHeads: sb.source.LogHeads(),
FileHashes: fileHashes,
Creator: creator,
State: st.Copy(),
}
}
func (sb *smartBlock) runIndexer(s *state.State) {
docInfo := sb.getDocInfo(s)
if err := sb.indexer.Index(context.TODO(), docInfo); err != nil {
log.Errorf("index object %s error: %s", sb.Id(), err)
func (sb *smartBlock) reportChange(s *state.State) {
if sb.doc == nil {
return
}
docInfo := sb.getDocInfo(s)
sb.doc.ReportChange(context.TODO(), docInfo)
}
func (sb *smartBlock) onApply(s *state.State) (err error) {
@ -1439,9 +1282,7 @@ func ObjectApplyTemplate(sb SmartBlock, s *state.State, templates ...template.St
func hasStoreChanges(changes []*pb.ChangeContent) bool {
for _, ch := range changes {
if ch.GetStoreKeySet() != nil ||
ch.GetStoreKeyUnset() != nil ||
ch.GetStoreSliceUpdate() != nil {
if ch.GetStoreKeySet() != nil || ch.GetStoreKeyUnset() != nil {
return true
}
}

View file

@ -33,7 +33,6 @@ const (
HeaderLayoutID = "header"
TitleBlockID = "title"
DescriptionBlockID = "description"
DataviewBlockID = "dataview"
DataviewTemplatesBlockID = "templates"
FeaturedRelationsID = "featuredRelations"
@ -66,7 +65,7 @@ type Doc interface {
GetAndUnsetFileKeys() []pb.ChangeFileKeys
BlocksInit(ds simple.DetailsService)
SearchText() string
GetFirstTextBlock() (simple.Block, error)
GetFirstTextBlock() (*model.BlockContentOfText, error)
}
func NewDoc(rootId string, blocks map[string]simple.Block) Doc {
@ -94,6 +93,8 @@ type State struct {
localDetails *types.Struct
relationLinks pbtypes.RelationLinks
migrationVersion uint32
// deprecated, used for migration
extraRelations []*model.Relation
aggregatedOptionsByRelation map[string][]*model.RelationOption // deprecated, used for migration
@ -112,6 +113,14 @@ type State struct {
noObjectType bool
}
func (s *State) MigrationVersion() uint32 {
return s.migrationVersion
}
func (s *State) SetMigrationVersion(v uint32) {
s.migrationVersion = v
}
func (s *State) RootId() string {
if s.rootId == "" {
for id := range s.blocks {
@ -132,11 +141,11 @@ func (s *State) RootId() string {
}
func (s *State) NewState() *State {
return &State{parent: s, blocks: make(map[string]simple.Block), rootId: s.rootId, noObjectType: s.noObjectType}
return &State{parent: s, blocks: make(map[string]simple.Block), rootId: s.rootId, noObjectType: s.noObjectType, migrationVersion: s.migrationVersion}
}
func (s *State) NewStateCtx(ctx *session.Context) *State {
return &State{parent: s, blocks: make(map[string]simple.Block), rootId: s.rootId, ctx: ctx, noObjectType: s.noObjectType}
return &State{parent: s, blocks: make(map[string]simple.Block), rootId: s.rootId, ctx: ctx, noObjectType: s.noObjectType, migrationVersion: s.migrationVersion}
}
func (s *State) Context() *session.Context {
@ -287,13 +296,6 @@ func (s *State) IsChild(parentId, childId string) bool {
}
}
func (s *State) PickOriginParentOf(id string) (res simple.Block) {
if s.parent != nil {
return s.parent.PickParentOf(id)
}
return
}
func (s *State) getStringBuf() []string {
if s.parent != nil {
return s.parent.getStringBuf()
@ -370,11 +372,11 @@ func (s *State) SearchText() (text string) {
return
}
func (s *State) GetFirstTextBlock() (simple.Block, error) {
var res simple.Block
func (s *State) GetFirstTextBlock() (*model.BlockContentOfText, error) {
var firstTextBlock *model.BlockContentOfText
err := s.Iterate(func(b simple.Block) (isContinue bool) {
if b.Model().GetText() != nil {
res = b
if content, ok := b.Model().Content.(*model.BlockContentOfText); ok {
firstTextBlock = content
return false
}
return true
@ -382,7 +384,8 @@ func (s *State) GetFirstTextBlock() (simple.Block, error) {
if err != nil {
return nil, err
}
return res, nil
return firstTextBlock, nil
}
func ApplyState(s *State, withLayouts bool) (msgs []simple.EventMessage, action undo.Action, err error) {
@ -583,6 +586,7 @@ func (s *State) apply(fast, one, withLayouts bool) (msgs []simple.EventMessage,
}
if s.parent != nil {
s.parent.changes = s.changes
s.parent.migrationVersion = s.migrationVersion
}
if s.parent != nil && s.changeId != "" {
s.parent.changeId = s.changeId
@ -708,7 +712,10 @@ func (s *State) processTrailingDuplicatedEvents(msgs []simple.EventMessage) (fil
var prev []byte
filtered = msgs[:0]
for _, e := range msgs {
curr, _ := e.Msg.Marshal()
curr, err := e.Msg.Marshal()
if err != nil {
continue
}
if bytes.Equal(prev, curr) {
log.With("thread", s.RootId()).Debugf("found trailing duplicated event %s", e.Msg.String())
continue
@ -843,10 +850,6 @@ func (s *State) SetLocalDetail(key string, value *types.Value) {
return
}
if err := pbtypes.ValidateValue(value); err != nil {
log.Errorf("invalid value for pb %s: %v", key, err)
}
s.localDetails.Fields[key] = value
return
}
@ -877,11 +880,6 @@ func (s *State) SetDetail(key string, value *types.Value) {
delete(s.details.Fields, key)
return
}
if err := pbtypes.ValidateValue(value); err != nil {
log.Errorf("invalid value for pb %s: %v", key, err)
}
s.details.Fields[key] = value
return
}
@ -903,14 +901,6 @@ func (s *State) SetObjectTypesToMigrate(objectTypes []string) *State {
}
func (s *State) InjectDerivedDetails() {
if objTypes := s.ObjectTypes(); len(objTypes) > 0 && objTypes[0] == bundle.TypeKeySet.URL() {
if b := s.Get("dataview"); b != nil {
source := b.Model().GetDataview().GetSource()
s.SetLocalDetail(bundle.RelationKeySetOf.String(), pbtypes.StringList(source))
} else {
s.SetLocalDetail(bundle.RelationKeySetOf.String(), pbtypes.StringList([]string{}))
}
}
s.SetDetailAndBundledRelation(bundle.RelationKeyId, pbtypes.String(s.RootId()))
if ot := s.ObjectType(); ot != "" {
@ -1298,37 +1288,25 @@ func (s *State) Copy() *State {
objTypesToMigrate := make([]string, len(s.ObjectTypesToMigrate()))
copy(objTypesToMigrate, s.ObjectTypesToMigrate())
agOptsCopy := make(map[string][]*model.RelationOption, len(s.AggregatedOptionsByRelation()))
for k, v := range s.AggregatedOptionsByRelation() {
agOptsCopy[k] = pbtypes.CopyRelationOptions(v)
}
relationLinks := make([]*model.RelationLink, len(s.relationLinks))
for i, rl := range s.relationLinks {
relationLinks[i] = &model.RelationLink{
Format: rl.Format,
Key: rl.Key,
}
}
storeKeyRemoved := s.StoreKeysRemoved()
storeKeyRemovedCopy := make(map[string]struct{}, len(storeKeyRemoved))
for i := range storeKeyRemoved {
storeKeyRemovedCopy[i] = struct{}{}
}
copy := &State{
ctx: s.ctx,
blocks: blocks,
rootId: s.rootId,
details: pbtypes.CopyStruct(s.Details()),
localDetails: pbtypes.CopyStruct(s.LocalDetails()),
relationLinks: relationLinks,
extraRelations: pbtypes.CopyRelations(s.OldExtraRelations()),
aggregatedOptionsByRelation: agOptsCopy,
objectTypes: objTypes,
objectTypesToMigrate: objTypesToMigrate,
noObjectType: s.noObjectType,
store: pbtypes.CopyStruct(s.Store()),
storeKeyRemoved: storeKeyRemovedCopy,
ctx: s.ctx,
blocks: blocks,
rootId: s.rootId,
details: pbtypes.CopyStruct(s.Details()),
localDetails: pbtypes.CopyStruct(s.LocalDetails()),
relationLinks: s.GetRelationLinks(), // Get methods copy inside
extraRelations: pbtypes.CopyRelations(s.OldExtraRelations()),
objectTypes: objTypes,
objectTypesToMigrate: objTypesToMigrate,
noObjectType: s.noObjectType,
migrationVersion: s.migrationVersion,
store: pbtypes.CopyStruct(s.Store()),
storeKeyRemoved: storeKeyRemovedCopy,
}
return copy
}

View file

@ -0,0 +1,111 @@
package migration
import (
"fmt"
"github.com/anytypeio/go-anytype-middleware/core/block/editor/state"
"github.com/anytypeio/go-anytype-middleware/core/block/simple"
"github.com/anytypeio/go-anytype-middleware/pkg/lib/bundle"
"github.com/anytypeio/go-anytype-middleware/pkg/lib/pb/model"
)
type SmartBlock interface {
Id() string
Type() model.SmartBlockType
ObjectType() string
}
type MigrationSelector func(sb SmartBlock) bool
func TypeSelector(t bundle.TypeKey) MigrationSelector {
return func(sb SmartBlock) bool {
return sb.ObjectType() == t.URL()
}
}
type Migration struct {
Version uint32
Steps []MigrationStep
}
type MigrationStep struct {
Selector MigrationSelector
Proc func(s *state.State, sb SmartBlock) error
}
func ApplyMigrations(st *state.State, sb SmartBlock) error {
for _, m := range migrations {
if st.MigrationVersion() >= m.Version {
fmt.Println("SKIP", m.Version, "FOR", sb.Id())
continue
}
for _, s := range m.Steps {
if s.Selector(sb) {
fmt.Println("APPLY", m.Version, "FOR", sb.Id())
if err := s.Proc(st, sb); err != nil {
return fmt.Errorf("MIGRATION FAIL: %w", err)
}
}
}
}
return nil
}
var migrations = []Migration{
{
Version: 1,
Steps: []MigrationStep{
{
Selector: TypeSelector(bundle.TypeKeyPage),
Proc: func(s *state.State, sb SmartBlock) error {
b := simple.New(&model.Block{
Content: &model.BlockContentOfText{
Text: &model.BlockContentText{
Text: "Test 1 " + sb.Id(),
},
},
})
s.Add(b)
return s.InsertTo("", model.Block_Inner, b.Model().Id)
},
},
},
},
{
Version: 2,
Steps: []MigrationStep{
{
Selector: TypeSelector(bundle.TypeKeyPage),
Proc: func(s *state.State, _ SmartBlock) error {
b := simple.New(&model.Block{
Content: &model.BlockContentOfText{
Text: &model.BlockContentText{
Text: "Second one",
},
},
})
s.Add(b)
return s.InsertTo("", model.Block_Inner, b.Model().Id)
},
},
},
},
}
var lastMigrationVersion uint32
func LastMigrationVersion() uint32 {
return lastMigrationVersion
}
func init() {
for _, m := range migrations {
if lastMigrationVersion < m.Version {
lastMigrationVersion = m.Version
}
}
}

View file

@ -8,23 +8,22 @@ import (
"sync"
"time"
"github.com/anytypeio/any-sync/accountservice"
"github.com/anytypeio/any-sync/commonspace/object/tree/objecttree"
"github.com/anytypeio/any-sync/commonspace/object/tree/synctree"
"github.com/gogo/protobuf/proto"
"github.com/cheggaaa/mb"
"github.com/gogo/protobuf/types"
"github.com/textileio/go-threads/core/logstore"
"github.com/textileio/go-threads/core/thread"
"go.uber.org/zap"
"github.com/anytypeio/go-anytype-middleware/change"
"github.com/anytypeio/go-anytype-middleware/core/block/editor/state"
"github.com/anytypeio/go-anytype-middleware/core/block/migration"
"github.com/anytypeio/go-anytype-middleware/core/status"
"github.com/anytypeio/go-anytype-middleware/metrics"
"github.com/anytypeio/go-anytype-middleware/pb"
"github.com/anytypeio/go-anytype-middleware/pkg/lib/core"
"github.com/anytypeio/go-anytype-middleware/pkg/lib/core/smartblock"
"github.com/anytypeio/go-anytype-middleware/pkg/lib/logging"
"github.com/anytypeio/go-anytype-middleware/pkg/lib/pb/model"
"github.com/anytypeio/go-anytype-middleware/space"
"github.com/anytypeio/go-anytype-middleware/space/typeprovider"
"github.com/anytypeio/go-anytype-middleware/pkg/lib/threads"
"github.com/anytypeio/go-anytype-middleware/util/slice"
)
@ -35,7 +34,7 @@ var (
)
type ChangeReceiver interface {
StateAppend(func(d state.Doc) (s *state.State, changes []*pb.ChangeContent, err error)) error
StateAppend(func(d state.Doc) (s *state.State, err error), []*pb.ChangeContent) error
StateRebuild(d state.Doc) (err error)
sync.Locker
}
@ -45,11 +44,13 @@ type Source interface {
Anytype() core.Service
Type() model.SmartBlockType
Virtual() bool
Heads() []string
LogHeads() map[string]string
GetFileKeysSnapshot() []*pb.ChangeFileKeys
ReadOnly() bool
ReadDoc(ctx context.Context, receiver ChangeReceiver, empty bool) (doc state.Doc, err error)
ReadMeta(ctx context.Context, receiver ChangeReceiver) (doc state.Doc, err error)
PushChange(params PushChangeParams) (id string, err error)
FindFirstChange(ctx context.Context) (c *change.Change, err error)
Close() (err error)
}
@ -79,113 +80,71 @@ func (s *service) SourceTypeBySbType(blockType smartblock.SmartBlockType) (Sourc
case smartblock.SmartBlockTypeAnytypeProfile:
return &anytypeProfile{a: s.anytype}, nil
case smartblock.SmartBlockTypeFile:
return &files{a: s.anytype, fileStore: s.fileStore}, nil
return &files{a: s.anytype}, nil
case smartblock.SmartBlockTypeBundledObjectType:
return &bundledObjectType{a: s.anytype}, nil
case smartblock.SmartBlockTypeBundledRelation:
return &bundledRelation{a: s.anytype}, nil
case smartblock.SmartBlockTypeWorkspaceOld:
return &threadDB{a: s.anytype}, nil
case smartblock.SmartBlockTypeBundledTemplate:
return s.NewStaticSource("", model.SmartBlockType_BundledTemplate, nil, nil), nil
default:
if err := blockType.Valid(); err != nil {
return nil, err
} else {
return &source{
a: s.anytype,
spaceService: s.spaceService,
smartblockType: blockType,
sbtProvider: s.sbtProvider,
}, nil
return &source{a: s.anytype, smartblockType: blockType}, nil
}
}
}
type sourceDeps struct {
anytype core.Service
statusService status.Service
accountService accountservice.Service
sbt smartblock.SmartBlockType
ot objecttree.ObjectTree
spaceService space.Service
sbtProvider typeprovider.SmartBlockTypeProvider
}
func newSource(a core.Service, ss status.Service, tid thread.ID, listenToOwnChanges bool) (s Source, err error) {
id := tid.String()
sb, err := a.GetBlock(id)
if err != nil {
if err == logstore.ErrThreadNotFound {
return nil, ErrObjectNotFound
}
err = fmt.Errorf("anytype.GetBlock error: %w", err)
return
}
func newTreeSource(id string, deps sourceDeps) (s Source, err error) {
return &source{
ObjectTree: deps.ot,
id: id,
a: deps.anytype,
spaceService: deps.spaceService,
ss: deps.statusService,
logId: deps.anytype.Device(),
openedAt: time.Now(),
smartblockType: deps.sbt,
acc: deps.accountService,
sbtProvider: deps.sbtProvider,
}, nil
}
sbt, err := smartblock.SmartBlockTypeFromID(id)
if err != nil {
return nil, err
}
type ObjectTreeProvider interface {
Tree() objecttree.ObjectTree
s = &source{
id: id,
smartblockType: sbt,
tid: tid,
a: a,
ss: ss,
sb: sb,
listenToOwnDeviceChanges: listenToOwnChanges,
logId: a.Device(),
openedAt: time.Now(),
}
return
}
type source struct {
objecttree.ObjectTree
id, logId string
tid thread.ID
smartblockType smartblock.SmartBlockType
lastSnapshotId string
changesSinceSnapshot int
receiver ChangeReceiver
unsubscribe func()
metaOnly bool
closed chan struct{}
openedAt time.Time
a core.Service
ss status.Service
acc accountservice.Service
spaceService space.Service
sbtProvider typeprovider.SmartBlockTypeProvider
}
func (s *source) Tree() objecttree.ObjectTree {
return s.ObjectTree
}
func (s *source) Update(ot objecttree.ObjectTree) {
// here it should work, because we always have the most common snapshot of the changes in tree
s.lastSnapshotId = ot.Root().Id
prevSnapshot := s.lastSnapshotId
err := s.receiver.StateAppend(func(d state.Doc) (st *state.State, changes []*pb.ChangeContent, err error) {
st, changes, sinceSnapshot, err := BuildState(d.(*state.State), ot, s.Anytype().PredefinedBlocks().Profile)
if prevSnapshot != s.lastSnapshotId {
s.changesSinceSnapshot = sinceSnapshot
} else {
s.changesSinceSnapshot += sinceSnapshot
}
return st, changes, err
})
if err != nil {
log.With(zap.Error(err)).Debug("failed to append the state and send it to receiver")
}
}
func (s *source) Rebuild(ot objecttree.ObjectTree) {
if s.ObjectTree == nil {
return
}
doc, err := s.buildState()
if err != nil {
log.With(zap.Error(err)).Debug("failed to build state")
return
}
err = s.receiver.StateRebuild(doc.(*state.State))
if err != nil {
log.With(zap.Error(err)).Debug("failed to send the state to receiver")
}
id, logId string
tid thread.ID
smartblockType smartblock.SmartBlockType
a core.Service
ss status.Service
sb core.SmartBlock
tree *change.Tree
lastSnapshotId string
changesSinceSnapshot int
logHeads map[string]*change.Change
receiver ChangeReceiver
unsubscribe func()
metaOnly bool
listenToOwnDeviceChanges bool // false means we will ignore own(same-logID) changes in applyRecords
closed chan struct{}
openedAt time.Time
}
func (s *source) ReadOnly() bool {
@ -201,40 +160,176 @@ func (s *source) Anytype() core.Service {
}
func (s *source) Type() model.SmartBlockType {
return model.SmartBlockType(s.smartblockType)
return model.SmartBlockType(s.sb.Type())
}
func (s *source) Virtual() bool {
return false
}
func (s *source) ReadMeta(ctx context.Context, receiver ChangeReceiver) (doc state.Doc, err error) {
s.metaOnly = true
return s.readDoc(ctx, receiver, false)
}
func (s *source) ReadDoc(ctx context.Context, receiver ChangeReceiver, allowEmpty bool) (doc state.Doc, err error) {
return s.readDoc(ctx, receiver, allowEmpty)
}
func (s *source) readDoc(ctx context.Context, receiver ChangeReceiver, allowEmpty bool) (doc state.Doc, err error) {
s.receiver = receiver
setter, ok := s.ObjectTree.(synctree.ListenerSetter)
if !ok {
err = fmt.Errorf("should be able to set listner inside object tree")
var ch chan core.SmartblockRecordEnvelope
batch := mb.New(0)
if receiver != nil {
s.receiver = receiver
ch = make(chan core.SmartblockRecordEnvelope)
if s.unsubscribe, err = s.sb.SubscribeForRecords(ch); err != nil {
return
}
go func() {
defer batch.Close()
for rec := range ch {
batch.Add(rec)
}
}()
defer func() {
if err != nil {
batch.Close()
s.unsubscribe()
s.unsubscribe = nil
}
}()
}
startTime := time.Now()
log.With("thread", s.id).
Debug("start building tree")
loadCh := make(chan struct{})
ctxP := core.ThreadLoadProgress{}
ctx = ctxP.DeriveContext(ctx)
var request string
if v := ctx.Value(metrics.CtxKeyRequest); v != nil {
request = v.(string)
}
sendEvent := func(v core.ThreadLoadProgress, inProgress bool) {
logs, _ := s.sb.GetLogs()
spent := time.Since(startTime).Seconds()
var msg string
if inProgress {
msg = "tree building in progress"
} else {
msg = "tree building finished"
}
l := log.With("thread", s.id).
With("sb_type", s.smartblockType).
With("request", request).
With("logs", logs).
With("records_loaded", v.RecordsLoaded).
With("records_missing", v.RecordsMissingLocally).
With("spent", spent)
if spent > 30 {
l.Errorf(msg)
} else if spent > 3 {
l.Warn(msg)
} else {
l.Debug(msg)
}
event := metrics.TreeBuild{
SbType: uint64(s.smartblockType),
TimeMs: time.Since(startTime).Milliseconds(),
ObjectId: s.id,
Request: request,
InProgress: inProgress,
Logs: len(logs),
RecordsFailed: v.RecordsFailedToLoad,
RecordsLoaded: v.RecordsLoaded,
RecordsMissing: v.RecordsMissingLocally,
}
metrics.SharedClient.RecordEvent(event)
}
go func() {
tDuration := time.Second * 10
var v core.ThreadLoadProgress
forloop:
for {
select {
case <-loadCh:
break forloop
case <-time.After(tDuration):
v2 := ctxP.Value()
if v2.RecordsLoaded == v.RecordsLoaded && v2.RecordsMissingLocally == v.RecordsMissingLocally && v2.RecordsFailedToLoad == v.RecordsFailedToLoad {
// no progress, double the ticker
tDuration = tDuration * 2
}
v = v2
sendEvent(v, true)
}
}
}()
if s.metaOnly {
s.tree, s.logHeads, err = change.BuildMetaTree(ctx, s.sb)
} else {
s.tree, s.logHeads, err = change.BuildTree(ctx, s.sb)
}
close(loadCh)
treeBuildTime := time.Now().Sub(startTime).Milliseconds()
// if the build time is large enough we should record it
if treeBuildTime > 100 {
sendEvent(ctxP.Value(), false)
}
if allowEmpty && err == change.ErrEmpty {
err = nil
s.tree = new(change.Tree)
doc = state.NewDoc(s.id, nil)
doc.(*state.State).InjectDerivedDetails()
} else if err != nil {
log.With("thread", s.id).Errorf("buildTree failed: %s", err.Error())
return
} else if doc, err = s.buildState(); err != nil {
return
}
setter.SetListener(s)
return s.buildState()
if s.ss != nil {
// update timeline with recent information about heads
s.ss.UpdateTimeline(s.tid, s.timeline())
}
if s.unsubscribe != nil {
s.closed = make(chan struct{})
go s.changeListener(batch)
}
return
}
func (s *source) buildState() (doc state.Doc, err error) {
st, _, changesAppliedSinceSnapshot, err := BuildState(nil, s.ObjectTree, s.Anytype().PredefinedBlocks().Profile)
root := s.tree.Root()
if root == nil || root.GetSnapshot() == nil {
return nil, fmt.Errorf("root missing or not a snapshot")
}
s.lastSnapshotId = root.Id
doc = state.NewDocFromSnapshot(s.id, root.GetSnapshot()).(*state.State)
doc.(*state.State).SetChangeId(root.Id)
st, changesApplied, err := change.BuildStateSimpleCRDT(doc.(*state.State), s.tree)
if err != nil {
return
}
err = st.Validate()
if err != nil {
return
s.changesSinceSnapshot = changesApplied
if s.sb.Type() != smartblock.SmartBlockTypeArchive && !s.Virtual() {
if verr := st.Validate(); verr != nil {
log.With("thread", s.id).With("sbType", s.sb.Type()).Errorf("not valid state: %v", verr)
}
}
st.BlocksInit(st)
st.InjectDerivedDetails()
s.changesSinceSnapshot = changesAppliedSinceSnapshot
// TODO: check if we can leave only removeDuplicates instead of Normalize
if err = st.Normalize(false); err != nil {
return
@ -244,7 +339,8 @@ func (s *source) buildState() (doc state.Doc, err error) {
if _, _, err = state.ApplyState(st, false); err != nil {
return
}
return st, nil
return
}
func (s *source) GetCreationInfo() (creator string, createdDate int64, err error) {
@ -252,8 +348,32 @@ func (s *source) GetCreationInfo() (creator string, createdDate int64, err error
return "", 0, fmt.Errorf("anytype is nil")
}
createdDate = s.ObjectTree.UnmarshalledHeader().Timestamp
// TODO: add creator in profile
createdDate = time.Now().Unix()
createdBy := s.Anytype().Account()
// protect from the big documents with a large trees
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
start := time.Now()
fc, err := s.FindFirstChange(ctx)
if err == change.ErrEmpty {
err = nil
createdBy = s.Anytype().Account()
log.Debugf("InjectCreationInfo set for the empty object")
} else if err != nil {
return "", 0, fmt.Errorf("failed to find first change to derive creation info")
} else {
createdDate = fc.Timestamp
createdBy = fc.Account
}
spent := time.Since(start).Seconds()
if spent > 0.05 {
log.Warnf("Calculate creation info %s: %.2fs", s.Id(), time.Since(start).Seconds())
}
if profileId, e := threads.ProfileThreadIDFromAccountAddress(createdBy); e == nil {
creator = profileId.String()
}
return
}
@ -265,60 +385,60 @@ type PushChangeParams struct {
}
func (s *source) PushChange(params PushChangeParams) (id string, err error) {
c := &pb.Change{
Timestamp: time.Now().Unix(),
var c = &pb.Change{
PreviousIds: s.tree.Heads(),
LastSnapshotId: s.lastSnapshotId,
PreviousMetaIds: s.tree.MetaHeads(),
Timestamp: time.Now().Unix(),
Version: migration.LastMigrationVersion(),
}
if params.DoSnapshot || s.needSnapshot() || len(params.Changes) == 0 {
c.Snapshot = &pb.ChangeSnapshot{
LogHeads: s.LogHeads(),
Data: &model.SmartBlockSnapshotBase{
Blocks: params.State.BlocksToSave(),
Details: params.State.Details(),
ObjectTypes: params.State.ObjectTypes(),
Collections: params.State.Store(),
RelationLinks: params.State.PickRelationLinks(),
Blocks: params.State.BlocksToSave(),
Details: params.State.Details(),
ExtraRelations: nil,
ObjectTypes: params.State.ObjectTypes(),
Collections: params.State.Store(),
RelationLinks: params.State.PickRelationLinks(),
},
FileKeys: s.getFileHashesForSnapshot(params.FileChangedHashes),
}
if s.tree.Len() > 0 {
log.With("thread", s.id).With("len", s.tree.Len(), "lenSnap", s.changesSinceSnapshot, "changes", len(params.Changes), "doSnap", params.DoSnapshot).Warnf("do the snapshot")
}
}
c.Content = params.Changes
c.FileKeys = s.getFileKeysByHashes(params.FileChangedHashes)
data, err := c.Marshal()
if err != nil {
return
}
addResult, err := s.ObjectTree.AddContent(context.Background(), objecttree.SignableChangeContent{
Data: data,
Key: s.acc.Account().SignKey,
Identity: s.acc.Account().Identity,
IsSnapshot: c.Snapshot != nil,
IsEncrypted: true,
})
if err != nil {
return
}
id = addResult.Heads[0]
if id, err = s.sb.PushRecord(c); err != nil {
return
}
ch := &change.Change{Id: id, Change: c}
s.tree.Add(ch)
s.logHeads[s.logId] = ch
if c.Snapshot != nil {
s.lastSnapshotId = id
s.changesSinceSnapshot = 0
log.Infof("%s: pushed snapshot", s.id)
} else {
s.changesSinceSnapshot++
log.Debugf("%s: pushed %d changes", s.id, len(c.Content))
log.Debugf("%s: pushed %d changes", s.id, len(ch.Content))
}
return
}
func (s *source) ListIds() (ids []string, err error) {
spc, err := s.spaceService.AccountSpace(context.Background())
func (s *source) ListIds() ([]string, error) {
ids, err := s.Anytype().ThreadsIds()
if err != nil {
return
return nil, err
}
ids = slice.Filter(spc.StoredIds(), func(id string) bool {
ids = slice.Filter(ids, func(id string) bool {
if s.Anytype().PredefinedBlocks().IsAccount(id) {
return false
}
t, err := s.sbtProvider.Type(id)
t, err := smartblock.SmartBlockTypeFromID(id)
if err != nil {
return false
}
@ -348,24 +468,102 @@ func snapshotChance(changesSinceSnapshot int) bool {
}
func (s *source) needSnapshot() bool {
if s.ObjectTree.Heads()[0] == s.ObjectTree.Id() {
if s.tree.Len() == 0 {
// starting tree with snapshot
return true
}
return snapshotChance(s.changesSinceSnapshot)
}
func (s *source) GetFileKeysSnapshot() []*pb.ChangeFileKeys {
return s.getFileHashesForSnapshot(nil)
func (s *source) changeListener(batch *mb.MB) {
defer close(s.closed)
var records []core.SmartblockRecordEnvelope
for {
msgs := batch.Wait()
if len(msgs) == 0 {
return
}
records = records[:0]
for _, msg := range msgs {
records = append(records, msg.(core.SmartblockRecordEnvelope))
}
s.receiver.Lock()
if err := s.applyRecords(records); err != nil {
log.Errorf("can't handle records: %v; records: %v", err, records)
} else if s.ss != nil {
// notify about probably updated timeline
if tl := s.timeline(); len(tl) > 0 {
s.ss.UpdateTimeline(s.tid, tl)
}
}
s.receiver.Unlock()
// wait 100 millisecond for better batching
time.Sleep(100 * time.Millisecond)
}
}
func (s *source) iterate(startId string, iterFunc objecttree.ChangeIterateFunc) (err error) {
unmarshall := func(decrypted []byte) (res any, err error) {
ch := &pb.Change{}
err = proto.Unmarshal(decrypted, ch)
res = ch
return
func (s *source) applyRecords(records []core.SmartblockRecordEnvelope) error {
var changes = make([]*change.Change, 0, len(records))
for _, record := range records {
if record.LogID == s.a.Device() && !s.listenToOwnDeviceChanges {
// ignore self logs
continue
}
ch, e := change.NewChangeFromRecord(record)
if e != nil {
return e
}
if s.metaOnly && !ch.HasMeta() {
continue
}
changes = append(changes, ch)
s.logHeads[record.LogID] = ch
}
return s.ObjectTree.IterateFrom(startId, unmarshall, iterFunc)
log.With("thread", s.id).Infof("received %d records; changes count: %d", len(records), len(changes))
if len(changes) == 0 {
return nil
}
metrics.SharedClient.RecordEvent(metrics.ChangesetEvent{
Diff: time.Now().Unix() - changes[0].Timestamp,
})
switch s.tree.Add(changes...) {
case change.Nothing:
// existing or not complete
return nil
case change.Append:
changesContent := make([]*pb.ChangeContent, 0, len(changes))
for _, ch := range changes {
if ch.Snapshot != nil {
s.changesSinceSnapshot = 0
} else {
s.changesSinceSnapshot++
}
changesContent = append(changesContent, ch.Content...)
}
s.lastSnapshotId = s.tree.LastSnapshotId(context.TODO())
return s.receiver.StateAppend(func(d state.Doc) (*state.State, error) {
st, _, err := change.BuildStateSimpleCRDT(d.(*state.State), s.tree)
return st, err
}, changesContent)
case change.Rebuild:
s.lastSnapshotId = s.tree.LastSnapshotId(context.TODO())
doc, err := s.buildState()
if err != nil {
return err
}
return s.receiver.StateRebuild(doc.(*state.State))
default:
return fmt.Errorf("unsupported tree mode")
}
}
func (s *source) GetFileKeysSnapshot() []*pb.ChangeFileKeys {
return s.getFileHashesForSnapshot(nil)
}
func (s *source) getFileHashesForSnapshot(changeHashes []string) []*pb.ChangeFileKeys {
@ -382,22 +580,15 @@ func (s *source) getFileHashesForSnapshot(changeHashes []string) []*pb.ChangeFil
}
}
}
err := s.iterate(s.ObjectTree.Root().Id, func(c *objecttree.Change) (isContinue bool) {
model, ok := c.Model.(*pb.Change)
if !ok {
return false
s.tree.Iterate(s.tree.RootId(), func(c *change.Change) (isContinue bool) {
if c.Snapshot != nil && len(c.Snapshot.FileKeys) > 0 {
processFileKeys(c.Snapshot.FileKeys)
}
if model.Snapshot != nil && len(model.Snapshot.FileKeys) > 0 {
processFileKeys(model.Snapshot.FileKeys)
}
if len(model.FileKeys) > 0 {
processFileKeys(model.FileKeys)
if len(c.Change.FileKeys) > 0 {
processFileKeys(c.Change.FileKeys)
}
return true
})
if err != nil {
log.With(zap.Error(err)).Debug("failed to iterate through file keys")
}
return fileKeys
}
@ -417,14 +608,49 @@ func (s *source) getFileKeysByHashes(hashes []string) []*pb.ChangeFileKeys {
return fileKeys
}
func (s *source) Heads() []string {
if s.ObjectTree == nil {
return nil
func (s *source) FindFirstChange(ctx context.Context) (c *change.Change, err error) {
if s.tree.RootId() == "" {
return nil, change.ErrEmpty
}
heads := s.ObjectTree.Heads()
headsCopy := make([]string, 0, len(heads))
headsCopy = append(headsCopy, heads...)
return headsCopy
c = s.tree.Get(s.tree.RootId())
for c.LastSnapshotId != "" {
var rec *core.SmartblockRecordEnvelope
if rec, err = s.sb.GetRecord(ctx, c.LastSnapshotId); err != nil {
log.With("thread", s.id).With("logid", s.logId).With("recordId", c.LastSnapshotId).Errorf("failed to load first change: %s", err.Error())
return
}
if c, err = change.NewChangeFromRecord(*rec); err != nil {
log.With("thread", s.id).
With("logid", s.logId).
With("change", rec.ID).Errorf("FindFirstChange: failed to unmarshal change: %s; continue", err.Error())
err = nil
}
}
return
}
func (s *source) LogHeads() map[string]string {
var hs = make(map[string]string)
for id, ch := range s.logHeads {
if ch != nil {
hs[id] = ch.Id
}
}
return hs
}
func (s *source) timeline() []status.LogTime {
var timeline = make([]status.LogTime, 0, len(s.logHeads))
for _, ch := range s.logHeads {
if ch != nil && len(ch.Account) > 0 && len(ch.Device) > 0 {
timeline = append(timeline, status.LogTime{
AccountID: ch.Account,
DeviceID: ch.Device,
LastEdit: ch.Timestamp,
})
}
}
return timeline
}
func (s *source) Close() (err error) {
@ -432,74 +658,5 @@ func (s *source) Close() (err error) {
s.unsubscribe()
<-s.closed
}
return s.ObjectTree.Close()
}
func BuildState(initState *state.State, ot objecttree.ReadableObjectTree, profileId string) (st *state.State, appliedContent []*pb.ChangeContent, changesAppliedSinceSnapshot int, err error) {
var (
startId string
lastChange *objecttree.Change
count int
)
// if the state has no first change
if initState == nil {
startId = ot.Root().Id
} else {
st = initState
startId = st.ChangeId()
}
err = ot.IterateFrom(startId,
func(decrypted []byte) (any, error) {
ch := &pb.Change{}
err = proto.Unmarshal(decrypted, ch)
if err != nil {
return nil, err
}
return ch, nil
}, func(change *objecttree.Change) bool {
count++
lastChange = change
// that means that we are starting from tree root
if change.Id == ot.Id() {
st = state.NewDoc(ot.Id(), nil).(*state.State)
st.SetChangeId(change.Id)
return true
}
model := change.Model.(*pb.Change)
if startId == change.Id {
if st == nil {
changesAppliedSinceSnapshot = 0
st = state.NewDocFromSnapshot(ot.Id(), model.Snapshot).(*state.State)
st.SetChangeId(startId)
return true
} else {
st = st.NewState()
}
return true
}
if model.Snapshot != nil {
changesAppliedSinceSnapshot = 0
} else {
changesAppliedSinceSnapshot++
}
ns := st.NewState()
appliedContent = append(appliedContent, model.Content...)
ns.ApplyChangeIgnoreErr(model.Content...)
ns.SetChangeId(change.Id)
ns.AddFileKeys(model.FileKeys...)
_, _, err = state.ApplyStateFastOne(ns)
if err != nil {
return false
}
return true
})
if err != nil {
return
}
if lastChange != nil {
st.SetLastModified(lastChange.Timestamp, profileId)
}
return
return nil
}

View file

@ -1632,6 +1632,7 @@ the element of change tree used to store and internal apply smartBlock history
| snapshot | [Change.Snapshot](#anytype.Change.Snapshot) | | snapshot - when not null, the Content will be ignored |
| fileKeys | [Change.FileKeys](#anytype.Change.FileKeys) | repeated | file keys related to changes content |
| timestamp | [int64](#int64) | | creation timestamp |
| version | [uint32](#uint32) | | version of business logic |

File diff suppressed because it is too large Load diff

View file

@ -23,7 +23,8 @@ message Change {
// creation timestamp
int64 timestamp = 7;
string version = 8;
// version of business logic
uint32 version = 8;
message Snapshot {
// logId -> lastChangeId
@ -61,7 +62,6 @@ message Change {
StoreKeySet storeKeySet = 107;
StoreKeyUnset storeKeyUnset = 108;
StoreSliceUpdate storeSliceUpdate = 109;
}
}
@ -152,27 +152,4 @@ message Change {
message StoreKeyUnset {
repeated string path = 1;
}
message StoreSliceUpdate {
string key = 1;
oneof operation {
Add add = 2;
Remove remove = 3;
Move move = 4;
}
message Add {
string afterId = 1;
repeated string ids = 2;
}
message Remove {
repeated string ids = 1;
}
message Move {
string afterId = 1;
repeated string ids = 2;
}
}
}