1
0
Fork 0
mirror of https://github.com/anyproto/any-sync.git synced 2025-06-10 10:00:49 +09:00

Merge pull request #64 from anyproto/GO-1857-Add-datatype

GO-1857: Add datatype and check for periodicsync
This commit is contained in:
Mikhail Rakhmanov 2023-08-23 17:11:51 +02:00 committed by GitHub
commit cd12b481de
Signed by: github
GPG key ID: 4AEE18F83AFDEB23
16 changed files with 175 additions and 76 deletions

View file

@ -208,8 +208,10 @@ func (d *diffSyncer) sendPushSpaceRequest(ctx context.Context, peerId string, cl
Credential: cred, Credential: cred,
}) })
if err != nil { if err != nil {
d.log.WarnCtx(ctx, "space push failed", zap.Error(err))
return return
} }
d.log.InfoCtx(ctx, "space push completed successfully")
if e := d.subscribe(ctx, peerId); e != nil { if e := d.subscribe(ctx, peerId); e != nil {
d.log.WarnCtx(ctx, "error subscribing for space", zap.Error(e)) d.log.WarnCtx(ctx, "error subscribing for space", zap.Error(e))
} }

View file

@ -57,22 +57,26 @@ func (t *treeExporter) ExportUnencrypted(tree objecttree.ReadableObjectTree) (er
} }
return treeStorage.AddRawChange(raw) return treeStorage.AddRawChange(raw)
} }
err = tree.IterateRoot(t.converter.Unmarshall, func(change *objecttree.Change) bool { err = tree.IterateRoot(
if change.Id == tree.Id() { func(change *objecttree.Change, decrypted []byte) (any, error) {
return t.converter.Unmarshall(decrypted)
},
func(change *objecttree.Change) bool {
if change.Id == tree.Id() {
err = putStorage(change)
return err == nil
}
var data []byte
data, err = t.converter.Marshall(change.Model)
if err != nil {
return false
}
// that means that change is unencrypted
change.ReadKeyId = ""
change.Data = data
err = putStorage(change) err = putStorage(change)
return err == nil return err == nil
} })
var data []byte
data, err = t.converter.Marshall(change.Model)
if err != nil {
return false
}
// that means that change is unencrypted
change.ReadKeyId = ""
change.Data = data
err = putStorage(change)
return err == nil
})
if err != nil { if err != nil {
return return
} }

View file

@ -25,6 +25,7 @@ type Change struct {
Data []byte Data []byte
Model interface{} Model interface{}
Signature []byte Signature []byte
DataType string
// iterator helpers // iterator helpers
visited bool visited bool
@ -45,6 +46,7 @@ func NewChange(id string, identity crypto.PubKey, ch *treechangeproto.TreeChange
IsSnapshot: ch.IsSnapshot, IsSnapshot: ch.IsSnapshot,
Identity: identity, Identity: identity,
Signature: signature, Signature: signature,
DataType: ch.DataType,
} }
} }

View file

@ -20,6 +20,7 @@ type BuilderContent struct {
ReadKey crypto.SymKey ReadKey crypto.SymKey
Content []byte Content []byte
Timestamp int64 Timestamp int64
DataType string
} }
type InitialContent struct { type InitialContent struct {
@ -169,6 +170,7 @@ func (c *changeBuilder) Build(payload BuilderContent) (ch *Change, rawIdChange *
Timestamp: payload.Timestamp, Timestamp: payload.Timestamp,
Identity: identity, Identity: identity,
IsSnapshot: payload.IsSnapshot, IsSnapshot: payload.IsSnapshot,
DataType: payload.DataType,
} }
if payload.ReadKey != nil { if payload.ReadKey != nil {
var encrypted []byte var encrypted []byte
@ -225,6 +227,7 @@ func (c *changeBuilder) Marshall(ch *Change) (raw *treechangeproto.RawTreeChange
Timestamp: ch.Timestamp, Timestamp: ch.Timestamp,
Identity: identity, Identity: identity,
IsSnapshot: ch.IsSnapshot, IsSnapshot: ch.IsSnapshot,
DataType: ch.DataType,
} }
var marshalled []byte var marshalled []byte
marshalled, err = treeChange.Marshal() marshalled, err = treeChange.Marshal()

View file

@ -231,7 +231,7 @@ func (mr *MockObjectTreeMockRecorder) Id() *gomock.Call {
} }
// IterateFrom mocks base method. // IterateFrom mocks base method.
func (m *MockObjectTree) IterateFrom(arg0 string, arg1 func([]byte) (interface{}, error), arg2 func(*objecttree.Change) bool) error { func (m *MockObjectTree) IterateFrom(arg0 string, arg1 func(*objecttree.Change, []byte) (interface{}, error), arg2 func(*objecttree.Change) bool) error {
m.ctrl.T.Helper() m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "IterateFrom", arg0, arg1, arg2) ret := m.ctrl.Call(m, "IterateFrom", arg0, arg1, arg2)
ret0, _ := ret[0].(error) ret0, _ := ret[0].(error)
@ -245,7 +245,7 @@ func (mr *MockObjectTreeMockRecorder) IterateFrom(arg0, arg1, arg2 interface{})
} }
// IterateRoot mocks base method. // IterateRoot mocks base method.
func (m *MockObjectTree) IterateRoot(arg0 func([]byte) (interface{}, error), arg1 func(*objecttree.Change) bool) error { func (m *MockObjectTree) IterateRoot(arg0 func(*objecttree.Change, []byte) (interface{}, error), arg1 func(*objecttree.Change) bool) error {
m.ctrl.T.Helper() m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "IterateRoot", arg0, arg1) ret := m.ctrl.Call(m, "IterateRoot", arg0, arg1)
ret0, _ := ret[0].(error) ret0, _ := ret[0].(error)

View file

@ -46,7 +46,7 @@ type RawChangesPayload struct {
} }
type ChangeIterateFunc = func(change *Change) bool type ChangeIterateFunc = func(change *Change) bool
type ChangeConvertFunc = func(decrypted []byte) (any, error) type ChangeConvertFunc = func(change *Change, decrypted []byte) (any, error)
type ReadableObjectTree interface { type ReadableObjectTree interface {
RWLocker RWLocker
@ -274,6 +274,7 @@ func (ot *objectTree) prepareBuilderContent(content SignableChangeContent) (cnt
PrivKey: content.Key, PrivKey: content.Key,
ReadKey: readKey, ReadKey: readKey,
Content: content.Data, Content: content.Data,
DataType: content.DataType,
Timestamp: timestamp, Timestamp: timestamp,
} }
return return
@ -537,7 +538,7 @@ func (ot *objectTree) IterateFrom(id string, convert ChangeConvertFunc, iterate
return false return false
} }
model, err = convert(decrypted) model, err = convert(c, decrypted)
if err != nil { if err != nil {
return false return false
} }

View file

@ -107,7 +107,7 @@ func TestObjectTree(t *testing.T) {
oTree, err := BuildObjectTree(store, aclList) oTree, err := BuildObjectTree(store, aclList)
require.NoError(t, err) require.NoError(t, err)
t.Run("0 timestamp is changed to current", func(t *testing.T) { t.Run("0 timestamp is changed to current, data type is correct", func(t *testing.T) {
start := time.Now() start := time.Now()
res, err := oTree.AddContent(ctx, SignableChangeContent{ res, err := oTree.AddContent(ctx, SignableChangeContent{
Data: []byte("some"), Data: []byte("some"),
@ -115,6 +115,7 @@ func TestObjectTree(t *testing.T) {
IsSnapshot: false, IsSnapshot: false,
IsEncrypted: true, IsEncrypted: true,
Timestamp: 0, Timestamp: 0,
DataType: mockDataType,
}) })
end := time.Now() end := time.Now()
require.NoError(t, err) require.NoError(t, err)
@ -125,6 +126,7 @@ func TestObjectTree(t *testing.T) {
require.GreaterOrEqual(t, start.Unix(), ch.Timestamp) require.GreaterOrEqual(t, start.Unix(), ch.Timestamp)
require.LessOrEqual(t, end.Unix(), ch.Timestamp) require.LessOrEqual(t, end.Unix(), ch.Timestamp)
require.Equal(t, res.Added[0].Id, oTree.(*objectTree).tree.lastIteratedHeadId) require.Equal(t, res.Added[0].Id, oTree.(*objectTree).tree.lastIteratedHeadId)
require.Equal(t, mockDataType, ch.DataType)
}) })
t.Run("timestamp is set correctly", func(t *testing.T) { t.Run("timestamp is set correctly", func(t *testing.T) {
someTs := time.Now().Add(time.Hour).Unix() someTs := time.Now().Add(time.Hour).Unix()
@ -174,6 +176,9 @@ func TestObjectTree(t *testing.T) {
// check tree iterate // check tree iterate
var iterChangesId []string var iterChangesId []string
err = objTree.IterateRoot(nil, func(change *Change) bool { err = objTree.IterateRoot(nil, func(change *Change) bool {
if change.Id != objTree.Id() {
assert.Equal(t, mockDataType, change.DataType)
}
iterChangesId = append(iterChangesId, change.Id) iterChangesId = append(iterChangesId, change.Id)
return true return true
}) })
@ -575,6 +580,12 @@ func TestObjectTree(t *testing.T) {
_, ok := changeIds[id] _, ok := changeIds[id]
assert.Equal(t, false, ok) assert.Equal(t, false, ok)
} }
for _, rawCh := range changes {
ch, err := ctx.objTree.(*objectTree).changeBuilder.Unmarshall(rawCh, false)
require.NoError(t, err)
require.Equal(t, mockDataType, ch.DataType)
}
}) })
t.Run("changes from tree after 5", func(t *testing.T) { t.Run("changes from tree after 5", func(t *testing.T) {

View file

@ -16,4 +16,6 @@ type SignableChangeContent struct {
IsEncrypted bool IsEncrypted bool
// Timestamp is a timestamp of change, if it is <= 0, then we use current timestamp // Timestamp is a timestamp of change, if it is <= 0, then we use current timestamp
Timestamp int64 Timestamp int64
// DataType contains additional info about the data in the payload
DataType string
} }

View file

@ -11,7 +11,10 @@ import (
type mockPubKey struct { type mockPubKey struct {
} }
const mockKeyValue = "mockKey" const (
mockKeyValue = "mockKey"
mockDataType = "mockDataType"
)
func (m mockPubKey) Equals(key crypto.Key) bool { func (m mockPubKey) Equals(key crypto.Key) bool {
return true return true
@ -99,6 +102,7 @@ func (c *MockChangeCreator) CreateRawWithData(id, aclId, snapshotId string, isSn
SnapshotBaseId: snapshotId, SnapshotBaseId: snapshotId,
ChangesData: data, ChangesData: data,
IsSnapshot: isSnapshot, IsSnapshot: isSnapshot,
DataType: mockDataType,
} }
res, _ := aclChange.Marshal() res, _ := aclChange.Marshal()

View file

@ -262,7 +262,7 @@ func (mr *MockSyncTreeMockRecorder) Id() *gomock.Call {
} }
// IterateFrom mocks base method. // IterateFrom mocks base method.
func (m *MockSyncTree) IterateFrom(arg0 string, arg1 func([]byte) (interface{}, error), arg2 func(*objecttree.Change) bool) error { func (m *MockSyncTree) IterateFrom(arg0 string, arg1 func(*objecttree.Change, []byte) (interface{}, error), arg2 func(*objecttree.Change) bool) error {
m.ctrl.T.Helper() m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "IterateFrom", arg0, arg1, arg2) ret := m.ctrl.Call(m, "IterateFrom", arg0, arg1, arg2)
ret0, _ := ret[0].(error) ret0, _ := ret[0].(error)
@ -276,7 +276,7 @@ func (mr *MockSyncTreeMockRecorder) IterateFrom(arg0, arg1, arg2 interface{}) *g
} }
// IterateRoot mocks base method. // IterateRoot mocks base method.
func (m *MockSyncTree) IterateRoot(arg0 func([]byte) (interface{}, error), arg1 func(*objecttree.Change) bool) error { func (m *MockSyncTree) IterateRoot(arg0 func(*objecttree.Change, []byte) (interface{}, error), arg1 func(*objecttree.Change) bool) error {
m.ctrl.T.Helper() m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "IterateRoot", arg0, arg1) ret := m.ctrl.Call(m, "IterateRoot", arg0, arg1)
ret0, _ := ret[0].(error) ret0, _ := ret[0].(error)

View file

@ -38,6 +38,8 @@ message TreeChange {
bytes identity = 7; bytes identity = 7;
// IsSnapshot indicates whether this change contains a snapshot of state // IsSnapshot indicates whether this change contains a snapshot of state
bool isSnapshot = 8; bool isSnapshot = 8;
// DataType indicates some special parameters of data for the client
string dataType = 9;
} }
// RawTreeChange is a marshalled TreeChange (or RootChange) payload and a signature of this payload // RawTreeChange is a marshalled TreeChange (or RootChange) payload and a signature of this payload

View file

@ -171,6 +171,8 @@ type TreeChange struct {
Identity []byte `protobuf:"bytes,7,opt,name=identity,proto3" json:"identity,omitempty"` Identity []byte `protobuf:"bytes,7,opt,name=identity,proto3" json:"identity,omitempty"`
// IsSnapshot indicates whether this change contains a snapshot of state // IsSnapshot indicates whether this change contains a snapshot of state
IsSnapshot bool `protobuf:"varint,8,opt,name=isSnapshot,proto3" json:"isSnapshot,omitempty"` IsSnapshot bool `protobuf:"varint,8,opt,name=isSnapshot,proto3" json:"isSnapshot,omitempty"`
// DataType indicates some special parameters of data for the client
DataType string `protobuf:"bytes,9,opt,name=dataType,proto3" json:"dataType,omitempty"`
} }
func (m *TreeChange) Reset() { *m = TreeChange{} } func (m *TreeChange) Reset() { *m = TreeChange{} }
@ -262,6 +264,13 @@ func (m *TreeChange) GetIsSnapshot() bool {
return false return false
} }
func (m *TreeChange) GetDataType() string {
if m != nil {
return m.DataType
}
return ""
}
// RawTreeChange is a marshalled TreeChange (or RootChange) payload and a signature of this payload // RawTreeChange is a marshalled TreeChange (or RootChange) payload and a signature of this payload
type RawTreeChange struct { type RawTreeChange struct {
// Payload is a byte payload containing TreeChange // Payload is a byte payload containing TreeChange
@ -846,54 +855,55 @@ func init() {
} }
var fileDescriptor_5033f0301ef9b772 = []byte{ var fileDescriptor_5033f0301ef9b772 = []byte{
// 741 bytes of a gzipped FileDescriptorProto // 753 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xc4, 0x55, 0x4f, 0x4f, 0xfb, 0x46, 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xc4, 0x55, 0xcd, 0x4e, 0xeb, 0x46,
0x10, 0xf5, 0x3a, 0x81, 0x90, 0x49, 0x08, 0xe9, 0xc2, 0xc1, 0x42, 0xad, 0x6b, 0x59, 0x55, 0x1b, 0x14, 0xf6, 0x38, 0x81, 0x90, 0x93, 0x10, 0xd2, 0x81, 0x85, 0x85, 0x5a, 0xd7, 0xb2, 0xaa, 0x36,
0xf5, 0x00, 0x12, 0x3d, 0xb5, 0xaa, 0x84, 0x4a, 0x0a, 0x24, 0x42, 0x6d, 0xd1, 0xf2, 0xa7, 0x12, 0xea, 0x02, 0x24, 0xba, 0x6a, 0x55, 0x09, 0x95, 0x14, 0x48, 0x84, 0xda, 0xa2, 0xe1, 0xa7, 0x12,
0xb7, 0xc5, 0x9e, 0x10, 0x57, 0x89, 0xed, 0x7a, 0x37, 0xa5, 0xf9, 0x00, 0xbd, 0xb4, 0x12, 0xe2, 0xbb, 0xc1, 0x3e, 0x21, 0xae, 0x12, 0xdb, 0xf5, 0x4c, 0x4a, 0xf3, 0x00, 0xdd, 0xb4, 0x52, 0xc5,
0x2b, 0xf5, 0xf6, 0x3b, 0x72, 0xe4, 0xf8, 0x13, 0x7c, 0x91, 0x9f, 0xbc, 0x1b, 0x27, 0xb6, 0x93, 0x73, 0xdc, 0xb7, 0xb8, 0xbb, 0xbb, 0x64, 0xc9, 0xf2, 0x0a, 0x5e, 0xe4, 0xca, 0x33, 0x71, 0x62,
0x03, 0x37, 0x2e, 0x89, 0xe7, 0x79, 0xe6, 0xed, 0x9b, 0x37, 0xde, 0x5d, 0x38, 0xf4, 0xa2, 0xf1, 0x3b, 0x59, 0xb0, 0x63, 0x93, 0xf8, 0x7c, 0x73, 0x7e, 0xbe, 0xf3, 0x9d, 0xf9, 0x81, 0x43, 0x2f,
0x38, 0x0a, 0x45, 0xcc, 0x3d, 0xdc, 0x8f, 0x6e, 0xff, 0x40, 0x4f, 0xee, 0xcb, 0x04, 0x51, 0xfd, 0x1a, 0x8f, 0xa3, 0x50, 0xc4, 0xdc, 0xc3, 0xfd, 0xe8, 0xf6, 0x0f, 0xf4, 0xe4, 0xbe, 0x4c, 0x10,
0x78, 0x43, 0x1e, 0xde, 0x61, 0x9c, 0x44, 0x32, 0xda, 0x57, 0xbf, 0x22, 0x07, 0xef, 0x29, 0x84, 0xd5, 0x8f, 0x37, 0xe4, 0xe1, 0x1d, 0xc6, 0x49, 0x24, 0xa3, 0x7d, 0xf5, 0x2b, 0x72, 0xf0, 0x9e,
0xc2, 0x02, 0x71, 0x9f, 0x09, 0x00, 0x8b, 0x22, 0xd9, 0x55, 0x21, 0xfd, 0x1c, 0xea, 0xdc, 0x1b, 0x42, 0x28, 0x2c, 0x10, 0xf7, 0x89, 0x00, 0xb0, 0x28, 0x92, 0x5d, 0x65, 0xd2, 0xcf, 0xa1, 0xce,
0xf5, 0x90, 0xfb, 0x7d, 0xdf, 0x22, 0x0e, 0xe9, 0xd4, 0xd9, 0x02, 0xa0, 0x16, 0xd4, 0xd4, 0xaa, 0xbd, 0x51, 0x0f, 0xb9, 0xdf, 0xf7, 0x2d, 0xe2, 0x90, 0x4e, 0x9d, 0x2d, 0x00, 0x6a, 0x41, 0x4d,
0x7d, 0xdf, 0x32, 0xd5, 0xbb, 0x2c, 0xa4, 0x36, 0x80, 0x26, 0xbc, 0x9c, 0xc6, 0x68, 0x55, 0xd4, 0x55, 0xed, 0xfb, 0x96, 0xa9, 0xd6, 0x32, 0x93, 0xda, 0x00, 0x3a, 0xe1, 0xe5, 0x34, 0x46, 0xab,
0xcb, 0x1c, 0x92, 0xf2, 0xca, 0x60, 0x8c, 0x42, 0xf2, 0x71, 0x6c, 0x55, 0x1d, 0xd2, 0xa9, 0xb0, 0xa2, 0x16, 0x73, 0x48, 0x9a, 0x57, 0x06, 0x63, 0x14, 0x92, 0x8f, 0x63, 0xab, 0xea, 0x90, 0x4e,
0x05, 0x40, 0x29, 0x54, 0x05, 0xa2, 0x6f, 0xad, 0x39, 0xa4, 0xd3, 0x64, 0xea, 0x99, 0xee, 0xc2, 0x85, 0x2d, 0x00, 0x4a, 0xa1, 0x2a, 0x10, 0x7d, 0x6b, 0xcd, 0x21, 0x9d, 0x26, 0x53, 0xdf, 0x74,
0x46, 0xe0, 0x63, 0x28, 0x03, 0x39, 0xb5, 0xd6, 0x15, 0x3e, 0x8f, 0xe9, 0x57, 0xb0, 0xa9, 0xb9, 0x17, 0x36, 0x02, 0x1f, 0x43, 0x19, 0xc8, 0xa9, 0xb5, 0xae, 0xf0, 0xb9, 0x4d, 0xbf, 0x82, 0x4d,
0xcf, 0xf9, 0x74, 0x14, 0x71, 0xdf, 0xaa, 0xa9, 0x84, 0x22, 0xe8, 0x3e, 0x98, 0x00, 0x97, 0x09, 0x9d, 0xfb, 0x9c, 0x4f, 0x47, 0x11, 0xf7, 0xad, 0x9a, 0x72, 0x28, 0x82, 0xee, 0x3b, 0x13, 0xe0,
0xe2, 0xac, 0x35, 0x07, 0x1a, 0x69, 0xdf, 0xba, 0x15, 0x61, 0x11, 0xa7, 0xd2, 0xa9, 0xb3, 0x3c, 0x32, 0x41, 0x9c, 0xb5, 0xe6, 0x40, 0x23, 0xed, 0x5b, 0xb7, 0x22, 0x2c, 0xe2, 0x54, 0x3a, 0x75,
0x54, 0x6c, 0xde, 0x2c, 0x37, 0xff, 0x35, 0xb4, 0x44, 0xc8, 0x63, 0x31, 0x8c, 0xe4, 0x11, 0x17, 0x96, 0x87, 0x8a, 0xcd, 0x9b, 0xe5, 0xe6, 0xbf, 0x86, 0x96, 0x08, 0x79, 0x2c, 0x86, 0x91, 0x3c,
0xa9, 0x07, 0xba, 0xcd, 0x12, 0x9a, 0xae, 0xa3, 0x75, 0x88, 0x9f, 0xb9, 0xe4, 0xaa, 0xd9, 0x26, 0xe2, 0x22, 0xd5, 0x40, 0xb7, 0x59, 0x42, 0xd3, 0x3a, 0x9a, 0x87, 0xf8, 0x99, 0x4b, 0xae, 0x9a,
0xcb, 0x43, 0xe9, 0x3a, 0x09, 0x72, 0xff, 0x0c, 0xa7, 0x7d, 0xdd, 0x73, 0x9d, 0x2d, 0x80, 0xa2, 0x6d, 0xb2, 0x3c, 0x94, 0xd6, 0x49, 0x90, 0xfb, 0x67, 0x38, 0xed, 0xeb, 0x9e, 0xeb, 0x6c, 0x01,
0x55, 0xeb, 0x65, 0xab, 0xf2, 0xb6, 0xd4, 0x4a, 0xb6, 0xd8, 0x00, 0x81, 0xb8, 0x98, 0xa9, 0xb1, 0x14, 0xa5, 0x5a, 0x2f, 0x4b, 0x95, 0x97, 0xa5, 0x56, 0x92, 0xc5, 0x06, 0x08, 0xc4, 0xc5, 0x8c,
0x36, 0x1c, 0xd2, 0xd9, 0x60, 0x39, 0xc4, 0x3d, 0x85, 0x4d, 0xc6, 0xef, 0x73, 0x96, 0x58, 0x50, 0x8d, 0xb5, 0xe1, 0x90, 0xce, 0x06, 0xcb, 0x21, 0x69, 0xac, 0xcf, 0x25, 0x57, 0x23, 0xaa, 0xab,
0x8b, 0x67, 0x0e, 0x12, 0xc5, 0x95, 0x85, 0xa9, 0x08, 0x11, 0xdc, 0x85, 0x5c, 0x4e, 0x12, 0x54, 0xb2, 0x73, 0xdb, 0x3d, 0x85, 0x4d, 0xc6, 0xef, 0x73, 0x72, 0x59, 0x50, 0x8b, 0x67, 0xea, 0x12,
0x56, 0x34, 0xd9, 0x02, 0x70, 0xbb, 0xb0, 0x5d, 0x20, 0xfa, 0x3d, 0x90, 0x43, 0xad, 0x3c, 0xe1, 0x55, 0x27, 0x33, 0x53, 0x82, 0x22, 0xb8, 0x0b, 0xb9, 0x9c, 0x24, 0xa8, 0x64, 0x6a, 0xb2, 0x05,
0xf7, 0x1a, 0x9a, 0x11, 0x2e, 0x00, 0xda, 0x02, 0x33, 0xc8, 0x6c, 0x35, 0x03, 0xdf, 0x7d, 0x20, 0xe0, 0x76, 0x61, 0xbb, 0x90, 0xe8, 0xf7, 0x40, 0x0e, 0x75, 0x57, 0x09, 0xbf, 0xd7, 0xd0, 0x2c,
0xb0, 0x95, 0x52, 0x5c, 0x4c, 0x43, 0xef, 0x17, 0x14, 0x82, 0xdf, 0x21, 0xfd, 0x01, 0x6a, 0x5e, 0xe1, 0x02, 0xa0, 0x2d, 0x30, 0x83, 0x4c, 0x72, 0x33, 0xf0, 0xdd, 0xff, 0x09, 0x6c, 0xa5, 0x29,
0x14, 0x4a, 0x0c, 0xa5, 0xaa, 0x6f, 0x1c, 0x38, 0x7b, 0xb9, 0xaf, 0x37, 0xcb, 0xee, 0xea, 0x94, 0x2e, 0xa6, 0xa1, 0xf7, 0x0b, 0x0a, 0xc1, 0xef, 0x90, 0xfe, 0x00, 0x35, 0x2f, 0x0a, 0x25, 0x86,
0x6b, 0x3e, 0x9a, 0x20, 0xcb, 0x0a, 0xe8, 0x21, 0x40, 0x32, 0xff, 0x90, 0xd5, 0x3a, 0x8d, 0x83, 0x52, 0xc5, 0x37, 0x0e, 0x9c, 0xbd, 0xdc, 0xce, 0xce, 0xbc, 0xbb, 0xda, 0xe5, 0x9a, 0x8f, 0x26,
0x2f, 0xf3, 0xe5, 0x2b, 0x24, 0xb3, 0x5c, 0x89, 0xfb, 0xbf, 0x09, 0x3b, 0xab, 0x96, 0xa0, 0x3f, 0xc8, 0xb2, 0x00, 0x7a, 0x08, 0x90, 0xcc, 0x37, 0xb9, 0xaa, 0xd3, 0x38, 0xf8, 0x32, 0x1f, 0xbe,
0x02, 0x0c, 0x91, 0xfb, 0x57, 0xb1, 0xcf, 0x25, 0xce, 0x84, 0xed, 0x96, 0x85, 0xf5, 0xe6, 0x19, 0x82, 0x32, 0xcb, 0x85, 0xb8, 0xef, 0x4d, 0xd8, 0x59, 0x55, 0x82, 0xfe, 0x08, 0x30, 0x44, 0xee,
0x3d, 0x83, 0xe5, 0xf2, 0xe9, 0x19, 0x6c, 0x0d, 0x26, 0xa3, 0x51, 0xca, 0xca, 0xf0, 0xcf, 0x09, 0x5f, 0xc5, 0x3e, 0x97, 0x38, 0x23, 0xb6, 0x5b, 0x26, 0xd6, 0x9b, 0x7b, 0xf4, 0x0c, 0x96, 0xf3,
0x0a, 0xb9, 0x4a, 0x5c, 0x4a, 0x71, 0x52, 0x4c, 0xeb, 0x19, 0xac, 0x5c, 0x49, 0x7f, 0x85, 0xf6, 0xa7, 0x67, 0xb0, 0x35, 0x98, 0x8c, 0x46, 0x69, 0x56, 0x86, 0x7f, 0x4e, 0x50, 0xc8, 0x55, 0xe4,
0x02, 0x12, 0x71, 0x14, 0x0a, 0xbd, 0xdb, 0x56, 0x38, 0x75, 0x52, 0xca, 0xeb, 0x19, 0x6c, 0xa9, 0xd2, 0x14, 0x27, 0x45, 0xb7, 0x9e, 0xc1, 0xca, 0x91, 0xf4, 0x57, 0x68, 0x2f, 0x20, 0x11, 0x47,
0x96, 0x1e, 0xc3, 0x26, 0x26, 0x49, 0x94, 0xcc, 0xc9, 0xaa, 0x8a, 0xec, 0x8b, 0x32, 0xd9, 0x71, 0xa1, 0xd0, 0x27, 0x71, 0x85, 0x52, 0x27, 0x25, 0xbf, 0x9e, 0xc1, 0x96, 0x62, 0xe9, 0x31, 0x6c,
0x3e, 0xa9, 0x67, 0xb0, 0x62, 0xd5, 0x51, 0x0d, 0xd6, 0xfe, 0x4a, 0xad, 0x72, 0xff, 0x21, 0xd0, 0x62, 0x92, 0x44, 0xc9, 0x3c, 0x59, 0x55, 0x25, 0xfb, 0xa2, 0x9c, 0xec, 0x38, 0xef, 0xd4, 0x33,
0x2a, 0xba, 0x41, 0x77, 0x60, 0x2d, 0x75, 0x23, 0xdb, 0x71, 0x3a, 0xa0, 0xdf, 0x43, 0x6d, 0xb6, 0x58, 0x31, 0xea, 0xa8, 0x06, 0x6b, 0x7f, 0xa5, 0x52, 0xb9, 0xff, 0x10, 0x68, 0x15, 0xd5, 0xa0,
0x25, 0x2c, 0xd3, 0xa9, 0xbc, 0x65, 0x54, 0x59, 0x3e, 0x75, 0xa1, 0x99, 0x6d, 0xb9, 0x73, 0x2e, 0x3b, 0xb0, 0x96, 0xaa, 0x91, 0x9d, 0x46, 0x6d, 0xd0, 0xef, 0xa1, 0x36, 0x3b, 0x2e, 0x96, 0xe9,
0x87, 0x56, 0x45, 0xf1, 0x16, 0x30, 0xf7, 0x5f, 0x02, 0xdb, 0x2b, 0x2c, 0x7d, 0x1f, 0x31, 0xff, 0x54, 0x5e, 0x33, 0xaa, 0xcc, 0x9f, 0xba, 0xd0, 0xcc, 0x8e, 0xe3, 0x39, 0x97, 0x43, 0xab, 0xa2,
0x11, 0xfd, 0x61, 0x95, 0x27, 0xf2, 0x3e, 0x6a, 0xba, 0xf0, 0xd9, 0xd2, 0x44, 0x53, 0x25, 0x6a, 0xf2, 0x16, 0x30, 0xf7, 0x5f, 0x02, 0xdb, 0x2b, 0x24, 0x7d, 0x1b, 0x32, 0xff, 0x11, 0xbd, 0xb1,
0xa2, 0xb3, 0x33, 0x5f, 0x07, 0xe9, 0xf9, 0x80, 0x49, 0xd2, 0x8d, 0x7c, 0xbd, 0x9f, 0xaa, 0x2c, 0xca, 0x13, 0x79, 0x1b, 0x36, 0x5d, 0xf8, 0x6c, 0x69, 0xa2, 0x29, 0x13, 0x35, 0xd1, 0xd9, 0x7b,
0x0b, 0xdd, 0x6b, 0x3d, 0x66, 0xad, 0xa2, 0x1f, 0x0e, 0xa2, 0xd2, 0x0d, 0x40, 0x96, 0x6e, 0x80, 0xa0, 0x8d, 0xf4, 0x7e, 0xc0, 0x24, 0xe9, 0x46, 0xbe, 0x3e, 0x4f, 0x55, 0x96, 0x99, 0xee, 0xb5,
0xa5, 0x33, 0xdb, 0x5c, 0x71, 0x66, 0x7f, 0x7b, 0x03, 0xa0, 0x84, 0xa5, 0x8b, 0x08, 0xda, 0x02, 0x1e, 0xb3, 0x66, 0xd1, 0x0f, 0x07, 0x51, 0xe9, 0x75, 0x20, 0x4b, 0xaf, 0xc3, 0xd2, 0x7d, 0x6e,
0xb8, 0x0a, 0xf1, 0xef, 0x18, 0x3d, 0x89, 0x7e, 0xdb, 0xa0, 0x6d, 0x68, 0x9e, 0xa2, 0x9c, 0xab, 0xae, 0xb8, 0xcf, 0xbf, 0xbd, 0x01, 0x50, 0xc4, 0xd2, 0x22, 0x82, 0xb6, 0x00, 0xae, 0x42, 0xfc,
0x6f, 0x13, 0x6a, 0xc1, 0x4e, 0x69, 0xc4, 0xfa, 0x8d, 0x49, 0xdb, 0xd0, 0x50, 0x8f, 0xbf, 0x0d, 0x3b, 0x46, 0x4f, 0xa2, 0xdf, 0x36, 0x68, 0x1b, 0x9a, 0xa7, 0x28, 0xe7, 0xec, 0xdb, 0x84, 0x5a,
0x06, 0x02, 0x65, 0xfb, 0xb1, 0x72, 0xf4, 0xd3, 0x87, 0x17, 0x9b, 0x3c, 0xbd, 0xd8, 0xe4, 0xe3, 0xb0, 0x53, 0x1a, 0xb1, 0x5e, 0x31, 0x69, 0x1b, 0x1a, 0xea, 0xf3, 0xb7, 0xc1, 0x40, 0xa0, 0x6c,
0x8b, 0x4d, 0x1e, 0x5f, 0x6d, 0xe3, 0xe9, 0xd5, 0x36, 0x9e, 0x5f, 0x6d, 0xe3, 0xe6, 0x9b, 0x37, 0x3f, 0x54, 0x8e, 0x7e, 0xfa, 0xf0, 0x6c, 0x93, 0xc7, 0x67, 0x9b, 0x7c, 0x7c, 0xb6, 0xc9, 0xc3,
0xde, 0xa8, 0xb7, 0xeb, 0xea, 0xef, 0xbb, 0x4f, 0x01, 0x00, 0x00, 0xff, 0xff, 0x1b, 0x35, 0x02, 0x8b, 0x6d, 0x3c, 0xbe, 0xd8, 0xc6, 0xd3, 0x8b, 0x6d, 0xdc, 0x7c, 0xf3, 0xca, 0xd7, 0xf6, 0x76,
0x10, 0x83, 0x07, 0x00, 0x00, 0x5d, 0xfd, 0x7d, 0xf7, 0x29, 0x00, 0x00, 0xff, 0xff, 0xb0, 0xa9, 0x1c, 0xd8, 0x9f, 0x07, 0x00,
0x00,
} }
func (m *RootChange) Marshal() (dAtA []byte, err error) { func (m *RootChange) Marshal() (dAtA []byte, err error) {
@ -986,6 +996,13 @@ func (m *TreeChange) MarshalToSizedBuffer(dAtA []byte) (int, error) {
_ = i _ = i
var l int var l int
_ = l _ = l
if len(m.DataType) > 0 {
i -= len(m.DataType)
copy(dAtA[i:], m.DataType)
i = encodeVarintTreechange(dAtA, i, uint64(len(m.DataType)))
i--
dAtA[i] = 0x4a
}
if m.IsSnapshot { if m.IsSnapshot {
i-- i--
if m.IsSnapshot { if m.IsSnapshot {
@ -1607,6 +1624,10 @@ func (m *TreeChange) Size() (n int) {
if m.IsSnapshot { if m.IsSnapshot {
n += 2 n += 2
} }
l = len(m.DataType)
if l > 0 {
n += 1 + l + sovTreechange(uint64(l))
}
return n return n
} }
@ -2372,6 +2393,38 @@ func (m *TreeChange) Unmarshal(dAtA []byte) error {
} }
} }
m.IsSnapshot = bool(v != 0) m.IsSnapshot = bool(v != 0)
case 9:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field DataType", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowTreechange
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthTreechange
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthTreechange
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.DataType = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
default: default:
iNdEx = preIndex iNdEx = preIndex
skippy, err := skipTreechange(dAtA[iNdEx:]) skippy, err := skipTreechange(dAtA[iNdEx:])

View file

@ -34,7 +34,7 @@ func (s *stateBuilder) Build(tr objecttree.ReadableObjectTree, oldState *State)
state.LastIteratedId = change.Id state.LastIteratedId = change.Id
return true return true
} }
convert := func(decrypted []byte) (res any, err error) { convert := func(ch *objecttree.Change, decrypted []byte) (res any, err error) {
deleteChange := &spacesyncproto.SettingsData{} deleteChange := &spacesyncproto.SettingsData{}
err = proto.Unmarshal(decrypted, deleteChange) err = proto.Unmarshal(decrypted, deleteChange)
if err != nil { if err != nil {

View file

@ -119,6 +119,11 @@ func (s *syncStatusService) Init(a *app.App) (err error) {
s.spaceId = sharedState.SpaceId s.spaceId = sharedState.SpaceId
s.configuration = a.MustComponent(nodeconf.CName).(nodeconf.NodeConf) s.configuration = a.MustComponent(nodeconf.CName).(nodeconf.NodeConf)
s.storage = a.MustComponent(spacestorage.CName).(spacestorage.SpaceStorage) s.storage = a.MustComponent(spacestorage.CName).(spacestorage.SpaceStorage)
s.periodicSync = periodicsync.NewPeriodicSync(
s.updateIntervalSecs,
s.updateTimeout,
s.update,
log)
return return
} }
@ -134,11 +139,6 @@ func (s *syncStatusService) SetUpdateReceiver(updater UpdateReceiver) {
} }
func (s *syncStatusService) Run(ctx context.Context) error { func (s *syncStatusService) Run(ctx context.Context) error {
s.periodicSync = periodicsync.NewPeriodicSync(
s.updateIntervalSecs,
s.updateTimeout,
s.update,
log)
s.periodicSync.Run() s.periodicSync.Run()
return nil return nil
} }

View file

@ -5,6 +5,7 @@ import (
"context" "context"
"github.com/anyproto/any-sync/app/logger" "github.com/anyproto/any-sync/app/logger"
"go.uber.org/zap" "go.uber.org/zap"
"sync/atomic"
"time" "time"
) )
@ -39,9 +40,11 @@ type periodicCall struct {
loopDone chan struct{} loopDone chan struct{}
periodSeconds int periodSeconds int
timeout time.Duration timeout time.Duration
isRunning atomic.Bool
} }
func (p *periodicCall) Run() { func (p *periodicCall) Run() {
p.isRunning.Store(true)
go p.loop(p.periodSeconds) go p.loop(p.periodSeconds)
} }
@ -75,6 +78,9 @@ func (p *periodicCall) loop(periodSeconds int) {
} }
func (p *periodicCall) Close() { func (p *periodicCall) Close() {
if !p.isRunning.Load() {
return
}
p.loopCancel() p.loopCancel()
<-p.loopDone <-p.loopDone
} }

View file

@ -45,4 +45,13 @@ func TestPeriodicSync_Run(t *testing.T) {
pSync.Close() pSync.Close()
require.Equal(t, 2, times) require.Equal(t, 2, times)
}) })
t.Run("loop close not running", func(t *testing.T) {
secs := 0
diffSyncer := func(ctx context.Context) (err error) {
return nil
}
pSync := NewPeriodicSync(secs, 0, diffSyncer, l)
pSync.Close()
})
} }