1
0
Fork 0
mirror of https://github.com/anyproto/any-sync.git synced 2025-06-09 17:45:03 +09:00

Add general head update

This commit is contained in:
mcrakhman 2024-06-13 12:17:22 +02:00
parent 14f28274ac
commit c5c41472e1
No known key found for this signature in database
GPG key ID: DED12CFEF5B8396B
9 changed files with 150 additions and 106 deletions

View file

@ -31,6 +31,10 @@ func (r *fullResponseCollector) CollectResponse(ctx context.Context, peerId, obj
return nil
}
func (r *fullResponseCollector) NewResponse() syncdeps.Response {
return Response{}
}
type responseCollector struct {
handler syncdeps.ObjectSyncHandler
}
@ -42,3 +46,7 @@ func newResponseCollector(handler syncdeps.ObjectSyncHandler) *responseCollector
func (r *responseCollector) CollectResponse(ctx context.Context, peerId, objectId string, resp syncdeps.Response) error {
return r.handler.HandleResponse(ctx, peerId, objectId, resp)
}
func (r *responseCollector) NewResponse() syncdeps.Response {
return Response{}
}

View file

@ -3,6 +3,7 @@ package treemanager
import (
"context"
"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/commonspace/object/tree/objecttree"
)
@ -12,6 +13,7 @@ const CName = "common.object.treemanager"
type TreeManager interface {
app.ComponentRunnable
GetTree(ctx context.Context, spaceId, treeId string) (objecttree.ObjectTree, error)
PickTree(ctx context.Context, spaceId, treeId string) (objecttree.ObjectTree, error)
MarkTreeDeleted(ctx context.Context, spaceId, treeId string) error
DeleteTree(ctx context.Context, spaceId, treeId string) error
}

View file

@ -1,99 +0,0 @@
package sync
import (
"fmt"
"slices"
"github.com/gogo/protobuf/proto"
"github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
)
type BroadcastOptions struct {
EmptyPeers []string
}
type HeadUpdate struct {
peerId string
objectId string
spaceId string
heads []string
changes []*treechangeproto.RawTreeChangeWithId
snapshotPath []string
root *treechangeproto.RawTreeChangeWithId
opts BroadcastOptions
}
func (h *HeadUpdate) SetPeerId(peerId string) {
h.peerId = peerId
}
func (h *HeadUpdate) SetProtoMessage(message proto.Message) error {
var (
msg *spacesyncproto.ObjectSyncMessage
ok bool
)
if msg, ok = message.(*spacesyncproto.ObjectSyncMessage); !ok {
return fmt.Errorf("unexpected message type: %T", message)
}
treeMsg := &treechangeproto.TreeSyncMessage{}
err := proto.Unmarshal(msg.Payload, treeMsg)
if err != nil {
return err
}
h.root = treeMsg.RootChange
headMsg := treeMsg.GetContent().GetHeadUpdate()
if headMsg == nil {
return fmt.Errorf("unexpected message type: %T", treeMsg.GetContent())
}
h.heads = headMsg.Heads
h.changes = headMsg.Changes
h.snapshotPath = headMsg.SnapshotPath
h.spaceId = msg.SpaceId
h.objectId = msg.ObjectId
return nil
}
func (h *HeadUpdate) ProtoMessage() (proto.Message, error) {
if h.heads != nil {
return h.SyncMessage()
}
return &spacesyncproto.ObjectSyncMessage{}, nil
}
func (h *HeadUpdate) PeerId() string {
return h.peerId
}
func (h *HeadUpdate) ObjectId() string {
return h.objectId
}
func (h *HeadUpdate) ShallowCopy() *HeadUpdate {
return &HeadUpdate{
peerId: h.peerId,
objectId: h.objectId,
heads: h.heads,
changes: h.changes,
snapshotPath: h.snapshotPath,
root: h.root,
}
}
func (h *HeadUpdate) SyncMessage() (*spacesyncproto.ObjectSyncMessage, error) {
changes := h.changes
if slices.Contains(h.opts.EmptyPeers, h.peerId) {
changes = nil
}
treeMsg := treechangeproto.WrapHeadUpdate(&treechangeproto.TreeHeadUpdate{
Heads: h.heads,
SnapshotPath: h.snapshotPath,
Changes: changes,
}, h.root)
return spacesyncproto.MarshallSyncMessage(treeMsg, h.spaceId, h.objectId)
}
func (h *HeadUpdate) RemoveChanges() {
h.changes = nil
}

View file

@ -0,0 +1,70 @@
package objectsync
import (
"fmt"
"github.com/gogo/protobuf/proto"
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
)
type BroadcastOptions struct {
EmptyPeers []string
}
type InnerHeadUpdate interface {
Marshall(data ObjectMeta) ([]byte, error)
}
type ObjectMeta struct {
PeerId string
ObjectId string
SpaceId string
}
type HeadUpdate struct {
Meta ObjectMeta
Bytes []byte
update InnerHeadUpdate
}
func (h *HeadUpdate) SetPeerId(peerId string) {
h.Meta.PeerId = peerId
}
func (h *HeadUpdate) SetProtoMessage(message proto.Message) error {
var (
msg *spacesyncproto.ObjectSyncMessage
ok bool
)
if msg, ok = message.(*spacesyncproto.ObjectSyncMessage); !ok {
return fmt.Errorf("unexpected message type: %T", message)
}
h.Bytes = msg.GetPayload()
h.Meta.SpaceId = msg.SpaceId
h.Meta.ObjectId = msg.ObjectId
return nil
}
func (h *HeadUpdate) ProtoMessage() (proto.Message, error) {
if h.update != nil {
payload, err := h.update.Marshall(h.Meta)
if err != nil {
return nil, err
}
return &spacesyncproto.ObjectSyncMessage{
SpaceId: h.Meta.SpaceId,
Payload: payload,
ObjectId: h.Meta.ObjectId,
}, nil
}
return &spacesyncproto.ObjectSyncMessage{}, nil
}
func (h *HeadUpdate) PeerId() string {
return h.Meta.PeerId
}
func (h *HeadUpdate) ObjectId() string {
return h.Meta.ObjectId
}

View file

@ -0,0 +1,64 @@
package objectsync
import (
"context"
"github.com/cheggaaa/mb/v3"
"github.com/gogo/protobuf/proto"
"storj.io/drpc"
"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/commonspace/object/treemanager"
"github.com/anyproto/any-sync/commonspace/sync/syncdeps"
)
const CName = "common.sync.objectsync"
type objectSync struct {
spaceId string
manager treemanager.TreeManager
}
func (o *objectSync) Init(a *app.App) (err error) {
o.manager = a.MustComponent(treemanager.CName).(treemanager.TreeManager)
return
}
func (o *objectSync) Name() (name string) {
return CName
}
func (o *objectSync) HandleHeadUpdate(ctx context.Context, headUpdate drpc.Message) (syncdeps.Request, error) {
//TODO implement me
panic("implement me")
}
func (o *objectSync) HandleStreamRequest(ctx context.Context, rq syncdeps.Request, sendResponse func(resp proto.Message) error) (syncdeps.Request, error) {
//TODO implement me
panic("implement me")
}
func (o *objectSync) ApplyRequest(ctx context.Context, rq syncdeps.Request, requestSender syncdeps.RequestSender) error {
//TODO implement me
panic("implement me")
}
func (o *objectSync) TryAddMessage(ctx context.Context, peerId string, msg drpc.Message, q *mb.MB[drpc.Message]) error {
//TODO implement me
panic("implement me")
}
func (o *objectSync) SendStreamRequest(ctx context.Context, rq syncdeps.Request, receive func(stream drpc.Stream) error) (err error) {
//TODO implement me
panic("implement me")
}
func (o *objectSync) NewResponse() syncdeps.Response {
//TODO implement me
panic("implement me")
}
func (o *objectSync) NewMessage() drpc.Message {
//TODO implement me
panic("implement me")
}

View file

@ -40,7 +40,7 @@ func NewRequestManager(handler syncdeps.SyncHandler) RequestManager {
func (r *requestManager) SendRequest(ctx context.Context, rq syncdeps.Request, collector syncdeps.ResponseCollector) error {
return r.handler.SendStreamRequest(ctx, rq, func(stream drpc.Stream) error {
for {
resp := r.handler.NewResponse()
resp := collector.NewResponse()
err := stream.MsgRecv(resp, streampool.EncodingProto)
if err != nil {
return err

View file

@ -13,6 +13,7 @@ import (
const CName = "common.sync.syncdeps"
type ResponseCollector interface {
NewResponse() Response
CollectResponse(ctx context.Context, peerId, objectId string, resp Response) error
}
@ -34,6 +35,5 @@ type SyncHandler interface {
ApplyRequest(ctx context.Context, rq Request, requestSender RequestSender) error
TryAddMessage(ctx context.Context, peerId string, msg drpc.Message, q *mb.MB[drpc.Message]) error
SendStreamRequest(ctx context.Context, rq Request, receive func(stream drpc.Stream) error) (err error)
NewResponse() Response
NewMessage() drpc.Message
}

View file

@ -20,3 +20,7 @@ func (c *CounterResponseCollector) CollectResponse(ctx context.Context, peerId,
c.counter.Add(counterResp.Value)
return nil
}
func (c *CounterResponseCollector) NewResponse() syncdeps.Response {
return &synctestproto.CounterIncrease{}
}

View file

@ -9,7 +9,6 @@ import (
"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/commonspace/sync/syncdeps"
"github.com/anyproto/any-sync/commonspace/sync/synctestproto"
)
type CounterSyncHandler struct {
@ -44,10 +43,6 @@ func (c *CounterSyncHandler) SendStreamRequest(ctx context.Context, rq syncdeps.
return c.requestSender.SendStreamRequest(ctx, rq, receive)
}
func (c *CounterSyncHandler) NewResponse() syncdeps.Response {
return &synctestproto.CounterIncrease{}
}
func (c *CounterSyncHandler) Init(a *app.App) (err error) {
peerProvider := a.MustComponent(PeerName).(*PeerProvider)
c.counter = a.MustComponent(CounterName).(*Counter)