1
0
Fork 0
mirror of https://github.com/anyproto/any-sync.git synced 2025-06-08 05:57:03 +09:00

WIP sync logic

This commit is contained in:
mcrakhman 2023-07-03 15:48:48 +02:00
parent 145332b0f7
commit 0d16c5d7e4
No known key found for this signature in database
GPG key ID: DED12CFEF5B8396B
13 changed files with 314 additions and 110 deletions

View file

@ -44,6 +44,7 @@ type AclList interface {
Records() []*AclRecord Records() []*AclRecord
AclState() *AclState AclState() *AclState
IsAfter(first string, second string) (bool, error) IsAfter(first string, second string) (bool, error)
HasHead(head string) bool
Head() *AclRecord Head() *AclRecord
Get(id string) (*AclRecord, error) Get(id string) (*AclRecord, error)
GetIndex(idx int) (*AclRecord, error) GetIndex(idx int) (*AclRecord, error)
@ -199,7 +200,7 @@ func (a *aclList) ValidateRawRecord(rawRec *consensusproto.RawRecord) (err error
func (a *aclList) AddRawRecords(rawRecords []*consensusproto.RawRecordWithId) (err error) { func (a *aclList) AddRawRecords(rawRecords []*consensusproto.RawRecordWithId) (err error) {
for _, rec := range rawRecords { for _, rec := range rawRecords {
err = a.AddRawRecord(rec) err = a.AddRawRecord(rec)
if err != nil { if err != nil && err != ErrRecordAlreadyExists {
return return
} }
} }
@ -257,6 +258,11 @@ func (a *aclList) Head() *AclRecord {
return a.records[len(a.records)-1] return a.records[len(a.records)-1]
} }
func (a *aclList) HasHead(head string) bool {
_, exists := a.indexes[head]
return exists
}
func (a *aclList) Get(id string) (*AclRecord, error) { func (a *aclList) Get(id string) (*AclRecord, error) {
recIdx, ok := a.indexes[id] recIdx, ok := a.indexes[id]
if !ok { if !ok {

View file

@ -23,15 +23,84 @@ type aclSyncProtocol struct {
} }
func (a *aclSyncProtocol) HeadUpdate(ctx context.Context, senderId string, update *consensusproto.LogHeadUpdate) (request *consensusproto.LogSyncMessage, err error) { func (a *aclSyncProtocol) HeadUpdate(ctx context.Context, senderId string, update *consensusproto.LogHeadUpdate) (request *consensusproto.LogSyncMessage, err error) {
return isEmptyUpdate := len(update.Records) == 0
log := a.log.With(
zap.String("senderId", senderId),
zap.String("update head", update.Head),
zap.Int("len(update records)", len(update.Records)))
log.DebugCtx(ctx, "received acl head update message")
defer func() {
if err != nil {
log.ErrorCtx(ctx, "acl head update finished with error", zap.Error(err))
} else if request != nil {
cnt := request.Content.GetFullSyncRequest()
log.DebugCtx(ctx, "returning acl full sync request", zap.String("request head", cnt.Head))
} else {
if !isEmptyUpdate {
log.DebugCtx(ctx, "acl head update finished correctly")
}
}
}()
if isEmptyUpdate {
headEquals := a.aclList.Head().Id == update.Head
log.DebugCtx(ctx, "is empty acl head update", zap.Bool("headEquals", headEquals))
if headEquals {
return
}
return a.reqFactory.CreateFullSyncRequest(a.aclList, update.Head)
}
if a.aclList.HasHead(update.Head) {
return
}
err = a.aclList.AddRawRecords(update.Records)
if err != nil && err != list.ErrIncorrectRecordSequence {
return
}
return a.reqFactory.CreateFullSyncRequest(a.aclList, update.Head)
} }
func (a *aclSyncProtocol) FullSyncRequest(ctx context.Context, senderId string, request *consensusproto.LogFullSyncRequest) (response *consensusproto.LogSyncMessage, err error) { func (a *aclSyncProtocol) FullSyncRequest(ctx context.Context, senderId string, request *consensusproto.LogFullSyncRequest) (response *consensusproto.LogSyncMessage, err error) {
return log := a.log.With(
zap.String("senderId", senderId),
zap.String("request head", request.Head),
zap.Int("len(request records)", len(request.Records)))
log.DebugCtx(ctx, "received acl full sync request message")
defer func() {
if err != nil {
log.ErrorCtx(ctx, "acl full sync request finished with error", zap.Error(err))
} else if response != nil {
cnt := response.Content.GetFullSyncResponse()
log.DebugCtx(ctx, "acl full sync response sent", zap.String("response head", cnt.Head), zap.Int("len(response records)", len(cnt.Records)))
}
}()
if len(request.Records) > 0 && !a.aclList.HasHead(request.Head) {
err = a.aclList.AddRawRecords(request.Records)
if err != nil {
return
}
}
return a.reqFactory.CreateFullSyncResponse(a.aclList, request.Head)
} }
func (a *aclSyncProtocol) FullSyncResponse(ctx context.Context, senderId string, response *consensusproto.LogFullSyncResponse) (err error) { func (a *aclSyncProtocol) FullSyncResponse(ctx context.Context, senderId string, response *consensusproto.LogFullSyncResponse) (err error) {
return log := a.log.With(
zap.String("senderId", senderId),
zap.String("response head", response.Head),
zap.Int("len(response records)", len(response.Records)))
log.DebugCtx(ctx, "received acl full sync response message")
defer func() {
if err != nil {
log.ErrorCtx(ctx, "acl full sync response failed", zap.Error(err))
} else {
log.DebugCtx(ctx, "acl full sync response succeeded")
}
}()
if a.aclList.HasHead(response.Head) {
return
}
return a.aclList.AddRawRecords(response.Records)
} }
func newAclSyncProtocol(spaceId string, aclList list.AclList, reqFactory RequestFactory) *aclSyncProtocol { func newAclSyncProtocol(spaceId string, aclList list.AclList, reqFactory RequestFactory) *aclSyncProtocol {

View file

@ -7,7 +7,7 @@ import (
type RequestFactory interface { type RequestFactory interface {
CreateHeadUpdate(l list.AclList, added []*consensusproto.RawRecordWithId) (msg *consensusproto.LogSyncMessage) CreateHeadUpdate(l list.AclList, added []*consensusproto.RawRecordWithId) (msg *consensusproto.LogSyncMessage)
CreateFullSyncRequest(theirHead string) (req *consensusproto.LogSyncMessage, err error) CreateFullSyncRequest(l list.AclList, theirHead string) (req *consensusproto.LogSyncMessage, err error)
CreateFullSyncResponse(l list.AclList, theirHead string) (*consensusproto.LogSyncMessage, error) CreateFullSyncResponse(l list.AclList, theirHead string) (*consensusproto.LogSyncMessage, error)
} }
@ -21,7 +21,7 @@ func (r *requestFactory) CreateHeadUpdate(l list.AclList, added []*consensusprot
return return
} }
func (r *requestFactory) CreateFullSyncRequest(theirHead string) (req *consensusproto.LogSyncMessage, err error) { func (r *requestFactory) CreateFullSyncRequest(l list.AclList, theirHead string) (req *consensusproto.LogSyncMessage, err error) {
return return
} }

View file

@ -109,7 +109,7 @@ func (s *SyncAcl) SyncWithPeer(ctx context.Context, peerId string) (err error) {
s.Lock() s.Lock()
defer s.Unlock() defer s.Unlock()
headUpdate := s.syncClient.CreateHeadUpdate(s, nil) headUpdate := s.syncClient.CreateHeadUpdate(s, nil)
return s.syncClient.SendUpdate(peerId, s.Id(), headUpdate) return s.syncClient.SendUpdate(peerId, headUpdate)
} }
func (s *SyncAcl) Close(ctx context.Context) (err error) { func (s *SyncAcl) Close(ctx context.Context) (err error) {

View file

@ -2,11 +2,19 @@ package syncacl
import ( import (
"context" "context"
"errors"
"github.com/anyproto/any-sync/commonspace/object/acl/list" "github.com/anyproto/any-sync/commonspace/object/acl/list"
"github.com/anyproto/any-sync/commonspace/objectsync/synchandler" "github.com/anyproto/any-sync/commonspace/objectsync/synchandler"
"github.com/anyproto/any-sync/commonspace/spacesyncproto" "github.com/anyproto/any-sync/commonspace/spacesyncproto"
"github.com/anyproto/any-sync/commonspace/syncstatus" "github.com/anyproto/any-sync/commonspace/syncstatus"
"github.com/anyproto/any-sync/consensus/consensusproto"
"github.com/gogo/protobuf/proto"
)
var (
ErrMessageIsRequest = errors.New("message is request")
ErrMessageIsNotRequest = errors.New("message is not request")
) )
type syncAclHandler struct { type syncAclHandler struct {
@ -27,10 +35,46 @@ func newSyncAclHandler(spaceId string, aclList list.AclList, syncClient SyncClie
} }
} }
func (s *syncAclHandler) HandleMessage(ctx context.Context, senderId string, req *spacesyncproto.ObjectSyncMessage) (err error) { func (s *syncAclHandler) HandleMessage(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error) {
unmarshalled := &consensusproto.LogSyncMessage{}
err = proto.Unmarshal(message.Payload, unmarshalled)
if err != nil {
return
}
content := unmarshalled.GetContent()
s.aclList.Lock()
defer s.aclList.Unlock()
switch {
case content.GetHeadUpdate() != nil:
var syncReq *consensusproto.LogSyncMessage
syncReq, err = s.syncProtocol.HeadUpdate(ctx, senderId, content.GetHeadUpdate())
if err != nil || syncReq == nil {
return
}
return s.syncClient.QueueRequest(senderId, syncReq)
case content.GetFullSyncRequest() != nil:
return ErrMessageIsRequest
case content.GetFullSyncResponse() != nil:
return s.syncProtocol.FullSyncResponse(ctx, senderId, content.GetFullSyncResponse())
}
return return
} }
func (s *syncAclHandler) HandleRequest(ctx context.Context, senderId string, request *spacesyncproto.ObjectSyncMessage) (response *spacesyncproto.ObjectSyncMessage, err error) { func (s *syncAclHandler) HandleRequest(ctx context.Context, senderId string, request *spacesyncproto.ObjectSyncMessage) (response *spacesyncproto.ObjectSyncMessage, err error) {
return unmarshalled := &consensusproto.LogSyncMessage{}
err = proto.Unmarshal(request.Payload, unmarshalled)
if err != nil {
return
}
fullSyncRequest := unmarshalled.GetContent().GetFullSyncRequest()
if fullSyncRequest == nil {
return nil, ErrMessageIsNotRequest
}
s.aclList.Lock()
defer s.aclList.Unlock()
aclResp, err := s.syncProtocol.FullSyncRequest(ctx, senderId, fullSyncRequest)
if err != nil {
return
}
return spacesyncproto.MarshallSyncMessage(aclResp, s.spaceId, s.aclList.Id())
} }

View file

@ -7,14 +7,15 @@ import (
"github.com/anyproto/any-sync/commonspace/requestmanager" "github.com/anyproto/any-sync/commonspace/requestmanager"
"github.com/anyproto/any-sync/commonspace/spacesyncproto" "github.com/anyproto/any-sync/commonspace/spacesyncproto"
"github.com/anyproto/any-sync/consensus/consensusproto" "github.com/anyproto/any-sync/consensus/consensusproto"
"go.uber.org/zap"
) )
type SyncClient interface { type SyncClient interface {
RequestFactory RequestFactory
Broadcast(msg *consensusproto.LogSyncMessage) Broadcast(msg *consensusproto.LogSyncMessage)
SendUpdate(peerId, objectId string, msg *consensusproto.LogSyncMessage) (err error) SendUpdate(peerId string, msg *consensusproto.LogSyncMessage) (err error)
QueueRequest(peerId, objectId string, msg *consensusproto.LogSyncMessage) (err error) QueueRequest(peerId string, msg *consensusproto.LogSyncMessage) (err error)
SendRequest(ctx context.Context, peerId, objectId string, msg *consensusproto.LogSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) SendRequest(ctx context.Context, peerId string, msg *consensusproto.LogSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error)
} }
type syncClient struct { type syncClient struct {
@ -24,21 +25,6 @@ type syncClient struct {
peerManager peermanager.PeerManager peerManager peermanager.PeerManager
} }
func (s *syncClient) Broadcast(msg *consensusproto.LogSyncMessage) {
}
func (s *syncClient) SendUpdate(peerId, objectId string, msg *consensusproto.LogSyncMessage) (err error) {
return
}
func (s *syncClient) QueueRequest(peerId, objectId string, msg *consensusproto.LogSyncMessage) (err error) {
return
}
func (s *syncClient) SendRequest(ctx context.Context, peerId, objectId string, msg *consensusproto.LogSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) {
return
}
func NewSyncClient(spaceId string, requestManager requestmanager.RequestManager, peerManager peermanager.PeerManager) SyncClient { func NewSyncClient(spaceId string, requestManager requestmanager.RequestManager, peerManager peermanager.PeerManager) SyncClient {
return &syncClient{ return &syncClient{
RequestFactory: &requestFactory{}, RequestFactory: &requestFactory{},
@ -47,3 +33,38 @@ func NewSyncClient(spaceId string, requestManager requestmanager.RequestManager,
peerManager: peerManager, peerManager: peerManager,
} }
} }
func (s *syncClient) Broadcast(msg *consensusproto.LogSyncMessage) {
objMsg, err := spacesyncproto.MarshallSyncMessage(msg, s.spaceId, msg.Id)
if err != nil {
return
}
err = s.peerManager.Broadcast(context.Background(), objMsg)
if err != nil {
log.Debug("broadcast error", zap.Error(err))
}
}
func (s *syncClient) SendUpdate(peerId string, msg *consensusproto.LogSyncMessage) (err error) {
objMsg, err := spacesyncproto.MarshallSyncMessage(msg, s.spaceId, msg.Id)
if err != nil {
return
}
return s.peerManager.SendPeer(context.Background(), peerId, objMsg)
}
func (s *syncClient) SendRequest(ctx context.Context, peerId string, msg *consensusproto.LogSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) {
objMsg, err := spacesyncproto.MarshallSyncMessage(msg, s.spaceId, msg.Id)
if err != nil {
return
}
return s.requestManager.SendRequest(ctx, peerId, objMsg)
}
func (s *syncClient) QueueRequest(peerId string, msg *consensusproto.LogSyncMessage) (err error) {
objMsg, err := spacesyncproto.MarshallSyncMessage(msg, s.spaceId, msg.Id)
if err != nil {
return
}
return s.requestManager.QueueRequest(peerId, objMsg)
}

View file

@ -35,7 +35,7 @@ func NewSyncClient(spaceId string, requestManager requestmanager.RequestManager,
} }
func (s *syncClient) Broadcast(msg *treechangeproto.TreeSyncMessage) { func (s *syncClient) Broadcast(msg *treechangeproto.TreeSyncMessage) {
objMsg, err := MarshallTreeMessage(msg, s.spaceId, msg.RootChange.Id, "") objMsg, err := spacesyncproto.MarshallSyncMessage(msg, s.spaceId, msg.RootChange.Id)
if err != nil { if err != nil {
return return
} }
@ -46,7 +46,7 @@ func (s *syncClient) Broadcast(msg *treechangeproto.TreeSyncMessage) {
} }
func (s *syncClient) SendUpdate(peerId, objectId string, msg *treechangeproto.TreeSyncMessage) (err error) { func (s *syncClient) SendUpdate(peerId, objectId string, msg *treechangeproto.TreeSyncMessage) (err error) {
objMsg, err := MarshallTreeMessage(msg, s.spaceId, objectId, "") objMsg, err := spacesyncproto.MarshallSyncMessage(msg, s.spaceId, objectId)
if err != nil { if err != nil {
return return
} }
@ -54,7 +54,7 @@ func (s *syncClient) SendUpdate(peerId, objectId string, msg *treechangeproto.Tr
} }
func (s *syncClient) SendRequest(ctx context.Context, peerId, objectId string, msg *treechangeproto.TreeSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) { func (s *syncClient) SendRequest(ctx context.Context, peerId, objectId string, msg *treechangeproto.TreeSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) {
objMsg, err := MarshallTreeMessage(msg, s.spaceId, objectId, "") objMsg, err := spacesyncproto.MarshallSyncMessage(msg, s.spaceId, objectId)
if err != nil { if err != nil {
return return
} }
@ -62,23 +62,9 @@ func (s *syncClient) SendRequest(ctx context.Context, peerId, objectId string, m
} }
func (s *syncClient) QueueRequest(peerId, objectId string, msg *treechangeproto.TreeSyncMessage) (err error) { func (s *syncClient) QueueRequest(peerId, objectId string, msg *treechangeproto.TreeSyncMessage) (err error) {
objMsg, err := MarshallTreeMessage(msg, s.spaceId, objectId, "") objMsg, err := spacesyncproto.MarshallSyncMessage(msg, s.spaceId, objectId)
if err != nil { if err != nil {
return return
} }
return s.requestManager.QueueRequest(peerId, objMsg) return s.requestManager.QueueRequest(peerId, objMsg)
} }
func MarshallTreeMessage(message *treechangeproto.TreeSyncMessage, spaceId, objectId, replyId string) (objMsg *spacesyncproto.ObjectSyncMessage, err error) {
payload, err := message.Marshal()
if err != nil {
return
}
objMsg = &spacesyncproto.ObjectSyncMessage{
ReplyId: replyId,
Payload: payload,
ObjectId: objectId,
SpaceId: spaceId,
}
return
}

View file

@ -81,7 +81,7 @@ func (s *syncTreeHandler) handleRequest(ctx context.Context, senderId string, fu
if err != nil { if err != nil {
return return
} }
response, err = MarshallTreeMessage(treeResp, s.spaceId, s.objTree.Id(), "") response, err = spacesyncproto.MarshallSyncMessage(treeResp, s.spaceId, s.objTree.Id())
return return
} }

View file

@ -103,7 +103,7 @@ func TestSyncTreeHandler_HandleMessage(t *testing.T) {
Heads: []string{"h3"}, Heads: []string{"h3"},
} }
treeMsg := treechangeproto.WrapHeadUpdate(headUpdate, chWithId) treeMsg := treechangeproto.WrapHeadUpdate(headUpdate, chWithId)
objectMsg, _ := MarshallTreeMessage(treeMsg, "spaceId", treeId, "") objectMsg, _ := spacesyncproto.MarshallSyncMessage(treeMsg, "spaceId", treeId)
syncReq := &treechangeproto.TreeSyncMessage{} syncReq := &treechangeproto.TreeSyncMessage{}
fx.syncHandler.heads = []string{"h2"} fx.syncHandler.heads = []string{"h2"}
@ -127,7 +127,7 @@ func TestSyncTreeHandler_HandleMessage(t *testing.T) {
Heads: []string{"h1"}, Heads: []string{"h1"},
} }
treeMsg := treechangeproto.WrapHeadUpdate(headUpdate, chWithId) treeMsg := treechangeproto.WrapHeadUpdate(headUpdate, chWithId)
objectMsg, _ := MarshallTreeMessage(treeMsg, "spaceId", treeId, "") objectMsg, _ := spacesyncproto.MarshallSyncMessage(treeMsg, "spaceId", treeId)
fx.syncHandler.heads = []string{"h1"} fx.syncHandler.heads = []string{"h1"}
fx.objectTreeMock.EXPECT().Id().AnyTimes().Return(fx.treeId) fx.objectTreeMock.EXPECT().Id().AnyTimes().Return(fx.treeId)
@ -145,7 +145,7 @@ func TestSyncTreeHandler_HandleMessage(t *testing.T) {
Heads: []string{"h3"}, Heads: []string{"h3"},
} }
treeMsg := treechangeproto.WrapHeadUpdate(headUpdate, chWithId) treeMsg := treechangeproto.WrapHeadUpdate(headUpdate, chWithId)
objectMsg, _ := MarshallTreeMessage(treeMsg, "spaceId", treeId, "") objectMsg, _ := spacesyncproto.MarshallSyncMessage(treeMsg, "spaceId", treeId)
fx.syncHandler.heads = []string{"h2"} fx.syncHandler.heads = []string{"h2"}
fx.objectTreeMock.EXPECT().Id().AnyTimes().Return(fx.treeId) fx.objectTreeMock.EXPECT().Id().AnyTimes().Return(fx.treeId)
@ -167,7 +167,7 @@ func TestSyncTreeHandler_HandleMessage(t *testing.T) {
Heads: []string{"h3"}, Heads: []string{"h3"},
} }
treeMsg := treechangeproto.WrapFullRequest(fullRequest, chWithId) treeMsg := treechangeproto.WrapFullRequest(fullRequest, chWithId)
objectMsg, _ := MarshallTreeMessage(treeMsg, "spaceId", treeId, "") objectMsg, _ := spacesyncproto.MarshallSyncMessage(treeMsg, "spaceId", treeId)
fx.syncHandler.heads = []string{"h2"} fx.syncHandler.heads = []string{"h2"}
fx.objectTreeMock.EXPECT().Id().AnyTimes().Return(fx.treeId) fx.objectTreeMock.EXPECT().Id().AnyTimes().Return(fx.treeId)
@ -186,7 +186,7 @@ func TestSyncTreeHandler_HandleMessage(t *testing.T) {
Heads: []string{"h3"}, Heads: []string{"h3"},
} }
treeMsg := treechangeproto.WrapFullResponse(fullSyncResponse, chWithId) treeMsg := treechangeproto.WrapFullResponse(fullSyncResponse, chWithId)
objectMsg, _ := MarshallTreeMessage(treeMsg, "spaceId", treeId, "") objectMsg, _ := spacesyncproto.MarshallSyncMessage(treeMsg, "spaceId", treeId)
fx.syncHandler.heads = []string{"h2"} fx.syncHandler.heads = []string{"h2"}
fx.objectTreeMock.EXPECT().Id().AnyTimes().Return(fx.treeId) fx.objectTreeMock.EXPECT().Id().AnyTimes().Return(fx.treeId)
@ -209,7 +209,7 @@ func TestSyncTreeHandler_HandleRequest(t *testing.T) {
chWithId := &treechangeproto.RawTreeChangeWithId{} chWithId := &treechangeproto.RawTreeChangeWithId{}
fullRequest := &treechangeproto.TreeFullSyncRequest{} fullRequest := &treechangeproto.TreeFullSyncRequest{}
treeMsg := treechangeproto.WrapFullRequest(fullRequest, chWithId) treeMsg := treechangeproto.WrapFullRequest(fullRequest, chWithId)
objectMsg, _ := MarshallTreeMessage(treeMsg, "spaceId", treeId, "") objectMsg, _ := spacesyncproto.MarshallSyncMessage(treeMsg, "spaceId", treeId)
syncResp := &treechangeproto.TreeSyncMessage{} syncResp := &treechangeproto.TreeSyncMessage{}
fx.objectTreeMock.EXPECT().Id().AnyTimes().Return(fx.treeId) fx.objectTreeMock.EXPECT().Id().AnyTimes().Return(fx.treeId)
@ -230,7 +230,7 @@ func TestSyncTreeHandler_HandleRequest(t *testing.T) {
headUpdate := &treechangeproto.TreeHeadUpdate{} headUpdate := &treechangeproto.TreeHeadUpdate{}
headUpdateMsg := treechangeproto.WrapHeadUpdate(headUpdate, chWithId) headUpdateMsg := treechangeproto.WrapHeadUpdate(headUpdate, chWithId)
for _, msg := range []*treechangeproto.TreeSyncMessage{responseMsg, headUpdateMsg} { for _, msg := range []*treechangeproto.TreeSyncMessage{responseMsg, headUpdateMsg} {
objectMsg, _ := MarshallTreeMessage(msg, "spaceId", treeId, "") objectMsg, _ := spacesyncproto.MarshallSyncMessage(msg, "spaceId", treeId)
_, err := fx.syncHandler.HandleRequest(ctx, fx.senderId, objectMsg) _, err := fx.syncHandler.HandleRequest(ctx, fx.senderId, objectMsg)
require.Equal(t, err, ErrMessageIsNotRequest) require.Equal(t, err, ErrMessageIsNotRequest)

View file

@ -60,7 +60,7 @@ func (t *treeSyncProtocol) HeadUpdate(ctx context.Context, senderId string, upda
// isEmptyUpdate is sent when the tree is brought up from cache // isEmptyUpdate is sent when the tree is brought up from cache
if isEmptyUpdate { if isEmptyUpdate {
headEquals := slice.UnsortedEquals(objTree.Heads(), update.Heads) headEquals := slice.UnsortedEquals(objTree.Heads(), update.Heads)
log.DebugCtx(ctx, "is empty update", zap.String("treeId", objTree.Id()), zap.Bool("headEquals", headEquals)) log.DebugCtx(ctx, "is empty update", zap.Bool("headEquals", headEquals))
if headEquals { if headEquals {
return return
} }
@ -70,7 +70,7 @@ func (t *treeSyncProtocol) HeadUpdate(ctx context.Context, senderId string, upda
return return
} }
if t.alreadyHasHeads(objTree, update.Heads) { if t.hasHeads(objTree, update.Heads) {
return return
} }
@ -82,7 +82,7 @@ func (t *treeSyncProtocol) HeadUpdate(ctx context.Context, senderId string, upda
return return
} }
if t.alreadyHasHeads(objTree, update.Heads) { if t.hasHeads(objTree, update.Heads) {
return return
} }
@ -109,7 +109,7 @@ func (t *treeSyncProtocol) FullSyncRequest(ctx context.Context, senderId string,
} }
}() }()
if len(request.Changes) != 0 && !t.alreadyHasHeads(objTree, request.Heads) { if len(request.Changes) != 0 && !t.hasHeads(objTree, request.Heads) {
_, err = objTree.AddRawChanges(ctx, objecttree.RawChangesPayload{ _, err = objTree.AddRawChanges(ctx, objecttree.RawChangesPayload{
NewHeads: request.Heads, NewHeads: request.Heads,
RawChanges: request.Changes, RawChanges: request.Changes,
@ -137,7 +137,7 @@ func (t *treeSyncProtocol) FullSyncResponse(ctx context.Context, senderId string
log.DebugCtx(ctx, "full sync response succeeded") log.DebugCtx(ctx, "full sync response succeeded")
} }
}() }()
if t.alreadyHasHeads(objTree, response.Heads) { if t.hasHeads(objTree, response.Heads) {
return return
} }
@ -148,6 +148,6 @@ func (t *treeSyncProtocol) FullSyncResponse(ctx context.Context, senderId string
return return
} }
func (t *treeSyncProtocol) alreadyHasHeads(ot objecttree.ObjectTree, heads []string) bool { func (t *treeSyncProtocol) hasHeads(ot objecttree.ObjectTree, heads []string) bool {
return slice.UnsortedEquals(ot.Heads(), heads) || ot.HasChanges(heads...) return slice.UnsortedEquals(ot.Heads(), heads) || ot.HasChanges(heads...)
} }

View file

@ -2,6 +2,7 @@
package spacesyncproto package spacesyncproto
import ( import (
"github.com/gogo/protobuf/proto"
"storj.io/drpc" "storj.io/drpc"
) )
@ -16,3 +17,16 @@ func (c ClientFactoryFunc) Client(cc drpc.Conn) DRPCSpaceSyncClient {
type ClientFactory interface { type ClientFactory interface {
Client(cc drpc.Conn) DRPCSpaceSyncClient Client(cc drpc.Conn) DRPCSpaceSyncClient
} }
func MarshallSyncMessage(message proto.Marshaler, spaceId, objectId string) (objMsg *ObjectSyncMessage, err error) {
payload, err := message.Marshal()
if err != nil {
return
}
objMsg = &ObjectSyncMessage{
Payload: payload,
ObjectId: objectId,
SpaceId: spaceId,
}
return
}

View file

@ -811,7 +811,8 @@ func (m *LogHeadUpdate) GetRecords() []*RawRecordWithId {
// LogFullSyncRequest is a message sent when consensus log needs full sync // LogFullSyncRequest is a message sent when consensus log needs full sync
type LogFullSyncRequest struct { type LogFullSyncRequest struct {
Head string `protobuf:"bytes,1,opt,name=head,proto3" json:"head,omitempty"` Head string `protobuf:"bytes,1,opt,name=head,proto3" json:"head,omitempty"`
Records []*RawRecordWithId `protobuf:"bytes,2,rep,name=records,proto3" json:"records,omitempty"`
} }
func (m *LogFullSyncRequest) Reset() { *m = LogFullSyncRequest{} } func (m *LogFullSyncRequest) Reset() { *m = LogFullSyncRequest{} }
@ -854,6 +855,13 @@ func (m *LogFullSyncRequest) GetHead() string {
return "" return ""
} }
func (m *LogFullSyncRequest) GetRecords() []*RawRecordWithId {
if m != nil {
return m.Records
}
return nil
}
// LogFullSyncResponse is a message sent as a response for a specific full sync // LogFullSyncResponse is a message sent as a response for a specific full sync
type LogFullSyncResponse struct { type LogFullSyncResponse struct {
Head string `protobuf:"bytes,1,opt,name=head,proto3" json:"head,omitempty"` Head string `protobuf:"bytes,1,opt,name=head,proto3" json:"head,omitempty"`
@ -931,56 +939,57 @@ func init() {
} }
var fileDescriptor_b8d7f1c16b400059 = []byte{ var fileDescriptor_b8d7f1c16b400059 = []byte{
// 782 bytes of a gzipped FileDescriptorProto // 785 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x55, 0xcf, 0x6e, 0xd3, 0x4c, 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x55, 0x5f, 0x4f, 0xdb, 0x56,
0x10, 0x8f, 0xed, 0x36, 0x89, 0x27, 0x5f, 0x13, 0x7f, 0x5b, 0x84, 0x4c, 0x44, 0xd3, 0xc8, 0x08, 0x14, 0x8f, 0x6d, 0x48, 0xe2, 0x93, 0x91, 0x78, 0x87, 0x69, 0xf2, 0xa2, 0x11, 0x22, 0x4f, 0x93,
0x29, 0x54, 0x28, 0x85, 0x20, 0x90, 0x50, 0x85, 0x10, 0x8d, 0x52, 0x25, 0x52, 0x9a, 0x82, 0xab, 0x18, 0x9a, 0xc2, 0x96, 0x69, 0x93, 0x26, 0x34, 0x4d, 0x23, 0x0a, 0x4a, 0xa4, 0x10, 0x36, 0x23,
0x52, 0x09, 0x24, 0x84, 0xf1, 0x6e, 0x5c, 0xd3, 0xd4, 0x6b, 0xec, 0x4d, 0xdb, 0x5c, 0xb9, 0x72, 0x86, 0xb4, 0x49, 0x55, 0x5d, 0xdf, 0x1b, 0xe3, 0x12, 0x7c, 0x5d, 0xfb, 0x06, 0xc8, 0x6b, 0x5f,
0xe1, 0x05, 0x78, 0x1f, 0x8e, 0x3d, 0x72, 0x44, 0xed, 0x85, 0x07, 0xe0, 0x01, 0x90, 0xd7, 0x76, 0xfb, 0xd2, 0x2f, 0xd0, 0xef, 0xd3, 0x47, 0x1e, 0xfb, 0x58, 0xc1, 0x4b, 0x3f, 0x40, 0x3f, 0x40,
0xe2, 0x26, 0x4e, 0x11, 0x82, 0x4b, 0xbb, 0xf3, 0xff, 0x37, 0xbf, 0x19, 0x4f, 0x60, 0xdd, 0xa4, 0xe5, 0x6b, 0x3b, 0x31, 0xf9, 0x43, 0x55, 0x95, 0x17, 0xf0, 0xf9, 0xff, 0x3b, 0xbf, 0x73, 0xee,
0x8e, 0x4f, 0x1c, 0x7f, 0xe8, 0x4f, 0x5e, 0xae, 0x47, 0x19, 0x5d, 0xe7, 0x7f, 0x13, 0xda, 0x3a, 0x09, 0xec, 0xd8, 0xcc, 0x0b, 0xa9, 0x17, 0x8e, 0xc2, 0xe9, 0x97, 0x1f, 0x30, 0xce, 0x76, 0xc4,
0x57, 0xa0, 0xe2, 0x58, 0xf1, 0x3c, 0x90, 0xb5, 0xf7, 0x20, 0x75, 0xa9, 0x85, 0x8a, 0x20, 0xda, 0xdf, 0x8c, 0xb6, 0x21, 0x14, 0x58, 0x9e, 0x28, 0xfe, 0x8e, 0x64, 0xe3, 0x29, 0x28, 0x3d, 0xe6,
0x58, 0x15, 0xaa, 0x42, 0xed, 0x3f, 0x5d, 0xb4, 0x31, 0x52, 0x21, 0xe7, 0x1a, 0xa3, 0x01, 0x35, 0x60, 0x19, 0x64, 0x97, 0xe8, 0x52, 0x5d, 0xda, 0xfa, 0xc2, 0x94, 0x5d, 0x82, 0x3a, 0x14, 0x7c,
0xb0, 0x2a, 0x72, 0x65, 0x2c, 0xa2, 0xc7, 0x90, 0xf3, 0x88, 0x49, 0x3d, 0xec, 0xab, 0x52, 0x55, 0x6b, 0x3c, 0x64, 0x16, 0xd1, 0x65, 0xa1, 0x4c, 0x45, 0xfc, 0x1d, 0x0a, 0x01, 0xb5, 0x59, 0x40,
0xaa, 0x15, 0x1a, 0xab, 0xf5, 0xcb, 0x29, 0xeb, 0xba, 0x71, 0xa2, 0x73, 0x8f, 0x7d, 0x9b, 0x1d, 0x42, 0x5d, 0xa9, 0x2b, 0x5b, 0xa5, 0xe6, 0x66, 0xe3, 0x6e, 0xca, 0x86, 0x69, 0x5d, 0x9a, 0xc2,
0x74, 0xb0, 0x1e, 0xfb, 0x6b, 0x5f, 0x04, 0x90, 0xc7, 0xc6, 0x64, 0x09, 0xe1, 0x72, 0x89, 0x9b, 0xe3, 0xc4, 0xe5, 0xa7, 0x5d, 0x62, 0xa6, 0xfe, 0xc6, 0x2b, 0x09, 0xd4, 0x89, 0x31, 0x5b, 0x42,
0x20, 0xfb, 0xb6, 0xe5, 0x18, 0x6c, 0xe8, 0x91, 0xa8, 0xfc, 0x44, 0x81, 0xd6, 0x40, 0x31, 0x4c, 0xba, 0x5b, 0xe2, 0x5b, 0x50, 0x43, 0xd7, 0xf1, 0x2c, 0x3e, 0x0a, 0x68, 0x52, 0x7e, 0xaa, 0xc0,
0x93, 0xb8, 0x8c, 0x7a, 0x1d, 0x4c, 0x1c, 0x66, 0xb3, 0x91, 0x2a, 0x71, 0xa7, 0x19, 0x3d, 0xba, 0x6d, 0xd0, 0x2c, 0xdb, 0xa6, 0x3e, 0x67, 0x41, 0x97, 0x50, 0x8f, 0xbb, 0x7c, 0xac, 0x2b, 0xc2,
0x0b, 0xff, 0xc7, 0xba, 0xdd, 0x71, 0xc6, 0x05, 0xee, 0x3c, 0x6b, 0xd0, 0x36, 0xa0, 0x34, 0x85, 0x69, 0x4e, 0x8f, 0x3f, 0xc2, 0x97, 0xa9, 0xee, 0x68, 0x92, 0x71, 0x45, 0x38, 0xcf, 0x1b, 0x8c,
0xfd, 0x0a, 0x90, 0x21, 0x63, 0x01, 0x3a, 0x39, 0x60, 0x4c, 0x73, 0x20, 0x1b, 0x35, 0x76, 0x1d, 0x5d, 0xa8, 0xcc, 0x60, 0xbf, 0x07, 0x64, 0xcc, 0x58, 0x84, 0x4e, 0x8d, 0x18, 0x33, 0x3c, 0xc8,
0xb2, 0xae, 0x47, 0x8e, 0x3b, 0x61, 0x88, 0xac, 0x47, 0x12, 0x2a, 0x43, 0xde, 0x8e, 0x01, 0x87, 0x27, 0x8d, 0x7d, 0x0d, 0x79, 0x3f, 0xa0, 0x17, 0xdd, 0x38, 0x44, 0x35, 0x13, 0x09, 0xab, 0x50,
0x5d, 0x8d, 0x65, 0x84, 0x60, 0x01, 0x1b, 0xcc, 0x88, 0x1a, 0xe1, 0xef, 0x80, 0x06, 0x66, 0x1f, 0x74, 0x53, 0xc0, 0x71, 0x57, 0x13, 0x19, 0x11, 0x56, 0x88, 0xc5, 0xad, 0xa4, 0x11, 0xf1, 0x1d,
0x11, 0x9f, 0x19, 0x47, 0x2e, 0x07, 0x2d, 0xe9, 0x13, 0x85, 0xb6, 0x00, 0xe2, 0xce, 0xa1, 0xf6, 0xd1, 0xc0, 0xdd, 0x73, 0x1a, 0x72, 0xeb, 0xdc, 0x17, 0xa0, 0x15, 0x73, 0xaa, 0x30, 0x56, 0x40,
0x08, 0x96, 0xba, 0xd4, 0x7a, 0x86, 0xb1, 0x4e, 0x3e, 0x0c, 0x89, 0xcf, 0xd0, 0x6d, 0x90, 0x06, 0x3e, 0x3c, 0x33, 0x7e, 0x83, 0xb5, 0x1e, 0x73, 0xfe, 0x22, 0xc4, 0xa4, 0xcf, 0x46, 0x34, 0xe4,
0xd4, 0xe2, 0x95, 0x0b, 0x8d, 0xe5, 0xe9, 0xd1, 0x74, 0xa9, 0xa5, 0x07, 0x76, 0xed, 0x35, 0x28, 0xf8, 0x3d, 0x28, 0x43, 0xe6, 0x88, 0xca, 0xa5, 0xe6, 0xfa, 0xec, 0x68, 0x7a, 0xcc, 0x31, 0x23,
0x21, 0xda, 0x44, 0xe8, 0x35, 0x58, 0x1c, 0x50, 0xab, 0x13, 0x77, 0x1a, 0x0a, 0xe8, 0x3e, 0x64, 0xbb, 0xf1, 0x3f, 0x68, 0x31, 0xda, 0x4c, 0xe8, 0x57, 0xb0, 0x3a, 0x64, 0x4e, 0x37, 0xed, 0x34,
0xc3, 0xf9, 0x71, 0xcc, 0x85, 0xc6, 0x8d, 0xb9, 0xe3, 0xd6, 0x23, 0x47, 0x6d, 0x1b, 0x4a, 0x5d, 0x16, 0xf0, 0x67, 0xc8, 0xc7, 0xf3, 0x13, 0x98, 0x4b, 0xcd, 0x6f, 0x96, 0x8e, 0xdb, 0x4c, 0x1c,
0x6a, 0xed, 0x1b, 0xcc, 0x3c, 0x88, 0x73, 0x97, 0x21, 0x7f, 0x12, 0xc8, 0x1d, 0xec, 0xab, 0x42, 0x8d, 0x03, 0xa8, 0xf4, 0x98, 0x73, 0x62, 0x71, 0xfb, 0x34, 0xcd, 0x5d, 0x85, 0xe2, 0x65, 0x24,
0x55, 0x0a, 0x7a, 0x8f, 0x65, 0x54, 0x01, 0x18, 0x3a, 0x63, 0xab, 0xc8, 0xad, 0x09, 0x8d, 0xf6, 0x77, 0x49, 0xa8, 0x4b, 0x75, 0x25, 0xea, 0x3d, 0x95, 0xb1, 0x06, 0x30, 0xf2, 0x26, 0x56, 0x59,
0x49, 0xe0, 0x4d, 0xf2, 0x7c, 0xad, 0x63, 0xe2, 0xcc, 0x43, 0x9a, 0xd8, 0x4c, 0xf1, 0xcf, 0x36, 0x58, 0x33, 0x1a, 0xe3, 0x85, 0x24, 0x9a, 0x14, 0xf9, 0xda, 0x17, 0xd4, 0x5b, 0x86, 0x34, 0xb3,
0x13, 0xdd, 0x81, 0x45, 0xe2, 0x79, 0xd4, 0xe3, 0xfc, 0xa7, 0xf0, 0xd6, 0xf2, 0x3c, 0x3d, 0xf4, 0x99, 0xf2, 0xa7, 0x6d, 0x26, 0xfe, 0x00, 0xab, 0x34, 0x08, 0x58, 0x20, 0xf8, 0x5f, 0xc0, 0x5b,
0xd0, 0x1e, 0x82, 0xd4, 0xf2, 0x3c, 0x54, 0x8f, 0x23, 0x02, 0x08, 0xc5, 0x86, 0x9a, 0x12, 0xd1, 0x3b, 0x08, 0xcc, 0xd8, 0xc3, 0xf8, 0x15, 0x94, 0x76, 0x10, 0x60, 0x23, 0x8d, 0x88, 0x20, 0x94,
0xa4, 0x98, 0xf8, 0x71, 0xd8, 0x47, 0x11, 0x96, 0xbb, 0xd4, 0xda, 0x1d, 0x39, 0x66, 0x93, 0x3a, 0x9b, 0xfa, 0x82, 0x88, 0x16, 0x23, 0x34, 0x4c, 0xc3, 0x9e, 0xcb, 0xb0, 0xde, 0x63, 0xce, 0xd1,
0x8c, 0x38, 0xec, 0xa5, 0x31, 0x18, 0x12, 0xf4, 0x14, 0xe0, 0x80, 0x18, 0x78, 0xcf, 0xc5, 0x06, 0xd8, 0xb3, 0x5b, 0xcc, 0xe3, 0xd4, 0xe3, 0xff, 0x5a, 0xc3, 0x11, 0xc5, 0x3f, 0x01, 0x4e, 0xa9,
0x23, 0xd1, 0xd8, 0x56, 0x52, 0xc6, 0xd6, 0x1e, 0x3b, 0xb5, 0x33, 0x7a, 0x22, 0x04, 0xf5, 0xa0, 0x45, 0x8e, 0x7d, 0x62, 0x71, 0x9a, 0x8c, 0x6d, 0x63, 0xc1, 0xd8, 0x3a, 0x13, 0xa7, 0x4e, 0xce,
0xd4, 0x1f, 0x0e, 0x06, 0x41, 0xe2, 0x88, 0xec, 0x68, 0x50, 0x5a, 0x4a, 0x96, 0xad, 0xcb, 0x9e, 0xcc, 0x84, 0x60, 0x1f, 0x2a, 0x83, 0xd1, 0x70, 0x18, 0x25, 0x4e, 0xc8, 0x4e, 0x06, 0x65, 0x2c,
0xed, 0x8c, 0x3e, 0x1d, 0x8c, 0x5e, 0x80, 0x32, 0x51, 0xf9, 0x6e, 0x90, 0x22, 0x62, 0xe5, 0xd6, 0xc8, 0xb2, 0x7f, 0xd7, 0xb3, 0x93, 0x33, 0x67, 0x83, 0xf1, 0x1f, 0xd0, 0xa6, 0xaa, 0xd0, 0x8f,
0x95, 0x09, 0x43, 0xd7, 0x76, 0x46, 0x9f, 0x09, 0xdf, 0xcc, 0xc1, 0xe2, 0x71, 0xd0, 0xac, 0x36, 0x52, 0x24, 0xac, 0x7c, 0x77, 0x6f, 0xc2, 0xd8, 0xb5, 0x93, 0x33, 0xe7, 0xc2, 0xf7, 0x0a, 0xb0,
0x82, 0x62, 0xc4, 0xc1, 0x36, 0xf1, 0x7d, 0xc3, 0x22, 0x89, 0xbb, 0x23, 0xa7, 0xdd, 0x1d, 0x79, 0x7a, 0x11, 0x35, 0x6b, 0x8c, 0xa1, 0x9c, 0x70, 0x70, 0x40, 0xc3, 0xd0, 0x72, 0x68, 0xe6, 0xee,
0xf2, 0xbd, 0x3d, 0x81, 0x9c, 0x19, 0x12, 0x77, 0x05, 0x9c, 0x69, 0x7a, 0xf5, 0x38, 0x46, 0x7b, 0xa8, 0x8b, 0xee, 0x8e, 0x3a, 0x7d, 0x6f, 0x7f, 0x40, 0xc1, 0x8e, 0x89, 0xbb, 0x07, 0xce, 0x2c,
0xc3, 0x77, 0x68, 0xc2, 0x62, 0xf0, 0xc5, 0x05, 0x2c, 0x46, 0xb5, 0xf9, 0xfb, 0x2f, 0x36, 0x48, 0xbd, 0x66, 0x1a, 0x63, 0x3c, 0x12, 0x3b, 0x34, 0x65, 0x31, 0x7a, 0x71, 0x11, 0x8b, 0x49, 0x6d,
0xab, 0x01, 0x9a, 0xe5, 0x37, 0xad, 0x88, 0x86, 0xf9, 0x22, 0x4c, 0x13, 0xf7, 0x8f, 0xf1, 0xac, 0xf1, 0xfd, 0x19, 0x1b, 0x64, 0xd8, 0x80, 0xf3, 0xfc, 0x3e, 0x74, 0x11, 0x22, 0x76, 0x68, 0x96,
0xbd, 0x85, 0x7c, 0xbc, 0x82, 0xa8, 0x08, 0xb0, 0xe7, 0x90, 0x53, 0x97, 0x98, 0x8c, 0x60, 0x25, 0xf3, 0x07, 0xae, 0xb2, 0xfd, 0x18, 0x8a, 0xe9, 0xf6, 0x62, 0x19, 0xe0, 0xd8, 0xa3, 0x57, 0x3e,
0x83, 0x96, 0x40, 0xee, 0x52, 0xab, 0x75, 0x6a, 0xfb, 0xcc, 0x57, 0x04, 0x54, 0x82, 0x42, 0x97, 0xb5, 0x39, 0x25, 0x5a, 0x0e, 0xd7, 0x40, 0xed, 0x31, 0xa7, 0x7d, 0xe5, 0x86, 0x3c, 0xd4, 0x24,
0x5a, 0x3d, 0xca, 0xb6, 0xe8, 0xd0, 0xc1, 0x8a, 0x88, 0x10, 0x14, 0xc3, 0xa4, 0x4d, 0xea, 0xf4, 0xac, 0x40, 0xa9, 0xc7, 0x9c, 0x3e, 0xe3, 0xfb, 0x6c, 0xe4, 0x11, 0x4d, 0x46, 0x84, 0x72, 0x9c,
0x07, 0xb6, 0xc9, 0x14, 0x09, 0x29, 0x50, 0x68, 0x05, 0x8b, 0xbc, 0xd3, 0xef, 0xfb, 0x84, 0x29, 0xb4, 0xc5, 0xbc, 0xc1, 0xd0, 0xb5, 0xb9, 0xa6, 0xa0, 0x06, 0xa5, 0x76, 0xf4, 0x06, 0x0e, 0x07,
0x3f, 0xa5, 0xc6, 0x0f, 0x01, 0xe4, 0x66, 0x8c, 0x06, 0x6d, 0x40, 0x36, 0x3c, 0x44, 0x28, 0x6d, 0x83, 0x90, 0x72, 0xed, 0xbd, 0xd2, 0x7c, 0x27, 0x81, 0xda, 0x4a, 0xd1, 0xe0, 0x2e, 0xe4, 0xe3,
0x7b, 0x27, 0x57, 0xa6, 0x8c, 0xa6, 0xcd, 0x3b, 0x87, 0xa8, 0x07, 0xf2, 0xf8, 0x1a, 0xa1, 0xea, 0x1b, 0x86, 0x8b, 0x16, 0x7f, 0x7a, 0xa0, 0xaa, 0x38, 0x6b, 0x3e, 0x3c, 0xc3, 0x3e, 0xa8, 0x93,
0x4c, 0x8f, 0x53, 0x87, 0xaa, 0xfc, 0x3b, 0x16, 0x50, 0x0f, 0xf2, 0xf1, 0xc1, 0x40, 0xab, 0x29, 0x43, 0x86, 0xf5, 0xb9, 0x1e, 0x67, 0x6e, 0x5c, 0xf5, 0x63, 0x2c, 0x60, 0x1f, 0x8a, 0xe9, 0xad,
0x70, 0x92, 0xa7, 0xa9, 0xbc, 0x32, 0xcf, 0x81, 0xdf, 0x9a, 0x9a, 0x70, 0x4f, 0xd8, 0x6c, 0x7c, 0xc1, 0xcd, 0x05, 0x70, 0xb2, 0x57, 0xad, 0xba, 0xb1, 0xcc, 0x41, 0x9c, 0xa9, 0x2d, 0xe9, 0x27,
0x3d, 0xaf, 0x08, 0x67, 0xe7, 0x15, 0xe1, 0xfb, 0x79, 0x45, 0xf8, 0x7c, 0x51, 0xc9, 0x9c, 0x5d, 0x69, 0xaf, 0xf9, 0xfa, 0xa6, 0x26, 0x5d, 0xdf, 0xd4, 0xa4, 0xb7, 0x37, 0x35, 0xe9, 0xe5, 0x6d,
0x54, 0x32, 0xdf, 0x2e, 0x2a, 0x99, 0x57, 0xea, 0xbc, 0xdf, 0xdf, 0x77, 0x59, 0xfe, 0xef, 0xc1, 0x2d, 0x77, 0x7d, 0x5b, 0xcb, 0xbd, 0xb9, 0xad, 0xe5, 0xfe, 0xd3, 0x97, 0xfd, 0x74, 0x3f, 0xc9,
0xaf, 0x00, 0x00, 0x00, 0xff, 0xff, 0xd6, 0x8a, 0x01, 0xa8, 0xa2, 0x07, 0x00, 0x00, 0x8b, 0x7f, 0xbf, 0x7c, 0x08, 0x00, 0x00, 0xff, 0xff, 0x8b, 0x1f, 0x1a, 0x5a, 0xdd, 0x07, 0x00,
0x00,
} }
func (m *Log) Marshal() (dAtA []byte, err error) { func (m *Log) Marshal() (dAtA []byte, err error) {
@ -1604,6 +1613,20 @@ func (m *LogFullSyncRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) {
_ = i _ = i
var l int var l int
_ = l _ = l
if len(m.Records) > 0 {
for iNdEx := len(m.Records) - 1; iNdEx >= 0; iNdEx-- {
{
size, err := m.Records[iNdEx].MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintConsensus(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0x12
}
}
if len(m.Head) > 0 { if len(m.Head) > 0 {
i -= len(m.Head) i -= len(m.Head)
copy(dAtA[i:], m.Head) copy(dAtA[i:], m.Head)
@ -1951,6 +1974,12 @@ func (m *LogFullSyncRequest) Size() (n int) {
if l > 0 { if l > 0 {
n += 1 + l + sovConsensus(uint64(l)) n += 1 + l + sovConsensus(uint64(l))
} }
if len(m.Records) > 0 {
for _, e := range m.Records {
l = e.Size()
n += 1 + l + sovConsensus(uint64(l))
}
}
return n return n
} }
@ -3677,6 +3706,40 @@ func (m *LogFullSyncRequest) Unmarshal(dAtA []byte) error {
} }
m.Head = string(dAtA[iNdEx:postIndex]) m.Head = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex iNdEx = postIndex
case 2:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Records", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowConsensus
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthConsensus
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthConsensus
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Records = append(m.Records, &RawRecordWithId{})
if err := m.Records[len(m.Records)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
default: default:
iNdEx = preIndex iNdEx = preIndex
skippy, err := skipConsensus(dAtA[iNdEx:]) skippy, err := skipConsensus(dAtA[iNdEx:])

View file

@ -101,6 +101,7 @@ message LogHeadUpdate {
// LogFullSyncRequest is a message sent when consensus log needs full sync // LogFullSyncRequest is a message sent when consensus log needs full sync
message LogFullSyncRequest { message LogFullSyncRequest {
string head = 1; string head = 1;
repeated RawRecordWithId records = 2;
} }
// LogFullSyncResponse is a message sent as a response for a specific full sync // LogFullSyncResponse is a message sent as a response for a specific full sync