1
0
Fork 0
mirror of https://github.com/anyproto/any-sync.git synced 2025-06-08 05:57:03 +09:00

reconnection for consensus client + fixes

This commit is contained in:
Sergey Cherepanov 2022-10-20 20:14:27 +03:00 committed by Mikhail Iudin
parent 2dcacdfbb6
commit 9fbe1aefd2
No known key found for this signature in database
GPG key ID: FAAAA8BAABDFF1C0
22 changed files with 658 additions and 149 deletions

View file

@ -5,34 +5,30 @@ import (
"errors"
"fmt"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/ocache"
"github.com/libp2p/go-libp2p-core/sec"
"github.com/libp2p/go-libp2p/core/sec"
"storj.io/drpc/drpcctx"
"sync"
"sync/atomic"
"time"
)
var ErrEmptyPeer = errors.New("don't have such a peer")
var ErrStreamClosed = errors.New("stream is already closed")
var maxSimultaneousOperationsPerStream = 10
var syncWaitPeriod = 2 * time.Second
var ErrSyncTimeout = errors.New("too long wait on sync receive")
const maxSimultaneousOperationsPerStream = 10
// StreamPool can be made generic to work with different streams
type StreamPool interface {
ocache.ObjectLastUsage
Sender
AddAndReadStreamSync(stream spacesyncproto.SpaceStream) (err error)
AddAndReadStreamAsync(stream spacesyncproto.SpaceStream)
HasActiveStream(peerId string) bool
Close() (err error)
}
type Sender interface {
SendSync(peerId string, message *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error)
SendAsync(peers []string, message *spacesyncproto.ObjectSyncMessage) (err error)
BroadcastAsync(message *spacesyncproto.ObjectSyncMessage) (err error)
HasActiveStream(peerId string) bool
Close() (err error)
}
type MessageHandler func(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error)
@ -48,23 +44,15 @@ type streamPool struct {
wg *sync.WaitGroup
waiters map[string]responseWaiter
waitersMx sync.Mutex
counter atomic.Uint64
lastUsage atomic.Int64
counter uint64
}
func newStreamPool(messageHandler MessageHandler) StreamPool {
s := &streamPool{
return &streamPool{
peerStreams: make(map[string]spacesyncproto.SpaceStream),
messageHandler: messageHandler,
waiters: make(map[string]responseWaiter),
wg: &sync.WaitGroup{},
}
s.lastUsage.Store(time.Now().Unix())
return s
}
func (s *streamPool) LastUsage() time.Time {
return time.Unix(s.lastUsage.Load(), 0)
}
func (s *streamPool) HasActiveStream(peerId string) (res bool) {
@ -77,39 +65,26 @@ func (s *streamPool) HasActiveStream(peerId string) (res bool) {
func (s *streamPool) SendSync(
peerId string,
msg *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) {
newCounter := s.counter.Add(1)
msg.ReplyId = genStreamPoolKey(peerId, msg.ObjectId, newCounter)
newCounter := atomic.AddUint64(&s.counter, 1)
msg.TrackingId = genStreamPoolKey(peerId, msg.TreeId, newCounter)
s.waitersMx.Lock()
waiter := responseWaiter{
ch: make(chan *spacesyncproto.ObjectSyncMessage, 1),
ch: make(chan *spacesyncproto.ObjectSyncMessage),
}
s.waiters[msg.ReplyId] = waiter
s.waiters[msg.TrackingId] = waiter
s.waitersMx.Unlock()
err = s.SendAsync([]string{peerId}, msg)
if err != nil {
return
}
delay := time.NewTimer(syncWaitPeriod)
select {
case <-delay.C:
s.waitersMx.Lock()
delete(s.waiters, msg.ReplyId)
s.waitersMx.Unlock()
log.With("replyId", msg.ReplyId).Error("time elapsed when waiting")
err = ErrSyncTimeout
case reply = <-waiter.ch:
if !delay.Stop() {
<-delay.C
}
}
reply = <-waiter.ch
return
}
func (s *streamPool) SendAsync(peers []string, message *spacesyncproto.ObjectSyncMessage) (err error) {
s.lastUsage.Store(time.Now().Unix())
getStreams := func() (streams []spacesyncproto.SpaceStream) {
for _, pId := range peers {
stream, err := s.getOrDeleteStream(pId)
@ -125,13 +100,10 @@ func (s *streamPool) SendAsync(peers []string, message *spacesyncproto.ObjectSyn
streams := getStreams()
s.Unlock()
log.With("objectId", message.ObjectId).
Debugf("sending message to %d peers", len(streams))
for _, s := range streams {
err = s.Send(message)
}
if len(peers) != 1 {
err = nil
if len(peers) == 1 {
err = s.Send(message)
}
}
return err
}
@ -173,8 +145,6 @@ Loop:
func (s *streamPool) BroadcastAsync(message *spacesyncproto.ObjectSyncMessage) (err error) {
streams := s.getAllStreams()
log.With("objectId", message.ObjectId).
Debugf("broadcasting message to %d peers", len(streams))
for _, stream := range streams {
if err = stream.Send(message); err != nil {
// TODO: add logging
@ -221,24 +191,21 @@ func (s *streamPool) readPeerLoop(peerId string, stream spacesyncproto.SpaceStre
}
process := func(msg *spacesyncproto.ObjectSyncMessage) {
s.lastUsage.Store(time.Now().Unix())
if msg.ReplyId == "" {
if msg.TrackingId == "" {
s.messageHandler(stream.Context(), peerId, msg)
return
}
log.With("replyId", msg.ReplyId).Debug("getting message with reply id")
s.waitersMx.Lock()
waiter, exists := s.waiters[msg.ReplyId]
waiter, exists := s.waiters[msg.TrackingId]
if !exists {
log.With("replyId", msg.ReplyId).Debug("reply id not exists")
s.waitersMx.Unlock()
s.messageHandler(stream.Context(), peerId, msg)
return
}
log.With("replyId", msg.ReplyId).Debug("reply id exists")
delete(s.waiters, msg.ReplyId)
delete(s.waiters, msg.TrackingId)
s.waitersMx.Unlock()
waiter.ch <- msg
}
@ -246,7 +213,6 @@ func (s *streamPool) readPeerLoop(peerId string, stream spacesyncproto.SpaceStre
Loop:
for {
msg, err := stream.Recv()
s.lastUsage.Store(time.Now().Unix())
if err != nil {
break
}

View file

@ -12,7 +12,6 @@ require (
github.com/huandu/skiplist v1.2.0
github.com/ipfs/go-cid v0.3.2
github.com/libp2p/go-libp2p v0.23.2
github.com/libp2p/go-libp2p-core v0.20.1
github.com/minio/sha256-simd v1.0.0
github.com/multiformats/go-multibase v0.1.1
github.com/multiformats/go-multihash v0.2.1

View file

@ -189,8 +189,6 @@ github.com/libp2p/go-buffer-pool v0.1.0 h1:oK4mSFcQz7cTQIfqbe4MIj9gLW+mnanjyFtc6
github.com/libp2p/go-buffer-pool v0.1.0/go.mod h1:N+vh8gMqimBzdKkSMVuydVDq+UV5QTWy5HSiZacSbPg=
github.com/libp2p/go-libp2p v0.23.2 h1:yqyTeKQJyofWXxEv/eEVUvOrGdt/9x+0PIQ4N1kaxmE=
github.com/libp2p/go-libp2p v0.23.2/go.mod h1:s9DEa5NLR4g+LZS+md5uGU4emjMWFiqkZr6hBTY8UxI=
github.com/libp2p/go-libp2p-core v0.20.1 h1:fQz4BJyIFmSZAiTbKV8qoYhEH5Dtv/cVhZbG3Ib/+Cw=
github.com/libp2p/go-libp2p-core v0.20.1/go.mod h1:6zR8H7CvQWgYLsbG4on6oLNSGcyKaYFSEYyDt51+bIY=
github.com/libp2p/go-openssl v0.1.0 h1:LBkKEcUv6vtZIQLVTegAil8jbNpJErQ9AnT+bWV+Ooo=
github.com/libp2p/go-openssl v0.1.0/go.mod h1:OiOxwPpL3n4xlenjx2h7AwSGaFSC/KZvf6gNdOBQMtc=
github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94=

View file

@ -8,7 +8,7 @@ import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/config"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/peer"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/secure"
"github.com/libp2p/go-libp2p-core/sec"
"github.com/libp2p/go-libp2p/core/sec"
"go.uber.org/zap"
"net"
"storj.io/drpc"

View file

@ -2,7 +2,7 @@ package peer
import (
"context"
"github.com/libp2p/go-libp2p-core/sec"
"github.com/libp2p/go-libp2p/core/sec"
"storj.io/drpc"
"sync/atomic"
"time"

View file

@ -0,0 +1,98 @@
package rpctest
import (
"context"
"errors"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/app"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/peer"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/pool"
"math/rand"
"storj.io/drpc"
"sync"
"time"
)
var ErrCantConnect = errors.New("can't connect to test server")
func NewTestPool() *TestPool {
return &TestPool{}
}
type TestPool struct {
ts *TesServer
mu sync.Mutex
}
func (t *TestPool) WithServer(ts *TesServer) *TestPool {
t.mu.Lock()
defer t.mu.Unlock()
t.ts = ts
return t
}
func (t *TestPool) Get(ctx context.Context, id string) (peer.Peer, error) {
t.mu.Lock()
defer t.mu.Unlock()
if t.ts == nil {
return nil, ErrCantConnect
}
return &testPeer{id: id, Conn: t.ts.Dial()}, nil
}
func (t *TestPool) Dial(ctx context.Context, id string) (peer.Peer, error) {
t.mu.Lock()
defer t.mu.Unlock()
if t.ts == nil {
return nil, ErrCantConnect
}
return &testPeer{id: id, Conn: t.ts.Dial()}, nil
}
func (t *TestPool) GetOneOf(ctx context.Context, peerIds []string) (peer.Peer, error) {
t.mu.Lock()
defer t.mu.Unlock()
if t.ts == nil {
return nil, ErrCantConnect
}
return &testPeer{id: peerIds[rand.Intn(len(peerIds))], Conn: t.ts.Dial()}, nil
}
func (t *TestPool) DialOneOf(ctx context.Context, peerIds []string) (peer.Peer, error) {
t.mu.Lock()
defer t.mu.Unlock()
if t.ts == nil {
return nil, ErrCantConnect
}
return &testPeer{id: peerIds[rand.Intn(len(peerIds))], Conn: t.ts.Dial()}, nil
}
func (t *TestPool) Init(a *app.App) (err error) {
return nil
}
func (t *TestPool) Name() (name string) {
return pool.CName
}
func (t *TestPool) Run(ctx context.Context) (err error) {
return nil
}
func (t *TestPool) Close(ctx context.Context) (err error) {
return nil
}
type testPeer struct {
id string
drpc.Conn
}
func (t testPeer) Id() string {
return t.id
}
func (t testPeer) LastUsage() time.Time {
return time.Now()
}
func (t testPeer) UpdateLastUsage() {}

View file

@ -0,0 +1,29 @@
package rpctest
import (
"context"
"net"
"storj.io/drpc"
"storj.io/drpc/drpcconn"
"storj.io/drpc/drpcmux"
"storj.io/drpc/drpcserver"
)
func NewTestServer() *TesServer {
ts := &TesServer{
Mux: drpcmux.New(),
}
ts.Server = drpcserver.New(ts.Mux)
return ts
}
type TesServer struct {
*drpcmux.Mux
*drpcserver.Server
}
func (ts *TesServer) Dial() drpc.Conn {
sc, cc := net.Pipe()
go ts.Server.ServeOne(context.Background(), sc)
return drpcconn.New(cc)
}

View file

@ -4,9 +4,9 @@ import (
"context"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/app"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/app/logger"
config2 "github.com/anytypeio/go-anytype-infrastructure-experiments/common/config"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/config"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/metric"
secure2 "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/secure"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/secure"
"github.com/prometheus/client_golang/prometheus"
"github.com/zeebo/errs"
"go.uber.org/zap"
@ -32,22 +32,22 @@ type DRPCServer interface {
}
type configGetter interface {
GetGRPCServer() config2.GrpcServer
GetGRPCServer() config.GrpcServer
}
type drpcServer struct {
config config2.GrpcServer
config config.GrpcServer
drpcServer *drpcserver.Server
transport secure2.Service
listeners []secure2.ContextListener
transport secure.Service
listeners []secure.ContextListener
metric metric.Metric
cancel func()
*drpcmux.Mux
}
func (s *drpcServer) Init(a *app.App) (err error) {
s.config = a.MustComponent(config2.CName).(configGetter).GetGRPCServer()
s.transport = a.MustComponent(secure2.CName).(secure2.Service)
s.config = a.MustComponent(config.CName).(configGetter).GetGRPCServer()
s.transport = a.MustComponent(secure.CName).(secure.Service)
s.metric = a.MustComponent(metric.CName).(metric.Metric)
return nil
}
@ -87,7 +87,7 @@ func (s *drpcServer) Run(ctx context.Context) (err error) {
return
}
func (s *drpcServer) serve(ctx context.Context, lis secure2.ContextListener) {
func (s *drpcServer) serve(ctx context.Context, lis secure.ContextListener) {
l := log.With(zap.String("localAddr", lis.Addr().String()))
l.Info("drpc listener started")
defer func() {
@ -111,7 +111,7 @@ func (s *drpcServer) serve(ctx context.Context, lis secure2.ContextListener) {
}
continue
}
if _, ok := err.(secure2.HandshakeError); ok {
if _, ok := err.(secure.HandshakeError); ok {
l.Warn("listener handshake error", zap.Error(err))
continue
}

View file

@ -3,7 +3,7 @@ package secure
import (
"context"
"errors"
"github.com/libp2p/go-libp2p-core/sec"
"github.com/libp2p/go-libp2p/core/sec"
)
var (

View file

@ -2,7 +2,7 @@ package secure
import (
"context"
"github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p/core/crypto"
libp2ptls "github.com/libp2p/go-libp2p/p2p/security/tls"
"net"
)

View file

@ -2,13 +2,13 @@ package secure
import (
"context"
commonaccount "github.com/anytypeio/go-anytype-infrastructure-experiments/common/account"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/app"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/app/logger"
commonaccount "github.com/anytypeio/go-anytype-infrastructure-experiments/common/account"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/config"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/keys"
"github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/sec"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/sec"
libp2ptls "github.com/libp2p/go-libp2p/p2p/security/tls"
"go.uber.org/zap"
"net"

View file

@ -1,4 +1,4 @@
//go:generate mockgen -destination mock_nodeconf/mock_nodeconf.go github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf Configuration,ConfConnector
//go:generate mockgen -destination mock_nodeconf/mock_nodeconf.go github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf Service,Configuration,ConfConnector
package nodeconf
import (

View file

@ -1,5 +1,5 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf (interfaces: Configuration,ConfConnector)
// Source: github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf (interfaces: Service,Configuration,ConfConnector)
// Package mock_nodeconf is a generated GoMock package.
package mock_nodeconf
@ -8,10 +8,105 @@ import (
context "context"
reflect "reflect"
app "github.com/anytypeio/go-anytype-infrastructure-experiments/common/app"
peer "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/peer"
nodeconf "github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf"
gomock "github.com/golang/mock/gomock"
)
// MockService is a mock of Service interface.
type MockService struct {
ctrl *gomock.Controller
recorder *MockServiceMockRecorder
}
// MockServiceMockRecorder is the mock recorder for MockService.
type MockServiceMockRecorder struct {
mock *MockService
}
// NewMockService creates a new mock instance.
func NewMockService(ctrl *gomock.Controller) *MockService {
mock := &MockService{ctrl: ctrl}
mock.recorder = &MockServiceMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockService) EXPECT() *MockServiceMockRecorder {
return m.recorder
}
// ConsensusPeers mocks base method.
func (m *MockService) ConsensusPeers() []string {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "ConsensusPeers")
ret0, _ := ret[0].([]string)
return ret0
}
// ConsensusPeers indicates an expected call of ConsensusPeers.
func (mr *MockServiceMockRecorder) ConsensusPeers() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ConsensusPeers", reflect.TypeOf((*MockService)(nil).ConsensusPeers))
}
// GetById mocks base method.
func (m *MockService) GetById(arg0 string) nodeconf.Configuration {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetById", arg0)
ret0, _ := ret[0].(nodeconf.Configuration)
return ret0
}
// GetById indicates an expected call of GetById.
func (mr *MockServiceMockRecorder) GetById(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetById", reflect.TypeOf((*MockService)(nil).GetById), arg0)
}
// GetLast mocks base method.
func (m *MockService) GetLast() nodeconf.Configuration {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetLast")
ret0, _ := ret[0].(nodeconf.Configuration)
return ret0
}
// GetLast indicates an expected call of GetLast.
func (mr *MockServiceMockRecorder) GetLast() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetLast", reflect.TypeOf((*MockService)(nil).GetLast))
}
// Init mocks base method.
func (m *MockService) Init(arg0 *app.App) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Init", arg0)
ret0, _ := ret[0].(error)
return ret0
}
// Init indicates an expected call of Init.
func (mr *MockServiceMockRecorder) Init(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Init", reflect.TypeOf((*MockService)(nil).Init), arg0)
}
// Name mocks base method.
func (m *MockService) Name() string {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Name")
ret0, _ := ret[0].(string)
return ret0
}
// Name indicates an expected call of Name.
func (mr *MockServiceMockRecorder) Name() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Name", reflect.TypeOf((*MockService)(nil).Name))
}
// MockConfiguration is a mock of Configuration interface.
type MockConfiguration struct {
ctrl *gomock.Controller

View file

@ -3,10 +3,10 @@ package nodeconf
import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/app"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/app/logger"
config2 "github.com/anytypeio/go-anytype-infrastructure-experiments/common/config"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/config"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/keys"
encryptionkey2 "github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/keys/asymmetric/encryptionkey"
signingkey2 "github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/keys/asymmetric/signingkey"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/keys/asymmetric/encryptionkey"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/keys/asymmetric/signingkey"
"github.com/anytypeio/go-chash"
)
@ -36,8 +36,8 @@ type service struct {
type Node struct {
Address string
PeerId string
SigningKey signingkey2.PubKey
EncryptionKey encryptionkey2.PubKey
SigningKey signingkey.PubKey
EncryptionKey encryptionkey.PubKey
}
func (n *Node) Id() string {
@ -49,7 +49,7 @@ func (n *Node) Capacity() float64 {
}
func (s *service) Init(a *app.App) (err error) {
conf := a.MustComponent(config2.CName).(*config2.Config)
conf := a.MustComponent(config.CName).(*config.Config)
s.accountId = conf.Account.PeerId
config := &configuration{
@ -100,10 +100,10 @@ func (s *service) ConsensusPeers() []string {
}
func nodeFromConfigNode(
n config2.Node) (*Node, error) {
n config.Node) (*Node, error) {
decodedSigningKey, err := keys.DecodeKeyFromString(
n.SigningKey,
signingkey2.UnmarshalEd25519PrivateKey,
signingkey.UnmarshalEd25519PrivateKey,
nil)
if err != nil {
return nil, err
@ -111,7 +111,7 @@ func nodeFromConfigNode(
decodedEncryptionKey, err := keys.DecodeKeyFromString(
n.EncryptionKey,
encryptionkey2.NewEncryptionRsaPrivKeyFromBytes,
encryptionkey.NewEncryptionRsaPrivKeyFromBytes,
nil)
if err != nil {
return nil, err

View file

@ -2,8 +2,8 @@ package peer
import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/keys/asymmetric/signingkey"
"github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/peer"
)
func IDFromSigningPubKey(pubKey signingkey.PubKey) (peer.ID, error) {

View file

@ -2,37 +2,65 @@ package consensusclient
import (
"context"
"errors"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/app"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/app/logger"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/pool"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/rpc/rpcerr"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf"
"github.com/anytypeio/go-anytype-infrastructure-experiments/consensus/consensusproto"
_ "github.com/anytypeio/go-anytype-infrastructure-experiments/consensus/consensusproto/consensuserr"
"go.uber.org/zap"
"sync"
"time"
)
const CName = "consensus.client"
var log = logger.NewNamed(CName)
var (
ErrWatcherExists = errors.New("watcher exists")
ErrWatcherNotExists = errors.New("watcher not exists")
)
func New() Service {
return new(service)
}
// Watcher watches new events by specified logId
type Watcher interface {
AddConsensusRecords(recs []*consensusproto.Record)
AddConsensusError(err error)
}
type Service interface {
// AddLog adds new log to consensus servers
AddLog(ctx context.Context, clog *consensusproto.Log) (err error)
// AddRecord adds new record to consensus servers
AddRecord(ctx context.Context, logId []byte, clog *consensusproto.Record) (err error)
WatchLog(ctx context.Context) (stream Stream, err error)
app.Component
// Watch starts watching to given logId and calls watcher when any relative event received
Watch(logId []byte, w Watcher) (err error)
// UnWatch stops watching given logId and removes watcher
UnWatch(logId []byte) (err error)
app.ComponentRunnable
}
type service struct {
pool pool.Pool
nodeconf nodeconf.Service
watchers map[string]Watcher
stream *stream
close chan struct{}
mu sync.Mutex
}
func (s *service) Init(a *app.App) (err error) {
s.pool = a.MustComponent(pool.CName).(pool.Pool)
s.nodeconf = a.MustComponent(nodeconf.CName).(nodeconf.Service)
s.watchers = make(map[string]Watcher)
s.close = make(chan struct{})
return nil
}
@ -40,6 +68,11 @@ func (s *service) Name() (name string) {
return CName
}
func (s *service) Run(_ context.Context) error {
go s.streamWatcher()
return nil
}
func (s *service) getClient(ctx context.Context) (consensusproto.DRPCConsensusClient, error) {
peer, err := s.pool.GetOneOf(ctx, s.nodeconf.ConsensusPeers())
if err != nil {
@ -83,7 +116,37 @@ func (s *service) AddRecord(ctx context.Context, logId []byte, clog *consensuspr
return
}
func (s *service) WatchLog(ctx context.Context) (st Stream, err error) {
func (s *service) Watch(logId []byte, w Watcher) (err error) {
s.mu.Lock()
defer s.mu.Unlock()
if _, ok := s.watchers[string(logId)]; ok {
return ErrWatcherExists
}
s.watchers[string(logId)] = w
if s.stream != nil {
if wErr := s.stream.WatchIds([][]byte{logId}); wErr != nil {
log.Warn("WatchIds error", zap.Error(wErr))
}
}
return
}
func (s *service) UnWatch(logId []byte) (err error) {
s.mu.Lock()
defer s.mu.Unlock()
if _, ok := s.watchers[string(logId)]; !ok {
return ErrWatcherNotExists
}
delete(s.watchers, string(logId))
if s.stream != nil {
if wErr := s.stream.UnwatchIds([][]byte{logId}); wErr != nil {
log.Warn("UnWatchIds error", zap.Error(wErr))
}
}
return
}
func (s *service) openStream(ctx context.Context) (st *stream, err error) {
cl, err := s.dialClient(ctx)
if err != nil {
return
@ -94,3 +157,87 @@ func (s *service) WatchLog(ctx context.Context) (st Stream, err error) {
}
return runStream(rpcStream), nil
}
func (s *service) streamWatcher() {
var (
err error
st *stream
i int
)
for {
// open stream
if st, err = s.openStream(context.Background()); err != nil {
// can't open stream, we will retry until success connection or close
if i < 60 {
i++
}
sleepTime := time.Second * time.Duration(i)
log.Error("watch log error", zap.Error(err), zap.Duration("waitTime", sleepTime))
select {
case <-time.After(sleepTime):
continue
case <-s.close:
return
}
}
i = 0
// collect ids and setup stream
s.mu.Lock()
var logIds = make([][]byte, 0, len(s.watchers))
for id := range s.watchers {
logIds = append(logIds, []byte(id))
}
s.stream = st
s.mu.Unlock()
// restore subscriptions
if len(logIds) > 0 {
if err = s.stream.WatchIds(logIds); err != nil {
log.Error("watch ids error", zap.Error(err))
continue
}
}
// read stream
if err = s.streamReader(); err != nil {
log.Error("stream read error", zap.Error(err))
continue
}
return
}
}
func (s *service) streamReader() error {
for {
events := s.stream.WaitLogs()
if len(events) == 0 {
return s.stream.Err()
}
for _, e := range events {
if w, ok := s.watchers[string(e.LogId)]; ok {
if e.Error == nil {
w.AddConsensusRecords(e.Records)
} else {
w.AddConsensusError(rpcerr.Err(uint64(e.Error.Error)))
}
} else {
log.Warn("received unexpected log id", zap.Binary("logId", e.LogId))
}
}
}
}
func (s *service) Close(_ context.Context) error {
s.mu.Lock()
if s.stream != nil {
_ = s.stream.Close()
}
s.mu.Unlock()
select {
case <-s.close:
default:
close(s.close)
}
return nil
}

View file

@ -0,0 +1,221 @@
package consensusclient
import (
"context"
"fmt"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/app"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/pool"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/rpc/rpctest"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf/mock_nodeconf"
"github.com/anytypeio/go-anytype-infrastructure-experiments/consensus/consensusproto"
"github.com/anytypeio/go-anytype-infrastructure-experiments/consensus/consensusproto/consensuserr"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"testing"
"time"
)
func TestService_Watch(t *testing.T) {
t.Run("not found error", func(t *testing.T) {
fx := newFixture(t).run(t)
defer fx.Finish()
var logId = []byte{'1'}
w := &testWatcher{}
require.NoError(t, fx.Watch(logId, w))
st := fx.testServer.waitStream(t)
req, err := st.Recv()
require.NoError(t, err)
assert.Equal(t, [][]byte{logId}, req.WatchIds)
require.NoError(t, st.Send(&consensusproto.WatchLogEvent{
LogId: logId,
Error: &consensusproto.Err{
Error: consensusproto.ErrCodes_ErrorOffset + consensusproto.ErrCodes_LogNotFound,
},
}))
assert.Equal(t, consensuserr.ErrLogNotFound, w.err)
fx.testServer.releaseStream <- nil
})
t.Run("watcherExists error", func(t *testing.T) {
fx := newFixture(t).run(t)
defer fx.Finish()
var logId = []byte{'1'}
w := &testWatcher{}
require.NoError(t, fx.Watch(logId, w))
require.Error(t, fx.Watch(logId, w))
st := fx.testServer.waitStream(t)
st.Recv()
fx.testServer.releaseStream <- nil
})
t.Run("watch", func(t *testing.T) {
fx := newFixture(t).run(t)
defer fx.Finish()
var logId1 = []byte{'1'}
w := &testWatcher{}
require.NoError(t, fx.Watch(logId1, w))
st := fx.testServer.waitStream(t)
req, err := st.Recv()
require.NoError(t, err)
assert.Equal(t, [][]byte{logId1}, req.WatchIds)
var logId2 = []byte{'2'}
w = &testWatcher{}
require.NoError(t, fx.Watch(logId2, w))
req, err = st.Recv()
require.NoError(t, err)
assert.Equal(t, [][]byte{logId2}, req.WatchIds)
fx.testServer.releaseStream <- nil
})
}
func TestService_UnWatch(t *testing.T) {
t.Run("no watcher", func(t *testing.T) {
fx := newFixture(t).run(t)
defer fx.Finish()
require.Error(t, fx.UnWatch([]byte{'1'}))
})
t.Run("success", func(t *testing.T) {
fx := newFixture(t).run(t)
defer fx.Finish()
w := &testWatcher{}
require.NoError(t, fx.Watch([]byte{'1'}, w))
assert.NoError(t, fx.UnWatch([]byte{'1'}))
})
}
func TestService_Init(t *testing.T) {
t.Run("reconnect on watch err", func(t *testing.T) {
fx := newFixture(t)
fx.testServer.watchErrOnce = true
fx.run(t)
defer fx.Finish()
fx.testServer.waitStream(t)
fx.testServer.releaseStream <- nil
})
t.Run("reconnect on start", func(t *testing.T) {
fx := newFixture(t)
fx.a.MustComponent(pool.CName).(*rpctest.TestPool).WithServer(nil)
fx.run(t)
defer fx.Finish()
time.Sleep(time.Millisecond * 50)
fx.a.MustComponent(pool.CName).(*rpctest.TestPool).WithServer(fx.drpcTS)
fx.testServer.waitStream(t)
fx.testServer.releaseStream <- nil
})
}
func TestService_AddLog(t *testing.T) {
fx := newFixture(t).run(t)
defer fx.Finish()
assert.NoError(t, fx.AddLog(ctx, &consensusproto.Log{}))
}
func TestService_AddRecord(t *testing.T) {
fx := newFixture(t).run(t)
defer fx.Finish()
assert.NoError(t, fx.AddRecord(ctx, []byte{'1'}, &consensusproto.Record{}))
}
var ctx = context.Background()
func newFixture(t *testing.T) *fixture {
fx := &fixture{
Service: New(),
a: &app.App{},
ctrl: gomock.NewController(t),
testServer: &testServer{
stream: make(chan consensusproto.DRPCConsensus_WatchLogStream),
releaseStream: make(chan error),
},
}
fx.nodeconf = mock_nodeconf.NewMockService(fx.ctrl)
fx.nodeconf.EXPECT().Init(gomock.Any())
fx.nodeconf.EXPECT().Name().Return(nodeconf.CName).AnyTimes()
fx.nodeconf.EXPECT().ConsensusPeers().Return([]string{"c1", "c2"}).AnyTimes()
fx.drpcTS = rpctest.NewTestServer()
require.NoError(t, consensusproto.DRPCRegisterConsensus(fx.drpcTS.Mux, fx.testServer))
fx.a.Register(fx.Service).
Register(fx.nodeconf).
Register(rpctest.NewTestPool().WithServer(fx.drpcTS))
return fx
}
type fixture struct {
Service
nodeconf *mock_nodeconf.MockService
a *app.App
ctrl *gomock.Controller
testServer *testServer
drpcTS *rpctest.TesServer
}
func (fx *fixture) run(t *testing.T) *fixture {
require.NoError(t, fx.a.Start(ctx))
return fx
}
func (fx *fixture) Finish() {
assert.NoError(fx.ctrl.T, fx.a.Close(ctx))
fx.ctrl.Finish()
}
type testServer struct {
stream chan consensusproto.DRPCConsensus_WatchLogStream
addLog func(ctx context.Context, req *consensusproto.AddLogRequest) error
addRecord func(ctx context.Context, req *consensusproto.AddRecordRequest) error
releaseStream chan error
watchErrOnce bool
}
func (t *testServer) AddLog(ctx context.Context, req *consensusproto.AddLogRequest) (*consensusproto.Ok, error) {
if t.addLog != nil {
if err := t.addLog(ctx, req); err != nil {
return nil, err
}
}
return &consensusproto.Ok{}, nil
}
func (t *testServer) AddRecord(ctx context.Context, req *consensusproto.AddRecordRequest) (*consensusproto.Ok, error) {
if t.addRecord != nil {
if err := t.addRecord(ctx, req); err != nil {
return nil, err
}
}
return &consensusproto.Ok{}, nil
}
func (t *testServer) WatchLog(stream consensusproto.DRPCConsensus_WatchLogStream) error {
fmt.Println("watchLog", t.watchErrOnce)
if t.watchErrOnce {
t.watchErrOnce = false
return fmt.Errorf("error")
}
t.stream <- stream
return <-t.releaseStream
}
func (t *testServer) waitStream(test *testing.T) consensusproto.DRPCConsensus_WatchLogStream {
select {
case <-time.After(time.Second * 5):
test.Fatalf("waiteStream timeout")
case st := <-t.stream:
return st
}
return nil
}
type testWatcher struct {
recs [][]*consensusproto.Record
err error
}
func (t *testWatcher) AddConsensusRecords(recs []*consensusproto.Record) {
t.recs = append(t.recs, recs)
}
func (t *testWatcher) AddConsensusError(err error) {
t.err = err
}

View file

@ -5,18 +5,10 @@ import (
"github.com/cheggaaa/mb/v2"
)
type Stream interface {
WatchIds(logIds [][]byte) (err error)
UnwatchIds(logIds [][]byte) (err error)
WaitLogs() []*consensusproto.Log
Err() error
Close() error
}
func runStream(rpcStream consensusproto.DRPCConsensus_WatchLogClient) Stream {
func runStream(rpcStream consensusproto.DRPCConsensus_WatchLogClient) *stream {
st := &stream{
rpcStream: rpcStream,
mb: mb.New((*consensusproto.Log)(nil), 100),
mb: mb.New((*consensusproto.WatchLogEvent)(nil), 100),
}
go st.readStream()
return st
@ -24,7 +16,7 @@ func runStream(rpcStream consensusproto.DRPCConsensus_WatchLogClient) Stream {
type stream struct {
rpcStream consensusproto.DRPCConsensus_WatchLogClient
mb *mb.MB[*consensusproto.Log]
mb *mb.MB[*consensusproto.WatchLogEvent]
err error
}
@ -40,7 +32,7 @@ func (s *stream) UnwatchIds(logIds [][]byte) (err error) {
})
}
func (s *stream) WaitLogs() []*consensusproto.Log {
func (s *stream) WaitLogs() []*consensusproto.WatchLogEvent {
return s.mb.Wait()
}
@ -56,10 +48,7 @@ func (s *stream) readStream() {
s.err = err
return
}
if err = s.mb.Add(&consensusproto.Log{
Id: event.LogId,
Records: event.Records,
}); err != nil {
if err = s.mb.Add(event); err != nil {
return
}
}

View file

@ -1 +0,0 @@
package acl

View file

@ -1,38 +0,0 @@
package acl
import (
"context"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/app"
"github.com/anytypeio/go-anytype-infrastructure-experiments/consensus/consensusclient"
)
const CName = "node.acl"
type Service interface {
app.ComponentRunnable
}
type aclService struct {
cons consensusclient.Service
stream consensusclient.Stream
}
func (as *aclService) Init(a *app.App) (err error) {
as.cons = a.MustComponent(consensusclient.CName).(consensusclient.Service)
return nil
}
func (as *aclService) Name() (name string) {
return CName
}
func (as *aclService) Run(ctx context.Context) (err error) {
if as.stream, err = as.cons.WatchLog(ctx); err != nil {
return
}
return nil
}
func (as *aclService) Close(ctx context.Context) (err error) {
return as.stream.Close()
}

View file

@ -4,9 +4,12 @@ go 1.19
replace github.com/anytypeio/go-anytype-infrastructure-experiments/common => ../common
replace github.com/anytypeio/go-anytype-infrastructure-experiments/consensus => ../consensus
require (
github.com/akrylysov/pogreb v0.10.1
github.com/anytypeio/go-anytype-infrastructure-experiments/common v0.0.0-00010101000000-000000000000
github.com/anytypeio/go-anytype-infrastructure-experiments/consensus v0.0.0-00010101000000-000000000000
go.uber.org/zap v1.23.0
)
@ -15,6 +18,7 @@ require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash v1.1.0 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/cheggaaa/mb/v2 v2.0.1 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0 // indirect
github.com/fogleman/gg v1.3.0 // indirect
github.com/goccy/go-graphviz v0.0.9 // indirect
@ -52,7 +56,7 @@ require (
github.com/zeebo/errs v1.3.0 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e // indirect
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d // indirect
golang.org/x/image v0.0.0-20200119044424-58c23975cae1 // indirect
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab // indirect
google.golang.org/protobuf v1.28.1 // indirect

View file

@ -57,6 +57,8 @@ github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghf
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE=
github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cheggaaa/mb/v2 v2.0.1 h1:gn0khbEbKlw3i5VOYi0VnHEHayjZKfUDOyGSpHAybBs=
github.com/cheggaaa/mb/v2 v2.0.1/go.mod h1:XGeZw20Iqgjky26KL0mvCwk3+4NyZCUbshSo6ALne+c=
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
@ -310,8 +312,8 @@ golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8U
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200204104054-c9f3fb736b72/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e h1:T8NU3HyQ8ClP4SEE+KbFlg6n0NhuTsN4MyznaarGsZM=
golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d h1:sK3txAijHtOK88l68nt020reeT1ZdKLIYetKl95FzVY=
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=