mirror of
https://github.com/anyproto/any-sync.git
synced 2025-06-08 05:57:03 +09:00
Merge branch 'main' into GO-2482-full-sync-request-max-size
# Conflicts: # commonspace/headsync/diffsyncer.go # commonspace/object/acl/syncacl/requestfactory.go # commonspace/object/acl/syncacl/syncacl.go # commonspace/object/acl/syncacl/syncaclhandler.go # commonspace/object/acl/syncacl/syncaclhandler_test.go # commonspace/object/tree/synctree/requestfactory.go # commonspace/object/tree/synctree/synctree.go # commonspace/object/tree/synctree/synctreehandler.go # commonspace/object/tree/synctree/synctreehandler_test.go # commonspace/object/tree/synctree/treeremotegetter.go # commonspace/object/tree/synctree/treesyncprotocol.go # commonspace/object/tree/synctree/treesyncprotocol_test.go # commonspace/object/tree/synctree/utils_test.go # commonspace/objectsync/objectsync.go # commonspace/objectsync/synchandler/synchhandler.go # commonspace/requestmanager/requestmanager.go # commonspace/requestmanager/requestmanager_test.go # go.mod # net/rpc/rpctest/peer.go # net/secureservice/secureservice.go
This commit is contained in:
commit
0519718e51
23 changed files with 371 additions and 141 deletions
|
@ -100,7 +100,7 @@ func (d *diffSyncer) Sync(ctx context.Context) error {
|
|||
}
|
||||
d.log.DebugCtx(ctx, "start diffsync", zap.Strings("peerIds", peerIds))
|
||||
for _, p := range peers {
|
||||
if err = d.syncWithPeer(p.Context(), p); err != nil {
|
||||
if err = d.syncWithPeer(peer.CtxWithPeerAddr(ctx, p.Id()), p); err != nil {
|
||||
d.log.ErrorCtx(ctx, "can't sync with peer", zap.String("peer", p.Id()), zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
@ -152,13 +152,13 @@ func (d *diffSyncer) syncWithPeer(ctx context.Context, p peer.Peer) (err error)
|
|||
|
||||
// if we removed acl head from the list
|
||||
if len(existingIds) < prevExistingLen {
|
||||
if syncErr := d.syncAcl.SyncWithPeer(ctx, p.Id()); syncErr != nil {
|
||||
if syncErr := d.syncAcl.SyncWithPeer(ctx, p); syncErr != nil {
|
||||
log.Warn("failed to send acl sync message to peer", zap.String("aclId", syncAclId))
|
||||
}
|
||||
}
|
||||
|
||||
// treeSyncer should not get acl id, that's why we filter existing ids before
|
||||
err = d.treeSyncer.SyncAll(ctx, p.Id(), existingIds, missingIds)
|
||||
err = d.treeSyncer.SyncAll(ctx, p, existingIds, missingIds)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -5,7 +5,6 @@ import (
|
|||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/mock/gomock"
|
||||
|
@ -18,6 +17,7 @@ import (
|
|||
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
|
||||
"github.com/anyproto/any-sync/consensus/consensusproto"
|
||||
"github.com/anyproto/any-sync/net/peer"
|
||||
"github.com/anyproto/any-sync/net/rpc/rpctest"
|
||||
)
|
||||
|
||||
type pushSpaceRequestMatcher struct {
|
||||
|
@ -56,49 +56,6 @@ func (p pushSpaceRequestMatcher) String() string {
|
|||
return ""
|
||||
}
|
||||
|
||||
type mockPeer struct {
|
||||
}
|
||||
|
||||
func (m mockPeer) CloseChan() <-chan struct{} {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m mockPeer) SetTTL(ttl time.Duration) {
|
||||
return
|
||||
}
|
||||
|
||||
func (m mockPeer) Id() string {
|
||||
return "peerId"
|
||||
}
|
||||
|
||||
func (m mockPeer) Context() context.Context {
|
||||
return context.Background()
|
||||
}
|
||||
|
||||
func (m mockPeer) AcquireDrpcConn(ctx context.Context) (drpc.Conn, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (m mockPeer) ReleaseDrpcConn(conn drpc.Conn) {
|
||||
return
|
||||
}
|
||||
|
||||
func (m mockPeer) DoDrpc(ctx context.Context, do func(conn drpc.Conn) error) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m mockPeer) IsClosed() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (m mockPeer) TryClose(objectTTL time.Duration) (res bool, err error) {
|
||||
return false, err
|
||||
}
|
||||
|
||||
func (m mockPeer) Close() (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (fx *headSyncFixture) initDiffSyncer(t *testing.T) {
|
||||
fx.init(t)
|
||||
fx.diffSyncer = newDiffSyncer(fx.headSync).(*diffSyncer)
|
||||
|
@ -116,7 +73,7 @@ func TestDiffSyncer(t *testing.T) {
|
|||
fx := newHeadSyncFixture(t)
|
||||
fx.initDiffSyncer(t)
|
||||
defer fx.stop()
|
||||
mPeer := mockPeer{}
|
||||
mPeer := rpctest.MockPeer{}
|
||||
remDiff := NewRemoteDiff(fx.spaceState.SpaceId, fx.clientMock)
|
||||
fx.aclMock.EXPECT().Id().AnyTimes().Return("aclId")
|
||||
fx.treeSyncerMock.EXPECT().ShouldSync(gomock.Any()).Return(true)
|
||||
|
@ -131,7 +88,7 @@ func TestDiffSyncer(t *testing.T) {
|
|||
fx.deletionStateMock.EXPECT().Filter([]string{"new"}).Return([]string{"new"}).Times(1)
|
||||
fx.deletionStateMock.EXPECT().Filter([]string{"changed"}).Return([]string{"changed"}).Times(1)
|
||||
fx.deletionStateMock.EXPECT().Filter(nil).Return(nil).Times(1)
|
||||
fx.treeSyncerMock.EXPECT().SyncAll(gomock.Any(), mPeer.Id(), []string{"changed"}, []string{"new"}).Return(nil)
|
||||
fx.treeSyncerMock.EXPECT().SyncAll(gomock.Any(), mPeer, []string{"changed"}, []string{"new"}).Return(nil)
|
||||
require.NoError(t, fx.diffSyncer.Sync(ctx))
|
||||
})
|
||||
|
||||
|
@ -139,7 +96,7 @@ func TestDiffSyncer(t *testing.T) {
|
|||
fx := newHeadSyncFixture(t)
|
||||
fx.initDiffSyncer(t)
|
||||
defer fx.stop()
|
||||
mPeer := mockPeer{}
|
||||
mPeer := rpctest.MockPeer{}
|
||||
remDiff := NewRemoteDiff(fx.spaceState.SpaceId, fx.clientMock)
|
||||
fx.treeSyncerMock.EXPECT().ShouldSync(gomock.Any()).Return(true)
|
||||
fx.aclMock.EXPECT().Id().AnyTimes().Return("aclId")
|
||||
|
@ -154,8 +111,8 @@ func TestDiffSyncer(t *testing.T) {
|
|||
fx.deletionStateMock.EXPECT().Filter([]string{"new"}).Return([]string{"new"}).Times(1)
|
||||
fx.deletionStateMock.EXPECT().Filter([]string{"changed"}).Return([]string{"changed", "aclId"}).Times(1)
|
||||
fx.deletionStateMock.EXPECT().Filter(nil).Return(nil).Times(1)
|
||||
fx.treeSyncerMock.EXPECT().SyncAll(gomock.Any(), mPeer.Id(), []string{"changed"}, []string{"new"}).Return(nil)
|
||||
fx.aclMock.EXPECT().SyncWithPeer(gomock.Any(), mPeer.Id()).Return(nil)
|
||||
fx.treeSyncerMock.EXPECT().SyncAll(gomock.Any(), mPeer, []string{"changed"}, []string{"new"}).Return(nil)
|
||||
fx.aclMock.EXPECT().SyncWithPeer(gomock.Any(), mPeer).Return(nil)
|
||||
require.NoError(t, fx.diffSyncer.Sync(ctx))
|
||||
})
|
||||
|
||||
|
@ -225,7 +182,7 @@ func TestDiffSyncer(t *testing.T) {
|
|||
|
||||
fx.peerManagerMock.EXPECT().
|
||||
GetResponsiblePeers(gomock.Any()).
|
||||
Return([]peer.Peer{mockPeer{}}, nil)
|
||||
Return([]peer.Peer{rpctest.MockPeer{}}, nil)
|
||||
fx.diffContainerMock.EXPECT().
|
||||
DiffTypeCheck(gomock.Any(), gomock.Eq(remDiff)).Return(true, fx.diffMock, nil)
|
||||
fx.diffMock.EXPECT().
|
||||
|
@ -259,7 +216,7 @@ func TestDiffSyncer(t *testing.T) {
|
|||
fx.treeSyncerMock.EXPECT().ShouldSync(gomock.Any()).Return(true)
|
||||
fx.peerManagerMock.EXPECT().
|
||||
GetResponsiblePeers(gomock.Any()).
|
||||
Return([]peer.Peer{mockPeer{}}, nil)
|
||||
Return([]peer.Peer{rpctest.MockPeer{}}, nil)
|
||||
fx.diffContainerMock.EXPECT().
|
||||
DiffTypeCheck(gomock.Any(), gomock.Eq(remDiff)).Return(true, fx.diffMock, nil)
|
||||
fx.diffMock.EXPECT().
|
||||
|
@ -273,7 +230,7 @@ func TestDiffSyncer(t *testing.T) {
|
|||
fx := newHeadSyncFixture(t)
|
||||
fx.initDiffSyncer(t)
|
||||
defer fx.stop()
|
||||
mPeer := mockPeer{}
|
||||
mPeer := rpctest.MockPeer{}
|
||||
remDiff := NewRemoteDiff(fx.spaceState.SpaceId, fx.clientMock)
|
||||
fx.treeSyncerMock.EXPECT().ShouldSync(gomock.Any()).Return(true)
|
||||
fx.aclMock.EXPECT().Id().AnyTimes().Return("aclId")
|
||||
|
|
|
@ -18,6 +18,7 @@ import (
|
|||
headupdater "github.com/anyproto/any-sync/commonspace/object/acl/syncacl/headupdater"
|
||||
spacesyncproto "github.com/anyproto/any-sync/commonspace/spacesyncproto"
|
||||
consensusproto "github.com/anyproto/any-sync/consensus/consensusproto"
|
||||
peer "github.com/anyproto/any-sync/net/peer"
|
||||
crypto "github.com/anyproto/any-sync/util/crypto"
|
||||
gomock "go.uber.org/mock/gomock"
|
||||
)
|
||||
|
@ -132,17 +133,17 @@ func (mr *MockSyncAclMockRecorder) GetIndex(arg0 any) *gomock.Call {
|
|||
}
|
||||
|
||||
// HandleMessage mocks base method.
|
||||
func (m *MockSyncAcl) HandleMessage(arg0 context.Context, arg1 string, arg2 *spacesyncproto.ObjectSyncMessage) error {
|
||||
func (m *MockSyncAcl) HandleMessage(arg0 context.Context, arg1 string, arg2 uint32, arg3 *spacesyncproto.ObjectSyncMessage) error {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "HandleMessage", arg0, arg1, arg2)
|
||||
ret := m.ctrl.Call(m, "HandleMessage", arg0, arg1, arg2, arg3)
|
||||
ret0, _ := ret[0].(error)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// HandleMessage indicates an expected call of HandleMessage.
|
||||
func (mr *MockSyncAclMockRecorder) HandleMessage(arg0, arg1, arg2 any) *gomock.Call {
|
||||
func (mr *MockSyncAclMockRecorder) HandleMessage(arg0, arg1, arg2, arg3 any) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HandleMessage", reflect.TypeOf((*MockSyncAcl)(nil).HandleMessage), arg0, arg1, arg2)
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HandleMessage", reflect.TypeOf((*MockSyncAcl)(nil).HandleMessage), arg0, arg1, arg2, arg3)
|
||||
}
|
||||
|
||||
// HandleRequest mocks base method.
|
||||
|
@ -430,7 +431,7 @@ func (mr *MockSyncAclMockRecorder) SetHeadUpdater(arg0 any) *gomock.Call {
|
|||
}
|
||||
|
||||
// SyncWithPeer mocks base method.
|
||||
func (m *MockSyncAcl) SyncWithPeer(arg0 context.Context, arg1 string) error {
|
||||
func (m *MockSyncAcl) SyncWithPeer(arg0 context.Context, arg1 peer.Peer) error {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "SyncWithPeer", arg0, arg1)
|
||||
ret0, _ := ret[0].(error)
|
||||
|
@ -504,6 +505,20 @@ func (mr *MockSyncClientMockRecorder) Broadcast(arg0 any) *gomock.Call {
|
|||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Broadcast", reflect.TypeOf((*MockSyncClient)(nil).Broadcast), arg0)
|
||||
}
|
||||
|
||||
// CreateEmptyFullSyncRequest mocks base method.
|
||||
func (m *MockSyncClient) CreateEmptyFullSyncRequest(arg0 list.AclList) *consensusproto.LogSyncMessage {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "CreateEmptyFullSyncRequest", arg0)
|
||||
ret0, _ := ret[0].(*consensusproto.LogSyncMessage)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// CreateEmptyFullSyncRequest indicates an expected call of CreateEmptyFullSyncRequest.
|
||||
func (mr *MockSyncClientMockRecorder) CreateEmptyFullSyncRequest(arg0 any) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateEmptyFullSyncRequest", reflect.TypeOf((*MockSyncClient)(nil).CreateEmptyFullSyncRequest), arg0)
|
||||
}
|
||||
|
||||
// CreateFullSyncRequest mocks base method.
|
||||
func (m *MockSyncClient) CreateFullSyncRequest(arg0 list.AclList, arg1 string) (*consensusproto.LogSyncMessage, error) {
|
||||
m.ctrl.T.Helper()
|
||||
|
@ -614,6 +629,20 @@ func (m *MockRequestFactory) EXPECT() *MockRequestFactoryMockRecorder {
|
|||
return m.recorder
|
||||
}
|
||||
|
||||
// CreateEmptyFullSyncRequest mocks base method.
|
||||
func (m *MockRequestFactory) CreateEmptyFullSyncRequest(arg0 list.AclList) *consensusproto.LogSyncMessage {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "CreateEmptyFullSyncRequest", arg0)
|
||||
ret0, _ := ret[0].(*consensusproto.LogSyncMessage)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// CreateEmptyFullSyncRequest indicates an expected call of CreateEmptyFullSyncRequest.
|
||||
func (mr *MockRequestFactoryMockRecorder) CreateEmptyFullSyncRequest(arg0 any) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateEmptyFullSyncRequest", reflect.TypeOf((*MockRequestFactory)(nil).CreateEmptyFullSyncRequest), arg0)
|
||||
}
|
||||
|
||||
// CreateFullSyncRequest mocks base method.
|
||||
func (m *MockRequestFactory) CreateFullSyncRequest(arg0 list.AclList, arg1 string) (*consensusproto.LogSyncMessage, error) {
|
||||
m.ctrl.T.Helper()
|
||||
|
|
|
@ -10,6 +10,7 @@ import (
|
|||
"github.com/anyproto/any-sync/commonspace/sync"
|
||||
"github.com/anyproto/any-sync/commonspace/sync/objectsync/objectmessages"
|
||||
"github.com/anyproto/any-sync/commonspace/sync/syncdeps"
|
||||
"github.com/anyproto/any-sync/net/peer"
|
||||
|
||||
"github.com/anyproto/any-sync/accountservice"
|
||||
"github.com/anyproto/any-sync/app"
|
||||
|
@ -32,7 +33,7 @@ type SyncAcl interface {
|
|||
list.AclList
|
||||
syncdeps.ObjectSyncHandler
|
||||
SetHeadUpdater(updater headupdater.HeadUpdater)
|
||||
SyncWithPeer(ctx context.Context, peerId string) (err error)
|
||||
SyncWithPeer(ctx context.Context, p peer.Peer) (err error)
|
||||
SetAclUpdater(updater headupdater.AclUpdater)
|
||||
}
|
||||
|
||||
|
@ -130,10 +131,10 @@ func (s *syncAcl) AddRawRecords(rawRecords []*consensusproto.RawRecordWithId) (e
|
|||
return
|
||||
}
|
||||
|
||||
func (s *syncAcl) SyncWithPeer(ctx context.Context, peerId string) (err error) {
|
||||
func (s *syncAcl) SyncWithPeer(ctx context.Context, p peer.Peer) (err error) {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
req := s.syncClient.CreateFullSyncRequest(peerId, s)
|
||||
req := s.syncClient.CreateFullSyncRequest(p.Id(), s)
|
||||
return s.syncClient.QueueRequest(ctx, req)
|
||||
}
|
||||
|
||||
|
|
56
commonspace/object/acl/syncacl/syncacl_test.go
Normal file
56
commonspace/object/acl/syncacl/syncacl_test.go
Normal file
|
@ -0,0 +1,56 @@
|
|||
package syncacl
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/mock/gomock"
|
||||
|
||||
"github.com/anyproto/any-sync/commonspace/object/acl/list/mock_list"
|
||||
"github.com/anyproto/any-sync/commonspace/object/acl/syncacl/mock_syncacl"
|
||||
"github.com/anyproto/any-sync/consensus/consensusproto"
|
||||
"github.com/anyproto/any-sync/net/peer"
|
||||
"github.com/anyproto/any-sync/net/rpc/rpctest"
|
||||
"github.com/anyproto/any-sync/net/secureservice"
|
||||
)
|
||||
|
||||
var ctx = context.Background()
|
||||
|
||||
func TestSyncAcl_SyncWithPeer(t *testing.T) {
|
||||
// this component will be rewritten, so no need for fixture now
|
||||
t.Run("sync with old peer", func(t *testing.T) {
|
||||
ctrl := gomock.NewController(t)
|
||||
acl := mock_list.NewMockAclList(ctrl)
|
||||
s := &syncAcl{AclList: acl}
|
||||
defer ctrl.Finish()
|
||||
mockClient := mock_syncacl.NewMockSyncClient(ctrl)
|
||||
s.syncClient = mockClient
|
||||
ctx := peer.CtxWithProtoVersion(ctx, secureservice.ProtoVersion)
|
||||
pr := rpctest.MockPeer{Ctx: ctx}
|
||||
retMsg := &consensusproto.LogSyncMessage{}
|
||||
mockClient.EXPECT().CreateHeadUpdate(s, nil).Return(retMsg)
|
||||
acl.EXPECT().Lock()
|
||||
acl.EXPECT().Unlock()
|
||||
mockClient.EXPECT().SendUpdate("peerId", retMsg).Return(nil)
|
||||
err := s.SyncWithPeer(ctx, &pr)
|
||||
require.NoError(t, err)
|
||||
})
|
||||
t.Run("sync with new peer", func(t *testing.T) {
|
||||
ctrl := gomock.NewController(t)
|
||||
acl := mock_list.NewMockAclList(ctrl)
|
||||
s := &syncAcl{AclList: acl}
|
||||
defer ctrl.Finish()
|
||||
mockClient := mock_syncacl.NewMockSyncClient(ctrl)
|
||||
s.syncClient = mockClient
|
||||
ctx := peer.CtxWithProtoVersion(ctx, secureservice.NewSyncProtoVersion)
|
||||
pr := rpctest.MockPeer{Ctx: ctx}
|
||||
req := &consensusproto.LogSyncMessage{}
|
||||
mockClient.EXPECT().CreateEmptyFullSyncRequest(s).Return(req)
|
||||
acl.EXPECT().Lock()
|
||||
acl.EXPECT().Unlock()
|
||||
mockClient.EXPECT().QueueRequest("peerId", req).Return(nil)
|
||||
err := s.SyncWithPeer(ctx, &pr)
|
||||
require.NoError(t, err)
|
||||
})
|
||||
}
|
|
@ -20,6 +20,7 @@ import (
|
|||
treechangeproto "github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
|
||||
treestorage "github.com/anyproto/any-sync/commonspace/object/tree/treestorage"
|
||||
spacesyncproto "github.com/anyproto/any-sync/commonspace/spacesyncproto"
|
||||
peer "github.com/anyproto/any-sync/net/peer"
|
||||
gomock "go.uber.org/mock/gomock"
|
||||
)
|
||||
|
||||
|
@ -90,6 +91,21 @@ func (mr *MockSyncTreeMockRecorder) AddRawChanges(arg0, arg1 any) *gomock.Call {
|
|||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddRawChanges", reflect.TypeOf((*MockSyncTree)(nil).AddRawChanges), arg0, arg1)
|
||||
}
|
||||
|
||||
// AddRawChangesFromPeer mocks base method.
|
||||
func (m *MockSyncTree) AddRawChangesFromPeer(arg0 context.Context, arg1 string, arg2 objecttree.RawChangesPayload) (objecttree.AddResult, error) {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "AddRawChangesFromPeer", arg0, arg1, arg2)
|
||||
ret0, _ := ret[0].(objecttree.AddResult)
|
||||
ret1, _ := ret[1].(error)
|
||||
return ret0, ret1
|
||||
}
|
||||
|
||||
// AddRawChangesFromPeer indicates an expected call of AddRawChangesFromPeer.
|
||||
func (mr *MockSyncTreeMockRecorder) AddRawChangesFromPeer(arg0, arg1, arg2 any) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddRawChangesFromPeer", reflect.TypeOf((*MockSyncTree)(nil).AddRawChangesFromPeer), arg0, arg1, arg2)
|
||||
}
|
||||
|
||||
// ChangeInfo mocks base method.
|
||||
func (m *MockSyncTree) ChangeInfo() *treechangeproto.TreeChangeInfo {
|
||||
m.ctrl.T.Helper()
|
||||
|
@ -178,17 +194,17 @@ func (mr *MockSyncTreeMockRecorder) GetChange(arg0 any) *gomock.Call {
|
|||
}
|
||||
|
||||
// HandleMessage mocks base method.
|
||||
func (m *MockSyncTree) HandleMessage(arg0 context.Context, arg1 string, arg2 *spacesyncproto.ObjectSyncMessage) error {
|
||||
func (m *MockSyncTree) HandleMessage(arg0 context.Context, arg1 string, arg2 uint32, arg3 *spacesyncproto.ObjectSyncMessage) error {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "HandleMessage", arg0, arg1, arg2)
|
||||
ret := m.ctrl.Call(m, "HandleMessage", arg0, arg1, arg2, arg3)
|
||||
ret0, _ := ret[0].(error)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// HandleMessage indicates an expected call of HandleMessage.
|
||||
func (mr *MockSyncTreeMockRecorder) HandleMessage(arg0, arg1, arg2 any) *gomock.Call {
|
||||
func (mr *MockSyncTreeMockRecorder) HandleMessage(arg0, arg1, arg2, arg3 any) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HandleMessage", reflect.TypeOf((*MockSyncTree)(nil).HandleMessage), arg0, arg1, arg2)
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HandleMessage", reflect.TypeOf((*MockSyncTree)(nil).HandleMessage), arg0, arg1, arg2, arg3)
|
||||
}
|
||||
|
||||
// HandleRequest mocks base method.
|
||||
|
@ -404,7 +420,7 @@ func (mr *MockSyncTreeMockRecorder) Storage() *gomock.Call {
|
|||
}
|
||||
|
||||
// SyncWithPeer mocks base method.
|
||||
func (m *MockSyncTree) SyncWithPeer(arg0 context.Context, arg1 string) error {
|
||||
func (m *MockSyncTree) SyncWithPeer(arg0 context.Context, arg1 peer.Peer) error {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "SyncWithPeer", arg0, arg1)
|
||||
ret0, _ := ret[0].(error)
|
||||
|
@ -622,6 +638,20 @@ func (mr *MockSyncClientMockRecorder) Broadcast(arg0 any) *gomock.Call {
|
|||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Broadcast", reflect.TypeOf((*MockSyncClient)(nil).Broadcast), arg0)
|
||||
}
|
||||
|
||||
// CreateEmptyFullSyncRequest mocks base method.
|
||||
func (m *MockSyncClient) CreateEmptyFullSyncRequest(arg0 objecttree.ObjectTree) *treechangeproto.TreeSyncMessage {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "CreateEmptyFullSyncRequest", arg0)
|
||||
ret0, _ := ret[0].(*treechangeproto.TreeSyncMessage)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// CreateEmptyFullSyncRequest indicates an expected call of CreateEmptyFullSyncRequest.
|
||||
func (mr *MockSyncClientMockRecorder) CreateEmptyFullSyncRequest(arg0 any) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateEmptyFullSyncRequest", reflect.TypeOf((*MockSyncClient)(nil).CreateEmptyFullSyncRequest), arg0)
|
||||
}
|
||||
|
||||
// CreateFullSyncRequest mocks base method.
|
||||
func (m *MockSyncClient) CreateFullSyncRequest(arg0 objecttree.ObjectTree, arg1, arg2 []string) (*treechangeproto.TreeSyncMessage, error) {
|
||||
m.ctrl.T.Helper()
|
||||
|
@ -746,6 +776,20 @@ func (m *MockRequestFactory) EXPECT() *MockRequestFactoryMockRecorder {
|
|||
return m.recorder
|
||||
}
|
||||
|
||||
// CreateEmptyFullSyncRequest mocks base method.
|
||||
func (m *MockRequestFactory) CreateEmptyFullSyncRequest(arg0 objecttree.ObjectTree) *treechangeproto.TreeSyncMessage {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "CreateEmptyFullSyncRequest", arg0)
|
||||
ret0, _ := ret[0].(*treechangeproto.TreeSyncMessage)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// CreateEmptyFullSyncRequest indicates an expected call of CreateEmptyFullSyncRequest.
|
||||
func (mr *MockRequestFactoryMockRecorder) CreateEmptyFullSyncRequest(arg0 any) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateEmptyFullSyncRequest", reflect.TypeOf((*MockRequestFactory)(nil).CreateEmptyFullSyncRequest), arg0)
|
||||
}
|
||||
|
||||
// CreateFullSyncRequest mocks base method.
|
||||
func (m *MockRequestFactory) CreateFullSyncRequest(arg0 objecttree.ObjectTree, arg1, arg2 []string) (*treechangeproto.TreeSyncMessage, error) {
|
||||
m.ctrl.T.Helper()
|
||||
|
|
|
@ -4,6 +4,7 @@ package synctree
|
|||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"slices"
|
||||
"time"
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
@ -18,6 +19,7 @@ import (
|
|||
"github.com/anyproto/any-sync/commonspace/syncstatus"
|
||||
"github.com/anyproto/any-sync/net/peer"
|
||||
"github.com/anyproto/any-sync/nodeconf"
|
||||
"github.com/anyproto/any-sync/util/slice"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -33,12 +35,16 @@ type ListenerSetter interface {
|
|||
SetListener(listener updatelistener.UpdateListener)
|
||||
}
|
||||
|
||||
type SyncTree interface {
|
||||
type peerSendableObjectTree interface {
|
||||
objecttree.ObjectTree
|
||||
syncdeps.ObjectSyncHandler
|
||||
AddRawChangesFromPeer(ctx context.Context, peerId string, changesPayload objecttree.RawChangesPayload) (res objecttree.AddResult, err error)
|
||||
}
|
||||
|
||||
type SyncTree interface {
|
||||
peerSendableObjectTree
|
||||
ListenerSetter
|
||||
SyncWithPeer(ctx context.Context, peerId string) (err error)
|
||||
SyncWithPeer(ctx context.Context, p peer.Peer) (err error)
|
||||
}
|
||||
|
||||
// SyncTree sends head updates to sync service and also sends new changes to update listener
|
||||
|
@ -79,13 +85,13 @@ type BuildDeps struct {
|
|||
func BuildSyncTreeOrGetRemote(ctx context.Context, id string, deps BuildDeps) (t SyncTree, err error) {
|
||||
var (
|
||||
remoteGetter = treeRemoteGetter{treeId: id, deps: deps}
|
||||
isRemote bool
|
||||
peerId string
|
||||
)
|
||||
deps.TreeStorage, isRemote, err = remoteGetter.getTree(ctx)
|
||||
deps.TreeStorage, peerId, err = remoteGetter.getTree(ctx)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
return buildSyncTree(ctx, isRemote, deps)
|
||||
return buildSyncTree(ctx, peerId, deps)
|
||||
}
|
||||
|
||||
func PutSyncTree(ctx context.Context, payload treestorage.TreeStorageCreatePayload, deps BuildDeps) (t SyncTree, err error) {
|
||||
|
@ -93,10 +99,10 @@ func PutSyncTree(ctx context.Context, payload treestorage.TreeStorageCreatePaylo
|
|||
if err != nil {
|
||||
return
|
||||
}
|
||||
return buildSyncTree(ctx, true, deps)
|
||||
return buildSyncTree(ctx, "", deps)
|
||||
}
|
||||
|
||||
func buildSyncTree(ctx context.Context, sendUpdate bool, deps BuildDeps) (t SyncTree, err error) {
|
||||
func buildSyncTree(ctx context.Context, peerId string, deps BuildDeps) (t SyncTree, err error) {
|
||||
objTree, err := deps.BuildObjectTree(deps.TreeStorage, deps.AclList)
|
||||
if err != nil {
|
||||
return
|
||||
|
@ -117,14 +123,14 @@ func buildSyncTree(ctx context.Context, sendUpdate bool, deps BuildDeps) (t Sync
|
|||
syncTree.afterBuild()
|
||||
syncTree.Unlock()
|
||||
|
||||
// don't send updates for empty derived trees, because they won't be accepted
|
||||
if sendUpdate && !objecttree.IsEmptyDerivedTree(objTree) {
|
||||
if peerId != "" && !objecttree.IsEmptyDerivedTree(objTree) {
|
||||
headUpdate := syncTree.syncClient.CreateHeadUpdate(objTree, "", nil)
|
||||
// send to everybody, because everybody should know that the node or client got new tree
|
||||
broadcastErr := syncTree.syncClient.Broadcast(ctx, headUpdate)
|
||||
if broadcastErr != nil {
|
||||
log.Warn("failed to broadcast head update", zap.Error(broadcastErr))
|
||||
}
|
||||
deps.SyncStatus.ObjectReceive(peerId, syncTree.Id(), syncTree.Heads())
|
||||
}
|
||||
return
|
||||
}
|
||||
|
@ -169,6 +175,15 @@ func (s *syncTree) AddContent(ctx context.Context, content objecttree.SignableCh
|
|||
}
|
||||
|
||||
func (s *syncTree) AddRawChangesFromPeer(ctx context.Context, peerId string, changesPayload objecttree.RawChangesPayload) (res objecttree.AddResult, err error) {
|
||||
if s.hasHeads(s, changesPayload.NewHeads) {
|
||||
s.syncStatus.HeadsApply(peerId, s.Id(), s.Heads(), true)
|
||||
return objecttree.AddResult{
|
||||
OldHeads: changesPayload.NewHeads,
|
||||
Heads: changesPayload.NewHeads,
|
||||
Mode: objecttree.Nothing,
|
||||
}, nil
|
||||
}
|
||||
prevHeads := s.Heads()
|
||||
res, err = s.AddRawChanges(ctx, changesPayload)
|
||||
if err != nil {
|
||||
return
|
||||
|
@ -180,9 +195,24 @@ func (s *syncTree) AddRawChangesFromPeer(ctx context.Context, peerId string, cha
|
|||
log.Warn("failed to broadcast head update", zap.Error(broadcastErr))
|
||||
}
|
||||
}
|
||||
curHeads := s.Heads()
|
||||
allAdded := true
|
||||
for _, head := range changesPayload.NewHeads {
|
||||
if !slices.Contains(curHeads, head) {
|
||||
allAdded = false
|
||||
break
|
||||
}
|
||||
}
|
||||
if !slice.UnsortedEquals(prevHeads, curHeads) {
|
||||
s.syncStatus.HeadsApply(peerId, s.Id(), curHeads, allAdded)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (s *syncTree) hasHeads(ot objecttree.ObjectTree, heads []string) bool {
|
||||
return slice.UnsortedEquals(ot.Heads(), heads) || ot.HasChanges(heads...)
|
||||
}
|
||||
|
||||
func (s *syncTree) AddRawChanges(ctx context.Context, changesPayload objecttree.RawChangesPayload) (res objecttree.AddResult, err error) {
|
||||
if err = s.checkAlive(); err != nil {
|
||||
return
|
||||
|
@ -266,13 +296,13 @@ func (s *syncTree) checkAlive() (err error) {
|
|||
return
|
||||
}
|
||||
|
||||
func (s *syncTree) SyncWithPeer(ctx context.Context, peerId string) (err error) {
|
||||
func (s *syncTree) SyncWithPeer(ctx context.Context, p peer.Peer) (err error) {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
if objecttree.IsEmptyDerivedTree(s) {
|
||||
return nil
|
||||
if objecttree.IsEmptyDerivedTree(s.ObjectTree) {
|
||||
return
|
||||
}
|
||||
req := s.syncClient.CreateFullSyncRequest(peerId, s)
|
||||
req := s.syncClient.CreateFullSyncRequest(p.Id(), s)
|
||||
return s.syncClient.QueueRequest(ctx, req)
|
||||
}
|
||||
|
||||
|
|
|
@ -59,7 +59,7 @@ func Test_BuildSyncTree(t *testing.T) {
|
|||
}
|
||||
|
||||
headUpdate := &treechangeproto.TreeSyncMessage{}
|
||||
t.Run("AddRawChanges update", func(t *testing.T) {
|
||||
t.Run("AddRawChangesFromPeer update", func(t *testing.T) {
|
||||
changes := []*treechangeproto.RawTreeChangeWithId{{Id: "some"}}
|
||||
payload := objecttree.RawChangesPayload{
|
||||
NewHeads: nil,
|
||||
|
@ -69,18 +69,20 @@ func Test_BuildSyncTree(t *testing.T) {
|
|||
Added: changes,
|
||||
Mode: objecttree.Append,
|
||||
}
|
||||
objTreeMock.EXPECT().Heads().AnyTimes().Return([]string{"headId"})
|
||||
objTreeMock.EXPECT().HasChanges(gomock.Any()).AnyTimes().Return(false)
|
||||
objTreeMock.EXPECT().AddRawChanges(gomock.Any(), gomock.Eq(payload)).
|
||||
Return(expectedRes, nil)
|
||||
updateListenerMock.EXPECT().Update(tr)
|
||||
|
||||
syncClientMock.EXPECT().CreateHeadUpdate(gomock.Eq(tr), gomock.Eq(changes)).Return(headUpdate)
|
||||
syncClientMock.EXPECT().Broadcast(gomock.Eq(headUpdate))
|
||||
res, err := tr.AddRawChanges(ctx, payload)
|
||||
res, err := tr.AddRawChangesFromPeer(ctx, "peerId", payload)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, expectedRes, res)
|
||||
})
|
||||
|
||||
t.Run("AddRawChanges rebuild", func(t *testing.T) {
|
||||
t.Run("AddRawChangesFromPeer rebuild", func(t *testing.T) {
|
||||
changes := []*treechangeproto.RawTreeChangeWithId{{Id: "some"}}
|
||||
payload := objecttree.RawChangesPayload{
|
||||
NewHeads: nil,
|
||||
|
@ -91,18 +93,19 @@ func Test_BuildSyncTree(t *testing.T) {
|
|||
Added: changes,
|
||||
Mode: objecttree.Rebuild,
|
||||
}
|
||||
objTreeMock.EXPECT().Heads().AnyTimes().Return([]string{"headId"})
|
||||
objTreeMock.EXPECT().AddRawChanges(gomock.Any(), gomock.Eq(payload)).
|
||||
Return(expectedRes, nil)
|
||||
updateListenerMock.EXPECT().Rebuild(tr)
|
||||
|
||||
syncClientMock.EXPECT().CreateHeadUpdate(gomock.Eq(tr), gomock.Eq(changes)).Return(headUpdate)
|
||||
syncClientMock.EXPECT().Broadcast(gomock.Eq(headUpdate))
|
||||
res, err := tr.AddRawChanges(ctx, payload)
|
||||
res, err := tr.AddRawChangesFromPeer(ctx, "peerId", payload)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, expectedRes, res)
|
||||
})
|
||||
|
||||
t.Run("AddRawChanges nothing", func(t *testing.T) {
|
||||
t.Run("AddRawChangesFromPeer nothing", func(t *testing.T) {
|
||||
changes := []*treechangeproto.RawTreeChangeWithId{{Id: "some"}}
|
||||
payload := objecttree.RawChangesPayload{
|
||||
NewHeads: nil,
|
||||
|
@ -112,10 +115,11 @@ func Test_BuildSyncTree(t *testing.T) {
|
|||
Added: changes,
|
||||
Mode: objecttree.Nothing,
|
||||
}
|
||||
objTreeMock.EXPECT().Heads().AnyTimes().Return([]string{"headId"})
|
||||
objTreeMock.EXPECT().AddRawChanges(gomock.Any(), gomock.Eq(payload)).
|
||||
Return(expectedRes, nil)
|
||||
|
||||
res, err := tr.AddRawChanges(ctx, payload)
|
||||
res, err := tr.AddRawChangesFromPeer(ctx, "peerId", payload)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, expectedRes, res)
|
||||
})
|
||||
|
|
|
@ -56,16 +56,18 @@ func (t treeRemoteGetter) treeRequest(ctx context.Context, peerId string) (colle
|
|||
return collector, nil
|
||||
}
|
||||
|
||||
func (t treeRemoteGetter) treeRequestLoop(ctx context.Context) (collector *fullResponseCollector, err error) {
|
||||
func (t treeRemoteGetter) treeRequestLoop(ctx context.Context) (collector *fullResponseCollector, peerId string, err error) {
|
||||
availablePeers, err := t.getPeers(ctx)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
peerId = availablePeers[0]
|
||||
// in future we will try to load from different peers
|
||||
return t.treeRequest(ctx, availablePeers[0])
|
||||
collector, err = t.treeRequest(ctx, peerId)
|
||||
return collector, peerId, err
|
||||
}
|
||||
|
||||
func (t treeRemoteGetter) getTree(ctx context.Context) (treeStorage treestorage.TreeStorage, isRemote bool, err error) {
|
||||
func (t treeRemoteGetter) getTree(ctx context.Context) (treeStorage treestorage.TreeStorage, peerId string, err error) {
|
||||
treeStorage, err = t.deps.SpaceStorage.TreeStorage(t.treeId)
|
||||
if err == nil || !errors.Is(err, treestorage.ErrUnknownTreeId) {
|
||||
return
|
||||
|
@ -80,8 +82,7 @@ func (t treeRemoteGetter) getTree(ctx context.Context) (treeStorage treestorage.
|
|||
return
|
||||
}
|
||||
|
||||
isRemote = true
|
||||
collector, err := t.treeRequestLoop(ctx)
|
||||
collector, peerId, err := t.treeRequestLoop(ctx)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
|
|
@ -70,7 +70,7 @@ func TestTreeRemoteGetter(t *testing.T) {
|
|||
fx.peerGetterMock.EXPECT().GetResponsiblePeers(tCtx).Return([]peer.Peer{mockPeer}, nil)
|
||||
fx.syncClientMock.EXPECT().CreateNewTreeRequest().Return(treeRequest)
|
||||
fx.syncClientMock.EXPECT().SendRequest(tCtx, peerId, fx.treeGetter.treeId, treeRequest).Return(objectResponse, nil)
|
||||
resp, err := fx.treeGetter.treeRequestLoop(tCtx)
|
||||
resp, _, err := fx.treeGetter.treeRequestLoop(tCtx)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, "id", resp.RootChange.Id)
|
||||
})
|
||||
|
@ -84,7 +84,7 @@ func TestTreeRemoteGetter(t *testing.T) {
|
|||
mockPeer.EXPECT().Id().AnyTimes().Return(peerId)
|
||||
fx.syncClientMock.EXPECT().CreateNewTreeRequest().Return(treeRequest)
|
||||
fx.syncClientMock.EXPECT().SendRequest(tCtx, peerId, fx.treeGetter.treeId, treeRequest).AnyTimes().Return(nil, fmt.Errorf("some"))
|
||||
_, err := fx.treeGetter.treeRequestLoop(tCtx)
|
||||
_, _, err := fx.treeGetter.treeRequestLoop(tCtx)
|
||||
require.Error(t, err)
|
||||
})
|
||||
}
|
||||
|
|
|
@ -14,6 +14,7 @@ import (
|
|||
reflect "reflect"
|
||||
|
||||
app "github.com/anyproto/any-sync/app"
|
||||
peer "github.com/anyproto/any-sync/net/peer"
|
||||
gomock "go.uber.org/mock/gomock"
|
||||
)
|
||||
|
||||
|
@ -135,7 +136,7 @@ func (mr *MockTreeSyncerMockRecorder) StopSync() *gomock.Call {
|
|||
}
|
||||
|
||||
// SyncAll mocks base method.
|
||||
func (m *MockTreeSyncer) SyncAll(arg0 context.Context, arg1 string, arg2, arg3 []string) error {
|
||||
func (m *MockTreeSyncer) SyncAll(arg0 context.Context, arg1 peer.Peer, arg2, arg3 []string) error {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "SyncAll", arg0, arg1, arg2, arg3)
|
||||
ret0, _ := ret[0].(error)
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"context"
|
||||
|
||||
"github.com/anyproto/any-sync/app"
|
||||
"github.com/anyproto/any-sync/net/peer"
|
||||
)
|
||||
|
||||
const CName = "common.object.treesyncer"
|
||||
|
@ -14,5 +15,5 @@ type TreeSyncer interface {
|
|||
StartSync()
|
||||
StopSync()
|
||||
ShouldSync(peerId string) bool
|
||||
SyncAll(ctx context.Context, peerId string, existing, missing []string) error
|
||||
SyncAll(ctx context.Context, p peer.Peer, existing, missing []string) error
|
||||
}
|
||||
|
|
|
@ -16,6 +16,7 @@ import (
|
|||
"github.com/anyproto/any-sync/commonspace/object/treesyncer"
|
||||
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
|
||||
"github.com/anyproto/any-sync/commonspace/syncstatus"
|
||||
"github.com/anyproto/any-sync/net/peer"
|
||||
"github.com/anyproto/any-sync/net/rpc/rpctest"
|
||||
"github.com/anyproto/any-sync/net/rpc/server"
|
||||
"github.com/anyproto/any-sync/net/streampool"
|
||||
|
@ -62,7 +63,7 @@ func (t *treeSyncer) ShouldSync(peerId string) bool {
|
|||
return true
|
||||
}
|
||||
|
||||
func (t *treeSyncer) SyncAll(ctx context.Context, peerId string, existing, missing []string) (err error) {
|
||||
func (t *treeSyncer) SyncAll(ctx context.Context, p peer.Peer, existing, missing []string) (err error) {
|
||||
// TODO: copied from any-sync's previous version, should change later if needed to use queues
|
||||
// problem here is that all sync process is basically synchronous and has *same timeout
|
||||
syncTrees := func(ids []string) {
|
||||
|
@ -77,7 +78,7 @@ func (t *treeSyncer) SyncAll(ctx context.Context, peerId string, existing, missi
|
|||
if !ok {
|
||||
log.WarnCtx(ctx, "not a sync tree")
|
||||
}
|
||||
if err = syncTree.SyncWithPeer(ctx, peerId); err != nil {
|
||||
if err = syncTree.SyncWithPeer(ctx, p); err != nil {
|
||||
log.WarnCtx(ctx, "synctree.SyncWithPeer error", zap.Error(err))
|
||||
} else {
|
||||
log.DebugCtx(ctx, "success *synctree.SyncWithPeer")
|
||||
|
|
|
@ -341,7 +341,7 @@ func (m mockTreeSyncer) StartSync() {
|
|||
func (m mockTreeSyncer) StopSync() {
|
||||
}
|
||||
|
||||
func (m mockTreeSyncer) SyncAll(ctx context.Context, peerId string, existing, missing []string) error {
|
||||
func (m mockTreeSyncer) SyncAll(ctx context.Context, p peer.Peer, existing, missing []string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -2,6 +2,7 @@ package syncstatus
|
|||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/anyproto/any-sync/app"
|
||||
)
|
||||
|
||||
|
@ -22,6 +23,12 @@ func (n *noOpSyncStatus) Name() (name string) {
|
|||
func (n *noOpSyncStatus) HeadsChange(treeId string, heads []string) {
|
||||
}
|
||||
|
||||
func (n *noOpSyncStatus) ObjectReceive(senderId, treeId string, heads []string) {
|
||||
}
|
||||
|
||||
func (n *noOpSyncStatus) HeadsApply(senderId, treeId string, heads []string, allAdded bool) {
|
||||
}
|
||||
|
||||
func (n *noOpSyncStatus) HeadsReceive(senderId, treeId string, heads []string) {
|
||||
}
|
||||
|
||||
|
|
|
@ -11,4 +11,6 @@ type StatusUpdater interface {
|
|||
|
||||
HeadsChange(treeId string, heads []string)
|
||||
HeadsReceive(senderId, treeId string, heads []string)
|
||||
ObjectReceive(senderId, treeId string, heads []string)
|
||||
HeadsApply(senderId, treeId string, heads []string, allAdded bool)
|
||||
}
|
||||
|
|
18
go.mod
18
go.mod
|
@ -13,7 +13,7 @@ require (
|
|||
github.com/cespare/xxhash v1.1.0
|
||||
github.com/cheggaaa/mb/v3 v3.0.2
|
||||
github.com/gobwas/glob v0.2.3
|
||||
github.com/goccy/go-graphviz v0.1.2
|
||||
github.com/goccy/go-graphviz v0.1.3
|
||||
github.com/google/uuid v1.6.0
|
||||
github.com/hashicorp/yamux v0.1.1
|
||||
github.com/huandu/skiplist v1.2.0
|
||||
|
@ -21,7 +21,7 @@ require (
|
|||
github.com/ipfs/go-block-format v0.2.0
|
||||
github.com/ipfs/go-cid v0.4.1
|
||||
github.com/ipfs/go-ipld-format v0.6.0
|
||||
github.com/libp2p/go-libp2p v0.33.2
|
||||
github.com/libp2p/go-libp2p v0.35.1
|
||||
github.com/mr-tron/base58 v1.2.0
|
||||
github.com/multiformats/go-multibase v0.2.0
|
||||
github.com/multiformats/go-multihash v0.2.3
|
||||
|
@ -33,7 +33,7 @@ require (
|
|||
go.uber.org/atomic v1.11.0
|
||||
go.uber.org/mock v0.4.0
|
||||
go.uber.org/zap v1.27.0
|
||||
golang.org/x/crypto v0.23.0
|
||||
golang.org/x/crypto v0.24.0
|
||||
golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842
|
||||
golang.org/x/net v0.25.0
|
||||
golang.org/x/time v0.5.0
|
||||
|
@ -49,7 +49,7 @@ require (
|
|||
github.com/crackcomm/go-gitignore v0.0.0-20170627025303-887ab5e44cc3 // indirect
|
||||
github.com/davecgh/go-spew v1.1.1 // indirect
|
||||
github.com/davidlazar/go-crypto v0.0.0-20200604182044-b73af7476f6c // indirect
|
||||
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
|
||||
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0 // indirect
|
||||
github.com/fogleman/gg v1.3.0 // indirect
|
||||
github.com/go-logr/logr v1.4.1 // indirect
|
||||
github.com/go-logr/stdr v1.2.2 // indirect
|
||||
|
@ -58,7 +58,7 @@ require (
|
|||
github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 // indirect
|
||||
github.com/golang/protobuf v1.5.4 // indirect
|
||||
github.com/google/pprof v0.0.0-20240402174815-29b9bb013b0f // indirect
|
||||
github.com/hashicorp/golang-lru/v2 v2.0.5 // indirect
|
||||
github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect
|
||||
github.com/ipfs/bbloom v0.0.4 // indirect
|
||||
github.com/ipfs/go-bitfield v1.1.0 // indirect
|
||||
github.com/ipfs/go-datastore v0.6.0 // indirect
|
||||
|
@ -76,7 +76,7 @@ require (
|
|||
github.com/minio/sha256-simd v1.0.1 // indirect
|
||||
github.com/multiformats/go-base32 v0.1.0 // indirect
|
||||
github.com/multiformats/go-base36 v0.2.0 // indirect
|
||||
github.com/multiformats/go-multiaddr v0.12.3 // indirect
|
||||
github.com/multiformats/go-multiaddr v0.12.4 // indirect
|
||||
github.com/multiformats/go-multicodec v0.9.0 // indirect
|
||||
github.com/multiformats/go-multistream v0.5.0 // indirect
|
||||
github.com/multiformats/go-varint v0.0.7 // indirect
|
||||
|
@ -84,7 +84,7 @@ require (
|
|||
github.com/pkg/errors v0.9.1 // indirect
|
||||
github.com/pmezard/go-difflib v1.0.0 // indirect
|
||||
github.com/polydawn/refmt v0.89.0 // indirect
|
||||
github.com/prometheus/client_model v0.6.0 // indirect
|
||||
github.com/prometheus/client_model v0.6.1 // indirect
|
||||
github.com/prometheus/common v0.48.0 // indirect
|
||||
github.com/prometheus/procfs v0.12.0 // indirect
|
||||
github.com/spaolacci/murmur3 v1.1.0 // indirect
|
||||
|
@ -96,8 +96,8 @@ require (
|
|||
golang.org/x/image v0.14.0 // indirect
|
||||
golang.org/x/mod v0.17.0 // indirect
|
||||
golang.org/x/sync v0.7.0 // indirect
|
||||
golang.org/x/sys v0.20.0 // indirect
|
||||
golang.org/x/sys v0.21.0 // indirect
|
||||
golang.org/x/tools v0.21.0 // indirect
|
||||
google.golang.org/protobuf v1.33.0 // indirect
|
||||
google.golang.org/protobuf v1.34.1 // indirect
|
||||
lukechampine.com/blake3 v1.2.1 // indirect
|
||||
)
|
||||
|
|
80
go.sum
80
go.sum
|
@ -54,8 +54,8 @@ github.com/davidlazar/go-crypto v0.0.0-20200604182044-b73af7476f6c h1:pFUpOrbxDR
|
|||
github.com/davidlazar/go-crypto v0.0.0-20200604182044-b73af7476f6c/go.mod h1:6UhI8N9EjYm1c2odKpFpAYeR8dsBeM7PtzQhRgxRr9U=
|
||||
github.com/decred/dcrd/crypto/blake256 v1.0.1 h1:7PltbUIQB7u/FfZ39+DGa/ShuMyJ5ilcvdfma9wOH6Y=
|
||||
github.com/decred/dcrd/crypto/blake256 v1.0.1/go.mod h1:2OfgNZ5wDpcsFmHmCK5gZTPcCXqlm2ArzUIkw9czNJo=
|
||||
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 h1:8UrgZ3GkP4i/CLijOJx79Yu+etlyjdBU4sfcs2WYQMs=
|
||||
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0/go.mod h1:v57UDF4pDQJcEfFUCRop3lJL149eHGSe9Jvczhzjo/0=
|
||||
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0 h1:rpfIENRNNilwHwZeG5+P150SMrnNEcHYvcCuK6dPZSg=
|
||||
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0/go.mod h1:v57UDF4pDQJcEfFUCRop3lJL149eHGSe9Jvczhzjo/0=
|
||||
github.com/flynn/noise v1.1.0 h1:KjPQoQCEFdZDiP03phOvGi11+SVVhBG2wOWAorLsstg=
|
||||
github.com/flynn/noise v1.1.0/go.mod h1:xbMo+0i6+IGbYdJhF31t2eR1BIU0CYc12+BNAKwUTag=
|
||||
github.com/fogleman/gg v1.3.0 h1:/7zJX8F6AaYQc57WQCyN9cAIz+4bCJGO9B+dyW29am8=
|
||||
|
@ -75,8 +75,8 @@ github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572/go.mod h1:9Pwr4
|
|||
github.com/go-yaml/yaml v2.1.0+incompatible/go.mod h1:w2MrLa16VYP0jy6N7M5kHaCkaLENm+P+Tv+MfurjSw0=
|
||||
github.com/gobwas/glob v0.2.3 h1:A4xDbljILXROh+kObIiy5kIaPYD8e96x1tgBhUI5J+Y=
|
||||
github.com/gobwas/glob v0.2.3/go.mod h1:d3Ez4x06l9bZtSvzIay5+Yzi0fmZzPgnTbPcKjJAkT8=
|
||||
github.com/goccy/go-graphviz v0.1.2 h1:sWSJ6w13BCm/ZOUTHDVrdvbsxqN8yyzaFcHrH/hQ9Yg=
|
||||
github.com/goccy/go-graphviz v0.1.2/go.mod h1:pMYpbAqJT10V8dzV1JN/g/wUlG/0imKPzn3ZsrchGCI=
|
||||
github.com/goccy/go-graphviz v0.1.3 h1:Pkt8y4FBnBNI9tfSobpoN5qy1qMNqRXPQYvLhaSUasY=
|
||||
github.com/goccy/go-graphviz v0.1.3/go.mod h1:pMYpbAqJT10V8dzV1JN/g/wUlG/0imKPzn3ZsrchGCI=
|
||||
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
|
||||
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
|
||||
github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 h1:DACJavvAHhabrF08vX0COfcOBJRhZ8lUbR+ZWIs0Y5g=
|
||||
|
@ -95,8 +95,8 @@ github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+
|
|||
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
|
||||
github.com/gopherjs/gopherjs v0.0.0-20190430165422-3e4dfb77656c h1:7lF+Vz0LqiRidnzC1Oq86fpX1q/iEv2KJdrCtttYjT4=
|
||||
github.com/gopherjs/gopherjs v0.0.0-20190430165422-3e4dfb77656c/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
|
||||
github.com/hashicorp/golang-lru/v2 v2.0.5 h1:wW7h1TG88eUIJ2i69gaE3uNVtEPIagzhGvHgwfx2Vm4=
|
||||
github.com/hashicorp/golang-lru/v2 v2.0.5/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
|
||||
github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k=
|
||||
github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
|
||||
github.com/hashicorp/yamux v0.1.1 h1:yrQxtgseBDrq9Y652vSRDvsKCJKOUD+GzTS4Y0Y8pvE=
|
||||
github.com/hashicorp/yamux v0.1.1/go.mod h1:CtWFDAQgb7dxtzFs4tWbplKIe2jSi3+5vKbgIO0SLnQ=
|
||||
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
|
||||
|
@ -156,8 +156,8 @@ github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfV
|
|||
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
|
||||
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
|
||||
github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6CZQHDETBtE9HaSEkGmuNXF86RwHhHUvq4=
|
||||
github.com/klauspost/compress v1.17.6 h1:60eq2E/jlfwQXtvZEeBUYADs+BwKBWURIY+Gj2eRGjI=
|
||||
github.com/klauspost/compress v1.17.6/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM=
|
||||
github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU=
|
||||
github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
|
||||
github.com/klauspost/cpuid/v2 v2.0.12/go.mod h1:g2LTdtYhdyuGPqyWyv7qRAmj1WBqxuObKfj5c0PQa7c=
|
||||
github.com/klauspost/cpuid/v2 v2.2.7 h1:ZWSB3igEs+d0qvnxR/ZBzXVmxkgt8DdzP6m9pfuVLDM=
|
||||
github.com/klauspost/cpuid/v2 v2.2.7/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
|
||||
|
@ -172,8 +172,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
|
|||
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
|
||||
github.com/libp2p/go-buffer-pool v0.1.0 h1:oK4mSFcQz7cTQIfqbe4MIj9gLW+mnanjyFtc6cdF0Y8=
|
||||
github.com/libp2p/go-buffer-pool v0.1.0/go.mod h1:N+vh8gMqimBzdKkSMVuydVDq+UV5QTWy5HSiZacSbPg=
|
||||
github.com/libp2p/go-libp2p v0.33.2 h1:vCdwnFxoGOXMKmaGHlDSnL4bM3fQeW8pgIa9DECnb40=
|
||||
github.com/libp2p/go-libp2p v0.33.2/go.mod h1:zTeppLuCvUIkT118pFVzA8xzP/p2dJYOMApCkFh0Yww=
|
||||
github.com/libp2p/go-libp2p v0.35.1 h1:Hm7Ub2BF+GCb14ojcsEK6WAy5it5smPDK02iXSZLl50=
|
||||
github.com/libp2p/go-libp2p v0.35.1/go.mod h1:Dnkgba5hsfSv5dvvXC8nfqk44hH0gIKKno+HOMU0fdc=
|
||||
github.com/libp2p/go-libp2p-asn-util v0.4.1 h1:xqL7++IKD9TBFMgnLPZR6/6iYhawHKHl950SO9L6n94=
|
||||
github.com/libp2p/go-libp2p-asn-util v0.4.1/go.mod h1:d/NI6XZ9qxw67b4e+NgpQexCIiFYJjErASrYW4PFDN8=
|
||||
github.com/libp2p/go-libp2p-record v0.2.0 h1:oiNUOCWno2BFuxt3my4i1frNrt7PerzB3queqa1NkQ0=
|
||||
|
@ -204,8 +204,8 @@ github.com/multiformats/go-base32 v0.1.0 h1:pVx9xoSPqEIQG8o+UbAe7DNi51oej1NtK+aG
|
|||
github.com/multiformats/go-base32 v0.1.0/go.mod h1:Kj3tFY6zNr+ABYMqeUNeGvkIC/UYgtWibDcT0rExnbI=
|
||||
github.com/multiformats/go-base36 v0.2.0 h1:lFsAbNOGeKtuKozrtBsAkSVhv1p9D0/qedU9rQyccr0=
|
||||
github.com/multiformats/go-base36 v0.2.0/go.mod h1:qvnKE++v+2MWCfePClUEjE78Z7P2a1UV0xHgWc0hkp4=
|
||||
github.com/multiformats/go-multiaddr v0.12.3 h1:hVBXvPRcKG0w80VinQ23P5t7czWgg65BmIvQKjDydU8=
|
||||
github.com/multiformats/go-multiaddr v0.12.3/go.mod h1:sBXrNzucqkFJhvKOiwwLyqamGa/P5EIXNPLovyhQCII=
|
||||
github.com/multiformats/go-multiaddr v0.12.4 h1:rrKqpY9h+n80EwhhC/kkcunCZZ7URIF8yN1WEUt2Hvc=
|
||||
github.com/multiformats/go-multiaddr v0.12.4/go.mod h1:sBXrNzucqkFJhvKOiwwLyqamGa/P5EIXNPLovyhQCII=
|
||||
github.com/multiformats/go-multiaddr-dns v0.3.1 h1:QgQgR+LQVt3NPTjbrLLpsaT2ufAA2y0Mkk+QRVJbW3A=
|
||||
github.com/multiformats/go-multiaddr-dns v0.3.1/go.mod h1:G/245BRQ6FJGmryJCrOuTdB37AMA5AMOVuO6NY3JwTk=
|
||||
github.com/multiformats/go-multiaddr-fmt v0.1.0 h1:WLEFClPycPkp4fnIzoFoV9FVd49/eQsuaL3/CWe167E=
|
||||
|
@ -231,6 +231,38 @@ github.com/onsi/ginkgo/v2 v2.17.1/go.mod h1:llBI3WDLL9Z6taip6f33H76YcWtJv+7R3Hig
|
|||
github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
|
||||
github.com/onsi/gomega v1.30.0 h1:hvMK7xYz4D3HapigLTeGdId/NcfQx1VHMJc60ew99+8=
|
||||
github.com/onsi/gomega v1.30.0/go.mod h1:9sxs+SwGrKI0+PWe4Fxa9tFQQBG5xSsSbMXOI8PPpoQ=
|
||||
github.com/pion/datachannel v1.5.6 h1:1IxKJntfSlYkpUj8LlYRSWpYiTTC02nUrOE8T3DqGeg=
|
||||
github.com/pion/datachannel v1.5.6/go.mod h1:1eKT6Q85pRnr2mHiWHxJwO50SfZRtWHTsNIVb/NfGW4=
|
||||
github.com/pion/dtls/v2 v2.2.11 h1:9U/dpCYl1ySttROPWJgqWKEylUdT0fXp/xst6JwY5Ks=
|
||||
github.com/pion/dtls/v2 v2.2.11/go.mod h1:d9SYc9fch0CqK90mRk1dC7AkzzpwJj6u2GU3u+9pqFE=
|
||||
github.com/pion/ice/v2 v2.3.25 h1:M5rJA07dqhi3nobJIg+uPtcVjFECTrhcR3n0ns8kDZs=
|
||||
github.com/pion/ice/v2 v2.3.25/go.mod h1:KXJJcZK7E8WzrBEYnV4UtqEZsGeWfHxsNqhVcVvgjxw=
|
||||
github.com/pion/interceptor v0.1.29 h1:39fsnlP1U8gw2JzOFWdfCU82vHvhW9o0rZnZF56wF+M=
|
||||
github.com/pion/interceptor v0.1.29/go.mod h1:ri+LGNjRUc5xUNtDEPzfdkmSqISixVTBF/z/Zms/6T4=
|
||||
github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY=
|
||||
github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms=
|
||||
github.com/pion/mdns v0.0.12 h1:CiMYlY+O0azojWDmxdNr7ADGrnZ+V6Ilfner+6mSVK8=
|
||||
github.com/pion/mdns v0.0.12/go.mod h1:VExJjv8to/6Wqm1FXK+Ii/Z9tsVk/F5sD/N70cnYFbk=
|
||||
github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA=
|
||||
github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8=
|
||||
github.com/pion/rtcp v1.2.14 h1:KCkGV3vJ+4DAJmvP0vaQShsb0xkRfWkO540Gy102KyE=
|
||||
github.com/pion/rtcp v1.2.14/go.mod h1:sn6qjxvnwyAkkPzPULIbVqSKI5Dv54Rv7VG0kNxh9L4=
|
||||
github.com/pion/rtp v1.8.6 h1:MTmn/b0aWWsAzux2AmP8WGllusBVw4NPYPVFFd7jUPw=
|
||||
github.com/pion/rtp v1.8.6/go.mod h1:pBGHaFt/yW7bf1jjWAoUjpSNoDnw98KTMg+jWWvziqU=
|
||||
github.com/pion/sctp v1.8.16 h1:PKrMs+o9EMLRvFfXq59WFsC+V8mN1wnKzqrv+3D/gYY=
|
||||
github.com/pion/sctp v1.8.16/go.mod h1:P6PbDVA++OJMrVNg2AL3XtYHV4uD6dvfyOovCgMs0PE=
|
||||
github.com/pion/sdp/v3 v3.0.9 h1:pX++dCHoHUwq43kuwf3PyJfHlwIj4hXA7Vrifiq0IJY=
|
||||
github.com/pion/sdp/v3 v3.0.9/go.mod h1:B5xmvENq5IXJimIO4zfp6LAe1fD9N+kFv+V/1lOdz8M=
|
||||
github.com/pion/srtp/v2 v2.0.18 h1:vKpAXfawO9RtTRKZJbG4y0v1b11NZxQnxRl85kGuUlo=
|
||||
github.com/pion/srtp/v2 v2.0.18/go.mod h1:0KJQjA99A6/a0DOVTu1PhDSw0CXF2jTkqOoMg3ODqdA=
|
||||
github.com/pion/stun v0.6.1 h1:8lp6YejULeHBF8NmV8e2787BogQhduZugh5PdhDyyN4=
|
||||
github.com/pion/stun v0.6.1/go.mod h1:/hO7APkX4hZKu/D0f2lHzNyvdkTGtIy3NDmLR7kSz/8=
|
||||
github.com/pion/transport/v2 v2.2.5 h1:iyi25i/21gQck4hfRhomF6SktmUQjRsRW4WJdhfc3Kc=
|
||||
github.com/pion/transport/v2 v2.2.5/go.mod h1:q2U/tf9FEfnSBGSW6w5Qp5PFWRLRj3NjLhCCgpRK4p0=
|
||||
github.com/pion/turn/v2 v2.1.6 h1:Xr2niVsiPTB0FPtt+yAWKFUkU1eotQbGgpTIld4x1Gc=
|
||||
github.com/pion/turn/v2 v2.1.6/go.mod h1:huEpByKKHix2/b9kmTAM3YoX6MKP+/D//0ClgUYR2fY=
|
||||
github.com/pion/webrtc/v3 v3.2.40 h1:Wtfi6AZMQg+624cvCXUuSmrKWepSB7zfgYDOYqsSOVU=
|
||||
github.com/pion/webrtc/v3 v3.2.40/go.mod h1:M1RAe3TNTD1tzyvqHrbVODfwdPGSXOUo/OgpoGGJqFY=
|
||||
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
||||
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
|
||||
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
||||
|
@ -240,8 +272,8 @@ github.com/polydawn/refmt v0.89.0 h1:ADJTApkvkeBZsN0tBTx8QjpD9JkmxbKp0cxfr9qszm4
|
|||
github.com/polydawn/refmt v0.89.0/go.mod h1:/zvteZs/GwLtCgZ4BL6CBsk9IKIlexP43ObX9AxTqTw=
|
||||
github.com/prometheus/client_golang v1.19.1 h1:wZWJDwK+NameRJuPGDhlnFgx8e8HN3XHQeLaYJFJBOE=
|
||||
github.com/prometheus/client_golang v1.19.1/go.mod h1:mP78NwGzrVks5S2H6ab8+ZZGJLZUq1hoULYBAYBw1Ho=
|
||||
github.com/prometheus/client_model v0.6.0 h1:k1v3CzpSRUTrKMppY35TLwPvxHqBu0bYgxZzqGIgaos=
|
||||
github.com/prometheus/client_model v0.6.0/go.mod h1:NTQHnmxFpouOD0DpvP4XujX3CdOAGQPoaGhyTchlyt8=
|
||||
github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E=
|
||||
github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY=
|
||||
github.com/prometheus/common v0.48.0 h1:QO8U2CdOzSn1BBsmXJXduaaW+dY/5QLjfB8svtSzKKE=
|
||||
github.com/prometheus/common v0.48.0/go.mod h1:0/KsvlIEfPQCQ5I2iNSAWKPZziNCvRs5EC6ILDTlAPc=
|
||||
github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo=
|
||||
|
@ -250,8 +282,8 @@ github.com/quic-go/qpack v0.4.0 h1:Cr9BXA1sQS2SmDUWjSofMPNKmvF6IiIfDRmgU0w1ZCo=
|
|||
github.com/quic-go/qpack v0.4.0/go.mod h1:UZVnYIfi5GRk+zI9UMaCPsmZ2xKJP7XBUvVyT1Knj9A=
|
||||
github.com/quic-go/quic-go v0.44.0 h1:So5wOr7jyO4vzL2sd8/pD9Kesciv91zSk8BoFngItQ0=
|
||||
github.com/quic-go/quic-go v0.44.0/go.mod h1:z4cx/9Ny9UtGITIPzmPTXh1ULfOyWh4qGQlpnPcWmek=
|
||||
github.com/quic-go/webtransport-go v0.6.0 h1:CvNsKqc4W2HljHJnoT+rMmbRJybShZ0YPFDD3NxaZLY=
|
||||
github.com/quic-go/webtransport-go v0.6.0/go.mod h1:9KjU4AEBqEQidGHNDkZrb8CAa1abRaosM2yGOyiikEc=
|
||||
github.com/quic-go/webtransport-go v0.8.0 h1:HxSrwun11U+LlmwpgM1kEqIqH90IT4N8auv/cD7QFJg=
|
||||
github.com/quic-go/webtransport-go v0.8.0/go.mod h1:N99tjprW432Ut5ONql/aUhSLT0YVSlwHohQsuac9WaM=
|
||||
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
|
||||
github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
|
||||
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
|
||||
|
@ -316,8 +348,8 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U
|
|||
golang.org/x/crypto v0.0.0-20200115085410-6d4e4cb37c7d/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
|
||||
golang.org/x/crypto v0.0.0-20200602180216-279210d13fed/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
|
||||
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
|
||||
golang.org/x/crypto v0.23.0 h1:dIJU/v2J8Mdglj/8rJ6UUOM3Zc9zLZxVZwwxMooUSAI=
|
||||
golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8=
|
||||
golang.org/x/crypto v0.24.0 h1:mnl8DM0o513X8fdIkmyFE/5hTYxbwYOjDS/+rK6qpRI=
|
||||
golang.org/x/crypto v0.24.0/go.mod h1:Z1PMYSOR5nyMcyAVAIQSKCDwalqy85Aqn1x3Ws4L5DM=
|
||||
golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 h1:vr/HnozRka3pE4EsMEg1lgkXJkTFJCVUX+S/ZT6wYzM=
|
||||
golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842/go.mod h1:XtvwrStGgqGPLc4cjQfWqZHG1YFdYs6swckp8vpsjnc=
|
||||
golang.org/x/image v0.14.0 h1:tNgSxAFe3jC4uYqvZdTr84SZoM1KfwdC9SKIFrLjFn4=
|
||||
|
@ -355,13 +387,13 @@ golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBc
|
|||
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y=
|
||||
golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
||||
golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws=
|
||||
golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
||||
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
|
||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk=
|
||||
golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
|
||||
golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4=
|
||||
golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI=
|
||||
golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=
|
||||
golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
|
||||
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
|
@ -377,8 +409,8 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T
|
|||
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI=
|
||||
google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
|
||||
google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg=
|
||||
google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
|
||||
|
|
|
@ -71,7 +71,7 @@ func New() AnyNsClientService {
|
|||
func (s *service) doClient(ctx context.Context, fn func(cl nsp.DRPCAnynsClient) error) error {
|
||||
if len(s.nodeconf.NamingNodePeers()) == 0 {
|
||||
log.Error("no namingNode peers configured")
|
||||
return errors.New("no namingNode peers configured")
|
||||
return errors.New("no namingNode peers configured. Node config ID: " + s.nodeconf.Id())
|
||||
}
|
||||
|
||||
// it will try to connect to the Naming Node
|
||||
|
|
14
net/peer/context_test.go
Normal file
14
net/peer/context_test.go
Normal file
|
@ -0,0 +1,14 @@
|
|||
package peer
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestCtxProtoVersion(t *testing.T) {
|
||||
ctx := CtxWithProtoVersion(ctx, 1)
|
||||
ver, err := CtxProtoVersion(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, uint32(1), ver)
|
||||
}
|
|
@ -2,6 +2,9 @@ package rpctest
|
|||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"storj.io/drpc"
|
||||
|
||||
"github.com/anyproto/any-sync/net/peer"
|
||||
"github.com/anyproto/any-sync/net/rpc/rpctest/multiconntest"
|
||||
|
@ -15,3 +18,50 @@ func MultiConnPair(peerIdServ, peerIdClient string) (serv, client transport.Mult
|
|||
peer.CtxWithProtoVersion(peer.CtxWithPeerId(context.Background(), peerIdClient), secureservice.ProtoVersion),
|
||||
)
|
||||
}
|
||||
|
||||
type MockPeer struct {
|
||||
Ctx context.Context
|
||||
}
|
||||
|
||||
func (m MockPeer) CloseChan() <-chan struct{} {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m MockPeer) SetTTL(ttl time.Duration) {
|
||||
return
|
||||
}
|
||||
|
||||
func (m MockPeer) Id() string {
|
||||
return "peerId"
|
||||
}
|
||||
|
||||
func (m MockPeer) Context() context.Context {
|
||||
if m.Ctx != nil {
|
||||
return m.Ctx
|
||||
}
|
||||
return context.Background()
|
||||
}
|
||||
|
||||
func (m MockPeer) AcquireDrpcConn(ctx context.Context) (drpc.Conn, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (m MockPeer) ReleaseDrpcConn(conn drpc.Conn) {
|
||||
return
|
||||
}
|
||||
|
||||
func (m MockPeer) DoDrpc(ctx context.Context, do func(conn drpc.Conn) error) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m MockPeer) IsClosed() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (m MockPeer) TryClose(objectTTL time.Duration) (res bool, err error) {
|
||||
return false, err
|
||||
}
|
||||
|
||||
func (m MockPeer) Close() (err error) {
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -3,6 +3,10 @@ package yamux
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/anyproto/any-sync/app"
|
||||
"github.com/anyproto/any-sync/app/logger"
|
||||
"github.com/anyproto/any-sync/net/connutil"
|
||||
|
@ -10,9 +14,6 @@ import (
|
|||
"github.com/anyproto/any-sync/net/transport"
|
||||
"github.com/hashicorp/yamux"
|
||||
"go.uber.org/zap"
|
||||
"net"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
const CName = "net.transport.yamux"
|
||||
|
@ -157,18 +158,18 @@ func (y *yamuxTransport) accept(conn net.Conn) {
|
|||
defer cancel()
|
||||
cctx, err := y.secure.SecureInbound(ctx, conn)
|
||||
if err != nil {
|
||||
log.Warn("incoming connection handshake error", zap.Error(err))
|
||||
log.Info("incoming connection handshake error", zap.Error(err), zap.String("remoteAddr", conn.RemoteAddr().String()))
|
||||
return
|
||||
}
|
||||
luc := connutil.NewLastUsageConn(connutil.NewTimeout(conn, time.Duration(y.conf.WriteTimeoutSec)*time.Second))
|
||||
sess, err := yamux.Server(luc, y.yamuxConf)
|
||||
if err != nil {
|
||||
log.Warn("incoming connection yamux session error", zap.Error(err))
|
||||
log.Info("incoming connection yamux session error", zap.Error(err), zap.String("remoteAddr", conn.RemoteAddr().String()))
|
||||
return
|
||||
}
|
||||
mc := NewMultiConn(cctx, luc, conn.RemoteAddr().String(), sess)
|
||||
if err = y.accepter.Accept(mc); err != nil {
|
||||
log.Warn("connection accept error", zap.Error(err))
|
||||
log.Info("connection accept error", zap.Error(err), zap.String("remoteAddr", conn.RemoteAddr().String()))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -57,8 +57,7 @@ func New() AnyPpClientService {
|
|||
func (s *service) doClient(ctx context.Context, fn func(cl pp.DRPCAnyPaymentProcessingClient) error) error {
|
||||
if len(s.nodeconf.PaymentProcessingNodePeers()) == 0 {
|
||||
log.Error("no payment processing peers configured")
|
||||
|
||||
return errors.New("no paymentProcessingNode peers configured")
|
||||
return errors.New("no paymentProcessingNode peers configured. Node config ID: " + s.nodeconf.Id())
|
||||
}
|
||||
|
||||
// it will try to connect to the Payment Node
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue