1
0
Fork 0
mirror of https://github.com/anyproto/any-sync.git synced 2025-06-10 01:51:11 +09:00

Merge pull request #166 from anyproto/GO-2937-acl-send-coordinator

GO-2937: Change to coordinator client
This commit is contained in:
Mikhail Rakhmanov 2024-02-19 19:49:28 +01:00 committed by GitHub
commit 3f4a4fe20d
Signed by: github
GPG key ID: B5690EEEBB952194
3 changed files with 91 additions and 128 deletions

View file

@ -4,17 +4,13 @@ import (
"context"
"fmt"
"storj.io/drpc"
"github.com/anyproto/any-sync/accountservice"
"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/commonspace/object/accountdata"
"github.com/anyproto/any-sync/commonspace/object/acl/list"
"github.com/anyproto/any-sync/commonspace/object/acl/liststorage"
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
"github.com/anyproto/any-sync/consensus/consensusproto"
"github.com/anyproto/any-sync/net/pool"
"github.com/anyproto/any-sync/nodeconf"
"github.com/anyproto/any-sync/coordinator/coordinatorclient"
)
const CName = "common.acl.aclclient"
@ -23,12 +19,10 @@ type AclJoiningClient interface {
app.Component
AclGetRecords(ctx context.Context, spaceId, aclHead string) ([]*consensusproto.RawRecordWithId, error)
RequestJoin(ctx context.Context, spaceId string, payload list.RequestJoinPayload) error
SendRecord(ctx context.Context, spaceId string, rec *consensusproto.RawRecord) (res *spacesyncproto.AclAddRecordResponse, err error)
}
type aclJoiningClient struct {
nodeConf nodeconf.Service
pool pool.Pool
coordinatorClient coordinatorclient.CoordinatorClient
keys *accountdata.AccountKeys
}
@ -41,34 +35,13 @@ func (c *aclJoiningClient) Name() (name string) {
}
func (c *aclJoiningClient) Init(a *app.App) (err error) {
c.pool = a.MustComponent(pool.CName).(pool.Pool)
c.nodeConf = a.MustComponent(nodeconf.CName).(nodeconf.Service)
c.coordinatorClient = a.MustComponent(coordinatorclient.CName).(coordinatorclient.CoordinatorClient)
c.keys = a.MustComponent(accountservice.CName).(accountservice.Service).Account()
return nil
}
func (c *aclJoiningClient) AclGetRecords(ctx context.Context, spaceId, aclHead string) (recs []*consensusproto.RawRecordWithId, err error) {
var res *spacesyncproto.AclGetRecordsResponse
err = c.doClient(ctx, aclHead, func(cl spacesyncproto.DRPCSpaceSyncClient) error {
var err error
res, err = cl.AclGetRecords(ctx, &spacesyncproto.AclGetRecordsRequest{
SpaceId: spaceId,
AclHead: aclHead,
})
return err
})
if err != nil {
return
}
for _, rec := range res.Records {
rawRec := &consensusproto.RawRecordWithId{}
err = rawRec.Unmarshal(rec)
if err != nil {
return nil, err
}
recs = append(recs, rawRec)
}
return
return c.coordinatorClient.AclGetRecords(ctx, spaceId, aclHead)
}
func (c *aclJoiningClient) RequestJoin(ctx context.Context, spaceId string, payload list.RequestJoinPayload) (err error) {
@ -101,43 +74,6 @@ func (c *aclJoiningClient) RequestJoin(ctx context.Context, spaceId string, payl
if err != nil {
return
}
data, err := rec.Marshal()
if err != nil {
return
}
return c.doClient(ctx, acl.Id(), func(cl spacesyncproto.DRPCSpaceSyncClient) error {
_, err = cl.AclAddRecord(ctx, &spacesyncproto.AclAddRecordRequest{
SpaceId: spaceId,
Payload: data,
})
return err
})
}
func (c *aclJoiningClient) SendRecord(ctx context.Context, spaceId string, rec *consensusproto.RawRecord) (res *spacesyncproto.AclAddRecordResponse, err error) {
marshalled, err := rec.Marshal()
if err != nil {
return
}
err = c.doClient(ctx, spaceId, func(cl spacesyncproto.DRPCSpaceSyncClient) error {
res, err = cl.AclAddRecord(ctx, &spacesyncproto.AclAddRecordRequest{
SpaceId: spaceId,
Payload: marshalled,
})
if err != nil {
return err
}
return nil
})
_, err = c.coordinatorClient.AclAddRecord(ctx, spaceId, rec)
return
}
func (c *aclJoiningClient) doClient(ctx context.Context, spaceId string, f func(cl spacesyncproto.DRPCSpaceSyncClient) error) error {
p, err := c.pool.GetOneOf(ctx, c.nodeConf.NodeIds(spaceId))
if err != nil {
return err
}
return p.DoDrpc(ctx, func(conn drpc.Conn) error {
return f(spacesyncproto.NewDRPCSpaceSyncClient(conn))
})
}

View file

@ -4,16 +4,12 @@ import (
"context"
"errors"
"storj.io/drpc"
"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/commonspace/object/acl/list"
"github.com/anyproto/any-sync/commonspace/object/acl/syncacl"
"github.com/anyproto/any-sync/commonspace/spacestate"
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
"github.com/anyproto/any-sync/consensus/consensusproto"
"github.com/anyproto/any-sync/net/pool"
"github.com/anyproto/any-sync/nodeconf"
"github.com/anyproto/any-sync/coordinator/coordinatorclient"
"github.com/anyproto/any-sync/util/crypto"
)
@ -47,15 +43,13 @@ func NewAclSpaceClient() AclSpaceClient {
}
type aclSpaceClient struct {
nodeConf nodeconf.Service
pool pool.Pool
coordinatorClient coordinatorclient.CoordinatorClient
acl list.AclList
spaceId string
}
func (c *aclSpaceClient) Init(a *app.App) (err error) {
c.pool = a.MustComponent(pool.CName).(pool.Pool)
c.nodeConf = a.MustComponent(nodeconf.CName).(nodeconf.Service)
c.coordinatorClient = a.MustComponent(coordinatorclient.CName).(coordinatorclient.CoordinatorClient)
c.acl = a.MustComponent(syncacl.CName).(list.AclList)
c.spaceId = a.MustComponent(spacestate.CName).(*spacestate.SpaceState).SpaceId
return nil
@ -73,8 +67,7 @@ func (c *aclSpaceClient) RevokeInvite(ctx context.Context, inviteRecordId string
return
}
c.acl.Unlock()
_, err = c.sendRecordAndUpdate(ctx, c.spaceId, res)
return err
return c.sendRecordAndUpdate(ctx, c.spaceId, res)
}
func (c *aclSpaceClient) RequestSelfRemove(ctx context.Context) (err error) {
@ -85,8 +78,7 @@ func (c *aclSpaceClient) RequestSelfRemove(ctx context.Context) (err error) {
return
}
c.acl.Unlock()
_, err = c.sendRecordAndUpdate(ctx, c.spaceId, res)
return err
return c.sendRecordAndUpdate(ctx, c.spaceId, res)
}
func (c *aclSpaceClient) ChangePermissions(ctx context.Context, permChange list.PermissionChangesPayload) (err error) {
@ -97,8 +89,7 @@ func (c *aclSpaceClient) ChangePermissions(ctx context.Context, permChange list.
return
}
c.acl.Unlock()
_, err = c.sendRecordAndUpdate(ctx, c.spaceId, res)
return err
return c.sendRecordAndUpdate(ctx, c.spaceId, res)
}
func (c *aclSpaceClient) AddAccounts(ctx context.Context, add list.AccountsAddPayload) (err error) {
@ -109,8 +100,7 @@ func (c *aclSpaceClient) AddAccounts(ctx context.Context, add list.AccountsAddPa
return
}
c.acl.Unlock()
_, err = c.sendRecordAndUpdate(ctx, c.spaceId, res)
return err
return c.sendRecordAndUpdate(ctx, c.spaceId, res)
}
func (c *aclSpaceClient) RemoveAccounts(ctx context.Context, payload list.AccountRemovePayload) (err error) {
@ -121,8 +111,7 @@ func (c *aclSpaceClient) RemoveAccounts(ctx context.Context, payload list.Accoun
return
}
c.acl.Unlock()
_, err = c.sendRecordAndUpdate(ctx, c.spaceId, res)
return err
return c.sendRecordAndUpdate(ctx, c.spaceId, res)
}
func (c *aclSpaceClient) DeclineRequest(ctx context.Context, identity crypto.PubKey) (err error) {
@ -138,8 +127,7 @@ func (c *aclSpaceClient) DeclineRequest(ctx context.Context, identity crypto.Pub
return
}
c.acl.Unlock()
_, err = c.sendRecordAndUpdate(ctx, c.spaceId, res)
return err
return c.sendRecordAndUpdate(ctx, c.spaceId, res)
}
func (c *aclSpaceClient) CancelRequest(ctx context.Context) (err error) {
@ -155,8 +143,7 @@ func (c *aclSpaceClient) CancelRequest(ctx context.Context) (err error) {
return
}
c.acl.Unlock()
_, err = c.sendRecordAndUpdate(ctx, c.spaceId, res)
return err
return c.sendRecordAndUpdate(ctx, c.spaceId, res)
}
func (c *aclSpaceClient) AcceptRequest(ctx context.Context, payload list.RequestAcceptPayload) (err error) {
@ -167,8 +154,7 @@ func (c *aclSpaceClient) AcceptRequest(ctx context.Context, payload list.Request
return
}
c.acl.Unlock()
_, err = c.sendRecordAndUpdate(ctx, c.spaceId, res)
return err
return c.sendRecordAndUpdate(ctx, c.spaceId, res)
}
func (c *aclSpaceClient) GenerateInvite() (resp list.InviteResult, err error) {
@ -178,46 +164,19 @@ func (c *aclSpaceClient) GenerateInvite() (resp list.InviteResult, err error) {
}
func (c *aclSpaceClient) AddRecord(ctx context.Context, consRec *consensusproto.RawRecord) (err error) {
_, err = c.sendRecordAndUpdate(ctx, c.spaceId, consRec)
return
return c.sendRecordAndUpdate(ctx, c.spaceId, consRec)
}
func (c *aclSpaceClient) sendRecordAndUpdate(ctx context.Context, spaceId string, rec *consensusproto.RawRecord) (res *spacesyncproto.AclAddRecordResponse, err error) {
marshalled, err := rec.Marshal()
if err != nil {
return
}
err = c.doClient(ctx, spaceId, func(cl spacesyncproto.DRPCSpaceSyncClient) error {
res, err = cl.AclAddRecord(ctx, &spacesyncproto.AclAddRecordRequest{
SpaceId: spaceId,
Payload: marshalled,
})
if err != nil {
return err
}
return nil
})
func (c *aclSpaceClient) sendRecordAndUpdate(ctx context.Context, spaceId string, rec *consensusproto.RawRecord) (err error) {
res, err := c.coordinatorClient.AclAddRecord(ctx, spaceId, rec)
if err != nil {
return
}
c.acl.Lock()
defer c.acl.Unlock()
err = c.acl.AddRawRecord(&consensusproto.RawRecordWithId{
Payload: res.Payload,
Id: res.RecordId,
})
err = c.acl.AddRawRecord(res)
if errors.Is(err, list.ErrRecordAlreadyExists) {
err = nil
}
return
}
func (c *aclSpaceClient) doClient(ctx context.Context, spaceId string, f func(cl spacesyncproto.DRPCSpaceSyncClient) error) error {
p, err := c.pool.GetOneOf(ctx, c.nodeConf.NodeIds(spaceId))
if err != nil {
return err
}
return p.DoDrpc(ctx, func(conn drpc.Conn) error {
return f(spacesyncproto.NewDRPCSpaceSyncClient(conn))
})
}

View file

@ -22,6 +22,10 @@ import (
"github.com/anyproto/any-sync/commonspace/spacestorage"
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
"github.com/anyproto/any-sync/commonspace/syncstatus"
"github.com/anyproto/any-sync/consensus/consensusproto"
"github.com/anyproto/any-sync/coordinator/coordinatorclient"
"github.com/anyproto/any-sync/coordinator/coordinatorproto"
"github.com/anyproto/any-sync/identityrepo/identityrepoproto"
"github.com/anyproto/any-sync/net/peer"
"github.com/anyproto/any-sync/net/pool"
"github.com/anyproto/any-sync/nodeconf"
@ -370,6 +374,69 @@ func (t *mockTreeManager) DeleteTree(ctx context.Context, spaceId, treeId string
return nil
}
type mockCoordinatorClient struct {
}
func (m mockCoordinatorClient) SpaceDelete(ctx context.Context, spaceId string, conf *coordinatorproto.DeletionConfirmPayloadWithSignature) (err error) {
return
}
func (m mockCoordinatorClient) AccountDelete(ctx context.Context, conf *coordinatorproto.DeletionConfirmPayloadWithSignature) (timestamp int64, err error) {
return
}
func (m mockCoordinatorClient) AccountRevertDeletion(ctx context.Context) (err error) {
return
}
func (m mockCoordinatorClient) StatusCheckMany(ctx context.Context, spaceIds []string) (statuses []*coordinatorproto.SpaceStatusPayload, err error) {
return
}
func (m mockCoordinatorClient) StatusCheck(ctx context.Context, spaceId string) (status *coordinatorproto.SpaceStatusPayload, err error) {
return
}
func (m mockCoordinatorClient) SpaceSign(ctx context.Context, payload coordinatorclient.SpaceSignPayload) (receipt *coordinatorproto.SpaceReceiptWithSignature, err error) {
return
}
func (m mockCoordinatorClient) FileLimitCheck(ctx context.Context, spaceId string, identity []byte) (response *coordinatorproto.FileLimitCheckResponse, err error) {
return
}
func (m mockCoordinatorClient) NetworkConfiguration(ctx context.Context, currentId string) (*coordinatorproto.NetworkConfigurationResponse, error) {
return nil, nil
}
func (m mockCoordinatorClient) DeletionLog(ctx context.Context, lastRecordId string, limit int) (records []*coordinatorproto.DeletionLogRecord, err error) {
return
}
func (m mockCoordinatorClient) IdentityRepoPut(ctx context.Context, identity string, data []*identityrepoproto.Data) (err error) {
return
}
func (m mockCoordinatorClient) IdentityRepoGet(ctx context.Context, identities []string, kinds []string) (res []*identityrepoproto.DataWithIdentity, err error) {
return
}
func (m mockCoordinatorClient) AclAddRecord(ctx context.Context, spaceId string, rec *consensusproto.RawRecord) (res *consensusproto.RawRecordWithId, err error) {
return
}
func (m mockCoordinatorClient) AclGetRecords(ctx context.Context, spaceId, aclHead string) (res []*consensusproto.RawRecordWithId, err error) {
return
}
func (m mockCoordinatorClient) Init(a *app.App) (err error) {
return
}
func (m mockCoordinatorClient) Name() (name string) {
return coordinatorclient.CName
}
//
// Space fixture
//
@ -406,6 +473,7 @@ func newFixture(t *testing.T) *spaceFixture {
Register(fx.config).
Register(credentialprovider.NewNoOp()).
Register(&mockStatusServiceProvider{}).
Register(mockCoordinatorClient{}).
Register(fx.configurationService).
Register(fx.storageProvider).
Register(fx.peermanagerProvider).