1
0
Fork 0
mirror of https://github.com/anyproto/any-sync.git synced 2025-06-10 18:10:54 +09:00

Add queues and stuff

This commit is contained in:
mcrakhman 2024-05-24 11:40:36 +02:00
parent f09842fe33
commit cdfc50eeb3
No known key found for this signature in database
GPG key ID: DED12CFEF5B8396B
6 changed files with 222 additions and 66 deletions

View file

@ -13,26 +13,24 @@ import (
)
type Request interface {
//heads []string
//changes []*treechangeproto.RawTreeChangeWithId
//root *treechangeproto.RawTreeChangeWithId
PeerId() string
ObjectId() string
}
type Response interface {
//heads []string
//changes []*treechangeproto.RawTreeChangeWithId
//root *treechangeproto.RawTreeChangeWithId
// heads []string
// changes []*treechangeproto.RawTreeChangeWithId
// root *treechangeproto.RawTreeChangeWithId
}
type RequestManager interface {
QueueRequest(peerId, objectId string, rq Request) error
HandleRequest(peerId, objectId string, rq Request) error
HandleStreamRequest(peerId, objectId string, rq Request, stream drpc.Stream) error
QueueRequest(rq Request) error
HandleStreamRequest(rq Request, stream drpc.Stream) error
}
type RequestHandler interface {
HandleRequest(peerId, objectId string, rq Request) (Request, error)
HandleStreamRequest(peerId, objectId string, rq Request, send func(resp proto.Message) error) (Request, error)
HandleRequest(rq Request) (Request, error)
HandleStreamRequest(rq Request, send func(resp proto.Message) error) (Request, error)
}
type StreamResponse struct {
@ -41,8 +39,8 @@ type StreamResponse struct {
}
type RequestSender interface {
SendRequest(peerId, objectId string, rq Request) (resp Response, err error)
SendStreamRequest(peerId, objectId string, rq Request, receive func(stream drpc.Stream) error) (err error)
SendRequest(rq Request) (resp Response, err error)
SendStreamRequest(rq Request, receive func(stream drpc.Stream) error) (err error)
}
type ResponseHandler interface {
@ -62,16 +60,16 @@ type requestManager struct {
wait chan struct{}
}
func (r *requestManager) QueueRequest(peerId, objectId string, rq Request) error {
return r.requestPool.QueueRequestAction(peerId, objectId, func() {
err := r.requestSender.SendStreamRequest(peerId, objectId, rq, func(stream drpc.Stream) error {
func (r *requestManager) QueueRequest(rq Request) error {
return r.requestPool.QueueRequestAction(rq.PeerId(), rq.ObjectId(), func() {
err := r.requestSender.SendStreamRequest(rq, func(stream drpc.Stream) error {
for {
resp := r.responseHandler.NewResponse()
err := stream.MsgRecv(resp, streampool.EncodingProto)
if err != nil {
return err
}
err = r.responseHandler.HandleResponse(peerId, objectId, resp)
err = r.responseHandler.HandleResponse(rq.PeerId(), rq.ObjectId(), resp)
if err != nil {
return err
}
@ -83,52 +81,19 @@ func (r *requestManager) QueueRequest(peerId, objectId string, rq Request) error
})
}
func (r *requestManager) HandleRequest(peerId, objectId string, rq Request) error {
id := fullId(peerId, objectId)
r.mx.Lock()
if _, ok := r.currentRequests[id]; ok {
r.mx.Unlock()
func (r *requestManager) HandleStreamRequest(rq Request, stream drpc.Stream) error {
if !r.requestPool.TryTake(rq.PeerId(), rq.ObjectId()) {
return nil
}
r.currentRequests[id] = struct{}{}
r.mx.Unlock()
defer func() {
r.mx.Lock()
delete(r.currentRequests, id)
r.mx.Unlock()
}()
newRq, err := r.requestHandler.HandleRequest(peerId, objectId, rq)
if err != nil {
return err
}
if newRq != nil {
return r.QueueRequest(peerId, objectId, newRq)
}
return nil
}
func (r *requestManager) HandleStreamRequest(peerId, objectId string, rq Request, stream drpc.Stream) error {
id := fullId(peerId, objectId)
r.mx.Lock()
if _, ok := r.currentRequests[id]; ok {
r.mx.Unlock()
return nil
}
r.currentRequests[id] = struct{}{}
r.mx.Unlock()
defer func() {
r.mx.Lock()
delete(r.currentRequests, id)
r.mx.Unlock()
}()
newRq, err := r.requestHandler.HandleStreamRequest(peerId, objectId, rq, func(resp proto.Message) error {
defer r.requestPool.Release(rq.PeerId(), rq.ObjectId())
newRq, err := r.requestHandler.HandleStreamRequest(rq, func(resp proto.Message) error {
return stream.MsgSend(resp, streampool.EncodingProto)
})
if err != nil {
return err
}
if newRq != nil {
return r.QueueRequest(peerId, objectId, newRq)
return r.QueueRequest(newRq)
}
return nil
}

View file

@ -1,5 +1,88 @@
package sync
import (
"sync"
)
type RequestPool interface {
TryTake(peerId, objectId string) bool
Release(peerId, objectId string)
QueueRequestAction(peerId, objectId string, action func()) (err error)
}
type requestPool struct {
mu sync.Mutex
taken map[string]struct{}
pools map[string]*tryAddQueue
isClosed bool
}
func NewRequestPool() RequestPool {
return &requestPool{
taken: make(map[string]struct{}),
pools: make(map[string]*tryAddQueue),
}
}
func (rp *requestPool) TryTake(peerId, objectId string) bool {
rp.mu.Lock()
defer rp.mu.Unlock()
if rp.isClosed {
return false
}
id := fullId(peerId, objectId)
if _, exists := rp.taken[id]; exists {
return false
}
rp.taken[id] = struct{}{}
return true
}
func (rp *requestPool) Release(peerId, objectId string) {
rp.mu.Lock()
defer rp.mu.Unlock()
id := fullId(peerId, objectId)
delete(rp.taken, id)
}
func (rp *requestPool) QueueRequestAction(peerId, objectId string, action func()) (err error) {
rp.mu.Lock()
if rp.isClosed {
rp.mu.Unlock()
return nil
}
var (
pool *tryAddQueue
exists bool
)
pool, exists = rp.pools[peerId]
if !exists {
pool = newTryAddQueue(100, 100)
rp.pools[peerId] = pool
pool.Run()
}
rp.mu.Unlock()
var wrappedAction func()
wrappedAction = func() {
if !rp.TryTake(peerId, objectId) {
pool.TryAdd(objectId, wrappedAction, func() {})
return
}
action()
rp.Release(peerId, objectId)
}
pool.Replace(objectId, wrappedAction, func() {})
rp.mu.Unlock()
return nil
}
func (rp *requestPool) Close() {
rp.mu.Lock()
defer rp.mu.Unlock()
rp.isClosed = true
for _, pool := range rp.pools {
_ = pool.Close()
}
}

View file

@ -48,12 +48,6 @@ func NewSyncService() SyncService {
}
func (s *syncService) handleOutgoingMessage(id string, msg drpc.Message, q *mb.MB[drpc.Message]) error {
//headUpdate := msg.(*HeadUpdate)
//cp := headUpdate.ShallowCopy()
//cp.SetPeerId(id)
//// TODO: add some merging/filtering logic if needed
//// for example we can filter empty messages for the same peer
//// or we can merge the messages together
return s.mergeFilter(s.ctx, msg, q)
}
@ -65,7 +59,7 @@ func (s *syncService) handleIncomingMessage(msg drpc.Message) {
if req == nil {
return
}
err = s.manager.QueueRequest("", "", req)
err = s.manager.QueueRequest(req)
if err != nil {
log.Error("failed to queue request", zap.Error(err))
}
@ -76,7 +70,11 @@ func (s *syncService) GetQueueProvider() multiqueue.QueueProvider[drpc.Message]
}
func (s *syncService) HandleMessage(ctx context.Context, peerId string, msg drpc.Message) error {
return s.receiveQueue.Add(ctx, peerId, msg.(*HeadUpdate))
return s.receiveQueue.Add(ctx, peerId, msg)
}
func (s *syncService) HandleStreamRequest(ctx context.Context, req Request, stream drpc.Stream) error {
return s.manager.HandleStreamRequest(req, stream)
}
func (s *syncService) NewReadMessage() drpc.Message {

View file

@ -0,0 +1,110 @@
package sync
import (
"context"
"sync"
"github.com/cheggaaa/mb/v3"
"go.uber.org/zap"
)
type entry struct {
call func()
onRemove func()
}
func newTryAddQueue(workers, maxSize int) *tryAddQueue {
ctx, cancel := context.WithCancel(context.Background())
ss := &tryAddQueue{
ctx: ctx,
cancel: cancel,
workers: workers,
batch: mb.New[string](maxSize),
entries: map[string]entry{},
}
return ss
}
type tryAddQueue struct {
ctx context.Context
cancel context.CancelFunc
workers int
entries map[string]entry
batch *mb.MB[string]
mx sync.Mutex
}
func (rp *tryAddQueue) Replace(id string, call, remove func()) {
rp.mx.Lock()
if prevEntry, ok := rp.entries[id]; ok {
rp.entries[id] = entry{
call: call,
onRemove: remove,
}
rp.mx.Unlock()
prevEntry.onRemove()
return
}
rp.entries[id] = entry{
call: call,
onRemove: remove,
}
rp.mx.Unlock()
err := rp.batch.TryAdd(id)
if err != nil {
rp.mx.Lock()
curEntry := rp.entries[id]
delete(rp.entries, id)
rp.mx.Unlock()
if curEntry.onRemove != nil {
curEntry.onRemove()
}
}
}
func (rp *tryAddQueue) TryAdd(id string, call, remove func()) bool {
rp.mx.Lock()
if _, ok := rp.entries[id]; ok {
rp.mx.Unlock()
return false
}
rp.entries[id] = entry{
call: call,
onRemove: remove,
}
rp.mx.Unlock()
if err := rp.batch.TryAdd(id); err != nil {
return false
}
return true
}
func (rp *tryAddQueue) Run() {
for i := 0; i < rp.workers; i++ {
go rp.sendLoop()
}
}
func (rp *tryAddQueue) sendLoop() {
for {
id, err := rp.batch.WaitOne(rp.ctx)
if err != nil {
log.Debug("close send loop", zap.Error(err))
return
}
rp.mx.Lock()
curEntry := rp.entries[id]
delete(rp.entries, id)
rp.mx.Unlock()
if curEntry.call != nil {
curEntry.call()
curEntry.onRemove()
}
}
}
func (rp *tryAddQueue) Close() (err error) {
rp.cancel()
return rp.batch.Close()
}