1
0
Fork 0
mirror of https://github.com/anyproto/any-sync.git synced 2025-06-11 18:20:28 +09:00

Some streampool changes and incoming guard

This commit is contained in:
mcrakhman 2024-06-09 00:53:38 +02:00
parent a05700e0f6
commit bcfe02122c
No known key found for this signature in database
GPG key ID: DED12CFEF5B8396B
10 changed files with 74 additions and 131 deletions

30
commonspace/sync/guard.go Normal file
View file

@ -0,0 +1,30 @@
package sync
import "sync"
type guard struct {
mu sync.Mutex
taken map[string]struct{}
}
func newGuard() *guard {
return &guard{
taken: make(map[string]struct{}),
}
}
func (g *guard) TryTake(id string) bool {
g.mu.Lock()
defer g.mu.Unlock()
if _, exists := g.taken[id]; exists {
return false
}
g.taken[id] = struct{}{}
return true
}
func (g *guard) Release(id string) {
g.mu.Lock()
defer g.mu.Unlock()
delete(g.taken, id)
}

View file

@ -2,7 +2,6 @@ package sync
import (
"context"
"fmt"
"strings"
"github.com/gogo/protobuf/proto"
@ -24,33 +23,29 @@ type StreamResponse struct {
}
type requestManager struct {
requestPool RequestPool
handler syncdeps.SyncHandler
requestPool RequestPool
incomingGuard *guard
handler syncdeps.SyncHandler
}
func NewRequestManager(handler syncdeps.SyncHandler) RequestManager {
return &requestManager{
requestPool: NewRequestPool(),
handler: handler,
requestPool: NewRequestPool(),
handler: handler,
incomingGuard: newGuard(),
}
}
func (r *requestManager) QueueRequest(rq syncdeps.Request) error {
return r.requestPool.QueueRequestAction(rq.PeerId(), rq.ObjectId(), func(ctx context.Context) {
fmt.Println("starting stream request", rq.PeerId(), rq.ObjectId())
defer fmt.Println("ending stream request", rq.PeerId(), rq.ObjectId())
err := r.handler.SendStreamRequest(ctx, rq, func(stream drpc.Stream) error {
for {
resp := r.handler.NewResponse()
fmt.Println("receiving message", rq.PeerId(), rq.ObjectId())
err := stream.MsgRecv(resp, streampool.EncodingProto)
fmt.Println("received message", rq.PeerId(), rq.ObjectId(), err)
if err != nil {
return err
}
fmt.Println("handling response", rq.PeerId(), rq.ObjectId())
err = r.handler.HandleResponse(ctx, rq.PeerId(), rq.ObjectId(), resp)
fmt.Println("handled response", rq.PeerId(), rq.ObjectId(), err)
if err != nil {
return err
}
@ -63,13 +58,11 @@ func (r *requestManager) QueueRequest(rq syncdeps.Request) error {
}
func (r *requestManager) HandleStreamRequest(ctx context.Context, rq syncdeps.Request, stream drpc.Stream) error {
if !r.requestPool.TryTake(rq.PeerId(), rq.ObjectId()) {
if !r.incomingGuard.TryTake(fullId(rq.PeerId(), rq.ObjectId())) {
return nil
}
fmt.Println("handling stream request", rq.PeerId(), rq.ObjectId())
defer r.requestPool.Release(rq.PeerId(), rq.ObjectId())
defer r.incomingGuard.Release(fullId(rq.PeerId(), rq.ObjectId()))
newRq, err := r.handler.HandleStreamRequest(ctx, rq, func(resp proto.Message) error {
fmt.Println("sending response", rq.PeerId(), rq.ObjectId())
return stream.MsgSend(resp, streampool.EncodingProto)
})
if err != nil {

View file

@ -12,21 +12,21 @@ type RequestPool interface {
}
type requestPool struct {
mu sync.Mutex
taken map[string]struct{}
pools map[string]*tryAddQueue
ctx context.Context
cancel context.CancelFunc
isClosed bool
mu sync.Mutex
peerGuard *guard
pools map[string]*tryAddQueue
ctx context.Context
cancel context.CancelFunc
isClosed bool
}
func NewRequestPool() RequestPool {
ctx, cancel := context.WithCancel(context.Background())
return &requestPool{
ctx: ctx,
cancel: cancel,
taken: make(map[string]struct{}),
pools: make(map[string]*tryAddQueue),
ctx: ctx,
cancel: cancel,
pools: make(map[string]*tryAddQueue),
peerGuard: newGuard(),
}
}
@ -37,20 +37,11 @@ func (rp *requestPool) TryTake(peerId, objectId string) bool {
return false
}
id := fullId(peerId, objectId)
if _, exists := rp.taken[id]; exists {
return false
}
rp.taken[id] = struct{}{}
return true
return rp.peerGuard.TryTake(fullId(peerId, objectId))
}
func (rp *requestPool) Release(peerId, objectId string) {
rp.mu.Lock()
defer rp.mu.Unlock()
id := fullId(peerId, objectId)
delete(rp.taken, id)
rp.peerGuard.Release(fullId(peerId, objectId))
}
func (rp *requestPool) QueueRequestAction(peerId, objectId string, action func(ctx context.Context)) (err error) {

View file

@ -20,6 +20,8 @@ var log = logger.NewNamed("sync")
type SyncService interface {
app.Component
GetQueue(peerId string) *multiqueue.Queue[drpc.Message]
//SendMessage(ctx context.Context, peerId string, msg drpc.Message) error
//BroadcastMessage(ctx context.Context, msg drpc.Message) error
HandleMessage(ctx context.Context, peerId string, msg drpc.Message) error
HandleStreamRequest(ctx context.Context, req syncdeps.Request, stream drpc.Stream) error
QueueRequest(ctx context.Context, rq syncdeps.Request) error

View file

@ -2,7 +2,6 @@ package synctest
import (
"context"
"fmt"
"storj.io/drpc"
@ -16,14 +15,11 @@ type CounterRequestSender struct {
func (c *CounterRequestSender) SendStreamRequest(ctx context.Context, rq syncdeps.Request, receive func(stream drpc.Stream) error) (err error) {
peerId := rq.PeerId()
fmt.Println("getting peer", peerId, rq.PeerId(), rq.ObjectId())
pr, err := c.peerProvider.GetPeer(peerId)
if err != nil {
return err
}
fmt.Println("sending stream request with peer", pr.Id(), rq.PeerId(), rq.ObjectId())
return pr.DoDrpc(ctx, func(conn drpc.Conn) error {
fmt.Println("after connection", pr.Id(), rq.PeerId(), rq.ObjectId())
cl := synctestproto.NewDRPCCounterSyncClient(conn)
stream, err := cl.CounterStreamRequest(ctx, rq.Proto().(*synctestproto.CounterRequest))
if err != nil {