From c5c41472e192630b2943415522926dbd83142cb8 Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Thu, 13 Jun 2024 12:17:22 +0200 Subject: [PATCH] Add general head update --- .../object/tree/synctree/responsecollector.go | 8 ++ commonspace/object/treemanager/treemanager.go | 2 + commonspace/sync/headupdate.go | 99 ------------------- commonspace/sync/objectsync/headupdate.go | 70 +++++++++++++ commonspace/sync/objectsync/synchandler.go | 64 ++++++++++++ commonspace/sync/requestmanager.go | 2 +- commonspace/sync/syncdeps/syncdeps.go | 2 +- .../sync/synctest/counterresponsecollector.go | 4 + .../sync/synctest/countersynchandler.go | 5 - 9 files changed, 150 insertions(+), 106 deletions(-) delete mode 100644 commonspace/sync/headupdate.go create mode 100644 commonspace/sync/objectsync/headupdate.go create mode 100644 commonspace/sync/objectsync/synchandler.go diff --git a/commonspace/object/tree/synctree/responsecollector.go b/commonspace/object/tree/synctree/responsecollector.go index a174eb63..6ca4792a 100644 --- a/commonspace/object/tree/synctree/responsecollector.go +++ b/commonspace/object/tree/synctree/responsecollector.go @@ -31,6 +31,10 @@ func (r *fullResponseCollector) CollectResponse(ctx context.Context, peerId, obj return nil } +func (r *fullResponseCollector) NewResponse() syncdeps.Response { + return Response{} +} + type responseCollector struct { handler syncdeps.ObjectSyncHandler } @@ -42,3 +46,7 @@ func newResponseCollector(handler syncdeps.ObjectSyncHandler) *responseCollector func (r *responseCollector) CollectResponse(ctx context.Context, peerId, objectId string, resp syncdeps.Response) error { return r.handler.HandleResponse(ctx, peerId, objectId, resp) } + +func (r *responseCollector) NewResponse() syncdeps.Response { + return Response{} +} diff --git a/commonspace/object/treemanager/treemanager.go b/commonspace/object/treemanager/treemanager.go index 3fd9ab4c..aef0e8ec 100644 --- a/commonspace/object/treemanager/treemanager.go +++ b/commonspace/object/treemanager/treemanager.go @@ -3,6 +3,7 @@ package treemanager import ( "context" + "github.com/anyproto/any-sync/app" "github.com/anyproto/any-sync/commonspace/object/tree/objecttree" ) @@ -12,6 +13,7 @@ const CName = "common.object.treemanager" type TreeManager interface { app.ComponentRunnable GetTree(ctx context.Context, spaceId, treeId string) (objecttree.ObjectTree, error) + PickTree(ctx context.Context, spaceId, treeId string) (objecttree.ObjectTree, error) MarkTreeDeleted(ctx context.Context, spaceId, treeId string) error DeleteTree(ctx context.Context, spaceId, treeId string) error } diff --git a/commonspace/sync/headupdate.go b/commonspace/sync/headupdate.go deleted file mode 100644 index 93362e34..00000000 --- a/commonspace/sync/headupdate.go +++ /dev/null @@ -1,99 +0,0 @@ -package sync - -import ( - "fmt" - "slices" - - "github.com/gogo/protobuf/proto" - - "github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto" - "github.com/anyproto/any-sync/commonspace/spacesyncproto" -) - -type BroadcastOptions struct { - EmptyPeers []string -} - -type HeadUpdate struct { - peerId string - objectId string - spaceId string - heads []string - changes []*treechangeproto.RawTreeChangeWithId - snapshotPath []string - root *treechangeproto.RawTreeChangeWithId - opts BroadcastOptions -} - -func (h *HeadUpdate) SetPeerId(peerId string) { - h.peerId = peerId -} - -func (h *HeadUpdate) SetProtoMessage(message proto.Message) error { - var ( - msg *spacesyncproto.ObjectSyncMessage - ok bool - ) - if msg, ok = message.(*spacesyncproto.ObjectSyncMessage); !ok { - return fmt.Errorf("unexpected message type: %T", message) - } - treeMsg := &treechangeproto.TreeSyncMessage{} - err := proto.Unmarshal(msg.Payload, treeMsg) - if err != nil { - return err - } - h.root = treeMsg.RootChange - headMsg := treeMsg.GetContent().GetHeadUpdate() - if headMsg == nil { - return fmt.Errorf("unexpected message type: %T", treeMsg.GetContent()) - } - h.heads = headMsg.Heads - h.changes = headMsg.Changes - h.snapshotPath = headMsg.SnapshotPath - h.spaceId = msg.SpaceId - h.objectId = msg.ObjectId - return nil -} - -func (h *HeadUpdate) ProtoMessage() (proto.Message, error) { - if h.heads != nil { - return h.SyncMessage() - } - return &spacesyncproto.ObjectSyncMessage{}, nil -} - -func (h *HeadUpdate) PeerId() string { - return h.peerId -} - -func (h *HeadUpdate) ObjectId() string { - return h.objectId -} - -func (h *HeadUpdate) ShallowCopy() *HeadUpdate { - return &HeadUpdate{ - peerId: h.peerId, - objectId: h.objectId, - heads: h.heads, - changes: h.changes, - snapshotPath: h.snapshotPath, - root: h.root, - } -} - -func (h *HeadUpdate) SyncMessage() (*spacesyncproto.ObjectSyncMessage, error) { - changes := h.changes - if slices.Contains(h.opts.EmptyPeers, h.peerId) { - changes = nil - } - treeMsg := treechangeproto.WrapHeadUpdate(&treechangeproto.TreeHeadUpdate{ - Heads: h.heads, - SnapshotPath: h.snapshotPath, - Changes: changes, - }, h.root) - return spacesyncproto.MarshallSyncMessage(treeMsg, h.spaceId, h.objectId) -} - -func (h *HeadUpdate) RemoveChanges() { - h.changes = nil -} diff --git a/commonspace/sync/objectsync/headupdate.go b/commonspace/sync/objectsync/headupdate.go new file mode 100644 index 00000000..5f4859c4 --- /dev/null +++ b/commonspace/sync/objectsync/headupdate.go @@ -0,0 +1,70 @@ +package objectsync + +import ( + "fmt" + + "github.com/gogo/protobuf/proto" + + "github.com/anyproto/any-sync/commonspace/spacesyncproto" +) + +type BroadcastOptions struct { + EmptyPeers []string +} + +type InnerHeadUpdate interface { + Marshall(data ObjectMeta) ([]byte, error) +} + +type ObjectMeta struct { + PeerId string + ObjectId string + SpaceId string +} + +type HeadUpdate struct { + Meta ObjectMeta + Bytes []byte + update InnerHeadUpdate +} + +func (h *HeadUpdate) SetPeerId(peerId string) { + h.Meta.PeerId = peerId +} + +func (h *HeadUpdate) SetProtoMessage(message proto.Message) error { + var ( + msg *spacesyncproto.ObjectSyncMessage + ok bool + ) + if msg, ok = message.(*spacesyncproto.ObjectSyncMessage); !ok { + return fmt.Errorf("unexpected message type: %T", message) + } + h.Bytes = msg.GetPayload() + h.Meta.SpaceId = msg.SpaceId + h.Meta.ObjectId = msg.ObjectId + return nil +} + +func (h *HeadUpdate) ProtoMessage() (proto.Message, error) { + if h.update != nil { + payload, err := h.update.Marshall(h.Meta) + if err != nil { + return nil, err + } + return &spacesyncproto.ObjectSyncMessage{ + SpaceId: h.Meta.SpaceId, + Payload: payload, + ObjectId: h.Meta.ObjectId, + }, nil + } + return &spacesyncproto.ObjectSyncMessage{}, nil +} + +func (h *HeadUpdate) PeerId() string { + return h.Meta.PeerId +} + +func (h *HeadUpdate) ObjectId() string { + return h.Meta.ObjectId +} diff --git a/commonspace/sync/objectsync/synchandler.go b/commonspace/sync/objectsync/synchandler.go new file mode 100644 index 00000000..b2bc3187 --- /dev/null +++ b/commonspace/sync/objectsync/synchandler.go @@ -0,0 +1,64 @@ +package objectsync + +import ( + "context" + + "github.com/cheggaaa/mb/v3" + "github.com/gogo/protobuf/proto" + "storj.io/drpc" + + "github.com/anyproto/any-sync/app" + "github.com/anyproto/any-sync/commonspace/object/treemanager" + "github.com/anyproto/any-sync/commonspace/sync/syncdeps" +) + +const CName = "common.sync.objectsync" + +type objectSync struct { + spaceId string + manager treemanager.TreeManager +} + +func (o *objectSync) Init(a *app.App) (err error) { + o.manager = a.MustComponent(treemanager.CName).(treemanager.TreeManager) + return +} + +func (o *objectSync) Name() (name string) { + return CName +} + +func (o *objectSync) HandleHeadUpdate(ctx context.Context, headUpdate drpc.Message) (syncdeps.Request, error) { + //TODO implement me + panic("implement me") +} + +func (o *objectSync) HandleStreamRequest(ctx context.Context, rq syncdeps.Request, sendResponse func(resp proto.Message) error) (syncdeps.Request, error) { + //TODO implement me + panic("implement me") +} + +func (o *objectSync) ApplyRequest(ctx context.Context, rq syncdeps.Request, requestSender syncdeps.RequestSender) error { + //TODO implement me + panic("implement me") +} + +func (o *objectSync) TryAddMessage(ctx context.Context, peerId string, msg drpc.Message, q *mb.MB[drpc.Message]) error { + //TODO implement me + panic("implement me") +} + +func (o *objectSync) SendStreamRequest(ctx context.Context, rq syncdeps.Request, receive func(stream drpc.Stream) error) (err error) { + //TODO implement me + panic("implement me") +} + +func (o *objectSync) NewResponse() syncdeps.Response { + //TODO implement me + panic("implement me") +} + +func (o *objectSync) NewMessage() drpc.Message { + //TODO implement me + panic("implement me") +} diff --git a/commonspace/sync/requestmanager.go b/commonspace/sync/requestmanager.go index 4d179ff1..7ccbdba3 100644 --- a/commonspace/sync/requestmanager.go +++ b/commonspace/sync/requestmanager.go @@ -40,7 +40,7 @@ func NewRequestManager(handler syncdeps.SyncHandler) RequestManager { func (r *requestManager) SendRequest(ctx context.Context, rq syncdeps.Request, collector syncdeps.ResponseCollector) error { return r.handler.SendStreamRequest(ctx, rq, func(stream drpc.Stream) error { for { - resp := r.handler.NewResponse() + resp := collector.NewResponse() err := stream.MsgRecv(resp, streampool.EncodingProto) if err != nil { return err diff --git a/commonspace/sync/syncdeps/syncdeps.go b/commonspace/sync/syncdeps/syncdeps.go index 6e6b957a..8c5ce155 100644 --- a/commonspace/sync/syncdeps/syncdeps.go +++ b/commonspace/sync/syncdeps/syncdeps.go @@ -13,6 +13,7 @@ import ( const CName = "common.sync.syncdeps" type ResponseCollector interface { + NewResponse() Response CollectResponse(ctx context.Context, peerId, objectId string, resp Response) error } @@ -34,6 +35,5 @@ type SyncHandler interface { ApplyRequest(ctx context.Context, rq Request, requestSender RequestSender) error TryAddMessage(ctx context.Context, peerId string, msg drpc.Message, q *mb.MB[drpc.Message]) error SendStreamRequest(ctx context.Context, rq Request, receive func(stream drpc.Stream) error) (err error) - NewResponse() Response NewMessage() drpc.Message } diff --git a/commonspace/sync/synctest/counterresponsecollector.go b/commonspace/sync/synctest/counterresponsecollector.go index 797a4c4c..59ef7025 100644 --- a/commonspace/sync/synctest/counterresponsecollector.go +++ b/commonspace/sync/synctest/counterresponsecollector.go @@ -20,3 +20,7 @@ func (c *CounterResponseCollector) CollectResponse(ctx context.Context, peerId, c.counter.Add(counterResp.Value) return nil } + +func (c *CounterResponseCollector) NewResponse() syncdeps.Response { + return &synctestproto.CounterIncrease{} +} diff --git a/commonspace/sync/synctest/countersynchandler.go b/commonspace/sync/synctest/countersynchandler.go index 50dfc5e9..6ffea598 100644 --- a/commonspace/sync/synctest/countersynchandler.go +++ b/commonspace/sync/synctest/countersynchandler.go @@ -9,7 +9,6 @@ import ( "github.com/anyproto/any-sync/app" "github.com/anyproto/any-sync/commonspace/sync/syncdeps" - "github.com/anyproto/any-sync/commonspace/sync/synctestproto" ) type CounterSyncHandler struct { @@ -44,10 +43,6 @@ func (c *CounterSyncHandler) SendStreamRequest(ctx context.Context, rq syncdeps. return c.requestSender.SendStreamRequest(ctx, rq, receive) } -func (c *CounterSyncHandler) NewResponse() syncdeps.Response { - return &synctestproto.CounterIncrease{} -} - func (c *CounterSyncHandler) Init(a *app.App) (err error) { peerProvider := a.MustComponent(PeerName).(*PeerProvider) c.counter = a.MustComponent(CounterName).(*Counter)