From bab5cb694f4f8c68d21e2d5ec3c6872e93595f8e Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Tue, 2 Jul 2024 11:38:03 +0200 Subject: [PATCH] Add protocol version to context and add new sync logic --- commonspace/headsync/diffsyncer.go | 4 +-- .../object/acl/syncacl/requestfactory.go | 7 +++++ commonspace/object/acl/syncacl/syncacl.go | 21 ++++++++++---- .../object/acl/syncacl/syncaclhandler.go | 5 ++-- .../object/tree/synctree/requestfactory.go | 9 ++++++ commonspace/object/tree/synctree/synctree.go | 17 +++++++---- .../object/tree/synctree/synctreehandler.go | 27 ++++++++++++++---- commonspace/object/treesyncer/treesyncer.go | 3 +- commonspace/objectsync/objectsync.go | 11 ++++++-- .../objectsync/synchandler/synchhandler.go | 3 +- commonspace/requestmanager/requestmanager.go | 28 +++++++++++++------ commonspace/spaceutils_test.go | 2 +- net/peer/context.go | 19 +++++++++++-- net/secureservice/secureservice.go | 2 ++ 14 files changed, 121 insertions(+), 37 deletions(-) diff --git a/commonspace/headsync/diffsyncer.go b/commonspace/headsync/diffsyncer.go index 266b6b53..061bd526 100644 --- a/commonspace/headsync/diffsyncer.go +++ b/commonspace/headsync/diffsyncer.go @@ -152,13 +152,13 @@ func (d *diffSyncer) syncWithPeer(ctx context.Context, p peer.Peer) (err error) // if we removed acl head from the list if len(existingIds) < prevExistingLen { - if syncErr := d.syncAcl.SyncWithPeer(ctx, p.Id()); syncErr != nil { + if syncErr := d.syncAcl.SyncWithPeer(ctx, p); syncErr != nil { log.Warn("failed to send acl sync message to peer", zap.String("aclId", syncAclId)) } } // treeSyncer should not get acl id, that's why we filter existing ids before - err = d.treeSyncer.SyncAll(ctx, p.Id(), existingIds, missingIds) + err = d.treeSyncer.SyncAll(ctx, p, existingIds, missingIds) if err != nil { return err } diff --git a/commonspace/object/acl/syncacl/requestfactory.go b/commonspace/object/acl/syncacl/requestfactory.go index 6adddb4b..e0d0c288 100644 --- a/commonspace/object/acl/syncacl/requestfactory.go +++ b/commonspace/object/acl/syncacl/requestfactory.go @@ -9,6 +9,7 @@ import ( type RequestFactory interface { CreateHeadUpdate(l list.AclList, added []*consensusproto.RawRecordWithId) (msg *consensusproto.LogSyncMessage) + CreateEmptyFullSyncRequest(l list.AclList) (req *consensusproto.LogSyncMessage) CreateFullSyncRequest(l list.AclList, theirHead string) (req *consensusproto.LogSyncMessage, err error) CreateFullSyncResponse(l list.AclList, theirHead string) (*consensusproto.LogSyncMessage, error) } @@ -26,6 +27,12 @@ func (r *requestFactory) CreateHeadUpdate(l list.AclList, added []*consensusprot }, l.Root()) } +func (r *requestFactory) CreateEmptyFullSyncRequest(l list.AclList) (req *consensusproto.LogSyncMessage) { + return consensusproto.WrapFullRequest(&consensusproto.LogFullSyncRequest{ + Head: l.Head().Id, + }, l.Root()) +} + func (r *requestFactory) CreateFullSyncRequest(l list.AclList, theirHead string) (req *consensusproto.LogSyncMessage, err error) { if !l.HasHead(theirHead) { return consensusproto.WrapFullRequest(&consensusproto.LogFullSyncRequest{ diff --git a/commonspace/object/acl/syncacl/syncacl.go b/commonspace/object/acl/syncacl/syncacl.go index 7fb1d9e0..aba928ac 100644 --- a/commonspace/object/acl/syncacl/syncacl.go +++ b/commonspace/object/acl/syncacl/syncacl.go @@ -8,6 +8,8 @@ import ( "github.com/anyproto/any-sync/commonspace/object/acl/syncacl/headupdater" "github.com/anyproto/any-sync/commonspace/object/syncobjectgetter" + "github.com/anyproto/any-sync/net/peer" + "github.com/anyproto/any-sync/net/secureservice" "github.com/anyproto/any-sync/accountservice" "github.com/anyproto/any-sync/app" @@ -35,7 +37,7 @@ type SyncAcl interface { list.AclList syncobjectgetter.SyncObject SetHeadUpdater(updater headupdater.HeadUpdater) - SyncWithPeer(ctx context.Context, peerId string) (err error) + SyncWithPeer(ctx context.Context, p peer.Peer) (err error) SetAclUpdater(updater headupdater.AclUpdater) } @@ -73,8 +75,8 @@ func (s *syncAcl) SetHeadUpdater(updater headupdater.HeadUpdater) { s.headUpdater = updater } -func (s *syncAcl) HandleMessage(ctx context.Context, senderId string, request *spacesyncproto.ObjectSyncMessage) (err error) { - return s.syncHandler.HandleMessage(ctx, senderId, request) +func (s *syncAcl) HandleMessage(ctx context.Context, senderId string, protoVersion uint32, request *spacesyncproto.ObjectSyncMessage) (err error) { + return s.syncHandler.HandleMessage(ctx, senderId, protoVersion, request) } func (s *syncAcl) Init(a *app.App) (err error) { @@ -136,11 +138,18 @@ func (s *syncAcl) AddRawRecords(rawRecords []*consensusproto.RawRecordWithId) (e return } -func (s *syncAcl) SyncWithPeer(ctx context.Context, peerId string) (err error) { +func (s *syncAcl) SyncWithPeer(ctx context.Context, p peer.Peer) (err error) { s.Lock() defer s.Unlock() - headUpdate := s.syncClient.CreateHeadUpdate(s, nil) - return s.syncClient.SendUpdate(peerId, headUpdate) + protoVersion, err := peer.CtxProtoVersion(ctx) + // this works with old protocol + if err != nil || protoVersion <= secureservice.ProtoVersion { + headUpdate := s.syncClient.CreateHeadUpdate(s, nil) + return s.syncClient.SendUpdate(p.Id(), headUpdate) + } + // for new protocol sending empty request + request := s.syncClient.CreateEmptyFullSyncRequest(s) + return s.syncClient.QueueRequest(p.Id(), request) } func (s *syncAcl) Close(ctx context.Context) (err error) { diff --git a/commonspace/object/acl/syncacl/syncaclhandler.go b/commonspace/object/acl/syncacl/syncaclhandler.go index 2d76cc46..709a6fb0 100644 --- a/commonspace/object/acl/syncacl/syncaclhandler.go +++ b/commonspace/object/acl/syncacl/syncaclhandler.go @@ -4,12 +4,13 @@ import ( "context" "errors" + "github.com/gogo/protobuf/proto" + "github.com/anyproto/any-sync/commonspace/object/acl/list" "github.com/anyproto/any-sync/commonspace/objectsync/synchandler" "github.com/anyproto/any-sync/commonspace/spacesyncproto" "github.com/anyproto/any-sync/commonspace/syncstatus" "github.com/anyproto/any-sync/consensus/consensusproto" - "github.com/gogo/protobuf/proto" ) var ( @@ -35,7 +36,7 @@ func newSyncAclHandler(spaceId string, aclList list.AclList, syncClient SyncClie } } -func (s *syncAclHandler) HandleMessage(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error) { +func (s *syncAclHandler) HandleMessage(ctx context.Context, senderId string, protoVersion uint32, message *spacesyncproto.ObjectSyncMessage) (err error) { unmarshalled := &consensusproto.LogSyncMessage{} err = proto.Unmarshal(message.Payload, unmarshalled) if err != nil { diff --git a/commonspace/object/tree/synctree/requestfactory.go b/commonspace/object/tree/synctree/requestfactory.go index 8d91add8..0306c980 100644 --- a/commonspace/object/tree/synctree/requestfactory.go +++ b/commonspace/object/tree/synctree/requestfactory.go @@ -2,6 +2,7 @@ package synctree import ( "fmt" + "github.com/anyproto/any-sync/commonspace/object/tree/objecttree" "github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto" "github.com/anyproto/any-sync/util/slice" @@ -10,6 +11,7 @@ import ( type RequestFactory interface { CreateHeadUpdate(t objecttree.ObjectTree, added []*treechangeproto.RawTreeChangeWithId) (msg *treechangeproto.TreeSyncMessage) CreateNewTreeRequest() (msg *treechangeproto.TreeSyncMessage) + CreateEmptyFullSyncRequest(t objecttree.ObjectTree) (req *treechangeproto.TreeSyncMessage) CreateFullSyncRequest(t objecttree.ObjectTree, theirHeads, theirSnapshotPath []string) (req *treechangeproto.TreeSyncMessage, err error) CreateFullSyncResponse(t objecttree.ObjectTree, theirHeads, theirSnapshotPath []string) (*treechangeproto.TreeSyncMessage, error) } @@ -32,6 +34,13 @@ func (r *requestFactory) CreateNewTreeRequest() (msg *treechangeproto.TreeSyncMe return treechangeproto.WrapFullRequest(&treechangeproto.TreeFullSyncRequest{}, nil) } +func (r *requestFactory) CreateEmptyFullSyncRequest(t objecttree.ObjectTree) (msg *treechangeproto.TreeSyncMessage) { + return treechangeproto.WrapFullRequest(&treechangeproto.TreeFullSyncRequest{ + Heads: t.Heads(), + SnapshotPath: t.SnapshotPath(), + }, t.Header()) +} + func (r *requestFactory) CreateFullSyncRequest(t objecttree.ObjectTree, theirHeads, theirSnapshotPath []string) (msg *treechangeproto.TreeSyncMessage, err error) { req := &treechangeproto.TreeFullSyncRequest{} if t == nil { diff --git a/commonspace/object/tree/synctree/synctree.go b/commonspace/object/tree/synctree/synctree.go index ca3f2cc3..a9b49653 100644 --- a/commonspace/object/tree/synctree/synctree.go +++ b/commonspace/object/tree/synctree/synctree.go @@ -17,6 +17,7 @@ import ( "github.com/anyproto/any-sync/commonspace/spacestorage" "github.com/anyproto/any-sync/commonspace/syncstatus" "github.com/anyproto/any-sync/net/peer" + "github.com/anyproto/any-sync/net/secureservice" "github.com/anyproto/any-sync/nodeconf" ) @@ -37,7 +38,7 @@ type SyncTree interface { objecttree.ObjectTree synchandler.SyncHandler ListenerSetter - SyncWithPeer(ctx context.Context, peerId string) (err error) + SyncWithPeer(ctx context.Context, p peer.Peer) (err error) } // SyncTree sends head updates to sync service and also sends new changes to update listener @@ -245,14 +246,18 @@ func (s *syncTree) checkAlive() (err error) { return } -func (s *syncTree) SyncWithPeer(ctx context.Context, peerId string) (err error) { +func (s *syncTree) SyncWithPeer(ctx context.Context, p peer.Peer) (err error) { s.Lock() defer s.Unlock() - if objecttree.IsEmptyDerivedTree(s) { - return nil + protoVersion, err := peer.CtxProtoVersion(ctx) + // this works with old protocol + if err != nil || protoVersion <= secureservice.ProtoVersion { + headUpdate := s.syncClient.CreateHeadUpdate(s, nil) + return s.syncClient.SendUpdate(p.Id(), headUpdate.RootChange.Id, headUpdate) } - headUpdate := s.syncClient.CreateHeadUpdate(s, nil) - return s.syncClient.SendUpdate(peerId, headUpdate.RootChange.Id, headUpdate) + // for new protocol sending empty request + request := s.syncClient.CreateEmptyFullSyncRequest(s) + return s.syncClient.QueueRequest(p.Id(), s.Id(), request) } func (s *syncTree) afterBuild() { diff --git a/commonspace/object/tree/synctree/synctreehandler.go b/commonspace/object/tree/synctree/synctreehandler.go index 14f30961..168d3fec 100644 --- a/commonspace/object/tree/synctree/synctreehandler.go +++ b/commonspace/object/tree/synctree/synctreehandler.go @@ -5,13 +5,15 @@ import ( "errors" "sync" + "github.com/gogo/protobuf/proto" + "github.com/anyproto/any-sync/commonspace/object/tree/objecttree" "github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto" "github.com/anyproto/any-sync/commonspace/objectsync/synchandler" "github.com/anyproto/any-sync/commonspace/spacesyncproto" "github.com/anyproto/any-sync/commonspace/syncstatus" + "github.com/anyproto/any-sync/net/secureservice" "github.com/anyproto/any-sync/util/slice" - "github.com/gogo/protobuf/proto" ) var ( @@ -85,7 +87,7 @@ func (s *syncTreeHandler) handleRequest(ctx context.Context, senderId string, fu return } -func (s *syncTreeHandler) HandleMessage(ctx context.Context, senderId string, msg *spacesyncproto.ObjectSyncMessage) (err error) { +func (s *syncTreeHandler) HandleMessage(ctx context.Context, senderId string, protoVersion uint32, msg *spacesyncproto.ObjectSyncMessage) (err error) { unmarshalled := &treechangeproto.TreeSyncMessage{} err = proto.Unmarshal(msg.Payload, unmarshalled) if err != nil { @@ -100,10 +102,10 @@ func (s *syncTreeHandler) HandleMessage(ctx context.Context, senderId string, ms return } s.handlerLock.Unlock() - return s.handleMessage(ctx, unmarshalled, senderId) + return s.handleMessage(ctx, unmarshalled, protoVersion, senderId) } -func (s *syncTreeHandler) handleMessage(ctx context.Context, msg *treechangeproto.TreeSyncMessage, senderId string) (err error) { +func (s *syncTreeHandler) handleMessage(ctx context.Context, msg *treechangeproto.TreeSyncMessage, protoVersion uint32, senderId string) (err error) { s.objTree.Lock() defer s.objTree.Unlock() var ( @@ -137,7 +139,22 @@ func (s *syncTreeHandler) handleMessage(ctx context.Context, msg *treechangeprot case content.GetFullSyncRequest() != nil: return ErrMessageIsRequest case content.GetFullSyncResponse() != nil: - return s.syncProtocol.FullSyncResponse(ctx, senderId, content.GetFullSyncResponse()) + err := s.syncProtocol.FullSyncResponse(ctx, senderId, content.GetFullSyncResponse()) + if err != nil { + return err + } + if protoVersion <= secureservice.ProtoVersion { + return nil + } + cnt := content.GetFullSyncResponse() + if slice.UnsortedEquals(cnt.Heads, s.objTree.Heads()) { + return nil + } + req, err := s.syncClient.CreateFullSyncRequest(s.objTree, cnt.Heads, cnt.SnapshotPath) + if err != nil { + return err + } + return s.syncClient.QueueRequest(senderId, treeId, req) } return } diff --git a/commonspace/object/treesyncer/treesyncer.go b/commonspace/object/treesyncer/treesyncer.go index 5f8819fd..1ebe8267 100644 --- a/commonspace/object/treesyncer/treesyncer.go +++ b/commonspace/object/treesyncer/treesyncer.go @@ -5,6 +5,7 @@ import ( "context" "github.com/anyproto/any-sync/app" + "github.com/anyproto/any-sync/net/peer" ) const CName = "common.object.treesyncer" @@ -14,5 +15,5 @@ type TreeSyncer interface { StartSync() StopSync() ShouldSync(peerId string) bool - SyncAll(ctx context.Context, peerId string, existing, missing []string) error + SyncAll(ctx context.Context, p peer.Peer, existing, missing []string) error } diff --git a/commonspace/objectsync/objectsync.go b/commonspace/objectsync/objectsync.go index aec6c419..d0187da4 100644 --- a/commonspace/objectsync/objectsync.go +++ b/commonspace/objectsync/objectsync.go @@ -15,6 +15,7 @@ import ( "github.com/anyproto/any-sync/commonspace/spacestate" "github.com/anyproto/any-sync/metric" "github.com/anyproto/any-sync/net/peer" + "github.com/anyproto/any-sync/net/secureservice" "github.com/anyproto/any-sync/util/multiqueue" "go.uber.org/zap" @@ -128,6 +129,10 @@ func (s *objectSync) HandleMessage(ctx context.Context, hm HandleMessage) (err e func (s *objectSync) processHandleMessage(msg HandleMessage) { var err error + peerProtoVersion, err := peer.CtxProtoVersion(msg.PeerCtx) + if err != nil { + peerProtoVersion = secureservice.ProtoVersion + } msg.StartHandlingTime = time.Now() ctx := peer.CtxWithPeerId(context.Background(), msg.SenderId) ctx = logger.CtxWithFields(ctx, zap.Uint64("msgId", msg.Id), zap.String("senderId", msg.SenderId)) @@ -148,7 +153,7 @@ func (s *objectSync) processHandleMessage(msg HandleMessage) { return } } - if err = s.handleMessage(ctx, msg.SenderId, msg.Message); err != nil { + if err = s.handleMessage(ctx, msg.SenderId, peerProtoVersion, msg.Message); err != nil { if msg.Message.ObjectId != "" { // cleanup thread on error _ = s.handleQueue.CloseThread(msg.Message.ObjectId) @@ -170,7 +175,7 @@ func (s *objectSync) handleRequest(ctx context.Context, senderId string, msg *sp return obj.HandleRequest(ctx, senderId, msg) } -func (s *objectSync) handleMessage(ctx context.Context, senderId string, msg *spacesyncproto.ObjectSyncMessage) (err error) { +func (s *objectSync) handleMessage(ctx context.Context, senderId string, protoVersion uint32, msg *spacesyncproto.ObjectSyncMessage) (err error) { log := log.With(zap.String("objectId", msg.ObjectId)) defer func() { if p := recover(); p != nil { @@ -186,7 +191,7 @@ func (s *objectSync) handleMessage(ctx context.Context, senderId string, msg *sp if err != nil { return fmt.Errorf("failed to get object from cache: %w", err) } - err = obj.HandleMessage(ctx, senderId, msg) + err = obj.HandleMessage(ctx, senderId, protoVersion, msg) if err != nil { return fmt.Errorf("failed to handle message: %w", err) } diff --git a/commonspace/objectsync/synchandler/synchhandler.go b/commonspace/objectsync/synchandler/synchhandler.go index 090118cd..e9a4104c 100644 --- a/commonspace/objectsync/synchandler/synchhandler.go +++ b/commonspace/objectsync/synchandler/synchhandler.go @@ -2,10 +2,11 @@ package synchandler import ( "context" + "github.com/anyproto/any-sync/commonspace/spacesyncproto" ) type SyncHandler interface { - HandleMessage(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error) + HandleMessage(ctx context.Context, senderId string, protoVersion uint32, message *spacesyncproto.ObjectSyncMessage) (err error) HandleRequest(ctx context.Context, senderId string, request *spacesyncproto.ObjectSyncMessage) (response *spacesyncproto.ObjectSyncMessage, err error) } diff --git a/commonspace/requestmanager/requestmanager.go b/commonspace/requestmanager/requestmanager.go index 1df8fc61..8a58bcac 100644 --- a/commonspace/requestmanager/requestmanager.go +++ b/commonspace/requestmanager/requestmanager.go @@ -13,7 +13,6 @@ import ( "github.com/anyproto/any-sync/commonspace/objectsync" "github.com/anyproto/any-sync/commonspace/spacestate" "github.com/anyproto/any-sync/commonspace/spacesyncproto" - "github.com/anyproto/any-sync/net/peer" "github.com/anyproto/any-sync/net/pool" "github.com/anyproto/any-sync/net/rpc/rpcerr" ) @@ -112,7 +111,11 @@ func (r *requestManager) SendRequest(ctx context.Context, peerId string, req *sp defer func() { r.reqStat.RemoveSyncRequest(peerId, req) }() - return r.doRequest(ctx, peerId, req) + res, err := r.doRequest(ctx, peerId, req) + if err != nil { + return nil, err + } + return res.resp, err } func (r *requestManager) QueueRequest(peerId string, req *spacesyncproto.ObjectSyncMessage) (err error) { @@ -142,27 +145,36 @@ var doRequestAndHandle = (*requestManager).requestAndHandle func (r *requestManager) requestAndHandle(peerId string, req *spacesyncproto.ObjectSyncMessage) { ctx := r.ctx - resp, err := r.doRequest(ctx, peerId, req) + res, err := r.doRequest(ctx, peerId, req) if err != nil { log.Warn("failed to send request", zap.Error(err)) return } - ctx = peer.CtxWithPeerId(ctx, peerId) _ = r.handler.HandleMessage(ctx, objectsync.HandleMessage{ SenderId: peerId, - Message: resp, - PeerCtx: ctx, + Message: res.resp, + PeerCtx: res.peerCtx, }) } -func (r *requestManager) doRequest(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (resp *spacesyncproto.ObjectSyncMessage, err error) { +type result struct { + peerCtx context.Context + resp *spacesyncproto.ObjectSyncMessage +} + +func (r *requestManager) doRequest(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (res result, err error) { pr, err := r.peerPool.Get(ctx, peerId) if err != nil { return } + res.peerCtx = pr.Context() err = pr.DoDrpc(ctx, func(conn drpc.Conn) error { cl := r.clientFactory.Client(conn) - resp, err = cl.ObjectSync(ctx, msg) + resp, err := cl.ObjectSync(ctx, msg) + if err != nil { + return err + } + res.resp = resp return err }) err = rpcerr.Unwrap(err) diff --git a/commonspace/spaceutils_test.go b/commonspace/spaceutils_test.go index a4da18a0..64ed7c37 100644 --- a/commonspace/spaceutils_test.go +++ b/commonspace/spaceutils_test.go @@ -298,7 +298,7 @@ func (m mockTreeSyncer) StartSync() { func (m mockTreeSyncer) StopSync() { } -func (m mockTreeSyncer) SyncAll(ctx context.Context, peerId string, existing, missing []string) error { +func (m mockTreeSyncer) SyncAll(ctx context.Context, p peer.Peer, existing, missing []string) error { return nil } diff --git a/net/peer/context.go b/net/peer/context.go index ca405012..4df39eaf 100644 --- a/net/peer/context.go +++ b/net/peer/context.go @@ -17,11 +17,13 @@ const ( contextKeyIdentity contextKeyPeerAddr contextKeyPeerClientVersion + contextKeyPeerProtoVersion ) var ( - ErrPeerIdNotFoundInContext = errors.New("peer id not found in context") - ErrIdentityNotFoundInContext = errors.New("identity not found in context") + ErrPeerIdNotFoundInContext = errors.New("peer id not found in context") + ErrProtoVersionNotFoundInContext = errors.New("proto version not found in context") + ErrIdentityNotFoundInContext = errors.New("identity not found in context") ) const CtxResponsiblePeers = "*" @@ -42,6 +44,19 @@ func CtxWithPeerId(ctx context.Context, peerId string) context.Context { return context.WithValue(ctx, contextKeyPeerId, peerId) } +// CtxWithProtoVersion sets peer protocol version +func CtxWithProtoVersion(ctx context.Context, version uint32) context.Context { + return context.WithValue(ctx, contextKeyPeerProtoVersion, version) +} + +// CtxProtoVersion returns peer protocol version +func CtxProtoVersion(ctx context.Context) (uint32, error) { + if protoVersion, ok := ctx.Value(contextKeyPeerProtoVersion).(uint32); ok { + return protoVersion, nil + } + return 0, ErrProtoVersionNotFoundInContext +} + // CtxPeerAddr returns peer address func CtxPeerAddr(ctx context.Context) string { if p, ok := ctx.Value(contextKeyPeerAddr).(string); ok { diff --git a/net/secureservice/secureservice.go b/net/secureservice/secureservice.go index e8fb18c1..a6073481 100644 --- a/net/secureservice/secureservice.go +++ b/net/secureservice/secureservice.go @@ -119,6 +119,7 @@ func (s *secureService) HandshakeInbound(ctx context.Context, conn io.ReadWriteC cctx = peer.CtxWithPeerId(cctx, peerId) cctx = peer.CtxWithIdentity(cctx, res.Identity) cctx = peer.CtxWithClientVersion(cctx, res.ClientVersion) + cctx = peer.CtxWithProtoVersion(cctx, res.ProtoVersion) return } @@ -146,6 +147,7 @@ func (s *secureService) HandshakeOutbound(ctx context.Context, conn io.ReadWrite cctx = peer.CtxWithPeerId(cctx, peerId) cctx = peer.CtxWithIdentity(cctx, res.Identity) cctx = peer.CtxWithClientVersion(cctx, res.ClientVersion) + cctx = peer.CtxWithProtoVersion(cctx, res.ProtoVersion) return cctx, nil }