1
0
Fork 0
mirror of https://github.com/anyproto/any-sync.git synced 2025-06-11 18:20:28 +09:00

Update synctree to connect with responsible node

This commit is contained in:
mcrakhman 2023-01-13 12:55:28 +01:00 committed by Mikhail Iudin
parent 50b51b1246
commit b2da6ff300
No known key found for this signature in database
GPG key ID: FAAAA8BAABDFF1C0
3 changed files with 35 additions and 2 deletions

View file

@ -79,10 +79,15 @@ func newWrappedSyncClient(
func BuildSyncTreeOrGetRemote(ctx context.Context, id string, deps BuildDeps) (t SyncTree, err error) {
getTreeRemote := func() (msg *treechangeproto.TreeSyncMessage, err error) {
streamChecker := deps.ObjectSync.StreamChecker()
peerId, err := peer.CtxPeerId(ctx)
if err != nil {
streamChecker.CheckResponsiblePeers()
peerId, err = streamChecker.FirstResponsiblePeer()
if err != nil {
return
}
}
newTreeRequest := GetRequestFactory().CreateNewTreeRequest()
objMsg, err := marshallTreeMessage(newTreeRequest, id, "")

View file

@ -16,6 +16,7 @@ import (
type StreamChecker interface {
CheckResponsiblePeers()
CheckPeerConnection(peerId string) (err error)
FirstResponsiblePeer() (peerId string, err error)
}
type streamChecker struct {
@ -28,7 +29,7 @@ type streamChecker struct {
lastCheck *atomic.Time
}
const streamCheckerInterval = time.Second * 10
const streamCheckerInterval = time.Second * 5
func NewStreamChecker(
spaceId string,
@ -131,3 +132,15 @@ func (s *streamChecker) createStream(p peer.Peer) (err error) {
}
return
}
func (s *streamChecker) FirstResponsiblePeer() (peerId string, err error) {
nodeIds := s.connector.Configuration().NodeIds(s.spaceId)
for _, nodeId := range nodeIds {
if s.streamPool.HasActiveStream(nodeId) {
peerId = nodeId
return
}
}
err = fmt.Errorf("no responsible peers are connected")
return
}

View file

@ -9,6 +9,7 @@ import (
app "github.com/anytypeio/any-sync/app"
nodeconf "github.com/anytypeio/any-sync/nodeconf"
chash "github.com/anytypeio/go-chash"
gomock "github.com/golang/mock/gomock"
)
@ -128,6 +129,20 @@ func (mr *MockConfigurationMockRecorder) Addresses() *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Addresses", reflect.TypeOf((*MockConfiguration)(nil).Addresses))
}
// CHash mocks base method.
func (m *MockConfiguration) CHash() chash.CHash {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "CHash")
ret0, _ := ret[0].(chash.CHash)
return ret0
}
// CHash indicates an expected call of CHash.
func (mr *MockConfigurationMockRecorder) CHash() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CHash", reflect.TypeOf((*MockConfiguration)(nil).CHash))
}
// ConsensusPeers mocks base method.
func (m *MockConfiguration) ConsensusPeers() []string {
m.ctrl.T.Helper()