1
0
Fork 0
mirror of https://github.com/anyproto/any-sync.git synced 2025-06-10 10:00:49 +09:00

Move logic to different components

This commit is contained in:
mcrakhman 2022-07-16 13:21:46 +02:00 committed by Mikhail Iudin
parent 46cee8c9ed
commit 4d935a0a3b
No known key found for this signature in database
GPG key ID: FAAAA8BAABDFF1C0
6 changed files with 264 additions and 138 deletions

View file

@ -26,7 +26,7 @@ func (s *service) Account() *account.AccountData {
}
type StaticAccount struct {
SigningKey string `yaml:"siginingKey"`
SigningKey string `yaml:"signingKey"`
EncryptionKey string `yaml:"encryptionKey"`
}

View file

@ -0,0 +1,56 @@
package client
import (
"context"
"github.com/anytypeio/go-anytype-infrastructure-experiments/app"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/requesthandler"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/syncpb"
)
const CName = "SyncClient"
type client struct {
handler requesthandler.RequestHandler
}
func NewClient() app.Component {
return &client{}
}
type Client interface {
NotifyHeadsChanged(update *syncpb.SyncHeadUpdate) error
RequestFullSync(id string, request *syncpb.SyncFullRequest) error
SendFullSyncResponse(id string, response *syncpb.SyncFullResponse) error
}
func (c *client) Init(ctx context.Context, a *app.App) (err error) {
c.handler = a.MustComponent(requesthandler.CName).(requesthandler.RequestHandler)
return nil
}
func (c *client) Name() (name string) {
return CName
}
func (c *client) Run(ctx context.Context) (err error) {
return nil
}
func (c *client) Close(ctx context.Context) (err error) {
return nil
}
func (c *client) NotifyHeadsChanged(update *syncpb.SyncHeadUpdate) error {
//TODO implement me
panic("implement me")
}
func (c *client) RequestFullSync(id string, request *syncpb.SyncFullRequest) error {
//TODO implement me
panic("implement me")
}
func (c *client) SendFullSyncResponse(id string, response *syncpb.SyncFullResponse) error {
//TODO implement me
panic("implement me")
}

View file

@ -1,86 +0,0 @@
package sync
import (
"context"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/acltree"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treestorage"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/syncpb"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/treecache"
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/slice"
)
type requestHander struct {
treeCache treecache.Service
client SyncClient
}
func (r *requestHander) HandleHeadUpdate(ctx context.Context, senderId string, update *syncpb.SyncHeadUpdate) (err error) {
var (
fullRequest *syncpb.SyncFullRequest
snapshotPath []string
result acltree.AddResult
)
err = r.treeCache.Do(ctx, update.TreeId, func(tree acltree.ACLTree) error {
// TODO: check if we already have those changes
result, err = tree.AddRawChanges(ctx, update.Changes...)
if err != nil {
return err
}
shouldFullSync := !slice.UnsortedEquals(update.Heads, tree.Heads())
snapshotPath = tree.SnapshotPath()
if shouldFullSync {
fullRequest, err = r.prepareFullSyncRequest(update.TreeId, update.SnapshotPath, tree)
if err != nil {
return err
}
}
return nil
})
// if there are no such tree
if err == treestorage.ErrUnknownTreeId {
fullRequest = &syncpb.SyncFullRequest{
TreeId: update.TreeId,
}
}
// if we have incompatible heads, or we haven't seen the tree at all
if fullRequest != nil {
return r.client.RequestFullSync(senderId, fullRequest)
}
// if error or nothing has changed
if err != nil || len(result.Added) == 0 {
return err
}
// otherwise sending heads update message
newUpdate := &syncpb.SyncHeadUpdate{
Heads: result.Heads,
Changes: result.Added,
SnapshotPath: snapshotPath,
TreeId: update.TreeId,
}
err = r.client.NotifyHeadsChanged(newUpdate)
return
}
func (r *requestHander) HandleFullSyncRequest(ctx context.Context, senderId string, request *syncpb.SyncFullRequest) error {
// TODO: add case of new tree
return nil
}
func (r *requestHander) HandleFullSyncResponse(ctx context.Context, senderId string, request *syncpb.SyncFullRequest) error {
// TODO: add case of new tree
return nil
}
func (r *requestHander) prepareFullSyncRequest(treeId string, theirPath []string, tree acltree.ACLTree) (*syncpb.SyncFullRequest, error) {
ourChanges, err := tree.ChangesAfterCommonSnapshot(theirPath)
if err != nil {
return nil, err
}
return &syncpb.SyncFullRequest{
Heads: tree.Heads(),
Changes: ourChanges,
TreeId: treeId,
SnapshotPath: tree.SnapshotPath(),
}, nil
}

View file

@ -0,0 +1,207 @@
package requesthandler
import (
"context"
"github.com/anytypeio/go-anytype-infrastructure-experiments/app"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclchanges/aclpb"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/acltree"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treestorage"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/client"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/syncpb"
"github.com/anytypeio/go-anytype-infrastructure-experiments/service/treecache"
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/slice"
)
type requestHandler struct {
treeCache treecache.Service
client client.Client
}
func NewRequestHandler() app.Component {
return &requestHandler{}
}
type RequestHandler interface {
HandleHeadUpdate(ctx context.Context, senderId string, update *syncpb.SyncHeadUpdate) (err error)
HandleFullSyncRequest(ctx context.Context, senderId string, request *syncpb.SyncFullRequest) (err error)
HandleFullSyncResponse(ctx context.Context, senderId string, request *syncpb.SyncFullRequest) (err error)
}
const CName = "SyncRequestHandler"
func (r *requestHandler) Init(ctx context.Context, a *app.App) (err error) {
r.treeCache = a.MustComponent(treecache.CName).(treecache.Service)
return nil
}
func (r *requestHandler) Name() (name string) {
return CName
}
func (r *requestHandler) Run(ctx context.Context) (err error) {
return nil
}
func (r *requestHandler) Close(ctx context.Context) (err error) {
return nil
}
func (r *requestHandler) HandleHeadUpdate(ctx context.Context, senderId string, update *syncpb.SyncHeadUpdate) (err error) {
var (
fullRequest *syncpb.SyncFullRequest
snapshotPath []string
result acltree.AddResult
)
err = r.treeCache.Do(ctx, update.TreeId, func(tree acltree.ACLTree) error {
// TODO: check if we already have those changes
result, err = tree.AddRawChanges(ctx, update.Changes...)
if err != nil {
return err
}
shouldFullSync := !slice.UnsortedEquals(update.Heads, tree.Heads())
snapshotPath = tree.SnapshotPath()
if shouldFullSync {
fullRequest, err = r.prepareFullSyncRequest(update.TreeId, update.SnapshotPath, tree)
if err != nil {
return err
}
}
return nil
})
// if there are no such tree
if err == treestorage.ErrUnknownTreeId {
fullRequest = &syncpb.SyncFullRequest{
TreeId: update.TreeId,
}
}
// if we have incompatible heads, or we haven't seen the tree at all
if fullRequest != nil {
return r.client.RequestFullSync(senderId, fullRequest)
}
// if error or nothing has changed
if err != nil || len(result.Added) == 0 {
return err
}
// otherwise sending heads update message
newUpdate := &syncpb.SyncHeadUpdate{
Heads: result.Heads,
Changes: result.Added,
SnapshotPath: snapshotPath,
TreeId: update.TreeId,
}
return r.client.NotifyHeadsChanged(newUpdate)
}
func (r *requestHandler) HandleFullSyncRequest(ctx context.Context, senderId string, request *syncpb.SyncFullRequest) (err error) {
var (
fullResponse *syncpb.SyncFullResponse
snapshotPath []string
result acltree.AddResult
)
err = r.treeCache.Do(ctx, request.TreeId, func(tree acltree.ACLTree) error {
// TODO: check if we already have those changes
result, err = tree.AddRawChanges(ctx, request.Changes...)
if err != nil {
return err
}
snapshotPath = tree.SnapshotPath()
fullResponse, err = r.prepareFullSyncResponse(request.TreeId, request.SnapshotPath, request.Changes, tree)
if err != nil {
return err
}
return nil
})
if err != nil {
return err
}
err = r.client.SendFullSyncResponse(senderId, fullResponse)
// if error or nothing has changed
if err != nil || len(result.Added) == 0 {
return err
}
// otherwise sending heads update message
newUpdate := &syncpb.SyncHeadUpdate{
Heads: result.Heads,
Changes: result.Added,
SnapshotPath: snapshotPath,
TreeId: request.TreeId,
}
return r.client.NotifyHeadsChanged(newUpdate)
}
func (r *requestHandler) HandleFullSyncResponse(ctx context.Context, senderId string, request *syncpb.SyncFullRequest) (err error) {
var (
snapshotPath []string
result acltree.AddResult
)
err = r.treeCache.Do(ctx, request.TreeId, func(tree acltree.ACLTree) error {
// TODO: check if we already have those changes
result, err = tree.AddRawChanges(ctx, request.Changes...)
if err != nil {
return err
}
snapshotPath = tree.SnapshotPath()
return nil
})
if err != nil {
return err
}
// if error or nothing has changed
if err != nil || len(result.Added) == 0 {
return err
}
// TODO: probably here we should not send an update message, because the other node had already sent it after updating with our data
// otherwise sending heads update message
newUpdate := &syncpb.SyncHeadUpdate{
Heads: result.Heads,
Changes: result.Added,
SnapshotPath: snapshotPath,
TreeId: request.TreeId,
}
return r.client.NotifyHeadsChanged(newUpdate)
}
func (r *requestHandler) prepareFullSyncRequest(treeId string, theirPath []string, tree acltree.ACLTree) (*syncpb.SyncFullRequest, error) {
ourChanges, err := tree.ChangesAfterCommonSnapshot(theirPath)
if err != nil {
return nil, err
}
return &syncpb.SyncFullRequest{
Heads: tree.Heads(),
Changes: ourChanges,
TreeId: treeId,
SnapshotPath: tree.SnapshotPath(),
}, nil
}
func (r *requestHandler) prepareFullSyncResponse(treeId string, theirPath []string, theirChanges []*aclpb.RawChange, tree acltree.ACLTree) (*syncpb.SyncFullResponse, error) {
// TODO: we can probably use the common snapshot calculated on the request step from previous peer
ourChanges, err := tree.ChangesAfterCommonSnapshot(theirPath)
if err != nil {
return nil, err
}
theirMap := make(map[string]struct{})
for _, ch := range theirChanges {
theirMap[ch.Id] = struct{}{}
}
// filtering our changes, so we will not send the same changes back
var final []*aclpb.RawChange
for _, ch := range ourChanges {
if _, exists := theirMap[ch.Id]; exists {
final = append(final, ch)
}
}
return &syncpb.SyncFullResponse{
Heads: tree.Heads(),
Changes: final,
TreeId: treeId,
SnapshotPath: tree.SnapshotPath(),
}, nil
}

View file

@ -1,43 +0,0 @@
package sync
import (
"context"
"github.com/anytypeio/go-anytype-infrastructure-experiments/app"
)
type service struct {
}
const CName = "SyncService"
func (s *service) Init(ctx context.Context, a *app.App) (err error) {
return nil
}
func (s *service) Name() (name string) {
return CName
}
func (s *service) Run(ctx context.Context) (err error) {
ch := make(chan *PubSubPayload)
err = s.pubSub.Listen(ch)
if err != nil {
return
}
return nil
}
func (s *service) Close(ctx context.Context) (err error) {
return nil
}
func (s *service) listen(ctx context.Context, ch chan *PubSubPayload) {
for {
select {
case <-ctx.Done():
return
case payload := <-ch:
// TODO: get object from object service and try to perform sync
}
}
}

View file

@ -1,8 +0,0 @@
package sync
import "github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/syncpb"
type SyncClient interface {
NotifyHeadsChanged(update *syncpb.SyncHeadUpdate) error
RequestFullSync(id string, request *syncpb.SyncFullRequest) error
}