mirror of
https://github.com/anyproto/any-sync.git
synced 2025-06-08 05:57:03 +09:00
Fix ctx and change add method name in storage
This commit is contained in:
parent
7feb6b0d2c
commit
86ac7223db
7 changed files with 22 additions and 19 deletions
|
@ -199,7 +199,7 @@ func (ot *objectTree) AddContent(ctx context.Context, content SignableChangeCont
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = ot.treeStorage.AddRawChangesSetHead([]*treechangeproto.RawTreeChangeWithId{rawChange}, []string{objChange.Id})
|
err = ot.treeStorage.AddRawChangesSetHeads([]*treechangeproto.RawTreeChangeWithId{rawChange}, []string{objChange.Id})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -296,7 +296,7 @@ func (ot *objectTree) AddRawChanges(ctx context.Context, changesPayload RawChang
|
||||||
addResult.Mode = Rebuild
|
addResult.Mode = Rebuild
|
||||||
}
|
}
|
||||||
|
|
||||||
err = ot.treeStorage.AddRawChangesSetHead(addResult.Added, addResult.Heads)
|
err = ot.treeStorage.AddRawChangesSetHeads(addResult.Added, addResult.Heads)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// rolling back all changes made to inmemory state
|
// rolling back all changes made to inmemory state
|
||||||
ot.rebuildFromStorage(nil, nil)
|
ot.rebuildFromStorage(nil, nil)
|
||||||
|
|
|
@ -65,7 +65,7 @@ func prepareContext(
|
||||||
treeStorage := changeCreator.CreateNewTreeStorage("0", aclList.Head().Id)
|
treeStorage := changeCreator.CreateNewTreeStorage("0", aclList.Head().Id)
|
||||||
if additionalChanges != nil {
|
if additionalChanges != nil {
|
||||||
payload := additionalChanges(changeCreator)
|
payload := additionalChanges(changeCreator)
|
||||||
err := treeStorage.AddRawChangesSetHead(payload.RawChanges, payload.NewHeads)
|
err := treeStorage.AddRawChangesSetHeads(payload.RawChanges, payload.NewHeads)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
}
|
}
|
||||||
objTree, err := objTreeBuilder(treeStorage, aclList)
|
objTree, err := objTreeBuilder(treeStorage, aclList)
|
||||||
|
@ -503,7 +503,7 @@ func TestObjectTree(t *testing.T) {
|
||||||
changeCreator.CreateRaw("5", aclList.Head().Id, "1", false, "1"),
|
changeCreator.CreateRaw("5", aclList.Head().Id, "1", false, "1"),
|
||||||
changeCreator.CreateRaw("6", aclList.Head().Id, "1", true, "3", "4", "5"),
|
changeCreator.CreateRaw("6", aclList.Head().Id, "1", true, "3", "4", "5"),
|
||||||
}
|
}
|
||||||
store.AddRawChangesSetHead(storageChanges, []string{"1"})
|
store.AddRawChangesSetHeads(storageChanges, []string{"1"})
|
||||||
|
|
||||||
// updating with subset of those changes to see that everything will still work
|
// updating with subset of those changes to see that everything will still work
|
||||||
payload = RawChangesPayload{
|
payload = RawChangesPayload{
|
||||||
|
@ -748,7 +748,7 @@ func TestObjectTree(t *testing.T) {
|
||||||
changeCreator.CreateRaw("5", aclList.Head().Id, "0", false, "1"),
|
changeCreator.CreateRaw("5", aclList.Head().Id, "0", false, "1"),
|
||||||
changeCreator.CreateRaw("6", aclList.Head().Id, "0", false, "3", "4", "5"),
|
changeCreator.CreateRaw("6", aclList.Head().Id, "0", false, "3", "4", "5"),
|
||||||
}
|
}
|
||||||
deps.treeStorage.AddRawChangesSetHead(rawChanges, []string{"6"})
|
deps.treeStorage.AddRawChangesSetHeads(rawChanges, []string{"6"})
|
||||||
hTree, err := buildHistoryTree(deps, HistoryTreeParams{
|
hTree, err := buildHistoryTree(deps, HistoryTreeParams{
|
||||||
BeforeId: "6",
|
BeforeId: "6",
|
||||||
IncludeBeforeId: false,
|
IncludeBeforeId: false,
|
||||||
|
@ -780,7 +780,7 @@ func TestObjectTree(t *testing.T) {
|
||||||
changeCreator.CreateRaw("5", aclList.Head().Id, "1", true, "3", "4"),
|
changeCreator.CreateRaw("5", aclList.Head().Id, "1", true, "3", "4"),
|
||||||
changeCreator.CreateRaw("6", aclList.Head().Id, "5", false, "5"),
|
changeCreator.CreateRaw("6", aclList.Head().Id, "5", false, "5"),
|
||||||
}
|
}
|
||||||
deps.treeStorage.AddRawChangesSetHead(rawChanges, []string{"6"})
|
deps.treeStorage.AddRawChangesSetHeads(rawChanges, []string{"6"})
|
||||||
hTree, err := buildHistoryTree(deps, HistoryTreeParams{
|
hTree, err := buildHistoryTree(deps, HistoryTreeParams{
|
||||||
BuildFullTree: true,
|
BuildFullTree: true,
|
||||||
})
|
})
|
||||||
|
@ -810,7 +810,7 @@ func TestObjectTree(t *testing.T) {
|
||||||
changeCreator.CreateRaw("5", aclList.Head().Id, "0", false, "1"),
|
changeCreator.CreateRaw("5", aclList.Head().Id, "0", false, "1"),
|
||||||
changeCreator.CreateRaw("6", aclList.Head().Id, "0", false, "3", "4", "5"),
|
changeCreator.CreateRaw("6", aclList.Head().Id, "0", false, "3", "4", "5"),
|
||||||
}
|
}
|
||||||
deps.treeStorage.AddRawChangesSetHead(rawChanges, []string{"6"})
|
deps.treeStorage.AddRawChangesSetHeads(rawChanges, []string{"6"})
|
||||||
hTree, err := buildHistoryTree(deps, HistoryTreeParams{
|
hTree, err := buildHistoryTree(deps, HistoryTreeParams{
|
||||||
BeforeId: "6",
|
BeforeId: "6",
|
||||||
IncludeBeforeId: true,
|
IncludeBeforeId: true,
|
||||||
|
|
|
@ -112,7 +112,7 @@ func testTreeMerge(t *testing.T, levels, perLevel int, hasData bool, isSnapshot
|
||||||
}
|
}
|
||||||
// generating initial tree
|
// generating initial tree
|
||||||
initialRes := genChanges(changeCreator, params)
|
initialRes := genChanges(changeCreator, params)
|
||||||
err = storage.AddRawChangesSetHead(initialRes.changes, initialRes.heads)
|
err = storage.AddRawChangesSetHeads(initialRes.changes, initialRes.heads)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
deps := fixtureDeps{
|
deps := fixtureDeps{
|
||||||
aclList: aclList,
|
aclList: aclList,
|
||||||
|
@ -264,7 +264,7 @@ func testTreeStorageHasExtra(t *testing.T, levels, perLevel int, hasData bool, i
|
||||||
// adding some changes to store, but without updating heads
|
// adding some changes to store, but without updating heads
|
||||||
store := fx.handlers["peer1"].tree().Storage().(*treestorage.InMemoryTreeStorage)
|
store := fx.handlers["peer1"].tree().Storage().(*treestorage.InMemoryTreeStorage)
|
||||||
oldHeads, _ := store.Heads()
|
oldHeads, _ := store.Heads()
|
||||||
store.AddRawChangesSetHead(initialRes.changes, oldHeads)
|
store.AddRawChangesSetHeads(initialRes.changes, oldHeads)
|
||||||
|
|
||||||
// sending those changes to other peer
|
// sending those changes to other peer
|
||||||
fx.handlers["peer2"].sendRawChanges(context.Background(), objecttree.RawChangesPayload{
|
fx.handlers["peer2"].sendRawChanges(context.Background(), objecttree.RawChangesPayload{
|
||||||
|
@ -333,8 +333,8 @@ func testTreeStorageHasExtraThreeParts(t *testing.T, levels, perLevel int, hasDa
|
||||||
require.True(t, slice.UnsortedEquals(res.Heads, firstPart.heads))
|
require.True(t, slice.UnsortedEquals(res.Heads, firstPart.heads))
|
||||||
store := fx.handlers["peer1"].tree().Storage().(*treestorage.InMemoryTreeStorage)
|
store := fx.handlers["peer1"].tree().Storage().(*treestorage.InMemoryTreeStorage)
|
||||||
oldHeads, _ := store.Heads()
|
oldHeads, _ := store.Heads()
|
||||||
store.AddRawChangesSetHead(secondPart.changes, oldHeads)
|
store.AddRawChangesSetHeads(secondPart.changes, oldHeads)
|
||||||
store.AddRawChangesSetHead(thirdPart.changes, oldHeads)
|
store.AddRawChangesSetHeads(thirdPart.changes, oldHeads)
|
||||||
|
|
||||||
var peer2Initial []*treechangeproto.RawTreeChangeWithId
|
var peer2Initial []*treechangeproto.RawTreeChangeWithId
|
||||||
peer2Initial = append(peer2Initial, firstPart.changes...)
|
peer2Initial = append(peer2Initial, firstPart.changes...)
|
||||||
|
|
|
@ -22,7 +22,7 @@ func (t *InMemoryTreeStorage) SetReturnErrorOnAdd(err error) {
|
||||||
t.addErr = err
|
t.addErr = err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *InMemoryTreeStorage) AddRawChangesSetHead(changes []*treechangeproto.RawTreeChangeWithId, heads []string) error {
|
func (t *InMemoryTreeStorage) AddRawChangesSetHeads(changes []*treechangeproto.RawTreeChangeWithId, heads []string) error {
|
||||||
t.RLock()
|
t.RLock()
|
||||||
defer t.RUnlock()
|
defer t.RUnlock()
|
||||||
if t.addErr != nil {
|
if t.addErr != nil {
|
||||||
|
|
|
@ -49,18 +49,18 @@ func (mr *MockTreeStorageMockRecorder) AddRawChange(arg0 interface{}) *gomock.Ca
|
||||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddRawChange", reflect.TypeOf((*MockTreeStorage)(nil).AddRawChange), arg0)
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddRawChange", reflect.TypeOf((*MockTreeStorage)(nil).AddRawChange), arg0)
|
||||||
}
|
}
|
||||||
|
|
||||||
// AddRawChangesSetHead mocks base method.
|
// AddRawChangesSetHeads mocks base method.
|
||||||
func (m *MockTreeStorage) AddRawChangesSetHead(arg0 []*treechangeproto.RawTreeChangeWithId, arg1 []string) error {
|
func (m *MockTreeStorage) AddRawChangesSetHeads(arg0 []*treechangeproto.RawTreeChangeWithId, arg1 []string) error {
|
||||||
m.ctrl.T.Helper()
|
m.ctrl.T.Helper()
|
||||||
ret := m.ctrl.Call(m, "AddRawChangesSetHead", arg0, arg1)
|
ret := m.ctrl.Call(m, "AddRawChangesSetHeads", arg0, arg1)
|
||||||
ret0, _ := ret[0].(error)
|
ret0, _ := ret[0].(error)
|
||||||
return ret0
|
return ret0
|
||||||
}
|
}
|
||||||
|
|
||||||
// AddRawChangesSetHead indicates an expected call of AddRawChangesSetHead.
|
// AddRawChangesSetHeads indicates an expected call of AddRawChangesSetHeads.
|
||||||
func (mr *MockTreeStorageMockRecorder) AddRawChangesSetHead(arg0, arg1 interface{}) *gomock.Call {
|
func (mr *MockTreeStorageMockRecorder) AddRawChangesSetHeads(arg0, arg1 interface{}) *gomock.Call {
|
||||||
mr.mock.ctrl.T.Helper()
|
mr.mock.ctrl.T.Helper()
|
||||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddRawChangesSetHead", reflect.TypeOf((*MockTreeStorage)(nil).AddRawChangesSetHead), arg0, arg1)
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddRawChangesSetHeads", reflect.TypeOf((*MockTreeStorage)(nil).AddRawChangesSetHeads), arg0, arg1)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Delete mocks base method.
|
// Delete mocks base method.
|
||||||
|
|
|
@ -31,7 +31,7 @@ type TreeStorage interface {
|
||||||
Heads() ([]string, error)
|
Heads() ([]string, error)
|
||||||
SetHeads(heads []string) error
|
SetHeads(heads []string) error
|
||||||
AddRawChange(change *treechangeproto.RawTreeChangeWithId) error
|
AddRawChange(change *treechangeproto.RawTreeChangeWithId) error
|
||||||
AddRawChangesSetHead(changes []*treechangeproto.RawTreeChangeWithId, heads []string) error
|
AddRawChangesSetHeads(changes []*treechangeproto.RawTreeChangeWithId, heads []string) error
|
||||||
|
|
||||||
GetRawChange(ctx context.Context, id string) (*treechangeproto.RawTreeChangeWithId, error)
|
GetRawChange(ctx context.Context, id string) (*treechangeproto.RawTreeChangeWithId, error)
|
||||||
HasChange(ctx context.Context, id string) (bool, error)
|
HasChange(ctx context.Context, id string) (bool, error)
|
||||||
|
|
|
@ -395,6 +395,9 @@ func (s *space) HandleMessage(ctx context.Context, hm HandleMessage) (err error)
|
||||||
_ = s.handleQueue.CloseThread(threadId)
|
_ = s.handleQueue.CloseThread(threadId)
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
if hm.PeerCtx == nil {
|
||||||
|
hm.PeerCtx = ctx
|
||||||
|
}
|
||||||
err = s.handleQueue.Add(ctx, threadId, hm)
|
err = s.handleQueue.Add(ctx, threadId, hm)
|
||||||
if err == mb.ErrOverflowed {
|
if err == mb.ErrOverflowed {
|
||||||
log.InfoCtx(ctx, "queue overflowed", zap.String("spaceId", s.id), zap.String("objectId", threadId))
|
log.InfoCtx(ctx, "queue overflowed", zap.String("spaceId", s.id), zap.String("objectId", threadId))
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue