From acc78e708055be4f3a8b6eb47120b5f458f5d868 Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Sat, 15 Jun 2024 21:51:50 +0200 Subject: [PATCH] Add fixtures for testing --- commonspace/object/acl/list/acltestsuite.go | 35 +++- .../object/acl/liststorage/inmemory.go | 5 +- commonspace/objecttreebuilder/treebuilder.go | 14 +- commonspace/spaceservice.go | 12 +- commonspace/spacestate/shareddata.go | 3 +- commonspace/spaceutils_test.go | 186 +++++++++++++++++- commonspace/sync/objectsync/synchandler.go | 4 + commonspace/sync/sync.go | 17 +- commonspace/sync/syncdeps/message.go | 5 + commonspace/sync/synctest/config.go | 2 +- net/streampool/streampool.go | 4 +- testutil/accounttest/accountservice.go | 4 + 12 files changed, 257 insertions(+), 34 deletions(-) create mode 100644 commonspace/sync/syncdeps/message.go diff --git a/commonspace/object/acl/list/acltestsuite.go b/commonspace/object/acl/list/acltestsuite.go index fd1cfab2..f791f26a 100644 --- a/commonspace/object/acl/list/acltestsuite.go +++ b/commonspace/object/acl/list/acltestsuite.go @@ -27,6 +27,9 @@ type accountExpectedState struct { type AclTestExecutor struct { owner string spaceId string + ownerKeys *accountdata.AccountKeys + ownerMeta []byte + root *consensusproto.RawRecordWithId invites map[string]crypto.PrivKey actualAccounts map[string]*TestAclState expectedAccounts map[string]*accountExpectedState @@ -43,6 +46,19 @@ func NewAclExecutor(spaceId string) *AclTestExecutor { } } +func NewExternalKeysAclExecutor(spaceId string, keys *accountdata.AccountKeys, ownerMeta []byte, root *consensusproto.RawRecordWithId) *AclTestExecutor { + return &AclTestExecutor{ + spaceId: spaceId, + ownerKeys: keys, + root: root, + ownerMeta: ownerMeta, + invites: map[string]crypto.PrivKey{}, + actualAccounts: make(map[string]*TestAclState), + expectedAccounts: make(map[string]*accountExpectedState), + expectedPermissions: make(map[string][]accountExpectedState), + } +} + var ( errIncorrectParts = errors.New("incorrect parts") ) @@ -226,9 +242,20 @@ func (a *AclTestExecutor) Execute(cmd string) (err error) { return err } if len(a.expectedAccounts) == 0 { - acl, err := NewTestDerivedAclMetadata(a.spaceId, keys, []byte(account)) - if err != nil { - return err + meta := []byte(account) + var acl AclList + if a.ownerKeys == nil { + acl, err = NewTestDerivedAclMetadata(a.spaceId, keys, meta) + if err != nil { + return err + } + } else { + keys = a.ownerKeys + meta = a.ownerMeta + acl, err = NewTestAclWithRoot(keys, a.root) + if err != nil { + return err + } } state := &TestAclState{ Keys: keys, @@ -238,7 +265,7 @@ func (a *AclTestExecutor) Execute(cmd string) (err error) { a.expectedAccounts[account] = &accountExpectedState{ perms: AclPermissions(aclrecordproto.AclUserPermissions_Owner), status: StatusActive, - metadata: []byte(account), + metadata: meta, pseudoId: account, } a.owner = account diff --git a/commonspace/object/acl/liststorage/inmemory.go b/commonspace/object/acl/liststorage/inmemory.go index 276d4f78..1870ad48 100644 --- a/commonspace/object/acl/liststorage/inmemory.go +++ b/commonspace/object/acl/liststorage/inmemory.go @@ -3,10 +3,9 @@ package liststorage import ( "context" "fmt" + "sync" "github.com/anyproto/any-sync/consensus/consensusproto" - - "sync" ) type inMemoryAclListStorage struct { @@ -65,7 +64,7 @@ func (t *inMemoryAclListStorage) SetHead(head string) error { func (t *inMemoryAclListStorage) AddRawRecord(ctx context.Context, record *consensusproto.RawRecordWithId) error { t.Lock() defer t.Unlock() - // TODO: better to do deep copy + t.records[record.Id] = record return nil } diff --git a/commonspace/objecttreebuilder/treebuilder.go b/commonspace/objecttreebuilder/treebuilder.go index 09836725..a69af09f 100644 --- a/commonspace/objecttreebuilder/treebuilder.go +++ b/commonspace/objecttreebuilder/treebuilder.go @@ -18,11 +18,10 @@ import ( "github.com/anyproto/any-sync/commonspace/object/tree/synctree/updatelistener" "github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto" "github.com/anyproto/any-sync/commonspace/object/tree/treestorage" - "github.com/anyproto/any-sync/commonspace/objectsync" "github.com/anyproto/any-sync/commonspace/peermanager" - "github.com/anyproto/any-sync/commonspace/requestmanager" "github.com/anyproto/any-sync/commonspace/spacestate" "github.com/anyproto/any-sync/commonspace/spacestorage" + "github.com/anyproto/any-sync/commonspace/sync" "github.com/anyproto/any-sync/commonspace/syncstatus" "github.com/anyproto/any-sync/nodeconf" ) @@ -66,10 +65,9 @@ type treeBuilder struct { configuration nodeconf.NodeConf headsNotifiable synctree.HeadNotifiable peerManager peermanager.PeerManager - requestManager requestmanager.RequestManager spaceStorage spacestorage.SpaceStorage syncStatus syncstatus.StatusUpdater - objectSync objectsync.ObjectSync + syncService sync.SyncService log logger.CtxLogger builder objecttree.BuildObjectTreeFunc @@ -90,11 +88,9 @@ func (t *treeBuilder) Init(a *app.App) (err error) { t.configuration = a.MustComponent(nodeconf.CName).(nodeconf.NodeConf) t.headsNotifiable = a.MustComponent(headsync.CName).(headsync.HeadSync) t.syncStatus = a.MustComponent(syncstatus.CName).(syncstatus.StatusUpdater) - t.peerManager = a.MustComponent(peermanager.CName).(peermanager.PeerManager) - t.requestManager = a.MustComponent(requestmanager.CName).(requestmanager.RequestManager) - t.objectSync = a.MustComponent(objectsync.CName).(objectsync.ObjectSync) + t.syncService = a.MustComponent(sync.CName).(sync.SyncService) t.log = log.With(zap.String("spaceId", t.spaceId)) - t.syncClient = synctree.NewSyncClient(t.spaceId, t.requestManager, t.peerManager) + t.syncClient = synctree.NewSyncClient(t.spaceId, t.syncService) return nil } @@ -222,5 +218,5 @@ func (t *treeBuilder) PutTree(ctx context.Context, payload treestorage.TreeStora func (t *treeBuilder) onClose(id string) { t.treesUsed.Add(-1) log.Debug("decrementing counter", zap.String("id", id), zap.Int32("trees", t.treesUsed.Load()), zap.String("spaceId", t.spaceId)) - _ = t.objectSync.CloseThread(id) + _ = t.syncService.CloseReceiveQueue(id) } diff --git a/commonspace/spaceservice.go b/commonspace/spaceservice.go index 0b04f203..ee7e4a90 100644 --- a/commonspace/spaceservice.go +++ b/commonspace/spaceservice.go @@ -13,9 +13,11 @@ import ( "github.com/anyproto/any-sync/commonspace/deletionmanager" "github.com/anyproto/any-sync/commonspace/object/treesyncer" "github.com/anyproto/any-sync/commonspace/sync" + "github.com/anyproto/any-sync/commonspace/sync/objectsync" "github.com/anyproto/any-sync/net" "github.com/anyproto/any-sync/net/peer" "github.com/anyproto/any-sync/net/streampool" + "github.com/anyproto/any-sync/net/streampool/streamopener" "storj.io/drpc" @@ -40,7 +42,6 @@ import ( "github.com/anyproto/any-sync/commonspace/syncstatus" "github.com/anyproto/any-sync/consensus/consensusproto" "github.com/anyproto/any-sync/metric" - "github.com/anyproto/any-sync/net/pool" "github.com/anyproto/any-sync/net/rpc/rpcerr" "github.com/anyproto/any-sync/nodeconf" ) @@ -66,7 +67,8 @@ type SpaceService interface { } type Deps struct { - TreeSyncer treesyncer.TreeSyncer + TreeSyncer treesyncer.TreeSyncer + StreamOpener streamopener.StreamOpener } type spaceService struct { @@ -78,7 +80,6 @@ type spaceService struct { credentialProvider credentialprovider.CredentialProvider statusServiceProvider syncstatus.StatusServiceProvider treeManager treemanager.TreeManager - pool pool.Pool metric metric.Metric app *app.App } @@ -91,7 +92,6 @@ func (s *spaceService) Init(a *app.App) (err error) { s.treeManager = a.MustComponent(treemanager.CName).(treemanager.TreeManager) s.peerManagerProvider = a.MustComponent(peermanager.CName).(peermanager.PeerManagerProvider) s.statusServiceProvider = a.MustComponent(syncstatus.CName).(syncstatus.StatusServiceProvider) - s.pool = a.MustComponent(pool.CName).(pool.Pool) s.metric, _ = a.Component(metric.CName).(metric.Metric) s.app = a return nil @@ -180,8 +180,11 @@ func (s *spaceService) NewSpace(ctx context.Context, id string, deps Deps) (Spac spaceApp := s.app.ChildApp() spaceApp.Register(state). Register(peerManager). + Register(deps.StreamOpener). Register(streampool.NewStreamPool()). Register(newCommonStorage(st)). + Register(objectsync.New()). + Register(sync.NewSyncService()). Register(statusService). Register(syncacl.New()). Register(deletionstate.New()). @@ -190,7 +193,6 @@ func (s *spaceService) NewSpace(ctx context.Context, id string, deps Deps) (Spac Register(objectmanager.New(s.treeManager)). Register(deps.TreeSyncer). Register(objecttreebuilder.New()). - Register(sync.NewSyncService()). Register(aclclient.NewAclSpaceClient()). Register(headsync.New()) diff --git a/commonspace/spacestate/shareddata.go b/commonspace/spacestate/shareddata.go index 747dccda..440041c8 100644 --- a/commonspace/spacestate/shareddata.go +++ b/commonspace/spacestate/shareddata.go @@ -1,9 +1,10 @@ package spacestate import ( + "sync/atomic" + "github.com/anyproto/any-sync/app" "github.com/anyproto/any-sync/commonspace/object/tree/objecttree" - "sync/atomic" ) const CName = "common.commonspace.spacestate" diff --git a/commonspace/spaceutils_test.go b/commonspace/spaceutils_test.go index dd044500..6f9eecc2 100644 --- a/commonspace/spaceutils_test.go +++ b/commonspace/spaceutils_test.go @@ -3,17 +3,21 @@ package commonspace import ( "context" "fmt" + "sync" "testing" "time" "github.com/anyproto/go-chash" "github.com/stretchr/testify/require" + "storj.io/drpc" accountService "github.com/anyproto/any-sync/accountservice" "github.com/anyproto/any-sync/app" "github.com/anyproto/any-sync/app/ocache" "github.com/anyproto/any-sync/commonspace/config" "github.com/anyproto/any-sync/commonspace/credentialprovider" + "github.com/anyproto/any-sync/commonspace/object/accountdata" + "github.com/anyproto/any-sync/commonspace/object/acl/list" "github.com/anyproto/any-sync/commonspace/object/tree/objecttree" "github.com/anyproto/any-sync/commonspace/object/treemanager" "github.com/anyproto/any-sync/commonspace/object/treesyncer" @@ -21,6 +25,7 @@ import ( "github.com/anyproto/any-sync/commonspace/peermanager" "github.com/anyproto/any-sync/commonspace/spacestorage" "github.com/anyproto/any-sync/commonspace/spacesyncproto" + "github.com/anyproto/any-sync/commonspace/sync/synctest" "github.com/anyproto/any-sync/commonspace/syncstatus" "github.com/anyproto/any-sync/consensus/consensusproto" "github.com/anyproto/any-sync/coordinator/coordinatorclient" @@ -28,8 +33,12 @@ import ( "github.com/anyproto/any-sync/identityrepo/identityrepoproto" "github.com/anyproto/any-sync/net/peer" "github.com/anyproto/any-sync/net/pool" + "github.com/anyproto/any-sync/net/rpc/rpctest" + "github.com/anyproto/any-sync/net/streampool" + "github.com/anyproto/any-sync/net/streampool/streamopener" "github.com/anyproto/any-sync/nodeconf" "github.com/anyproto/any-sync/testutil/accounttest" + "github.com/anyproto/any-sync/util/crypto" ) // @@ -186,7 +195,7 @@ func (m *mockPeerManagerProvider) Name() (name string) { } func (m *mockPeerManagerProvider) NewPeerManager(ctx context.Context, spaceId string) (sm peermanager.PeerManager, err error) { - return &mockPeerManager{}, nil + return synctest.NewCounterPeerManager(), nil } // @@ -266,6 +275,14 @@ func (m *mockConfig) GetSpace() config.Config { } } +func (m *mockConfig) GetStreamConfig() streampool.StreamConfig { + return streampool.StreamConfig{ + SendQueueSize: 100, + DialQueueWorkers: 100, + DialQueueSize: 100, + } +} + // // Mock TreeManager // @@ -319,20 +336,26 @@ func (m mockTreeSyncer) SyncAll(ctx context.Context, peerId string, existing, mi type mockTreeManager struct { space Space + mx sync.Mutex cache ocache.OCache deletedIds []string markedIds []string + wait bool waitLoad chan struct{} } func (t *mockTreeManager) MarkTreeDeleted(ctx context.Context, spaceId, treeId string) error { + t.mx.Lock() + defer t.mx.Unlock() t.markedIds = append(t.markedIds, treeId) return nil } func (t *mockTreeManager) Init(a *app.App) (err error) { t.cache = ocache.New(func(ctx context.Context, id string) (value ocache.Object, err error) { - <-t.waitLoad + if t.wait { + <-t.waitLoad + } return t.space.TreeBuilder().BuildTree(ctx, id, objecttreebuilder.BuildTreeOpts{}) }, ocache.WithGCPeriod(time.Minute), @@ -437,6 +460,51 @@ func (m mockCoordinatorClient) Name() (name string) { return coordinatorclient.CName } +// +// Stream opener +// + +func newStreamOpener(spaceId string) streamopener.StreamOpener { + return &streamOpener{spaceId: spaceId} +} + +type streamOpener struct { + spaceId string +} + +func (c *streamOpener) Init(a *app.App) (err error) { + return nil +} + +func (c *streamOpener) Name() (name string) { + return streamopener.CName +} + +func (c *streamOpener) OpenStream(ctx context.Context, p peer.Peer) (stream drpc.Stream, tags []string, err error) { + conn, err := p.AcquireDrpcConn(ctx) + if err != nil { + return + } + objectStream, err := spacesyncproto.NewDRPCSpaceSyncClient(conn).ObjectSyncStream(ctx) + if err != nil { + return + } + var msg = &spacesyncproto.SpaceSubscription{ + SpaceIds: []string{c.spaceId}, + Action: spacesyncproto.SpaceSubscriptionAction_Subscribe, + } + payload, err := msg.Marshal() + if err != nil { + return + } + if err = objectStream.Send(&spacesyncproto.ObjectSyncMessage{ + Payload: payload, + }); err != nil { + return + } + return objectStream, nil, nil +} + // // Space fixture // @@ -447,7 +515,8 @@ type spaceFixture struct { account accountService.Service configurationService nodeconf.Service storageProvider spacestorage.SpaceStorageProvider - peermanagerProvider peermanager.PeerManagerProvider + peerManagerProvider peermanager.PeerManagerProvider + streamOpener streamopener.StreamOpener credentialProvider credentialprovider.CredentialProvider treeManager *mockTreeManager pool *mockPool @@ -464,7 +533,7 @@ func newFixture(t *testing.T) *spaceFixture { account: &accounttest.AccountTestService{}, configurationService: &mockConf{}, storageProvider: spacestorage.NewInMemorySpaceStorageProvider(), - peermanagerProvider: &mockPeerManagerProvider{}, + peerManagerProvider: &mockPeerManagerProvider{}, treeManager: &mockTreeManager{waitLoad: make(chan struct{})}, pool: &mockPool{}, spaceService: New(), @@ -476,9 +545,8 @@ func newFixture(t *testing.T) *spaceFixture { Register(mockCoordinatorClient{}). Register(fx.configurationService). Register(fx.storageProvider). - Register(fx.peermanagerProvider). + Register(fx.peerManagerProvider). Register(fx.treeManager). - Register(fx.pool). Register(fx.spaceService) err := fx.app.Start(ctx) if err != nil { @@ -487,3 +555,109 @@ func newFixture(t *testing.T) *spaceFixture { require.NoError(t, err) return fx } + +func newFixtureWithData(t *testing.T, spaceId string, keys *accountdata.AccountKeys, peerPool *synctest.PeerGlobalPool, provider *spacestorage.InMemorySpaceStorageProvider) *spaceFixture { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + fx := &spaceFixture{ + cancelFunc: cancel, + config: &mockConfig{}, + app: &app.App{}, + account: accounttest.NewWithAcc(keys), + configurationService: &mockConf{}, + storageProvider: provider, + streamOpener: newStreamOpener(spaceId), + peerManagerProvider: &mockPeerManagerProvider{}, + treeManager: &mockTreeManager{waitLoad: make(chan struct{})}, + pool: &mockPool{}, + spaceService: New(), + } + fx.app.Register(fx.account). + Register(fx.config). + Register(peerPool). + Register(rpctest.NewTestServer()). + Register(synctest.NewPeerProvider(keys.PeerId)). + Register(credentialprovider.NewNoOp()). + Register(&mockStatusServiceProvider{}). + Register(mockCoordinatorClient{}). + Register(fx.configurationService). + Register(fx.storageProvider). + Register(fx.peerManagerProvider). + Register(fx.treeManager). + Register(fx.spaceService) + err := fx.app.Start(ctx) + if err != nil { + fx.cancelFunc() + } + require.NoError(t, err) + return fx +} + +type multiPeerFixture struct { + peerFixtures []*spaceFixture +} + +func newMultiPeerFixture(t *testing.T, peerNum int) *multiPeerFixture { + keys, err := accountdata.NewRandom() + require.NoError(t, err) + masterKey, _, err := crypto.GenerateRandomEd25519KeyPair() + require.NoError(t, err) + metaKey, _, err := crypto.GenerateRandomEd25519KeyPair() + require.NoError(t, err) + readKey := crypto.NewAES() + meta := []byte("account") + payload := SpaceCreatePayload{ + SigningKey: keys.SignKey, + SpaceType: "space", + ReplicationKey: 10, + SpacePayload: nil, + MasterKey: masterKey, + ReadKey: readKey, + MetadataKey: metaKey, + Metadata: meta, + } + createSpace, err := storagePayloadForSpaceCreate(payload) + require.NoError(t, err) + executor := list.NewExternalKeysAclExecutor(createSpace.SpaceHeaderWithId.Id, keys, meta, createSpace.AclWithId) + cmds := []string{ + "0.init::0", + "0.invite::invId", + } + for i := 1; i < peerNum; i++ { + cmds = append(cmds, fmt.Sprintf("%d.join::invId", i)) + cmds = append(cmds, fmt.Sprintf("0.approve::%d,rw", i)) + } + for _, cmd := range cmds { + err := executor.Execute(cmd) + require.NoError(t, err, cmd) + } + var ( + allKeys []*accountdata.AccountKeys + allRecords []*consensusproto.RawRecordWithId + providers []*spacestorage.InMemorySpaceStorageProvider + peerIds []string + ) + allRecords, err = executor.ActualAccounts()["0"].Acl.RecordsAfter(context.Background(), "") + require.NoError(t, err) + for i := 0; i < peerNum; i++ { + allKeys = append(allKeys, executor.ActualAccounts()[fmt.Sprint(i)].Keys) + peerIds = append(peerIds, executor.ActualAccounts()[fmt.Sprint(i)].Keys.PeerId) + provider := spacestorage.NewInMemorySpaceStorageProvider().(*spacestorage.InMemorySpaceStorageProvider) + providers = append(providers, provider) + spaceStore, err := provider.CreateSpaceStorage(createSpace) + require.NoError(t, err) + for _, rec := range allRecords { + listStorage, err := spaceStore.AclStorage() + require.NoError(t, err) + err = listStorage.AddRawRecord(context.Background(), rec) + require.NoError(t, err) + } + } + peerPool := synctest.NewPeerGlobalPool(peerIds) + peerPool.MakePeers() + var peerFixtures []*spaceFixture + for i := 0; i < peerNum; i++ { + fx := newFixtureWithData(t, createSpace.SpaceHeaderWithId.Id, allKeys[i], peerPool, providers[i]) + peerFixtures = append(peerFixtures, fx) + } + return &multiPeerFixture{peerFixtures: peerFixtures} +} diff --git a/commonspace/sync/objectsync/synchandler.go b/commonspace/sync/objectsync/synchandler.go index c80a7ca1..42f7d59d 100644 --- a/commonspace/sync/objectsync/synchandler.go +++ b/commonspace/sync/objectsync/synchandler.go @@ -33,6 +33,10 @@ type peerIdSettable interface { SetPeerId(peerId string) } +func New() syncdeps.SyncHandler { + return &objectSync{} +} + func (o *objectSync) Init(a *app.App) (err error) { o.manager = a.MustComponent(treemanager.CName).(treemanager.TreeManager) return diff --git a/commonspace/sync/sync.go b/commonspace/sync/sync.go index 0f6be640..6e303382 100644 --- a/commonspace/sync/sync.go +++ b/commonspace/sync/sync.go @@ -20,12 +20,15 @@ const CName = "common.commonspace.sync" var log = logger.NewNamed("sync") +var ErrUnexpectedMessage = errors.New("unexpected message") + type SyncService interface { app.Component BroadcastMessage(ctx context.Context, msg drpc.Message) error HandleStreamRequest(ctx context.Context, req syncdeps.Request, stream drpc.Stream) error SendRequest(ctx context.Context, rq syncdeps.Request, collector syncdeps.ResponseCollector) error QueueRequest(ctx context.Context, rq syncdeps.Request) error + CloseReceiveQueue(id string) error } type syncService struct { @@ -116,13 +119,17 @@ func (s *syncService) NewReadMessage() drpc.Message { } func (s *syncService) HandleMessage(ctx context.Context, peerId string, msg drpc.Message) error { - // TODO: make this queue per object and add closing of the individual queues - err := s.receiveQueue.Add(ctx, peerId, msgCtx{ + idMsg, ok := msg.(syncdeps.Message) + if !ok { + return ErrUnexpectedMessage + } + objectId := idMsg.ObjectId() + err := s.receiveQueue.Add(ctx, objectId, msgCtx{ ctx: ctx, Message: msg, }) if errors.Is(err, mb.ErrOverflowed) { - log.Info("queue overflowed", zap.String("peerId", peerId)) + log.Info("queue overflowed", zap.String("objectId", objectId)) return nil } return err @@ -139,3 +146,7 @@ func (s *syncService) SendRequest(ctx context.Context, rq syncdeps.Request, coll func (s *syncService) HandleStreamRequest(ctx context.Context, req syncdeps.Request, stream drpc.Stream) error { return s.manager.HandleStreamRequest(ctx, req, stream) } + +func (s *syncService) CloseReceiveQueue(id string) error { + return s.receiveQueue.CloseThread(id) +} diff --git a/commonspace/sync/syncdeps/message.go b/commonspace/sync/syncdeps/message.go new file mode 100644 index 00000000..0c3200f4 --- /dev/null +++ b/commonspace/sync/syncdeps/message.go @@ -0,0 +1,5 @@ +package syncdeps + +type Message interface { + ObjectId() string +} diff --git a/commonspace/sync/synctest/config.go b/commonspace/sync/synctest/config.go index 72fd8664..b73ad3da 100644 --- a/commonspace/sync/synctest/config.go +++ b/commonspace/sync/synctest/config.go @@ -20,7 +20,7 @@ func (c *Config) Name() (name string) { return "config" } -func (c *Config) GetConfig() streampool.StreamConfig { +func (c *Config) GetStreamConfig() streampool.StreamConfig { return streampool.StreamConfig{ SendQueueSize: 100, DialQueueWorkers: 100, diff --git a/net/streampool/streampool.go b/net/streampool/streampool.go index 648aa635..323cd7cc 100644 --- a/net/streampool/streampool.go +++ b/net/streampool/streampool.go @@ -19,7 +19,7 @@ import ( ) type configGetter interface { - GetConfig() StreamConfig + GetStreamConfig() StreamConfig } type StreamSyncDelegate interface { @@ -87,7 +87,7 @@ func (s *streamPool) Init(a *app.App) (err error) { comp = debugstat.NewNoOp() } s.statService = comp - s.streamConfig = a.MustComponent("config").(configGetter).GetConfig() + s.streamConfig = a.MustComponent("config").(configGetter).GetStreamConfig() s.statService.AddProvider(s) return nil } diff --git a/testutil/accounttest/accountservice.go b/testutil/accounttest/accountservice.go index c5caa4cd..ce88dc5b 100644 --- a/testutil/accounttest/accountservice.go +++ b/testutil/accounttest/accountservice.go @@ -12,6 +12,10 @@ type AccountTestService struct { acc *accountdata.AccountKeys } +func NewWithAcc(acc *accountdata.AccountKeys) *AccountTestService { + return &AccountTestService{acc: acc} +} + func (s *AccountTestService) Init(a *app.App) (err error) { if s.acc != nil { return