From b9793cf1cea9fc0d35591c1c287fe1178f6ebce6 Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Mon, 19 Feb 2024 19:29:01 +0100 Subject: [PATCH 1/3] Change to coordinator client --- commonspace/acl/aclclient/acjoiningclient.go | 76 ++------------------ commonspace/acl/aclclient/aclspaceclient.go | 76 +++++--------------- 2 files changed, 24 insertions(+), 128 deletions(-) diff --git a/commonspace/acl/aclclient/acjoiningclient.go b/commonspace/acl/aclclient/acjoiningclient.go index 7ec26f21..6405529b 100644 --- a/commonspace/acl/aclclient/acjoiningclient.go +++ b/commonspace/acl/aclclient/acjoiningclient.go @@ -4,17 +4,13 @@ import ( "context" "fmt" - "storj.io/drpc" - "github.com/anyproto/any-sync/accountservice" "github.com/anyproto/any-sync/app" "github.com/anyproto/any-sync/commonspace/object/accountdata" "github.com/anyproto/any-sync/commonspace/object/acl/list" "github.com/anyproto/any-sync/commonspace/object/acl/liststorage" - "github.com/anyproto/any-sync/commonspace/spacesyncproto" "github.com/anyproto/any-sync/consensus/consensusproto" - "github.com/anyproto/any-sync/net/pool" - "github.com/anyproto/any-sync/nodeconf" + "github.com/anyproto/any-sync/coordinator/coordinatorclient" ) const CName = "common.acl.aclclient" @@ -23,13 +19,11 @@ type AclJoiningClient interface { app.Component AclGetRecords(ctx context.Context, spaceId, aclHead string) ([]*consensusproto.RawRecordWithId, error) RequestJoin(ctx context.Context, spaceId string, payload list.RequestJoinPayload) error - SendRecord(ctx context.Context, spaceId string, rec *consensusproto.RawRecord) (res *spacesyncproto.AclAddRecordResponse, err error) } type aclJoiningClient struct { - nodeConf nodeconf.Service - pool pool.Pool - keys *accountdata.AccountKeys + coordinatorClient coordinatorclient.CoordinatorClient + keys *accountdata.AccountKeys } func NewAclJoiningClient() AclJoiningClient { @@ -41,34 +35,13 @@ func (c *aclJoiningClient) Name() (name string) { } func (c *aclJoiningClient) Init(a *app.App) (err error) { - c.pool = a.MustComponent(pool.CName).(pool.Pool) - c.nodeConf = a.MustComponent(nodeconf.CName).(nodeconf.Service) + c.coordinatorClient = a.MustComponent(coordinatorclient.CName).(coordinatorclient.CoordinatorClient) c.keys = a.MustComponent(accountservice.CName).(accountservice.Service).Account() return nil } func (c *aclJoiningClient) AclGetRecords(ctx context.Context, spaceId, aclHead string) (recs []*consensusproto.RawRecordWithId, err error) { - var res *spacesyncproto.AclGetRecordsResponse - err = c.doClient(ctx, aclHead, func(cl spacesyncproto.DRPCSpaceSyncClient) error { - var err error - res, err = cl.AclGetRecords(ctx, &spacesyncproto.AclGetRecordsRequest{ - SpaceId: spaceId, - AclHead: aclHead, - }) - return err - }) - if err != nil { - return - } - for _, rec := range res.Records { - rawRec := &consensusproto.RawRecordWithId{} - err = rawRec.Unmarshal(rec) - if err != nil { - return nil, err - } - recs = append(recs, rawRec) - } - return + return c.coordinatorClient.AclGetRecords(ctx, spaceId, aclHead) } func (c *aclJoiningClient) RequestJoin(ctx context.Context, spaceId string, payload list.RequestJoinPayload) (err error) { @@ -101,43 +74,6 @@ func (c *aclJoiningClient) RequestJoin(ctx context.Context, spaceId string, payl if err != nil { return } - data, err := rec.Marshal() - if err != nil { - return - } - return c.doClient(ctx, acl.Id(), func(cl spacesyncproto.DRPCSpaceSyncClient) error { - _, err = cl.AclAddRecord(ctx, &spacesyncproto.AclAddRecordRequest{ - SpaceId: spaceId, - Payload: data, - }) - return err - }) -} - -func (c *aclJoiningClient) SendRecord(ctx context.Context, spaceId string, rec *consensusproto.RawRecord) (res *spacesyncproto.AclAddRecordResponse, err error) { - marshalled, err := rec.Marshal() - if err != nil { - return - } - err = c.doClient(ctx, spaceId, func(cl spacesyncproto.DRPCSpaceSyncClient) error { - res, err = cl.AclAddRecord(ctx, &spacesyncproto.AclAddRecordRequest{ - SpaceId: spaceId, - Payload: marshalled, - }) - if err != nil { - return err - } - return nil - }) + _, err = c.coordinatorClient.AclAddRecord(ctx, spaceId, rec) return } - -func (c *aclJoiningClient) doClient(ctx context.Context, spaceId string, f func(cl spacesyncproto.DRPCSpaceSyncClient) error) error { - p, err := c.pool.GetOneOf(ctx, c.nodeConf.NodeIds(spaceId)) - if err != nil { - return err - } - return p.DoDrpc(ctx, func(conn drpc.Conn) error { - return f(spacesyncproto.NewDRPCSpaceSyncClient(conn)) - }) -} diff --git a/commonspace/acl/aclclient/aclspaceclient.go b/commonspace/acl/aclclient/aclspaceclient.go index 900e13d1..179f2412 100644 --- a/commonspace/acl/aclclient/aclspaceclient.go +++ b/commonspace/acl/aclclient/aclspaceclient.go @@ -4,16 +4,12 @@ import ( "context" "errors" - "storj.io/drpc" - "github.com/anyproto/any-sync/app" "github.com/anyproto/any-sync/commonspace/object/acl/list" "github.com/anyproto/any-sync/commonspace/object/acl/syncacl" "github.com/anyproto/any-sync/commonspace/spacestate" - "github.com/anyproto/any-sync/commonspace/spacesyncproto" "github.com/anyproto/any-sync/consensus/consensusproto" - "github.com/anyproto/any-sync/net/pool" - "github.com/anyproto/any-sync/nodeconf" + "github.com/anyproto/any-sync/coordinator/coordinatorclient" "github.com/anyproto/any-sync/util/crypto" ) @@ -47,15 +43,13 @@ func NewAclSpaceClient() AclSpaceClient { } type aclSpaceClient struct { - nodeConf nodeconf.Service - pool pool.Pool - acl list.AclList - spaceId string + coordinatorClient coordinatorclient.CoordinatorClient + acl list.AclList + spaceId string } func (c *aclSpaceClient) Init(a *app.App) (err error) { - c.pool = a.MustComponent(pool.CName).(pool.Pool) - c.nodeConf = a.MustComponent(nodeconf.CName).(nodeconf.Service) + c.coordinatorClient = a.MustComponent(coordinatorclient.CName).(coordinatorclient.CoordinatorClient) c.acl = a.MustComponent(syncacl.CName).(list.AclList) c.spaceId = a.MustComponent(spacestate.CName).(*spacestate.SpaceState).SpaceId return nil @@ -73,8 +67,7 @@ func (c *aclSpaceClient) RevokeInvite(ctx context.Context, inviteRecordId string return } c.acl.Unlock() - _, err = c.sendRecordAndUpdate(ctx, c.spaceId, res) - return err + return c.sendRecordAndUpdate(ctx, c.spaceId, res) } func (c *aclSpaceClient) RequestSelfRemove(ctx context.Context) (err error) { @@ -85,8 +78,7 @@ func (c *aclSpaceClient) RequestSelfRemove(ctx context.Context) (err error) { return } c.acl.Unlock() - _, err = c.sendRecordAndUpdate(ctx, c.spaceId, res) - return err + return c.sendRecordAndUpdate(ctx, c.spaceId, res) } func (c *aclSpaceClient) ChangePermissions(ctx context.Context, permChange list.PermissionChangesPayload) (err error) { @@ -97,8 +89,7 @@ func (c *aclSpaceClient) ChangePermissions(ctx context.Context, permChange list. return } c.acl.Unlock() - _, err = c.sendRecordAndUpdate(ctx, c.spaceId, res) - return err + return c.sendRecordAndUpdate(ctx, c.spaceId, res) } func (c *aclSpaceClient) AddAccounts(ctx context.Context, add list.AccountsAddPayload) (err error) { @@ -109,8 +100,7 @@ func (c *aclSpaceClient) AddAccounts(ctx context.Context, add list.AccountsAddPa return } c.acl.Unlock() - _, err = c.sendRecordAndUpdate(ctx, c.spaceId, res) - return err + return c.sendRecordAndUpdate(ctx, c.spaceId, res) } func (c *aclSpaceClient) RemoveAccounts(ctx context.Context, payload list.AccountRemovePayload) (err error) { @@ -121,8 +111,7 @@ func (c *aclSpaceClient) RemoveAccounts(ctx context.Context, payload list.Accoun return } c.acl.Unlock() - _, err = c.sendRecordAndUpdate(ctx, c.spaceId, res) - return err + return c.sendRecordAndUpdate(ctx, c.spaceId, res) } func (c *aclSpaceClient) DeclineRequest(ctx context.Context, identity crypto.PubKey) (err error) { @@ -138,8 +127,7 @@ func (c *aclSpaceClient) DeclineRequest(ctx context.Context, identity crypto.Pub return } c.acl.Unlock() - _, err = c.sendRecordAndUpdate(ctx, c.spaceId, res) - return err + return c.sendRecordAndUpdate(ctx, c.spaceId, res) } func (c *aclSpaceClient) CancelRequest(ctx context.Context) (err error) { @@ -155,8 +143,7 @@ func (c *aclSpaceClient) CancelRequest(ctx context.Context) (err error) { return } c.acl.Unlock() - _, err = c.sendRecordAndUpdate(ctx, c.spaceId, res) - return err + return c.sendRecordAndUpdate(ctx, c.spaceId, res) } func (c *aclSpaceClient) AcceptRequest(ctx context.Context, payload list.RequestAcceptPayload) (err error) { @@ -167,8 +154,7 @@ func (c *aclSpaceClient) AcceptRequest(ctx context.Context, payload list.Request return } c.acl.Unlock() - _, err = c.sendRecordAndUpdate(ctx, c.spaceId, res) - return err + return c.sendRecordAndUpdate(ctx, c.spaceId, res) } func (c *aclSpaceClient) GenerateInvite() (resp list.InviteResult, err error) { @@ -178,46 +164,20 @@ func (c *aclSpaceClient) GenerateInvite() (resp list.InviteResult, err error) { } func (c *aclSpaceClient) AddRecord(ctx context.Context, consRec *consensusproto.RawRecord) (err error) { - _, err = c.sendRecordAndUpdate(ctx, c.spaceId, consRec) - return + return c.sendRecordAndUpdate(ctx, c.spaceId, consRec) } -func (c *aclSpaceClient) sendRecordAndUpdate(ctx context.Context, spaceId string, rec *consensusproto.RawRecord) (res *spacesyncproto.AclAddRecordResponse, err error) { - marshalled, err := rec.Marshal() - if err != nil { - return - } - err = c.doClient(ctx, spaceId, func(cl spacesyncproto.DRPCSpaceSyncClient) error { - res, err = cl.AclAddRecord(ctx, &spacesyncproto.AclAddRecordRequest{ - SpaceId: spaceId, - Payload: marshalled, - }) - if err != nil { - return err - } - return nil - }) +func (c *aclSpaceClient) sendRecordAndUpdate(ctx context.Context, spaceId string, rec *consensusproto.RawRecord) (err error) { + res, err := c.coordinatorClient.AclAddRecord(ctx, spaceId, rec) if err != nil { return + } c.acl.Lock() defer c.acl.Unlock() - err = c.acl.AddRawRecord(&consensusproto.RawRecordWithId{ - Payload: res.Payload, - Id: res.RecordId, - }) + err = c.acl.AddRawRecord(res) if errors.Is(err, list.ErrRecordAlreadyExists) { err = nil } return } - -func (c *aclSpaceClient) doClient(ctx context.Context, spaceId string, f func(cl spacesyncproto.DRPCSpaceSyncClient) error) error { - p, err := c.pool.GetOneOf(ctx, c.nodeConf.NodeIds(spaceId)) - if err != nil { - return err - } - return p.DoDrpc(ctx, func(conn drpc.Conn) error { - return f(spacesyncproto.NewDRPCSpaceSyncClient(conn)) - }) -} From 961a28cc7a2dbad9486b38da4adbeb15b92bedfa Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Mon, 19 Feb 2024 19:30:23 +0100 Subject: [PATCH 2/3] Remove newline --- commonspace/acl/aclclient/aclspaceclient.go | 1 - 1 file changed, 1 deletion(-) diff --git a/commonspace/acl/aclclient/aclspaceclient.go b/commonspace/acl/aclclient/aclspaceclient.go index 179f2412..2efe5799 100644 --- a/commonspace/acl/aclclient/aclspaceclient.go +++ b/commonspace/acl/aclclient/aclspaceclient.go @@ -171,7 +171,6 @@ func (c *aclSpaceClient) sendRecordAndUpdate(ctx context.Context, spaceId string res, err := c.coordinatorClient.AclAddRecord(ctx, spaceId, rec) if err != nil { return - } c.acl.Lock() defer c.acl.Unlock() From d069531f6959872a9fa7243bc1836797f02b8b1b Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Mon, 19 Feb 2024 19:45:37 +0100 Subject: [PATCH 3/3] Add mock coordinator client --- commonspace/spaceutils_test.go | 68 ++++++++++++++++++++++++++++++++++ 1 file changed, 68 insertions(+) diff --git a/commonspace/spaceutils_test.go b/commonspace/spaceutils_test.go index 12cec096..dd044500 100644 --- a/commonspace/spaceutils_test.go +++ b/commonspace/spaceutils_test.go @@ -22,6 +22,10 @@ import ( "github.com/anyproto/any-sync/commonspace/spacestorage" "github.com/anyproto/any-sync/commonspace/spacesyncproto" "github.com/anyproto/any-sync/commonspace/syncstatus" + "github.com/anyproto/any-sync/consensus/consensusproto" + "github.com/anyproto/any-sync/coordinator/coordinatorclient" + "github.com/anyproto/any-sync/coordinator/coordinatorproto" + "github.com/anyproto/any-sync/identityrepo/identityrepoproto" "github.com/anyproto/any-sync/net/peer" "github.com/anyproto/any-sync/net/pool" "github.com/anyproto/any-sync/nodeconf" @@ -370,6 +374,69 @@ func (t *mockTreeManager) DeleteTree(ctx context.Context, spaceId, treeId string return nil } +type mockCoordinatorClient struct { +} + +func (m mockCoordinatorClient) SpaceDelete(ctx context.Context, spaceId string, conf *coordinatorproto.DeletionConfirmPayloadWithSignature) (err error) { + return +} + +func (m mockCoordinatorClient) AccountDelete(ctx context.Context, conf *coordinatorproto.DeletionConfirmPayloadWithSignature) (timestamp int64, err error) { + return +} + +func (m mockCoordinatorClient) AccountRevertDeletion(ctx context.Context) (err error) { + return +} + +func (m mockCoordinatorClient) StatusCheckMany(ctx context.Context, spaceIds []string) (statuses []*coordinatorproto.SpaceStatusPayload, err error) { + return +} + +func (m mockCoordinatorClient) StatusCheck(ctx context.Context, spaceId string) (status *coordinatorproto.SpaceStatusPayload, err error) { + return +} + +func (m mockCoordinatorClient) SpaceSign(ctx context.Context, payload coordinatorclient.SpaceSignPayload) (receipt *coordinatorproto.SpaceReceiptWithSignature, err error) { + return +} + +func (m mockCoordinatorClient) FileLimitCheck(ctx context.Context, spaceId string, identity []byte) (response *coordinatorproto.FileLimitCheckResponse, err error) { + return +} + +func (m mockCoordinatorClient) NetworkConfiguration(ctx context.Context, currentId string) (*coordinatorproto.NetworkConfigurationResponse, error) { + return nil, nil +} + +func (m mockCoordinatorClient) DeletionLog(ctx context.Context, lastRecordId string, limit int) (records []*coordinatorproto.DeletionLogRecord, err error) { + return +} + +func (m mockCoordinatorClient) IdentityRepoPut(ctx context.Context, identity string, data []*identityrepoproto.Data) (err error) { + return +} + +func (m mockCoordinatorClient) IdentityRepoGet(ctx context.Context, identities []string, kinds []string) (res []*identityrepoproto.DataWithIdentity, err error) { + return +} + +func (m mockCoordinatorClient) AclAddRecord(ctx context.Context, spaceId string, rec *consensusproto.RawRecord) (res *consensusproto.RawRecordWithId, err error) { + return +} + +func (m mockCoordinatorClient) AclGetRecords(ctx context.Context, spaceId, aclHead string) (res []*consensusproto.RawRecordWithId, err error) { + return +} + +func (m mockCoordinatorClient) Init(a *app.App) (err error) { + return +} + +func (m mockCoordinatorClient) Name() (name string) { + return coordinatorclient.CName +} + // // Space fixture // @@ -406,6 +473,7 @@ func newFixture(t *testing.T) *spaceFixture { Register(fx.config). Register(credentialprovider.NewNoOp()). Register(&mockStatusServiceProvider{}). + Register(mockCoordinatorClient{}). Register(fx.configurationService). Register(fx.storageProvider). Register(fx.peermanagerProvider).