1
0
Fork 0
mirror of https://github.com/anyproto/any-sync.git synced 2025-06-08 05:57:03 +09:00

Add diffcontainer once again

This commit is contained in:
Mikhail Rakhmanov 2025-03-21 12:44:14 +01:00
parent cfc54ad61f
commit b81b79b75b
No known key found for this signature in database
GPG key ID: DED12CFEF5B8396B
9 changed files with 92 additions and 37 deletions

View file

@ -42,9 +42,9 @@ func (d *diffContainer) DiffTypeCheck(ctx context.Context, typeChecker RemoteTyp
return typeChecker.DiffTypeCheck(ctx, d)
}
func NewDiffContainer(divideFactor, compareThreshold int) DiffContainer {
newDiff := newDiff(divideFactor, compareThreshold)
func NewDiffContainer(new, old Diff) DiffContainer {
return &diffContainer{
precalculated: newDiff,
newDiff: new,
oldDiff: old,
}
}

View file

@ -36,7 +36,7 @@ const logPeriodSecs = 200
func newDiffSyncer(hs *headSync) DiffSyncer {
return &diffSyncer{
diff: hs.diff,
diffContainer: hs.diffContainer,
spaceId: hs.spaceId,
storage: hs.storage,
peerManager: hs.peerManager,
@ -51,7 +51,7 @@ func newDiffSyncer(hs *headSync) DiffSyncer {
type diffSyncer struct {
spaceId string
diff ldiff.Diff
diffContainer ldiff.DiffContainer
peerManager peermanager.PeerManager
headUpdater *headUpdater
treeManager treemanager.TreeManager
@ -82,7 +82,7 @@ func (d *diffSyncer) OnUpdate(headsUpdate headstorage.HeadsUpdate) {
func (d *diffSyncer) updateHeads(update headstorage.HeadsUpdate) {
if update.DeletedStatus != nil {
_ = d.diff.RemoveId(update.Id)
_ = d.diffContainer.RemoveId(update.Id)
} else {
if d.deletionState.Exists(update.Id) {
return
@ -90,13 +90,15 @@ func (d *diffSyncer) updateHeads(update headstorage.HeadsUpdate) {
if update.IsDerived != nil && *update.IsDerived && len(update.Heads) == 1 && update.Heads[0] == update.Id {
return
}
d.diff.Set(ldiff.Element{
d.diffContainer.Set(ldiff.Element{
Id: update.Id,
Head: concatStrings(update.Heads),
})
}
// probably we should somehow batch the updates
err := d.storage.StateStorage().SetHash(d.ctx, d.diff.Hash())
oldHash := d.diffContainer.OldDiff().Hash()
newHash := d.diffContainer.NewDiff().Hash()
err := d.storage.StateStorage().SetHash(d.ctx, oldHash, newHash)
if err != nil {
d.log.Warn("can't write space hash", zap.Error(err))
}
@ -152,11 +154,18 @@ func (d *diffSyncer) syncWithPeer(ctx context.Context, p peer.Peer) (err error)
newIds, changedIds, removedIds []string
)
newIds, changedIds, removedIds, err = d.diff.Diff(ctx, rdiff)
needsSync, diff, err := d.diffContainer.DiffTypeCheck(ctx, rdiff)
err = rpcerr.Unwrap(err)
if err != nil {
return d.onDiffError(ctx, p, cl, err)
}
if needsSync {
newIds, changedIds, removedIds, err = diff.Diff(ctx, rdiff)
err = rpcerr.Unwrap(err)
if err != nil {
return d.onDiffError(ctx, p, cl, err)
}
}
totalLen := len(newIds) + len(changedIds) + len(removedIds)
// not syncing ids which were removed through settings document
missingIds := d.deletionState.Filter(newIds)

View file

@ -10,6 +10,7 @@ import (
"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/app/ldiff"
"github.com/anyproto/any-sync/app/logger"
"github.com/anyproto/any-sync/app/olddiff"
"github.com/anyproto/any-sync/commonspace/config"
"github.com/anyproto/any-sync/commonspace/credentialprovider"
"github.com/anyproto/any-sync/commonspace/deletionstate"
@ -49,7 +50,7 @@ type headSync struct {
periodicSync periodicsync.PeriodicSync
storage spacestorage.SpaceStorage
diff ldiff.Diff
diffContainer ldiff.DiffContainer
log logger.CtxLogger
syncer DiffSyncer
configuration nodeconf.NodeConf
@ -75,7 +76,7 @@ func (h *headSync) Init(a *app.App) (err error) {
h.configuration = a.MustComponent(nodeconf.CName).(nodeconf.NodeConf)
h.log = log.With(zap.String("spaceId", h.spaceId))
h.storage = a.MustComponent(spacestorage.CName).(spacestorage.SpaceStorage)
h.diff = ldiff.New(32, 256)
h.diffContainer = ldiff.NewDiffContainer(ldiff.New(32, 256), olddiff.New(32, 256))
h.peerManager = a.MustComponent(peermanager.CName).(peermanager.PeerManager)
h.credentialProvider = a.MustComponent(credentialprovider.CName).(credentialprovider.CredentialProvider)
h.treeSyncer = a.MustComponent(treesyncer.CName).(treesyncer.TreeSyncer)
@ -103,17 +104,15 @@ func (h *headSync) Run(ctx context.Context) (err error) {
}
func (h *headSync) HandleRangeRequest(ctx context.Context, req *spacesyncproto.HeadSyncRequest) (resp *spacesyncproto.HeadSyncResponse, err error) {
resp, err = HandleRangeRequest(ctx, h.diff, req)
if err != nil {
return
if req.DiffType == spacesyncproto.DiffType_V2 {
return HandleRangeRequest(ctx, h.diffContainer.NewDiff(), req)
} else {
return HandleRangeRequest(ctx, h.diffContainer.OldDiff(), req)
}
// this is done to fix the problem with compatibility with old clients
resp.DiffType = spacesyncproto.DiffType_Precalculated
return
}
func (h *headSync) AllIds() []string {
return h.diff.Ids()
return h.diffContainer.NewDiff().Ids()
}
func (h *headSync) ExternalIds() []string {
@ -125,7 +124,7 @@ func (h *headSync) ExternalIds() []string {
}
func (h *headSync) DebugAllHeads() (res []TreeHeads) {
els := h.diff.Elements()
els := h.diffContainer.NewDiff().Elements()
for _, el := range els {
idHead := TreeHeads{
Id: el.Id,
@ -162,8 +161,10 @@ func (h *headSync) fillDiff(ctx context.Context) error {
Head: h.syncAcl.Head().Id,
})
log.Debug("setting acl", zap.String("aclId", h.syncAcl.Id()), zap.String("headId", h.syncAcl.Head().Id))
h.diff.Set(els...)
if err := h.storage.StateStorage().SetHash(ctx, h.diff.Hash()); err != nil {
h.diffContainer.Set(els...)
oldHash := h.diffContainer.OldDiff().Hash()
newHash := h.diffContainer.NewDiff().Hash()
if err := h.storage.StateStorage().SetHash(ctx, oldHash, newHash); err != nil {
h.log.Error("can't write space hash", zap.Error(err))
return err
}

View file

@ -1,7 +1,10 @@
package headsync
import (
"bytes"
"context"
"encoding/hex"
"math"
"github.com/anyproto/any-sync/app/ldiff"
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
@ -12,6 +15,7 @@ type Client interface {
}
type RemoteDiff interface {
ldiff.RemoteTypeChecker
ldiff.Remote
}
@ -68,6 +72,39 @@ func (r *remote) Ranges(ctx context.Context, ranges []ldiff.Range, resBuf []ldif
return
}
func (r *remote) DiffTypeCheck(ctx context.Context, diffContainer ldiff.DiffContainer) (needsSync bool, diff ldiff.Diff, err error) {
req := &spacesyncproto.HeadSyncRequest{
SpaceId: r.spaceId,
DiffType: spacesyncproto.DiffType_V2,
Ranges: []*spacesyncproto.HeadSyncRange{{From: 0, To: math.MaxUint64}},
}
resp, err := r.client.HeadSync(ctx, req)
if err != nil {
return
}
needsSync = true
checkHash := func(diff ldiff.Diff) (bool, error) {
hashB, err := hex.DecodeString(diff.Hash())
if err != nil {
return false, err
}
if len(resp.Results) != 0 && bytes.Equal(hashB, resp.Results[0].Hash) {
return false, nil
}
return true, nil
}
r.diffType = resp.DiffType
switch resp.DiffType {
case spacesyncproto.DiffType_V2:
diff = diffContainer.OldDiff()
needsSync, err = checkHash(diff)
default:
diff = diffContainer.NewDiff()
needsSync, err = checkHash(diff)
}
return
}
func HandleRangeRequest(ctx context.Context, d ldiff.Diff, req *spacesyncproto.HeadSyncRequest) (resp *spacesyncproto.HeadSyncResponse, err error) {
ranges := make([]ldiff.Range, 0, len(req.Ranges))
// basically we gather data applicable for both diffs

View file

@ -9,7 +9,8 @@ import (
)
type State struct {
Hash string
OldHash string
NewHash string
AclId string
SettingsId string
SpaceId string
@ -17,20 +18,21 @@ type State struct {
}
type Observer interface {
OnHashChange(hash string)
OnHashChange(oldHash, newHash string)
}
type StateStorage interface {
GetState(ctx context.Context) (State, error)
SettingsId() string
SetHash(ctx context.Context, hash string) error
SetHash(ctx context.Context, oldHash, newHash string) error
SetObserver(observer Observer)
}
const (
stateCollectionKey = "state"
idKey = "id"
hashKey = "h"
oldHashKey = "oh"
newHashKey = "nh"
headerKey = "e"
aclIdKey = "a"
settingsIdKey = "s"
@ -58,10 +60,10 @@ func (s *stateStorage) SetObserver(observer Observer) {
s.observer = observer
}
func (s *stateStorage) SetHash(ctx context.Context, hash string) (err error) {
func (s *stateStorage) SetHash(ctx context.Context, oldHash, newHash string) (err error) {
defer func() {
if s.observer != nil && err == nil {
s.observer.OnHashChange(hash)
s.observer.OnHashChange(oldHash, newHash)
}
}()
tx, err := s.stateColl.WriteTx(ctx)
@ -69,7 +71,8 @@ func (s *stateStorage) SetHash(ctx context.Context, hash string) (err error) {
return err
}
mod := query.ModifyFunc(func(a *anyenc.Arena, v *anyenc.Value) (result *anyenc.Value, modified bool, err error) {
v.Set(hashKey, a.NewString(hash))
v.Set(oldHashKey, a.NewString(oldHash))
v.Set(newHashKey, a.NewString(newHash))
return v, true, nil
})
_, err = s.stateColl.UpsertId(tx.Context(), s.spaceId, mod)
@ -150,7 +153,8 @@ func (s *stateStorage) stateFromDoc(doc anystore.Doc) State {
SpaceId: doc.Value().GetString(idKey),
SettingsId: doc.Value().GetString(settingsIdKey),
AclId: doc.Value().GetString(aclIdKey),
Hash: doc.Value().GetString(hashKey),
OldHash: doc.Value().GetString(newHashKey),
NewHash: doc.Value().GetString(oldHashKey),
SpaceHeader: doc.Value().GetBytes(headerKey),
}
}

View file

@ -190,7 +190,7 @@ func (s *spaceMigrator) migrateHash(ctx context.Context, oldStorage oldstorage.S
if err != nil {
return err
}
return newStorage.StateStorage().SetHash(ctx, spaceHash)
return newStorage.StateStorage().SetHash(ctx, spaceHash, spaceHash)
}
func (s *spaceMigrator) checkMigrated(ctx context.Context, id string) (bool, spacestorage.SpaceStorage) {

View file

@ -199,5 +199,6 @@ message AclGetRecordsResponse {
// DiffType is a type of diff
enum DiffType {
Initial = 0;
Precalculated = 1;
V1 = 1;
V2 = 2;
}

View file

@ -104,18 +104,21 @@ func (SpaceSubscriptionAction) EnumDescriptor() ([]byte, []int) {
type DiffType int32
const (
DiffType_Initial DiffType = 0
DiffType_Precalculated DiffType = 1
DiffType_Initial DiffType = 0
DiffType_V1 DiffType = 1
DiffType_V2 DiffType = 2
)
var DiffType_name = map[int32]string{
0: "Initial",
1: "Precalculated",
1: "V1",
2: "V2",
}
var DiffType_value = map[string]int32{
"Initial": 0,
"Precalculated": 1,
"Initial": 0,
"V1": 1,
"V2": 2,
}
func (x DiffType) String() string {

View file

@ -829,7 +829,7 @@ func Test_Sync(t *testing.T) {
require.NoError(t, err)
state, err := sp.Storage().StateStorage().GetState(context.Background())
require.NoError(t, err)
hashes = append(hashes, state.Hash)
hashes = append(hashes, state.NewHash)
}
for i := 1; i < len(hashes); i++ {
require.Equal(t, hashes[0], hashes[i])