mirror of https://github.com/anyproto/any-sync.git synced 2025-06-11 10:18:08 +09:00

WIP test sync protocol

commit fb18c54702 (parent 20b64b3940)
Author: mcrakhman
Date: 2024-06-02 14:55:11 +02:00
Signed with GPG key ID DED12CFEF5B8396B (no known key found for this signature in database)
35 changed files with 849 additions and 152 deletions

View file

@ -1,10 +0,0 @@
package sync
type SyncDeps struct {
HeadUpdateHandler HeadUpdateHandler
HeadUpdateSender HeadUpdateSender
ResponseHandler ResponseHandler
RequestHandler RequestHandler
RequestSender RequestSender
MergeFilter MergeFilterFunc
}

View file

@ -10,6 +10,10 @@ import (
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
)
type BroadcastOptions struct {
EmptyPeers []string
}
type HeadUpdate struct {
peerId string
objectId string

View file

@ -1,12 +0,0 @@
package sync
import "context"
type BroadcastOptions struct {
EmptyPeers []string
}
type HeadUpdateSender interface {
SendHeadUpdate(ctx context.Context, peerId string, headUpdate *HeadUpdate) error
BroadcastHeadUpdate(ctx context.Context, opts BroadcastOptions, headUpdate *HeadUpdate) error
}

View file

@ -1,34 +1,20 @@
package sync
import (
"context"
"strings"
"github.com/gogo/protobuf/proto"
"go.uber.org/zap"
"storj.io/drpc"
"github.com/anyproto/any-sync/commonspace/sync/syncdeps"
"github.com/anyproto/any-sync/net/streampool"
)
type Request interface {
PeerId() string
ObjectId() string
}
type Response interface {
// heads []string
// changes []*treechangeproto.RawTreeChangeWithId
// root *treechangeproto.RawTreeChangeWithId
}
type RequestManager interface {
QueueRequest(rq Request) error
HandleStreamRequest(rq Request, stream drpc.Stream) error
}
type RequestHandler interface {
HandleRequest(rq Request) (Request, error)
HandleStreamRequest(rq Request, send func(resp proto.Message) error) (Request, error)
QueueRequest(rq syncdeps.Request) error
HandleStreamRequest(ctx context.Context, rq syncdeps.Request, stream drpc.Stream) error
}
type StreamResponse struct {
@ -36,24 +22,14 @@ type StreamResponse struct {
Connection drpc.Conn
}
type RequestSender interface {
SendRequest(rq Request) (resp Response, err error)
SendStreamRequest(rq Request, receive func(stream drpc.Stream) error) (err error)
}
type ResponseHandler interface {
NewResponse() Response
HandleResponse(peerId, objectId string, resp Response) error
}
type requestManager struct {
requestPool RequestPool
requestHandler RequestHandler
responseHandler ResponseHandler
requestSender RequestSender
requestHandler syncdeps.RequestHandler
responseHandler syncdeps.ResponseHandler
requestSender syncdeps.RequestSender
}
func NewRequestManager(deps SyncDeps) RequestManager {
func NewRequestManager(deps syncdeps.SyncDeps) RequestManager {
return &requestManager{
requestPool: NewRequestPool(),
requestHandler: deps.RequestHandler,
@ -62,16 +38,16 @@ func NewRequestManager(deps SyncDeps) RequestManager {
}
}
func (r *requestManager) QueueRequest(rq Request) error {
return r.requestPool.QueueRequestAction(rq.PeerId(), rq.ObjectId(), func() {
err := r.requestSender.SendStreamRequest(rq, func(stream drpc.Stream) error {
func (r *requestManager) QueueRequest(rq syncdeps.Request) error {
return r.requestPool.QueueRequestAction(rq.PeerId(), rq.ObjectId(), func(ctx context.Context) {
err := r.requestSender.SendStreamRequest(ctx, rq, func(stream drpc.Stream) error {
for {
resp := r.responseHandler.NewResponse()
err := stream.MsgRecv(resp, streampool.EncodingProto)
if err != nil {
return err
}
err = r.responseHandler.HandleResponse(rq.PeerId(), rq.ObjectId(), resp)
err = r.responseHandler.HandleResponse(ctx, rq.PeerId(), rq.ObjectId(), resp)
if err != nil {
return err
}
@ -83,12 +59,12 @@ func (r *requestManager) QueueRequest(rq Request) error {
})
}
func (r *requestManager) HandleStreamRequest(rq Request, stream drpc.Stream) error {
func (r *requestManager) HandleStreamRequest(ctx context.Context, rq syncdeps.Request, stream drpc.Stream) error {
if !r.requestPool.TryTake(rq.PeerId(), rq.ObjectId()) {
return nil
}
defer r.requestPool.Release(rq.PeerId(), rq.ObjectId())
newRq, err := r.requestHandler.HandleStreamRequest(rq, func(resp proto.Message) error {
newRq, err := r.requestHandler.HandleStreamRequest(ctx, rq, func(resp proto.Message) error {
return stream.MsgSend(resp, streampool.EncodingProto)
})
if err != nil {

View file

@ -1,26 +1,32 @@
package sync
import (
"context"
"sync"
)
type RequestPool interface {
TryTake(peerId, objectId string) bool
Release(peerId, objectId string)
QueueRequestAction(peerId, objectId string, action func()) (err error)
QueueRequestAction(peerId, objectId string, action func(ctx context.Context)) (err error)
}
type requestPool struct {
mu sync.Mutex
taken map[string]struct{}
pools map[string]*tryAddQueue
ctx context.Context
cancel context.CancelFunc
isClosed bool
}
func NewRequestPool() RequestPool {
ctx, cancel := context.WithCancel(context.Background())
return &requestPool{
taken: make(map[string]struct{}),
pools: make(map[string]*tryAddQueue),
ctx: ctx,
cancel: cancel,
taken: make(map[string]struct{}),
pools: make(map[string]*tryAddQueue),
}
}
@ -47,7 +53,7 @@ func (rp *requestPool) Release(peerId, objectId string) {
delete(rp.taken, id)
}
func (rp *requestPool) QueueRequestAction(peerId, objectId string, action func()) (err error) {
func (rp *requestPool) QueueRequestAction(peerId, objectId string, action func(ctx context.Context)) (err error) {
rp.mu.Lock()
if rp.isClosed {
rp.mu.Unlock()
@ -70,7 +76,7 @@ func (rp *requestPool) QueueRequestAction(peerId, objectId string, action func()
pool.TryAdd(objectId, wrappedAction, func() {})
return
}
action()
action(rp.ctx)
rp.Release(peerId, objectId)
}
pool.Replace(objectId, wrappedAction, func() {})
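
A rough usage sketch of the new context-aware signature (illustrative; in this diff the only caller is requestManager.QueueRequest, and the action receives the pool's internal context, which is cancelled when the pool closes):

pool := NewRequestPool()
err := pool.QueueRequestAction("peer1", "object1", func(ctx context.Context) {
    // ctx is supplied by the pool; stop working once it is cancelled.
    // Send the stream request for ("peer1", "object1") here and read responses until the stream ends.
})
// Actions are keyed by (peerId, objectId); the pool releases that slot after the action returns.
_ = err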

View file

@ -7,7 +7,9 @@ import (
"go.uber.org/zap"
"storj.io/drpc"
"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/app/logger"
"github.com/anyproto/any-sync/commonspace/sync/syncdeps"
"github.com/anyproto/any-sync/util/multiqueue"
)
@ -16,32 +18,43 @@ const CName = "common.commonspace.sync"
var log = logger.NewNamed("sync")
type SyncService interface {
GetQueueProvider() multiqueue.QueueProvider[drpc.Message]
app.Component
GetQueue(peerId string) *multiqueue.Queue[drpc.Message]
HandleMessage(ctx context.Context, peerId string, msg drpc.Message) error
HandleStreamRequest(ctx context.Context, req syncdeps.Request, stream drpc.Stream) error
QueueRequest(ctx context.Context, rq syncdeps.Request) error
}
type MergeFilterFunc func(ctx context.Context, msg drpc.Message, q *mb.MB[drpc.Message]) error
type syncService struct {
sendQueueProvider multiqueue.QueueProvider[drpc.Message]
receiveQueue multiqueue.MultiQueue[drpc.Message]
manager RequestManager
handler HeadUpdateHandler
sender HeadUpdateSender
mergeFilter MergeFilterFunc
handler syncdeps.HeadUpdateHandler
mergeFilter syncdeps.MergeFilterFunc
newMessage func() drpc.Message
ctx context.Context
cancel context.CancelFunc
}
func NewSyncService(deps SyncDeps) SyncService {
s := &syncService{}
s.ctx, s.cancel = context.WithCancel(context.Background())
func (s *syncService) Init(a *app.App) (err error) {
factory := a.MustComponent(syncdeps.CName).(syncdeps.SyncDepsFactory)
s.sendQueueProvider = multiqueue.NewQueueProvider[drpc.Message](100, s.handleOutgoingMessage)
s.receiveQueue = multiqueue.New[drpc.Message](s.handleIncomingMessage, 100)
s.sender = deps.HeadUpdateSender
deps := factory.SyncDeps()
s.handler = deps.HeadUpdateHandler
s.mergeFilter = deps.MergeFilter
s.newMessage = deps.ReadMessageConstructor
s.manager = NewRequestManager(deps)
return s
s.ctx, s.cancel = context.WithCancel(context.Background())
return nil
}
func (s *syncService) Name() (name string) {
return CName
}
func NewSyncService() SyncService {
return &syncService{}
}
func (s *syncService) handleOutgoingMessage(id string, msg drpc.Message, q *mb.MB[drpc.Message]) error {
@ -62,18 +75,23 @@ func (s *syncService) handleIncomingMessage(msg drpc.Message) {
}
}
func (s *syncService) GetQueueProvider() multiqueue.QueueProvider[drpc.Message] {
return s.sendQueueProvider
func (s *syncService) GetQueue(peerId string) *multiqueue.Queue[drpc.Message] {
queue := s.sendQueueProvider.GetQueue(peerId)
return queue
}
func (s *syncService) NewReadMessage() drpc.Message {
return s.newMessage()
}
func (s *syncService) HandleMessage(ctx context.Context, peerId string, msg drpc.Message) error {
return s.receiveQueue.Add(ctx, peerId, msg)
}
func (s *syncService) HandleStreamRequest(ctx context.Context, req Request, stream drpc.Stream) error {
return s.manager.HandleStreamRequest(req, stream)
func (s *syncService) QueueRequest(ctx context.Context, rq syncdeps.Request) error {
return s.manager.QueueRequest(rq)
}
func (s *syncService) NewReadMessage() drpc.Message {
return &HeadUpdate{}
func (s *syncService) HandleStreamRequest(ctx context.Context, req syncdeps.Request, stream drpc.Stream) error {
return s.manager.HandleStreamRequest(ctx, req, stream)
}
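
With its dependencies now resolved from a SyncDepsFactory component at Init time, wiring the service becomes app registration. A hedged sketch mirroring the test fixture added later in this diff (transport pieces such as the test server and RPC server are omitted; the stream pool looks up its StreamSyncDelegate under the same "common.commonspace.sync" name, so registering the sync service appears to satisfy that lookup too):

a := new(app.App)
a.Register(synctest.NewPeerProvider("first")).
    Register(synctest.NewCounter(1, 2)).
    Register(synctest.NewCounterSyncDepsFactory()). // serves syncdeps.SyncDeps under syncdeps.CName
    Register(synctest.NewCounterStreamOpener()).    // serves streampool.StreamOpener
    Register(streampool.NewStreamPool()).
    Register(NewSyncService())                      // resolves the factory in Init and builds the RequestManager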

View file

@ -11,6 +11,7 @@ import (
"github.com/anyproto/any-sync/commonspace/sync/synctest"
"github.com/anyproto/any-sync/commonspace/sync/synctestproto"
"github.com/anyproto/any-sync/net/rpc/rpctest"
"github.com/anyproto/any-sync/net/streampool"
)
var ctx = context.Background()
@ -25,6 +26,7 @@ func TestNewSyncService(t *testing.T) {
Register(rpctest.NewTestServer()).
Register(synctest.NewRpcServer()).
Register(synctest.NewPeerProvider("first"))
//Register(synctest.NewCounterStreamOpener())
secondApp.Register(connProvider).
Register(rpctest.NewTestServer()).
Register(synctest.NewRpcServer()).
@ -49,3 +51,28 @@ func TestNewSyncService(t *testing.T) {
})
require.NoError(t, err)
}
type counterFixture struct {
a *app.App
}
type counterFixtureParams struct {
connProvider *synctest.ConnProvider
start int32
delta int32
}
func newFixture(t *testing.T, peerId string, params counterFixtureParams) *counterFixture {
a := &app.App{}
a.Register(params.connProvider).
Register(rpctest.NewTestServer()).
Register(synctest.NewCounterStreamOpener()).
Register(synctest.NewPeerProvider(peerId)).
Register(synctest.NewCounter(params.start, params.delta)).
Register(streampool.NewStreamPool()).
Register(synctest.NewCounterSyncDepsFactory()).
Register(NewSyncService()).
//Register().
Register(synctest.NewRpcServer())
return nil
}

View file

@ -1,4 +1,4 @@
package sync
package syncdeps
import (
"context"

View file

@ -0,0 +1,9 @@
package syncdeps
import "github.com/gogo/protobuf/proto"
type Request interface {
PeerId() string
ObjectId() string
Proto() proto.Message
}
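
A minimal implementation sketch (the type and field names here are hypothetical; the concrete example in this diff is synctest.CounterRequest, which wraps its generated proto request the same way):

type objectRequest struct {
    peerId, objectId string
    payload          proto.Message // the request message sent over the stream
}

func (r objectRequest) PeerId() string       { return r.peerId }
func (r objectRequest) ObjectId() string     { return r.objectId }
func (r objectRequest) Proto() proto.Message { return r.payload }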

View file

@ -0,0 +1,11 @@
package syncdeps
import (
"context"
"github.com/gogo/protobuf/proto"
)
type RequestHandler interface {
HandleStreamRequest(ctx context.Context, rq Request, send func(resp proto.Message) error) (Request, error)
}

View file

@ -0,0 +1,11 @@
package syncdeps
import (
"context"
"storj.io/drpc"
)
type RequestSender interface {
SendStreamRequest(ctx context.Context, rq Request, receive func(stream drpc.Stream) error) (err error)
}

View file

@ -0,0 +1,7 @@
package syncdeps
type Response interface {
// heads []string
// changes []*treechangeproto.RawTreeChangeWithId
// root *treechangeproto.RawTreeChangeWithId
}

View file

@ -0,0 +1,8 @@
package syncdeps
import "context"
type ResponseHandler interface {
NewResponse() Response
HandleResponse(ctx context.Context, peerId, objectId string, resp Response) error
}

View file

@ -0,0 +1,19 @@
package syncdeps
import (
"context"
"github.com/cheggaaa/mb/v3"
"storj.io/drpc"
)
type MergeFilterFunc func(ctx context.Context, msg drpc.Message, q *mb.MB[drpc.Message]) error
type SyncDeps struct {
HeadUpdateHandler HeadUpdateHandler
ResponseHandler ResponseHandler
RequestHandler RequestHandler
RequestSender RequestSender
MergeFilter MergeFilterFunc
ReadMessageConstructor func() drpc.Message
}
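
Filled in by a consumer, the bundle looks roughly like the CounterSyncDepsFactory added later in this diff (the handler and sender values below are placeholders for whatever the space implementation provides):

deps := SyncDeps{
    HeadUpdateHandler: myHeadUpdateHandler, // reacts to incoming head updates, may return a follow-up Request
    ResponseHandler:   myResponseHandler,   // decodes stream responses and applies them
    RequestHandler:    myRequestHandler,    // serves incoming stream requests
    RequestSender:     myRequestSender,     // opens the outgoing request stream
    MergeFilter: func(ctx context.Context, msg drpc.Message, q *mb.MB[drpc.Message]) error {
        return nil // no-op filter, as in the test factory
    },
    ReadMessageConstructor: func() drpc.Message { return &myHeadUpdate{} },
}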

View file

@ -0,0 +1,10 @@
package syncdeps
import "github.com/anyproto/any-sync/app"
const CName = "common.sync.syncdeps"
type SyncDepsFactory interface {
app.Component
SyncDeps() SyncDeps
}

View file

@ -0,0 +1,92 @@
package synctest
import (
"sync"
"golang.org/x/exp/slices"
"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/util/slice"
)
const CounterName = "counter"
type Counter struct {
sync.Mutex
counters map[int32]struct{}
next, delta int32
maxVal int32
}
func (c *Counter) Init(a *app.App) (err error) {
return nil
}
func (c *Counter) Name() (name string) {
return CounterName
}
func (c *Counter) Generate() (ret int32) {
c.Lock()
defer c.Unlock()
ret = c.next
c.next += c.delta
c.counters[ret] = struct{}{}
if ret > c.maxVal {
c.maxVal = ret
}
return ret
}
func (c *Counter) CheckComplete() bool {
c.Lock()
defer c.Unlock()
return c.maxVal <= int32(len(c.counters))
}
func (c *Counter) Add(val int32) {
c.Lock()
defer c.Unlock()
if val > c.maxVal {
c.maxVal = val
}
c.counters[val] = struct{}{}
}
func (c *Counter) Dump() (ret []int32) {
c.Lock()
defer c.Unlock()
for val := range c.counters {
ret = append(ret, val)
}
slices.Sort(ret)
return
}
func (c *Counter) DiffCurrentNew(vals []int32) (toSend, toAsk []int32) {
c.Lock()
defer c.Unlock()
m := make(map[int32]struct{})
for _, val := range vals {
m[val] = struct{}{}
}
_, toSend, toAsk = slice.CompareMaps(m, c.counters)
return
}
func (c *Counter) KnownCounters() (ret []int32) {
c.Lock()
defer c.Unlock()
for val := range c.counters {
ret = append(ret, val)
}
return
}
func NewCounter(cur, delta int32) *Counter {
return &Counter{
counters: make(map[int32]struct{}),
next: cur,
delta: delta,
}
}
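
A small worked example of the completeness check (values are illustrative; with the odd/even start-and-delta pairing the test fixture uses, CheckComplete reduces to "every value up to the largest one seen is known"):

c1 := NewCounter(1, 2) // generates 1, 3, 5, ...
c2 := NewCounter(2, 2) // generates 2, 4, 6, ...
a := c1.Generate()     // 1
b := c2.Generate()     // 2
c2.CheckComplete()     // false: maxVal is 2 but only {2} is known
c2.Add(a)              // c2 now knows {1, 2}
c2.CheckComplete()     // true: maxVal (2) <= number of known values (2)
c1.Add(b)              // likewise keeps c1 complete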

View file

@ -0,0 +1,61 @@
package synctest
import (
"context"
"fmt"
"math/rand/v2"
"time"
"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/app/logger"
"github.com/anyproto/any-sync/commonspace/sync/syncdeps"
"github.com/anyproto/any-sync/commonspace/sync/synctestproto"
"github.com/anyproto/any-sync/net/streampool"
"github.com/anyproto/any-sync/util/periodicsync"
)
var log = logger.NewNamed(syncdeps.CName)
const CounterGeneratorName = "countergenerator"
type CounterGenerator struct {
counter *Counter
streamPool streampool.StreamPool
loop periodicsync.PeriodicSync
ownId string
}
func (c *CounterGenerator) Init(a *app.App) (err error) {
c.counter = a.MustComponent(CounterName).(*Counter)
c.ownId = a.MustComponent(PeerName).(*PeerProvider).myPeer
c.streamPool = a.MustComponent(streampool.CName).(streampool.StreamPool)
c.loop = periodicsync.NewPeriodicSyncDuration(time.Millisecond*100, time.Millisecond*100, c.update, log)
return
}
func (c *CounterGenerator) Name() (name string) {
return CounterGeneratorName
}
func (c *CounterGenerator) update(ctx context.Context) error {
res := c.counter.Generate()
randChoice := rand.Int()%2 == 0
if randChoice {
fmt.Println("Broadcast", res, "by", c.ownId)
return c.streamPool.Broadcast(ctx, &synctestproto.CounterIncrease{
Value: res,
ObjectId: "counter",
})
}
return nil
}
func (c *CounterGenerator) Run(ctx context.Context) (err error) {
c.loop.Run()
return nil
}
func (c *CounterGenerator) Close(ctx context.Context) (err error) {
c.loop.Close()
return nil
}

View file

@ -0,0 +1,34 @@
package synctest
import (
"github.com/gogo/protobuf/proto"
"github.com/anyproto/any-sync/commonspace/sync/synctestproto"
)
type CounterRequest struct {
peerId string
*synctestproto.CounterRequest
}
func (c CounterRequest) Proto() proto.Message {
return c.CounterRequest
}
func NewCounterRequest(peerId, objectId string, counters []int32) CounterRequest {
return CounterRequest{
peerId: peerId,
CounterRequest: &synctestproto.CounterRequest{
ExistingValues: counters,
ObjectId: objectId,
},
}
}
func (c CounterRequest) PeerId() string {
return c.peerId
}
func (c CounterRequest) ObjectId() string {
return c.CounterRequest.ObjectId
}

View file

@ -0,0 +1,29 @@
package synctest
import (
"context"
"github.com/gogo/protobuf/proto"
"github.com/anyproto/any-sync/commonspace/sync/syncdeps"
"github.com/anyproto/any-sync/commonspace/sync/synctestproto"
)
type CounterRequestHandler struct {
counter *Counter
}
func (c *CounterRequestHandler) HandleStreamRequest(ctx context.Context, rq syncdeps.Request, send func(resp proto.Message) error) (syncdeps.Request, error) {
counterRequest := rq.(*CounterRequest)
toSend, toAsk := c.counter.DiffCurrentNew(counterRequest.ExistingValues)
for _, value := range toSend {
_ = send(&synctestproto.CounterIncrease{
Value: value,
ObjectId: counterRequest.ObjectId(),
})
}
if len(toAsk) == 0 {
return nil, nil
}
return NewCounterRequest(counterRequest.PeerId(), counterRequest.ObjectId(), toAsk), nil
}

View file

@ -0,0 +1,30 @@
package synctest
import (
"context"
"storj.io/drpc"
"github.com/anyproto/any-sync/commonspace/sync/syncdeps"
"github.com/anyproto/any-sync/commonspace/sync/synctestproto"
)
type CounterRequestSender struct {
peerProvider *PeerProvider
}
func (c *CounterRequestSender) SendStreamRequest(ctx context.Context, rq syncdeps.Request, receive func(stream drpc.Stream) error) (err error) {
peerId := rq.PeerId()
pr, err := c.peerProvider.GetPeer(peerId)
if err != nil {
return err
}
return pr.DoDrpc(ctx, func(conn drpc.Conn) error {
cl := synctestproto.NewDRPCCounterSyncClient(conn)
stream, err := cl.CounterStreamRequest(ctx, rq.Proto().(*synctestproto.CounterRequest))
if err != nil {
return err
}
return receive(stream)
})
}

View file

@ -0,0 +1,22 @@
package synctest
import (
"context"
"github.com/anyproto/any-sync/commonspace/sync/syncdeps"
"github.com/anyproto/any-sync/commonspace/sync/synctestproto"
)
type CounterResponseHandler struct {
counter *Counter
}
func (c *CounterResponseHandler) NewResponse() syncdeps.Response {
return &synctestproto.CounterIncrease{}
}
func (c *CounterResponseHandler) HandleResponse(ctx context.Context, peerId, objectId string, resp syncdeps.Response) error {
counterResp := resp.(*synctestproto.CounterIncrease)
c.counter.Add(counterResp.Value)
return nil
}

View file

@ -0,0 +1,33 @@
package synctest
import (
"github.com/gogo/protobuf/proto"
"github.com/anyproto/any-sync/commonspace/sync/synctestproto"
)
type CounterUpdate struct {
counter int32
objectId string
}
func (c *CounterUpdate) message() proto.Message {
return &synctestproto.CounterIncrease{
Value: c.counter,
ObjectId: c.objectId,
}
}
func (c *CounterUpdate) SetProtoMessage(message proto.Message) error {
msg := message.(*synctestproto.CounterIncrease)
c.counter = msg.Value
c.objectId = msg.ObjectId
return nil
}
func (c *CounterUpdate) ProtoMessage() (proto.Message, error) {
if c.objectId == "" {
return &synctestproto.CounterIncrease{}, nil
}
return c.message(), nil
}

View file

@ -0,0 +1,27 @@
package synctest
import (
"context"
"storj.io/drpc"
"github.com/anyproto/any-sync/commonspace/sync/syncdeps"
"github.com/anyproto/any-sync/net/peer"
)
type CounterUpdateHandler struct {
counter *Counter
}
func (c *CounterUpdateHandler) HandleHeadUpdate(ctx context.Context, headUpdate drpc.Message) (syncdeps.Request, error) {
update := headUpdate.(CounterUpdate)
c.counter.Add(update.counter)
if c.counter.CheckComplete() {
return nil, nil
}
peerId, err := peer.CtxPeerId(ctx)
if err != nil {
return nil, err
}
return NewCounterRequest(peerId, update.objectId, c.counter.KnownCounters()), nil
}

View file

@ -1,10 +1,17 @@
package synctest
import (
"context"
"fmt"
"storj.io/drpc"
"github.com/anyproto/any-sync/commonspace/sync/syncdeps"
"github.com/anyproto/any-sync/net/peer"
"github.com/anyproto/any-sync/net/rpc/rpctest"
"github.com/anyproto/any-sync/net/rpc/server"
"github.com/anyproto/any-sync/net/streampool"
"github.com/anyproto/any-sync/util/multiqueue"
"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/commonspace/sync/synctestproto"
@ -12,7 +19,17 @@ import (
const RpcName = "rpcserver"
type SyncService interface {
app.Component
GetQueue(peerId string) *multiqueue.Queue[drpc.Message]
HandleMessage(ctx context.Context, peerId string, msg drpc.Message) error
HandleStreamRequest(ctx context.Context, req syncdeps.Request, stream drpc.Stream) error
QueueRequest(ctx context.Context, rq syncdeps.Request) error
}
type RpcServer struct {
streamPool streampool.StreamPool
syncService SyncService
}
func NewRpcServer() *RpcServer {
@ -20,16 +37,17 @@ func NewRpcServer() *RpcServer {
}
func (r *RpcServer) CounterStreamRequest(request *synctestproto.CounterRequest, stream synctestproto.DRPCCounterSync_CounterStreamRequestStream) error {
fmt.Println(request.ObjectId)
fmt.Println(peer.CtxPeerId(stream.Context()))
return nil
}
func (r *RpcServer) CounterStream(request *synctestproto.CounterRequest, stream synctestproto.DRPCCounterSync_CounterStreamStream) error {
func (r *RpcServer) CounterStream(stream synctestproto.DRPCCounterSync_CounterStreamStream) error {
return nil
}
func (r *RpcServer) Init(a *app.App) (err error) {
serv := a.MustComponent(server.CName).(*rpctest.TestServer)
r.streamPool = a.MustComponent(streampool.CName).(streampool.StreamPool)
return synctestproto.DRPCRegisterCounterSync(serv, r)
}

View file

@ -0,0 +1,39 @@
package synctest
import (
"context"
"storj.io/drpc"
"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
"github.com/anyproto/any-sync/net/peer"
"github.com/anyproto/any-sync/net/streampool"
)
func NewCounterStreamOpener() streampool.StreamOpener {
return &CounterStreamOpener{}
}
type CounterStreamOpener struct {
}
func (c *CounterStreamOpener) Init(a *app.App) (err error) {
return nil
}
func (c *CounterStreamOpener) Name() (name string) {
return streampool.StreamOpenerCName
}
func (c *CounterStreamOpener) OpenStream(ctx context.Context, p peer.Peer) (stream drpc.Stream, tags []string, err error) {
conn, err := p.AcquireDrpcConn(ctx)
if err != nil {
return
}
objectStream, err := spacesyncproto.NewDRPCSpaceSyncClient(conn).ObjectSyncStream(ctx)
if err != nil {
return
}
return objectStream, nil, nil
}

View file

@ -0,0 +1,48 @@
package synctest
import (
"context"
"github.com/cheggaaa/mb/v3"
"storj.io/drpc"
"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/commonspace/sync/syncdeps"
)
type CounterSyncDepsFactory struct {
syncDeps syncdeps.SyncDeps
}
func NewCounterSyncDepsFactory() syncdeps.SyncDepsFactory {
return &CounterSyncDepsFactory{}
}
func (c *CounterSyncDepsFactory) Init(a *app.App) (err error) {
counter := a.MustComponent(CounterName).(*Counter)
requestHandler := &CounterRequestHandler{counter: counter}
requestSender := &CounterRequestSender{peerProvider: a.MustComponent(PeerName).(*PeerProvider)}
responseHandler := &CounterResponseHandler{counter: counter}
updateHandler := &CounterUpdateHandler{counter: counter}
c.syncDeps = syncdeps.SyncDeps{
HeadUpdateHandler: updateHandler,
ResponseHandler: responseHandler,
RequestHandler: requestHandler,
RequestSender: requestSender,
MergeFilter: func(ctx context.Context, msg drpc.Message, q *mb.MB[drpc.Message]) error {
return nil
},
ReadMessageConstructor: func() drpc.Message {
return &CounterUpdate{}
},
}
return nil
}
func (c *CounterSyncDepsFactory) Name() (name string) {
return syncdeps.CName
}
func (c *CounterSyncDepsFactory) SyncDeps() syncdeps.SyncDeps {
return c.syncDeps
}

View file

@ -14,5 +14,5 @@ message CounterRequest {
service CounterSync {
rpc CounterStreamRequest(CounterRequest) returns (stream CounterIncrease);
rpc CounterStream(CounterRequest) returns (stream CounterIncrease);
rpc CounterStream(stream CounterIncrease) returns (stream CounterIncrease);
}

View file

@ -136,7 +136,7 @@ func init() {
}
var fileDescriptor_dd5c22b15d7f69e4 = []byte{
// 247 bytes of a gzipped FileDescriptorProto
// 254 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x32, 0x49, 0xce, 0xcf, 0xcd,
0xcd, 0xcf, 0x2b, 0x2e, 0x48, 0x4c, 0x4e, 0xd5, 0x2f, 0xae, 0xcc, 0x4b, 0x06, 0x13, 0x25, 0xa9,
0xc5, 0x25, 0x05, 0x45, 0xf9, 0x25, 0xf9, 0xfa, 0x60, 0xb2, 0x18, 0x2e, 0xa8, 0x07, 0xe6, 0x0b,
@ -146,13 +146,13 @@ var fileDescriptor_dd5c22b15d7f69e4 = []byte{
0x72, 0x89, 0x67, 0x8a, 0x04, 0x93, 0x02, 0xa3, 0x06, 0x67, 0x10, 0x9c, 0xaf, 0x14, 0xc2, 0xc5,
0x07, 0x35, 0x24, 0x28, 0xb5, 0xb0, 0x34, 0xb5, 0xb8, 0x44, 0x48, 0x8d, 0x8b, 0x2f, 0xb5, 0x22,
0xb3, 0xb8, 0x24, 0x33, 0x2f, 0x3d, 0x0c, 0xa4, 0xbd, 0x58, 0x82, 0x51, 0x81, 0x59, 0x83, 0x35,
0x08, 0x4d, 0x14, 0x9f, 0xa9, 0x46, 0x4b, 0x18, 0xb9, 0xb8, 0xa1, 0xc6, 0x06, 0x57, 0xe6, 0x25,
0x08, 0x4d, 0x14, 0x9f, 0xa9, 0x46, 0xcb, 0x19, 0xb9, 0xb8, 0xa1, 0xc6, 0x06, 0x57, 0xe6, 0x25,
0x0b, 0xf9, 0x72, 0x89, 0xc0, 0xb8, 0x25, 0x45, 0xa9, 0x89, 0xb9, 0x30, 0xbb, 0x24, 0xf4, 0xe0,
0xbe, 0x43, 0x75, 0x85, 0x94, 0x24, 0x86, 0x0c, 0xcc, 0x93, 0x06, 0x8c, 0x42, 0x6e, 0x5c, 0xbc,
0x28, 0xc6, 0x91, 0x69, 0x8e, 0x93, 0xc5, 0x89, 0x47, 0x72, 0x8c, 0x17, 0x1e, 0xc9, 0x31, 0x3e,
0x78, 0x24, 0xc7, 0x38, 0xe1, 0xb1, 0x1c, 0xc3, 0x85, 0xc7, 0x72, 0x0c, 0x37, 0x1e, 0xcb, 0x31,
0x44, 0xc9, 0xe1, 0x8f, 0x9b, 0x24, 0x36, 0x30, 0x65, 0x0c, 0x08, 0x00, 0x00, 0xff, 0xff, 0x3b,
0x05, 0x13, 0xdb, 0xc4, 0x01, 0x00, 0x00,
0xbe, 0x43, 0x75, 0x85, 0x94, 0x24, 0x86, 0x0c, 0xcc, 0x93, 0x06, 0x8c, 0x42, 0x9e, 0x5c, 0xbc,
0x28, 0xc6, 0x09, 0xe1, 0x56, 0x8d, 0xc7, 0x20, 0x0d, 0x46, 0x03, 0x46, 0x27, 0x8b, 0x13, 0x8f,
0xe4, 0x18, 0x2f, 0x3c, 0x92, 0x63, 0x7c, 0xf0, 0x48, 0x8e, 0x71, 0xc2, 0x63, 0x39, 0x86, 0x0b,
0x8f, 0xe5, 0x18, 0x6e, 0x3c, 0x96, 0x63, 0x88, 0x92, 0xc3, 0x1f, 0x3d, 0x49, 0x6c, 0x60, 0xca,
0x18, 0x10, 0x00, 0x00, 0xff, 0xff, 0x5b, 0xf3, 0x0f, 0x8d, 0xc7, 0x01, 0x00, 0x00,
}
func (m *CounterIncrease) Marshal() (dAtA []byte, err error) {

View file

@ -41,7 +41,7 @@ type DRPCCounterSyncClient interface {
DRPCConn() drpc.Conn
CounterStreamRequest(ctx context.Context, in *CounterRequest) (DRPCCounterSync_CounterStreamRequestClient, error)
CounterStream(ctx context.Context, in *CounterRequest) (DRPCCounterSync_CounterStreamClient, error)
CounterStream(ctx context.Context) (DRPCCounterSync_CounterStreamClient, error)
}
type drpcCounterSyncClient struct {
@ -94,23 +94,18 @@ func (x *drpcCounterSync_CounterStreamRequestClient) RecvMsg(m *CounterIncrease)
return x.MsgRecv(m, drpcEncoding_File_commonspace_sync_synctestproto_protos_synctest_proto{})
}
func (c *drpcCounterSyncClient) CounterStream(ctx context.Context, in *CounterRequest) (DRPCCounterSync_CounterStreamClient, error) {
func (c *drpcCounterSyncClient) CounterStream(ctx context.Context) (DRPCCounterSync_CounterStreamClient, error) {
stream, err := c.cc.NewStream(ctx, "/synctest.CounterSync/CounterStream", drpcEncoding_File_commonspace_sync_synctestproto_protos_synctest_proto{})
if err != nil {
return nil, err
}
x := &drpcCounterSync_CounterStreamClient{stream}
if err := x.MsgSend(in, drpcEncoding_File_commonspace_sync_synctestproto_protos_synctest_proto{}); err != nil {
return nil, err
}
if err := x.CloseSend(); err != nil {
return nil, err
}
return x, nil
}
type DRPCCounterSync_CounterStreamClient interface {
drpc.Stream
Send(*CounterIncrease) error
Recv() (*CounterIncrease, error)
}
@ -122,6 +117,10 @@ func (x *drpcCounterSync_CounterStreamClient) GetStream() drpc.Stream {
return x.Stream
}
func (x *drpcCounterSync_CounterStreamClient) Send(m *CounterIncrease) error {
return x.MsgSend(m, drpcEncoding_File_commonspace_sync_synctestproto_protos_synctest_proto{})
}
func (x *drpcCounterSync_CounterStreamClient) Recv() (*CounterIncrease, error) {
m := new(CounterIncrease)
if err := x.MsgRecv(m, drpcEncoding_File_commonspace_sync_synctestproto_protos_synctest_proto{}); err != nil {
@ -136,7 +135,7 @@ func (x *drpcCounterSync_CounterStreamClient) RecvMsg(m *CounterIncrease) error
type DRPCCounterSyncServer interface {
CounterStreamRequest(*CounterRequest, DRPCCounterSync_CounterStreamRequestStream) error
CounterStream(*CounterRequest, DRPCCounterSync_CounterStreamStream) error
CounterStream(DRPCCounterSync_CounterStreamStream) error
}
type DRPCCounterSyncUnimplementedServer struct{}
@ -145,7 +144,7 @@ func (s *DRPCCounterSyncUnimplementedServer) CounterStreamRequest(*CounterReques
return drpcerr.WithCode(errors.New("Unimplemented"), drpcerr.Unimplemented)
}
func (s *DRPCCounterSyncUnimplementedServer) CounterStream(*CounterRequest, DRPCCounterSync_CounterStreamStream) error {
func (s *DRPCCounterSyncUnimplementedServer) CounterStream(DRPCCounterSync_CounterStreamStream) error {
return drpcerr.WithCode(errors.New("Unimplemented"), drpcerr.Unimplemented)
}
@ -169,8 +168,7 @@ func (DRPCCounterSyncDescription) Method(n int) (string, drpc.Encoding, drpc.Rec
func(srv interface{}, ctx context.Context, in1, in2 interface{}) (drpc.Message, error) {
return nil, srv.(DRPCCounterSyncServer).
CounterStream(
in1.(*CounterRequest),
&drpcCounterSync_CounterStreamStream{in2.(drpc.Stream)},
&drpcCounterSync_CounterStreamStream{in1.(drpc.Stream)},
)
}, DRPCCounterSyncServer.CounterStream, true
default:
@ -198,6 +196,7 @@ func (x *drpcCounterSync_CounterStreamRequestStream) Send(m *CounterIncrease) er
type DRPCCounterSync_CounterStreamStream interface {
drpc.Stream
Send(*CounterIncrease) error
Recv() (*CounterIncrease, error)
}
type drpcCounterSync_CounterStreamStream struct {
@ -207,3 +206,15 @@ type drpcCounterSync_CounterStreamStream struct {
func (x *drpcCounterSync_CounterStreamStream) Send(m *CounterIncrease) error {
return x.MsgSend(m, drpcEncoding_File_commonspace_sync_synctestproto_protos_synctest_proto{})
}
func (x *drpcCounterSync_CounterStreamStream) Recv() (*CounterIncrease, error) {
m := new(CounterIncrease)
if err := x.MsgRecv(m, drpcEncoding_File_commonspace_sync_synctestproto_protos_synctest_proto{}); err != nil {
return nil, err
}
return m, nil
}
func (x *drpcCounterSync_CounterStreamStream) RecvMsg(m *CounterIncrease) error {
return x.MsgRecv(m, drpcEncoding_File_commonspace_sync_synctestproto_protos_synctest_proto{})
}

View file

@ -40,14 +40,14 @@ func (sr *stream) readLoop() error {
}()
sr.l.Debug("stream read started")
for {
msg := sr.pool.handler.NewReadMessage()
msg := sr.pool.syncDelegate.NewReadMessage()
if err := sr.stream.MsgRecv(msg, EncodingProto); err != nil {
sr.l.Info("msg receive error", zap.Error(err))
return err
}
ctx := streamCtx(sr.peerCtx, sr.streamId, sr.peerId)
ctx = logger.CtxWithFields(ctx, zap.String("peerId", sr.peerId))
if err := sr.pool.handler.HandleMessage(ctx, sr.peerId, msg); err != nil {
if err := sr.pool.syncDelegate.HandleMessage(ctx, sr.peerId, msg); err != nil {
sr.l.Info("msg handle error", zap.Error(err))
return err
}

View file

@ -9,22 +9,46 @@ import (
"golang.org/x/net/context"
"storj.io/drpc"
"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/app/debugstat"
"github.com/anyproto/any-sync/net"
"github.com/anyproto/any-sync/net/peer"
"github.com/anyproto/any-sync/util/multiqueue"
)
// StreamHandler handles incoming messages from streams
type StreamHandler interface {
// OpenStream opens stream with given peer
OpenStream(ctx context.Context, p peer.Peer) (stream drpc.Stream, tags []string, err error)
type configGetter interface {
GetConfig() StreamConfig
}
type StreamSyncDelegate interface {
app.Component
// HandleMessage handles incoming message
HandleMessage(ctx context.Context, peerId string, msg drpc.Message) (err error)
// NewReadMessage creates new empty message for unmarshalling into it
NewReadMessage() drpc.Message
// GetQueueProvider returns queue provider for outgoing messages
GetQueueProvider() multiqueue.QueueProvider[drpc.Message]
// GetQueue returns queue for outgoing messages
GetQueue(peerId string) *multiqueue.Queue[drpc.Message]
}
const (
StreamOpenerCName = "common.commonspace.streampool"
streamSyncDelegateCName = "common.commonspace.sync"
)
func NewStreamPool() StreamPool {
return &streamPool{
streamIdsByPeer: map[string][]uint32{},
streamIdsByTag: map[string][]uint32{},
streams: map[uint32]*stream{},
opening: map[string]*openingProcess{},
}
}
// StreamOpener opens outgoing streams with peers
type StreamOpener interface {
app.Component
// OpenStream opens stream with given peer
OpenStream(ctx context.Context, p peer.Peer) (stream drpc.Stream, tags []string, err error)
}
// PeerGetter should dial or return cached peers
@ -37,39 +61,54 @@ type MessageQueueId interface {
// StreamPool keeps and read streams
type StreamPool interface {
app.ComponentRunnable
// AddStream adds new outgoing stream into the pool
AddStream(stream drpc.Stream, tags ...string) (err error)
// ReadStream adds new incoming stream and synchronously read it
ReadStream(stream drpc.Stream, tags ...string) (err error)
// Send sends a message to given peers. A stream will be opened if it is not cached before. Works async.
Send(ctx context.Context, msg drpc.Message, target PeerGetter) (err error)
// SendById sends a message to given peerIds. Works only if stream exists
SendById(ctx context.Context, msg drpc.Message, peerIds ...string) (err error)
// Broadcast sends a message to all peers with given tags. Works async.
Broadcast(ctx context.Context, msg drpc.Message, tags ...string) (err error)
// AddTagsCtx adds tags to stream, stream will be extracted from ctx
AddTagsCtx(ctx context.Context, tags ...string) error
// RemoveTagsCtx removes tags from stream, stream will be extracted from ctx
RemoveTagsCtx(ctx context.Context, tags ...string) error
// Streams gets all streams for specific tags
Streams(tags ...string) (streams []drpc.Stream)
// Close closes all streams
Close() error
}
type streamPool struct {
handler StreamHandler
streamOpener StreamOpener
syncDelegate StreamSyncDelegate
statService debugstat.StatService
streamIdsByPeer map[string][]uint32
streamIdsByTag map[string][]uint32
streams map[uint32]*stream
opening map[string]*openingProcess
streamConfig StreamConfig
dial *ExecPool
mu sync.Mutex
writeQueueSize int
lastStreamId uint32
}
func (s *streamPool) Init(a *app.App) (err error) {
s.streamOpener = a.MustComponent(StreamOpenerCName).(StreamOpener)
s.syncDelegate = a.MustComponent(streamSyncDelegateCName).(StreamSyncDelegate)
comp, ok := a.Component(debugstat.CName).(debugstat.StatService)
if !ok {
comp = debugstat.NewNoOp()
}
s.statService = comp
s.streamConfig = a.MustComponent("config").(configGetter).GetConfig()
s.statService.AddProvider(s)
return nil
}
func (s *streamPool) Name() (name string) {
return CName
}
func (s *streamPool) Run(ctx context.Context) (err error) {
s.dial = NewExecPool(s.streamConfig.DialQueueWorkers, s.streamConfig.DialQueueSize)
return nil
}
func (s *streamPool) ProvideStat() any {
s.mu.Lock()
var totalSize int64
@ -166,7 +205,7 @@ func (s *streamPool) addStream(drpcStream drpc.Stream, tags ...string) (*stream,
tags: tags,
stats: newStreamStat(peerId),
}
st.queue = s.handler.GetQueueProvider().GetQueue(peerId)
st.queue = s.syncDelegate.GetQueue(peerId)
s.streams[streamId] = st
s.streamIdsByPeer[peerId] = append(s.streamIdsByPeer[peerId], streamId)
for _, tag := range tags {
@ -291,7 +330,7 @@ func (s *streamPool) openStream(ctx context.Context, p peer.Peer) *openingProces
// in case there was no peerId in context
ctx := peer.CtxWithPeerId(ctx, p.Id())
// open new stream and add to pool
st, tags, err := s.handler.OpenStream(ctx, p)
st, tags, err := s.streamOpener.OpenStream(ctx, p)
if err != nil {
op.err = err
return
@ -393,7 +432,7 @@ func (s *streamPool) removeStream(streamId uint32) {
st.l.Debug("stream removed", zap.Strings("tags", st.tags))
}
func (s *streamPool) Close() (err error) {
func (s *streamPool) Close(ctx context.Context) (err error) {
s.statService.RemoveProvider(s)
return s.dial.Close()
}

View file

@ -25,7 +25,7 @@ type StreamConfig struct {
}
type Service interface {
NewStreamPool(h StreamHandler, conf StreamConfig) StreamPool
NewStreamPool(h StreamOpener, conf StreamConfig) StreamPool
app.Component
}
@ -34,10 +34,10 @@ type service struct {
debugStat debugstat.StatService
}
func (s *service) NewStreamPool(h StreamHandler, conf StreamConfig) StreamPool {
func (s *service) NewStreamPool(h StreamOpener, conf StreamConfig) StreamPool {
pl := NewExecPool(conf.DialQueueWorkers, conf.DialQueueSize)
sp := &streamPool{
handler: h,
streamOpener: h,
writeQueueSize: conf.SendQueueSize,
streamIdsByPeer: map[string][]uint32{},
streamIdsByTag: map[string][]uint32{},

View file

@ -3,10 +3,12 @@ package periodicsync
import (
"context"
"github.com/anyproto/any-sync/app/logger"
"go.uber.org/zap"
"sync/atomic"
"time"
"go.uber.org/zap"
"github.com/anyproto/any-sync/app/logger"
)
type PeriodicSync interface {
@ -17,39 +19,40 @@ type PeriodicSync interface {
type SyncerFunc func(ctx context.Context) error
func NewPeriodicSync(periodSeconds int, timeout time.Duration, caller SyncerFunc, l logger.CtxLogger) PeriodicSync {
// TODO: rename to PeriodicCall (including folders) and do PRs in all repos where we are using this
// https://linear.app/anytype/issue/GO-1241/change-periodicsync-component-to-periodiccall
return NewPeriodicSyncDuration(time.Duration(periodSeconds)*time.Second, timeout, caller, l)
}
func NewPeriodicSyncDuration(periodicLoopInterval, timeout time.Duration, caller SyncerFunc, l logger.CtxLogger) PeriodicSync {
ctx, cancel := context.WithCancel(context.Background())
ctx = logger.CtxWithFields(ctx, zap.String("rootOp", "periodicCall"))
return &periodicCall{
caller: caller,
log: l,
loopCtx: ctx,
loopCancel: cancel,
loopDone: make(chan struct{}),
periodSeconds: periodSeconds,
timeout: timeout,
caller: caller,
log: l,
loopCtx: ctx,
loopCancel: cancel,
loopDone: make(chan struct{}),
period: periodicLoopInterval,
timeout: timeout,
}
}
type periodicCall struct {
log logger.CtxLogger
caller SyncerFunc
loopCtx context.Context
loopCancel context.CancelFunc
loopDone chan struct{}
periodSeconds int
timeout time.Duration
isRunning atomic.Bool
log logger.CtxLogger
caller SyncerFunc
loopCtx context.Context
loopCancel context.CancelFunc
loopDone chan struct{}
period time.Duration
timeout time.Duration
isRunning atomic.Bool
}
func (p *periodicCall) Run() {
p.isRunning.Store(true)
go p.loop(p.periodSeconds)
go p.loop(p.period)
}
func (p *periodicCall) loop(periodSeconds int) {
period := time.Duration(periodSeconds) * time.Second
func (p *periodicCall) loop(period time.Duration) {
defer close(p.loopDone)
doCall := func() {
ctx := p.loopCtx

View file

@ -136,3 +136,25 @@ func DiscardFromSlice[T any](elements []T, isDiscarded func(T) bool) []T {
elements = elements[:finishedIdx]
return elements
}
func CompareMaps[T comparable](map1, map2 map[T]struct{}) (both, first, second []T) {
both = []T{}
first = []T{}
second = []T{}
for key := range map1 {
if _, found := map2[key]; found {
both = append(both, key)
} else {
first = append(first, key)
}
}
for key := range map2 {
if _, found := map1[key]; !found {
second = append(second, key)
}
}
return both, first, second
}

util/slice/slice_test.go  Normal file (105 lines added)
View file

@ -0,0 +1,105 @@
package slice
import (
"testing"
"github.com/stretchr/testify/require"
"golang.org/x/exp/slices"
)
func TestCompareMaps(t *testing.T) {
tests := []struct {
name string
map1, map2 map[string]struct{}
expectedBoth []string
expectedFirst []string
expectedSecond []string
}{
{
name: "Both maps empty",
map1: map[string]struct{}{},
map2: map[string]struct{}{},
expectedBoth: []string{},
expectedFirst: []string{},
expectedSecond: []string{},
},
{
name: "Disjoint maps",
map1: map[string]struct{}{
"a": {},
"b": {},
},
map2: map[string]struct{}{
"c": {},
"d": {},
},
expectedBoth: []string{},
expectedFirst: []string{"a", "b"},
expectedSecond: []string{"c", "d"},
},
{
name: "Identical maps",
map1: map[string]struct{}{
"a": {},
"b": {},
},
map2: map[string]struct{}{
"a": {},
"b": {},
},
expectedBoth: []string{"a", "b"},
expectedFirst: []string{},
expectedSecond: []string{},
},
{
name: "Partial overlap",
map1: map[string]struct{}{
"a": {},
"b": {},
"c": {},
},
map2: map[string]struct{}{
"b": {},
"c": {},
"d": {},
},
expectedBoth: []string{"b", "c"},
expectedFirst: []string{"a"},
expectedSecond: []string{"d"},
},
{
name: "First map empty",
map1: map[string]struct{}{},
map2: map[string]struct{}{
"a": {},
"b": {},
},
expectedBoth: []string{},
expectedFirst: []string{},
expectedSecond: []string{"a", "b"},
},
{
name: "Second map empty",
map1: map[string]struct{}{
"a": {},
"b": {},
},
map2: map[string]struct{}{},
expectedBoth: []string{},
expectedFirst: []string{"a", "b"},
expectedSecond: []string{},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
both, onlyInFirst, onlyInSecond := CompareMaps(tt.map1, tt.map2)
slices.Sort(onlyInFirst)
slices.Sort(onlyInSecond)
slices.Sort(both)
require.Equal(t, tt.expectedBoth, both)
require.Equal(t, tt.expectedFirst, onlyInFirst)
require.Equal(t, tt.expectedSecond, onlyInSecond)
})
}
}