mirror of
https://github.com/anyproto/any-sync.git
synced 2025-06-12 02:30:41 +09:00
Fix compile errors and add dependencies
This commit is contained in:
parent
26704a72e9
commit
3f4c50bce4
6 changed files with 105 additions and 123 deletions
|
@ -7,11 +7,17 @@ import (
|
|||
"github.com/anytypeio/go-anytype-infrastructure-experiments/app"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/config"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/example"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/account"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/api"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/dialer"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/pool"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/rpc/server"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/secure"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/node"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/document"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/message"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/requesthandler"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/treecache"
|
||||
"go.uber.org/zap"
|
||||
"net/http"
|
||||
_ "net/http/pprof"
|
||||
|
@ -85,9 +91,16 @@ func main() {
|
|||
}
|
||||
|
||||
func Bootstrap(a *app.App) {
|
||||
a.Register(secure.New()).
|
||||
a.Register(account.New()).
|
||||
Register(secure.New()).
|
||||
Register(server.New()).
|
||||
Register(dialer.New()).
|
||||
Register(pool.NewPool()).
|
||||
Register(&example.Example{})
|
||||
//Register(&example.Example{})
|
||||
Register(node.New()).
|
||||
Register(document.New()).
|
||||
Register(message.New()).
|
||||
Register(requesthandler.New()).
|
||||
Register(treecache.New()).
|
||||
Register(api.New())
|
||||
}
|
||||
|
|
|
@ -13,9 +13,8 @@ import (
|
|||
)
|
||||
|
||||
var (
|
||||
flagNodeMap = flag.String("n", "cmd/nodesgen/nodemap.yml", "path to nodes map file")
|
||||
flagAccountConfigFile = flag.String("a", "etc/nodes.yml", "path to account file")
|
||||
flagEtcPath = flag.String("e", "etc", "path to etc directory")
|
||||
flagNodeMap = flag.String("n", "cmd/nodesgen/nodemap.yml", "path to nodes map file")
|
||||
flagEtcPath = flag.String("e", "etc", "path to etc directory")
|
||||
)
|
||||
|
||||
type NodesMap struct {
|
||||
|
|
|
@ -42,10 +42,10 @@ type dialer struct {
|
|||
|
||||
func (d *dialer) Init(ctx context.Context, a *app.App) (err error) {
|
||||
d.transport = a.MustComponent(secure.CName).(secure.Service)
|
||||
peerConf := a.MustComponent(config.CName).(*config.Config).PeerList.Remote
|
||||
nodes := a.MustComponent(config.CName).(*config.Config).Nodes
|
||||
d.peerAddrs = map[string][]string{}
|
||||
for _, rp := range peerConf {
|
||||
d.peerAddrs[rp.PeerId] = []string{rp.Addr}
|
||||
for _, n := range nodes {
|
||||
d.peerAddrs[n.PeerId] = []string{n.Address}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
|
|
@ -11,8 +11,8 @@ import (
|
|||
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/account"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/node"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/message"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/syncpb"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/treecache"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/syncproto"
|
||||
"github.com/gogo/protobuf/proto"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
@ -100,10 +100,10 @@ func (s *service) UpdateDocument(ctx context.Context, id, text string) (err erro
|
|||
zap.String("header", header.String())).
|
||||
Debug("document updated in the database")
|
||||
|
||||
return s.messageService.SendMessage("", syncpb.WrapHeadUpdate(&syncpb.SyncHeadUpdate{
|
||||
return s.messageService.SendToSpace("", syncproto.WrapHeadUpdate(&syncproto.SyncHeadUpdate{
|
||||
Heads: heads,
|
||||
Changes: []*aclpb.RawChange{ch},
|
||||
TreeId: "",
|
||||
TreeId: id,
|
||||
SnapshotPath: snapshotPath,
|
||||
TreeHeader: header,
|
||||
}))
|
||||
|
@ -155,10 +155,10 @@ func (s *service) CreateDocument(ctx context.Context, text string) (id string, e
|
|||
return "", err
|
||||
}
|
||||
|
||||
err = s.messageService.SendMessage("", syncpb.WrapHeadUpdate(&syncpb.SyncHeadUpdate{
|
||||
err = s.messageService.SendToSpace("", syncproto.WrapHeadUpdate(&syncproto.SyncHeadUpdate{
|
||||
Heads: heads,
|
||||
Changes: []*aclpb.RawChange{ch},
|
||||
TreeId: "",
|
||||
TreeId: id,
|
||||
SnapshotPath: snapshotPath,
|
||||
TreeHeader: header,
|
||||
}))
|
||||
|
|
|
@ -4,9 +4,11 @@ import (
|
|||
"context"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/app"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/config"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/net/pool"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/requesthandler"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/syncpb"
|
||||
"github.com/cheggaaa/mb"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/syncproto"
|
||||
"github.com/gogo/protobuf/proto"
|
||||
"go.uber.org/zap"
|
||||
"sync"
|
||||
)
|
||||
|
@ -16,35 +18,25 @@ var log = logger.NewNamed("messageservice")
|
|||
const CName = "MessageService"
|
||||
|
||||
type service struct {
|
||||
receiveBatcher *mb.MB
|
||||
sendBatcher *mb.MB
|
||||
senderChannels map[string]chan *syncpb.SyncContent
|
||||
nodes []config.Node
|
||||
requestHandler requesthandler.RequestHandler
|
||||
pool pool.Pool
|
||||
sync.RWMutex
|
||||
}
|
||||
|
||||
type message struct {
|
||||
peerId string
|
||||
content *syncpb.SyncContent
|
||||
}
|
||||
|
||||
func New() app.Component {
|
||||
return &service{}
|
||||
}
|
||||
|
||||
type Service interface {
|
||||
RegisterMessageSender(peerId string) chan *syncpb.SyncContent
|
||||
UnregisterMessageSender(peerId string)
|
||||
|
||||
HandleMessage(peerId string, msg *syncpb.SyncContent) error
|
||||
SendMessage(peerId string, msg *syncpb.SyncContent) error
|
||||
SendMessage(peerId string, msg *syncproto.Sync) error
|
||||
SendToSpace(spaceId string, msg *syncproto.Sync) error
|
||||
}
|
||||
|
||||
func (s *service) Init(ctx context.Context, a *app.App) (err error) {
|
||||
s.receiveBatcher = mb.New(0)
|
||||
s.sendBatcher = mb.New(0)
|
||||
s.senderChannels = make(map[string]chan *syncpb.SyncContent)
|
||||
s.requestHandler = a.MustComponent(requesthandler.CName).(requesthandler.RequestHandler)
|
||||
s.nodes = a.MustComponent(config.CName).(*config.Config).Nodes
|
||||
s.pool = a.MustComponent(pool.CName).(pool.Pool)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -53,8 +45,16 @@ func (s *service) Name() (name string) {
|
|||
}
|
||||
|
||||
func (s *service) Run(ctx context.Context) (err error) {
|
||||
//go s.runSender(ctx)
|
||||
//go s.runReceiver(ctx)
|
||||
// dial manually to all peers
|
||||
for _, rp := range s.nodes {
|
||||
if er := s.pool.DialAndAddPeer(ctx, rp.PeerId); er != nil {
|
||||
log.Info("can't dial to peer", zap.Error(er))
|
||||
} else {
|
||||
log.Info("connected with peer", zap.String("peerId", rp.PeerId))
|
||||
}
|
||||
}
|
||||
s.pool.AddHandler(syncproto.MessageType_MessageTypeSync, s.HandleMessage)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -62,104 +62,74 @@ func (s *service) Close(ctx context.Context) (err error) {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (s *service) RegisterMessageSender(peerId string) chan *syncpb.SyncContent {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
if ch, exists := s.senderChannels[peerId]; !exists {
|
||||
return ch
|
||||
}
|
||||
ch := make(chan *syncpb.SyncContent)
|
||||
s.senderChannels[peerId] = ch
|
||||
return ch
|
||||
}
|
||||
|
||||
func (s *service) UnregisterMessageSender(peerId string) {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
if _, exists := s.senderChannels[peerId]; !exists {
|
||||
return
|
||||
}
|
||||
close(s.senderChannels[peerId])
|
||||
delete(s.senderChannels, peerId)
|
||||
}
|
||||
|
||||
func (s *service) HandleMessage(peerId string, msg *syncpb.SyncContent) error {
|
||||
func (s *service) HandleMessage(ctx context.Context, msg *pool.Message) (err error) {
|
||||
log.With(
|
||||
zap.String("peerId", peerId),
|
||||
zap.String("message", msgType(msg))).
|
||||
zap.String("peerId", msg.Peer().Id())).
|
||||
Debug("handling message from peer")
|
||||
return s.receiveBatcher.Add(&message{
|
||||
peerId: peerId,
|
||||
content: msg,
|
||||
})
|
||||
|
||||
var syncMsg *syncproto.Sync
|
||||
err = proto.Unmarshal(msg.Data, syncMsg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return s.requestHandler.HandleSyncMessage(ctx, msg.Peer().Id(), syncMsg)
|
||||
}
|
||||
|
||||
func (s *service) SendMessage(peerId string, msg *syncpb.SyncContent) error {
|
||||
func (s *service) SendMessage(peerId string, msg *syncproto.Sync) error {
|
||||
log.With(
|
||||
zap.String("peerId", peerId),
|
||||
zap.String("message", msgType(msg))).
|
||||
Debug("sending message to peer")
|
||||
return s.sendBatcher.Add(&message{
|
||||
peerId: peerId,
|
||||
content: msg,
|
||||
|
||||
marshalled, err := proto.Marshal(msg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = s.pool.SendAndWait(context.Background(), peerId, &syncproto.Message{
|
||||
Header: &syncproto.Header{Type: syncproto.MessageType_MessageTypeSync},
|
||||
Data: marshalled,
|
||||
})
|
||||
if err != nil {
|
||||
log.With(
|
||||
zap.String("peerId", peerId),
|
||||
zap.String("message", msgType(msg)),
|
||||
zap.Error(err)).
|
||||
Error("failed to send message to peer")
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *service) runReceiver(ctx context.Context) {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
break
|
||||
}
|
||||
msgs := s.receiveBatcher.WaitMinMax(1, 100)
|
||||
// TODO: this is bad to serve everything on a new goroutine, but very easy for prototyping :-)
|
||||
for _, msg := range msgs {
|
||||
typedMsg := msg.(*message)
|
||||
go func(typedMsg *message) {
|
||||
err := s.requestHandler.HandleFullSyncContent(ctx, typedMsg.peerId, typedMsg.content)
|
||||
if err != nil {
|
||||
log.Error("failed to handle content", zap.Error(err))
|
||||
}
|
||||
}(typedMsg)
|
||||
func (s *service) SendToSpace(spaceId string, msg *syncproto.Sync) error {
|
||||
log.With(
|
||||
zap.String("message", msgType(msg))).
|
||||
Debug("sending message to all")
|
||||
|
||||
marshalled, err := proto.Marshal(msg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// TODO: use Broadcast method here when it is ready
|
||||
for _, n := range s.nodes {
|
||||
err := s.pool.SendAndWait(context.Background(), n.PeerId, &syncproto.Message{
|
||||
Header: &syncproto.Header{Type: syncproto.MessageType_MessageTypeSync},
|
||||
Data: marshalled,
|
||||
})
|
||||
if err != nil {
|
||||
log.With(
|
||||
zap.String("peerId", n.PeerId),
|
||||
zap.String("message", msgType(msg)),
|
||||
zap.Error(err)).
|
||||
Error("failed to send message to peer")
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *service) runSender(ctx context.Context) {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
break
|
||||
}
|
||||
msgs := s.sendBatcher.WaitMinMax(1, 100)
|
||||
s.RLock()
|
||||
for _, msg := range msgs {
|
||||
s.sendMessage(msg.(*message))
|
||||
}
|
||||
s.RUnlock()
|
||||
}
|
||||
}
|
||||
|
||||
func (s *service) sendMessage(typedMsg *message) {
|
||||
// this should be done under lock
|
||||
if typedMsg.content.GetMessage().GetHeadUpdate() != nil {
|
||||
for _, ch := range s.senderChannels {
|
||||
ch <- typedMsg.content
|
||||
}
|
||||
return
|
||||
}
|
||||
ch, exists := s.senderChannels[typedMsg.peerId]
|
||||
if !exists {
|
||||
return
|
||||
}
|
||||
ch <- typedMsg.content
|
||||
}
|
||||
|
||||
func msgType(content *syncpb.SyncContent) string {
|
||||
func msgType(content *syncproto.Sync) string {
|
||||
msg := content.GetMessage()
|
||||
switch {
|
||||
case msg.GetFullSyncRequest() != nil:
|
||||
|
|
|
@ -24,12 +24,12 @@ func New() app.Component {
|
|||
}
|
||||
|
||||
type RequestHandler interface {
|
||||
HandleFullSyncContent(ctx context.Context, senderId string, request *syncproto.Sync) (err error)
|
||||
HandleSyncMessage(ctx context.Context, senderId string, request *syncproto.Sync) (err error)
|
||||
}
|
||||
|
||||
type MessageSender interface {
|
||||
SendMessage(peerId string, msg *syncproto.Sync) error
|
||||
SendSpace(spaceId string, msg *syncproto.Sync) error
|
||||
SendToSpace(spaceId string, msg *syncproto.Sync) error
|
||||
}
|
||||
|
||||
const CName = "SyncRequestHandler"
|
||||
|
@ -53,7 +53,7 @@ func (r *requestHandler) Close(ctx context.Context) (err error) {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (r *requestHandler) HandleFullSyncContent(ctx context.Context, senderId string, content *syncproto.Sync) error {
|
||||
func (r *requestHandler) HandleSyncMessage(ctx context.Context, senderId string, content *syncproto.Sync) error {
|
||||
msg := content.GetMessage()
|
||||
switch {
|
||||
case msg.GetFullSyncRequest() != nil:
|
||||
|
@ -113,7 +113,7 @@ func (r *requestHandler) HandleHeadUpdate(ctx context.Context, senderId string,
|
|||
TreeId: update.TreeId,
|
||||
TreeHeader: update.TreeHeader,
|
||||
}
|
||||
return r.messageService.SendSpace("", syncproto.WrapHeadUpdate(newUpdate))
|
||||
return r.messageService.SendToSpace("", syncproto.WrapHeadUpdate(newUpdate))
|
||||
}
|
||||
|
||||
func (r *requestHandler) HandleFullSyncRequest(ctx context.Context, senderId string, request *syncproto.SyncFullRequest) (err error) {
|
||||
|
@ -156,7 +156,7 @@ func (r *requestHandler) HandleFullSyncRequest(ctx context.Context, senderId str
|
|||
TreeId: request.TreeId,
|
||||
TreeHeader: request.TreeHeader,
|
||||
}
|
||||
return r.messageService.SendSpace("", syncproto.WrapHeadUpdate(newUpdate))
|
||||
return r.messageService.SendToSpace("", syncproto.WrapHeadUpdate(newUpdate))
|
||||
}
|
||||
|
||||
func (r *requestHandler) HandleFullSyncResponse(ctx context.Context, senderId string, response *syncproto.SyncFullResponse) (err error) {
|
||||
|
@ -192,7 +192,7 @@ func (r *requestHandler) HandleFullSyncResponse(ctx context.Context, senderId st
|
|||
SnapshotPath: snapshotPath,
|
||||
TreeId: response.TreeId,
|
||||
}
|
||||
return r.messageService.SendSpace("", syncproto.WrapHeadUpdate(newUpdate))
|
||||
return r.messageService.SendToSpace("", syncproto.WrapHeadUpdate(newUpdate))
|
||||
}
|
||||
|
||||
func (r *requestHandler) prepareFullSyncRequest(treeId string, header *treepb.TreeHeader, theirPath []string, tree acltree.ACLTree) (*syncproto.SyncFullRequest, error) {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue