diff --git a/commonspace/object/tree/synctree/synctree.go b/commonspace/object/tree/synctree/synctree.go index 4aae1bf7..c0f4c462 100644 --- a/commonspace/object/tree/synctree/synctree.go +++ b/commonspace/object/tree/synctree/synctree.go @@ -79,9 +79,14 @@ func newWrappedSyncClient( func BuildSyncTreeOrGetRemote(ctx context.Context, id string, deps BuildDeps) (t SyncTree, err error) { getTreeRemote := func() (msg *treechangeproto.TreeSyncMessage, err error) { + streamChecker := deps.ObjectSync.StreamChecker() peerId, err := peer.CtxPeerId(ctx) if err != nil { - return + streamChecker.CheckResponsiblePeers() + peerId, err = streamChecker.FirstResponsiblePeer() + if err != nil { + return + } } newTreeRequest := GetRequestFactory().CreateNewTreeRequest() diff --git a/commonspace/objectsync/streamchecker.go b/commonspace/objectsync/streamchecker.go index 4f99794d..5bf53e5e 100644 --- a/commonspace/objectsync/streamchecker.go +++ b/commonspace/objectsync/streamchecker.go @@ -16,6 +16,7 @@ import ( type StreamChecker interface { CheckResponsiblePeers() CheckPeerConnection(peerId string) (err error) + FirstResponsiblePeer() (peerId string, err error) } type streamChecker struct { @@ -28,7 +29,7 @@ type streamChecker struct { lastCheck *atomic.Time } -const streamCheckerInterval = time.Second * 10 +const streamCheckerInterval = time.Second * 5 func NewStreamChecker( spaceId string, @@ -131,3 +132,15 @@ func (s *streamChecker) createStream(p peer.Peer) (err error) { } return } + +func (s *streamChecker) FirstResponsiblePeer() (peerId string, err error) { + nodeIds := s.connector.Configuration().NodeIds(s.spaceId) + for _, nodeId := range nodeIds { + if s.streamPool.HasActiveStream(nodeId) { + peerId = nodeId + return + } + } + err = fmt.Errorf("no responsible peers are connected") + return +} diff --git a/nodeconf/mock_nodeconf/mock_nodeconf.go b/nodeconf/mock_nodeconf/mock_nodeconf.go index 38d5ef64..9e0bd4f4 100644 --- a/nodeconf/mock_nodeconf/mock_nodeconf.go +++ b/nodeconf/mock_nodeconf/mock_nodeconf.go @@ -9,6 +9,7 @@ import ( app "github.com/anytypeio/any-sync/app" nodeconf "github.com/anytypeio/any-sync/nodeconf" + chash "github.com/anytypeio/go-chash" gomock "github.com/golang/mock/gomock" ) @@ -128,6 +129,20 @@ func (mr *MockConfigurationMockRecorder) Addresses() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Addresses", reflect.TypeOf((*MockConfiguration)(nil).Addresses)) } +// CHash mocks base method. +func (m *MockConfiguration) CHash() chash.CHash { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CHash") + ret0, _ := ret[0].(chash.CHash) + return ret0 +} + +// CHash indicates an expected call of CHash. +func (mr *MockConfigurationMockRecorder) CHash() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CHash", reflect.TypeOf((*MockConfiguration)(nil).CHash)) +} + // ConsensusPeers mocks base method. func (m *MockConfiguration) ConsensusPeers() []string { m.ctrl.T.Helper()