mirror of
https://github.com/anyproto/any-sync.git
synced 2025-06-08 05:57:03 +09:00
consensus client
This commit is contained in:
parent
5a02d1c338
commit
a092f7b4a1
9 changed files with 2962 additions and 0 deletions
1
Makefile
1
Makefile
|
@ -20,6 +20,7 @@ proto:
|
|||
protoc --gogofaster_out=$(PKGMAP):. --go-drpc_out=protolib=github.com/gogo/protobuf:. net/streampool/testservice/protos/*.proto
|
||||
protoc --gogofaster_out=:. net/secureservice/handshake/handshakeproto/protos/*.proto
|
||||
protoc --gogofaster_out=$(PKGMAP):. --go-drpc_out=protolib=github.com/gogo/protobuf:. coordinator/coordinatorproto/protos/*.proto
|
||||
protoc --gogofaster_out=:. --go-drpc_out=protolib=github.com/gogo/protobuf:. consensus/consensusproto/protos/*.proto
|
||||
|
||||
deps:
|
||||
go mod download
|
||||
|
|
240
consensus/consensusclient/client.go
Normal file
240
consensus/consensusclient/client.go
Normal file
|
@ -0,0 +1,240 @@
|
|||
//go:generate mockgen -destination mock_consensusclient/mock_consensusclient.go github.com/anyproto/any-sync/consensus/consensusclient Service
|
||||
package consensusclient
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"github.com/anyproto/any-sync/app"
|
||||
"github.com/anyproto/any-sync/app/logger"
|
||||
"github.com/anyproto/any-sync/consensus/consensusproto"
|
||||
"github.com/anyproto/any-sync/net/pool"
|
||||
"github.com/anyproto/any-sync/net/rpc/rpcerr"
|
||||
"github.com/anyproto/any-sync/nodeconf"
|
||||
"go.uber.org/zap"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
const CName = "consensus.consensusclient"
|
||||
|
||||
var log = logger.NewNamed(CName)
|
||||
|
||||
var (
|
||||
ErrWatcherExists = errors.New("watcher exists")
|
||||
ErrWatcherNotExists = errors.New("watcher not exists")
|
||||
)
|
||||
|
||||
func New() Service {
|
||||
return new(service)
|
||||
}
|
||||
|
||||
// Watcher watches new events by specified logId
|
||||
type Watcher interface {
|
||||
AddConsensusRecords(recs []*consensusproto.Record)
|
||||
AddConsensusError(err error)
|
||||
}
|
||||
|
||||
type Service interface {
|
||||
// AddLog adds new log to consensus servers
|
||||
AddLog(ctx context.Context, clog *consensusproto.Log) (err error)
|
||||
// AddRecord adds new record to consensus servers
|
||||
AddRecord(ctx context.Context, logId []byte, clog *consensusproto.Record) (err error)
|
||||
// Watch starts watching to given logId and calls watcher when any relative event received
|
||||
Watch(logId []byte, w Watcher) (err error)
|
||||
// UnWatch stops watching given logId and removes watcher
|
||||
UnWatch(logId []byte) (err error)
|
||||
app.ComponentRunnable
|
||||
}
|
||||
|
||||
type service struct {
|
||||
pool pool.Pool
|
||||
nodeconf nodeconf.Service
|
||||
|
||||
watchers map[string]Watcher
|
||||
stream *stream
|
||||
close chan struct{}
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
func (s *service) Init(a *app.App) (err error) {
|
||||
s.pool = a.MustComponent(pool.CName).(pool.Pool)
|
||||
s.nodeconf = a.MustComponent(nodeconf.CName).(nodeconf.Service)
|
||||
s.watchers = make(map[string]Watcher)
|
||||
s.close = make(chan struct{})
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *service) Name() (name string) {
|
||||
return CName
|
||||
}
|
||||
|
||||
func (s *service) Run(_ context.Context) error {
|
||||
go s.streamWatcher()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *service) doClient(ctx context.Context, fn func(cl consensusproto.DRPCConsensusClient) error) error {
|
||||
peer, err := s.pool.GetOneOf(ctx, s.nodeconf.ConsensusPeers())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
dc, err := peer.AcquireDrpcConn(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer peer.ReleaseDrpcConn(dc)
|
||||
return fn(consensusproto.NewDRPCConsensusClient(dc))
|
||||
}
|
||||
|
||||
func (s *service) AddLog(ctx context.Context, clog *consensusproto.Log) (err error) {
|
||||
return s.doClient(ctx, func(cl consensusproto.DRPCConsensusClient) error {
|
||||
if _, err = cl.LogAdd(ctx, &consensusproto.LogAddRequest{
|
||||
Log: clog,
|
||||
}); err != nil {
|
||||
return rpcerr.Unwrap(err)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func (s *service) AddRecord(ctx context.Context, logId []byte, clog *consensusproto.Record) (err error) {
|
||||
return s.doClient(ctx, func(cl consensusproto.DRPCConsensusClient) error {
|
||||
if _, err = cl.RecordAdd(ctx, &consensusproto.RecordAddRequest{
|
||||
LogId: logId,
|
||||
Record: clog,
|
||||
}); err != nil {
|
||||
return rpcerr.Unwrap(err)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func (s *service) Watch(logId []byte, w Watcher) (err error) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
if _, ok := s.watchers[string(logId)]; ok {
|
||||
return ErrWatcherExists
|
||||
}
|
||||
s.watchers[string(logId)] = w
|
||||
if s.stream != nil {
|
||||
if wErr := s.stream.WatchIds([][]byte{logId}); wErr != nil {
|
||||
log.Warn("WatchIds error", zap.Error(wErr))
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (s *service) UnWatch(logId []byte) (err error) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
if _, ok := s.watchers[string(logId)]; !ok {
|
||||
return ErrWatcherNotExists
|
||||
}
|
||||
delete(s.watchers, string(logId))
|
||||
if s.stream != nil {
|
||||
if wErr := s.stream.UnwatchIds([][]byte{logId}); wErr != nil {
|
||||
log.Warn("UnWatchIds error", zap.Error(wErr))
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (s *service) openStream(ctx context.Context) (st *stream, err error) {
|
||||
pr, err := s.pool.GetOneOf(ctx, s.nodeconf.ConsensusPeers())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
dc, err := pr.AcquireDrpcConn(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
rpcStream, err := consensusproto.NewDRPCConsensusClient(dc).LogWatch(ctx)
|
||||
if err != nil {
|
||||
return nil, rpcerr.Unwrap(err)
|
||||
}
|
||||
return runStream(rpcStream), nil
|
||||
}
|
||||
|
||||
func (s *service) streamWatcher() {
|
||||
var (
|
||||
err error
|
||||
st *stream
|
||||
i int
|
||||
)
|
||||
for {
|
||||
// open stream
|
||||
if st, err = s.openStream(context.Background()); err != nil {
|
||||
// can't open stream, we will retry until success connection or close
|
||||
if i < 60 {
|
||||
i++
|
||||
}
|
||||
sleepTime := time.Second * time.Duration(i)
|
||||
log.Error("watch log error", zap.Error(err), zap.Duration("waitTime", sleepTime))
|
||||
select {
|
||||
case <-time.After(sleepTime):
|
||||
continue
|
||||
case <-s.close:
|
||||
return
|
||||
}
|
||||
}
|
||||
i = 0
|
||||
|
||||
// collect ids and setup stream
|
||||
s.mu.Lock()
|
||||
var logIds = make([][]byte, 0, len(s.watchers))
|
||||
for id := range s.watchers {
|
||||
logIds = append(logIds, []byte(id))
|
||||
}
|
||||
s.stream = st
|
||||
s.mu.Unlock()
|
||||
|
||||
// restore subscriptions
|
||||
if len(logIds) > 0 {
|
||||
if err = s.stream.WatchIds(logIds); err != nil {
|
||||
log.Error("watch ids error", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
// read stream
|
||||
if err = s.streamReader(); err != nil {
|
||||
log.Error("stream read error", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (s *service) streamReader() error {
|
||||
for {
|
||||
events := s.stream.WaitLogs()
|
||||
if len(events) == 0 {
|
||||
return s.stream.Err()
|
||||
}
|
||||
for _, e := range events {
|
||||
if w, ok := s.watchers[string(e.LogId)]; ok {
|
||||
if e.Error == nil {
|
||||
w.AddConsensusRecords(e.Records)
|
||||
} else {
|
||||
w.AddConsensusError(rpcerr.Err(uint64(e.Error.Error)))
|
||||
}
|
||||
} else {
|
||||
log.Warn("received unexpected log id", zap.Binary("logId", e.LogId))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *service) Close(_ context.Context) error {
|
||||
s.mu.Lock()
|
||||
if s.stream != nil {
|
||||
_ = s.stream.Close()
|
||||
}
|
||||
s.mu.Unlock()
|
||||
select {
|
||||
case <-s.close:
|
||||
default:
|
||||
close(s.close)
|
||||
}
|
||||
return nil
|
||||
}
|
236
consensus/consensusclient/client_test.go
Normal file
236
consensus/consensusclient/client_test.go
Normal file
|
@ -0,0 +1,236 @@
|
|||
package consensusclient
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/anyproto/any-sync/app"
|
||||
"github.com/anyproto/any-sync/consensus/consensusproto"
|
||||
"github.com/anyproto/any-sync/consensus/consensusproto/consensuserr"
|
||||
"github.com/anyproto/any-sync/net/pool"
|
||||
"github.com/anyproto/any-sync/net/rpc/rpctest"
|
||||
"github.com/anyproto/any-sync/nodeconf"
|
||||
"github.com/anyproto/any-sync/nodeconf/mock_nodeconf"
|
||||
"github.com/anyproto/any-sync/testutil/accounttest"
|
||||
"github.com/golang/mock/gomock"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestService_Watch(t *testing.T) {
|
||||
t.Run("not found error", func(t *testing.T) {
|
||||
fx := newFixture(t).run(t)
|
||||
defer fx.Finish()
|
||||
var logId = []byte{'1'}
|
||||
w := &testWatcher{ready: make(chan struct{})}
|
||||
require.NoError(t, fx.Watch(logId, w))
|
||||
st := fx.testServer.waitStream(t)
|
||||
req, err := st.Recv()
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, [][]byte{logId}, req.WatchIds)
|
||||
require.NoError(t, st.Send(&consensusproto.LogWatchEvent{
|
||||
LogId: logId,
|
||||
Error: &consensusproto.Err{
|
||||
Error: consensusproto.ErrCodes_ErrorOffset + consensusproto.ErrCodes_LogNotFound,
|
||||
},
|
||||
}))
|
||||
<-w.ready
|
||||
assert.Equal(t, consensuserr.ErrLogNotFound, w.err)
|
||||
fx.testServer.releaseStream <- nil
|
||||
})
|
||||
t.Run("watcherExists error", func(t *testing.T) {
|
||||
fx := newFixture(t).run(t)
|
||||
defer fx.Finish()
|
||||
var logId = []byte{'1'}
|
||||
w := &testWatcher{}
|
||||
require.NoError(t, fx.Watch(logId, w))
|
||||
require.Error(t, fx.Watch(logId, w))
|
||||
st := fx.testServer.waitStream(t)
|
||||
st.Recv()
|
||||
fx.testServer.releaseStream <- nil
|
||||
})
|
||||
t.Run("watch", func(t *testing.T) {
|
||||
fx := newFixture(t).run(t)
|
||||
defer fx.Finish()
|
||||
var logId1 = []byte{'1'}
|
||||
w := &testWatcher{}
|
||||
require.NoError(t, fx.Watch(logId1, w))
|
||||
st := fx.testServer.waitStream(t)
|
||||
req, err := st.Recv()
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, [][]byte{logId1}, req.WatchIds)
|
||||
|
||||
var logId2 = []byte{'2'}
|
||||
w = &testWatcher{}
|
||||
require.NoError(t, fx.Watch(logId2, w))
|
||||
req, err = st.Recv()
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, [][]byte{logId2}, req.WatchIds)
|
||||
|
||||
fx.testServer.releaseStream <- nil
|
||||
})
|
||||
}
|
||||
|
||||
func TestService_UnWatch(t *testing.T) {
|
||||
t.Run("no watcher", func(t *testing.T) {
|
||||
fx := newFixture(t).run(t)
|
||||
defer fx.Finish()
|
||||
require.Error(t, fx.UnWatch([]byte{'1'}))
|
||||
})
|
||||
t.Run("success", func(t *testing.T) {
|
||||
fx := newFixture(t).run(t)
|
||||
defer fx.Finish()
|
||||
w := &testWatcher{}
|
||||
require.NoError(t, fx.Watch([]byte{'1'}, w))
|
||||
assert.NoError(t, fx.UnWatch([]byte{'1'}))
|
||||
})
|
||||
}
|
||||
|
||||
func TestService_Init(t *testing.T) {
|
||||
t.Run("reconnect on watch err", func(t *testing.T) {
|
||||
fx := newFixture(t)
|
||||
fx.testServer.watchErrOnce = true
|
||||
fx.run(t)
|
||||
defer fx.Finish()
|
||||
fx.testServer.waitStream(t)
|
||||
fx.testServer.releaseStream <- nil
|
||||
})
|
||||
t.Run("reconnect on start", func(t *testing.T) {
|
||||
fx := newFixture(t)
|
||||
fx.a.MustComponent(pool.CName).(*rpctest.TestPool).WithServer(nil)
|
||||
fx.run(t)
|
||||
defer fx.Finish()
|
||||
time.Sleep(time.Millisecond * 50)
|
||||
fx.a.MustComponent(pool.CName).(*rpctest.TestPool).WithServer(fx.drpcTS)
|
||||
fx.testServer.waitStream(t)
|
||||
fx.testServer.releaseStream <- nil
|
||||
})
|
||||
}
|
||||
|
||||
func TestService_AddLog(t *testing.T) {
|
||||
fx := newFixture(t).run(t)
|
||||
defer fx.Finish()
|
||||
assert.NoError(t, fx.AddLog(ctx, &consensusproto.Log{}))
|
||||
}
|
||||
|
||||
func TestService_AddRecord(t *testing.T) {
|
||||
fx := newFixture(t).run(t)
|
||||
defer fx.Finish()
|
||||
assert.NoError(t, fx.AddRecord(ctx, []byte{'1'}, &consensusproto.Record{}))
|
||||
}
|
||||
|
||||
var ctx = context.Background()
|
||||
|
||||
func newFixture(t *testing.T) *fixture {
|
||||
fx := &fixture{
|
||||
Service: New(),
|
||||
a: &app.App{},
|
||||
ctrl: gomock.NewController(t),
|
||||
testServer: &testServer{
|
||||
stream: make(chan consensusproto.DRPCConsensus_LogWatchStream),
|
||||
releaseStream: make(chan error),
|
||||
},
|
||||
}
|
||||
fx.nodeconf = mock_nodeconf.NewMockService(fx.ctrl)
|
||||
fx.nodeconf.EXPECT().Name().Return(nodeconf.CName).AnyTimes()
|
||||
fx.nodeconf.EXPECT().Init(gomock.Any()).AnyTimes()
|
||||
fx.nodeconf.EXPECT().Run(gomock.Any()).AnyTimes()
|
||||
fx.nodeconf.EXPECT().Close(gomock.Any()).AnyTimes()
|
||||
fx.nodeconf.EXPECT().ConsensusPeers().Return([]string{"c1", "c2", "c3"}).AnyTimes()
|
||||
|
||||
fx.drpcTS = rpctest.NewTestServer()
|
||||
require.NoError(t, consensusproto.DRPCRegisterConsensus(fx.drpcTS.Mux, fx.testServer))
|
||||
fx.a.Register(fx.Service).
|
||||
Register(&accounttest.AccountTestService{}).
|
||||
Register(fx.nodeconf).
|
||||
Register(rpctest.NewTestPool().WithServer(fx.drpcTS))
|
||||
|
||||
return fx
|
||||
}
|
||||
|
||||
type fixture struct {
|
||||
Service
|
||||
a *app.App
|
||||
ctrl *gomock.Controller
|
||||
testServer *testServer
|
||||
drpcTS *rpctest.TestServer
|
||||
nodeconf *mock_nodeconf.MockService
|
||||
}
|
||||
|
||||
func (fx *fixture) run(t *testing.T) *fixture {
|
||||
require.NoError(t, fx.a.Start(ctx))
|
||||
return fx
|
||||
}
|
||||
|
||||
func (fx *fixture) Finish() {
|
||||
assert.NoError(fx.ctrl.T, fx.a.Close(ctx))
|
||||
fx.ctrl.Finish()
|
||||
}
|
||||
|
||||
type testServer struct {
|
||||
stream chan consensusproto.DRPCConsensus_LogWatchStream
|
||||
addLog func(ctx context.Context, req *consensusproto.LogAddRequest) error
|
||||
addRecord func(ctx context.Context, req *consensusproto.RecordAddRequest) error
|
||||
releaseStream chan error
|
||||
watchErrOnce bool
|
||||
}
|
||||
|
||||
func (t *testServer) LogAdd(ctx context.Context, req *consensusproto.LogAddRequest) (*consensusproto.Ok, error) {
|
||||
if t.addLog != nil {
|
||||
if err := t.addLog(ctx, req); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return &consensusproto.Ok{}, nil
|
||||
}
|
||||
|
||||
func (t *testServer) RecordAdd(ctx context.Context, req *consensusproto.RecordAddRequest) (*consensusproto.Ok, error) {
|
||||
if t.addRecord != nil {
|
||||
if err := t.addRecord(ctx, req); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return &consensusproto.Ok{}, nil
|
||||
}
|
||||
|
||||
func (t *testServer) LogWatch(stream consensusproto.DRPCConsensus_LogWatchStream) error {
|
||||
if t.watchErrOnce {
|
||||
t.watchErrOnce = false
|
||||
return fmt.Errorf("error")
|
||||
}
|
||||
t.stream <- stream
|
||||
return <-t.releaseStream
|
||||
}
|
||||
|
||||
func (t *testServer) waitStream(test *testing.T) consensusproto.DRPCConsensus_LogWatchStream {
|
||||
select {
|
||||
case <-time.After(time.Second * 5):
|
||||
test.Fatalf("waiteStream timeout")
|
||||
case st := <-t.stream:
|
||||
return st
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type testWatcher struct {
|
||||
recs [][]*consensusproto.Record
|
||||
err error
|
||||
ready chan struct{}
|
||||
once sync.Once
|
||||
}
|
||||
|
||||
func (t *testWatcher) AddConsensusRecords(recs []*consensusproto.Record) {
|
||||
t.recs = append(t.recs, recs)
|
||||
t.once.Do(func() {
|
||||
close(t.ready)
|
||||
})
|
||||
}
|
||||
|
||||
func (t *testWatcher) AddConsensusError(err error) {
|
||||
t.err = err
|
||||
t.once.Do(func() {
|
||||
close(t.ready)
|
||||
})
|
||||
}
|
|
@ -0,0 +1,150 @@
|
|||
// Code generated by MockGen. DO NOT EDIT.
|
||||
// Source: github.com/anyproto/go-anytype-infrastructure-experiments/consensus/consensusclient (interfaces: Service)
|
||||
|
||||
// Package mock_consensusclient is a generated GoMock package.
|
||||
package mock_consensusclient
|
||||
|
||||
import (
|
||||
context "context"
|
||||
reflect "reflect"
|
||||
|
||||
app "github.com/anyproto/any-sync/app"
|
||||
consensusclient "github.com/anyproto/any-sync-consensusnode/consensusclient"
|
||||
consensusproto "github.com/anyproto/any-sync-consensusnode/consensusproto"
|
||||
gomock "github.com/golang/mock/gomock"
|
||||
)
|
||||
|
||||
// MockService is a mock of Service interface.
|
||||
type MockService struct {
|
||||
ctrl *gomock.Controller
|
||||
recorder *MockServiceMockRecorder
|
||||
}
|
||||
|
||||
// MockServiceMockRecorder is the mock recorder for MockService.
|
||||
type MockServiceMockRecorder struct {
|
||||
mock *MockService
|
||||
}
|
||||
|
||||
// NewMockService creates a new mock instance.
|
||||
func NewMockService(ctrl *gomock.Controller) *MockService {
|
||||
mock := &MockService{ctrl: ctrl}
|
||||
mock.recorder = &MockServiceMockRecorder{mock}
|
||||
return mock
|
||||
}
|
||||
|
||||
// EXPECT returns an object that allows the caller to indicate expected use.
|
||||
func (m *MockService) EXPECT() *MockServiceMockRecorder {
|
||||
return m.recorder
|
||||
}
|
||||
|
||||
// AddLog mocks base method.
|
||||
func (m *MockService) AddLog(arg0 context.Context, arg1 *consensusproto.Log) error {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "AddLog", arg0, arg1)
|
||||
ret0, _ := ret[0].(error)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// AddLog indicates an expected call of AddLog.
|
||||
func (mr *MockServiceMockRecorder) AddLog(arg0, arg1 interface{}) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddLog", reflect.TypeOf((*MockService)(nil).AddLog), arg0, arg1)
|
||||
}
|
||||
|
||||
// AddRecord mocks base method.
|
||||
func (m *MockService) AddRecord(arg0 context.Context, arg1 []byte, arg2 *consensusproto.Record) error {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "AddRecord", arg0, arg1, arg2)
|
||||
ret0, _ := ret[0].(error)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// AddRecord indicates an expected call of AddRecord.
|
||||
func (mr *MockServiceMockRecorder) AddRecord(arg0, arg1, arg2 interface{}) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddRecord", reflect.TypeOf((*MockService)(nil).AddRecord), arg0, arg1, arg2)
|
||||
}
|
||||
|
||||
// Close mocks base method.
|
||||
func (m *MockService) Close(arg0 context.Context) error {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "Close", arg0)
|
||||
ret0, _ := ret[0].(error)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// Close indicates an expected call of Close.
|
||||
func (mr *MockServiceMockRecorder) Close(arg0 interface{}) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockService)(nil).Close), arg0)
|
||||
}
|
||||
|
||||
// Init mocks base method.
|
||||
func (m *MockService) Init(arg0 *app.App) error {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "Init", arg0)
|
||||
ret0, _ := ret[0].(error)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// Init indicates an expected call of Init.
|
||||
func (mr *MockServiceMockRecorder) Init(arg0 interface{}) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Init", reflect.TypeOf((*MockService)(nil).Init), arg0)
|
||||
}
|
||||
|
||||
// Name mocks base method.
|
||||
func (m *MockService) Name() string {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "Name")
|
||||
ret0, _ := ret[0].(string)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// Name indicates an expected call of Name.
|
||||
func (mr *MockServiceMockRecorder) Name() *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Name", reflect.TypeOf((*MockService)(nil).Name))
|
||||
}
|
||||
|
||||
// Run mocks base method.
|
||||
func (m *MockService) Run(arg0 context.Context) error {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "Run", arg0)
|
||||
ret0, _ := ret[0].(error)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// Run indicates an expected call of Run.
|
||||
func (mr *MockServiceMockRecorder) Run(arg0 interface{}) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Run", reflect.TypeOf((*MockService)(nil).Run), arg0)
|
||||
}
|
||||
|
||||
// UnWatch mocks base method.
|
||||
func (m *MockService) UnWatch(arg0 []byte) error {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "UnWatch", arg0)
|
||||
ret0, _ := ret[0].(error)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// UnWatch indicates an expected call of UnWatch.
|
||||
func (mr *MockServiceMockRecorder) UnWatch(arg0 interface{}) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UnWatch", reflect.TypeOf((*MockService)(nil).UnWatch), arg0)
|
||||
}
|
||||
|
||||
// Watch mocks base method.
|
||||
func (m *MockService) Watch(arg0 []byte, arg1 consensusclient.Watcher) error {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "Watch", arg0, arg1)
|
||||
ret0, _ := ret[0].(error)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// Watch indicates an expected call of Watch.
|
||||
func (mr *MockServiceMockRecorder) Watch(arg0, arg1 interface{}) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Watch", reflect.TypeOf((*MockService)(nil).Watch), arg0, arg1)
|
||||
}
|
70
consensus/consensusclient/stream.go
Normal file
70
consensus/consensusclient/stream.go
Normal file
|
@ -0,0 +1,70 @@
|
|||
package consensusclient
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/anyproto/any-sync/consensus/consensusproto"
|
||||
"github.com/cheggaaa/mb/v3"
|
||||
"sync"
|
||||
)
|
||||
|
||||
func runStream(rpcStream consensusproto.DRPCConsensus_LogWatchClient) *stream {
|
||||
st := &stream{
|
||||
rpcStream: rpcStream,
|
||||
mb: mb.New[*consensusproto.LogWatchEvent](100),
|
||||
}
|
||||
go st.readStream()
|
||||
return st
|
||||
}
|
||||
|
||||
type stream struct {
|
||||
rpcStream consensusproto.DRPCConsensus_LogWatchClient
|
||||
mb *mb.MB[*consensusproto.LogWatchEvent]
|
||||
mu sync.Mutex
|
||||
err error
|
||||
}
|
||||
|
||||
func (s *stream) WatchIds(logIds [][]byte) (err error) {
|
||||
return s.rpcStream.Send(&consensusproto.LogWatchRequest{
|
||||
WatchIds: logIds,
|
||||
})
|
||||
}
|
||||
|
||||
func (s *stream) UnwatchIds(logIds [][]byte) (err error) {
|
||||
return s.rpcStream.Send(&consensusproto.LogWatchRequest{
|
||||
UnwatchIds: logIds,
|
||||
})
|
||||
}
|
||||
|
||||
func (s *stream) WaitLogs() []*consensusproto.LogWatchEvent {
|
||||
events, _ := s.mb.Wait(context.TODO())
|
||||
return events
|
||||
}
|
||||
|
||||
func (s *stream) Err() error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
return s.err
|
||||
}
|
||||
|
||||
func (s *stream) readStream() {
|
||||
defer s.Close()
|
||||
for {
|
||||
event, err := s.rpcStream.Recv()
|
||||
if err != nil {
|
||||
s.mu.Lock()
|
||||
s.err = err
|
||||
s.mu.Unlock()
|
||||
return
|
||||
}
|
||||
if err = s.mb.Add(s.rpcStream.Context(), event); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *stream) Close() error {
|
||||
if err := s.mb.Close(); err == nil {
|
||||
return s.rpcStream.Close()
|
||||
}
|
||||
return nil
|
||||
}
|
1957
consensus/consensusproto/consensus.pb.go
Normal file
1957
consensus/consensusproto/consensus.pb.go
Normal file
File diff suppressed because it is too large
Load diff
232
consensus/consensusproto/consensus_drpc.pb.go
Normal file
232
consensus/consensusproto/consensus_drpc.pb.go
Normal file
|
@ -0,0 +1,232 @@
|
|||
// Code generated by protoc-gen-go-drpc. DO NOT EDIT.
|
||||
// protoc-gen-go-drpc version: v0.0.33
|
||||
// source: consensus/consensusproto/protos/consensus.proto
|
||||
|
||||
package consensusproto
|
||||
|
||||
import (
|
||||
bytes "bytes"
|
||||
context "context"
|
||||
errors "errors"
|
||||
jsonpb "github.com/gogo/protobuf/jsonpb"
|
||||
proto "github.com/gogo/protobuf/proto"
|
||||
drpc "storj.io/drpc"
|
||||
drpcerr "storj.io/drpc/drpcerr"
|
||||
)
|
||||
|
||||
type drpcEncoding_File_consensus_consensusproto_protos_consensus_proto struct{}
|
||||
|
||||
func (drpcEncoding_File_consensus_consensusproto_protos_consensus_proto) Marshal(msg drpc.Message) ([]byte, error) {
|
||||
return proto.Marshal(msg.(proto.Message))
|
||||
}
|
||||
|
||||
func (drpcEncoding_File_consensus_consensusproto_protos_consensus_proto) Unmarshal(buf []byte, msg drpc.Message) error {
|
||||
return proto.Unmarshal(buf, msg.(proto.Message))
|
||||
}
|
||||
|
||||
func (drpcEncoding_File_consensus_consensusproto_protos_consensus_proto) JSONMarshal(msg drpc.Message) ([]byte, error) {
|
||||
var buf bytes.Buffer
|
||||
err := new(jsonpb.Marshaler).Marshal(&buf, msg.(proto.Message))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return buf.Bytes(), nil
|
||||
}
|
||||
|
||||
func (drpcEncoding_File_consensus_consensusproto_protos_consensus_proto) JSONUnmarshal(buf []byte, msg drpc.Message) error {
|
||||
return jsonpb.Unmarshal(bytes.NewReader(buf), msg.(proto.Message))
|
||||
}
|
||||
|
||||
type DRPCConsensusClient interface {
|
||||
DRPCConn() drpc.Conn
|
||||
|
||||
LogAdd(ctx context.Context, in *LogAddRequest) (*Ok, error)
|
||||
RecordAdd(ctx context.Context, in *RecordAddRequest) (*Ok, error)
|
||||
LogWatch(ctx context.Context) (DRPCConsensus_LogWatchClient, error)
|
||||
}
|
||||
|
||||
type drpcConsensusClient struct {
|
||||
cc drpc.Conn
|
||||
}
|
||||
|
||||
func NewDRPCConsensusClient(cc drpc.Conn) DRPCConsensusClient {
|
||||
return &drpcConsensusClient{cc}
|
||||
}
|
||||
|
||||
func (c *drpcConsensusClient) DRPCConn() drpc.Conn { return c.cc }
|
||||
|
||||
func (c *drpcConsensusClient) LogAdd(ctx context.Context, in *LogAddRequest) (*Ok, error) {
|
||||
out := new(Ok)
|
||||
err := c.cc.Invoke(ctx, "/consensusProto.Consensus/LogAdd", drpcEncoding_File_consensus_consensusproto_protos_consensus_proto{}, in, out)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (c *drpcConsensusClient) RecordAdd(ctx context.Context, in *RecordAddRequest) (*Ok, error) {
|
||||
out := new(Ok)
|
||||
err := c.cc.Invoke(ctx, "/consensusProto.Consensus/RecordAdd", drpcEncoding_File_consensus_consensusproto_protos_consensus_proto{}, in, out)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (c *drpcConsensusClient) LogWatch(ctx context.Context) (DRPCConsensus_LogWatchClient, error) {
|
||||
stream, err := c.cc.NewStream(ctx, "/consensusProto.Consensus/LogWatch", drpcEncoding_File_consensus_consensusproto_protos_consensus_proto{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
x := &drpcConsensus_LogWatchClient{stream}
|
||||
return x, nil
|
||||
}
|
||||
|
||||
type DRPCConsensus_LogWatchClient interface {
|
||||
drpc.Stream
|
||||
Send(*LogWatchRequest) error
|
||||
Recv() (*LogWatchEvent, error)
|
||||
}
|
||||
|
||||
type drpcConsensus_LogWatchClient struct {
|
||||
drpc.Stream
|
||||
}
|
||||
|
||||
func (x *drpcConsensus_LogWatchClient) GetStream() drpc.Stream {
|
||||
return x.Stream
|
||||
}
|
||||
|
||||
func (x *drpcConsensus_LogWatchClient) Send(m *LogWatchRequest) error {
|
||||
return x.MsgSend(m, drpcEncoding_File_consensus_consensusproto_protos_consensus_proto{})
|
||||
}
|
||||
|
||||
func (x *drpcConsensus_LogWatchClient) Recv() (*LogWatchEvent, error) {
|
||||
m := new(LogWatchEvent)
|
||||
if err := x.MsgRecv(m, drpcEncoding_File_consensus_consensusproto_protos_consensus_proto{}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return m, nil
|
||||
}
|
||||
|
||||
func (x *drpcConsensus_LogWatchClient) RecvMsg(m *LogWatchEvent) error {
|
||||
return x.MsgRecv(m, drpcEncoding_File_consensus_consensusproto_protos_consensus_proto{})
|
||||
}
|
||||
|
||||
type DRPCConsensusServer interface {
|
||||
LogAdd(context.Context, *LogAddRequest) (*Ok, error)
|
||||
RecordAdd(context.Context, *RecordAddRequest) (*Ok, error)
|
||||
LogWatch(DRPCConsensus_LogWatchStream) error
|
||||
}
|
||||
|
||||
type DRPCConsensusUnimplementedServer struct{}
|
||||
|
||||
func (s *DRPCConsensusUnimplementedServer) LogAdd(context.Context, *LogAddRequest) (*Ok, error) {
|
||||
return nil, drpcerr.WithCode(errors.New("Unimplemented"), drpcerr.Unimplemented)
|
||||
}
|
||||
|
||||
func (s *DRPCConsensusUnimplementedServer) RecordAdd(context.Context, *RecordAddRequest) (*Ok, error) {
|
||||
return nil, drpcerr.WithCode(errors.New("Unimplemented"), drpcerr.Unimplemented)
|
||||
}
|
||||
|
||||
func (s *DRPCConsensusUnimplementedServer) LogWatch(DRPCConsensus_LogWatchStream) error {
|
||||
return drpcerr.WithCode(errors.New("Unimplemented"), drpcerr.Unimplemented)
|
||||
}
|
||||
|
||||
type DRPCConsensusDescription struct{}
|
||||
|
||||
func (DRPCConsensusDescription) NumMethods() int { return 3 }
|
||||
|
||||
func (DRPCConsensusDescription) Method(n int) (string, drpc.Encoding, drpc.Receiver, interface{}, bool) {
|
||||
switch n {
|
||||
case 0:
|
||||
return "/consensusProto.Consensus/LogAdd", drpcEncoding_File_consensus_consensusproto_protos_consensus_proto{},
|
||||
func(srv interface{}, ctx context.Context, in1, in2 interface{}) (drpc.Message, error) {
|
||||
return srv.(DRPCConsensusServer).
|
||||
LogAdd(
|
||||
ctx,
|
||||
in1.(*LogAddRequest),
|
||||
)
|
||||
}, DRPCConsensusServer.LogAdd, true
|
||||
case 1:
|
||||
return "/consensusProto.Consensus/RecordAdd", drpcEncoding_File_consensus_consensusproto_protos_consensus_proto{},
|
||||
func(srv interface{}, ctx context.Context, in1, in2 interface{}) (drpc.Message, error) {
|
||||
return srv.(DRPCConsensusServer).
|
||||
RecordAdd(
|
||||
ctx,
|
||||
in1.(*RecordAddRequest),
|
||||
)
|
||||
}, DRPCConsensusServer.RecordAdd, true
|
||||
case 2:
|
||||
return "/consensusProto.Consensus/LogWatch", drpcEncoding_File_consensus_consensusproto_protos_consensus_proto{},
|
||||
func(srv interface{}, ctx context.Context, in1, in2 interface{}) (drpc.Message, error) {
|
||||
return nil, srv.(DRPCConsensusServer).
|
||||
LogWatch(
|
||||
&drpcConsensus_LogWatchStream{in1.(drpc.Stream)},
|
||||
)
|
||||
}, DRPCConsensusServer.LogWatch, true
|
||||
default:
|
||||
return "", nil, nil, nil, false
|
||||
}
|
||||
}
|
||||
|
||||
func DRPCRegisterConsensus(mux drpc.Mux, impl DRPCConsensusServer) error {
|
||||
return mux.Register(impl, DRPCConsensusDescription{})
|
||||
}
|
||||
|
||||
type DRPCConsensus_LogAddStream interface {
|
||||
drpc.Stream
|
||||
SendAndClose(*Ok) error
|
||||
}
|
||||
|
||||
type drpcConsensus_LogAddStream struct {
|
||||
drpc.Stream
|
||||
}
|
||||
|
||||
func (x *drpcConsensus_LogAddStream) SendAndClose(m *Ok) error {
|
||||
if err := x.MsgSend(m, drpcEncoding_File_consensus_consensusproto_protos_consensus_proto{}); err != nil {
|
||||
return err
|
||||
}
|
||||
return x.CloseSend()
|
||||
}
|
||||
|
||||
type DRPCConsensus_RecordAddStream interface {
|
||||
drpc.Stream
|
||||
SendAndClose(*Ok) error
|
||||
}
|
||||
|
||||
type drpcConsensus_RecordAddStream struct {
|
||||
drpc.Stream
|
||||
}
|
||||
|
||||
func (x *drpcConsensus_RecordAddStream) SendAndClose(m *Ok) error {
|
||||
if err := x.MsgSend(m, drpcEncoding_File_consensus_consensusproto_protos_consensus_proto{}); err != nil {
|
||||
return err
|
||||
}
|
||||
return x.CloseSend()
|
||||
}
|
||||
|
||||
type DRPCConsensus_LogWatchStream interface {
|
||||
drpc.Stream
|
||||
Send(*LogWatchEvent) error
|
||||
Recv() (*LogWatchRequest, error)
|
||||
}
|
||||
|
||||
type drpcConsensus_LogWatchStream struct {
|
||||
drpc.Stream
|
||||
}
|
||||
|
||||
func (x *drpcConsensus_LogWatchStream) Send(m *LogWatchEvent) error {
|
||||
return x.MsgSend(m, drpcEncoding_File_consensus_consensusproto_protos_consensus_proto{})
|
||||
}
|
||||
|
||||
func (x *drpcConsensus_LogWatchStream) Recv() (*LogWatchRequest, error) {
|
||||
m := new(LogWatchRequest)
|
||||
if err := x.MsgRecv(m, drpcEncoding_File_consensus_consensusproto_protos_consensus_proto{}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return m, nil
|
||||
}
|
||||
|
||||
func (x *drpcConsensus_LogWatchStream) RecvMsg(m *LogWatchRequest) error {
|
||||
return x.MsgRecv(m, drpcEncoding_File_consensus_consensusproto_protos_consensus_proto{})
|
||||
}
|
16
consensus/consensusproto/consensuserr/errors.go
Normal file
16
consensus/consensusproto/consensuserr/errors.go
Normal file
|
@ -0,0 +1,16 @@
|
|||
package consensuserr
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/anyproto/any-sync/consensus/consensusproto"
|
||||
"github.com/anyproto/any-sync/net/rpc/rpcerr"
|
||||
)
|
||||
|
||||
var (
|
||||
errGroup = rpcerr.ErrGroup(consensusproto.ErrCodes_ErrorOffset)
|
||||
|
||||
ErrUnexpected = errGroup.Register(fmt.Errorf("unexpected consensus error"), uint64(consensusproto.ErrCodes_Unexpected))
|
||||
ErrConflict = errGroup.Register(fmt.Errorf("records conflict"), uint64(consensusproto.ErrCodes_RecordConflict))
|
||||
ErrLogExists = errGroup.Register(fmt.Errorf("log exists"), uint64(consensusproto.ErrCodes_LogExists))
|
||||
ErrLogNotFound = errGroup.Register(fmt.Errorf("log not found"), uint64(consensusproto.ErrCodes_LogNotFound))
|
||||
)
|
60
consensus/consensusproto/protos/consensus.proto
Normal file
60
consensus/consensusproto/protos/consensus.proto
Normal file
|
@ -0,0 +1,60 @@
|
|||
syntax = "proto3";
|
||||
package consensusProto;
|
||||
|
||||
option go_package = "consensus/consensusproto";
|
||||
|
||||
enum ErrCodes {
|
||||
Unexpected = 0;
|
||||
LogExists = 1;
|
||||
LogNotFound = 2;
|
||||
RecordConflict = 3;
|
||||
ErrorOffset = 300;
|
||||
}
|
||||
|
||||
|
||||
message Log {
|
||||
bytes id = 1;
|
||||
repeated Record records = 2;
|
||||
}
|
||||
|
||||
message Record {
|
||||
bytes id = 1;
|
||||
bytes prevId = 2;
|
||||
bytes payload = 3;
|
||||
uint64 createdUnix = 4;
|
||||
}
|
||||
|
||||
service Consensus {
|
||||
// AddLog adds new log to consensus
|
||||
rpc LogAdd(LogAddRequest) returns (Ok);
|
||||
// AddRecord adds new record to log
|
||||
rpc RecordAdd(RecordAddRequest) returns (Ok);
|
||||
// WatchLog fetches log and subscribes for a changes
|
||||
rpc LogWatch(stream LogWatchRequest) returns (stream LogWatchEvent);
|
||||
}
|
||||
|
||||
message Ok {}
|
||||
|
||||
message LogAddRequest {
|
||||
Log log = 1;
|
||||
}
|
||||
|
||||
message RecordAddRequest {
|
||||
bytes logId = 1;
|
||||
Record record = 2;
|
||||
}
|
||||
|
||||
message LogWatchRequest {
|
||||
repeated bytes watchIds = 1;
|
||||
repeated bytes unwatchIds = 2;
|
||||
}
|
||||
|
||||
message LogWatchEvent {
|
||||
bytes logId = 1;
|
||||
repeated Record records = 2;
|
||||
Err error = 3;
|
||||
}
|
||||
|
||||
message Err {
|
||||
ErrCodes error = 1;
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue