diff --git a/common/commonspace/syncservice/streampool.go b/common/commonspace/syncservice/streampool.go index 34f2c392..81cd58a0 100644 --- a/common/commonspace/syncservice/streampool.go +++ b/common/commonspace/syncservice/streampool.go @@ -5,34 +5,30 @@ import ( "errors" "fmt" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto" - "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/ocache" - "github.com/libp2p/go-libp2p-core/sec" + "github.com/libp2p/go-libp2p/core/sec" "storj.io/drpc/drpcctx" "sync" "sync/atomic" - "time" ) var ErrEmptyPeer = errors.New("don't have such a peer") var ErrStreamClosed = errors.New("stream is already closed") -var maxSimultaneousOperationsPerStream = 10 -var syncWaitPeriod = 2 * time.Second - -var ErrSyncTimeout = errors.New("too long wait on sync receive") +const maxSimultaneousOperationsPerStream = 10 // StreamPool can be made generic to work with different streams type StreamPool interface { - ocache.ObjectLastUsage + Sender AddAndReadStreamSync(stream spacesyncproto.SpaceStream) (err error) AddAndReadStreamAsync(stream spacesyncproto.SpaceStream) + HasActiveStream(peerId string) bool + Close() (err error) +} +type Sender interface { SendSync(peerId string, message *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) SendAsync(peers []string, message *spacesyncproto.ObjectSyncMessage) (err error) BroadcastAsync(message *spacesyncproto.ObjectSyncMessage) (err error) - - HasActiveStream(peerId string) bool - Close() (err error) } type MessageHandler func(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error) @@ -48,23 +44,15 @@ type streamPool struct { wg *sync.WaitGroup waiters map[string]responseWaiter waitersMx sync.Mutex - counter atomic.Uint64 - lastUsage atomic.Int64 + counter uint64 } func newStreamPool(messageHandler MessageHandler) StreamPool { - s := &streamPool{ + return &streamPool{ peerStreams: make(map[string]spacesyncproto.SpaceStream), messageHandler: messageHandler, - waiters: make(map[string]responseWaiter), wg: &sync.WaitGroup{}, } - s.lastUsage.Store(time.Now().Unix()) - return s -} - -func (s *streamPool) LastUsage() time.Time { - return time.Unix(s.lastUsage.Load(), 0) } func (s *streamPool) HasActiveStream(peerId string) (res bool) { @@ -77,39 +65,26 @@ func (s *streamPool) HasActiveStream(peerId string) (res bool) { func (s *streamPool) SendSync( peerId string, msg *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) { - newCounter := s.counter.Add(1) - msg.ReplyId = genStreamPoolKey(peerId, msg.ObjectId, newCounter) + newCounter := atomic.AddUint64(&s.counter, 1) + msg.TrackingId = genStreamPoolKey(peerId, msg.TreeId, newCounter) s.waitersMx.Lock() waiter := responseWaiter{ - ch: make(chan *spacesyncproto.ObjectSyncMessage, 1), + ch: make(chan *spacesyncproto.ObjectSyncMessage), } - s.waiters[msg.ReplyId] = waiter + s.waiters[msg.TrackingId] = waiter s.waitersMx.Unlock() err = s.SendAsync([]string{peerId}, msg) if err != nil { return } - delay := time.NewTimer(syncWaitPeriod) - select { - case <-delay.C: - s.waitersMx.Lock() - delete(s.waiters, msg.ReplyId) - s.waitersMx.Unlock() - log.With("replyId", msg.ReplyId).Error("time elapsed when waiting") - err = ErrSyncTimeout - case reply = <-waiter.ch: - if !delay.Stop() { - <-delay.C - } - } + reply = <-waiter.ch return } func (s *streamPool) SendAsync(peers []string, message *spacesyncproto.ObjectSyncMessage) (err error) { - s.lastUsage.Store(time.Now().Unix()) getStreams := func() (streams []spacesyncproto.SpaceStream) { for _, pId := range peers { stream, err := s.getOrDeleteStream(pId) @@ -125,13 +100,10 @@ func (s *streamPool) SendAsync(peers []string, message *spacesyncproto.ObjectSyn streams := getStreams() s.Unlock() - log.With("objectId", message.ObjectId). - Debugf("sending message to %d peers", len(streams)) for _, s := range streams { - err = s.Send(message) - } - if len(peers) != 1 { - err = nil + if len(peers) == 1 { + err = s.Send(message) + } } return err } @@ -173,8 +145,6 @@ Loop: func (s *streamPool) BroadcastAsync(message *spacesyncproto.ObjectSyncMessage) (err error) { streams := s.getAllStreams() - log.With("objectId", message.ObjectId). - Debugf("broadcasting message to %d peers", len(streams)) for _, stream := range streams { if err = stream.Send(message); err != nil { // TODO: add logging @@ -221,24 +191,21 @@ func (s *streamPool) readPeerLoop(peerId string, stream spacesyncproto.SpaceStre } process := func(msg *spacesyncproto.ObjectSyncMessage) { - s.lastUsage.Store(time.Now().Unix()) - if msg.ReplyId == "" { + if msg.TrackingId == "" { s.messageHandler(stream.Context(), peerId, msg) return } - log.With("replyId", msg.ReplyId).Debug("getting message with reply id") + s.waitersMx.Lock() - waiter, exists := s.waiters[msg.ReplyId] + waiter, exists := s.waiters[msg.TrackingId] if !exists { - log.With("replyId", msg.ReplyId).Debug("reply id not exists") s.waitersMx.Unlock() s.messageHandler(stream.Context(), peerId, msg) return } - log.With("replyId", msg.ReplyId).Debug("reply id exists") - delete(s.waiters, msg.ReplyId) + delete(s.waiters, msg.TrackingId) s.waitersMx.Unlock() waiter.ch <- msg } @@ -246,7 +213,6 @@ func (s *streamPool) readPeerLoop(peerId string, stream spacesyncproto.SpaceStre Loop: for { msg, err := stream.Recv() - s.lastUsage.Store(time.Now().Unix()) if err != nil { break } diff --git a/common/go.mod b/common/go.mod index c80a866c..8dfec05d 100644 --- a/common/go.mod +++ b/common/go.mod @@ -12,7 +12,6 @@ require ( github.com/huandu/skiplist v1.2.0 github.com/ipfs/go-cid v0.3.2 github.com/libp2p/go-libp2p v0.23.2 - github.com/libp2p/go-libp2p-core v0.20.1 github.com/minio/sha256-simd v1.0.0 github.com/multiformats/go-multibase v0.1.1 github.com/multiformats/go-multihash v0.2.1 diff --git a/common/go.sum b/common/go.sum index 3f5b91c2..03e87801 100644 --- a/common/go.sum +++ b/common/go.sum @@ -189,8 +189,6 @@ github.com/libp2p/go-buffer-pool v0.1.0 h1:oK4mSFcQz7cTQIfqbe4MIj9gLW+mnanjyFtc6 github.com/libp2p/go-buffer-pool v0.1.0/go.mod h1:N+vh8gMqimBzdKkSMVuydVDq+UV5QTWy5HSiZacSbPg= github.com/libp2p/go-libp2p v0.23.2 h1:yqyTeKQJyofWXxEv/eEVUvOrGdt/9x+0PIQ4N1kaxmE= github.com/libp2p/go-libp2p v0.23.2/go.mod h1:s9DEa5NLR4g+LZS+md5uGU4emjMWFiqkZr6hBTY8UxI= -github.com/libp2p/go-libp2p-core v0.20.1 h1:fQz4BJyIFmSZAiTbKV8qoYhEH5Dtv/cVhZbG3Ib/+Cw= -github.com/libp2p/go-libp2p-core v0.20.1/go.mod h1:6zR8H7CvQWgYLsbG4on6oLNSGcyKaYFSEYyDt51+bIY= github.com/libp2p/go-openssl v0.1.0 h1:LBkKEcUv6vtZIQLVTegAil8jbNpJErQ9AnT+bWV+Ooo= github.com/libp2p/go-openssl v0.1.0/go.mod h1:OiOxwPpL3n4xlenjx2h7AwSGaFSC/KZvf6gNdOBQMtc= github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94= diff --git a/common/net/dialer/dialer.go b/common/net/dialer/dialer.go index 10de1176..ebef079b 100644 --- a/common/net/dialer/dialer.go +++ b/common/net/dialer/dialer.go @@ -8,7 +8,7 @@ import ( "github.com/anytypeio/go-anytype-infrastructure-experiments/common/config" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/peer" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/secure" - "github.com/libp2p/go-libp2p-core/sec" + "github.com/libp2p/go-libp2p/core/sec" "go.uber.org/zap" "net" "storj.io/drpc" diff --git a/common/net/peer/peer.go b/common/net/peer/peer.go index d73dc596..6056b0b9 100644 --- a/common/net/peer/peer.go +++ b/common/net/peer/peer.go @@ -2,7 +2,7 @@ package peer import ( "context" - "github.com/libp2p/go-libp2p-core/sec" + "github.com/libp2p/go-libp2p/core/sec" "storj.io/drpc" "sync/atomic" "time" diff --git a/common/net/rpc/rpctest/pool.go b/common/net/rpc/rpctest/pool.go new file mode 100644 index 00000000..7e73b226 --- /dev/null +++ b/common/net/rpc/rpctest/pool.go @@ -0,0 +1,98 @@ +package rpctest + +import ( + "context" + "errors" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/app" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/peer" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/pool" + "math/rand" + "storj.io/drpc" + "sync" + "time" +) + +var ErrCantConnect = errors.New("can't connect to test server") + +func NewTestPool() *TestPool { + return &TestPool{} +} + +type TestPool struct { + ts *TesServer + mu sync.Mutex +} + +func (t *TestPool) WithServer(ts *TesServer) *TestPool { + t.mu.Lock() + defer t.mu.Unlock() + t.ts = ts + return t +} + +func (t *TestPool) Get(ctx context.Context, id string) (peer.Peer, error) { + t.mu.Lock() + defer t.mu.Unlock() + if t.ts == nil { + return nil, ErrCantConnect + } + return &testPeer{id: id, Conn: t.ts.Dial()}, nil +} + +func (t *TestPool) Dial(ctx context.Context, id string) (peer.Peer, error) { + t.mu.Lock() + defer t.mu.Unlock() + if t.ts == nil { + return nil, ErrCantConnect + } + return &testPeer{id: id, Conn: t.ts.Dial()}, nil +} + +func (t *TestPool) GetOneOf(ctx context.Context, peerIds []string) (peer.Peer, error) { + t.mu.Lock() + defer t.mu.Unlock() + if t.ts == nil { + return nil, ErrCantConnect + } + return &testPeer{id: peerIds[rand.Intn(len(peerIds))], Conn: t.ts.Dial()}, nil +} + +func (t *TestPool) DialOneOf(ctx context.Context, peerIds []string) (peer.Peer, error) { + t.mu.Lock() + defer t.mu.Unlock() + if t.ts == nil { + return nil, ErrCantConnect + } + return &testPeer{id: peerIds[rand.Intn(len(peerIds))], Conn: t.ts.Dial()}, nil +} + +func (t *TestPool) Init(a *app.App) (err error) { + return nil +} + +func (t *TestPool) Name() (name string) { + return pool.CName +} + +func (t *TestPool) Run(ctx context.Context) (err error) { + return nil +} + +func (t *TestPool) Close(ctx context.Context) (err error) { + return nil +} + +type testPeer struct { + id string + drpc.Conn +} + +func (t testPeer) Id() string { + return t.id +} + +func (t testPeer) LastUsage() time.Time { + return time.Now() +} + +func (t testPeer) UpdateLastUsage() {} diff --git a/common/net/rpc/rpctest/server.go b/common/net/rpc/rpctest/server.go new file mode 100644 index 00000000..270067e4 --- /dev/null +++ b/common/net/rpc/rpctest/server.go @@ -0,0 +1,29 @@ +package rpctest + +import ( + "context" + "net" + "storj.io/drpc" + "storj.io/drpc/drpcconn" + "storj.io/drpc/drpcmux" + "storj.io/drpc/drpcserver" +) + +func NewTestServer() *TesServer { + ts := &TesServer{ + Mux: drpcmux.New(), + } + ts.Server = drpcserver.New(ts.Mux) + return ts +} + +type TesServer struct { + *drpcmux.Mux + *drpcserver.Server +} + +func (ts *TesServer) Dial() drpc.Conn { + sc, cc := net.Pipe() + go ts.Server.ServeOne(context.Background(), sc) + return drpcconn.New(cc) +} diff --git a/common/net/rpc/server/drpcserver.go b/common/net/rpc/server/drpcserver.go index 74a41c0e..3d1341b6 100644 --- a/common/net/rpc/server/drpcserver.go +++ b/common/net/rpc/server/drpcserver.go @@ -4,9 +4,9 @@ import ( "context" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/app" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/app/logger" - config2 "github.com/anytypeio/go-anytype-infrastructure-experiments/common/config" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/config" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/metric" - secure2 "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/secure" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/secure" "github.com/prometheus/client_golang/prometheus" "github.com/zeebo/errs" "go.uber.org/zap" @@ -32,22 +32,22 @@ type DRPCServer interface { } type configGetter interface { - GetGRPCServer() config2.GrpcServer + GetGRPCServer() config.GrpcServer } type drpcServer struct { - config config2.GrpcServer + config config.GrpcServer drpcServer *drpcserver.Server - transport secure2.Service - listeners []secure2.ContextListener + transport secure.Service + listeners []secure.ContextListener metric metric.Metric cancel func() *drpcmux.Mux } func (s *drpcServer) Init(a *app.App) (err error) { - s.config = a.MustComponent(config2.CName).(configGetter).GetGRPCServer() - s.transport = a.MustComponent(secure2.CName).(secure2.Service) + s.config = a.MustComponent(config.CName).(configGetter).GetGRPCServer() + s.transport = a.MustComponent(secure.CName).(secure.Service) s.metric = a.MustComponent(metric.CName).(metric.Metric) return nil } @@ -87,7 +87,7 @@ func (s *drpcServer) Run(ctx context.Context) (err error) { return } -func (s *drpcServer) serve(ctx context.Context, lis secure2.ContextListener) { +func (s *drpcServer) serve(ctx context.Context, lis secure.ContextListener) { l := log.With(zap.String("localAddr", lis.Addr().String())) l.Info("drpc listener started") defer func() { @@ -111,7 +111,7 @@ func (s *drpcServer) serve(ctx context.Context, lis secure2.ContextListener) { } continue } - if _, ok := err.(secure2.HandshakeError); ok { + if _, ok := err.(secure.HandshakeError); ok { l.Warn("listener handshake error", zap.Error(err)) continue } diff --git a/common/net/secure/context.go b/common/net/secure/context.go index c38da959..e22b3b00 100644 --- a/common/net/secure/context.go +++ b/common/net/secure/context.go @@ -3,7 +3,7 @@ package secure import ( "context" "errors" - "github.com/libp2p/go-libp2p-core/sec" + "github.com/libp2p/go-libp2p/core/sec" ) var ( diff --git a/common/net/secure/listener.go b/common/net/secure/listener.go index a0c61d56..b7c31b26 100644 --- a/common/net/secure/listener.go +++ b/common/net/secure/listener.go @@ -2,7 +2,7 @@ package secure import ( "context" - "github.com/libp2p/go-libp2p-core/crypto" + "github.com/libp2p/go-libp2p/core/crypto" libp2ptls "github.com/libp2p/go-libp2p/p2p/security/tls" "net" ) diff --git a/common/net/secure/service.go b/common/net/secure/service.go index 51a92c0e..d47ebd4b 100644 --- a/common/net/secure/service.go +++ b/common/net/secure/service.go @@ -2,13 +2,13 @@ package secure import ( "context" + commonaccount "github.com/anytypeio/go-anytype-infrastructure-experiments/common/account" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/app" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/app/logger" - commonaccount "github.com/anytypeio/go-anytype-infrastructure-experiments/common/account" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/config" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/keys" - "github.com/libp2p/go-libp2p-core/crypto" - "github.com/libp2p/go-libp2p-core/sec" + "github.com/libp2p/go-libp2p/core/crypto" + "github.com/libp2p/go-libp2p/core/sec" libp2ptls "github.com/libp2p/go-libp2p/p2p/security/tls" "go.uber.org/zap" "net" diff --git a/common/nodeconf/configuration.go b/common/nodeconf/configuration.go index add0647e..eecc53c3 100644 --- a/common/nodeconf/configuration.go +++ b/common/nodeconf/configuration.go @@ -1,4 +1,4 @@ -//go:generate mockgen -destination mock_nodeconf/mock_nodeconf.go github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf Configuration,ConfConnector +//go:generate mockgen -destination mock_nodeconf/mock_nodeconf.go github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf Service,Configuration,ConfConnector package nodeconf import ( diff --git a/common/nodeconf/mock_nodeconf/mock_nodeconf.go b/common/nodeconf/mock_nodeconf/mock_nodeconf.go index a22e4a6e..206c2528 100644 --- a/common/nodeconf/mock_nodeconf/mock_nodeconf.go +++ b/common/nodeconf/mock_nodeconf/mock_nodeconf.go @@ -1,5 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf (interfaces: Configuration,ConfConnector) +// Source: github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf (interfaces: Service,Configuration,ConfConnector) // Package mock_nodeconf is a generated GoMock package. package mock_nodeconf @@ -8,10 +8,105 @@ import ( context "context" reflect "reflect" + app "github.com/anytypeio/go-anytype-infrastructure-experiments/common/app" peer "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/peer" + nodeconf "github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf" gomock "github.com/golang/mock/gomock" ) +// MockService is a mock of Service interface. +type MockService struct { + ctrl *gomock.Controller + recorder *MockServiceMockRecorder +} + +// MockServiceMockRecorder is the mock recorder for MockService. +type MockServiceMockRecorder struct { + mock *MockService +} + +// NewMockService creates a new mock instance. +func NewMockService(ctrl *gomock.Controller) *MockService { + mock := &MockService{ctrl: ctrl} + mock.recorder = &MockServiceMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockService) EXPECT() *MockServiceMockRecorder { + return m.recorder +} + +// ConsensusPeers mocks base method. +func (m *MockService) ConsensusPeers() []string { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ConsensusPeers") + ret0, _ := ret[0].([]string) + return ret0 +} + +// ConsensusPeers indicates an expected call of ConsensusPeers. +func (mr *MockServiceMockRecorder) ConsensusPeers() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ConsensusPeers", reflect.TypeOf((*MockService)(nil).ConsensusPeers)) +} + +// GetById mocks base method. +func (m *MockService) GetById(arg0 string) nodeconf.Configuration { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetById", arg0) + ret0, _ := ret[0].(nodeconf.Configuration) + return ret0 +} + +// GetById indicates an expected call of GetById. +func (mr *MockServiceMockRecorder) GetById(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetById", reflect.TypeOf((*MockService)(nil).GetById), arg0) +} + +// GetLast mocks base method. +func (m *MockService) GetLast() nodeconf.Configuration { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetLast") + ret0, _ := ret[0].(nodeconf.Configuration) + return ret0 +} + +// GetLast indicates an expected call of GetLast. +func (mr *MockServiceMockRecorder) GetLast() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetLast", reflect.TypeOf((*MockService)(nil).GetLast)) +} + +// Init mocks base method. +func (m *MockService) Init(arg0 *app.App) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Init", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// Init indicates an expected call of Init. +func (mr *MockServiceMockRecorder) Init(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Init", reflect.TypeOf((*MockService)(nil).Init), arg0) +} + +// Name mocks base method. +func (m *MockService) Name() string { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Name") + ret0, _ := ret[0].(string) + return ret0 +} + +// Name indicates an expected call of Name. +func (mr *MockServiceMockRecorder) Name() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Name", reflect.TypeOf((*MockService)(nil).Name)) +} + // MockConfiguration is a mock of Configuration interface. type MockConfiguration struct { ctrl *gomock.Controller diff --git a/common/nodeconf/service.go b/common/nodeconf/service.go index 6684b571..babf13ba 100644 --- a/common/nodeconf/service.go +++ b/common/nodeconf/service.go @@ -3,10 +3,10 @@ package nodeconf import ( "github.com/anytypeio/go-anytype-infrastructure-experiments/common/app" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/app/logger" - config2 "github.com/anytypeio/go-anytype-infrastructure-experiments/common/config" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/config" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/keys" - encryptionkey2 "github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/keys/asymmetric/encryptionkey" - signingkey2 "github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/keys/asymmetric/signingkey" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/keys/asymmetric/encryptionkey" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/keys/asymmetric/signingkey" "github.com/anytypeio/go-chash" ) @@ -36,8 +36,8 @@ type service struct { type Node struct { Address string PeerId string - SigningKey signingkey2.PubKey - EncryptionKey encryptionkey2.PubKey + SigningKey signingkey.PubKey + EncryptionKey encryptionkey.PubKey } func (n *Node) Id() string { @@ -49,7 +49,7 @@ func (n *Node) Capacity() float64 { } func (s *service) Init(a *app.App) (err error) { - conf := a.MustComponent(config2.CName).(*config2.Config) + conf := a.MustComponent(config.CName).(*config.Config) s.accountId = conf.Account.PeerId config := &configuration{ @@ -100,10 +100,10 @@ func (s *service) ConsensusPeers() []string { } func nodeFromConfigNode( - n config2.Node) (*Node, error) { + n config.Node) (*Node, error) { decodedSigningKey, err := keys.DecodeKeyFromString( n.SigningKey, - signingkey2.UnmarshalEd25519PrivateKey, + signingkey.UnmarshalEd25519PrivateKey, nil) if err != nil { return nil, err @@ -111,7 +111,7 @@ func nodeFromConfigNode( decodedEncryptionKey, err := keys.DecodeKeyFromString( n.EncryptionKey, - encryptionkey2.NewEncryptionRsaPrivKeyFromBytes, + encryptionkey.NewEncryptionRsaPrivKeyFromBytes, nil) if err != nil { return nil, err diff --git a/common/util/peer/peer.go b/common/util/peer/peer.go index 00f5d5c3..32ea21ee 100644 --- a/common/util/peer/peer.go +++ b/common/util/peer/peer.go @@ -2,8 +2,8 @@ package peer import ( "github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/keys/asymmetric/signingkey" - "github.com/libp2p/go-libp2p-core/crypto" - "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p/core/crypto" + "github.com/libp2p/go-libp2p/core/peer" ) func IDFromSigningPubKey(pubKey signingkey.PubKey) (peer.ID, error) { diff --git a/consensus/consensusclient/client.go b/consensus/consensusclient/client.go index 0df9fbdb..9d08617b 100644 --- a/consensus/consensusclient/client.go +++ b/consensus/consensusclient/client.go @@ -2,37 +2,65 @@ package consensusclient import ( "context" + "errors" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/app" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/app/logger" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/pool" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/rpc/rpcerr" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf" "github.com/anytypeio/go-anytype-infrastructure-experiments/consensus/consensusproto" + _ "github.com/anytypeio/go-anytype-infrastructure-experiments/consensus/consensusproto/consensuserr" + "go.uber.org/zap" + "sync" + "time" ) const CName = "consensus.client" var log = logger.NewNamed(CName) +var ( + ErrWatcherExists = errors.New("watcher exists") + ErrWatcherNotExists = errors.New("watcher not exists") +) + func New() Service { return new(service) } +// Watcher watches new events by specified logId +type Watcher interface { + AddConsensusRecords(recs []*consensusproto.Record) + AddConsensusError(err error) +} + type Service interface { + // AddLog adds new log to consensus servers AddLog(ctx context.Context, clog *consensusproto.Log) (err error) + // AddRecord adds new record to consensus servers AddRecord(ctx context.Context, logId []byte, clog *consensusproto.Record) (err error) - WatchLog(ctx context.Context) (stream Stream, err error) - app.Component + // Watch starts watching to given logId and calls watcher when any relative event received + Watch(logId []byte, w Watcher) (err error) + // UnWatch stops watching given logId and removes watcher + UnWatch(logId []byte) (err error) + app.ComponentRunnable } type service struct { pool pool.Pool nodeconf nodeconf.Service + + watchers map[string]Watcher + stream *stream + close chan struct{} + mu sync.Mutex } func (s *service) Init(a *app.App) (err error) { s.pool = a.MustComponent(pool.CName).(pool.Pool) s.nodeconf = a.MustComponent(nodeconf.CName).(nodeconf.Service) + s.watchers = make(map[string]Watcher) + s.close = make(chan struct{}) return nil } @@ -40,6 +68,11 @@ func (s *service) Name() (name string) { return CName } +func (s *service) Run(_ context.Context) error { + go s.streamWatcher() + return nil +} + func (s *service) getClient(ctx context.Context) (consensusproto.DRPCConsensusClient, error) { peer, err := s.pool.GetOneOf(ctx, s.nodeconf.ConsensusPeers()) if err != nil { @@ -83,7 +116,37 @@ func (s *service) AddRecord(ctx context.Context, logId []byte, clog *consensuspr return } -func (s *service) WatchLog(ctx context.Context) (st Stream, err error) { +func (s *service) Watch(logId []byte, w Watcher) (err error) { + s.mu.Lock() + defer s.mu.Unlock() + if _, ok := s.watchers[string(logId)]; ok { + return ErrWatcherExists + } + s.watchers[string(logId)] = w + if s.stream != nil { + if wErr := s.stream.WatchIds([][]byte{logId}); wErr != nil { + log.Warn("WatchIds error", zap.Error(wErr)) + } + } + return +} + +func (s *service) UnWatch(logId []byte) (err error) { + s.mu.Lock() + defer s.mu.Unlock() + if _, ok := s.watchers[string(logId)]; !ok { + return ErrWatcherNotExists + } + delete(s.watchers, string(logId)) + if s.stream != nil { + if wErr := s.stream.UnwatchIds([][]byte{logId}); wErr != nil { + log.Warn("UnWatchIds error", zap.Error(wErr)) + } + } + return +} + +func (s *service) openStream(ctx context.Context) (st *stream, err error) { cl, err := s.dialClient(ctx) if err != nil { return @@ -94,3 +157,87 @@ func (s *service) WatchLog(ctx context.Context) (st Stream, err error) { } return runStream(rpcStream), nil } + +func (s *service) streamWatcher() { + var ( + err error + st *stream + i int + ) + for { + // open stream + if st, err = s.openStream(context.Background()); err != nil { + // can't open stream, we will retry until success connection or close + if i < 60 { + i++ + } + sleepTime := time.Second * time.Duration(i) + log.Error("watch log error", zap.Error(err), zap.Duration("waitTime", sleepTime)) + select { + case <-time.After(sleepTime): + continue + case <-s.close: + return + } + } + i = 0 + + // collect ids and setup stream + s.mu.Lock() + var logIds = make([][]byte, 0, len(s.watchers)) + for id := range s.watchers { + logIds = append(logIds, []byte(id)) + } + s.stream = st + s.mu.Unlock() + + // restore subscriptions + if len(logIds) > 0 { + if err = s.stream.WatchIds(logIds); err != nil { + log.Error("watch ids error", zap.Error(err)) + continue + } + } + + // read stream + if err = s.streamReader(); err != nil { + log.Error("stream read error", zap.Error(err)) + continue + } + return + } +} + +func (s *service) streamReader() error { + for { + events := s.stream.WaitLogs() + if len(events) == 0 { + return s.stream.Err() + } + for _, e := range events { + if w, ok := s.watchers[string(e.LogId)]; ok { + if e.Error == nil { + w.AddConsensusRecords(e.Records) + } else { + w.AddConsensusError(rpcerr.Err(uint64(e.Error.Error))) + } + } else { + log.Warn("received unexpected log id", zap.Binary("logId", e.LogId)) + } + } + } +} + +func (s *service) Close(_ context.Context) error { + s.mu.Lock() + if s.stream != nil { + _ = s.stream.Close() + } + s.mu.Unlock() + select { + case <-s.close: + default: + close(s.close) + } + return nil +} diff --git a/consensus/consensusclient/client_test.go b/consensus/consensusclient/client_test.go new file mode 100644 index 00000000..f27787ab --- /dev/null +++ b/consensus/consensusclient/client_test.go @@ -0,0 +1,221 @@ +package consensusclient + +import ( + "context" + "fmt" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/app" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/pool" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/rpc/rpctest" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf/mock_nodeconf" + "github.com/anytypeio/go-anytype-infrastructure-experiments/consensus/consensusproto" + "github.com/anytypeio/go-anytype-infrastructure-experiments/consensus/consensusproto/consensuserr" + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "testing" + "time" +) + +func TestService_Watch(t *testing.T) { + t.Run("not found error", func(t *testing.T) { + fx := newFixture(t).run(t) + defer fx.Finish() + var logId = []byte{'1'} + w := &testWatcher{} + require.NoError(t, fx.Watch(logId, w)) + st := fx.testServer.waitStream(t) + req, err := st.Recv() + require.NoError(t, err) + assert.Equal(t, [][]byte{logId}, req.WatchIds) + require.NoError(t, st.Send(&consensusproto.WatchLogEvent{ + LogId: logId, + Error: &consensusproto.Err{ + Error: consensusproto.ErrCodes_ErrorOffset + consensusproto.ErrCodes_LogNotFound, + }, + })) + assert.Equal(t, consensuserr.ErrLogNotFound, w.err) + fx.testServer.releaseStream <- nil + }) + t.Run("watcherExists error", func(t *testing.T) { + fx := newFixture(t).run(t) + defer fx.Finish() + var logId = []byte{'1'} + w := &testWatcher{} + require.NoError(t, fx.Watch(logId, w)) + require.Error(t, fx.Watch(logId, w)) + st := fx.testServer.waitStream(t) + st.Recv() + fx.testServer.releaseStream <- nil + }) + t.Run("watch", func(t *testing.T) { + fx := newFixture(t).run(t) + defer fx.Finish() + var logId1 = []byte{'1'} + w := &testWatcher{} + require.NoError(t, fx.Watch(logId1, w)) + st := fx.testServer.waitStream(t) + req, err := st.Recv() + require.NoError(t, err) + assert.Equal(t, [][]byte{logId1}, req.WatchIds) + + var logId2 = []byte{'2'} + w = &testWatcher{} + require.NoError(t, fx.Watch(logId2, w)) + req, err = st.Recv() + require.NoError(t, err) + assert.Equal(t, [][]byte{logId2}, req.WatchIds) + + fx.testServer.releaseStream <- nil + }) +} + +func TestService_UnWatch(t *testing.T) { + t.Run("no watcher", func(t *testing.T) { + fx := newFixture(t).run(t) + defer fx.Finish() + require.Error(t, fx.UnWatch([]byte{'1'})) + }) + t.Run("success", func(t *testing.T) { + fx := newFixture(t).run(t) + defer fx.Finish() + w := &testWatcher{} + require.NoError(t, fx.Watch([]byte{'1'}, w)) + assert.NoError(t, fx.UnWatch([]byte{'1'})) + }) +} + +func TestService_Init(t *testing.T) { + t.Run("reconnect on watch err", func(t *testing.T) { + fx := newFixture(t) + fx.testServer.watchErrOnce = true + fx.run(t) + defer fx.Finish() + fx.testServer.waitStream(t) + fx.testServer.releaseStream <- nil + }) + t.Run("reconnect on start", func(t *testing.T) { + fx := newFixture(t) + fx.a.MustComponent(pool.CName).(*rpctest.TestPool).WithServer(nil) + fx.run(t) + defer fx.Finish() + time.Sleep(time.Millisecond * 50) + fx.a.MustComponent(pool.CName).(*rpctest.TestPool).WithServer(fx.drpcTS) + fx.testServer.waitStream(t) + fx.testServer.releaseStream <- nil + }) +} + +func TestService_AddLog(t *testing.T) { + fx := newFixture(t).run(t) + defer fx.Finish() + assert.NoError(t, fx.AddLog(ctx, &consensusproto.Log{})) +} + +func TestService_AddRecord(t *testing.T) { + fx := newFixture(t).run(t) + defer fx.Finish() + assert.NoError(t, fx.AddRecord(ctx, []byte{'1'}, &consensusproto.Record{})) +} + +var ctx = context.Background() + +func newFixture(t *testing.T) *fixture { + fx := &fixture{ + Service: New(), + a: &app.App{}, + ctrl: gomock.NewController(t), + testServer: &testServer{ + stream: make(chan consensusproto.DRPCConsensus_WatchLogStream), + releaseStream: make(chan error), + }, + } + fx.nodeconf = mock_nodeconf.NewMockService(fx.ctrl) + fx.nodeconf.EXPECT().Init(gomock.Any()) + fx.nodeconf.EXPECT().Name().Return(nodeconf.CName).AnyTimes() + fx.nodeconf.EXPECT().ConsensusPeers().Return([]string{"c1", "c2"}).AnyTimes() + fx.drpcTS = rpctest.NewTestServer() + require.NoError(t, consensusproto.DRPCRegisterConsensus(fx.drpcTS.Mux, fx.testServer)) + fx.a.Register(fx.Service). + Register(fx.nodeconf). + Register(rpctest.NewTestPool().WithServer(fx.drpcTS)) + return fx +} + +type fixture struct { + Service + nodeconf *mock_nodeconf.MockService + a *app.App + ctrl *gomock.Controller + testServer *testServer + drpcTS *rpctest.TesServer +} + +func (fx *fixture) run(t *testing.T) *fixture { + require.NoError(t, fx.a.Start(ctx)) + return fx +} + +func (fx *fixture) Finish() { + assert.NoError(fx.ctrl.T, fx.a.Close(ctx)) + fx.ctrl.Finish() +} + +type testServer struct { + stream chan consensusproto.DRPCConsensus_WatchLogStream + addLog func(ctx context.Context, req *consensusproto.AddLogRequest) error + addRecord func(ctx context.Context, req *consensusproto.AddRecordRequest) error + releaseStream chan error + watchErrOnce bool +} + +func (t *testServer) AddLog(ctx context.Context, req *consensusproto.AddLogRequest) (*consensusproto.Ok, error) { + if t.addLog != nil { + if err := t.addLog(ctx, req); err != nil { + return nil, err + } + } + return &consensusproto.Ok{}, nil +} + +func (t *testServer) AddRecord(ctx context.Context, req *consensusproto.AddRecordRequest) (*consensusproto.Ok, error) { + if t.addRecord != nil { + if err := t.addRecord(ctx, req); err != nil { + return nil, err + } + } + return &consensusproto.Ok{}, nil +} + +func (t *testServer) WatchLog(stream consensusproto.DRPCConsensus_WatchLogStream) error { + fmt.Println("watchLog", t.watchErrOnce) + if t.watchErrOnce { + t.watchErrOnce = false + return fmt.Errorf("error") + } + t.stream <- stream + return <-t.releaseStream +} + +func (t *testServer) waitStream(test *testing.T) consensusproto.DRPCConsensus_WatchLogStream { + select { + case <-time.After(time.Second * 5): + test.Fatalf("waiteStream timeout") + case st := <-t.stream: + return st + } + return nil +} + +type testWatcher struct { + recs [][]*consensusproto.Record + err error +} + +func (t *testWatcher) AddConsensusRecords(recs []*consensusproto.Record) { + t.recs = append(t.recs, recs) +} + +func (t *testWatcher) AddConsensusError(err error) { + t.err = err +} diff --git a/consensus/consensusclient/stream.go b/consensus/consensusclient/stream.go index d2f1a055..d3383629 100644 --- a/consensus/consensusclient/stream.go +++ b/consensus/consensusclient/stream.go @@ -5,18 +5,10 @@ import ( "github.com/cheggaaa/mb/v2" ) -type Stream interface { - WatchIds(logIds [][]byte) (err error) - UnwatchIds(logIds [][]byte) (err error) - WaitLogs() []*consensusproto.Log - Err() error - Close() error -} - -func runStream(rpcStream consensusproto.DRPCConsensus_WatchLogClient) Stream { +func runStream(rpcStream consensusproto.DRPCConsensus_WatchLogClient) *stream { st := &stream{ rpcStream: rpcStream, - mb: mb.New((*consensusproto.Log)(nil), 100), + mb: mb.New((*consensusproto.WatchLogEvent)(nil), 100), } go st.readStream() return st @@ -24,7 +16,7 @@ func runStream(rpcStream consensusproto.DRPCConsensus_WatchLogClient) Stream { type stream struct { rpcStream consensusproto.DRPCConsensus_WatchLogClient - mb *mb.MB[*consensusproto.Log] + mb *mb.MB[*consensusproto.WatchLogEvent] err error } @@ -40,7 +32,7 @@ func (s *stream) UnwatchIds(logIds [][]byte) (err error) { }) } -func (s *stream) WaitLogs() []*consensusproto.Log { +func (s *stream) WaitLogs() []*consensusproto.WatchLogEvent { return s.mb.Wait() } @@ -56,10 +48,7 @@ func (s *stream) readStream() { s.err = err return } - if err = s.mb.Add(&consensusproto.Log{ - Id: event.LogId, - Records: event.Records, - }); err != nil { + if err = s.mb.Add(event); err != nil { return } } diff --git a/node/acl/acl.go b/node/acl/acl.go deleted file mode 100644 index e564307a..00000000 --- a/node/acl/acl.go +++ /dev/null @@ -1 +0,0 @@ -package acl diff --git a/node/acl/service.go b/node/acl/service.go deleted file mode 100644 index 9c080c48..00000000 --- a/node/acl/service.go +++ /dev/null @@ -1,38 +0,0 @@ -package acl - -import ( - "context" - "github.com/anytypeio/go-anytype-infrastructure-experiments/common/app" - "github.com/anytypeio/go-anytype-infrastructure-experiments/consensus/consensusclient" -) - -const CName = "node.acl" - -type Service interface { - app.ComponentRunnable -} - -type aclService struct { - cons consensusclient.Service - stream consensusclient.Stream -} - -func (as *aclService) Init(a *app.App) (err error) { - as.cons = a.MustComponent(consensusclient.CName).(consensusclient.Service) - return nil -} - -func (as *aclService) Name() (name string) { - return CName -} - -func (as *aclService) Run(ctx context.Context) (err error) { - if as.stream, err = as.cons.WatchLog(ctx); err != nil { - return - } - return nil -} - -func (as *aclService) Close(ctx context.Context) (err error) { - return as.stream.Close() -} diff --git a/node/go.mod b/node/go.mod index a4bd1708..a65cf8f2 100644 --- a/node/go.mod +++ b/node/go.mod @@ -4,9 +4,12 @@ go 1.19 replace github.com/anytypeio/go-anytype-infrastructure-experiments/common => ../common +replace github.com/anytypeio/go-anytype-infrastructure-experiments/consensus => ../consensus + require ( github.com/akrylysov/pogreb v0.10.1 github.com/anytypeio/go-anytype-infrastructure-experiments/common v0.0.0-00010101000000-000000000000 + github.com/anytypeio/go-anytype-infrastructure-experiments/consensus v0.0.0-00010101000000-000000000000 go.uber.org/zap v1.23.0 ) @@ -15,6 +18,7 @@ require ( github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash v1.1.0 // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect + github.com/cheggaaa/mb/v2 v2.0.1 // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0 // indirect github.com/fogleman/gg v1.3.0 // indirect github.com/goccy/go-graphviz v0.0.9 // indirect @@ -52,7 +56,7 @@ require ( github.com/zeebo/errs v1.3.0 // indirect go.uber.org/atomic v1.10.0 // indirect go.uber.org/multierr v1.8.0 // indirect - golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e // indirect + golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d // indirect golang.org/x/image v0.0.0-20200119044424-58c23975cae1 // indirect golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab // indirect google.golang.org/protobuf v1.28.1 // indirect diff --git a/node/go.sum b/node/go.sum index 4cbd9bb1..55067483 100644 --- a/node/go.sum +++ b/node/go.sum @@ -57,6 +57,8 @@ github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghf github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE= github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/cheggaaa/mb/v2 v2.0.1 h1:gn0khbEbKlw3i5VOYi0VnHEHayjZKfUDOyGSpHAybBs= +github.com/cheggaaa/mb/v2 v2.0.1/go.mod h1:XGeZw20Iqgjky26KL0mvCwk3+4NyZCUbshSo6ALne+c= github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= @@ -310,8 +312,8 @@ golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200204104054-c9f3fb736b72/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e h1:T8NU3HyQ8ClP4SEE+KbFlg6n0NhuTsN4MyznaarGsZM= -golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= +golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d h1:sK3txAijHtOK88l68nt020reeT1ZdKLIYetKl95FzVY= +golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=