diff --git a/commonspace/spaceutils_test.go b/commonspace/spaceutils_test.go index 5008069c..086a2f7a 100644 --- a/commonspace/spaceutils_test.go +++ b/commonspace/spaceutils_test.go @@ -18,7 +18,6 @@ import ( "github.com/anyproto/any-sync/app/ocache" "github.com/anyproto/any-sync/commonspace/config" "github.com/anyproto/any-sync/commonspace/credentialprovider" - "github.com/anyproto/any-sync/commonspace/globalsync" "github.com/anyproto/any-sync/commonspace/object/accountdata" "github.com/anyproto/any-sync/commonspace/object/acl/list" "github.com/anyproto/any-sync/commonspace/object/tree/objecttree" @@ -44,6 +43,7 @@ import ( "github.com/anyproto/any-sync/nodeconf" "github.com/anyproto/any-sync/testutil/accounttest" "github.com/anyproto/any-sync/util/crypto" + "github.com/anyproto/any-sync/util/syncqueues" ) // @@ -692,7 +692,7 @@ func newFixtureWithData(t *testing.T, spaceId string, keys *accountdata.AccountK process: newSpaceProcess(spaceId), } fx.app.Register(fx.account). - Register(globalsync.New()). + Register(syncqueues.New()). Register(fx.config). Register(peerPool). Register(rpctest.NewTestServer()). diff --git a/commonspace/sync/requestmanager.go b/commonspace/sync/requestmanager.go index 8a69784d..4d079849 100644 --- a/commonspace/sync/requestmanager.go +++ b/commonspace/sync/requestmanager.go @@ -10,11 +10,11 @@ import ( "go.uber.org/zap" "storj.io/drpc" - "github.com/anyproto/any-sync/commonspace/globalsync" "github.com/anyproto/any-sync/commonspace/spacesyncproto" "github.com/anyproto/any-sync/commonspace/sync/syncdeps" "github.com/anyproto/any-sync/net/peer" "github.com/anyproto/any-sync/net/streampool" + syncqueues2 "github.com/anyproto/any-sync/util/syncqueues" ) type RequestManager interface { @@ -31,19 +31,19 @@ type StreamResponse struct { } type requestManager struct { - requestPool globalsync.RequestPool - incomingGuard *globalsync.Guard - limit *globalsync.Limit + requestPool syncqueues2.RequestPool + incomingGuard *syncqueues2.Guard + limit *syncqueues2.Limit handler syncdeps.SyncHandler metric syncdeps.QueueSizeUpdater } -func NewRequestManager(handler syncdeps.SyncHandler, metric syncdeps.QueueSizeUpdater, requestPool globalsync.RequestPool, limit *globalsync.Limit) RequestManager { +func NewRequestManager(handler syncdeps.SyncHandler, metric syncdeps.QueueSizeUpdater, requestPool syncqueues2.RequestPool, limit *syncqueues2.Limit) RequestManager { return &requestManager{ requestPool: requestPool, limit: limit, handler: handler, - incomingGuard: globalsync.NewGuard(), + incomingGuard: syncqueues2.NewGuard(), metric: metric, } } diff --git a/commonspace/sync/sync.go b/commonspace/sync/sync.go index ac0446e4..7bbe3c28 100644 --- a/commonspace/sync/sync.go +++ b/commonspace/sync/sync.go @@ -10,7 +10,6 @@ import ( "github.com/anyproto/any-sync/app" "github.com/anyproto/any-sync/app/logger" - "github.com/anyproto/any-sync/commonspace/globalsync" "github.com/anyproto/any-sync/commonspace/peermanager" "github.com/anyproto/any-sync/commonspace/spacestate" "github.com/anyproto/any-sync/commonspace/spacesyncproto" @@ -19,6 +18,7 @@ import ( "github.com/anyproto/any-sync/net/streampool" "github.com/anyproto/any-sync/nodeconf" "github.com/anyproto/any-sync/util/multiqueue" + "github.com/anyproto/any-sync/util/syncqueues" ) const CName = "common.commonspace.sync" @@ -77,8 +77,8 @@ func (s *syncService) Init(a *app.App) (err error) { s.peerManager = a.MustComponent(peermanager.CName).(peermanager.PeerManager) s.streamPool = a.MustComponent(streampool.CName).(streampool.StreamPool) s.commonMetric, _ = a.Component(metric.CName).(metric.Metric) - globalSync := a.MustComponent(globalsync.CName).(globalsync.GlobalSync) - s.manager = NewRequestManager(s.handler, s.metric, globalSync.RequestPool(s.spaceId), globalSync.Limit(s.spaceId)) + syncQueues := a.MustComponent(syncqueues.CName).(syncqueues.SyncQueues) + s.manager = NewRequestManager(s.handler, s.metric, syncQueues.RequestPool(s.spaceId), syncQueues.Limit(s.spaceId)) s.ctx, s.cancel = context.WithCancel(context.Background()) return nil } diff --git a/commonspace/globalsync/fullid.go b/util/syncqueues/fullid.go similarity index 86% rename from commonspace/globalsync/fullid.go rename to util/syncqueues/fullid.go index cb84e732..ec002817 100644 --- a/commonspace/globalsync/fullid.go +++ b/util/syncqueues/fullid.go @@ -1,4 +1,4 @@ -package globalsync +package syncqueues import "strings" diff --git a/commonspace/globalsync/guard.go b/util/syncqueues/guard.go similarity index 95% rename from commonspace/globalsync/guard.go rename to util/syncqueues/guard.go index 952c6c8f..aff79449 100644 --- a/commonspace/globalsync/guard.go +++ b/util/syncqueues/guard.go @@ -1,4 +1,4 @@ -package globalsync +package syncqueues import "sync" diff --git a/commonspace/globalsync/limit.go b/util/syncqueues/limit.go similarity index 99% rename from commonspace/globalsync/limit.go rename to util/syncqueues/limit.go index 2fb695f8..6770a72b 100644 --- a/commonspace/globalsync/limit.go +++ b/util/syncqueues/limit.go @@ -1,4 +1,4 @@ -package globalsync +package syncqueues import ( "fmt" diff --git a/commonspace/globalsync/limit_test.go b/util/syncqueues/limit_test.go similarity index 98% rename from commonspace/globalsync/limit_test.go rename to util/syncqueues/limit_test.go index bd2e6281..b53258df 100644 --- a/commonspace/globalsync/limit_test.go +++ b/util/syncqueues/limit_test.go @@ -1,4 +1,4 @@ -package globalsync +package syncqueues import ( "fmt" diff --git a/commonspace/globalsync/requestpool.go b/util/syncqueues/requestpool.go similarity index 99% rename from commonspace/globalsync/requestpool.go rename to util/syncqueues/requestpool.go index 3a5ad0cf..520e6cf8 100644 --- a/commonspace/globalsync/requestpool.go +++ b/util/syncqueues/requestpool.go @@ -1,4 +1,4 @@ -package globalsync +package syncqueues import ( "context" diff --git a/commonspace/globalsync/sync.go b/util/syncqueues/sync.go similarity index 61% rename from commonspace/globalsync/sync.go rename to util/syncqueues/sync.go index fc16247a..4eb24bed 100644 --- a/commonspace/globalsync/sync.go +++ b/util/syncqueues/sync.go @@ -1,4 +1,4 @@ -package globalsync +package syncqueues import ( "context" @@ -9,27 +9,27 @@ import ( "github.com/anyproto/any-sync/nodeconf" ) -const CName = "common.commonspace.globalsync" +const CName = "common.util.syncqueues" var log = logger.NewNamed(CName) -type GlobalSync interface { +type SyncQueues interface { app.ComponentRunnable RequestPool(spaceId string) RequestPool Limit(spaceId string) *Limit } -func New() GlobalSync { - return &globalSync{} +func New() SyncQueues { + return &syncQueues{} } -type globalSync struct { +type syncQueues struct { limit *Limit rp RequestPool nodeConf nodeconf.Service } -func (g *globalSync) Init(a *app.App) (err error) { +func (g *syncQueues) Init(a *app.App) (err error) { g.rp = NewRequestPool(time.Second*30, time.Minute) g.nodeConf = a.MustComponent(nodeconf.CName).(nodeconf.Service) var nodeIds []string @@ -40,24 +40,24 @@ func (g *globalSync) Init(a *app.App) (err error) { return } -func (g *globalSync) Run(ctx context.Context) (err error) { +func (g *syncQueues) Run(ctx context.Context) (err error) { g.rp.Run() return } -func (g *globalSync) Close(ctx context.Context) (err error) { +func (g *syncQueues) Close(ctx context.Context) (err error) { g.rp.Close() return } -func (g *globalSync) RequestPool(spaceId string) RequestPool { +func (g *syncQueues) RequestPool(spaceId string) RequestPool { return g.rp } -func (g *globalSync) Limit(spaceId string) *Limit { +func (g *syncQueues) Limit(spaceId string) *Limit { return g.limit } -func (g *globalSync) Name() string { +func (g *syncQueues) Name() string { return CName } diff --git a/commonspace/globalsync/tryaddqueue.go b/util/syncqueues/tryaddqueue.go similarity index 99% rename from commonspace/globalsync/tryaddqueue.go rename to util/syncqueues/tryaddqueue.go index dacd1295..c8332421 100644 --- a/commonspace/globalsync/tryaddqueue.go +++ b/util/syncqueues/tryaddqueue.go @@ -1,4 +1,4 @@ -package globalsync +package syncqueues import ( "context"