mirror of
https://github.com/anyproto/any-sync.git
synced 2025-06-08 05:57:03 +09:00
drpc space service, new pool, new peer
This commit is contained in:
parent
4f3c8334be
commit
d08510b8d1
25 changed files with 1136 additions and 2107 deletions
14
Makefile
14
Makefile
|
@ -18,19 +18,23 @@ protos-go:
|
|||
# Uncomment if needed
|
||||
@$(eval ROOT_PKG := pkg)
|
||||
@$(eval GOGO_START := GOGO_NO_UNDERSCORE=1 GOGO_EXPORT_ONEOF_INTERFACE=1)
|
||||
@$(eval P_TREE_STORAGE_PATH_PB := $(ROOT_PKG)/acl/treestorage/treepb)
|
||||
@$(eval P_ACL_CHANGES_PATH_PB := $(ROOT_PKG)/acl/aclchanges/aclpb)
|
||||
@$(eval P_PLAINTEXT_CHANGES_PATH_PB := $(ROOT_PKG)/acl/testutils/testchanges/testchangepb)
|
||||
@$(eval P_SYNC_CHANGES_PATH_PB := syncproto)
|
||||
@$(eval P_TEST_CHANGES_PATH_PB := $(ROOT_PKG)/acl/testutils/testchanges)
|
||||
@$(eval P_TIMESTAMP := Mgoogle/protobuf/timestamp.proto=github.com/gogo/protobuf/types)
|
||||
@$(eval P_STRUCT := Mgoogle/protobuf/struct.proto=github.com/gogo/protobuf/types)
|
||||
@$(eval P_ACL_CHANGES := M$(P_ACL_CHANGES_PATH_PB)/protos/aclchanges.proto=github.com/anytypeio/go-anytype-infrastructure-experiments/$(P_ACL_CHANGES_PATH_PB))
|
||||
@$(eval P_TREE_CHANGES := M$(P_TREE_STORAGE_PATH_PB)/protos/tree.proto=github.com/anytypeio/go-anytype-infrastructure-experiments/$(P_TREE_STORAGE_PATH_PB))
|
||||
|
||||
# use if needed $(eval PKGMAP := $$(P_TIMESTAMP),$$(P_STRUCT))
|
||||
$(GOGO_START) protoc --gogofaster_out=:. $(P_ACL_CHANGES_PATH_PB)/protos/*.proto; mv $(P_ACL_CHANGES_PATH_PB)/protos/*.go $(P_ACL_CHANGES_PATH_PB)
|
||||
$(GOGO_START) protoc --gogofaster_out=:. $(P_TEST_CHANGES_PATH_PB)/proto/*.proto
|
||||
$(eval PKGMAP := $$(P_ACL_CHANGES))
|
||||
$(GOGO_START) protoc --gogofaster_out=$(PKGMAP),plugins=grpc:. $(P_SYNC_CHANGES_PATH_PB)/proto/*.proto
|
||||
$(GOGO_START) protoc --gogofaster_out=:. $(P_TREE_STORAGE_PATH_PB)/protos/*.proto; mv $(P_TREE_STORAGE_PATH_PB)/protos/*.go $(P_TREE_STORAGE_PATH_PB)
|
||||
$(GOGO_START) protoc --gogofaster_out=:. $(P_PLAINTEXT_CHANGES_PATH_PB)/protos/*.proto; mv $(P_PLAINTEXT_CHANGES_PATH_PB)/protos/*.go $(P_PLAINTEXT_CHANGES_PATH_PB)
|
||||
$(eval PKGMAP := $$(P_ACL_CHANGES),$$(P_TREE_CHANGES))
|
||||
$(GOGO_START) protoc --gogofaster_out=$(PKGMAP):. $(P_SYNC_CHANGES_PATH_PB)/proto/*.proto
|
||||
$(GOGO_START) protoc --gogofaster_out=$(PKGMAP):. --go-drpc_out=protolib=github.com/gogo/protobuf:. service/space/spacesync/protos/*.proto
|
||||
|
||||
build:
|
||||
@$(eval FLAGS := $$(shell govvv -flags -pkg github.com/anytypeio/go-anytype-infrastructure-experiments/app))
|
||||
go build -o bin/anytype-node -ldflags "$(FLAGS)" cmd/node/node.go
|
||||
go build -v -o bin/anytype-node -ldflags "$(FLAGS)" cmd/node/node.go
|
|
@ -9,13 +9,13 @@ import (
|
|||
"github.com/anytypeio/go-anytype-infrastructure-experiments/config"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/account"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/api"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/document"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/configuration"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/dialer"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/pool"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/rpc/server"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/secure"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/node"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/storage"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/document"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/message"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/requesthandler"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/treecache"
|
||||
|
@ -97,9 +97,8 @@ func Bootstrap(a *app.App) {
|
|||
Register(secure.New()).
|
||||
Register(server.New()).
|
||||
Register(dialer.New()).
|
||||
Register(pool.NewPool()).
|
||||
Register(storage.New()).
|
||||
//Register(&example.Example{})
|
||||
Register(pool.New()).
|
||||
Register(configuration.New()).
|
||||
Register(document.New()).
|
||||
Register(message.New()).
|
||||
Register(requesthandler.New()).
|
||||
|
|
2
go.mod
2
go.mod
|
@ -47,7 +47,7 @@ require (
|
|||
github.com/pmezard/go-difflib v1.0.0 // indirect
|
||||
github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572 // indirect
|
||||
github.com/spaolacci/murmur3 v1.1.0 // indirect
|
||||
github.com/zeebo/errs v1.2.2 // indirect
|
||||
github.com/zeebo/errs v1.3.0 // indirect
|
||||
go.uber.org/atomic v1.9.0 // indirect
|
||||
go.uber.org/multierr v1.8.0 // indirect
|
||||
golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4 // indirect
|
||||
|
|
2
go.sum
2
go.sum
|
@ -119,6 +119,8 @@ github.com/zeebo/blake3 v0.2.3 h1:TFoLXsjeXqRNFxSbk35Dk4YtszE/MQQGK10BH4ptoTg=
|
|||
github.com/zeebo/blake3 v0.2.3/go.mod h1:mjJjZpnsyIVtVgTOSpJ9vmRE4wgDeyt2HU3qXvvKCaQ=
|
||||
github.com/zeebo/errs v1.2.2 h1:5NFypMTuSdoySVTqlNs1dEoU21QVamMQJxW/Fii5O7g=
|
||||
github.com/zeebo/errs v1.2.2/go.mod h1:sgbWHsvVuTPHcqJJGQ1WhI5KbWlHYz+2+2C/LSEtCw4=
|
||||
github.com/zeebo/errs v1.3.0 h1:hmiaKqgYZzcVgRL1Vkc1Mn2914BbzB0IBxs+ebeutGs=
|
||||
github.com/zeebo/errs v1.3.0/go.mod h1:sgbWHsvVuTPHcqJJGQ1WhI5KbWlHYz+2+2C/LSEtCw4=
|
||||
github.com/zeebo/pcg v1.0.1 h1:lyqfGeWiv4ahac6ttHs+I5hwtH/+1mrhlCtVNQM2kHo=
|
||||
github.com/zeebo/pcg v1.0.1/go.mod h1:09F0S9iiKrwn9rlI5yjLkmrug154/YRW6KnnXVDM/l4=
|
||||
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
|
||||
|
|
|
@ -218,13 +218,13 @@ func (m *ACLChange) GetIdentity() string {
|
|||
type ACLChange_ACLContentValue struct {
|
||||
// Types that are valid to be assigned to Value:
|
||||
//
|
||||
// *ACLChange_ACLContent_Value_UserAdd
|
||||
// *ACLChange_ACLContent_Value_UserRemove
|
||||
// *ACLChange_ACLContent_Value_UserPermissionChange
|
||||
// *ACLChange_ACLContent_Value_UserInvite
|
||||
// *ACLChange_ACLContent_Value_UserJoin
|
||||
// *ACLChange_ACLContent_Value_UserConfirm
|
||||
Value isACLChange_ACLContent_Value_Value `protobuf_oneof:"value"`
|
||||
// *ACLChange_ACLContentValue_UserAdd
|
||||
// *ACLChange_ACLContentValue_UserRemove
|
||||
// *ACLChange_ACLContentValue_UserPermissionChange
|
||||
// *ACLChange_ACLContentValue_UserInvite
|
||||
// *ACLChange_ACLContentValue_UserJoin
|
||||
// *ACLChange_ACLContentValue_UserConfirm
|
||||
Value isACLChange_ACLContentValue_Value `protobuf_oneof:"value"`
|
||||
}
|
||||
|
||||
func (m *ACLChange_ACLContentValue) Reset() { *m = ACLChange_ACLContentValue{} }
|
||||
|
@ -260,39 +260,39 @@ func (m *ACLChange_ACLContentValue) XXX_DiscardUnknown() {
|
|||
|
||||
var xxx_messageInfo_ACLChange_ACLContentValue proto.InternalMessageInfo
|
||||
|
||||
type isACLChange_ACLContent_Value_Value interface {
|
||||
isACLChange_ACLContent_Value_Value()
|
||||
type isACLChange_ACLContentValue_Value interface {
|
||||
isACLChange_ACLContentValue_Value()
|
||||
MarshalTo([]byte) (int, error)
|
||||
Size() int
|
||||
}
|
||||
|
||||
type ACLChange_ACLContent_Value_UserAdd struct {
|
||||
type ACLChange_ACLContentValue_UserAdd struct {
|
||||
UserAdd *ACLChange_UserAdd `protobuf:"bytes,1,opt,name=userAdd,proto3,oneof" json:"userAdd,omitempty"`
|
||||
}
|
||||
type ACLChange_ACLContent_Value_UserRemove struct {
|
||||
type ACLChange_ACLContentValue_UserRemove struct {
|
||||
UserRemove *ACLChange_UserRemove `protobuf:"bytes,2,opt,name=userRemove,proto3,oneof" json:"userRemove,omitempty"`
|
||||
}
|
||||
type ACLChange_ACLContent_Value_UserPermissionChange struct {
|
||||
type ACLChange_ACLContentValue_UserPermissionChange struct {
|
||||
UserPermissionChange *ACLChange_UserPermissionChange `protobuf:"bytes,3,opt,name=userPermissionChange,proto3,oneof" json:"userPermissionChange,omitempty"`
|
||||
}
|
||||
type ACLChange_ACLContent_Value_UserInvite struct {
|
||||
type ACLChange_ACLContentValue_UserInvite struct {
|
||||
UserInvite *ACLChange_UserInvite `protobuf:"bytes,4,opt,name=userInvite,proto3,oneof" json:"userInvite,omitempty"`
|
||||
}
|
||||
type ACLChange_ACLContent_Value_UserJoin struct {
|
||||
type ACLChange_ACLContentValue_UserJoin struct {
|
||||
UserJoin *ACLChange_UserJoin `protobuf:"bytes,5,opt,name=userJoin,proto3,oneof" json:"userJoin,omitempty"`
|
||||
}
|
||||
type ACLChange_ACLContent_Value_UserConfirm struct {
|
||||
type ACLChange_ACLContentValue_UserConfirm struct {
|
||||
UserConfirm *ACLChange_UserConfirm `protobuf:"bytes,6,opt,name=userConfirm,proto3,oneof" json:"userConfirm,omitempty"`
|
||||
}
|
||||
|
||||
func (*ACLChange_ACLContent_Value_UserAdd) isACLChange_ACLContent_Value_Value() {}
|
||||
func (*ACLChange_ACLContent_Value_UserRemove) isACLChange_ACLContent_Value_Value() {}
|
||||
func (*ACLChange_ACLContent_Value_UserPermissionChange) isACLChange_ACLContent_Value_Value() {}
|
||||
func (*ACLChange_ACLContent_Value_UserInvite) isACLChange_ACLContent_Value_Value() {}
|
||||
func (*ACLChange_ACLContent_Value_UserJoin) isACLChange_ACLContent_Value_Value() {}
|
||||
func (*ACLChange_ACLContent_Value_UserConfirm) isACLChange_ACLContent_Value_Value() {}
|
||||
func (*ACLChange_ACLContentValue_UserAdd) isACLChange_ACLContentValue_Value() {}
|
||||
func (*ACLChange_ACLContentValue_UserRemove) isACLChange_ACLContentValue_Value() {}
|
||||
func (*ACLChange_ACLContentValue_UserPermissionChange) isACLChange_ACLContentValue_Value() {}
|
||||
func (*ACLChange_ACLContentValue_UserInvite) isACLChange_ACLContentValue_Value() {}
|
||||
func (*ACLChange_ACLContentValue_UserJoin) isACLChange_ACLContentValue_Value() {}
|
||||
func (*ACLChange_ACLContentValue_UserConfirm) isACLChange_ACLContentValue_Value() {}
|
||||
|
||||
func (m *ACLChange_ACLContentValue) GetValue() isACLChange_ACLContent_Value_Value {
|
||||
func (m *ACLChange_ACLContentValue) GetValue() isACLChange_ACLContentValue_Value {
|
||||
if m != nil {
|
||||
return m.Value
|
||||
}
|
||||
|
@ -300,42 +300,42 @@ func (m *ACLChange_ACLContentValue) GetValue() isACLChange_ACLContent_Value_Valu
|
|||
}
|
||||
|
||||
func (m *ACLChange_ACLContentValue) GetUserAdd() *ACLChange_UserAdd {
|
||||
if x, ok := m.GetValue().(*ACLChange_ACLContent_Value_UserAdd); ok {
|
||||
if x, ok := m.GetValue().(*ACLChange_ACLContentValue_UserAdd); ok {
|
||||
return x.UserAdd
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *ACLChange_ACLContentValue) GetUserRemove() *ACLChange_UserRemove {
|
||||
if x, ok := m.GetValue().(*ACLChange_ACLContent_Value_UserRemove); ok {
|
||||
if x, ok := m.GetValue().(*ACLChange_ACLContentValue_UserRemove); ok {
|
||||
return x.UserRemove
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *ACLChange_ACLContentValue) GetUserPermissionChange() *ACLChange_UserPermissionChange {
|
||||
if x, ok := m.GetValue().(*ACLChange_ACLContent_Value_UserPermissionChange); ok {
|
||||
if x, ok := m.GetValue().(*ACLChange_ACLContentValue_UserPermissionChange); ok {
|
||||
return x.UserPermissionChange
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *ACLChange_ACLContentValue) GetUserInvite() *ACLChange_UserInvite {
|
||||
if x, ok := m.GetValue().(*ACLChange_ACLContent_Value_UserInvite); ok {
|
||||
if x, ok := m.GetValue().(*ACLChange_ACLContentValue_UserInvite); ok {
|
||||
return x.UserInvite
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *ACLChange_ACLContentValue) GetUserJoin() *ACLChange_UserJoin {
|
||||
if x, ok := m.GetValue().(*ACLChange_ACLContent_Value_UserJoin); ok {
|
||||
if x, ok := m.GetValue().(*ACLChange_ACLContentValue_UserJoin); ok {
|
||||
return x.UserJoin
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *ACLChange_ACLContentValue) GetUserConfirm() *ACLChange_UserConfirm {
|
||||
if x, ok := m.GetValue().(*ACLChange_ACLContent_Value_UserConfirm); ok {
|
||||
if x, ok := m.GetValue().(*ACLChange_ACLContentValue_UserConfirm); ok {
|
||||
return x.UserConfirm
|
||||
}
|
||||
return nil
|
||||
|
@ -344,12 +344,12 @@ func (m *ACLChange_ACLContentValue) GetUserConfirm() *ACLChange_UserConfirm {
|
|||
// XXX_OneofWrappers is for the internal use of the proto package.
|
||||
func (*ACLChange_ACLContentValue) XXX_OneofWrappers() []interface{} {
|
||||
return []interface{}{
|
||||
(*ACLChange_ACLContent_Value_UserAdd)(nil),
|
||||
(*ACLChange_ACLContent_Value_UserRemove)(nil),
|
||||
(*ACLChange_ACLContent_Value_UserPermissionChange)(nil),
|
||||
(*ACLChange_ACLContent_Value_UserInvite)(nil),
|
||||
(*ACLChange_ACLContent_Value_UserJoin)(nil),
|
||||
(*ACLChange_ACLContent_Value_UserConfirm)(nil),
|
||||
(*ACLChange_ACLContentValue_UserAdd)(nil),
|
||||
(*ACLChange_ACLContentValue_UserRemove)(nil),
|
||||
(*ACLChange_ACLContentValue_UserPermissionChange)(nil),
|
||||
(*ACLChange_ACLContentValue_UserInvite)(nil),
|
||||
(*ACLChange_ACLContentValue_UserJoin)(nil),
|
||||
(*ACLChange_ACLContentValue_UserConfirm)(nil),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1271,12 +1271,12 @@ func (m *ACLChange_ACLContentValue) MarshalToSizedBuffer(dAtA []byte) (int, erro
|
|||
return len(dAtA) - i, nil
|
||||
}
|
||||
|
||||
func (m *ACLChange_ACLContent_Value_UserAdd) MarshalTo(dAtA []byte) (int, error) {
|
||||
func (m *ACLChange_ACLContentValue_UserAdd) MarshalTo(dAtA []byte) (int, error) {
|
||||
size := m.Size()
|
||||
return m.MarshalToSizedBuffer(dAtA[:size])
|
||||
}
|
||||
|
||||
func (m *ACLChange_ACLContent_Value_UserAdd) MarshalToSizedBuffer(dAtA []byte) (int, error) {
|
||||
func (m *ACLChange_ACLContentValue_UserAdd) MarshalToSizedBuffer(dAtA []byte) (int, error) {
|
||||
i := len(dAtA)
|
||||
if m.UserAdd != nil {
|
||||
{
|
||||
|
@ -1292,12 +1292,12 @@ func (m *ACLChange_ACLContent_Value_UserAdd) MarshalToSizedBuffer(dAtA []byte) (
|
|||
}
|
||||
return len(dAtA) - i, nil
|
||||
}
|
||||
func (m *ACLChange_ACLContent_Value_UserRemove) MarshalTo(dAtA []byte) (int, error) {
|
||||
func (m *ACLChange_ACLContentValue_UserRemove) MarshalTo(dAtA []byte) (int, error) {
|
||||
size := m.Size()
|
||||
return m.MarshalToSizedBuffer(dAtA[:size])
|
||||
}
|
||||
|
||||
func (m *ACLChange_ACLContent_Value_UserRemove) MarshalToSizedBuffer(dAtA []byte) (int, error) {
|
||||
func (m *ACLChange_ACLContentValue_UserRemove) MarshalToSizedBuffer(dAtA []byte) (int, error) {
|
||||
i := len(dAtA)
|
||||
if m.UserRemove != nil {
|
||||
{
|
||||
|
@ -1313,12 +1313,12 @@ func (m *ACLChange_ACLContent_Value_UserRemove) MarshalToSizedBuffer(dAtA []byte
|
|||
}
|
||||
return len(dAtA) - i, nil
|
||||
}
|
||||
func (m *ACLChange_ACLContent_Value_UserPermissionChange) MarshalTo(dAtA []byte) (int, error) {
|
||||
func (m *ACLChange_ACLContentValue_UserPermissionChange) MarshalTo(dAtA []byte) (int, error) {
|
||||
size := m.Size()
|
||||
return m.MarshalToSizedBuffer(dAtA[:size])
|
||||
}
|
||||
|
||||
func (m *ACLChange_ACLContent_Value_UserPermissionChange) MarshalToSizedBuffer(dAtA []byte) (int, error) {
|
||||
func (m *ACLChange_ACLContentValue_UserPermissionChange) MarshalToSizedBuffer(dAtA []byte) (int, error) {
|
||||
i := len(dAtA)
|
||||
if m.UserPermissionChange != nil {
|
||||
{
|
||||
|
@ -1334,12 +1334,12 @@ func (m *ACLChange_ACLContent_Value_UserPermissionChange) MarshalToSizedBuffer(d
|
|||
}
|
||||
return len(dAtA) - i, nil
|
||||
}
|
||||
func (m *ACLChange_ACLContent_Value_UserInvite) MarshalTo(dAtA []byte) (int, error) {
|
||||
func (m *ACLChange_ACLContentValue_UserInvite) MarshalTo(dAtA []byte) (int, error) {
|
||||
size := m.Size()
|
||||
return m.MarshalToSizedBuffer(dAtA[:size])
|
||||
}
|
||||
|
||||
func (m *ACLChange_ACLContent_Value_UserInvite) MarshalToSizedBuffer(dAtA []byte) (int, error) {
|
||||
func (m *ACLChange_ACLContentValue_UserInvite) MarshalToSizedBuffer(dAtA []byte) (int, error) {
|
||||
i := len(dAtA)
|
||||
if m.UserInvite != nil {
|
||||
{
|
||||
|
@ -1355,12 +1355,12 @@ func (m *ACLChange_ACLContent_Value_UserInvite) MarshalToSizedBuffer(dAtA []byte
|
|||
}
|
||||
return len(dAtA) - i, nil
|
||||
}
|
||||
func (m *ACLChange_ACLContent_Value_UserJoin) MarshalTo(dAtA []byte) (int, error) {
|
||||
func (m *ACLChange_ACLContentValue_UserJoin) MarshalTo(dAtA []byte) (int, error) {
|
||||
size := m.Size()
|
||||
return m.MarshalToSizedBuffer(dAtA[:size])
|
||||
}
|
||||
|
||||
func (m *ACLChange_ACLContent_Value_UserJoin) MarshalToSizedBuffer(dAtA []byte) (int, error) {
|
||||
func (m *ACLChange_ACLContentValue_UserJoin) MarshalToSizedBuffer(dAtA []byte) (int, error) {
|
||||
i := len(dAtA)
|
||||
if m.UserJoin != nil {
|
||||
{
|
||||
|
@ -1376,12 +1376,12 @@ func (m *ACLChange_ACLContent_Value_UserJoin) MarshalToSizedBuffer(dAtA []byte)
|
|||
}
|
||||
return len(dAtA) - i, nil
|
||||
}
|
||||
func (m *ACLChange_ACLContent_Value_UserConfirm) MarshalTo(dAtA []byte) (int, error) {
|
||||
func (m *ACLChange_ACLContentValue_UserConfirm) MarshalTo(dAtA []byte) (int, error) {
|
||||
size := m.Size()
|
||||
return m.MarshalToSizedBuffer(dAtA[:size])
|
||||
}
|
||||
|
||||
func (m *ACLChange_ACLContent_Value_UserConfirm) MarshalToSizedBuffer(dAtA []byte) (int, error) {
|
||||
func (m *ACLChange_ACLContentValue_UserConfirm) MarshalToSizedBuffer(dAtA []byte) (int, error) {
|
||||
i := len(dAtA)
|
||||
if m.UserConfirm != nil {
|
||||
{
|
||||
|
@ -2039,7 +2039,7 @@ func (m *ACLChange_ACLContentValue) Size() (n int) {
|
|||
return n
|
||||
}
|
||||
|
||||
func (m *ACLChange_ACLContent_Value_UserAdd) Size() (n int) {
|
||||
func (m *ACLChange_ACLContentValue_UserAdd) Size() (n int) {
|
||||
if m == nil {
|
||||
return 0
|
||||
}
|
||||
|
@ -2051,7 +2051,7 @@ func (m *ACLChange_ACLContent_Value_UserAdd) Size() (n int) {
|
|||
}
|
||||
return n
|
||||
}
|
||||
func (m *ACLChange_ACLContent_Value_UserRemove) Size() (n int) {
|
||||
func (m *ACLChange_ACLContentValue_UserRemove) Size() (n int) {
|
||||
if m == nil {
|
||||
return 0
|
||||
}
|
||||
|
@ -2063,7 +2063,7 @@ func (m *ACLChange_ACLContent_Value_UserRemove) Size() (n int) {
|
|||
}
|
||||
return n
|
||||
}
|
||||
func (m *ACLChange_ACLContent_Value_UserPermissionChange) Size() (n int) {
|
||||
func (m *ACLChange_ACLContentValue_UserPermissionChange) Size() (n int) {
|
||||
if m == nil {
|
||||
return 0
|
||||
}
|
||||
|
@ -2075,7 +2075,7 @@ func (m *ACLChange_ACLContent_Value_UserPermissionChange) Size() (n int) {
|
|||
}
|
||||
return n
|
||||
}
|
||||
func (m *ACLChange_ACLContent_Value_UserInvite) Size() (n int) {
|
||||
func (m *ACLChange_ACLContentValue_UserInvite) Size() (n int) {
|
||||
if m == nil {
|
||||
return 0
|
||||
}
|
||||
|
@ -2087,7 +2087,7 @@ func (m *ACLChange_ACLContent_Value_UserInvite) Size() (n int) {
|
|||
}
|
||||
return n
|
||||
}
|
||||
func (m *ACLChange_ACLContent_Value_UserJoin) Size() (n int) {
|
||||
func (m *ACLChange_ACLContentValue_UserJoin) Size() (n int) {
|
||||
if m == nil {
|
||||
return 0
|
||||
}
|
||||
|
@ -2099,7 +2099,7 @@ func (m *ACLChange_ACLContent_Value_UserJoin) Size() (n int) {
|
|||
}
|
||||
return n
|
||||
}
|
||||
func (m *ACLChange_ACLContent_Value_UserConfirm) Size() (n int) {
|
||||
func (m *ACLChange_ACLContentValue_UserConfirm) Size() (n int) {
|
||||
if m == nil {
|
||||
return 0
|
||||
}
|
||||
|
@ -2871,7 +2871,7 @@ func (m *ACLChange_ACLContentValue) Unmarshal(dAtA []byte) error {
|
|||
if err := v.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
|
||||
return err
|
||||
}
|
||||
m.Value = &ACLChange_ACLContent_Value_UserAdd{v}
|
||||
m.Value = &ACLChange_ACLContentValue_UserAdd{v}
|
||||
iNdEx = postIndex
|
||||
case 2:
|
||||
if wireType != 2 {
|
||||
|
@ -2906,7 +2906,7 @@ func (m *ACLChange_ACLContentValue) Unmarshal(dAtA []byte) error {
|
|||
if err := v.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
|
||||
return err
|
||||
}
|
||||
m.Value = &ACLChange_ACLContent_Value_UserRemove{v}
|
||||
m.Value = &ACLChange_ACLContentValue_UserRemove{v}
|
||||
iNdEx = postIndex
|
||||
case 3:
|
||||
if wireType != 2 {
|
||||
|
@ -2941,7 +2941,7 @@ func (m *ACLChange_ACLContentValue) Unmarshal(dAtA []byte) error {
|
|||
if err := v.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
|
||||
return err
|
||||
}
|
||||
m.Value = &ACLChange_ACLContent_Value_UserPermissionChange{v}
|
||||
m.Value = &ACLChange_ACLContentValue_UserPermissionChange{v}
|
||||
iNdEx = postIndex
|
||||
case 4:
|
||||
if wireType != 2 {
|
||||
|
@ -2976,7 +2976,7 @@ func (m *ACLChange_ACLContentValue) Unmarshal(dAtA []byte) error {
|
|||
if err := v.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
|
||||
return err
|
||||
}
|
||||
m.Value = &ACLChange_ACLContent_Value_UserInvite{v}
|
||||
m.Value = &ACLChange_ACLContentValue_UserInvite{v}
|
||||
iNdEx = postIndex
|
||||
case 5:
|
||||
if wireType != 2 {
|
||||
|
@ -3011,7 +3011,7 @@ func (m *ACLChange_ACLContentValue) Unmarshal(dAtA []byte) error {
|
|||
if err := v.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
|
||||
return err
|
||||
}
|
||||
m.Value = &ACLChange_ACLContent_Value_UserJoin{v}
|
||||
m.Value = &ACLChange_ACLContentValue_UserJoin{v}
|
||||
iNdEx = postIndex
|
||||
case 6:
|
||||
if wireType != 2 {
|
||||
|
@ -3046,7 +3046,7 @@ func (m *ACLChange_ACLContentValue) Unmarshal(dAtA []byte) error {
|
|||
if err := v.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
|
||||
return err
|
||||
}
|
||||
m.Value = &ACLChange_ACLContent_Value_UserConfirm{v}
|
||||
m.Value = &ACLChange_ACLContentValue_UserConfirm{v}
|
||||
iNdEx = postIndex
|
||||
default:
|
||||
iNdEx = preIndex
|
||||
|
|
|
@ -69,10 +69,13 @@ type Object interface {
|
|||
}
|
||||
|
||||
type ObjectLocker interface {
|
||||
Object
|
||||
Locked() bool
|
||||
}
|
||||
|
||||
type ObjectLastUsage interface {
|
||||
LastUsage() time.Time
|
||||
}
|
||||
|
||||
type entry struct {
|
||||
id string
|
||||
lastUsage time.Time
|
||||
|
@ -99,7 +102,7 @@ type OCache interface {
|
|||
// When 'loadFunc' returns a non-nil error, an object will not be stored to cache
|
||||
Get(ctx context.Context, id string) (value Object, err error)
|
||||
// Pick returns value if it's presents in cache (will not call loadFunc)
|
||||
Pick(id string) (value Object, err error)
|
||||
Pick(ctx context.Context, id string) (value Object, err error)
|
||||
// Add adds new object to cache
|
||||
// Returns error when object exists
|
||||
Add(id string, value Object) (err error)
|
||||
|
@ -166,13 +169,18 @@ func (c *oCache) Get(ctx context.Context, id string) (value Object, err error) {
|
|||
return e.value, e.loadErr
|
||||
}
|
||||
|
||||
func (c *oCache) Pick(id string) (value Object, err error) {
|
||||
func (c *oCache) Pick(ctx context.Context, id string) (value Object, err error) {
|
||||
c.mu.Lock()
|
||||
val, ok := c.data[id]
|
||||
c.mu.Unlock()
|
||||
if !ok {
|
||||
return nil, ErrNotExists
|
||||
}
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
case <-val.load:
|
||||
}
|
||||
<-val.load
|
||||
return val.value, val.loadErr
|
||||
}
|
||||
|
@ -307,7 +315,11 @@ func (c *oCache) GC() {
|
|||
deadline := c.timeNow().Add(-c.ttl)
|
||||
var toClose []*entry
|
||||
for k, e := range c.data {
|
||||
if !e.locked() && e.refCount <= 0 && e.lastUsage.Before(deadline) {
|
||||
lu := e.lastUsage
|
||||
if lug, ok := e.value.(ObjectLastUsage); ok {
|
||||
lu = lug.LastUsage()
|
||||
}
|
||||
if !e.locked() && e.refCount <= 0 && lu.Before(deadline) {
|
||||
delete(c.data, k)
|
||||
toClose = append(toClose, e)
|
||||
}
|
||||
|
|
|
@ -40,7 +40,7 @@ func (c *configuration) AllPeers(ctx context.Context, spaceId string) (peers []p
|
|||
nodeIds := c.NodeIds(spaceId)
|
||||
peers = make([]peer.Peer, 0, len(nodeIds))
|
||||
for _, id := range nodeIds {
|
||||
p, e := c.pool.DialAndAddPeer(ctx, id)
|
||||
p, e := c.pool.Get(ctx, id)
|
||||
if e == nil {
|
||||
peers = append(peers, p)
|
||||
}
|
||||
|
@ -53,7 +53,7 @@ func (c *configuration) AllPeers(ctx context.Context, spaceId string) (peers []p
|
|||
|
||||
func (c *configuration) OnePeer(ctx context.Context, spaceId string) (p peer.Peer, err error) {
|
||||
nodeIds := c.NodeIds(spaceId)
|
||||
return c.pool.GetOrDialOneOf(ctx, nodeIds)
|
||||
return c.pool.GetOneOf(ctx, nodeIds)
|
||||
}
|
||||
|
||||
func (c *configuration) NodeIds(spaceId string) []string {
|
||||
|
|
|
@ -7,7 +7,6 @@ import (
|
|||
"github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/config"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/peer"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/rpc"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/secure"
|
||||
"github.com/libp2p/go-libp2p-core/sec"
|
||||
"go.uber.org/zap"
|
||||
|
@ -60,7 +59,7 @@ func (d *dialer) UpdateAddrs(addrs map[string][]string) {
|
|||
d.mu.Unlock()
|
||||
}
|
||||
|
||||
func (d *dialer) Dial(ctx context.Context, peerId string) (peer peer.Peer, err error) {
|
||||
func (d *dialer) Dial(ctx context.Context, peerId string) (p peer.Peer, err error) {
|
||||
d.mu.RLock()
|
||||
defer d.mu.RUnlock()
|
||||
addrs, ok := d.peerAddrs[peerId]
|
||||
|
@ -68,11 +67,11 @@ func (d *dialer) Dial(ctx context.Context, peerId string) (peer peer.Peer, err e
|
|||
return nil, ErrArrdsNotFound
|
||||
}
|
||||
var (
|
||||
stream drpc.Stream
|
||||
sc sec.SecureConn
|
||||
conn drpc.Conn
|
||||
sc sec.SecureConn
|
||||
)
|
||||
for _, addr := range addrs {
|
||||
stream, sc, err = d.makeStream(ctx, addr)
|
||||
conn, sc, err = d.handshake(ctx, addr)
|
||||
if err != nil {
|
||||
log.Info("can't connect to host", zap.String("addr", addr))
|
||||
} else {
|
||||
|
@ -83,10 +82,10 @@ func (d *dialer) Dial(ctx context.Context, peerId string) (peer peer.Peer, err e
|
|||
if err != nil {
|
||||
return
|
||||
}
|
||||
return rpc.PeerFromStream(sc, stream, false), nil
|
||||
return peer.NewPeer(sc, conn), nil
|
||||
}
|
||||
|
||||
func (d *dialer) makeStream(ctx context.Context, addr string) (stream drpc.Stream, sc sec.SecureConn, err error) {
|
||||
func (d *dialer) handshake(ctx context.Context, addr string) (conn drpc.Conn, sc sec.SecureConn, err error) {
|
||||
tcpConn, err := net.Dial("tcp", addr)
|
||||
if err != nil {
|
||||
return
|
||||
|
@ -96,9 +95,6 @@ func (d *dialer) makeStream(ctx context.Context, addr string) (stream drpc.Strea
|
|||
return
|
||||
}
|
||||
log.Info("connected with remote host", zap.String("serverPeer", sc.RemotePeer().String()), zap.String("per", sc.LocalPeer().String()))
|
||||
stream, err = drpcconn.New(sc).NewStream(ctx, "", rpc.Encoding)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
return stream, sc, err
|
||||
conn = drpcconn.New(sc)
|
||||
return conn, sc, err
|
||||
}
|
||||
|
|
|
@ -2,34 +2,58 @@ package peer
|
|||
|
||||
import (
|
||||
"context"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/syncproto"
|
||||
"github.com/libp2p/go-libp2p-core/sec"
|
||||
"storj.io/drpc"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Dir uint
|
||||
|
||||
const (
|
||||
// DirInbound indicates peer created connection
|
||||
DirInbound Dir = iota
|
||||
// DirOutbound indicates that our host created connection
|
||||
DirOutbound
|
||||
)
|
||||
|
||||
type Info struct {
|
||||
Id string
|
||||
Dir Dir
|
||||
LastActiveUnix int64
|
||||
}
|
||||
|
||||
func (i Info) LastActive() time.Time {
|
||||
return time.Unix(i.LastActiveUnix, 0)
|
||||
func NewPeer(sc sec.SecureConn, conn drpc.Conn) Peer {
|
||||
return &peer{
|
||||
id: sc.RemotePeer().String(),
|
||||
lastUsage: time.Now().Unix(),
|
||||
sc: sc,
|
||||
Conn: conn,
|
||||
}
|
||||
}
|
||||
|
||||
type Peer interface {
|
||||
Id() string
|
||||
Info() Info
|
||||
Recv() (*syncproto.Message, error)
|
||||
Send(msg *syncproto.Message) (err error)
|
||||
Context() context.Context
|
||||
Close() error
|
||||
LastUsage() time.Time
|
||||
UpdateLastUsage()
|
||||
drpc.Conn
|
||||
}
|
||||
|
||||
type peer struct {
|
||||
id string
|
||||
lastUsage int64
|
||||
sc sec.SecureConn
|
||||
drpc.Conn
|
||||
}
|
||||
|
||||
func (p *peer) Id() string {
|
||||
return p.id
|
||||
}
|
||||
|
||||
func (p *peer) LastUsage() time.Time {
|
||||
select {
|
||||
case <-p.Closed():
|
||||
return time.Unix(0, 0)
|
||||
default:
|
||||
}
|
||||
return time.Unix(atomic.LoadInt64(&p.lastUsage), 0)
|
||||
}
|
||||
|
||||
func (p *peer) Invoke(ctx context.Context, rpc string, enc drpc.Encoding, in, out drpc.Message) error {
|
||||
defer p.UpdateLastUsage()
|
||||
return p.Conn.Invoke(ctx, rpc, enc, in, out)
|
||||
}
|
||||
|
||||
func (p *peer) NewStream(ctx context.Context, rpc string, enc drpc.Encoding) (drpc.Stream, error) {
|
||||
defer p.UpdateLastUsage()
|
||||
return p.Conn.NewStream(ctx, rpc, enc)
|
||||
}
|
||||
|
||||
func (p *peer) UpdateLastUsage() {
|
||||
atomic.StoreInt64(&p.lastUsage, time.Now().Unix())
|
||||
}
|
||||
|
|
|
@ -1,32 +0,0 @@
|
|||
package handler
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/pool"
|
||||
"github.com/gogo/protobuf/proto"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
var log = logger.NewNamed("replyHandler")
|
||||
|
||||
type ReplyHandler interface {
|
||||
Handle(ctx context.Context, req []byte) (rep proto.Marshaler, err error)
|
||||
}
|
||||
|
||||
type Reply struct {
|
||||
ReplyHandler
|
||||
}
|
||||
|
||||
func (r Reply) Handle(ctx context.Context, msg *pool.Message) error {
|
||||
rep, e := r.ReplyHandler.Handle(ctx, msg.GetData())
|
||||
if msg.GetHeader().RequestId == 0 {
|
||||
if e != nil {
|
||||
log.Error("handler returned error", zap.Error(e))
|
||||
} else if rep != nil {
|
||||
log.Debug("sender didn't expect a reply, but the handler made")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
return msg.ReplyType(msg.GetHeader().GetType(), rep)
|
||||
}
|
|
@ -1,136 +0,0 @@
|
|||
package pool
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/peer"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/syncproto"
|
||||
"github.com/gogo/protobuf/proto"
|
||||
"go.uber.org/zap"
|
||||
"gopkg.in/mgo.v2/bson"
|
||||
)
|
||||
|
||||
type Message struct {
|
||||
*syncproto.Message
|
||||
peer peer.Peer
|
||||
}
|
||||
|
||||
func (m *Message) Peer() peer.Peer {
|
||||
return m.peer
|
||||
}
|
||||
|
||||
func (m *Message) Reply(data []byte) (err error) {
|
||||
rep := &syncproto.Message{
|
||||
Header: &syncproto.Header{
|
||||
TraceId: m.GetHeader().TraceId,
|
||||
ReplyId: m.GetHeader().RequestId,
|
||||
Type: syncproto.MessageType_MessageTypeSync,
|
||||
},
|
||||
Data: data,
|
||||
}
|
||||
return m.peer.Send(rep)
|
||||
}
|
||||
|
||||
func (m *Message) ReplyType(tp syncproto.MessageType, data proto.Marshaler) (err error) {
|
||||
dataBytes, err := data.Marshal()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
rep := &syncproto.Message{
|
||||
Header: &syncproto.Header{
|
||||
TraceId: m.GetHeader().TraceId,
|
||||
ReplyId: m.GetHeader().RequestId,
|
||||
Type: tp,
|
||||
},
|
||||
Data: dataBytes,
|
||||
}
|
||||
return m.peer.Send(rep)
|
||||
}
|
||||
|
||||
func (m *Message) Ack() (err error) {
|
||||
ack := &syncproto.System{
|
||||
Ack: &syncproto.System_Ack{},
|
||||
}
|
||||
data, err := ack.Marshal()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
rep := &syncproto.Message{
|
||||
Header: &syncproto.Header{
|
||||
TraceId: m.GetHeader().TraceId,
|
||||
ReplyId: m.GetHeader().RequestId,
|
||||
Type: syncproto.MessageType_MessageTypeSystem,
|
||||
DebugInfo: "Ack",
|
||||
},
|
||||
Data: data,
|
||||
}
|
||||
err = m.peer.Send(rep)
|
||||
if err != nil {
|
||||
log.With(
|
||||
zap.String("peerId", m.peer.Id()),
|
||||
zap.String("header", rep.GetHeader().String())).
|
||||
Error("failed sending ack to peer", zap.Error(err))
|
||||
} else {
|
||||
log.With(
|
||||
zap.String("peerId", m.peer.Id()),
|
||||
zap.String("header", rep.GetHeader().String())).
|
||||
Debug("sent ack to peer")
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (m *Message) AckError(code syncproto.System_Error_Code, description string) (err error) {
|
||||
ack := &syncproto.System{
|
||||
Ack: &syncproto.System_Ack{
|
||||
Error: &syncproto.System_Error{
|
||||
Code: code,
|
||||
Description: description,
|
||||
},
|
||||
},
|
||||
}
|
||||
data, err := ack.Marshal()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
rep := &syncproto.Message{
|
||||
Header: &syncproto.Header{
|
||||
TraceId: []byte(bson.NewObjectId()),
|
||||
ReplyId: m.GetHeader().RequestId,
|
||||
Type: syncproto.MessageType_MessageTypeSystem,
|
||||
DebugInfo: "AckError",
|
||||
},
|
||||
Data: data,
|
||||
}
|
||||
if err != nil {
|
||||
log.With(
|
||||
zap.String("peerId", m.peer.Id()),
|
||||
zap.String("header", rep.GetHeader().String())).
|
||||
Error("failed sending ackError to peer", zap.Error(err))
|
||||
} else {
|
||||
log.With(
|
||||
zap.String("peerId", m.peer.Id()),
|
||||
zap.String("header", rep.GetHeader().String())).
|
||||
Debug("sent ackError to peer")
|
||||
}
|
||||
return m.peer.Send(rep)
|
||||
}
|
||||
|
||||
func (m *Message) IsAck() (err error) {
|
||||
if tp := m.GetHeader().GetType(); tp != syncproto.MessageType_MessageTypeSystem {
|
||||
return fmt.Errorf("unexpected message type in response: %v, want System", tp)
|
||||
}
|
||||
sys := &syncproto.System{}
|
||||
if err = sys.Unmarshal(m.GetData()); err != nil {
|
||||
return
|
||||
}
|
||||
if ack := sys.Ack; ack != nil {
|
||||
if ack.Error != nil {
|
||||
return fmt.Errorf("response error: code=%d; descriptipon=%s", ack.Error.Code, ack.Error.Description)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("received not ack response")
|
||||
}
|
||||
|
||||
func (m *Message) UnmarshalData(msg proto.Unmarshaler) error {
|
||||
return msg.Unmarshal(m.Data)
|
||||
}
|
|
@ -1,28 +0,0 @@
|
|||
package pool
|
||||
|
||||
import (
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/peer"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/slice"
|
||||
)
|
||||
|
||||
type peerEntry struct {
|
||||
peer peer.Peer
|
||||
groupIds []string
|
||||
ready chan struct{}
|
||||
}
|
||||
|
||||
func (pe *peerEntry) addGroup(groupId string) (ok bool) {
|
||||
if slice.FindPos(pe.groupIds, groupId) != -1 {
|
||||
return false
|
||||
}
|
||||
pe.groupIds = append(pe.groupIds, groupId)
|
||||
return true
|
||||
}
|
||||
|
||||
func (pe *peerEntry) removeGroup(groupId string) (ok bool) {
|
||||
if slice.FindPos(pe.groupIds, groupId) == -1 {
|
||||
return false
|
||||
}
|
||||
pe.groupIds = slice.Remove(pe.groupIds, groupId)
|
||||
return true
|
||||
}
|
|
@ -3,72 +3,47 @@ package pool
|
|||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/app"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/ocache"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/dialer"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/peer"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/syncproto"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/slice"
|
||||
"go.uber.org/zap"
|
||||
"math/rand"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
)
|
||||
|
||||
const (
|
||||
CName = "sync/peerPool"
|
||||
maxSimultaneousOperationsPerStream = 10
|
||||
CName = "net.pool"
|
||||
)
|
||||
|
||||
var log = logger.NewNamed("peerPool")
|
||||
var log = logger.NewNamed(CName)
|
||||
|
||||
var (
|
||||
ErrPoolClosed = errors.New("peer pool is closed")
|
||||
ErrPeerNotFound = errors.New("peer not found")
|
||||
ErrUnableToConnect = errors.New("unable to connect")
|
||||
)
|
||||
|
||||
func NewPool() Pool {
|
||||
return &pool{closed: true}
|
||||
func New() Pool {
|
||||
return &pool{}
|
||||
}
|
||||
|
||||
type Handler func(ctx context.Context, msg *Message) (err error)
|
||||
|
||||
// Pool creates and caches outgoing connection
|
||||
type Pool interface {
|
||||
AddAndReadPeer(peer peer.Peer) (err error)
|
||||
AddHandler(msgType syncproto.MessageType, h Handler)
|
||||
AddPeerIdToGroup(peerId, groupId string) (err error)
|
||||
RemovePeerIdFromGroup(peerId, groupId string) (err error)
|
||||
DialAndAddPeer(ctx context.Context, id string) (peer.Peer, error)
|
||||
GetOrDialOneOf(ctx context.Context, peerIds []string) (peer.Peer, error)
|
||||
|
||||
SendAndWait(ctx context.Context, peerId string, msg *syncproto.Message) (err error)
|
||||
SendAndWaitResponse(ctx context.Context, id string, s *syncproto.Message) (resp *Message, err error)
|
||||
Broadcast(ctx context.Context, groupId string, msg *syncproto.Message) (err error)
|
||||
// Get lookups to peer in existing connections or creates and cache new one
|
||||
Get(ctx context.Context, id string) (peer.Peer, error)
|
||||
// GetOneOf searches at least one existing connection in cache or creates a new one from a randomly selected id from given list
|
||||
GetOneOf(ctx context.Context, peerIds []string) (peer.Peer, error)
|
||||
|
||||
app.ComponentRunnable
|
||||
}
|
||||
|
||||
type pool struct {
|
||||
peersById map[string]*peerEntry
|
||||
waiters *waiters
|
||||
handlers map[syncproto.MessageType][]Handler
|
||||
peersIdsByGroup map[string][]string
|
||||
|
||||
dialer dialer.Dialer
|
||||
|
||||
closed bool
|
||||
mu sync.RWMutex
|
||||
wg *sync.WaitGroup
|
||||
cache ocache.OCache
|
||||
}
|
||||
|
||||
func (p *pool) Init(ctx context.Context, a *app.App) (err error) {
|
||||
p.peersById = map[string]*peerEntry{}
|
||||
p.handlers = map[syncproto.MessageType][]Handler{}
|
||||
p.peersIdsByGroup = map[string][]string{}
|
||||
p.waiters = &waiters{waiters: map[uint64]*waiter{}}
|
||||
p.dialer = a.MustComponent(dialer.CName).(dialer.Dialer)
|
||||
p.wg = &sync.WaitGroup{}
|
||||
dialer := a.MustComponent(dialer.CName).(dialer.Dialer)
|
||||
p.cache = ocache.New(func(ctx context.Context, id string) (value ocache.Object, err error) {
|
||||
return dialer.Dial(ctx, id)
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -77,320 +52,49 @@ func (p *pool) Name() (name string) {
|
|||
}
|
||||
|
||||
func (p *pool) Run(ctx context.Context) (err error) {
|
||||
p.closed = false
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *pool) AddHandler(msgType syncproto.MessageType, h Handler) {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
if !p.closed {
|
||||
// unable to add handler after Run
|
||||
return
|
||||
}
|
||||
p.handlers[msgType] = append(p.handlers[msgType], h)
|
||||
}
|
||||
|
||||
func (p *pool) DialAndAddPeer(ctx context.Context, peerId string) (peer.Peer, error) {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
if p.closed {
|
||||
return nil, ErrPoolClosed
|
||||
}
|
||||
return p.dialAndAdd(ctx, peerId)
|
||||
}
|
||||
|
||||
func (p *pool) dialAndAdd(ctx context.Context, peerId string) (peer.Peer, error) {
|
||||
if peer, ok := p.peersById[peerId]; ok {
|
||||
return peer.peer, nil
|
||||
}
|
||||
peer, err := p.dialer.Dial(ctx, peerId)
|
||||
func (p *pool) Get(ctx context.Context, id string) (peer.Peer, error) {
|
||||
v, err := p.cache.Get(ctx, id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
p.peersById[peer.Id()] = &peerEntry{
|
||||
peer: peer,
|
||||
}
|
||||
p.wg.Add(1)
|
||||
go p.readPeerLoop(peer)
|
||||
return peer, nil
|
||||
}
|
||||
|
||||
func (p *pool) AddAndReadPeer(peer peer.Peer) (err error) {
|
||||
p.mu.Lock()
|
||||
if p.closed {
|
||||
p.mu.Unlock()
|
||||
return ErrPoolClosed
|
||||
}
|
||||
p.peersById[peer.Id()] = &peerEntry{
|
||||
peer: peer,
|
||||
}
|
||||
p.wg.Add(1)
|
||||
p.mu.Unlock()
|
||||
return p.readPeerLoop(peer)
|
||||
}
|
||||
|
||||
func (p *pool) AddPeerIdToGroup(peerId, groupId string) (err error) {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
peer, ok := p.peersById[peerId]
|
||||
if !ok {
|
||||
return ErrPeerNotFound
|
||||
}
|
||||
if slice.FindPos(peer.groupIds, groupId) != -1 {
|
||||
return nil
|
||||
}
|
||||
peer.addGroup(groupId)
|
||||
p.peersIdsByGroup[groupId] = append(p.peersIdsByGroup[groupId], peerId)
|
||||
return
|
||||
}
|
||||
|
||||
func (p *pool) RemovePeerIdFromGroup(peerId, groupId string) (err error) {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
peer, ok := p.peersById[peerId]
|
||||
if !ok {
|
||||
return ErrPeerNotFound
|
||||
}
|
||||
if slice.FindPos(peer.groupIds, groupId) == -1 {
|
||||
return nil
|
||||
}
|
||||
peer.removeGroup(groupId)
|
||||
p.peersIdsByGroup[groupId] = slice.Remove(p.peersIdsByGroup[groupId], peerId)
|
||||
return
|
||||
}
|
||||
|
||||
func (p *pool) SendAndWait(ctx context.Context, peerId string, msg *syncproto.Message) (err error) {
|
||||
resp, err := p.SendAndWaitResponse(ctx, peerId, msg)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
return resp.IsAck()
|
||||
}
|
||||
|
||||
func (p *pool) SendAndWaitResponse(ctx context.Context, peerId string, msg *syncproto.Message) (resp *Message, err error) {
|
||||
defer func() {
|
||||
if err != nil {
|
||||
log.With(
|
||||
zap.String("peerId", peerId),
|
||||
zap.String("header", msg.GetHeader().String())).
|
||||
Error("failed sending message to peer", zap.Error(err))
|
||||
} else {
|
||||
log.With(
|
||||
zap.String("peerId", peerId),
|
||||
zap.String("header", msg.GetHeader().String())).
|
||||
Debug("sent message to peer")
|
||||
}
|
||||
}()
|
||||
|
||||
p.mu.RLock()
|
||||
peer := p.peersById[peerId]
|
||||
p.mu.RUnlock()
|
||||
if peer == nil {
|
||||
err = ErrPeerNotFound
|
||||
return
|
||||
}
|
||||
|
||||
repId := p.waiters.NewReplyId()
|
||||
msg.GetHeader().RequestId = repId
|
||||
ch := make(chan Reply, 1)
|
||||
|
||||
p.waiters.Add(repId, &waiter{ch: ch})
|
||||
defer p.waiters.Remove(repId)
|
||||
|
||||
if err = peer.peer.Send(msg); err != nil {
|
||||
return
|
||||
}
|
||||
pr := v.(peer.Peer)
|
||||
select {
|
||||
case rep := <-ch:
|
||||
if rep.Error != nil {
|
||||
err = rep.Error
|
||||
return
|
||||
}
|
||||
resp = rep.Message
|
||||
return
|
||||
case <-ctx.Done():
|
||||
log.Debug("context done in SendAndWait")
|
||||
err = ctx.Err()
|
||||
case <-pr.Closed():
|
||||
default:
|
||||
return pr, nil
|
||||
}
|
||||
return
|
||||
p.cache.Remove(id)
|
||||
return p.Get(ctx, id)
|
||||
}
|
||||
|
||||
func (p *pool) GetOrDialOneOf(ctx context.Context, peerIds []string) (peer.Peer, error) {
|
||||
p.mu.RLock()
|
||||
if p.closed {
|
||||
p.mu.RUnlock()
|
||||
return nil, ErrPoolClosed
|
||||
}
|
||||
func (p *pool) GetOneOf(ctx context.Context, peerIds []string) (peer.Peer, error) {
|
||||
// finding existing connection
|
||||
for _, peerId := range peerIds {
|
||||
peer, ok := p.peersById[peerId]
|
||||
if ok {
|
||||
p.mu.RUnlock()
|
||||
return peer.peer, nil
|
||||
if v, err := p.cache.Pick(ctx, peerId); err == nil {
|
||||
pr := v.(peer.Peer)
|
||||
select {
|
||||
case <-pr.Closed():
|
||||
default:
|
||||
return pr, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
p.mu.RUnlock()
|
||||
// shuffle ids for better consistency
|
||||
rand.Shuffle(len(peerIds), func(i, j int) {
|
||||
peerIds[i], peerIds[j] = peerIds[j], peerIds[i]
|
||||
})
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
var lastErr error
|
||||
// connecting
|
||||
for _, peerId := range peerIds {
|
||||
peer, err := p.dialAndAdd(ctx, peerId)
|
||||
if err != nil {
|
||||
lastErr = err
|
||||
continue
|
||||
} else {
|
||||
return peer, nil
|
||||
}
|
||||
}
|
||||
return nil, lastErr
|
||||
}
|
||||
|
||||
func (p *pool) Broadcast(ctx context.Context, groupId string, msg *syncproto.Message) (err error) {
|
||||
//TODO implement me
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (p *pool) readPeerLoop(peer peer.Peer) (err error) {
|
||||
defer p.wg.Done()
|
||||
|
||||
limiter := make(chan struct{}, maxSimultaneousOperationsPerStream)
|
||||
for i := 0; i < maxSimultaneousOperationsPerStream; i++ {
|
||||
limiter <- struct{}{}
|
||||
}
|
||||
Loop:
|
||||
for {
|
||||
msg, err := peer.Recv()
|
||||
if err != nil {
|
||||
log.Debug("peer receive error", zap.Error(err), zap.String("peerId", peer.Id()))
|
||||
break
|
||||
}
|
||||
select {
|
||||
case <-limiter:
|
||||
case <-peer.Context().Done():
|
||||
break Loop
|
||||
}
|
||||
go func() {
|
||||
defer func() {
|
||||
limiter <- struct{}{}
|
||||
}()
|
||||
p.handleMessage(peer, msg)
|
||||
}()
|
||||
}
|
||||
if err = p.removePeer(peer.Id()); err != nil {
|
||||
log.Error("remove peer error", zap.String("peerId", peer.Id()), zap.Error(err))
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (p *pool) removePeer(peerId string) (err error) {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
_, ok := p.peersById[peerId]
|
||||
if !ok {
|
||||
return ErrPeerNotFound
|
||||
}
|
||||
delete(p.peersById, peerId)
|
||||
return
|
||||
}
|
||||
|
||||
func (p *pool) handleMessage(peer peer.Peer, msg *syncproto.Message) {
|
||||
log.With(zap.String("peerId", peer.Id()), zap.String("header", msg.GetHeader().String())).
|
||||
Debug("received message from peer")
|
||||
replyId := msg.GetHeader().GetReplyId()
|
||||
if replyId != 0 {
|
||||
if !p.waiters.Send(replyId, Reply{
|
||||
PeerInfo: peer.Info(),
|
||||
Message: &Message{
|
||||
Message: msg,
|
||||
peer: peer,
|
||||
},
|
||||
}) {
|
||||
log.Debug("received reply with unknown (or expired) replyId", zap.Uint64("replyId", replyId), zap.String("header", msg.GetHeader().String()))
|
||||
}
|
||||
return
|
||||
}
|
||||
handlers := p.handlers[msg.GetHeader().GetType()]
|
||||
if len(handlers) == 0 {
|
||||
log.With(zap.String("peerId", peer.Id())).Debug("no handlers for such message")
|
||||
return
|
||||
}
|
||||
|
||||
message := &Message{Message: msg, peer: peer}
|
||||
|
||||
for _, h := range handlers {
|
||||
if err := h(peer.Context(), message); err != nil {
|
||||
log.Error("handle message error", zap.Error(err))
|
||||
if v, err := p.cache.Get(ctx, peerId); err == nil {
|
||||
return v.(peer.Peer), nil
|
||||
}
|
||||
}
|
||||
return nil, ErrUnableToConnect
|
||||
}
|
||||
|
||||
func (p *pool) Close(ctx context.Context) (err error) {
|
||||
p.mu.Lock()
|
||||
for _, peer := range p.peersById {
|
||||
peer.peer.Close()
|
||||
}
|
||||
wg := p.wg
|
||||
p.mu.Unlock()
|
||||
if wg != nil {
|
||||
wg.Wait()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type waiter struct {
|
||||
sent int
|
||||
ch chan<- Reply
|
||||
}
|
||||
|
||||
type waiters struct {
|
||||
waiters map[uint64]*waiter
|
||||
replySeq uint64
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
func (w *waiters) Send(replyId uint64, r Reply) (ok bool) {
|
||||
w.mu.Lock()
|
||||
wait := w.waiters[replyId]
|
||||
if wait == nil {
|
||||
w.mu.Unlock()
|
||||
return false
|
||||
}
|
||||
wait.sent++
|
||||
var lastMessage = wait.sent == cap(wait.ch)
|
||||
if lastMessage {
|
||||
delete(w.waiters, replyId)
|
||||
}
|
||||
w.mu.Unlock()
|
||||
wait.ch <- r
|
||||
if lastMessage {
|
||||
close(wait.ch)
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func (w *waiters) Add(replyId uint64, wait *waiter) {
|
||||
w.mu.Lock()
|
||||
w.waiters[replyId] = wait
|
||||
w.mu.Unlock()
|
||||
}
|
||||
|
||||
func (w *waiters) Remove(id uint64) error {
|
||||
w.mu.Lock()
|
||||
defer w.mu.Unlock()
|
||||
if _, ok := w.waiters[id]; ok {
|
||||
delete(w.waiters, id)
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("waiter not found")
|
||||
}
|
||||
|
||||
func (w *waiters) NewReplyId() uint64 {
|
||||
res := atomic.AddUint64(&w.replySeq, 1)
|
||||
if res == 0 {
|
||||
return w.NewReplyId()
|
||||
}
|
||||
return res
|
||||
return p.cache.Close()
|
||||
}
|
||||
|
|
213
service/net/pool/pool_test.go
Normal file
213
service/net/pool/pool_test.go
Normal file
|
@ -0,0 +1,213 @@
|
|||
package pool
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/app"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/dialer"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/peer"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"storj.io/drpc"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
var ctx = context.Background()
|
||||
|
||||
func TestPool_Get(t *testing.T) {
|
||||
t.Run("dial error", func(t *testing.T) {
|
||||
fx := newFixture(t)
|
||||
defer fx.Finish()
|
||||
var expErr = errors.New("dial error")
|
||||
fx.Dialer.dial = func(ctx context.Context, peerId string) (peer peer.Peer, err error) {
|
||||
return nil, expErr
|
||||
}
|
||||
p, err := fx.Get(ctx, "1")
|
||||
assert.Nil(t, p)
|
||||
assert.EqualError(t, err, expErr.Error())
|
||||
})
|
||||
t.Run("dial and cached", func(t *testing.T) {
|
||||
fx := newFixture(t)
|
||||
defer fx.Finish()
|
||||
fx.Dialer.dial = func(ctx context.Context, peerId string) (peer peer.Peer, err error) {
|
||||
return newTestPeer("1"), nil
|
||||
}
|
||||
p, err := fx.Get(ctx, "1")
|
||||
assert.NoError(t, err)
|
||||
assert.NotNil(t, p)
|
||||
fx.Dialer.dial = nil
|
||||
p, err = fx.Get(ctx, "1")
|
||||
assert.NoError(t, err)
|
||||
assert.NotNil(t, p)
|
||||
})
|
||||
t.Run("retry for closed", func(t *testing.T) {
|
||||
fx := newFixture(t)
|
||||
defer fx.Finish()
|
||||
tp := newTestPeer("1")
|
||||
fx.Dialer.dial = func(ctx context.Context, peerId string) (peer peer.Peer, err error) {
|
||||
return tp, nil
|
||||
}
|
||||
p, err := fx.Get(ctx, "1")
|
||||
assert.NoError(t, err)
|
||||
assert.NotNil(t, p)
|
||||
p.Close()
|
||||
tp2 := newTestPeer("1")
|
||||
fx.Dialer.dial = func(ctx context.Context, peerId string) (peer peer.Peer, err error) {
|
||||
return tp2, nil
|
||||
}
|
||||
p, err = fx.Get(ctx, "1")
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, p, tp2)
|
||||
})
|
||||
}
|
||||
|
||||
func TestPool_GetOneOf(t *testing.T) {
|
||||
addToCache := func(t *testing.T, fx *fixture, tp *testPeer) {
|
||||
fx.Dialer.dial = func(ctx context.Context, peerId string) (peer peer.Peer, err error) {
|
||||
return tp, nil
|
||||
}
|
||||
gp, err := fx.Get(ctx, tp.Id())
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, gp, tp)
|
||||
}
|
||||
|
||||
t.Run("from cache", func(t *testing.T) {
|
||||
fx := newFixture(t)
|
||||
defer fx.Finish()
|
||||
tp1 := newTestPeer("1")
|
||||
addToCache(t, fx, tp1)
|
||||
p, err := fx.GetOneOf(ctx, []string{"3", "2", "1"})
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, tp1, p)
|
||||
})
|
||||
t.Run("from cache - skip closed", func(t *testing.T) {
|
||||
fx := newFixture(t)
|
||||
defer fx.Finish()
|
||||
tp2 := newTestPeer("2")
|
||||
addToCache(t, fx, tp2)
|
||||
tp2.Close()
|
||||
tp1 := newTestPeer("1")
|
||||
addToCache(t, fx, tp1)
|
||||
p, err := fx.GetOneOf(ctx, []string{"3", "2", "1"})
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, tp1, p)
|
||||
})
|
||||
t.Run("dial", func(t *testing.T) {
|
||||
fx := newFixture(t)
|
||||
defer fx.Finish()
|
||||
var called bool
|
||||
fx.Dialer.dial = func(ctx context.Context, peerId string) (peer peer.Peer, err error) {
|
||||
if called {
|
||||
return nil, fmt.Errorf("not expected call")
|
||||
}
|
||||
called = true
|
||||
return newTestPeer(peerId), nil
|
||||
}
|
||||
p, err := fx.GetOneOf(ctx, []string{"3", "2", "1"})
|
||||
require.NoError(t, err)
|
||||
assert.NotNil(t, p)
|
||||
})
|
||||
t.Run("unable to connect", func(t *testing.T) {
|
||||
fx := newFixture(t)
|
||||
defer fx.Finish()
|
||||
fx.Dialer.dial = func(ctx context.Context, peerId string) (peer peer.Peer, err error) {
|
||||
return nil, fmt.Errorf("persistent error")
|
||||
}
|
||||
p, err := fx.GetOneOf(ctx, []string{"3", "2", "1"})
|
||||
assert.Equal(t, ErrUnableToConnect, err)
|
||||
assert.Nil(t, p)
|
||||
})
|
||||
}
|
||||
|
||||
func newFixture(t *testing.T) *fixture {
|
||||
fx := &fixture{
|
||||
Pool: New(),
|
||||
Dialer: &dialerMock{},
|
||||
}
|
||||
a := new(app.App)
|
||||
a.Register(fx.Pool)
|
||||
a.Register(fx.Dialer)
|
||||
require.NoError(t, a.Start(context.Background()))
|
||||
fx.a = a
|
||||
fx.t = t
|
||||
return fx
|
||||
}
|
||||
|
||||
func (fx *fixture) Finish() {
|
||||
require.NoError(fx.t, fx.a.Close(context.Background()))
|
||||
}
|
||||
|
||||
type fixture struct {
|
||||
Pool
|
||||
Dialer *dialerMock
|
||||
a *app.App
|
||||
t *testing.T
|
||||
}
|
||||
|
||||
var _ dialer.Dialer = (*dialerMock)(nil)
|
||||
|
||||
type dialerMock struct {
|
||||
dial func(ctx context.Context, peerId string) (peer peer.Peer, err error)
|
||||
}
|
||||
|
||||
func (d *dialerMock) Dial(ctx context.Context, peerId string) (peer peer.Peer, err error) {
|
||||
return d.dial(ctx, peerId)
|
||||
}
|
||||
|
||||
func (d *dialerMock) UpdateAddrs(addrs map[string][]string) {
|
||||
return
|
||||
}
|
||||
|
||||
func (d *dialerMock) Init(ctx context.Context, a *app.App) (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (d *dialerMock) Name() (name string) {
|
||||
return dialer.CName
|
||||
}
|
||||
|
||||
func newTestPeer(id string) *testPeer {
|
||||
return &testPeer{
|
||||
id: id,
|
||||
closed: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
type testPeer struct {
|
||||
id string
|
||||
closed chan struct{}
|
||||
}
|
||||
|
||||
func (t *testPeer) Id() string {
|
||||
return t.id
|
||||
}
|
||||
|
||||
func (t *testPeer) LastUsage() time.Time {
|
||||
return time.Now()
|
||||
}
|
||||
|
||||
func (t *testPeer) UpdateLastUsage() {}
|
||||
|
||||
func (t *testPeer) Close() error {
|
||||
select {
|
||||
case <-t.closed:
|
||||
return fmt.Errorf("already closed")
|
||||
default:
|
||||
close(t.closed)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *testPeer) Closed() <-chan struct{} {
|
||||
return t.closed
|
||||
}
|
||||
|
||||
func (t *testPeer) Invoke(ctx context.Context, rpc string, enc drpc.Encoding, in, out drpc.Message) error {
|
||||
return fmt.Errorf("call Invoke on test peer")
|
||||
}
|
||||
|
||||
func (t *testPeer) NewStream(ctx context.Context, rpc string, enc drpc.Encoding) (drpc.Stream, error) {
|
||||
return nil, fmt.Errorf("call NewStream on test peer")
|
||||
}
|
|
@ -1,45 +0,0 @@
|
|||
package pool
|
||||
|
||||
import "context"
|
||||
|
||||
// 1. message for one peerId with ack
|
||||
// pool.SendAndWait(ctx context,.C
|
||||
// 2. message for many peers without ack (or group)
|
||||
|
||||
type Request struct {
|
||||
groupId string
|
||||
oneOf []string
|
||||
all []string
|
||||
tryDial bool
|
||||
needReply bool
|
||||
pool *pool
|
||||
}
|
||||
|
||||
func (r *Request) GroupId(groupId string) *Request {
|
||||
r.groupId = groupId
|
||||
return r
|
||||
}
|
||||
|
||||
func (r *Request) All(peerIds ...string) *Request {
|
||||
r.all = peerIds
|
||||
return r
|
||||
}
|
||||
|
||||
func (r *Request) OneOf(peerIds ...string) *Request {
|
||||
r.oneOf = peerIds
|
||||
return r
|
||||
}
|
||||
|
||||
func (r *Request) TryDial(is bool) *Request {
|
||||
r.tryDial = is
|
||||
return r
|
||||
}
|
||||
|
||||
func (r *Request) NeedReply(is bool) *Request {
|
||||
r.needReply = is
|
||||
return r
|
||||
}
|
||||
|
||||
func (r *Request) Exec(ctx context.Context, msg *Message) *Results {
|
||||
return nil
|
||||
}
|
|
@ -1,53 +0,0 @@
|
|||
package pool
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/peer"
|
||||
)
|
||||
|
||||
// Results of request collects replies and errors
|
||||
// Must be closed after usage r.Close()
|
||||
type Results struct {
|
||||
ctx context.Context
|
||||
cancel func()
|
||||
waiterId uint64
|
||||
ch chan Reply
|
||||
pool *pool
|
||||
}
|
||||
|
||||
// Iterate iterates over replies
|
||||
// if callback will return a non-nil error then iteration stops
|
||||
func (r *Results) Iterate(callback func(r Reply) (err error)) (err error) {
|
||||
if r.ctx == nil || r.ch == nil {
|
||||
return fmt.Errorf("results not initialized")
|
||||
}
|
||||
for {
|
||||
select {
|
||||
case <-r.ctx.Done():
|
||||
return r.ctx.Err()
|
||||
case m, ok := <-r.ch:
|
||||
if ok {
|
||||
if err = callback(m); err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Close cancels iteration and unregister reply handler in the pool
|
||||
// Required to call to avoid memory leaks
|
||||
func (r *Results) Close() (err error) {
|
||||
r.cancel()
|
||||
return r.pool.waiters.Remove(r.waiterId)
|
||||
}
|
||||
|
||||
// Reply presents the result of request executing can be error or result message
|
||||
type Reply struct {
|
||||
PeerInfo peer.Info
|
||||
Error error
|
||||
Message *Message
|
||||
}
|
|
@ -1,18 +0,0 @@
|
|||
package rpc
|
||||
|
||||
import (
|
||||
"github.com/gogo/protobuf/proto"
|
||||
"storj.io/drpc"
|
||||
)
|
||||
|
||||
var Encoding = enc{}
|
||||
|
||||
type enc struct{}
|
||||
|
||||
func (e enc) Marshal(msg drpc.Message) ([]byte, error) {
|
||||
return msg.(proto.Marshaler).Marshal()
|
||||
}
|
||||
|
||||
func (e enc) Unmarshal(buf []byte, msg drpc.Message) error {
|
||||
return msg.(proto.Unmarshaler).Unmarshal(buf)
|
||||
}
|
|
@ -6,11 +6,11 @@ import (
|
|||
"github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/config"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/pool"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/rpc"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/secure"
|
||||
"go.uber.org/zap"
|
||||
"net"
|
||||
"storj.io/drpc"
|
||||
"storj.io/drpc/drpcmux"
|
||||
"storj.io/drpc/drpcserver"
|
||||
"strings"
|
||||
"time"
|
||||
|
@ -21,11 +21,12 @@ const CName = "net/drpcserver"
|
|||
var log = logger.NewNamed(CName)
|
||||
|
||||
func New() DRPCServer {
|
||||
return &drpcServer{}
|
||||
return &drpcServer{Mux: drpcmux.New()}
|
||||
}
|
||||
|
||||
type DRPCServer interface {
|
||||
app.ComponentRunnable
|
||||
drpc.Mux
|
||||
}
|
||||
|
||||
type drpcServer struct {
|
||||
|
@ -35,6 +36,7 @@ type drpcServer struct {
|
|||
listeners []secure.ContextListener
|
||||
pool pool.Pool
|
||||
cancel func()
|
||||
*drpcmux.Mux
|
||||
}
|
||||
|
||||
func (s *drpcServer) Init(ctx context.Context, a *app.App) (err error) {
|
||||
|
@ -49,9 +51,7 @@ func (s *drpcServer) Name() (name string) {
|
|||
}
|
||||
|
||||
func (s *drpcServer) Run(ctx context.Context) (err error) {
|
||||
s.drpcServer = drpcserver.New(s)
|
||||
|
||||
s.drpcServer = drpcserver.New(mux)
|
||||
s.drpcServer = drpcserver.New(s.Mux)
|
||||
ctx, s.cancel = context.WithCancel(ctx)
|
||||
for _, addr := range s.config.ListenAddrs {
|
||||
tcpList, err := net.Listen("tcp", addr)
|
||||
|
@ -111,16 +111,6 @@ func (s *drpcServer) serveConn(ctx context.Context, conn net.Conn) {
|
|||
}
|
||||
}
|
||||
|
||||
func (s *drpcServer) HandleRPC(stream drpc.Stream, _ string) (err error) {
|
||||
ctx := stream.Context()
|
||||
sc, err := secure.CtxSecureConn(ctx)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
log.With(zap.String("peer", sc.RemotePeer().String())).Debug("stream opened")
|
||||
return s.pool.AddAndReadPeer(rpc.PeerFromStream(sc, stream, true))
|
||||
}
|
||||
|
||||
func (s *drpcServer) Close(ctx context.Context) (err error) {
|
||||
if s.cancel != nil {
|
||||
s.cancel()
|
||||
|
|
|
@ -1,55 +0,0 @@
|
|||
package rpc
|
||||
|
||||
import (
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/peer"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/syncproto"
|
||||
"github.com/libp2p/go-libp2p-core/sec"
|
||||
"storj.io/drpc"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
||||
func PeerFromStream(sc sec.SecureConn, stream drpc.Stream, incoming bool) peer.Peer {
|
||||
dp := &drpcPeer{
|
||||
sc: sc,
|
||||
Stream: stream,
|
||||
}
|
||||
dp.info.Id = sc.RemotePeer().String()
|
||||
if incoming {
|
||||
dp.info.Dir = peer.DirInbound
|
||||
} else {
|
||||
dp.info.Dir = peer.DirOutbound
|
||||
}
|
||||
return dp
|
||||
}
|
||||
|
||||
type drpcPeer struct {
|
||||
sc sec.SecureConn
|
||||
info peer.Info
|
||||
drpc.Stream
|
||||
}
|
||||
|
||||
func (d *drpcPeer) Id() string {
|
||||
return d.info.Id
|
||||
}
|
||||
|
||||
func (d *drpcPeer) Info() peer.Info {
|
||||
return d.info
|
||||
}
|
||||
|
||||
func (d *drpcPeer) Recv() (msg *syncproto.Message, err error) {
|
||||
msg = &syncproto.Message{}
|
||||
if err = d.Stream.MsgRecv(msg, Encoding); err != nil {
|
||||
return
|
||||
}
|
||||
atomic.StoreInt64(&d.info.LastActiveUnix, time.Now().Unix())
|
||||
return
|
||||
}
|
||||
|
||||
func (d *drpcPeer) Send(msg *syncproto.Message) (err error) {
|
||||
if err = d.Stream.MsgSend(msg, Encoding); err != nil {
|
||||
return
|
||||
}
|
||||
atomic.StoreInt64(&d.info.LastActiveUnix, time.Now().Unix())
|
||||
return
|
||||
}
|
|
@ -9,10 +9,9 @@ import (
|
|||
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/ocache"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/configuration"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/pool"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/pool/handler"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/rpc/server"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/space/spacesync"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/syncproto"
|
||||
"github.com/gogo/protobuf/proto"
|
||||
"storj.io/drpc/drpcerr"
|
||||
"time"
|
||||
)
|
||||
|
||||
|
@ -25,7 +24,7 @@ func New() Service {
|
|||
}
|
||||
|
||||
type Service interface {
|
||||
handler.ReplyHandler
|
||||
spacesync.DRPCSpaceServer
|
||||
app.ComponentRunnable
|
||||
}
|
||||
|
||||
|
@ -42,7 +41,7 @@ func (s *service) Init(ctx context.Context, a *app.App) (err error) {
|
|||
s.confService = a.MustComponent(configuration.CName).(configuration.Service)
|
||||
ttlSec := time.Second * time.Duration(s.conf.GCTTL)
|
||||
s.cache = ocache.New(s.loadSpace, ocache.WithTTL(ttlSec), ocache.WithGCPeriod(time.Minute))
|
||||
s.pool.AddHandler(syncproto.MessageType_MessageTypeSpace, handler.Reply{ReplyHandler: s}.Handle)
|
||||
spacesync.DRPCRegisterSpace(a.MustComponent(server.CName).(server.DRPCServer), s)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -72,19 +71,8 @@ func (s *service) get(ctx context.Context, id string) (Space, error) {
|
|||
return obj.(Space), nil
|
||||
}
|
||||
|
||||
func (s *service) Handle(ctx context.Context, data []byte) (resp proto.Marshaler, err error) {
|
||||
var spaceReq = &spacesync.Space{}
|
||||
if err = spaceReq.Unmarshal(data); err != nil {
|
||||
return
|
||||
}
|
||||
if spaceReq.SpaceId != "" {
|
||||
sp, e := s.get(ctx, spaceReq.SpaceId)
|
||||
if e != nil {
|
||||
return nil, e
|
||||
}
|
||||
return sp.Handle(ctx, spaceReq)
|
||||
}
|
||||
return nil, fmt.Errorf("unexpected space message")
|
||||
func (s *service) HeadSync(ctx context.Context, request *spacesync.HeadSyncRequest) (*spacesync.HeadSyncResponse, error) {
|
||||
return nil, drpcerr.WithCode(fmt.Errorf("check"), 42)
|
||||
}
|
||||
|
||||
func (s *service) Close(ctx context.Context) (err error) {
|
||||
|
|
|
@ -6,7 +6,6 @@ import (
|
|||
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/ldiff"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/configuration"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/space/remotediff"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/space/spacesync"
|
||||
"go.uber.org/zap"
|
||||
"math/rand"
|
||||
"sync"
|
||||
|
@ -15,10 +14,11 @@ import (
|
|||
|
||||
type Space interface {
|
||||
Id() string
|
||||
Handle(ctx context.Context, msg *spacesync.Space) (repl *spacesync.Space, err error)
|
||||
Close() error
|
||||
}
|
||||
|
||||
//
|
||||
|
||||
type space struct {
|
||||
id string
|
||||
conf configuration.Configuration
|
||||
|
@ -64,28 +64,6 @@ func (s *space) testFill() {
|
|||
s.diff.Set(els...)
|
||||
}
|
||||
|
||||
func (s *space) Handle(ctx context.Context, msg *spacesync.Space) (repl *spacesync.Space, err error) {
|
||||
if diffRange := msg.GetMessage().GetDiffRange(); diffRange != nil {
|
||||
resp, er := remotediff.HandlerRangeRequest(ctx, s.diff, diffRange)
|
||||
if er != nil {
|
||||
return nil, er
|
||||
}
|
||||
return &spacesync.Space{SpaceId: s.id, Message: &spacesync.Space_Content{
|
||||
Value: &spacesync.Space_Content_DiffRange{
|
||||
DiffRange: resp,
|
||||
},
|
||||
}}, nil
|
||||
}
|
||||
|
||||
///
|
||||
|
||||
///
|
||||
|
||||
///
|
||||
|
||||
return nil, fmt.Errorf("unexpected request")
|
||||
}
|
||||
|
||||
func (s *space) syncLoop() {
|
||||
defer close(s.syncLoopDone)
|
||||
doSync := func() {
|
||||
|
|
|
@ -1,42 +1,44 @@
|
|||
syntax = "proto3";
|
||||
package anytype;
|
||||
package anySpace;
|
||||
option go_package = "service/space/spacesync";
|
||||
|
||||
message Space {
|
||||
string spaceId = 1;
|
||||
|
||||
Content message = 2;
|
||||
|
||||
message Content {
|
||||
oneof value {
|
||||
DiffRange diffRange = 1;
|
||||
|
||||
}
|
||||
}
|
||||
enum ErrCodes {
|
||||
Unexpected = 0;
|
||||
}
|
||||
|
||||
message DiffRange {
|
||||
Request request = 1;
|
||||
Response response = 2;
|
||||
service Space {
|
||||
// HeadSync compares all objects and their hashes in a space
|
||||
rpc HeadSync(HeadSyncRequest) returns (HeadSyncResponse);
|
||||
}
|
||||
|
||||
message Request {
|
||||
repeated Range ranges = 1;
|
||||
message Range {
|
||||
uint64 from = 1;
|
||||
uint64 to = 2;
|
||||
uint32 limit = 3;
|
||||
}
|
||||
}
|
||||
message Response {
|
||||
repeated Result results = 1;
|
||||
message Result {
|
||||
bytes hash = 1;
|
||||
repeated Element elements = 2;
|
||||
uint32 count = 3;
|
||||
message Element {
|
||||
string id = 1;
|
||||
string head = 2;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// HeadSyncRange presenting a request for one range
|
||||
message HeadSyncRange {
|
||||
uint64 from = 1;
|
||||
uint64 to = 2;
|
||||
uint32 limit = 3;
|
||||
}
|
||||
|
||||
// HeadSyncResult presenting a response for one range
|
||||
message HeadSyncResult {
|
||||
bytes hash = 1;
|
||||
repeated HeadSyncResultElement elements = 2;
|
||||
uint32 count = 3;
|
||||
}
|
||||
|
||||
// HeadSyncResultElement presenting state of one object
|
||||
message HeadSyncResultElement {
|
||||
string id = 1;
|
||||
string head = 2;
|
||||
}
|
||||
|
||||
// HeadSyncRequest is a request for HeadSync
|
||||
message HeadSyncRequest {
|
||||
string spaceId = 1;
|
||||
repeated HeadSyncRange ranges = 2;
|
||||
}
|
||||
|
||||
// HeadSyncResponse is a response for HeadSync
|
||||
message HeadSyncResponse {
|
||||
repeated HeadSyncResult results = 1;
|
||||
}
|
||||
|
|
File diff suppressed because it is too large
Load diff
113
service/space/spacesync/spacesync_drpc.pb.go
Normal file
113
service/space/spacesync/spacesync_drpc.pb.go
Normal file
|
@ -0,0 +1,113 @@
|
|||
// Code generated by protoc-gen-go-drpc. DO NOT EDIT.
|
||||
// protoc-gen-go-drpc version: v0.0.32
|
||||
// source: service/space/spacesync/protos/spacesync.proto
|
||||
|
||||
package spacesync
|
||||
|
||||
import (
|
||||
bytes "bytes"
|
||||
context "context"
|
||||
errors "errors"
|
||||
jsonpb "github.com/gogo/protobuf/jsonpb"
|
||||
proto "github.com/gogo/protobuf/proto"
|
||||
drpc "storj.io/drpc"
|
||||
drpcerr "storj.io/drpc/drpcerr"
|
||||
)
|
||||
|
||||
type drpcEncoding_File_service_space_spacesync_protos_spacesync_proto struct{}
|
||||
|
||||
func (drpcEncoding_File_service_space_spacesync_protos_spacesync_proto) Marshal(msg drpc.Message) ([]byte, error) {
|
||||
return proto.Marshal(msg.(proto.Message))
|
||||
}
|
||||
|
||||
func (drpcEncoding_File_service_space_spacesync_protos_spacesync_proto) Unmarshal(buf []byte, msg drpc.Message) error {
|
||||
return proto.Unmarshal(buf, msg.(proto.Message))
|
||||
}
|
||||
|
||||
func (drpcEncoding_File_service_space_spacesync_protos_spacesync_proto) JSONMarshal(msg drpc.Message) ([]byte, error) {
|
||||
var buf bytes.Buffer
|
||||
err := new(jsonpb.Marshaler).Marshal(&buf, msg.(proto.Message))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return buf.Bytes(), nil
|
||||
}
|
||||
|
||||
func (drpcEncoding_File_service_space_spacesync_protos_spacesync_proto) JSONUnmarshal(buf []byte, msg drpc.Message) error {
|
||||
return jsonpb.Unmarshal(bytes.NewReader(buf), msg.(proto.Message))
|
||||
}
|
||||
|
||||
type DRPCSpaceClient interface {
|
||||
DRPCConn() drpc.Conn
|
||||
|
||||
HeadSync(ctx context.Context, in *HeadSyncRequest) (*HeadSyncResponse, error)
|
||||
}
|
||||
|
||||
type drpcSpaceClient struct {
|
||||
cc drpc.Conn
|
||||
}
|
||||
|
||||
func NewDRPCSpaceClient(cc drpc.Conn) DRPCSpaceClient {
|
||||
return &drpcSpaceClient{cc}
|
||||
}
|
||||
|
||||
func (c *drpcSpaceClient) DRPCConn() drpc.Conn { return c.cc }
|
||||
|
||||
func (c *drpcSpaceClient) HeadSync(ctx context.Context, in *HeadSyncRequest) (*HeadSyncResponse, error) {
|
||||
out := new(HeadSyncResponse)
|
||||
err := c.cc.Invoke(ctx, "/anySpace.Space/HeadSync", drpcEncoding_File_service_space_spacesync_protos_spacesync_proto{}, in, out)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
type DRPCSpaceServer interface {
|
||||
HeadSync(context.Context, *HeadSyncRequest) (*HeadSyncResponse, error)
|
||||
}
|
||||
|
||||
type DRPCSpaceUnimplementedServer struct{}
|
||||
|
||||
func (s *DRPCSpaceUnimplementedServer) HeadSync(context.Context, *HeadSyncRequest) (*HeadSyncResponse, error) {
|
||||
return nil, drpcerr.WithCode(errors.New("Unimplemented"), drpcerr.Unimplemented)
|
||||
}
|
||||
|
||||
type DRPCSpaceDescription struct{}
|
||||
|
||||
func (DRPCSpaceDescription) NumMethods() int { return 1 }
|
||||
|
||||
func (DRPCSpaceDescription) Method(n int) (string, drpc.Encoding, drpc.Receiver, interface{}, bool) {
|
||||
switch n {
|
||||
case 0:
|
||||
return "/anySpace.Space/HeadSync", drpcEncoding_File_service_space_spacesync_protos_spacesync_proto{},
|
||||
func(srv interface{}, ctx context.Context, in1, in2 interface{}) (drpc.Message, error) {
|
||||
return srv.(DRPCSpaceServer).
|
||||
HeadSync(
|
||||
ctx,
|
||||
in1.(*HeadSyncRequest),
|
||||
)
|
||||
}, DRPCSpaceServer.HeadSync, true
|
||||
default:
|
||||
return "", nil, nil, nil, false
|
||||
}
|
||||
}
|
||||
|
||||
func DRPCRegisterSpace(mux drpc.Mux, impl DRPCSpaceServer) error {
|
||||
return mux.Register(impl, DRPCSpaceDescription{})
|
||||
}
|
||||
|
||||
type DRPCSpace_HeadSyncStream interface {
|
||||
drpc.Stream
|
||||
SendAndClose(*HeadSyncResponse) error
|
||||
}
|
||||
|
||||
type drpcSpace_HeadSyncStream struct {
|
||||
drpc.Stream
|
||||
}
|
||||
|
||||
func (x *drpcSpace_HeadSyncStream) SendAndClose(m *HeadSyncResponse) error {
|
||||
if err := x.MsgSend(m, drpcEncoding_File_service_space_spacesync_protos_spacesync_proto{}); err != nil {
|
||||
return err
|
||||
}
|
||||
return x.CloseSend()
|
||||
}
|
|
@ -503,10 +503,10 @@ func (m *Sync) GetMessage() *Sync_ContentValue {
|
|||
|
||||
type Sync_ContentValue struct {
|
||||
// Types that are valid to be assigned to Value:
|
||||
// *Sync_Content_Value_HeadUpdate
|
||||
// *Sync_Content_Value_FullSyncRequest
|
||||
// *Sync_Content_Value_FullSyncResponse
|
||||
Value isSync_Content_Value_Value `protobuf_oneof:"value"`
|
||||
// *Sync_ContentValue_HeadUpdate
|
||||
// *Sync_ContentValue_FullSyncRequest
|
||||
// *Sync_ContentValue_FullSyncResponse
|
||||
Value isSync_ContentValue_Value `protobuf_oneof:"value"`
|
||||
}
|
||||
|
||||
func (m *Sync_ContentValue) Reset() { *m = Sync_ContentValue{} }
|
||||
|
@ -542,27 +542,27 @@ func (m *Sync_ContentValue) XXX_DiscardUnknown() {
|
|||
|
||||
var xxx_messageInfo_Sync_ContentValue proto.InternalMessageInfo
|
||||
|
||||
type isSync_Content_Value_Value interface {
|
||||
isSync_Content_Value_Value()
|
||||
type isSync_ContentValue_Value interface {
|
||||
isSync_ContentValue_Value()
|
||||
MarshalTo([]byte) (int, error)
|
||||
Size() int
|
||||
}
|
||||
|
||||
type Sync_Content_Value_HeadUpdate struct {
|
||||
type Sync_ContentValue_HeadUpdate struct {
|
||||
HeadUpdate *Sync_HeadUpdate `protobuf:"bytes,1,opt,name=headUpdate,proto3,oneof" json:"headUpdate,omitempty"`
|
||||
}
|
||||
type Sync_Content_Value_FullSyncRequest struct {
|
||||
type Sync_ContentValue_FullSyncRequest struct {
|
||||
FullSyncRequest *Sync_Full_Request `protobuf:"bytes,2,opt,name=fullSyncRequest,proto3,oneof" json:"fullSyncRequest,omitempty"`
|
||||
}
|
||||
type Sync_Content_Value_FullSyncResponse struct {
|
||||
type Sync_ContentValue_FullSyncResponse struct {
|
||||
FullSyncResponse *Sync_Full_Response `protobuf:"bytes,3,opt,name=fullSyncResponse,proto3,oneof" json:"fullSyncResponse,omitempty"`
|
||||
}
|
||||
|
||||
func (*Sync_Content_Value_HeadUpdate) isSync_Content_Value_Value() {}
|
||||
func (*Sync_Content_Value_FullSyncRequest) isSync_Content_Value_Value() {}
|
||||
func (*Sync_Content_Value_FullSyncResponse) isSync_Content_Value_Value() {}
|
||||
func (*Sync_ContentValue_HeadUpdate) isSync_ContentValue_Value() {}
|
||||
func (*Sync_ContentValue_FullSyncRequest) isSync_ContentValue_Value() {}
|
||||
func (*Sync_ContentValue_FullSyncResponse) isSync_ContentValue_Value() {}
|
||||
|
||||
func (m *Sync_ContentValue) GetValue() isSync_Content_Value_Value {
|
||||
func (m *Sync_ContentValue) GetValue() isSync_ContentValue_Value {
|
||||
if m != nil {
|
||||
return m.Value
|
||||
}
|
||||
|
@ -570,21 +570,21 @@ func (m *Sync_ContentValue) GetValue() isSync_Content_Value_Value {
|
|||
}
|
||||
|
||||
func (m *Sync_ContentValue) GetHeadUpdate() *Sync_HeadUpdate {
|
||||
if x, ok := m.GetValue().(*Sync_Content_Value_HeadUpdate); ok {
|
||||
if x, ok := m.GetValue().(*Sync_ContentValue_HeadUpdate); ok {
|
||||
return x.HeadUpdate
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Sync_ContentValue) GetFullSyncRequest() *Sync_Full_Request {
|
||||
if x, ok := m.GetValue().(*Sync_Content_Value_FullSyncRequest); ok {
|
||||
if x, ok := m.GetValue().(*Sync_ContentValue_FullSyncRequest); ok {
|
||||
return x.FullSyncRequest
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Sync_ContentValue) GetFullSyncResponse() *Sync_Full_Response {
|
||||
if x, ok := m.GetValue().(*Sync_Content_Value_FullSyncResponse); ok {
|
||||
if x, ok := m.GetValue().(*Sync_ContentValue_FullSyncResponse); ok {
|
||||
return x.FullSyncResponse
|
||||
}
|
||||
return nil
|
||||
|
@ -593,9 +593,9 @@ func (m *Sync_ContentValue) GetFullSyncResponse() *Sync_Full_Response {
|
|||
// XXX_OneofWrappers is for the internal use of the proto package.
|
||||
func (*Sync_ContentValue) XXX_OneofWrappers() []interface{} {
|
||||
return []interface{}{
|
||||
(*Sync_Content_Value_HeadUpdate)(nil),
|
||||
(*Sync_Content_Value_FullSyncRequest)(nil),
|
||||
(*Sync_Content_Value_FullSyncResponse)(nil),
|
||||
(*Sync_ContentValue_HeadUpdate)(nil),
|
||||
(*Sync_ContentValue_FullSyncRequest)(nil),
|
||||
(*Sync_ContentValue_FullSyncResponse)(nil),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1293,12 +1293,12 @@ func (m *Sync_ContentValue) MarshalToSizedBuffer(dAtA []byte) (int, error) {
|
|||
return len(dAtA) - i, nil
|
||||
}
|
||||
|
||||
func (m *Sync_Content_Value_HeadUpdate) MarshalTo(dAtA []byte) (int, error) {
|
||||
func (m *Sync_ContentValue_HeadUpdate) MarshalTo(dAtA []byte) (int, error) {
|
||||
size := m.Size()
|
||||
return m.MarshalToSizedBuffer(dAtA[:size])
|
||||
}
|
||||
|
||||
func (m *Sync_Content_Value_HeadUpdate) MarshalToSizedBuffer(dAtA []byte) (int, error) {
|
||||
func (m *Sync_ContentValue_HeadUpdate) MarshalToSizedBuffer(dAtA []byte) (int, error) {
|
||||
i := len(dAtA)
|
||||
if m.HeadUpdate != nil {
|
||||
{
|
||||
|
@ -1314,12 +1314,12 @@ func (m *Sync_Content_Value_HeadUpdate) MarshalToSizedBuffer(dAtA []byte) (int,
|
|||
}
|
||||
return len(dAtA) - i, nil
|
||||
}
|
||||
func (m *Sync_Content_Value_FullSyncRequest) MarshalTo(dAtA []byte) (int, error) {
|
||||
func (m *Sync_ContentValue_FullSyncRequest) MarshalTo(dAtA []byte) (int, error) {
|
||||
size := m.Size()
|
||||
return m.MarshalToSizedBuffer(dAtA[:size])
|
||||
}
|
||||
|
||||
func (m *Sync_Content_Value_FullSyncRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) {
|
||||
func (m *Sync_ContentValue_FullSyncRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) {
|
||||
i := len(dAtA)
|
||||
if m.FullSyncRequest != nil {
|
||||
{
|
||||
|
@ -1335,12 +1335,12 @@ func (m *Sync_Content_Value_FullSyncRequest) MarshalToSizedBuffer(dAtA []byte) (
|
|||
}
|
||||
return len(dAtA) - i, nil
|
||||
}
|
||||
func (m *Sync_Content_Value_FullSyncResponse) MarshalTo(dAtA []byte) (int, error) {
|
||||
func (m *Sync_ContentValue_FullSyncResponse) MarshalTo(dAtA []byte) (int, error) {
|
||||
size := m.Size()
|
||||
return m.MarshalToSizedBuffer(dAtA[:size])
|
||||
}
|
||||
|
||||
func (m *Sync_Content_Value_FullSyncResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) {
|
||||
func (m *Sync_ContentValue_FullSyncResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) {
|
||||
i := len(dAtA)
|
||||
if m.FullSyncResponse != nil {
|
||||
{
|
||||
|
@ -1759,7 +1759,7 @@ func (m *Sync_ContentValue) Size() (n int) {
|
|||
return n
|
||||
}
|
||||
|
||||
func (m *Sync_Content_Value_HeadUpdate) Size() (n int) {
|
||||
func (m *Sync_ContentValue_HeadUpdate) Size() (n int) {
|
||||
if m == nil {
|
||||
return 0
|
||||
}
|
||||
|
@ -1771,7 +1771,7 @@ func (m *Sync_Content_Value_HeadUpdate) Size() (n int) {
|
|||
}
|
||||
return n
|
||||
}
|
||||
func (m *Sync_Content_Value_FullSyncRequest) Size() (n int) {
|
||||
func (m *Sync_ContentValue_FullSyncRequest) Size() (n int) {
|
||||
if m == nil {
|
||||
return 0
|
||||
}
|
||||
|
@ -1783,7 +1783,7 @@ func (m *Sync_Content_Value_FullSyncRequest) Size() (n int) {
|
|||
}
|
||||
return n
|
||||
}
|
||||
func (m *Sync_Content_Value_FullSyncResponse) Size() (n int) {
|
||||
func (m *Sync_ContentValue_FullSyncResponse) Size() (n int) {
|
||||
if m == nil {
|
||||
return 0
|
||||
}
|
||||
|
@ -2884,7 +2884,7 @@ func (m *Sync_ContentValue) Unmarshal(dAtA []byte) error {
|
|||
if err := v.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
|
||||
return err
|
||||
}
|
||||
m.Value = &Sync_Content_Value_HeadUpdate{v}
|
||||
m.Value = &Sync_ContentValue_HeadUpdate{v}
|
||||
iNdEx = postIndex
|
||||
case 2:
|
||||
if wireType != 2 {
|
||||
|
@ -2919,7 +2919,7 @@ func (m *Sync_ContentValue) Unmarshal(dAtA []byte) error {
|
|||
if err := v.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
|
||||
return err
|
||||
}
|
||||
m.Value = &Sync_Content_Value_FullSyncRequest{v}
|
||||
m.Value = &Sync_ContentValue_FullSyncRequest{v}
|
||||
iNdEx = postIndex
|
||||
case 3:
|
||||
if wireType != 2 {
|
||||
|
@ -2954,7 +2954,7 @@ func (m *Sync_ContentValue) Unmarshal(dAtA []byte) error {
|
|||
if err := v.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
|
||||
return err
|
||||
}
|
||||
m.Value = &Sync_Content_Value_FullSyncResponse{v}
|
||||
m.Value = &Sync_ContentValue_FullSyncResponse{v}
|
||||
iNdEx = postIndex
|
||||
default:
|
||||
iNdEx = preIndex
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue