1
0
Fork 0
mirror of https://github.com/anyproto/any-sync.git synced 2025-06-12 02:30:41 +09:00

Few more fixes, remove node service

This commit is contained in:
mcrakhman 2022-09-26 22:15:40 +02:00 committed by Mikhail Iudin
parent 9b196a6521
commit 9b752d0e96
No known key found for this signature in database
GPG key ID: FAAAA8BAABDFF1C0
6 changed files with 112 additions and 133 deletions

View file

@ -99,10 +99,10 @@ func (s *space) BuildTree(ctx context.Context, id string, listener synctree.Upda
fullSyncResp := resp.GetContent().GetFullSyncResponse()
payload := treestorage.TreeStorageCreatePayload{
TreeId: resp.TreeId,
Header: resp.TreeHeader,
Changes: fullSyncResp.Changes,
Heads: fullSyncResp.Heads,
TreeId: resp.TreeId,
RootRawChange: resp.RootChange,
Changes: fullSyncResp.Changes,
Heads: fullSyncResp.Heads,
}
// basically building tree with inmemory storage and validating that it was without errors

View file

@ -1,45 +1,45 @@
package spacesyncproto
import "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclrecordproto/aclpb"
import "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treechangeproto"
type SpaceStream = DRPCSpace_StreamStream
func WrapHeadUpdate(update *ObjectHeadUpdate, header *aclpb.TreeHeader, treeId, trackingId string) *ObjectSyncMessage {
func WrapHeadUpdate(update *ObjectHeadUpdate, rootChange *treechangeproto.RawTreeChangeWithId, treeId, trackingId string) *ObjectSyncMessage {
return &ObjectSyncMessage{
Content: &ObjectSyncContentValue{
Value: &ObjectSyncContentValue_HeadUpdate{HeadUpdate: update},
},
TreeHeader: header,
RootChange: rootChange,
TreeId: treeId,
}
}
func WrapFullRequest(request *ObjectFullSyncRequest, header *aclpb.TreeHeader, treeId, trackingId string) *ObjectSyncMessage {
func WrapFullRequest(request *ObjectFullSyncRequest, rootChange *treechangeproto.RawTreeChangeWithId, treeId, trackingId string) *ObjectSyncMessage {
return &ObjectSyncMessage{
Content: &ObjectSyncContentValue{
Value: &ObjectSyncContentValue_FullSyncRequest{FullSyncRequest: request},
},
TreeHeader: header,
RootChange: rootChange,
TreeId: treeId,
}
}
func WrapFullResponse(response *ObjectFullSyncResponse, header *aclpb.TreeHeader, treeId, trackingId string) *ObjectSyncMessage {
func WrapFullResponse(response *ObjectFullSyncResponse, rootChange *treechangeproto.RawTreeChangeWithId, treeId, trackingId string) *ObjectSyncMessage {
return &ObjectSyncMessage{
Content: &ObjectSyncContentValue{
Value: &ObjectSyncContentValue_FullSyncResponse{FullSyncResponse: response},
},
TreeHeader: header,
RootChange: rootChange,
TreeId: treeId,
}
}
func WrapError(err error, header *aclpb.TreeHeader, treeId, trackingId string) *ObjectSyncMessage {
func WrapError(err error, rootChange *treechangeproto.RawTreeChangeWithId, treeId, trackingId string) *ObjectSyncMessage {
return &ObjectSyncMessage{
Content: &ObjectSyncContentValue{
Value: &ObjectSyncContentValue_ErrorResponse{ErrorResponse: &ObjectErrorResponse{Error: err.Error()}},
},
TreeHeader: header,
RootChange: rootChange,
TreeId: treeId,
}
}

View file

@ -1 +1,39 @@
package spacetree
import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/cache"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/tree"
)
type SpaceTree interface {
cache.TreeContainer
ID() string
GetObjectIds() []string
Sync()
}
type spaceTree struct{}
func (s *spaceTree) Tree() tree.ObjectTree {
//TODO implement me
panic("implement me")
}
func (s *spaceTree) ID() string {
//TODO implement me
panic("implement me")
}
func (s *spaceTree) GetObjectIds() []string {
//TODO implement me
panic("implement me")
}
func (s *spaceTree) Sync() {
//TODO implement me
panic("implement me")
}
func NewSpaceTree(id string) (SpaceTree, error) {
return &spaceTree{}, nil
}

View file

@ -4,8 +4,8 @@ import (
"context"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/cache"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclrecordproto/aclpb"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/tree"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treechangeproto"
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/slice"
)
@ -80,7 +80,7 @@ func (s *syncHandler) HandleHeadUpdate(
if fullRequest != nil {
return s.syncClient.SendAsync(senderId,
spacesyncproto.WrapFullRequest(fullRequest, msg.TreeHeader, msg.TreeId, msg.TrackingId))
spacesyncproto.WrapFullRequest(fullRequest, msg.RootChange, msg.TreeId, msg.TrackingId))
}
return
}
@ -92,7 +92,7 @@ func (s *syncHandler) HandleFullSyncRequest(
msg *spacesyncproto.ObjectSyncMessage) (err error) {
var (
fullResponse *spacesyncproto.ObjectFullSyncResponse
header = msg.TreeHeader
header = msg.RootChange
)
defer func() {
if err != nil {
@ -167,7 +167,7 @@ func (s *syncHandler) prepareFullSyncRequest(
SnapshotPath: t.SnapshotPath(),
}
if len(update.Changes) != 0 {
var changesAfterSnapshot []*aclpb.RawTreeChangeWithId
var changesAfterSnapshot []*treechangeproto.RawTreeChangeWithId
changesAfterSnapshot, err = t.ChangesAfterCommonSnapshot(update.SnapshotPath, update.Heads)
if err != nil {
return

View file

@ -5,7 +5,9 @@ import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/pool"
"github.com/anytypeio/go-anytype-infrastructure-experiments/config"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/node"
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/keys"
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/keys/asymmetric/encryptionkey"
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/keys/asymmetric/signingkey"
"github.com/anytypeio/go-chash"
)
@ -31,11 +33,26 @@ type service struct {
last Configuration
}
type Node struct {
Address string
PeerId string
SigningKey signingkey.PubKey
EncryptionKey encryptionkey.PubKey
}
func (n *Node) Id() string {
return n.PeerId
}
func (n *Node) Capacity() float64 {
return 1
}
func (s *service) Init(a *app.App) (err error) {
conf := a.MustComponent(config.CName).(*config.Config)
s.accountId = conf.Account.PeerId
s.pool = a.MustComponent(pool.CName).(pool.Pool)
configNodes := a.MustComponent(node.CName).(node.Service).Nodes()
config := &configuration{
id: "config",
accountId: s.accountId,
@ -47,9 +64,18 @@ func (s *service) Init(a *app.App) (err error) {
}); err != nil {
return
}
members := make([]chash.Member, 0, len(configNodes))
for _, n := range configNodes {
members = append(members, n)
members := make([]chash.Member, 0, len(conf.Nodes)-1)
for _, n := range conf.Nodes {
if n.PeerId == conf.Account.PeerId {
continue
}
var member *Node
member, err = nodeFromConfigNode(n)
if err != nil {
return
}
members = append(members, member)
}
if err = config.chash.AddMembers(members...); err != nil {
return
@ -70,3 +96,29 @@ func (s *service) GetById(id string) Configuration {
//TODO implement me
panic("implement me")
}
func nodeFromConfigNode(
n config.Node) (*Node, error) {
decodedSigningKey, err := keys.DecodeKeyFromString(
n.SigningKey,
signingkey.UnmarshalEd25519PrivateKey,
nil)
if err != nil {
return nil, err
}
decodedEncryptionKey, err := keys.DecodeKeyFromString(
n.SigningKey,
encryptionkey.NewEncryptionRsaPrivKeyFromBytes,
nil)
if err != nil {
return nil, err
}
return &Node{
Address: n.Address,
PeerId: n.PeerId,
SigningKey: decodedSigningKey.GetPublic(),
EncryptionKey: decodedEncryptionKey.GetPublic(),
}, nil
}

View file

@ -1,111 +0,0 @@
package node
import (
"context"
"github.com/anytypeio/go-anytype-infrastructure-experiments/app"
"github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger"
"github.com/anytypeio/go-anytype-infrastructure-experiments/config"
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/keys/asymmetric/encryptionkey"
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/keys/asymmetric/signingkey"
"go.uber.org/zap"
)
const CName = "NodesService"
var log = logger.NewNamed("nodesservice")
type Node struct {
Address string
PeerId string
SigningKey signingkey.PubKey
EncryptionKey encryptionkey.PubKey
SigningKeyString string
EncryptionKeyString string
}
func (n *Node) Id() string {
return n.PeerId
}
func (n *Node) Capacity() float64 {
return 1
}
func New() app.Component {
return &service{}
}
type Service interface {
Nodes() []*Node
}
type service struct {
nodes []*Node
}
func (s *service) Init(a *app.App) (err error) {
cfg := a.MustComponent(config.CName).(*config.Config)
signDecoder := signingkey.NewEDPrivKeyDecoder()
rsaDecoder := encryptionkey.NewRSAPrivKeyDecoder()
var filteredNodes []*Node
for _, n := range cfg.Nodes {
if n.PeerId == cfg.Account.PeerId {
continue
}
node, err := nodeFromConfigNode(n, n.PeerId, signDecoder, rsaDecoder)
if err != nil {
return err
}
log.With(zap.String("node", node.PeerId)).Debug("adding peer to known nodes")
filteredNodes = append(filteredNodes, node)
}
s.nodes = filteredNodes
return nil
}
func (s *service) Name() (name string) {
return CName
}
func (s *service) Run(ctx context.Context) (err error) {
return nil
}
func (s *service) Close(ctx context.Context) (err error) {
return nil
}
func (s *service) Nodes() []*Node {
return s.nodes
}
func nodeFromConfigNode(
n config.Node,
peerId string) (*Node, error) {
decodedSigningKey, err := privateSigningDecoder.DecodeFromString(n.SigningKey)
if err != nil {
return nil, err
}
decodedEncryptionKey, err := privateEncryptionDecoder.DecodeFromString(n.EncryptionKey)
if err != nil {
return nil, err
}
encKeyString, err := privateEncryptionDecoder.EncodeToString(decodedEncryptionKey.(encryptionkey.PrivKey).GetPublic())
if err != nil {
return nil, err
}
signKeyString, err := privateSigningDecoder.EncodeToString(decodedSigningKey.(signingkey.PrivKey).GetPublic())
return &Node{
Address: n.Address,
PeerId: peerId,
SigningKey: decodedSigningKey.(signingkey.PrivKey).GetPublic(),
EncryptionKey: decodedEncryptionKey.(encryptionkey.PrivKey).GetPublic(),
SigningKeyString: signKeyString,
EncryptionKeyString: encKeyString,
}, nil
}