1
0
Fork 0
mirror of https://github.com/anyproto/any-sync.git synced 2025-06-11 10:18:08 +09:00

Merge pull request #47 from anytypeio/update-proto

This commit is contained in:
Mikhail Rakhmanov 2023-03-11 13:48:57 +01:00 committed by GitHub
commit 1fbf87995b
Signed by: github
GPG key ID: 4AEE18F83AFDEB23
29 changed files with 1189 additions and 435 deletions

120
app/ocache/entry.go Normal file
View file

@ -0,0 +1,120 @@
package ocache
import (
"context"
"go.uber.org/zap"
"sync"
"time"
)
type entryState int
const (
entryStateLoading = iota
entryStateActive
entryStateClosing
entryStateClosed
)
type entry struct {
id string
state entryState
lastUsage time.Time
load chan struct{}
loadErr error
value Object
close chan struct{}
mx sync.Mutex
}
func newEntry(id string, value Object, state entryState) *entry {
return &entry{
id: id,
load: make(chan struct{}),
lastUsage: time.Now(),
state: state,
value: value,
}
}
func (e *entry) isActive() bool {
e.mx.Lock()
defer e.mx.Unlock()
return e.state == entryStateActive
}
func (e *entry) isClosing() bool {
e.mx.Lock()
defer e.mx.Unlock()
return e.state == entryStateClosed || e.state == entryStateClosing
}
func (e *entry) waitLoad(ctx context.Context, id string) (value Object, err error) {
select {
case <-ctx.Done():
log.DebugCtx(ctx, "ctx done while waiting on object load", zap.String("id", id))
return nil, ctx.Err()
case <-e.load:
return e.value, e.loadErr
}
}
func (e *entry) waitClose(ctx context.Context, id string) (res bool, err error) {
e.mx.Lock()
switch e.state {
case entryStateClosing:
waitCh := e.close
e.mx.Unlock()
select {
case <-ctx.Done():
log.DebugCtx(ctx, "ctx done while waiting on object close", zap.String("id", id))
return false, ctx.Err()
case <-waitCh:
return true, nil
}
case entryStateClosed:
e.mx.Unlock()
return true, nil
default:
e.mx.Unlock()
return false, nil
}
}
func (e *entry) setClosing(wait bool) (prevState, curState entryState) {
e.mx.Lock()
prevState = e.state
curState = e.state
if e.state == entryStateClosing {
waitCh := e.close
e.mx.Unlock()
if !wait {
return
}
<-waitCh
e.mx.Lock()
}
if e.state != entryStateClosed {
e.state = entryStateClosing
e.close = make(chan struct{})
}
curState = e.state
e.mx.Unlock()
return
}
func (e *entry) setActive(chClose bool) {
e.mx.Lock()
defer e.mx.Unlock()
if chClose {
close(e.close)
}
e.state = entryStateActive
}
func (e *entry) setClosed() {
e.mx.Lock()
defer e.mx.Unlock()
close(e.close)
e.state = entryStateClosed
}

View file

@ -44,12 +44,6 @@ var WithGCPeriod = func(gcPeriod time.Duration) Option {
}
}
var WithRefCounter = func(enable bool) Option {
return func(cache *oCache) {
cache.refCounter = enable
}
}
func New(loadFunc LoadFunc, opts ...Option) OCache {
c := &oCache{
data: make(map[string]*entry),
@ -73,33 +67,7 @@ func New(loadFunc LoadFunc, opts ...Option) OCache {
type Object interface {
Close() (err error)
}
type ObjectLocker interface {
Object
Locked() bool
}
type ObjectLastUsage interface {
LastUsage() time.Time
}
type entry struct {
id string
lastUsage time.Time
refCount uint32
isClosing bool
load chan struct{}
loadErr error
value Object
close chan struct{}
}
func (e *entry) locked() bool {
if locker, ok := e.value.(ObjectLocker); ok {
return locker.Locked()
}
return false
TryClose(objectTTL time.Duration) (res bool, err error)
}
type OCache interface {
@ -116,12 +84,8 @@ type OCache interface {
// Add adds new object to cache
// Returns error when object exists
Add(id string, value Object) (err error)
// Release decreases the refs counter
Release(id string) bool
// Reset sets refs counter to 0
Reset(id string) bool
// Remove closes and removes object
Remove(id string) (ok bool, err error)
Remove(ctx context.Context, id string) (ok bool, err error)
// ForEach iterates over all loaded objects, breaks when callback returns false
ForEach(f func(v Object) (isContinue bool))
// GC frees not used and expired objects
@ -134,17 +98,16 @@ type OCache interface {
}
type oCache struct {
mu sync.Mutex
data map[string]*entry
loadFunc LoadFunc
timeNow func() time.Time
ttl time.Duration
gc time.Duration
closed bool
closeCh chan struct{}
log *zap.SugaredLogger
metrics *metrics
refCounter bool
mu sync.Mutex
data map[string]*entry
loadFunc LoadFunc
timeNow func() time.Time
ttl time.Duration
gc time.Duration
closed bool
closeCh chan struct{}
log *zap.SugaredLogger
metrics *metrics
}
func (c *oCache) Get(ctx context.Context, id string) (value Object, err error) {
@ -160,69 +123,36 @@ Load:
return nil, ErrClosed
}
if e, ok = c.data[id]; !ok {
e = newEntry(id, nil, entryStateLoading)
load = true
e = &entry{
id: id,
load: make(chan struct{}),
}
c.data[id] = e
}
closing := e.isClosing
if !e.isClosing {
e.lastUsage = c.timeNow()
if c.refCounter {
e.refCount++
}
}
e.lastUsage = time.Now()
c.mu.Unlock()
if closing {
select {
case <-ctx.Done():
log.DebugCtx(ctx, "ctx done while waiting on object close", zap.String("id", id))
return nil, ctx.Err()
case <-e.close:
goto Load
}
reload, err := e.waitClose(ctx, id)
if err != nil {
return nil, err
}
if reload {
goto Load
}
if load {
go c.load(ctx, id, e)
}
if c.metrics != nil {
if load {
c.metrics.miss.Inc()
} else {
c.metrics.hit.Inc()
}
}
select {
case <-ctx.Done():
log.DebugCtx(ctx, "ctx done while waiting on object load", zap.String("id", id))
return nil, ctx.Err()
case <-e.load:
}
return e.value, e.loadErr
c.metricsGet(!load)
return e.waitLoad(ctx, id)
}
func (c *oCache) Pick(ctx context.Context, id string) (value Object, err error) {
c.mu.Lock()
val, ok := c.data[id]
if !ok || val.isClosing {
if !ok || val.isClosing() {
c.mu.Unlock()
return nil, ErrNotExists
}
c.mu.Unlock()
if c.metrics != nil {
c.metrics.hit.Inc()
}
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-val.load:
return val.value, val.loadErr
}
c.metricsGet(true)
return val.waitLoad(ctx, id)
}
func (c *oCache) load(ctx context.Context, id string, e *entry) {
@ -236,63 +166,39 @@ func (c *oCache) load(ctx context.Context, id string, e *entry) {
delete(c.data, id)
} else {
e.value = value
e.setActive(false)
}
}
func (c *oCache) Release(id string) bool {
c.mu.Lock()
defer c.mu.Unlock()
if c.closed {
return false
}
if e, ok := c.data[id]; ok {
if c.refCounter && e.refCount > 0 {
e.refCount--
return true
}
}
return false
}
func (c *oCache) Reset(id string) bool {
c.mu.Lock()
defer c.mu.Unlock()
if c.closed {
return false
}
if e, ok := c.data[id]; ok {
e.refCount = 0
return true
}
return false
}
func (c *oCache) Remove(id string) (ok bool, err error) {
func (c *oCache) Remove(ctx context.Context, id string) (ok bool, err error) {
c.mu.Lock()
if c.closed {
c.mu.Unlock()
err = ErrClosed
return
}
var e *entry
e, ok = c.data[id]
if !ok || e.isClosing {
e, ok := c.data[id]
if !ok {
c.mu.Unlock()
return
return false, ErrNotExists
}
e.isClosing = true
e.close = make(chan struct{})
c.mu.Unlock()
return c.remove(ctx, e)
}
<-e.load
if e.value != nil {
func (c *oCache) remove(ctx context.Context, e *entry) (ok bool, err error) {
if _, err = e.waitLoad(ctx, e.id); err != nil {
return false, err
}
_, curState := e.setClosing(true)
if curState == entryStateClosing {
ok = true
err = e.value.Close()
c.mu.Lock()
e.setClosed()
delete(c.data, e.id)
c.mu.Unlock()
}
c.mu.Lock()
close(e.close)
delete(c.data, e.id)
c.mu.Unlock()
return
}
@ -314,13 +220,7 @@ func (c *oCache) Add(id string, value Object) (err error) {
if _, ok := c.data[id]; ok {
return ErrExists
}
e := &entry{
id: id,
lastUsage: time.Now(),
refCount: 0,
load: make(chan struct{}),
value: value,
}
e := newEntry(id, value, entryStateActive)
close(e.load)
c.data[id] = e
return
@ -332,7 +232,7 @@ func (c *oCache) ForEach(f func(obj Object) (isContinue bool)) {
for _, v := range c.data {
select {
case <-v.load:
if v.value != nil && !v.isClosing {
if v.value != nil && !v.isClosing() {
objects = append(objects, v.value)
}
default:
@ -368,40 +268,35 @@ func (c *oCache) GC() {
deadline := c.timeNow().Add(-c.ttl)
var toClose []*entry
for _, e := range c.data {
if e.isClosing {
continue
}
lu := e.lastUsage
if lug, ok := e.value.(ObjectLastUsage); ok {
lu = lug.LastUsage()
}
if !e.locked() && e.refCount <= 0 && lu.Before(deadline) {
e.isClosing = true
if e.isActive() && e.lastUsage.Before(deadline) {
e.close = make(chan struct{})
toClose = append(toClose, e)
}
}
size := len(c.data)
c.mu.Unlock()
closedNum := 0
for _, e := range toClose {
<-e.load
if e.value != nil {
if err := e.value.Close(); err != nil {
c.log.With("object_id", e.id).Warnf("GC: object close error: %v", err)
}
prevState, _ := e.setClosing(false)
if prevState == entryStateClosing || prevState == entryStateClosed {
continue
}
closed, err := e.value.TryClose(c.ttl)
if err != nil {
c.log.With("object_id", e.id).Warnf("GC: object close error: %v", err)
}
if !closed {
e.setActive(true)
continue
} else {
closedNum++
c.mu.Lock()
e.setClosed()
delete(c.data, e.id)
c.mu.Unlock()
}
}
c.log.Infof("GC: removed %d; cache size: %d", len(toClose), size)
if len(toClose) > 0 && c.metrics != nil {
c.metrics.gc.Add(float64(len(toClose)))
}
c.mu.Lock()
for _, e := range toClose {
close(e.close)
delete(c.data, e.id)
}
c.mu.Unlock()
c.metricsClosed(closedNum, size)
}
func (c *oCache) Len() int {
@ -418,25 +313,34 @@ func (c *oCache) Close() (err error) {
}
c.closed = true
close(c.closeCh)
var toClose, alreadyClosing []*entry
var toClose []*entry
for _, e := range c.data {
if e.isClosing {
alreadyClosing = append(alreadyClosing, e)
} else {
toClose = append(toClose, e)
}
toClose = append(toClose, e)
}
c.mu.Unlock()
for _, e := range toClose {
<-e.load
if e.value != nil {
if clErr := e.value.Close(); clErr != nil {
c.log.With("object_id", e.id).Warnf("cache close: object close error: %v", clErr)
}
if _, err := c.remove(context.Background(), e); err != nil && err != ErrNotExists {
c.log.With("object_id", e.id).Warnf("cache close: object close error: %v", err)
}
}
for _, e := range alreadyClosing {
<-e.close
}
return nil
}
func (c *oCache) metricsGet(hit bool) {
if c.metrics == nil {
return
}
if hit {
c.metrics.hit.Inc()
} else {
c.metrics.miss.Inc()
}
}
func (c *oCache) metricsClosed(closedLen, size int) {
c.log.Infof("GC: removed %d; cache size: %d", closedLen, size)
if c.metrics == nil || closedLen == 0 {
return
}
c.metrics.gc.Add(float64(closedLen))
}

View file

@ -3,6 +3,8 @@ package ocache
import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
"testing"
"time"
@ -11,26 +13,48 @@ import (
"github.com/stretchr/testify/require"
)
var ctx = context.Background()
type testObject struct {
name string
closeErr error
closeCh chan struct{}
name string
closeErr error
closeCh chan struct{}
tryReturn bool
closeCalled bool
tryCloseCalled bool
}
func NewTestObject(name string, closeCh chan struct{}) *testObject {
func NewTestObject(name string, tryReturn bool, closeCh chan struct{}) *testObject {
return &testObject{
name: name,
closeCh: closeCh,
name: name,
closeCh: closeCh,
tryReturn: tryReturn,
}
}
func (t *testObject) Close() (err error) {
if t.closeCalled || (t.tryCloseCalled && t.tryReturn) {
panic("close called twice")
}
t.closeCalled = true
if t.closeCh != nil {
<-t.closeCh
}
return t.closeErr
}
func (t *testObject) TryClose(objectTTL time.Duration) (res bool, err error) {
if t.closeCalled || (t.tryCloseCalled && t.tryReturn) {
panic("close called twice")
}
t.tryCloseCalled = true
if t.closeCh != nil {
<-t.closeCh
return t.tryReturn, t.closeErr
}
return t.tryReturn, nil
}
func TestOCache_Get(t *testing.T) {
t.Run("successful", func(t *testing.T) {
c := New(func(ctx context.Context, id string) (value Object, err error) {
@ -116,42 +140,37 @@ func TestOCache_Get(t *testing.T) {
}
func TestOCache_GC(t *testing.T) {
t.Run("test without close wait", func(t *testing.T) {
t.Run("test gc expired object", func(t *testing.T) {
c := New(func(ctx context.Context, id string) (value Object, err error) {
return &testObject{name: id}, nil
}, WithTTL(time.Millisecond*10), WithRefCounter(true))
return NewTestObject(id, true, nil), nil
}, WithTTL(time.Millisecond*10))
val, err := c.Get(context.TODO(), "id")
require.NoError(t, err)
require.NotNil(t, val)
assert.Equal(t, 1, c.Len())
c.GC()
assert.Equal(t, 1, c.Len())
time.Sleep(time.Millisecond * 30)
c.GC()
assert.Equal(t, 1, c.Len())
assert.True(t, c.Release("id"))
time.Sleep(time.Millisecond * 20)
c.GC()
assert.Equal(t, 0, c.Len())
assert.False(t, c.Release("id"))
})
t.Run("test with close wait", func(t *testing.T) {
t.Run("test gc tryClose true, close before get", func(t *testing.T) {
closeCh := make(chan struct{})
getCh := make(chan struct{})
c := New(func(ctx context.Context, id string) (value Object, err error) {
return NewTestObject(id, closeCh), nil
}, WithTTL(time.Millisecond*10), WithRefCounter(true))
return NewTestObject(id, true, closeCh), nil
}, WithTTL(time.Millisecond*10))
val, err := c.Get(context.TODO(), "id")
require.NoError(t, err)
require.NotNil(t, val)
assert.Equal(t, 1, c.Len())
assert.True(t, c.Release("id"))
// making ttl pass
time.Sleep(time.Millisecond * 40)
time.Sleep(time.Millisecond * 20)
// first gc will be run after 20 secs, so calling it manually
go c.GC()
// waiting until all objects are marked as closing
time.Sleep(time.Millisecond * 40)
time.Sleep(time.Millisecond * 20)
var events []string
go func() {
_, err := c.Get(context.TODO(), "id")
@ -160,47 +179,313 @@ func TestOCache_GC(t *testing.T) {
events = append(events, "get")
close(getCh)
}()
events = append(events, "close")
// sleeping to make sure that Get is called
time.Sleep(time.Millisecond * 40)
time.Sleep(time.Millisecond * 20)
events = append(events, "close")
close(closeCh)
<-getCh
require.Equal(t, []string{"close", "get"}, events)
})
t.Run("test gc tryClose false, many parallel get", func(t *testing.T) {
timesCalled := &atomic.Int32{}
obj := NewTestObject("id", false, nil)
c := New(func(ctx context.Context, id string) (value Object, err error) {
timesCalled.Add(1)
return obj, nil
}, WithTTL(0))
val, err := c.Get(context.TODO(), "id")
require.NoError(t, err)
require.NotNil(t, val)
assert.Equal(t, 1, c.Len())
begin := make(chan struct{})
wg := sync.WaitGroup{}
once := sync.Once{}
wg.Add(1)
go func() {
<-begin
c.GC()
wg.Done()
}()
for i := 0; i < 50; i++ {
wg.Add(1)
go func(i int) {
once.Do(func() {
close(begin)
})
if i%2 != 0 {
time.Sleep(time.Millisecond)
}
_, err := c.Get(context.TODO(), "id")
require.NoError(t, err)
wg.Done()
}(i)
}
require.NoError(t, err)
wg.Wait()
require.Equal(t, timesCalled.Load(), int32(1))
require.True(t, obj.tryCloseCalled)
})
t.Run("test gc tryClose different, many objects", func(t *testing.T) {
tryCloseIds := make(map[string]bool)
called := make(map[string]int)
max := 1000
getId := func(i int) string {
return fmt.Sprintf("id%d", i)
}
for i := 0; i < max; i++ {
if i%2 == 1 {
tryCloseIds[getId(i)] = true
} else {
tryCloseIds[getId(i)] = false
}
}
c := New(func(ctx context.Context, id string) (value Object, err error) {
called[id] = called[id] + 1
return NewTestObject(id, tryCloseIds[id], nil), nil
}, WithTTL(time.Millisecond*10))
for i := 0; i < max; i++ {
val, err := c.Get(context.TODO(), getId(i))
require.NoError(t, err)
require.NotNil(t, val)
}
assert.Equal(t, max, c.Len())
time.Sleep(time.Millisecond * 20)
c.GC()
for i := 0; i < max; i++ {
val, err := c.Get(context.TODO(), getId(i))
require.NoError(t, err)
require.NotNil(t, val)
}
for i := 0; i < max; i++ {
val, err := c.Get(context.TODO(), getId(i))
require.NoError(t, err)
require.NotNil(t, val)
require.Equal(t, called[getId(i)], i%2+1)
}
})
}
func Test_OCache_Remove(t *testing.T) {
closeCh := make(chan struct{})
getCh := make(chan struct{})
t.Run("remove simple", func(t *testing.T) {
closeCh := make(chan struct{})
getCh := make(chan struct{})
c := New(func(ctx context.Context, id string) (value Object, err error) {
return NewTestObject(id, false, closeCh), nil
}, WithTTL(time.Millisecond*10))
c := New(func(ctx context.Context, id string) (value Object, err error) {
return NewTestObject(id, closeCh), nil
}, WithTTL(time.Millisecond*10))
val, err := c.Get(context.TODO(), "id")
require.NoError(t, err)
require.NotNil(t, val)
assert.Equal(t, 1, c.Len())
// removing the object, so we will wait on closing
go func() {
_, err := c.Remove("id")
require.NoError(t, err)
}()
time.Sleep(time.Millisecond * 40)
var events []string
go func() {
_, err := c.Get(context.TODO(), "id")
val, err := c.Get(context.TODO(), "id")
require.NoError(t, err)
require.NotNil(t, val)
events = append(events, "get")
close(getCh)
}()
events = append(events, "close")
// sleeping to make sure that Get is called
time.Sleep(time.Millisecond * 40)
close(closeCh)
assert.Equal(t, 1, c.Len())
// removing the object, so we will wait on closing
go func() {
_, err := c.Remove(ctx, "id")
require.NoError(t, err)
}()
time.Sleep(time.Millisecond * 20)
<-getCh
require.Equal(t, []string{"close", "get"}, events)
var events []string
go func() {
_, err := c.Get(context.TODO(), "id")
require.NoError(t, err)
require.NotNil(t, val)
events = append(events, "get")
close(getCh)
}()
// sleeping to make sure that Get is called
time.Sleep(time.Millisecond * 20)
events = append(events, "close")
close(closeCh)
<-getCh
require.Equal(t, []string{"close", "get"}, events)
})
t.Run("test remove while gc, tryClose false", func(t *testing.T) {
closeCh := make(chan struct{})
removeCh := make(chan struct{})
c := New(func(ctx context.Context, id string) (value Object, err error) {
return NewTestObject(id, false, closeCh), nil
}, WithTTL(time.Millisecond*10))
val, err := c.Get(context.TODO(), "id")
require.NoError(t, err)
require.NotNil(t, val)
assert.Equal(t, 1, c.Len())
time.Sleep(time.Millisecond * 20)
go c.GC()
time.Sleep(time.Millisecond * 20)
var events []string
go func() {
ok, err := c.Remove(ctx, "id")
require.NoError(t, err)
require.True(t, ok)
events = append(events, "remove")
close(removeCh)
}()
time.Sleep(time.Millisecond * 20)
events = append(events, "close")
close(closeCh)
<-removeCh
require.Equal(t, []string{"close", "remove"}, events)
})
t.Run("test remove while gc, tryClose true", func(t *testing.T) {
closeCh := make(chan struct{})
removeCh := make(chan struct{})
c := New(func(ctx context.Context, id string) (value Object, err error) {
return NewTestObject(id, true, closeCh), nil
}, WithTTL(time.Millisecond*10))
val, err := c.Get(context.TODO(), "id")
require.NoError(t, err)
require.NotNil(t, val)
assert.Equal(t, 1, c.Len())
time.Sleep(time.Millisecond * 20)
go c.GC()
time.Sleep(time.Millisecond * 20)
var events []string
go func() {
ok, err := c.Remove(ctx, "id")
require.NoError(t, err)
require.False(t, ok)
events = append(events, "remove")
close(removeCh)
}()
time.Sleep(time.Millisecond * 20)
events = append(events, "close")
close(closeCh)
<-removeCh
require.Equal(t, []string{"close", "remove"}, events)
})
t.Run("test gc while remove, tryClose true", func(t *testing.T) {
closeCh := make(chan struct{})
removeCh := make(chan struct{})
c := New(func(ctx context.Context, id string) (value Object, err error) {
return NewTestObject(id, true, closeCh), nil
}, WithTTL(time.Millisecond*10))
val, err := c.Get(context.TODO(), "id")
require.NoError(t, err)
require.NotNil(t, val)
assert.Equal(t, 1, c.Len())
go func() {
ok, err := c.Remove(ctx, "id")
require.NoError(t, err)
require.True(t, ok)
close(removeCh)
}()
time.Sleep(20 * time.Millisecond)
c.GC()
close(closeCh)
<-removeCh
})
}
func TestOCacheFuzzy(t *testing.T) {
t.Run("test many objects gc, get and remove simultaneously, close after", func(t *testing.T) {
tryCloseIds := make(map[string]bool)
max := 2000
getId := func(i int) string {
return fmt.Sprintf("id%d", i)
}
for i := 0; i < max; i++ {
if i%2 == 1 {
tryCloseIds[getId(i)] = true
} else {
tryCloseIds[getId(i)] = false
}
}
c := New(func(ctx context.Context, id string) (value Object, err error) {
return NewTestObject(id, tryCloseIds[id], nil), nil
}, WithTTL(time.Nanosecond))
stopGC := make(chan struct{})
wg := sync.WaitGroup{}
go func() {
for {
select {
case <-stopGC:
return
default:
c.GC()
}
}
}()
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < 10; j++ {
for i := 0; i < max; i++ {
val, err := c.Get(context.TODO(), getId(i))
require.NoError(t, err)
require.NotNil(t, val)
}
}
}()
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < 10; j++ {
for i := 0; i < max; i++ {
c.Remove(ctx, getId(i))
}
}
}()
wg.Wait()
close(stopGC)
err := c.Close()
require.NoError(t, err)
require.Equal(t, 0, c.Len())
})
t.Run("test many objects gc, get, remove and close simultaneously", func(t *testing.T) {
tryCloseIds := make(map[string]bool)
max := 2000
getId := func(i int) string {
return fmt.Sprintf("id%d", i)
}
for i := 0; i < max; i++ {
if i%2 == 1 {
tryCloseIds[getId(i)] = true
} else {
tryCloseIds[getId(i)] = false
}
}
c := New(func(ctx context.Context, id string) (value Object, err error) {
return NewTestObject(id, tryCloseIds[id], nil), nil
}, WithTTL(time.Nanosecond))
go func() {
for {
c.GC()
}
}()
go func() {
for j := 0; j < 10; j++ {
for i := 0; i < max; i++ {
val, err := c.Get(context.TODO(), getId(i))
if err == ErrClosed {
return
}
require.NoError(t, err)
require.NotNil(t, val)
}
}
}()
go func() {
for j := 0; j < 10; j++ {
for i := 0; i < max; i++ {
c.Remove(ctx, getId(i))
}
}
}()
time.Sleep(time.Millisecond)
err := c.Close()
require.NoError(t, err)
require.Equal(t, 0, c.Len())
})
}

View file

@ -51,6 +51,10 @@ func (p pushSpaceRequestMatcher) String() string {
type mockPeer struct{}
func (m mockPeer) TryClose(objectTTL time.Duration) (res bool, err error) {
return true, m.Close()
}
func (m mockPeer) Id() string {
return "mockId"
}

View file

@ -88,7 +88,7 @@ func (a *aclRecordBuilder) BuildUserJoin(acceptPrivKeyBytes []byte, encSymKeyByt
Identity: state.Identity(),
Data: marshalledJoin,
CurrentReadKeyHash: state.CurrentReadKeyHash(),
Timestamp: time.Now().UnixNano(),
Timestamp: time.Now().Unix(),
}
marshalledRecord, err := aclRecord.Marshal()
if err != nil {

View file

@ -126,7 +126,7 @@ func (t *AclListStorageBuilder) parseRecord(rec *Record, prevId string) *aclreco
Identity: []byte(t.keychain.GetIdentity(rec.Identity)),
Data: bytes,
CurrentReadKeyHash: k.Hash,
Timestamp: time.Now().UnixNano(),
Timestamp: time.Now().Unix(),
}
}

View file

@ -3,6 +3,7 @@ package objecttree
import (
"errors"
"github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto"
"github.com/gogo/protobuf/proto"
)
var (
@ -48,6 +49,11 @@ func NewChange(id string, ch *treechangeproto.TreeChange, signature []byte) *Cha
}
func NewChangeFromRoot(id string, ch *treechangeproto.RootChange, signature []byte) *Change {
changeInfo := &treechangeproto.TreeChangeInfo{
ChangeType: ch.ChangeType,
ChangePayload: ch.ChangePayload,
}
data, _ := proto.Marshal(changeInfo)
return &Change{
Next: nil,
AclHeadId: ch.AclHeadId,
@ -56,7 +62,8 @@ func NewChangeFromRoot(id string, ch *treechangeproto.RootChange, signature []by
Timestamp: ch.Timestamp,
Identity: string(ch.Identity),
Signature: signature,
Data: []byte(ch.ChangeType),
Data: data,
Model: changeInfo,
}
}

View file

@ -26,13 +26,14 @@ type BuilderContent struct {
}
type InitialContent struct {
AclHeadId string
Identity []byte
SigningKey signingkey.PrivKey
SpaceId string
Seed []byte
ChangeType string
Timestamp int64
AclHeadId string
Identity []byte
SigningKey signingkey.PrivKey
SpaceId string
Seed []byte
ChangeType string
ChangePayload []byte
Timestamp int64
}
type nonVerifiableChangeBuilder struct {
@ -122,41 +123,35 @@ func (c *changeBuilder) SetRootRawChange(rawIdChange *treechangeproto.RawTreeCha
func (c *changeBuilder) BuildRoot(payload InitialContent) (ch *Change, rawIdChange *treechangeproto.RawTreeChangeWithId, err error) {
change := &treechangeproto.RootChange{
AclHeadId: payload.AclHeadId,
Timestamp: payload.Timestamp,
Identity: payload.Identity,
ChangeType: payload.ChangeType,
SpaceId: payload.SpaceId,
Seed: payload.Seed,
AclHeadId: payload.AclHeadId,
Timestamp: payload.Timestamp,
Identity: payload.Identity,
ChangeType: payload.ChangeType,
ChangePayload: payload.ChangePayload,
SpaceId: payload.SpaceId,
Seed: payload.Seed,
}
marshalledChange, err := proto.Marshal(change)
if err != nil {
return
}
signature, err := payload.SigningKey.Sign(marshalledChange)
if err != nil {
return
}
raw := &treechangeproto.RawTreeChange{
Payload: marshalledChange,
Signature: signature,
}
marshalledRawChange, err := proto.Marshal(raw)
if err != nil {
return
}
id, err := cidutil.NewCidFromBytes(marshalledRawChange)
if err != nil {
return
}
ch = NewChangeFromRoot(id, change, signature)
rawIdChange = &treechangeproto.RawTreeChangeWithId{
RawChange: marshalledRawChange,
Id: id,
@ -170,7 +165,7 @@ func (c *changeBuilder) Build(payload BuilderContent) (ch *Change, rawIdChange *
AclHeadId: payload.AclHeadId,
SnapshotBaseId: payload.SnapshotBaseId,
CurrentReadKeyHash: payload.CurrentReadKeyHash,
Timestamp: time.Now().UnixNano(),
Timestamp: time.Now().Unix(),
Identity: payload.Identity,
IsSnapshot: payload.IsSnapshot,
}
@ -184,34 +179,27 @@ func (c *changeBuilder) Build(payload BuilderContent) (ch *Change, rawIdChange *
} else {
change.ChangesData = payload.Content
}
marshalledChange, err := proto.Marshal(change)
if err != nil {
return
}
signature, err := payload.SigningKey.Sign(marshalledChange)
if err != nil {
return
}
raw := &treechangeproto.RawTreeChange{
Payload: marshalledChange,
Signature: signature,
}
marshalledRawChange, err := proto.Marshal(raw)
if err != nil {
return
}
id, err := cidutil.NewCidFromBytes(marshalledRawChange)
if err != nil {
return
}
ch = NewChange(id, change, signature)
rawIdChange = &treechangeproto.RawTreeChangeWithId{
RawChange: marshalledRawChange,
Id: id,
@ -264,7 +252,6 @@ func (c *changeBuilder) unmarshallRawChange(raw *treechangeproto.RawTreeChange,
ch = NewChangeFromRoot(id, unmarshalled, raw.Signature)
return
}
unmarshalled := &treechangeproto.TreeChange{}
err = proto.Unmarshal(raw.Payload, unmarshalled)
if err != nil {

View file

@ -7,6 +7,7 @@ package mock_objecttree
import (
context "context"
reflect "reflect"
time "time"
list "github.com/anytypeio/any-sync/commonspace/object/acl/list"
objecttree "github.com/anytypeio/any-sync/commonspace/object/tree/objecttree"
@ -82,6 +83,20 @@ func (mr *MockObjectTreeMockRecorder) AddRawChanges(arg0, arg1 interface{}) *gom
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddRawChanges", reflect.TypeOf((*MockObjectTree)(nil).AddRawChanges), arg0, arg1)
}
// ChangeInfo mocks base method.
func (m *MockObjectTree) ChangeInfo() *treechangeproto.TreeChangeInfo {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "ChangeInfo")
ret0, _ := ret[0].(*treechangeproto.TreeChangeInfo)
return ret0
}
// ChangeInfo indicates an expected call of ChangeInfo.
func (mr *MockObjectTreeMockRecorder) ChangeInfo() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ChangeInfo", reflect.TypeOf((*MockObjectTree)(nil).ChangeInfo))
}
// ChangesAfterCommonSnapshot mocks base method.
func (m *MockObjectTree) ChangesAfterCommonSnapshot(arg0, arg1 []string) ([]*treechangeproto.RawTreeChangeWithId, error) {
m.ctrl.T.Helper()
@ -336,6 +351,21 @@ func (mr *MockObjectTreeMockRecorder) Storage() *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Storage", reflect.TypeOf((*MockObjectTree)(nil).Storage))
}
// TryClose mocks base method.
func (m *MockObjectTree) TryClose(arg0 time.Duration) (bool, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "TryClose", arg0)
ret0, _ := ret[0].(bool)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// TryClose indicates an expected call of TryClose.
func (mr *MockObjectTreeMockRecorder) TryClose(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TryClose", reflect.TypeOf((*MockObjectTree)(nil).TryClose), arg0)
}
// TryLock mocks base method.
func (m *MockObjectTree) TryLock() bool {
m.ctrl.T.Helper()

View file

@ -5,6 +5,7 @@ import (
"context"
"errors"
"sync"
"time"
"github.com/anytypeio/any-sync/commonspace/object/acl/aclrecordproto"
"github.com/anytypeio/any-sync/commonspace/object/acl/list"
@ -52,6 +53,7 @@ type ReadableObjectTree interface {
Id() string
Header() *treechangeproto.RawTreeChangeWithId
UnmarshalledHeader() *Change
ChangeInfo() *treechangeproto.TreeChangeInfo
Heads() []string
Root() *Change
@ -81,6 +83,7 @@ type ObjectTree interface {
Delete() error
Close() error
TryClose(objectTTL time.Duration) (bool, error)
}
type objectTree struct {
@ -142,6 +145,10 @@ func (ot *objectTree) UnmarshalledHeader() *Change {
return ot.root
}
func (ot *objectTree) ChangeInfo() *treechangeproto.TreeChangeInfo {
return ot.root.Model.(*treechangeproto.TreeChangeInfo)
}
func (ot *objectTree) Storage() treestorage.TreeStorage {
return ot.treeStorage
}
@ -555,6 +562,10 @@ func (ot *objectTree) Root() *Change {
return ot.tree.Root()
}
func (ot *objectTree) TryClose(objectTTL time.Duration) (bool, error) {
return true, ot.Close()
}
func (ot *objectTree) Close() error {
return nil
}

View file

@ -12,11 +12,12 @@ import (
)
type ObjectTreeCreatePayload struct {
SignKey signingkey.PrivKey
ChangeType string
SpaceId string
Identity []byte
IsEncrypted bool
SignKey signingkey.PrivKey
ChangeType string
ChangePayload []byte
SpaceId string
Identity []byte
IsEncrypted bool
}
type HistoryTreeParams struct {
@ -75,7 +76,7 @@ func CreateObjectTreeRoot(payload ObjectTreeCreatePayload, aclList list.AclList)
if err != nil {
return
}
return createObjectTreeRoot(payload, time.Now().UnixNano(), bytes, aclList)
return createObjectTreeRoot(payload, time.Now().Unix(), bytes, aclList)
}
func DeriveObjectTreeRoot(payload ObjectTreeCreatePayload, aclList list.AclList) (root *treechangeproto.RawTreeChangeWithId, err error) {
@ -125,7 +126,7 @@ func CreateObjectTree(
if err != nil {
return
}
return createObjectTree(payload, time.Now().UnixNano(), bytes, aclList, createStorage)
return createObjectTree(payload, time.Now().Unix(), bytes, aclList, createStorage)
}
func createObjectTree(
@ -165,13 +166,14 @@ func createObjectTreeRoot(
return
}
cnt := InitialContent{
AclHeadId: aclHeadId,
Identity: payload.Identity,
SigningKey: payload.SignKey,
SpaceId: payload.SpaceId,
ChangeType: payload.ChangeType,
Timestamp: timestamp,
Seed: seed,
AclHeadId: aclHeadId,
Identity: payload.Identity,
SigningKey: payload.SignKey,
SpaceId: payload.SpaceId,
ChangeType: payload.ChangeType,
ChangePayload: payload.ChangePayload,
Timestamp: timestamp,
Seed: seed,
}
_, root, err = NewChangeBuilder(keychain.NewKeychain(), nil).BuildRoot(cnt)

View file

@ -125,7 +125,7 @@ func TestTree_Hash(t *testing.T) {
}
func TestTree_AddFuzzy(t *testing.T) {
rand.Seed(time.Now().UnixNano())
rand.Seed(time.Now().Unix())
getChanges := func() []*Change {
changes := []*Change{
newChange("1", "root", "root"),

View file

@ -7,6 +7,7 @@ package mock_synctree
import (
context "context"
reflect "reflect"
time "time"
list "github.com/anytypeio/any-sync/commonspace/object/acl/list"
objecttree "github.com/anytypeio/any-sync/commonspace/object/tree/objecttree"
@ -193,6 +194,20 @@ func (mr *MockSyncTreeMockRecorder) AddRawChanges(arg0, arg1 interface{}) *gomoc
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddRawChanges", reflect.TypeOf((*MockSyncTree)(nil).AddRawChanges), arg0, arg1)
}
// ChangeInfo mocks base method.
func (m *MockSyncTree) ChangeInfo() *treechangeproto.TreeChangeInfo {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "ChangeInfo")
ret0, _ := ret[0].(*treechangeproto.TreeChangeInfo)
return ret0
}
// ChangeInfo indicates an expected call of ChangeInfo.
func (mr *MockSyncTreeMockRecorder) ChangeInfo() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ChangeInfo", reflect.TypeOf((*MockSyncTree)(nil).ChangeInfo))
}
// ChangesAfterCommonSnapshot mocks base method.
func (m *MockSyncTree) ChangesAfterCommonSnapshot(arg0, arg1 []string) ([]*treechangeproto.RawTreeChangeWithId, error) {
m.ctrl.T.Helper()
@ -487,6 +502,21 @@ func (mr *MockSyncTreeMockRecorder) SyncWithPeer(arg0, arg1 interface{}) *gomock
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SyncWithPeer", reflect.TypeOf((*MockSyncTree)(nil).SyncWithPeer), arg0, arg1)
}
// TryClose mocks base method.
func (m *MockSyncTree) TryClose(arg0 time.Duration) (bool, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "TryClose", arg0)
ret0, _ := ret[0].(bool)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// TryClose indicates an expected call of TryClose.
func (mr *MockSyncTreeMockRecorder) TryClose(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TryClose", reflect.TypeOf((*MockSyncTree)(nil).TryClose), arg0)
}
// TryLock mocks base method.
func (m *MockSyncTree) TryLock() bool {
m.ctrl.T.Helper()

View file

@ -3,6 +3,7 @@ package synctree
import (
"context"
"errors"
"time"
"github.com/anytypeio/any-sync/app/logger"
"github.com/anytypeio/any-sync/commonspace/object/acl/list"
@ -209,6 +210,10 @@ func (s *syncTree) Delete() (err error) {
return
}
func (s *syncTree) TryClose(objectTTL time.Duration) (bool, error) {
return true, s.Close()
}
func (s *syncTree) Close() (err error) {
log.Debug("closing sync tree", zap.String("id", s.Id()))
defer func() {

View file

@ -16,6 +16,8 @@ message RootChange {
bytes seed = 5;
// Identity is a public key of the tree's creator
bytes identity = 6;
// ChangePayload is a payload related to ChangeType
bytes changePayload = 7;
}
// TreeChange is a change of a tree
@ -94,3 +96,9 @@ message TreeFullSyncResponse {
message TreeErrorResponse {
string error = 1;
}
// TreeChangeInfo is used internally in Tree implementation for ease of marshalling
message TreeChangeInfo {
string changeType = 1;
bytes changePayload = 2;
}

View file

@ -36,6 +36,8 @@ type RootChange struct {
Seed []byte `protobuf:"bytes,5,opt,name=seed,proto3" json:"seed,omitempty"`
// Identity is a public key of the tree's creator
Identity []byte `protobuf:"bytes,6,opt,name=identity,proto3" json:"identity,omitempty"`
// ChangePayload is a payload related to ChangeType
ChangePayload []byte `protobuf:"bytes,7,opt,name=changePayload,proto3" json:"changePayload,omitempty"`
}
func (m *RootChange) Reset() { *m = RootChange{} }
@ -113,6 +115,13 @@ func (m *RootChange) GetIdentity() []byte {
return nil
}
func (m *RootChange) GetChangePayload() []byte {
if m != nil {
return m.ChangePayload
}
return nil
}
// TreeChange is a change of a tree
type TreeChange struct {
// TreeHeadIds are previous ids for this TreeChange
@ -725,6 +734,59 @@ func (m *TreeErrorResponse) GetError() string {
return ""
}
// TreeChangeInfo is used internally in Tree implementation for ease of marshalling
type TreeChangeInfo struct {
ChangeType string `protobuf:"bytes,1,opt,name=changeType,proto3" json:"changeType,omitempty"`
ChangePayload []byte `protobuf:"bytes,2,opt,name=changePayload,proto3" json:"changePayload,omitempty"`
}
func (m *TreeChangeInfo) Reset() { *m = TreeChangeInfo{} }
func (m *TreeChangeInfo) String() string { return proto.CompactTextString(m) }
func (*TreeChangeInfo) ProtoMessage() {}
func (*TreeChangeInfo) Descriptor() ([]byte, []int) {
return fileDescriptor_5033f0301ef9b772, []int{10}
}
func (m *TreeChangeInfo) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *TreeChangeInfo) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_TreeChangeInfo.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *TreeChangeInfo) XXX_Merge(src proto.Message) {
xxx_messageInfo_TreeChangeInfo.Merge(m, src)
}
func (m *TreeChangeInfo) XXX_Size() int {
return m.Size()
}
func (m *TreeChangeInfo) XXX_DiscardUnknown() {
xxx_messageInfo_TreeChangeInfo.DiscardUnknown(m)
}
var xxx_messageInfo_TreeChangeInfo proto.InternalMessageInfo
func (m *TreeChangeInfo) GetChangeType() string {
if m != nil {
return m.ChangeType
}
return ""
}
func (m *TreeChangeInfo) GetChangePayload() []byte {
if m != nil {
return m.ChangePayload
}
return nil
}
func init() {
proto.RegisterType((*RootChange)(nil), "treechange.RootChange")
proto.RegisterType((*TreeChange)(nil), "treechange.TreeChange")
@ -736,6 +798,7 @@ func init() {
proto.RegisterType((*TreeFullSyncRequest)(nil), "treechange.TreeFullSyncRequest")
proto.RegisterType((*TreeFullSyncResponse)(nil), "treechange.TreeFullSyncResponse")
proto.RegisterType((*TreeErrorResponse)(nil), "treechange.TreeErrorResponse")
proto.RegisterType((*TreeChangeInfo)(nil), "treechange.TreeChangeInfo")
}
func init() {
@ -743,48 +806,51 @@ func init() {
}
var fileDescriptor_5033f0301ef9b772 = []byte{
// 656 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xc4, 0x55, 0xc1, 0x6e, 0xd3, 0x40,
0x10, 0xf5, 0x3a, 0x69, 0xd3, 0x4e, 0xd3, 0x16, 0xb6, 0x3d, 0x58, 0x15, 0x18, 0xcb, 0x07, 0x08,
0x97, 0x56, 0x2a, 0x27, 0x10, 0x52, 0x45, 0x4b, 0x8b, 0xab, 0x0a, 0x84, 0xb6, 0x05, 0x24, 0x6e,
0x5b, 0x7b, 0x68, 0x8c, 0x12, 0xdb, 0x78, 0x37, 0x54, 0xf9, 0x00, 0x2e, 0x20, 0x21, 0x3e, 0x81,
0x6f, 0xe0, 0x0f, 0xb8, 0x71, 0xec, 0x91, 0x23, 0x6a, 0x7e, 0x04, 0xed, 0x3a, 0x4e, 0xd6, 0x6e,
0x0e, 0xbd, 0xf5, 0xb2, 0xc9, 0xbc, 0x9d, 0x79, 0xfb, 0xe6, 0xcd, 0x6e, 0x02, 0x3b, 0x61, 0xda,
0xef, 0xa7, 0x89, 0xc8, 0x78, 0x88, 0x5b, 0xe9, 0xe9, 0x47, 0x0c, 0xe5, 0x96, 0xcc, 0x11, 0xf5,
0x12, 0x76, 0x79, 0x72, 0x86, 0x59, 0x9e, 0xca, 0x74, 0x4b, 0xaf, 0xc2, 0x80, 0x37, 0x35, 0x42,
0x61, 0x8a, 0xf8, 0xbf, 0x08, 0x00, 0x4b, 0x53, 0xb9, 0xa7, 0x43, 0x7a, 0x07, 0x16, 0x79, 0xd8,
0x0b, 0x90, 0x47, 0x87, 0x91, 0x43, 0x3c, 0xd2, 0x59, 0x64, 0x53, 0x80, 0x3a, 0xd0, 0xd2, 0xa7,
0x1e, 0x46, 0x8e, 0xad, 0xf7, 0xca, 0x90, 0xba, 0x00, 0x05, 0xe1, 0xc9, 0x30, 0x43, 0xa7, 0xa1,
0x37, 0x0d, 0x44, 0xf1, 0xca, 0xb8, 0x8f, 0x42, 0xf2, 0x7e, 0xe6, 0x34, 0x3d, 0xd2, 0x69, 0xb0,
0x29, 0x40, 0x29, 0x34, 0x05, 0x62, 0xe4, 0xcc, 0x79, 0xa4, 0xd3, 0x66, 0xfa, 0x3b, 0xdd, 0x80,
0x85, 0x38, 0xc2, 0x44, 0xc6, 0x72, 0xe8, 0xcc, 0x6b, 0x7c, 0x12, 0xfb, 0x3f, 0x6d, 0x80, 0x93,
0x1c, 0x71, 0x2c, 0xda, 0x83, 0x25, 0xd5, 0x51, 0x21, 0x52, 0x38, 0xc4, 0x6b, 0x74, 0x16, 0x99,
0x09, 0x55, 0xdb, 0xb2, 0xeb, 0x6d, 0xdd, 0x87, 0x15, 0x91, 0xf0, 0x4c, 0x74, 0x53, 0xb9, 0xcb,
0x85, 0xea, 0xae, 0x68, 0xa0, 0x86, 0xaa, 0x73, 0x8a, 0x96, 0xc4, 0x73, 0x2e, 0xb9, 0x6e, 0xa3,
0xcd, 0x4c, 0x88, 0x6e, 0x02, 0x0d, 0x07, 0x79, 0x8e, 0x89, 0x64, 0xc8, 0xa3, 0x23, 0x1c, 0x06,
0x5c, 0x74, 0x75, 0x5b, 0x4d, 0x36, 0x63, 0xa7, 0x6a, 0xcb, 0x7c, 0xdd, 0x16, 0xd3, 0x82, 0x56,
0xd5, 0x02, 0x65, 0x78, 0x2c, 0x8e, 0xc7, 0xfa, 0x9c, 0x05, 0x8f, 0x74, 0x16, 0x98, 0x81, 0xf8,
0x2f, 0x60, 0x99, 0xf1, 0x73, 0xc3, 0x24, 0x07, 0x5a, 0x19, 0x1f, 0xf6, 0x52, 0x5e, 0xcc, 0xb5,
0xcd, 0xca, 0x50, 0x89, 0x10, 0xf1, 0x59, 0xc2, 0xe5, 0x20, 0x47, 0x6d, 0x4e, 0x9b, 0x4d, 0x01,
0x7f, 0x0f, 0xd6, 0x2a, 0x44, 0xef, 0x62, 0xd9, 0x3d, 0xd4, 0x45, 0x39, 0x3f, 0x2f, 0xa0, 0x31,
0xe1, 0x14, 0xa0, 0x2b, 0x60, 0xc7, 0xa5, 0xd1, 0x76, 0x1c, 0xf9, 0xdf, 0x09, 0xac, 0x2a, 0x8a,
0xe3, 0x61, 0x12, 0xbe, 0x44, 0x21, 0xf8, 0x19, 0xd2, 0x27, 0xd0, 0x0a, 0xd3, 0x44, 0x62, 0x22,
0x75, 0xfd, 0xd2, 0xb6, 0xb7, 0x69, 0xdc, 0xd4, 0x32, 0x7b, 0xaf, 0x48, 0x79, 0xcb, 0x7b, 0x03,
0x64, 0x65, 0x01, 0xdd, 0x01, 0xc8, 0x27, 0x97, 0x56, 0x9f, 0xb3, 0xb4, 0x7d, 0xcf, 0x2c, 0x9f,
0x21, 0x99, 0x19, 0x25, 0xfe, 0x6f, 0x1b, 0xd6, 0x67, 0x1d, 0x41, 0x9f, 0x02, 0x74, 0x91, 0x47,
0x6f, 0xb2, 0x88, 0x4b, 0x1c, 0x0b, 0xdb, 0xa8, 0x0b, 0x0b, 0x26, 0x19, 0x81, 0xc5, 0x8c, 0x7c,
0x7a, 0x04, 0xab, 0x1f, 0x06, 0xbd, 0x9e, 0x62, 0x65, 0xf8, 0x69, 0x80, 0x42, 0xce, 0x12, 0xa7,
0x28, 0x0e, 0xaa, 0x69, 0x81, 0xc5, 0xea, 0x95, 0xf4, 0x15, 0xdc, 0x9a, 0x42, 0x22, 0x4b, 0x13,
0x51, 0xbc, 0xac, 0x19, 0x4e, 0x1d, 0xd4, 0xf2, 0x02, 0x8b, 0x5d, 0xa9, 0xa5, 0xfb, 0xb0, 0x8c,
0x79, 0x9e, 0xe6, 0x13, 0xb2, 0xa6, 0x26, 0xbb, 0x5b, 0x27, 0xdb, 0x37, 0x93, 0x02, 0x8b, 0x55,
0xab, 0x76, 0x5b, 0x30, 0xf7, 0x59, 0x59, 0xe5, 0x7f, 0x21, 0xb0, 0x52, 0x75, 0x83, 0xae, 0xc3,
0x9c, 0x72, 0xa3, 0x7c, 0x83, 0x45, 0x40, 0x1f, 0x43, 0x6b, 0xfc, 0x48, 0x1c, 0xdb, 0x6b, 0x5c,
0x67, 0x54, 0x65, 0x3e, 0xf5, 0xa1, 0x5d, 0x3e, 0xc2, 0xd7, 0x5c, 0x76, 0x9d, 0x86, 0xe6, 0xad,
0x60, 0xfe, 0x57, 0x02, 0x6b, 0x33, 0x2c, 0xbd, 0x19, 0x31, 0xdf, 0x48, 0x71, 0xb1, 0xea, 0x13,
0xb9, 0x19, 0x35, 0x0f, 0xe1, 0xf6, 0x95, 0x89, 0x2a, 0x25, 0x7a, 0xa2, 0xe3, 0xdf, 0xf7, 0x22,
0xd8, 0x7d, 0xf6, 0xe7, 0xd2, 0x25, 0x17, 0x97, 0x2e, 0xf9, 0x77, 0xe9, 0x92, 0x1f, 0x23, 0xd7,
0xba, 0x18, 0xb9, 0xd6, 0xdf, 0x91, 0x6b, 0xbd, 0x7f, 0x70, 0xcd, 0xff, 0x9b, 0xd3, 0x79, 0xfd,
0xf1, 0xe8, 0x7f, 0x00, 0x00, 0x00, 0xff, 0xff, 0x4e, 0xa5, 0xe2, 0x7c, 0xa1, 0x06, 0x00, 0x00,
// 690 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xc4, 0x55, 0xc1, 0x4e, 0xdb, 0x4a,
0x14, 0xf5, 0x38, 0x01, 0xc3, 0x25, 0xc0, 0x7b, 0x03, 0x0b, 0x0b, 0xbd, 0xe7, 0x5a, 0x56, 0xd5,
0xa6, 0x1b, 0x90, 0xe8, 0xaa, 0x55, 0x25, 0x54, 0x28, 0xd4, 0x11, 0x6a, 0x85, 0x06, 0x4a, 0xa5,
0xee, 0x06, 0xfb, 0x42, 0x5c, 0x25, 0xb6, 0xeb, 0x99, 0x14, 0xe5, 0x03, 0xba, 0x69, 0xa5, 0xaa,
0x9f, 0xd0, 0x5f, 0xe9, 0xae, 0x4b, 0x96, 0x2c, 0x2b, 0xf8, 0x91, 0xca, 0x63, 0x3b, 0xb1, 0x1d,
0x2f, 0xd8, 0xb1, 0x71, 0x72, 0x8f, 0xef, 0x3d, 0x73, 0xee, 0xb9, 0x33, 0x63, 0xd8, 0xf1, 0xa2,
0xe1, 0x30, 0x0a, 0x45, 0xcc, 0x3d, 0xdc, 0x8a, 0xce, 0x3e, 0xa2, 0x27, 0xb7, 0x64, 0x82, 0xa8,
0x1e, 0x5e, 0x9f, 0x87, 0x17, 0x18, 0x27, 0x91, 0x8c, 0xb6, 0xd4, 0x53, 0x94, 0xe0, 0x4d, 0x85,
0x50, 0x98, 0x22, 0xce, 0x35, 0x01, 0x60, 0x51, 0x24, 0xf7, 0x54, 0x48, 0xff, 0x83, 0x45, 0xee,
0x0d, 0x5c, 0xe4, 0x7e, 0xcf, 0x37, 0x89, 0x4d, 0xba, 0x8b, 0x6c, 0x0a, 0x50, 0x13, 0x0c, 0xb5,
0x6a, 0xcf, 0x37, 0x75, 0xf5, 0xae, 0x08, 0xa9, 0x05, 0x90, 0x11, 0x9e, 0x8c, 0x63, 0x34, 0x5b,
0xea, 0x65, 0x09, 0x49, 0x79, 0x65, 0x30, 0x44, 0x21, 0xf9, 0x30, 0x36, 0xdb, 0x36, 0xe9, 0xb6,
0xd8, 0x14, 0xa0, 0x14, 0xda, 0x02, 0xd1, 0x37, 0xe7, 0x6c, 0xd2, 0xed, 0x30, 0xf5, 0x9f, 0x6e,
0xc0, 0x42, 0xe0, 0x63, 0x28, 0x03, 0x39, 0x36, 0xe7, 0x15, 0x3e, 0x89, 0xe9, 0x43, 0x58, 0xce,
0xb8, 0x8f, 0xf8, 0x78, 0x10, 0x71, 0xdf, 0x34, 0x54, 0x42, 0x15, 0x74, 0x7e, 0xea, 0x00, 0x27,
0x09, 0x62, 0xde, 0x9a, 0x0d, 0x4b, 0x69, 0xdf, 0x59, 0x2b, 0xc2, 0x24, 0x76, 0xab, 0xbb, 0xc8,
0xca, 0x50, 0xb5, 0x79, 0xbd, 0xde, 0xfc, 0x23, 0x58, 0x11, 0x21, 0x8f, 0x45, 0x3f, 0x92, 0xbb,
0x5c, 0xa4, 0x1e, 0x64, 0x6d, 0xd6, 0xd0, 0x74, 0x9d, 0x4c, 0x87, 0x78, 0xc5, 0x25, 0x57, 0xcd,
0x76, 0x58, 0x19, 0xa2, 0x9b, 0x40, 0xbd, 0x51, 0x92, 0x60, 0x28, 0x19, 0x72, 0xff, 0x10, 0xc7,
0x2e, 0x17, 0x7d, 0xd5, 0x7c, 0x9b, 0x35, 0xbc, 0xa9, 0x9a, 0x37, 0x5f, 0x37, 0xaf, 0x6c, 0x94,
0x51, 0x33, 0xca, 0x02, 0x08, 0xc4, 0x71, 0xae, 0xcf, 0x5c, 0xb0, 0x49, 0x77, 0x81, 0x95, 0x10,
0xe7, 0x35, 0x2c, 0x33, 0x7e, 0x59, 0x32, 0xc9, 0x04, 0x23, 0xce, 0x3d, 0x25, 0x8a, 0xab, 0x08,
0x53, 0x11, 0x22, 0xb8, 0x08, 0xb9, 0x1c, 0x25, 0xa8, 0xcc, 0xe9, 0xb0, 0x29, 0xe0, 0xec, 0xc1,
0x5a, 0x85, 0xe8, 0x7d, 0x20, 0xfb, 0x3d, 0x55, 0x94, 0xf0, 0xcb, 0x0c, 0xca, 0x09, 0xa7, 0x00,
0x5d, 0x01, 0x3d, 0x28, 0x8c, 0xd6, 0x03, 0xdf, 0xf9, 0x4e, 0x60, 0x35, 0xa5, 0x38, 0x1e, 0x87,
0xde, 0x1b, 0x14, 0x82, 0x5f, 0x20, 0x7d, 0x0e, 0x86, 0x17, 0x85, 0x12, 0x43, 0xa9, 0xea, 0x97,
0xb6, 0xed, 0xcd, 0xd2, 0x7e, 0x2e, 0xb2, 0xf7, 0xb2, 0x94, 0x53, 0x3e, 0x18, 0x21, 0x2b, 0x0a,
0xe8, 0x0e, 0x40, 0x32, 0xd9, 0xda, 0x6a, 0x9d, 0xa5, 0xed, 0x07, 0xe5, 0xf2, 0x06, 0xc9, 0xac,
0x54, 0xe2, 0xfc, 0xd2, 0x61, 0xbd, 0x69, 0x09, 0xfa, 0x02, 0xa0, 0x8f, 0xdc, 0x7f, 0x17, 0xfb,
0x5c, 0x62, 0x2e, 0x6c, 0xa3, 0x2e, 0xcc, 0x9d, 0x64, 0xb8, 0x1a, 0x2b, 0xe5, 0xd3, 0x43, 0x58,
0x3d, 0x1f, 0x0d, 0x06, 0x29, 0x2b, 0xc3, 0x4f, 0x23, 0x14, 0xb2, 0x49, 0x5c, 0x4a, 0x71, 0x50,
0x4d, 0x73, 0x35, 0x56, 0xaf, 0xa4, 0x6f, 0xe1, 0x9f, 0x29, 0x24, 0xe2, 0x28, 0x14, 0xd9, 0xf9,
0x6b, 0x70, 0xea, 0xa0, 0x96, 0xe7, 0x6a, 0x6c, 0xa6, 0x96, 0xee, 0xc3, 0x32, 0x26, 0x49, 0x94,
0x4c, 0xc8, 0xda, 0x8a, 0xec, 0xff, 0x3a, 0xd9, 0x7e, 0x39, 0xc9, 0xd5, 0x58, 0xb5, 0x6a, 0xd7,
0x80, 0xb9, 0xcf, 0xa9, 0x55, 0xce, 0x17, 0x02, 0x2b, 0x55, 0x37, 0xe8, 0x3a, 0xcc, 0xa5, 0x6e,
0x14, 0x67, 0x30, 0x0b, 0xe8, 0x33, 0x30, 0xf2, 0x43, 0x62, 0xea, 0x76, 0xeb, 0x2e, 0xa3, 0x2a,
0xf2, 0xa9, 0x03, 0x9d, 0xe2, 0x10, 0x1e, 0x71, 0xd9, 0x37, 0x5b, 0x8a, 0xb7, 0x82, 0x39, 0x5f,
0x09, 0xac, 0x35, 0x58, 0x7a, 0x3f, 0x62, 0xbe, 0x91, 0x6c, 0x63, 0xd5, 0x27, 0x72, 0x3f, 0x6a,
0x9e, 0xc0, 0xbf, 0x33, 0x13, 0x4d, 0x95, 0xa8, 0x89, 0xe6, 0x5f, 0x81, 0x2c, 0x70, 0x4e, 0xb3,
0x61, 0x66, 0x6b, 0xf5, 0xc2, 0xf3, 0xa8, 0x76, 0xf3, 0x93, 0x99, 0x9b, 0x7f, 0xe6, 0xae, 0xd6,
0x1b, 0xee, 0xea, 0xdd, 0x97, 0xbf, 0x6f, 0x2c, 0x72, 0x75, 0x63, 0x91, 0x3f, 0x37, 0x16, 0xf9,
0x71, 0x6b, 0x69, 0x57, 0xb7, 0x96, 0x76, 0x7d, 0x6b, 0x69, 0x1f, 0x1e, 0xdf, 0xf1, 0x6b, 0x77,
0x36, 0xaf, 0x7e, 0x9e, 0xfe, 0x0d, 0x00, 0x00, 0xff, 0xff, 0x24, 0x93, 0x3b, 0x00, 0x1f, 0x07,
0x00, 0x00,
}
func (m *RootChange) Marshal() (dAtA []byte, err error) {
@ -807,6 +873,13 @@ func (m *RootChange) MarshalToSizedBuffer(dAtA []byte) (int, error) {
_ = i
var l int
_ = l
if len(m.ChangePayload) > 0 {
i -= len(m.ChangePayload)
copy(dAtA[i:], m.ChangePayload)
i = encodeVarintTreechange(dAtA, i, uint64(len(m.ChangePayload)))
i--
dAtA[i] = 0x3a
}
if len(m.Identity) > 0 {
i -= len(m.Identity)
copy(dAtA[i:], m.Identity)
@ -1362,6 +1435,43 @@ func (m *TreeErrorResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) {
return len(dAtA) - i, nil
}
func (m *TreeChangeInfo) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *TreeChangeInfo) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *TreeChangeInfo) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if len(m.ChangePayload) > 0 {
i -= len(m.ChangePayload)
copy(dAtA[i:], m.ChangePayload)
i = encodeVarintTreechange(dAtA, i, uint64(len(m.ChangePayload)))
i--
dAtA[i] = 0x12
}
if len(m.ChangeType) > 0 {
i -= len(m.ChangeType)
copy(dAtA[i:], m.ChangeType)
i = encodeVarintTreechange(dAtA, i, uint64(len(m.ChangeType)))
i--
dAtA[i] = 0xa
}
return len(dAtA) - i, nil
}
func encodeVarintTreechange(dAtA []byte, offset int, v uint64) int {
offset -= sovTreechange(v)
base := offset
@ -1402,6 +1512,10 @@ func (m *RootChange) Size() (n int) {
if l > 0 {
n += 1 + l + sovTreechange(uint64(l))
}
l = len(m.ChangePayload)
if l > 0 {
n += 1 + l + sovTreechange(uint64(l))
}
return n
}
@ -1650,6 +1764,23 @@ func (m *TreeErrorResponse) Size() (n int) {
return n
}
func (m *TreeChangeInfo) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
l = len(m.ChangeType)
if l > 0 {
n += 1 + l + sovTreechange(uint64(l))
}
l = len(m.ChangePayload)
if l > 0 {
n += 1 + l + sovTreechange(uint64(l))
}
return n
}
func sovTreechange(x uint64) (n int) {
return (math_bits.Len64(x|1) + 6) / 7
}
@ -1868,6 +1999,40 @@ func (m *RootChange) Unmarshal(dAtA []byte) error {
m.Identity = []byte{}
}
iNdEx = postIndex
case 7:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field ChangePayload", wireType)
}
var byteLen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowTreechange
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
byteLen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if byteLen < 0 {
return ErrInvalidLengthTreechange
}
postIndex := iNdEx + byteLen
if postIndex < 0 {
return ErrInvalidLengthTreechange
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.ChangePayload = append(m.ChangePayload[:0], dAtA[iNdEx:postIndex]...)
if m.ChangePayload == nil {
m.ChangePayload = []byte{}
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipTreechange(dAtA[iNdEx:])
@ -3233,6 +3398,122 @@ func (m *TreeErrorResponse) Unmarshal(dAtA []byte) error {
}
return nil
}
func (m *TreeChangeInfo) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowTreechange
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: TreeChangeInfo: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: TreeChangeInfo: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field ChangeType", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowTreechange
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthTreechange
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthTreechange
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.ChangeType = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
case 2:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field ChangePayload", wireType)
}
var byteLen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowTreechange
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
byteLen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if byteLen < 0 {
return ErrInvalidLengthTreechange
}
postIndex := iNdEx + byteLen
if postIndex < 0 {
return ErrInvalidLengthTreechange
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.ChangePayload = append(m.ChangePayload[:0], dAtA[iNdEx:postIndex]...)
if m.ChangePayload == nil {
m.ChangePayload = []byte{}
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipTreechange(dAtA[iNdEx:])
if err != nil {
return err
}
if (skippy < 0) || (iNdEx+skippy) < 0 {
return ErrInvalidLengthTreechange
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func skipTreechange(dAtA []byte) (n int, err error) {
l := len(dAtA)
iNdEx := 0

View file

@ -3,7 +3,6 @@ package objectsync
import (
"context"
"fmt"
"github.com/anytypeio/any-sync/app/ocache"
"github.com/anytypeio/any-sync/commonspace/objectsync/synchandler"
"github.com/anytypeio/any-sync/commonspace/peermanager"
"github.com/anytypeio/any-sync/commonspace/spacesyncproto"
@ -15,9 +14,13 @@ import (
"time"
)
type LastUsage interface {
LastUsage() time.Time
}
// MessagePool can be made generic to work with different streams
type MessagePool interface {
ocache.ObjectLastUsage
LastUsage
synchandler.SyncHandler
peermanager.PeerManager
SendSync(ctx context.Context, peerId string, message *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error)

View file

@ -6,7 +6,6 @@ import (
"time"
"github.com/anytypeio/any-sync/app/logger"
"github.com/anytypeio/any-sync/app/ocache"
"github.com/anytypeio/any-sync/commonspace/object/syncobjectgetter"
"github.com/anytypeio/any-sync/commonspace/objectsync/synchandler"
"github.com/anytypeio/any-sync/commonspace/peermanager"
@ -20,7 +19,7 @@ import (
var log = logger.NewNamed("common.commonspace.objectsync")
type ObjectSync interface {
ocache.ObjectLastUsage
LastUsage
synchandler.SyncHandler
MessagePool() MessagePool

View file

@ -14,8 +14,8 @@ import (
)
const (
SpaceSettingsChangeType = "reserved.spacesettings"
SpaceDerivationScheme = "derivation.standard"
SpaceReserved = "any-sync.space"
SpaceDerivationScheme = "derivation.standard"
)
func storagePayloadForSpaceCreate(payload SpaceCreatePayload) (storagePayload spacestorage.SpaceStorageCreatePayload, err error) {
@ -36,11 +36,12 @@ func storagePayloadForSpaceCreate(payload SpaceCreatePayload) (storagePayload sp
return
}
header := &spacesyncproto.SpaceHeader{
Identity: identity,
Timestamp: time.Now().UnixNano(),
SpaceType: payload.SpaceType,
ReplicationKey: payload.ReplicationKey,
Seed: spaceHeaderSeed,
Identity: identity,
Timestamp: time.Now().Unix(),
SpaceType: payload.SpaceType,
SpaceHeaderPayload: payload.SpacePayload,
ReplicationKey: payload.ReplicationKey,
Seed: spaceHeaderSeed,
}
marshalled, err := header.Marshal()
if err != nil {
@ -81,7 +82,7 @@ func storagePayloadForSpaceCreate(payload SpaceCreatePayload) (storagePayload sp
SpaceId: spaceId,
EncryptedReadKey: encReadKey,
CurrentReadKeyHash: readKeyHash,
Timestamp: time.Now().UnixNano(),
Timestamp: time.Now().Unix(),
}
rawWithId, err := marshalAclRoot(aclRoot, payload.SigningKey)
if err != nil {
@ -101,8 +102,8 @@ func storagePayloadForSpaceCreate(payload SpaceCreatePayload) (storagePayload sp
SigningKey: payload.SigningKey,
SpaceId: spaceId,
Seed: spaceSettingsSeed,
ChangeType: SpaceSettingsChangeType,
Timestamp: time.Now().UnixNano(),
ChangeType: SpaceReserved,
Timestamp: time.Now().Unix(),
})
if err != nil {
return
@ -146,9 +147,10 @@ func storagePayloadForSpaceDerive(payload SpaceDerivePayload) (storagePayload sp
// preparing header and space id
header := &spacesyncproto.SpaceHeader{
Identity: identity,
SpaceType: SpaceTypeDerived,
ReplicationKey: repKey,
Identity: identity,
SpaceType: payload.SpaceType,
SpaceHeaderPayload: payload.SpacePayload,
ReplicationKey: repKey,
}
marshalled, err := header.Marshal()
if err != nil {
@ -201,7 +203,7 @@ func storagePayloadForSpaceDerive(payload SpaceDerivePayload) (storagePayload sp
Identity: aclRoot.Identity,
SigningKey: payload.SigningKey,
SpaceId: spaceId,
ChangeType: SpaceSettingsChangeType,
ChangeType: SpaceReserved,
})
if err != nil {
return

View file

@ -48,7 +48,7 @@ func (s *stateBuilder) Build(tr objecttree.ObjectTree, oldState *State) (state *
func (s *stateBuilder) processChange(change *objecttree.Change, rootId string, state *State) *State {
// ignoring root change which has empty model or startId change
if change.Model == nil || state.LastIteratedId == change.Id {
if len(change.PreviousIds) == 0 || state.LastIteratedId == change.Id {
return state
}

View file

@ -42,6 +42,7 @@ func TestStateBuilder_ProcessChange(t *testing.T) {
t.Run("changeId is equal to rootId", func(t *testing.T) {
ch := &objecttree.Change{}
ch.PreviousIds = []string{"someId"}
ch.Model = &spacesyncproto.SettingsData{
Snapshot: &spacesyncproto.SpaceSettingsSnapshot{
DeletedIds: []string{"id1", "id2"},
@ -56,6 +57,7 @@ func TestStateBuilder_ProcessChange(t *testing.T) {
t.Run("changeId is not equal to lastIteratedId or rootId", func(t *testing.T) {
ch := &objecttree.Change{}
ch.PreviousIds = []string{"someId"}
ch.Model = &spacesyncproto.SettingsData{
Content: []*spacesyncproto.SpaceSettingsContent{
{Value: &spacesyncproto.SpaceSettingsContent_ObjectDelete{

View file

@ -6,7 +6,6 @@ import (
"fmt"
"github.com/anytypeio/any-sync/accountservice"
"github.com/anytypeio/any-sync/app/logger"
"github.com/anytypeio/any-sync/app/ocache"
"github.com/anytypeio/any-sync/commonspace/headsync"
"github.com/anytypeio/any-sync/commonspace/object/acl/list"
"github.com/anytypeio/any-sync/commonspace/object/acl/syncacl"
@ -52,6 +51,8 @@ type SpaceCreatePayload struct {
ReadKey []byte
// ReplicationKey is a key which is to be used to determine the node where the space should be held
ReplicationKey uint64
// SpacePayload is an arbitrary payload related to space type
SpacePayload []byte
}
type HandleMessage struct {
@ -61,11 +62,11 @@ type HandleMessage struct {
Message *spacesyncproto.ObjectSyncMessage
}
const SpaceTypeDerived = "derived.space"
type SpaceDerivePayload struct {
SigningKey signingkey.PrivKey
EncryptionKey encryptionkey.PrivKey
SpaceType string
SpacePayload []byte
}
type SpaceDescription struct {
@ -81,9 +82,6 @@ func NewSpaceId(id string, repKey uint64) string {
}
type Space interface {
ocache.ObjectLocker
ocache.ObjectLastUsage
Id() string
Init(ctx context.Context) error
@ -108,6 +106,7 @@ type Space interface {
HandleMessage(ctx context.Context, msg HandleMessage) (err error)
TryClose(objectTTL time.Duration) (close bool, err error)
Close() error
}
@ -134,16 +133,6 @@ type space struct {
treesUsed *atomic.Int32
}
func (s *space) LastUsage() time.Time {
return s.objectSync.LastUsage()
}
func (s *space) Locked() bool {
locked := s.treesUsed.Load() > 1
log.With(zap.Int32("trees used", s.treesUsed.Load()), zap.Bool("locked", locked), zap.String("spaceId", s.id)).Debug("space lock status check")
return locked
}
func (s *space) Id() string {
return s.id
}
@ -462,3 +451,15 @@ func (s *space) Close() error {
log.With(zap.String("id", s.id)).Debug("space closed")
return mError.Err()
}
func (s *space) TryClose(objectTTL time.Duration) (close bool, err error) {
if time.Now().Sub(s.objectSync.LastUsage()) < objectTTL {
return false, nil
}
locked := s.treesUsed.Load() > 1
log.With(zap.Int32("trees used", s.treesUsed.Load()), zap.Bool("locked", locked), zap.String("spaceId", s.id)).Debug("space lock status check")
if locked {
return false, nil
}
return true, s.Close()
}

View file

@ -98,6 +98,7 @@ message SpaceHeader {
string spaceType = 3;
uint64 replicationKey = 4;
bytes seed = 5;
bytes spaceHeaderPayload = 6;
}
// RawSpaceHeader is raw header for SpaceHeader

View file

@ -694,11 +694,12 @@ func (m *SpacePayload) GetSpaceSettingsPayloadId() string {
// SpaceHeader is a header for a space
type SpaceHeader struct {
Identity []byte `protobuf:"bytes,1,opt,name=identity,proto3" json:"identity,omitempty"`
Timestamp int64 `protobuf:"varint,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
SpaceType string `protobuf:"bytes,3,opt,name=spaceType,proto3" json:"spaceType,omitempty"`
ReplicationKey uint64 `protobuf:"varint,4,opt,name=replicationKey,proto3" json:"replicationKey,omitempty"`
Seed []byte `protobuf:"bytes,5,opt,name=seed,proto3" json:"seed,omitempty"`
Identity []byte `protobuf:"bytes,1,opt,name=identity,proto3" json:"identity,omitempty"`
Timestamp int64 `protobuf:"varint,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
SpaceType string `protobuf:"bytes,3,opt,name=spaceType,proto3" json:"spaceType,omitempty"`
ReplicationKey uint64 `protobuf:"varint,4,opt,name=replicationKey,proto3" json:"replicationKey,omitempty"`
Seed []byte `protobuf:"bytes,5,opt,name=seed,proto3" json:"seed,omitempty"`
SpaceHeaderPayload []byte `protobuf:"bytes,6,opt,name=spaceHeaderPayload,proto3" json:"spaceHeaderPayload,omitempty"`
}
func (m *SpaceHeader) Reset() { *m = SpaceHeader{} }
@ -769,6 +770,13 @@ func (m *SpaceHeader) GetSeed() []byte {
return nil
}
func (m *SpaceHeader) GetSpaceHeaderPayload() []byte {
if m != nil {
return m.SpaceHeaderPayload
}
return nil
}
// RawSpaceHeader is raw header for SpaceHeader
type RawSpaceHeader struct {
SpaceHeader []byte `protobuf:"bytes,1,opt,name=spaceHeader,proto3" json:"spaceHeader,omitempty"`
@ -1240,72 +1248,73 @@ func init() {
}
var fileDescriptor_80e49f1f4ac27799 = []byte{
// 1030 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x56, 0x4b, 0x6f, 0xdb, 0x46,
0x10, 0x16, 0xe9, 0xa7, 0xc6, 0xb2, 0xcc, 0x6c, 0x9c, 0x44, 0x55, 0x0c, 0x45, 0x58, 0x14, 0x85,
0x91, 0x43, 0x1e, 0x72, 0x51, 0x20, 0x69, 0x7b, 0x48, 0x64, 0xa7, 0x11, 0x8a, 0xd4, 0xc6, 0xaa,
0x41, 0x81, 0x02, 0x39, 0xac, 0xc9, 0xb1, 0xc4, 0x96, 0x22, 0x59, 0xee, 0xaa, 0xb6, 0x8e, 0x3d,
0xf5, 0xda, 0x73, 0x7b, 0xea, 0x7f, 0xe8, 0x8f, 0xe8, 0x31, 0xc7, 0x1e, 0x0b, 0xfb, 0x8f, 0x14,
0xbb, 0x5c, 0x3e, 0x24, 0x51, 0x01, 0x7a, 0x91, 0xb9, 0xdf, 0xcc, 0x7c, 0xf3, 0xda, 0x9d, 0x31,
0x3c, 0x75, 0xa3, 0xc9, 0x24, 0x0a, 0x45, 0xcc, 0x5d, 0x7c, 0xac, 0x7f, 0xc5, 0x2c, 0x74, 0xe3,
0x24, 0x92, 0xd1, 0x63, 0xfd, 0x2b, 0x0a, 0xf4, 0x91, 0x06, 0x48, 0x3d, 0x07, 0xe8, 0x00, 0x76,
0x5f, 0x23, 0xf7, 0x86, 0xb3, 0xd0, 0x65, 0x3c, 0x1c, 0x21, 0x21, 0xb0, 0x7e, 0x91, 0x44, 0x93,
0x96, 0xd5, 0xb5, 0x0e, 0xd7, 0x99, 0xfe, 0x26, 0x4d, 0xb0, 0x65, 0xd4, 0xb2, 0x35, 0x62, 0xcb,
0x88, 0xec, 0xc3, 0x46, 0xe0, 0x4f, 0x7c, 0xd9, 0x5a, 0xeb, 0x5a, 0x87, 0xbb, 0x2c, 0x3d, 0xd0,
0x2b, 0x68, 0xe6, 0x54, 0x28, 0xa6, 0x81, 0x54, 0x5c, 0x63, 0x2e, 0xc6, 0x9a, 0xab, 0xc1, 0xf4,
0x37, 0xf9, 0x02, 0xb6, 0x31, 0xc0, 0x09, 0x86, 0x52, 0xb4, 0xec, 0xee, 0xda, 0xe1, 0x4e, 0xaf,
0xfb, 0xa8, 0x88, 0x6f, 0x9e, 0xe0, 0x24, 0x55, 0x64, 0xb9, 0x85, 0xf2, 0xec, 0x46, 0xd3, 0x30,
0xf7, 0xac, 0x0f, 0xf4, 0x73, 0xb8, 0x53, 0x69, 0xa8, 0x02, 0xf7, 0x3d, 0xed, 0xbe, 0xce, 0x6c,
0xdf, 0xd3, 0x01, 0x21, 0xf7, 0x74, 0x2a, 0x75, 0xa6, 0xbf, 0xe9, 0x3b, 0xd8, 0x2b, 0x8c, 0x7f,
0x9a, 0xa2, 0x90, 0xa4, 0x05, 0x5b, 0x3a, 0xa4, 0x41, 0x66, 0x9b, 0x1d, 0xc9, 0x13, 0xd8, 0x4c,
0x54, 0x99, 0xb2, 0xd8, 0x5b, 0x55, 0xb1, 0x2b, 0x05, 0x66, 0xf4, 0xe8, 0x57, 0xe0, 0x94, 0x62,
0x8b, 0xa3, 0x50, 0x20, 0x39, 0x82, 0xad, 0x44, 0xc7, 0x29, 0x5a, 0x96, 0xa6, 0xf9, 0x68, 0x65,
0x09, 0x58, 0xa6, 0x49, 0xff, 0xb0, 0xe0, 0xd6, 0xe9, 0xf9, 0x0f, 0xe8, 0x4a, 0x25, 0x7d, 0x83,
0x42, 0xf0, 0x11, 0x7e, 0x20, 0xd4, 0x03, 0xa8, 0x27, 0x69, 0x3e, 0x83, 0x2c, 0xe1, 0x02, 0x50,
0x76, 0x09, 0xc6, 0xc1, 0x6c, 0xe0, 0xe9, 0x52, 0xd6, 0x59, 0x76, 0x54, 0x92, 0x98, 0xcf, 0x82,
0x88, 0x7b, 0xad, 0x75, 0xdd, 0xb7, 0xec, 0x48, 0xda, 0xb0, 0x1d, 0xe9, 0x00, 0x06, 0x5e, 0x6b,
0x43, 0x1b, 0xe5, 0x67, 0x8a, 0xe0, 0x0c, 0x95, 0xe3, 0xb3, 0xa9, 0x18, 0x67, 0x65, 0x7c, 0x5a,
0x30, 0xa9, 0xd8, 0x76, 0x7a, 0xf7, 0x4a, 0x69, 0xa6, 0xda, 0xa9, 0xb8, 0x70, 0xd1, 0x01, 0xe8,
0x27, 0xe8, 0x61, 0x28, 0x7d, 0x1e, 0xe8, 0xa8, 0x1b, 0xac, 0x84, 0xd0, 0xdb, 0x70, 0xab, 0xe4,
0x26, 0x2d, 0x27, 0xa5, 0xb9, 0xef, 0x20, 0xc8, 0x7c, 0x2f, 0x74, 0x9e, 0xbe, 0xca, 0x0d, 0x95,
0x8e, 0xe9, 0xc3, 0xff, 0x0f, 0x90, 0xfe, 0x62, 0x43, 0xa3, 0x2c, 0x21, 0x2f, 0x60, 0x47, 0xdb,
0xa8, 0xb6, 0x61, 0x62, 0x78, 0x1e, 0x94, 0x78, 0x18, 0xbf, 0x1c, 0x16, 0x0a, 0xdf, 0xf9, 0x72,
0x3c, 0xf0, 0x58, 0xd9, 0x46, 0x25, 0xcd, 0xdd, 0xc0, 0x10, 0x66, 0x49, 0x17, 0x08, 0xa1, 0xd0,
0x28, 0x4e, 0x79, 0xc3, 0xe6, 0x30, 0xd2, 0x83, 0x7d, 0x4d, 0x39, 0x44, 0x29, 0xfd, 0x70, 0x24,
0xce, 0xe6, 0x5a, 0x58, 0x29, 0x23, 0x9f, 0xc1, 0xdd, 0x2a, 0x3c, 0xef, 0xee, 0x0a, 0x29, 0xfd,
0xd3, 0x82, 0x9d, 0x52, 0x4a, 0xea, 0x5e, 0xf8, 0xba, 0x41, 0x72, 0x66, 0x9e, 0x7a, 0x7e, 0x56,
0xb7, 0x50, 0xfa, 0x13, 0x14, 0x92, 0x4f, 0x62, 0x9d, 0xda, 0x1a, 0x2b, 0x00, 0x25, 0xd5, 0x3e,
0xbe, 0x9d, 0xc5, 0x68, 0xd2, 0x2a, 0x00, 0xf2, 0x09, 0x34, 0xd5, 0xa5, 0xf4, 0x5d, 0x2e, 0xfd,
0x28, 0xfc, 0x1a, 0x67, 0x3a, 0x9b, 0x75, 0xb6, 0x80, 0xaa, 0x57, 0x2d, 0x10, 0xd3, 0xa8, 0x1b,
0x4c, 0x7f, 0xd3, 0x33, 0x68, 0xce, 0x17, 0x9e, 0x74, 0x97, 0x1b, 0xd5, 0x98, 0xef, 0x83, 0x8a,
0xc6, 0x1f, 0x85, 0x5c, 0x4e, 0x13, 0x34, 0x6d, 0x28, 0x00, 0x7a, 0x0c, 0xfb, 0x55, 0xad, 0xd4,
0xef, 0x8c, 0x5f, 0xce, 0xb1, 0x16, 0x80, 0xb9, 0x87, 0x76, 0x7e, 0x0f, 0x7f, 0xb7, 0x60, 0x7f,
0x58, 0x2e, 0x6b, 0x3f, 0x0a, 0xa5, 0x1a, 0x55, 0x5f, 0x42, 0x23, 0x7d, 0x4c, 0xc7, 0x18, 0xa0,
0xc4, 0x8a, 0x0b, 0x79, 0x5a, 0x12, 0xbf, 0xae, 0xb1, 0x39, 0x75, 0xf2, 0xdc, 0x64, 0x67, 0xac,
0x6d, 0x6d, 0x7d, 0x77, 0xf1, 0x3a, 0xe7, 0xc6, 0x65, 0xe5, 0x97, 0x5b, 0xb0, 0xf1, 0x33, 0x0f,
0xa6, 0x48, 0x3b, 0xd0, 0x28, 0x3b, 0x59, 0x7a, 0x44, 0x47, 0xa6, 0xef, 0x46, 0xfc, 0x31, 0xec,
0x7a, 0xfa, 0x2b, 0x39, 0x43, 0x4c, 0xf2, 0x09, 0x34, 0x0f, 0xd2, 0x77, 0x70, 0x67, 0x2e, 0xe1,
0x61, 0xc8, 0x63, 0x31, 0x8e, 0xa4, 0xba, 0xf6, 0xa9, 0xa6, 0x37, 0xf0, 0xd2, 0x41, 0x58, 0x67,
0x25, 0x64, 0x99, 0xde, 0xae, 0xa2, 0xff, 0xd5, 0x82, 0x46, 0x46, 0x7d, 0xcc, 0x25, 0x27, 0xcf,
0x60, 0xcb, 0x4d, 0x6b, 0x6a, 0x86, 0xeb, 0x83, 0xc5, 0x2a, 0x2c, 0x94, 0x9e, 0x65, 0xfa, 0x6a,
0x37, 0x09, 0x13, 0x9d, 0xa9, 0x60, 0x77, 0x95, 0x6d, 0x96, 0x05, 0xcb, 0x2d, 0xe8, 0x8f, 0x66,
0xc4, 0x0c, 0xa7, 0xe7, 0xc2, 0x4d, 0xfc, 0x58, 0x5d, 0x4f, 0xf5, 0x36, 0xcc, 0x40, 0xce, 0x52,
0xcc, 0xcf, 0xe4, 0x39, 0x6c, 0x72, 0x57, 0x69, 0x69, 0x67, 0xcd, 0x1e, 0x5d, 0x72, 0x56, 0x62,
0x7a, 0xa1, 0x35, 0x99, 0xb1, 0x78, 0x78, 0x09, 0xdb, 0x27, 0x49, 0xd2, 0x8f, 0x3c, 0x14, 0xa4,
0x09, 0xf0, 0x36, 0xc4, 0xab, 0x18, 0x5d, 0x89, 0x9e, 0x53, 0x23, 0x8e, 0x19, 0x51, 0x6f, 0x7c,
0x21, 0xfc, 0x70, 0xe4, 0x58, 0x64, 0xcf, 0x34, 0xee, 0xe4, 0xca, 0x17, 0x52, 0x38, 0x36, 0xb9,
0x0d, 0x7b, 0x1a, 0xf8, 0x26, 0x92, 0x83, 0xb0, 0xcf, 0xdd, 0x31, 0x3a, 0x6b, 0x84, 0x40, 0x53,
0x83, 0x03, 0x91, 0x36, 0xd8, 0x73, 0xd6, 0x95, 0xe5, 0x49, 0x92, 0x44, 0xc9, 0xe9, 0xc5, 0x85,
0x40, 0xe9, 0x78, 0x0f, 0x9f, 0xc1, 0xbd, 0x15, 0xb1, 0x91, 0x5d, 0xa8, 0x1b, 0xf4, 0x1c, 0x9d,
0x9a, 0x32, 0x7d, 0x1b, 0x8a, 0x1c, 0xb0, 0x7a, 0x7f, 0xd9, 0x50, 0x4f, 0x6d, 0x67, 0xa1, 0x4b,
0xfa, 0xb0, 0x9d, 0xad, 0x3a, 0xd2, 0xae, 0xdc, 0x7f, 0x7a, 0x92, 0xb7, 0xef, 0x57, 0xef, 0xc6,
0x74, 0x82, 0xbf, 0x32, 0x8c, 0x6a, 0x1f, 0x90, 0xfb, 0x4b, 0xd3, 0xbb, 0x58, 0x46, 0xed, 0x83,
0x6a, 0xe1, 0x12, 0x4f, 0x10, 0x54, 0xf1, 0xe4, 0x8b, 0xa5, 0x8a, 0xa7, 0xb4, 0x51, 0x18, 0x38,
0xc5, 0x8e, 0x1e, 0xca, 0x04, 0xf9, 0x84, 0x1c, 0x2c, 0xbd, 0xe1, 0xd2, 0x02, 0x6f, 0x7f, 0x50,
0x7a, 0x68, 0x3d, 0xb1, 0x5e, 0x7e, 0xfa, 0xf7, 0x75, 0xc7, 0x7a, 0x7f, 0xdd, 0xb1, 0xfe, 0xbd,
0xee, 0x58, 0xbf, 0xdd, 0x74, 0x6a, 0xef, 0x6f, 0x3a, 0xb5, 0x7f, 0x6e, 0x3a, 0xb5, 0xef, 0xdb,
0xab, 0xff, 0xf5, 0x3b, 0xdf, 0xd4, 0x7f, 0x8e, 0xfe, 0x0b, 0x00, 0x00, 0xff, 0xff, 0x0b, 0x96,
0xb3, 0xaa, 0x1f, 0x0a, 0x00, 0x00,
// 1042 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x56, 0x4f, 0x6f, 0x1b, 0x45,
0x14, 0xf7, 0x6e, 0xd2, 0x24, 0x7e, 0x71, 0x9c, 0xed, 0x34, 0x6d, 0x8d, 0x1b, 0xb9, 0xd6, 0x08,
0xa1, 0xa8, 0x87, 0xb4, 0x75, 0x10, 0x52, 0x0b, 0x1c, 0x5a, 0x27, 0xa5, 0x16, 0x2a, 0x89, 0xc6,
0x54, 0x48, 0x48, 0x3d, 0x4c, 0x76, 0x5f, 0xec, 0x85, 0xf5, 0xee, 0xb2, 0x33, 0x26, 0xf1, 0x91,
0x13, 0x57, 0xce, 0xf0, 0x35, 0xf8, 0x10, 0x1c, 0xcb, 0x8d, 0x23, 0x4a, 0xbe, 0x08, 0x9a, 0xd9,
0xd9, 0x3f, 0xb6, 0xd7, 0x95, 0xb8, 0x38, 0x3b, 0xbf, 0xf7, 0xde, 0xef, 0xfd, 0x9b, 0x79, 0x2f,
0xf0, 0xd4, 0x8d, 0x26, 0x93, 0x28, 0x14, 0x31, 0x77, 0xf1, 0xb1, 0xfe, 0x15, 0xb3, 0xd0, 0x8d,
0x93, 0x48, 0x46, 0x8f, 0xf5, 0xaf, 0x28, 0xd0, 0x43, 0x0d, 0x90, 0x7a, 0x0e, 0xd0, 0x01, 0xec,
0xbc, 0x46, 0xee, 0x0d, 0x67, 0xa1, 0xcb, 0x78, 0x38, 0x42, 0x42, 0x60, 0xfd, 0x22, 0x89, 0x26,
0x2d, 0xab, 0x6b, 0x1d, 0xac, 0x33, 0xfd, 0x4d, 0x9a, 0x60, 0xcb, 0xa8, 0x65, 0x6b, 0xc4, 0x96,
0x11, 0xd9, 0x83, 0x5b, 0x81, 0x3f, 0xf1, 0x65, 0x6b, 0xad, 0x6b, 0x1d, 0xec, 0xb0, 0xf4, 0x40,
0xaf, 0xa0, 0x99, 0x53, 0xa1, 0x98, 0x06, 0x52, 0x71, 0x8d, 0xb9, 0x18, 0x6b, 0xae, 0x06, 0xd3,
0xdf, 0xe4, 0x0b, 0xd8, 0xc2, 0x00, 0x27, 0x18, 0x4a, 0xd1, 0xb2, 0xbb, 0x6b, 0x07, 0xdb, 0xbd,
0xee, 0x61, 0x11, 0xdf, 0x3c, 0xc1, 0x49, 0xaa, 0xc8, 0x72, 0x0b, 0xe5, 0xd9, 0x8d, 0xa6, 0x61,
0xee, 0x59, 0x1f, 0xe8, 0xe7, 0x70, 0xb7, 0xd2, 0x50, 0x05, 0xee, 0x7b, 0xda, 0x7d, 0x9d, 0xd9,
0xbe, 0xa7, 0x03, 0x42, 0xee, 0xe9, 0x54, 0xea, 0x4c, 0x7f, 0xd3, 0x77, 0xb0, 0x5b, 0x18, 0xff,
0x34, 0x45, 0x21, 0x49, 0x0b, 0x36, 0x75, 0x48, 0x83, 0xcc, 0x36, 0x3b, 0x92, 0x27, 0xb0, 0x91,
0xa8, 0x32, 0x65, 0xb1, 0xb7, 0xaa, 0x62, 0x57, 0x0a, 0xcc, 0xe8, 0xd1, 0xaf, 0xc0, 0x29, 0xc5,
0x16, 0x47, 0xa1, 0x40, 0x72, 0x04, 0x9b, 0x89, 0x8e, 0x53, 0xb4, 0x2c, 0x4d, 0xf3, 0xd1, 0xca,
0x12, 0xb0, 0x4c, 0x93, 0xfe, 0x61, 0xc1, 0xed, 0xd3, 0xf3, 0x1f, 0xd0, 0x95, 0x4a, 0xfa, 0x06,
0x85, 0xe0, 0x23, 0xfc, 0x40, 0xa8, 0xfb, 0x50, 0x4f, 0xd2, 0x7c, 0x06, 0x59, 0xc2, 0x05, 0xa0,
0xec, 0x12, 0x8c, 0x83, 0xd9, 0xc0, 0xd3, 0xa5, 0xac, 0xb3, 0xec, 0xa8, 0x24, 0x31, 0x9f, 0x05,
0x11, 0xf7, 0x5a, 0xeb, 0xba, 0x6f, 0xd9, 0x91, 0xb4, 0x61, 0x2b, 0xd2, 0x01, 0x0c, 0xbc, 0xd6,
0x2d, 0x6d, 0x94, 0x9f, 0x29, 0x82, 0x33, 0x54, 0x8e, 0xcf, 0xa6, 0x62, 0x9c, 0x95, 0xf1, 0x69,
0xc1, 0xa4, 0x62, 0xdb, 0xee, 0xdd, 0x2f, 0xa5, 0x99, 0x6a, 0xa7, 0xe2, 0xc2, 0x45, 0x07, 0xa0,
0x9f, 0xa0, 0x87, 0xa1, 0xf4, 0x79, 0xa0, 0xa3, 0x6e, 0xb0, 0x12, 0x42, 0xef, 0xc0, 0xed, 0x92,
0x9b, 0xb4, 0x9c, 0x94, 0xe6, 0xbe, 0x83, 0x20, 0xf3, 0xbd, 0xd0, 0x79, 0xfa, 0x2a, 0x37, 0x54,
0x3a, 0xa6, 0x0f, 0xff, 0x3f, 0x40, 0xfa, 0x8b, 0x0d, 0x8d, 0xb2, 0x84, 0xbc, 0x80, 0x6d, 0x6d,
0xa3, 0xda, 0x86, 0x89, 0xe1, 0x79, 0x58, 0xe2, 0x61, 0xfc, 0x72, 0x58, 0x28, 0x7c, 0xe7, 0xcb,
0xf1, 0xc0, 0x63, 0x65, 0x1b, 0x95, 0x34, 0x77, 0x03, 0x43, 0x98, 0x25, 0x5d, 0x20, 0x84, 0x42,
0xa3, 0x38, 0xe5, 0x0d, 0x9b, 0xc3, 0x48, 0x0f, 0xf6, 0x34, 0xe5, 0x10, 0xa5, 0xf4, 0xc3, 0x91,
0x38, 0x9b, 0x6b, 0x61, 0xa5, 0x8c, 0x7c, 0x06, 0xf7, 0xaa, 0xf0, 0xbc, 0xbb, 0x2b, 0xa4, 0xf4,
0x6f, 0x0b, 0xb6, 0x4b, 0x29, 0xa9, 0x7b, 0xe1, 0xeb, 0x06, 0xc9, 0x99, 0x79, 0xea, 0xf9, 0x59,
0xdd, 0x42, 0xe9, 0x4f, 0x50, 0x48, 0x3e, 0x89, 0x75, 0x6a, 0x6b, 0xac, 0x00, 0x94, 0x54, 0xfb,
0xf8, 0x76, 0x16, 0xa3, 0x49, 0xab, 0x00, 0xc8, 0x27, 0xd0, 0x54, 0x97, 0xd2, 0x77, 0xb9, 0xf4,
0xa3, 0xf0, 0x6b, 0x9c, 0xe9, 0x6c, 0xd6, 0xd9, 0x02, 0xaa, 0x5e, 0xb5, 0x40, 0x4c, 0xa3, 0x6e,
0x30, 0xfd, 0x4d, 0x0e, 0x81, 0x94, 0x4a, 0x9c, 0x55, 0x63, 0x43, 0x6b, 0x54, 0x48, 0xe8, 0x19,
0x34, 0xe7, 0x1b, 0x45, 0xba, 0xcb, 0x8d, 0x6d, 0xcc, 0xf7, 0x4d, 0x45, 0xef, 0x8f, 0x42, 0x2e,
0xa7, 0x09, 0x9a, 0xb6, 0x15, 0x00, 0x3d, 0x86, 0xbd, 0xaa, 0xd6, 0xeb, 0x77, 0xc9, 0x2f, 0xe7,
0x58, 0x0b, 0xc0, 0xdc, 0x5b, 0x3b, 0xbf, 0xb7, 0xbf, 0x5b, 0xb0, 0x37, 0x2c, 0xb7, 0xa1, 0x1f,
0x85, 0x52, 0x8d, 0xb6, 0x2f, 0xa1, 0x91, 0x3e, 0xbe, 0x63, 0x0c, 0x50, 0x62, 0xc5, 0x05, 0x3e,
0x2d, 0x89, 0x5f, 0xd7, 0xd8, 0x9c, 0x3a, 0x79, 0x6e, 0xb2, 0x33, 0xd6, 0xb6, 0xb6, 0xbe, 0xb7,
0x78, 0xfd, 0x73, 0xe3, 0xb2, 0xf2, 0xcb, 0x4d, 0xb8, 0xf5, 0x33, 0x0f, 0xa6, 0x48, 0x3b, 0xd0,
0x28, 0x3b, 0x59, 0x7a, 0x74, 0x47, 0xe6, 0x9e, 0x18, 0xf1, 0xc7, 0xb0, 0xe3, 0xe9, 0xaf, 0xe4,
0x0c, 0x31, 0xc9, 0x27, 0xd6, 0x3c, 0x48, 0xdf, 0xc1, 0xdd, 0xb9, 0x84, 0x87, 0x21, 0x8f, 0xc5,
0x38, 0x92, 0xea, 0x99, 0xa4, 0x9a, 0xde, 0xc0, 0x4b, 0x07, 0x67, 0x9d, 0x95, 0x90, 0x65, 0x7a,
0xbb, 0x8a, 0xfe, 0x57, 0x0b, 0x1a, 0x19, 0xf5, 0x31, 0x97, 0x9c, 0x3c, 0x83, 0x4d, 0x37, 0xad,
0xa9, 0x19, 0xc6, 0x0f, 0x17, 0xab, 0xb0, 0x50, 0x7a, 0x96, 0xe9, 0xab, 0x5d, 0x26, 0x4c, 0x74,
0xa6, 0x82, 0xdd, 0x55, 0xb6, 0x59, 0x16, 0x2c, 0xb7, 0xa0, 0x3f, 0x9a, 0x91, 0x34, 0x9c, 0x9e,
0x0b, 0x37, 0xf1, 0x63, 0x75, 0x9d, 0xd5, 0x5b, 0x32, 0x03, 0x3c, 0x4b, 0x31, 0x3f, 0x93, 0xe7,
0xb0, 0xc1, 0x5d, 0xa5, 0xa5, 0x9d, 0x35, 0x7b, 0x74, 0xc9, 0x59, 0x89, 0xe9, 0x85, 0xd6, 0x64,
0xc6, 0xe2, 0xd1, 0x25, 0x6c, 0x9d, 0x24, 0x49, 0x3f, 0xf2, 0x50, 0x90, 0x26, 0xc0, 0xdb, 0x10,
0xaf, 0x62, 0x74, 0x25, 0x7a, 0x4e, 0x8d, 0x38, 0x66, 0xa4, 0xbd, 0xf1, 0x85, 0xf0, 0xc3, 0x91,
0x63, 0x91, 0x5d, 0xd3, 0xb8, 0x93, 0x2b, 0x5f, 0x48, 0xe1, 0xd8, 0xe4, 0x0e, 0xec, 0x6a, 0xe0,
0x9b, 0x48, 0x0e, 0xc2, 0x3e, 0x77, 0xc7, 0xe8, 0xac, 0x11, 0x02, 0x4d, 0x0d, 0x0e, 0x44, 0xda,
0x60, 0xcf, 0x59, 0x57, 0x96, 0x27, 0x49, 0x12, 0x25, 0xa7, 0x17, 0x17, 0x02, 0xa5, 0xe3, 0x3d,
0x7a, 0x06, 0xf7, 0x57, 0xc4, 0x46, 0x76, 0xa0, 0x6e, 0xd0, 0x73, 0x74, 0x6a, 0xca, 0xf4, 0x6d,
0x28, 0x72, 0xc0, 0xea, 0xfd, 0x69, 0x43, 0x3d, 0xb5, 0x9d, 0x85, 0x2e, 0xe9, 0xc3, 0x56, 0xb6,
0x1a, 0x49, 0xbb, 0x72, 0x5f, 0xea, 0xc9, 0xdf, 0x7e, 0x50, 0xbd, 0x4b, 0xd3, 0x89, 0xff, 0xca,
0x30, 0xaa, 0xfd, 0x41, 0x1e, 0x2c, 0x4d, 0xfb, 0x62, 0x79, 0xb5, 0xf7, 0xab, 0x85, 0x4b, 0x3c,
0x41, 0x50, 0xc5, 0x93, 0x2f, 0xa2, 0x2a, 0x9e, 0xd2, 0x06, 0x62, 0xe0, 0x14, 0x3b, 0x7d, 0x28,
0x13, 0xe4, 0x13, 0xb2, 0xbf, 0xf4, 0x86, 0x4b, 0x0b, 0xbf, 0xfd, 0x41, 0xe9, 0x81, 0xf5, 0xc4,
0x7a, 0xf9, 0xe9, 0x5f, 0xd7, 0x1d, 0xeb, 0xfd, 0x75, 0xc7, 0xfa, 0xf7, 0xba, 0x63, 0xfd, 0x76,
0xd3, 0xa9, 0xbd, 0xbf, 0xe9, 0xd4, 0xfe, 0xb9, 0xe9, 0xd4, 0xbe, 0x6f, 0xaf, 0xfe, 0x57, 0xf1,
0x7c, 0x43, 0xff, 0x39, 0xfa, 0x2f, 0x00, 0x00, 0xff, 0xff, 0x13, 0xff, 0xe7, 0x17, 0x4f, 0x0a,
0x00, 0x00,
}
func (m *HeadSyncRange) Marshal() (dAtA []byte, err error) {
@ -1784,6 +1793,13 @@ func (m *SpaceHeader) MarshalToSizedBuffer(dAtA []byte) (int, error) {
_ = i
var l int
_ = l
if len(m.SpaceHeaderPayload) > 0 {
i -= len(m.SpaceHeaderPayload)
copy(dAtA[i:], m.SpaceHeaderPayload)
i = encodeVarintSpacesync(dAtA, i, uint64(len(m.SpaceHeaderPayload)))
i--
dAtA[i] = 0x32
}
if len(m.Seed) > 0 {
i -= len(m.Seed)
copy(dAtA[i:], m.Seed)
@ -2387,6 +2403,10 @@ func (m *SpaceHeader) Size() (n int) {
if l > 0 {
n += 1 + l + sovSpacesync(uint64(l))
}
l = len(m.SpaceHeaderPayload)
if l > 0 {
n += 1 + l + sovSpacesync(uint64(l))
}
return n
}
@ -4041,6 +4061,40 @@ func (m *SpaceHeader) Unmarshal(dAtA []byte) error {
m.Seed = []byte{}
}
iNdEx = postIndex
case 6:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field SpaceHeaderPayload", wireType)
}
var byteLen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowSpacesync
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
byteLen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if byteLen < 0 {
return ErrInvalidLengthSpacesync
}
postIndex := iNdEx + byteLen
if postIndex < 0 {
return ErrInvalidLengthSpacesync
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.SpaceHeaderPayload = append(m.SpaceHeaderPayload[:0], dAtA[iNdEx:postIndex]...)
if m.SpaceHeaderPayload == nil {
m.SpaceHeaderPayload = []byte{}
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipSpacesync(dAtA[iNdEx:])

View file

@ -38,11 +38,12 @@ func (m *MockCoordinatorClient) EXPECT() *MockCoordinatorClientMockRecorder {
}
// ChangeStatus mocks base method.
func (m *MockCoordinatorClient) ChangeStatus(arg0 context.Context, arg1 string, arg2 *treechangeproto.RawTreeChangeWithId) error {
func (m *MockCoordinatorClient) ChangeStatus(arg0 context.Context, arg1 string, arg2 *treechangeproto.RawTreeChangeWithId) (*coordinatorproto.SpaceStatusPayload, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "ChangeStatus", arg0, arg1, arg2)
ret0, _ := ret[0].(error)
return ret0
ret0, _ := ret[0].(*coordinatorproto.SpaceStatusPayload)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// ChangeStatus indicates an expected call of ChangeStatus.
@ -110,10 +111,10 @@ func (mr *MockCoordinatorClientMockRecorder) SpaceSign(arg0, arg1, arg2 interfac
}
// StatusCheck mocks base method.
func (m *MockCoordinatorClient) StatusCheck(arg0 context.Context, arg1 string) (*coordinatorproto.SpaceStatusCheckResponse, error) {
func (m *MockCoordinatorClient) StatusCheck(arg0 context.Context, arg1 string) (*coordinatorproto.SpaceStatusPayload, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "StatusCheck", arg0, arg1)
ret0, _ := ret[0].(*coordinatorproto.SpaceStatusCheckResponse)
ret0, _ := ret[0].(*coordinatorproto.SpaceStatusPayload)
ret1, _ := ret[1].(error)
return ret0, ret1
}

View file

@ -25,11 +25,13 @@ type Peer interface {
Id() string
LastUsage() time.Time
UpdateLastUsage()
TryClose(objectTTL time.Duration) (res bool, err error)
drpc.Conn
}
type peer struct {
id string
ttl time.Duration
lastUsage int64
sc sec.SecureConn
drpc.Conn
@ -76,6 +78,13 @@ func (p *peer) UpdateLastUsage() {
atomic.StoreInt64(&p.lastUsage, time.Now().Unix())
}
func (p *peer) TryClose(objectTTL time.Duration) (res bool, err error) {
if time.Now().Sub(p.LastUsage()) < objectTTL {
return false, nil
}
return true, p.Close()
}
func (p *peer) Close() (err error) {
log.Debug("peer close", zap.String("peerId", p.id))
return p.Conn.Close()

View file

@ -49,7 +49,7 @@ func (p *pool) Get(ctx context.Context, id string) (peer.Peer, error) {
default:
return pr, nil
}
_, _ = p.cache.Remove(id)
_, _ = p.cache.Remove(ctx, id)
return p.Get(ctx, id)
}

View file

@ -194,6 +194,10 @@ func (t *testPeer) LastUsage() time.Time {
func (t *testPeer) UpdateLastUsage() {}
func (t *testPeer) TryClose(objectTTL time.Duration) (res bool, err error) {
return true, t.Close()
}
func (t *testPeer) Close() error {
select {
case <-t.closed:

View file

@ -103,6 +103,10 @@ type testPeer struct {
drpc.Conn
}
func (t testPeer) TryClose(objectTTL time.Duration) (res bool, err error) {
return true, t.Close()
}
func (t testPeer) Id() string {
return t.id
}